diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index 5a9d3025..a0d88b98 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -1,6 +1,7 @@ use std::cell::Cell; use std::io; use std::iter; +use std::ptr; use std::sync::atomic::{self, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; @@ -26,6 +27,12 @@ thread_local! { struct Scheduler { /// Set to `true` while a machine is polling the reactor. polling: bool, + + /// Idle processors. + processors: Vec, + + /// Running machines. + machines: Vec>, } /// An async runtime. @@ -39,9 +46,6 @@ pub struct Runtime { /// Handles to local queues for stealing work. stealers: Vec>, - /// Machines to start - machines: Vec>, - /// The scheduler state. sched: Mutex, } @@ -51,23 +55,17 @@ impl Runtime { pub fn new() -> Runtime { let cpus = num_cpus::get().max(1); let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); - - let machines: Vec<_> = processors - .into_iter() - .map(|p| Arc::new(Machine::new(p))) - .collect(); - - let stealers = machines - .iter() - .map(|m| m.processor.lock().worker.stealer()) - .collect(); + let stealers = processors.iter().map(|p| p.worker.stealer()).collect(); Runtime { reactor: Reactor::new().unwrap(), injector: Injector::new(), stealers, - machines, - sched: Mutex::new(Scheduler { polling: false }), + sched: Mutex::new(Scheduler { + processors, + machines: Vec::new(), + polling: false, + }), } } @@ -99,21 +97,57 @@ impl Runtime { /// Runs the runtime on the current thread. pub fn run(&self) { scope(|s| { - for m in &self.machines { - s.builder() - .name("async-std/machine".to_string()) - .spawn(move |_| { - abort_on_panic(|| { - let _ = MACHINE.with(|machine| machine.set(m.clone())); - m.run(self); + let mut idle = 0; + let mut delay = 0; + + loop { + // Get a list of new machines to start, if any need to be started. + for m in self.make_machines() { + idle = 0; + + s.builder() + .name("async-std/machine".to_string()) + .spawn(move |_| { + abort_on_panic(|| { + let _ = MACHINE.with(|machine| machine.set(m.clone())); + m.run(self); + }) }) - }) - .expect("cannot start a machine thread"); + .expect("cannot start a machine thread"); + } + + // Sleep for a bit longer if the scheduler state hasn't changed in a while. + if idle > 10 { + delay = (delay * 2).min(10_000); + } else { + idle += 1; + delay = 1000; + } + + thread::sleep(Duration::from_micros(delay)); } }) .unwrap(); } + /// Returns a list of machines that need to be started. + fn make_machines(&self) -> Vec> { + let mut sched = self.sched.lock().unwrap(); + let mut to_start = Vec::new(); + + // If no machine has been polling the reactor in a while, that means the runtime is + // overloaded with work and we need to start another machine. + if !sched.polling { + if let Some(p) = sched.processors.pop() { + let m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + sched.machines.push(m); + } + } + + to_start + } + /// Unparks a thread polling the reactor. fn notify(&self) { atomic::fence(Ordering::SeqCst); @@ -139,20 +173,26 @@ impl Runtime { /// A thread running a processor. struct Machine { /// Holds the processor until it gets stolen. - processor: Spinlock, + processor: Spinlock>, } impl Machine { /// Creates a new machine running a processor. fn new(p: Processor) -> Machine { Machine { - processor: Spinlock::new(p), + processor: Spinlock::new(Some(p)), } } /// Schedules a task onto the machine. fn schedule(&self, rt: &Runtime, task: Runnable) { - self.processor.lock().schedule(rt, task); + match self.processor.lock().as_mut() { + None => { + rt.injector.push(task); + rt.notify(); + } + Some(p) => p.schedule(rt, task), + } } /// Finds the next runnable task. @@ -160,14 +200,16 @@ impl Machine { let mut retry = false; // First try finding a task in the local queue or in the global queue. - if let Some(task) = self.processor.lock().pop_task() { - return Steal::Success(task); - } + if let Some(p) = self.processor.lock().as_mut() { + if let Some(task) = p.pop_task() { + return Steal::Success(task); + } - match self.processor.lock().steal_from_global(rt) { - Steal::Empty => {} - Steal::Retry => retry = true, - Steal::Success(task) => return Steal::Success(task), + match p.steal_from_global(rt) { + Steal::Empty => {} + Steal::Retry => retry = true, + Steal::Success(task) => return Steal::Success(task), + } } // Try polling the reactor, but don't block on it. @@ -175,16 +217,18 @@ impl Machine { // Try finding a task in the local queue, which might hold tasks woken by the reactor. If // the local queue is still empty, try stealing from other processors. - if progress { - if let Some(task) = self.processor.lock().pop_task() { - return Steal::Success(task); + if let Some(p) = self.processor.lock().as_mut() { + if progress { + if let Some(task) = p.pop_task() { + return Steal::Success(task); + } } - } - match self.processor.lock().steal_from_others(rt) { - Steal::Empty => {} - Steal::Retry => retry = true, - Steal::Success(task) => return Steal::Success(task), + match p.steal_from_others(rt) { + Steal::Empty => {} + Steal::Retry => retry = true, + Steal::Success(task) => return Steal::Success(task), + } } if retry { Steal::Retry } else { Steal::Empty } @@ -208,7 +252,9 @@ impl Machine { // Check if `task::yield_now()` was invoked and flush the slot if so. YIELD_NOW.with(|flag| { if flag.replace(false) { - self.processor.lock().flush_slot(rt); + if let Some(p) = self.processor.lock().as_mut() { + p.flush_slot(rt); + } } }); @@ -219,12 +265,13 @@ impl Machine { runs = 0; rt.quick_poll().unwrap(); - let mut p = self.processor.lock(); - if let Steal::Success(task) = p.steal_from_global(rt) { - p.schedule(rt, task); - } + if let Some(p) = self.processor.lock().as_mut() { + if let Steal::Success(task) = p.steal_from_global(rt) { + p.schedule(rt, task); + } - p.flush_slot(rt); + p.flush_slot(rt); + } } // Try to find a runnable task. @@ -245,7 +292,9 @@ impl Machine { // Put the current thread to sleep a few times. if fails <= YIELDS + SLEEPS { + let opt_p = self.processor.lock().take(); thread::sleep(Duration::from_micros(10)); + *self.processor.lock() = opt_p; continue; } @@ -266,6 +315,16 @@ impl Machine { break; } + // Take out the machine associated with the current thread. + let m = match sched + .machines + .iter() + .position(|elem| ptr::eq(&**elem, self)) + { + None => break, // The processor was stolen. + Some(pos) => sched.machines.swap_remove(pos), + }; + // Unlock the schedule poll the reactor until new I/O events arrive. sched.polling = true; drop(sched); @@ -274,10 +333,21 @@ impl Machine { // Lock the scheduler again and re-register the machine. sched = rt.sched.lock().unwrap(); sched.polling = false; + sched.machines.push(m); runs = 0; fails = 0; } + + // When shutting down the thread, take the processor out if still available. + let opt_p = self.processor.lock().take(); + + // Return the processor to the scheduler and remove the machine. + if let Some(p) = opt_p { + let mut sched = rt.sched.lock().unwrap(); + sched.processors.push(p); + sched.machines.retain(|elem| !ptr::eq(&**elem, self)); + } } }