Remove scheduler state

mio-0-7
k-nasa 5 years ago
parent d7ee29a03f
commit 24c5dbf949

@ -1,9 +1,8 @@
use std::cell::Cell; use std::cell::Cell;
use std::io; use std::io;
use std::iter; use std::iter;
use std::ptr; use std::sync::atomic::{self, Ordering};
use std::sync::atomic::{self, AtomicBool, Ordering}; use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@ -12,7 +11,6 @@ use crossbeam_utils::thread::scope;
use once_cell::unsync::OnceCell; use once_cell::unsync::OnceCell;
use crate::rt::Reactor; use crate::rt::Reactor;
use crate::sync::Spinlock;
use crate::task::Runnable; use crate::task::Runnable;
use crate::utils::{abort_on_panic, random}; use crate::utils::{abort_on_panic, random};
@ -24,21 +22,6 @@ thread_local! {
static YIELD_NOW: Cell<bool> = Cell::new(false); static YIELD_NOW: Cell<bool> = Cell::new(false);
} }
/// Scheduler state.
struct Scheduler {
/// Set to `true` every time before a machine blocks polling the reactor.
progress: bool,
/// Set to `true` while a machine is polling the reactor.
polling: bool,
/// Idle processors.
processors: Vec<Processor>,
/// Running machines.
machines: Vec<Arc<Machine>>,
}
/// An async runtime. /// An async runtime.
pub struct Runtime { pub struct Runtime {
/// The reactor. /// The reactor.
@ -50,8 +33,7 @@ pub struct Runtime {
/// Handles to local queues for stealing work. /// Handles to local queues for stealing work.
stealers: Vec<Stealer<Runnable>>, stealers: Vec<Stealer<Runnable>>,
/// The scheduler state. machines: Vec<Arc<Machine>>,
sched: Mutex<Scheduler>,
} }
impl Runtime { impl Runtime {
@ -59,18 +41,22 @@ impl Runtime {
pub fn new() -> Runtime { pub fn new() -> Runtime {
let cpus = num_cpus::get().max(1); let cpus = num_cpus::get().max(1);
let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();
let stealers = processors.iter().map(|p| p.worker.stealer()).collect();
let machines: Vec<_> = processors
.into_iter()
.map(|p| Arc::new(Machine::new(p)))
.collect();
let stealers = machines
.iter()
.map(|m| m.processor.worker.stealer())
.collect();
Runtime { Runtime {
reactor: Reactor::new().unwrap(), reactor: Reactor::new().unwrap(),
injector: Injector::new(), injector: Injector::new(),
stealers, stealers,
sched: Mutex::new(Scheduler { machines,
processors,
machines: Vec::new(),
progress: false,
polling: false,
}),
} }
} }
@ -102,74 +88,21 @@ impl Runtime {
/// Runs the runtime on the current thread. /// Runs the runtime on the current thread.
pub fn run(&self) { pub fn run(&self) {
scope(|s| { scope(|s| {
let mut idle = 0; for m in &self.machines {
let mut delay = 0; s.builder()
.name("async-std/machine".to_string())
loop { .spawn(move |_| {
// Get a list of new machines to start, if any need to be started. abort_on_panic(|| {
for m in self.make_machines() { let _ = MACHINE.with(|machine| machine.set(m.clone()));
idle = 0; m.run(self);
s.builder()
.name("async-std/machine".to_string())
.spawn(move |_| {
abort_on_panic(|| {
let _ = MACHINE.with(|machine| machine.set(m.clone()));
m.run(self);
})
}) })
.expect("cannot start a machine thread"); })
} .expect("cannot start a machine thread");
// Sleep for a bit longer if the scheduler state hasn't changed in a while.
if idle > 10 {
delay = (delay * 2).min(10_000);
} else {
idle += 1;
delay = 1000;
}
thread::sleep(Duration::from_micros(delay));
} }
}) })
.unwrap(); .unwrap();
} }
/// Returns a list of machines that need to be started.
fn make_machines(&self) -> Vec<Arc<Machine>> {
let mut sched = self.sched.lock().unwrap();
let mut to_start = Vec::new();
// If there is a machine that is stuck on a task and not making any progress, steal its
// processor and set up a new machine to take over.
for m in &mut sched.machines {
if !m.progress.swap(false, Ordering::SeqCst) {
let opt_p = m.processor.try_lock().and_then(|mut p| p.take());
if let Some(p) = opt_p {
*m = Arc::new(Machine::new(p));
to_start.push(m.clone());
}
}
}
// If no machine has been polling the reactor in a while, that means the runtime is
// overloaded with work and we need to start another machine.
if !sched.polling {
if !sched.progress {
if let Some(p) = sched.processors.pop() {
let m = Arc::new(Machine::new(p));
to_start.push(m.clone());
sched.machines.push(m);
}
}
sched.progress = false;
}
to_start
}
/// Unparks a thread polling the reactor. /// Unparks a thread polling the reactor.
fn notify(&self) { fn notify(&self) {
atomic::fence(Ordering::SeqCst); atomic::fence(Ordering::SeqCst);
@ -183,42 +116,28 @@ impl Runtime {
/// This function might not poll the reactor at all so do not rely on it doing anything. Only /// This function might not poll the reactor at all so do not rely on it doing anything. Only
/// use for optimization. /// use for optimization.
fn quick_poll(&self) -> io::Result<bool> { fn quick_poll(&self) -> io::Result<bool> {
if let Ok(sched) = self.sched.try_lock() { return self.reactor.poll(Some(Duration::from_secs(0)));
if !sched.polling {
return self.reactor.poll(Some(Duration::from_secs(0)));
}
}
Ok(false)
} }
} }
/// A thread running a processor. /// A thread running a processor.
struct Machine { struct Machine {
/// Holds the processor until it gets stolen. /// Holds the processor until it gets stolen.
processor: Spinlock<Option<Processor>>, processor: Processor,
/// Gets set to `true` before running every task to indicate the machine is not stuck.
progress: AtomicBool,
} }
unsafe impl Send for Machine {}
unsafe impl Sync for Machine {}
impl Machine { impl Machine {
/// Creates a new machine running a processor. /// Creates a new machine running a processor.
fn new(p: Processor) -> Machine { fn new(p: Processor) -> Machine {
Machine { Machine { processor: p }
processor: Spinlock::new(Some(p)),
progress: AtomicBool::new(true),
}
} }
/// Schedules a task onto the machine. /// Schedules a task onto the machine.
fn schedule(&self, rt: &Runtime, task: Runnable) { fn schedule(&self, rt: &Runtime, task: Runnable) {
match self.processor.lock().as_mut() { self.processor.schedule(rt, task);
None => {
rt.injector.push(task);
rt.notify();
}
Some(p) => p.schedule(rt, task),
}
} }
/// Finds the next runnable task. /// Finds the next runnable task.
@ -226,16 +145,14 @@ impl Machine {
let mut retry = false; let mut retry = false;
// First try finding a task in the local queue or in the global queue. // First try finding a task in the local queue or in the global queue.
if let Some(p) = self.processor.lock().as_mut() { if let Some(task) = self.processor.pop_task() {
if let Some(task) = p.pop_task() { return Steal::Success(task);
return Steal::Success(task); }
}
match p.steal_from_global(rt) { match self.processor.steal_from_global(rt) {
Steal::Empty => {} Steal::Empty => {}
Steal::Retry => retry = true, Steal::Retry => retry = true,
Steal::Success(task) => return Steal::Success(task), Steal::Success(task) => return Steal::Success(task),
}
} }
// Try polling the reactor, but don't block on it. // Try polling the reactor, but don't block on it.
@ -243,18 +160,16 @@ impl Machine {
// Try finding a task in the local queue, which might hold tasks woken by the reactor. If // Try finding a task in the local queue, which might hold tasks woken by the reactor. If
// the local queue is still empty, try stealing from other processors. // the local queue is still empty, try stealing from other processors.
if let Some(p) = self.processor.lock().as_mut() { if progress {
if progress { if let Some(task) = self.processor.pop_task() {
if let Some(task) = p.pop_task() { return Steal::Success(task);
return Steal::Success(task);
}
} }
}
match p.steal_from_others(rt) { match self.processor.steal_from_others(rt) {
Steal::Empty => {} Steal::Empty => {}
Steal::Retry => retry = true, Steal::Retry => retry = true,
Steal::Success(task) => return Steal::Success(task), Steal::Success(task) => return Steal::Success(task),
}
} }
if retry { Steal::Retry } else { Steal::Empty } if retry { Steal::Retry } else { Steal::Empty }
@ -275,15 +190,10 @@ impl Machine {
let mut fails = 0; let mut fails = 0;
loop { loop {
// let the scheduler know this machine is making progress.
self.progress.store(true, Ordering::SeqCst);
// Check if `task::yield_now()` was invoked and flush the slot if so. // Check if `task::yield_now()` was invoked and flush the slot if so.
YIELD_NOW.with(|flag| { YIELD_NOW.with(|flag| {
if flag.replace(false) { if flag.replace(false) {
if let Some(p) = self.processor.lock().as_mut() { self.processor.flush_slot(rt);
p.flush_slot(rt);
}
} }
}); });
@ -294,13 +204,11 @@ impl Machine {
runs = 0; runs = 0;
rt.quick_poll().unwrap(); rt.quick_poll().unwrap();
if let Some(p) = self.processor.lock().as_mut() { if let Steal::Success(task) = self.processor.steal_from_global(rt) {
if let Steal::Success(task) = p.steal_from_global(rt) { self.processor.schedule(rt, task);
p.schedule(rt, task);
}
p.flush_slot(rt);
} }
self.processor.flush_slot(rt);
} }
// Try to find a runnable task. // Try to find a runnable task.
@ -313,11 +221,6 @@ impl Machine {
fails += 1; fails += 1;
// Check if the processor was stolen.
if self.processor.lock().is_none() {
break;
}
// Yield the current thread a few times. // Yield the current thread a few times.
if fails <= YIELDS { if fails <= YIELDS {
thread::yield_now(); thread::yield_now();
@ -326,14 +229,10 @@ impl Machine {
// Put the current thread to sleep a few times. // Put the current thread to sleep a few times.
if fails <= YIELDS + SLEEPS { if fails <= YIELDS + SLEEPS {
let opt_p = self.processor.lock().take();
thread::sleep(Duration::from_micros(10)); thread::sleep(Duration::from_micros(10));
*self.processor.lock() = opt_p;
continue; continue;
} }
let mut sched = rt.sched.lock().unwrap();
// One final check for available tasks while the scheduler is locked. // One final check for available tasks while the scheduler is locked.
if let Some(task) = iter::repeat_with(|| self.find_task(rt)) if let Some(task) = iter::repeat_with(|| self.find_task(rt))
.find(|s| !s.is_retry()) .find(|s| !s.is_retry())
@ -343,46 +242,11 @@ impl Machine {
continue; continue;
} }
// If another thread is already blocked on the reactor, there is no point in keeping
// the current thread around since there is too little work to do.
if sched.polling {
break;
}
// Take out the machine associated with the current thread.
let m = match sched
.machines
.iter()
.position(|elem| ptr::eq(&**elem, self))
{
None => break, // The processor was stolen.
Some(pos) => sched.machines.swap_remove(pos),
};
// Unlock the schedule poll the reactor until new I/O events arrive.
sched.polling = true;
drop(sched);
rt.reactor.poll(None).unwrap(); rt.reactor.poll(None).unwrap();
// Lock the scheduler again and re-register the machine.
sched = rt.sched.lock().unwrap();
sched.polling = false;
sched.machines.push(m);
sched.progress = true;
runs = 0; runs = 0;
fails = 0; fails = 0;
} }
// When shutting down the thread, take the processor out if still available.
let opt_p = self.processor.lock().take();
// Return the processor to the scheduler and remove the machine.
if let Some(p) = opt_p {
let mut sched = rt.sched.lock().unwrap();
sched.processors.push(p);
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
}
} }
} }
@ -391,7 +255,7 @@ struct Processor {
worker: Worker<Runnable>, worker: Worker<Runnable>,
/// Contains the next task to run as an optimization that skips the queue. /// Contains the next task to run as an optimization that skips the queue.
slot: Option<Runnable>, slot: Cell<Option<Runnable>>,
} }
impl Processor { impl Processor {
@ -399,13 +263,13 @@ impl Processor {
fn new() -> Processor { fn new() -> Processor {
Processor { Processor {
worker: Worker::new_fifo(), worker: Worker::new_fifo(),
slot: None, slot: Cell::new(None),
} }
} }
/// Schedules a task to run on this processor. /// Schedules a task to run on this processor.
fn schedule(&mut self, rt: &Runtime, task: Runnable) { fn schedule(&self, rt: &Runtime, task: Runnable) {
match self.slot.replace(task) { match self.slot.replace(Some(task)) {
None => {} None => {}
Some(task) => { Some(task) => {
self.worker.push(task); self.worker.push(task);
@ -415,7 +279,7 @@ impl Processor {
} }
/// Flushes a task from the slot into the local queue. /// Flushes a task from the slot into the local queue.
fn flush_slot(&mut self, rt: &Runtime) { fn flush_slot(&self, rt: &Runtime) {
if let Some(task) = self.slot.take() { if let Some(task) = self.slot.take() {
self.worker.push(task); self.worker.push(task);
rt.notify(); rt.notify();
@ -423,17 +287,17 @@ impl Processor {
} }
/// Pops a task from this processor. /// Pops a task from this processor.
fn pop_task(&mut self) -> Option<Runnable> { fn pop_task(&self) -> Option<Runnable> {
self.slot.take().or_else(|| self.worker.pop()) self.slot.take().or_else(|| self.worker.pop())
} }
/// Steals a task from the global queue. /// Steals a task from the global queue.
fn steal_from_global(&mut self, rt: &Runtime) -> Steal<Runnable> { fn steal_from_global(&self, rt: &Runtime) -> Steal<Runnable> {
rt.injector.steal_batch_and_pop(&self.worker) rt.injector.steal_batch_and_pop(&self.worker)
} }
/// Steals a task from other processors. /// Steals a task from other processors.
fn steal_from_others(&mut self, rt: &Runtime) -> Steal<Runnable> { fn steal_from_others(&self, rt: &Runtime) -> Steal<Runnable> {
// Pick a random starting point in the list of queues. // Pick a random starting point in the list of queues.
let len = rt.stealers.len(); let len = rt.stealers.len();
let start = random(len as u32) as usize; let start = random(len as u32) as usize;

@ -178,11 +178,9 @@ pub use std::sync::{Arc, Weak};
pub use mutex::{Mutex, MutexGuard}; pub use mutex::{Mutex, MutexGuard};
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
pub(crate) use spin_lock::Spinlock;
mod mutex; mod mutex;
mod rwlock; mod rwlock;
mod spin_lock;
cfg_unstable! { cfg_unstable! {
pub use barrier::{Barrier, BarrierWaitResult}; pub use barrier::{Barrier, BarrierWaitResult};

@ -1,96 +0,0 @@
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};
use crossbeam_utils::Backoff;
/// A simple spinlock.
#[derive(Debug)]
pub struct Spinlock<T> {
locked: AtomicBool,
value: UnsafeCell<T>,
}
unsafe impl<T: Send> Send for Spinlock<T> {}
unsafe impl<T: Send> Sync for Spinlock<T> {}
impl<T> Spinlock<T> {
/// Returns a new spinlock initialized with `value`.
pub const fn new(value: T) -> Spinlock<T> {
Spinlock {
locked: AtomicBool::new(false),
value: UnsafeCell::new(value),
}
}
/// Locks the spinlock.
pub fn lock(&self) -> SpinlockGuard<'_, T> {
let backoff = Backoff::new();
while self.locked.compare_and_swap(false, true, Ordering::Acquire) {
backoff.snooze();
}
SpinlockGuard { parent: self }
}
/// Attempts to lock the spinlock.
pub fn try_lock(&self) -> Option<SpinlockGuard<'_, T>> {
if self.locked.swap(true, Ordering::Acquire) {
None
} else {
Some(SpinlockGuard { parent: self })
}
}
}
/// A guard holding a spinlock locked.
#[derive(Debug)]
pub struct SpinlockGuard<'a, T> {
parent: &'a Spinlock<T>,
}
unsafe impl<T: Send> Send for SpinlockGuard<'_, T> {}
unsafe impl<T: Sync> Sync for SpinlockGuard<'_, T> {}
impl<'a, T> Drop for SpinlockGuard<'a, T> {
fn drop(&mut self) {
self.parent.locked.store(false, Ordering::Release);
}
}
impl<'a, T> Deref for SpinlockGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.parent.value.get() }
}
}
impl<'a, T> DerefMut for SpinlockGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.parent.value.get() }
}
}
#[test]
fn spinlock() {
crate::task::block_on(async {
use crate::sync::{Arc, Spinlock};
use crate::task;
let m = Arc::new(Spinlock::new(0));
let mut tasks = vec![];
for _ in 0..10 {
let m = m.clone();
tasks.push(task::spawn(async move {
*m.lock() += 1;
}));
}
for t in tasks {
t.await;
}
assert_eq!(*m.lock(), 10);
})
}
Loading…
Cancel
Save