From 064fdf020fa362349bf13ad89d5db2ef5e547412 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 12 Oct 2019 01:35:41 +0200 Subject: [PATCH 1/7] Stream::delay Signed-off-by: Yoshua Wuyts --- src/stream/stream/delay.rs | 44 ++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 32 +++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 src/stream/stream/delay.rs diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs new file mode 100644 index 00000000..f6de5f2d --- /dev/null +++ b/src/stream/stream/delay.rs @@ -0,0 +1,44 @@ +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Delay { + stream: S, + delay: futures_timer::Delay, + delay_done: bool, +} + +impl Delay { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(delay: futures_timer::Delay); + pin_utils::unsafe_unpinned!(delay_done: bool); + + pub(super) fn new(stream: S, dur: Duration) -> Self { + Delay { + stream, + delay: futures_timer::Delay::new(dur), + delay_done: false, + } + } +} + +impl Stream for Delay +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.delay_done { + futures_core::ready!(self.as_mut().delay().poll(cx)); + *self.as_mut().delay_done() = true; + } + + self.as_mut().stream().poll_next(cx) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8849605c..26def735 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod chain; +mod delay; mod enumerate; mod filter; mod filter_map; @@ -61,6 +62,7 @@ use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; +pub use delay::Delay; pub use inspect::Inspect; pub use map::Map; pub use scan::Scan; @@ -340,6 +342,36 @@ extension_trait! { Enumerate::new(self) } + #[doc = r#" + Creates a stream that is delayed before it starts yielding items. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::future; + use std::time::Duration; + + let p1 = future::ready(1).delay(Duration::from_millis(200)); + let p1 = future::ready(2).delay(Duration::from_millis(100)); + let p1 = future::ready(3).delay(Duration::from_millis(300)); + + assert_eq!(future::join!(p1, p2, p3).await, (1, 2, 3)); + # + # }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn delay(self, dur: std::time::Duration) -> Delay + where + Self: Sized, + { + Delay::new(self, dur) + } + #[doc = r#" Takes a closure and creates a stream that calls that closure on every element of this stream. From 483ded0e1c2bc85a7bc79efcc1636f9729b8969a Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 12 Oct 2019 01:38:53 +0200 Subject: [PATCH 2/7] fix example Signed-off-by: Yoshua Wuyts --- src/stream/stream/mod.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 26def735..80e31969 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -351,14 +351,19 @@ extension_trait! { # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; - use async_std::future; + use async_std::stream; use std::time::Duration; - let p1 = future::ready(1).delay(Duration::from_millis(200)); - let p1 = future::ready(2).delay(Duration::from_millis(100)); - let p1 = future::ready(3).delay(Duration::from_millis(300)); + let a = stream::once(1).delay(Duration::from_millis(200)); + let b = stream::once(2).delay(Duration::from_millis(100)); + let c = stream::once(3).delay(Duration::from_millis(300)); + + let s = stream::join!(a, b, c); - assert_eq!(future::join!(p1, p2, p3).await, (1, 2, 3)); + assert_eq!(stream.next().await, Some(1)); + assert_eq!(stream.next().await, Some(2)); + assert_eq!(stream.next().await, Some(3)); + assert_eq!(stream.next().await, None); # # }) } ``` From 635c592950e1d70ebf96336b285c4c43a88e9357 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 27 Nov 2019 14:26:04 +0900 Subject: [PATCH 3/7] feat: Add stream::delay --- src/stream/stream/delay.rs | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs index f6de5f2d..b0450104 100644 --- a/src/stream/stream/delay.rs +++ b/src/stream/stream/delay.rs @@ -2,22 +2,24 @@ use std::future::Future; use std::pin::Pin; use std::time::Duration; +use pin_project_lite::pin_project; + use crate::stream::Stream; use crate::task::{Context, Poll}; +pin_project! { #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Delay { - stream: S, - delay: futures_timer::Delay, - delay_done: bool, + pub struct Delay { + #[pin] + stream: S, + #[pin] + delay: futures_timer::Delay, + delay_done: bool, + } } impl Delay { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_pinned!(delay: futures_timer::Delay); - pin_utils::unsafe_unpinned!(delay_done: bool); - pub(super) fn new(stream: S, dur: Duration) -> Self { Delay { stream, @@ -33,12 +35,14 @@ where { type Item = S::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if !self.delay_done { - futures_core::ready!(self.as_mut().delay().poll(cx)); - *self.as_mut().delay_done() = true; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + if !*this.delay_done { + futures_core::ready!(this.delay.poll(cx)); + *this.delay_done = true; } - self.as_mut().stream().poll_next(cx) + this.stream.poll_next(cx) } } From 32765ece418f81b2e36f9f2ca73a60ea0b893531 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 27 Nov 2019 14:26:25 +0900 Subject: [PATCH 4/7] test: Add stream::delay test code --- src/stream/stream/mod.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index ef1e5ecc..e847d6af 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -587,16 +587,12 @@ extension_trait! { use async_std::stream; use std::time::Duration; - let a = stream::once(1).delay(Duration::from_millis(200)); - let b = stream::once(2).delay(Duration::from_millis(100)); - let c = stream::once(3).delay(Duration::from_millis(300)); + let mut s = stream::from_iter(vec![0u8, 1, 2]).delay(Duration::from_millis(200)); - let s = stream::join!(a, b, c); - - assert_eq!(stream.next().await, Some(1)); - assert_eq!(stream.next().await, Some(2)); - assert_eq!(stream.next().await, Some(3)); - assert_eq!(stream.next().await, None); + assert_eq!(s.next().await, Some(0)); + assert_eq!(s.next().await, Some(1)); + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, None); # # }) } ``` From 9f7c1833dc7559f981d4071a8decb8e4eec322a2 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 28 Nov 2019 10:37:04 +0900 Subject: [PATCH 5/7] fix module --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e847d6af..cab0b39b 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -28,7 +28,6 @@ mod cloned; mod cmp; mod copied; mod cycle; -mod delay; mod enumerate; mod eq; mod filter; @@ -99,7 +98,6 @@ use try_for_each::TryForEachFuture; pub use chain::Chain; pub use cloned::Cloned; pub use copied::Copied; -pub use delay::Delay; pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; @@ -132,6 +130,7 @@ cfg_unstable! { pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; + pub use delay::Delay; mod count; mod merge; @@ -140,6 +139,7 @@ cfg_unstable! { mod partition; mod timeout; mod throttle; + mod delay; mod unzip; } From da965e9ba4bb189fe3202df0874973c4003120e0 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 28 Nov 2019 15:54:13 +0900 Subject: [PATCH 6/7] fix indent --- src/stream/stream/delay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs index b0450104..57687929 100644 --- a/src/stream/stream/delay.rs +++ b/src/stream/stream/delay.rs @@ -8,8 +8,8 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { -#[doc(hidden)] -#[allow(missing_debug_implementations)] + #[doc(hidden)] + #[allow(missing_debug_implementations)] pub struct Delay { #[pin] stream: S, From fb1fb6c903fca2b68871706b873f3f548148b4b4 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 28 Nov 2019 22:53:24 +0900 Subject: [PATCH 7/7] test: Test the delay time --- src/stream/stream/mod.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index cab0b39b..4fefbad5 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -585,14 +585,24 @@ extension_trait! { # use async_std::prelude::*; use async_std::stream; - use std::time::Duration; + use std::time::{Duration, Instant}; + let start = Instant::now(); let mut s = stream::from_iter(vec![0u8, 1, 2]).delay(Duration::from_millis(200)); assert_eq!(s.next().await, Some(0)); + // The first time will take more than 200ms due to delay. + assert!(start.elapsed().as_millis() >= 200); + assert_eq!(s.next().await, Some(1)); + // There will be no delay after the first time. + assert!(start.elapsed().as_millis() <= 210); + assert_eq!(s.next().await, Some(2)); + assert!(start.elapsed().as_millis() <= 210); + assert_eq!(s.next().await, None); + assert!(start.elapsed().as_millis() <= 210); # # }) } ```