From 55194edbf736c5909b0f5d03d7f36379813f1d7e Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Wed, 20 Nov 2019 07:45:40 +0000 Subject: [PATCH] Add try_rfold --- src/stream/double_ended/mod.rs | 43 +++++++++++++++++++++ src/stream/double_ended/try_rfold.rs | 56 ++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 src/stream/double_ended/try_rfold.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 2491775..c2372d2 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,10 +1,12 @@ mod nth_back; mod rfind; mod rfold; +mod try_rfold; use nth_back::NthBackFuture; use rfind::RFindFuture; use rfold::RFoldFuture; +use try_rfold::TryRFoldFuture; extension_trait! { use crate::stream::Stream; @@ -121,5 +123,46 @@ extension_trait! { { RFoldFuture::new(self, accum, f) } + + #[doc = r#" + A combinator that applies a function as long as it returns successfully, producing a single, final value. + Immediately returns the error when the function returns unsuccessfully. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::Sample; + use async_std::stream::double_ended::DoubleEndedStreamExt; + + let s = Sample::from(vec![1, 2, 3, 4, 5]); + let sum = s.try_rfold(0, |acc, v| { + if (acc+v) % 2 == 1 { + Ok(v+3) + } else { + Err("fail") + } + }).await; + + assert_eq!(sum, Err("fail")); + # + # }) } + ``` + "#] + fn try_rfold( + self, + accum: B, + f: F, + ) -> impl Future> [TryRFoldFuture] + where + Self: Sized, + F: FnMut(B, Self::Item) -> Result, + { + TryRFoldFuture::new(self, accum, f) + } + } } diff --git a/src/stream/double_ended/try_rfold.rs b/src/stream/double_ended/try_rfold.rs new file mode 100644 index 0000000..4c97cea --- /dev/null +++ b/src/stream/double_ended/try_rfold.rs @@ -0,0 +1,56 @@ +use crate::future::Future; +use std::pin::Pin; +use crate::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { +#[doc(hidden)] +#[allow(missing_debug_implementations)] + pub struct TryRFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl TryRFoldFuture { + pub(super) fn new(stream: S, init: T, f: F) -> Self { + TryRFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for TryRFoldFuture +where + S: DoubleEndedStream + Unpin, + F: FnMut(T, S::Item) -> Result, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + + match new { + Ok(o) => *this.acc = Some(o), + Err(e) => return Poll::Ready(Err(e)), + } + } + None => return Poll::Ready(Ok(this.acc.take().unwrap())), + } + } + } +}