diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index cc1646cd..48a1a201 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -318,7 +318,7 @@ extension_trait! { #[doc = r#" Limit the amount of items yielded per timeslice in a stream. -This stream does not drop any items, but will only limit the rate at which items pass through. + This stream does not drop any items, but will only limit the rate at which items pass through. # Examples ``` # fn main() { async_std::task::block_on(async { diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 44ce7e16..8896899e 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::pin::Pin; -use std::time::Duration; +use std::time::{Duration, Instant}; use futures_timer::Delay; use pin_project_lite::pin_project; @@ -23,7 +23,9 @@ pin_project! { stream: S, duration: Duration, #[pin] - delay: Option, + blocked: bool, + #[pin] + delay: Delay, } } @@ -32,7 +34,8 @@ impl Throttle { Throttle { stream, duration, - delay: None, + blocked: false, + delay: Delay::new(Duration::default()), } } } @@ -42,9 +45,10 @@ impl Stream for Throttle { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - if let Some(d) = this.delay.as_mut().as_pin_mut() { + if *this.blocked { + let d = this.delay.as_mut(); if d.poll(cx).is_ready() { - this.delay.set(None); + *this.blocked = false; } else { return Poll::Pending; } @@ -57,7 +61,8 @@ impl Stream for Throttle { } Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { - this.delay.set(Some(Delay::new(*this.duration))); + *this.blocked = true; + this.delay.reset(Instant::now() + *this.duration); Poll::Ready(Some(v)) } }