From 5adc608791346a0b8b97ad2bdae56da7f72ac7e0 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 1 Nov 2019 21:53:13 +0100 Subject: [PATCH] =?UTF-8?q?Spawn=20several=20threads=20when=20we=20fail=20?= =?UTF-8?q?to=20enqueue=20work=20in=20the=20blocki=E2=80=A6=20(#181)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Rebase onto master * Switch to unbounded channels --- src/task/spawn_blocking.rs | 46 +++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index b6b5ea34..3f4f18a1 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; use std::time::Duration; -use crossbeam_channel::{bounded, Receiver, Sender}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use once_cell::sync::Lazy; use crate::task::{JoinHandle, Task}; @@ -79,7 +79,7 @@ static POOL: Lazy = Lazy::new(|| { // before being acted on by a core. This helps keep // latency snappy in the overall async system by // reducing bufferbloat. - let (sender, receiver) = bounded(0); + let (sender, receiver) = unbounded(); Pool { sender, receiver } }); @@ -95,27 +95,31 @@ fn maybe_create_another_blocking_thread() { return; } - // We want to avoid having all threads terminate at - // exactly the same time, causing thundering herd - // effects. We want to stagger their destruction over - // 10 seconds or so to make the costs fade into - // background noise. - // - // Generate a simple random number of milliseconds - let rand_sleep_ms = u64::from(random(10_000)); + let n_to_spawn = std::cmp::min(2 + (workers / 10), 10); - thread::Builder::new() - .name("async-std/blocking".to_string()) - .spawn(move || { - let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); + for _ in 0..n_to_spawn { + // We want to avoid having all threads terminate at + // exactly the same time, causing thundering herd + // effects. We want to stagger their destruction over + // 10 seconds or so to make the costs fade into + // background noise. + // + // Generate a simple random number of milliseconds + let rand_sleep_ms = u64::from(random(10_000)); - DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); - while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { - abort_on_panic(|| task.run()); - } - DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); - }) - .expect("cannot start a dynamic thread driving blocking tasks"); + thread::Builder::new() + .name("async-std/blocking".to_string()) + .spawn(move || { + let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); + + DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); + while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { + abort_on_panic(|| task.run()); + } + DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed); + }) + .expect("cannot start a dynamic thread driving blocking tasks"); + } } // Enqueues work, attempting to send to the threadpool in a