diff --git a/src/lib.rs b/src/lib.rs index f188a682..ba0326ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,6 +67,7 @@ cfg_if! { mod vec; mod result; + mod option; } } diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs new file mode 100644 index 00000000..e4da809e --- /dev/null +++ b/src/option/from_stream.rs @@ -0,0 +1,45 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{FromStream, IntoStream}; + +impl FromStream> for Option +where + V: FromStream, +{ + /// Takes each element in the stream: if it is `None`, no further + /// elements are taken, and `None` is returned. Should no `None` + /// occur, a container with the values of each `Option` is returned. + #[inline] + fn from_stream<'a, S: IntoStream>>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = false; + let out: V = stream + .scan((), |_, elem| { + match elem { + Some(elem) => Some(elem), + None => { + found_error = true; + // Stop processing the stream on error + None + } + } + }) + .collect() + .await; + + if found_error { None } else { Some(out) } + }) + } +} diff --git a/src/option/mod.rs b/src/option/mod.rs new file mode 100644 index 00000000..afb29adc --- /dev/null +++ b/src/option/mod.rs @@ -0,0 +1,9 @@ +//! The Rust core optional value type +//! +//! This module provides the `Option` type for returning and +//! propagating optional values. + +mod from_stream; + +#[doc(inline)] +pub use std::option::Option; diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 74cc5679..6033eb97 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -19,7 +19,7 @@ where { let stream = stream.into_stream(); - Pin::from(Box::new(async move { + Box::pin(async move { pin_utils::pin_mut!(stream); // Using `scan` here because it is able to stop the stream early @@ -43,6 +43,6 @@ where Some(err) => Err(err), None => Ok(out), } - })) + }) } } diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index ad82797d..26196af9 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -1,7 +1,6 @@ use std::pin::Pin; -use crate::prelude::*; -use crate::stream::{FromStream, IntoStream}; +use crate::stream::{Extend, FromStream, IntoStream}; impl FromStream for Vec { #[inline] @@ -13,14 +12,12 @@ impl FromStream for Vec { { let stream = stream.into_stream(); - Pin::from(Box::new(async move { + Box::pin(async move { pin_utils::pin_mut!(stream); let mut out = vec![]; - while let Some(item) = stream.next().await { - out.push(item); - } + out.stream_extend(stream).await; out - })) + }) } }