diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 9af1601..a7a610a 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -2,15 +2,21 @@ use std::fmt; use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; +use std::time::Duration; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{bounded, Receiver, Sender}; use lazy_static::lazy_static; use crate::future::Future; use crate::task::{Context, Poll}; use crate::utils::abort_on_panic; +const MAX_THREADS: u64 = 10_000; + +static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); + struct Pool { sender: Sender>, receiver: Receiver>, @@ -29,11 +35,69 @@ lazy_static! { .expect("cannot start a thread driving blocking tasks"); } - let (sender, receiver) = unbounded(); + // We want to use an unbuffered channel here to help + // us drive our dynamic control. In effect, the + // kernel's scheduler becomes the queue, reducing + // the number of buffers that work must flow through + // before being acted on by a core. This helps keep + // latency snappy in the overall async system by + // reducing bufferbloat. + let (sender, receiver) = bounded(0); Pool { sender, receiver } }; } +// Create up to MAX_THREADS dynamic blocking task worker threads. +// Dynamic threads will terminate themselves if they don't +// receive any work after one second. +fn maybe_create_another_blocking_thread() { + // We use a `Relaxed` atomic operation because + // it's just a heuristic, and would not lose correctness + // even if it's random. + let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); + if workers >= MAX_THREADS { + return; + } + + thread::Builder::new() + .name("async-blocking-driver-dynamic".to_string()) + .spawn(|| { + let wait_limit = Duration::from_secs(1); + + DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); + while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { + abort_on_panic(|| task.run()); + } + DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); + }) + .expect("cannot start a dynamic thread driving blocking tasks"); +} + +// Enqueues work, attempting to send to the threadpool in a +// nonblocking way and spinning up another worker thread if +// there is not a thread ready to accept the work. +fn schedule(t: async_task::Task<()>) { + let first_try_result = POOL.sender.try_send(t); + match first_try_result { + Ok(()) => { + // NICEEEE + } + Err(crossbeam::channel::TrySendError::Full(t)) => { + // We were not able to send to the channel without + // blocking. Try to spin up another thread and then + // retry sending while blocking. + maybe_create_another_blocking_thread(); + POOL.sender.send(t).unwrap() + } + Err(crossbeam::channel::TrySendError::Disconnected(_)) => { + panic!( + "unable to send to blocking threadpool \ + due to receiver disconnection" + ); + } + } +} + /// Spawns a blocking task. /// /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. @@ -42,7 +106,6 @@ where F: Future + Send + 'static, R: Send + 'static, { - let schedule = |t| POOL.sender.send(t).unwrap(); let (task, handle) = async_task::spawn(future, schedule, ()); task.schedule(); JoinHandle(handle)