diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs new file mode 100644 index 00000000..ed3268ea --- /dev/null +++ b/src/stream/stream/flat_map.rs @@ -0,0 +1,62 @@ +use pin_project_lite::pin_project; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::stream::map::Map; +use crate::stream::{IntoStream, Stream}; +use crate::task::{Context, Poll}; + +pin_project! { + /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flat_map`]: trait.Stream.html#method.flat_map + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct FlatMap { + #[pin] + stream: Map, + #[pin] + inner_stream: Option, + } +} + +impl FlatMap +where + S: Stream, + U: IntoStream, + F: FnMut(S::Item) -> U, +{ + pub(super) fn new(stream: S, f: F) -> FlatMap { + FlatMap { + stream: stream.map(f), + inner_stream: None, + } + } +} + +impl Stream for FlatMap +where + S: Stream, + S::Item: IntoStream, + U: Stream, + F: FnMut(S::Item) -> U, +{ + type Item = U::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { + if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { + return Poll::Ready(item); + } + } + + match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { + None => return Poll::Ready(None), + Some(inner) => this.inner_stream.set(Some(inner.into_stream())), + } + } + } +} diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 2533f379..2ea0673e 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -6,46 +6,6 @@ use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; -pin_project! { - /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its - /// documentation for more. - /// - /// [`flat_map`]: trait.Stream.html#method.flat_map - /// [`Stream`]: trait.Stream.html - #[allow(missing_debug_implementations)] - pub struct FlatMap { - #[pin] - inner: FlattenCompat, U>, - } -} - -impl FlatMap -where - S: Stream, - U: IntoStream, - F: FnMut(S::Item) -> U, -{ - pub(super) fn new(stream: S, f: F) -> FlatMap { - FlatMap { - inner: FlattenCompat::new(stream.map(f)), - } - } -} - -impl Stream for FlatMap -where - S: Stream, - S::Item: IntoStream, - U: Stream, - F: FnMut(S::Item) -> U, -{ - type Item = U::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(cx) - } -} - pin_project! { /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its /// documentation for more. @@ -55,7 +15,9 @@ pin_project! { #[allow(missing_debug_implementations)] pub struct Flatten { #[pin] - inner: FlattenCompat + stream: S, + #[pin] + inner_stream: Option, } } @@ -66,47 +28,13 @@ where { pub(super) fn new(stream: S) -> Flatten { Flatten { - inner: FlattenCompat::new(stream), + stream, + inner_stream: None, } } } impl Stream for Flatten::IntoStream> -where - S: Stream, - S::Item: IntoStream, - U: Stream, -{ - type Item = U::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(cx) - } -} - -pin_project! { - /// Real logic of both `Flatten` and `FlatMap` which simply delegate to - /// this type. - #[derive(Clone, Debug)] - struct FlattenCompat { - #[pin] - stream: S, - #[pin] - frontiter: Option, - } -} - -impl FlattenCompat { - /// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`. - fn new(stream: S) -> FlattenCompat { - FlattenCompat { - stream, - frontiter: None, - } - } -} - -impl Stream for FlattenCompat where S: Stream, S::Item: IntoStream, @@ -117,7 +45,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { - if let Some(inner) = this.frontiter.as_mut().as_pin_mut() { + if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { return Poll::Ready(item); } @@ -125,34 +53,8 @@ where match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { None => return Poll::Ready(None), - Some(inner) => this.frontiter.set(Some(inner.into_stream())), + Some(inner) => this.inner_stream.set(Some(inner.into_stream())), } } } } - -#[cfg(test)] -mod tests { - use super::FlattenCompat; - - use crate::prelude::*; - use crate::task; - - use std::collections::VecDeque; - - #[test] - fn test_poll_next() -> std::io::Result<()> { - let inner1: VecDeque = vec![1, 2, 3].into_iter().collect(); - let inner2: VecDeque = vec![4, 5, 6].into_iter().collect(); - - let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); - - task::block_on(async move { - let flat = FlattenCompat::new(s); - let v: Vec = flat.collect().await; - - assert_eq!(v, vec![1, 2, 3, 4, 5, 6]); - Ok(()) - }) - } -} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 596dc419..29e68fc4 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -99,10 +99,12 @@ cfg_unstable! { use crate::stream::into_stream::IntoStream; pub use merge::Merge; - pub use flatten::{FlatMap, Flatten}; + pub use flatten::Flatten; + pub use flat_map::FlatMap; mod merge; mod flatten; + mod flat_map; } extension_trait! {