diff --git a/src/stream/cycle.rs b/src/stream/cycle.rs index b9dd87f9..1d16f855 100644 --- a/src/stream/cycle.rs +++ b/src/stream/cycle.rs @@ -7,24 +7,53 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that will repeatedly yield the same list of elements - pub struct Cycle { - source: Vec, + pub struct Cycle { + #[pin] + source: S, index: usize, - len: usize, + buffer: Vec, + state: CycleState, } } -impl Stream for Cycle { - type Item = T; +#[derive(Eq, PartialEq)] +enum CycleState { + FromStream, + FromBuffer, +} + +impl Stream for Cycle + where + S: Stream, + T: Copy, +{ + + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); - fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - let value = self.source[self.index]; + let mut next; + if CycleState::FromStream == *this.state { + next = futures_core::ready!(this.source.poll_next(cx)); - let next = self.index + 1; + if let Some(val) = next { + this.buffer.push(val); + } else { + *this.state = CycleState::FromBuffer; + next = Some(this.buffer[*this.index]); + } + } else { + let mut index = *this.index; + if index == this.buffer.len() { + index = 0 + } + next = Some(this.buffer[index]); - self.as_mut().index = next % self.len; + *this.index = index + 1; + } - Poll::Ready(Some(value)) + Poll::Ready(next) } } @@ -40,21 +69,21 @@ impl Stream for Cycle { /// use async_std::prelude::*; /// use async_std::stream; /// -/// let mut s = stream::cycle(vec![1,2,3]); +/// let mut s = stream::cycle(stream::once(7)); /// -/// assert_eq!(s.next().await, Some(1)); -/// assert_eq!(s.next().await, Some(2)); -/// assert_eq!(s.next().await, Some(3)); -/// assert_eq!(s.next().await, Some(1)); -/// assert_eq!(s.next().await, Some(2)); +/// assert_eq!(s.next().await, Some(7)); +/// assert_eq!(s.next().await, Some(7)); +/// assert_eq!(s.next().await, Some(7)); +/// assert_eq!(s.next().await, Some(7)); +/// assert_eq!(s.next().await, Some(7)); /// # /// # }) /// ``` -pub fn cycle(source: Vec) -> impl Stream { - let len = source.len(); +pub fn cycle, T: Copy>(source: S) -> impl Stream { Cycle { source, index: 0, - len, + buffer: Vec::new(), + state: CycleState::FromStream, } }