diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 281e4d88..1d9ae6e1 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -54,6 +54,7 @@ mod ne; mod next; mod nth; mod partial_cmp; +mod partition; mod position; mod scan; mod skip; @@ -91,6 +92,7 @@ use ne::NeFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use partition::PartitionFuture; use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEachFuture; @@ -1308,6 +1310,42 @@ extension_trait! { FoldFuture::new(self, init, f) } + #[doc = r#" + A combinator that applies a function to every element in a stream + creating two collections from it. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let (even, odd): (Vec, Vec) = stream::from_iter(vec![1, 2, 3]) + .partition(|&n| n % 2 == 0).await; + + assert_eq!(even, vec![2]); + assert_eq!(odd, vec![1, 3]); + + # + # }) } + ``` + "#] + fn partition( + self, + f: F, + ) -> impl Future [PartitionFuture] + where + Self: Sized, + F: FnMut(&Self::Item) -> bool, + B: Default, + { + PartitionFuture::new(self, f) + } + #[doc = r#" Call a closure on each element of the stream. diff --git a/src/stream/stream/partition.rs b/src/stream/stream/partition.rs new file mode 100644 index 00000000..46e957cb --- /dev/null +++ b/src/stream/stream/partition.rs @@ -0,0 +1,57 @@ +use std::future::Future; +use std::pin::Pin; +use std::default::Default; +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[derive(Debug)] + pub struct PartitionFuture { + #[pin] + stream: S, + f: F, + res: Option<(B, B)>, + } +} + +impl PartitionFuture { + pub(super) fn new(stream: S, f: F) -> Self { + Self { + stream, + f, + res: Some((B::default(), B::default())), + } + } +} + +impl Future for PartitionFuture +where + S: Stream + Sized, + F: FnMut(&S::Item) -> bool, + B: Default + Extend, +{ + type Output = (B, B); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let mut res = this.res.take().unwrap(); + match (this.f)(&v) { + true => res.0.extend(Some(v)), + false => res.1.extend(Some(v)), + }; + + *this.res = Some(res); + } + None => return Poll::Ready(this.res.take().unwrap()), + } + } + } +}