forked from mirror/async-std
		
	Stagger background thread spin-down to avoid a thundering herd
This commit is contained in:
		
							parent
							
								
									9b3e8b8f26
								
							
						
					
					
						commit
						10146e31f0
					
				
					 1 changed files with 39 additions and 3 deletions
				
			
		|  | @ -49,7 +49,7 @@ lazy_static! { | ||||||
| 
 | 
 | ||||||
| // Create up to MAX_THREADS dynamic blocking task worker threads.
 | // Create up to MAX_THREADS dynamic blocking task worker threads.
 | ||||||
| // Dynamic threads will terminate themselves if they don't
 | // Dynamic threads will terminate themselves if they don't
 | ||||||
| // receive any work after one second.
 | // receive any work after between one and ten seconds.
 | ||||||
| fn maybe_create_another_blocking_thread() { | fn maybe_create_another_blocking_thread() { | ||||||
|     // We use a `Relaxed` atomic operation because
 |     // We use a `Relaxed` atomic operation because
 | ||||||
|     // it's just a heuristic, and would not lose correctness
 |     // it's just a heuristic, and would not lose correctness
 | ||||||
|  | @ -59,10 +59,19 @@ fn maybe_create_another_blocking_thread() { | ||||||
|         return; |         return; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     // We want to avoid having all threads terminate at
 | ||||||
|  |     // exactly the same time, causing thundering herd
 | ||||||
|  |     // effects. We want to stagger their destruction over
 | ||||||
|  |     // 10 seconds or so to make the costs fade into
 | ||||||
|  |     // background noise.
 | ||||||
|  |     //
 | ||||||
|  |     // Generate a simple random number of milliseconds
 | ||||||
|  |     let rand_sleep_ms = u64::from(random(10_000)); | ||||||
|  | 
 | ||||||
|     thread::Builder::new() |     thread::Builder::new() | ||||||
|         .name("async-blocking-driver-dynamic".to_string()) |         .name("async-blocking-driver-dynamic".to_string()) | ||||||
|         .spawn(|| { |         .spawn(move || { | ||||||
|             let wait_limit = Duration::from_secs(1); |             let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); | ||||||
| 
 | 
 | ||||||
|             DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); |             DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed); | ||||||
|             while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { |             while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) { | ||||||
|  | @ -119,3 +128,30 @@ impl<R> fmt::Debug for JoinHandle<R> { | ||||||
|             .finish() |             .finish() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | 
 | ||||||
|  | /// Generates a random number in `0..n`.
 | ||||||
|  | fn random(n: u32) -> u32 { | ||||||
|  |     use std::cell::Cell; | ||||||
|  |     use std::num::Wrapping; | ||||||
|  | 
 | ||||||
|  |     thread_local! { | ||||||
|  |         static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1406868647)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     RNG.with(|rng| { | ||||||
|  |         // This is the 32-bit variant of Xorshift.
 | ||||||
|  |         //
 | ||||||
|  |         // Source: https://en.wikipedia.org/wiki/Xorshift
 | ||||||
|  |         let mut x = rng.get(); | ||||||
|  |         x ^= x << 13; | ||||||
|  |         x ^= x >> 17; | ||||||
|  |         x ^= x << 5; | ||||||
|  |         rng.set(x); | ||||||
|  | 
 | ||||||
|  |         // This is a fast alternative to `x % n`.
 | ||||||
|  |         //
 | ||||||
|  |         // Author: Daniel Lemire
 | ||||||
|  |         // Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
 | ||||||
|  |         ((x.0 as u64).wrapping_mul(n as u64) >> 32) as u32 | ||||||
|  |     }) | ||||||
|  | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue