use std::cell::Cell; use std::cell::RefCell; use std::io; use std::iter; use std::ptr; use std::sync::atomic::{self, Ordering}; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; use crossbeam_utils::{ sync::{Parker, Unparker}, thread::scope, }; use once_cell::sync::Lazy; use crate::rt::Reactor; use crate::sync::Spinlock; use crate::task::Runnable; use crate::utils::{abort_on_panic, random}; thread_local! { /// A reference to the current machine, if the current thread runs tasks. static MACHINE: RefCell>> = RefCell::new(None); /// This flag is set to true whenever `task::yield_now()` is invoked. static YIELD_NOW: Cell = Cell::new(false); } /// Maximum number of OS threads = processors = machines static MAXPROCS: Lazy = Lazy::new(|| num_cpus::get().max(1)); /// Minimum number of machines that are kept exeuting, to avoid starvation. const MIN_MACHINES: usize = 2; struct Scheduler { /// Set to `true` while a machine is polling the reactor. polling: bool, progress: bool, /// Available threads. threads: Vec, /// Idle processors. processors: Vec, /// Running machines. machines: Vec>, } impl Scheduler { /// Get the next machine that has no work yet, if there is any. fn next_idle_machine(&self) -> Option> { self.machines .iter() .find(|m| !m.has_work()) .map(|m| m.clone()) } } struct ThreadState { unparker: Unparker, parked: Arc, /// Used to transfer the machine into the thread. machine_sender: crossbeam_channel::Sender>, } /// An async runtime. pub struct Runtime { /// The reactor. reactor: Reactor, /// The global queue of tasks. injector: Injector, /// Handles to local queues for stealing work. stealers: Vec>, /// The scheduler state. sched: Mutex, #[cfg(feature = "tracing")] poll_count: atomic::AtomicUsize, } impl Runtime { /// Creates a new runtime. pub fn new() -> Runtime { let processors: Vec<_> = (0..*MAXPROCS).map(|_| Processor::new()).collect(); let stealers = processors.iter().map(|p| p.worker.stealer()).collect(); let threads = Vec::with_capacity(*MAXPROCS); Runtime { reactor: Reactor::new().unwrap(), injector: Injector::new(), stealers, #[cfg(feature = "tracing")] poll_count: atomic::AtomicUsize::new(0), sched: Mutex::new(Scheduler { processors, machines: Vec::with_capacity(*MAXPROCS), threads, polling: false, progress: false, }), } } /// Returns a reference to the reactor. pub fn reactor(&self) -> &Reactor { &self.reactor } /// Flushes the task slot so that tasks get run more fairly. pub fn yield_now(&self) { YIELD_NOW.with(|flag| flag.set(true)); } /// Schedules a task. pub fn schedule(&self, task: Runnable) { MACHINE.with(|machine| { // If the current thread is a worker thread, schedule it onto the current machine. // Otherwise, push it into the global task queue. match &*machine.borrow() { None => { self.injector.push(task); self.notify(); } Some(m) => m.schedule(&self, task), } }); } /// Runs the runtime on the current thread. pub fn run(&self) { scope(|s| { let mut idle = 0; let mut delay = 0; #[cfg(feature = "tracing")] s.builder() .name("async-std/trace".to_string()) .spawn(|_| { use log_update::LogUpdate; use std::io::stdout; let mut log_update = LogUpdate::new(stdout()).unwrap(); loop { let (thread_list, machine_list, processor_list, polling) = { let sched = self.sched.lock().unwrap(); let thread_list = sched .threads .iter() .map(|t| { if t.parked.load(Ordering::Relaxed) { "_" } else { "|" } }) .fold(String::new(), |mut s, curr| { s += " "; s += curr; s }); let machine_list = sched .machines .iter() .map(|m| match &*m.processor.lock() { Some(p) => { let len = p.worker.len() + p.slot.is_some() as usize; len.to_string() } None => "_".to_string(), }) .fold(String::new(), |mut s, curr| { s += " "; s += &curr; s }); let processor_list = sched .processors .iter() .map(|p| { let len = p.worker.len() + p.slot.is_some() as usize; len.to_string() }) .fold(String::new(), |mut s, curr| { s += " "; s += &curr; s }); (thread_list, machine_list, processor_list, sched.polling) }; let glen = self.injector.len(); let polls = self.poll_count.load(Ordering::Relaxed); let msg = format!( "GlobalQueue: {}\nPolls: {} - {}\nThreads:\n{}\nMachines:\n{}\nProcessors:\n{}\n", glen, polls,polling, thread_list, machine_list, processor_list ); log_update.render(&msg).unwrap(); thread::sleep(Duration::from_millis(10)); } }) .expect("failed to start tracing"); loop { // Get a list of new machines to start, if any need to be started. let machines = self.make_machines(); for m in machines { // println!("{} -- looking for thread", k); idle = 0; // println!("getting idle thread"); let sched = self.sched.lock().unwrap(); 'inner: for (i, thread) in sched.threads.iter().enumerate() { // grab the first parked thread if thread .parked .compare_and_swap(true, false, Ordering::Acquire) { // println!("unpark thread {}", i); // transfer the machine thread .machine_sender .send(m.clone()) .expect("failed to send machine to thread"); // unpark the thread thread.unparker.unpark(); // println!("{} found thread to unpark {}", k, i); break 'inner; } } let len = sched.threads.len(); drop(sched); // no idle thread available, check if we can spawn one if len < *MAXPROCS { let i = len; // println!("{} spawning thread {}", k, i); // we can spawn one, lets do it let parked = Arc::new(atomic::AtomicBool::new(false)); let parked2 = parked.clone(); let (machine_sender, machine_recv) = crossbeam_channel::bounded(1); let parker = Parker::new(); let unparker = parker.unparker().clone(); s.builder() .name("async-std/machine".to_string()) .spawn(move |_| { abort_on_panic(|| { loop { // println!("checking park loop {}", i); while parked2.load(Ordering::Acquire) { parker.park(); // TODO: shutdown if idle for too long } // println!("thread unparked {}", i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); // store it in the thread local MACHINE.with(|machine| { *machine.borrow_mut() = Some(m.clone()); }); // run it m.run(self); // when run ends { // see if there are any available processors let mut sched = self.sched.lock().unwrap(); if let Some(p) = sched.processors.pop() { // get a machine if let Some(m) = sched.next_idle_machine(){ *m.processor.lock() = Some(p); MACHINE.with(|machine| { machine.borrow_mut().replace(m); }); continue; } } drop(sched); // go into parked mode, no work MACHINE.with(|machine| { *machine.borrow_mut() = None; }); parked2.store(true, Ordering::Relaxed); // println!("thread parked {}", i); } } }) }) .expect("cannot start a machine thread"); let mut sched = self.sched.lock().unwrap(); // transfer the machine machine_sender .send(m) .expect("failed to send machine to thread"); sched.threads.push(ThreadState { unparker, parked, machine_sender, }); drop(sched); } } // Sleep for a bit longer if the scheduler state hasn't changed in a while. if idle > 10 { delay = (delay * 2).min(10_000); } else { idle += 1; delay = 1000; } thread::sleep(Duration::from_micros(delay)); } }) .unwrap(); } /// Returns a list of machines that need to be started. fn make_machines(&self) -> Vec> { let mut sched = self.sched.lock().unwrap(); let mut to_start = Vec::new(); // If no machine has been polling the reactor in a while, that means the runtime is // overloaded with work and we need to start another machine. // // Also ensure that there are at least 2 running machiens to avoid starvation. if !sched.polling || sched.machines.len() < MIN_MACHINES { #[cfg(feature = "tracing")] self.poll_count.fetch_add(1, Ordering::Relaxed); // if !sched.progress { if let Some(p) = sched.processors.pop() { if let Some(m) = sched.next_idle_machine() { // find idle m *m.processor.lock() = Some(p); to_start.push(m.clone()); } else { // no idle m let m = Arc::new(Machine::new(p)); to_start.push(m.clone()); sched.machines.push(m); } } // } sched.progress = false; } to_start } /// Unparks a thread polling the reactor. fn notify(&self) { atomic::fence(Ordering::SeqCst); self.reactor.notify().unwrap(); } /// Attempts to poll the reactor without blocking on it. /// /// Returns `Ok(true)` if at least one new task was woken. /// /// This function might not poll the reactor at all so do not rely on it doing anything. Only /// use for optimization. fn quick_poll(&self) -> io::Result { if let Ok(sched) = self.sched.try_lock() { if !sched.polling { return self.reactor.poll(Some(Duration::from_secs(0))); } } Ok(false) } } /// A thread running a processor. struct Machine { /// Holds the processor until it gets stolen. processor: Spinlock>, } impl Machine { /// Creates a new machine running a processor. fn new(p: Processor) -> Machine { Machine { processor: Spinlock::new(Some(p)), } } fn has_work(&self) -> bool { if let Some(p) = &*self.processor.lock() { // TODO: is this the right check? p.has_work() } else { false } } /// Schedules a task onto the machine. fn schedule(&self, rt: &Runtime, task: Runnable) { match self.processor.lock().as_mut() { None => { rt.injector.push(task); rt.notify(); } Some(p) => p.schedule(rt, task), } } /// Finds the next runnable task. fn find_task(&self, rt: &Runtime) -> Steal { let mut retry = false; // First try finding a task in the local queue or in the global queue. if let Some(p) = self.processor.lock().as_mut() { if let Some(task) = p.pop_task() { return Steal::Success(task); } match p.steal_from_global(rt) { Steal::Empty => {} Steal::Retry => retry = true, Steal::Success(task) => return Steal::Success(task), } } // Try polling the reactor, but don't block on it. let progress = rt.quick_poll().unwrap(); // Try finding a task in the local queue, which might hold tasks woken by the reactor. If // the local queue is still empty, try stealing from other processors. if let Some(p) = self.processor.lock().as_mut() { if progress { if let Some(task) = p.pop_task() { return Steal::Success(task); } } match p.steal_from_others(rt) { Steal::Empty => {} Steal::Retry => retry = true, Steal::Success(task) => return Steal::Success(task), } } if retry { Steal::Retry } else { Steal::Empty } } /// Runs the machine on the current thread. fn run(&self, rt: &Runtime) { /// Number of yields when no runnable task is found. const YIELDS: u32 = 3; /// Number of short sleeps when no runnable task in found. const SLEEPS: u32 = 10; /// Number of runs in a row before the global queue is inspected. const RUNS: u32 = 64; // The number of times the thread found work in a row. let mut runs = 0; // The number of times the thread didn't find work in a row. let mut fails = 0; loop { // Check if `task::yield_now()` was invoked and flush the slot if so. YIELD_NOW.with(|flag| { if flag.replace(false) { if let Some(p) = self.processor.lock().as_mut() { p.flush_slot(rt); } } }); // After a number of runs in a row, do some work to ensure no task is left behind // indefinitely. Poll the reactor, steal tasks from the global queue, and flush the // task slot. if runs >= RUNS { runs = 0; rt.quick_poll().unwrap(); if let Some(p) = self.processor.lock().as_mut() { if let Steal::Success(task) = p.steal_from_global(rt) { p.schedule(rt, task); } p.flush_slot(rt); } } // Try to find a runnable task. if let Steal::Success(task) = self.find_task(rt) { task.run(); runs += 1; fails = 0; continue; } fails += 1; // Yield the current thread a few times. if fails <= YIELDS { thread::yield_now(); continue; } // Put the current thread to sleep a few times. if fails <= YIELDS + SLEEPS { let opt_p = self.processor.lock().take(); thread::sleep(Duration::from_micros(10)); *self.processor.lock() = opt_p; continue; } let mut sched = rt.sched.lock().unwrap(); // One final check for available tasks while the scheduler is locked. if let Some(task) = iter::repeat_with(|| self.find_task(rt)) .find(|s| !s.is_retry()) .and_then(|s| s.success()) { self.schedule(rt, task); continue; } // If another thread is already blocked on the reactor, there is no point in keeping // the current thread around since there is too little work to do. if sched.polling { break; } // Take out the machine associated with the current thread. let m = match sched .machines .iter() .position(|elem| ptr::eq(&**elem, self)) { None => break, // The processor was stolen. Some(pos) => sched.machines.swap_remove(pos), }; // Unlock the schedule poll the reactor until new I/O events arrive. // println!("polling start"); sched.polling = true; drop(sched); rt.reactor.poll(None).unwrap(); // Lock the scheduler again and re-register the machine. sched = rt.sched.lock().unwrap(); sched.polling = false; //println!("polling stop"); sched.machines.push(m); sched.progress = true; runs = 0; fails = 0; } // println!("thread break"); // When shutting down the thread, take the processor out if still available. let opt_p = self.processor.lock().take(); // println!("processor {:?}", opt_p.is_some()); // Return the processor to the scheduler and remove the machine. if let Some(p) = opt_p { // println!("returning processor to pool"); let mut sched = rt.sched.lock().unwrap(); sched.processors.push(p); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); } // println!("thread run stopped"); } } struct Processor { /// The local task queue. worker: Worker, /// Contains the next task to run as an optimization that skips the queue. slot: Option, } impl Processor { /// Creates a new processor. fn new() -> Processor { Processor { worker: Worker::new_fifo(), slot: None, } } /// Is there any available work for this processor? fn has_work(&self) -> bool { self.slot.is_some() || !self.worker.is_empty() } /// Schedules a task to run on this processor. fn schedule(&mut self, rt: &Runtime, task: Runnable) { match self.slot.replace(task) { None => {} Some(task) => { self.worker.push(task); rt.notify(); } } } /// Flushes a task from the slot into the local queue. fn flush_slot(&mut self, rt: &Runtime) { if let Some(task) = self.slot.take() { self.worker.push(task); rt.notify(); } } /// Pops a task from this processor. fn pop_task(&mut self) -> Option { self.slot.take().or_else(|| self.worker.pop()) } /// Steals a task from the global queue. fn steal_from_global(&self, rt: &Runtime) -> Steal { rt.injector.steal_batch_and_pop(&self.worker) } /// Steals a task from other processors. fn steal_from_others(&self, rt: &Runtime) -> Steal { // Pick a random starting point in the list of queues. let len = rt.stealers.len(); let start = random(len as u32) as usize; // Create an iterator over stealers that starts from the chosen point. let (l, r) = rt.stealers.split_at(start); let stealers = r.iter().chain(l.iter()); // Try stealing a batch of tasks from each queue. stealers .map(|s| s.steal_batch_and_pop(&self.worker)) .collect() } }