From 11268a80fbc1fe833bee5d022eb99379a6aa937c Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Fri, 15 Nov 2019 12:28:03 +0800 Subject: [PATCH 1/2] add stream-partition --- src/stream/stream/mod.rs | 38 +++++++++++++++++++++++ src/stream/stream/partition.rs | 57 ++++++++++++++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 src/stream/stream/partition.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 281e4d8..1d9ae6e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -54,6 +54,7 @@ mod ne; mod next; mod nth; mod partial_cmp; +mod partition; mod position; mod scan; mod skip; @@ -91,6 +92,7 @@ use ne::NeFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use partition::PartitionFuture; use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEachFuture; @@ -1308,6 +1310,42 @@ extension_trait! { FoldFuture::new(self, init, f) } + #[doc = r#" + A combinator that applies a function to every element in a stream + creating two collections from it. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let (even, odd): (Vec, Vec) = stream::from_iter(vec![1, 2, 3]) + .partition(|&n| n % 2 == 0).await; + + assert_eq!(even, vec![2]); + assert_eq!(odd, vec![1, 3]); + + # + # }) } + ``` + "#] + fn partition( + self, + f: F, + ) -> impl Future [PartitionFuture] + where + Self: Sized, + F: FnMut(&Self::Item) -> bool, + B: Default, + { + PartitionFuture::new(self, f) + } + #[doc = r#" Call a closure on each element of the stream. diff --git a/src/stream/stream/partition.rs b/src/stream/stream/partition.rs new file mode 100644 index 0000000..46e957c --- /dev/null +++ b/src/stream/stream/partition.rs @@ -0,0 +1,57 @@ +use std::future::Future; +use std::pin::Pin; +use std::default::Default; +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[derive(Debug)] + pub struct PartitionFuture { + #[pin] + stream: S, + f: F, + res: Option<(B, B)>, + } +} + +impl PartitionFuture { + pub(super) fn new(stream: S, f: F) -> Self { + Self { + stream, + f, + res: Some((B::default(), B::default())), + } + } +} + +impl Future for PartitionFuture +where + S: Stream + Sized, + F: FnMut(&S::Item) -> bool, + B: Default + Extend, +{ + type Output = (B, B); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some(v) => { + let mut res = this.res.take().unwrap(); + match (this.f)(&v) { + true => res.0.extend(Some(v)), + false => res.1.extend(Some(v)), + }; + + *this.res = Some(res); + } + None => return Poll::Ready(this.res.take().unwrap()), + } + } + } +} From d76b32e6d45e2bd6b5693e0705490332af616fa2 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Fri, 15 Nov 2019 14:23:34 +0800 Subject: [PATCH 2/2] make it unstable and fix trait bound --- src/stream/stream/mod.rs | 10 +++++++--- src/stream/stream/partition.rs | 3 +++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 1d9ae6e..672a085 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -54,7 +54,6 @@ mod ne; mod next; mod nth; mod partial_cmp; -mod partition; mod position; mod scan; mod skip; @@ -92,7 +91,6 @@ use ne::NeFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; -use partition::PartitionFuture; use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEachFuture; @@ -122,8 +120,11 @@ cfg_unstable! { use crate::stream::into_stream::IntoStream; use crate::stream::{FromStream, Product, Sum}; + use crate::stream::Extend; use count::CountFuture; + use partition::PartitionFuture; + pub use merge::Merge; pub use flatten::Flatten; pub use flat_map::FlatMap; @@ -134,6 +135,7 @@ cfg_unstable! { mod merge; mod flatten; mod flat_map; + mod partition; mod timeout; mod throttle; } @@ -1334,6 +1336,8 @@ extension_trait! { # }) } ``` "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn partition( self, f: F, @@ -1341,7 +1345,7 @@ extension_trait! { where Self: Sized, F: FnMut(&Self::Item) -> bool, - B: Default, + B: Default + Extend, { PartitionFuture::new(self, f) } diff --git a/src/stream/stream/partition.rs b/src/stream/stream/partition.rs index 46e957c..ba4938c 100644 --- a/src/stream/stream/partition.rs +++ b/src/stream/stream/partition.rs @@ -8,6 +8,9 @@ use crate::task::{Context, Poll}; pin_project! { #[derive(Debug)] + #[allow(missing_debug_implementations)] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct PartitionFuture { #[pin] stream: S,