diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 310d910..aaf5d35 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -41,6 +41,7 @@ mod skip; mod skip_while; mod step_by; mod take; +mod try_for_each; mod zip; use all::AllFuture; @@ -54,6 +55,7 @@ use for_each::ForEachFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; @@ -958,6 +960,51 @@ extension_trait! { Skip::new(self, n) } + #[doc = r#" + Applies a falliable function to each element in a stream, stopping at first error and returning it. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use std::sync::mpsc::channel; + use async_std::prelude::*; + + let (tx, rx) = channel(); + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let s = s.try_for_each(|v| { + if v % 2 == 1 { + tx.clone().send(v).unwrap(); + Ok(()) + } else { + Err("even") + } + }); + + let res = s.await; + drop(tx); + let values: Vec<_> = rx.iter().collect(); + + assert_eq!(values, vec![1]); + assert_eq!(res, Err("even")); + # + # }) } + ``` + "#] + fn try_for_each( + self, + f: F, + ) -> impl Future [TryForEeachFuture] + where + Self: Sized, + F: FnMut(Self::Item) -> Result<(), E>, + { + TryForEeachFuture::new(self, f) + } + #[doc = r#" 'Zips up' two streams into a single stream of pairs. diff --git a/src/stream/stream/try_for_each.rs b/src/stream/stream/try_for_each.rs new file mode 100644 index 0000000..ae3d5ea --- /dev/null +++ b/src/stream/stream/try_for_each.rs @@ -0,0 +1,54 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct TryForEeachFuture { + stream: S, + f: F, + __from: PhantomData, + __to: PhantomData, +} + +impl TryForEeachFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + + pub(crate) fn new(stream: S, f: F) -> Self { + TryForEeachFuture { + stream, + f, + __from: PhantomData, + __to: PhantomData, + } + } +} + +impl Future for TryForEeachFuture +where + S: Stream, + S::Item: std::fmt::Debug, + F: FnMut(S::Item) -> Result<(), E>, +{ + type Output = Result<(), E>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match item { + None => return Poll::Ready(Ok(())), + Some(v) => { + let res = (self.as_mut().f())(v); + if let Err(e) = res { + return Poll::Ready(Err(e)); + } + } + } + } + } +}