Refactor the task module (#421)

* Refactor the task module

* Fix clippy warning

* Simplify task-local entries

* Reduce the amount of future wrapping

* Cleanup

* Simplify stealing
poc-serde-support
Stjepan Glavina 5 years ago committed by GitHub
parent c1e8517959
commit 3dd59d7056
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -27,23 +27,23 @@ unstable = ["broadcaster"]
[dependencies]
async-macros = "1.0.0"
async-task = "1.0.0"
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
crossbeam-channel = "0.3.9"
crossbeam-deque = "0.7.1"
crossbeam-utils = "0.6.6"
futures-core-preview = "=0.3.0-alpha.19"
futures-io-preview = "=0.3.0-alpha.19"
futures-timer = "1.0.2"
kv-log-macro = "1.0.4"
log = { version = "0.4.8", features = ["kv_unstable"] }
memchr = "2.2.1"
mio = "0.6.19"
mio-uds = "0.6.7"
num_cpus = "1.10.1"
once_cell = "1.2.0"
pin-project-lite = "0.1"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
kv-log-macro = "1.0.4"
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
pin-project-lite = "0.1"
[dev-dependencies]
femme = "1.2.0"

@ -1,6 +1,6 @@
use crate::io;
use crate::path::{Path, PathBuf};
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Returns the canonical form of a path.
///
@ -32,5 +32,5 @@ use crate::task::blocking;
/// ```
pub async fn canonicalize<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::canonicalize(&path).map(Into::into)).await
spawn_blocking(move || std::fs::canonicalize(&path).map(Into::into)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Copies the contents and permissions of a file to a new location.
///
@ -41,5 +41,5 @@ use crate::task::blocking;
pub async fn copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
blocking::spawn(move || std::fs::copy(&from, &to)).await
spawn_blocking(move || std::fs::copy(&from, &to)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Creates a new directory.
///
@ -34,5 +34,5 @@ use crate::task::blocking;
/// ```
pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::create_dir(path)).await
spawn_blocking(move || std::fs::create_dir(path)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Creates a new directory and all of its parents if they are missing.
///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::create_dir_all(path)).await
spawn_blocking(move || std::fs::create_dir_all(path)).await
}

@ -2,7 +2,7 @@ use std::future::Future;
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// A builder for creating directories with configurable options.
///
@ -107,7 +107,7 @@ impl DirBuilder {
}
let path = path.as_ref().to_owned();
async move { blocking::spawn(move || builder.create(path)).await }
async move { spawn_blocking(move || builder.create(path)).await }
}
}

