From 658a16bebe858013da61dabcdd2c245bf3182609 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Mon, 30 Sep 2019 23:17:25 +0300 Subject: [PATCH] Adds stream map combinator --- src/stream/stream/map.rs | 41 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 33 ++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/stream/stream/map.rs diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs new file mode 100644 index 0000000..4bc2e36 --- /dev/null +++ b/src/stream/stream/map.rs @@ -0,0 +1,41 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Map { + stream: S, + f: F, + __from: PhantomData, + __to: PhantomData, +} + +impl Map { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + + pub(crate) fn new(stream: S, f: F) -> Self { + Map { + stream, + f, + __from: PhantomData, + __to: PhantomData, + } + } +} + +impl Stream for Map +where + S: Stream, + F: FnMut(S::Item) -> B, +{ + type Item = B; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + Poll::Ready(next.map(self.as_mut().f())) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 07de323..d84a32e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -32,6 +32,7 @@ mod find_map; mod fold; mod fuse; mod inspect; +mod map; mod min_by; mod next; mod nth; @@ -57,6 +58,7 @@ pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; +pub use map::Map; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -334,6 +336,37 @@ extension_trait! { Enumerate::new(self) } + #[doc = r#" + Takes a closure and creates a stream that calls that closure on every element of this stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect(); + let mut s = s.map(|x| 2 * x); + + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, Some(4)); + assert_eq!(s.next().await, Some(6)); + assert_eq!(s.next().await, None); + + # + # }) } + ``` + "#] + fn map(self, f: F) -> Map + where + Self: Sized, + F: FnMut(Self::Item) -> B, + { + Map::new(self, f) + } + #[doc = r#" A combinator that does something with each element in the stream, passing the value on.