diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index 62710240..3b65fc76 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -1,43 +1,36 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - use std::marker::PhantomData; use std::pin::Pin; -#[derive(Debug)] -pub struct AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct AllFuture<'a, S, F, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, - pub(crate) __item: PhantomData, + pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} +impl Unpin for AllFuture<'_, S, F, T> {} impl Future for AllFuture<'_, S, F, S::Item> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + match next { Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; + let result = (&mut self.f)(v); + self.result = result; + if result { // don't forget to wake this task again to pull the next item from stream cx.waker().wake_by_ref(); diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index f1f551a1..a23adf4b 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -1,43 +1,36 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - use std::marker::PhantomData; use std::pin::Pin; -#[derive(Debug)] -pub struct AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct AnyFuture<'a, S, F, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, - pub(crate) __item: PhantomData, + pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} +impl Unpin for AnyFuture<'_, S, F, T> {} impl Future for AnyFuture<'_, S, F, S::Item> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + match next { Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; + let result = (&mut self.f)(v); + self.result = result; + if result { Poll::Ready(true) } else { diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index f63d52b8..a68cf313 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -2,8 +2,10 @@ use std::cmp::Ordering; use std::pin::Pin; use crate::future::Future; +use crate::stream::Stream; use crate::task::{Context, Poll}; +#[doc(hidden)] #[allow(missing_debug_implementations)] pub struct MinByFuture { stream: S, @@ -27,7 +29,7 @@ impl MinByFuture { impl Future for MinByFuture where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 462aeaed..fb8b38d6 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -44,9 +44,12 @@ use nth::NthFuture; use std::cmp::Ordering; use std::marker::PhantomData; +use std::pin::Pin; use cfg_if::cfg_if; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs")] { #[doc(hidden)] @@ -83,6 +86,55 @@ pub trait Stream { /// The type of items yielded by this stream. type Item; + /// Attempts to receive the next item from the stream. + /// + /// There are several possible return values: + /// + /// * `Poll::Pending` means this stream's next value is not ready yet. + /// * `Poll::Ready(None)` means this stream has been exhausted. + /// * `Poll::Ready(Some(item))` means `item` was received out of the stream. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::pin::Pin; + /// + /// use async_std::prelude::*; + /// use async_std::stream; + /// use async_std::task::{Context, Poll}; + /// + /// fn increment(s: impl Stream + Unpin) -> impl Stream + Unpin { + /// struct Increment(S); + /// + /// impl + Unpin> Stream for Increment { + /// type Item = S::Item; + /// + /// fn poll_next( + /// mut self: Pin<&mut Self>, + /// cx: &mut Context<'_>, + /// ) -> Poll> { + /// match Pin::new(&mut self.0).poll_next(cx) { + /// Poll::Pending => Poll::Pending, + /// Poll::Ready(None) => Poll::Ready(None), + /// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + /// } + /// } + /// } + /// + /// Increment(s) + /// } + /// + /// let mut s = increment(stream::once(7)); + /// + /// assert_eq!(s.next().await, Some(8)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + /// Advances the stream and returns the next value. /// /// Returns [`None`] when iteration is finished. Individual stream implementations may @@ -108,7 +160,10 @@ pub trait Stream { /// ``` fn next(&mut self) -> ret!('_, NextFuture, Option) where - Self: Unpin; + Self: Unpin, + { + NextFuture { stream: self } + } /// Creates a stream that yields its first `n` elements. /// @@ -312,13 +367,13 @@ pub trait Stream { #[inline] fn all(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item) where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { AllFuture { stream: self, result: true, // the default if the empty stream - __item: PhantomData, + _marker: PhantomData, f, } } @@ -436,13 +491,13 @@ pub trait Stream { #[inline] fn any(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item) where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { AnyFuture { stream: self, result: false, // the default if the empty stream - __item: PhantomData, + _marker: PhantomData, f, } } @@ -451,10 +506,7 @@ pub trait Stream { impl Stream for T { type Item = ::Item; - fn next(&mut self) -> ret!('_, NextFuture, Option) - where - Self: Unpin, - { - NextFuture { stream: self } + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + futures_core::stream::Stream::poll_next(self, cx) } } diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs index b64750d0..de75f5e9 100644 --- a/src/stream/stream/next.rs +++ b/src/stream/stream/next.rs @@ -1,6 +1,8 @@ +use std::pin::Pin; + use crate::future::Future; +use crate::stream::Stream; use crate::task::{Context, Poll}; -use std::pin::Pin; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -8,7 +10,7 @@ pub struct NextFuture<'a, T: Unpin + ?Sized> { pub(crate) stream: &'a mut T, } -impl Future for NextFuture<'_, T> { +impl Future for NextFuture<'_, T> { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 0499a6ac..0dea1d0c 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -1,7 +1,8 @@ -use crate::task::{Context, Poll}; - use std::pin::Pin; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + /// A stream that yields the first `n` items of another stream. #[derive(Clone, Debug)] pub struct Take { @@ -11,12 +12,12 @@ pub struct Take { impl Unpin for Take {} -impl Take { +impl Take { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(remaining: usize); } -impl futures_core::stream::Stream for Take { +impl futures_core::stream::Stream for Take { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {