diff --git a/src/stream/double_ended_stream.rs b/src/stream/double_ended_stream.rs deleted file mode 100644 index 129bb1cd..00000000 --- a/src/stream/double_ended_stream.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::stream::Stream; - -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// A stream able to yield elements from both ends. -/// -/// Something that implements `DoubleEndedStream` has one extra capability -/// over something that implements [`Stream`]: the ability to also take -/// `Item`s from the back, as well as the front. -/// -/// [`Stream`]: trait.Stream.html -#[cfg(feature = "unstable")] -#[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub trait DoubleEndedStream: Stream { - /// Removes and returns an element from the end of the stream. - /// - /// Returns `None` when there are no more elements. - /// - /// The [trait-level] docs contain more details. - /// - /// [trait-level]: trait.DoubleEndedStream.html - fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; -} diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs new file mode 100644 index 00000000..dc2a45c9 --- /dev/null +++ b/src/stream/double_ended_stream/mod.rs @@ -0,0 +1,241 @@ +use crate::stream::Stream; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +mod next_back; +mod nth_back; +mod rfind; +mod rfold; +mod try_rfold; + +use next_back::NextBackFuture; +use nth_back::NthBackFuture; +use rfind::RFindFuture; +use rfold::RFoldFuture; +use try_rfold::TryRFoldFuture; + +/// A stream able to yield elements from both ends. +/// +/// Something that implements `DoubleEndedStream` has one extra capability +/// over something that implements [`Stream`]: the ability to also take +/// `Item`s from the back, as well as the front. +/// +/// [`Stream`]: trait.Stream.html +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub trait DoubleEndedStream: Stream { + #[doc = r#" + Attempts to receive the next item from the back of the stream. + + There are several possible return values: + + * `Poll::Pending` means this stream's next_back value is not ready yet. + * `Poll::Ready(None)` means this stream has been exhausted. + * `Poll::Ready(Some(item))` means `item` was received out of the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::pin::Pin; + + use async_std::prelude::*; + use async_std::stream; + use async_std::task::{Context, Poll}; + + fn increment( + s: impl DoubleEndedStream + Unpin, + ) -> impl DoubleEndedStream + Unpin { + struct Increment(S); + + impl + Unpin> Stream for Increment { + type Item = S::Item; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + } + } + } + + impl + Unpin> DoubleEndedStream for Increment { + fn poll_next_back( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next_back(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + } + } + } + + Increment(s) + } + + let mut s = increment(stream::once(7)); + + assert_eq!(s.next_back().await, Some(8)); + assert_eq!(s.next_back().await, None); + # + # }) } + ``` + "#] + fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + #[doc = r#" + Advances the stream and returns the next value. + + Returns [`None`] when iteration is finished. Individual stream implementations may + choose to resume iteration, and so calling `next()` again may or may not eventually + start returning more values. + + [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let mut s = double_ended_stream::from_iter(vec![7u8]); + + assert_eq!(s.next_back().await, Some(7)); + assert_eq!(s.next_back().await, None); + # + # }) } + ``` + "#] + fn next_back(&mut self) -> NextBackFuture<'_, Self> + where + Self: Unpin, + { + NextBackFuture { stream: self } + } + + #[doc = r#" + Returns the nth element from the back of the stream. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.nth_back(1).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] + fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self> + where + Self: Unpin + Sized, + { + NthBackFuture::new(self, n) + } + + #[doc = r#" + Returns the the frist element from the right that matches the predicate. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.rfind(|v| v % 2 == 0).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] + fn rfind

