It compiles! Store the future and poll it instead of creating multiple new ones

new-scheduler
Felipe Sere 5 years ago
parent fe3c9ef626
commit 02b261de10

@ -303,6 +303,7 @@
pub use empty::{empty, Empty};
pub use from_fn::{from_fn, FromFn};
pub use from_iter::{from_iter, FromIter};
pub use successor::{successor, Successor};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use repeat_with::{repeat_with, RepeatWith};
@ -316,6 +317,7 @@ mod from_iter;
mod once;
mod repeat;
mod repeat_with;
mod successor;
cfg_unstable! {
mod double_ended_stream;

@ -57,7 +57,6 @@ mod partial_cmp;
mod position;
mod scan;
mod skip;
mod successor;
mod skip_while;
mod step_by;
mod take;

@ -1,59 +0,0 @@
use std::pin::Pin;
use std::marker::PhantomData;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[derive(Debug)]
pub struct Successor<F, Fut, T>
where Fut: Future<Output=T>
{
successor: F,
next: T,
_marker: PhantomData<Fut>
}
pub fn successor<F, Fut, T>(func: F, start: T) -> Successor<F, Fut, T>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = T>,
T: Copy,
{
Successor {
successor: func,
next: start,
_marker: PhantomData,
}
}
impl <F, Fut, T> Successor<F, Fut, T>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = T>,
T: Copy,
{
pin_utils::unsafe_unpinned!(successor: F);
pin_utils::unsafe_unpinned!(next: T);
}
impl <F, Fut, T> Stream for Successor<F, Fut, T>
where
Fut: Future<Output = T>,
F: FnMut(T) -> Fut,
T: Copy,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().successor()(self.next).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(val) => {
self.next = val;
Poll::Ready(Some(val))
}
}
}
}

@ -0,0 +1,102 @@
use std::pin::Pin;
use std::marker::PhantomData;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A stream that yields elements by calling an async closure with the previous value as an
/// argument
///
/// This stream is constructed by [`successor`] function
///
/// [`successor`]: fn.successor.html
#[derive(Debug)]
pub struct Successor<F, Fut, T>
where Fut: Future<Output=T>
{
successor: F,
future: Option<Fut>,
next: T,
_marker: PhantomData<Fut>
}
/// Creates a new stream where to produce each new element a clousre is called with the previous
/// value.
///
/// #Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::successor(22, |val| {
/// async move {
/// val + 1
/// }
/// });
///
/// pin_utils::pin_mut!(s);
/// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(3));
/// #
/// # }) }
///
/// ```
pub fn successor<F, Fut, T>(start: T, func: F) -> Successor<F, Fut, T>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = T>,
T: Copy,
{
Successor {
successor: func,
future: None,
next: start,
_marker: PhantomData,
}
}
impl <F, Fut, T> Successor<F, Fut, T>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = T>,
T: Copy,
{
pin_utils::unsafe_unpinned!(successor: F);
pin_utils::unsafe_unpinned!(next: T);
pin_utils::unsafe_pinned!(future: Option<Fut>);
}
impl <F, Fut, T> Stream for Successor<F, Fut, T>
where
Fut: Future<Output = T>,
F: FnMut(T) -> Fut,
T: Copy,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &self.future {
Some(_) => {
let next = futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
self.as_mut().future().set(None);
Poll::Ready(Some(next))
},
None => {
let x = self.next;
let fut = (self.as_mut().successor())(x);
self.as_mut().future().set(Some(fut));
// Probably can poll the value here?
Poll::Pending
}
}
}
}
Loading…
Cancel
Save