diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index a0d88b98..9227d249 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -8,7 +8,11 @@ use std::thread; use std::time::Duration; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; -use crossbeam_utils::thread::scope; +use crossbeam_utils::{ + sync::{Parker, Unparker}, + thread::scope, +}; +use once_cell::sync::Lazy; use once_cell::unsync::OnceCell; use crate::rt::Reactor; @@ -24,10 +28,16 @@ thread_local! { static YIELD_NOW: Cell = Cell::new(false); } +/// Maximum number of OS threads = processors = machines +static MAXPROCS: Lazy = Lazy::new(|| num_cpus::get().max(1)); + struct Scheduler { /// Set to `true` while a machine is polling the reactor. polling: bool, + /// Available threads. + threads: Vec, + /// Idle processors. processors: Vec, @@ -35,6 +45,13 @@ struct Scheduler { machines: Vec>, } +struct ThreadState { + unparker: Unparker, + parked: Arc, + /// Used to transfer the machine into the thread. + machine_sender: crossbeam_channel::Sender>, +} + /// An async runtime. pub struct Runtime { /// The reactor. @@ -53,9 +70,9 @@ pub struct Runtime { impl Runtime { /// Creates a new runtime. pub fn new() -> Runtime { - let cpus = num_cpus::get().max(1); - let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); + let processors: Vec<_> = (0..*MAXPROCS).map(|_| Processor::new()).collect(); let stealers = processors.iter().map(|p| p.worker.stealer()).collect(); + let threads = Vec::with_capacity(*MAXPROCS); Runtime { reactor: Reactor::new().unwrap(), @@ -63,7 +80,8 @@ impl Runtime { stealers, sched: Mutex::new(Scheduler { processors, - machines: Vec::new(), + machines: Vec::with_capacity(*MAXPROCS), + threads, polling: false, }), } @@ -102,18 +120,67 @@ impl Runtime { loop { // Get a list of new machines to start, if any need to be started. - for m in self.make_machines() { + let machines = self.make_machines(); + for m in machines { idle = 0; - s.builder() - .name("async-std/machine".to_string()) - .spawn(move |_| { - abort_on_panic(|| { - let _ = MACHINE.with(|machine| machine.set(m.clone())); - m.run(self); + // println!("getting idle thread"); + let mut sched = self.sched.lock().unwrap(); + 'inner: for thread in &sched.threads { + // grab the first parked thread + if thread + .parked + .compare_and_swap(true, false, Ordering::Acquire) + { + // transfer the machine + thread + .machine_sender + .send(m) + .expect("failed to send machine to thread"); + // unpark the thread + thread.unparker.unpark(); + break 'inner; + } + } + + // no idle thread available, check if we can spawn one + if sched.threads.len() < *MAXPROCS { + // we can spawn one, lets do it + let parked = Arc::new(atomic::AtomicBool::new(true)); + let parked2 = parked.clone(); + let (machine_sender, machine_recv) = crossbeam_channel::bounded(1); + let parker = Parker::new(); + let unparker = parker.unparker().clone(); + + s.builder() + .name("async-std/machine".to_string()) + .spawn(move |_| { + abort_on_panic(|| { + loop { + while parked2.load(Ordering::Acquire) { + parker.park(); + } + // when this thread is unparked, retrieve machine + let m: Arc = + machine_recv.recv().expect("failed to receive machine"); + // store it in the thread local + let _ = MACHINE.with(|machine| machine.set(m.clone())); + // run it + m.run(self); + // when run ends, go into parked mode again + parked2.store(false, Ordering::Relaxed); + } + }) }) - }) - .expect("cannot start a machine thread"); + .expect("cannot start a machine thread"); + + sched.threads.push(ThreadState { + unparker, + parked, + machine_sender, + }); + } + drop(sched); } // Sleep for a bit longer if the scheduler state hasn't changed in a while. @@ -142,6 +209,7 @@ impl Runtime { let m = Arc::new(Machine::new(p)); to_start.push(m.clone()); sched.machines.push(m); + assert!(sched.machines.len() <= *MAXPROCS); } } @@ -326,6 +394,7 @@ impl Machine { }; // Unlock the schedule poll the reactor until new I/O events arrive. + // println!("polling start"); sched.polling = true; drop(sched); rt.reactor.poll(None).unwrap(); @@ -333,6 +402,7 @@ impl Machine { // Lock the scheduler again and re-register the machine. sched = rt.sched.lock().unwrap(); sched.polling = false; + //println!("polling stop"); sched.machines.push(m); runs = 0; @@ -344,9 +414,14 @@ impl Machine { // Return the processor to the scheduler and remove the machine. if let Some(p) = opt_p { + // println!("returning processor to pool"); let mut sched = rt.sched.lock().unwrap(); sched.processors.push(p); + assert!(sched.processors.len() <= *MAXPROCS); + // println!("machines {}", sched.machines.len()); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); + // println!("machines retained {}", sched.machines.len()); + assert!(sched.machines.len() <= *MAXPROCS); } } }