Merge pull request #543 from k-nasa/stream_unzip

Add stream unzip
new-scheduler
Yoshua Wuyts 5 years ago committed by GitHub
commit 850b8ae9d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -123,6 +123,7 @@ cfg_unstable! {
use count::CountFuture; use count::CountFuture;
use partition::PartitionFuture; use partition::PartitionFuture;
use unzip::UnzipFuture;
pub use merge::Merge; pub use merge::Merge;
pub use flatten::Flatten; pub use flatten::Flatten;
@ -137,6 +138,7 @@ cfg_unstable! {
mod partition; mod partition;
mod timeout; mod timeout;
mod throttle; mod throttle;
mod unzip;
} }
extension_trait! { extension_trait! {
@ -1748,6 +1750,44 @@ extension_trait! {
Zip::new(self, other) Zip::new(self, other)
} }
#[doc = r#"
Converts an stream of pairs into a pair of containers.
`unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements.
This function is, in some sense, the opposite of [`zip`].
[`zip`]: trait.Stream.html#method.zip
# Example
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let s = stream::from_iter(vec![(1,2), (3,4)]);
let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
assert_eq!(left, [1, 3]);
assert_eq!(right, [2, 4]);
#
# }) }
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>]
where
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
Self: Stream<Item = (A, B)> + Sized,
{
UnzipFuture::new(self)
}
#[doc = r#" #[doc = r#"
Transforms a stream into a collection. Transforms a stream into a collection.

@ -0,0 +1,57 @@
use std::future::Future;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[derive(Clone, Debug)]
#[cfg(all(feature = "default", feature = "unstable"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub struct UnzipFuture<S, FromA, FromB> {
#[pin]
stream: S,
res: Option<(FromA, FromB)>,
}
}
impl<S: Stream, FromA, FromB> UnzipFuture<S, FromA, FromB>
where
FromA: Default,
FromB: Default,
{
pub(super) fn new(stream: S) -> Self {
UnzipFuture {
stream,
res: Some((FromA::default(), FromB::default())),
}
}
}
impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
where
S: Stream<Item = (A, B)>,
FromA: Default + Extend<A>,
FromB: Default + Extend<B>,
{
type Output = (FromA, FromB);
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));
match next {
Some((a, b)) => {
let res = this.res.as_mut().unwrap();
res.0.extend(Some(a));
res.1.extend(Some(b));
}
None => return Poll::Ready(this.res.take().unwrap()),
}
}
}
}
Loading…
Cancel
Save