From fa288931c681a4dd5210fc497f73068a3457fd3c Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 15 Nov 2019 21:15:50 +0100 Subject: [PATCH 01/18] Skeleton for DoubleEndedStreamExt trait --- src/stream/double_ended/mod.rs | 22 ++++++++++++++++++++++ src/stream/mod.rs | 1 + 2 files changed, 23 insertions(+) create mode 100644 src/stream/double_ended/mod.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs new file mode 100644 index 00000000..101ccbd3 --- /dev/null +++ b/src/stream/double_ended/mod.rs @@ -0,0 +1,22 @@ +extension_trait! { + use crate::stream::Stream; + + use std::pin::Pin; + use std::task::{Context, Poll}; + + #[doc = r#" + Something fancy + "#] + pub trait DoubleEndedStream { + type Item; + + fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + } + + #[doc = r#" + Something else + "#] + pub trait DoubleEndedStreamExt: crate::stream::DoubleEndedStream { + } +} + diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d8b96ec2..e15d0818 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -318,6 +318,7 @@ mod repeat; mod repeat_with; cfg_unstable! { + mod double_ended; mod double_ended_stream; mod exact_size_stream; mod extend; From d0ef48c75354445b7f6e89e0c34d587859140158 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 15 Nov 2019 22:07:25 +0100 Subject: [PATCH 02/18] Sketch out nth_back --- src/stream/double_ended/mod.rs | 37 ++++++++++++++++++++++++++- src/stream/double_ended/nth_back.rs | 39 +++++++++++++++++++++++++++++ src/stream/mod.rs | 2 +- 3 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 src/stream/double_ended/nth_back.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 101ccbd3..982c2c71 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,9 +1,14 @@ +mod nth_back; + +use nth_back::NthBackFuture; + extension_trait! { use crate::stream::Stream; use std::pin::Pin; use std::task::{Context, Poll}; + #[doc = r#" Something fancy "#] @@ -17,6 +22,36 @@ extension_trait! { Something else "#] pub trait DoubleEndedStreamExt: crate::stream::DoubleEndedStream { + + #[doc = r#" + Returns the nth element from the back of the stream. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended::DoubleEndedStreamExt; + use async_std::stream; + + let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.nth_back(1).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] + fn nth_back( + &mut self, + n: usize, + ) -> impl Future> + '_ [NthBackFuture<'_, Self>] + where + Self: Unpin + Sized, + { + NthBackFuture::new(self, n) + } } } - diff --git a/src/stream/double_ended/nth_back.rs b/src/stream/double_ended/nth_back.rs new file mode 100644 index 00000000..e318e79a --- /dev/null +++ b/src/stream/double_ended/nth_back.rs @@ -0,0 +1,39 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::future::Future; + +use crate::stream::DoubleEndedStream; + +pub struct NthBackFuture<'a, S> { + stream: &'a mut S, + n: usize, +} + +impl<'a, S> NthBackFuture<'a, S> { + pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { + NthBackFuture { stream, n } + } +} + +impl<'a, S> Future for NthBackFuture<'a, S> +where + S: DoubleEndedStream + Sized + Unpin, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); + match next { + Some(v) => match self.n { + 0 => Poll::Ready(Some(v)), + _ => { + self.n -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} + diff --git a/src/stream/mod.rs b/src/stream/mod.rs index e15d0818..318733b2 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -318,7 +318,7 @@ mod repeat; mod repeat_with; cfg_unstable! { - mod double_ended; + pub mod double_ended; mod double_ended_stream; mod exact_size_stream; mod extend; From 78bafbb88f679749434e020f8163acd4285a5ac6 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 15 Nov 2019 22:15:28 +0100 Subject: [PATCH 03/18] Sketch outch rfind --- src/stream/double_ended/mod.rs | 14 ++++++++++++ src/stream/double_ended/rfind.rs | 39 ++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) create mode 100644 src/stream/double_ended/rfind.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 982c2c71..18fcde3b 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,6 +1,8 @@ mod nth_back; +mod rfind; use nth_back::NthBackFuture; +use rfind::RFindFuture; extension_trait! { use crate::stream::Stream; @@ -53,5 +55,17 @@ extension_trait! { { NthBackFuture::new(self, n) } + + fn rfind

( + &mut self, + p: P, + ) -> impl Future> + '_ [RFindFuture<'_, Self, P>] + where + Self: Unpin + Sized, + P: FnMut(&Self::Item) -> bool, + { + RFindFuture::new(self, p) + } + } } diff --git a/src/stream/double_ended/rfind.rs b/src/stream/double_ended/rfind.rs new file mode 100644 index 00000000..b05e14ee --- /dev/null +++ b/src/stream/double_ended/rfind.rs @@ -0,0 +1,39 @@ +use std::task::{Context, Poll}; +use std::future::Future; +use std::pin::Pin; + +use crate::stream::DoubleEndedStream; + +pub struct RFindFuture<'a, S, P> { + stream: &'a mut S, + p: P, +} + +impl<'a, S, P> RFindFuture<'a, S, P> { + pub(super) fn new(stream: &'a mut S, p: P) -> Self { + RFindFuture { stream, p } + } +} + +impl Unpin for RFindFuture<'_, S, P> {} + +impl<'a, S, P> Future for RFindFuture<'a, S, P> +where + S: DoubleEndedStream + Unpin + Sized, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); + + match item { + Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)), + Some(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + None => Poll::Ready(None), + } + } +} From cc493df433c1ca6bc16a3cd376e503696e26f91c Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 15 Nov 2019 22:29:00 +0100 Subject: [PATCH 04/18] Sketch out rfold --- src/stream/double_ended/mod.rs | 13 +++++++++ src/stream/double_ended/rfold.rs | 50 ++++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 src/stream/double_ended/rfold.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 18fcde3b..a2f9d266 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,8 +1,10 @@ mod nth_back; mod rfind; +mod rfold; use nth_back::NthBackFuture; use rfind::RFindFuture; +use rfold::RFoldFuture; extension_trait! { use crate::stream::Stream; @@ -67,5 +69,16 @@ extension_trait! { RFindFuture::new(self, p) } + fn rfold( + self, + accum: B, + f: F, + ) -> impl Future> [RFoldFuture] + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + RFoldFuture::new(self, accum, f) + } } } diff --git a/src/stream/double_ended/rfold.rs b/src/stream/double_ended/rfold.rs new file mode 100644 index 00000000..4df7d9fb --- /dev/null +++ b/src/stream/double_ended/rfold.rs @@ -0,0 +1,50 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { + pub struct RFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl RFoldFuture { + pub(super) fn new(stream: S, init: B, f: F) -> Self { + RFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for RFoldFuture +where + S: DoubleEndedStream + Sized, + F: FnMut(B, S::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + *this.acc = Some(new); + } + None => return Poll::Ready(this.acc.take().unwrap()), + } + } + } +} From aabfefd0154a35173b1a705da7fdf06410c0f318 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sat, 16 Nov 2019 12:43:05 +0000 Subject: [PATCH 05/18] Add a sample implementation of a double ended stream --- src/stream/double_ended_stream.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/stream/double_ended_stream.rs b/src/stream/double_ended_stream.rs index 129bb1cd..95e47633 100644 --- a/src/stream/double_ended_stream.rs +++ b/src/stream/double_ended_stream.rs @@ -22,3 +22,31 @@ pub trait DoubleEndedStream: Stream { /// [trait-level]: trait.DoubleEndedStream.html fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } + +pub struct Sample { + inner: Vec, +} + +impl From> for Sample { + fn from(data: Vec) -> Self { + Sample { inner: data } + } +} + +impl Unpin for Sample {} + +impl Stream for Sample { + type Item = T; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.inner.len() > 0 { + return Poll::Ready(Some(self.inner.remove(0))); + } + return Poll::Ready(None); + } +} + +impl DoubleEndedStream for Sample { + fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.inner.pop()) + } +} From c4b9a7f680070c254ed6f9220ad58b7080a93b54 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Tue, 19 Nov 2019 22:20:36 +0000 Subject: [PATCH 06/18] Add samples for some of the functions --- src/stream/double_ended/mod.rs | 45 +++++++++++++++++++++++++++++-- src/stream/double_ended_stream.rs | 28 ------------------- src/stream/mod.rs | 4 ++- src/stream/sample.rs | 33 +++++++++++++++++++++++ 4 files changed, 79 insertions(+), 31 deletions(-) create mode 100644 src/stream/sample.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index a2f9d266..24917756 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -37,10 +37,10 @@ extension_trait! { ``` # fn main() { async_std::task::block_on(async { # + use async_std::stream::Sample; use async_std::stream::double_ended::DoubleEndedStreamExt; - use async_std::stream; - let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]); + let mut s = Sample::from(vec![1u8, 2, 3, 4, 5]); let second = s.nth_back(1).await; assert_eq!(second, Some(4)); @@ -58,6 +58,27 @@ extension_trait! { NthBackFuture::new(self, n) } + #[doc = r#" + Returns the the frist element from the right that matches the predicate. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::Sample; + use async_std::stream::double_ended::DoubleEndedStreamExt; + + let mut s = Sample::from(vec![1u8, 2, 3, 4, 5]); + + let second = s.rfind(|v| v % 2 == 0).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] fn rfind

( &mut self, p: P, @@ -69,6 +90,26 @@ extension_trait! { RFindFuture::new(self, p) } + #[doc = r#" + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::Sample; + use async_std::stream::double_ended::DoubleEndedStreamExt; + + let s = Sample::from(vec![1, 2, 3, 4, 5]); + + let second = s.rfold(0, |acc, v| v + acc).await; + + assert_eq!(second, 15); + # + # }) } + ``` + "#] fn rfold( self, accum: B, diff --git a/src/stream/double_ended_stream.rs b/src/stream/double_ended_stream.rs index 95e47633..129bb1cd 100644 --- a/src/stream/double_ended_stream.rs +++ b/src/stream/double_ended_stream.rs @@ -22,31 +22,3 @@ pub trait DoubleEndedStream: Stream { /// [trait-level]: trait.DoubleEndedStream.html fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } - -pub struct Sample { - inner: Vec, -} - -impl From> for Sample { - fn from(data: Vec) -> Self { - Sample { inner: data } - } -} - -impl Unpin for Sample {} - -impl Stream for Sample { - type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.inner.len() > 0 { - return Poll::Ready(Some(self.inner.remove(0))); - } - return Poll::Ready(None); - } -} - -impl DoubleEndedStream for Sample { - fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.inner.pop()) - } -} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 318733b2..6ce9aac7 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -307,8 +307,9 @@ pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::*; +pub use crate::stream::sample::Sample; -pub(crate) mod stream; +pub mod stream; mod empty; mod from_fn; @@ -316,6 +317,7 @@ mod from_iter; mod once; mod repeat; mod repeat_with; +mod sample; cfg_unstable! { pub mod double_ended; diff --git a/src/stream/sample.rs b/src/stream/sample.rs new file mode 100644 index 00000000..90caeed1 --- /dev/null +++ b/src/stream/sample.rs @@ -0,0 +1,33 @@ +use crate::stream::Stream; + +use std::pin::Pin; +use std::task::{Context, Poll}; +use crate::stream::DoubleEndedStream; + +pub struct Sample { + inner: Vec, +} + +impl From> for Sample { + fn from(data: Vec) -> Self { + Sample { inner: data } + } +} + +impl Unpin for Sample {} + +impl Stream for Sample { + type Item = T; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.inner.len() > 0 { + return Poll::Ready(Some(self.inner.remove(0))); + } + return Poll::Ready(None); + } +} + +impl DoubleEndedStream for Sample { + fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.inner.pop()) + } +} From 55194edbf736c5909b0f5d03d7f36379813f1d7e Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Wed, 20 Nov 2019 07:45:40 +0000 Subject: [PATCH 07/18] Add try_rfold --- src/stream/double_ended/mod.rs | 43 +++++++++++++++++++++ src/stream/double_ended/try_rfold.rs | 56 ++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 src/stream/double_ended/try_rfold.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 24917756..c2372d2a 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,10 +1,12 @@ mod nth_back; mod rfind; mod rfold; +mod try_rfold; use nth_back::NthBackFuture; use rfind::RFindFuture; use rfold::RFoldFuture; +use try_rfold::TryRFoldFuture; extension_trait! { use crate::stream::Stream; @@ -121,5 +123,46 @@ extension_trait! { { RFoldFuture::new(self, accum, f) } + + #[doc = r#" + A combinator that applies a function as long as it returns successfully, producing a single, final value. + Immediately returns the error when the function returns unsuccessfully. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::Sample; + use async_std::stream::double_ended::DoubleEndedStreamExt; + + let s = Sample::from(vec![1, 2, 3, 4, 5]); + let sum = s.try_rfold(0, |acc, v| { + if (acc+v) % 2 == 1 { + Ok(v+3) + } else { + Err("fail") + } + }).await; + + assert_eq!(sum, Err("fail")); + # + # }) } + ``` + "#] + fn try_rfold( + self, + accum: B, + f: F, + ) -> impl Future> [TryRFoldFuture] + where + Self: Sized, + F: FnMut(B, Self::Item) -> Result, + { + TryRFoldFuture::new(self, accum, f) + } + } } diff --git a/src/stream/double_ended/try_rfold.rs b/src/stream/double_ended/try_rfold.rs new file mode 100644 index 00000000..4c97cea0 --- /dev/null +++ b/src/stream/double_ended/try_rfold.rs @@ -0,0 +1,56 @@ +use crate::future::Future; +use std::pin::Pin; +use crate::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { +#[doc(hidden)] +#[allow(missing_debug_implementations)] + pub struct TryRFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl TryRFoldFuture { + pub(super) fn new(stream: S, init: T, f: F) -> Self { + TryRFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for TryRFoldFuture +where + S: DoubleEndedStream + Unpin, + F: FnMut(T, S::Item) -> Result, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + + match new { + Ok(o) => *this.acc = Some(o), + Err(e) => return Poll::Ready(Err(e)), + } + } + None => return Poll::Ready(Ok(this.acc.take().unwrap())), + } + } + } +} From ee2f52f3cee2c86aaf8e27fdc22c25b116a70bf3 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Wed, 20 Nov 2019 07:54:05 +0000 Subject: [PATCH 08/18] Add next_back --- src/stream/double_ended/mod.rs | 33 ++++++++++++++++++++++++++++ src/stream/double_ended/next_back.rs | 19 ++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 src/stream/double_ended/next_back.rs diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index c2372d2a..a3dbafd6 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -1,8 +1,10 @@ +mod next_back; mod nth_back; mod rfind; mod rfold; mod try_rfold; +use next_back::NextBackFuture; use nth_back::NthBackFuture; use rfind::RFindFuture; use rfold::RFoldFuture; @@ -28,6 +30,37 @@ extension_trait! { Something else "#] pub trait DoubleEndedStreamExt: crate::stream::DoubleEndedStream { + #[doc = r#" + Advances the stream and returns the next value. + + Returns [`None`] when iteration is finished. Individual stream implementations may + choose to resume iteration, and so calling `next()` again may or may not eventually + start returning more values. + + [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::Sample; + use async_std::stream::double_ended::DoubleEndedStreamExt; + + let mut s = Sample::from(vec![7u8]); + + assert_eq!(s.next().await, Some(7)); + assert_eq!(s.next().await, None); + # + # }) } + ``` + "#] + fn next(&mut self) -> impl Future> + '_ [NextBackFuture<'_, Self>] + where + Self: Unpin, + { + NextBackFuture { stream: self } + } #[doc = r#" Returns the nth element from the back of the stream. diff --git a/src/stream/double_ended/next_back.rs b/src/stream/double_ended/next_back.rs new file mode 100644 index 00000000..aa642d09 --- /dev/null +++ b/src/stream/double_ended/next_back.rs @@ -0,0 +1,19 @@ +use std::pin::Pin; +use std::future::Future; + +use crate::stream::DoubleEndedStream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct NextBackFuture<'a, T: Unpin + ?Sized> { + pub(crate) stream: &'a mut T, +} + +impl Future for NextBackFuture<'_, T> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.stream).poll_next_back(cx) + } +} From 02aa2f3d2a918d5e50944d8b9c6b2e5620e78bc1 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Wed, 20 Nov 2019 21:48:04 +0000 Subject: [PATCH 09/18] Fix next_back --- src/stream/double_ended/mod.rs | 6 +++--- src/stream/double_ended/nth_back.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index a3dbafd6..ec9e6ae6 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -49,13 +49,13 @@ extension_trait! { let mut s = Sample::from(vec![7u8]); - assert_eq!(s.next().await, Some(7)); - assert_eq!(s.next().await, None); + assert_eq!(s.next_back().await, Some(7)); + assert_eq!(s.next_back().await, None); # # }) } ``` "#] - fn next(&mut self) -> impl Future> + '_ [NextBackFuture<'_, Self>] + fn next_back(&mut self) -> impl Future> + '_ [NextBackFuture<'_, Self>] where Self: Unpin, { diff --git a/src/stream/double_ended/nth_back.rs b/src/stream/double_ended/nth_back.rs index e318e79a..8e1ed637 100644 --- a/src/stream/double_ended/nth_back.rs +++ b/src/stream/double_ended/nth_back.rs @@ -1,6 +1,6 @@ +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use std::future::Future; use crate::stream::DoubleEndedStream; From 94893d29249c970038b9620ea886099e402e751a Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Wed, 20 Nov 2019 22:14:36 +0000 Subject: [PATCH 10/18] Move more of the documentation --- src/stream/double_ended/mod.rs | 73 +++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 2 deletions(-) diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index ec9e6ae6..3267ae58 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -18,16 +18,85 @@ extension_trait! { #[doc = r#" - Something fancy + A stream able to yield elements from both ends. + + Something that implements `DoubleEndedStream` has one extra capability + over something that implements [`Stream`]: the ability to also take + `Item`s from the back, as well as the front. + + It is important to note that both back and forth work on the same range, + and do not cross: iteration is over when they meet in the middle. + + In a similar fashion to the [`Stream`] protocol, once a + `DoubleEndedStream` returns `None` from a `next_back()`, calling it again + may or may not ever return `Some` again. `next()` and `next_back()` are + interchangeable for this purpose. + ``` "#] pub trait DoubleEndedStream { + #[doc = r#" + The type of items yielded by this stream. + "#] type Item; + #[doc = r#" + Attempts to receive the next item from the back of the stream. + + There are several possible return values: + + * `Poll::Pending` means this stream's next_back value is not ready yet. + * `Poll::Ready(None)` means this stream has been exhausted. + * `Poll::Ready(Some(item))` means `item` was received out of the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::pin::Pin; + + use async_std::prelude::*; + use async_std::stream; + use async_std::task::{Context, Poll}; + + fn increment( + s: impl DoubleEndedStream + Unpin, + ) -> impl DoubleEndedStream + Unpin { + struct Increment(S); + + impl + Unpin> Stream for Increment { + type Item = S::Item; + + fn poll_next_back( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next_back(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + } + } + } + + Increment(s) + } + + let mut s = increment(stream::once(7)); // will need to implement DoubleEndedStream + + assert_eq!(s.next_back().await, Some(8)); + assert_eq!(s.next_back().await, None); + # + # }) } + ``` + "#] fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } #[doc = r#" - Something else + Extension methods for [`DoubleEndedStreamExt`]. + + [`Stream`]: ../stream/trait.Stream.html "#] pub trait DoubleEndedStreamExt: crate::stream::DoubleEndedStream { #[doc = r#" From abd360893c45984fefad2fa97ae05b855c6fe02b Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Wed, 20 Nov 2019 22:16:39 +0000 Subject: [PATCH 11/18] Disable docs and Debug for unexposed structs --- src/stream/double_ended/nth_back.rs | 2 ++ src/stream/double_ended/rfind.rs | 2 ++ src/stream/double_ended/rfold.rs | 2 ++ src/stream/double_ended/try_rfold.rs | 4 ++-- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/stream/double_ended/nth_back.rs b/src/stream/double_ended/nth_back.rs index 8e1ed637..e32a28fd 100644 --- a/src/stream/double_ended/nth_back.rs +++ b/src/stream/double_ended/nth_back.rs @@ -4,6 +4,8 @@ use std::task::{Context, Poll}; use crate::stream::DoubleEndedStream; +#[doc(hidden)] +#[allow(missing_debug_implementations)] pub struct NthBackFuture<'a, S> { stream: &'a mut S, n: usize, diff --git a/src/stream/double_ended/rfind.rs b/src/stream/double_ended/rfind.rs index b05e14ee..94726934 100644 --- a/src/stream/double_ended/rfind.rs +++ b/src/stream/double_ended/rfind.rs @@ -4,6 +4,8 @@ use std::pin::Pin; use crate::stream::DoubleEndedStream; +#[doc(hidden)] +#[allow(missing_debug_implementations)] pub struct RFindFuture<'a, S, P> { stream: &'a mut S, p: P, diff --git a/src/stream/double_ended/rfold.rs b/src/stream/double_ended/rfold.rs index 4df7d9fb..9002f8d9 100644 --- a/src/stream/double_ended/rfold.rs +++ b/src/stream/double_ended/rfold.rs @@ -7,6 +7,8 @@ use pin_project_lite::pin_project; use crate::stream::DoubleEndedStream; pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] pub struct RFoldFuture { #[pin] stream: S, diff --git a/src/stream/double_ended/try_rfold.rs b/src/stream/double_ended/try_rfold.rs index 4c97cea0..9e6295a7 100644 --- a/src/stream/double_ended/try_rfold.rs +++ b/src/stream/double_ended/try_rfold.rs @@ -7,8 +7,8 @@ use pin_project_lite::pin_project; use crate::stream::DoubleEndedStream; pin_project! { -#[doc(hidden)] -#[allow(missing_debug_implementations)] + #[doc(hidden)] + #[allow(missing_debug_implementations)] pub struct TryRFoldFuture { #[pin] stream: S, From 892c6008c24e7ec5c1ed1986204730240f8bb4e4 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 22 Nov 2019 16:53:42 +0000 Subject: [PATCH 12/18] Replace sample with a hidden from_iter implementation for double-ended-stream --- src/stream/double_ended/from_iter.rs | 38 ++++++++++++++++++++++++++++ src/stream/double_ended/mod.rs | 27 +++++++++----------- src/stream/mod.rs | 2 -- src/stream/sample.rs | 33 ------------------------ 4 files changed, 50 insertions(+), 50 deletions(-) create mode 100644 src/stream/double_ended/from_iter.rs delete mode 100644 src/stream/sample.rs diff --git a/src/stream/double_ended/from_iter.rs b/src/stream/double_ended/from_iter.rs new file mode 100644 index 00000000..616724f8 --- /dev/null +++ b/src/stream/double_ended/from_iter.rs @@ -0,0 +1,38 @@ +use crate::stream::Stream; + +use std::pin::Pin; +use std::task::{Context, Poll}; +use crate::stream::DoubleEndedStream; + +/// A double-ended stream that was created from iterator. +/// +/// This stream is created by the [`from_iter`] function. +/// See it documentation for more. +/// +/// [`from_iter`]: fn.from_iter.html +#[derive(Debug)] +pub struct FromIter { + inner: Vec, +} + +pub fn from_iter(iter: I) -> FromIter { + FromIter { inner: iter.into_iter().collect() } +} + +impl Unpin for FromIter {} + +impl Stream for FromIter { + type Item = T; + fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + if self.inner.len() > 0 { + return Poll::Ready(Some(self.inner.remove(0))); + } + return Poll::Ready(None); + } +} + +impl DoubleEndedStream for FromIter { + fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.inner.pop()) + } +} diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs index 3267ae58..b8b78b27 100644 --- a/src/stream/double_ended/mod.rs +++ b/src/stream/double_ended/mod.rs @@ -3,12 +3,14 @@ mod nth_back; mod rfind; mod rfold; mod try_rfold; +mod from_iter; use next_back::NextBackFuture; use nth_back::NthBackFuture; use rfind::RFindFuture; use rfold::RFoldFuture; use try_rfold::TryRFoldFuture; +pub use from_iter::{from_iter, FromIter}; extension_trait! { use crate::stream::Stream; @@ -113,10 +115,9 @@ extension_trait! { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::Sample; - use async_std::stream::double_ended::DoubleEndedStreamExt; + use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - let mut s = Sample::from(vec![7u8]); + let mut s = double_ended::from_iter(vec![7u8]); assert_eq!(s.next_back().await, Some(7)); assert_eq!(s.next_back().await, None); @@ -141,10 +142,9 @@ extension_trait! { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::Sample; - use async_std::stream::double_ended::DoubleEndedStreamExt; + use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - let mut s = Sample::from(vec![1u8, 2, 3, 4, 5]); + let mut s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); let second = s.nth_back(1).await; assert_eq!(second, Some(4)); @@ -172,10 +172,9 @@ extension_trait! { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::Sample; - use async_std::stream::double_ended::DoubleEndedStreamExt; + use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - let mut s = Sample::from(vec![1u8, 2, 3, 4, 5]); + let mut s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); let second = s.rfind(|v| v % 2 == 0).await; assert_eq!(second, Some(4)); @@ -202,10 +201,9 @@ extension_trait! { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::Sample; - use async_std::stream::double_ended::DoubleEndedStreamExt; + use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - let s = Sample::from(vec![1, 2, 3, 4, 5]); + let s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); let second = s.rfold(0, |acc, v| v + acc).await; @@ -237,10 +235,9 @@ extension_trait! { ``` # fn main() { async_std::task::block_on(async { # - use async_std::stream::Sample; - use async_std::stream::double_ended::DoubleEndedStreamExt; + use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - let s = Sample::from(vec![1, 2, 3, 4, 5]); + let s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); let sum = s.try_rfold(0, |acc, v| { if (acc+v) % 2 == 1 { Ok(v+3) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 6ce9aac7..f6b6497b 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -307,7 +307,6 @@ pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::*; -pub use crate::stream::sample::Sample; pub mod stream; @@ -317,7 +316,6 @@ mod from_iter; mod once; mod repeat; mod repeat_with; -mod sample; cfg_unstable! { pub mod double_ended; diff --git a/src/stream/sample.rs b/src/stream/sample.rs deleted file mode 100644 index 90caeed1..00000000 --- a/src/stream/sample.rs +++ /dev/null @@ -1,33 +0,0 @@ -use crate::stream::Stream; - -use std::pin::Pin; -use std::task::{Context, Poll}; -use crate::stream::DoubleEndedStream; - -pub struct Sample { - inner: Vec, -} - -impl From> for Sample { - fn from(data: Vec) -> Self { - Sample { inner: data } - } -} - -impl Unpin for Sample {} - -impl Stream for Sample { - type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.inner.len() > 0 { - return Poll::Ready(Some(self.inner.remove(0))); - } - return Poll::Ready(None); - } -} - -impl DoubleEndedStream for Sample { - fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.inner.pop()) - } -} From 6e8236d0e17ade1493916f36c1c166c388cb6d4f Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 22 Nov 2019 17:24:52 +0000 Subject: [PATCH 13/18] Document from_iter for DoubleEndedStream --- src/stream/double_ended/from_iter.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/stream/double_ended/from_iter.rs b/src/stream/double_ended/from_iter.rs index 616724f8..ae424a50 100644 --- a/src/stream/double_ended/from_iter.rs +++ b/src/stream/double_ended/from_iter.rs @@ -15,6 +15,25 @@ pub struct FromIter { inner: Vec, } +/// Converts an iterator into a double-ended stream. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; +/// +/// let mut s = double_ended::from_iter(vec![0, 1, 2, 3]); +/// +/// assert_eq!(s.next_back().await, Some(3)); +/// assert_eq!(s.next_back().await, Some(2)); +/// assert_eq!(s.next_back().await, Some(1)); +/// assert_eq!(s.next_back().await, Some(0)); +/// assert_eq!(s.next_back().await, None); +/// # +/// # }) +/// ``` pub fn from_iter(iter: I) -> FromIter { FromIter { inner: iter.into_iter().collect() } } From f9a4c35fd69c386393f1406f8e1c8d97f416afb3 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sat, 23 Nov 2019 22:20:37 +0000 Subject: [PATCH 14/18] Silence warning about missing docs for the double_ended module --- src/stream/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f6b6497b..9b8ee8a3 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -318,6 +318,7 @@ mod repeat; mod repeat_with; cfg_unstable! { + #[doc(hidden)] pub mod double_ended; mod double_ended_stream; mod exact_size_stream; From 41cf0f855b8de6860f3bff3125725bab77eee24f Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sat, 23 Nov 2019 22:27:08 +0000 Subject: [PATCH 15/18] Make Once a DoubleEndedStream --- src/stream/once.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/stream/once.rs b/src/stream/once.rs index e4ac682c..9ce93aaf 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -4,6 +4,7 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; +use crate::stream::double_ended_stream::DoubleEndedStream; /// Creates a stream that yields a single item. /// @@ -46,3 +47,9 @@ impl Stream for Once { Poll::Ready(self.project().value.take()) } } + +impl DoubleEndedStream for Once { + fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.project().value.take()) + } +} From 8e5dedec34e50dd920a40eb8f9cea6bd90baf560 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 28 Nov 2019 12:33:37 +0100 Subject: [PATCH 16/18] Restructure package. No longer use a extension trait to match std. Still outstanding: How do I hide the concrete structs from the trait? --- src/stream/double_ended/mod.rs | 267 ------------------ src/stream/double_ended_stream.rs | 24 -- .../from_iter.rs | 4 +- src/stream/double_ended_stream/mod.rs | 243 ++++++++++++++++ .../next_back.rs | 0 .../nth_back.rs | 0 .../rfind.rs | 0 .../rfold.rs | 0 .../try_rfold.rs | 0 src/stream/mod.rs | 3 +- 10 files changed, 246 insertions(+), 295 deletions(-) delete mode 100644 src/stream/double_ended/mod.rs delete mode 100644 src/stream/double_ended_stream.rs rename src/stream/{double_ended => double_ended_stream}/from_iter.rs (90%) create mode 100644 src/stream/double_ended_stream/mod.rs rename src/stream/{double_ended => double_ended_stream}/next_back.rs (100%) rename src/stream/{double_ended => double_ended_stream}/nth_back.rs (100%) rename src/stream/{double_ended => double_ended_stream}/rfind.rs (100%) rename src/stream/{double_ended => double_ended_stream}/rfold.rs (100%) rename src/stream/{double_ended => double_ended_stream}/try_rfold.rs (100%) diff --git a/src/stream/double_ended/mod.rs b/src/stream/double_ended/mod.rs deleted file mode 100644 index b8b78b27..00000000 --- a/src/stream/double_ended/mod.rs +++ /dev/null @@ -1,267 +0,0 @@ -mod next_back; -mod nth_back; -mod rfind; -mod rfold; -mod try_rfold; -mod from_iter; - -use next_back::NextBackFuture; -use nth_back::NthBackFuture; -use rfind::RFindFuture; -use rfold::RFoldFuture; -use try_rfold::TryRFoldFuture; -pub use from_iter::{from_iter, FromIter}; - -extension_trait! { - use crate::stream::Stream; - - use std::pin::Pin; - use std::task::{Context, Poll}; - - - #[doc = r#" - A stream able to yield elements from both ends. - - Something that implements `DoubleEndedStream` has one extra capability - over something that implements [`Stream`]: the ability to also take - `Item`s from the back, as well as the front. - - It is important to note that both back and forth work on the same range, - and do not cross: iteration is over when they meet in the middle. - - In a similar fashion to the [`Stream`] protocol, once a - `DoubleEndedStream` returns `None` from a `next_back()`, calling it again - may or may not ever return `Some` again. `next()` and `next_back()` are - interchangeable for this purpose. - ``` - "#] - pub trait DoubleEndedStream { - #[doc = r#" - The type of items yielded by this stream. - "#] - type Item; - - #[doc = r#" - Attempts to receive the next item from the back of the stream. - - There are several possible return values: - - * `Poll::Pending` means this stream's next_back value is not ready yet. - * `Poll::Ready(None)` means this stream has been exhausted. - * `Poll::Ready(Some(item))` means `item` was received out of the stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use std::pin::Pin; - - use async_std::prelude::*; - use async_std::stream; - use async_std::task::{Context, Poll}; - - fn increment( - s: impl DoubleEndedStream + Unpin, - ) -> impl DoubleEndedStream + Unpin { - struct Increment(S); - - impl + Unpin> Stream for Increment { - type Item = S::Item; - - fn poll_next_back( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - match Pin::new(&mut self.0).poll_next_back(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => Poll::Ready(None), - Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), - } - } - } - - Increment(s) - } - - let mut s = increment(stream::once(7)); // will need to implement DoubleEndedStream - - assert_eq!(s.next_back().await, Some(8)); - assert_eq!(s.next_back().await, None); - # - # }) } - ``` - "#] - fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - } - - #[doc = r#" - Extension methods for [`DoubleEndedStreamExt`]. - - [`Stream`]: ../stream/trait.Stream.html - "#] - pub trait DoubleEndedStreamExt: crate::stream::DoubleEndedStream { - #[doc = r#" - Advances the stream and returns the next value. - - Returns [`None`] when iteration is finished. Individual stream implementations may - choose to resume iteration, and so calling `next()` again may or may not eventually - start returning more values. - - [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - - let mut s = double_ended::from_iter(vec![7u8]); - - assert_eq!(s.next_back().await, Some(7)); - assert_eq!(s.next_back().await, None); - # - # }) } - ``` - "#] - fn next_back(&mut self) -> impl Future> + '_ [NextBackFuture<'_, Self>] - where - Self: Unpin, - { - NextBackFuture { stream: self } - } - - #[doc = r#" - Returns the nth element from the back of the stream. - - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - - let mut s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); - - let second = s.nth_back(1).await; - assert_eq!(second, Some(4)); - # - # }) } - ``` - "#] - fn nth_back( - &mut self, - n: usize, - ) -> impl Future> + '_ [NthBackFuture<'_, Self>] - where - Self: Unpin + Sized, - { - NthBackFuture::new(self, n) - } - - #[doc = r#" - Returns the the frist element from the right that matches the predicate. - - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - - let mut s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); - - let second = s.rfind(|v| v % 2 == 0).await; - assert_eq!(second, Some(4)); - # - # }) } - ``` - "#] - fn rfind

( - &mut self, - p: P, - ) -> impl Future> + '_ [RFindFuture<'_, Self, P>] - where - Self: Unpin + Sized, - P: FnMut(&Self::Item) -> bool, - { - RFindFuture::new(self, p) - } - - #[doc = r#" - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - - let s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); - - let second = s.rfold(0, |acc, v| v + acc).await; - - assert_eq!(second, 15); - # - # }) } - ``` - "#] - fn rfold( - self, - accum: B, - f: F, - ) -> impl Future> [RFoldFuture] - where - Self: Sized, - F: FnMut(B, Self::Item) -> B, - { - RFoldFuture::new(self, accum, f) - } - - #[doc = r#" - A combinator that applies a function as long as it returns successfully, producing a single, final value. - Immediately returns the error when the function returns unsuccessfully. - - # Examples - - Basic usage: - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; - - let s = double_ended::from_iter(vec![1u8, 2, 3, 4, 5]); - let sum = s.try_rfold(0, |acc, v| { - if (acc+v) % 2 == 1 { - Ok(v+3) - } else { - Err("fail") - } - }).await; - - assert_eq!(sum, Err("fail")); - # - # }) } - ``` - "#] - fn try_rfold( - self, - accum: B, - f: F, - ) -> impl Future> [TryRFoldFuture] - where - Self: Sized, - F: FnMut(B, Self::Item) -> Result, - { - TryRFoldFuture::new(self, accum, f) - } - - } -} diff --git a/src/stream/double_ended_stream.rs b/src/stream/double_ended_stream.rs deleted file mode 100644 index 129bb1cd..00000000 --- a/src/stream/double_ended_stream.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::stream::Stream; - -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// A stream able to yield elements from both ends. -/// -/// Something that implements `DoubleEndedStream` has one extra capability -/// over something that implements [`Stream`]: the ability to also take -/// `Item`s from the back, as well as the front. -/// -/// [`Stream`]: trait.Stream.html -#[cfg(feature = "unstable")] -#[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub trait DoubleEndedStream: Stream { - /// Removes and returns an element from the end of the stream. - /// - /// Returns `None` when there are no more elements. - /// - /// The [trait-level] docs contain more details. - /// - /// [trait-level]: trait.DoubleEndedStream.html - fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; -} diff --git a/src/stream/double_ended/from_iter.rs b/src/stream/double_ended_stream/from_iter.rs similarity index 90% rename from src/stream/double_ended/from_iter.rs rename to src/stream/double_ended_stream/from_iter.rs index ae424a50..29a3e7d8 100644 --- a/src/stream/double_ended/from_iter.rs +++ b/src/stream/double_ended_stream/from_iter.rs @@ -22,9 +22,9 @@ pub struct FromIter { /// ``` /// # async_std::task::block_on(async { /// # -/// use async_std::stream::double_ended::{self, DoubleEndedStreamExt}; +/// use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; /// -/// let mut s = double_ended::from_iter(vec![0, 1, 2, 3]); +/// let mut s = double_ended_stream::from_iter(vec![0, 1, 2, 3]); /// /// assert_eq!(s.next_back().await, Some(3)); /// assert_eq!(s.next_back().await, Some(2)); diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs new file mode 100644 index 00000000..92136615 --- /dev/null +++ b/src/stream/double_ended_stream/mod.rs @@ -0,0 +1,243 @@ +use crate::stream::Stream; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +mod from_iter; +mod next_back; +mod nth_back; +mod rfind; +mod rfold; +mod try_rfold; + +pub use from_iter::{from_iter, FromIter}; +use next_back::NextBackFuture; +use nth_back::NthBackFuture; +use rfind::RFindFuture; +use rfold::RFoldFuture; +use try_rfold::TryRFoldFuture; + +/// A stream able to yield elements from both ends. +/// +/// Something that implements `DoubleEndedStream` has one extra capability +/// over something that implements [`Stream`]: the ability to also take +/// `Item`s from the back, as well as the front. +/// +/// [`Stream`]: trait.Stream.html +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub trait DoubleEndedStream: Stream { + #[doc = r#" + Attempts to receive the next item from the back of the stream. + + There are several possible return values: + + * `Poll::Pending` means this stream's next_back value is not ready yet. + * `Poll::Ready(None)` means this stream has been exhausted. + * `Poll::Ready(Some(item))` means `item` was received out of the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::pin::Pin; + + use async_std::prelude::*; + use async_std::stream; + use async_std::task::{Context, Poll}; + + fn increment( + s: impl DoubleEndedStream + Unpin, + ) -> impl DoubleEndedStream + Unpin { + struct Increment(S); + + impl + Unpin> Stream for Increment { + type Item = S::Item; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + } + } + } + + impl + Unpin> DoubleEndedStream for Increment { + fn poll_next_back( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next_back(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + } + } + } + + Increment(s) + } + + let mut s = increment(stream::once(7)); + + assert_eq!(s.next_back().await, Some(8)); + assert_eq!(s.next_back().await, None); + # + # }) } + ``` + "#] + fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + #[doc = r#" + Advances the stream and returns the next value. + + Returns [`None`] when iteration is finished. Individual stream implementations may + choose to resume iteration, and so calling `next()` again may or may not eventually + start returning more values. + + [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let mut s = double_ended_stream::from_iter(vec![7u8]); + + assert_eq!(s.next_back().await, Some(7)); + assert_eq!(s.next_back().await, None); + # + # }) } + ``` + "#] + fn next_back(&mut self) -> NextBackFuture<'_, Self> + where + Self: Unpin, + { + NextBackFuture { stream: self } + } + + #[doc = r#" + Returns the nth element from the back of the stream. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.nth_back(1).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] + fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self> + where + Self: Unpin + Sized, + { + NthBackFuture::new(self, n) + } + + #[doc = r#" + Returns the the frist element from the right that matches the predicate. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let mut s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.rfind(|v| v % 2 == 0).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] + fn rfind

(&mut self, p: P) -> RFindFuture<'_, Self, P> + where + Self: Unpin + Sized, + P: FnMut(&Self::Item) -> bool, + { + RFindFuture::new(self, p) + } + + #[doc = r#" + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.rfold(0, |acc, v| v + acc).await; + + assert_eq!(second, 15); + # + # }) } + ``` + "#] + fn rfold(self, accum: B, f: F) -> RFoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + RFoldFuture::new(self, accum, f) + } + + #[doc = r#" + A combinator that applies a function as long as it returns successfully, producing a single, final value. + Immediately returns the error when the function returns unsuccessfully. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; + + let s = double_ended_stream::from_iter(vec![1u8, 2, 3, 4, 5]); + let sum = s.try_rfold(0, |acc, v| { + if (acc+v) % 2 == 1 { + Ok(v+3) + } else { + Err("fail") + } + }).await; + + assert_eq!(sum, Err("fail")); + # + # }) } + ``` + "#] + fn try_rfold(self, accum: B, f: F) -> TryRFoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> Result, + { + TryRFoldFuture::new(self, accum, f) + } +} diff --git a/src/stream/double_ended/next_back.rs b/src/stream/double_ended_stream/next_back.rs similarity index 100% rename from src/stream/double_ended/next_back.rs rename to src/stream/double_ended_stream/next_back.rs diff --git a/src/stream/double_ended/nth_back.rs b/src/stream/double_ended_stream/nth_back.rs similarity index 100% rename from src/stream/double_ended/nth_back.rs rename to src/stream/double_ended_stream/nth_back.rs diff --git a/src/stream/double_ended/rfind.rs b/src/stream/double_ended_stream/rfind.rs similarity index 100% rename from src/stream/double_ended/rfind.rs rename to src/stream/double_ended_stream/rfind.rs diff --git a/src/stream/double_ended/rfold.rs b/src/stream/double_ended_stream/rfold.rs similarity index 100% rename from src/stream/double_ended/rfold.rs rename to src/stream/double_ended_stream/rfold.rs diff --git a/src/stream/double_ended/try_rfold.rs b/src/stream/double_ended_stream/try_rfold.rs similarity index 100% rename from src/stream/double_ended/try_rfold.rs rename to src/stream/double_ended_stream/try_rfold.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 9b8ee8a3..ebce3a36 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -319,8 +319,7 @@ mod repeat_with; cfg_unstable! { #[doc(hidden)] - pub mod double_ended; - mod double_ended_stream; + pub mod double_ended_stream; mod exact_size_stream; mod extend; mod from_stream; From b0038e11bed6d75f87b2c17a82da56b8534c98ce Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 28 Nov 2019 21:52:48 +0100 Subject: [PATCH 17/18] Only implement the DoubleEndedStream for once when the flag is on --- src/stream/once.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/stream/once.rs b/src/stream/once.rs index 9ce93aaf..939722d9 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -4,7 +4,9 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -use crate::stream::double_ended_stream::DoubleEndedStream; + +#[cfg(feature = "unstable")] +use crate::stream::DoubleEndedStream; /// Creates a stream that yields a single item. /// @@ -48,6 +50,7 @@ impl Stream for Once { } } +#[cfg(feature = "unstable")] impl DoubleEndedStream for Once { fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(self.project().value.take()) From 182fe6896f628efde8bdd7b5d00923c7ea0629e5 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 12 Dec 2019 20:46:03 +0100 Subject: [PATCH 18/18] No need for a custom impl for FromIter for DoubleEndedStream --- src/stream/double_ended_stream/from_iter.rs | 57 --------------------- src/stream/double_ended_stream/mod.rs | 2 - src/stream/from_iter.rs | 9 ++++ 3 files changed, 9 insertions(+), 59 deletions(-) delete mode 100644 src/stream/double_ended_stream/from_iter.rs diff --git a/src/stream/double_ended_stream/from_iter.rs b/src/stream/double_ended_stream/from_iter.rs deleted file mode 100644 index 29a3e7d8..00000000 --- a/src/stream/double_ended_stream/from_iter.rs +++ /dev/null @@ -1,57 +0,0 @@ -use crate::stream::Stream; - -use std::pin::Pin; -use std::task::{Context, Poll}; -use crate::stream::DoubleEndedStream; - -/// A double-ended stream that was created from iterator. -/// -/// This stream is created by the [`from_iter`] function. -/// See it documentation for more. -/// -/// [`from_iter`]: fn.from_iter.html -#[derive(Debug)] -pub struct FromIter { - inner: Vec, -} - -/// Converts an iterator into a double-ended stream. -/// -/// # Examples -/// -/// ``` -/// # async_std::task::block_on(async { -/// # -/// use async_std::stream::double_ended_stream::{self, DoubleEndedStream}; -/// -/// let mut s = double_ended_stream::from_iter(vec![0, 1, 2, 3]); -/// -/// assert_eq!(s.next_back().await, Some(3)); -/// assert_eq!(s.next_back().await, Some(2)); -/// assert_eq!(s.next_back().await, Some(1)); -/// assert_eq!(s.next_back().await, Some(0)); -/// assert_eq!(s.next_back().await, None); -/// # -/// # }) -/// ``` -pub fn from_iter(iter: I) -> FromIter { - FromIter { inner: iter.into_iter().collect() } -} - -impl Unpin for FromIter {} - -impl Stream for FromIter { - type Item = T; - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - if self.inner.len() > 0 { - return Poll::Ready(Some(self.inner.remove(0))); - } - return Poll::Ready(None); - } -} - -impl DoubleEndedStream for FromIter { - fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.inner.pop()) - } -} diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs index 92136615..dc2a45c9 100644 --- a/src/stream/double_ended_stream/mod.rs +++ b/src/stream/double_ended_stream/mod.rs @@ -3,14 +3,12 @@ use crate::stream::Stream; use std::pin::Pin; use std::task::{Context, Poll}; -mod from_iter; mod next_back; mod nth_back; mod rfind; mod rfold; mod try_rfold; -pub use from_iter::{from_iter, FromIter}; use next_back::NextBackFuture; use nth_back::NthBackFuture; use rfind::RFindFuture; diff --git a/src/stream/from_iter.rs b/src/stream/from_iter.rs index d7a31d6c..705d1504 100644 --- a/src/stream/from_iter.rs +++ b/src/stream/from_iter.rs @@ -3,6 +3,8 @@ use std::pin::Pin; use pin_project_lite::pin_project; use crate::stream::Stream; +#[cfg(feature = "unstable")] +use crate::stream::double_ended_stream::DoubleEndedStream; use crate::task::{Context, Poll}; pin_project! { @@ -51,3 +53,10 @@ impl Stream for FromIter { Poll::Ready(self.iter.next()) } } + +#[cfg(feature = "unstable")] +impl DoubleEndedStream for FromIter { + fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.iter.next_back()) + } +}