From 02b261de10d09c699fb949d3ac5347282756e1d8 Mon Sep 17 00:00:00 2001 From: Felipe Sere Date: Thu, 17 Oct 2019 23:27:41 +0200 Subject: [PATCH] It compiles! Store the future and poll it instead of creating multiple new ones --- src/stream/mod.rs | 2 + src/stream/stream/mod.rs | 1 - src/stream/stream/successor.rs | 59 ------------------- src/stream/successor.rs | 102 +++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 60 deletions(-) delete mode 100644 src/stream/stream/successor.rs create mode 100644 src/stream/successor.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f782882..f410e08 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -303,6 +303,7 @@ pub use empty::{empty, Empty}; pub use from_fn::{from_fn, FromFn}; pub use from_iter::{from_iter, FromIter}; +pub use successor::{successor, Successor}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; @@ -316,6 +317,7 @@ mod from_iter; mod once; mod repeat; mod repeat_with; +mod successor; cfg_unstable! { mod double_ended_stream; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b5583e1..d6292c3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -57,7 +57,6 @@ mod partial_cmp; mod position; mod scan; mod skip; -mod successor; mod skip_while; mod step_by; mod take; diff --git a/src/stream/stream/successor.rs b/src/stream/stream/successor.rs deleted file mode 100644 index 519729f..0000000 --- a/src/stream/stream/successor.rs +++ /dev/null @@ -1,59 +0,0 @@ -use std::pin::Pin; -use std::marker::PhantomData; - -use crate::future::Future; -use crate::stream::Stream; -use crate::task::{Context, Poll}; - -#[derive(Debug)] -pub struct Successor -where Fut: Future -{ - successor: F, - next: T, - _marker: PhantomData -} - -pub fn successor(func: F, start: T) -> Successor -where - F: FnMut(T) -> Fut, - Fut: Future, - T: Copy, - { - Successor { - successor: func, - next: start, - _marker: PhantomData, - } - } - -impl Successor -where - F: FnMut(T) -> Fut, - Fut: Future, - T: Copy, - -{ - pin_utils::unsafe_unpinned!(successor: F); - pin_utils::unsafe_unpinned!(next: T); -} - -impl Stream for Successor -where - Fut: Future, - F: FnMut(T) -> Fut, - T: Copy, -{ - type Item = T; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - - match self.as_mut().successor()(self.next).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(val) => { - self.next = val; - Poll::Ready(Some(val)) - } - } - } -} diff --git a/src/stream/successor.rs b/src/stream/successor.rs new file mode 100644 index 0000000..434ef97 --- /dev/null +++ b/src/stream/successor.rs @@ -0,0 +1,102 @@ +use std::pin::Pin; +use std::marker::PhantomData; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that yields elements by calling an async closure with the previous value as an +/// argument +/// +/// This stream is constructed by [`successor`] function +/// +/// [`successor`]: fn.successor.html +#[derive(Debug)] +pub struct Successor +where Fut: Future +{ + successor: F, + future: Option, + next: T, + _marker: PhantomData +} + +/// Creates a new stream where to produce each new element a clousre is called with the previous +/// value. +/// +/// #Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let s = stream::successor(22, |val| { +/// async move { +/// val + 1 +/// } +/// }); +/// +/// pin_utils::pin_mut!(s); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(2)); +/// assert_eq!(s.next().await, Some(3)); +/// # +/// # }) } +/// +/// ``` +pub fn successor(start: T, func: F) -> Successor +where + F: FnMut(T) -> Fut, + Fut: Future, + T: Copy, + { + Successor { + successor: func, + future: None, + next: start, + _marker: PhantomData, + } + } + +impl Successor +where + F: FnMut(T) -> Fut, + Fut: Future, + T: Copy, + +{ + pin_utils::unsafe_unpinned!(successor: F); + pin_utils::unsafe_unpinned!(next: T); + pin_utils::unsafe_pinned!(future: Option); + +} + +impl Stream for Successor +where + Fut: Future, + F: FnMut(T) -> Fut, + T: Copy, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match &self.future { + Some(_) => { + let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + + Poll::Ready(Some(next)) + }, + None => { + let x = self.next; + let fut = (self.as_mut().successor())(x); + self.as_mut().future().set(Some(fut)); + // Probably can poll the value here? + Poll::Pending + } + } + } +} +