Merge pull request #739 from devashishdxt/futures-timer-update

Update futures-timer to 3.0.2
pull/741/head
nasa 5 years ago committed by GitHub
commit 6674dc0edf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -61,7 +61,7 @@ crossbeam-deque = { version = "0.7.3", optional = true }
crossbeam-utils = { version = "0.7.2", optional = true } crossbeam-utils = { version = "0.7.2", optional = true }
futures-core = { version = "0.3.4", optional = true, default-features = false } futures-core = { version = "0.3.4", optional = true, default-features = false }
futures-io = { version = "0.3.4", optional = true } futures-io = { version = "0.3.4", optional = true }
futures-timer = { version = "2.0.2", optional = true } futures-timer = { version = "3.0.2", optional = true }
kv-log-macro = { version = "1.0.4", optional = true } kv-log-macro = { version = "1.0.4", optional = true }
log = { version = "0.4.8", features = ["kv_unstable"], optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true }
memchr = { version = "2.3.3", optional = true } memchr = { version = "2.3.3", optional = true }

@ -1,6 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::Duration;
use crate::future::Future; use crate::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
@ -71,125 +71,8 @@ impl Stream for Interval {
if Pin::new(&mut self.delay).poll(cx).is_pending() { if Pin::new(&mut self.delay).poll(cx).is_pending() {
return Poll::Pending; return Poll::Pending;
} }
let when = Instant::now(); let interval = self.interval;
let next = next_interval(when, Instant::now(), self.interval); self.delay.reset(interval);
self.delay.reset(next);
Poll::Ready(Some(())) Poll::Ready(Some(()))
} }
} }
/// Converts Duration object to raw nanoseconds if possible
///
/// This is useful to divide intervals.
///
/// While technically for large duration it's impossible to represent any
/// duration as nanoseconds, the largest duration we can represent is about
/// 427_000 years. Large enough for any interval we would use or calculate in
/// async-std.
fn duration_to_nanos(dur: Duration) -> Option<u64> {
dur.as_secs()
.checked_mul(1_000_000_000)
.and_then(|v| v.checked_add(u64::from(dur.subsec_nanos())))
}
fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
let new = prev + interval;
if new > now {
return new;
}
let spent_ns = duration_to_nanos(now.duration_since(prev)).expect("interval should be expired");
let interval_ns =
duration_to_nanos(interval).expect("interval is less that 427 thousand years");
let mult = spent_ns / interval_ns + 1;
assert!(
mult < (1 << 32),
"can't skip more than 4 billion intervals of {:?} \
(trying to skip {})",
interval,
mult
);
prev + interval * (mult as u32)
}
#[cfg(test)]
mod test {
use super::next_interval;
use std::cmp::Ordering;
use std::time::{Duration, Instant};
struct Timeline(Instant);
impl Timeline {
fn new() -> Timeline {
Timeline(Instant::now())
}
fn at(&self, millis: u64) -> Instant {
self.0 + Duration::from_millis(millis)
}
fn at_ns(&self, sec: u64, nanos: u32) -> Instant {
self.0 + Duration::new(sec, nanos)
}
}
fn dur(millis: u64) -> Duration {
Duration::from_millis(millis)
}
// The math around Instant/Duration isn't 100% precise due to rounding
// errors, see #249 for more info
fn almost_eq(a: Instant, b: Instant) -> bool {
match a.cmp(&b) {
Ordering::Equal => true,
Ordering::Greater => a - b < Duration::from_millis(1),
Ordering::Less => b - a < Duration::from_millis(1),
}
}
#[test]
fn norm_next() {
let tm = Timeline::new();
assert!(almost_eq(
next_interval(tm.at(1), tm.at(2), dur(10)),
tm.at(11)
));
assert!(almost_eq(
next_interval(tm.at(7777), tm.at(7788), dur(100)),
tm.at(7877)
));
assert!(almost_eq(
next_interval(tm.at(1), tm.at(1000), dur(2100)),
tm.at(2101)
));
}
#[test]
fn fast_forward() {
let tm = Timeline::new();
assert!(almost_eq(
next_interval(tm.at(1), tm.at(1000), dur(10)),
tm.at(1001)
));
assert!(almost_eq(
next_interval(tm.at(7777), tm.at(8888), dur(100)),
tm.at(8977)
));
assert!(almost_eq(
next_interval(tm.at(1), tm.at(10000), dur(2100)),
tm.at(10501)
));
}
/// TODO: this test actually should be successful, but since we can't
/// multiply Duration on anything larger than u32 easily we decided
/// to allow it to fail for now
#[test]
#[should_panic(expected = "can't skip more than 4 billion intervals")]
fn large_skip() {
let tm = Timeline::new();
assert_eq!(
next_interval(tm.at_ns(0, 1), tm.at_ns(25, 0), Duration::new(0, 2)),
tm.at_ns(25, 1)
);
}
}

@ -1,6 +1,6 @@
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::time::{Duration, Instant}; use std::time::Duration;
use futures_timer::Delay; use futures_timer::Delay;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -59,7 +59,7 @@ impl<S: Stream> Stream for Throttle<S> {
Poll::Ready(None) => Poll::Ready(None), Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(v)) => { Poll::Ready(Some(v)) => {
*this.blocked = true; *this.blocked = true;
this.delay.reset(Instant::now() + *this.duration); this.delay.reset(*this.duration);
Poll::Ready(Some(v)) Poll::Ready(Some(v))
} }
} }

Loading…
Cancel
Save