Make sure each invocation of block_on uses its own Parker (#358)

yoshuawuyts-patch-1
Stjepan Glavina 5 years ago committed by GitHub
parent e405544ea0
commit 46f0fb1c64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -30,6 +30,7 @@ async-task = "1.0.0"
cfg-if = "0.1.9" cfg-if = "0.1.9"
crossbeam-channel = "0.3.9" crossbeam-channel = "0.3.9"
crossbeam-deque = "0.7.1" crossbeam-deque = "0.7.1"
crossbeam-utils = "0.6.6"
futures-core-preview = "=0.3.0-alpha.19" futures-core-preview = "=0.3.0-alpha.19"
futures-io-preview = "=0.3.0-alpha.19" futures-io-preview = "=0.3.0-alpha.19"
futures-timer = "1.0.2" futures-timer = "1.0.2"

@ -1,10 +1,12 @@
use std::cell::UnsafeCell; use std::cell::{Cell, UnsafeCell};
use std::mem::{self, ManuallyDrop}; use std::mem::{self, ManuallyDrop};
use std::panic::{self, AssertUnwindSafe, UnwindSafe}; use std::panic::{self, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable}; use std::task::{RawWaker, RawWakerVTable};
use std::thread::{self, Thread}; use std::thread;
use crossbeam_utils::sync::Parker;
use super::task; use super::task;
use super::task_local; use super::task_local;
@ -119,13 +121,21 @@ where
F: Future<Output = T>, F: Future<Output = T>,
{ {
thread_local! { thread_local! {
static ARC_THREAD: Arc<Thread> = Arc::new(thread::current()); // May hold a pre-allocated parker that can be reused for efficiency.
//
// Note that each invocation of `block` needs its own parker. In particular, if `block`
// recursively calls itself, we must make sure that each recursive call uses a distinct
// parker instance.
static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None);
} }
pin_utils::pin_mut!(f); pin_utils::pin_mut!(f);
ARC_THREAD.with(|arc_thread: &Arc<Thread>| { CACHE.with(|cache| {
let ptr = (&**arc_thread as *const Thread) as *const (); // Reuse a cached parker or create a new one for this invocation of `block`.
let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));
let ptr = (&*arc_parker as *const Parker) as *const ();
let vt = vtable(); let vt = vtable();
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) }; let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) };
@ -133,32 +143,34 @@ where
loop { loop {
if let Poll::Ready(t) = f.as_mut().poll(cx) { if let Poll::Ready(t) = f.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(arc_parker));
return t; return t;
} }
thread::park(); arc_parker.park();
} }
}) })
} }
fn vtable() -> &'static RawWakerVTable { fn vtable() -> &'static RawWakerVTable {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker { unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread)); let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
mem::forget(arc.clone()); mem::forget(arc.clone());
RawWaker::new(ptr, vtable()) RawWaker::new(ptr, vtable())
} }
unsafe fn wake_raw(ptr: *const ()) { unsafe fn wake_raw(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Thread); let arc = Arc::from_raw(ptr as *const Parker);
arc.unpark(); arc.unparker().unpark();
} }
unsafe fn wake_by_ref_raw(ptr: *const ()) { unsafe fn wake_by_ref_raw(ptr: *const ()) {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread)); let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
arc.unpark(); arc.unparker().unpark();
} }
unsafe fn drop_raw(ptr: *const ()) { unsafe fn drop_raw(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const Thread)) drop(Arc::from_raw(ptr as *const Parker))
} }
&RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)

Loading…
Cancel
Save