From 23beab412506dd441acce18b3599fb4cd5abbb24 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Tue, 15 Oct 2019 16:50:17 +0300 Subject: [PATCH] Adds a from_fn stream implementation (#277) * Adds a from_fn stream implementation * Update src/stream/from_fn.rs Co-Authored-By: Yoshua Wuyts * Fix review nits * Use async_std Mutex --- src/stream/from_fn.rs | 100 ++++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 2 + 2 files changed, 102 insertions(+) create mode 100644 src/stream/from_fn.rs diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs new file mode 100644 index 0000000..c1cb97a --- /dev/null +++ b/src/stream/from_fn.rs @@ -0,0 +1,100 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that yields elements by calling a closure. +/// +/// This stream is constructed by [`from_fn`] function. +/// +/// [`from_fn`]: fn.from_fn.html +#[derive(Debug)] +pub struct FromFn { + f: F, + future: Option, + __t: PhantomData, +} + +/// Creates a new stream where to produce each new element a provided closure is called. +/// +/// This allows creating a custom stream with any behaviour without using the more verbose +/// syntax of creating a dedicated type and implementing a `Stream` trait for it. +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::sync::Mutex; +/// use std::sync::Arc; +/// use async_std::stream; +/// +/// let count = Arc::new(Mutex::new(0u8)); +/// let s = stream::from_fn(|| { +/// let count = Arc::clone(&count); +/// +/// async move { +/// *count.lock().await += 1; +/// +/// if *count.lock().await > 3 { +/// None +/// } else { +/// Some(*count.lock().await) +/// } +/// } +/// }); +/// +/// pin_utils::pin_mut!(s); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(2)); +/// assert_eq!(s.next().await, Some(3)); +/// assert_eq!(s.next().await, None); +/// # +/// # }) } +/// +/// ``` +pub fn from_fn(f: F) -> FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + FromFn { + f, + future: None, + __t: PhantomData, + } +} + +impl FromFn { + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); +} + +impl Stream for FromFn +where + F: FnMut() -> Fut, + Fut: Future>, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &self.future { + Some(_) => { + let next = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + + return Poll::Ready(next); + } + None => { + let fut = (self.as_mut().f())(); + self.as_mut().future().set(Some(fut)); + } + } + } + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f34e9dc..87f3f84 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -24,6 +24,7 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; +pub use from_fn::{from_fn, FromFn}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use stream::{ @@ -33,6 +34,7 @@ pub use stream::{ pub(crate) mod stream; mod empty; +mod from_fn; mod once; mod repeat;