diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs new file mode 100644 index 0000000..e63b584 --- /dev/null +++ b/src/stream/stream/inspect.rs @@ -0,0 +1,43 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that does something with each element of another stream. +#[derive(Debug)] +pub struct Inspect { + stream: S, + f: F, + __t: PhantomData, +} + +impl Inspect { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + + pub(super) fn new(stream: S, f: F) -> Self { + Inspect { + stream, + f, + __t: PhantomData, + } + } +} + +impl Stream for Inspect +where + S: Stream, + F: FnMut(&S::Item), +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + Poll::Ready(next.and_then(|x| { + (self.as_mut().f())(&x); + Some(x) + })) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d79605e..e4c29bc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,6 +30,7 @@ mod find; mod find_map; mod fold; mod fuse; +mod inspect; mod min_by; mod next; mod nth; @@ -41,6 +42,7 @@ mod zip; pub use filter::Filter; pub use fuse::Fuse; +pub use inspect::Inspect; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -260,6 +262,37 @@ pub trait Stream { Enumerate::new(self) } + /// A combinator that does something with each element in the stream, passing the value on. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let a: VecDeque<_> = vec![1u8, 2, 3, 4, 5].into_iter().collect(); + /// let sum = a + /// .inspect(|x| println!("about to filter {}", x)) + /// .filter(|x| x % 2 == 0) + /// .inspect(|x| println!("made it through filter: {}", x)) + /// .fold(0, |sum, i| sum + i).await; + /// + /// assert_eq!(sum, 6); + /// # + /// # }) } + /// ``` + fn inspect(self, f: F) -> Inspect + where + Self: Sized, + F: FnMut(&Self::Item), + { + Inspect::new(self, f) + } + /// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` /// returns `Poll::Ready(None)`, all future calls to `poll` will also return /// `Poll::Ready(None)`.