diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f782882..d8b96ec 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -326,6 +326,7 @@ cfg_unstable! { mod interval; mod into_stream; mod product; + mod successors; mod sum; pub use double_ended_stream::DoubleEndedStream; @@ -337,5 +338,6 @@ cfg_unstable! { pub use into_stream::IntoStream; pub use product::Product; pub use stream::Merge; + pub use successors::{successors, Successors}; pub use sum::Sum; } diff --git a/src/stream/successors.rs b/src/stream/successors.rs new file mode 100644 index 0000000..d5840ee --- /dev/null +++ b/src/stream/successors.rs @@ -0,0 +1,82 @@ +use std::pin::Pin; +use std::mem; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +/// Creates a new stream where to produce each new element a closure is called with the previous +/// value. +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let s = stream::successors(Some(22), |&val| Some(val + 1) ); +/// +/// pin_utils::pin_mut!(s); +/// assert_eq!(s.next().await, Some(22)); +/// assert_eq!(s.next().await, Some(23)); +/// assert_eq!(s.next().await, Some(24)); +/// assert_eq!(s.next().await, Some(25)); +/// +/// # +/// # }) } +/// +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub fn successors(first: Option, succ: F) -> Successors +where + F: FnMut(&T) -> Option, +{ + Successors { + succ, + slot: first, + } +} + +pin_project! { + /// A stream that yields elements by calling an async closure with the previous value as an + /// argument + /// + /// This stream is constructed by [`successors`] function + /// + /// [`successors`]: fn.succssors.html + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[derive(Debug)] + pub struct Successors + where + F: FnMut(&T) -> Option + { + succ: F, + slot: Option, + } +} + +impl Stream for Successors +where + F: FnMut(&T) -> Option, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + if this.slot.is_none() { + return Poll::Ready(None); + } + + let mut next = (this.succ)(&this.slot.as_ref().unwrap()); + + // 'swapping' here means 'slot' will hold the next value and next will be th one from the previous iteration + mem::swap(this.slot, &mut next); + Poll::Ready(next) + } +}