2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-01-19 20:13:51 +00:00
207: Added the ability to collect a stream of results r=yoshuawuyts a=sunjay

As requested here: https://twitter.com/yoshuawuyts/status/1174026374316773377

The standard library has a very useful implementation of `FromIterator` that takes an iterator of `Result<T, E>` values and is able to produce a value of type `Result<Vec<T>, E>`. I asked for this in `async-std` and @yoshuawuyts recommended that I contribute the impl. It turns out that the implementation in the standard library is even more general than I initially thought. It allows any collection that implements `FromIterator` to be collected from an iterator of `Result<T, E>` values. That means that you can collect into `Result<Vec<T>, E>`, `Result<HashSet<T>, E>`, etc.

I wanted to add a similarly generic impl for this crate so we can also support collecting into any collection that implements `FromStream`. 

The implementation for this is based heavily on [what exists in `std`](9150f844e2/src/libcore/result.rs (L1379-L1429)). I made a new `result` module since that's where this impl is in `std`. I still wanted to maintain the conventions of this repo, so I copied the `vec` module that @yoshuawuyts created in #125. Much like in that PR, the new `result` module is private.

There is a doctest in the documentation for `collect` that both teaches that this feature exists and tests that it works in some simple cases.

## Documentation Screenshot

![image](https://user-images.githubusercontent.com/530939/65075935-de89ae00-d965-11e9-9cd6-8b19b694ed3e.png)


Co-authored-by: Sunjay Varma <varma.sunjay@gmail.com>
This commit is contained in:
bors[bot] 2019-09-17 22:35:00 +00:00 committed by GitHub
commit c8475ca95e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 73 additions and 0 deletions

View file

@ -48,6 +48,7 @@ pub mod io;
pub mod net; pub mod net;
pub mod os; pub mod os;
pub mod prelude; pub mod prelude;
mod result;
pub mod stream; pub mod stream;
pub mod sync; pub mod sync;
pub mod task; pub mod task;

47
src/result/from_stream.rs Normal file
View file

@ -0,0 +1,47 @@
use crate::stream::{FromStream, IntoStream, Stream};
use std::pin::Pin;
impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
where
V: FromStream<T>,
{
/// Takes each element in the stream: if it is an `Err`, no further
/// elements are taken, and the `Err` is returned. Should no `Err`
/// occur, a container with the values of each `Result` is returned.
#[inline]
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
where
<S as IntoStream>::IntoStream: Send + 'a,
{
let stream = stream.into_stream();
Pin::from(Box::new(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out: V = stream
.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
}
}
})
.collect()
.await;
match found_error {
Some(err) => Err(err),
None => Ok(out),
}
}))
}
}

9
src/result/mod.rs Normal file
View file

@ -0,0 +1,9 @@
//! The Rust core error handling type
//!
//! This module provides the `Result<T, E>` type for returning and
//! propagating errors.
mod from_stream;
#[doc(inline)]
pub use std::result::Result;

View file

@ -726,6 +726,22 @@ pub trait Stream {
/// let buf: Vec<u8> = s.collect().await; /// let buf: Vec<u8> = s.collect().await;
/// ///
/// assert_eq!(buf, vec![9; 3]); /// assert_eq!(buf, vec![9; 3]);
///
/// // You can also collect streams of Result values
/// // into any collection that implements FromStream
/// let s = stream::repeat(Ok(9)).take(3);
/// // We are using Vec here, but other collections
/// // are supported as well
/// let buf: Result<Vec<u8>, ()> = s.collect().await;
///
/// assert_eq!(buf, Ok(vec![9; 3]));
///
/// // The stream will stop on the first Err and
/// // return that instead
/// let s = stream::repeat(Err(5)).take(3);
/// let buf: Result<Vec<u8>, u8> = s.collect().await;
///
/// assert_eq!(buf, Err(5));
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```