(&mut self, p: P) -> RFindFuture<'_, Self, P> + where + Self: Unpin + Sized, + P: FnMut(&Self::Item) -> bool, + { + RFindFuture::new(self, p) + } + + #[doc = r#" + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.rfold(0, |acc, v| v + acc).await; + + assert_eq!(second, 15); + # + # }) } + ``` + "#] + fn rfold(self, accum: B, f: F) -> RFoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + RFoldFuture::new(self, accum, f) + } + + #[doc = r#" + A combinator that applies a function as long as it returns successfully, producing a single, final value. + Immediately returns the error when the function returns unsuccessfully. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + let sum = s.try_rfold(0, |acc, v| { + if (acc+v) % 2 == 1 { + Ok(v+3) + } else { + Err("fail") + } + }).await; + + assert_eq!(sum, Err("fail")); + # + # }) } + ``` + "#] + fn try_rfold(self, accum: B, f: F) -> TryRFoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> Result, + { + TryRFoldFuture::new(self, accum, f) + } +} diff --git a/src/stream/double_ended_stream/next_back.rs b/src/stream/double_ended_stream/next_back.rs new file mode 100644 index 00000000..aa642d09 --- /dev/null +++ b/src/stream/double_ended_stream/next_back.rs @@ -0,0 +1,19 @@ +use std::pin::Pin; +use std::future::Future; + +use crate::stream::DoubleEndedStream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct NextBackFuture<'a, T: Unpin + ?Sized> { + pub(crate) stream: &'a mut T, +} + +impl Future for NextBackFuture<'_, T> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.stream).poll_next_back(cx) + } +} diff --git a/src/stream/double_ended_stream/nth_back.rs b/src/stream/double_ended_stream/nth_back.rs new file mode 100644 index 00000000..e32a28fd --- /dev/null +++ b/src/stream/double_ended_stream/nth_back.rs @@ -0,0 +1,41 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::stream::DoubleEndedStream; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct NthBackFuture<'a, S> { + stream: &'a mut S, + n: usize, +} + +impl<'a, S> NthBackFuture<'a, S> { + pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { + NthBackFuture { stream, n } + } +} + +impl<'a, S> Future for NthBackFuture<'a, S> +where + S: DoubleEndedStream + Sized + Unpin, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); + match next { + Some(v) => match self.n { + 0 => Poll::Ready(Some(v)), + _ => { + self.n -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} + diff --git a/src/stream/double_ended_stream/rfind.rs b/src/stream/double_ended_stream/rfind.rs new file mode 100644 index 00000000..94726934 --- /dev/null +++ b/src/stream/double_ended_stream/rfind.rs @@ -0,0 +1,41 @@ +use std::task::{Context, Poll}; +use std::future::Future; +use std::pin::Pin; + +use crate::stream::DoubleEndedStream; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct RFindFuture<'a, S, P> { + stream: &'a mut S, + p: P, +} + +impl<'a, S, P> RFindFuture<'a, S, P> { + pub(super) fn new(stream: &'a mut S, p: P) -> Self { + RFindFuture { stream, p } + } +} + +impl Unpin for RFindFuture<'_, S, P> {} + +impl<'a, S, P> Future for RFindFuture<'a, S, P> +where + S: DoubleEndedStream + Unpin + Sized, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); + + match item { + Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)), + Some(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/double_ended_stream/rfold.rs b/src/stream/double_ended_stream/rfold.rs new file mode 100644 index 00000000..9002f8d9 --- /dev/null +++ b/src/stream/double_ended_stream/rfold.rs @@ -0,0 +1,52 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct RFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl RFoldFuture { + pub(super) fn new(stream: S, init: B, f: F) -> Self { + RFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for RFoldFuture +where + S: DoubleEndedStream + Sized, + F: FnMut(B, S::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + *this.acc = Some(new); + } + None => return Poll::Ready(this.acc.take().unwrap()), + } + } + } +} diff --git a/src/stream/double_ended_stream/try_rfold.rs b/src/stream/double_ended_stream/try_rfold.rs new file mode 100644 index 00000000..9e6295a7 --- /dev/null +++ b/src/stream/double_ended_stream/try_rfold.rs @@ -0,0 +1,56 @@ +use crate::future::Future; +use std::pin::Pin; +use crate::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct TryRFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl TryRFoldFuture { + pub(super) fn new(stream: S, init: T, f: F) -> Self { + TryRFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for TryRFoldFuture +where + S: DoubleEndedStream + Unpin, + F: FnMut(T, S::Item) -> Result, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + + match new { + Ok(o) => *this.acc = Some(o), + Err(e) => return Poll::Ready(Err(e)), + } + } + None => return Poll::Ready(Ok(this.acc.take().unwrap())), + } + } + } +} diff --git a/src/stream/from_iter.rs b/src/stream/from_iter.rs index d7a31d6c..705d1504 100644 --- a/src/stream/from_iter.rs +++ b/src/stream/from_iter.rs @@ -3,6 +3,8 @@ use std::pin::Pin; use pin_project_lite::pin_project; use crate::stream::Stream; +#[cfg(feature = "unstable")] +use crate::stream::double_ended_stream::DoubleEndedStream; use crate::task::{Context, Poll}; pin_project! { @@ -51,3 +53,10 @@ impl Stream for FromIter { Poll::Ready(self.iter.next()) } } + +#[cfg(feature = "unstable")] +impl DoubleEndedStream for FromIter { + fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.iter.next_back()) + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d8b96ec2..ebce3a36 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -308,7 +308,7 @@ pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::*; -pub(crate) mod stream; +pub mod stream; mod empty; mod from_fn; @@ -318,7 +318,8 @@ mod repeat; mod repeat_with; cfg_unstable! { - mod double_ended_stream; + #[doc(hidden)] + pub mod double_ended_stream; mod exact_size_stream; mod extend; mod from_stream; diff --git a/src/stream/once.rs b/src/stream/once.rs index e4ac682c..939722d9 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -5,6 +5,9 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; +#[cfg(feature = "unstable")] +use crate::stream::DoubleEndedStream; + /// Creates a stream that yields a single item. /// /// # Examples @@ -46,3 +49,10 @@ impl Stream for Once { Poll::Ready(self.project().value.take()) } } + +#[cfg(feature = "unstable")] +impl DoubleEndedStream for Once { + fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.project().value.take()) + } +}