diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 7e8939b..3b5f461 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -27,6 +27,7 @@ pub use empty::{empty, Empty}; pub use from_fn::{from_fn, FromFn}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; +pub use repeat_with::{repeat_with, RepeatWith}; pub use stream::{ Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip, }; @@ -37,6 +38,7 @@ mod empty; mod from_fn; mod once; mod repeat; +mod repeat_with; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs new file mode 100644 index 0000000..f38b323 --- /dev/null +++ b/src/stream/repeat_with.rs @@ -0,0 +1,103 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that repeats elements of type `T` endlessly by applying a provided closure. +/// +/// This stream is constructed by the [`repeat_with`] function. +/// +/// [`repeat_with`]: fn.repeat_with.html +#[derive(Debug)] +pub struct RepeatWith { + f: F, + future: Option, + __a: PhantomData, +} + +/// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure. +/// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let s = stream::repeat_with(|| async { 1 }); +/// +/// pin_utils::pin_mut!(s); +/// +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// # }) } +/// ``` +/// +/// Going finite: +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let s = stream::repeat_with(|| async { 1u8 }).take(2); +/// +/// pin_utils::pin_mut!(s); +/// +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, None); +/// # }) } +/// ``` +pub fn repeat_with(repeater: F) -> RepeatWith +where + F: FnMut() -> Fut, + Fut: Future, +{ + RepeatWith { + f: repeater, + future: None, + __a: PhantomData, + } +} + +impl RepeatWith { + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); +} + +impl Stream for RepeatWith +where + F: FnMut() -> Fut, + Fut: Future, +{ + type Item = A; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &self.future { + Some(_) => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + + self.as_mut().future().set(None); + + return Poll::Ready(Some(res)); + } + None => { + let fut = (self.as_mut().f())(); + + self.as_mut().future().set(Some(fut)); + } + } + } + } +}