diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 7cb9f36..5982e92 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -136,34 +136,31 @@ pub trait Stream { /// use async_std::prelude::*; /// use async_std::stream; /// - /// let mut s = stream::repeat(9).take(3); + /// let mut s = stream::repeat::(42).take(3); + /// assert!(s.all(|x| x == 42).await); /// - /// while let Some(v) = s.next().await { - /// assert_eq!(v, 9); - /// } /// # /// # }) } /// ``` - /// ``` - /// let a = [1, 2, 3]; /// - /// assert!(a.iter().all(|&x| x > 0)); + /// Empty stream: /// - /// assert!(!a.iter().all(|&x| x > 2)); /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; /// - /// Stopping at the first `false`: + /// let mut s = stream::empty::(); + /// assert!(s.all(|_| false).await); /// + /// # + /// # }) } /// ``` - /// let a = [1, 2, 3]; - /// - /// let mut iter = a.iter(); - /// - /// assert!(!iter.all(|&x| x != 2)); + /// Stopping at the first `false`: /// - /// // we can still use `iter`, as there are more elements. - /// assert_eq!(iter.next(), Some(&3)); - /// ``` + /// TODO: add example here + /// TODO: add more examples #[inline] fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> where @@ -235,9 +232,9 @@ impl futures::Stream for Take { } } +#[derive(Debug)] pub struct AllFuture<'a, S, F, T> where - S: ?Sized, F: FnMut(T) -> bool, { stream: &'a mut S, @@ -248,7 +245,6 @@ where impl<'a, S, F, T> AllFuture<'a, S, F, T> where - S: ?Sized, F: FnMut(T) -> bool, { pin_utils::unsafe_pinned!(stream: &'a mut S); @@ -258,8 +254,9 @@ where impl Future for AllFuture<'_, S, F, S::Item> where - S: futures::Stream + Unpin + ?Sized, + S: futures::Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, + S::Item: std::fmt::Debug, { type Output = bool; @@ -268,9 +265,15 @@ where let next = futures::ready!(self.as_mut().stream().poll_next(cx)); match next { Some(v) => { - // me: *screams* - *self.as_mut().result() = (self.as_mut().f())(v); - Poll::Pending + let result = (self.as_mut().f())(v); + *self.as_mut().result() = result; + if result { + // don't forget to wake this task again to pull the next item from stream + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(false) + } } None => Poll::Ready(self.result), }