From 6da7efc5ace0607a8b5180822f9745321ba949b7 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Mon, 30 Sep 2019 23:45:00 +0300 Subject: [PATCH] Adds for_each stream combinator --- src/stream/stream/for_each.rs | 46 +++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 37 ++++++++++++++++++++++++++++ 2 files changed, 83 insertions(+) create mode 100644 src/stream/stream/for_each.rs diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs new file mode 100644 index 00000000..0406a507 --- /dev/null +++ b/src/stream/stream/for_each.rs @@ -0,0 +1,46 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ForEachFuture { + stream: S, + f: F, + __t: PhantomData, +} + +impl ForEachFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + + pub(super) fn new(stream: S, f: F) -> Self { + ForEachFuture { + stream, + f, + __t: PhantomData, + } + } +} + +impl Future for ForEachFuture +where + S: Stream + Sized, + F: FnMut(S::Item), +{ + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => (self.as_mut().f())(v), + None => return Poll::Ready(()), + } + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 07de323a..310d9100 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,6 +30,7 @@ mod filter_map; mod find; mod find_map; mod fold; +mod for_each; mod fuse; mod inspect; mod min_by; @@ -49,6 +50,7 @@ use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; use fold::FoldFuture; +use for_each::ForEachFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -750,6 +752,41 @@ extension_trait! { FoldFuture::new(self, init, f) } + #[doc = r#" + Call a closure on each element of the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + use std::sync::mpsc::channel; + + let (tx, rx) = channel(); + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await; + + let v: Vec<_> = rx.iter().collect(); + + assert_eq!(v, vec![1, 2, 3]); + # + # }) } + ``` + "#] + fn for_each( + self, + f: F, + ) -> impl Future [ForEachFuture] + where + Self: Sized, + F: FnMut(Self::Item), + { + ForEachFuture::new(self, f) + } + #[doc = r#" Tests if any element of the stream matches a predicate.