Merge pull request #836 from Keruspe/multitask

pull/845/head
Friedel Ziegelmayer 4 years ago committed by GitHub
commit f9c8974206
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -86,6 +86,12 @@ jobs:
command: check command: check
args: --features attributes args: --features attributes
- name: build unstable only
uses: actions-rs/cargo@v1
with:
command: build
args: --no-default-features --features unstable
- name: tests - name: tests
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:

@ -24,12 +24,15 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]
[features] [features]
default = [ default = [
"std", "std",
"async-executor",
"async-io",
"async-task", "async-task",
"blocking",
"futures-lite",
"kv-log-macro", "kv-log-macro",
"log", "log",
"num_cpus", "num_cpus",
"pin-project-lite", "pin-project-lite",
"smol",
] ]
docs = ["attributes", "unstable", "default"] docs = ["attributes", "unstable", "default"]
unstable = [ unstable = [
@ -54,7 +57,7 @@ alloc = [
"futures-core/alloc", "futures-core/alloc",
"pin-project-lite", "pin-project-lite",
] ]
tokio02 = ["smol/tokio02"] tokio02 = ["tokio"]
[dependencies] [dependencies]
async-attributes = { version = "1.1.1", optional = true } async-attributes = { version = "1.1.1", optional = true }
@ -77,7 +80,10 @@ futures-timer = { version = "3.0.2", optional = true }
surf = { version = "1.0.3", optional = true } surf = { version = "1.0.3", optional = true }
[target.'cfg(not(target_os = "unknown"))'.dependencies] [target.'cfg(not(target_os = "unknown"))'.dependencies]
smol = { version = "0.1.17", optional = true } async-executor = { version = "0.1.1", features = ["async-io"], optional = true }
async-io = { version = "0.1.5", optional = true }
blocking = { version = "0.5.0", optional = true }
futures-lite = { version = "0.1.8", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] } futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] }
@ -87,6 +93,12 @@ futures-channel = { version = "0.3.4", optional = true }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies] [target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3.10" wasm-bindgen-test = "0.3.10"
[dependencies.tokio]
version = "0.2"
default-features = false
features = ["rt-threaded"]
optional = true
[dev-dependencies] [dev-dependencies]
femme = "1.3.0" femme = "1.3.0"
rand = "0.7.3" rand = "0.7.3"

@ -315,7 +315,7 @@ impl Drop for File {
// non-blocking fashion, but our only other option here is losing data remaining in the // non-blocking fashion, but our only other option here is losing data remaining in the
// write cache. Good task schedulers should be resilient to occasional blocking hiccups in // write cache. Good task schedulers should be resilient to occasional blocking hiccups in
// file destructors so we don't expect this to be a common problem in practice. // file destructors so we don't expect this to be a common problem in practice.
let _ = smol::block_on(self.flush()); let _ = futures_lite::future::block_on(self.flush());
} }
} }

@ -2,7 +2,7 @@ use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use smol::Async; use async_io::Async;
use crate::io; use crate::io;
use crate::net::{TcpStream, ToSocketAddrs}; use crate::net::{TcpStream, ToSocketAddrs};
@ -81,7 +81,7 @@ impl TcpListener {
let addrs = addrs.to_socket_addrs().await?; let addrs = addrs.to_socket_addrs().await?;
for addr in addrs { for addr in addrs {
match Async::<std::net::TcpListener>::bind(&addr) { match Async::<std::net::TcpListener>::bind(addr) {
Ok(listener) => { Ok(listener) => {
return Ok(TcpListener { watcher: listener }); return Ok(TcpListener { watcher: listener });
} }
@ -227,7 +227,7 @@ cfg_unix! {
impl IntoRawFd for TcpListener { impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd() self.watcher.into_inner().unwrap().into_raw_fd()
} }
} }
} }
@ -251,7 +251,7 @@ cfg_windows! {
impl IntoRawSocket for TcpListener { impl IntoRawSocket for TcpListener {
fn into_raw_socket(self) -> RawSocket { fn into_raw_socket(self) -> RawSocket {
self.watcher.into_raw_socket() self.watcher.into_inner().unwrap().into_raw_socket()
} }
} }
} }

