diff --git a/src/lib.rs b/src/lib.rs index 8336517..dfa9a07 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ pub mod io; pub mod net; pub mod os; pub mod prelude; +mod result; pub mod stream; pub mod sync; pub mod task; diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs new file mode 100644 index 0000000..71cf61d --- /dev/null +++ b/src/result/from_stream.rs @@ -0,0 +1,47 @@ +use crate::stream::{FromStream, IntoStream, Stream}; + +use std::pin::Pin; + +impl FromStream> for Result +where + V: FromStream, +{ + /// Takes each element in the stream: if it is an `Err`, no further + /// elements are taken, and the `Err` is returned. Should no `Err` + /// occur, a container with the values of each `Result` is returned. + #[inline] + fn from_stream<'a, S: IntoStream>>( + stream: S, + ) -> Pin + Send + 'a>> + where + ::IntoStream: Send + 'a, + { + let stream = stream.into_stream(); + + Pin::from(Box::new(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = None; + let out: V = stream + .scan((), |_, elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None + } + } + }) + .collect() + .await; + + match found_error { + Some(err) => Err(err), + None => Ok(out), + } + })) + } +} diff --git a/src/result/mod.rs b/src/result/mod.rs new file mode 100644 index 0000000..908f9c4 --- /dev/null +++ b/src/result/mod.rs @@ -0,0 +1,9 @@ +//! The Rust core error handling type +//! +//! This module provides the `Result` type for returning and +//! propagating errors. + +mod from_stream; + +#[doc(inline)] +pub use std::result::Result; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e13e2eb..62f1b4b 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -726,6 +726,22 @@ pub trait Stream { /// let buf: Vec = s.collect().await; /// /// assert_eq!(buf, vec![9; 3]); + /// + /// // You can also collect streams of Result values + /// // into any collection that implements FromStream + /// let s = stream::repeat(Ok(9)).take(3); + /// // We are using Vec here, but other collections + /// // are supported as well + /// let buf: Result, ()> = s.collect().await; + /// + /// assert_eq!(buf, Ok(vec![9; 3])); + /// + /// // The stream will stop on the first Err and + /// // return that instead + /// let s = stream::repeat(Err(5)).take(3); + /// let buf: Result, u8> = s.collect().await; + /// + /// assert_eq!(buf, Err(5)); /// # /// # }) } /// ```