From 5b720ab1e26cf644f55f953522e34df736be7424 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 11 Sep 2019 09:54:25 +0300 Subject: [PATCH] adds stream::fold combinator --- src/stream/stream/fold.rs | 61 +++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 30 +++++++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 src/stream/stream/fold.rs diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs new file mode 100644 index 0000000..0e3dd67 --- /dev/null +++ b/src/stream/stream/fold.rs @@ -0,0 +1,61 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct FoldFuture { + stream: S, + f: F, + acc: Option, + __t: PhantomData, +} + +impl FoldFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_unpinned!(acc: Option); + + pub(super) fn new(stream: S, init: B, f: F) -> Self { + FoldFuture { + stream, + f, + acc: Some(init), + __t: PhantomData, + } + } +} + +impl Future for FoldFuture +where + S: futures_core::stream::Stream + Unpin + Sized, + F: FnMut(B, S::Item) -> B, +{ + type Output = B; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(v) => { + cx.waker().wake_by_ref(); + let old = self + .as_mut() + .acc() + .take() + .expect("FoldFuture should never contain None"); + let new = (self.as_mut().f())(old, v); + *self.as_mut().acc() = Some(new); + Poll::Pending + } + None => Poll::Ready( + self.as_mut() + .acc() + .take() + .expect("FoldFuture should never contain None"), + ), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index eddafe2..7825396 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -25,6 +25,7 @@ mod all; mod any; mod filter_map; mod find_map; +mod fold; mod min_by; mod next; mod nth; @@ -36,6 +37,7 @@ use all::AllFuture; use any::AnyFuture; use filter_map::FilterMap; use find_map::FindMapFuture; +use fold::FoldFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -344,6 +346,34 @@ pub trait Stream { FindMapFuture::new(self, f) } + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # + /// # }) } + /// ``` + fn fold(self, init: B, f: F) -> FoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + FoldFuture::new(self, init, f) + } + /// Tests if any element of the stream matches a predicate. /// /// `any()` takes a closure that returns `true` or `false`. It applies