forked from mirror/async-std
Merge #205
205: Implement simple work stealing r=yoshuawuyts a=stjepang This is our first version of a work-stealing scheduler. We won't stop here, there is still lots of room for improvement. Co-authored-by: Stjepan Glavina <stjepang@gmail.com>
This commit is contained in:
commit
af6ed7d0ee
9 changed files with 309 additions and 151 deletions
|
@ -28,6 +28,7 @@ unstable = []
|
||||||
async-task = "1.0.0"
|
async-task = "1.0.0"
|
||||||
cfg-if = "0.1.9"
|
cfg-if = "0.1.9"
|
||||||
crossbeam-channel = "0.3.9"
|
crossbeam-channel = "0.3.9"
|
||||||
|
crossbeam-deque = "0.7.1"
|
||||||
futures-core-preview = "0.3.0-alpha.18"
|
futures-core-preview = "0.3.0-alpha.18"
|
||||||
futures-io-preview = "0.3.0-alpha.18"
|
futures-io-preview = "0.3.0-alpha.18"
|
||||||
futures-timer = "0.4.0"
|
futures-timer = "0.4.0"
|
||||||
|
|
|
@ -6,11 +6,11 @@ use std::sync::Arc;
|
||||||
use std::task::{RawWaker, RawWakerVTable};
|
use std::task::{RawWaker, RawWakerVTable};
|
||||||
use std::thread::{self, Thread};
|
use std::thread::{self, Thread};
|
||||||
|
|
||||||
use super::pool;
|
use super::local;
|
||||||
use super::task;
|
use super::task;
|
||||||
|
use super::worker;
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::task::{Context, Poll, Waker};
|
use crate::task::{Context, Poll, Waker};
|
||||||
use crate::utils::abort_on_panic;
|
|
||||||
|
|
||||||
use kv_log_macro::trace;
|
use kv_log_macro::trace;
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ where
|
||||||
|
|
||||||
// Log this `block_on` operation.
|
// Log this `block_on` operation.
|
||||||
let child_id = tag.task_id().as_u64();
|
let child_id = tag.task_id().as_u64();
|
||||||
let parent_id = pool::get_task(|t| t.id().as_u64()).unwrap_or(0);
|
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
|
||||||
|
|
||||||
trace!("block_on", {
|
trace!("block_on", {
|
||||||
parent_id: parent_id,
|
parent_id: parent_id,
|
||||||
|
@ -66,31 +66,28 @@ where
|
||||||
});
|
});
|
||||||
|
|
||||||
// Wrap the future into one that drops task-local variables on exit.
|
// Wrap the future into one that drops task-local variables on exit.
|
||||||
|
let future = local::add_finalizer(future);
|
||||||
|
|
||||||
let future = async move {
|
let future = async move {
|
||||||
let res = future.await;
|
let res = future.await;
|
||||||
|
|
||||||
// Abort on panic because thread-local variables behave the same way.
|
|
||||||
abort_on_panic(|| pool::get_task(|task| task.metadata().local_map.clear()));
|
|
||||||
|
|
||||||
trace!("block_on completed", {
|
trace!("block_on completed", {
|
||||||
parent_id: parent_id,
|
parent_id: parent_id,
|
||||||
child_id: child_id,
|
child_id: child_id,
|
||||||
});
|
});
|
||||||
|
|
||||||
res
|
res
|
||||||
};
|
};
|
||||||
|
|
||||||
// Pin the future onto the stack.
|
// Pin the future onto the stack.
|
||||||
pin_utils::pin_mut!(future);
|
pin_utils::pin_mut!(future);
|
||||||
|
|
||||||
// Transmute the future into one that is static.
|
// Transmute the future into one that is futurestatic.
|
||||||
let future = mem::transmute::<
|
let future = mem::transmute::<
|
||||||
Pin<&'_ mut dyn Future<Output = ()>>,
|
Pin<&'_ mut dyn Future<Output = ()>>,
|
||||||
Pin<&'static mut dyn Future<Output = ()>>,
|
Pin<&'static mut dyn Future<Output = ()>>,
|
||||||
>(future);
|
>(future);
|
||||||
|
|
||||||
// Block on the future and and wait for it to complete.
|
// Block on the future and and wait for it to complete.
|
||||||
pool::set_tag(&tag, || block(future));
|
worker::set_tag(&tag, || block(future));
|
||||||
|
|
||||||
// Take out the result.
|
// Take out the result.
|
||||||
match (*out.get()).take().unwrap() {
|
match (*out.get()).take().unwrap() {
|
||||||
|
|
32
src/task/builder.rs
Normal file
32
src/task/builder.rs
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
use super::pool;
|
||||||
|
use super::JoinHandle;
|
||||||
|
use crate::future::Future;
|
||||||
|
use crate::io;
|
||||||
|
|
||||||
|
/// Task builder that configures the settings of a new task.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Builder {
|
||||||
|
pub(crate) name: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Builder {
|
||||||
|
/// Creates a new builder.
|
||||||
|
pub fn new() -> Builder {
|
||||||
|
Builder { name: None }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configures the name of the task.
|
||||||
|
pub fn name(mut self, name: String) -> Builder {
|
||||||
|
self.name = Some(name);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Spawns a task with the configured settings.
|
||||||
|
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
|
||||||
|
where
|
||||||
|
F: Future<Output = T> + Send + 'static,
|
||||||
|
T: Send + 'static,
|
||||||
|
{
|
||||||
|
Ok(pool::get().spawn(future, self))
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,7 +6,9 @@ use std::sync::Mutex;
|
||||||
|
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
use super::pool;
|
use super::worker;
|
||||||
|
use crate::future::Future;
|
||||||
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
/// Declares task-local values.
|
/// Declares task-local values.
|
||||||
///
|
///
|
||||||
|
@ -152,7 +154,7 @@ impl<T: Send + 'static> LocalKey<T> {
|
||||||
where
|
where
|
||||||
F: FnOnce(&T) -> R,
|
F: FnOnce(&T) -> R,
|
||||||
{
|
{
|
||||||
pool::get_task(|task| unsafe {
|
worker::get_task(|task| unsafe {
|
||||||
// Prepare the numeric key, initialization function, and the map of task-locals.
|
// Prepare the numeric key, initialization function, and the map of task-locals.
|
||||||
let key = self.key();
|
let key = self.key();
|
||||||
let init = || Box::new((self.__init)()) as Box<dyn Send>;
|
let init = || Box::new((self.__init)()) as Box<dyn Send>;
|
||||||
|
@ -250,3 +252,15 @@ impl Map {
|
||||||
entries.clear();
|
entries.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wrap the future into one that drops task-local variables on exit.
|
||||||
|
pub(crate) unsafe fn add_finalizer<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
|
||||||
|
async move {
|
||||||
|
let res = f.await;
|
||||||
|
|
||||||
|
// Abort on panic because thread-local variables behave the same way.
|
||||||
|
abort_on_panic(|| worker::get_task(|task| task.metadata().local_map.clear()));
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -25,15 +25,20 @@
|
||||||
pub use std::task::{Context, Poll, Waker};
|
pub use std::task::{Context, Poll, Waker};
|
||||||
|
|
||||||
pub use block_on::block_on;
|
pub use block_on::block_on;
|
||||||
|
pub use builder::Builder;
|
||||||
pub use local::{AccessError, LocalKey};
|
pub use local::{AccessError, LocalKey};
|
||||||
pub use pool::{current, spawn, Builder};
|
pub use pool::spawn;
|
||||||
pub use sleep::sleep;
|
pub use sleep::sleep;
|
||||||
pub use task::{JoinHandle, Task, TaskId};
|
pub use task::{JoinHandle, Task, TaskId};
|
||||||
|
pub use worker::current;
|
||||||
|
|
||||||
mod block_on;
|
mod block_on;
|
||||||
|
mod builder;
|
||||||
mod local;
|
mod local;
|
||||||
mod pool;
|
mod pool;
|
||||||
mod sleep;
|
mod sleep;
|
||||||
|
mod sleepers;
|
||||||
mod task;
|
mod task;
|
||||||
|
mod worker;
|
||||||
|
|
||||||
pub(crate) mod blocking;
|
pub(crate) mod blocking;
|
||||||
|
|
221
src/task/pool.rs
221
src/task/pool.rs
|
@ -1,43 +1,18 @@
|
||||||
use std::cell::Cell;
|
use std::iter;
|
||||||
use std::ptr;
|
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use crossbeam_channel::{unbounded, Sender};
|
use crossbeam_deque::{Injector, Stealer, Worker};
|
||||||
use kv_log_macro::trace;
|
use kv_log_macro::trace;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
|
use super::local;
|
||||||
|
use super::sleepers::Sleepers;
|
||||||
use super::task;
|
use super::task;
|
||||||
use super::{JoinHandle, Task};
|
use super::worker;
|
||||||
|
use super::{Builder, JoinHandle};
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::io;
|
|
||||||
use crate::utils::abort_on_panic;
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
/// Returns a handle to the current task.
|
|
||||||
///
|
|
||||||
/// # Panics
|
|
||||||
///
|
|
||||||
/// This function will panic if not called within the context of a task created by [`block_on`],
|
|
||||||
/// [`spawn`], or [`Builder::spawn`].
|
|
||||||
///
|
|
||||||
/// [`block_on`]: fn.block_on.html
|
|
||||||
/// [`spawn`]: fn.spawn.html
|
|
||||||
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```
|
|
||||||
/// # fn main() { async_std::task::block_on(async {
|
|
||||||
/// #
|
|
||||||
/// use async_std::task;
|
|
||||||
///
|
|
||||||
/// println!("The name of this task is {:?}", task::current().name());
|
|
||||||
/// #
|
|
||||||
/// # }) }
|
|
||||||
/// ```
|
|
||||||
pub fn current() -> Task {
|
|
||||||
get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawns a task.
|
/// Spawns a task.
|
||||||
///
|
///
|
||||||
/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task.
|
/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task.
|
||||||
|
@ -64,130 +39,100 @@ where
|
||||||
F: Future<Output = T> + Send + 'static,
|
F: Future<Output = T> + Send + 'static,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
spawn_with_builder(Builder::new(), future)
|
Builder::new().spawn(future).expect("cannot spawn future")
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Task builder that configures the settings of a new task.
|
pub(crate) struct Pool {
|
||||||
#[derive(Debug)]
|
pub injector: Injector<task::Runnable>,
|
||||||
pub struct Builder {
|
pub stealers: Vec<Stealer<task::Runnable>>,
|
||||||
pub(crate) name: Option<String>,
|
pub sleepers: Sleepers,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Builder {
|
impl Pool {
|
||||||
/// Creates a new builder.
|
/// Spawn a future onto the pool.
|
||||||
pub fn new() -> Builder {
|
pub fn spawn<F, T>(&self, future: F, builder: Builder) -> JoinHandle<T>
|
||||||
Builder { name: None }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Configures the name of the task.
|
|
||||||
pub fn name(mut self, name: String) -> Builder {
|
|
||||||
self.name = Some(name);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Spawns a task with the configured settings.
|
|
||||||
pub fn spawn<F, T>(self, future: F) -> io::Result<JoinHandle<T>>
|
|
||||||
where
|
where
|
||||||
F: Future<Output = T> + Send + 'static,
|
F: Future<Output = T> + Send + 'static,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
Ok(spawn_with_builder(self, future))
|
let tag = task::Tag::new(builder.name);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn spawn_with_builder<F, T>(builder: Builder, future: F) -> JoinHandle<T>
|
// Log this `spawn` operation.
|
||||||
where
|
let child_id = tag.task_id().as_u64();
|
||||||
F: Future<Output = T> + Send + 'static,
|
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
|
||||||
T: Send + 'static,
|
|
||||||
{
|
|
||||||
let Builder { name } = builder;
|
|
||||||
|
|
||||||
type Job = async_task::Task<task::Tag>;
|
trace!("spawn", {
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
static ref QUEUE: Sender<Job> = {
|
|
||||||
let (sender, receiver) = unbounded::<Job>();
|
|
||||||
|
|
||||||
for _ in 0..num_cpus::get().max(1) {
|
|
||||||
let receiver = receiver.clone();
|
|
||||||
thread::Builder::new()
|
|
||||||
.name("async-task-driver".to_string())
|
|
||||||
.spawn(|| {
|
|
||||||
for job in receiver {
|
|
||||||
set_tag(job.tag(), || abort_on_panic(|| job.run()))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.expect("cannot start a thread driving tasks");
|
|
||||||
}
|
|
||||||
|
|
||||||
sender
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
let tag = task::Tag::new(name);
|
|
||||||
let schedule = |job| QUEUE.send(job).unwrap();
|
|
||||||
|
|
||||||
// Log this `spawn` operation.
|
|
||||||
let child_id = tag.task_id().as_u64();
|
|
||||||
let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0);
|
|
||||||
|
|
||||||
trace!("spawn", {
|
|
||||||
parent_id: parent_id,
|
|
||||||
child_id: child_id,
|
|
||||||
});
|
|
||||||
|
|
||||||
// Wrap the future into one that drops task-local variables on exit.
|
|
||||||
let future = async move {
|
|
||||||
let res = future.await;
|
|
||||||
|
|
||||||
// Abort on panic because thread-local variables behave the same way.
|
|
||||||
abort_on_panic(|| get_task(|task| task.metadata().local_map.clear()));
|
|
||||||
|
|
||||||
trace!("spawn completed", {
|
|
||||||
parent_id: parent_id,
|
parent_id: parent_id,
|
||||||
child_id: child_id,
|
child_id: child_id,
|
||||||
});
|
});
|
||||||
|
|
||||||
res
|
// Wrap the future into one that drops task-local variables on exit.
|
||||||
};
|
let future = unsafe { local::add_finalizer(future) };
|
||||||
|
|
||||||
let (task, handle) = async_task::spawn(future, schedule, tag);
|
// Wrap the future into one that logs completion on exit.
|
||||||
task.schedule();
|
let future = async move {
|
||||||
JoinHandle::new(handle)
|
let res = future.await;
|
||||||
}
|
trace!("spawn completed", {
|
||||||
|
parent_id: parent_id,
|
||||||
|
child_id: child_id,
|
||||||
|
});
|
||||||
|
res
|
||||||
|
};
|
||||||
|
|
||||||
thread_local! {
|
let (task, handle) = async_task::spawn(future, worker::schedule, tag);
|
||||||
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
|
task.schedule();
|
||||||
}
|
JoinHandle::new(handle)
|
||||||
|
|
||||||
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
|
|
||||||
where
|
|
||||||
F: FnOnce() -> R,
|
|
||||||
{
|
|
||||||
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
|
|
||||||
|
|
||||||
impl Drop for ResetTag<'_> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.0.set(ptr::null());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TAG.with(|t| {
|
/// Find the next runnable task to run.
|
||||||
t.set(tag);
|
pub fn find_task(&self, local: &Worker<task::Runnable>) -> Option<task::Runnable> {
|
||||||
let _guard = ResetTag(t);
|
// Pop a task from the local queue, if not empty.
|
||||||
|
local.pop().or_else(|| {
|
||||||
f()
|
// Otherwise, we need to look for a task elsewhere.
|
||||||
})
|
iter::repeat_with(|| {
|
||||||
}
|
// Try stealing a batch of tasks from the injector queue.
|
||||||
|
self.injector
|
||||||
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
|
.steal_batch_and_pop(local)
|
||||||
where
|
// Or try stealing a bach of tasks from one of the other threads.
|
||||||
F: FnOnce(&Task) -> R,
|
.or_else(|| {
|
||||||
{
|
self.stealers
|
||||||
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
|
.iter()
|
||||||
|
.map(|s| s.steal_batch_and_pop(local))
|
||||||
match res {
|
.collect()
|
||||||
Ok(Some(val)) => Some(val),
|
})
|
||||||
Ok(None) | Err(_) => None,
|
})
|
||||||
|
// Loop while no task was stolen and any steal operation needs to be retried.
|
||||||
|
.find(|s| !s.is_retry())
|
||||||
|
// Extract the stolen task, if there is one.
|
||||||
|
.and_then(|s| s.success())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn get() -> &'static Pool {
|
||||||
|
lazy_static! {
|
||||||
|
static ref POOL: Pool = {
|
||||||
|
let num_threads = num_cpus::get().max(1);
|
||||||
|
let mut stealers = Vec::new();
|
||||||
|
|
||||||
|
// Spawn worker threads.
|
||||||
|
for _ in 0..num_threads {
|
||||||
|
let worker = Worker::new_fifo();
|
||||||
|
stealers.push(worker.stealer());
|
||||||
|
|
||||||
|
thread::Builder::new()
|
||||||
|
.name("async-task-driver".to_string())
|
||||||
|
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
|
||||||
|
.expect("cannot start a thread driving tasks");
|
||||||
|
}
|
||||||
|
|
||||||
|
Pool {
|
||||||
|
injector: Injector::new(),
|
||||||
|
stealers,
|
||||||
|
sleepers: Sleepers::new(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
&*POOL
|
||||||
|
}
|
||||||
|
|
52
src/task/sleepers.rs
Normal file
52
src/task/sleepers.rs
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::sync::{Condvar, Mutex};
|
||||||
|
|
||||||
|
/// The place where worker threads go to sleep.
|
||||||
|
///
|
||||||
|
/// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
|
||||||
|
/// the next thread that attempts to go to sleep will pick up the notification immediately.
|
||||||
|
pub struct Sleepers {
|
||||||
|
/// How many threads are currently a sleep.
|
||||||
|
sleep: Mutex<usize>,
|
||||||
|
|
||||||
|
/// A condvar for notifying sleeping threads.
|
||||||
|
wake: Condvar,
|
||||||
|
|
||||||
|
/// Set to `true` if a notification came up while nobody was sleeping.
|
||||||
|
notified: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Sleepers {
|
||||||
|
/// Creates a new `Sleepers`.
|
||||||
|
pub fn new() -> Sleepers {
|
||||||
|
Sleepers {
|
||||||
|
sleep: Mutex::new(0),
|
||||||
|
wake: Condvar::new(),
|
||||||
|
notified: AtomicBool::new(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Puts the current thread to sleep.
|
||||||
|
pub fn wait(&self) {
|
||||||
|
let mut sleep = self.sleep.lock().unwrap();
|
||||||
|
|
||||||
|
if self.notified.swap(false, Ordering::SeqCst) == false {
|
||||||
|
*sleep += 1;
|
||||||
|
let _ = self.wake.wait(sleep).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Notifies one thread.
|
||||||
|
pub fn notify_one(&self) {
|
||||||
|
if self.notified.load(Ordering::SeqCst) == false {
|
||||||
|
let mut sleep = self.sleep.lock().unwrap();
|
||||||
|
|
||||||
|
if *sleep > 0 {
|
||||||
|
*sleep -= 1;
|
||||||
|
self.wake.notify_one();
|
||||||
|
} else {
|
||||||
|
self.notified.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -133,6 +133,8 @@ impl fmt::Display for TaskId {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) type Runnable = async_task::Task<Tag>;
|
||||||
|
|
||||||
pub(crate) struct Metadata {
|
pub(crate) struct Metadata {
|
||||||
pub task_id: TaskId,
|
pub task_id: TaskId,
|
||||||
pub name: Option<String>,
|
pub name: Option<String>,
|
||||||
|
|
110
src/task/worker.rs
Normal file
110
src/task/worker.rs
Normal file
|
@ -0,0 +1,110 @@
|
||||||
|
use std::cell::Cell;
|
||||||
|
use std::ptr;
|
||||||
|
|
||||||
|
use crossbeam_deque::Worker;
|
||||||
|
|
||||||
|
use super::pool;
|
||||||
|
use super::task;
|
||||||
|
use super::Task;
|
||||||
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
|
/// Returns a handle to the current task.
|
||||||
|
///
|
||||||
|
/// # Panics
|
||||||
|
///
|
||||||
|
/// This function will panic if not called within the context of a task created by [`block_on`],
|
||||||
|
/// [`spawn`], or [`Builder::spawn`].
|
||||||
|
///
|
||||||
|
/// [`block_on`]: fn.block_on.html
|
||||||
|
/// [`spawn`]: fn.spawn.html
|
||||||
|
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # fn main() { async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// println!("The name of this task is {:?}", task::current().name());
|
||||||
|
/// #
|
||||||
|
/// # }) }
|
||||||
|
/// ```
|
||||||
|
pub fn current() -> Task {
|
||||||
|
get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
|
||||||
|
}
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce() -> R,
|
||||||
|
{
|
||||||
|
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
|
||||||
|
|
||||||
|
impl Drop for ResetTag<'_> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0.set(ptr::null());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TAG.with(|t| {
|
||||||
|
t.set(tag);
|
||||||
|
let _guard = ResetTag(t);
|
||||||
|
|
||||||
|
f()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
|
||||||
|
where
|
||||||
|
F: FnOnce(&Task) -> R,
|
||||||
|
{
|
||||||
|
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(Some(val)) => Some(val),
|
||||||
|
Ok(None) | Err(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
static IS_WORKER: Cell<bool> = Cell::new(false);
|
||||||
|
static QUEUE: Cell<Option<Worker<task::Runnable>>> = Cell::new(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn is_worker() -> bool {
|
||||||
|
IS_WORKER.with(|is_worker| is_worker.get())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_queue<F: FnOnce(&Worker<task::Runnable>) -> T, T>(f: F) -> T {
|
||||||
|
QUEUE.with(|queue| {
|
||||||
|
let q = queue.take().unwrap();
|
||||||
|
let ret = f(&q);
|
||||||
|
queue.set(Some(q));
|
||||||
|
ret
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn schedule(task: task::Runnable) {
|
||||||
|
if is_worker() {
|
||||||
|
get_queue(|q| q.push(task));
|
||||||
|
} else {
|
||||||
|
pool::get().injector.push(task);
|
||||||
|
}
|
||||||
|
pool::get().sleepers.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn main_loop(worker: Worker<task::Runnable>) {
|
||||||
|
IS_WORKER.with(|is_worker| is_worker.set(true));
|
||||||
|
QUEUE.with(|queue| queue.set(Some(worker)));
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match get_queue(|q| pool::get().find_task(q)) {
|
||||||
|
Some(task) => set_tag(task.tag(), || abort_on_panic(|| task.run())),
|
||||||
|
None => pool::get().sleepers.wait(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue