diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a24367c8..a4f85ea2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,6 +86,12 @@ jobs: command: check args: --features attributes + - name: build unstable only + uses: actions-rs/cargo@v1 + with: + command: build + args: --no-default-features --features unstable + - name: tests uses: actions-rs/cargo@v1 with: diff --git a/Cargo.toml b/Cargo.toml index a5b5c4f0..8c890125 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,12 +24,15 @@ rustdoc-args = ["--cfg", "feature=\"docs\""] [features] default = [ "std", + "async-executor", + "async-io", "async-task", + "blocking", + "futures-lite", "kv-log-macro", "log", "num_cpus", "pin-project-lite", - "smol", ] docs = ["attributes", "unstable", "default"] unstable = [ @@ -54,7 +57,7 @@ alloc = [ "futures-core/alloc", "pin-project-lite", ] -tokio02 = ["smol/tokio02"] +tokio02 = ["tokio"] [dependencies] async-attributes = { version = "1.1.1", optional = true } @@ -77,7 +80,10 @@ futures-timer = { version = "3.0.2", optional = true } surf = { version = "1.0.3", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] -smol = { version = "0.1.17", optional = true } +async-executor = { version = "0.1.1", features = ["async-io"], optional = true } +async-io = { version = "0.1.5", optional = true } +blocking = { version = "0.5.0", optional = true } +futures-lite = { version = "0.1.8", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] } @@ -87,6 +93,12 @@ futures-channel = { version = "0.3.4", optional = true } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = "0.3.10" +[dependencies.tokio] +version = "0.2" +default-features = false +features = ["rt-threaded"] +optional = true + [dev-dependencies] femme = "1.3.0" rand = "0.7.3" diff --git a/src/fs/file.rs b/src/fs/file.rs index 2ff5643e..56d292b9 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -315,7 +315,7 @@ impl Drop for File { // non-blocking fashion, but our only other option here is losing data remaining in the // write cache. Good task schedulers should be resilient to occasional blocking hiccups in // file destructors so we don't expect this to be a common problem in practice. - let _ = smol::block_on(self.flush()); + let _ = futures_lite::future::block_on(self.flush()); } } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 09f5812f..8c87fc5f 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use smol::Async; +use async_io::Async; use crate::io; use crate::net::{TcpStream, ToSocketAddrs}; @@ -81,7 +81,7 @@ impl TcpListener { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match Async::::bind(&addr) { + match Async::::bind(addr) { Ok(listener) => { return Ok(TcpListener { watcher: listener }); } @@ -227,7 +227,7 @@ cfg_unix! { impl IntoRawFd for TcpListener { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } } @@ -251,7 +251,7 @@ cfg_windows! { impl IntoRawSocket for TcpListener { fn into_raw_socket(self) -> RawSocket { - self.watcher.into_raw_socket() + self.watcher.into_inner().unwrap().into_raw_socket() } } } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 63232fa3..f7bd5c91 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -2,7 +2,7 @@ use std::io::{IoSlice, IoSliceMut}; use std::net::SocketAddr; use std::pin::Pin; -use smol::Async; +use async_io::Async; use crate::io::{self, Read, Write}; use crate::net::ToSocketAddrs; @@ -77,7 +77,7 @@ impl TcpStream { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match Async::::connect(&addr).await { + match Async::::connect(addr).await { Ok(stream) => { return Ok(TcpStream { watcher: Arc::new(stream), diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index d361a6fc..dd47e058 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -2,7 +2,7 @@ use std::io; use std::net::SocketAddr; use std::net::{Ipv4Addr, Ipv6Addr}; -use smol::Async; +use async_io::Async; use crate::net::ToSocketAddrs; use crate::utils::Context as _; @@ -74,7 +74,7 @@ impl UdpSocket { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match Async::::bind(&addr) { + match Async::::bind(addr) { Ok(socket) => { return Ok(UdpSocket { watcher: socket }); } @@ -506,7 +506,7 @@ cfg_unix! { impl IntoRawFd for UdpSocket { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } } @@ -530,7 +530,7 @@ cfg_windows! { impl IntoRawSocket for UdpSocket { fn into_raw_socket(self) -> RawSocket { - self.watcher.into_raw_socket() + self.watcher.into_inner().unwrap().into_raw_socket() } } } diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 52c6b07f..83ef9fe9 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -4,7 +4,7 @@ use std::fmt; use std::net::Shutdown; use std::os::unix::net::UnixDatagram as StdUnixDatagram; -use smol::Async; +use async_io::Async; use super::SocketAddr; use crate::io; @@ -335,6 +335,6 @@ impl FromRawFd for UnixDatagram { impl IntoRawFd for UnixDatagram { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index a63bd4b6..078b780d 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -5,7 +5,7 @@ use std::future::Future; use std::os::unix::net::UnixListener as StdUnixListener; use std::pin::Pin; -use smol::Async; +use async_io::Async; use super::SocketAddr; use super::UnixStream; @@ -217,6 +217,6 @@ impl FromRawFd for UnixListener { impl IntoRawFd for UnixListener { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 74bd6aef..3b2fe36f 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -5,7 +5,7 @@ use std::net::Shutdown; use std::os::unix::net::UnixStream as StdUnixStream; use std::pin::Pin; -use smol::Async; +use async_io::Async; use super::SocketAddr; use crate::io::{self, Read, Write}; diff --git a/src/os/windows/mod.rs b/src/os/windows/mod.rs index 6dac11b6..67bf0372 100644 --- a/src/os/windows/mod.rs +++ b/src/os/windows/mod.rs @@ -5,5 +5,6 @@ cfg_std! { } cfg_unstable! { + #[cfg(feature = "default")] pub mod fs; } diff --git a/src/rt/mod.rs b/src/rt/mod.rs index d8550aac..f58afb18 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -27,7 +27,7 @@ pub static RUNTIME: Lazy = Lazy::new(|| { for _ in 0..thread_count { thread::Builder::new() .name(thread_name.clone()) - .spawn(|| crate::task::block_on(future::pending::<()>())) + .spawn(|| crate::task::executor::run_global(future::pending::<()>())) .expect("cannot start a runtime thread"); } Runtime {} diff --git a/src/task/builder.rs b/src/task/builder.rs index 0024f8ab..236081c0 100644 --- a/src/task/builder.rs +++ b/src/task/builder.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use pin_project_lite::pin_project; use crate::io; -use crate::task::{JoinHandle, Task, TaskLocalsWrapper}; +use crate::task::{self, JoinHandle, Task, TaskLocalsWrapper}; /// Task builder that configures the settings of a new task. #[derive(Debug, Default)] @@ -61,9 +61,9 @@ impl Builder { }); let task = wrapped.tag.task().clone(); - let smol_task = smol::Task::spawn(wrapped).into(); + let handle = task::executor::spawn(wrapped); - Ok(JoinHandle::new(smol_task, task)) + Ok(JoinHandle::new(handle, task)) } /// Spawns a task locally with the configured settings. @@ -81,9 +81,9 @@ impl Builder { }); let task = wrapped.tag.task().clone(); - let smol_task = smol::Task::local(wrapped).into(); + let handle = task::executor::local(wrapped); - Ok(JoinHandle::new(smol_task, task)) + Ok(JoinHandle::new(handle, task)) } /// Spawns a task locally with the configured settings. @@ -166,10 +166,10 @@ impl Builder { unsafe { TaskLocalsWrapper::set_current(&wrapped.tag, || { let res = if should_run { - // The first call should use run. - smol::run(wrapped) + // The first call should run the executor + task::executor::run(wrapped) } else { - smol::block_on(wrapped) + futures_lite::future::block_on(wrapped) }; num_nested_blocking.replace(num_nested_blocking.get() - 1); res diff --git a/src/task/executor.rs b/src/task/executor.rs new file mode 100644 index 00000000..d9caf053 --- /dev/null +++ b/src/task/executor.rs @@ -0,0 +1,72 @@ +use std::cell::RefCell; +use std::future::Future; + +static GLOBAL_EXECUTOR: once_cell::sync::Lazy = once_cell::sync::Lazy::new(async_executor::Executor::new); + +thread_local! { + static EXECUTOR: RefCell = RefCell::new(async_executor::LocalExecutor::new()); +} + +pub(crate) fn spawn(future: F) -> async_executor::Task +where + F: Future + Send + 'static, + T: Send + 'static, +{ + GLOBAL_EXECUTOR.spawn(future) +} + +#[cfg(feature = "unstable")] +pub(crate) fn local(future: F) -> async_executor::Task +where + F: Future + 'static, + T: 'static, +{ + EXECUTOR.with(|executor| executor.borrow().spawn(future)) +} + +pub(crate) fn run(future: F) -> T +where + F: Future, +{ + EXECUTOR.with(|executor| enter(|| GLOBAL_EXECUTOR.enter(|| executor.borrow().run(future)))) +} + +pub(crate) fn run_global(future: F) -> T +where + F: Future, +{ + enter(|| GLOBAL_EXECUTOR.run(future)) +} + +/// Enters the tokio context if the `tokio` feature is enabled. +fn enter(f: impl FnOnce() -> T) -> T { + #[cfg(not(feature = "tokio02"))] + return f(); + + #[cfg(feature = "tokio02")] + { + use std::cell::Cell; + use tokio::runtime::Runtime; + + thread_local! { + /// The level of nested `enter` calls we are in, to ensure that the outermost always + /// has a runtime spawned. + static NESTING: Cell = Cell::new(0); + } + + /// The global tokio runtime. + static RT: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| Runtime::new().expect("cannot initialize tokio")); + + NESTING.with(|nesting| { + let res = if nesting.get() == 0 { + nesting.replace(1); + RT.enter(f) + } else { + nesting.replace(nesting.get() + 1); + f() + }; + nesting.replace(nesting.get() - 1); + res + }) + } +} diff --git a/src/task/join_handle.rs b/src/task/join_handle.rs index 110b827e..9189ea57 100644 --- a/src/task/join_handle.rs +++ b/src/task/join_handle.rs @@ -18,7 +18,7 @@ pub struct JoinHandle { } #[cfg(not(target_os = "unknown"))] -type InnerHandle = async_task::JoinHandle; +type InnerHandle = async_executor::Task; #[cfg(target_arch = "wasm32")] type InnerHandle = futures_channel::oneshot::Receiver; @@ -54,8 +54,7 @@ impl JoinHandle { #[cfg(not(target_os = "unknown"))] pub async fn cancel(mut self) -> Option { let handle = self.handle.take().unwrap(); - handle.cancel(); - handle.await + handle.cancel().await } /// Cancel this task. @@ -67,15 +66,19 @@ impl JoinHandle { } } +#[cfg(not(target_os = "unknown"))] +impl Drop for JoinHandle { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.detach(); + } + } +} + impl Future for JoinHandle { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(output) => { - Poll::Ready(output.expect("cannot await the result of a panicked task")) - } - } + Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx) } } diff --git a/src/task/mod.rs b/src/task/mod.rs index ca0b92a0..b528a788 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -148,6 +148,8 @@ cfg_default! { mod block_on; mod builder; mod current; + #[cfg(not(target_os = "unknown"))] + pub(crate) mod executor; mod join_handle; mod sleep; #[cfg(not(target_os = "unknown"))] diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index e9ed0c5a..9c836f24 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -1,4 +1,4 @@ -use crate::task::{JoinHandle, Task}; +use crate::task::{self, JoinHandle}; /// Spawns a blocking task. /// @@ -35,8 +35,5 @@ where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { - once_cell::sync::Lazy::force(&crate::rt::RUNTIME); - - let handle = smol::Task::blocking(async move { f() }).into(); - JoinHandle::new(handle, Task::new(None)) + task::spawn(async move { blocking::unblock!(f()) }) } diff --git a/src/utils.rs b/src/utils.rs index 31290e33..f3299f63 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -61,15 +61,15 @@ pub(crate) trait Context { #[cfg(all(not(target_os = "unknown"), feature = "default"))] mod timer { - pub type Timer = smol::Timer; + pub type Timer = async_io::Timer; } #[cfg(any(feature = "unstable", feature = "default"))] pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer { - #[cfg(not(target_os = "unknown"))] + #[cfg(all(not(target_os = "unknown"), feature = "default"))] once_cell::sync::Lazy::force(&crate::rt::RUNTIME); - Timer::after(dur) + Timer::new(dur) } #[cfg(any( @@ -84,7 +84,7 @@ mod timer { pub(crate) struct Timer(futures_timer::Delay); impl Timer { - pub(crate) fn after(dur: std::time::Duration) -> Self { + pub(crate) fn new(dur: std::time::Duration) -> Self { Timer(futures_timer::Delay::new(dur)) } }