forked from mirror/async-std
Added the ability to collect a stream of results
This commit is contained in:
parent
e6880e12e8
commit
ad0510110c
4 changed files with 69 additions and 0 deletions
|
@ -52,6 +52,7 @@ pub mod stream;
|
|||
pub mod sync;
|
||||
pub mod task;
|
||||
mod vec;
|
||||
mod result;
|
||||
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[cfg(feature = "unstable")]
|
||||
|
|
43
src/result/from_stream.rs
Normal file
43
src/result/from_stream.rs
Normal file
|
@ -0,0 +1,43 @@
|
|||
use crate::stream::{FromStream, IntoStream, Stream};
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
|
||||
where V: FromStream<T> {
|
||||
/// Takes each element in the stream: if it is an `Err`, no further
|
||||
/// elements are taken, and the `Err` is returned. Should no `Err`
|
||||
/// occur, a container with the values of each `Result` is returned.
|
||||
#[inline]
|
||||
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
|
||||
stream: S,
|
||||
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
|
||||
where
|
||||
<S as IntoStream>::IntoStream: Send + 'a,
|
||||
{
|
||||
let stream = stream.into_stream();
|
||||
|
||||
Pin::from(Box::new(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_error = None;
|
||||
let out: V = stream.scan((), |_, elem| {
|
||||
match elem {
|
||||
Ok(elem) => Some(elem),
|
||||
Err(err) => {
|
||||
found_error = Some(err);
|
||||
// Stop processing the stream on error
|
||||
None
|
||||
},
|
||||
}
|
||||
}).collect().await;
|
||||
|
||||
match found_error {
|
||||
Some(err) => Err(err),
|
||||
None => Ok(out),
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
9
src/result/mod.rs
Normal file
9
src/result/mod.rs
Normal file
|
@ -0,0 +1,9 @@
|
|||
//! The Rust core error handling type
|
||||
//!
|
||||
//! This module provides the `Result<T, E>` type for returning and
|
||||
//! propagating errors.
|
||||
|
||||
mod from_stream;
|
||||
|
||||
#[doc(inline)]
|
||||
pub use std::result::Result;
|
|
@ -695,6 +695,22 @@ pub trait Stream {
|
|||
/// let buf: Vec<u8> = s.collect().await;
|
||||
///
|
||||
/// assert_eq!(buf, vec![9; 3]);
|
||||
///
|
||||
/// // You can also collect streams of Result values
|
||||
/// // into any collection that implements FromStream
|
||||
/// let s = stream::repeat(Ok(9)).take(3);
|
||||
/// // We are using Vec here, but other collections
|
||||
/// // are supported as well
|
||||
/// let buf: Result<Vec<u8>, ()> = s.collect().await;
|
||||
///
|
||||
/// assert_eq!(buf, Ok(vec![9; 3]));
|
||||
///
|
||||
/// // The stream will stop on the first Err and
|
||||
/// // return that instead
|
||||
/// let s = stream::repeat(Err(5)).take(3);
|
||||
/// let buf: Result<Vec<u8>, u8> = s.collect().await;
|
||||
///
|
||||
/// assert_eq!(buf, Err(5));
|
||||
/// #
|
||||
/// # }) }
|
||||
/// ```
|
||||
|
|
Loading…
Reference in a new issue