Add Stream::sum() and Stream::product() implementations

These are the stream equivalents to `std::iter::Iterator::sum()` and
`std::iter::Iterator::product()`.

Note that this changeset tweaks the `Stream::Sum` and `Stream::Product`
traits a little: rather than returning a generic future `F`, they return
a pinned, boxed, `Future` trait object now. This is in line with other
traits that return a future, e.g. `FromStream`.
This commit is contained in:
Kyle Tomsic 2019-10-20 10:05:55 -04:00
parent ec23632f3e
commit e26eb7a719
9 changed files with 476 additions and 7 deletions

View file

@ -7,3 +7,8 @@ mod from_stream;
#[doc(inline)]
pub use std::option::Option;
cfg_unstable! {
mod product;
mod sum;
}

66
src/option/product.rs Normal file
View file

@ -0,0 +1,66 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Product};
impl<T, U> Product<Option<U>> for Option<T>
where
T: Product<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is a `None`, no further
elements are taken, and the `None` is returned. Should no `None` occur,
the product of all elements is returned.
# Examples
This multiplies every integer in a vector, rejecting the product if a negative element is
encountered:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
let prod: Option<i32> = v.map(|x|
if x < 0 {
None
} else {
Some(x)
}).product().await;
assert_eq!(prod, Some(8));
#
# }) }
```
"#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_none = false;
let out = <T as Product<U>>::product(stream
.scan((), |_, elem| {
match elem {
Some(elem) => Some(elem),
None => {
found_none = true;
// Stop processing the stream on error
None
}
}
})).await;
if found_none {
None
} else {
Some(out)
}
})
}
}

63
src/option/sum.rs Normal file
View file

@ -0,0 +1,63 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Sum};
impl<T, U> Sum<Option<U>> for Option<T>
where
T: Sum<U>,
{
#[doc = r#"
Takes each element in the `Iterator`: if it is a `None`, no further
elements are taken, and the `None` is returned. Should no `None` occur,
the sum of all elements is returned.
# Examples
This sums up the position of the character 'a' in a vector of strings,
if a word did not have the character 'a' the operation returns `None`:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let words: VecDeque<_> = vec!["have", "a", "great", "day"]
.into_iter()
.collect();
let total: Option<usize> = words.map(|w| w.find('a')).sum().await;
assert_eq!(total, Some(5));
#
# }) }
```
"#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_none = false;
let out = <T as Sum<U>>::sum(stream
.scan((), |_, elem| {
match elem {
Some(elem) => Some(elem),
None => {
found_none = true;
// Stop processing the stream on error
None
}
}
})).await;
if found_none {
None
} else {
Some(out)
}
})
}
}

View file

@ -7,3 +7,8 @@ mod from_stream;
#[doc(inline)]
pub use std::result::Result;
cfg_unstable! {
mod product;
mod sum;
}

64
src/result/product.rs Normal file
View file

@ -0,0 +1,64 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Product};
impl<T, U, E> Product<Result<U, E>> for Result<T, E>
where
T: Product<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is an `Err`, no further
elements are taken, and the `Err` is returned. Should no `Err` occur,
the product of all elements is returned.
# Examples
This multiplies every integer in a vector, rejecting the product if a negative element is
encountered:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}).product().await;
assert_eq!(res, Ok(8));
#
# }) }
```
"#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out = <T as Product<U>>::product(stream
.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
}
}
})).await;
match found_error {
Some(err) => Err(err),
None => Ok(out)
}
})
}
}

64
src/result/sum.rs Normal file
View file

@ -0,0 +1,64 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Sum};
impl<T, U, E> Sum<Result<U, E>> for Result<T, E>
where
T: Sum<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is an `Err`, no further
elements are taken, and the `Err` is returned. Should no `Err` occur,
the sum of all elements is returned.
# Examples
This sums up every integer in a vector, rejecting the sum if a negative
element is encountered:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let v: VecDeque<_> = vec![1, 2].into_iter().collect();
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}).sum().await;
assert_eq!(res, Ok(3));
#
# }) }
```
"#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out = <T as Sum<U>>::sum(stream
.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
}
}
})).await;
match found_error {
Some(err) => Err(err),
None => Ok(out)
}
})
}
}

View file

