diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs new file mode 100644 index 0000000..8a31cc1 --- /dev/null +++ b/src/stream/stream/cycle.rs @@ -0,0 +1,73 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + /// A stream that will repeatedly yield the same list of elements + pub struct Cycle { + #[pin] + source: S, + index: usize, + buffer: Vec, + state: CycleState, + } +} + +#[derive(Eq, PartialEq)] +enum CycleState { + FromStream, + FromBuffer, +} + +impl Cycle +where + S: Stream, + S::Item: Clone, +{ + pub fn new(source: S) -> Cycle { + Cycle { + source, + index: 0, + buffer: Vec::new(), + state: CycleState::FromStream, + } + } +} + +impl Stream for Cycle +where + S: Stream, + S::Item: Clone, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + let mut next; + if *this.state == CycleState::FromStream { + next = futures_core::ready!(this.source.poll_next(cx)); + + if let Some(val) = next { + this.buffer.push(val.clone()); + next = Some(val) + } else { + *this.state = CycleState::FromBuffer; + next = this.buffer.get(*this.index).cloned(); + } + } else { + let mut index = *this.index; + if index == this.buffer.len() { + index = 0 + } + next = Some(this.buffer[index].clone()); + + *this.index = index + 1; + } + + Poll::Ready(next) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2411c98..f9a9e3a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -27,6 +27,7 @@ mod chain; mod cloned; mod cmp; mod copied; +mod cycle; mod enumerate; mod eq; mod filter; @@ -66,6 +67,7 @@ mod zip; use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; +use cycle::Cycle; use enumerate::Enumerate; use eq::EqFuture; use filter_map::FilterMap; @@ -448,6 +450,38 @@ extension_trait! { Copied::new(self) } + #[doc = r#" + Creates a stream that yields the provided values infinitely and in order. + + # Examples + + Basic usage: + + ``` + # async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let mut s = stream::once(7).cycle(); + + assert_eq!(s.next().await, Some(7)); + assert_eq!(s.next().await, Some(7)); + assert_eq!(s.next().await, Some(7)); + assert_eq!(s.next().await, Some(7)); + assert_eq!(s.next().await, Some(7)); + # + # }) + ``` + "#] + fn cycle(self) -> Cycle + where + Self: Sized, + Self::Item: Clone, + { + Cycle::new(self) + } + #[doc = r#" Creates a stream that gives the current element's count as well as the next value.