From e97dfdc4cc4089ad0d617f5b8cd2213c91d9da84 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Thu, 8 Aug 2019 16:48:18 +0200 Subject: [PATCH 1/6] Add dynamic threadpool --- src/task/blocking.rs | 93 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 90 insertions(+), 3 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index ccffa7d..a644403 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -3,14 +3,23 @@ use std::fmt; use std::future::Future; use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; use std::task::{Context, Poll}; use std::thread; +use std::time::Duration; -use crossbeam::channel::{unbounded, Receiver, Sender}; +use crossbeam::channel::{bounded, Receiver, Sender}; use lazy_static::lazy_static; use crate::utils::abort_on_panic; +const MAX_THREADS: u64 = 10_000; +const MIN_WAIT_MS: u64 = 1; +const MAX_WAIT_MS: u64 = 100; +const WAIT_SPREAD: u64 = MAX_WAIT_MS - MIN_WAIT_MS; + +static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); + struct Pool { sender: Sender>, receiver: Receiver>, @@ -29,11 +38,90 @@ lazy_static! { .expect("cannot start a thread driving blocking tasks"); } - let (sender, receiver) = unbounded(); + // We want to bound the work queue to make it more + // suitable as a backpressure mechanism. + let (sender, receiver) = bounded(MAX_THREADS as usize); Pool { sender, receiver } }; } +// Create up to 10,000 dynamic blocking task worker threads. +// Dynamic threads will terminate themselves if they don't +// receive any work after a timeout that scales down as the +// total number of threads scales up. +fn maybe_create_another_blocking_thread() { + let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); + if workers >= MAX_THREADS { + return; + } + + // We want to give up earlier when we have more threads + // to exert backpressure on the system submitting work + // to do. We use a `Relaxed` atomic operation because + // it's just a heuristic, and would not lose correctness + // even if it's random. + let utilization_percent = (workers * 100) / MAX_THREADS; + let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; + + // higher utilization -> lower wait time + let wait_limit_ms = MAX_WAIT_MS - relative_wait_limit; + assert!(wait_limit_ms >= MIN_WAIT_MS); + let wait_limit = Duration::from_millis(wait_limit_ms); + + thread::Builder::new() + .name("async-blocking-driver-dynamic".to_string()) + .spawn(move || { + DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); + while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { + abort_on_panic(|| task.run()); + } + DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); + }) + .expect("cannot start a dynamic thread driving blocking tasks"); +} + +// Enqueues work, blocking on a threadpool for a certain amount of +// time based on the number of worker threads currently active in +// the system. If we cannot send our work to the pool after the +// given timeout, we will attempt to increase the number of +// worker threads active in the system, up to MAX_THREADS. The +// timeout is dynamic, and when we have more threads we block +// for longer before spinning up another thread for backpressure. +fn schedule(t: async_task::Task<()>) { + let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); + + // We want to block for longer when we have more threads to + // exert backpressure on the system submitting work to do. + let utilization_percent = (workers * 100) / MAX_THREADS; + let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; + + // higher utilization -> higher block time + let wait_limit_ms = MIN_WAIT_MS + relative_wait_limit; + assert!(wait_limit_ms <= MAX_WAIT_MS); + let wait_limit = Duration::from_millis(wait_limit_ms); + + let first_try_result = POOL.sender.send_timeout(t, wait_limit); + match first_try_result { + Ok(()) => { + // NICEEEE + } + Err(crossbeam::channel::SendTimeoutError::Timeout(t)) => { + // We were not able to send to the channel within our + // budget. Try to spin up another thread, and then + // block without a time limit on the submission of + // the task. + maybe_create_another_blocking_thread(); + POOL.sender.send(t).unwrap() + } + Err(crossbeam::channel::SendTimeoutError::Disconnected(_)) => { + panic!( + "unable to send to blocking threadpool \ + due to receiver disconnection" + ); + } + } +} + /// Spawns a blocking task. /// /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. @@ -42,7 +130,6 @@ where F: Future + Send + 'static, R: Send + 'static, { - let schedule = |t| POOL.sender.send(t).unwrap(); let (task, handle) = async_task::spawn(future, schedule, ()); task.schedule(); JoinHandle(handle) From d75aae23cb6696c0ce9a19c59e251f0ee338bd34 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 9 Aug 2019 12:06:00 +0200 Subject: [PATCH 2/6] Tune timings of dynamic threadpool --- src/task/blocking.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index a644403..b22117a 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -14,9 +14,9 @@ use lazy_static::lazy_static; use crate::utils::abort_on_panic; const MAX_THREADS: u64 = 10_000; -const MIN_WAIT_MS: u64 = 1; -const MAX_WAIT_MS: u64 = 100; -const WAIT_SPREAD: u64 = MAX_WAIT_MS - MIN_WAIT_MS; +const MIN_WAIT_US: u64 = 10; +const MAX_WAIT_US: u64 = 10_000; +const WAIT_SPREAD: u64 = MAX_WAIT_US - MIN_WAIT_US; static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); @@ -64,9 +64,9 @@ fn maybe_create_another_blocking_thread() { let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; // higher utilization -> lower wait time - let wait_limit_ms = MAX_WAIT_MS - relative_wait_limit; - assert!(wait_limit_ms >= MIN_WAIT_MS); - let wait_limit = Duration::from_millis(wait_limit_ms); + let wait_limit_us = MAX_WAIT_US - relative_wait_limit; + assert!(wait_limit_us >= MIN_WAIT_US); + let wait_limit = Duration::from_micros(wait_limit_us); thread::Builder::new() .name("async-blocking-driver-dynamic".to_string()) @@ -96,9 +96,9 @@ fn schedule(t: async_task::Task<()>) { let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; // higher utilization -> higher block time - let wait_limit_ms = MIN_WAIT_MS + relative_wait_limit; - assert!(wait_limit_ms <= MAX_WAIT_MS); - let wait_limit = Duration::from_millis(wait_limit_ms); + let wait_limit_us = MIN_WAIT_US + relative_wait_limit; + assert!(wait_limit_us <= MAX_WAIT_US); + let wait_limit = Duration::from_micros(wait_limit_us); let first_try_result = POOL.sender.send_timeout(t, wait_limit); match first_try_result { From 4cb1faf299a3e28df87f5bc66bb8336d09fb3cbd Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 9 Aug 2019 12:06:26 +0200 Subject: [PATCH 3/6] Use unbuffered work queue in the dynamic threadpool to reduce bufferbloat --- src/task/blocking.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index b22117a..252ec2f 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -38,9 +38,14 @@ lazy_static! { .expect("cannot start a thread driving blocking tasks"); } - // We want to bound the work queue to make it more - // suitable as a backpressure mechanism. - let (sender, receiver) = bounded(MAX_THREADS as usize); + // We want to use an unbuffered channel here to help + // us drive our dynamic control. In effect, the + // kernel's scheduler becomes the queue, reducing + // the number of buffers that work must flow through + // before being acted on by a core. This helps keep + // latency snappy in the overall async system by + // reducing bufferbloat. + let (sender, receiver) = bounded(0); Pool { sender, receiver } }; } From ab613a53e58dbe97a19ab58fd1665108909ce1ce Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 9 Aug 2019 12:06:43 +0200 Subject: [PATCH 4/6] Improve comments in the blocking threadpool --- src/task/blocking.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 252ec2f..58dfaaf 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -50,11 +50,14 @@ lazy_static! { }; } -// Create up to 10,000 dynamic blocking task worker threads. +// Create up to MAX_THREADS dynamic blocking task worker threads. // Dynamic threads will terminate themselves if they don't // receive any work after a timeout that scales down as the // total number of threads scales up. fn maybe_create_another_blocking_thread() { + // We use a `Relaxed` atomic operation because + // it's just a heuristic, and would not lose correctness + // even if it's random. let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); if workers >= MAX_THREADS { return; @@ -62,9 +65,7 @@ fn maybe_create_another_blocking_thread() { // We want to give up earlier when we have more threads // to exert backpressure on the system submitting work - // to do. We use a `Relaxed` atomic operation because - // it's just a heuristic, and would not lose correctness - // even if it's random. + // to do. let utilization_percent = (workers * 100) / MAX_THREADS; let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; @@ -93,6 +94,9 @@ fn maybe_create_another_blocking_thread() { // timeout is dynamic, and when we have more threads we block // for longer before spinning up another thread for backpressure. fn schedule(t: async_task::Task<()>) { + // We use a `Relaxed` atomic operation because + // it's just a heuristic, and would not lose correctness + // even if it's random. let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); // We want to block for longer when we have more threads to From 81fa1d419a1f5af74858303d8e18f9ff2121e8bb Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 9 Aug 2019 12:22:54 +0200 Subject: [PATCH 5/6] Remove all of the dynamic control stuff for now and just use nonblocking sends + 1 second receive timeouts --- src/task/blocking.rs | 52 +++++++++----------------------------------- 1 file changed, 10 insertions(+), 42 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 58dfaaf..fe9e39a 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -14,9 +14,6 @@ use lazy_static::lazy_static; use crate::utils::abort_on_panic; const MAX_THREADS: u64 = 10_000; -const MIN_WAIT_US: u64 = 10; -const MAX_WAIT_US: u64 = 10_000; -const WAIT_SPREAD: u64 = MAX_WAIT_US - MIN_WAIT_US; static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); @@ -52,8 +49,7 @@ lazy_static! { // Create up to MAX_THREADS dynamic blocking task worker threads. // Dynamic threads will terminate themselves if they don't -// receive any work after a timeout that scales down as the -// total number of threads scales up. +// receive any work after one second. fn maybe_create_another_blocking_thread() { // We use a `Relaxed` atomic operation because // it's just a heuristic, and would not lose correctness @@ -63,20 +59,11 @@ fn maybe_create_another_blocking_thread() { return; } - // We want to give up earlier when we have more threads - // to exert backpressure on the system submitting work - // to do. - let utilization_percent = (workers * 100) / MAX_THREADS; - let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; - - // higher utilization -> lower wait time - let wait_limit_us = MAX_WAIT_US - relative_wait_limit; - assert!(wait_limit_us >= MIN_WAIT_US); - let wait_limit = Duration::from_micros(wait_limit_us); - thread::Builder::new() .name("async-blocking-driver-dynamic".to_string()) - .spawn(move || { + .spawn(|| { + let wait_limit = Duration::from_secs(1); + DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { abort_on_panic(|| task.run()); @@ -86,35 +73,16 @@ fn maybe_create_another_blocking_thread() { .expect("cannot start a dynamic thread driving blocking tasks"); } -// Enqueues work, blocking on a threadpool for a certain amount of -// time based on the number of worker threads currently active in -// the system. If we cannot send our work to the pool after the -// given timeout, we will attempt to increase the number of -// worker threads active in the system, up to MAX_THREADS. The -// timeout is dynamic, and when we have more threads we block -// for longer before spinning up another thread for backpressure. +// Enqueues work, attempting to send to the threadpool in a +// nonblocking way and spinning up another worker thread if +// there is not a thread ready to accept the work. fn schedule(t: async_task::Task<()>) { - // We use a `Relaxed` atomic operation because - // it's just a heuristic, and would not lose correctness - // even if it's random. - let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); - - // We want to block for longer when we have more threads to - // exert backpressure on the system submitting work to do. - let utilization_percent = (workers * 100) / MAX_THREADS; - let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; - - // higher utilization -> higher block time - let wait_limit_us = MIN_WAIT_US + relative_wait_limit; - assert!(wait_limit_us <= MAX_WAIT_US); - let wait_limit = Duration::from_micros(wait_limit_us); - - let first_try_result = POOL.sender.send_timeout(t, wait_limit); + let first_try_result = POOL.sender.try_send(t); match first_try_result { Ok(()) => { // NICEEEE } - Err(crossbeam::channel::SendTimeoutError::Timeout(t)) => { + Err(crossbeam::channel::TrySendError::Full(t)) => { // We were not able to send to the channel within our // budget. Try to spin up another thread, and then // block without a time limit on the submission of @@ -122,7 +90,7 @@ fn schedule(t: async_task::Task<()>) { maybe_create_another_blocking_thread(); POOL.sender.send(t).unwrap() } - Err(crossbeam::channel::SendTimeoutError::Disconnected(_)) => { + Err(crossbeam::channel::TrySendError::Disconnected(_)) => { panic!( "unable to send to blocking threadpool \ due to receiver disconnection" From 445b4161cbaf6d58c578a243b2a9274fa790acda Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 9 Aug 2019 16:40:31 +0200 Subject: [PATCH 6/6] Improve comment on the blocking threadpool --- src/task/blocking.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index fe9e39a..b519cf8 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -83,10 +83,9 @@ fn schedule(t: async_task::Task<()>) { // NICEEEE } Err(crossbeam::channel::TrySendError::Full(t)) => { - // We were not able to send to the channel within our - // budget. Try to spin up another thread, and then - // block without a time limit on the submission of - // the task. + // We were not able to send to the channel without + // blocking. Try to spin up another thread and then + // retry sending while blocking. maybe_create_another_blocking_thread(); POOL.sender.send(t).unwrap() }