diff --git a/Cargo.toml b/Cargo.toml index 907c1eb..a48cab0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ unstable = [] async-task = "1.0.0" cfg-if = "0.1.9" crossbeam-channel = "0.3.9" +crossbeam-deque = "0.7.1" futures-core-preview = "0.3.0-alpha.18" futures-io-preview = "0.3.0-alpha.18" futures-timer = "0.4.0" diff --git a/src/task/block_on.rs b/src/task/block_on.rs index 0e9a5f4..eec530c 100644 --- a/src/task/block_on.rs +++ b/src/task/block_on.rs @@ -6,11 +6,11 @@ use std::sync::Arc; use std::task::{RawWaker, RawWakerVTable}; use std::thread::{self, Thread}; -use super::pool; +use super::local; use super::task; +use super::worker; use crate::future::Future; use crate::task::{Context, Poll, Waker}; -use crate::utils::abort_on_panic; use kv_log_macro::trace; @@ -58,7 +58,7 @@ where // Log this `block_on` operation. let child_id = tag.task_id().as_u64(); - let parent_id = pool::get_task(|t| t.id().as_u64()).unwrap_or(0); + let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0); trace!("block_on", { parent_id: parent_id, @@ -66,31 +66,28 @@ where }); // Wrap the future into one that drops task-local variables on exit. + let future = local::add_finalizer(future); + let future = async move { let res = future.await; - - // Abort on panic because thread-local variables behave the same way. - abort_on_panic(|| pool::get_task(|task| task.metadata().local_map.clear())); - trace!("block_on completed", { parent_id: parent_id, child_id: child_id, }); - res }; // Pin the future onto the stack. pin_utils::pin_mut!(future); - // Transmute the future into one that is static. + // Transmute the future into one that is futurestatic. let future = mem::transmute::< Pin<&'_ mut dyn Future>, Pin<&'static mut dyn Future>, >(future); // Block on the future and and wait for it to complete. - pool::set_tag(&tag, || block(future)); + worker::set_tag(&tag, || block(future)); // Take out the result. match (*out.get()).take().unwrap() { diff --git a/src/task/builder.rs b/src/task/builder.rs new file mode 100644 index 0000000..630876c --- /dev/null +++ b/src/task/builder.rs @@ -0,0 +1,32 @@ +use super::pool; +use super::JoinHandle; +use crate::future::Future; +use crate::io; + +/// Task builder that configures the settings of a new task. +#[derive(Debug)] +pub struct Builder { + pub(crate) name: Option, +} + +impl Builder { + /// Creates a new builder. + pub fn new() -> Builder { + Builder { name: None } + } + + /// Configures the name of the task. + pub fn name(mut self, name: String) -> Builder { + self.name = Some(name); + self + } + + /// Spawns a task with the configured settings. + pub fn spawn(self, future: F) -> io::Result> + where + F: Future + Send + 'static, + T: Send + 'static, + { + Ok(pool::get().spawn(future, self)) + } +} diff --git a/src/task/local.rs b/src/task/local.rs index 8897696..8347e34 100644 --- a/src/task/local.rs +++ b/src/task/local.rs @@ -6,7 +6,9 @@ use std::sync::Mutex; use lazy_static::lazy_static; -use super::pool; +use super::worker; +use crate::future::Future; +use crate::utils::abort_on_panic; /// Declares task-local values. /// @@ -152,7 +154,7 @@ impl LocalKey { where F: FnOnce(&T) -> R, { - pool::get_task(|task| unsafe { + worker::get_task(|task| unsafe { // Prepare the numeric key, initialization function, and the map of task-locals. let key = self.key(); let init = || Box::new((self.__init)()) as Box; @@ -250,3 +252,15 @@ impl Map { entries.clear(); } } + +// Wrap the future into one that drops task-local variables on exit. +pub(crate) unsafe fn add_finalizer(f: impl Future) -> impl Future { + async move { + let res = f.await; + + // Abort on panic because thread-local variables behave the same way. + abort_on_panic(|| worker::get_task(|task| task.metadata().local_map.clear())); + + res + } +} diff --git a/src/task/mod.rs b/src/task/mod.rs index eef7284..66d8b67 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -25,15 +25,20 @@ pub use std::task::{Context, Poll, Waker}; pub use block_on::block_on; +pub use builder::Builder; pub use local::{AccessError, LocalKey}; -pub use pool::{current, spawn, Builder}; +pub use pool::spawn; pub use sleep::sleep; pub use task::{JoinHandle, Task, TaskId}; +pub use worker::current; mod block_on; +mod builder; mod local; mod pool; mod sleep; +mod sleepers; mod task; +mod worker; pub(crate) mod blocking; diff --git a/src/task/pool.rs b/src/task/pool.rs index 210b862..e36aace 100644 --- a/src/task/pool.rs +++ b/src/task/pool.rs @@ -1,43 +1,18 @@ -use std::cell::Cell; -use std::ptr; +use std::iter; use std::thread; -use crossbeam_channel::{unbounded, Sender}; +use crossbeam_deque::{Injector, Stealer, Worker}; use kv_log_macro::trace; use lazy_static::lazy_static; +use super::local; +use super::sleepers::Sleepers; use super::task; -use super::{JoinHandle, Task}; +use super::worker; +use super::{Builder, JoinHandle}; use crate::future::Future; -use crate::io; use crate::utils::abort_on_panic; -/// Returns a handle to the current task. -/// -/// # Panics -/// -/// This function will panic if not called within the context of a task created by [`block_on`], -/// [`spawn`], or [`Builder::spawn`]. -/// -/// [`block_on`]: fn.block_on.html -/// [`spawn`]: fn.spawn.html -/// [`Builder::spawn`]: struct.Builder.html#method.spawn -/// -/// # Examples -/// -/// ``` -/// # fn main() { async_std::task::block_on(async { -/// # -/// use async_std::task; -/// -/// println!("The name of this task is {:?}", task::current().name()); -/// # -/// # }) } -/// ``` -pub fn current() -> Task { - get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task") -} - /// Spawns a task. /// /// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task. @@ -64,130 +39,100 @@ where F: Future + Send + 'static, T: Send + 'static, { - spawn_with_builder(Builder::new(), future) + Builder::new().spawn(future).expect("cannot spawn future") } -/// Task builder that configures the settings of a new task. -#[derive(Debug)] -pub struct Builder { - pub(crate) name: Option, +pub(crate) struct Pool { + pub injector: Injector, + pub stealers: Vec>, + pub sleepers: Sleepers, } -impl Builder { - /// Creates a new builder. - pub fn new() -> Builder { - Builder { name: None } - } - - /// Configures the name of the task. - pub fn name(mut self, name: String) -> Builder { - self.name = Some(name); - self - } - - /// Spawns a task with the configured settings. - pub fn spawn(self, future: F) -> io::Result> +impl Pool { + /// Spawn a future onto the pool. + pub fn spawn(&self, future: F, builder: Builder) -> JoinHandle where F: Future + Send + 'static, T: Send + 'static, { - Ok(spawn_with_builder(self, future)) - } -} + let tag = task::Tag::new(builder.name); -pub(crate) fn spawn_with_builder(builder: Builder, future: F) -> JoinHandle -where - F: Future + Send + 'static, - T: Send + 'static, -{ - let Builder { name } = builder; + // Log this `spawn` operation. + let child_id = tag.task_id().as_u64(); + let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0); - type Job = async_task::Task; - - lazy_static! { - static ref QUEUE: Sender = { - let (sender, receiver) = unbounded::(); - - for _ in 0..num_cpus::get().max(1) { - let receiver = receiver.clone(); - thread::Builder::new() - .name("async-task-driver".to_string()) - .spawn(|| { - for job in receiver { - set_tag(job.tag(), || abort_on_panic(|| job.run())) - } - }) - .expect("cannot start a thread driving tasks"); - } - - sender - }; - } - - let tag = task::Tag::new(name); - let schedule = |job| QUEUE.send(job).unwrap(); - - // Log this `spawn` operation. - let child_id = tag.task_id().as_u64(); - let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0); - - trace!("spawn", { - parent_id: parent_id, - child_id: child_id, - }); - - // Wrap the future into one that drops task-local variables on exit. - let future = async move { - let res = future.await; - - // Abort on panic because thread-local variables behave the same way. - abort_on_panic(|| get_task(|task| task.metadata().local_map.clear())); - - trace!("spawn completed", { + trace!("spawn", { parent_id: parent_id, child_id: child_id, }); - res - }; + // Wrap the future into one that drops task-local variables on exit. + let future = unsafe { local::add_finalizer(future) }; - let (task, handle) = async_task::spawn(future, schedule, tag); - task.schedule(); - JoinHandle::new(handle) -} + // Wrap the future into one that logs completion on exit. + let future = async move { + let res = future.await; + trace!("spawn completed", { + parent_id: parent_id, + child_id: child_id, + }); + res + }; -thread_local! { - static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut()); -} - -pub(crate) fn set_tag(tag: *const task::Tag, f: F) -> R -where - F: FnOnce() -> R, -{ - struct ResetTag<'a>(&'a Cell<*const task::Tag>); - - impl Drop for ResetTag<'_> { - fn drop(&mut self) { - self.0.set(ptr::null()); - } + let (task, handle) = async_task::spawn(future, worker::schedule, tag); + task.schedule(); + JoinHandle::new(handle) } - TAG.with(|t| { - t.set(tag); - let _guard = ResetTag(t); - - f() - }) -} - -pub(crate) fn get_task(f: F) -> Option -where - F: FnOnce(&Task) -> R, -{ - let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) }); - - match res { - Ok(Some(val)) => Some(val), - Ok(None) | Err(_) => None, + /// Find the next runnable task to run. + pub fn find_task(&self, local: &Worker) -> Option { + // Pop a task from the local queue, if not empty. + local.pop().or_else(|| { + // Otherwise, we need to look for a task elsewhere. + iter::repeat_with(|| { + // Try stealing a batch of tasks from the injector queue. + self.injector + .steal_batch_and_pop(local) + // Or try stealing a bach of tasks from one of the other threads. + .or_else(|| { + self.stealers + .iter() + .map(|s| s.steal_batch_and_pop(local)) + .collect() + }) + }) + // Loop while no task was stolen and any steal operation needs to be retried. + .find(|s| !s.is_retry()) + // Extract the stolen task, if there is one. + .and_then(|s| s.success()) + }) } } + +#[inline] +pub(crate) fn get() -> &'static Pool { + lazy_static! { + static ref POOL: Pool = { + let num_threads = num_cpus::get().max(1); + let mut stealers = Vec::new(); + + // Spawn worker threads. + for _ in 0..num_threads { + let worker = Worker::new_fifo(); + stealers.push(worker.stealer()); + + thread::Builder::new() + .name("async-task-driver".to_string()) + .spawn(|| abort_on_panic(|| worker::main_loop(worker))) + .expect("cannot start a thread driving tasks"); + } + + Pool { + injector: Injector::new(), + stealers, + sleepers: Sleepers::new(), + } + }; + } + &*POOL +} diff --git a/src/task/sleepers.rs b/src/task/sleepers.rs new file mode 100644 index 0000000..a907175 --- /dev/null +++ b/src/task/sleepers.rs @@ -0,0 +1,52 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::{Condvar, Mutex}; + +/// The place where worker threads go to sleep. +/// +/// Similar to how thread parking works, if a notification comes up while no threads are sleeping, +/// the next thread that attempts to go to sleep will pick up the notification immediately. +pub struct Sleepers { + /// How many threads are currently a sleep. + sleep: Mutex, + + /// A condvar for notifying sleeping threads. + wake: Condvar, + + /// Set to `true` if a notification came up while nobody was sleeping. + notified: AtomicBool, +} + +impl Sleepers { + /// Creates a new `Sleepers`. + pub fn new() -> Sleepers { + Sleepers { + sleep: Mutex::new(0), + wake: Condvar::new(), + notified: AtomicBool::new(false), + } + } + + /// Puts the current thread to sleep. + pub fn wait(&self) { + let mut sleep = self.sleep.lock().unwrap(); + + if self.notified.swap(false, Ordering::SeqCst) == false { + *sleep += 1; + let _ = self.wake.wait(sleep).unwrap(); + } + } + + /// Notifies one thread. + pub fn notify_one(&self) { + if self.notified.load(Ordering::SeqCst) == false { + let mut sleep = self.sleep.lock().unwrap(); + + if *sleep > 0 { + *sleep -= 1; + self.wake.notify_one(); + } else { + self.notified.store(true, Ordering::SeqCst); + } + } + } +} diff --git a/src/task/task.rs b/src/task/task.rs index 8ffe821..c86e008 100644 --- a/src/task/task.rs +++ b/src/task/task.rs @@ -133,6 +133,8 @@ impl fmt::Display for TaskId { } } +pub(crate) type Runnable = async_task::Task; + pub(crate) struct Metadata { pub task_id: TaskId, pub name: Option, diff --git a/src/task/worker.rs b/src/task/worker.rs new file mode 100644 index 0000000..fc2a6e7 --- /dev/null +++ b/src/task/worker.rs @@ -0,0 +1,110 @@ +use std::cell::Cell; +use std::ptr; + +use crossbeam_deque::Worker; + +use super::pool; +use super::task; +use super::Task; +use crate::utils::abort_on_panic; + +/// Returns a handle to the current task. +/// +/// # Panics +/// +/// This function will panic if not called within the context of a task created by [`block_on`], +/// [`spawn`], or [`Builder::spawn`]. +/// +/// [`block_on`]: fn.block_on.html +/// [`spawn`]: fn.spawn.html +/// [`Builder::spawn`]: struct.Builder.html#method.spawn +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::task; +/// +/// println!("The name of this task is {:?}", task::current().name()); +/// # +/// # }) } +/// ``` +pub fn current() -> Task { + get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task") +} + +thread_local! { + static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut()); +} + +pub(crate) fn set_tag(tag: *const task::Tag, f: F) -> R +where + F: FnOnce() -> R, +{ + struct ResetTag<'a>(&'a Cell<*const task::Tag>); + + impl Drop for ResetTag<'_> { + fn drop(&mut self) { + self.0.set(ptr::null()); + } + } + + TAG.with(|t| { + t.set(tag); + let _guard = ResetTag(t); + + f() + }) +} + +pub(crate) fn get_task(f: F) -> Option +where + F: FnOnce(&Task) -> R, +{ + let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) }); + + match res { + Ok(Some(val)) => Some(val), + Ok(None) | Err(_) => None, + } +} + +thread_local! { + static IS_WORKER: Cell = Cell::new(false); + static QUEUE: Cell>> = Cell::new(None); +} + +pub(crate) fn is_worker() -> bool { + IS_WORKER.with(|is_worker| is_worker.get()) +} + +fn get_queue) -> T, T>(f: F) -> T { + QUEUE.with(|queue| { + let q = queue.take().unwrap(); + let ret = f(&q); + queue.set(Some(q)); + ret + }) +} + +pub(crate) fn schedule(task: task::Runnable) { + if is_worker() { + get_queue(|q| q.push(task)); + } else { + pool::get().injector.push(task); + } + pool::get().sleepers.notify_one(); +} + +pub(crate) fn main_loop(worker: Worker) { + IS_WORKER.with(|is_worker| is_worker.set(true)); + QUEUE.with(|queue| queue.set(Some(worker))); + + loop { + match get_queue(|q| pool::get().find_task(q)) { + Some(task) => set_tag(task.tag(), || abort_on_panic(|| task.run())), + None => pool::get().sleepers.wait(), + } + } +}