Merge pull request #667 from olegnn/option_take_while

Use `take_while` instead of `scan` in `impl` of `Product`, `Sum` and `FromStream` for `Option` and `Result`
pull/642/merge
Yoshua Wuyts 5 years ago committed by GitHub
commit 6c1b5eb3ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,6 +2,7 @@ use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{FromStream, IntoStream}; use crate::stream::{FromStream, IntoStream};
use std::convert::identity;
impl<T, V> FromStream<Option<T>> for Option<V> impl<T, V> FromStream<Option<T>> for Option<V>
where where
@ -17,24 +18,22 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = false; let mut found_none = false;
let out: V = stream let out: V = stream
.scan((), |_, elem| { .take_while(|elem| {
match elem { elem.is_some() || {
Some(elem) => Some(elem), found_none = true;
None => { // Stop processing the stream on `None`
found_error = true; false
// Stop processing the stream on error
None
}
} }
}) })
.filter_map(identity)
.collect() .collect()
.await; .await;
if found_error { None } else { Some(out) } if found_none { None } else { Some(out) }
}) })
} }
} }

@ -1,7 +1,8 @@
use std::pin::Pin; use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Product}; use crate::stream::{Product, Stream};
use std::convert::identity;
impl<T, U> Product<Option<U>> for Option<T> impl<T, U> Product<Option<U>> for Option<T>
where where
@ -36,29 +37,27 @@ where
``` ```
"#] "#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>> fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a where
S: Stream<Item = Option<U>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;
let out = <T as Product<U>>::product(stream let out = <T as Product<U>>::product(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Some(elem) => Some(elem), elem.is_some() || {
None => {
found_none = true; found_none = true;
// Stop processing the stream on error // Stop processing the stream on `None`
None false
} }
} })
})).await; .filter_map(identity),
)
.await;
if found_none { if found_none { None } else { Some(out) }
None
} else {
Some(out)
}
}) })
} }
} }

@ -2,6 +2,7 @@ use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Sum}; use crate::stream::{Stream, Sum};
use std::convert::identity;
impl<T, U> Sum<Option<U>> for Option<T> impl<T, U> Sum<Option<U>> for Option<T>
where where
@ -31,29 +32,27 @@ where
``` ```
"#] "#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>> fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a where
S: Stream<Item = Option<U>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;
let out = <T as Sum<U>>::sum(stream let out = <T as Sum<U>>::sum(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Some(elem) => Some(elem), elem.is_some() || {
None => {
found_none = true; found_none = true;
// Stop processing the stream on error // Stop processing the stream on `None`
None false
} }
} })
})).await; .filter_map(identity),
)
.await;
if found_none { if found_none { None } else { Some(out) }
None
} else {
Some(out)
}
}) })
} }
} }

@ -34,26 +34,34 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut is_error = false;
let mut found_error = None; let mut found_error = None;
let out: V = stream let out: V = stream
.scan((), |_, elem| { .take_while(|elem| {
match elem { // Stop processing the stream on `Err`
Ok(elem) => Some(elem), !is_error
Err(err) => { && (elem.is_ok() || {
found_error = Some(err); is_error = true;
// Stop processing the stream on error // Capture first `Err`
None true
} })
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => {
found_error = Some(err);
None
} }
}) })
.collect() .collect()
.await; .await;
match found_error { if is_error {
Some(err) => Err(err), Err(found_error.unwrap())
None => Ok(out), } else {
Ok(out)
} }
}) })
} }

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Product}; use crate::stream::{Product, Stream};
impl<T, U, E> Product<Result<U, E>> for Result<T, E> impl<T, U, E> Product<Result<U, E>> for Result<T, E>
where where
@ -36,26 +36,39 @@ where
``` ```
"#] "#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>> fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a where
S: Stream<Item = Result<U, E>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut is_error = false;
let mut found_error = None; let mut found_error = None;
let out = <T as Product<U>>::product(stream let out = <T as Product<U>>::product(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Ok(elem) => Some(elem), // Stop processing the stream on `Err`
!is_error
&& (elem.is_ok() || {
is_error = true;
// Capture first `Err`
true
})
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => { Err(err) => {
found_error = Some(err); found_error = Some(err);
// Stop processing the stream on error
None None
} }
} }),
})).await; )
match found_error { .await;
Some(err) => Err(err),
None => Ok(out) if is_error {
Err(found_error.unwrap())
} else {
Ok(out)
} }
}) })
} }

@ -36,26 +36,39 @@ where
``` ```
"#] "#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>> fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a where
S: Stream<Item = Result<U, E>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut is_error = false;
let mut found_error = None; let mut found_error = None;
let out = <T as Sum<U>>::sum(stream let out = <T as Sum<U>>::sum(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Ok(elem) => Some(elem), // Stop processing the stream on `Err`
!is_error
&& (elem.is_ok() || {
is_error = true;
// Capture first `Err`
true
})
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => { Err(err) => {
found_error = Some(err); found_error = Some(err);
// Stop processing the stream on error
None None
} }
} }),
})).await; )
match found_error { .await;
Some(err) => Err(err),
None => Ok(out) if is_error {
Err(found_error.unwrap())
} else {
Ok(out)
} }
}) })
} }

@ -8,6 +8,6 @@ impl FromStream<()> for () {
fn from_stream<'a, S: IntoStream<Item = ()> + 'a>( fn from_stream<'a, S: IntoStream<Item = ()> + 'a>(
stream: S, stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> { ) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
Box::pin(stream.into_stream().for_each(|_| ())) Box::pin(stream.into_stream().for_each(drop))
} }
} }

Loading…
Cancel
Save