diff --git a/examples/throttle.rs b/examples/throttle.rs new file mode 100644 index 0000000..74c1fd3 --- /dev/null +++ b/examples/throttle.rs @@ -0,0 +1,27 @@ +//! Spawns a timed task which gets throttled. + +fn main() { + #[cfg(feature = "unstable")] + { + use async_std::prelude::*; + use async_std::task; + + task::block_on(async { + use async_std::stream; + use std::time::Duration; + + // emit value every 1 second + let s = stream::interval(Duration::from_secs(1)).enumerate(); + + // throttle for 2 seconds + let s = s.throttle(Duration::from_secs(2)); + + s.for_each(|(n, _)| { + dbg!(n); + }) + .await; + // => 0 .. 1 .. 2 .. 3 + // with a pause of 2 seconds between each print + }) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 5756a21..de4a8fb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -123,11 +123,13 @@ cfg_unstable! { pub use flatten::Flatten; pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; + pub use throttle::Throttle; mod merge; mod flatten; mod flat_map; mod timeout; + mod throttle; } extension_trait! { @@ -313,6 +315,56 @@ extension_trait! { TakeWhile::new(self, predicate) } + #[doc = r#" + Limit the amount of items yielded per timeslice in a stream. + + This stream does not drop any items, but will only limit the rate at which items pass through. + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + use std::time::{Duration, Instant}; + + let start = Instant::now(); + + // emit value every 5 milliseconds + let s = stream::interval(Duration::from_millis(5)) + .enumerate() + .take(3); + + // throttle for 10 milliseconds + let mut s = s.throttle(Duration::from_millis(10)); + + assert_eq!(s.next().await, Some((0, ()))); + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 5); + + assert_eq!(s.next().await, Some((1, ()))); + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 15); + + assert_eq!(s.next().await, Some((2, ()))); + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 25); + + assert_eq!(s.next().await, None); + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 35); + # + # }) } + ``` + "#] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn throttle(self, d: Duration) -> Throttle + where + Self: Sized, + { + Throttle::new(self, d) + } + #[doc = r#" Creates a stream that yields each `step`th element. diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs new file mode 100644 index 0000000..8896899 --- /dev/null +++ b/src/stream/stream/throttle.rs @@ -0,0 +1,70 @@ +use std::future::Future; +use std::pin::Pin; +use std::time::{Duration, Instant}; + +use futures_timer::Delay; +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + /// A stream that only yields one element once every `duration`. + /// + /// This `struct` is created by the [`throttle`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`throttle`]: trait.Stream.html#method.throttle + /// [`Stream`]: trait.Stream.html + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct Throttle { + #[pin] + stream: S, + duration: Duration, + #[pin] + blocked: bool, + #[pin] + delay: Delay, + } +} + +impl Throttle { + pub(super) fn new(stream: S, duration: Duration) -> Self { + Throttle { + stream, + duration, + blocked: false, + delay: Delay::new(Duration::default()), + } + } +} + +impl Stream for Throttle { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + if *this.blocked { + let d = this.delay.as_mut(); + if d.poll(cx).is_ready() { + *this.blocked = false; + } else { + return Poll::Pending; + } + } + + match this.stream.poll_next(cx) { + Poll::Pending => { + cx.waker().wake_by_ref(); // Continue driving even though emitting Pending + Poll::Pending + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(v)) => { + *this.blocked = true; + this.delay.reset(Instant::now() + *this.duration); + Poll::Ready(Some(v)) + } + } + } +}