diff --git a/Cargo.toml b/Cargo.toml index 7c4613b..e5ae02d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ default = [ "async-task", "crossbeam-channel", "crossbeam-deque", + "crossbeam-queue", "futures-timer", "kv-log-macro", "log", @@ -56,6 +57,7 @@ async-task = { version = "1.0.0", optional = true } broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] } crossbeam-channel = { version = "0.4.0", optional = true } crossbeam-deque = { version = "0.7.2", optional = true } +crossbeam-queue = { version = "0.2.0", optional = true } crossbeam-utils = { version = "0.7.0", optional = true } futures-core = { version = "0.3.1", optional = true } futures-io = { version = "0.3.1", optional = true } diff --git a/src/lib.rs b/src/lib.rs index d0c87ff..070df88 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -246,6 +246,8 @@ cfg_std! { pub mod stream; pub mod sync; pub mod task; + + pub(crate) mod rt; } cfg_default! { diff --git a/src/net/mod.rs b/src/net/mod.rs index 29e4309..fe83d3b 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -66,6 +66,5 @@ pub use tcp::{Incoming, TcpListener, TcpStream}; pub use udp::UdpSocket; mod addr; -pub(crate) mod driver; mod tcp; mod udp; diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index fe06a96..b389518 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use crate::future; use crate::io; -use crate::net::driver::Watcher; +use crate::rt::Watcher; use crate::net::{TcpStream, ToSocketAddrs}; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 4131783..71245a3 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use crate::future; use crate::io::{self, Read, Write}; -use crate::net::driver::Watcher; +use crate::rt::Watcher; use crate::net::ToSocketAddrs; use crate::task::{spawn_blocking, Context, Poll}; use crate::utils::Context as _; diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 418b4b6..961288a 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -3,8 +3,8 @@ use std::net::SocketAddr; use std::net::{Ipv4Addr, Ipv6Addr}; use crate::future; -use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; +use crate::rt::Watcher; use crate::utils::Context as _; /// A UDP socket. 
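
The mechanical part of the patch is visible above: `net::driver` is gone, and every I/O type now pulls `Watcher` from the new `crate::rt` module, so registration goes through the global runtime's reactor rather than a standalone `async-std/net` driver thread. A rough sketch of what that call path looks like from a socket constructor (the `bind_once` helper is illustrative and not part of the patch; only `Watcher::new` and the deregister-on-drop behaviour shown later in this diff are real):

```rust
use std::net::SocketAddr;

use crate::io;
use crate::rt::Watcher;

// Illustrative only: wrapping a mio socket in `Watcher::new` registers it with
// `RUNTIME.reactor()`, and dropping the watcher deregisters it again.
fn bind_once(addr: SocketAddr) -> io::Result<Watcher<mio::net::TcpListener>> {
    let mio_listener = mio::net::TcpListener::bind(&addr)?;
    Ok(Watcher::new(mio_listener))
}
```
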
@@ -102,7 +102,7 @@ impl UdpSocket { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use async_std::net::UdpSocket; + /// use async_std::net::UdpSocket; /// /// let socket = UdpSocket::bind("127.0.0.1:0").await?; /// let addr = socket.local_addr()?; diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index fc426b7..5a2d6ec 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -8,7 +8,7 @@ use mio_uds; use super::SocketAddr; use crate::future; use crate::io; -use crate::net::driver::Watcher; +use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::task::spawn_blocking; diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 675ef48..9f6bdcb 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -10,7 +10,7 @@ use super::SocketAddr; use super::UnixStream; use crate::future; use crate::io; -use crate::net::driver::Watcher; +use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::stream::Stream; diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 647edc9..a1c83f1 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -9,7 +9,7 @@ use mio_uds; use super::SocketAddr; use crate::io::{self, Read, Write}; -use crate::net::driver::Watcher; +use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::task::{spawn_blocking, Context, Poll}; diff --git a/src/rt/mod.rs b/src/rt/mod.rs new file mode 100644 index 0000000..2149d24 --- /dev/null +++ b/src/rt/mod.rs @@ -0,0 +1,23 @@ +//! The runtime. + +use std::thread; + +use once_cell::sync::Lazy; + +use crate::utils::abort_on_panic; + +pub use reactor::{Reactor, Watcher}; +pub use runtime::Runtime; + +mod reactor; +mod runtime; + +/// The global runtime. +pub static RUNTIME: Lazy = Lazy::new(|| { + thread::Builder::new() + .name("async-std/runtime".to_string()) + .spawn(|| abort_on_panic(|| RUNTIME.run())) + .expect("cannot start a runtime thread"); + + Runtime::new() +}); diff --git a/src/net/driver/mod.rs b/src/rt/reactor.rs similarity index 77% rename from src/net/driver/mod.rs rename to src/rt/reactor.rs index 7f33e85..c0046c9 100644 --- a/src/net/driver/mod.rs +++ b/src/rt/reactor.rs @@ -1,13 +1,13 @@ use std::fmt; use std::sync::{Arc, Mutex}; +use std::time::Duration; use mio::{self, Evented}; -use once_cell::sync::Lazy; use slab::Slab; use crate::io; +use crate::rt::RUNTIME; use crate::task::{Context, Poll, Waker}; -use crate::utils::abort_on_panic; /// Data associated with a registered I/O handle. #[derive(Debug)] @@ -18,15 +18,18 @@ struct Entry { /// Tasks that are blocked on reading from this I/O handle. readers: Mutex>, - /// Thasks that are blocked on writing to this I/O handle. + /// Tasks that are blocked on writing to this I/O handle. writers: Mutex>, } /// The state of a networking driver. -struct Reactor { +pub struct Reactor { /// A mio instance that polls for new events. poller: mio::Poll, + /// A list into which mio stores events. + events: Mutex, + /// A collection of registered I/O handles. entries: Mutex>>, @@ -39,12 +42,13 @@ struct Reactor { impl Reactor { /// Creates a new reactor for polling I/O events. 
- fn new() -> io::Result { + pub fn new() -> io::Result { let poller = mio::Poll::new()?; let notify_reg = mio::Registration::new2(); let mut reactor = Reactor { poller, + events: Mutex::new(mio::Events::with_capacity(1000)), entries: Mutex::new(Slab::new()), notify_reg, notify_token: mio::Token(0), @@ -92,50 +96,32 @@ impl Reactor { Ok(()) } - // fn notify(&self) { - // self.notify_reg - // .1 - // .set_readiness(mio::Ready::readable()) - // .unwrap(); - // } -} + /// Notifies the reactor so that polling stops blocking. + pub fn notify(&self) -> io::Result<()> { + self.notify_reg.1.set_readiness(mio::Ready::readable()) + } -/// The state of the global networking driver. -static REACTOR: Lazy = Lazy::new(|| { - // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O - // handles. - std::thread::Builder::new() - .name("async-std/net".to_string()) - .spawn(move || { - // If the driver thread panics, there's not much we can do. It is not a - // recoverable error and there is no place to propagate it into so we just abort. - abort_on_panic(|| { - main_loop().expect("async networking thread has panicked"); - }) - }) - .expect("cannot start a thread driving blocking tasks"); + /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. + /// + /// Returns `Ok(true)` if at least one new task was woken. + pub fn poll(&self, timeout: Option) -> io::Result { + let mut events = self.events.lock().unwrap(); - Reactor::new().expect("cannot initialize reactor") -}); - -/// Waits on the poller for new events and wakes up tasks blocked on I/O handles. -fn main_loop() -> io::Result<()> { - let reactor = &REACTOR; - let mut events = mio::Events::with_capacity(1000); - - loop { // Block on the poller until at least one new event comes in. - reactor.poller.poll(&mut events, None)?; + self.poller.poll(&mut events, timeout)?; // Lock the entire entry table while we're processing new events. - let entries = reactor.entries.lock().unwrap(); + let entries = self.entries.lock().unwrap(); + + // The number of woken tasks. + let mut progress = false; for event in events.iter() { let token = event.token(); - if token == reactor.notify_token { + if token == self.notify_token { // If this is the notification token, we just need the notification state. - reactor.notify_reg.1.set_readiness(mio::Ready::empty())?; + self.notify_reg.1.set_readiness(mio::Ready::empty())?; } else { // Otherwise, look for the entry associated with this token. if let Some(entry) = entries.get(token.0) { @@ -143,21 +129,27 @@ fn main_loop() -> io::Result<()> { let readiness = event.readiness(); // Wake up reader tasks blocked on this I/O handle. - if !(readiness & reader_interests()).is_empty() { + let reader_interests = mio::Ready::all() - mio::Ready::writable(); + if !(readiness & reader_interests).is_empty() { for w in entry.readers.lock().unwrap().drain(..) { w.wake(); + progress = true; } } // Wake up writer tasks blocked on this I/O handle. - if !(readiness & writer_interests()).is_empty() { + let writer_interests = mio::Ready::all() - mio::Ready::readable(); + if !(readiness & writer_interests).is_empty() { for w in entry.writers.lock().unwrap().drain(..) { w.wake(); + progress = true; } } } } } + + Ok(progress) } } @@ -180,7 +172,8 @@ impl Watcher { /// lifetime of the returned I/O handle. 
pub fn new(source: T) -> Watcher { Watcher { - entry: REACTOR + entry: RUNTIME + .reactor() .register(&source) .expect("cannot register an I/O event source"), source: Some(source), @@ -264,7 +257,8 @@ impl Watcher { #[allow(dead_code)] pub fn into_inner(mut self) -> T { let source = self.source.take().unwrap(); - REACTOR + RUNTIME + .reactor() .deregister(&source, &self.entry) .expect("cannot deregister I/O event source"); source @@ -274,7 +268,8 @@ impl Watcher { impl Drop for Watcher { fn drop(&mut self) { if let Some(ref source) = self.source { - REACTOR + RUNTIME + .reactor() .deregister(source, &self.entry) .expect("cannot deregister I/O event source"); } @@ -289,27 +284,3 @@ impl fmt::Debug for Watcher { .finish() } } - -/// Returns a mask containing flags that interest tasks reading from I/O handles. -#[inline] -fn reader_interests() -> mio::Ready { - mio::Ready::all() - mio::Ready::writable() -} - -/// Returns a mask containing flags that interest tasks writing into I/O handles. -#[inline] -fn writer_interests() -> mio::Ready { - mio::Ready::writable() | hup() -} - -/// Returns a flag containing the hangup status. -#[inline] -fn hup() -> mio::Ready { - #[cfg(unix)] - let ready = mio::unix::UnixReady::hup().into(); - - #[cfg(not(unix))] - let ready = mio::Ready::empty(); - - ready -} diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs new file mode 100644 index 0000000..35ebe50 --- /dev/null +++ b/src/rt/runtime.rs @@ -0,0 +1,449 @@ +use std::cell::Cell; +use std::io; +use std::iter; +use std::ptr; +use std::sync::atomic::{self, AtomicBool, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + +use crossbeam_deque::{Injector, Steal, Stealer, Worker}; +use crossbeam_utils::thread::scope; +use once_cell::unsync::OnceCell; + +use crate::rt::Reactor; +use crate::task::Runnable; +use crate::utils::{abort_on_panic, random, Spinlock}; + +thread_local! { + /// A reference to the current machine, if the current thread runs tasks. + static MACHINE: OnceCell> = OnceCell::new(); + + /// This flag is set to true whenever `task::yield_now()` is invoked. + static YIELD_NOW: Cell = Cell::new(false); +} + +/// Scheduler state. +struct Scheduler { + /// Set to `true` every time before a machine blocks polling the reactor. + progress: bool, + + /// Set to `true` while a machine is polling the reactor. + polling: bool, + + /// Idle processors. + processors: Vec, + + /// Running machines. + machines: Vec>, +} + +/// An async runtime. +pub struct Runtime { + /// The reactor. + reactor: Reactor, + + /// The global queue of tasks. + injector: Injector, + + /// Handles to local queues for stealing work. + stealers: Vec>, + + /// The scheduler state. + sched: Mutex, +} + +impl Runtime { + /// Creates a new runtime. + pub fn new() -> Runtime { + let cpus = num_cpus::get().max(1); + let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); + let stealers = processors.iter().map(|p| p.worker.stealer()).collect(); + + Runtime { + reactor: Reactor::new().unwrap(), + injector: Injector::new(), + stealers, + sched: Mutex::new(Scheduler { + processors, + machines: Vec::new(), + progress: false, + polling: false, + }), + } + } + + /// Returns a reference to the reactor. + pub fn reactor(&self) -> &Reactor { + &self.reactor + } + + /// Flushes the task slot so that tasks get run more fairly. + pub fn yield_now(&self) { + YIELD_NOW.with(|flag| flag.set(true)); + } + + /// Schedules a task. 
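
`Runtime::schedule` just below is the single entry point for new and re-woken tasks: on a worker thread the task goes to that thread's machine, otherwise it lands in the global `Injector` and a polling machine is notified. A stand-alone sketch of the crossbeam-deque plumbing this relies on, with a plain boxed closure standing in for `Runnable`:

```rust
use crossbeam_deque::{Injector, Steal, Worker};

type Task = Box<dyn FnOnce() + Send>;

// Stand-alone illustration of the queue types used by `Runtime`: one global
// `Injector` plus a FIFO `Worker` per processor, with `Stealer` handles used
// for work stealing between processors.
fn demo() {
    let injector: Injector<Task> = Injector::new();
    let worker = Worker::new_fifo();
    let stealer = worker.stealer();

    injector.push(Box::new(|| println!("hello from the global queue")));

    // A processor drains its local queue first, then steals a batch from the
    // global queue -- the same shape as `Processor::steal_from_global`.
    let task = worker.pop().or_else(|| {
        match injector.steal_batch_and_pop(&worker) {
            Steal::Success(t) => Some(t),
            Steal::Empty | Steal::Retry => None,
        }
    });

    if let Some(t) = task {
        t();
    }

    // Another processor would call `stealer.steal_batch_and_pop(&its_worker)`.
    drop(stealer);
}
```
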
+ pub fn schedule(&self, task: Runnable) { + MACHINE.with(|machine| { + // If the current thread is a worker thread, schedule it onto the current machine. + // Otherwise, push it into the global task queue. + match machine.get() { + None => { + self.injector.push(task); + self.notify(); + } + Some(m) => m.schedule(&self, task), + } + }); + } + + /// Runs the runtime on the current thread. + pub fn run(&self) { + scope(|s| { + let mut idle = 0; + let mut delay = 0; + + loop { + // Get a list of new machines to start, if any need to be started. + for m in self.make_machines() { + idle = 0; + + s.builder() + .name("async-std/machine".to_string()) + .spawn(move |_| { + abort_on_panic(|| { + let _ = MACHINE.with(|machine| machine.set(m.clone())); + m.run(self); + }) + }) + .expect("cannot start a machine thread"); + } + + // Sleep for a bit longer if the scheduler state hasn't changed in a while. + if idle > 10 { + delay = (delay * 2).min(10_000); + } else { + idle += 1; + delay = 1000; + } + + thread::sleep(Duration::from_micros(delay)); + } + }) + .unwrap(); + } + + /// Returns a list of machines that need to be started. + fn make_machines(&self) -> Vec> { + let mut sched = self.sched.lock().unwrap(); + let mut to_start = Vec::new(); + + // If there is a machine that is stuck on a task and not making any progress, steal its + // processor and set up a new machine to take over. + for m in &mut sched.machines { + if !m.progress.swap(false, Ordering::SeqCst) { + let opt_p = m.processor.try_lock().and_then(|mut p| p.take()); + + if let Some(p) = opt_p { + *m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + } + } + } + + // If no machine has been polling the reactor in a while, that means the runtime is + // overloaded with work and we need to start another machine. + if !sched.polling { + if !sched.progress { + if let Some(p) = sched.processors.pop() { + let m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + sched.machines.push(m); + } + } + + sched.progress = false; + } + + to_start + } + + /// Unparks a thread polling the reactor. + fn notify(&self) { + atomic::fence(Ordering::SeqCst); + self.reactor.notify().unwrap(); + } + + /// Attempts to poll the reactor without blocking on it. + /// + /// Returns `Ok(true)` if at least one new task was woken. + /// + /// This function might not poll the reactor at all so do not rely on it doing anything. Only + /// use for optimization. + fn quick_poll(&self) -> io::Result { + if let Ok(sched) = self.sched.try_lock() { + if !sched.polling { + return self.reactor.poll(Some(Duration::from_secs(0))); + } + } + Ok(false) + } +} + +/// A thread running a processor. +struct Machine { + /// Holds the processor until it gets stolen. + processor: Spinlock>, + + /// Gets set to `true` before running every task to indicate the machine is not stuck. + progress: AtomicBool, +} + +impl Machine { + /// Creates a new machine running a processor. + fn new(p: Processor) -> Machine { + Machine { + processor: Spinlock::new(Some(p)), + progress: AtomicBool::new(true), + } + } + + /// Schedules a task onto the machine. + fn schedule(&self, rt: &Runtime, task: Runnable) { + match self.processor.lock().as_mut() { + None => { + rt.injector.push(task); + rt.notify(); + } + Some(p) => p.schedule(rt, task), + } + } + + /// Finds the next runnable task. + fn find_task(&self, rt: &Runtime) -> Steal { + let mut retry = false; + + // First try finding a task in the local queue or in the global queue. 
+ if let Some(p) = self.processor.lock().as_mut() { + if let Some(task) = p.pop_task() { + return Steal::Success(task); + } + + match p.steal_from_global(rt) { + Steal::Empty => {} + Steal::Retry => retry = true, + Steal::Success(task) => return Steal::Success(task), + } + } + + // Try polling the reactor, but don't block on it. + let progress = rt.quick_poll().unwrap(); + + // Try finding a task in the local queue, which might hold tasks woken by the reactor. If + // the local queue is still empty, try stealing from other processors. + if let Some(p) = self.processor.lock().as_mut() { + if progress { + if let Some(task) = p.pop_task() { + return Steal::Success(task); + } + } + + match p.steal_from_others(rt) { + Steal::Empty => {} + Steal::Retry => retry = true, + Steal::Success(task) => return Steal::Success(task), + } + } + + if retry { Steal::Retry } else { Steal::Empty } + } + + /// Runs the machine on the current thread. + fn run(&self, rt: &Runtime) { + /// Number of yields when no runnable task is found. + const YIELDS: u32 = 3; + /// Number of short sleeps when no runnable task in found. + const SLEEPS: u32 = 10; + /// Number of runs in a row before the global queue is inspected. + const RUNS: u32 = 64; + + // The number of times the thread found work in a row. + let mut runs = 0; + // The number of times the thread didn't find work in a row. + let mut fails = 0; + + loop { + // let the scheduler know this machine is making progress. + self.progress.store(true, Ordering::SeqCst); + + // Check if `task::yield_now()` was invoked and flush the slot if so. + YIELD_NOW.with(|flag| { + if flag.replace(false) { + if let Some(p) = self.processor.lock().as_mut() { + p.flush_slot(rt); + } + } + }); + + // After a number of runs in a row, do some work to ensure no task is left behind + // indefinitely. Poll the reactor, steal tasks from the global queue, and flush the + // task slot. + if runs >= RUNS { + runs = 0; + rt.quick_poll().unwrap(); + + if let Some(p) = self.processor.lock().as_mut() { + if let Steal::Success(task) = p.steal_from_global(rt) { + p.schedule(rt, task); + } + + p.flush_slot(rt); + } + } + + // Try to find a runnable task. + if let Steal::Success(task) = self.find_task(rt) { + task.run(); + runs += 1; + fails = 0; + continue; + } + + fails += 1; + + // Check if the processor was stolen. + if self.processor.lock().is_none() { + break; + } + + // Yield the current thread a few times. + if fails <= YIELDS { + thread::yield_now(); + continue; + } + + // Put the current thread to sleep a few times. + if fails <= YIELDS + SLEEPS { + let opt_p = self.processor.lock().take(); + thread::sleep(Duration::from_micros(10)); + *self.processor.lock() = opt_p; + continue; + } + + let mut sched = rt.sched.lock().unwrap(); + + // One final check for available tasks while the scheduler is locked. + if let Some(task) = iter::repeat_with(|| self.find_task(rt)) + .find(|s| !s.is_retry()) + .and_then(|s| s.success()) + { + self.schedule(rt, task); + continue; + } + + // If another thread is already blocked on the reactor, there is no point in keeping + // the current thread around since there is too little work to do. + if sched.polling { + break; + } + + // Take out the machine associated with the current thread. + let m = match sched + .machines + .iter() + .position(|elem| ptr::eq(&**elem, self)) + { + None => break, // The processor was stolen. + Some(pos) => sched.machines.swap_remove(pos), + }; + + // Unlock the schedule poll the reactor until new I/O events arrive. 
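
The step that follows is the core of the new design: exactly one machine at a time flips `sched.polling`, drops the scheduler lock, and parks inside `reactor.poll(None)` until I/O arrives or another thread calls `Reactor::notify`. The notification itself is plain mio 0.6 user-space readiness, using the `Registration`/`SetReadiness` pair created in `Reactor::new`; a self-contained sketch of that wake-up mechanism, independent of the runtime types, looks roughly like this:

```rust
use std::thread;
use std::time::Duration;

use mio::{Events, Poll, PollOpt, Ready, Registration, Token};

// Stand-alone sketch of the `notify_reg` trick used by `Reactor`: a user-space
// `Registration` is registered with the poller, and flipping its readiness from
// another thread wakes up a blocked `poll` call.
fn main() -> std::io::Result<()> {
    let poller = Poll::new()?;
    let (registration, set_readiness) = Registration::new2();
    poller.register(&registration, Token(0), Ready::readable(), PollOpt::edge())?;

    // Another thread "notifies" the poller after a short delay.
    let notifier = thread::spawn(move || {
        thread::sleep(Duration::from_millis(100));
        set_readiness.set_readiness(Ready::readable()).unwrap();
    });

    // This blocks until the readiness flip above arrives.
    let mut events = Events::with_capacity(16);
    poller.poll(&mut events, None)?;
    assert!(events.iter().any(|e| e.token() == Token(0)));

    notifier.join().unwrap();
    Ok(())
}
```
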
+ sched.polling = true; + drop(sched); + rt.reactor.poll(None).unwrap(); + + // Lock the scheduler again and re-register the machine. + sched = rt.sched.lock().unwrap(); + sched.polling = false; + sched.machines.push(m); + sched.progress = true; + + runs = 0; + fails = 0; + } + + // When shutting down the thread, take the processor out if still available. + let opt_p = self.processor.lock().take(); + + // Return the processor to the scheduler and remove the machine. + if let Some(p) = opt_p { + let mut sched = rt.sched.lock().unwrap(); + sched.processors.push(p); + sched.machines.retain(|elem| !ptr::eq(&**elem, self)); + } + } +} + +struct Processor { + /// The local task queue. + worker: Worker, + + /// Contains the next task to run as an optimization that skips the queue. + slot: Option, +} + +impl Processor { + /// Creates a new processor. + fn new() -> Processor { + Processor { + worker: Worker::new_fifo(), + slot: None, + } + } + + /// Schedules a task to run on this processor. + fn schedule(&mut self, rt: &Runtime, task: Runnable) { + match self.slot.replace(task) { + None => {} + Some(task) => { + self.worker.push(task); + rt.notify(); + } + } + } + + /// Flushes a task from the slot into the local queue. + fn flush_slot(&mut self, rt: &Runtime) { + if let Some(task) = self.slot.take() { + self.worker.push(task); + rt.notify(); + } + } + + /// Pops a task from this processor. + fn pop_task(&mut self) -> Option { + self.slot.take().or_else(|| self.worker.pop()) + } + + /// Steals a task from the global queue. + fn steal_from_global(&mut self, rt: &Runtime) -> Steal { + rt.injector.steal_batch_and_pop(&self.worker) + } + + /// Steals a task from other processors. + fn steal_from_others(&mut self, rt: &Runtime) -> Steal { + // Pick a random starting point in the list of queues. + let len = rt.stealers.len(); + let start = random(len as u32) as usize; + + // Create an iterator over stealers that starts from the chosen point. + let (l, r) = rt.stealers.split_at(start); + let stealers = r.iter().chain(l.iter()); + + // Try stealing a batch of tasks from each queue. + stealers + .map(|s| s.steal_batch_and_pop(&self.worker)) + .collect() + } +} diff --git a/src/task/block_on.rs b/src/task/block_on.rs index 80259c5..4bade5b 100644 --- a/src/task/block_on.rs +++ b/src/task/block_on.rs @@ -3,11 +3,9 @@ use std::future::Future; use std::mem::{self, ManuallyDrop}; use std::sync::Arc; use std::task::{RawWaker, RawWakerVTable}; -use std::thread; use crossbeam_utils::sync::Parker; use kv_log_macro::trace; -use log::log_enabled; use crate::task::{Context, Poll, Task, Waker}; @@ -42,12 +40,10 @@ where let task = Task::new(None); // Log this `block_on` operation. - if log_enabled!(log::Level::Trace) { - trace!("block_on", { - task_id: task.id().0, - parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), - }); - } + trace!("block_on", { + task_id: task.id().0, + parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), + }); let future = async move { // Drop task-locals on exit. @@ -57,13 +53,9 @@ where // Log completion on exit. defer! 
{ - if log_enabled!(log::Level::Trace) { - Task::get_current(|t| { - trace!("completed", { - task_id: t.id().0, - }); - }); - } + trace!("completed", { + task_id: Task::get_current(|t| t.id().0), + }); } future.await @@ -125,7 +117,6 @@ where let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &VTABLE))) }; let cx = &mut Context::from_waker(&waker); - let mut step = 0; loop { if let Poll::Ready(t) = future.as_mut().poll(cx) { // Save the parker for the next invocation of `block`. @@ -133,14 +124,7 @@ where return t; } - // Yield a few times or park the current thread. - if step < 3 { - thread::yield_now(); - step += 1; - } else { - arc_parker.park(); - step = 0; - } + arc_parker.park(); } }) } diff --git a/src/task/builder.rs b/src/task/builder.rs index afd4c2c..f1fef59 100644 --- a/src/task/builder.rs +++ b/src/task/builder.rs @@ -1,9 +1,9 @@ -use kv_log_macro::trace; -use log::log_enabled; use std::future::Future; +use kv_log_macro::trace; + use crate::io; -use crate::task::executor; +use crate::rt::RUNTIME; use crate::task::{JoinHandle, Task}; use crate::utils::abort_on_panic; @@ -37,12 +37,10 @@ impl Builder { let task = Task::new(self.name); // Log this `spawn` operation. - if log_enabled!(log::Level::Trace) { - trace!("spawn", { - task_id: task.id().0, - parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), - }); - } + trace!("spawn", { + task_id: task.id().0, + parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), + }); let future = async move { // Drop task-locals on exit. @@ -52,19 +50,15 @@ impl Builder { // Log completion on exit. defer! { - if log_enabled!(log::Level::Trace) { - Task::get_current(|t| { - trace!("completed", { - task_id: t.id().0, - }); - }); - } + trace!("completed", { + task_id: Task::get_current(|t| t.id().0), + }); } future.await }; - let schedule = move |t| executor::schedule(Runnable(t)); + let schedule = move |t| RUNTIME.schedule(Runnable(t)); let (task, handle) = async_task::spawn(future, schedule, task); task.schedule(); Ok(JoinHandle::new(handle)) @@ -72,7 +66,7 @@ impl Builder { } /// A runnable task. -pub(crate) struct Runnable(async_task::Task); +pub struct Runnable(async_task::Task); impl Runnable { /// Runs the task by polling its future once. diff --git a/src/task/executor/mod.rs b/src/task/executor/mod.rs deleted file mode 100644 index 2a6a696..0000000 --- a/src/task/executor/mod.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! Task executor. -//! -//! API bindings between `crate::task` and this module are very simple: -//! -//! * The only export is the `schedule` function. -//! * The only import is the `crate::task::Runnable` type. - -pub(crate) use pool::schedule; - -use sleepers::Sleepers; - -mod pool; -mod sleepers; diff --git a/src/task/executor/pool.rs b/src/task/executor/pool.rs deleted file mode 100644 index 5249b3d..0000000 --- a/src/task/executor/pool.rs +++ /dev/null @@ -1,179 +0,0 @@ -use std::cell::Cell; -use std::iter; -use std::thread; -use std::time::Duration; - -use crossbeam_deque::{Injector, Stealer, Worker}; -use once_cell::sync::Lazy; -use once_cell::unsync::OnceCell; - -use crate::task::executor::Sleepers; -use crate::task::Runnable; -use crate::utils::{abort_on_panic, random}; - -/// The state of an executor. -struct Pool { - /// The global queue of tasks. - injector: Injector, - - /// Handles to local queues for stealing work from worker threads. - stealers: Vec>, - - /// Used for putting idle workers to sleep and notifying them when new tasks come in. 
- sleepers: Sleepers, -} - -/// Global executor that runs spawned tasks. -static POOL: Lazy = Lazy::new(|| { - let num_threads = num_cpus::get().max(1); - let mut stealers = Vec::new(); - - // Spawn worker threads. - for _ in 0..num_threads { - let worker = Worker::new_fifo(); - stealers.push(worker.stealer()); - - let proc = Processor { - worker, - slot: Cell::new(None), - slot_runs: Cell::new(0), - }; - - thread::Builder::new() - .name("async-std/executor".to_string()) - .spawn(|| { - let _ = PROCESSOR.with(|p| p.set(proc)); - abort_on_panic(main_loop); - }) - .expect("cannot start a thread driving tasks"); - } - - Pool { - injector: Injector::new(), - stealers, - sleepers: Sleepers::new(), - } -}); - -/// The state of a worker thread. -struct Processor { - /// The local task queue. - worker: Worker, - - /// Contains the next task to run as an optimization that skips queues. - slot: Cell>, - - /// How many times in a row tasks have been taked from the slot rather than the queue. - slot_runs: Cell, -} - -thread_local! { - /// Worker thread state. - static PROCESSOR: OnceCell = OnceCell::new(); -} - -/// Schedules a new runnable task for execution. -pub(crate) fn schedule(task: Runnable) { - PROCESSOR.with(|proc| { - // If the current thread is a worker thread, store it into its task slot or push it into - // its local task queue. Otherwise, push it into the global task queue. - match proc.get() { - Some(proc) => { - // Replace the task in the slot. - if let Some(task) = proc.slot.replace(Some(task)) { - // If the slot already contained a task, push it into the local task queue. - proc.worker.push(task); - POOL.sleepers.notify_one(); - } - } - None => { - POOL.injector.push(task); - POOL.sleepers.notify_one(); - } - } - }) -} - -/// Main loop running a worker thread. -fn main_loop() { - /// Number of yields when no runnable task is found. - const YIELDS: u32 = 3; - /// Number of short sleeps when no runnable task in found. - const SLEEPS: u32 = 1; - - // The number of times the thread didn't find work in a row. - let mut fails = 0; - - loop { - // Try to find a runnable task. - match find_runnable() { - Some(task) => { - fails = 0; - - // Run the found task. - task.run(); - } - None => { - fails += 1; - - // Yield the current thread or put it to sleep. - if fails <= YIELDS { - thread::yield_now(); - } else if fails <= YIELDS + SLEEPS { - thread::sleep(Duration::from_micros(10)); - } else { - POOL.sleepers.wait(); - fails = 0; - } - } - } - } -} - -/// Find the next runnable task. -fn find_runnable() -> Option { - /// Maximum number of times the slot can be used in a row. - const SLOT_LIMIT: u32 = 16; - - PROCESSOR.with(|proc| { - let proc = proc.get().unwrap(); - - // Try taking a task from the slot. - let runs = proc.slot_runs.get(); - if runs < SLOT_LIMIT { - if let Some(task) = proc.slot.take() { - proc.slot_runs.set(runs + 1); - return Some(task); - } - } - proc.slot_runs.set(0); - - // Pop a task from the local queue, if not empty. - proc.worker.pop().or_else(|| { - // Otherwise, we need to look for a task elsewhere. - iter::repeat_with(|| { - // Try stealing a batch of tasks from the global queue. - POOL.injector - .steal_batch_and_pop(&proc.worker) - // Or try stealing a batch of tasks from one of the other threads. - .or_else(|| { - // First, pick a random starting point in the list of local queues. - let len = POOL.stealers.len(); - let start = random(len as u32) as usize; - - // Try stealing a batch of tasks from each local queue starting from the - // chosen point. 
- let (l, r) = POOL.stealers.split_at(start); - let stealers = r.iter().chain(l.iter()); - stealers - .map(|s| s.steal_batch_and_pop(&proc.worker)) - .collect() - }) - }) - // Loop while no task was stolen and any steal operation needs to be retried. - .find(|s| !s.is_retry()) - // Extract the stolen task, if there is one. - .and_then(|s| s.success()) - }) - }) -} diff --git a/src/task/executor/sleepers.rs b/src/task/executor/sleepers.rs deleted file mode 100644 index 4e70129..0000000 --- a/src/task/executor/sleepers.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Condvar, Mutex}; - -/// The place where worker threads go to sleep. -/// -/// Similar to how thread parking works, if a notification comes up while no threads are sleeping, -/// the next thread that attempts to go to sleep will pick up the notification immediately. -pub struct Sleepers { - /// How many threads are currently a sleep. - sleep: Mutex, - - /// A condvar for notifying sleeping threads. - wake: Condvar, - - /// Set to `true` if a notification came up while nobody was sleeping. - notified: AtomicBool, -} - -impl Sleepers { - /// Creates a new `Sleepers`. - pub fn new() -> Sleepers { - Sleepers { - sleep: Mutex::new(0), - wake: Condvar::new(), - notified: AtomicBool::new(false), - } - } - - /// Puts the current thread to sleep. - pub fn wait(&self) { - let mut sleep = self.sleep.lock().unwrap(); - - if !self.notified.swap(false, Ordering::SeqCst) { - *sleep += 1; - let _ = self.wake.wait(sleep).unwrap(); - } - } - - /// Notifies one thread. - pub fn notify_one(&self) { - if !self.notified.load(Ordering::SeqCst) { - let mut sleep = self.sleep.lock().unwrap(); - - if *sleep > 0 { - *sleep -= 1; - self.wake.notify_one(); - } else { - self.notified.store(true, Ordering::SeqCst); - } - } - } -} diff --git a/src/task/join_handle.rs b/src/task/join_handle.rs index 9fefff2..d929d11 100644 --- a/src/task/join_handle.rs +++ b/src/task/join_handle.rs @@ -14,9 +14,6 @@ use crate::task::{Context, Poll, Task}; #[derive(Debug)] pub struct JoinHandle(async_task::JoinHandle); -unsafe impl Send for JoinHandle {} -unsafe impl Sync for JoinHandle {} - impl JoinHandle { /// Creates a new `JoinHandle`. pub(crate) fn new(inner: async_task::JoinHandle) -> JoinHandle { diff --git a/src/task/mod.rs b/src/task/mod.rs index 8e181d1..075e71d 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -140,13 +140,12 @@ cfg_default! { pub use spawn::spawn; pub use task_local::{AccessError, LocalKey}; - use builder::Runnable; - use task_local::LocalsMap; + pub(crate) use builder::Runnable; + pub(crate) use task_local::LocalsMap; mod block_on; mod builder; mod current; - mod executor; mod join_handle; mod sleep; mod spawn; diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index 578afa4..e22c5cb 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -1,12 +1,4 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::thread; -use std::time::Duration; - -use crossbeam_channel::{unbounded, Receiver, Sender}; -use once_cell::sync::Lazy; - -use crate::task::{JoinHandle, Task}; -use crate::utils::abort_on_panic; +use crate::task::{spawn, JoinHandle}; /// Spawns a blocking task. 
/// @@ -31,7 +23,8 @@ use crate::utils::abort_on_panic; /// /// task::spawn_blocking(|| { /// println!("long-running task here"); -/// }).await; +/// }) +/// .await; /// # /// # }) /// ``` @@ -42,80 +35,5 @@ where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { - let schedule = |task| POOL.sender.send(task).unwrap(); - let (task, handle) = async_task::spawn(async { f() }, schedule, Task::new(None)); - task.schedule(); - JoinHandle::new(handle) -} - -type Runnable = async_task::Task; - -/// The number of sleeping worker threads. -static SLEEPING: AtomicUsize = AtomicUsize::new(0); - -struct Pool { - sender: Sender, - receiver: Receiver, -} - -static POOL: Lazy = Lazy::new(|| { - // Start a single worker thread waiting for the first task. - start_thread(); - - let (sender, receiver) = unbounded(); - Pool { sender, receiver } -}); - -fn start_thread() { - SLEEPING.fetch_add(1, Ordering::SeqCst); - let timeout = Duration::from_secs(1); - - thread::Builder::new() - .name("async-std/blocking".to_string()) - .spawn(move || { - loop { - let mut task = match POOL.receiver.recv_timeout(timeout) { - Ok(task) => task, - Err(_) => { - // Check whether this is the last sleeping thread. - if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 { - // If so, then restart the thread to make sure there is always at least - // one sleeping thread. - if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 { - continue; - } - } - - // Stop the thread. - return; - } - }; - - // If there are no sleeping threads, then start one to make sure there is always at - // least one sleeping thread. - if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 { - start_thread(); - } - - loop { - // Run the task. - abort_on_panic(|| task.run()); - - // Try taking another task if there are any available. - task = match POOL.receiver.try_recv() { - Ok(task) => task, - Err(_) => break, - }; - } - - // If there is at least one sleeping thread, stop this thread instead of putting it - // to sleep. - if SLEEPING.load(Ordering::SeqCst) > 0 { - return; - } - - SLEEPING.fetch_add(1, Ordering::SeqCst); - } - }) - .expect("cannot start a blocking thread"); + spawn(async { f() }) } diff --git a/src/task/yield_now.rs b/src/task/yield_now.rs index 4030696..c7ddb17 100644 --- a/src/task/yield_now.rs +++ b/src/task/yield_now.rs @@ -1,6 +1,7 @@ -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; +use crate::rt::RUNTIME; use crate::task::{Context, Poll}; /// Cooperatively gives up a timeslice to the task scheduler. @@ -43,6 +44,7 @@ impl Future for YieldNow { if !self.0 { self.0 = true; cx.waker().wake_by_ref(); + RUNTIME.yield_now(); Poll::Pending } else { Poll::Ready(()) diff --git a/src/utils.rs b/src/utils.rs index 7d253b4..79c2fdf 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,9 @@ +use std::cell::UnsafeCell; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicBool, Ordering}; + +use crossbeam_utils::Backoff; + /// Calls a function and aborts if it panics. /// /// This is useful in unsafe code where we can't recover from panics. @@ -52,6 +58,71 @@ pub fn random(n: u32) -> u32 { }) } +/// A simple spinlock. +pub struct Spinlock { + flag: AtomicBool, + value: UnsafeCell, +} + +unsafe impl Send for Spinlock {} +unsafe impl Sync for Spinlock {} + +impl Spinlock { + /// Returns a new spinlock initialized with `value`. + pub const fn new(value: T) -> Spinlock { + Spinlock { + flag: AtomicBool::new(false), + value: UnsafeCell::new(value), + } + } + + /// Locks the spinlock. 
+    pub fn lock(&self) -> SpinlockGuard<'_, T> {
+        let backoff = Backoff::new();
+        while self.flag.swap(true, Ordering::Acquire) {
+            backoff.snooze();
+        }
+        SpinlockGuard { parent: self }
+    }
+
+    /// Attempts to lock the spinlock.
+    pub fn try_lock(&self) -> Option<SpinlockGuard<'_, T>> {
+        if self.flag.swap(true, Ordering::Acquire) {
+            None
+        } else {
+            Some(SpinlockGuard { parent: self })
+        }
+    }
+}
+
+/// A guard holding a spinlock locked.
+pub struct SpinlockGuard<'a, T> {
+    parent: &'a Spinlock<T>,
+}
+
+unsafe impl<T: Send> Send for SpinlockGuard<'_, T> {}
+unsafe impl<T: Send> Sync for SpinlockGuard<'_, T> {}
+
+impl<'a, T> Drop for SpinlockGuard<'a, T> {
+    fn drop(&mut self) {
+        self.parent.flag.store(false, Ordering::Release);
+    }
+}
+
+impl<'a, T> Deref for SpinlockGuard<'a, T> {
+    type Target = T;
+
+    fn deref(&self) -> &T {
+        unsafe { &*self.parent.value.get() }
+    }
+}
+
+impl<'a, T> DerefMut for SpinlockGuard<'a, T> {
+    fn deref_mut(&mut self) -> &mut T {
+        unsafe { &mut *self.parent.value.get() }
+    }
+}
+
 /// Add additional context to errors
 pub(crate) trait Context {
     fn context(self, message: impl Fn() -> String) -> Self;
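
The `Spinlock` added above exists so that each `Machine` can hold its processor in a `Spinlock<Option<Processor>>`: `lock` spins (using crossbeam's `Backoff`) until it acquires the flag, `try_lock` bails out immediately if the lock is held, and the guard releases the flag on drop. A short usage sketch with a dummy `u32` payload in place of `Processor` (illustrative, not part of the patch):

```rust
use crate::utils::Spinlock;

// Illustrative use of the spinlock added above, mirroring how `Machine` holds
// `Spinlock<Option<Processor>>`.
fn demo() {
    let slot: Spinlock<Option<u32>> = Spinlock::new(Some(7));

    // Blocking acquisition: take the value out, as `Machine::run` does before
    // putting the thread to sleep.
    let taken = slot.lock().take();
    assert_eq!(taken, Some(7));

    // Non-blocking attempt: succeeds here because the earlier guard is gone.
    if let Some(mut guard) = slot.try_lock() {
        *guard = Some(42);
    }

    assert_eq!(*slot.lock(), Some(42));
}
```
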