Implement simple work stealing

pull/205/head
Stjepan Glavina 5 years ago
parent dd92d8dc61
commit 0924911ac3

@ -28,6 +28,7 @@ unstable = []
async-task = "1.0.0"
cfg-if = "0.1.9"
crossbeam-channel = "0.3.9"
crossbeam-deque = "0.7.1"
futures-core-preview = "0.3.0-alpha.18"
futures-io-preview = "0.3.0-alpha.18"
futures-timer = "0.4.0"

@ -6,11 +6,11 @@ use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable};
use std::thread::{self, Thread};
use super::pool;
use super::local;
use super::task;
use super::worker;
use crate::future::Future;
use crate::task::{Context, Poll, Waker};
use crate::utils::abort_on_panic;
use kv_log_macro::trace;
@ -58,7 +58,7 @@ where
// Log this `block_on` operation.
let child_id = tag.task_id().as_u64();
let parent_id = pool::get_task(|t| t.id().as_u64()).unwrap_or(0);
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
trace!("block_on", {
parent_id: parent_id,
@ -66,31 +66,28 @@ where
});
// Wrap the future into one that drops task-local variables on exit.
let future = local::add_finalizer(future);
let future = async move {
let res = future.await;
// Abort on panic because thread-local variables behave the same way.
abort_on_panic(|| pool::get_task(|task| task.metadata().local_map.clear()));
trace!("block_on completed", {
parent_id: parent_id,
child_id: child_id,
});
res
};
// Pin the future onto the stack.
pin_utils::pin_mut!(future);
// Transmute the future into one that is static.
// Transmute the future into one that is futurestatic.
let future = mem::transmute::<
Pin<&'_ mut dyn Future<Output = ()>>,
Pin<&'static mut dyn Future<Output = ()>>,
>(future);
// Block on the future and and wait for it to complete.
pool::set_tag(&tag, || block(future));
worker::set_tag(&tag, || block(future));
// Take out the result.
match (*out.get()).take().unwrap() {

@ -0,0 +1,32 @@
use super::pool;
use super::JoinHandle;
use crate::future::Future;
use crate::io;
/// Task builder that configures the settings of a new task.
#[derive(Debug)]
pub struct Builder {
pub(crate) name: Option<String>,
}
impl Builder {
/// Creates a new builder.
pub fn new() -> Builder {
Builder { name: None }
}
/// Configures the name of the task.
pub fn name(mut self, name: String) -> Builder {
self.name = Some(name);
self
}
/// Spawns a task with the configured settings.
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Ok(pool::get().spawn(future, self))
}
}

