diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 18fcde3..a2f9d26 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,8 +1,10 @@ mod nth_back; mod rfind; +mod rfold; use nth_back::NthBackFuture; use rfind::RFindFuture; +use rfold::RFoldFuture; extension_trait! { use crate::stream::Stream; @@ -67,5 +69,16 @@ extension_trait! { RFindFuture::new(self, p) } + fn rfold( + self, + accum: B, + f: F, + ) -> impl Future> [RFoldFuture] + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + RFoldFuture::new(self, accum, f) + } } } diff --git a/src/stream/double_ended/rfold.rs b/src/stream/double_ended/rfold.rs new file mode 100644 index 0000000..4df7d9f --- /dev/null +++ b/src/stream/double_ended/rfold.rs @@ -0,0 +1,50 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { + pub struct RFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl RFoldFuture { + pub(super) fn new(stream: S, init: B, f: F) -> Self { + RFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for RFoldFuture +where + S: DoubleEndedStream + Sized, + F: FnMut(B, S::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + *this.acc = Some(new); + } + None => return Poll::Ready(this.acc.take().unwrap()), + } + } + } +}