From 76b10c4784fc109e1574fa7874975e2a9ed80888 Mon Sep 17 00:00:00 2001 From: Sunjay Varma Date: Sat, 28 Sep 2019 22:10:53 -0400 Subject: [PATCH 1/4] FromStream for Option --- src/lib.rs | 1 + src/option/from_stream.rs | 49 +++++++++++++++++++++++++++++++++++++++ src/option/mod.rs | 9 +++++++ 3 files changed, 59 insertions(+) create mode 100644 src/option/from_stream.rs create mode 100644 src/option/mod.rs diff --git a/src/lib.rs b/src/lib.rs index f188a68..ba0326a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,6 +67,7 @@ cfg_if! { mod vec; mod result; + mod option; } } diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs new file mode 100644 index 0000000..2d36ca1 --- /dev/null +++ b/src/option/from_stream.rs @@ -0,0 +1,49 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{FromStream, IntoStream}; + +impl FromStream> for Option +where + V: FromStream, +{ + /// Takes each element in the stream: if it is `None`, no further + /// elements are taken, and `None` is returned. Should no `None` + /// occur, a container with the values of each `Option` is returned. + #[inline] + fn from_stream<'a, S: IntoStream>>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Pin::from(Box::new(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = false; + let out: V = stream + .scan((), |_, elem| { + match elem { + Some(elem) => Some(elem), + None => { + found_error = true; + // Stop processing the stream on error + None + } + } + }) + .collect() + .await; + + if found_error { + None + } else { + Some(out) + } + })) + } +} diff --git a/src/option/mod.rs b/src/option/mod.rs new file mode 100644 index 0000000..afb29ad --- /dev/null +++ b/src/option/mod.rs @@ -0,0 +1,9 @@ +//! The Rust core optional value type +//! +//! This module provides the `Option` type for returning and +//! propagating optional values. + +mod from_stream; + +#[doc(inline)] +pub use std::option::Option; From ab7129cd45c7cddabf9629d5c4a1b2d533b33dfd Mon Sep 17 00:00:00 2001 From: Sunjay Varma Date: Sat, 28 Sep 2019 22:11:13 -0400 Subject: [PATCH 2/4] FromStream for Vec in terms of Extend --- src/vec/from_stream.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index ad82797..8848bda 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -1,7 +1,6 @@ use std::pin::Pin; -use crate::prelude::*; -use crate::stream::{FromStream, IntoStream}; +use crate::stream::{FromStream, IntoStream, Extend}; impl FromStream for Vec { #[inline] @@ -17,9 +16,7 @@ impl FromStream for Vec { pin_utils::pin_mut!(stream); let mut out = vec![]; - while let Some(item) = stream.next().await { - out.push(item); - } + out.stream_extend(stream).await; out })) } From fb7582bd7ae73c8cd6eaa1b49dc1f024d8b7fd8e Mon Sep 17 00:00:00 2001 From: Sunjay Varma Date: Sat, 28 Sep 2019 22:16:12 -0400 Subject: [PATCH 3/4] Using Box::pin(...) instead of Pin::from(Box::new(...)) --- src/option/from_stream.rs | 4 ++-- src/result/from_stream.rs | 4 ++-- src/vec/from_stream.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index 2d36ca1..2c4c139 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -19,7 +19,7 @@ where { let stream = stream.into_stream(); - Pin::from(Box::new(async move { + Box::pin(async move { pin_utils::pin_mut!(stream); // Using `scan` here because it is able to stop the stream early @@ -44,6 +44,6 @@ where } else { Some(out) } - })) + }) } } diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 74cc567..6033eb9 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -19,7 +19,7 @@ where { let stream = stream.into_stream(); - Pin::from(Box::new(async move { + Box::pin(async move { pin_utils::pin_mut!(stream); // Using `scan` here because it is able to stop the stream early @@ -43,6 +43,6 @@ where Some(err) => Err(err), None => Ok(out), } - })) + }) } } diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index 8848bda..692c7fa 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -12,12 +12,12 @@ impl FromStream for Vec { { let stream = stream.into_stream(); - Pin::from(Box::new(async move { + Box::pin(async move { pin_utils::pin_mut!(stream); let mut out = vec![]; out.stream_extend(stream).await; out - })) + }) } } From a05b564486cdeddd225f3b3999054cc908a736f2 Mon Sep 17 00:00:00 2001 From: Sunjay Varma Date: Mon, 30 Sep 2019 20:14:16 -0400 Subject: [PATCH 4/4] rustfmt --- src/option/from_stream.rs | 6 +----- src/vec/from_stream.rs | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index 2c4c139..e4da809 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -39,11 +39,7 @@ where .collect() .await; - if found_error { - None - } else { - Some(out) - } + if found_error { None } else { Some(out) } }) } } diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index 692c7fa..26196af 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use crate::stream::{FromStream, IntoStream, Extend}; +use crate::stream::{Extend, FromStream, IntoStream}; impl FromStream for Vec { #[inline]