diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index de5eb38..8938583 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -120,8 +120,11 @@ cfg_unstable! { use crate::stream::into_stream::IntoStream; use crate::stream::{FromStream, Product, Sum}; + use crate::stream::Extend; use count::CountFuture; + use partition::PartitionFuture; + pub use merge::Merge; pub use flatten::Flatten; pub use flat_map::FlatMap; @@ -132,6 +135,7 @@ cfg_unstable! { mod merge; mod flatten; mod flat_map; + mod partition; mod timeout; mod throttle; } @@ -1308,6 +1312,44 @@ extension_trait! { FoldFuture::new(self, init, f) } + #[doc = r#" + A combinator that applies a function to every element in a stream + creating two collections from it. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let (even, odd): (Vec, Vec) = stream::from_iter(vec![1, 2, 3]) + .partition(|&n| n % 2 == 0).await; + + assert_eq!(even, vec![2]); + assert_eq!(odd, vec![1, 3]); + + # + # }) } + ``` + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn partition( + self, + f: F, + ) -> impl Future [PartitionFuture] + where + Self: Sized, + F: FnMut(&Self::Item) -> bool, + B: Default + Extend, + { + PartitionFuture::new(self, f) + } + #[doc = r#" Call a closure on each element of the stream. diff --git a/src/stream/stream/partition.rs b/src/stream/stream/partition.rs new file mode 100644 index 0000000..ba4938c --- /dev/null +++ b/src/stream/stream/partition.rs @@ -0,0 +1,60 @@ +use std::future::Future; +use std::pin::Pin; +use std::default::Default; +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[derive(Debug)] + #[allow(missing_debug_implementations)] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub struct PartitionFuture { + #[pin] + stream: S, + f: F, + res: Option<(B, B)>, + } +} + +impl PartitionFuture { + pub(super) fn new(stream: S, f: F) -> Self { + Self { + stream, + f, + res: Some((B::default(), B::default())), + } + } +} + +impl Future for PartitionFuture +where + S: Stream + Sized, + F: FnMut(&S::Item) -> bool, + B: Default + Extend, +{ + type Output = (B, B); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let mut res = this.res.take().unwrap(); + match (this.f)(&v) { + true => res.0.extend(Some(v)), + false => res.1.extend(Some(v)), + }; + + *this.res = Some(res); + } + None => return Poll::Ready(this.res.take().unwrap()), + } + } + } +}