diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 7265d17..e922e94 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,8 +1,44 @@ use std::pin::Pin; +use crate::prelude::*; +use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; +#[allow(missing_debug_implementations)] +pub struct FlatMap { + inner: FlattenCompat, U>, +} + +impl FlatMap +where + S: Stream, + U: IntoStream, + F: FnMut(S::Item) -> U, +{ + pin_utils::unsafe_pinned!(inner: FlattenCompat, U>); + + pub fn new(stream: S, f: F) -> FlatMap { + FlatMap { + inner: FlattenCompat::new(stream.map(f)), + } + } +} + +impl Stream for FlatMap +where + S: Stream> + std::marker::Unpin, + S::Item: std::marker::Unpin, + U: Stream + std::marker::Unpin, + F: FnMut(S::Item) -> U + std::marker::Unpin, +{ + type Item = U::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.as_mut().inner().poll_next(cx) + } +} + /// Real logic of both `Flatten` and `FlatMap` which simply delegate to /// this type. #[derive(Clone, Debug)] @@ -10,6 +46,7 @@ struct FlattenCompat { stream: S, frontiter: Option, } + impl FlattenCompat { pin_utils::unsafe_unpinned!(stream: S); pin_utils::unsafe_unpinned!(frontiter: Option);