Use `filter_map(identity)` + other fixes

pull/667/head
Oleg Nosov 5 years ago
parent fb567a3a09
commit 134089af2c
No known key found for this signature in database
GPG Key ID: DE90B83800644E24

@ -2,6 +2,7 @@ use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{FromStream, IntoStream}; use crate::stream::{FromStream, IntoStream};
use crate::utils::identity;
impl<T, V> FromStream<Option<T>> for Option<V> impl<T, V> FromStream<Option<T>> for Option<V>
where where
@ -28,7 +29,7 @@ where
false false
} }
}) })
.map(Option::unwrap) .filter_map(identity)
.collect() .collect()
.await; .await;

@ -2,6 +2,7 @@ use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Product, Stream}; use crate::stream::{Product, Stream};
use crate::utils::identity;
impl<T, U> Product<Option<U>> for Option<T> impl<T, U> Product<Option<U>> for Option<T>
where where
@ -52,7 +53,7 @@ where
false false
} }
}) })
.map(Option::unwrap), .filter_map(identity),
) )
.await; .await;

@ -2,6 +2,7 @@ use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Sum}; use crate::stream::{Stream, Sum};
use crate::utils::identity;
impl<T, U> Sum<Option<U>> for Option<T> impl<T, U> Sum<Option<U>> for Option<T>
where where
@ -43,11 +44,11 @@ where
.take_while(|elem| { .take_while(|elem| {
elem.is_some() || { elem.is_some() || {
found_none = true; found_none = true;
// Stop processing the stream on error // Stop processing the stream on `None`
false false
} }
}) })
.map(Option::unwrap), .filter_map(identity),
) )
.await; .await;

@ -21,7 +21,7 @@ where
// if a failure occurs // if a failure occurs
let mut found_error = None; let mut found_error = None;
let out: V = stream let out: V = stream
.scan((), |_, elem| { .scan((), |(), elem| {
match elem { match elem {
Ok(elem) => Some(elem), Ok(elem) => Some(elem),
Err(err) => { Err(err) => {

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Product}; use crate::stream::{Product, Stream};
impl<T, U, E> Product<Result<U, E>> for Result<T, E> impl<T, U, E> Product<Result<U, E>> for Result<T, E>
where where
@ -36,26 +36,28 @@ where
``` ```
"#] "#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>> fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a where
S: Stream<Item = Result<U, E>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = None; let mut found_error = None;
let out = <T as Product<U>>::product(stream let out = <T as Product<U>>::product(stream.scan((), |(), elem| {
.scan((), |_, elem| { match elem {
match elem { Ok(elem) => Some(elem),
Ok(elem) => Some(elem), Err(err) => {
Err(err) => { found_error = Some(err);
found_error = Some(err); // Stop processing the stream on error
// Stop processing the stream on error None
None
}
} }
})).await; }
}))
.await;
match found_error { match found_error {
Some(err) => Err(err), Some(err) => Err(err),
None => Ok(out) None => Ok(out),
} }
}) })
} }

@ -36,26 +36,28 @@ where
``` ```
"#] "#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>> fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a where
S: Stream<Item = Result<U, E>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = None; let mut found_error = None;
let out = <T as Sum<U>>::sum(stream let out = <T as Sum<U>>::sum(stream.scan((), |(), elem| {
.scan((), |_, elem| { match elem {
match elem { Ok(elem) => Some(elem),
Ok(elem) => Some(elem), Err(err) => {
Err(err) => { found_error = Some(err);
found_error = Some(err); // Stop processing the stream on error
// Stop processing the stream on error None
None
}
} }
})).await; }
}))
.await;
match found_error { match found_error {
Some(err) => Err(err), Some(err) => Err(err),
None => Ok(out) None => Ok(out),
} }
}) })
} }

@ -8,6 +8,6 @@ impl FromStream<()> for () {
fn from_stream<'a, S: IntoStream<Item = ()> + 'a>( fn from_stream<'a, S: IntoStream<Item = ()> + 'a>(
stream: S, stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> { ) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
Box::pin(stream.into_stream().for_each(|_| ())) Box::pin(stream.into_stream().for_each(drop))
} }
} }

@ -52,6 +52,14 @@ pub fn random(n: u32) -> u32 {
}) })
} }
/// Returns given argument without changes.
#[allow(dead_code)]
#[doc(hidden)]
#[inline(always)]
pub(crate) fn identity<T>(arg: T) -> T {
arg
}
/// Add additional context to errors /// Add additional context to errors
pub(crate) trait Context { pub(crate) trait Context {
fn context(self, message: impl Fn() -> String) -> Self; fn context(self, message: impl Fn() -> String) -> Self;

Loading…
Cancel
Save