From f588ba6bdd8d4f1dbd9bf01a091cae32325af4a4 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Thu, 7 Nov 2019 23:39:54 +0000 Subject: [PATCH] Spawn more than one blocking thread (#475) * Spawn more than 1 blocking thread * Fix a bug * Fix check when the thread is last sleeping --- src/task/spawn_blocking.rs | 110 ++++++++++++++----------------------- src/utils.rs | 8 ++- 2 files changed, 49 insertions(+), 69 deletions(-) diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index 3f4f18a..6076d1b 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::thread; use std::time::Duration; @@ -8,8 +8,6 @@ use once_cell::sync::Lazy; use crate::task::{JoinHandle, Task}; use crate::utils::{abort_on_panic, random}; -type Runnable = async_task::Task; - /// Spawns a blocking task. /// /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This @@ -44,14 +42,16 @@ where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { + let schedule = |task| POOL.sender.send(task).unwrap(); let (task, handle) = async_task::spawn(async { f() }, schedule, Task::new(None)); task.schedule(); JoinHandle::new(handle) } -const MAX_THREADS: u64 = 10_000; +type Runnable = async_task::Task; -static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); +/// The number of sleeping worker threads. +static SLEEPING: AtomicUsize = AtomicUsize::new(0); struct Pool { sender: Sender, @@ -59,78 +59,52 @@ struct Pool { } static POOL: Lazy = Lazy::new(|| { - for _ in 0..2 { - thread::Builder::new() - .name("async-std/blocking".to_string()) - .spawn(|| { - abort_on_panic(|| { - for task in &POOL.receiver { - task.run(); - } - }) - }) - .expect("cannot start a thread driving blocking tasks"); - } + // Start a single worker thread waiting for the first task. + start_thread(); - // We want to use an unbuffered channel here to help - // us drive our dynamic control. In effect, the - // kernel's scheduler becomes the queue, reducing - // the number of buffers that work must flow through - // before being acted on by a core. This helps keep - // latency snappy in the overall async system by - // reducing bufferbloat. let (sender, receiver) = unbounded(); Pool { sender, receiver } }); -// Create up to MAX_THREADS dynamic blocking task worker threads. -// Dynamic threads will terminate themselves if they don't -// receive any work after between one and ten seconds. -fn maybe_create_another_blocking_thread() { - // We use a `Relaxed` atomic operation because - // it's just a heuristic, and would not lose correctness - // even if it's random. - let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); - if workers >= MAX_THREADS { - return; - } +fn start_thread() { + SLEEPING.fetch_add(1, Ordering::SeqCst); - let n_to_spawn = std::cmp::min(2 + (workers / 10), 10); + // Generate a random duration of time between 1 second and 10 seconds. If the thread doesn't + // receive the next task in this duration of time, it will stop running. + let timeout = Duration::from_millis(1000 + u64::from(random(9_000))); - for _ in 0..n_to_spawn { - // We want to avoid having all threads terminate at - // exactly the same time, causing thundering herd - // effects. We want to stagger their destruction over - // 10 seconds or so to make the costs fade into - // background noise. - // - // Generate a simple random number of milliseconds - let rand_sleep_ms = u64::from(random(10_000)); + thread::Builder::new() + .name("async-std/blocking".to_string()) + .spawn(move || { + loop { + let task = match POOL.receiver.recv_timeout(timeout) { + Ok(task) => task, + Err(_) => { + // Check whether this is the last sleeping thread. + if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 { + // If so, then restart the thread to make sure there is always at least + // one sleeping thread. + if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 { + continue; + } + } - thread::Builder::new() - .name("async-std/blocking".to_string()) - .spawn(move || { - let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); + // Stop the thread. + return; + } + }; - DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); - while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { - abort_on_panic(|| task.run()); + // If there are no sleeping threads, then start one to make sure there is always at + // least one sleeping thread. + if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 { + start_thread(); } - DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); - }) - .expect("cannot start a dynamic thread driving blocking tasks"); - } -} -// Enqueues work, attempting to send to the threadpool in a -// nonblocking way and spinning up another worker thread if -// there is not a thread ready to accept the work. -pub(crate) fn schedule(task: Runnable) { - if let Err(err) = POOL.sender.try_send(task) { - // We were not able to send to the channel without - // blocking. Try to spin up another thread and then - // retry sending while blocking. - maybe_create_another_blocking_thread(); - POOL.sender.send(err.into_inner()).unwrap(); - } + // Run the task. + abort_on_panic(|| task.run()); + + SLEEPING.fetch_add(1, Ordering::SeqCst); + } + }) + .expect("cannot start a blocking thread"); } diff --git a/src/utils.rs b/src/utils.rs index 7758cc7..cfdf5fa 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -25,7 +25,13 @@ pub fn random(n: u32) -> u32 { use std::num::Wrapping; thread_local! { - static RNG: Cell> = Cell::new(Wrapping(1_406_868_647)); + static RNG: Cell> = { + // Take the address of a local value as seed. + let mut x = 0i32; + let r = &mut x; + let addr = r as *mut i32 as usize; + Cell::new(Wrapping(addr as u32)) + } } RNG.with(|rng| {