diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 90c6e355..41177bc9 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -49,7 +49,7 @@ lazy_static! { // Create up to MAX_THREADS dynamic blocking task worker threads. // Dynamic threads will terminate themselves if they don't -// receive any work after one second. +// receive any work after between one and ten seconds. fn maybe_create_another_blocking_thread() { // We use a `Relaxed` atomic operation because // it's just a heuristic, and would not lose correctness @@ -59,10 +59,19 @@ fn maybe_create_another_blocking_thread() { return; } + // We want to avoid having all threads terminate at + // exactly the same time, causing thundering herd + // effects. We want to stagger their destruction over + // 10 seconds or so to make the costs fade into + // background noise. + // + // Generate a simple random number of milliseconds + let rand_sleep_ms = u64::from(random(10_000)); + thread::Builder::new() .name("async-blocking-driver-dynamic".to_string()) - .spawn(|| { - let wait_limit = Duration::from_secs(1); + .spawn(move || { + let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { @@ -119,3 +128,30 @@ impl fmt::Debug for JoinHandle { .finish() } } + +/// Generates a random number in `0..n`. +fn random(n: u32) -> u32 { + use std::cell::Cell; + use std::num::Wrapping; + + thread_local! { + static RNG: Cell> = Cell::new(Wrapping(1406868647)); + } + + RNG.with(|rng| { + // This is the 32-bit variant of Xorshift. + // + // Source: https://en.wikipedia.org/wiki/Xorshift + let mut x = rng.get(); + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + rng.set(x); + + // This is a fast alternative to `x % n`. + // + // Author: Daniel Lemire + // Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + ((x.0 as u64).wrapping_mul(n as u64) >> 32) as u32 + }) +}