@ -2,7 +2,7 @@ use std::io::{IoSlice, IoSliceMut};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use smol::Async; use async_io::Async;
use crate::io::{self, Read, Write}; use crate::io::{self, Read, Write};
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
@ -77,7 +77,7 @@ impl TcpStream {
let addrs = addrs.to_socket_addrs().await?; let addrs = addrs.to_socket_addrs().await?;
for addr in addrs { for addr in addrs {
match Async::<std::net::TcpStream>::connect(&addr).await { match Async::<std::net::TcpStream>::connect(addr).await {
Ok(stream) => { Ok(stream) => {
return Ok(TcpStream { return Ok(TcpStream {
watcher: Arc::new(stream), watcher: Arc::new(stream),

@ -2,7 +2,7 @@ use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr}; use std::net::{Ipv4Addr, Ipv6Addr};
use smol::Async; use async_io::Async;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use crate::utils::Context as _; use crate::utils::Context as _;
@ -74,7 +74,7 @@ impl UdpSocket {
let addrs = addrs.to_socket_addrs().await?; let addrs = addrs.to_socket_addrs().await?;
for addr in addrs { for addr in addrs {
match Async::<std::net::UdpSocket>::bind(&addr) { match Async::<std::net::UdpSocket>::bind(addr) {
Ok(socket) => { Ok(socket) => {
return Ok(UdpSocket { watcher: socket }); return Ok(UdpSocket { watcher: socket });
} }
@ -506,7 +506,7 @@ cfg_unix! {
impl IntoRawFd for UdpSocket { impl IntoRawFd for UdpSocket {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd() self.watcher.into_inner().unwrap().into_raw_fd()
} }
} }
} }
@ -530,7 +530,7 @@ cfg_windows! {
impl IntoRawSocket for UdpSocket { impl IntoRawSocket for UdpSocket {
fn into_raw_socket(self) -> RawSocket { fn into_raw_socket(self) -> RawSocket {
self.watcher.into_raw_socket() self.watcher.into_inner().unwrap().into_raw_socket()
} }
} }
} }

@ -4,7 +4,7 @@ use std::fmt;
use std::net::Shutdown; use std::net::Shutdown;
use std::os::unix::net::UnixDatagram as StdUnixDatagram; use std::os::unix::net::UnixDatagram as StdUnixDatagram;
use smol::Async; use async_io::Async;
use super::SocketAddr; use super::SocketAddr;
use crate::io; use crate::io;
@ -335,6 +335,6 @@ impl FromRawFd for UnixDatagram {
impl IntoRawFd for UnixDatagram { impl IntoRawFd for UnixDatagram {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd() self.watcher.into_inner().unwrap().into_raw_fd()
} }
} }

@ -5,7 +5,7 @@ use std::future::Future;
use std::os::unix::net::UnixListener as StdUnixListener; use std::os::unix::net::UnixListener as StdUnixListener;
use std::pin::Pin; use std::pin::Pin;
use smol::Async; use async_io::Async;
use super::SocketAddr; use super::SocketAddr;
use super::UnixStream; use super::UnixStream;
@ -217,6 +217,6 @@ impl FromRawFd for UnixListener {
impl IntoRawFd for UnixListener { impl IntoRawFd for UnixListener {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd() self.watcher.into_inner().unwrap().into_raw_fd()
} }
} }

@ -5,7 +5,7 @@ use std::net::Shutdown;
use std::os::unix::net::UnixStream as StdUnixStream; use std::os::unix::net::UnixStream as StdUnixStream;
use std::pin::Pin; use std::pin::Pin;
use smol::Async; use async_io::Async;
use super::SocketAddr; use super::SocketAddr;
use crate::io::{self, Read, Write}; use crate::io::{self, Read, Write};

@ -5,5 +5,6 @@ cfg_std! {
} }
cfg_unstable! { cfg_unstable! {
#[cfg(feature = "default")]
pub mod fs; pub mod fs;
} }

@ -27,7 +27,7 @@ pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
for _ in 0..thread_count { for _ in 0..thread_count {
thread::Builder::new() thread::Builder::new()
.name(thread_name.clone()) .name(thread_name.clone())
.spawn(|| crate::task::block_on(future::pending::<()>())) .spawn(|| crate::task::executor::run_global(future::pending::<()>()))
.expect("cannot start a runtime thread"); .expect("cannot start a runtime thread");
} }
Runtime {} Runtime {}

@ -7,7 +7,7 @@ use std::task::{Context, Poll};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::io; use crate::io;
use crate::task::{JoinHandle, Task, TaskLocalsWrapper}; use crate::task::{self, JoinHandle, Task, TaskLocalsWrapper};
/// Task builder that configures the settings of a new task. /// Task builder that configures the settings of a new task.
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -61,9 +61,9 @@ impl Builder {
}); });
let task = wrapped.tag.task().clone(); let task = wrapped.tag.task().clone();
let smol_task = smol::Task::spawn(wrapped).into(); let handle = task::executor::spawn(wrapped);
Ok(JoinHandle::new(smol_task, task)) Ok(JoinHandle::new(handle, task))
} }
/// Spawns a task locally with the configured settings. /// Spawns a task locally with the configured settings.
@ -81,9 +81,9 @@ impl Builder {
}); });
let task = wrapped.tag.task().clone(); let task = wrapped.tag.task().clone();
let smol_task = smol::Task::local(wrapped).into(); let handle = task::executor::local(wrapped);
Ok(JoinHandle::new(smol_task, task)) Ok(JoinHandle::new(handle, task))
} }
/// Spawns a task locally with the configured settings. /// Spawns a task locally with the configured settings.
@ -166,10 +166,10 @@ impl Builder {
unsafe { unsafe {
TaskLocalsWrapper::set_current(&wrapped.tag, || { TaskLocalsWrapper::set_current(&wrapped.tag, || {
let res = if should_run { let res = if should_run {
// The first call should use run. // The first call should run the executor
smol::run(wrapped) task::executor::run(wrapped)
} else { } else {
smol::block_on(wrapped) futures_lite::future::block_on(wrapped)
}; };
num_nested_blocking.replace(num_nested_blocking.get() - 1); num_nested_blocking.replace(num_nested_blocking.get() - 1);
res res

@ -0,0 +1,72 @@
use std::cell::RefCell;
use std::future::Future;
static GLOBAL_EXECUTOR: once_cell::sync::Lazy<async_executor::Executor> = once_cell::sync::Lazy::new(async_executor::Executor::new);
thread_local! {
static EXECUTOR: RefCell<async_executor::LocalExecutor> = RefCell::new(async_executor::LocalExecutor::new());
}
pub(crate) fn spawn<F, T>(future: F) -> async_executor::Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
GLOBAL_EXECUTOR.spawn(future)
}
#[cfg(feature = "unstable")]
pub(crate) fn local<F, T>(future: F) -> async_executor::Task<T>
where
F: Future<Output = T> + 'static,
T: 'static,
{
EXECUTOR.with(|executor| executor.borrow().spawn(future))
}
pub(crate) fn run<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
EXECUTOR.with(|executor| enter(|| GLOBAL_EXECUTOR.enter(|| executor.borrow().run(future))))
}
pub(crate) fn run_global<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
enter(|| GLOBAL_EXECUTOR.run(future))
}
/// Enters the tokio context if the `tokio` feature is enabled.
fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();
#[cfg(feature = "tokio02")]
{
use std::cell::Cell;
use tokio::runtime::Runtime;
thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outermost always
/// has a runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}
/// The global tokio runtime.
static RT: once_cell::sync::Lazy<Runtime> = once_cell::sync::Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));
NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
}

@ -18,7 +18,7 @@ pub struct JoinHandle<T> {
} }
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
type InnerHandle<T> = async_task::JoinHandle<T, ()>; type InnerHandle<T> = async_executor::Task<T>;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
type InnerHandle<T> = futures_channel::oneshot::Receiver<T>; type InnerHandle<T> = futures_channel::oneshot::Receiver<T>;
@ -54,8 +54,7 @@ impl<T> JoinHandle<T> {
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]
pub async fn cancel(mut self) -> Option<T> { pub async fn cancel(mut self) -> Option<T> {
let handle = self.handle.take().unwrap(); let handle = self.handle.take().unwrap();
handle.cancel(); handle.cancel().await
handle.await
} }
/// Cancel this task. /// Cancel this task.
@ -67,15 +66,19 @@ impl<T> JoinHandle<T> {
} }
} }
#[cfg(not(target_os = "unknown"))]
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.detach();
}
}
}
impl<T> Future for JoinHandle<T> { impl<T> Future for JoinHandle<T> {
type Output = T; type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx) { Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx)
Poll::Pending => Poll::Pending,
Poll::Ready(output) => {
Poll::Ready(output.expect("cannot await the result of a panicked task"))
}
}
} }
} }

