mirror of
https://github.com/async-rs/async-std.git
synced 2025-04-29 11:46:52 +00:00
Remove the Send bound from block_on
This commit is contained in:
parent
e060326910
commit
1d862cf604
4 changed files with 116 additions and 82 deletions
|
@ -6,10 +6,12 @@ use std::sync::Arc;
|
||||||
use std::task::{RawWaker, RawWakerVTable};
|
use std::task::{RawWaker, RawWakerVTable};
|
||||||
use std::thread::{self, Thread};
|
use std::thread::{self, Thread};
|
||||||
|
|
||||||
|
use super::log_utils;
|
||||||
use super::pool;
|
use super::pool;
|
||||||
use super::Builder;
|
use super::task;
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::task::{Context, Poll, Waker};
|
use crate::task::{Context, Poll, Waker};
|
||||||
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
/// Spawns a task and blocks the current thread on its result.
|
/// Spawns a task and blocks the current thread on its result.
|
||||||
///
|
///
|
||||||
|
@ -32,8 +34,7 @@ use crate::task::{Context, Poll, Waker};
|
||||||
/// ```
|
/// ```
|
||||||
pub fn block_on<F, T>(future: F) -> T
|
pub fn block_on<F, T>(future: F) -> T
|
||||||
where
|
where
|
||||||
F: Future<Output = T> + Send,
|
F: Future<Output = T>,
|
||||||
T: Send,
|
|
||||||
{
|
{
|
||||||
unsafe {
|
unsafe {
|
||||||
// A place on the stack where the result will be stored.
|
// A place on the stack where the result will be stored.
|
||||||
|
@ -51,17 +52,48 @@ where
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Create a tag for the task.
|
||||||
|
let tag = task::Tag::new(None);
|
||||||
|
|
||||||
|
// Log this `block_on` operation.
|
||||||
|
let child_id = tag.task_id().as_u64();
|
||||||
|
let parent_id = pool::get_task(|t| t.id().as_u64()).unwrap_or(0);
|
||||||
|
log_utils::print(
|
||||||
|
format_args!("block_on"),
|
||||||
|
log_utils::LogData {
|
||||||
|
parent_id,
|
||||||
|
child_id,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
// Wrap the future into one that drops task-local variables on exit.
|
||||||
|
let future = async move {
|
||||||
|
let res = future.await;
|
||||||
|
|
||||||
|
// Abort on panic because thread-local variables behave the same way.
|
||||||
|
abort_on_panic(|| pool::get_task(|task| task.metadata().local_map.clear()));
|
||||||
|
|
||||||
|
log_utils::print(
|
||||||
|
format_args!("block_on completed"),
|
||||||
|
log_utils::LogData {
|
||||||
|
parent_id,
|
||||||
|
child_id,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
res
|
||||||
|
};
|
||||||
|
|
||||||
// Pin the future onto the stack.
|
// Pin the future onto the stack.
|
||||||
pin_utils::pin_mut!(future);
|
pin_utils::pin_mut!(future);
|
||||||
|
|
||||||
// Transmute the future into one that is static and sendable.
|
// Transmute the future into one that is static.
|
||||||
let future = mem::transmute::<
|
let future = mem::transmute::<
|
||||||
Pin<&mut dyn Future<Output = ()>>,
|
Pin<&'_ mut dyn Future<Output = ()>>,
|
||||||
Pin<&'static mut (dyn Future<Output = ()> + Send)>,
|
Pin<&'static mut dyn Future<Output = ()>>,
|
||||||
>(future);
|
>(future);
|
||||||
|
|
||||||
// Spawn the future and wait for it to complete.
|
// Block on the future and and wait for it to complete.
|
||||||
block(pool::spawn_with_builder(Builder::new(), future, "block_on"));
|
pool::set_tag(&tag, || block(future));
|
||||||
|
|
||||||
// Take out the result.
|
// Take out the result.
|
||||||
match (*out.get()).take().unwrap() {
|
match (*out.get()).take().unwrap() {
|
||||||
|
@ -87,7 +119,10 @@ impl<F: Future + UnwindSafe> Future for CatchUnwindFuture<F> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn block<F: Future>(f: F) -> F::Output {
|
fn block<F, T>(f: F) -> T
|
||||||
|
where
|
||||||
|
F: Future<Output = T>,
|
||||||
|
{
|
||||||
thread_local! {
|
thread_local! {
|
||||||
static ARC_THREAD: Arc<Thread> = Arc::new(thread::current());
|
static ARC_THREAD: Arc<Thread> = Arc::new(thread::current());
|
||||||
}
|
}
|
||||||
|
|
32
src/task/log_utils.rs
Normal file
32
src/task/log_utils.rs
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
use std::fmt::Arguments;
|
||||||
|
|
||||||
|
/// This struct only exists because kv logging isn't supported from the macros right now.
|
||||||
|
pub(crate) struct LogData {
|
||||||
|
pub parent_id: u64,
|
||||||
|
pub child_id: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> log::kv::Source for LogData {
|
||||||
|
fn visit<'kvs>(
|
||||||
|
&'kvs self,
|
||||||
|
visitor: &mut dyn log::kv::Visitor<'kvs>,
|
||||||
|
) -> Result<(), log::kv::Error> {
|
||||||
|
visitor.visit_pair("parent_id".into(), self.parent_id.into())?;
|
||||||
|
visitor.visit_pair("child_id".into(), self.child_id.into())?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn print(msg: Arguments<'_>, key_values: impl log::kv::Source) {
|
||||||
|
log::logger().log(
|
||||||
|
&log::Record::builder()
|
||||||
|
.args(msg)
|
||||||
|
.key_values(&key_values)
|
||||||
|
.level(log::Level::Trace)
|
||||||
|
.target(module_path!())
|
||||||
|
.module_path(Some(module_path!()))
|
||||||
|
.file(Some(file!()))
|
||||||
|
.line(Some(line!()))
|
||||||
|
.build(),
|
||||||
|
);
|
||||||
|
}
|
|
@ -32,6 +32,7 @@ pub use task::{JoinHandle, Task, TaskId};
|
||||||
|
|
||||||
mod block_on;
|
mod block_on;
|
||||||
mod local;
|
mod local;
|
||||||
|
mod log_utils;
|
||||||
mod pool;
|
mod pool;
|
||||||
mod sleep;
|
mod sleep;
|
||||||
mod task;
|
mod task;
|
||||||
|
|
112
src/task/pool.rs
112
src/task/pool.rs
|
@ -1,16 +1,16 @@
|
||||||
use std::cell::Cell;
|
use std::cell::Cell;
|
||||||
use std::fmt::Arguments;
|
|
||||||
use std::mem;
|
|
||||||
use std::ptr;
|
use std::ptr;
|
||||||
use std::thread;
|
use std::thread;
|
||||||
|
|
||||||
use crossbeam_channel::{unbounded, Sender};
|
use crossbeam_channel::{unbounded, Sender};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
|
use super::log_utils;
|
||||||
use super::task;
|
use super::task;
|
||||||
use super::{JoinHandle, Task};
|
use super::{JoinHandle, Task};
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::io;
|
use crate::io;
|
||||||
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
/// Returns a handle to the current task.
|
/// Returns a handle to the current task.
|
||||||
///
|
///
|
||||||
|
@ -64,7 +64,7 @@ where
|
||||||
F: Future<Output = T> + Send + 'static,
|
F: Future<Output = T> + Send + 'static,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
spawn_with_builder(Builder::new(), future, "spawn")
|
spawn_with_builder(Builder::new(), future)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Task builder that configures the settings of a new task.
|
/// Task builder that configures the settings of a new task.
|
||||||
|
@ -91,15 +91,11 @@ impl Builder {
|
||||||
F: Future<Output = T> + Send + 'static,
|
F: Future<Output = T> + Send + 'static,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
Ok(spawn_with_builder(self, future, "spawn"))
|
Ok(spawn_with_builder(self, future))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn spawn_with_builder<F, T>(
|
pub(crate) fn spawn_with_builder<F, T>(builder: Builder, future: F) -> JoinHandle<T>
|
||||||
builder: Builder,
|
|
||||||
future: F,
|
|
||||||
fn_name: &'static str,
|
|
||||||
) -> JoinHandle<T>
|
|
||||||
where
|
where
|
||||||
F: Future<Output = T> + Send + 'static,
|
F: Future<Output = T> + Send + 'static,
|
||||||
T: Send + 'static,
|
T: Send + 'static,
|
||||||
|
@ -117,13 +113,9 @@ where
|
||||||
thread::Builder::new()
|
thread::Builder::new()
|
||||||
.name("async-task-driver".to_string())
|
.name("async-task-driver".to_string())
|
||||||
.spawn(|| {
|
.spawn(|| {
|
||||||
TAG.with(|tag| {
|
for job in receiver {
|
||||||
for job in receiver {
|
set_tag(job.tag(), || abort_on_panic(|| job.run()))
|
||||||
tag.set(job.tag());
|
}
|
||||||
abort_on_panic(|| job.run());
|
|
||||||
tag.set(ptr::null());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
})
|
})
|
||||||
.expect("cannot start a thread driving tasks");
|
.expect("cannot start a thread driving tasks");
|
||||||
}
|
}
|
||||||
|
@ -135,11 +127,12 @@ where
|
||||||
let tag = task::Tag::new(name);
|
let tag = task::Tag::new(name);
|
||||||
let schedule = |job| QUEUE.send(job).unwrap();
|
let schedule = |job| QUEUE.send(job).unwrap();
|
||||||
|
|
||||||
|
// Log this `spawn` operation.
|
||||||
let child_id = tag.task_id().as_u64();
|
let child_id = tag.task_id().as_u64();
|
||||||
let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0);
|
let parent_id = get_task(|t| t.id().as_u64()).unwrap_or(0);
|
||||||
print(
|
log_utils::print(
|
||||||
format_args!("{}", fn_name),
|
format_args!("spawn"),
|
||||||
LogData {
|
log_utils::LogData {
|
||||||
parent_id,
|
parent_id,
|
||||||
child_id,
|
child_id,
|
||||||
},
|
},
|
||||||
|
@ -152,9 +145,9 @@ where
|
||||||
// Abort on panic because thread-local variables behave the same way.
|
// Abort on panic because thread-local variables behave the same way.
|
||||||
abort_on_panic(|| get_task(|task| task.metadata().local_map.clear()));
|
abort_on_panic(|| get_task(|task| task.metadata().local_map.clear()));
|
||||||
|
|
||||||
print(
|
log_utils::print(
|
||||||
format_args!("{} completed", fn_name),
|
format_args!("spawn completed"),
|
||||||
LogData {
|
log_utils::LogData {
|
||||||
parent_id,
|
parent_id,
|
||||||
child_id,
|
child_id,
|
||||||
},
|
},
|
||||||
|
@ -171,7 +164,30 @@ thread_local! {
|
||||||
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
|
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn get_task<F: FnOnce(&Task) -> R, R>(f: F) -> Option<R> {
|
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce() -> R,
|
||||||
|
{
|
||||||
|
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
|
||||||
|
|
||||||
|
impl Drop for ResetTag<'_> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.0.set(ptr::null());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TAG.with(|t| {
|
||||||
|
t.set(tag);
|
||||||
|
let _guard = ResetTag(t);
|
||||||
|
|
||||||
|
f()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
|
||||||
|
where
|
||||||
|
F: FnOnce(&Task) -> R,
|
||||||
|
{
|
||||||
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
|
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
|
||||||
|
|
||||||
match res {
|
match res {
|
||||||
|
@ -179,53 +195,3 @@ pub(crate) fn get_task<F: FnOnce(&Task) -> R, R>(f: F) -> Option<R> {
|
||||||
Ok(None) | Err(_) => None,
|
Ok(None) | Err(_) => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Calls a function and aborts if it panics.
|
|
||||||
///
|
|
||||||
/// This is useful in unsafe code where we can't recover from panics.
|
|
||||||
#[inline]
|
|
||||||
fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
|
|
||||||
struct Bomb;
|
|
||||||
|
|
||||||
impl Drop for Bomb {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
std::process::abort();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let bomb = Bomb;
|
|
||||||
let t = f();
|
|
||||||
mem::forget(bomb);
|
|
||||||
t
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This struct only exists because kv logging isn't supported from the macros right now.
|
|
||||||
struct LogData {
|
|
||||||
parent_id: u64,
|
|
||||||
child_id: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<'a> log::kv::Source for LogData {
|
|
||||||
fn visit<'kvs>(
|
|
||||||
&'kvs self,
|
|
||||||
visitor: &mut dyn log::kv::Visitor<'kvs>,
|
|
||||||
) -> Result<(), log::kv::Error> {
|
|
||||||
visitor.visit_pair("parent_id".into(), self.parent_id.into())?;
|
|
||||||
visitor.visit_pair("child_id".into(), self.child_id.into())?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn print(msg: Arguments<'_>, key_values: impl log::kv::Source) {
|
|
||||||
log::logger().log(
|
|
||||||
&log::Record::builder()
|
|
||||||
.args(msg)
|
|
||||||
.key_values(&key_values)
|
|
||||||
.level(log::Level::Trace)
|
|
||||||
.target(module_path!())
|
|
||||||
.module_path(Some(module_path!()))
|
|
||||||
.file(Some(file!()))
|
|
||||||
.line(Some(line!()))
|
|
||||||
.build(),
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue