Use `take_while` instead of `scan` for `Option`

split-by-pattern
Oleg Nosov 5 years ago
parent 98d45f4be1
commit 83afbab2ef
No known key found for this signature in database
GPG Key ID: DE90B83800644E24

@ -17,24 +17,25 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = false; let mut found_none = false;
let out: V = stream let out: V = stream
.scan((), |_, elem| { .take_while(|elem| {
match elem { elem.is_some() || {
Some(elem) => Some(elem), found_none = true;
None => { false
found_error = true;
// Stop processing the stream on error
None
}
} }
}) })
.map(Option::unwrap)
.collect() .collect()
.await; .await;
if found_error { None } else { Some(out) } if found_none {
None
} else {
Some(out)
}
}) })
} }
} }

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Product}; use crate::stream::{Product, Stream};
impl<T, U> Product<Option<U>> for Option<T> impl<T, U> Product<Option<U>> for Option<T>
where where
@ -36,23 +36,24 @@ where
``` ```
"#] "#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>> fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a where
S: Stream<Item = Option<U>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;
let out = <T as Product<U>>::product(stream let out = <T as Product<U>>::product(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Some(elem) => Some(elem), elem.is_some() || {
None => {
found_none = true; found_none = true;
// Stop processing the stream on error false
None
}
} }
})).await; })
.map(Option::unwrap),
)
.await;
if found_none { if found_none {
None None

@ -31,23 +31,24 @@ where
``` ```
"#] "#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>> fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a where
S: Stream<Item = Option<U>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;
let out = <T as Sum<U>>::sum(stream let out = <T as Sum<U>>::sum(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Some(elem) => Some(elem), elem.is_some() || {
None => {
found_none = true; found_none = true;
// Stop processing the stream on error false
None
}
} }
})).await; })
.map(Option::unwrap),
)
.await;
if found_none { if found_none {
None None

Loading…
Cancel
Save