From 2dee2897509f0ac5c1a8e26d768bfdf3cbe54099 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 18 Oct 2019 10:43:21 +0900 Subject: [PATCH] Add FlatMap struct --- src/stream/stream/flatten.rs | 37 ++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 7265d17..e922e94 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,8 +1,44 @@ use std::pin::Pin; +use crate::prelude::*; +use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; +#[allow(missing_debug_implementations)] +pub struct FlatMap { + inner: FlattenCompat, U>, +} + +impl FlatMap +where + S: Stream, + U: IntoStream, + F: FnMut(S::Item) -> U, +{ + pin_utils::unsafe_pinned!(inner: FlattenCompat, U>); + + pub fn new(stream: S, f: F) -> FlatMap { + FlatMap { + inner: FlattenCompat::new(stream.map(f)), + } + } +} + +impl Stream for FlatMap +where + S: Stream> + std::marker::Unpin, + S::Item: std::marker::Unpin, + U: Stream + std::marker::Unpin, + F: FnMut(S::Item) -> U + std::marker::Unpin, +{ + type Item = U::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.as_mut().inner().poll_next(cx) + } +} + /// Real logic of both `Flatten` and `FlatMap` which simply delegate to /// this type. #[derive(Clone, Debug)] @@ -10,6 +46,7 @@ struct FlattenCompat { stream: S, frontiter: Option, } + impl FlattenCompat { pin_utils::unsafe_unpinned!(stream: S); pin_utils::unsafe_unpinned!(frontiter: Option);