@ -1,7 +1,9 @@
use std::pin::Pin;
use crate::future::Future;
use crate::stream::Stream;
/// Trait to represent types that can be created by productming up a stream.
/// Trait to represent types that can be created by multiplying the elements of a stream.
///
/// This trait is used to implement the [`product`] method on streams. Types which
/// implement the trait can be generated by the [`product`] method. Like
@ -16,8 +18,62 @@ use crate::stream::Stream;
pub trait Product<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by
/// multiplying the items.
fn product<S, F>(stream: S) -> F
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
where
S: Stream<Item = A>,
F: Future<Output = Self>;
S: Stream<Item = A> + 'a;
}
use core::ops::Mul;
use core::num::Wrapping;
use crate::stream::stream::StreamExt;
macro_rules! integer_product {
(@impls $one: expr, $($a:ty)*) => ($(
impl Product for $a {
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where
S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where
S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
}
}
)*);
($($a:ty)*) => (
integer_product!(@impls 1, $($a)*);
integer_product!(@impls Wrapping(1), $(Wrapping<$a>)*);
);
}
macro_rules! float_product {
($($a:ty)*) => ($(
impl Product for $a {
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'b>>
where S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
}
}
)*);
($($a:ty)*) => (
float_product!($($a)*);
float_product!($(Wrapping<$a>)*);
);
}
integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
float_product!{ f32 f64 }

View file

@ -96,6 +96,7 @@ cfg_unstable! {
use crate::future::Future;
use crate::stream::FromStream;
use crate::stream::{Product, Sum};
pub use merge::Merge;
@ -1536,6 +1537,95 @@ extension_trait! {
{
LtFuture::new(self, other)
}
#[doc = r#"
Sums the elements of an iterator.
Takes each element, adds them together, and returns the result.
An empty iterator returns the zero value of the type.
# Panics
When calling `sum()` and a primitive integer type is being returned, this
method will panic if the computation overflows and debug assertions are
enabled.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect();
let sum: u8 = s.sum().await;
assert_eq!(sum, 10);
#
# }) }
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn sum<'a, S>(
self,
) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
where
Self: Sized + Stream<Item = S> + 'a,
S: Sum,
{
Sum::sum(self)
}
#[doc = r#"
Iterates over the entire iterator, multiplying all the elements
An empty iterator returns the one value of the type.
# Panics
When calling `product()` and a primitive integer type is being returned,
method will panic if the computation overflows and debug assertions are
enabled.
# Examples
This example calculates the factorial of n (i.e. the product of the numbers from 1 to
n, inclusive):
```
# fn main() { async_std::task::block_on(async {
#
async fn factorial(n: u32) -> u32 {
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<_> = (1..=n).collect();
s.product().await
}
assert_eq!(factorial(0).await, 1);
assert_eq!(factorial(1).await, 1);
assert_eq!(factorial(5).await, 120);
#
# }) }
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn product<'a, P>(
self,
) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>]
where
Self: Sized + Stream<Item = P> + 'a,
P: Product,
{
Product::product(self)
}
}
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

View file

@ -1,3 +1,5 @@
use std::pin::Pin;
use crate::future::Future;
use crate::stream::Stream;
@ -16,8 +18,62 @@ use crate::stream::Stream;
pub trait Sum<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by
/// "summing up" the items.
fn sum<S, F>(stream: S) -> F
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
where
S: Stream<Item = A>,
F: Future<Output = Self>;
S: Stream<Item = A> + 'a;
}
use core::ops::Add;
use core::num::Wrapping;
use crate::stream::stream::StreamExt;
macro_rules! integer_sum {
(@impls $zero: expr, $($a:ty)*) => ($(
impl Sum for $a {
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where
S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold($zero, Add::add).await } )
}
}
impl<'a> Sum<&'a $a> for $a {
fn sum<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where
S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold($zero, Add::add).await } )
}
}
)*);
($($a:ty)*) => (
integer_sum!(@impls 0, $($a)*);
integer_sum!(@impls Wrapping(0), $(Wrapping<$a>)*);
);
}
macro_rules! float_sum {
($($a:ty)*) => ($(
impl Sum for $a {
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
where S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } )
}
}
impl<'a> Sum<&'a $a> for $a {
fn sum<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } )
}
}
)*);
($($a:ty)*) => (
float_sum!(@impls 0.0, $($a)*);
float_sum!(@impls Wrapping(0.0), $(Wrapping<$a>)*);
);
}
integer_sum!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
float_sum!{ f32 f64 }