diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs new file mode 100644 index 0000000..4bc2e36 --- /dev/null +++ b/src/stream/stream/map.rs @@ -0,0 +1,41 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Map { + stream: S, + f: F, + __from: PhantomData, + __to: PhantomData, +} + +impl Map { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + + pub(crate) fn new(stream: S, f: F) -> Self { + Map { + stream, + f, + __from: PhantomData, + __to: PhantomData, + } + } +} + +impl Stream for Map +where + S: Stream, + F: FnMut(S::Item) -> B, +{ + type Item = B; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + Poll::Ready(next.map(self.as_mut().f())) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index aaf5d35..8849605 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -33,6 +33,7 @@ mod fold; mod for_each; mod fuse; mod inspect; +mod map; mod min_by; mod next; mod nth; @@ -61,6 +62,7 @@ pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; +pub use map::Map; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -338,6 +340,37 @@ extension_trait! { Enumerate::new(self) } + #[doc = r#" + Takes a closure and creates a stream that calls that closure on every element of this stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect(); + let mut s = s.map(|x| 2 * x); + + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, Some(4)); + assert_eq!(s.next().await, Some(6)); + assert_eq!(s.next().await, None); + + # + # }) } + ``` + "#] + fn map(self, f: F) -> Map + where + Self: Sized, + F: FnMut(Self::Item) -> B, + { + Map::new(self, f) + } + #[doc = r#" A combinator that does something with each element in the stream, passing the value on.