From ad0510110c85fe928d5588f9269870c70b8ba179 Mon Sep 17 00:00:00 2001 From: Sunjay Varma Date: Tue, 17 Sep 2019 16:25:26 -0400 Subject: [PATCH 1/2] Added the ability to collect a stream of results --- src/lib.rs | 1 + src/result/from_stream.rs | 43 +++++++++++++++++++++++++++++++++++++++ src/result/mod.rs | 9 ++++++++ src/stream/stream/mod.rs | 16 +++++++++++++++ 4 files changed, 69 insertions(+) create mode 100644 src/result/from_stream.rs create mode 100644 src/result/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 8336517..76ea9c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,6 +52,7 @@ pub mod stream; pub mod sync; pub mod task; mod vec; +mod result; #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(feature = "unstable")] diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs new file mode 100644 index 0000000..f74d06d --- /dev/null +++ b/src/result/from_stream.rs @@ -0,0 +1,43 @@ +use crate::stream::{FromStream, IntoStream, Stream}; + +use std::pin::Pin; + +impl FromStream> for Result + where V: FromStream { + /// Takes each element in the stream: if it is an `Err`, no further + /// elements are taken, and the `Err` is returned. Should no `Err` + /// occur, a container with the values of each `Result` is returned. + #[inline] + fn from_stream<'a, S: IntoStream>>( + stream: S, + ) -> Pin + Send + 'a>> + where + ::IntoStream: Send + 'a, + { + let stream = stream.into_stream(); + + Pin::from(Box::new(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = None; + let out: V = stream.scan((), |_, elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None + }, + } + }).collect().await; + + match found_error { + Some(err) => Err(err), + None => Ok(out), + } + })) + } +} + diff --git a/src/result/mod.rs b/src/result/mod.rs new file mode 100644 index 0000000..908f9c4 --- /dev/null +++ b/src/result/mod.rs @@ -0,0 +1,9 @@ +//! The Rust core error handling type +//! +//! This module provides the `Result` type for returning and +//! propagating errors. + +mod from_stream; + +#[doc(inline)] +pub use std::result::Result; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index ca83fbb..0ad50e7 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -695,6 +695,22 @@ pub trait Stream { /// let buf: Vec = s.collect().await; /// /// assert_eq!(buf, vec![9; 3]); + /// + /// // You can also collect streams of Result values + /// // into any collection that implements FromStream + /// let s = stream::repeat(Ok(9)).take(3); + /// // We are using Vec here, but other collections + /// // are supported as well + /// let buf: Result, ()> = s.collect().await; + /// + /// assert_eq!(buf, Ok(vec![9; 3])); + /// + /// // The stream will stop on the first Err and + /// // return that instead + /// let s = stream::repeat(Err(5)).take(3); + /// let buf: Result, u8> = s.collect().await; + /// + /// assert_eq!(buf, Err(5)); /// # /// # }) } /// ``` From c87dab2d5eb75c95d1d3b81dff264c82e0aff82a Mon Sep 17 00:00:00 2001 From: Sunjay Varma Date: Tue, 17 Sep 2019 16:48:58 -0400 Subject: [PATCH 2/2] rustfmt --- src/lib.rs | 2 +- src/result/from_stream.rs | 28 ++++++++++++++++------------ 2 files changed, 17 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 76ea9c4..dfa9a07 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,11 +48,11 @@ pub mod io; pub mod net; pub mod os; pub mod prelude; +mod result; pub mod stream; pub mod sync; pub mod task; mod vec; -mod result; #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(feature = "unstable")] diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index f74d06d..71cf61d 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -3,7 +3,9 @@ use crate::stream::{FromStream, IntoStream, Stream}; use std::pin::Pin; impl FromStream> for Result - where V: FromStream { +where + V: FromStream, +{ /// Takes each element in the stream: if it is an `Err`, no further /// elements are taken, and the `Err` is returned. Should no `Err` /// occur, a container with the values of each `Result` is returned. @@ -22,16 +24,19 @@ impl FromStream> for Result // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; - let out: V = stream.scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - found_error = Some(err); - // Stop processing the stream on error - None - }, - } - }).collect().await; + let out: V = stream + .scan((), |_, elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None + } + } + }) + .collect() + .await; match found_error { Some(err) => Err(err), @@ -40,4 +45,3 @@ impl FromStream> for Result })) } } -