195: Remove the Send bound from block_on r=stjepang a=stjepang



Co-authored-by: Stjepan Glavina <stjepang@gmail.com>
staging
bors[bot] 5 years ago committed by GitHub
commit 03f5022262
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -6,10 +6,12 @@ use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable}; use std::task::{RawWaker, RawWakerVTable};
use std::thread::{self, Thread}; use std::thread::{self, Thread};
use super::log_utils;
use super::pool; use super::pool;
use super::Builder; use super::task;
use crate::future::Future; use crate::future::Future;
use crate::task::{Context, Poll, Waker}; use crate::task::{Context, Poll, Waker};
use crate::utils::abort_on_panic;
/// Spawns a task and blocks the current thread on its result. /// Spawns a task and blocks the current thread on its result.
/// ///
@ -32,8 +34,7 @@ use crate::task::{Context, Poll, Waker};
/// ``` /// ```
pub fn block_on<F, T>(future: F) -> T pub fn block_on<F, T>(future: F) -> T
where where
F: Future<Output = T> + Send, F: Future<Output = T>,
T: Send,
{ {
unsafe { unsafe {
// A place on the stack where the result will be stored. // A place on the stack where the result will be stored.
@ -51,17 +52,48 @@ where
} }
}; };
// Create a tag for the task.
let tag = task::Tag::new(None);
// Log this `block_on` operation.
let child_id = tag.task_id().as_u64();
let parent_id = pool::get_task(|t| t.id().as_u64()).unwrap_or(0);
log_utils::print(
format_args!("block_on"),
log_utils::LogData {
parent_id,
child_id,
},
);
// Wrap the future into one that drops task-local variables on exit.
let future = async move {
let res = future.await;
// Abort on panic because thread-local variables behave the same way.
abort_on_panic(|| pool::get_task(|task| task.metadata().local_map.clear()));
log_utils::print(
format_args!("block_on completed"),
log_utils::LogData {
parent_id,
child_id,
},
);
res
};
// Pin the future onto the stack. // Pin the future onto the stack.
pin_utils::pin_mut!(future); pin_utils::pin_mut!(future);
// Transmute the future into one that is static and sendable. // Transmute the future into one that is static.
let future = mem::transmute::< let future = mem::transmute::<
Pin<&mut dyn Future<Output = ()>>, Pin<&'_ mut dyn Future<Output = ()>>,
Pin<&'static mut (dyn Future<Output = ()> + Send)>, Pin<&'static mut dyn Future<Output = ()>>,
>(future); >(future);
// Spawn the future and wait for it to complete. // Block on the future and and wait for it to complete.
block(pool::spawn_with_builder(Builder::new(), future, "block_on")); pool::set_tag(&tag, || block(future));
// Take out the result. // Take out the result.
match (*out.get()).take().unwrap() { match (*out.get()).take().unwrap() {
@ -87,7 +119,10 @@ impl<F: Future + UnwindSafe> Future for CatchUnwindFuture<F> {
} }
} }
fn block<F: Future>(f: F) -> F::Output { fn block<F, T>(f: F) -> T
where
F: Future<Output = T>,
{
thread_local! { thread_local! {
static ARC_THREAD: Arc<Thread> = Arc::new(thread::current()); static ARC_THREAD: Arc<Thread> = Arc::new(thread::current());
} }

@ -0,0 +1,32 @@
use std::fmt::Arguments;
/// This struct only exists because kv logging isn't supported from the macros right now.
pub(crate) struct LogData {
pub parent_id: u64,
pub child_id: u64,
}
impl<'a> log::kv::Source for LogData {
fn visit<'kvs>(
&'kvs self,
visitor: &mut dyn log::kv::Visitor<'kvs>,
) -> Result<(), log::kv::Error> {
visitor.visit_pair("parent_id".into(), self.parent_id.into())?;
visitor.visit_pair("child_id".into(), self.child_id.into())?;
Ok(())
}
}
pub fn print(msg: Arguments<'_>, key_values: impl log::kv::Source) {
log::logger().log(
&log::Record::builder()
.args(msg)
.key_values(&key_values)
.level(log::Level::Trace)
.target(module_path!())
.module_path(Some(module_path!()))
.file(Some(file!()))
.line(Some(line!()))
.build(),
);
}

@ -32,6 +32,7 @@ pub use task::{JoinHandle, Task, TaskId};
mod block_on; mod block_on;
mod local; mod local;
mod log_utils;
mod pool; mod pool;
mod sleep; mod sleep;
mod task; mod task;

