diff --git a/Cargo.toml b/Cargo.toml
index d1c10a6..e8e2ccc 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -30,6 +30,7 @@ async-task = "1.0.0"
 cfg-if = "0.1.9"
 crossbeam-channel = "0.3.9"
 crossbeam-deque = "0.7.1"
+crossbeam-utils = "0.6.6"
 futures-core-preview = "=0.3.0-alpha.19"
 futures-io-preview = "=0.3.0-alpha.19"
 futures-timer = "1.0.2"
diff --git a/src/task/block_on.rs b/src/task/block_on.rs
index db46f02..032bf02 100644
--- a/src/task/block_on.rs
+++ b/src/task/block_on.rs
@@ -1,10 +1,12 @@
-use std::cell::UnsafeCell;
+use std::cell::{Cell, UnsafeCell};
 use std::mem::{self, ManuallyDrop};
 use std::panic::{self, AssertUnwindSafe, UnwindSafe};
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{RawWaker, RawWakerVTable};
-use std::thread::{self, Thread};
+use std::thread;
+
+use crossbeam_utils::sync::Parker;
 
 use super::task;
 use super::task_local;
@@ -119,13 +121,21 @@ where
     F: Future<Output = T>,
 {
     thread_local! {
-        static ARC_THREAD: Arc<Thread> = Arc::new(thread::current());
+        // May hold a pre-allocated parker that can be reused for efficiency.
+        //
+        // Note that each invocation of `block` needs its own parker. In particular, if `block`
+        // recursively calls itself, we must make sure that each recursive call uses a distinct
+        // parker instance.
+        static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None);
     }
 
     pin_utils::pin_mut!(f);
 
-    ARC_THREAD.with(|arc_thread: &Arc<Thread>| {
-        let ptr = (&**arc_thread as *const Thread) as *const ();
+    CACHE.with(|cache| {
+        // Reuse a cached parker or create a new one for this invocation of `block`.
+        let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));
+
+        let ptr = (&*arc_parker as *const Parker) as *const ();
         let vt = vtable();
 
         let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) };
@@ -133,32 +143,34 @@ where
         loop {
             if let Poll::Ready(t) = f.as_mut().poll(cx) {
+                // Save the parker for the next invocation of `block`.
+                cache.set(Some(arc_parker));
                 return t;
             }
 
-            thread::park();
+            arc_parker.park();
         }
     })
 }
 
 fn vtable() -> &'static RawWakerVTable {
     unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
-        let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread));
+        let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
         mem::forget(arc.clone());
         RawWaker::new(ptr, vtable())
     }
 
     unsafe fn wake_raw(ptr: *const ()) {
-        let arc = Arc::from_raw(ptr as *const Thread);
-        arc.unpark();
+        let arc = Arc::from_raw(ptr as *const Parker);
+        arc.unparker().unpark();
     }
 
     unsafe fn wake_by_ref_raw(ptr: *const ()) {
-        let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread));
-        arc.unpark();
+        let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
+        arc.unparker().unpark();
     }
 
     unsafe fn drop_raw(ptr: *const ()) {
-        drop(Arc::from_raw(ptr as *const Thread))
+        drop(Arc::from_raw(ptr as *const Parker))
     }
 
     &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)