From ed7ddacb28f1cb3a5c4b7b2a92afa6003dc320ef Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 17 Jan 2020 17:13:52 +0300 Subject: [PATCH] Rewrote `Result`s implementation using `take_while` and `filter_map` --- src/result/from_stream.rs | 32 ++++++++++++++++++++------------ src/result/product.rs | 39 +++++++++++++++++++++++++-------------- src/result/sum.rs | 39 +++++++++++++++++++++++++-------------- 3 files changed, 70 insertions(+), 40 deletions(-) diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 86405ce0..8ce4b853 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -17,26 +17,34 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; let out: V = stream - .scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None } }) .collect() .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/product.rs b/src/result/product.rs index 89ddacb9..45782ff7 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -40,24 +40,35 @@ where S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::product(stream.scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } - } - })) + let out = >::product( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }), + ) .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/sum.rs b/src/result/sum.rs index c0ef4c26..b6d84a0c 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -40,24 +40,35 @@ where S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::sum(stream.scan(&mut found_error, |error, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - **error = Some(err); - // Stop processing the stream on error - None - } - } - })) + let out = >::sum( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }), + ) .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) }