@ -1,16 +1,16 @@
use std::cell::Cell; use std::cell::Cell;
use std::fmt::Arguments;
use std::mem;
use std::ptr; use std::ptr;
use std::thread; use std::thread;
use crossbeam_channel::{unbounded, Sender}; use crossbeam_channel::{unbounded, Sender};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use super::log_utils;
use super::task; use super::task;
use super::{JoinHandle, Task}; use super::{JoinHandle, Task};
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
use crate::utils::abort_on_panic;
/// Returns a handle to the current task. /// Returns a handle to the current task.
/// ///
@ -64,7 +64,7 @@ where
F: Future<Output = T> + Send + 'static, F: Future<Output = T> + Send + 'static,
T: Send + 'static, T: Send + 'static,
{ {
spawn_with_builder(Builder::new(), future, "spawn") spawn_with_builder(Builder::new(), future)
} }
/// Task builder that configures the settings of a new task. /// Task builder that configures the settings of a new task.
@ -91,15 +91,11 @@ impl Builder {
F: Future<Output = T> + Send + 'static, F: Future<Output = T> + Send + 'static,
T: Send + 'static, T: Send + 'static,
{ {
Ok(spawn_with_builder(self, future, "spawn")) Ok(spawn_with_builder(self, future))
} }
} }
pub(crate) fn spawn_with_builder<F, T>( pub(crate) fn spawn_with_builder<F, T>(builder: Builder, future: F) -> JoinHandle<T>
builder: Builder,
future: F,
fn_name: &'static str,
) -> JoinHandle<T>
where where
F: Future<Output = T> + Send + 'static, F: Future<Output = T> + Send + 'static,
T: Send + 'static, T: Send + 'static,
@ -117,13 +113,9 @@ where
thread::Builder::new() thread::Builder::new()
.name("async-task-driver".to_string()) .name("async-task-driver".to_string())
.spawn(|| { .spawn(|| {
TAG.with(|tag| {
for job in receiver { for job in receiver {
tag.set(job.tag()); set_tag(job.tag(), || abort_on_panic(|| job.run()))
abort_on_panic(|| job.run());
tag.set(ptr::null());
} }
});
}) })
.expect("cannot start a thread driving tasks"); .expect("cannot start a thread driving tasks");
} }
@ -135,11 +127,12 @@ where
let tag = task::Tag::new(name); let tag = task::Tag::new(name);
let schedule = |job| QUEUE.send(job).unwrap(); let schedule = |job| QUEUE.send(job).unwrap();
// Log this `spawn` operation.
let child_id = tag.task_id().as_u64(); let child_id = tag.task_id().as_u64();
let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0); let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0);
print( log_utils::print(
format_args!("{}", fn_name), format_args!("spawn"),
LogData { log_utils::LogData {
parent_id, parent_id,
child_id, child_id,
}, },
@ -152,9 +145,9 @@ where
// Abort on panic because thread-local variables behave the same way. // Abort on panic because thread-local variables behave the same way.
abort_on_panic(|| get_task(|task| task.metadata().local_map.clear())); abort_on_panic(|| get_task(|task| task.metadata().local_map.clear()));
print( log_utils::print(
format_args!("{} completed", fn_name), format_args!("spawn completed"),
LogData { log_utils::LogData {
parent_id, parent_id,
child_id, child_id,
}, },
@ -171,61 +164,34 @@ thread_local! {
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut()); static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
} }
pub(crate) fn get_task<F: FnOnce(&Task) -> R, R>(f: F) -> Option<R> { pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) }); where
F: FnOnce() -> R,
match res { {
Ok(Some(val)) => Some(val), struct ResetTag<'a>(&'a Cell<*const task::Tag>);
Ok(None) | Err(_) => None,
}
}
/// Calls a function and aborts if it panics.
///
/// This is useful in unsafe code where we can't recover from panics.
#[inline]
fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
struct Bomb;
impl Drop for Bomb { impl Drop for ResetTag<'_> {
fn drop(&mut self) { fn drop(&mut self) {
std::process::abort(); self.0.set(ptr::null());
} }
} }
let bomb = Bomb; TAG.with(|t| {
let t = f(); t.set(tag);
mem::forget(bomb); let _guard = ResetTag(t);
t
}
/// This struct only exists because kv logging isn't supported from the macros right now. f()
struct LogData { })
parent_id: u64,
child_id: u64,
} }
impl<'a> log::kv::Source for LogData { pub(crate) fn get_task<F, R>(f: F) -> Option<R>
fn visit<'kvs>( where
&'kvs self, F: FnOnce(&Task) -> R,
visitor: &mut dyn log::kv::Visitor<'kvs>, {
) -> Result<(), log::kv::Error> { let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
visitor.visit_pair("parent_id".into(), self.parent_id.into())?;
visitor.visit_pair("child_id".into(), self.child_id.into())?;
Ok(())
}
}
fn print(msg: Arguments<'_>, key_values: impl log::kv::Source) { match res {
log::logger().log( Ok(Some(val)) => Some(val),
&log::Record::builder() Ok(None) | Err(_) => None,
.args(msg) }
.key_values(&key_values)
.level(log::Level::Trace)
.target(module_path!())
.module_path(Some(module_path!()))
.file(Some(file!()))
.line(Some(line!()))
.build(),
);
} }

Loading…
Cancel
Save