Stream::delay

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
new-scheduler
Yoshua Wuyts 5 years ago
parent 30b5ca5851
commit 064fdf020f
No known key found for this signature in database
GPG Key ID: 24EA8164F96777ED

@ -0,0 +1,44 @@
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Delay<S> {
stream: S,
delay: futures_timer::Delay,
delay_done: bool,
}
impl<S> Delay<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_pinned!(delay: futures_timer::Delay);
pin_utils::unsafe_unpinned!(delay_done: bool);
pub(super) fn new(stream: S, dur: Duration) -> Self {
Delay {
stream,
delay: futures_timer::Delay::new(dur),
delay_done: false,
}
}
}
impl<S> Stream for Delay<S>
where
S: Stream,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if !self.delay_done {
futures_core::ready!(self.as_mut().delay().poll(cx));
*self.as_mut().delay_done() = true;
}
self.as_mut().stream().poll_next(cx)
}
}

@ -24,6 +24,7 @@
mod all; mod all;
mod any; mod any;
mod chain; mod chain;
mod delay;
mod enumerate; mod enumerate;
mod filter; mod filter;
mod filter_map; mod filter_map;
@ -61,6 +62,7 @@ use try_for_each::TryForEeachFuture;
pub use chain::Chain; pub use chain::Chain;
pub use filter::Filter; pub use filter::Filter;
pub use fuse::Fuse; pub use fuse::Fuse;
pub use delay::Delay;
pub use inspect::Inspect; pub use inspect::Inspect;
pub use map::Map; pub use map::Map;
pub use scan::Scan; pub use scan::Scan;
@ -340,6 +342,36 @@ extension_trait! {
Enumerate::new(self) Enumerate::new(self)
} }
#[doc = r#"
Creates a stream that is delayed before it starts yielding items.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::future;
use std::time::Duration;
let p1 = future::ready(1).delay(Duration::from_millis(200));
let p1 = future::ready(2).delay(Duration::from_millis(100));
let p1 = future::ready(3).delay(Duration::from_millis(300));
assert_eq!(future::join!(p1, p2, p3).await, (1, 2, 3));
#
# }) }
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn delay(self, dur: std::time::Duration) -> Delay<Self>
where
Self: Sized,
{
Delay::new(self, dur)
}
#[doc = r#" #[doc = r#"
Takes a closure and creates a stream that calls that closure on every element of this stream. Takes a closure and creates a stream that calls that closure on every element of this stream.

Loading…
Cancel
Save