From a2393501c5edd4c0ef682dfdfe09f05e02bd5e5c Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Wed, 16 Oct 2019 18:43:34 +0200 Subject: [PATCH 01/14] Implemented StreamExt::throttle --- src/stream/stream/mod.rs | 10 +++++++ src/stream/stream/throttle.rs | 56 +++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 src/stream/stream/throttle.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 764dc97..8035769 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -50,6 +50,7 @@ mod skip_while; mod step_by; mod take; mod take_while; +mod throttle; mod try_fold; mod try_for_each; mod zip; @@ -86,10 +87,12 @@ pub use skip_while::SkipWhile; pub use step_by::StepBy; pub use take::Take; pub use take_while::TakeWhile; +pub use throttle::Throttle; pub use zip::Zip; use std::cmp::Ordering; use std::marker::PhantomData; +use std::time::Duration; use cfg_if::cfg_if; @@ -288,6 +291,13 @@ extension_trait! { TakeWhile::new(self, predicate) } + fn throttle(self, d: Duration) -> Throttle + where + Self: Sized, + { + Throttle::new(self, d) + } + #[doc = r#" Creates a stream that yields each `step`th element. diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs new file mode 100644 index 0000000..c37c972 --- /dev/null +++ b/src/stream/stream/throttle.rs @@ -0,0 +1,56 @@ +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +use futures_timer::Delay; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that only yields one element once every `duration`, and drops all others. +/// #[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Throttle { + stream: S, + duration: Duration, + delay: Option, +} + +impl Unpin for Throttle {} + +impl Throttle { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(duration: Duration); + pin_utils::unsafe_pinned!(delay: Option); + + pub(super) fn new(stream: S, duration: Duration) -> Self { + Throttle { + stream, + duration, + delay: None, + } + } +} + +impl Stream for Throttle { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.as_mut().stream().poll_next(cx) { + Poll::Ready(v) => match self.as_mut().delay().as_pin_mut() { + None => { + *self.as_mut().delay() = Some(Delay::new(self.duration)); + Poll::Ready(v) + } + Some(d) => match d.poll(cx) { + Poll::Ready(_) => { + *self.as_mut().delay() = Some(Delay::new(self.duration)); + Poll::Ready(v) + } + Poll::Pending => Poll::Pending, + }, + }, + Poll::Pending => Poll::Pending, + } + } +} From 1c843a8124d599c19930da286363fc29b135d644 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Wed, 23 Oct 2019 22:27:51 +0200 Subject: [PATCH 02/14] Re-implemented Throttle to keep last value in memory --- src/stream/stream/mod.rs | 2 +- src/stream/stream/throttle.rs | 43 ++++++++++++++++++++++------------- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 95b0444..3584be3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -281,7 +281,7 @@ extension_trait! { TakeWhile::new(self, predicate) } - fn throttle(self, d: Duration) -> Throttle + fn throttle(self, d: Duration) -> Throttle where Self: Sized, { diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index c37c972..8e96fea 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -10,47 +10,58 @@ use crate::task::{Context, Poll}; /// A stream that only yields one element once every `duration`, and drops all others. /// #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Throttle { +pub struct Throttle { stream: S, duration: Duration, delay: Option, + last: Option, } -impl Unpin for Throttle {} +impl Unpin for Throttle {} -impl Throttle { +impl Throttle { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(duration: Duration); pin_utils::unsafe_pinned!(delay: Option); + pin_utils::unsafe_unpinned!(last: Option); pub(super) fn new(stream: S, duration: Duration) -> Self { Throttle { stream, duration, delay: None, + last: None, } } } -impl Stream for Throttle { +impl Stream for Throttle { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.as_mut().stream().poll_next(cx) { - Poll::Ready(v) => match self.as_mut().delay().as_pin_mut() { - None => { + if let Some(d) = self.as_mut().delay().as_pin_mut() { + if d.poll(cx).is_ready() { + if let Some(v) = self.as_mut().last().take() { + // Sets last to None. *self.as_mut().delay() = Some(Delay::new(self.duration)); - Poll::Ready(v) + return Poll::Ready(Some(v)); } - Some(d) => match d.poll(cx) { - Poll::Ready(_) => { - *self.as_mut().delay() = Some(Delay::new(self.duration)); - Poll::Ready(v) - } - Poll::Pending => Poll::Pending, - }, - }, + } + } + + match self.as_mut().stream().poll_next(cx) { Poll::Pending => Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(v)) => { + if self.as_mut().delay().is_some() { + *self.as_mut().last() = Some(v); + cx.waker().wake_by_ref(); // Continue driving even though emitting Pending + return Poll::Pending; + } + + *self.as_mut().delay() = Some(Delay::new(self.duration)); + Poll::Ready(Some(v)) + } } } } From 1fd05a157f70c99157079672584d3f1aa1626bd8 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Wed, 23 Oct 2019 22:34:39 +0200 Subject: [PATCH 03/14] Reset delay to prevent poll after ready --- src/stream/stream/throttle.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 8e96fea..2a0cc56 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -45,6 +45,8 @@ impl Stream for Throttle { // Sets last to None. *self.as_mut().delay() = Some(Delay::new(self.duration)); return Poll::Ready(Some(v)); + } else { + *self.as_mut().delay() = None; } } } From b591fc68bdee365cfc1f6228c2d2f264b1b2f6e8 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Mon, 11 Nov 2019 12:17:00 +0100 Subject: [PATCH 04/14] Changed semantics of throttle to non-dropping variant with backpressure --- examples/throttle.rs | 27 ++++++++++++++++++++++++++ src/stream/stream/mod.rs | 29 ++++++++++++++++++++++++++-- src/stream/stream/throttle.rs | 36 +++++++++++++---------------------- 3 files changed, 67 insertions(+), 25 deletions(-) create mode 100644 examples/throttle.rs diff --git a/examples/throttle.rs b/examples/throttle.rs new file mode 100644 index 0000000..1b9a6f2 --- /dev/null +++ b/examples/throttle.rs @@ -0,0 +1,27 @@ +//! Spawns a timed task which gets throttled. + +fn main() { + #[cfg(feature = "unstable")] + { + use async_std::prelude::*; + use async_std::task; + + task::block_on(async { + use async_std::stream; + use std::time::Duration; + + // emit value every 1 second + let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); + + // throttle for 2 seconds + let s = s.throttle(Duration::from_secs(2)); + + s.for_each(|(n, _)| { + dbg!(n); + }) + .await; + // => 0 .. 1 .. 2 .. 3 + // with a pause of 2 seconds between each print + }) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8c39eb9..8b30c5e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -117,7 +117,6 @@ use std::time::Duration; cfg_unstable! { use std::future::Future; use std::pin::Pin; - use std::time::Duration; use crate::stream::into_stream::IntoStream; use crate::stream::{FromStream, Product, Sum}; @@ -316,7 +315,33 @@ extension_trait! { TakeWhile::new(self, predicate) } - fn throttle(self, d: Duration) -> Throttle + #[doc = r#" + Limit the amount of items yielded per timeslice in a stream. + + # Examples + ```ignore + # fn main() { async_std::task::block_on(async { + # + use async_std::stream; + use std::time::Duration; + + // emit value every 1 second + let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); + + // throttle for 2 seconds + let s = s.throttle(Duration::from_secs(2)); + + s.for_each(|(n, _)| { + dbg!(n); + }) + .await; + // => 0 .. 1 .. 2 .. 3 + // with a pause of 2 seconds between each print + # + # }) } + ``` + "#] + fn throttle(self, d: Duration) -> Throttle where Self: Sized, { diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 2a0cc56..010839c 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -7,60 +7,50 @@ use futures_timer::Delay; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream that only yields one element once every `duration`, and drops all others. +/// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements. /// #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Throttle { +pub struct Throttle { stream: S, duration: Duration, delay: Option, - last: Option, } -impl Unpin for Throttle {} +impl Unpin for Throttle {} -impl Throttle { +impl Throttle { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(duration: Duration); pin_utils::unsafe_pinned!(delay: Option); - pin_utils::unsafe_unpinned!(last: Option); pub(super) fn new(stream: S, duration: Duration) -> Self { Throttle { stream, duration, delay: None, - last: None, } } } -impl Stream for Throttle { +impl Stream for Throttle { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Some(d) = self.as_mut().delay().as_pin_mut() { if d.poll(cx).is_ready() { - if let Some(v) = self.as_mut().last().take() { - // Sets last to None. - *self.as_mut().delay() = Some(Delay::new(self.duration)); - return Poll::Ready(Some(v)); - } else { - *self.as_mut().delay() = None; - } + *self.as_mut().delay() = None; + } else { + return Poll::Pending; } } match self.as_mut().stream().poll_next(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => return Poll::Ready(None), + Poll::Pending => { + cx.waker().wake_by_ref(); // Continue driving even though emitting Pending + Poll::Pending + } + Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { - if self.as_mut().delay().is_some() { - *self.as_mut().last() = Some(v); - cx.waker().wake_by_ref(); // Continue driving even though emitting Pending - return Poll::Pending; - } - *self.as_mut().delay() = Some(Delay::new(self.duration)); Poll::Ready(Some(v)) } From 139a34b6852b5bf74bb97ead4a26241f4a88edde Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Mon, 11 Nov 2019 12:26:32 +0100 Subject: [PATCH 05/14] Make throttle an unstable feature --- src/stream/stream/mod.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8b30c5e..756e8e9 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -60,7 +60,6 @@ mod skip_while; mod step_by; mod take; mod take_while; -mod throttle; mod try_fold; mod try_for_each; mod zip; @@ -107,16 +106,15 @@ pub use skip_while::SkipWhile; pub use step_by::StepBy; pub use take::Take; pub use take_while::TakeWhile; -pub use throttle::Throttle; pub use zip::Zip; use std::cmp::Ordering; use std::marker::PhantomData; -use std::time::Duration; cfg_unstable! { use std::future::Future; use std::pin::Pin; + use std::time::Duration; use crate::stream::into_stream::IntoStream; use crate::stream::{FromStream, Product, Sum}; @@ -125,11 +123,13 @@ cfg_unstable! { pub use flatten::Flatten; pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; + pub use throttle::Throttle; mod merge; mod flatten; mod flat_map; mod timeout; + mod throttle; } extension_trait! { @@ -315,6 +315,7 @@ extension_trait! { TakeWhile::new(self, predicate) } + #[cfg(all(feature = "default", feature = "unstable"))] #[doc = r#" Limit the amount of items yielded per timeslice in a stream. From ef958f0408c713de6b93cb995d00244212472de8 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Mon, 11 Nov 2019 13:09:35 +0100 Subject: [PATCH 06/14] Use pin_project_lite instead for throttle --- src/stream/stream/mod.rs | 3 ++- src/stream/stream/throttle.rs | 36 +++++++++++++++++------------------ 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 756e8e9..a91517d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -315,7 +315,6 @@ extension_trait! { TakeWhile::new(self, predicate) } - #[cfg(all(feature = "default", feature = "unstable"))] #[doc = r#" Limit the amount of items yielded per timeslice in a stream. @@ -342,6 +341,8 @@ extension_trait! { # }) } ``` "#] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn throttle(self, d: Duration) -> Throttle where Self: Sized, diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 010839c..8813607 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -3,26 +3,25 @@ use std::pin::Pin; use std::time::Duration; use futures_timer::Delay; +use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements. -/// #[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct Throttle { - stream: S, - duration: Duration, - delay: Option, +pin_project! { + /// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct Throttle { + #[pin] + stream: S, + duration: Duration, + #[pin] + delay: Option, + } } -impl Unpin for Throttle {} - impl Throttle { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_unpinned!(duration: Duration); - pin_utils::unsafe_pinned!(delay: Option); - pub(super) fn new(stream: S, duration: Duration) -> Self { Throttle { stream, @@ -35,23 +34,24 @@ impl Throttle { impl Stream for Throttle { type Item = S::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(d) = self.as_mut().delay().as_pin_mut() { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + if let Some(d) = this.delay.as_mut().as_pin_mut() { if d.poll(cx).is_ready() { - *self.as_mut().delay() = None; + this.delay.set(None); } else { return Poll::Pending; } } - match self.as_mut().stream().poll_next(cx) { + match this.stream.poll_next(cx) { Poll::Pending => { cx.waker().wake_by_ref(); // Continue driving even though emitting Pending Poll::Pending } Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { - *self.as_mut().delay() = Some(Delay::new(self.duration)); + this.delay.set(Some(Delay::new(*this.duration))); Poll::Ready(Some(v)) } } From 7c7386735ef013f29a72cea801689a71e3db74ac Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Tue, 12 Nov 2019 14:34:31 +0100 Subject: [PATCH 07/14] Wrap around throttle comment Co-Authored-By: Yoshua Wuyts --- src/stream/stream/throttle.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 8813607..44ce7e1 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -9,7 +9,13 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - /// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements. + /// A stream that only yields one element once every `duration`. + /// + /// This `struct` is created by the [`throttle`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`throttle`]: trait.Stream.html#method.throttle + /// [`Stream`]: trait.Stream.html #[doc(hidden)] #[allow(missing_debug_implementations)] pub struct Throttle { From 6f6d5e9d205765a6676a08c74e00c3f6001da93e Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Tue, 12 Nov 2019 14:35:03 +0100 Subject: [PATCH 08/14] Updated throttle fn comments. Co-Authored-By: Yoshua Wuyts --- src/stream/stream/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index a91517d..6a6d43d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -318,6 +318,7 @@ extension_trait! { #[doc = r#" Limit the amount of items yielded per timeslice in a stream. +This stream does not drop any items, but will only limit the rate at which items pass through. # Examples ```ignore # fn main() { async_std::task::block_on(async { From 88cbf2c119371830714e6e26417a00439e968659 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Tue, 12 Nov 2019 14:30:28 +0100 Subject: [PATCH 09/14] Change throttle test to run in milliseconds --- src/stream/stream/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index a91517d..6a7f89f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -319,23 +319,23 @@ extension_trait! { Limit the amount of items yielded per timeslice in a stream. # Examples - ```ignore + ``` # fn main() { async_std::task::block_on(async { # + use async_std::prelude::*; use async_std::stream; use std::time::Duration; // emit value every 1 second - let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); + let s = stream::interval(Duration::from_millis(5)).enumerate().take(3); // throttle for 2 seconds - let s = s.throttle(Duration::from_secs(2)); + let mut s = s.throttle(Duration::from_millis(10)); - s.for_each(|(n, _)| { - dbg!(n); - }) - .await; - // => 0 .. 1 .. 2 .. 3 + assert_eq!(s.next().await, Some((0, ()))); + assert_eq!(s.next().await, Some((1, ()))); + assert_eq!(s.next().await, Some((2, ()))); + assert_eq!(s.next().await, None); // with a pause of 2 seconds between each print # # }) } From 6990c1403f85f598f5bd3536a9a01ef7c5464a31 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Tue, 12 Nov 2019 15:07:20 +0100 Subject: [PATCH 10/14] Reimplemented throttle to never drop Delay, added boolean flag --- src/stream/stream/mod.rs | 2 +- src/stream/stream/throttle.rs | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index cc1646c..48a1a20 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -318,7 +318,7 @@ extension_trait! { #[doc = r#" Limit the amount of items yielded per timeslice in a stream. -This stream does not drop any items, but will only limit the rate at which items pass through. + This stream does not drop any items, but will only limit the rate at which items pass through. # Examples ``` # fn main() { async_std::task::block_on(async { diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 44ce7e1..8896899 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::pin::Pin; -use std::time::Duration; +use std::time::{Duration, Instant}; use futures_timer::Delay; use pin_project_lite::pin_project; @@ -23,7 +23,9 @@ pin_project! { stream: S, duration: Duration, #[pin] - delay: Option, + blocked: bool, + #[pin] + delay: Delay, } } @@ -32,7 +34,8 @@ impl Throttle { Throttle { stream, duration, - delay: None, + blocked: false, + delay: Delay::new(Duration::default()), } } } @@ -42,9 +45,10 @@ impl Stream for Throttle { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - if let Some(d) = this.delay.as_mut().as_pin_mut() { + if *this.blocked { + let d = this.delay.as_mut(); if d.poll(cx).is_ready() { - this.delay.set(None); + *this.blocked = false; } else { return Poll::Pending; } @@ -57,7 +61,8 @@ impl Stream for Throttle { } Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { - this.delay.set(Some(Delay::new(*this.duration))); + *this.blocked = true; + this.delay.reset(Instant::now() + *this.duration); Poll::Ready(Some(v)) } } From 4ab7b213de8d8f8307f0a1877cb4a4d4c802e792 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Tue, 12 Nov 2019 15:38:02 +0100 Subject: [PATCH 11/14] Updated example to be consistent; added timing measurements to throttle --- examples/throttle.rs | 2 +- src/stream/stream/mod.rs | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/examples/throttle.rs b/examples/throttle.rs index 1b9a6f2..74c1fd3 100644 --- a/examples/throttle.rs +++ b/examples/throttle.rs @@ -11,7 +11,7 @@ fn main() { use std::time::Duration; // emit value every 1 second - let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); + let s = stream::interval(Duration::from_secs(1)).enumerate(); // throttle for 2 seconds let s = s.throttle(Duration::from_secs(2)); diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 48a1a20..cec874f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -325,19 +325,32 @@ extension_trait! { # use async_std::prelude::*; use async_std::stream; - use std::time::Duration; + use std::time::{Duration, Instant}; - // emit value every 1 second - let s = stream::interval(Duration::from_millis(5)).enumerate().take(3); + // emit value every 5 milliseconds + let s = stream::interval(Duration::from_millis(5)) + .enumerate() + .take(3); - // throttle for 2 seconds + // throttle for 10 milliseconds let mut s = s.throttle(Duration::from_millis(10)); + let start = Instant::now(); assert_eq!(s.next().await, Some((0, ()))); + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 5 && duration_ms < 15); + assert_eq!(s.next().await, Some((1, ()))); + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 15 && duration_ms < 25); + assert_eq!(s.next().await, Some((2, ()))); + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 25 && duration_ms < 35); + assert_eq!(s.next().await, None); - // with a pause of 2 seconds between each print + let duration_ms = start.elapsed().as_millis(); + assert!(duration_ms >= 35 && duration_ms < 45); # # }) } ``` From c5b3a98e5b4f12f80f5e9b5ddb135c22da60502f Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Tue, 12 Nov 2019 16:22:25 +0100 Subject: [PATCH 12/14] Increased throttle test to 10x time --- src/stream/stream/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index cec874f..86645fc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -327,30 +327,30 @@ extension_trait! { use async_std::stream; use std::time::{Duration, Instant}; - // emit value every 5 milliseconds - let s = stream::interval(Duration::from_millis(5)) + // emit value every 50 milliseconds + let s = stream::interval(Duration::from_millis(50)) .enumerate() .take(3); - // throttle for 10 milliseconds - let mut s = s.throttle(Duration::from_millis(10)); + // throttle for 100 milliseconds + let mut s = s.throttle(Duration::from_millis(100)); let start = Instant::now(); assert_eq!(s.next().await, Some((0, ()))); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 5 && duration_ms < 15); + assert!(duration_ms >= 50 && duration_ms < 150); assert_eq!(s.next().await, Some((1, ()))); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 15 && duration_ms < 25); + assert!(duration_ms >= 150 && duration_ms < 250); assert_eq!(s.next().await, Some((2, ()))); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 25 && duration_ms < 35); + assert!(duration_ms >= 250 && duration_ms < 350); assert_eq!(s.next().await, None); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 35 && duration_ms < 45); + assert!(duration_ms >= 350 && duration_ms < 450); # # }) } ``` From 90c67c223a383feaf7f898f6bbfd8b7ac4feee89 Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Thu, 14 Nov 2019 10:26:56 +0100 Subject: [PATCH 13/14] Decreased throttle test time to original values; only test lower bound --- src/stream/stream/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 86645fc..99a1120 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -327,30 +327,30 @@ extension_trait! { use async_std::stream; use std::time::{Duration, Instant}; - // emit value every 50 milliseconds - let s = stream::interval(Duration::from_millis(50)) + // emit value every 5 milliseconds + let s = stream::interval(Duration::from_millis(5)) .enumerate() .take(3); - // throttle for 100 milliseconds - let mut s = s.throttle(Duration::from_millis(100)); + // throttle for 10 milliseconds + let mut s = s.throttle(Duration::from_millis(10)); let start = Instant::now(); assert_eq!(s.next().await, Some((0, ()))); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 50 && duration_ms < 150); + assert!(duration_ms >= 5); assert_eq!(s.next().await, Some((1, ()))); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 150 && duration_ms < 250); + assert!(duration_ms >= 15); assert_eq!(s.next().await, Some((2, ()))); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 250 && duration_ms < 350); + assert!(duration_ms >= 25); assert_eq!(s.next().await, None); let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 350 && duration_ms < 450); + assert!(duration_ms >= 35); # # }) } ``` From dda65cbff0c9a68b0c6efda2a61754065fdee4dc Mon Sep 17 00:00:00 2001 From: Wouter Geraedts Date: Thu, 14 Nov 2019 11:29:49 +0100 Subject: [PATCH 14/14] Start throttle measurement before initialisation --- src/stream/stream/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 99a1120..de4a8fb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -327,6 +327,8 @@ extension_trait! { use async_std::stream; use std::time::{Duration, Instant}; + let start = Instant::now(); + // emit value every 5 milliseconds let s = stream::interval(Duration::from_millis(5)) .enumerate() @@ -335,7 +337,6 @@ extension_trait! { // throttle for 10 milliseconds let mut s = s.throttle(Duration::from_millis(10)); - let start = Instant::now(); assert_eq!(s.next().await, Some((0, ()))); let duration_ms = start.elapsed().as_millis(); assert!(duration_ms >= 5);