From 3b801655326676fcc5fbf6b474205943f502bc2c Mon Sep 17 00:00:00 2001 From: Shady Khalifa Date: Fri, 30 Aug 2019 17:42:35 +0200 Subject: [PATCH 1/5] add stream::all method --- src/stream/stream.rs | 108 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 6623207..7cb9f36 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -27,6 +27,7 @@ use cfg_if::cfg_if; use crate::future::Future; use crate::task::{Context, Poll}; +use std::marker::PhantomData; cfg_if! { if #[cfg(feature = "docs")] { @@ -111,6 +112,71 @@ pub trait Stream { remaining: n, } } + + /// Tests if every element of the stream matches a predicate. + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all()`. If any of them return `false`, it + /// returns `false`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat(9).take(3); + /// + /// while let Some(v) = s.next().await { + /// assert_eq!(v, 9); + /// } + /// # + /// # }) } + /// ``` + /// ``` + /// let a = [1, 2, 3]; + /// + /// assert!(a.iter().all(|&x| x > 0)); + /// + /// assert!(!a.iter().all(|&x| x > 2)); + /// ``` + /// + /// Stopping at the first `false`: + /// + /// ``` + /// let a = [1, 2, 3]; + /// + /// let mut iter = a.iter(); + /// + /// assert!(!iter.all(|&x| x != 2)); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next(), Some(&3)); + /// ``` + #[inline] + fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> + where + Self: Sized, + F: FnMut(Self::Item) -> bool, + { + AllFuture { + stream: self, + result: true, + __item: PhantomData, + f, + } + } } impl Stream for T { @@ -168,3 +234,45 @@ impl futures::Stream for Take { } } } + +pub struct AllFuture<'a, S, F, T> +where + S: ?Sized, + F: FnMut(T) -> bool, +{ + stream: &'a mut S, + f: F, + result: bool, + __item: PhantomData, +} + +impl<'a, S, F, T> AllFuture<'a, S, F, T> +where + S: ?Sized, + F: FnMut(T) -> bool, +{ + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(result: bool); + pin_utils::unsafe_unpinned!(f: F); +} + +impl Future for AllFuture<'_, S, F, S::Item> +where + S: futures::Stream + Unpin + ?Sized, + F: FnMut(S::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures::Stream; + let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + // me: *screams* + *self.as_mut().result() = (self.as_mut().f())(v); + Poll::Pending + } + None => Poll::Ready(self.result), + } + } +} From fe45ba5628075a82741e761795e5358d7da005b9 Mon Sep 17 00:00:00 2001 From: Shady Khalifa Date: Fri, 30 Aug 2019 18:35:51 +0200 Subject: [PATCH 2/5] update docs and examples --- src/stream/stream.rs | 49 +++++++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 7cb9f36..5982e92 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -136,34 +136,31 @@ pub trait Stream { /// use async_std::prelude::*; /// use async_std::stream; /// - /// let mut s = stream::repeat(9).take(3); + /// let mut s = stream::repeat::(42).take(3); + /// assert!(s.all(|x| x == 42).await); /// - /// while let Some(v) = s.next().await { - /// assert_eq!(v, 9); - /// } /// # /// # }) } /// ``` - /// ``` - /// let a = [1, 2, 3]; /// - /// assert!(a.iter().all(|&x| x > 0)); + /// Empty stream: /// - /// assert!(!a.iter().all(|&x| x > 2)); /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; /// - /// Stopping at the first `false`: + /// let mut s = stream::empty::(); + /// assert!(s.all(|_| false).await); /// + /// # + /// # }) } /// ``` - /// let a = [1, 2, 3]; - /// - /// let mut iter = a.iter(); - /// - /// assert!(!iter.all(|&x| x != 2)); + /// Stopping at the first `false`: /// - /// // we can still use `iter`, as there are more elements. - /// assert_eq!(iter.next(), Some(&3)); - /// ``` + /// TODO: add example here + /// TODO: add more examples #[inline] fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> where @@ -235,9 +232,9 @@ impl futures::Stream for Take { } } +#[derive(Debug)] pub struct AllFuture<'a, S, F, T> where - S: ?Sized, F: FnMut(T) -> bool, { stream: &'a mut S, @@ -248,7 +245,6 @@ where impl<'a, S, F, T> AllFuture<'a, S, F, T> where - S: ?Sized, F: FnMut(T) -> bool, { pin_utils::unsafe_pinned!(stream: &'a mut S); @@ -258,8 +254,9 @@ where impl Future for AllFuture<'_, S, F, S::Item> where - S: futures::Stream + Unpin + ?Sized, + S: futures::Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, + S::Item: std::fmt::Debug, { type Output = bool; @@ -268,9 +265,15 @@ where let next = futures::ready!(self.as_mut().stream().poll_next(cx)); match next { Some(v) => { - // me: *screams* - *self.as_mut().result() = (self.as_mut().f())(v); - Poll::Pending + let result = (self.as_mut().f())(v); + *self.as_mut().result() = result; + if result { + // don't forget to wake this task again to pull the next item from stream + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(false) + } } None => Poll::Ready(self.result), } From 243a48c14ed10b3d1ee42b2843abd2ae9677bce9 Mon Sep 17 00:00:00 2001 From: Shady Khalifa Date: Fri, 30 Aug 2019 18:37:58 +0200 Subject: [PATCH 3/5] remove debug --- src/stream/stream.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 5982e92..8710cb2 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -256,7 +256,6 @@ impl Future for AllFuture<'_, S, F, S::Item> where S: futures::Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, - S::Item: std::fmt::Debug, { type Output = bool; From e8860454e7e9d242dee25a3c62cfff0504ae692d Mon Sep 17 00:00:00 2001 From: Shady Khalifa Date: Fri, 30 Aug 2019 20:30:48 +0200 Subject: [PATCH 4/5] remove extra newline Co-Authored-By: Yoshua Wuyts --- src/stream/stream.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 8710cb2..a4f98f1 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -153,7 +153,6 @@ pub trait Stream { /// /// let mut s = stream::empty::(); /// assert!(s.all(|_| false).await); - /// /// # /// # }) } /// ``` From e517c60fb1e1d7470a5d63490b73f3be0885997a Mon Sep 17 00:00:00 2001 From: Shady Khalifa Date: Fri, 30 Aug 2019 20:32:03 +0200 Subject: [PATCH 5/5] remove comments --- src/stream/stream.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/stream/stream.rs b/src/stream/stream.rs index a4f98f1..e2dc856 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -156,10 +156,6 @@ pub trait Stream { /// # /// # }) } /// ``` - /// Stopping at the first `false`: - /// - /// TODO: add example here - /// TODO: add more examples #[inline] fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> where