From 83afbab2efbe35ae71b749bf53b550794dfdc77c Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Sun, 12 Jan 2020 17:32:59 +0300 Subject: [PATCH 1/6] Use `take_while` instead of `scan` for `Option` --- src/option/from_stream.rs | 23 ++++++++++++----------- src/option/product.rs | 25 +++++++++++++------------ src/option/sum.rs | 23 ++++++++++++----------- 3 files changed, 37 insertions(+), 34 deletions(-) diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index 8679114..dc6d841 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -17,24 +17,25 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs - let mut found_error = false; + let mut found_none = false; let out: V = stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { - found_error = true; - // Stop processing the stream on error - None - } + .take_while(|elem| { + elem.is_some() || { + found_none = true; + false } }) + .map(Option::unwrap) .collect() .await; - if found_error { None } else { Some(out) } + if found_none { + None + } else { + Some(out) + } }) } } diff --git a/src/option/product.rs b/src/option/product.rs index abaab73..ff61eaa 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use crate::prelude::*; -use crate::stream::{Stream, Product}; +use crate::stream::{Product, Stream}; impl Product> for Option where @@ -36,23 +36,24 @@ where ``` "#] fn product<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::product(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::product( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + false } - } - })).await; + }) + .map(Option::unwrap), + ) + .await; if found_none { None diff --git a/src/option/sum.rs b/src/option/sum.rs index d2e4483..cd92f78 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -31,23 +31,24 @@ where ``` "#] fn sum<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::sum(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::sum( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + false } - } - })).await; + }) + .map(Option::unwrap), + ) + .await; if found_none { None From fb567a3a09da32569f05712bdb07b4f31f2f9526 Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Sun, 12 Jan 2020 17:53:16 +0300 Subject: [PATCH 2/6] Recovered comments --- src/option/from_stream.rs | 1 + src/option/product.rs | 1 + src/option/sum.rs | 1 + 3 files changed, 3 insertions(+) diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index dc6d841..f0aafea 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -24,6 +24,7 @@ where .take_while(|elem| { elem.is_some() || { found_none = true; + // Stop processing the stream on `None` false } }) diff --git a/src/option/product.rs b/src/option/product.rs index ff61eaa..2238a91 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -48,6 +48,7 @@ where .take_while(|elem| { elem.is_some() || { found_none = true; + // Stop processing the stream on `None` false } }) diff --git a/src/option/sum.rs b/src/option/sum.rs index cd92f78..0570a1e 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -43,6 +43,7 @@ where .take_while(|elem| { elem.is_some() || { found_none = true; + // Stop processing the stream on error false } }) From 134089af2c711e3786beb4174e0f1592d9de7752 Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Wed, 15 Jan 2020 08:55:18 +0300 Subject: [PATCH 3/6] Use `filter_map(identity)` + other fixes --- src/option/from_stream.rs | 3 ++- src/option/product.rs | 3 ++- src/option/sum.rs | 5 +++-- src/result/from_stream.rs | 2 +- src/result/product.rs | 28 +++++++++++++++------------- src/result/sum.rs | 26 ++++++++++++++------------ src/unit/from_stream.rs | 2 +- src/utils.rs | 8 ++++++++ 8 files changed, 46 insertions(+), 31 deletions(-) diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index f0aafea..2194f29 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{FromStream, IntoStream}; +use crate::utils::identity; impl FromStream> for Option where @@ -28,7 +29,7 @@ where false } }) - .map(Option::unwrap) + .filter_map(identity) .collect() .await; diff --git a/src/option/product.rs b/src/option/product.rs index 2238a91..f733bef 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{Product, Stream}; +use crate::utils::identity; impl Product> for Option where @@ -52,7 +53,7 @@ where false } }) - .map(Option::unwrap), + .filter_map(identity), ) .await; diff --git a/src/option/sum.rs b/src/option/sum.rs index 0570a1e..64fdb07 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{Stream, Sum}; +use crate::utils::identity; impl Sum> for Option where @@ -43,11 +44,11 @@ where .take_while(|elem| { elem.is_some() || { found_none = true; - // Stop processing the stream on error + // Stop processing the stream on `None` false } }) - .map(Option::unwrap), + .filter_map(identity), ) .await; diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index a8490d6..424a53b 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -21,7 +21,7 @@ where // if a failure occurs let mut found_error = None; let out: V = stream - .scan((), |_, elem| { + .scan((), |(), elem| { match elem { Ok(elem) => Some(elem), Err(err) => { diff --git a/src/result/product.rs b/src/result/product.rs index ec9d94a..ad2e014 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use crate::prelude::*; -use crate::stream::{Stream, Product}; +use crate::stream::{Product, Stream}; impl Product> for Result where @@ -36,26 +36,28 @@ where ``` "#] fn product<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; - let out = >::product(stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - found_error = Some(err); - // Stop processing the stream on error - None - } + let out = >::product(stream.scan((), |(), elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None } - })).await; + } + })) + .await; + match found_error { Some(err) => Err(err), - None => Ok(out) + None => Ok(out), } }) } diff --git a/src/result/sum.rs b/src/result/sum.rs index ccc4240..1cf5b7a 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -36,26 +36,28 @@ where ``` "#] fn sum<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; - let out = >::sum(stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - found_error = Some(err); - // Stop processing the stream on error - None - } + let out = >::sum(stream.scan((), |(), elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None } - })).await; + } + })) + .await; + match found_error { Some(err) => Err(err), - None => Ok(out) + None => Ok(out), } }) } diff --git a/src/unit/from_stream.rs b/src/unit/from_stream.rs index da216e2..7b78438 100644 --- a/src/unit/from_stream.rs +++ b/src/unit/from_stream.rs @@ -8,6 +8,6 @@ impl FromStream<()> for () { fn from_stream<'a, S: IntoStream + 'a>( stream: S, ) -> Pin + 'a>> { - Box::pin(stream.into_stream().for_each(|_| ())) + Box::pin(stream.into_stream().for_each(drop)) } } diff --git a/src/utils.rs b/src/utils.rs index ef50ed0..780e733 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -52,6 +52,14 @@ pub fn random(n: u32) -> u32 { }) } +/// Returns given argument without changes. +#[allow(dead_code)] +#[doc(hidden)] +#[inline(always)] +pub(crate) fn identity(arg: T) -> T { + arg +} + /// Add additional context to errors pub(crate) trait Context { fn context(self, message: impl Fn() -> String) -> Self; From 38de0bfd2267cb299952046f129eb6cc896bde9a Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Wed, 15 Jan 2020 09:42:30 +0300 Subject: [PATCH 4/6] Use `std::convert::identity` --- src/option/from_stream.rs | 8 ++------ src/option/product.rs | 8 ++------ src/option/sum.rs | 8 ++------ src/utils.rs | 8 -------- 4 files changed, 6 insertions(+), 26 deletions(-) diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index 2194f29..de929ca 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{FromStream, IntoStream}; -use crate::utils::identity; +use std::convert::identity; impl FromStream> for Option where @@ -33,11 +33,7 @@ where .collect() .await; - if found_none { - None - } else { - Some(out) - } + if found_none { None } else { Some(out) } }) } } diff --git a/src/option/product.rs b/src/option/product.rs index f733bef..b446c1f 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{Product, Stream}; -use crate::utils::identity; +use std::convert::identity; impl Product> for Option where @@ -57,11 +57,7 @@ where ) .await; - if found_none { - None - } else { - Some(out) - } + if found_none { None } else { Some(out) } }) } } diff --git a/src/option/sum.rs b/src/option/sum.rs index 64fdb07..de404f4 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -2,7 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{Stream, Sum}; -use crate::utils::identity; +use std::convert::identity; impl Sum> for Option where @@ -52,11 +52,7 @@ where ) .await; - if found_none { - None - } else { - Some(out) - } + if found_none { None } else { Some(out) } }) } } diff --git a/src/utils.rs b/src/utils.rs index 780e733..ef50ed0 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -52,14 +52,6 @@ pub fn random(n: u32) -> u32 { }) } -/// Returns given argument without changes. -#[allow(dead_code)] -#[doc(hidden)] -#[inline(always)] -pub(crate) fn identity(arg: T) -> T { - arg -} - /// Add additional context to errors pub(crate) trait Context { fn context(self, message: impl Fn() -> String) -> Self; From ed248017b476f17334260db31daf82cc07c2a465 Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Wed, 15 Jan 2020 12:06:50 +0300 Subject: [PATCH 5/6] Use internal `scan` state in `Result`s implementation --- src/result/from_stream.rs | 4 ++-- src/result/product.rs | 4 ++-- src/result/sum.rs | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 424a53b..86405ce 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -21,11 +21,11 @@ where // if a failure occurs let mut found_error = None; let out: V = stream - .scan((), |(), elem| { + .scan(&mut found_error, |error, elem| { match elem { Ok(elem) => Some(elem), Err(err) => { - found_error = Some(err); + **error = Some(err); // Stop processing the stream on error None } diff --git a/src/result/product.rs b/src/result/product.rs index ad2e014..89ddacb 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -43,11 +43,11 @@ where // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; - let out = >::product(stream.scan((), |(), elem| { + let out = >::product(stream.scan(&mut found_error, |error, elem| { match elem { Ok(elem) => Some(elem), Err(err) => { - found_error = Some(err); + **error = Some(err); // Stop processing the stream on error None } diff --git a/src/result/sum.rs b/src/result/sum.rs index 1cf5b7a..c0ef4c2 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -43,11 +43,11 @@ where // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; - let out = >::sum(stream.scan((), |(), elem| { + let out = >::sum(stream.scan(&mut found_error, |error, elem| { match elem { Ok(elem) => Some(elem), Err(err) => { - found_error = Some(err); + **error = Some(err); // Stop processing the stream on error None } From ed7ddacb28f1cb3a5c4b7b2a92afa6003dc320ef Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 17 Jan 2020 17:13:52 +0300 Subject: [PATCH 6/6] Rewrote `Result`s implementation using `take_while` and `filter_map` --- src/result/from_stream.rs | 32 ++++++++++++++++++++------------ src/result/product.rs | 39 +++++++++++++++++++++++++-------------- src/result/sum.rs | 39 +++++++++++++++++++++++++-------------- 3 files changed, 70 insertions(+), 40 deletions(-) diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 86405ce..8ce4b85 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -17,26 +17,34 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; let out: V = stream - .scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None } }) .collect() .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/product.rs b/src/result/product.rs index 89ddacb..45782ff 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -40,24 +40,35 @@ where S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::product(stream.scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } - } - })) + let out = >::product( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }), + ) .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/sum.rs b/src/result/sum.rs index c0ef4c2..b6d84a0 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -40,24 +40,35 @@ where S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::sum(stream.scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } - } - })) + let out = >::sum( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }), + ) .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) }