From b591fc68bdee365cfc1f6228c2d2f264b1b2f6e8 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Mon, 11 Nov 2019 12:17:00 +0100 Subject: [PATCH] Changed semantics of throttle to non-dropping variant with backpressure --- examples/throttle.rs | 27 ++++++++++++++++++++++++++ src/stream/stream/mod.rs | 29 ++++++++++++++++++++++++++-- src/stream/stream/throttle.rs | 36 +++++++++++++---------------------- 3 files changed, 67 insertions(+), 25 deletions(-) create mode 100644 examples/throttle.rs diff --git a/examples/throttle.rs b/examples/throttle.rs new file mode 100644 index 0000000..1b9a6f2 --- /dev/null +++ b/examples/throttle.rs @@ -0,0 +1,27 @@ +//! Spawns a timed task which gets throttled. + +fn main() { + #[cfg(feature = "unstable")] + { + use async_std::prelude::*; + use async_std::task; + + task::block_on(async { + use async_std::stream; + use std::time::Duration; + + // emit value every 1 second + let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); + + // throttle for 2 seconds + let s = s.throttle(Duration::from_secs(2)); + + s.for_each(|(n, _)| { + dbg!(n); + }) + .await; + // => 0 .. 1 .. 2 .. 3 + // with a pause of 2 seconds between each print + }) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8c39eb9..8b30c5e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -117,7 +117,6 @@ use std::time::Duration; cfg_unstable! { use std::future::Future; use std::pin::Pin; - use std::time::Duration; use crate::stream::into_stream::IntoStream; use crate::stream::{FromStream, Product, Sum}; @@ -316,7 +315,33 @@ extension_trait! { TakeWhile::new(self, predicate) } - fn throttle(self, d: Duration) -> Throttle + #[doc = r#" + Limit the amount of items yielded per timeslice in a stream. + + # Examples + ```ignore + # fn main() { async_std::task::block_on(async { + # + use async_std::stream; + use std::time::Duration; + + // emit value every 1 second + let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); + + // throttle for 2 seconds + let s = s.throttle(Duration::from_secs(2)); + + s.for_each(|(n, _)| { + dbg!(n); + }) + .await; + // => 0 .. 1 .. 2 .. 3 + // with a pause of 2 seconds between each print + # + # }) } + ``` + "#] + fn throttle(self, d: Duration) -> Throttle where Self: Sized, { diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 2a0cc56..010839c 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -7,60 +7,50 @@ use futures_timer::Delay; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream that only yields one element once every `duration`, and drops all others. +/// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements. /// #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Throttle { +pub struct Throttle { stream: S, duration: Duration, delay: Option, - last: Option, } -impl Unpin for Throttle {} +impl Unpin for Throttle {} -impl Throttle { +impl Throttle { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(duration: Duration); pin_utils::unsafe_pinned!(delay: Option); - pin_utils::unsafe_unpinned!(last: Option); pub(super) fn new(stream: S, duration: Duration) -> Self { Throttle { stream, duration, delay: None, - last: None, } } } -impl Stream for Throttle { +impl Stream for Throttle { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(d) = self.as_mut().delay().as_pin_mut() { if d.poll(cx).is_ready() { - if let Some(v) = self.as_mut().last().take() { - // Sets last to None. - *self.as_mut().delay() = Some(Delay::new(self.duration)); - return Poll::Ready(Some(v)); - } else { - *self.as_mut().delay() = None; - } + *self.as_mut().delay() = None; + } else { + return Poll::Pending; } } match self.as_mut().stream().poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => { + cx.waker().wake_by_ref(); // Continue driving even though emitting Pending + Poll::Pending + } + Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { - if self.as_mut().delay().is_some() { - *self.as_mut().last() = Some(v); - cx.waker().wake_by_ref(); // Continue driving even though emitting Pending - return Poll::Pending; - } - *self.as_mut().delay() = Some(Delay::new(self.duration)); Poll::Ready(Some(v)) }