diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 6623207b..7cb9f36d 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -27,6 +27,7 @@ use cfg_if::cfg_if; use crate::future::Future; use crate::task::{Context, Poll}; +use std::marker::PhantomData; cfg_if! { if #[cfg(feature = "docs")] { @@ -111,6 +112,71 @@ pub trait Stream { remaining: n, } } + + /// Tests if every element of the stream matches a predicate. + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all()`. If any of them return `false`, it + /// returns `false`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat(9).take(3); + /// + /// while let Some(v) = s.next().await { + /// assert_eq!(v, 9); + /// } + /// # + /// # }) } + /// ``` + /// ``` + /// let a = [1, 2, 3]; + /// + /// assert!(a.iter().all(|&x| x > 0)); + /// + /// assert!(!a.iter().all(|&x| x > 2)); + /// ``` + /// + /// Stopping at the first `false`: + /// + /// ``` + /// let a = [1, 2, 3]; + /// + /// let mut iter = a.iter(); + /// + /// assert!(!iter.all(|&x| x != 2)); + /// + /// // we can still use `iter`, as there are more elements. + /// assert_eq!(iter.next(), Some(&3)); + /// ``` + #[inline] + fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> + where + Self: Sized, + F: FnMut(Self::Item) -> bool, + { + AllFuture { + stream: self, + result: true, + __item: PhantomData, + f, + } + } } impl Stream for T { @@ -168,3 +234,45 @@ impl futures::Stream for Take { } } } + +pub struct AllFuture<'a, S, F, T> +where + S: ?Sized, + F: FnMut(T) -> bool, +{ + stream: &'a mut S, + f: F, + result: bool, + __item: PhantomData, +} + +impl<'a, S, F, T> AllFuture<'a, S, F, T> +where + S: ?Sized, + F: FnMut(T) -> bool, +{ + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(result: bool); + pin_utils::unsafe_unpinned!(f: F); +} + +impl Future for AllFuture<'_, S, F, S::Item> +where + S: futures::Stream + Unpin + ?Sized, + F: FnMut(S::Item) -> bool, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures::Stream; + let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + // me: *screams* + *self.as_mut().result() = (self.as_mut().f())(v); + Poll::Pending + } + None => Poll::Ready(self.result), + } + } +}