From e97dfdc4cc4089ad0d617f5b8cd2213c91d9da84 Mon Sep 17 00:00:00 2001
From: Tyler Neely
Date: Thu, 8 Aug 2019 16:48:18 +0200
Subject: [PATCH] Add dynamic threadpool

---
 src/task/blocking.rs | 93 ++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 90 insertions(+), 3 deletions(-)

diff --git a/src/task/blocking.rs b/src/task/blocking.rs
index ccffa7db..a6444039 100644
--- a/src/task/blocking.rs
+++ b/src/task/blocking.rs
@@ -3,14 +3,23 @@
 use std::fmt;
 use std::future::Future;
 use std::pin::Pin;
+use std::sync::atomic::{AtomicU64, Ordering};
 use std::task::{Context, Poll};
 use std::thread;
+use std::time::Duration;
 
-use crossbeam::channel::{unbounded, Receiver, Sender};
+use crossbeam::channel::{bounded, Receiver, Sender};
 use lazy_static::lazy_static;
 
 use crate::utils::abort_on_panic;
 
+const MAX_THREADS: u64 = 10_000;
+const MIN_WAIT_MS: u64 = 1;
+const MAX_WAIT_MS: u64 = 100;
+const WAIT_SPREAD: u64 = MAX_WAIT_MS - MIN_WAIT_MS;
+
+static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
+
 struct Pool {
     sender: Sender<async_task::Task<()>>,
     receiver: Receiver<async_task::Task<()>>,
@@ -29,11 +38,90 @@ lazy_static! {
                 .expect("cannot start a thread driving blocking tasks");
         }
 
-        let (sender, receiver) = unbounded();
+        // We want to bound the work queue to make it more
+        // suitable as a backpressure mechanism.
+        let (sender, receiver) = bounded(MAX_THREADS as usize);
         Pool { sender, receiver }
     };
 }
 
+// Create up to 10,000 dynamic blocking task worker threads.
+// Dynamic threads will terminate themselves if they don't
+// receive any work after a timeout that scales down as the
+// total number of threads scales up.
+fn maybe_create_another_blocking_thread() {
+    let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
+    if workers >= MAX_THREADS {
+        return;
+    }
+
+    // We want to give up earlier when we have more threads
+    // to exert backpressure on the system submitting work
+    // to do. We use a `Relaxed` atomic operation because
+    // it's just a heuristic, and would not lose correctness
+    // even if it's random.
+    let utilization_percent = (workers * 100) / MAX_THREADS;
+    let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100;
+
+    // higher utilization -> lower wait time
+    let wait_limit_ms = MAX_WAIT_MS - relative_wait_limit;
+    assert!(wait_limit_ms >= MIN_WAIT_MS);
+    let wait_limit = Duration::from_millis(wait_limit_ms);
+
+    thread::Builder::new()
+        .name("async-blocking-driver-dynamic".to_string())
+        .spawn(move || {
+            DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
+            while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
+                abort_on_panic(|| task.run());
+            }
+            DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
+        })
+        .expect("cannot start a dynamic thread driving blocking tasks");
+}
+
+// Enqueues work, blocking on the threadpool for a certain amount of
+// time based on the number of worker threads currently active in
+// the system. If we cannot send our work to the pool within the
+// given timeout, we attempt to increase the number of worker
+// threads active in the system, up to MAX_THREADS. The timeout is
+// dynamic: when we have more threads, we block for longer before
+// spinning up another thread, for backpressure.
+fn schedule(t: async_task::Task<()>) {
+    let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
+
+    // We want to block for longer when we have more threads to
+    // exert backpressure on the system submitting work to do.
+    let utilization_percent = (workers * 100) / MAX_THREADS;
+    let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100;
+
+    // higher utilization -> higher block time
+    let wait_limit_ms = MIN_WAIT_MS + relative_wait_limit;
+    assert!(wait_limit_ms <= MAX_WAIT_MS);
+    let wait_limit = Duration::from_millis(wait_limit_ms);
+
+    let first_try_result = POOL.sender.send_timeout(t, wait_limit);
+    match first_try_result {
+        Ok(()) => {
+            // The task was enqueued within our time budget.
+        }
+        Err(crossbeam::channel::SendTimeoutError::Timeout(t)) => {
+            // We were not able to send to the channel within our
+            // budget. Try to spin up another thread, and then
+            // block without a time limit on the submission of
+            // the task.
+            maybe_create_another_blocking_thread();
+            POOL.sender.send(t).unwrap()
+        }
+        Err(crossbeam::channel::SendTimeoutError::Disconnected(_)) => {
+            panic!(
+                "unable to send to blocking threadpool \
+                 due to receiver disconnection"
+            );
+        }
+    }
+}
+
 /// Spawns a blocking task.
 ///
 /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
@@ -42,7 +130,6 @@ where
     F: Future<Output = R> + Send + 'static,
     R: Send + 'static,
 {
-    let schedule = |t| POOL.sender.send(t).unwrap();
     let (task, handle) = async_task::spawn(future, schedule, ());
     task.schedule();
     JoinHandle(handle)
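
For reference, below is a minimal standalone sketch of the wait-limit scaling that both maybe_create_another_blocking_thread and schedule rely on. It is not part of the patch: the constants mirror the ones introduced above, and scaled_wait is a hypothetical helper used only to make the two scaling directions visible.

use std::time::Duration;

const MAX_THREADS: u64 = 10_000;
const MIN_WAIT_MS: u64 = 1;
const MAX_WAIT_MS: u64 = 100;
const WAIT_SPREAD: u64 = MAX_WAIT_MS - MIN_WAIT_MS;

// Hypothetical helper: given the current worker count, compute how long
// a submission would block before trying to grow the pool (rises with
// utilization) and how long an idle dynamic worker would wait for work
// before exiting (falls with utilization).
fn scaled_wait(workers: u64) -> (Duration, Duration) {
    let utilization_percent = (workers * 100) / MAX_THREADS;
    let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100;
    let submit_block = Duration::from_millis(MIN_WAIT_MS + relative_wait_limit);
    let idle_timeout = Duration::from_millis(MAX_WAIT_MS - relative_wait_limit);
    (submit_block, idle_timeout)
}

fn main() {
    for &workers in &[0u64, 2_500, 5_000, 10_000] {
        let (submit_block, idle_timeout) = scaled_wait(workers);
        println!(
            "{:>6} workers -> submit blocks for {:?}, idle worker exits after {:?}",
            workers, submit_block, idle_timeout
        );
    }
}

With these constants, an empty pool blocks a submission for only 1 ms before considering a new thread while an idle dynamic worker lingers for 100 ms; at full utilization those roles invert, so growth slows down and surplus workers drain quickly.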