From 064fdf020fa362349bf13ad89d5db2ef5e547412 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 12 Oct 2019 01:35:41 +0200 Subject: [PATCH] Stream::delay Signed-off-by: Yoshua Wuyts --- src/stream/stream/delay.rs | 44 ++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 32 +++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) create mode 100644 src/stream/stream/delay.rs diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs new file mode 100644 index 00000000..f6de5f2d --- /dev/null +++ b/src/stream/stream/delay.rs @@ -0,0 +1,44 @@ +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Delay { + stream: S, + delay: futures_timer::Delay, + delay_done: bool, +} + +impl Delay { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(delay: futures_timer::Delay); + pin_utils::unsafe_unpinned!(delay_done: bool); + + pub(super) fn new(stream: S, dur: Duration) -> Self { + Delay { + stream, + delay: futures_timer::Delay::new(dur), + delay_done: false, + } + } +} + +impl Stream for Delay +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.delay_done { + futures_core::ready!(self.as_mut().delay().poll(cx)); + *self.as_mut().delay_done() = true; + } + + self.as_mut().stream().poll_next(cx) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8849605c..26def735 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod chain; +mod delay; mod enumerate; mod filter; mod filter_map; @@ -61,6 +62,7 @@ use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; +pub use delay::Delay; pub use inspect::Inspect; pub use map::Map; pub use scan::Scan; @@ -340,6 +342,36 @@ extension_trait! { Enumerate::new(self) } + #[doc = r#" + Creates a stream that is delayed before it starts yielding items. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::future; + use std::time::Duration; + + let p1 = future::ready(1).delay(Duration::from_millis(200)); + let p1 = future::ready(2).delay(Duration::from_millis(100)); + let p1 = future::ready(3).delay(Duration::from_millis(300)); + + assert_eq!(future::join!(p1, p2, p3).await, (1, 2, 3)); + # + # }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn delay(self, dur: std::time::Duration) -> Delay + where + Self: Sized, + { + Delay::new(self, dur) + } + #[doc = r#" Takes a closure and creates a stream that calls that closure on every element of this stream.