adds stream::inspect combinator

staging
Fedor Sakharov 5 years ago
parent 2acc07065c
commit bf7121d2d4
No known key found for this signature in database
GPG Key ID: 93D436E666BF0FEE

@ -0,0 +1,43 @@
use std::marker::PhantomData;
use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that does something with each element of another stream.
#[derive(Debug)]
pub struct Inspect<S, F, T> {
stream: S,
f: F,
__t: PhantomData<T>,
}
impl<S, F, T> Inspect<S, F, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(super) fn new(stream: S, f: F) -> Self {
Inspect {
stream,
f,
__t: PhantomData,
}
}
}
impl<S, F> Stream for Inspect<S, F, S::Item>
where
S: Stream,
F: FnMut(&S::Item),
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
Poll::Ready(next.and_then(|x| {
(self.as_mut().f())(&x);
Some(x)
}))
}
}

@ -30,6 +30,7 @@ mod find;
mod find_map;
mod fold;
mod fuse;
mod inspect;
mod min_by;
mod next;
mod nth;
@ -41,6 +42,7 @@ mod zip;
pub use filter::Filter;
pub use fuse::Fuse;
pub use inspect::Inspect;
pub use scan::Scan;
pub use skip::Skip;
pub use skip_while::SkipWhile;
@ -260,6 +262,37 @@ pub trait Stream {
Enumerate::new(self)
}
/// A combinator that does something with each element in the stream, passing the value on.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use std::collections::VecDeque;
///
/// let a: VecDeque<_> = vec![1u8, 2, 3, 4, 5].into_iter().collect();
/// let sum = a
/// .inspect(|x| println!("about to filter {}", x))
/// .filter(|x| x % 2 == 0)
/// .inspect(|x| println!("made it through filter: {}", x))
/// .fold(0, |sum, i| sum + i).await;
///
/// assert_eq!(sum, 6);
/// #
/// # }) }
/// ```
fn inspect<F>(self, f: F) -> Inspect<Self, F, Self::Item>
where
Self: Sized,
F: FnMut(&Self::Item),
{
Inspect::new(self, f)
}
/// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll`
/// returns `Poll::Ready(None)`, all future calls to `poll` will also return
/// `Poll::Ready(None)`.

Loading…
Cancel
Save