diff --git a/Cargo.toml b/Cargo.toml index 37374f7b..d4a5ad70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,7 @@ crossbeam-deque = { version = "0.7.3", optional = true } crossbeam-utils = { version = "0.7.2", optional = true } futures-core = { version = "0.3.4", optional = true, default-features = false } futures-io = { version = "0.3.4", optional = true } -futures-timer = { version = "2.0.2", optional = true } +futures-timer = { version = "3.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } memchr = { version = "2.3.3", optional = true } diff --git a/src/stream/interval.rs b/src/stream/interval.rs index 7a0c1740..be94b06c 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -1,6 +1,6 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::future::Future; use crate::stream::Stream; @@ -71,125 +71,8 @@ impl Stream for Interval { if Pin::new(&mut self.delay).poll(cx).is_pending() { return Poll::Pending; } - let when = Instant::now(); - let next = next_interval(when, Instant::now(), self.interval); - self.delay.reset(next); + let interval = self.interval; + self.delay.reset(interval); Poll::Ready(Some(())) } } - -/// Converts Duration object to raw nanoseconds if possible -/// -/// This is useful to divide intervals. -/// -/// While technically for large duration it's impossible to represent any -/// duration as nanoseconds, the largest duration we can represent is about -/// 427_000 years. Large enough for any interval we would use or calculate in -/// async-std. -fn duration_to_nanos(dur: Duration) -> Option { - dur.as_secs() - .checked_mul(1_000_000_000) - .and_then(|v| v.checked_add(u64::from(dur.subsec_nanos()))) -} - -fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant { - let new = prev + interval; - if new > now { - return new; - } - - let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired"); - let interval_ns = - duration_to_nanos(interval).expect("interval is less that 427 thousand years"); - let mult = spent_ns / interval_ns + 1; - assert!( - mult < (1 << 32), - "can't skip more than 4 billion intervals of {:?} \ - (trying to skip {})", - interval, - mult - ); - prev + interval * (mult as u32) -} - -#[cfg(test)] -mod test { - use super::next_interval; - use std::cmp::Ordering; - use std::time::{Duration, Instant}; - - struct Timeline(Instant); - - impl Timeline { - fn new() -> Timeline { - Timeline(Instant::now()) - } - fn at(&self, millis: u64) -> Instant { - self.0 + Duration::from_millis(millis) - } - fn at_ns(&self, sec: u64, nanos: u32) -> Instant { - self.0 + Duration::new(sec, nanos) - } - } - - fn dur(millis: u64) -> Duration { - Duration::from_millis(millis) - } - - // The math around Instant/Duration isn't 100% precise due to rounding - // errors, see #249 for more info - fn almost_eq(a: Instant, b: Instant) -> bool { - match a.cmp(&b) { - Ordering::Equal => true, - Ordering::Greater => a - b < Duration::from_millis(1), - Ordering::Less => b - a < Duration::from_millis(1), - } - } - - #[test] - fn norm_next() { - let tm = Timeline::new(); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(2), dur(10)), - tm.at(11) - )); - assert!(almost_eq( - next_interval(tm.at(7777), tm.at(7788), dur(100)), - tm.at(7877) - )); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(1000), dur(2100)), - tm.at(2101) - )); - } - - #[test] - fn fast_forward() { - let tm = Timeline::new(); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(1000), dur(10)), - tm.at(1001) - )); - assert!(almost_eq( - next_interval(tm.at(7777), tm.at(8888), dur(100)), - tm.at(8977) - )); - assert!(almost_eq( - next_interval(tm.at(1), tm.at(10000), dur(2100)), - tm.at(10501) - )); - } - - /// TODO: this test actually should be successful, but since we can't - /// multiply Duration on anything larger than u32 easily we decided - /// to allow it to fail for now - #[test] - #[should_panic(expected = "can't skip more than 4 billion intervals")] - fn large_skip() { - let tm = Timeline::new(); - assert_eq!( - next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)), - tm.at_ns(25, 1) - ); - } -} diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index ce8c13b3..554ca306 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -1,6 +1,6 @@ use std::future::Future; use std::pin::Pin; -use std::time::{Duration, Instant}; +use std::time::Duration; use futures_timer::Delay; use pin_project_lite::pin_project; @@ -59,7 +59,7 @@ impl Stream for Throttle { Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { *this.blocked = true; - this.delay.reset(Instant::now() + *this.duration); + this.delay.reset(*this.duration); Poll::Ready(Some(v)) } }