From 735d604cd14422fcb7f974130d616993d73ce190 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sat, 5 Oct 2019 22:17:21 +0300 Subject: [PATCH] Adds stream::repeat_with --- src/stream/mod.rs | 2 + src/stream/repeat_with.rs | 103 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 src/stream/repeat_with.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 8aa12a2..f2fcdeb 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -26,6 +26,7 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; +pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, Zip}; pub(crate) mod stream; @@ -33,6 +34,7 @@ pub(crate) mod stream; mod empty; mod once; mod repeat; +mod repeat_with; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs new file mode 100644 index 0000000..b682dfa --- /dev/null +++ b/src/stream/repeat_with.rs @@ -0,0 +1,103 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that repeats elements of type `T` endlessly by applying a provided clousre. +/// +/// This stream is constructed by the [`repeat_with`] function. +/// +/// [`repeat_with`]: fn.repeat_with.html +#[derive(Debug)] +pub struct RepeatWith { + f: F, + future: Option, + __a: PhantomData, +} + +/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let s = stream::repeat_with(|| async { 1 }); +/// +/// +/// pin_utils::pin_mut!(s); +/// +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// +/// # }) } +/// ``` +/// +/// Going finite: +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let s = stream::repeat_with(|| async { 1u8 }).take(2); +/// +/// +/// pin_utils::pin_mut!(s); +/// +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, None); +/// +/// # }) } +/// ``` +pub fn repeat_with(repeater: F) -> RepeatWith { + RepeatWith { + f: repeater, + future: None, + __a: PhantomData, + } +} + +impl RepeatWith { + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); +} + +impl Stream for RepeatWith +where + F: FnMut() -> Fut, + Fut: Future, +{ + type Item = A; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match self.future.is_some() { + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + + self.as_mut().future().set(None); + + return Poll::Ready(Some(res)); + } + false => { + let fut = (self.as_mut().f())(); + + self.as_mut().future().set(Some(fut)); + } + } + } + } +}