From fe3c9ef626801455028325f3a5bfeefa97406470 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 17 Oct 2019 14:23:14 +0200 Subject: [PATCH 01/22] First attempt at successor --- src/stream/stream/mod.rs | 1 + src/stream/stream/successor.rs | 59 ++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) create mode 100644 src/stream/stream/successor.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d6292c3..b5583e1 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -57,6 +57,7 @@ mod partial_cmp; mod position; mod scan; mod skip; +mod successor; mod skip_while; mod step_by; mod take; diff --git a/src/stream/stream/successor.rs b/src/stream/stream/successor.rs new file mode 100644 index 0000000..519729f --- /dev/null +++ b/src/stream/stream/successor.rs @@ -0,0 +1,59 @@ +use std::pin::Pin; +use std::marker::PhantomData; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[derive(Debug)] +pub struct Successor +where Fut: Future +{ + successor: F, + next: T, + _marker: PhantomData +} + +pub fn successor(func: F, start: T) -> Successor +where + F: FnMut(T) -> Fut, + Fut: Future, + T: Copy, + { + Successor { + successor: func, + next: start, + _marker: PhantomData, + } + } + +impl Successor +where + F: FnMut(T) -> Fut, + Fut: Future, + T: Copy, + +{ + pin_utils::unsafe_unpinned!(successor: F); + pin_utils::unsafe_unpinned!(next: T); +} + +impl Stream for Successor +where + Fut: Future, + F: FnMut(T) -> Fut, + T: Copy, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + + match self.as_mut().successor()(self.next).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(val) => { + self.next = val; + Poll::Ready(Some(val)) + } + } + } +} From 02b261de10d09c699fb949d3ac5347282756e1d8 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 17 Oct 2019 23:27:41 +0200 Subject: [PATCH 02/22] It compiles! Store the future and poll it instead of creating multiple new ones --- src/stream/mod.rs | 2 + src/stream/stream/mod.rs | 1 - src/stream/stream/successor.rs | 59 ------------------- src/stream/successor.rs | 102 +++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 60 deletions(-) delete mode 100644 src/stream/stream/successor.rs create mode 100644 src/stream/successor.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f782882..f410e08 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -303,6 +303,7 @@ pub use empty::{empty, Empty}; pub use from_fn::{from_fn, FromFn}; pub use from_iter::{from_iter, FromIter}; +pub use successor::{successor, Successor}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; @@ -316,6 +317,7 @@ mod from_iter; mod once; mod repeat; mod repeat_with; +mod successor; cfg_unstable! { mod double_ended_stream; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b5583e1..d6292c3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -57,7 +57,6 @@ mod partial_cmp; mod position; mod scan; mod skip; -mod successor; mod skip_while; mod step_by; mod take; diff --git a/src/stream/stream/successor.rs b/src/stream/stream/successor.rs deleted file mode 100644 index 519729f..0000000 --- a/src/stream/stream/successor.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::pin::Pin; -use std::marker::PhantomData; - -use crate::future::Future; -use crate::stream::Stream; -use crate::task::{Context, Poll}; - -#[derive(Debug)] -pub struct Successor -where Fut: Future -{ - successor: F, - next: T, - _marker: PhantomData -} - -pub fn successor(func: F, start: T) -> Successor -where - F: FnMut(T) -> Fut, - Fut: Future, - T: Copy, - { - Successor { - successor: func, - next: start, - _marker: PhantomData, - } - } - -impl Successor -where - F: FnMut(T) -> Fut, - Fut: Future, - T: Copy, - -{ - pin_utils::unsafe_unpinned!(successor: F); - pin_utils::unsafe_unpinned!(next: T); -} - -impl Stream for Successor -where - Fut: Future, - F: FnMut(T) -> Fut, - T: Copy, -{ - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - - match self.as_mut().successor()(self.next).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(val) => { - self.next = val; - Poll::Ready(Some(val)) - } - } - } -} diff --git a/src/stream/successor.rs b/src/stream/successor.rs new file mode 100644 index 0000000..434ef97 --- /dev/null +++ b/src/stream/successor.rs @@ -0,0 +1,102 @@ +use std::pin::Pin; +use std::marker::PhantomData; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that yields elements by calling an async closure with the previous value as an +/// argument +/// +/// This stream is constructed by [`successor`] function +/// +/// [`successor`]: fn.successor.html +#[derive(Debug)] +pub struct Successor +where Fut: Future +{ + successor: F, + future: Option, + next: T, + _marker: PhantomData +} + +/// Creates a new stream where to produce each new element a clousre is called with the previous +/// value. +/// +/// #Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let s = stream::successor(22, |val| { +/// async move { +/// val + 1 +/// } +/// }); +/// +/// pin_utils::pin_mut!(s); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(2)); +/// assert_eq!(s.next().await, Some(3)); +/// # +/// # }) } +/// +/// ``` +pub fn successor(start: T, func: F) -> Successor +where + F: FnMut(T) -> Fut, + Fut: Future, + T: Copy, + { + Successor { + successor: func, + future: None, + next: start, + _marker: PhantomData, + } + } + +impl Successor +where + F: FnMut(T) -> Fut, + Fut: Future, + T: Copy, + +{ + pin_utils::unsafe_unpinned!(successor: F); + pin_utils::unsafe_unpinned!(next: T); + pin_utils::unsafe_pinned!(future: Option); + +} + +impl Stream for Successor +where + Fut: Future, + F: FnMut(T) -> Fut, + T: Copy, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &self.future { + Some(_) => { + let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + + Poll::Ready(Some(next)) + }, + None => { + let x = self.next; + let fut = (self.as_mut().successor())(x); + self.as_mut().future().set(Some(fut)); + // Probably can poll the value here? + Poll::Pending + } + } + } +} + From 95a3e53fcdcab2d0610e54aa9520c152a11a1ceb Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 18 Oct 2019 09:00:38 +0200 Subject: [PATCH 03/22] Only use the Option of the future to decide to construct a new one --- src/stream/successor.rs | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/src/stream/successor.rs b/src/stream/successor.rs index 434ef97..3ddeef4 100644 --- a/src/stream/successor.rs +++ b/src/stream/successor.rs @@ -39,9 +39,9 @@ where Fut: Future /// }); /// /// pin_utils::pin_mut!(s); -/// assert_eq!(s.next().await, Some(1)); -/// assert_eq!(s.next().await, Some(2)); -/// assert_eq!(s.next().await, Some(3)); +/// assert_eq!(s.next().await, Some(23)); +/// assert_eq!(s.next().await, Some(24)); +/// assert_eq!(s.next().await, Some(25)); /// # /// # }) } /// @@ -83,20 +83,18 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match &self.future { - Some(_) => { - let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - self.as_mut().future().set(None); - - Poll::Ready(Some(next)) - }, None => { let x = self.next; let fut = (self.as_mut().successor())(x); self.as_mut().future().set(Some(fut)); - // Probably can poll the value here? - Poll::Pending } + _ => {}, } + + let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + *self.as_mut().next() = next; + self.as_mut().future().set(None); + Poll::Ready(Some(next)) } } From 8b662b659df21c09bc5f00c9e57f353f99457fd4 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Fri, 18 Oct 2019 09:10:15 +0200 Subject: [PATCH 04/22] Run rustfmt --- src/stream/mod.rs | 1 + src/stream/successor.rs | 32 +++++++++++++++----------------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f410e08..bab9dc7 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -308,6 +308,7 @@ pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::*; +pub use successor::{successor, Successor}; pub(crate) mod stream; diff --git a/src/stream/successor.rs b/src/stream/successor.rs index 3ddeef4..8e93956 100644 --- a/src/stream/successor.rs +++ b/src/stream/successor.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; use std::marker::PhantomData; +use std::pin::Pin; use crate::future::Future; use crate::stream::Stream; @@ -12,13 +12,14 @@ use crate::task::{Context, Poll}; /// /// [`successor`]: fn.successor.html #[derive(Debug)] -pub struct Successor -where Fut: Future +pub struct Successor +where + Fut: Future, { successor: F, future: Option, next: T, - _marker: PhantomData + _marker: PhantomData, } /// Creates a new stream where to produce each new element a clousre is called with the previous @@ -51,29 +52,27 @@ where F: FnMut(T) -> Fut, Fut: Future, T: Copy, - { - Successor { - successor: func, - future: None, - next: start, - _marker: PhantomData, - } +{ + Successor { + successor: func, + future: None, + next: start, + _marker: PhantomData, } +} -impl Successor +impl Successor where F: FnMut(T) -> Fut, Fut: Future, T: Copy, - { pin_utils::unsafe_unpinned!(successor: F); pin_utils::unsafe_unpinned!(next: T); pin_utils::unsafe_pinned!(future: Option); - } -impl Stream for Successor +impl Stream for Successor where Fut: Future, F: FnMut(T) -> Fut, @@ -88,7 +87,7 @@ where let fut = (self.as_mut().successor())(x); self.as_mut().future().set(Some(fut)); } - _ => {}, + _ => {} } let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); @@ -97,4 +96,3 @@ where Poll::Ready(Some(next)) } } - From 554d5cfbc1ec93c4240c9cdbfac90ea62bd596ea Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sun, 20 Oct 2019 13:37:26 +0200 Subject: [PATCH 05/22] Slight renamings --- src/stream/successor.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/stream/successor.rs b/src/stream/successor.rs index 8e93956..32353d0 100644 --- a/src/stream/successor.rs +++ b/src/stream/successor.rs @@ -12,7 +12,7 @@ use crate::task::{Context, Poll}; /// /// [`successor`]: fn.successor.html #[derive(Debug)] -pub struct Successor +pub struct Successors where Fut: Future, { @@ -22,7 +22,7 @@ where _marker: PhantomData, } -/// Creates a new stream where to produce each new element a clousre is called with the previous +/// Creates a new stream where to produce each new element a closure is called with the previous /// value. /// /// #Examples @@ -33,7 +33,7 @@ where /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::successor(22, |val| { +/// let s = stream::successors(22, |val| { /// async move { /// val + 1 /// } @@ -47,13 +47,13 @@ where /// # }) } /// /// ``` -pub fn successor(start: T, func: F) -> Successor +pub fn successors(start: T, func: F) -> Successors where F: FnMut(T) -> Fut, Fut: Future, T: Copy, { - Successor { + Successors { successor: func, future: None, next: start, @@ -61,7 +61,7 @@ where } } -impl Successor +impl Successors where F: FnMut(T) -> Fut, Fut: Future, @@ -72,7 +72,7 @@ where pin_utils::unsafe_pinned!(future: Option); } -impl Stream for Successor +impl Stream for Successors where Fut: Future, F: FnMut(T) -> Fut, From 266754897ec4574959ef35202af4d15bc83860e3 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sun, 20 Oct 2019 13:38:32 +0200 Subject: [PATCH 06/22] Rename the module to 'successors' --- src/stream/mod.rs | 2 +- src/stream/{successor.rs => successors.rs} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename src/stream/{successor.rs => successors.rs} (100%) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index bab9dc7..d5cc5ac 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -318,7 +318,7 @@ mod from_iter; mod once; mod repeat; mod repeat_with; -mod successor; +mod successors; cfg_unstable! { mod double_ended_stream; diff --git a/src/stream/successor.rs b/src/stream/successors.rs similarity index 100% rename from src/stream/successor.rs rename to src/stream/successors.rs From 8d97e0f974175040840a15e9ab568a2560f1658d Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sun, 20 Oct 2019 17:56:35 +0200 Subject: [PATCH 07/22] Only produes empty value if next is ever a 'None' --- src/stream/successors.rs | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index 32353d0..e708416 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -14,11 +14,11 @@ use crate::task::{Context, Poll}; #[derive(Debug)] pub struct Successors where - Fut: Future, + Fut: Future>, { successor: F, future: Option, - next: T, + next: Option, _marker: PhantomData, } @@ -33,9 +33,9 @@ where /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::successors(22, |val| { +/// let s = stream::successors(Some(22), |val| { /// async move { -/// val + 1 +/// Some(val + 1) /// } /// }); /// @@ -43,14 +43,25 @@ where /// assert_eq!(s.next().await, Some(23)); /// assert_eq!(s.next().await, Some(24)); /// assert_eq!(s.next().await, Some(25)); +/// +/// +///let never = stream::successors(None, |val: usize| { +/// async move { +/// Some(val + 1) +/// } +/// }); +/// +/// pin_utils::pin_mut!(never); +/// assert_eq!(never.next().await, None); +/// assert_eq!(never.next().await, None); /// # /// # }) } /// /// ``` -pub fn successors(start: T, func: F) -> Successors +pub fn successors(start: Option, func: F) -> Successors where F: FnMut(T) -> Fut, - Fut: Future, + Fut: Future>, T: Copy, { Successors { @@ -64,26 +75,30 @@ where impl Successors where F: FnMut(T) -> Fut, - Fut: Future, + Fut: Future>, T: Copy, { pin_utils::unsafe_unpinned!(successor: F); - pin_utils::unsafe_unpinned!(next: T); + pin_utils::unsafe_unpinned!(next: Option); pin_utils::unsafe_pinned!(future: Option); } impl Stream for Successors where - Fut: Future, + Fut: Future>, F: FnMut(T) -> Fut, T: Copy, { type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.next.is_none() { + return Poll::Ready(None); + } + match &self.future { None => { - let x = self.next; + let x = self.next.unwrap(); let fut = (self.as_mut().successor())(x); self.as_mut().future().set(Some(fut)); } @@ -93,6 +108,6 @@ where let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); *self.as_mut().next() = next; self.as_mut().future().set(None); - Poll::Ready(Some(next)) + Poll::Ready(next) } } From af928163e44731b27b23f1a01e8d4e7f3432a6cc Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sun, 10 Nov 2019 13:57:41 +0100 Subject: [PATCH 08/22] Got further! Thx Josh! --- src/stream/successors.rs | 80 +++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index e708416..4186c66 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -1,25 +1,31 @@ use std::marker::PhantomData; use std::pin::Pin; +use std::mem; use crate::future::Future; use crate::stream::Stream; -use crate::task::{Context, Poll}; +use crate::task::{Context, Poll, ready}; -/// A stream that yields elements by calling an async closure with the previous value as an -/// argument -/// -/// This stream is constructed by [`successor`] function -/// -/// [`successor`]: fn.successor.html -#[derive(Debug)] -pub struct Successors -where - Fut: Future>, -{ - successor: F, - future: Option, - next: Option, - _marker: PhantomData, + + +pin_project_lite::pin_project! { + /// A stream that yields elements by calling an async closure with the previous value as an + /// argument + /// + /// This stream is constructed by [`successor`] function + /// + /// [`successor`]: fn.successor.html + #[derive(Debug)] + pub struct Successors + where + Fut: Future>, + { + successor: F, + #[pin] + future: Option, + slot: Option, + _marker: PhantomData, + } } /// Creates a new stream where to produce each new element a closure is called with the previous @@ -40,6 +46,7 @@ where /// }); /// /// pin_utils::pin_mut!(s); +/// assert_eq!(s.next().await, Some(22)); /// assert_eq!(s.next().await, Some(23)); /// assert_eq!(s.next().await, Some(24)); /// assert_eq!(s.next().await, Some(25)); @@ -58,31 +65,20 @@ where /// # }) } /// /// ``` -pub fn successors(start: Option, func: F) -> Successors +pub fn successors(first: Option, succ: F) -> Successors where F: FnMut(T) -> Fut, Fut: Future>, T: Copy, { Successors { - successor: func, + successor: succ, future: None, - next: start, + slot: first, _marker: PhantomData, } } -impl Successors -where - F: FnMut(T) -> Fut, - Fut: Future>, - T: Copy, -{ - pin_utils::unsafe_unpinned!(successor: F); - pin_utils::unsafe_unpinned!(next: Option); - pin_utils::unsafe_pinned!(future: Option); -} - impl Stream for Successors where Fut: Future>, @@ -91,23 +87,23 @@ where { type Item = T; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.next.is_none() { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + if this.slot.is_none() { return Poll::Ready(None); } - match &self.future { - None => { - let x = self.next.unwrap(); - let fut = (self.as_mut().successor())(x); - self.as_mut().future().set(Some(fut)); - } - _ => {} + if this.future.is_none() { + let x = this.slot.unwrap(); + let fut = (this.successor)(x); + this.future.set(Some(fut)); } - let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - *self.as_mut().next() = next; - self.as_mut().future().set(None); + let mut next = ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); + + this.future.set(None); + mem::swap(this.slot, &mut next); Poll::Ready(next) } } From a257b7018c83748ff98f07a9592aa7b29f6f62ef Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sun, 10 Nov 2019 14:29:43 +0100 Subject: [PATCH 09/22] Rename some variables to match iter --- src/stream/successors.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index 4186c66..7f598f5 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use std::mem; @@ -12,19 +11,18 @@ pin_project_lite::pin_project! { /// A stream that yields elements by calling an async closure with the previous value as an /// argument /// - /// This stream is constructed by [`successor`] function + /// This stream is constructed by [`successors`] function /// - /// [`successor`]: fn.successor.html + /// [`succcessors`]: fn.succssors.html #[derive(Debug)] pub struct Successors where Fut: Future>, { - successor: F, + succ: F, #[pin] future: Option, slot: Option, - _marker: PhantomData, } } @@ -72,10 +70,9 @@ where T: Copy, { Successors { - successor: succ, + succ: succ, future: None, slot: first, - _marker: PhantomData, } } @@ -95,14 +92,15 @@ where } if this.future.is_none() { - let x = this.slot.unwrap(); - let fut = (this.successor)(x); + let fut = (this.succ)(this.slot.unwrap()); this.future.set(Some(fut)); } let mut next = ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); this.future.set(None); + + // 'swapping' here means 'slot' will hold the next value and next will be th one from the previous iteration mem::swap(this.slot, &mut next); Poll::Ready(next) } From 243cdd7ff1fe9ff2262ac0f7c7a729988dfa9482 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sun, 10 Nov 2019 17:56:31 +0100 Subject: [PATCH 10/22] Slight miss-merge --- src/stream/mod.rs | 1 - src/stream/successors.rs | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d5cc5ac..d980f6c 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -303,7 +303,6 @@ pub use empty::{empty, Empty}; pub use from_fn::{from_fn, FromFn}; pub use from_iter::{from_iter, FromIter}; -pub use successor::{successor, Successor}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; diff --git a/src/stream/successors.rs b/src/stream/successors.rs index 7f598f5..fb4e1c6 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -5,9 +5,11 @@ use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll, ready}; +use pin_project_lite::pin_project; -pin_project_lite::pin_project! { + +pin_project! { /// A stream that yields elements by calling an async closure with the previous value as an /// argument /// From 4c09cdbeace41a6d44ac7017275330dbf8c0ba55 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Sun, 10 Nov 2019 18:03:07 +0100 Subject: [PATCH 11/22] Mark successors as unstable --- src/stream/mod.rs | 3 ++- src/stream/successors.rs | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d980f6c..47635ee 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -317,7 +317,6 @@ mod from_iter; mod once; mod repeat; mod repeat_with; -mod successors; cfg_unstable! { mod double_ended_stream; @@ -328,6 +327,7 @@ cfg_unstable! { mod interval; mod into_stream; mod product; + mod successors; mod sum; pub use double_ended_stream::DoubleEndedStream; @@ -339,5 +339,6 @@ cfg_unstable! { pub use into_stream::IntoStream; pub use product::Product; pub use stream::Merge; + pub use successors::{successors, Successors}; pub use sum::Sum; } diff --git a/src/stream/successors.rs b/src/stream/successors.rs index fb4e1c6..0295b33 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -16,6 +16,8 @@ pin_project! { /// This stream is constructed by [`successors`] function /// /// [`succcessors`]: fn.succssors.html + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] pub struct Successors where @@ -65,6 +67,8 @@ pin_project! { /// # }) } /// /// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn successors(first: Option, succ: F) -> Successors where F: FnMut(T) -> Fut, From bfb42b432ee636ab4005df3d24a70f5729363af9 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Mon, 11 Nov 2019 09:07:48 +0100 Subject: [PATCH 12/22] Rearrange docs to match 'repeat' --- src/stream/successors.rs | 46 +++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index 0295b33..f7d5bd8 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -7,33 +7,10 @@ use crate::task::{Context, Poll, ready}; use pin_project_lite::pin_project; - - -pin_project! { - /// A stream that yields elements by calling an async closure with the previous value as an - /// argument - /// - /// This stream is constructed by [`successors`] function - /// - /// [`succcessors`]: fn.succssors.html - #[cfg(feature = "unstable")] - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - #[derive(Debug)] - pub struct Successors - where - Fut: Future>, - { - succ: F, - #[pin] - future: Option, - slot: Option, - } -} - /// Creates a new stream where to produce each new element a closure is called with the previous /// value. /// -/// #Examples +/// # Examples /// /// ``` /// # fn main() { async_std::task::block_on(async { @@ -82,6 +59,27 @@ where } } +pin_project! { + /// A stream that yields elements by calling an async closure with the previous value as an + /// argument + /// + /// This stream is constructed by [`successors`] function + /// + /// [`successors`]: fn.succssors.html + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[derive(Debug)] + pub struct Successors + where + Fut: Future>, + { + succ: F, + #[pin] + future: Option, + slot: Option, + } +} + impl Stream for Successors where Fut: Future>, From 7677e9a3dfdada2bf8f97eadb286258c194c1e4b Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Mon, 11 Nov 2019 09:13:57 +0100 Subject: [PATCH 13/22] Make the closure take a borrow to the value --- src/stream/successors.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index f7d5bd8..446ffe5 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -18,7 +18,7 @@ use pin_project_lite::pin_project; /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::successors(Some(22), |val| { +/// let s = stream::successors(Some(22), |&val| { /// async move { /// Some(val + 1) /// } @@ -31,9 +31,9 @@ use pin_project_lite::pin_project; /// assert_eq!(s.next().await, Some(25)); /// /// -///let never = stream::successors(None, |val: usize| { +///let never = stream::successors(None, |_| { /// async move { -/// Some(val + 1) +/// Some(1) /// } /// }); /// @@ -48,7 +48,7 @@ use pin_project_lite::pin_project; #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn successors(first: Option, succ: F) -> Successors where - F: FnMut(T) -> Fut, + F: FnMut(&T) -> Fut, Fut: Future>, T: Copy, { @@ -83,7 +83,7 @@ pin_project! { impl Stream for Successors where Fut: Future>, - F: FnMut(T) -> Fut, + F: FnMut(&T) -> Fut, T: Copy, { type Item = T; @@ -96,7 +96,7 @@ where } if this.future.is_none() { - let fut = (this.succ)(this.slot.unwrap()); + let fut = (this.succ)(&this.slot.unwrap()); this.future.set(Some(fut)); } From f14b37ff17618a72dbb441cb1a33cb122c70339d Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Mon, 11 Nov 2019 09:15:38 +0100 Subject: [PATCH 14/22] Remoe the T: Copy bound on the item --- src/stream/successors.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index 446ffe5..e86512b 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -50,7 +50,6 @@ pub fn successors(first: Option, succ: F) -> Successors where F: FnMut(&T) -> Fut, Fut: Future>, - T: Copy, { Successors { succ: succ, @@ -84,7 +83,6 @@ impl Stream for Successors where Fut: Future>, F: FnMut(&T) -> Fut, - T: Copy, { type Item = T; @@ -96,7 +94,7 @@ where } if this.future.is_none() { - let fut = (this.succ)(&this.slot.unwrap()); + let fut = (this.succ)(this.slot.as_ref().unwrap()); this.future.set(Some(fut)); } From 786a52a09d40bb9303f237c6bac756132e1651c9 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 14 Nov 2019 21:37:51 +0100 Subject: [PATCH 15/22] Slight miss-merge --- src/stream/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 47635ee..d8b96ec 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -307,7 +307,6 @@ pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::*; -pub use successor::{successor, Successor}; pub(crate) mod stream; From 64216b8e6bf24ccb95a296650e08cc56cc59ad74 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 14 Nov 2019 21:49:24 +0100 Subject: [PATCH 16/22] Take a normal closure, not an async one --- src/stream/successors.rs | 51 ++++++++++------------------------------ 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index e86512b..d5840ee 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -1,9 +1,8 @@ use std::pin::Pin; use std::mem; -use crate::future::Future; use crate::stream::Stream; -use crate::task::{Context, Poll, ready}; +use crate::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -18,11 +17,7 @@ use pin_project_lite::pin_project; /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::successors(Some(22), |&val| { -/// async move { -/// Some(val + 1) -/// } -/// }); +/// let s = stream::successors(Some(22), |&val| Some(val + 1) ); /// /// pin_utils::pin_mut!(s); /// assert_eq!(s.next().await, Some(22)); @@ -30,30 +25,18 @@ use pin_project_lite::pin_project; /// assert_eq!(s.next().await, Some(24)); /// assert_eq!(s.next().await, Some(25)); /// -/// -///let never = stream::successors(None, |_| { -/// async move { -/// Some(1) -/// } -/// }); -/// -/// pin_utils::pin_mut!(never); -/// assert_eq!(never.next().await, None); -/// assert_eq!(never.next().await, None); /// # /// # }) } /// /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub fn successors(first: Option, succ: F) -> Successors +pub fn successors(first: Option, succ: F) -> Successors where - F: FnMut(&T) -> Fut, - Fut: Future>, + F: FnMut(&T) -> Option, { Successors { - succ: succ, - future: None, + succ, slot: first, } } @@ -68,39 +51,29 @@ pin_project! { #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] - pub struct Successors + pub struct Successors where - Fut: Future>, + F: FnMut(&T) -> Option { succ: F, - #[pin] - future: Option, slot: Option, } } -impl Stream for Successors +impl Stream for Successors where - Fut: Future>, - F: FnMut(&T) -> Fut, + F: FnMut(&T) -> Option, { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.project(); if this.slot.is_none() { return Poll::Ready(None); } - if this.future.is_none() { - let fut = (this.succ)(this.slot.as_ref().unwrap()); - this.future.set(Some(fut)); - } - - let mut next = ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); - - this.future.set(None); + let mut next = (this.succ)(&this.slot.as_ref().unwrap()); // 'swapping' here means 'slot' will hold the next value and next will be th one from the previous iteration mem::swap(this.slot, &mut next); From de67bf0fd4449e3eec9058238c3e682101f8a18f Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 15 Nov 2019 11:17:39 +0900 Subject: [PATCH 17/22] feat: Add stream by_ref --- src/stream/stream/mod.rs | 46 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 281e4d8..2bef88f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1404,6 +1404,52 @@ extension_trait! { } } + #[doc = r#" + Borrows an stream, rather than consuming it. + + This is useful to allow applying stream adaptors while still retaining ownership of the original stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let a = vec![1isize, 2, 3]; + + let stream = stream::from_iter(a); + + let sum: isize = stream.take(5).sum().await; + + assert_eq!(sum, 6); + + // if we try to use stream again, it won't work. The following line + // gives "error: use of moved value: `stream` + // assert_eq!(stream.next(), None); + + // let's try that again + let a = vec![1isize, 2, 3]; + + let mut stream = stream::from_iter(a); + + // instead, we add in a .by_ref() + let sum: isize = stream.by_ref().take(2).sum().await; + + assert_eq!(sum, 3); + + // now this is just fine: + assert_eq!(stream.next().await, Some(3)); + assert_eq!(stream.next().await, None); + # + # }) } + ``` + "#] + fn by_ref(&mut self) -> &mut Self { + self + } + #[doc = r#" A stream adaptor similar to [`fold`] that holds internal state and produces a new stream. From df92c633375f43d319d01792476e073df21b2c21 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 00:29:54 +0900 Subject: [PATCH 18/22] fix: Add unstable features --- src/stream/stream/mod.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2bef88f..bc2482d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1446,6 +1446,8 @@ extension_trait! { # }) } ``` "#] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn by_ref(&mut self) -> &mut Self { self } From 3564be9c0ce12ba3cebcd77bd9f31e51af6807d8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Nov 2019 16:58:33 +0100 Subject: [PATCH 19/22] update futures-timer dep Signed-off-by: Yoshua Wuyts --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index e920739..b8d24df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -59,7 +59,7 @@ crossbeam-deque = { version = "0.7.1", optional = true } crossbeam-utils = { version = "0.6.6", optional = true } futures-core = { version = "0.3.0", optional = true } futures-io = { version = "0.3.0", optional = true } -futures-timer = { version = "1.0.2", optional = true } +futures-timer = { version = "2.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } memchr = { version = "2.2.1", optional = true } From 8779c04dc7d0dd1ea8ede5b03fe531931219e2a1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Nov 2019 16:59:58 +0100 Subject: [PATCH 20/22] upgrade all deps Signed-off-by: Yoshua Wuyts --- Cargo.toml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b8d24df..7ffaae3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,33 +50,33 @@ std = [ ] [dependencies] -async-attributes = { version = "1.1.0", optional = true } +async-attributes = { version = "1.1.1", optional = true } async-macros = { version = "2.0.0", optional = true } async-task = { version = "1.0.0", optional = true } broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] } -crossbeam-channel = { version = "0.3.9", optional = true } -crossbeam-deque = { version = "0.7.1", optional = true } -crossbeam-utils = { version = "0.6.6", optional = true } -futures-core = { version = "0.3.0", optional = true } -futures-io = { version = "0.3.0", optional = true } +crossbeam-channel = { version = "0.4.0", optional = true } +crossbeam-deque = { version = "0.7.2", optional = true } +crossbeam-utils = { version = "0.7.0", optional = true } +futures-core = { version = "0.3.1", optional = true } +futures-io = { version = "0.3.1", optional = true } futures-timer = { version = "2.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } memchr = { version = "2.2.1", optional = true } mio = { version = "0.6.19", optional = true } mio-uds = { version = "0.6.7", optional = true } -num_cpus = { version = "1.10.1", optional = true } +num_cpus = { version = "1.11.1", optional = true } once_cell = { version = "1.2.0", optional = true } -pin-project-lite = { version = "0.1", optional = true } +pin-project-lite = { version = "0.1.1", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true } slab = { version = "0.4.2", optional = true } [dev-dependencies] -femme = "1.2.0" +femme = "1.3.0" rand = "0.7.2" surf = "1.0.3" tempdir = "0.3.7" -futures = "0.3.0" +futures = "0.3.1" [[test]] name = "stream" From a69b3a8a9e215c689bfde3e07d5b50fe2ecc08e7 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Sat, 16 Nov 2019 00:54:50 +0800 Subject: [PATCH 21/22] use `as_mut` for stream-partition --- src/stream/stream/partition.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stream/stream/partition.rs b/src/stream/stream/partition.rs index ba4938c..7374456 100644 --- a/src/stream/stream/partition.rs +++ b/src/stream/stream/partition.rs @@ -45,13 +45,11 @@ where match next { Some(v) => { - let mut res = this.res.take().unwrap(); + let res = this.res.as_mut().unwrap(); match (this.f)(&v) { true => res.0.extend(Some(v)), false => res.1.extend(Some(v)), }; - - *this.res = Some(res); } None => return Poll::Ready(this.res.take().unwrap()), } From d68dc659b254569bae4bf4694d84ff5afaa46b1b Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 15 Nov 2019 18:08:00 +0100 Subject: [PATCH 22/22] remove pin_mut from successors test Signed-off-by: Yoshua Wuyts --- src/stream/successors.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/stream/successors.rs b/src/stream/successors.rs index d5840ee..4421564 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; use std::mem; +use std::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -17,9 +17,8 @@ use pin_project_lite::pin_project; /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::successors(Some(22), |&val| Some(val + 1) ); +/// let mut s = stream::successors(Some(22), |&val| Some(val + 1)); /// -/// pin_utils::pin_mut!(s); /// assert_eq!(s.next().await, Some(22)); /// assert_eq!(s.next().await, Some(23)); /// assert_eq!(s.next().await, Some(24)); @@ -27,7 +26,6 @@ use pin_project_lite::pin_project; /// /// # /// # }) } -/// /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] @@ -35,10 +33,7 @@ pub fn successors(first: Option, succ: F) -> Successors where F: FnMut(&T) -> Option, { - Successors { - succ, - slot: first, - } + Successors { succ, slot: first } } pin_project! {