From 6be8467cdcf1706c1705df351c34bb67c2efb3f2 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Tue, 15 Oct 2019 09:50:03 +0200 Subject: [PATCH] impl Stream::take_while adapter (#332) * impl take_while stream adapter * fmt * add comment * unindent where clauses --- src/stream/mod.rs | 4 ++- src/stream/stream/mod.rs | 31 ++++++++++++++++++++++ src/stream/stream/take_while.rs | 47 +++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 src/stream/stream/take_while.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 9f5097d0..6372d818 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -26,7 +26,9 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, Zip}; +pub use stream::{ + Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip, +}; pub(crate) mod stream; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 27539af3..0e563f6d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -43,6 +43,7 @@ mod skip; mod skip_while; mod step_by; mod take; +mod take_while; mod try_for_each; mod zip; @@ -70,6 +71,7 @@ pub use skip::Skip; pub use skip_while::SkipWhile; pub use step_by::StepBy; pub use take::Take; +pub use take_while::TakeWhile; pub use zip::Zip; use std::cmp::Ordering; @@ -241,6 +243,35 @@ extension_trait! { } } + #[doc = r#" + Creates a stream that yields elements based on a predicate. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3, 4].into_iter().collect(); + let mut s = s.take_while(|x| x < &3 ); + + assert_eq!(s.next().await, Some(1)); + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, None); + + # + # }) } + "#] + fn take_while

(self, predicate: P) -> TakeWhile + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + TakeWhile::new(self, predicate) + } + #[doc = r#" Creates a stream that yields each `step`th element. diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs new file mode 100644 index 00000000..6f3cc8f2 --- /dev/null +++ b/src/stream/stream/take_while.rs @@ -0,0 +1,47 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that yields elements based on a predicate. +#[derive(Debug)] +pub struct TakeWhile { + stream: S, + predicate: P, + __t: PhantomData, +} + +impl TakeWhile { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(predicate: P); + + pub(super) fn new(stream: S, predicate: P) -> Self { + TakeWhile { + stream, + predicate, + __t: PhantomData, + } + } +} + +impl Stream for TakeWhile +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) if (self.as_mut().predicate())(&v) => Poll::Ready(Some(v)), + Some(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + None => Poll::Ready(None), + } + } +}