From 31cf932d808bdfb3cbdf024968b333f23d510547 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 00:24:59 +0900 Subject: [PATCH] wip: Add stream unzip --- src/stream/stream/mod.rs | 13 ++++++++++ src/stream/stream/unzip.rs | 53 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 src/stream/stream/unzip.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8938583..900bde3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -130,6 +130,7 @@ cfg_unstable! { pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; + pub use unzip::UnzipFuture; mod count; mod merge; @@ -138,6 +139,7 @@ cfg_unstable! { mod partition; mod timeout; mod throttle; + mod unzip; } extension_trait! { @@ -1717,6 +1719,17 @@ extension_trait! { Zip::new(self, other) } + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn unzip(self) -> impl Future [UnzipFuture] + where + FromA: Default + Extend, + FromB: Default + Extend, + Self: Stream + Sized, + { + UnzipFuture::new(self) + } + #[doc = r#" Transforms a stream into a collection. diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs new file mode 100644 index 0000000..ef1ff3a --- /dev/null +++ b/src/stream/stream/unzip.rs @@ -0,0 +1,53 @@ +use std::future::Future; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub struct UnzipFuture { + #[pin] + stream: S, + res: (FromA, FromB), + } +} + +impl UnzipFuture +where + FromA: Default, + FromB: Default, +{ + pub(super) fn new(stream: S) -> Self { + UnzipFuture { + stream, + res: (FromA::default(), FromB::default()), + } + } +} + +impl Future for UnzipFuture +where + S: Stream, + FromA: Default + Extend + Copy, + FromB: Default + Extend + Copy, +{ + type Output = (FromA, FromB); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some((a, b)) => { + this.res.0.extend(Some(a)); + this.res.1.extend(Some(b)); + Poll::Pending + } + None => Poll::Ready(*this.res), + } + } +}