diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs new file mode 100644 index 0000000..ed3268e --- /dev/null +++ b/src/stream/stream/flat_map.rs @@ -0,0 +1,62 @@ +use pin_project_lite::pin_project; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::stream::map::Map; +use crate::stream::{IntoStream, Stream}; +use crate::task::{Context, Poll}; + +pin_project! { + /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flat_map`]: trait.Stream.html#method.flat_map + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct FlatMap { + #[pin] + stream: Map, + #[pin] + inner_stream: Option, + } +} + +impl FlatMap +where + S: Stream, + U: IntoStream, + F: FnMut(S::Item) -> U, +{ + pub(super) fn new(stream: S, f: F) -> FlatMap { + FlatMap { + stream: stream.map(f), + inner_stream: None, + } + } +} + +impl Stream for FlatMap +where + S: Stream, + S::Item: IntoStream, + U: Stream, + F: FnMut(S::Item) -> U, +{ + type Item = U::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { + if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { + return Poll::Ready(item); + } + } + + match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { + None => return Poll::Ready(None), + Some(inner) => this.inner_stream.set(Some(inner.into_stream())), + } + } + } +} diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs new file mode 100644 index 0000000..5e791cd --- /dev/null +++ b/src/stream/stream/flatten.rs @@ -0,0 +1,58 @@ +use pin_project_lite::pin_project; +use std::pin::Pin; + +use crate::stream::{IntoStream, Stream}; +use crate::task::{Context, Poll}; + +pin_project! { + /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flatten`]: trait.Stream.html#method.flatten + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct Flatten { + #[pin] + stream: S, + #[pin] + inner_stream: Option, + } +} + +impl Flatten +where + S: Stream, + S::Item: IntoStream, +{ + pub(super) fn new(stream: S) -> Flatten { + Flatten { + stream, + inner_stream: None, + } + } +} + +impl Stream for Flatten::IntoStream> +where + S: Stream, + S::Item: IntoStream, + U: Stream, +{ + type Item = U::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { + if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { + return Poll::Ready(item); + } + } + + match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { + None => return Poll::Ready(None), + Some(inner) => this.inner_stream.set(Some(inner.into_stream())), + } + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2b237de..e1a51bf 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -96,13 +96,17 @@ cfg_unstable! { use std::time::Duration; use crate::future::Future; - use crate::stream::FromStream; - use crate::stream::{Product, Sum}; + use crate::stream::into_stream::IntoStream; + use crate::stream::{FromStream, Product, Sum}; pub use merge::Merge; + pub use flatten::Flatten; + pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; mod merge; + mod flatten; + mod flat_map; mod timeout; } @@ -563,6 +567,76 @@ extension_trait! { Filter::new(self, predicate) } + #[doc= r#" + Creates an stream that works like map, but flattens nested structure. + + # Examples + + Basic usage: + + ``` + # async_std::task::block_on(async { + + use std::collections::VecDeque; + use async_std::prelude::*; + use async_std::stream::IntoStream; + + let inner1: VecDeque = vec![1,2,3].into_iter().collect(); + let inner2: VecDeque = vec![4,5,6].into_iter().collect(); + + let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); + + let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await; + + assert_eq!(v, vec![1,2,3,4,5,6]); + + # }); + ``` + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn flat_map(self, f: F) -> FlatMap + where + Self: Sized, + U: IntoStream, + F: FnMut(Self::Item) -> U, + { + FlatMap::new(self, f) + } + + #[doc = r#" + Creates an stream that flattens nested structure. + + # Examples + + Basic usage: + + ``` + # async_std::task::block_on(async { + + use std::collections::VecDeque; + use async_std::prelude::*; + + let inner1: VecDeque = vec![1,2,3].into_iter().collect(); + let inner2: VecDeque = vec![4,5,6].into_iter().collect(); + let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); + + let v: Vec<_> = s.flatten().collect().await; + + assert_eq!(v, vec![1,2,3,4,5,6]); + + # }); + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn flatten(self) -> Flatten + where + Self: Sized, + Self::Item: IntoStream, + { + Flatten::new(self) + } + #[doc = r#" Both filters and maps a stream.