From e938527f660647ab0efecf5c9e1ff71191f33901 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 15 Oct 2019 15:30:24 +0200 Subject: [PATCH] add stream::interval (#298) * add stream::interval Signed-off-by: Yoshua Wuyts * fix tests Signed-off-by: Yoshua Wuyts * cargo fmt Signed-off-by: Yoshua Wuyts * cross-docs Signed-off-by: Yoshua Wuyts * update deps Signed-off-by: Yoshua Wuyts --- Cargo.toml | 2 +- src/io/timeout.rs | 22 ++--- src/stream/interval.rs | 198 +++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 4 +- src/task/sleep.rs | 4 + 5 files changed, 217 insertions(+), 13 deletions(-) create mode 100644 src/stream/interval.rs diff --git a/Cargo.toml b/Cargo.toml index 0c652988..dc319f71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,7 @@ crossbeam-channel = "0.3.9" crossbeam-deque = "0.7.1" futures-core-preview = "=0.3.0-alpha.19" futures-io-preview = "=0.3.0-alpha.19" -futures-timer = "0.4.0" +futures-timer = "1.0.2" lazy_static = "1.4.0" log = { version = "0.4.8", features = ["kv_unstable"] } memchr = "2.2.1" diff --git a/src/io/timeout.rs b/src/io/timeout.rs index 69b6892c..9fcc15ef 100644 --- a/src/io/timeout.rs +++ b/src/io/timeout.rs @@ -2,8 +2,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; -use futures_core::future::TryFuture; use futures_timer::Delay; +use pin_utils::unsafe_pinned; use crate::future::Future; use crate::io; @@ -36,16 +36,16 @@ pub async fn timeout(dur: Duration, f: F) -> io::Result where F: Future>, { - let f = TimeoutFuture { + Timeout { timeout: Delay::new(dur), future: f, - }; - f.await + } + .await } -// Future returned by the [`io::timeout`](./fn.timeout.html) function. +/// Future returned by the `FutureExt::timeout` method. #[derive(Debug)] -pub struct TimeoutFuture +pub struct Timeout where F: Future>, { @@ -53,22 +53,22 @@ where timeout: Delay, } -impl TimeoutFuture +impl Timeout where F: Future>, { - pin_utils::unsafe_pinned!(future: F); - pin_utils::unsafe_pinned!(timeout: Delay); + unsafe_pinned!(future: F); + unsafe_pinned!(timeout: Delay); } -impl Future for TimeoutFuture +impl Future for Timeout where F: Future>, { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().future().try_poll(cx) { + match self.as_mut().future().poll(cx) { Poll::Pending => {} other => return other, } diff --git a/src/stream/interval.rs b/src/stream/interval.rs new file mode 100644 index 00000000..271cd81d --- /dev/null +++ b/src/stream/interval.rs @@ -0,0 +1,198 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + +use futures_core::future::Future; +use futures_core::stream::Stream; +use pin_utils::unsafe_pinned; + +use futures_timer::Delay; + +/// Creates a new stream that yields at a set interval. +/// +/// The stream first yields after `dur`, and continues to yield every +/// `dur` after that. The stream accounts for time elapsed between calls, and +/// will adjust accordingly to prevent time skews. +/// +/// Each interval may be slightly longer than the specified duration, but never +/// less. +/// +/// Note that intervals are not intended for high resolution timers, but rather +/// they will likely fire some granularity after the exact instant that they're +/// otherwise indicated to fire at. +/// +/// See also: [`task::sleep`]. +/// +/// [`task::sleep`]: ../task/fn.sleep.html +/// +/// # Examples +/// +/// Basic example: +/// +/// ```no_run +/// use async_std::prelude::*; +/// use async_std::stream; +/// use std::time::Duration; +/// +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// let mut interval = stream::interval(Duration::from_secs(4)); +/// while let Some(_) = interval.next().await { +/// println!("prints every four seconds"); +/// } +/// # +/// # Ok(()) }) } +/// ``` +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[doc(inline)] +pub fn interval(dur: Duration) -> Interval { + Interval { + delay: Delay::new(dur), + interval: dur, + } +} + +/// A stream representing notifications at fixed interval +/// +#[derive(Debug)] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[doc(inline)] +pub struct Interval { + delay: Delay, + interval: Duration, +} + +impl Interval { + unsafe_pinned!(delay: Delay); +} + +impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if Pin::new(&mut *self).delay().poll(cx).is_pending() { + return Poll::Pending; + } + let when = Instant::now(); + let next = next_interval(when, Instant::now(), self.interval); + self.delay.reset(next); + Poll::Ready(Some(())) + } +} + +/// Converts Duration object to raw nanoseconds if possible +/// +/// This is useful to divide intervals. +/// +/// While technically for large duration it's impossible to represent any +/// duration as nanoseconds, the largest duration we can represent is about +/// 427_000 years. Large enough for any interval we would use or calculate in +/// tokio. +fn duration_to_nanos(dur: Duration) -> Option { + dur.as_secs() + .checked_mul(1_000_000_000) + .and_then(|v| v.checked_add(u64::from(dur.subsec_nanos()))) +} + +fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant { + let new = prev + interval; + if new > now { + return new; + } + + let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired"); + let interval_ns = + duration_to_nanos(interval).expect("interval is less that 427 thousand years"); + let mult = spent_ns / interval_ns + 1; + assert!( + mult < (1 << 32), + "can't skip more than 4 billion intervals of {:?} \ + (trying to skip {})", + interval, + mult + ); + prev + interval * (mult as u32) +} + +#[cfg(test)] +mod test { + use super::next_interval; + use std::time::{Duration, Instant}; + + struct Timeline(Instant); + + impl Timeline { + fn new() -> Timeline { + Timeline(Instant::now()) + } + fn at(&self, millis: u64) -> Instant { + self.0 + Duration::from_millis(millis) + } + fn at_ns(&self, sec: u64, nanos: u32) -> Instant { + self.0 + Duration::new(sec, nanos) + } + } + + fn dur(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + // The math around Instant/Duration isn't 100% precise due to rounding + // errors, see #249 for more info + fn almost_eq(a: Instant, b: Instant) -> bool { + if a == b { + true + } else if a > b { + a - b < Duration::from_millis(1) + } else { + b - a < Duration::from_millis(1) + } + } + + #[test] + fn norm_next() { + let tm = Timeline::new(); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(2), dur(10)), + tm.at(11) + )); + assert!(almost_eq( + next_interval(tm.at(7777), tm.at(7788), dur(100)), + tm.at(7877) + )); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(1000), dur(2100)), + tm.at(2101) + )); + } + + #[test] + fn fast_forward() { + let tm = Timeline::new(); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(1000), dur(10)), + tm.at(1001) + )); + assert!(almost_eq( + next_interval(tm.at(7777), tm.at(8888), dur(100)), + tm.at(8977) + )); + assert!(almost_eq( + next_interval(tm.at(1), tm.at(10000), dur(2100)), + tm.at(10501) + )); + } + + /// TODO: this test actually should be successful, but since we can't + /// multiply Duration on anything larger than u32 easily we decided + /// to allow it to fail for now + #[test] + #[should_panic(expected = "can't skip more than 4 billion intervals")] + fn large_skip() { + let tm = Timeline::new(); + assert_eq!( + next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)), + tm.at_ns(25, 1) + ); + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 070983ed..f34e9dc9 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -44,13 +44,15 @@ cfg_if! { mod extend; mod from_stream; mod into_stream; + mod interval; pub use double_ended_stream::DoubleEndedStream; pub use exact_size_stream::ExactSizeStream; pub use extend::Extend; pub use from_stream::FromStream; - pub use into_stream::IntoStream; pub use fused_stream::FusedStream; + pub use into_stream::IntoStream; + pub use interval::{interval, Interval}; pub use stream::Merge; } diff --git a/src/task/sleep.rs b/src/task/sleep.rs index 3e98755d..380ee3e6 100644 --- a/src/task/sleep.rs +++ b/src/task/sleep.rs @@ -11,6 +11,10 @@ use crate::io; /// /// [`std::thread::sleep`]: https://doc.rust-lang.org/std/thread/fn.sleep.html /// +/// See also: [`stream::interval`]. +/// +/// [`stream::interval`]: ../stream/fn.interval.html +/// /// # Examples /// /// ```