diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 982c2c7..18fcde3 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,6 +1,8 @@ mod nth_back; +mod rfind; use nth_back::NthBackFuture; +use rfind::RFindFuture; extension_trait! { use crate::stream::Stream; @@ -53,5 +55,17 @@ extension_trait! { { NthBackFuture::new(self, n) } + + fn rfind

( + &mut self, + p: P, + ) -> impl Future> + '_ [RFindFuture<'_, Self, P>] + where + Self: Unpin + Sized, + P: FnMut(&Self::Item) -> bool, + { + RFindFuture::new(self, p) + } + } } diff --git a/src/stream/double_ended/rfind.rs b/src/stream/double_ended/rfind.rs new file mode 100644 index 0000000..b05e14e --- /dev/null +++ b/src/stream/double_ended/rfind.rs @@ -0,0 +1,39 @@ +use std::task::{Context, Poll}; +use std::future::Future; +use std::pin::Pin; + +use crate::stream::DoubleEndedStream; + +pub struct RFindFuture<'a, S, P> { + stream: &'a mut S, + p: P, +} + +impl<'a, S, P> RFindFuture<'a, S, P> { + pub(super) fn new(stream: &'a mut S, p: P) -> Self { + RFindFuture { stream, p } + } +} + +impl Unpin for RFindFuture<'_, S, P> {} + +impl<'a, S, P> Future for RFindFuture<'a, S, P> +where + S: DoubleEndedStream + Unpin + Sized, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); + + match item { + Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)), + Some(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + None => Poll::Ready(None), + } + } +}