@ -148,6 +148,8 @@ cfg_default! {
mod block_on; mod block_on;
mod builder; mod builder;
mod current; mod current;
#[cfg(not(target_os = "unknown"))]
pub(crate) mod executor;
mod join_handle; mod join_handle;
mod sleep; mod sleep;
#[cfg(not(target_os = "unknown"))] #[cfg(not(target_os = "unknown"))]

@ -1,4 +1,4 @@
use crate::task::{JoinHandle, Task}; use crate::task::{self, JoinHandle};
/// Spawns a blocking task. /// Spawns a blocking task.
/// ///
@ -35,8 +35,5 @@ where
F: FnOnce() -> T + Send + 'static, F: FnOnce() -> T + Send + 'static,
T: Send + 'static, T: Send + 'static,
{ {
once_cell::sync::Lazy::force(&crate::rt::RUNTIME); task::spawn(async move { blocking::unblock!(f()) })
let handle = smol::Task::blocking(async move { f() }).into();
JoinHandle::new(handle, Task::new(None))
} }

@ -61,15 +61,15 @@ pub(crate) trait Context {
#[cfg(all(not(target_os = "unknown"), feature = "default"))] #[cfg(all(not(target_os = "unknown"), feature = "default"))]
mod timer { mod timer {
pub type Timer = smol::Timer; pub type Timer = async_io::Timer;
} }
#[cfg(any(feature = "unstable", feature = "default"))] #[cfg(any(feature = "unstable", feature = "default"))]
pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer { pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer {
#[cfg(not(target_os = "unknown"))] #[cfg(all(not(target_os = "unknown"), feature = "default"))]
once_cell::sync::Lazy::force(&crate::rt::RUNTIME); once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
Timer::after(dur) Timer::new(dur)
} }
#[cfg(any( #[cfg(any(
@ -84,7 +84,7 @@ mod timer {
pub(crate) struct Timer(futures_timer::Delay); pub(crate) struct Timer(futures_timer::Delay);
impl Timer { impl Timer {
pub(crate) fn after(dur: std::time::Duration) -> Self { pub(crate) fn new(dur: std::time::Duration) -> Self {
Timer(futures_timer::Delay::new(dur)) Timer(futures_timer::Delay::new(dur))
} }
} }

Loading…
Cancel
Save