@ -5,7 +5,7 @@ use std::sync::Arc;
use crate::fs::{FileType, Metadata};
use crate::io;
use crate::path::PathBuf;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// An entry in a directory.
///
@ -87,7 +87,7 @@ impl DirEntry {
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
let inner = self.0.clone();
blocking::spawn(move || inner.metadata()).await
spawn_blocking(move || inner.metadata()).await
}
/// Reads the file type for this entry.
@ -125,7 +125,7 @@ impl DirEntry {
/// ```
pub async fn file_type(&self) -> io::Result<FileType> {
let inner = self.0.clone();
blocking::spawn(move || inner.file_type()).await
spawn_blocking(move || inner.file_type()).await
}
/// Returns the bare name of this entry without the leading path.

@ -12,7 +12,7 @@ use crate::future;
use crate::io::{self, Read, Seek, SeekFrom, Write};
use crate::path::Path;
use crate::prelude::*;
use crate::task::{self, blocking, Context, Poll, Waker};
use crate::task::{self, spawn_blocking, Context, Poll, Waker};
/// An open file on the filesystem.
///
@ -112,7 +112,7 @@ impl File {
/// ```
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned();
let file = blocking::spawn(move || std::fs::File::open(&path)).await?;
let file = spawn_blocking(move || std::fs::File::open(&path)).await?;
Ok(File::new(file, true))
}
@ -147,7 +147,7 @@ impl File {
/// ```
pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned();
let file = blocking::spawn(move || std::fs::File::create(&path)).await?;
let file = spawn_blocking(move || std::fs::File::create(&path)).await?;
Ok(File::new(file, true))
}
@ -180,7 +180,7 @@ impl File {
})
.await?;
blocking::spawn(move || state.file.sync_all()).await
spawn_blocking(move || state.file.sync_all()).await
}
/// Synchronizes OS-internal buffered contents to disk.
@ -216,7 +216,7 @@ impl File {
})
.await?;
blocking::spawn(move || state.file.sync_data()).await
spawn_blocking(move || state.file.sync_data()).await
}
/// Truncates or extends the file.
@ -249,7 +249,7 @@ impl File {
})
.await?;
blocking::spawn(move || state.file.set_len(size)).await
spawn_blocking(move || state.file.set_len(size)).await
}
/// Reads the file's metadata.
@ -268,7 +268,7 @@ impl File {
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
let file = self.file.clone();
blocking::spawn(move || file.metadata()).await
spawn_blocking(move || file.metadata()).await
}
/// Changes the permissions on the file.
@ -297,7 +297,7 @@ impl File {
/// ```
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
let file = self.file.clone();
blocking::spawn(move || file.set_permissions(perm)).await
spawn_blocking(move || file.set_permissions(perm)).await
}
}
@ -692,7 +692,7 @@ impl LockGuard<State> {
self.register(cx);
// Start a read operation asynchronously.
blocking::spawn(move || {
spawn_blocking(move || {
// Read some data from the file into the cache.
let res = {
let State { file, cache, .. } = &mut *self;
@ -801,7 +801,7 @@ impl LockGuard<State> {
self.register(cx);
// Start a write operation asynchronously.
blocking::spawn(move || {
spawn_blocking(move || {
match (&*self.file).write_all(&self.cache) {
Ok(_) => {
// Switch to idle mode.
@ -834,7 +834,7 @@ impl LockGuard<State> {
self.register(cx);
// Start a flush operation asynchronously.
blocking::spawn(move || {
spawn_blocking(move || {
match (&*self.file).flush() {
Ok(()) => {
// Mark the file as flushed.

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Creates a hard link on the filesystem.
///
@ -32,5 +32,5 @@ use crate::task::blocking;
pub async fn hard_link<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
blocking::spawn(move || std::fs::hard_link(&from, &to)).await
spawn_blocking(move || std::fs::hard_link(&from, &to)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Reads metadata for a path.
///
@ -34,7 +34,7 @@ use crate::task::blocking;
/// ```
pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::metadata(path)).await
spawn_blocking(move || std::fs::metadata(path)).await
}
cfg_not_docs! {

@ -3,7 +3,7 @@ use std::future::Future;
use crate::fs::File;
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// A builder for opening files with configurable options.
///
@ -285,7 +285,7 @@ impl OpenOptions {
let path = path.as_ref().to_owned();
let options = self.0.clone();
async move {
let file = blocking::spawn(move || options.open(path)).await?;
let file = spawn_blocking(move || options.open(path)).await?;
Ok(File::new(file, true))
}
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Reads the entire contents of a file as raw bytes.
///
@ -36,5 +36,5 @@ use crate::task::blocking;
/// ```
pub async fn read<P: AsRef<Path>>(path: P) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read(path)).await
spawn_blocking(move || std::fs::read(path)).await
}

@ -5,7 +5,7 @@ use crate::future::Future;
use crate::io;
use crate::path::Path;
use crate::stream::Stream;
use crate::task::{blocking, Context, JoinHandle, Poll};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Returns a stream of entries in a directory.
///
@ -45,7 +45,7 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
/// ```
pub async fn read_dir<P: AsRef<Path>>(path: P) -> io::Result<ReadDir> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read_dir(path))
spawn_blocking(move || std::fs::read_dir(path))
.await
.map(ReadDir::new)
}
@ -91,7 +91,7 @@ impl Stream for ReadDir {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
self.0 = State::Busy(blocking::spawn(move || {
self.0 = State::Busy(spawn_blocking(move || {
let next = inner.next();
(inner, next)
}));

@ -1,6 +1,6 @@
use crate::io;
use crate::path::{Path, PathBuf};
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Reads a symbolic link and returns the path it points to.
///
@ -28,5 +28,5 @@ use crate::task::blocking;
/// ```
pub async fn read_link<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read_link(path).map(Into::into)).await
spawn_blocking(move || std::fs::read_link(path).map(Into::into)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Reads the entire contents of a file as a string.
///
@ -37,5 +37,5 @@ use crate::task::blocking;
/// ```
pub async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read_to_string(path)).await
spawn_blocking(move || std::fs::read_to_string(path)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Removes an empty directory.
///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::remove_dir(path)).await
spawn_blocking(move || std::fs::remove_dir(path)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Removes a directory and all of its contents.
///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::remove_dir_all(path)).await
spawn_blocking(move || std::fs::remove_dir_all(path)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Removes a file.
///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::remove_file(path)).await
spawn_blocking(move || std::fs::remove_file(path)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Renames a file or directory to a new location.
///
@ -34,5 +34,5 @@ use crate::task::blocking;
pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
blocking::spawn(move || std::fs::rename(&from, &to)).await
spawn_blocking(move || std::fs::rename(&from, &to)).await
}

@ -1,7 +1,7 @@
use crate::fs::Permissions;
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Changes the permissions of a file or directory.
///
@ -32,5 +32,5 @@ use crate::task::blocking;
/// ```
pub async fn set_permissions<P: AsRef<Path>>(path: P, perm: Permissions) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::set_permissions(path, perm)).await
spawn_blocking(move || std::fs::set_permissions(path, perm)).await
}

@ -1,7 +1,7 @@
use crate::fs::Metadata;
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Reads metadata for a path without following symbolic links.
///
@ -34,5 +34,5 @@ use crate::task::blocking;
/// ```
pub async fn symlink_metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::symlink_metadata(path)).await
spawn_blocking(move || std::fs::symlink_metadata(path)).await
}

@ -1,6 +1,6 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Writes a slice of bytes as the new contents of a file.
///
@ -33,5 +33,5 @@ use crate::task::blocking;
pub async fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
let path = path.as_ref().to_owned();
let contents = contents.as_ref().to_owned();
blocking::spawn(move || std::fs::write(path, contents)).await
spawn_blocking(move || std::fs::write(path, contents)).await
}

@ -3,7 +3,7 @@ use std::sync::Mutex;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, JoinHandle, Poll};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard error of the current process.
///
@ -124,7 +124,7 @@ impl Write for Stderr {
inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || {
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::write(&mut inner.stderr, &inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
@ -152,7 +152,7 @@ impl Write for Stderr {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || {
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::flush(&mut inner.stderr);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))

@ -3,7 +3,7 @@ use std::sync::Mutex;
use crate::future::{self, Future};
use crate::io::{self, Read};
use crate::task::{blocking, Context, JoinHandle, Poll};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard input of the current process.
///
@ -127,7 +127,7 @@ impl Stdin {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || {
*state = State::Busy(spawn_blocking(move || {
inner.line.clear();
let res = inner.stdin.read_line(&mut inner.line);
inner.last_op = Some(Operation::ReadLine(res));
@ -180,7 +180,7 @@ impl Read for Stdin {
}
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || {
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf);
inner.last_op = Some(Operation::Read(res));
State::Idle(Some(inner))

@ -3,7 +3,7 @@ use std::sync::Mutex;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, JoinHandle, Poll};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard output of the current process.
///
@ -124,7 +124,7 @@ impl Write for Stdout {
inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || {
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::write(&mut inner.stdout, &inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
@ -152,7 +152,7 @@ impl Write for Stdout {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || {
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::flush(&mut inner.stdout);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))

@ -5,7 +5,7 @@ use std::pin::Pin;
use crate::future::Future;
use crate::io;
use crate::task::{blocking, Context, JoinHandle, Poll};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
cfg_not_docs! {
macro_rules! ret {
@ -194,7 +194,7 @@ impl ToSocketAddrs for (&str, u16) {
}
let host = host.to_string();
let task = blocking::spawn(move || {
let task = spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&(host.as_str(), port))
});
ToSocketAddrsFuture::Resolving(task)
@ -215,7 +215,7 @@ impl ToSocketAddrs for str {
}
let addr = self.to_string();
let task = blocking::spawn(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str()));
let task = spawn_blocking(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str()));
ToSocketAddrsFuture::Resolving(task)
}
}

@ -105,7 +105,7 @@ static REACTOR: Lazy<Reactor> = Lazy::new(|| {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles.
std::thread::Builder::new()
.name("async-net-driver".to_string())
.name("async-std/net".to_string())
.spawn(move || {
// If the driver thread panics, there's not much we can do. It is not a
// recoverable error and there is no place to propagate it into so we just abort.

@ -6,8 +6,7 @@ use crate::future;
use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs;
use crate::task::blocking;
use crate::task::{Context, Poll};
use crate::task::{spawn_blocking, Context, Poll};
/// A TCP stream between a local and a remote socket.
///
@ -74,7 +73,7 @@ impl TcpStream {
let mut last_err = None;
for addr in addrs.to_socket_addrs().await? {
let res = blocking::spawn(move || {
let res = spawn_blocking(move || {
let std_stream = std::net::TcpStream::connect(addr)?;
let mio_stream = mio::net::TcpStream::from_stream(std_stream)?;
Ok(TcpStream {

@ -2,7 +2,7 @@
use crate::io;
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// Creates a new symbolic link on the filesystem.
///
@ -26,7 +26,7 @@ use crate::task::blocking;
pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
blocking::spawn(move || std::os::unix::fs::symlink(&src, &dst)).await
spawn_blocking(move || std::os::unix::fs::symlink(&src, &dst)).await
}
cfg_not_docs! {

@ -11,7 +11,7 @@ use crate::io;
use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path;
use crate::task::blocking;
use crate::task::spawn_blocking;
/// A Unix datagram socket.
///
@ -67,7 +67,7 @@ impl UnixDatagram {
/// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
let path = path.as_ref().to_owned();
let socket = blocking::spawn(move || mio_uds::UnixDatagram::bind(path)).await?;
let socket = spawn_blocking(move || mio_uds::UnixDatagram::bind(path)).await?;
Ok(UnixDatagram::new(socket))
}

@ -13,7 +13,7 @@ use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path;
use crate::stream::Stream;
use crate::task::{blocking, Context, Poll};
use crate::task::{spawn_blocking, Context, Poll};
/// A Unix domain socket server, listening for connections.
///
@ -68,7 +68,7 @@ impl UnixListener {
/// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
let path = path.as_ref().to_owned();
let listener = blocking::spawn(move || mio_uds::UnixListener::bind(path)).await?;
let listener = spawn_blocking(move || mio_uds::UnixListener::bind(path)).await?;
Ok(UnixListener {
watcher: Watcher::new(listener),

@ -12,7 +12,7 @@ use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path;
use crate::task::{blocking, Context, Poll};
use crate::task::{spawn_blocking, Context, Poll};
/// A Unix stream socket.
///
@ -58,7 +58,7 @@ impl UnixStream {
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
let path = path.as_ref().to_owned();
blocking::spawn(move || {
spawn_blocking(move || {
let std_stream = std::os::unix::net::UnixStream::connect(path)?;
let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?;
Ok(UnixStream {

@ -1,21 +1,15 @@
use std::cell::{Cell, UnsafeCell};
use std::cell::Cell;
use std::mem::{self, ManuallyDrop};
use std::panic::{self, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable};
use std::thread;
use crossbeam_utils::sync::Parker;
use pin_project_lite::pin_project;
use kv_log_macro::trace;
use log::log_enabled;
use super::task;
use super::task_local;
use super::worker;
use crate::future::Future;
use crate::task::{Context, Poll, Waker};
use kv_log_macro::trace;
use crate::task::{Context, Poll, Task, Waker};
/// Spawns a task and blocks the current thread on its result.
///
@ -42,81 +36,43 @@ pub fn block_on<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
unsafe {
// A place on the stack where the result will be stored.
let out = &mut UnsafeCell::new(None);
// Wrap the future into one that stores the result into `out`.
let future = {
let out = out.get();
async move {
let future = CatchUnwindFuture {
future: AssertUnwindSafe(future),
};
*out = Some(future.await);
}
};
// Create a tag for the task.
let tag = task::Tag::new(None);
// Log this `block_on` operation.
let child_id = tag.task_id().as_u64();
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
// Create a new task handle.
let task = Task::new(None);
// Log this `block_on` operation.
if log_enabled!(log::Level::Trace) {
trace!("block_on", {
parent_id: parent_id,
child_id: child_id,
task_id: task.id().0,
parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0),
});
}
// Wrap the future into one that drops task-local variables on exit.
let future = task_local::add_finalizer(future);
let future = async move {
future.await;
trace!("block_on completed", {
parent_id: parent_id,
child_id: child_id,
});
};
// Pin the future onto the stack.
pin_utils::pin_mut!(future);
// Transmute the future into one that is futurestatic.
let future = mem::transmute::<
Pin<&'_ mut dyn Future<Output = ()>>,
Pin<&'static mut dyn Future<Output = ()>>,
>(future);
// Block on the future and and wait for it to complete.
worker::set_tag(&tag, || block(future));
// Take out the result.
match (*out.get()).take().unwrap() {
Ok(v) => v,
Err(err) => panic::resume_unwind(err),
let future = async move {
// Drop task-locals on exit.
defer! {
Task::get_current(|t| unsafe { t.drop_locals() });
}
}
}
pin_project! {
struct CatchUnwindFuture<F> {
#[pin]
future: F,
}
}
// Log completion on exit.
defer! {
if log_enabled!(log::Level::Trace) {
Task::get_current(|t| {
trace!("completed", {
task_id: t.id().0,
});
});
}
}
impl<F: Future + UnwindSafe> Future for CatchUnwindFuture<F> {
type Output = thread::Result<F::Output>;
future.await
};
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
panic::catch_unwind(AssertUnwindSafe(|| self.project().future.poll(cx)))?.map(Ok)
}
// Run the future as a task.
unsafe { Task::set_current(&task, || run(future)) }
}
fn block<F, T>(f: F) -> T
/// Blocks the current thread on a future's result.
fn run<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
@ -129,50 +85,59 @@ where
static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None);
}
pin_utils::pin_mut!(f);
// Virtual table for wakers based on `Arc<Parker>`.
static VTABLE: RawWakerVTable = {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
mem::forget(arc.clone());
RawWaker::new(ptr, &VTABLE)
}
unsafe fn wake_raw(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Parker);
arc.unparker().unpark();
}
unsafe fn wake_by_ref_raw(ptr: *const ()) {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
arc.unparker().unpark();
}
unsafe fn drop_raw(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const Parker))
}
RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
};
// Pin the future on the stack.
pin_utils::pin_mut!(future);
CACHE.with(|cache| {
// Reuse a cached parker or create a new one for this invocation of `block`.
let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));
let ptr = (&*arc_parker as *const Parker) as *const ();
let vt = vtable();
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) };
// Create a waker and task context.
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &VTABLE))) };
let cx = &mut Context::from_waker(&waker);
let mut step = 0;
loop {
if let Poll::Ready(t) = f.as_mut().poll(cx) {
if let Poll::Ready(t) = future.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(arc_parker));
return t;
}
arc_parker.park();
// Yield a few times or park the current thread.
if step < 3 {
thread::yield_now();
step += 1;
} else {
arc_parker.park();
step = 0;
}
}
})
}
fn vtable() -> &'static RawWakerVTable {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
#![allow(clippy::redundant_clone)]
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
mem::forget(arc.clone());
RawWaker::new(ptr, vtable())
}
unsafe fn wake_raw(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Parker);
arc.unparker().unpark();
}
unsafe fn wake_by_ref_raw(ptr: *const ()) {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
arc.unparker().unpark();
}
unsafe fn drop_raw(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const Parker))
}
&RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
}

@ -1,7 +1,11 @@
use super::pool;
use super::JoinHandle;
use kv_log_macro::trace;
use log::log_enabled;
use crate::future::Future;
use crate::io;
use crate::task::executor;
use crate::task::{JoinHandle, Task};
use crate::utils::abort_on_panic;
/// Task builder that configures the settings of a new task.
#[derive(Debug, Default)]
@ -11,11 +15,13 @@ pub struct Builder {
impl Builder {
/// Creates a new builder.
#[inline]
pub fn new() -> Builder {
Builder { name: None }
}
/// Configures the name of the task.
#[inline]
pub fn name(mut self, name: String) -> Builder {
self.name = Some(name);
self
@ -27,6 +33,52 @@ impl Builder {
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Ok(pool::get().spawn(future, self))
// Create a new task handle.
let task = Task::new(self.name);
// Log this `spawn` operation.
if log_enabled!(log::Level::Trace) {
trace!("spawn", {
task_id: task.id().0,
parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0),
});
}
let future = async move {
// Drop task-locals on exit.
defer! {
Task::get_current(|t| unsafe { t.drop_locals() });
}
// Log completion on exit.
defer! {
if log_enabled!(log::Level::Trace) {
Task::get_current(|t| {
trace!("completed", {
task_id: t.id().0,
});
});
}
}
future.await
};
let schedule = move |t| executor::schedule(Runnable(t));
let (task, handle) = async_task::spawn(future, schedule, task);
task.schedule();
Ok(JoinHandle::new(handle))
}
}
/// A runnable task.
pub(crate) struct Runnable(async_task::Task<Task>);
impl Runnable {
/// Runs the task by polling its future once.
pub fn run(self) {
unsafe {
Task::set_current(self.0.tag(), || abort_on_panic(|| self.0.run()));
}
}
}

@ -0,0 +1,28 @@
use crate::task::Task;
/// Returns a handle to the current task.
///
/// # Panics
///
/// This function will panic if not called within the context of a task created by [`block_on`],
/// [`spawn`], or [`Builder::spawn`].
///
/// [`block_on`]: fn.block_on.html
/// [`spawn`]: fn.spawn.html
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// println!("The name of this task is {:?}", task::current().name());
/// #
/// # })
/// ```
pub fn current() -> Task {
Task::get_current(|t| t.clone())
.expect("`task::current()` called outside the context of a task")
}

@ -0,0 +1,13 @@
//! Task executor.
//!
//! API bindings between `crate::task` and this module are very simple:
//!
//! * The only export is the `schedule` function.
//! * The only import is the `crate::task::Runnable` type.
pub(crate) use pool::schedule;
use sleepers::Sleepers;
mod pool;
mod sleepers;

@ -0,0 +1,140 @@
use std::cell::UnsafeCell;
use std::iter;
use std::thread;
use std::time::Duration;
use crossbeam_deque::{Injector, Stealer, Worker};
use once_cell::sync::Lazy;
use crate::task::executor::Sleepers;
use crate::task::Runnable;
use crate::utils::{abort_on_panic, random};
/// The state of an executor.
struct Pool {
/// The global queue of tasks.
injector: Injector<Runnable>,
/// Handles to local queues for stealing work from worker threads.
stealers: Vec<Stealer<Runnable>>,
/// Used for putting idle workers to sleep and notifying them when new tasks come in.
sleepers: Sleepers,
}
/// Global executor that runs spawned tasks.
static POOL: Lazy<Pool> = Lazy::new(|| {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
thread::Builder::new()
.name("async-std/executor".to_string())
.spawn(|| abort_on_panic(|| main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
});
thread_local! {
/// Local task queue associated with the current worker thread.
static QUEUE: UnsafeCell<Option<Worker<Runnable>>> = UnsafeCell::new(None);
}
/// Schedules a new runnable task for execution.
pub(crate) fn schedule(task: Runnable) {
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref() };
// If the current thread is a worker thread, push the task into its local task queue.
// Otherwise, push it into the global task queue.
match local {
None => POOL.injector.push(task),
Some(q) => q.push(task),
}
});
// Notify a sleeping worker that new work just came in.
POOL.sleepers.notify_one();
}
/// Main loop running a worker thread.
fn main_loop(local: Worker<Runnable>) {
// Initialize the local task queue.
QUEUE.with(|queue| unsafe { *queue.get() = Some(local) });
// The number of times the thread didn't find work in a row.
let mut step = 0;
loop {
// Try to find a runnable task.
match find_runnable() {
Some(task) => {
// Found. Now run the task.
task.run();
step = 0;
}
None => {
// Yield the current thread or put it to sleep.
match step {
0..=2 => {
thread::yield_now();
step += 1;
}
3 => {
thread::sleep(Duration::from_micros(10));
step += 1;
}
_ => {
POOL.sleepers.wait();
step = 0;
}
}
}
}
}
}
/// Find the next runnable task.
fn find_runnable() -> Option<Runnable> {
let pool = &*POOL;
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref().unwrap() };
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the global queue.
pool.injector
.steal_batch_and_pop(&local)
// Or try stealing a batch of tasks from one of the other threads.
.or_else(|| {
// First, pick a random starting point in the list of local queues.
let len = pool.stealers.len();
let start = random(len as u32) as usize;
// Try stealing a batch of tasks from each local queue starting from the
// chosen point.
let (l, r) = pool.stealers.split_at(start);
let rotated = r.iter().chain(l.iter());
rotated.map(|s| s.steal_batch_and_pop(&local)).collect()
})
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
})
}

@ -0,0 +1,56 @@
use std::future::Future;
use std::pin::Pin;
use crate::task::{Context, Poll, Task};
/// A handle that awaits the result of a task.
///
/// Dropping a [`JoinHandle`] will detach the task, meaning that there is no longer
/// a handle to the task and no way to `join` on it.
///
/// Created when a task is [spawned].
///
/// [spawned]: fn.spawn.html
#[derive(Debug)]
pub struct JoinHandle<T>(async_task::JoinHandle<T, Task>);
unsafe impl<T> Send for JoinHandle<T> {}
unsafe impl<T> Sync for JoinHandle<T> {}
impl<T> JoinHandle<T> {
/// Creates a new `JoinHandle`.
pub(crate) fn new(inner: async_task::JoinHandle<T, Task>) -> JoinHandle<T> {
JoinHandle(inner)
}
/// Returns a handle to the underlying task.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// let handle = task::spawn(async {
/// 1 + 2
/// });
/// println!("id = {}", handle.task().id());
/// #
/// # })
pub fn task(&self) -> &Task {
self.0.tag()
}
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => panic!("cannot await the result of a panicked task"),
Poll::Ready(Some(val)) => Poll::Ready(val),
}
}
}

@ -126,63 +126,35 @@ pub use async_macros::ready;
pub use block_on::block_on;
pub use builder::Builder;
pub use pool::spawn;
pub use current::current;
pub use join_handle::JoinHandle;
pub use sleep::sleep;
pub use task::{JoinHandle, Task, TaskId};
pub use spawn::spawn;
pub use task::Task;
pub use task_id::TaskId;
pub use task_local::{AccessError, LocalKey};
pub use worker::current;
#[cfg(any(feature = "unstable", test))]
pub use spawn_blocking::spawn_blocking;
#[cfg(not(any(feature = "unstable", test)))]
pub(crate) use spawn_blocking::spawn_blocking;
use builder::Runnable;
use task_local::LocalsMap;
mod block_on;
mod builder;
mod pool;
mod current;
mod executor;
mod join_handle;
mod sleep;
mod sleepers;
mod spawn;
mod spawn_blocking;
mod task;
mod task_id;
mod task_local;
mod worker;
pub(crate) mod blocking;
cfg_unstable! {
mod yield_now;
pub use yield_now::yield_now;
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This
/// is useful to prevent long-running synchronous operations from blocking the main futures
/// executor.
///
/// See also: [`task::block_on`], [`task::spawn`].
///
/// [`task::block_on`]: fn.block_on.html
/// [`task::spawn`]: fn.spawn.html
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// task::spawn_blocking(|| {
/// println!("long-running task here");
/// }).await;
/// #
/// # })
/// ```
// Once this function stabilizes we should merge `blocking::spawn` into this so
// all code in our crate uses `task::blocking` too.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn spawn_blocking<F, R>(f: F) -> task::JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
blocking::spawn(f)
mod yield_now;
}

@ -1,136 +0,0 @@
use std::iter;
use std::thread;
use crossbeam_deque::{Injector, Stealer, Worker};
use kv_log_macro::trace;
use once_cell::sync::Lazy;
use super::sleepers::Sleepers;
use super::task;
use super::task_local;
use super::worker;
use super::{Builder, JoinHandle};
use crate::future::Future;
use crate::utils::abort_on_panic;
/// Spawns a task.
///
/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task.
///
/// [`std::thread`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// let handle = task::spawn(async {
/// 1 + 2
/// });
///
/// assert_eq!(handle.await, 3);
/// #
/// # })
/// ```
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(future).expect("cannot spawn future")
}
pub(crate) struct Pool {
pub injector: Injector<task::Runnable>,
pub stealers: Vec<Stealer<task::Runnable>>,
pub sleepers: Sleepers,
}
impl Pool {
/// Spawn a future onto the pool.
pub fn spawn<F, T>(&self, future: F, builder: Builder) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let tag = task::Tag::new(builder.name);
// Log this `spawn` operation.
let child_id = tag.task_id().as_u64();
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
trace!("spawn", {
parent_id: parent_id,
child_id: child_id,
});
// Wrap the future into one that drops task-local variables on exit.
let future = unsafe { task_local::add_finalizer(future) };
// Wrap the future into one that logs completion on exit.
let future = async move {
let res = future.await;
trace!("spawn completed", {
parent_id: parent_id,
child_id: child_id,
});
res
};
let (task, handle) = async_task::spawn(future, worker::schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
/// Find the next runnable task to run.
pub fn find_task(&self, local: &Worker<task::Runnable>) -> Option<task::Runnable> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the injector queue.
self.injector
.steal_batch_and_pop(local)
// Or try stealing a bach of tasks from one of the other threads.
.or_else(|| {
self.stealers
.iter()
.map(|s| s.steal_batch_and_pop(local))
.collect()
})
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
}
#[inline]
pub(crate) fn get() -> &'static Pool {
static POOL: Lazy<Pool> = Lazy::new(|| {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
});
&*POOL
}

@ -0,0 +1,31 @@
use crate::future::Future;
use crate::task::{Builder, JoinHandle};
/// Spawns a task.
///
/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task.
///
/// [`std::thread`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// let handle = task::spawn(async {
/// 1 + 2
/// });
///
/// assert_eq!(handle.await, 3);
/// #
/// # })
/// ```
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(future).expect("cannot spawn task")
}

@ -1,5 +1,3 @@
//! A thread pool for running blocking functions asynchronously.
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;
@ -7,22 +5,63 @@ use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use once_cell::sync::Lazy;
use crate::task::task::{JoinHandle, Tag};
use crate::utils::abort_on_panic;
use crate::task::{JoinHandle, Task};
use crate::utils::{abort_on_panic, random};
type Runnable = async_task::Task<Task>;
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This
/// is useful to prevent long-running synchronous operations from blocking the main futures
/// executor.
///
/// See also: [`task::block_on`], [`task::spawn`].
///
/// [`task::block_on`]: fn.block_on.html
/// [`task::spawn`]: fn.spawn.html
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # #[cfg(feature = "unstable")]
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// task::spawn_blocking(|| {
/// println!("long-running task here");
/// }).await;
/// #
/// # })
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (task, handle) = async_task::spawn(async { f() }, schedule, Task::new(None));
task.schedule();
JoinHandle::new(handle)
}
const MAX_THREADS: u64 = 10_000;
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
struct Pool {
sender: Sender<async_task::Task<Tag>>,
receiver: Receiver<async_task::Task<Tag>>,
sender: Sender<Runnable>,
receiver: Receiver<Runnable>,
}
static POOL: Lazy<Pool> = Lazy::new(|| {
for _ in 0..2 {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.name("async-std/blocking".to_string())
.spawn(|| {
abort_on_panic(|| {
for task in &POOL.receiver {
@ -66,7 +105,7 @@ fn maybe_create_another_blocking_thread() {
let rand_sleep_ms = u64::from(random(10_000));
thread::Builder::new()
.name("async-blocking-driver-dynamic".to_string())
.name("async-std/blocking".to_string())
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
@ -82,8 +121,8 @@ fn maybe_create_another_blocking_thread() {
// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
fn schedule(t: async_task::Task<Tag>) {
if let Err(err) = POOL.sender.try_send(t) {
pub(crate) fn schedule(task: Runnable) {
if let Err(err) = POOL.sender.try_send(task) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
// retry sending while blocking.
@ -91,45 +130,3 @@ fn schedule(t: async_task::Task<Tag>) {
POOL.sender.send(err.into_inner()).unwrap();
}
}
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub(crate) fn spawn<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let tag = Tag::new(None);
let future = async move { f() };
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
/// Generates a random number in `0..n`.
fn random(n: u32) -> u32 {
use std::cell::Cell;
use std::num::Wrapping;
thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647));
}
RNG.with(|rng| {
// This is the 32-bit variant of Xorshift.
//
// Source: https://en.wikipedia.org/wiki/Xorshift
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);
// This is a fast alternative to `x % n`.
//
// Author: Daniel Lemire
// Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32
})
}

@ -1,31 +1,74 @@
use std::cell::Cell;
use std::fmt;
use std::future::Future;
use std::i64;
use std::mem;
use std::num::NonZeroU64;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::mem::ManuallyDrop;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;
use super::task_local;
use crate::task::{Context, Poll};
use crate::task::{LocalsMap, TaskId};
use crate::utils::abort_on_panic;
thread_local! {
/// A pointer to the currently running task.
static CURRENT: Cell<*const Task> = Cell::new(ptr::null_mut());
}
/// The inner representation of a task handle.
struct Inner {
/// The task ID.
id: TaskId,
/// The optional task name.
name: Option<Box<str>>,
/// The map holding task-local values.
locals: LocalsMap,
}
impl Inner {
#[inline]
fn new(name: Option<String>) -> Inner {
Inner {
id: TaskId::generate(),
name: name.map(String::into_boxed_str),
locals: LocalsMap::new(),
}
}
}
/// A handle to a task.
#[derive(Clone)]
pub struct Task(Arc<Metadata>);
pub struct Task {
/// The inner representation.
///
/// This pointer is lazily initialized on first use. In most cases, the inner representation is
/// never touched and therefore we don't allocate it unless it's really needed.
inner: AtomicPtr<Inner>,
}
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
impl Task {
/// Returns a reference to task metadata.
pub(crate) fn metadata(&self) -> &Metadata {
&self.0
/// Creates a new task handle.
///
/// If the task is unnamed, the inner representation of the task will be lazily allocated on
/// demand.
#[inline]
pub(crate) fn new(name: Option<String>) -> Task {
let inner = match name {
None => AtomicPtr::default(),
Some(name) => {
let raw = Arc::into_raw(Arc::new(Inner::new(Some(name))));
AtomicPtr::new(raw as *mut Inner)
}
};
Task { inner }
}
/// Gets the task's unique identifier.
#[inline]
pub fn id(&self) -> TaskId {
self.metadata().task_id
self.inner().id
}
/// Returns the name of this task.
@ -34,178 +77,101 @@ impl Task {
///
/// [`Builder::name`]: struct.Builder.html#method.name
pub fn name(&self) -> Option<&str> {
self.metadata().name.as_ref().map(|s| s.as_str())
self.inner().name.as_ref().map(|s| &**s)
}
}
impl fmt::Debug for Task {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Task").field("name", &self.name()).finish()
}
}
/// A handle that awaits the result of a task.
///
/// Dropping a [`JoinHandle`] will detach the task, meaning that there is no longer
/// a handle to the task and no way to `join` on it.
///
/// Created when a task is [spawned].
///
/// [spawned]: fn.spawn.html
#[derive(Debug)]
pub struct JoinHandle<T>(async_task::JoinHandle<T, Tag>);
unsafe impl<T> Send for JoinHandle<T> {}
unsafe impl<T> Sync for JoinHandle<T> {}
impl<T> JoinHandle<T> {
pub(crate) fn new(inner: async_task::JoinHandle<T, Tag>) -> JoinHandle<T> {
JoinHandle(inner)
/// Returns the map holding task-local values.
pub(crate) fn locals(&self) -> &LocalsMap {
&self.inner().locals
}
/// Returns a handle to the underlying task.
/// Drops all task-local values.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// let handle = task::spawn(async {
/// 1 + 2
/// });
/// println!("id = {}", handle.task().id());
/// #
/// # })
pub fn task(&self) -> &Task {
self.0.tag().task()
}
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => panic!("task has panicked"),
Poll::Ready(Some(val)) => Poll::Ready(val),
/// This method is only safe to call at the end of the task.
#[inline]
pub(crate) unsafe fn drop_locals(&self) {
let raw = self.inner.load(Ordering::Acquire);
if let Some(inner) = raw.as_mut() {
// Abort the process if dropping task-locals panics.
abort_on_panic(|| {
inner.locals.clear();
});
}
}
}
/// A unique identifier for a task.
///
/// # Examples
///
/// ```
/// #
/// use async_std::task;
///
/// task::block_on(async {
/// println!("id = {:?}", task::current().id());
/// })
/// ```
#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
pub struct TaskId(NonZeroU64);
impl TaskId {
pub(crate) fn new() -> TaskId {
static COUNTER: AtomicU64 = AtomicU64::new(1);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
if id > i64::MAX as u64 {
std::process::abort();
/// Returns the inner representation, initializing it on first use.
fn inner(&self) -> &Inner {
loop {
let raw = self.inner.load(Ordering::Acquire);
if !raw.is_null() {
return unsafe { &*raw };
}
let new = Arc::into_raw(Arc::new(Inner::new(None))) as *mut Inner;
if self.inner.compare_and_swap(raw, new, Ordering::AcqRel) != raw {
unsafe {
drop(Arc::from_raw(new));
}
}
}
unsafe { TaskId(NonZeroU64::new_unchecked(id)) }
}
pub(crate) fn as_u64(self) -> u64 {
self.0.get()
}
}
impl fmt::Display for TaskId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
/// Set a reference to the current task.
pub(crate) unsafe fn set_current<F, R>(task: *const Task, f: F) -> R
where
F: FnOnce() -> R,
{
CURRENT.with(|current| {
let old_task = current.replace(task);
defer! {
current.set(old_task);
}
f()
})
}
}
pub(crate) type Runnable = async_task::Task<Tag>;
pub(crate) struct Metadata {
pub task_id: TaskId,
pub name: Option<String>,
pub local_map: task_local::Map,
}
pub(crate) struct Tag {
task_id: TaskId,
raw_metadata: AtomicUsize,
}
impl Tag {
pub fn new(name: Option<String>) -> Tag {
let task_id = TaskId::new();
let opt_task = name.map(|name| {
Task(Arc::new(Metadata {
task_id,
name: Some(name),
local_map: task_local::Map::new(),
}))
});
Tag {
task_id,
raw_metadata: AtomicUsize::new(unsafe {
mem::transmute::<Option<Task>, usize>(opt_task)
}),
/// Gets a reference to the current task.
pub(crate) fn get_current<F, R>(f: F) -> Option<R>
where
F: FnOnce(&Task) -> R,
{
let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) });
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
}
}
}
pub fn task(&self) -> &Task {
#[allow(clippy::transmute_ptr_to_ptr)]
unsafe {
let raw = self.raw_metadata.load(Ordering::Acquire);
if mem::transmute::<&usize, &Option<Task>>(&raw).is_none() {
let new = Some(Task(Arc::new(Metadata {
task_id: TaskId::new(),
name: None,
local_map: task_local::Map::new(),
})));
let new_raw = mem::transmute::<Option<Task>, usize>(new);
if self
.raw_metadata
.compare_exchange(raw, new_raw, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
let new = mem::transmute::<usize, Option<Task>>(new_raw);
drop(new);
}
impl Drop for Task {
fn drop(&mut self) {
// Deallocate the inner representation if it was initialized.
let raw = *self.inner.get_mut();
if !raw.is_null() {
unsafe {
drop(Arc::from_raw(raw));
}
#[allow(clippy::transmute_ptr_to_ptr)]
mem::transmute::<&AtomicUsize, &Option<Task>>(&self.raw_metadata)
.as_ref()
.unwrap()
}
}
}
pub fn task_id(&self) -> TaskId {
self.task_id
impl Clone for Task {
fn clone(&self) -> Task {
// We need to make sure the inner representation is initialized now so that this instance
// and the clone have raw pointers that point to the same `Arc<Inner>`.
let arc = unsafe { ManuallyDrop::new(Arc::from_raw(self.inner())) };
let raw = Arc::into_raw(Arc::clone(&arc));
Task {
inner: AtomicPtr::new(raw as *mut Inner),
}
}
}
impl Drop for Tag {
fn drop(&mut self) {
let raw = *self.raw_metadata.get_mut();
let opt_task = unsafe { mem::transmute::<usize, Option<Task>>(raw) };
drop(opt_task);
impl fmt::Debug for Task {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Task")
.field("id", &self.id())
.field("name", &self.name())
.finish()
}
}

@ -0,0 +1,35 @@
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
/// A unique identifier for a task.
///
/// # Examples
///
/// ```
/// use async_std::task;
///
/// task::block_on(async {
/// println!("id = {:?}", task::current().id());
/// })
/// ```
#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
pub struct TaskId(pub(crate) u64);
impl TaskId {
/// Generates a new `TaskId`.
pub(crate) fn generate() -> TaskId {
static COUNTER: AtomicU64 = AtomicU64::new(1);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
if id > u64::max_value() / 2 {
std::process::abort();
}
TaskId(id)
}
}
impl fmt::Display for TaskId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

@ -1,14 +1,9 @@
use std::cell::UnsafeCell;
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::sync::atomic::{AtomicU32, Ordering};
use once_cell::sync::Lazy;
use super::worker;
use crate::utils::abort_on_panic;
use crate::task::Task;
/// Declares task-local values.
///
@ -50,7 +45,7 @@ macro_rules! task_local {
$crate::task::LocalKey {
__init,
__key: ::std::sync::atomic::AtomicUsize::new(0),
__key: ::std::sync::atomic::AtomicU32::new(0),
}
};
);
@ -71,7 +66,7 @@ pub struct LocalKey<T: Send + 'static> {
pub __init: fn() -> T,
#[doc(hidden)]
pub __key: AtomicUsize,
pub __key: AtomicU32,
}
impl<T: Send + 'static> LocalKey<T> {
@ -154,14 +149,13 @@ impl<T: Send + 'static> LocalKey<T> {
where
F: FnOnce(&T) -> R,
{
worker::get_task(|task| unsafe {
Task::get_current(|task| unsafe {
// Prepare the numeric key, initialization function, and the map of task-locals.
let key = self.key();
let init = || Box::new((self.__init)()) as Box<dyn Send>;
let map = &task.metadata().local_map;
// Get the value in the map of task-locals, or initialize and insert one.
let value: *const dyn Send = map.get_or_insert(key, init);
let value: *const dyn Send = task.locals().get_or_insert(key, init);
// Call the closure with the value passed as an argument.
f(&*(value as *const T))
@ -171,24 +165,26 @@ impl<T: Send + 'static> LocalKey<T> {
/// Returns the numeric key associated with this task-local.
#[inline]
fn key(&self) -> usize {
fn key(&self) -> u32 {
#[cold]
fn init(key: &AtomicUsize) -> usize {
static COUNTER: Lazy<Mutex<usize>> = Lazy::new(|| Mutex::new(1));
fn init(key: &AtomicU32) -> u32 {
static COUNTER: AtomicU32 = AtomicU32::new(1);
let mut counter = COUNTER.lock().unwrap();
let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel);
let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
if counter > u32::max_value() / 2 {
std::process::abort();
}
if prev == 0 {
*counter += 1;
*counter - 1
} else {
prev
match key.compare_and_swap(0, counter, Ordering::AcqRel) {
0 => counter,
k => k,
}
}
let key = self.__key.load(Ordering::Acquire);
if key == 0 { init(&self.__key) } else { key }
match self.__key.load(Ordering::Acquire) {
0 => init(&self.__key),
k => k,
}
}
}
@ -214,51 +210,55 @@ impl fmt::Display for AccessError {
impl Error for AccessError {}
/// A key-value entry in a map of task-locals.
struct Entry {
/// Key identifying the task-local variable.
key: u32,
/// Value stored in this entry.
value: Box<dyn Send>,
}
/// A map that holds task-locals.
pub(crate) struct Map {
/// A list of `(key, value)` entries sorted by the key.
entries: UnsafeCell<Vec<(usize, Box<dyn Send>)>>,
pub(crate) struct LocalsMap {
/// A list of key-value entries sorted by the key.
entries: UnsafeCell<Option<Vec<Entry>>>,
}
impl Map {
impl LocalsMap {
/// Creates an empty map of task-locals.
pub fn new() -> Map {
Map {
entries: UnsafeCell::new(Vec::new()),
pub fn new() -> LocalsMap {
LocalsMap {
entries: UnsafeCell::new(Some(Vec::new())),
}
}
/// Returns a thread-local value associated with `key` or inserts one constructed by `init`.
/// Returns a task-local value associated with `key` or inserts one constructed by `init`.
#[inline]
pub fn get_or_insert(&self, key: usize, init: impl FnOnce() -> Box<dyn Send>) -> &dyn Send {
let entries = unsafe { &mut *self.entries.get() };
let index = match entries.binary_search_by_key(&key, |e| e.0) {
Ok(i) => i,
Err(i) => {
entries.insert(i, (key, init()));
i
pub fn get_or_insert(&self, key: u32, init: impl FnOnce() -> Box<dyn Send>) -> &dyn Send {
match unsafe { (*self.entries.get()).as_mut() } {
None => panic!("can't access task-locals while the task is being dropped"),
Some(entries) => {
let index = match entries.binary_search_by_key(&key, |e| e.key) {
Ok(i) => i,
Err(i) => {
let value = init();
entries.insert(i, Entry { key, value });
i
}
};
&*entries[index].value
}
};
&*entries[index].1
}
}
/// Clears the map and drops all task-locals.
pub fn clear(&self) {
let entries = unsafe { &mut *self.entries.get() };
entries.clear();
}
}
// Wrap the future into one that drops task-local variables on exit.
pub(crate) unsafe fn add_finalizer<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
async move {
let res = f.await;
// Abort on panic because thread-local variables behave the same way.
abort_on_panic(|| worker::get_task(|task| task.metadata().local_map.clear()));
res
///
/// This method is only safe to call at the end of the task.
pub unsafe fn clear(&self) {
// Since destructors may attempt to access task-locals, we musnt't hold a mutable reference
// to the `Vec` while dropping them. Instead, we first take the `Vec` out and then drop it.
let entries = (*self.entries.get()).take();
drop(entries);
}
}

@ -1,110 +0,0 @@
use std::cell::Cell;
use std::ptr;
use crossbeam_deque::Worker;
use super::pool;
use super::task;
use super::Task;
use crate::utils::abort_on_panic;
/// Returns a handle to the current task.
///
/// # Panics
///
/// This function will panic if not called within the context of a task created by [`block_on`],
/// [`spawn`], or [`Builder::spawn`].
///
/// [`block_on`]: fn.block_on.html
/// [`spawn`]: fn.spawn.html
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// println!("The name of this task is {:?}", task::current().name());
/// #
/// # })
/// ```
pub fn current() -> Task {
get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
}
thread_local! {
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
}
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
impl Drop for ResetTag<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
}
TAG.with(|t| {
t.set(tag);
let _guard = ResetTag(t);
f()
})
}
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
where
F: FnOnce(&Task) -> R,
{
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
}
}
thread_local! {
static IS_WORKER: Cell<bool> = Cell::new(false);
static QUEUE: Cell<Option<Worker<task::Runnable>>> = Cell::new(None);
}
pub(crate) fn is_worker() -> bool {
IS_WORKER.with(|is_worker| is_worker.get())
}
fn get_queue<F: FnOnce(&Worker<task::Runnable>) -> T, T>(f: F) -> T {
QUEUE.with(|queue| {
let q = queue.take().unwrap();
let ret = f(&q);
queue.set(Some(q));
ret
})
}
pub(crate) fn schedule(task: task::Runnable) {
if is_worker() {
get_queue(|q| q.push(task));
} else {
pool::get().injector.push(task);
}
pool::get().sleepers.notify_one();
}
pub(crate) fn main_loop(worker: Worker<task::Runnable>) {
IS_WORKER.with(|is_worker| is_worker.set(true));
QUEUE.with(|queue| queue.set(Some(worker)));
loop {
match get_queue(|q| pool::get().find_task(q)) {
Some(task) => set_tag(task.tag(), || abort_on_panic(|| task.run())),
None => pool::get().sleepers.wait(),
}
}
}

@ -1,8 +1,8 @@
use std::pin::Pin;
use crate::future::Future;
use crate::task::{Context, Poll};
use std::pin::Pin;
/// Cooperatively gives up a timeslice to the task scheduler.
///
/// Calling this function will move the currently executing future to the back

@ -1,5 +1,4 @@
use std::mem;
use std::process;
/// Calls a function and aborts if it panics.
///
@ -10,7 +9,7 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
impl Drop for Bomb {
fn drop(&mut self) {
process::abort();
std::process::abort();
}
}
@ -20,6 +19,53 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
t
}
/// Generates a random number in `0..n`.
pub fn random(n: u32) -> u32 {
use std::cell::Cell;
use std::num::Wrapping;
thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647));
}
RNG.with(|rng| {
// This is the 32-bit variant of Xorshift.
//
// Source: https://en.wikipedia.org/wiki/Xorshift
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);
// This is a fast alternative to `x % n`.
//
// Author: Daniel Lemire
// Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32
})
}
/// Defers evaluation of a block of code until the end of the scope.
#[doc(hidden)]
macro_rules! defer {
($($body:tt)*) => {
let _guard = {
pub struct Guard<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> Drop for Guard<F> {
fn drop(&mut self) {
(self.0).take().map(|f| f());
}
}
Guard(Some(|| {
let _ = { $($body)* };
}))
};
};
}
/// Declares unstable items.
#[doc(hidden)]
macro_rules! cfg_unstable {

@ -1,3 +1,5 @@
#![cfg(feature = "unstable")]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

Loading…
Cancel
Save