From 81fa1d419a1f5af74858303d8e18f9ff2121e8bb Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 9 Aug 2019 12:22:54 +0200 Subject: [PATCH] Remove all of the dynamic control stuff for now and just use nonblocking sends + 1 second receive timeouts --- src/task/blocking.rs | 52 +++++++++----------------------------------- 1 file changed, 10 insertions(+), 42 deletions(-) diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 58dfaaf3..fe9e39a5 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -14,9 +14,6 @@ use lazy_static::lazy_static; use crate::utils::abort_on_panic; const MAX_THREADS: u64 = 10_000; -const MIN_WAIT_US: u64 = 10; -const MAX_WAIT_US: u64 = 10_000; -const WAIT_SPREAD: u64 = MAX_WAIT_US - MIN_WAIT_US; static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); @@ -52,8 +49,7 @@ lazy_static! { // Create up to MAX_THREADS dynamic blocking task worker threads. // Dynamic threads will terminate themselves if they don't -// receive any work after a timeout that scales down as the -// total number of threads scales up. +// receive any work after one second. fn maybe_create_another_blocking_thread() { // We use a `Relaxed` atomic operation because // it's just a heuristic, and would not lose correctness @@ -63,20 +59,11 @@ fn maybe_create_another_blocking_thread() { return; } - // We want to give up earlier when we have more threads - // to exert backpressure on the system submitting work - // to do. - let utilization_percent = (workers * 100) / MAX_THREADS; - let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; - - // higher utilization -> lower wait time - let wait_limit_us = MAX_WAIT_US - relative_wait_limit; - assert!(wait_limit_us >= MIN_WAIT_US); - let wait_limit = Duration::from_micros(wait_limit_us); - thread::Builder::new() .name("async-blocking-driver-dynamic".to_string()) - .spawn(move || { + .spawn(|| { + let wait_limit = Duration::from_secs(1); + DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { abort_on_panic(|| task.run()); @@ -86,35 +73,16 @@ fn maybe_create_another_blocking_thread() { .expect("cannot start a dynamic thread driving blocking tasks"); } -// Enqueues work, blocking on a threadpool for a certain amount of -// time based on the number of worker threads currently active in -// the system. If we cannot send our work to the pool after the -// given timeout, we will attempt to increase the number of -// worker threads active in the system, up to MAX_THREADS. The -// timeout is dynamic, and when we have more threads we block -// for longer before spinning up another thread for backpressure. +// Enqueues work, attempting to send to the threadpool in a +// nonblocking way and spinning up another worker thread if +// there is not a thread ready to accept the work. fn schedule(t: async_task::Task<()>) { - // We use a `Relaxed` atomic operation because - // it's just a heuristic, and would not lose correctness - // even if it's random. - let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed); - - // We want to block for longer when we have more threads to - // exert backpressure on the system submitting work to do. - let utilization_percent = (workers * 100) / MAX_THREADS; - let relative_wait_limit = (WAIT_SPREAD * utilization_percent) / 100; - - // higher utilization -> higher block time - let wait_limit_us = MIN_WAIT_US + relative_wait_limit; - assert!(wait_limit_us <= MAX_WAIT_US); - let wait_limit = Duration::from_micros(wait_limit_us); - - let first_try_result = POOL.sender.send_timeout(t, wait_limit); + let first_try_result = POOL.sender.try_send(t); match first_try_result { Ok(()) => { // NICEEEE } - Err(crossbeam::channel::SendTimeoutError::Timeout(t)) => { + Err(crossbeam::channel::TrySendError::Full(t)) => { // We were not able to send to the channel within our // budget. Try to spin up another thread, and then // block without a time limit on the submission of @@ -122,7 +90,7 @@ fn schedule(t: async_task::Task<()>) { maybe_create_another_blocking_thread(); POOL.sender.send(t).unwrap() } - Err(crossbeam::channel::SendTimeoutError::Disconnected(_)) => { + Err(crossbeam::channel::TrySendError::Disconnected(_)) => { panic!( "unable to send to blocking threadpool \ due to receiver disconnection"