diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index 86791143..dc6d8416 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -17,24 +17,25 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs - let mut found_error = false; + let mut found_none = false; let out: V = stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { - found_error = true; - // Stop processing the stream on error - None - } + .take_while(|elem| { + elem.is_some() || { + found_none = true; + false } }) + .map(Option::unwrap) .collect() .await; - if found_error { None } else { Some(out) } + if found_none { + None + } else { + Some(out) + } }) } } diff --git a/src/option/product.rs b/src/option/product.rs index abaab73e..ff61eaab 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use crate::prelude::*; -use crate::stream::{Stream, Product}; +use crate::stream::{Product, Stream}; impl Product> for Option where @@ -36,23 +36,24 @@ where ``` "#] fn product<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::product(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::product( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + false } - } - })).await; + }) + .map(Option::unwrap), + ) + .await; if found_none { None diff --git a/src/option/sum.rs b/src/option/sum.rs index d2e44830..cd92f78d 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -31,23 +31,24 @@ where ``` "#] fn sum<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::sum(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::sum( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + false } - } - })).await; + }) + .map(Option::unwrap), + ) + .await; if found_none { None