diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index a7b7552..3a08bb6 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -1,9 +1,8 @@ use std::cell::Cell; use std::io; use std::iter; -use std::ptr; -use std::sync::atomic::{self, AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::atomic::{self, Ordering}; +use std::sync::Arc; use std::thread; use std::time::Duration; @@ -12,7 +11,6 @@ use crossbeam_utils::thread::scope; use once_cell::unsync::OnceCell; use crate::rt::Reactor; -use crate::sync::Spinlock; use crate::task::Runnable; use crate::utils::{abort_on_panic, random}; @@ -24,21 +22,6 @@ thread_local! { static YIELD_NOW: Cell = Cell::new(false); } -/// Scheduler state. -struct Scheduler { - /// Set to `true` every time before a machine blocks polling the reactor. - progress: bool, - - /// Set to `true` while a machine is polling the reactor. - polling: bool, - - /// Idle processors. - processors: Vec, - - /// Running machines. - machines: Vec>, -} - /// An async runtime. pub struct Runtime { /// The reactor. @@ -50,8 +33,7 @@ pub struct Runtime { /// Handles to local queues for stealing work. stealers: Vec>, - /// The scheduler state. - sched: Mutex, + machines: Vec>, } impl Runtime { @@ -59,18 +41,22 @@ impl Runtime { pub fn new() -> Runtime { let cpus = num_cpus::get().max(1); let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); - let stealers = processors.iter().map(|p| p.worker.stealer()).collect(); + + let machines: Vec<_> = processors + .into_iter() + .map(|p| Arc::new(Machine::new(p))) + .collect(); + + let stealers = machines + .iter() + .map(|m| m.processor.worker.stealer()) + .collect(); Runtime { reactor: Reactor::new().unwrap(), injector: Injector::new(), stealers, - sched: Mutex::new(Scheduler { - processors, - machines: Vec::new(), - progress: false, - polling: false, - }), + machines, } } @@ -102,74 +88,21 @@ impl Runtime { /// Runs the runtime on the current thread. pub fn run(&self) { scope(|s| { - let mut idle = 0; - let mut delay = 0; - - loop { - // Get a list of new machines to start, if any need to be started. - for m in self.make_machines() { - idle = 0; - - s.builder() - .name("async-std/machine".to_string()) - .spawn(move |_| { - abort_on_panic(|| { - let _ = MACHINE.with(|machine| machine.set(m.clone())); - m.run(self); - }) + for m in &self.machines { + s.builder() + .name("async-std/machine".to_string()) + .spawn(move |_| { + abort_on_panic(|| { + let _ = MACHINE.with(|machine| machine.set(m.clone())); + m.run(self); }) - .expect("cannot start a machine thread"); - } - - // Sleep for a bit longer if the scheduler state hasn't changed in a while. - if idle > 10 { - delay = (delay * 2).min(10_000); - } else { - idle += 1; - delay = 1000; - } - - thread::sleep(Duration::from_micros(delay)); + }) + .expect("cannot start a machine thread"); } }) .unwrap(); } - /// Returns a list of machines that need to be started. - fn make_machines(&self) -> Vec> { - let mut sched = self.sched.lock().unwrap(); - let mut to_start = Vec::new(); - - // If there is a machine that is stuck on a task and not making any progress, steal its - // processor and set up a new machine to take over. - for m in &mut sched.machines { - if !m.progress.swap(false, Ordering::SeqCst) { - let opt_p = m.processor.try_lock().and_then(|mut p| p.take()); - - if let Some(p) = opt_p { - *m = Arc::new(Machine::new(p)); - to_start.push(m.clone()); - } - } - } - - // If no machine has been polling the reactor in a while, that means the runtime is - // overloaded with work and we need to start another machine. - if !sched.polling { - if !sched.progress { - if let Some(p) = sched.processors.pop() { - let m = Arc::new(Machine::new(p)); - to_start.push(m.clone()); - sched.machines.push(m); - } - } - - sched.progress = false; - } - - to_start - } - /// Unparks a thread polling the reactor. fn notify(&self) { atomic::fence(Ordering::SeqCst); @@ -183,42 +116,28 @@ impl Runtime { /// This function might not poll the reactor at all so do not rely on it doing anything. Only /// use for optimization. fn quick_poll(&self) -> io::Result { - if let Ok(sched) = self.sched.try_lock() { - if !sched.polling { - return self.reactor.poll(Some(Duration::from_secs(0))); - } - } - Ok(false) + return self.reactor.poll(Some(Duration::from_secs(0))); } } /// A thread running a processor. struct Machine { /// Holds the processor until it gets stolen. - processor: Spinlock>, - - /// Gets set to `true` before running every task to indicate the machine is not stuck. - progress: AtomicBool, + processor: Processor, } +unsafe impl Send for Machine {} +unsafe impl Sync for Machine {} + impl Machine { /// Creates a new machine running a processor. fn new(p: Processor) -> Machine { - Machine { - processor: Spinlock::new(Some(p)), - progress: AtomicBool::new(true), - } + Machine { processor: p } } /// Schedules a task onto the machine. fn schedule(&self, rt: &Runtime, task: Runnable) { - match self.processor.lock().as_mut() { - None => { - rt.injector.push(task); - rt.notify(); - } - Some(p) => p.schedule(rt, task), - } + self.processor.schedule(rt, task); } /// Finds the next runnable task. @@ -226,16 +145,14 @@ impl Machine { let mut retry = false; // First try finding a task in the local queue or in the global queue. - if let Some(p) = self.processor.lock().as_mut() { - if let Some(task) = p.pop_task() { - return Steal::Success(task); - } + if let Some(task) = self.processor.pop_task() { + return Steal::Success(task); + } - match p.steal_from_global(rt) { - Steal::Empty => {} - Steal::Retry => retry = true, - Steal::Success(task) => return Steal::Success(task), - } + match self.processor.steal_from_global(rt) { + Steal::Empty => {} + Steal::Retry => retry = true, + Steal::Success(task) => return Steal::Success(task), } // Try polling the reactor, but don't block on it. @@ -243,18 +160,16 @@ impl Machine { // Try finding a task in the local queue, which might hold tasks woken by the reactor. If // the local queue is still empty, try stealing from other processors. - if let Some(p) = self.processor.lock().as_mut() { - if progress { - if let Some(task) = p.pop_task() { - return Steal::Success(task); - } + if progress { + if let Some(task) = self.processor.pop_task() { + return Steal::Success(task); } + } - match p.steal_from_others(rt) { - Steal::Empty => {} - Steal::Retry => retry = true, - Steal::Success(task) => return Steal::Success(task), - } + match self.processor.steal_from_others(rt) { + Steal::Empty => {} + Steal::Retry => retry = true, + Steal::Success(task) => return Steal::Success(task), } if retry { Steal::Retry } else { Steal::Empty } @@ -275,15 +190,10 @@ impl Machine { let mut fails = 0; loop { - // let the scheduler know this machine is making progress. - self.progress.store(true, Ordering::SeqCst); - // Check if `task::yield_now()` was invoked and flush the slot if so. YIELD_NOW.with(|flag| { if flag.replace(false) { - if let Some(p) = self.processor.lock().as_mut() { - p.flush_slot(rt); - } + self.processor.flush_slot(rt); } }); @@ -294,13 +204,11 @@ impl Machine { runs = 0; rt.quick_poll().unwrap(); - if let Some(p) = self.processor.lock().as_mut() { - if let Steal::Success(task) = p.steal_from_global(rt) { - p.schedule(rt, task); - } - - p.flush_slot(rt); + if let Steal::Success(task) = self.processor.steal_from_global(rt) { + self.processor.schedule(rt, task); } + + self.processor.flush_slot(rt); } // Try to find a runnable task. @@ -313,11 +221,6 @@ impl Machine { fails += 1; - // Check if the processor was stolen. - if self.processor.lock().is_none() { - break; - } - // Yield the current thread a few times. if fails <= YIELDS { thread::yield_now(); @@ -326,14 +229,10 @@ impl Machine { // Put the current thread to sleep a few times. if fails <= YIELDS + SLEEPS { - let opt_p = self.processor.lock().take(); thread::sleep(Duration::from_micros(10)); - *self.processor.lock() = opt_p; continue; } - let mut sched = rt.sched.lock().unwrap(); - // One final check for available tasks while the scheduler is locked. if let Some(task) = iter::repeat_with(|| self.find_task(rt)) .find(|s| !s.is_retry()) @@ -343,46 +242,11 @@ impl Machine { continue; } - // If another thread is already blocked on the reactor, there is no point in keeping - // the current thread around since there is too little work to do. - if sched.polling { - break; - } - - // Take out the machine associated with the current thread. - let m = match sched - .machines - .iter() - .position(|elem| ptr::eq(&**elem, self)) - { - None => break, // The processor was stolen. - Some(pos) => sched.machines.swap_remove(pos), - }; - - // Unlock the schedule poll the reactor until new I/O events arrive. - sched.polling = true; - drop(sched); rt.reactor.poll(None).unwrap(); - // Lock the scheduler again and re-register the machine. - sched = rt.sched.lock().unwrap(); - sched.polling = false; - sched.machines.push(m); - sched.progress = true; - runs = 0; fails = 0; } - - // When shutting down the thread, take the processor out if still available. - let opt_p = self.processor.lock().take(); - - // Return the processor to the scheduler and remove the machine. - if let Some(p) = opt_p { - let mut sched = rt.sched.lock().unwrap(); - sched.processors.push(p); - sched.machines.retain(|elem| !ptr::eq(&**elem, self)); - } } } @@ -391,7 +255,7 @@ struct Processor { worker: Worker, /// Contains the next task to run as an optimization that skips the queue. - slot: Option, + slot: Cell>, } impl Processor { @@ -399,13 +263,13 @@ impl Processor { fn new() -> Processor { Processor { worker: Worker::new_fifo(), - slot: None, + slot: Cell::new(None), } } /// Schedules a task to run on this processor. - fn schedule(&mut self, rt: &Runtime, task: Runnable) { - match self.slot.replace(task) { + fn schedule(&self, rt: &Runtime, task: Runnable) { + match self.slot.replace(Some(task)) { None => {} Some(task) => { self.worker.push(task); @@ -415,7 +279,7 @@ impl Processor { } /// Flushes a task from the slot into the local queue. - fn flush_slot(&mut self, rt: &Runtime) { + fn flush_slot(&self, rt: &Runtime) { if let Some(task) = self.slot.take() { self.worker.push(task); rt.notify(); @@ -423,17 +287,17 @@ impl Processor { } /// Pops a task from this processor. - fn pop_task(&mut self) -> Option { + fn pop_task(&self) -> Option { self.slot.take().or_else(|| self.worker.pop()) } /// Steals a task from the global queue. - fn steal_from_global(&mut self, rt: &Runtime) -> Steal { + fn steal_from_global(&self, rt: &Runtime) -> Steal { rt.injector.steal_batch_and_pop(&self.worker) } /// Steals a task from other processors. - fn steal_from_others(&mut self, rt: &Runtime) -> Steal { + fn steal_from_others(&self, rt: &Runtime) -> Steal { // Pick a random starting point in the list of queues. let len = rt.stealers.len(); let start = random(len as u32) as usize; diff --git a/src/sync/mod.rs b/src/sync/mod.rs index caaf7f7..c221165 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -178,11 +178,9 @@ pub use std::sync::{Arc, Weak}; pub use mutex::{Mutex, MutexGuard}; pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -pub(crate) use spin_lock::Spinlock; mod mutex; mod rwlock; -mod spin_lock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; diff --git a/src/sync/spin_lock.rs b/src/sync/spin_lock.rs deleted file mode 100644 index 48ad15d..0000000 --- a/src/sync/spin_lock.rs +++ /dev/null @@ -1,96 +0,0 @@ -use std::cell::UnsafeCell; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicBool, Ordering}; - -use crossbeam_utils::Backoff; - -/// A simple spinlock. -#[derive(Debug)] -pub struct Spinlock { - locked: AtomicBool, - value: UnsafeCell, -} - -unsafe impl Send for Spinlock {} -unsafe impl Sync for Spinlock {} - -impl Spinlock { - /// Returns a new spinlock initialized with `value`. - pub const fn new(value: T) -> Spinlock { - Spinlock { - locked: AtomicBool::new(false), - value: UnsafeCell::new(value), - } - } - - /// Locks the spinlock. - pub fn lock(&self) -> SpinlockGuard<'_, T> { - let backoff = Backoff::new(); - while self.locked.compare_and_swap(false, true, Ordering::Acquire) { - backoff.snooze(); - } - SpinlockGuard { parent: self } - } - - /// Attempts to lock the spinlock. - pub fn try_lock(&self) -> Option> { - if self.locked.swap(true, Ordering::Acquire) { - None - } else { - Some(SpinlockGuard { parent: self }) - } - } -} - -/// A guard holding a spinlock locked. -#[derive(Debug)] -pub struct SpinlockGuard<'a, T> { - parent: &'a Spinlock, -} - -unsafe impl Send for SpinlockGuard<'_, T> {} -unsafe impl Sync for SpinlockGuard<'_, T> {} - -impl<'a, T> Drop for SpinlockGuard<'a, T> { - fn drop(&mut self) { - self.parent.locked.store(false, Ordering::Release); - } -} - -impl<'a, T> Deref for SpinlockGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -impl<'a, T> DerefMut for SpinlockGuard<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.parent.value.get() } - } -} - -#[test] -fn spinlock() { - crate::task::block_on(async { - use crate::sync::{Arc, Spinlock}; - use crate::task; - - let m = Arc::new(Spinlock::new(0)); - let mut tasks = vec![]; - - for _ in 0..10 { - let m = m.clone(); - tasks.push(task::spawn(async move { - *m.lock() += 1; - })); - } - - for t in tasks { - t.await; - } - assert_eq!(*m.lock(), 10); - - }) -}