diff --git a/src/task/block_on.rs b/src/task/block_on.rs index 65d6541..92a1187 100644 --- a/src/task/block_on.rs +++ b/src/task/block_on.rs @@ -1,8 +1,6 @@ use std::future::Future; -use kv_log_macro::trace; - -use crate::task::Task; +use crate::task::Builder; /// Spawns a task and blocks the current thread on its result. /// @@ -31,33 +29,5 @@ pub fn block_on(future: F) -> T where F: Future, { - // Create a new task handle. - let task = Task::new(None); - - // Log this `block_on` operation. - trace!("block_on", { - task_id: task.id().0, - parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), - }); - - let wrapped_future = async move { - // Drop task-locals on exit. - defer! { - Task::get_current(|t| unsafe { t.drop_locals() }); - } - - // Log completion on exit. - defer! { - trace!("completed", { - task_id: Task::get_current(|t| t.id().0), - }); - } - - future.await - }; - - once_cell::sync::Lazy::force(&crate::rt::RUNTIME); - - // Run the future as a task. - unsafe { Task::set_current(&task, || smol::block_on(wrapped_future)) } + Builder::new().blocking(future) } diff --git a/src/task/builder.rs b/src/task/builder.rs index de3e6ac..4936d4b 100644 --- a/src/task/builder.rs +++ b/src/task/builder.rs @@ -1,9 +1,12 @@ use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; use kv_log_macro::trace; use crate::io; -use crate::task::{JoinHandle, Task}; +use crate::task::{JoinHandle, Task, TaskLocalsWrapper}; /// Task builder that configures the settings of a new task. #[derive(Debug, Default)] @@ -25,41 +28,83 @@ impl Builder { self } + fn build(self, future: F) -> SupportTaskLocals + where + F: Future, + { + let name = self.name.map(Arc::new); + + // Create a new task handle. + let task = Task::new(name); + + once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + + let tag = TaskLocalsWrapper::new(task.clone()); + + // FIXME: do not require all futures to be boxed. + SupportTaskLocals { + tag, + future: Box::pin(future), + } + } + /// Spawns a task with the configured settings. pub fn spawn(self, future: F) -> io::Result> where F: Future + Send + 'static, T: Send + 'static, { - // Create a new task handle. - let task = Task::new(self.name); + let wrapped = self.build(future); // Log this `spawn` operation. trace!("spawn", { - task_id: task.id().0, - parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), + task_id: wrapped.tag.id().0, + parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), }); - let wrapped_future = async move { - // Drop task-locals on exit. - defer! { - Task::get_current(|t| unsafe { t.drop_locals() }); - } - - // Log completion on exit. - defer! { - trace!("completed", { - task_id: Task::get_current(|t| t.id().0), - }); - } - future.await - }; + let task = wrapped.tag.task().clone(); + let smol_task = smol::Task::spawn(wrapped).detach(); - once_cell::sync::Lazy::force(&crate::rt::RUNTIME); + Ok(JoinHandle::new(smol_task, task)) + } - // FIXME: figure out how to set the current task. + /// Spawns a task with the configured settings, blocking on its execution. + pub fn blocking(self, future: F) -> T + where + F: Future, + { + let wrapped = self.build(future); - let smol_task = smol::Task::spawn(wrapped_future).detach(); - Ok(JoinHandle::new(smol_task, task)) + // Log this `block_on` operation. + trace!("block_on", { + task_id: wrapped.tag.id().0, + parent_task_id: TaskLocalsWrapper::get_current(|t| t.id().0).unwrap_or(0), + }); + + // Run the future as a task. + unsafe { TaskLocalsWrapper::set_current(&wrapped.tag, || smol::block_on(wrapped)) } + } +} + +/// Wrapper to add support for task locals. +struct SupportTaskLocals { + tag: TaskLocalsWrapper, + future: Pin>, +} + +impl Drop for SupportTaskLocals { + fn drop(&mut self) { + // Log completion on exit. + trace!("completed", { + task_id: self.tag.id().0, + }); + } +} + +impl Future for SupportTaskLocals { + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + unsafe { TaskLocalsWrapper::set_current(&self.tag, || Pin::new(&mut self.future).poll(cx)) } } } diff --git a/src/task/current.rs b/src/task/current.rs index 0dc3699..e4624e1 100644 --- a/src/task/current.rs +++ b/src/task/current.rs @@ -1,4 +1,4 @@ -use crate::task::Task; +use crate::task::{Task, TaskLocalsWrapper}; /// Returns a handle to the current task. /// @@ -23,6 +23,6 @@ use crate::task::Task; /// # }) /// ``` pub fn current() -> Task { - Task::get_current(|t| t.clone()) + TaskLocalsWrapper::get_current(|t| t.task().clone()) .expect("`task::current()` called outside the context of a task") } diff --git a/src/task/mod.rs b/src/task/mod.rs index 61917cd..d4fccea 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -142,6 +142,7 @@ cfg_default! { pub use task_local::{AccessError, LocalKey}; pub(crate) use task_local::LocalsMap; + pub(crate) use task_locals_wrapper::TaskLocalsWrapper; mod block_on; mod builder; @@ -153,6 +154,7 @@ cfg_default! { mod task; mod task_id; mod task_local; + mod task_locals_wrapper; #[cfg(any(feature = "unstable", test))] pub use spawn_blocking::spawn_blocking; diff --git a/src/task/task.rs b/src/task/task.rs index bcec2e0..eba99c7 100644 --- a/src/task/task.rs +++ b/src/task/task.rs @@ -1,74 +1,32 @@ -use std::cell::Cell; use std::fmt; -use std::mem::ManuallyDrop; -use std::ptr; -use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; -use crate::task::{LocalsMap, TaskId}; -use crate::utils::abort_on_panic; +use crate::task::TaskId; -thread_local! { - /// A pointer to the currently running task. - static CURRENT: Cell<*const Task> = Cell::new(ptr::null_mut()); -} - -/// The inner representation of a task handle. -struct Inner { +/// A handle to a task. +#[derive(Clone)] +pub struct Task { /// The task ID. id: TaskId, /// The optional task name. - name: Option>, - - /// The map holding task-local values. - locals: LocalsMap, -} - -impl Inner { - #[inline] - fn new(name: Option) -> Inner { - Inner { - id: TaskId::generate(), - name: name.map(String::into_boxed_str), - locals: LocalsMap::new(), - } - } + name: Option>, } -/// A handle to a task. -pub struct Task { - /// The inner representation. - /// - /// This pointer is lazily initialized on first use. In most cases, the inner representation is - /// never touched and therefore we don't allocate it unless it's really needed. - inner: AtomicPtr, -} - -unsafe impl Send for Task {} -unsafe impl Sync for Task {} - impl Task { /// Creates a new task handle. - /// - /// If the task is unnamed, the inner representation of the task will be lazily allocated on - /// demand. #[inline] - pub(crate) fn new(name: Option) -> Task { - let inner = match name { - None => AtomicPtr::default(), - Some(name) => { - let raw = Arc::into_raw(Arc::new(Inner::new(Some(name)))); - AtomicPtr::new(raw as *mut Inner) - } - }; - Task { inner } + pub(crate) fn new(name: Option>) -> Task { + Task { + id: TaskId::generate(), + name, + } } /// Gets the task's unique identifier. #[inline] pub fn id(&self) -> TaskId { - self.inner().id + self.id } /// Returns the name of this task. @@ -77,93 +35,7 @@ impl Task { /// /// [`Builder::name`]: struct.Builder.html#method.name pub fn name(&self) -> Option<&str> { - self.inner().name.as_ref().map(|s| &**s) - } - - /// Returns the map holding task-local values. - pub(crate) fn locals(&self) -> &LocalsMap { - &self.inner().locals - } - - /// Drops all task-local values. - /// - /// This method is only safe to call at the end of the task. - #[inline] - pub(crate) unsafe fn drop_locals(&self) { - let raw = self.inner.load(Ordering::Acquire); - if let Some(inner) = raw.as_mut() { - // Abort the process if dropping task-locals panics. - abort_on_panic(|| { - inner.locals.clear(); - }); - } - } - - /// Returns the inner representation, initializing it on first use. - fn inner(&self) -> &Inner { - loop { - let raw = self.inner.load(Ordering::Acquire); - if !raw.is_null() { - return unsafe { &*raw }; - } - - let new = Arc::into_raw(Arc::new(Inner::new(None))) as *mut Inner; - if self.inner.compare_and_swap(raw, new, Ordering::AcqRel) != raw { - unsafe { - drop(Arc::from_raw(new)); - } - } - } - } - - /// Set a reference to the current task. - pub(crate) unsafe fn set_current(task: *const Task, f: F) -> R - where - F: FnOnce() -> R, - { - CURRENT.with(|current| { - let old_task = current.replace(task); - defer! { - current.set(old_task); - } - f() - }) - } - - /// Gets a reference to the current task. - pub(crate) fn get_current(f: F) -> Option - where - F: FnOnce(&Task) -> R, - { - let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) }); - match res { - Ok(Some(val)) => Some(val), - Ok(None) | Err(_) => None, - } - } -} - -impl Drop for Task { - fn drop(&mut self) { - // Deallocate the inner representation if it was initialized. - let raw = *self.inner.get_mut(); - if !raw.is_null() { - unsafe { - drop(Arc::from_raw(raw)); - } - } - } -} - -impl Clone for Task { - fn clone(&self) -> Task { - // We need to make sure the inner representation is initialized now so that this instance - // and the clone have raw pointers that point to the same `Arc`. - let arc = unsafe { ManuallyDrop::new(Arc::from_raw(self.inner())) }; - let raw = Arc::into_raw(Arc::clone(&arc)); - Task { - inner: AtomicPtr::new(raw as *mut Inner), - } + self.name.as_ref().map(|s| s.as_str()) } } diff --git a/src/task/task_local.rs b/src/task/task_local.rs index 72e53d7..4e2ba83 100644 --- a/src/task/task_local.rs +++ b/src/task/task_local.rs @@ -3,7 +3,7 @@ use std::error::Error; use std::fmt; use std::sync::atomic::{AtomicU32, Ordering}; -use crate::task::Task; +use crate::task::TaskLocalsWrapper; /// The key for accessing a task-local value. /// @@ -98,7 +98,7 @@ impl LocalKey { where F: FnOnce(&T) -> R, { - Task::get_current(|task| unsafe { + TaskLocalsWrapper::get_current(|task| unsafe { // Prepare the numeric key, initialization function, and the map of task-locals. let key = self.key(); let init = || Box::new((self.__init)()) as Box; diff --git a/src/task/task_locals_wrapper.rs b/src/task/task_locals_wrapper.rs new file mode 100644 index 0000000..2a7ddb7 --- /dev/null +++ b/src/task/task_locals_wrapper.rs @@ -0,0 +1,84 @@ +use std::cell::Cell; +use std::ptr; + +use crate::task::{LocalsMap, Task, TaskId}; +use crate::utils::abort_on_panic; + +thread_local! { + /// A pointer to the currently running task. + static CURRENT: Cell<*const TaskLocalsWrapper> = Cell::new(ptr::null_mut()); +} + +/// A wrapper to store task local data. +pub(crate) struct TaskLocalsWrapper { + /// The actual task details. + task: Task, + + /// The map holding task-local values. + locals: LocalsMap, +} + +impl TaskLocalsWrapper { + /// Creates a new task handle. + /// + /// If the task is unnamed, the inner representation of the task will be lazily allocated on + /// demand. + #[inline] + pub(crate) fn new(task: Task) -> Self { + Self { + task, + locals: LocalsMap::new(), + } + } + + /// Gets the task's unique identifier. + #[inline] + pub fn id(&self) -> TaskId { + self.task.id() + } + + /// Returns a reference to the inner `Task`. + pub(crate) fn task(&self) -> &Task { + &self.task + } + + /// Returns the map holding task-local values. + pub(crate) fn locals(&self) -> &LocalsMap { + &self.locals + } + + /// Set a reference to the current task. + pub(crate) unsafe fn set_current(task: *const TaskLocalsWrapper, f: F) -> R + where + F: FnOnce() -> R, + { + CURRENT.with(|current| { + let old_task = current.replace(task); + defer! { + current.set(old_task); + } + f() + }) + } + + /// Gets a reference to the current task. + pub(crate) fn get_current(f: F) -> Option + where + F: FnOnce(&TaskLocalsWrapper) -> R, + { + let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) }); + match res { + Ok(Some(val)) => Some(val), + Ok(None) | Err(_) => None, + } + } +} + +impl Drop for TaskLocalsWrapper { + fn drop(&mut self) { + // Abort the process if dropping task-locals panics. + abort_on_panic(|| { + unsafe { self.locals.clear() }; + }); + } +}