From 0c2282ffdc63fa1c9d1aab8d836675279805207c Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 13 Nov 2019 20:32:37 +0100 Subject: [PATCH] Optimization: a slot for the next task to run (#529) * Optimization: a slot for the next task to run * Only notify workers when a task is pushed into a queue --- benches/mutex.rs | 4 +- src/sync/mutex.rs | 1 + src/sync/waker_set.rs | 5 +- src/task/executor/pool.rs | 131 +++++++++++++++++++++++++------------- 4 files changed, 91 insertions(+), 50 deletions(-) diff --git a/benches/mutex.rs b/benches/mutex.rs index b159ba1..4f1910a 100644 --- a/benches/mutex.rs +++ b/benches/mutex.rs @@ -2,9 +2,7 @@ extern crate test; -use std::sync::Arc; - -use async_std::sync::Mutex; +use async_std::sync::{Arc, Mutex}; use async_std::task; use test::Bencher; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 2c0ac0c..4d2cf25 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -170,6 +170,7 @@ impl Mutex { /// # /// # }) /// ``` + #[inline] pub fn try_lock(&self) -> Option> { if !self.locked.swap(true, Ordering::SeqCst) { Some(MutexGuard(self)) diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs index 5ba4cfb..7e897af 100644 --- a/src/sync/waker_set.rs +++ b/src/sync/waker_set.rs @@ -60,6 +60,7 @@ impl WakerSet { } /// Inserts a waker for a blocked operation and returns a key associated with it. + #[cold] pub fn insert(&self, cx: &Context<'_>) -> usize { let w = cx.waker().clone(); let mut inner = self.lock(); @@ -70,6 +71,7 @@ impl WakerSet { } /// Removes the waker of an operation. + #[cold] pub fn remove(&self, key: usize) { let mut inner = self.lock(); @@ -81,6 +83,7 @@ impl WakerSet { /// Removes the waker of a cancelled operation. /// /// Returns `true` if another blocked operation from the set was notified. + #[cold] pub fn cancel(&self, key: usize) -> bool { let mut inner = self.lock(); @@ -147,6 +150,7 @@ impl WakerSet { /// Notifies blocked operations, either one or all of them. 
/// /// Returns `true` if at least one operation was notified. + #[cold] fn notify(&self, n: Notify) -> bool { let mut inner = &mut *self.lock(); let mut notified = false; @@ -172,7 +176,6 @@ impl WakerSet { } /// Locks the list of entries. - #[cold] fn lock(&self) -> Lock<'_> { let backoff = Backoff::new(); while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 { diff --git a/src/task/executor/pool.rs b/src/task/executor/pool.rs index 1e74384..08694dd 100644 --- a/src/task/executor/pool.rs +++ b/src/task/executor/pool.rs @@ -1,10 +1,11 @@ -use std::cell::UnsafeCell; +use std::cell::Cell; use std::iter; use std::thread; use std::time::Duration; use crossbeam_deque::{Injector, Stealer, Worker}; use once_cell::sync::Lazy; +use once_cell::unsync::OnceCell; use crate::task::executor::Sleepers; use crate::task::Runnable; @@ -32,9 +33,18 @@ static POOL: Lazy = Lazy::new(|| { let worker = Worker::new_fifo(); stealers.push(worker.stealer()); + let proc = Processor { + worker, + slot: Cell::new(None), + slot_runs: Cell::new(0), + }; + thread::Builder::new() .name("async-std/executor".to_string()) - .spawn(|| abort_on_panic(|| main_loop(worker))) + .spawn(|| { + let _ = PROCESSOR.with(|p| p.set(proc)); + abort_on_panic(|| main_loop()); + }) .expect("cannot start a thread driving tasks"); } @@ -45,59 +55,75 @@ static POOL: Lazy = Lazy::new(|| { } }); +/// The state of a worker thread. +struct Processor { + /// The local task queue. + worker: Worker, + + /// Contains the next task to run as an optimization that skips queues. + slot: Cell>, + + /// How many times in a row tasks have been taken from the slot rather than the queue. + slot_runs: Cell, +} + thread_local! { - /// Local task queue associated with the current worker thread. - static QUEUE: UnsafeCell>> = UnsafeCell::new(None); + /// Worker thread state. + static PROCESSOR: OnceCell = OnceCell::new(); } /// Schedules a new runnable task for execution. 
pub(crate) fn schedule(task: Runnable) { - QUEUE.with(|queue| { - let local = unsafe { (*queue.get()).as_ref() }; - - // If the current thread is a worker thread, push the task into its local task queue. - // Otherwise, push it into the global task queue. - match local { - None => POOL.injector.push(task), - Some(q) => q.push(task), + PROCESSOR.with(|proc| { + // If the current thread is a worker thread, store it into its task slot or push it into + // its local task queue. Otherwise, push it into the global task queue. + match proc.get() { + Some(proc) => { + // Replace the task in the slot. + if let Some(task) = proc.slot.replace(Some(task)) { + // If the slot already contained a task, push it into the local task queue. + proc.worker.push(task); + POOL.sleepers.notify_one(); + } + } + None => { + POOL.injector.push(task); + POOL.sleepers.notify_one(); + } } - }); - - // Notify a sleeping worker that new work just came in. - POOL.sleepers.notify_one(); + }) } /// Main loop running a worker thread. -fn main_loop(local: Worker) { - // Initialize the local task queue. - QUEUE.with(|queue| unsafe { *queue.get() = Some(local) }); +fn main_loop() { + /// Number of yields when no runnable task is found. + const YIELDS: u32 = 3; + /// Number of short sleeps when no runnable task is found. + const SLEEPS: u32 = 1; // The number of times the thread didn't find work in a row. - let mut step = 0; + let mut fails = 0; loop { // Try to find a runnable task. match find_runnable() { Some(task) => { - // Found. Now run the task. + fails = 0; + + // Run the found task. task.run(); - step = 0; } None => { + fails += 1; + // Yield the current thread or put it to sleep. 
- match step { - 0..=2 => { - thread::yield_now(); - step += 1; - } - 3 => { - thread::sleep(Duration::from_micros(10)); - step += 1; - } - _ => { - POOL.sleepers.wait(); - step = 0; - } + if fails <= YIELDS { + thread::yield_now(); + } else if fails <= YIELDS + SLEEPS { + thread::sleep(Duration::from_micros(10)); + } else { + POOL.sleepers.wait(); + fails = 0; } } } @@ -106,29 +132,42 @@ fn main_loop(local: Worker) { /// Find the next runnable task. fn find_runnable() -> Option { - let pool = &*POOL; - - QUEUE.with(|queue| { - let local = unsafe { (*queue.get()).as_ref().unwrap() }; + /// Maximum number of times the slot can be used in a row. + const SLOT_LIMIT: u32 = 16; + + PROCESSOR.with(|proc| { + let proc = proc.get().unwrap(); + + // Try taking a task from the slot. + let runs = proc.slot_runs.get(); + if runs < SLOT_LIMIT { + if let Some(task) = proc.slot.take() { + proc.slot_runs.set(runs + 1); + return Some(task); + } + } + proc.slot_runs.set(0); // Pop a task from the local queue, if not empty. - local.pop().or_else(|| { + proc.worker.pop().or_else(|| { // Otherwise, we need to look for a task elsewhere. iter::repeat_with(|| { // Try stealing a batch of tasks from the global queue. - pool.injector - .steal_batch_and_pop(&local) + POOL.injector + .steal_batch_and_pop(&proc.worker) // Or try stealing a batch of tasks from one of the other threads. .or_else(|| { // First, pick a random starting point in the list of local queues. - let len = pool.stealers.len(); + let len = POOL.stealers.len(); let start = random(len as u32) as usize; // Try stealing a batch of tasks from each local queue starting from the // chosen point. 
- let (l, r) = pool.stealers.split_at(start); - let rotated = r.iter().chain(l.iter()); - rotated.map(|s| s.steal_batch_and_pop(&local)).collect() + let (l, r) = POOL.stealers.split_at(start); + let stealers = r.iter().chain(l.iter()); + stealers + .map(|s| s.steal_batch_and_pop(&proc.worker)) + .collect() }) }) // Loop while no task was stolen and any steal operation needs to be retried.