switch to async-executor

Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
pull/836/head
Marc-Antoine Perennou 5 years ago
parent 0c51283bfc
commit abc2929a8e

@ -24,13 +24,13 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]
[features] [features]
default = [ default = [
"std", "std",
"async-executor",
"async-io", "async-io",
"async-task", "async-task",
"blocking", "blocking",
"futures-lite", "futures-lite",
"kv-log-macro", "kv-log-macro",
"log", "log",
"multitask",
"num_cpus", "num_cpus",
"pin-project-lite", "pin-project-lite",
] ]
@ -80,10 +80,10 @@ futures-timer = { version = "3.0.2", optional = true }
surf = { version = "1.0.3", optional = true } surf = { version = "1.0.3", optional = true }
[target.'cfg(not(target_os = "unknown"))'.dependencies] [target.'cfg(not(target_os = "unknown"))'.dependencies]
async-executor = { version = "0.1.1", features = ["async-io"], optional = true }
async-io = { version = "0.1.5", optional = true } async-io = { version = "0.1.5", optional = true }
blocking = { version = "0.5.0", optional = true } blocking = { version = "0.5.0", optional = true }
futures-lite = { version = "0.1.8", optional = true } futures-lite = { version = "0.1.8", optional = true }
multitask = { version = "0.2.0", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] } futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] }

@ -27,7 +27,7 @@ pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
for _ in 0..thread_count { for _ in 0..thread_count {
thread::Builder::new() thread::Builder::new()
.name(thread_name.clone()) .name(thread_name.clone())
.spawn(|| crate::task::block_on(future::pending::<()>())) .spawn(|| crate::task::executor::run_global(future::pending::<()>()))
.expect("cannot start a runtime thread"); .expect("cannot start a runtime thread");
} }
Runtime {} Runtime {}

@ -1,23 +1,13 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::future::Future; use std::future::Future;
use std::task::{Context, Poll};
static GLOBAL_EXECUTOR: once_cell::sync::Lazy<multitask::Executor> = once_cell::sync::Lazy::new(multitask::Executor::new); static GLOBAL_EXECUTOR: once_cell::sync::Lazy<async_executor::Executor> = once_cell::sync::Lazy::new(async_executor::Executor::new);
struct Executor {
local_executor: multitask::LocalExecutor,
parker: async_io::parking::Parker,
}
thread_local! { thread_local! {
static EXECUTOR: RefCell<Executor> = RefCell::new({ static EXECUTOR: RefCell<async_executor::LocalExecutor> = RefCell::new(async_executor::LocalExecutor::new());
let (parker, unparker) = async_io::parking::pair();
let local_executor = multitask::LocalExecutor::new(move || unparker.unpark());
Executor { local_executor, parker }
});
} }
pub(crate) fn spawn<F, T>(future: F) -> multitask::Task<T> pub(crate) fn spawn<F, T>(future: F) -> async_executor::Task<T>
where where
F: Future<Output = T> + Send + 'static, F: Future<Output = T> + Send + 'static,
T: Send + 'static, T: Send + 'static,
@ -26,35 +16,26 @@ where
} }
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
pub(crate) fn local<F, T>(future: F) -> multitask::Task<T> pub(crate) fn local<F, T>(future: F) -> async_executor::Task<T>
where where
F: Future<Output = T> + 'static, F: Future<Output = T> + 'static,
T: 'static, T: 'static,
{ {
EXECUTOR.with(|executor| executor.borrow().local_executor.spawn(future)) EXECUTOR.with(|executor| executor.borrow().spawn(future))
} }
pub(crate) fn run<F, T>(future: F) -> T pub(crate) fn run<F, T>(future: F) -> T
where where
F: Future<Output = T>, F: Future<Output = T>,
{ {
enter(|| EXECUTOR.with(|executor| { EXECUTOR.with(|executor| enter(|| GLOBAL_EXECUTOR.enter(|| executor.borrow().run(future))))
let executor = executor.borrow(); }
let unparker = executor.parker.unparker();
let global_ticker = GLOBAL_EXECUTOR.ticker(move || unparker.unpark()); pub(crate) fn run_global<F, T>(future: F) -> T
let unparker = executor.parker.unparker(); where
let waker = async_task::waker_fn(move || unparker.unpark()); F: Future<Output = T>,
let cx = &mut Context::from_waker(&waker); {
pin_utils::pin_mut!(future); enter(|| GLOBAL_EXECUTOR.run(future))
loop {
if let Poll::Ready(res) = future.as_mut().poll(cx) {
return res;
}
if let Ok(false) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| executor.local_executor.tick() || global_ticker.tick())) {
executor.parker.park();
}
}
}))
} }
/// Enters the tokio context if the `tokio` feature is enabled. /// Enters the tokio context if the `tokio` feature is enabled.

@ -18,7 +18,7 @@ pub struct JoinHandle<T> {
} }
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
type InnerHandle<T> = multitask::Task<T>; type InnerHandle<T> = async_executor::Task<T>;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
type InnerHandle<T> = futures_channel::oneshot::Receiver<T>; type InnerHandle<T> = futures_channel::oneshot::Receiver<T>;

@ -149,7 +149,7 @@ cfg_default! {
mod builder; mod builder;
mod current; mod current;
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
mod executor; pub(crate) mod executor;
mod join_handle; mod join_handle;
mod sleep; mod sleep;
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]

Loading…
Cancel
Save