From 603b3c508559129bf2f866291511e35db0a93c4d Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 01:16:35 +0900 Subject: [PATCH] add: Add stream unzip --- src/stream/stream/mod.rs | 2 +- src/stream/stream/unzip.rs | 30 ++++++++++++++++++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 900bde3a..04aa4d68 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -124,13 +124,13 @@ cfg_unstable! { use count::CountFuture; use partition::PartitionFuture; + use unzip::UnzipFuture; pub use merge::Merge; pub use flatten::Flatten; pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; - pub use unzip::UnzipFuture; mod count; mod merge; diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index ef1ff3a0..4f5dfa19 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -7,12 +7,13 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { + #[derive(Clone, Debug)] #[cfg(all(feature = "default", feature = "unstable"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - pub struct UnzipFuture { + pub struct UnzipFuture { #[pin] stream: S, - res: (FromA, FromB), + res: Option<(FromA, FromB)>, } } @@ -24,7 +25,7 @@ where pub(super) fn new(stream: S) -> Self { UnzipFuture { stream, - res: (FromA::default(), FromB::default()), + res: Some((FromA::default(), FromB::default())), } } } @@ -32,22 +33,27 @@ where impl Future for UnzipFuture where S: Stream, - FromA: Default + Extend + Copy, - FromB: Default + Extend + Copy, + FromA: Default + Extend, + FromB: Default + Extend, { type Output = (FromA, FromB); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); - let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); - match next { - Some((a, b)) => { - this.res.0.extend(Some(a)); - this.res.1.extend(Some(b)); - Poll::Pending + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some((a, b)) => { + let mut res = this.res.take().unwrap(); + res.0.extend(Some(a)); + res.1.extend(Some(b)); + + *this.res = Some(res); + } + None => return Poll::Ready(this.res.take().unwrap()), } - None => Poll::Ready(*this.res), } } }