@ -6,7 +6,9 @@ use std::sync::Mutex;
use lazy_static::lazy_static;
use super::pool;
use super::worker;
use crate::future::Future;
use crate::utils::abort_on_panic;
/// Declares task-local values.
///
@ -152,7 +154,7 @@ impl<T: Send + 'static> LocalKey<T> {
where
F: FnOnce(&T) -> R,
{
pool::get_task(|task| unsafe {
worker::get_task(|task| unsafe {
// Prepare the numeric key, initialization function, and the map of task-locals.
let key = self.key();
let init = || Box::new((self.__init)()) as Box<dyn Send>;
@ -250,3 +252,15 @@ impl Map {
entries.clear();
}
}
// Wrap the future into one that drops task-local variables on exit.
pub(crate) unsafe fn add_finalizer<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
async move {
let res = f.await;
// Abort on panic because thread-local variables behave the same way.
abort_on_panic(|| worker::get_task(|task| task.metadata().local_map.clear()));
res
}
}

@ -25,15 +25,20 @@
pub use std::task::{Context, Poll, Waker};
pub use block_on::block_on;
pub use builder::Builder;
pub use local::{AccessError, LocalKey};
pub use pool::{current, spawn, Builder};
pub use pool::spawn;
pub use sleep::sleep;
pub use task::{JoinHandle, Task, TaskId};
pub use worker::current;
mod block_on;
mod builder;
mod local;
mod pool;
mod sleep;
mod sleepers;
mod task;
mod worker;
pub(crate) mod blocking;

@ -1,43 +1,18 @@
use std::cell::Cell;
use std::ptr;
use std::iter;
use std::thread;
use crossbeam_channel::{unbounded, Sender};
use crossbeam_deque::{Injector, Stealer, Worker};
use kv_log_macro::trace;
use lazy_static::lazy_static;
use super::local;
use super::sleepers::Sleepers;
use super::task;
use super::{JoinHandle, Task};
use super::worker;
use super::{Builder, JoinHandle};
use crate::future::Future;
use crate::io;
use crate::utils::abort_on_panic;
/// Returns a handle to the current task.
///
/// # Panics
///
/// This function will panic if not called within the context of a task created by [`block_on`],
/// [`spawn`], or [`Builder::spawn`].
///
/// [`block_on`]: fn.block_on.html
/// [`spawn`]: fn.spawn.html
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// println!("The name of this task is {:?}", task::current().name());
/// #
/// # }) }
/// ```
pub fn current() -> Task {
get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
}
/// Spawns a task.
///
/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task.
@ -64,130 +39,100 @@ where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
spawn_with_builder(Builder::new(), future)
Builder::new().spawn(future).expect("cannot spawn future")
}
/// Task builder that configures the settings of a new task.
#[derive(Debug)]
pub struct Builder {
pub(crate) name: Option<String>,
pub(crate) struct Pool {
pub injector: Injector<task::Runnable>,
pub stealers: Vec<Stealer<task::Runnable>>,
pub sleepers: Sleepers,
}
impl Builder {
/// Creates a new builder.
pub fn new() -> Builder {
Builder { name: None }
}
/// Configures the name of the task.
pub fn name(mut self, name: String) -> Builder {
self.name = Some(name);
self
}
/// Spawns a task with the configured settings.
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
impl Pool {
/// Spawn a future onto the pool.
pub fn spawn<F, T>(&self, future: F, builder: Builder) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Ok(spawn_with_builder(self, future))
}
}
pub(crate) fn spawn_with_builder<F, T>(builder: Builder, future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let Builder { name } = builder;
type Job = async_task::Task<task::Tag>;
let tag = task::Tag::new(builder.name);
lazy_static! {
static ref QUEUE: Sender<Job> = {
let (sender, receiver) = unbounded::<Job>();
// Log this `spawn` operation.
let child_id = tag.task_id().as_u64();
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| {
for job in receiver {
set_tag(job.tag(), || abort_on_panic(|| job.run()))
}
})
.expect("cannot start a thread driving tasks");
}
sender
};
}
let tag = task::Tag::new(name);
let schedule = |job| QUEUE.send(job).unwrap();
// Log this `spawn` operation.
let child_id = tag.task_id().as_u64();
let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0);
trace!("spawn", {
parent_id: parent_id,
child_id: child_id,
});
// Wrap the future into one that drops task-local variables on exit.
let future = async move {
let res = future.await;
// Abort on panic because thread-local variables behave the same way.
abort_on_panic(|| get_task(|task| task.metadata().local_map.clear()));
trace!("spawn completed", {
trace!("spawn", {
parent_id: parent_id,
child_id: child_id,
});
res
};
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
thread_local! {
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
}
// Wrap the future into one that drops task-local variables on exit.
let future = unsafe { local::add_finalizer(future) };
// Wrap the future into one that logs completion on exit.
let future = async move {
let res = future.await;
trace!("spawn completed", {
parent_id: parent_id,
child_id: child_id,
});
res
};
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
let (task, handle) = async_task::spawn(future, worker::schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
impl Drop for ResetTag<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
/// Find the next runnable task to run.
pub fn find_task(&self, local: &Worker<task::Runnable>) -> Option<task::Runnable> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the injector queue.
self.injector
.steal_batch_and_pop(local)
// Or try stealing a bach of tasks from one of the other threads.
.or_else(|| {
self.stealers
.iter()
.map(|s| s.steal_batch_and_pop(local))
.collect()
})
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
}
TAG.with(|t| {
t.set(tag);
let _guard = ResetTag(t);
#[inline]
pub(crate) fn get() -> &'static Pool {
lazy_static! {
static ref POOL: Pool = {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
f()
})
}
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
where
F: FnOnce(&Task) -> R,
{
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
};
}
&*POOL
}

@ -0,0 +1,52 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};
/// The place where worker threads go to sleep.
///
/// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
/// the next thread that attempts to go to sleep will pick up the notification immediately.
pub struct Sleepers {
/// How many threads are currently a sleep.
sleep: Mutex<usize>,
/// A condvar for notifying sleeping threads.
wake: Condvar,
/// Set to `true` if a notification came up while nobody was sleeping.
notified: AtomicBool,
}
impl Sleepers {
/// Creates a new `Sleepers`.
pub fn new() -> Sleepers {
Sleepers {
sleep: Mutex::new(0),
wake: Condvar::new(),
notified: AtomicBool::new(false),
}
}
/// Puts the current thread to sleep.
pub fn wait(&self) {
let mut sleep = self.sleep.lock().unwrap();
if self.notified.swap(false, Ordering::SeqCst) == false {
*sleep += 1;
let _ = self.wake.wait(sleep).unwrap();
}
}
/// Notifies one thread.
pub fn notify_one(&self) {
if self.notified.load(Ordering::SeqCst) == false {
let mut sleep = self.sleep.lock().unwrap();
if *sleep > 0 {
*sleep -= 1;
self.wake.notify_one();
} else {
self.notified.store(true, Ordering::SeqCst);
}
}
}
}

@ -133,6 +133,8 @@ impl fmt::Display for TaskId {
}
}
pub(crate) type Runnable = async_task::Task<Tag>;
pub(crate) struct Metadata {
pub task_id: TaskId,
pub name: Option<String>,

@ -0,0 +1,110 @@
use std::cell::Cell;
use std::ptr;
use crossbeam_deque::Worker;
use super::pool;
use super::task;
use super::Task;
use crate::utils::abort_on_panic;
/// Returns a handle to the current task.
///
/// # Panics
///
/// This function will panic if not called within the context of a task created by [`block_on`],
/// [`spawn`], or [`Builder::spawn`].
///
/// [`block_on`]: fn.block_on.html
/// [`spawn`]: fn.spawn.html
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// println!("The name of this task is {:?}", task::current().name());
/// #
/// # }) }
/// ```
pub fn current() -> Task {
get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
}
thread_local! {
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
}
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
impl Drop for ResetTag<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
}
TAG.with(|t| {
t.set(tag);
let _guard = ResetTag(t);
f()
})
}
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
where
F: FnOnce(&Task) -> R,
{
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
}
}
thread_local! {
static IS_WORKER: Cell<bool> = Cell::new(false);
static QUEUE: Cell<Option<Worker<task::Runnable>>> = Cell::new(None);
}
pub fn is_worker() -> bool {
IS_WORKER.with(|is_worker| is_worker.get())
}
fn get_queue<F: FnOnce(&Worker<task::Runnable>) -> T, T>(f: F) -> T {
QUEUE.with(|queue| {
let q = queue.take().unwrap();
let ret = f(&q);
queue.set(Some(q));
ret
})
}
pub(crate) fn schedule(task: task::Runnable) {
if is_worker() {
get_queue(|q| q.push(task));
} else {
pool::get().injector.push(task);
}
pool::get().sleepers.notify_one();
}
pub(crate) fn main_loop(worker: Worker<task::Runnable>) {
IS_WORKER.with(|is_worker| is_worker.set(true));
QUEUE.with(|queue| queue.set(Some(worker)));
loop {
match get_queue(|q| pool::get().find_task(q)) {
Some(task) => set_tag(task.tag(), || abort_on_panic(|| task.run())),
None => pool::get().sleepers.wait(),
}
}
}
Loading…
Cancel
Save