Merge branch 'master' into stream_from_iter

pull/415/head
k-nasa 5 years ago
commit 2f3c867d44

@ -52,15 +52,12 @@ jobs:
steps: steps:
- uses: actions/checkout@master - uses: actions/checkout@master
- id: component
uses: actions-rs/components-nightly@v1
with:
component: rustfmt
- uses: actions-rs/toolchain@v1 - uses: actions-rs/toolchain@v1
with: with:
toolchain: ${{ steps.component.outputs.toolchain }} profile: minimal
toolchain: nightly
override: true override: true
components: rustfmt
- name: setup - name: setup
run: | run: |

@ -27,23 +27,23 @@ unstable = ["broadcaster"]
[dependencies] [dependencies]
async-macros = "1.0.0" async-macros = "1.0.0"
async-task = "1.0.0" async-task = "1.0.0"
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
crossbeam-channel = "0.3.9" crossbeam-channel = "0.3.9"
crossbeam-deque = "0.7.1" crossbeam-deque = "0.7.1"
crossbeam-utils = "0.6.6" crossbeam-utils = "0.6.6"
futures-core-preview = "=0.3.0-alpha.19" futures-core-preview = "=0.3.0-alpha.19"
futures-io-preview = "=0.3.0-alpha.19" futures-io-preview = "=0.3.0-alpha.19"
futures-timer = "1.0.2" futures-timer = "1.0.2"
lazy_static = "1.4.0" kv-log-macro = "1.0.4"
log = { version = "0.4.8", features = ["kv_unstable"] } log = { version = "0.4.8", features = ["kv_unstable"] }
memchr = "2.2.1" memchr = "2.2.1"
mio = "0.6.19" mio = "0.6.19"
mio-uds = "0.6.7" mio-uds = "0.6.7"
num_cpus = "1.10.1" num_cpus = "1.10.1"
once_cell = "1.2.0"
pin-project-lite = "0.1"
pin-utils = "0.1.0-alpha.4" pin-utils = "0.1.0-alpha.4"
slab = "0.4.2" slab = "0.4.2"
kv-log-macro = "1.0.4"
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
pin-project-lite = "0.1"
[dev-dependencies] [dev-dependencies]
femme = "1.2.0" femme = "1.2.0"

@ -61,7 +61,7 @@ syntax.
## Features ## Features
- __Modern:__ Built from the ground up for `std::future` and `async/await` with - __Modern:__ Built from the ground up for `std::future` and `async/await` with
blazing fast compilation times. blazing fast compilation time.
- __Fast:__ Our robust allocator and threadpool designs provide ultra-high - __Fast:__ Our robust allocator and threadpool designs provide ultra-high
throughput with predictably low latency. throughput with predictably low latency.
- __Intuitive:__ Complete parity with the stdlib means you only need to learn - __Intuitive:__ Complete parity with the stdlib means you only need to learn

@ -0,0 +1,42 @@
#![feature(test)]
extern crate test;
use std::sync::Arc;
use async_std::sync::Mutex;
use async_std::task;
use test::Bencher;
#[bench]
fn create(b: &mut Bencher) {
b.iter(|| Mutex::new(()));
}
#[bench]
fn contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(10, 1000)));
}
#[bench]
fn no_contention(b: &mut Bencher) {
b.iter(|| task::block_on(run(1, 10000)));
}
async fn run(task: usize, iter: usize) {
let m = Arc::new(Mutex::new(()));
let mut tasks = Vec::new();
for _ in 0..task {
let m = m.clone();
tasks.push(task::spawn(async move {
for _ in 0..iter {
let _ = m.lock().await;
}
}));
}
for t in tasks {
t.await;
}
}

@ -0,0 +1,11 @@
#![feature(test)]
extern crate test;
use async_std::task;
use test::Bencher;
#[bench]
fn block_on(b: &mut Bencher) {
b.iter(|| task::block_on(async {}));
}

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::{Path, PathBuf}; use crate::path::{Path, PathBuf};
use crate::task::blocking; use crate::task::spawn_blocking;
/// Returns the canonical form of a path. /// Returns the canonical form of a path.
/// ///
@ -32,5 +32,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn canonicalize<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> { pub async fn canonicalize<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::canonicalize(&path).map(Into::into)).await spawn_blocking(move || std::fs::canonicalize(&path).map(Into::into)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Copies the contents and permissions of a file to a new location. /// Copies the contents and permissions of a file to a new location.
/// ///
@ -41,5 +41,5 @@ use crate::task::blocking;
pub async fn copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> { pub async fn copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> {
let from = from.as_ref().to_owned(); let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned(); let to = to.as_ref().to_owned();
blocking::spawn(move || std::fs::copy(&from, &to)).await spawn_blocking(move || std::fs::copy(&from, &to)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Creates a new directory. /// Creates a new directory.
/// ///
@ -34,5 +34,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> { pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::create_dir(path)).await spawn_blocking(move || std::fs::create_dir(path)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Creates a new directory and all of its parents if they are missing. /// Creates a new directory and all of its parents if they are missing.
/// ///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> { pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::create_dir_all(path)).await spawn_blocking(move || std::fs::create_dir_all(path)).await
} }

@ -2,7 +2,7 @@ use std::future::Future;
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// A builder for creating directories with configurable options. /// A builder for creating directories with configurable options.
/// ///
@ -107,7 +107,7 @@ impl DirBuilder {
} }
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
async move { blocking::spawn(move || builder.create(path)).await } async move { spawn_blocking(move || builder.create(path)).await }
} }
} }

@ -5,7 +5,7 @@ use std::sync::Arc;
use crate::fs::{FileType, Metadata}; use crate::fs::{FileType, Metadata};
use crate::io; use crate::io;
use crate::path::PathBuf; use crate::path::PathBuf;
use crate::task::blocking; use crate::task::spawn_blocking;
/// An entry in a directory. /// An entry in a directory.
/// ///
@ -87,7 +87,7 @@ impl DirEntry {
/// ``` /// ```
pub async fn metadata(&self) -> io::Result<Metadata> { pub async fn metadata(&self) -> io::Result<Metadata> {
let inner = self.0.clone(); let inner = self.0.clone();
blocking::spawn(move || inner.metadata()).await spawn_blocking(move || inner.metadata()).await
} }
/// Reads the file type for this entry. /// Reads the file type for this entry.
@ -125,7 +125,7 @@ impl DirEntry {
/// ``` /// ```
pub async fn file_type(&self) -> io::Result<FileType> { pub async fn file_type(&self) -> io::Result<FileType> {
let inner = self.0.clone(); let inner = self.0.clone();
blocking::spawn(move || inner.file_type()).await spawn_blocking(move || inner.file_type()).await
} }
/// Returns the bare name of this entry without the leading path. /// Returns the bare name of this entry without the leading path.

@ -12,7 +12,7 @@ use crate::future;
use crate::io::{self, Read, Seek, SeekFrom, Write}; use crate::io::{self, Read, Seek, SeekFrom, Write};
use crate::path::Path; use crate::path::Path;
use crate::prelude::*; use crate::prelude::*;
use crate::task::{self, blocking, Context, Poll, Waker}; use crate::task::{self, spawn_blocking, Context, Poll, Waker};
/// An open file on the filesystem. /// An open file on the filesystem.
/// ///
@ -112,7 +112,7 @@ impl File {
/// ``` /// ```
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> { pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let file = blocking::spawn(move || std::fs::File::open(&path)).await?; let file = spawn_blocking(move || std::fs::File::open(&path)).await?;
Ok(File::new(file, true)) Ok(File::new(file, true))
} }
@ -147,7 +147,7 @@ impl File {
/// ``` /// ```
pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> { pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let file = blocking::spawn(move || std::fs::File::create(&path)).await?; let file = spawn_blocking(move || std::fs::File::create(&path)).await?;
Ok(File::new(file, true)) Ok(File::new(file, true))
} }
@ -180,7 +180,7 @@ impl File {
}) })
.await?; .await?;
blocking::spawn(move || state.file.sync_all()).await spawn_blocking(move || state.file.sync_all()).await
} }
/// Synchronizes OS-internal buffered contents to disk. /// Synchronizes OS-internal buffered contents to disk.
@ -216,7 +216,7 @@ impl File {
}) })
.await?; .await?;
blocking::spawn(move || state.file.sync_data()).await spawn_blocking(move || state.file.sync_data()).await
} }
/// Truncates or extends the file. /// Truncates or extends the file.
@ -249,7 +249,7 @@ impl File {
}) })
.await?; .await?;
blocking::spawn(move || state.file.set_len(size)).await spawn_blocking(move || state.file.set_len(size)).await
} }
/// Reads the file's metadata. /// Reads the file's metadata.
@ -268,7 +268,7 @@ impl File {
/// ``` /// ```
pub async fn metadata(&self) -> io::Result<Metadata> { pub async fn metadata(&self) -> io::Result<Metadata> {
let file = self.file.clone(); let file = self.file.clone();
blocking::spawn(move || file.metadata()).await spawn_blocking(move || file.metadata()).await
} }
/// Changes the permissions on the file. /// Changes the permissions on the file.
@ -297,7 +297,7 @@ impl File {
/// ``` /// ```
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
let file = self.file.clone(); let file = self.file.clone();
blocking::spawn(move || file.set_permissions(perm)).await spawn_blocking(move || file.set_permissions(perm)).await
} }
} }
@ -692,7 +692,7 @@ impl LockGuard<State> {
self.register(cx); self.register(cx);
// Start a read operation asynchronously. // Start a read operation asynchronously.
blocking::spawn(move || { spawn_blocking(move || {
// Read some data from the file into the cache. // Read some data from the file into the cache.
let res = { let res = {
let State { file, cache, .. } = &mut *self; let State { file, cache, .. } = &mut *self;
@ -801,7 +801,7 @@ impl LockGuard<State> {
self.register(cx); self.register(cx);
// Start a write operation asynchronously. // Start a write operation asynchronously.
blocking::spawn(move || { spawn_blocking(move || {
match (&*self.file).write_all(&self.cache) { match (&*self.file).write_all(&self.cache) {
Ok(_) => { Ok(_) => {
// Switch to idle mode. // Switch to idle mode.
@ -834,7 +834,7 @@ impl LockGuard<State> {
self.register(cx); self.register(cx);
// Start a flush operation asynchronously. // Start a flush operation asynchronously.
blocking::spawn(move || { spawn_blocking(move || {
match (&*self.file).flush() { match (&*self.file).flush() {
Ok(()) => { Ok(()) => {
// Mark the file as flushed. // Mark the file as flushed.

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Creates a hard link on the filesystem. /// Creates a hard link on the filesystem.
/// ///
@ -32,5 +32,5 @@ use crate::task::blocking;
pub async fn hard_link<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> { pub async fn hard_link<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned(); let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned(); let to = to.as_ref().to_owned();
blocking::spawn(move || std::fs::hard_link(&from, &to)).await spawn_blocking(move || std::fs::hard_link(&from, &to)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Reads metadata for a path. /// Reads metadata for a path.
/// ///
@ -34,7 +34,7 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> { pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::metadata(path)).await spawn_blocking(move || std::fs::metadata(path)).await
} }
cfg_not_docs! { cfg_not_docs! {

@ -3,7 +3,7 @@ use std::future::Future;
use crate::fs::File; use crate::fs::File;
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// A builder for opening files with configurable options. /// A builder for opening files with configurable options.
/// ///
@ -285,7 +285,7 @@ impl OpenOptions {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let options = self.0.clone(); let options = self.0.clone();
async move { async move {
let file = blocking::spawn(move || options.open(path)).await?; let file = spawn_blocking(move || options.open(path)).await?;
Ok(File::new(file, true)) Ok(File::new(file, true))
} }
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Reads the entire contents of a file as raw bytes. /// Reads the entire contents of a file as raw bytes.
/// ///
@ -36,5 +36,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn read<P: AsRef<Path>>(path: P) -> io::Result<Vec<u8>> { pub async fn read<P: AsRef<Path>>(path: P) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read(path)).await spawn_blocking(move || std::fs::read(path)).await
} }

@ -5,7 +5,7 @@ use crate::future::Future;
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{blocking, Context, JoinHandle, Poll}; use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Returns a stream of entries in a directory. /// Returns a stream of entries in a directory.
/// ///
@ -45,7 +45,7 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
/// ``` /// ```
pub async fn read_dir<P: AsRef<Path>>(path: P) -> io::Result<ReadDir> { pub async fn read_dir<P: AsRef<Path>>(path: P) -> io::Result<ReadDir> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read_dir(path)) spawn_blocking(move || std::fs::read_dir(path))
.await .await
.map(ReadDir::new) .map(ReadDir::new)
} }
@ -91,7 +91,7 @@ impl Stream for ReadDir {
let mut inner = opt.take().unwrap(); let mut inner = opt.take().unwrap();
// Start the operation asynchronously. // Start the operation asynchronously.
self.0 = State::Busy(blocking::spawn(move || { self.0 = State::Busy(spawn_blocking(move || {
let next = inner.next(); let next = inner.next();
(inner, next) (inner, next)
})); }));

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::{Path, PathBuf}; use crate::path::{Path, PathBuf};
use crate::task::blocking; use crate::task::spawn_blocking;
/// Reads a symbolic link and returns the path it points to. /// Reads a symbolic link and returns the path it points to.
/// ///
@ -28,5 +28,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn read_link<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> { pub async fn read_link<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read_link(path).map(Into::into)).await spawn_blocking(move || std::fs::read_link(path).map(Into::into)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Reads the entire contents of a file as a string. /// Reads the entire contents of a file as a string.
/// ///
@ -37,5 +37,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> { pub async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::read_to_string(path)).await spawn_blocking(move || std::fs::read_to_string(path)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Removes an empty directory. /// Removes an empty directory.
/// ///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> { pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::remove_dir(path)).await spawn_blocking(move || std::fs::remove_dir(path)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Removes a directory and all of its contents. /// Removes a directory and all of its contents.
/// ///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> { pub async fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::remove_dir_all(path)).await spawn_blocking(move || std::fs::remove_dir_all(path)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Removes a file. /// Removes a file.
/// ///
@ -29,5 +29,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> { pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::remove_file(path)).await spawn_blocking(move || std::fs::remove_file(path)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Renames a file or directory to a new location. /// Renames a file or directory to a new location.
/// ///
@ -34,5 +34,5 @@ use crate::task::blocking;
pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> { pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned(); let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned(); let to = to.as_ref().to_owned();
blocking::spawn(move || std::fs::rename(&from, &to)).await spawn_blocking(move || std::fs::rename(&from, &to)).await
} }

@ -1,7 +1,7 @@
use crate::fs::Permissions; use crate::fs::Permissions;
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Changes the permissions of a file or directory. /// Changes the permissions of a file or directory.
/// ///
@ -32,5 +32,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn set_permissions<P: AsRef<Path>>(path: P, perm: Permissions) -> io::Result<()> { pub async fn set_permissions<P: AsRef<Path>>(path: P, perm: Permissions) -> io::Result<()> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::set_permissions(path, perm)).await spawn_blocking(move || std::fs::set_permissions(path, perm)).await
} }

@ -1,7 +1,7 @@
use crate::fs::Metadata; use crate::fs::Metadata;
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Reads metadata for a path without following symbolic links. /// Reads metadata for a path without following symbolic links.
/// ///
@ -34,5 +34,5 @@ use crate::task::blocking;
/// ``` /// ```
pub async fn symlink_metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> { pub async fn symlink_metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || std::fs::symlink_metadata(path)).await spawn_blocking(move || std::fs::symlink_metadata(path)).await
} }

@ -1,6 +1,6 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Writes a slice of bytes as the new contents of a file. /// Writes a slice of bytes as the new contents of a file.
/// ///
@ -33,5 +33,5 @@ use crate::task::blocking;
pub async fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> { pub async fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let contents = contents.as_ref().to_owned(); let contents = contents.as_ref().to_owned();
blocking::spawn(move || std::fs::write(path, contents)).await spawn_blocking(move || std::fs::write(path, contents)).await
} }

@ -3,7 +3,7 @@ use std::sync::Mutex;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, Write}; use crate::io::{self, Write};
use crate::task::{blocking, Context, JoinHandle, Poll}; use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard error of the current process. /// Constructs a new handle to the standard error of the current process.
/// ///
@ -124,7 +124,7 @@ impl Write for Stderr {
inner.buf[..buf.len()].copy_from_slice(buf); inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || { *state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::write(&mut inner.stderr, &inner.buf); let res = std::io::Write::write(&mut inner.stderr, &inner.buf);
inner.last_op = Some(Operation::Write(res)); inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
@ -152,7 +152,7 @@ impl Write for Stderr {
let mut inner = opt.take().unwrap(); let mut inner = opt.take().unwrap();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || { *state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::flush(&mut inner.stderr); let res = std::io::Write::flush(&mut inner.stderr);
inner.last_op = Some(Operation::Flush(res)); inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner)) State::Idle(Some(inner))

@ -3,7 +3,7 @@ use std::sync::Mutex;
use crate::future::{self, Future}; use crate::future::{self, Future};
use crate::io::{self, Read}; use crate::io::{self, Read};
use crate::task::{blocking, Context, JoinHandle, Poll}; use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard input of the current process. /// Constructs a new handle to the standard input of the current process.
/// ///
@ -127,7 +127,7 @@ impl Stdin {
let mut inner = opt.take().unwrap(); let mut inner = opt.take().unwrap();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || { *state = State::Busy(spawn_blocking(move || {
inner.line.clear(); inner.line.clear();
let res = inner.stdin.read_line(&mut inner.line); let res = inner.stdin.read_line(&mut inner.line);
inner.last_op = Some(Operation::ReadLine(res)); inner.last_op = Some(Operation::ReadLine(res));
@ -180,7 +180,7 @@ impl Read for Stdin {
} }
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || { *state = State::Busy(spawn_blocking(move || {
let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf); let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf);
inner.last_op = Some(Operation::Read(res)); inner.last_op = Some(Operation::Read(res));
State::Idle(Some(inner)) State::Idle(Some(inner))

@ -3,7 +3,7 @@ use std::sync::Mutex;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, Write}; use crate::io::{self, Write};
use crate::task::{blocking, Context, JoinHandle, Poll}; use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard output of the current process. /// Constructs a new handle to the standard output of the current process.
/// ///
@ -124,7 +124,7 @@ impl Write for Stdout {
inner.buf[..buf.len()].copy_from_slice(buf); inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || { *state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::write(&mut inner.stdout, &inner.buf); let res = std::io::Write::write(&mut inner.stdout, &inner.buf);
inner.last_op = Some(Operation::Write(res)); inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
@ -152,7 +152,7 @@ impl Write for Stdout {
let mut inner = opt.take().unwrap(); let mut inner = opt.take().unwrap();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(move || { *state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::flush(&mut inner.stdout); let res = std::io::Write::flush(&mut inner.stdout);
inner.last_op = Some(Operation::Flush(res)); inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner)) State::Idle(Some(inner))

@ -5,7 +5,7 @@ use std::pin::Pin;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
use crate::task::{blocking, Context, JoinHandle, Poll}; use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
cfg_not_docs! { cfg_not_docs! {
macro_rules! ret { macro_rules! ret {
@ -194,7 +194,7 @@ impl ToSocketAddrs for (&str, u16) {
} }
let host = host.to_string(); let host = host.to_string();
let task = blocking::spawn(move || { let task = spawn_blocking(move || {
std::net::ToSocketAddrs::to_socket_addrs(&(host.as_str(), port)) std::net::ToSocketAddrs::to_socket_addrs(&(host.as_str(), port))
}); });
ToSocketAddrsFuture::Resolving(task) ToSocketAddrsFuture::Resolving(task)
@ -215,7 +215,7 @@ impl ToSocketAddrs for str {
} }
let addr = self.to_string(); let addr = self.to_string();
let task = blocking::spawn(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str())); let task = spawn_blocking(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str()));
ToSocketAddrsFuture::Resolving(task) ToSocketAddrsFuture::Resolving(task)
} }
} }

@ -1,8 +1,8 @@
use std::fmt; use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
use mio::{self, Evented}; use mio::{self, Evented};
use once_cell::sync::Lazy;
use slab::Slab; use slab::Slab;
use crate::io; use crate::io;
@ -100,13 +100,12 @@ impl Reactor {
// } // }
} }
lazy_static! { /// The state of the global networking driver.
/// The state of the global networking driver. static REACTOR: Lazy<Reactor> = Lazy::new(|| {
static ref REACTOR: Reactor = {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles. // handles.
std::thread::Builder::new() std::thread::Builder::new()
.name("async-net-driver".to_string()) .name("async-std/net".to_string())
.spawn(move || { .spawn(move || {
// If the driver thread panics, there's not much we can do. It is not a // If the driver thread panics, there's not much we can do. It is not a
// recoverable error and there is no place to propagate it into so we just abort. // recoverable error and there is no place to propagate it into so we just abort.
@ -117,8 +116,7 @@ lazy_static! {
.expect("cannot start a thread driving blocking tasks"); .expect("cannot start a thread driving blocking tasks");
Reactor::new().expect("cannot initialize reactor") Reactor::new().expect("cannot initialize reactor")
}; });
}
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles. /// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
fn main_loop() -> io::Result<()> { fn main_loop() -> io::Result<()> {

@ -6,8 +6,7 @@ use crate::future;
use crate::io::{self, Read, Write}; use crate::io::{self, Read, Write};
use crate::net::driver::Watcher; use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use crate::task::blocking; use crate::task::{spawn_blocking, Context, Poll};
use crate::task::{Context, Poll};
/// A TCP stream between a local and a remote socket. /// A TCP stream between a local and a remote socket.
/// ///
@ -74,7 +73,7 @@ impl TcpStream {
let mut last_err = None; let mut last_err = None;
for addr in addrs.to_socket_addrs().await? { for addr in addrs.to_socket_addrs().await? {
let res = blocking::spawn(move || { let res = spawn_blocking(move || {
let std_stream = std::net::TcpStream::connect(addr)?; let std_stream = std::net::TcpStream::connect(addr)?;
let mio_stream = mio::net::TcpStream::from_stream(std_stream)?; let mio_stream = mio::net::TcpStream::from_stream(std_stream)?;
Ok(TcpStream { Ok(TcpStream {

@ -2,7 +2,7 @@
use crate::io; use crate::io;
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// Creates a new symbolic link on the filesystem. /// Creates a new symbolic link on the filesystem.
/// ///
@ -26,7 +26,7 @@ use crate::task::blocking;
pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> { pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
let src = src.as_ref().to_owned(); let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned(); let dst = dst.as_ref().to_owned();
blocking::spawn(move || std::os::unix::fs::symlink(&src, &dst)).await spawn_blocking(move || std::os::unix::fs::symlink(&src, &dst)).await
} }
cfg_not_docs! { cfg_not_docs! {

@ -11,7 +11,7 @@ use crate::io;
use crate::net::driver::Watcher; use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::task::blocking; use crate::task::spawn_blocking;
/// A Unix datagram socket. /// A Unix datagram socket.
/// ///
@ -67,7 +67,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> { pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let socket = blocking::spawn(move || mio_uds::UnixDatagram::bind(path)).await?; let socket = spawn_blocking(move || mio_uds::UnixDatagram::bind(path)).await?;
Ok(UnixDatagram::new(socket)) Ok(UnixDatagram::new(socket))
} }

@ -13,7 +13,7 @@ use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{blocking, Context, Poll}; use crate::task::{spawn_blocking, Context, Poll};
/// A Unix domain socket server, listening for connections. /// A Unix domain socket server, listening for connections.
/// ///
@ -68,7 +68,7 @@ impl UnixListener {
/// ``` /// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> { pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let listener = blocking::spawn(move || mio_uds::UnixListener::bind(path)).await?; let listener = spawn_blocking(move || mio_uds::UnixListener::bind(path)).await?;
Ok(UnixListener { Ok(UnixListener {
watcher: Watcher::new(listener), watcher: Watcher::new(listener),

@ -12,7 +12,7 @@ use crate::io::{self, Read, Write};
use crate::net::driver::Watcher; use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::task::{blocking, Context, Poll}; use crate::task::{spawn_blocking, Context, Poll};
/// A Unix stream socket. /// A Unix stream socket.
/// ///
@ -58,7 +58,7 @@ impl UnixStream {
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
blocking::spawn(move || { spawn_blocking(move || {
let std_stream = std::os::unix::net::UnixStream::connect(path)?; let std_stream = std::os::unix::net::UnixStream::connect(path)?;
let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?; let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?;
Ok(UnixStream { Ok(UnixStream {

@ -0,0 +1,63 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
// Lexicographically compares the elements of this `Stream` with those
// of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct EqFuture<L: Stream, R: Stream> {
#[pin]
l: Fuse<L>,
#[pin]
r: Fuse<R>,
}
}
impl<L: Stream, R: Stream> EqFuture<L, R>
where
L::Item: PartialEq<R::Item>,
{
pub(super) fn new(l: L, r: R) -> Self {
EqFuture {
l: l.fuse(),
r: r.fuse(),
}
}
}
impl<L: Stream, R: Stream> Future for EqFuture<L, R>
where
L: Stream + Sized,
R: Stream + Sized,
L::Item: PartialEq<R::Item>,
{
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
if this.l.done && this.r.done {
return Poll::Ready(true);
}
match (l_val, r_val) {
(Some(l), Some(r)) if l != r => {
return Poll::Ready(false);
}
_ => {}
}
}
}
}

@ -0,0 +1,60 @@
use std::cmp::{Ord, Ordering};
use std::marker::PhantomData;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinFuture<S, F, T> {
#[pin]
stream: S,
_compare: PhantomData<F>,
min: Option<T>,
}
}
impl<S, F, T> MinFuture<S, F, T> {
pub(super) fn new(stream: S) -> Self {
Self {
stream,
_compare: PhantomData,
min: None,
}
}
}
impl<S, F> Future for MinFuture<S, F, S::Item>
where
S: Stream,
S::Item: Ord,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
cx.waker().wake_by_ref();
match this.min.take() {
None => *this.min = Some(new),
Some(old) => match new.cmp(&old) {
Ordering::Less => *this.min = Some(new),
_ => *this.min = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.min.take()),
}
}
}

@ -26,6 +26,7 @@ mod any;
mod chain; mod chain;
mod cmp; mod cmp;
mod enumerate; mod enumerate;
mod eq;
mod filter; mod filter;
mod filter_map; mod filter_map;
mod find; mod find;
@ -41,11 +42,14 @@ mod le;
mod lt; mod lt;
mod map; mod map;
mod max_by; mod max_by;
mod min;
mod min_by; mod min_by;
mod min_by_key; mod min_by_key;
mod ne;
mod next; mod next;
mod nth; mod nth;
mod partial_cmp; mod partial_cmp;
mod position;
mod scan; mod scan;
mod skip; mod skip;
mod skip_while; mod skip_while;
@ -60,6 +64,7 @@ use all::AllFuture;
use any::AnyFuture; use any::AnyFuture;
use cmp::CmpFuture; use cmp::CmpFuture;
use enumerate::Enumerate; use enumerate::Enumerate;
use eq::EqFuture;
use filter_map::FilterMap; use filter_map::FilterMap;
use find::FindFuture; use find::FindFuture;
use find_map::FindMapFuture; use find_map::FindMapFuture;
@ -71,11 +76,14 @@ use last::LastFuture;
use le::LeFuture; use le::LeFuture;
use lt::LtFuture; use lt::LtFuture;
use max_by::MaxByFuture; use max_by::MaxByFuture;
use min::MinFuture;
use min_by::MinByFuture; use min_by::MinByFuture;
use min_by_key::MinByKeyFuture; use min_by_key::MinByKeyFuture;
use ne::NeFuture;
use next::NextFuture; use next::NextFuture;
use nth::NthFuture; use nth::NthFuture;
use partial_cmp::PartialCmpFuture; use partial_cmp::PartialCmpFuture;
use position::PositionFuture;
use try_fold::TryFoldFuture; use try_fold::TryFoldFuture;
use try_for_each::TryForEeachFuture; use try_for_each::TryForEeachFuture;
@ -760,6 +768,39 @@ extension_trait! {
MinByFuture::new(self, compare) MinByFuture::new(self, compare)
} }
#[doc = r#"
Returns the element that gives the minimum value. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```ignore
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let min = s.clone().min().await;
assert_eq!(min, Some(1));
let min = VecDeque::<usize>::new().min().await;
assert_eq!(min, None);
#
# }) }
```
"#]
fn min<F>(
self,
) -> impl Future<Output = Option<Self::Item>> [MinFuture<Self, F, Self::Item>]
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{
MinFuture::new(self)
}
#[doc = r#" #[doc = r#"
Returns the element that gives the maximum value with respect to the Returns the element that gives the maximum value with respect to the
specified comparison function. If several elements are equally maximum, specified comparison function. If several elements are equally maximum,
@ -1548,6 +1589,45 @@ extension_trait! {
PartialCmpFuture::new(self, other) PartialCmpFuture::new(self, other)
} }
#[doc = r#"
Searches for an element in a Stream that satisfies a predicate, returning
its index.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let res = s.clone().position(|x| *x == 1).await;
assert_eq!(res, Some(0));
let res = s.clone().position(|x| *x == 2).await;
assert_eq!(res, Some(1));
let res = s.clone().position(|x| *x == 3).await;
assert_eq!(res, Some(2));
let res = s.clone().position(|x| *x == 4).await;
assert_eq!(res, None);
#
# }) }
```
"#]
fn position<P>(
self,
predicate: P
) -> impl Future<Output = Option<usize>> [PositionFuture<Self, P>]
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
PositionFuture::new(self, predicate)
}
#[doc = r#" #[doc = r#"
Lexicographically compares the elements of this `Stream` with those Lexicographically compares the elements of this `Stream` with those
of another using 'Ord'. of another using 'Ord'.
@ -1586,6 +1666,39 @@ extension_trait! {
CmpFuture::new(self, other) CmpFuture::new(self, other)
} }
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
not equal to those of another.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_ne: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_ne: VecDeque<isize> = vec![1,5].into_iter().collect();
assert_eq!(single.clone().ne(single.clone()).await, false);
assert_eq!(single_ne.clone().ne(single.clone()).await, true);
assert_eq!(multi.clone().ne(single_ne.clone()).await, true);
assert_eq!(multi_ne.clone().ne(multi.clone()).await, true);
#
# }) }
```
"#]
fn ne<S>(
self,
other: S
) -> impl Future<Output = bool> [NeFuture<Self, S>]
where
Self: Sized + Stream,
S: Sized + Stream,
<Self as Stream>::Item: PartialEq<S::Item>,
{
NeFuture::new(self, other)
}
#[doc = r#" #[doc = r#"
Determines if the elements of this `Stream` are lexicographically Determines if the elements of this `Stream` are lexicographically
greater than or equal to those of another. greater than or equal to those of another.
@ -1622,6 +1735,42 @@ extension_trait! {
GeFuture::new(self, other) GeFuture::new(self, other)
} }
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
equal to those of another.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_eq: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_eq: VecDeque<isize> = vec![1,5].into_iter().collect();
assert_eq!(single.clone().eq(single.clone()).await, true);
assert_eq!(single_eq.clone().eq(single.clone()).await, false);
assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
#
# }) }
```
"#]
fn eq<S>(
self,
other: S
) -> impl Future<Output = bool> [EqFuture<Self, S>]
where
Self: Sized + Stream,
S: Sized + Stream,
<Self as Stream>::Item: PartialEq<S::Item>,
{
EqFuture::new(self, other)
}
#[doc = r#" #[doc = r#"
Determines if the elements of this `Stream` are lexicographically Determines if the elements of this `Stream` are lexicographically
greater than those of another. greater than those of another.

@ -0,0 +1,65 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
// Lexicographically compares the elements of this `Stream` with those
// of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NeFuture<L: Stream, R: Stream> {
#[pin]
l: Fuse<L>,
#[pin]
r: Fuse<R>,
}
}
impl<L: Stream, R: Stream> NeFuture<L, R>
where
L::Item: PartialEq<R::Item>,
{
pub(super) fn new(l: L, r: R) -> Self {
Self {
l: l.fuse(),
r: r.fuse(),
}
}
}
impl<L: Stream, R: Stream> Future for NeFuture<L, R>
where
L: Stream + Sized,
R: Stream + Sized,
L::Item: PartialEq<R::Item>,
{
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
if this.l.done || this.r.done {
return Poll::Ready(false);
}
match (l_val, r_val) {
(Some(l), Some(r)) if l == r => {
continue;
}
_ => {
return Poll::Ready(true);
}
}
}
}
}

@ -0,0 +1,51 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PositionFuture<S, P> {
#[pin]
stream: S,
predicate: P,
index:usize,
}
}
impl<S, P> PositionFuture<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self {
PositionFuture {
stream,
predicate,
index: 0,
}
}
}
impl<S, P> Future for PositionFuture<S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
{
type Output = Option<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)),
Some(_) => {
cx.waker().wake_by_ref();
*this.index += 1;
Poll::Pending
}
None => Poll::Ready(None),
}
}
}

@ -4,17 +4,17 @@ use std::future::Future;
use std::isize; use std::isize;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::mem; use std::mem;
use std::ops::{Deref, DerefMut};
use std::pin::Pin; use std::pin::Pin;
use std::process; use std::process;
use std::ptr; use std::ptr;
use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{self, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll};
use crossbeam_utils::{Backoff, CachePadded}; use crossbeam_utils::{Backoff, CachePadded};
use futures_core::stream::Stream;
use slab::Slab; use crate::stream::Stream;
use crate::sync::WakerSet;
/// Creates a bounded multi-producer multi-consumer channel. /// Creates a bounded multi-producer multi-consumer channel.
/// ///
@ -128,7 +128,7 @@ impl<T> Sender<T> {
/// ``` /// ```
pub async fn send(&self, msg: T) { pub async fn send(&self, msg: T) {
struct SendFuture<'a, T> { struct SendFuture<'a, T> {
sender: &'a Sender<T>, channel: &'a Channel<T>,
msg: Option<T>, msg: Option<T>,
opt_key: Option<usize>, opt_key: Option<usize>,
} }
@ -142,23 +142,23 @@ impl<T> Sender<T> {
let msg = self.msg.take().unwrap(); let msg = self.msg.take().unwrap();
// Try sending the message. // Try sending the message.
let poll = match self.sender.channel.push(msg) { let poll = match self.channel.try_send(msg) {
Ok(()) => Poll::Ready(()), Ok(()) => Poll::Ready(()),
Err(PushError::Disconnected(msg)) => { Err(TrySendError::Disconnected(msg)) => {
self.msg = Some(msg); self.msg = Some(msg);
Poll::Pending Poll::Pending
} }
Err(PushError::Full(msg)) => { Err(TrySendError::Full(msg)) => {
// Register the current task. // Insert this send operation.
match self.opt_key { match self.opt_key {
None => self.opt_key = Some(self.sender.channel.sends.register(cx)), None => self.opt_key = Some(self.channel.send_wakers.insert(cx)),
Some(key) => self.sender.channel.sends.reregister(key, cx), Some(key) => self.channel.send_wakers.update(key, cx),
} }
// Try sending the message again. // Try sending the message again.
match self.sender.channel.push(msg) { match self.channel.try_send(msg) {
Ok(()) => Poll::Ready(()), Ok(()) => Poll::Ready(()),
Err(PushError::Disconnected(msg)) | Err(PushError::Full(msg)) => { Err(TrySendError::Disconnected(msg)) | Err(TrySendError::Full(msg)) => {
self.msg = Some(msg); self.msg = Some(msg);
Poll::Pending Poll::Pending
} }
@ -167,10 +167,9 @@ impl<T> Sender<T> {
}; };
if poll.is_ready() { if poll.is_ready() {
// If the current task was registered, unregister now. // If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() { if let Some(key) = self.opt_key.take() {
// `true` means the send operation is completed. self.channel.send_wakers.complete(key);
self.sender.channel.sends.unregister(key, true);
} }
} }
@ -180,16 +179,16 @@ impl<T> Sender<T> {
impl<T> Drop for SendFuture<'_, T> { impl<T> Drop for SendFuture<'_, T> {
fn drop(&mut self) { fn drop(&mut self) {
// If the current task was registered, unregister now. // If the current task is still in the set, that means it is being cancelled now.
// Wake up another task instead.
if let Some(key) = self.opt_key { if let Some(key) = self.opt_key {
// `false` means the send operation is cancelled. self.channel.send_wakers.cancel(key);
self.sender.channel.sends.unregister(key, false);
} }
} }
} }
SendFuture { SendFuture {
sender: self, channel: &self.channel,
msg: Some(msg), msg: Some(msg),
opt_key: None, opt_key: None,
} }
@ -340,7 +339,7 @@ pub struct Receiver<T> {
/// The inner channel. /// The inner channel.
channel: Arc<Channel<T>>, channel: Arc<Channel<T>>,
/// The registration key for this receiver in the `channel.streams` registry. /// The key for this receiver in the `channel.stream_wakers` set.
opt_key: Option<usize>, opt_key: Option<usize>,
} }
@ -382,16 +381,20 @@ impl<T> Receiver<T> {
type Output = Option<T>; type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
poll_recv(&self.channel, &self.channel.recvs, &mut self.opt_key, cx) poll_recv(
&self.channel,
&self.channel.recv_wakers,
&mut self.opt_key,
cx,
)
} }
} }
impl<T> Drop for RecvFuture<'_, T> { impl<T> Drop for RecvFuture<'_, T> {
fn drop(&mut self) { fn drop(&mut self) {
// If the current task was registered, unregister now. // If the current task is still in the set, that means it is being cancelled now.
if let Some(key) = self.opt_key { if let Some(key) = self.opt_key {
// `false` means the receive operation is cancelled. self.channel.recv_wakers.cancel(key);
self.channel.recvs.unregister(key, false);
} }
} }
} }
@ -484,10 +487,9 @@ impl<T> Receiver<T> {
impl<T> Drop for Receiver<T> { impl<T> Drop for Receiver<T> {
fn drop(&mut self) { fn drop(&mut self) {
// If the current task was registered as blocked on this stream, unregister now. // If the current task is still in the stream set, that means it is being cancelled now.
if let Some(key) = self.opt_key { if let Some(key) = self.opt_key {
// `false` means the last request for a stream item is cancelled. self.channel.stream_wakers.cancel(key);
self.channel.streams.unregister(key, false);
} }
// Decrement the receiver count and disconnect the channel if it drops down to zero. // Decrement the receiver count and disconnect the channel if it drops down to zero.
@ -518,7 +520,12 @@ impl<T> Stream for Receiver<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = &mut *self; let this = &mut *self;
poll_recv(&this.channel, &this.channel.streams, &mut this.opt_key, cx) poll_recv(
&this.channel,
&this.channel.stream_wakers,
&mut this.opt_key,
cx,
)
} }
} }
@ -530,39 +537,38 @@ impl<T> fmt::Debug for Receiver<T> {
/// Polls a receive operation on a channel. /// Polls a receive operation on a channel.
/// ///
/// If the receive operation is blocked, the current task will be registered in `registry` and its /// If the receive operation is blocked, the current task will be inserted into `wakers` and its
/// registration key will then be stored in `opt_key`. /// associated key will then be stored in `opt_key`.
fn poll_recv<T>( fn poll_recv<T>(
channel: &Channel<T>, channel: &Channel<T>,
registry: &Registry, wakers: &WakerSet,
opt_key: &mut Option<usize>, opt_key: &mut Option<usize>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
) -> Poll<Option<T>> { ) -> Poll<Option<T>> {
// Try receiving a message. // Try receiving a message.
let poll = match channel.pop() { let poll = match channel.try_recv() {
Ok(msg) => Poll::Ready(Some(msg)), Ok(msg) => Poll::Ready(Some(msg)),
Err(PopError::Disconnected) => Poll::Ready(None), Err(TryRecvError::Disconnected) => Poll::Ready(None),
Err(PopError::Empty) => { Err(TryRecvError::Empty) => {
// Register the current task. // Insert this receive operation.
match *opt_key { match *opt_key {
None => *opt_key = Some(registry.register(cx)), None => *opt_key = Some(wakers.insert(cx)),
Some(key) => registry.reregister(key, cx), Some(key) => wakers.update(key, cx),
} }
// Try receiving a message again. // Try receiving a message again.
match channel.pop() { match channel.try_recv() {
Ok(msg) => Poll::Ready(Some(msg)), Ok(msg) => Poll::Ready(Some(msg)),
Err(PopError::Disconnected) => Poll::Ready(None), Err(TryRecvError::Disconnected) => Poll::Ready(None),
Err(PopError::Empty) => Poll::Pending, Err(TryRecvError::Empty) => Poll::Pending,
} }
} }
}; };
if poll.is_ready() { if poll.is_ready() {
// If the current task was registered, unregister now. // If the current task is in the set, remove it.
if let Some(key) = opt_key.take() { if let Some(key) = opt_key.take() {
// `true` means the receive operation is completed. wakers.complete(key);
registry.unregister(key, true);
} }
} }
@ -612,13 +618,13 @@ struct Channel<T> {
mark_bit: usize, mark_bit: usize,
/// Send operations waiting while the channel is full. /// Send operations waiting while the channel is full.
sends: Registry, send_wakers: WakerSet,
/// Receive operations waiting while the channel is empty and not disconnected. /// Receive operations waiting while the channel is empty and not disconnected.
recvs: Registry, recv_wakers: WakerSet,
/// Streams waiting while the channel is empty and not disconnected. /// Streams waiting while the channel is empty and not disconnected.
streams: Registry, stream_wakers: WakerSet,
/// The number of currently active `Sender`s. /// The number of currently active `Sender`s.
sender_count: AtomicUsize, sender_count: AtomicUsize,
@ -672,17 +678,17 @@ impl<T> Channel<T> {
mark_bit, mark_bit,
head: CachePadded::new(AtomicUsize::new(head)), head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)), tail: CachePadded::new(AtomicUsize::new(tail)),
sends: Registry::new(), send_wakers: WakerSet::new(),
recvs: Registry::new(), recv_wakers: WakerSet::new(),
streams: Registry::new(), stream_wakers: WakerSet::new(),
sender_count: AtomicUsize::new(1), sender_count: AtomicUsize::new(1),
receiver_count: AtomicUsize::new(1), receiver_count: AtomicUsize::new(1),
_marker: PhantomData, _marker: PhantomData,
} }
} }
/// Attempts to push a message. /// Attempts to send a message.
fn push(&self, msg: T) -> Result<(), PushError<T>> { fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
let backoff = Backoff::new(); let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed); let mut tail = self.tail.load(Ordering::Relaxed);
@ -721,10 +727,10 @@ impl<T> Channel<T> {
slot.stamp.store(stamp, Ordering::Release); slot.stamp.store(stamp, Ordering::Release);
// Wake a blocked receive operation. // Wake a blocked receive operation.
self.recvs.notify_one(); self.recv_wakers.notify_one();
// Wake all blocked streams. // Wake all blocked streams.
self.streams.notify_all(); self.stream_wakers.notify_all();
return Ok(()); return Ok(());
} }
@ -743,9 +749,9 @@ impl<T> Channel<T> {
// Check if the channel is disconnected. // Check if the channel is disconnected.
if tail & self.mark_bit != 0 { if tail & self.mark_bit != 0 {
return Err(PushError::Disconnected(msg)); return Err(TrySendError::Disconnected(msg));
} else { } else {
return Err(PushError::Full(msg)); return Err(TrySendError::Full(msg));
} }
} }
@ -759,8 +765,8 @@ impl<T> Channel<T> {
} }
} }
/// Attempts to pop a message. /// Attempts to receive a message.
fn pop(&self) -> Result<T, PopError> { fn try_recv(&self) -> Result<T, TryRecvError> {
let backoff = Backoff::new(); let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed); let mut head = self.head.load(Ordering::Relaxed);
@ -799,7 +805,7 @@ impl<T> Channel<T> {
slot.stamp.store(stamp, Ordering::Release); slot.stamp.store(stamp, Ordering::Release);
// Wake a blocked send operation. // Wake a blocked send operation.
self.sends.notify_one(); self.send_wakers.notify_one();
return Ok(msg); return Ok(msg);
} }
@ -816,10 +822,10 @@ impl<T> Channel<T> {
if (tail & !self.mark_bit) == head { if (tail & !self.mark_bit) == head {
// If the channel is disconnected... // If the channel is disconnected...
if tail & self.mark_bit != 0 { if tail & self.mark_bit != 0 {
return Err(PopError::Disconnected); return Err(TryRecvError::Disconnected);
} else { } else {
// Otherwise, the receive operation is not ready. // Otherwise, the receive operation is not ready.
return Err(PopError::Empty); return Err(TryRecvError::Empty);
} }
} }
@ -888,9 +894,9 @@ impl<T> Channel<T> {
if tail & self.mark_bit == 0 { if tail & self.mark_bit == 0 {
// Notify everyone blocked on this channel. // Notify everyone blocked on this channel.
self.sends.notify_all(); self.send_wakers.notify_all();
self.recvs.notify_all(); self.recv_wakers.notify_all();
self.streams.notify_all(); self.stream_wakers.notify_all();
} }
} }
} }
@ -921,8 +927,8 @@ impl<T> Drop for Channel<T> {
} }
} }
/// An error returned from the `push()` method. /// An error returned from the `try_send()` method.
enum PushError<T> { enum TrySendError<T> {
/// The channel is full but not disconnected. /// The channel is full but not disconnected.
Full(T), Full(T),
@ -930,203 +936,11 @@ enum PushError<T> {
Disconnected(T), Disconnected(T),
} }
/// An error returned from the `pop()` method. /// An error returned from the `try_recv()` method.
enum PopError { enum TryRecvError {
/// The channel is empty but not disconnected. /// The channel is empty but not disconnected.
Empty, Empty,
/// The channel is empty and disconnected. /// The channel is empty and disconnected.
Disconnected, Disconnected,
} }
/// A list of blocked channel operations.
struct Blocked {
/// A list of registered channel operations.
///
/// Each entry has a waker associated with the task that is executing the operation. If the
/// waker is set to `None`, that means the task has been woken up but hasn't removed itself
/// from the registry yet.
entries: Slab<Option<Waker>>,
/// The number of wakers in the entry list.
waker_count: usize,
}
/// A registry of blocked channel operations.
///
/// Blocked operations register themselves in a registry. Successful operations on the opposite
/// side of the channel wake blocked operations in the registry.
struct Registry {
/// A list of blocked channel operations.
blocked: Spinlock<Blocked>,
/// Set to `true` if there are no wakers in the registry.
///
/// Note that this either means there are no entries in the registry, or that all entries have
/// been notified.
is_empty: AtomicBool,
}
impl Registry {
/// Creates a new registry.
fn new() -> Registry {
Registry {
blocked: Spinlock::new(Blocked {
entries: Slab::new(),
waker_count: 0,
}),
is_empty: AtomicBool::new(true),
}
}
/// Registers a blocked channel operation and returns a key associated with it.
fn register(&self, cx: &Context<'_>) -> usize {
let mut blocked = self.blocked.lock();
// Insert a new entry into the list of blocked tasks.
let w = cx.waker().clone();
let key = blocked.entries.insert(Some(w));
blocked.waker_count += 1;
if blocked.waker_count == 1 {
self.is_empty.store(false, Ordering::SeqCst);
}
key
}
/// Re-registers a blocked channel operation by filling in its waker.
fn reregister(&self, key: usize, cx: &Context<'_>) {
let mut blocked = self.blocked.lock();
let was_none = blocked.entries[key].is_none();
let w = cx.waker().clone();
blocked.entries[key] = Some(w);
if was_none {
blocked.waker_count += 1;
if blocked.waker_count == 1 {
self.is_empty.store(false, Ordering::SeqCst);
}
}
}
/// Unregisters a channel operation.
///
/// If `completed` is `true`, the operation will be removed from the registry. If `completed`
/// is `false`, that means the operation was cancelled so another one will be notified.
fn unregister(&self, key: usize, completed: bool) {
let mut blocked = self.blocked.lock();
let mut removed = false;
match blocked.entries.remove(key) {
Some(_) => removed = true,
None => {
if !completed {
// This operation was cancelled. Notify another one.
if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
removed = true;
}
}
}
}
}
if removed {
blocked.waker_count -= 1;
if blocked.waker_count == 0 {
self.is_empty.store(true, Ordering::SeqCst);
}
}
}
/// Notifies one blocked channel operation.
#[inline]
fn notify_one(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
let mut blocked = self.blocked.lock();
if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() {
// If there is no waker in this entry, that means it was already woken.
if let Some(w) = opt_waker.take() {
w.wake();
blocked.waker_count -= 1;
if blocked.waker_count == 0 {
self.is_empty.store(true, Ordering::SeqCst);
}
}
}
}
}
/// Notifies all blocked channel operations.
#[inline]
fn notify_all(&self) {
if !self.is_empty.load(Ordering::SeqCst) {
let mut blocked = self.blocked.lock();
for (_, opt_waker) in blocked.entries.iter_mut() {
// If there is no waker in this entry, that means it was already woken.
if let Some(w) = opt_waker.take() {
w.wake();
}
}
blocked.waker_count = 0;
self.is_empty.store(true, Ordering::SeqCst);
}
}
}
/// A simple spinlock.
struct Spinlock<T> {
flag: AtomicBool,
value: UnsafeCell<T>,
}
impl<T> Spinlock<T> {
/// Returns a new spinlock initialized with `value`.
fn new(value: T) -> Spinlock<T> {
Spinlock {
flag: AtomicBool::new(false),
value: UnsafeCell::new(value),
}
}
/// Locks the spinlock.
fn lock(&self) -> SpinlockGuard<'_, T> {
let backoff = Backoff::new();
while self.flag.swap(true, Ordering::Acquire) {
backoff.snooze();
}
SpinlockGuard { parent: self }
}
}
/// A guard holding a spinlock locked.
struct SpinlockGuard<'a, T> {
parent: &'a Spinlock<T>,
}
impl<'a, T> Drop for SpinlockGuard<'a, T> {
fn drop(&mut self) {
self.parent.flag.store(false, Ordering::Release);
}
}
impl<'a, T> Deref for SpinlockGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.parent.value.get() }
}
}
impl<'a, T> DerefMut for SpinlockGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.parent.value.get() }
}
}

@ -191,3 +191,6 @@ cfg_unstable! {
mod barrier; mod barrier;
mod channel; mod channel;
} }
pub(crate) mod waker_set;
pub(crate) use waker_set::WakerSet;

@ -2,18 +2,11 @@ use std::cell::UnsafeCell;
use std::fmt; use std::fmt;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use slab::Slab;
use crate::future::Future; use crate::future::Future;
use crate::task::{Context, Poll, Waker}; use crate::sync::WakerSet;
use crate::task::{Context, Poll};
/// Set if the mutex is locked.
const LOCK: usize = 1;
/// Set if there are tasks blocked on the mutex.
const BLOCKED: usize = 1 << 1;
/// A mutual exclusion primitive for protecting shared data. /// A mutual exclusion primitive for protecting shared data.
/// ///
@ -49,8 +42,8 @@ const BLOCKED: usize = 1 << 1;
/// # }) /// # })
/// ``` /// ```
pub struct Mutex<T> { pub struct Mutex<T> {
state: AtomicUsize, locked: AtomicBool,
blocked: std::sync::Mutex<Slab<Option<Waker>>>, wakers: WakerSet,
value: UnsafeCell<T>, value: UnsafeCell<T>,
} }
@ -69,8 +62,8 @@ impl<T> Mutex<T> {
/// ``` /// ```
pub fn new(t: T) -> Mutex<T> { pub fn new(t: T) -> Mutex<T> {
Mutex { Mutex {
state: AtomicUsize::new(0), locked: AtomicBool::new(false),
blocked: std::sync::Mutex::new(Slab::new()), wakers: WakerSet::new(),
value: UnsafeCell::new(t), value: UnsafeCell::new(t),
} }
} }
@ -105,75 +98,46 @@ impl<T> Mutex<T> {
pub struct LockFuture<'a, T> { pub struct LockFuture<'a, T> {
mutex: &'a Mutex<T>, mutex: &'a Mutex<T>,
opt_key: Option<usize>, opt_key: Option<usize>,
acquired: bool,
} }
impl<'a, T> Future for LockFuture<'a, T> { impl<'a, T> Future for LockFuture<'a, T> {
type Output = MutexGuard<'a, T>; type Output = MutexGuard<'a, T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.mutex.try_lock() { let poll = match self.mutex.try_lock() {
Some(guard) => { Some(guard) => Poll::Ready(guard),
self.acquired = true;
Poll::Ready(guard)
}
None => { None => {
let mut blocked = self.mutex.blocked.lock().unwrap(); // Insert this lock operation.
// Register the current task.
match self.opt_key { match self.opt_key {
None => { None => self.opt_key = Some(self.mutex.wakers.insert(cx)),
// Insert a new entry into the list of blocked tasks. Some(key) => self.mutex.wakers.update(key, cx),
let w = cx.waker().clone();
let key = blocked.insert(Some(w));
self.opt_key = Some(key);
if blocked.len() == 1 {
self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed);
}
}
Some(key) => {
// There is already an entry in the list of blocked tasks. Just
// reset the waker if it was removed.
if blocked[key].is_none() {
let w = cx.waker().clone();
blocked[key] = Some(w);
}
}
} }
// Try locking again because it's possible the mutex got unlocked just // Try locking again because it's possible the mutex got unlocked just
// before the current task was registered as a blocked task. // before the current task was inserted into the waker set.
match self.mutex.try_lock() { match self.mutex.try_lock() {
Some(guard) => { Some(guard) => Poll::Ready(guard),
self.acquired = true;
Poll::Ready(guard)
}
None => Poll::Pending, None => Poll::Pending,
} }
} }
};
if poll.is_ready() {
// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.mutex.wakers.complete(key);
}
} }
poll
} }
} }
impl<T> Drop for LockFuture<'_, T> { impl<T> Drop for LockFuture<'_, T> {
fn drop(&mut self) { fn drop(&mut self) {
// If the current task is still in the set, that means it is being cancelled now.
if let Some(key) = self.opt_key { if let Some(key) = self.opt_key {
let mut blocked = self.mutex.blocked.lock().unwrap(); self.mutex.wakers.cancel(key);
let opt_waker = blocked.remove(key);
if opt_waker.is_none() && !self.acquired {
// We were awoken but didn't acquire the lock. Wake up another task.
if let Some((_, opt_waker)) = blocked.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
}
}
}
if blocked.is_empty() {
self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed);
}
} }
} }
} }
@ -181,7 +145,6 @@ impl<T> Mutex<T> {
LockFuture { LockFuture {
mutex: self, mutex: self,
opt_key: None, opt_key: None,
acquired: false,
} }
.await .await
} }
@ -220,7 +183,7 @@ impl<T> Mutex<T> {
/// # }) /// # })
/// ``` /// ```
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> { pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0 { if !self.locked.swap(true, Ordering::SeqCst) {
Some(MutexGuard(self)) Some(MutexGuard(self))
} else { } else {
None None
@ -266,18 +229,15 @@ impl<T> Mutex<T> {
impl<T: fmt::Debug> fmt::Debug for Mutex<T> { impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.try_lock() { struct Locked;
None => { impl fmt::Debug for Locked {
struct LockedPlaceholder;
impl fmt::Debug for LockedPlaceholder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<locked>") f.write_str("<locked>")
} }
} }
f.debug_struct("Mutex")
.field("data", &LockedPlaceholder) match self.try_lock() {
.finish() None => f.debug_struct("Mutex").field("data", &Locked).finish(),
}
Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(), Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
} }
} }
@ -303,19 +263,11 @@ unsafe impl<T: Sync> Sync for MutexGuard<'_, T> {}
impl<T> Drop for MutexGuard<'_, T> { impl<T> Drop for MutexGuard<'_, T> {
fn drop(&mut self) { fn drop(&mut self) {
let state = self.0.state.fetch_and(!LOCK, Ordering::AcqRel); // Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`.
self.0.locked.store(false, Ordering::SeqCst);
// If there are any blocked tasks, wake one of them up.
if state & BLOCKED != 0 {
let mut blocked = self.0.blocked.lock().unwrap();
if let Some((_, opt_waker)) = blocked.iter_mut().next() { // Notify one blocked `lock()` operation.
// If there is no waker in this entry, that means it was already woken. self.0.wakers.notify_one();
if let Some(w) = opt_waker.take() {
w.wake();
}
}
}
} }
} }

@ -10,7 +10,8 @@ use crate::future::Future;
use crate::task::{Context, Poll, Waker}; use crate::task::{Context, Poll, Waker};
/// Set if a write lock is held. /// Set if a write lock is held.
const WRITE_LOCK: usize = 1; #[allow(clippy::identity_op)]
const WRITE_LOCK: usize = 1 << 0;
/// Set if there are read operations blocked on the lock. /// Set if there are read operations blocked on the lock.
const BLOCKED_READS: usize = 1 << 1; const BLOCKED_READS: usize = 1 << 1;

@ -0,0 +1,200 @@
//! A common utility for building synchronization primitives.
//!
//! When an async operation is blocked, it needs to register itself somewhere so that it can be
//! notified later on. The `WakerSet` type helps with keeping track of such async operations and
//! notifying them when they may make progress.
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam_utils::Backoff;
use slab::Slab;
use crate::task::{Context, Waker};
/// Set when the entry list is locked.
#[allow(clippy::identity_op)]
const LOCKED: usize = 1 << 0;
/// Set when there are tasks for `notify_one()` to wake.
const NOTIFY_ONE: usize = 1 << 1;
/// Set when there are tasks for `notify_all()` to wake.
const NOTIFY_ALL: usize = 1 << 2;
/// Inner representation of `WakerSet`.
struct Inner {
/// A list of entries in the set.
///
/// Each entry has an optional waker associated with the task that is executing the operation.
/// If the waker is set to `None`, that means the task has been woken up but hasn't removed
/// itself from the `WakerSet` yet.
///
/// The key of each entry is its index in the `Slab`.
entries: Slab<Option<Waker>>,
/// The number of entries that have the waker set to `None`.
none_count: usize,
}
/// A set holding wakers.
pub struct WakerSet {
/// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`.
flag: AtomicUsize,
/// A set holding wakers.
inner: UnsafeCell<Inner>,
}
impl WakerSet {
/// Creates a new `WakerSet`.
#[inline]
pub fn new() -> WakerSet {
WakerSet {
flag: AtomicUsize::new(0),
inner: UnsafeCell::new(Inner {
entries: Slab::new(),
none_count: 0,
}),
}
}
/// Inserts a waker for a blocked operation and returns a key associated with it.
pub fn insert(&self, cx: &Context<'_>) -> usize {
let w = cx.waker().clone();
self.lock().entries.insert(Some(w))
}
/// Updates the waker of a previously inserted entry.
pub fn update(&self, key: usize, cx: &Context<'_>) {
let mut inner = self.lock();
match &mut inner.entries[key] {
None => {
// Fill in the waker.
let w = cx.waker().clone();
inner.entries[key] = Some(w);
inner.none_count -= 1;
}
Some(w) => {
// Replace the waker if the existing one is different.
if !w.will_wake(cx.waker()) {
*w = cx.waker().clone();
}
}
}
}
/// Removes the waker of a completed operation.
pub fn complete(&self, key: usize) {
let mut inner = self.lock();
if inner.entries.remove(key).is_none() {
inner.none_count -= 1;
}
}
/// Removes the waker of a cancelled operation.
pub fn cancel(&self, key: usize) {
let mut inner = self.lock();
if inner.entries.remove(key).is_none() {
inner.none_count -= 1;
// The operation was cancelled and notified so notify another operation instead.
if let Some((_, opt_waker)) = inner.entries.iter_mut().next() {
// If there is no waker in this entry, that means it was already woken.
if let Some(w) = opt_waker.take() {
w.wake();
inner.none_count += 1;
}
}
}
}
/// Notifies one blocked operation.
#[inline]
pub fn notify_one(&self) {
// Use `SeqCst` ordering to synchronize with `Lock::drop()`.
if self.flag.load(Ordering::SeqCst) & NOTIFY_ONE != 0 {
self.notify(false);
}
}
/// Notifies all blocked operations.
// TODO: Delete this attribute when `crate::sync::channel()` is stabilized.
#[cfg(feature = "unstable")]
#[inline]
pub fn notify_all(&self) {
// Use `SeqCst` ordering to synchronize with `Lock::drop()`.
if self.flag.load(Ordering::SeqCst) & NOTIFY_ALL != 0 {
self.notify(true);
}
}
/// Notifies blocked operations, either one or all of them.
fn notify(&self, all: bool) {
let mut inner = &mut *self.lock();
for (_, opt_waker) in inner.entries.iter_mut() {
// If there is no waker in this entry, that means it was already woken.
if let Some(w) = opt_waker.take() {
w.wake();
inner.none_count += 1;
}
if !all {
break;
}
}
}
/// Locks the list of entries.
#[cold]
fn lock(&self) -> Lock<'_> {
let backoff = Backoff::new();
while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 {
backoff.snooze();
}
Lock { waker_set: self }
}
}
/// A guard holding a `WakerSet` locked.
struct Lock<'a> {
waker_set: &'a WakerSet,
}
impl Drop for Lock<'_> {
#[inline]
fn drop(&mut self) {
let mut flag = 0;
// If there is at least one entry and all are `Some`, then `notify_one()` has work to do.
if !self.entries.is_empty() && self.none_count == 0 {
flag |= NOTIFY_ONE;
}
// If there is at least one `Some` entry, then `notify_all()` has work to do.
if self.entries.len() - self.none_count > 0 {
flag |= NOTIFY_ALL;
}
// Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`.
self.waker_set.flag.store(flag, Ordering::SeqCst);
}
}
impl Deref for Lock<'_> {
type Target = Inner;
#[inline]
fn deref(&self) -> &Inner {
unsafe { &*self.waker_set.inner.get() }
}
}
impl DerefMut for Lock<'_> {
#[inline]
fn deref_mut(&mut self) -> &mut Inner {
unsafe { &mut *self.waker_set.inner.get() }
}
}

@ -1,21 +1,15 @@
use std::cell::{Cell, UnsafeCell}; use std::cell::Cell;
use std::mem::{self, ManuallyDrop}; use std::mem::{self, ManuallyDrop};
use std::panic::{self, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable}; use std::task::{RawWaker, RawWakerVTable};
use std::thread; use std::thread;
use crossbeam_utils::sync::Parker; use crossbeam_utils::sync::Parker;
use pin_project_lite::pin_project; use kv_log_macro::trace;
use log::log_enabled;
use super::task;
use super::task_local;
use super::worker;
use crate::future::Future; use crate::future::Future;
use crate::task::{Context, Poll, Waker}; use crate::task::{Context, Poll, Task, Waker};
use kv_log_macro::trace;
/// Spawns a task and blocks the current thread on its result. /// Spawns a task and blocks the current thread on its result.
/// ///
@ -42,81 +36,43 @@ pub fn block_on<F, T>(future: F) -> T
where where
F: Future<Output = T>, F: Future<Output = T>,
{ {
unsafe { // Create a new task handle.
// A place on the stack where the result will be stored. let task = Task::new(None);
let out = &mut UnsafeCell::new(None);
// Wrap the future into one that stores the result into `out`.
let future = {
let out = out.get();
async move {
let future = CatchUnwindFuture {
future: AssertUnwindSafe(future),
};
*out = Some(future.await);
}
};
// Create a tag for the task.
let tag = task::Tag::new(None);
// Log this `block_on` operation. // Log this `block_on` operation.
let child_id = tag.task_id().as_u64(); if log_enabled!(log::Level::Trace) {
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
trace!("block_on", { trace!("block_on", {
parent_id: parent_id, task_id: task.id().0,
child_id: child_id, parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0),
}); });
}
// Wrap the future into one that drops task-local variables on exit.
let future = task_local::add_finalizer(future);
let future = async move { let future = async move {
future.await; // Drop task-locals on exit.
trace!("block_on completed", { defer! {
parent_id: parent_id, Task::get_current(|t| unsafe { t.drop_locals() });
child_id: child_id,
});
};
// Pin the future onto the stack.
pin_utils::pin_mut!(future);
// Transmute the future into one that is futurestatic.
let future = mem::transmute::<
Pin<&'_ mut dyn Future<Output = ()>>,
Pin<&'static mut dyn Future<Output = ()>>,
>(future);
// Block on the future and and wait for it to complete.
worker::set_tag(&tag, || block(future));
// Take out the result.
match (*out.get()).take().unwrap() {
Ok(v) => v,
Err(err) => panic::resume_unwind(err),
}
} }
}
pin_project! { // Log completion on exit.
struct CatchUnwindFuture<F> { defer! {
#[pin] if log_enabled!(log::Level::Trace) {
future: F, Task::get_current(|t| {
trace!("completed", {
task_id: t.id().0,
});
});
}
} }
}
impl<F: Future + UnwindSafe> Future for CatchUnwindFuture<F> { future.await
type Output = thread::Result<F::Output>; };
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // Run the future as a task.
panic::catch_unwind(AssertUnwindSafe(|| self.project().future.poll(cx)))?.map(Ok) unsafe { Task::set_current(&task, || run(future)) }
}
} }
fn block<F, T>(f: F) -> T /// Blocks the current thread on a future's result.
fn run<F, T>(future: F) -> T
where where
F: Future<Output = T>, F: Future<Output = T>,
{ {
@ -129,35 +85,12 @@ where
static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None); static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None);
} }
pin_utils::pin_mut!(f); // Virtual table for wakers based on `Arc<Parker>`.
static VTABLE: RawWakerVTable = {
CACHE.with(|cache| {
// Reuse a cached parker or create a new one for this invocation of `block`.
let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));
let ptr = (&*arc_parker as *const Parker) as *const ();
let vt = vtable();
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) };
let cx = &mut Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(arc_parker));
return t;
}
arc_parker.park();
}
})
}
fn vtable() -> &'static RawWakerVTable {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker { unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
#![allow(clippy::redundant_clone)]
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
mem::forget(arc.clone()); mem::forget(arc.clone());
RawWaker::new(ptr, vtable()) RawWaker::new(ptr, &VTABLE)
} }
unsafe fn wake_raw(ptr: *const ()) { unsafe fn wake_raw(ptr: *const ()) {
@ -174,5 +107,37 @@ fn vtable() -> &'static RawWakerVTable {
drop(Arc::from_raw(ptr as *const Parker)) drop(Arc::from_raw(ptr as *const Parker))
} }
&RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
};
// Pin the future on the stack.
pin_utils::pin_mut!(future);
CACHE.with(|cache| {
// Reuse a cached parker or create a new one for this invocation of `block`.
let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));
let ptr = (&*arc_parker as *const Parker) as *const ();
// Create a waker and task context.
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &VTABLE))) };
let cx = &mut Context::from_waker(&waker);
let mut step = 0;
loop {
if let Poll::Ready(t) = future.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(arc_parker));
return t;
}
// Yield a few times or park the current thread.
if step < 3 {
thread::yield_now();
step += 1;
} else {
arc_parker.park();
step = 0;
}
}
})
} }

@ -1,7 +1,11 @@
use super::pool; use kv_log_macro::trace;
use super::JoinHandle; use log::log_enabled;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
use crate::task::executor;
use crate::task::{JoinHandle, Task};
use crate::utils::abort_on_panic;
/// Task builder that configures the settings of a new task. /// Task builder that configures the settings of a new task.
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -11,11 +15,13 @@ pub struct Builder {
impl Builder { impl Builder {
/// Creates a new builder. /// Creates a new builder.
#[inline]
pub fn new() -> Builder { pub fn new() -> Builder {
Builder { name: None } Builder { name: None }
} }
/// Configures the name of the task. /// Configures the name of the task.
#[inline]
pub fn name(mut self, name: String) -> Builder { pub fn name(mut self, name: String) -> Builder {
self.name = Some(name); self.name = Some(name);
self self
@ -27,6 +33,52 @@ impl Builder {
F: Future<Output = T> + Send + 'static, F: Future<Output = T> + Send + 'static,
T: Send + 'static, T: Send + 'static,
{ {
Ok(pool::get().spawn(future, self)) // Create a new task handle.
let task = Task::new(self.name);
// Log this `spawn` operation.
if log_enabled!(log::Level::Trace) {
trace!("spawn", {
task_id: task.id().0,
parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0),
});
}
let future = async move {
// Drop task-locals on exit.
defer! {
Task::get_current(|t| unsafe { t.drop_locals() });
}
// Log completion on exit.
defer! {
if log_enabled!(log::Level::Trace) {
Task::get_current(|t| {
trace!("completed", {
task_id: t.id().0,
});
});
}
}
future.await
};
let schedule = move |t| executor::schedule(Runnable(t));
let (task, handle) = async_task::spawn(future, schedule, task);
task.schedule();
Ok(JoinHandle::new(handle))
}
}
/// A runnable task.
pub(crate) struct Runnable(async_task::Task<Task>);
impl Runnable {
/// Runs the task by polling its future once.
pub fn run(self) {
unsafe {
Task::set_current(self.0.tag(), || abort_on_panic(|| self.0.run()));
}
} }
} }

@ -0,0 +1,28 @@
use crate::task::Task;
/// Returns a handle to the current task.
///
/// # Panics
///
/// This function will panic if not called within the context of a task created by [`block_on`],
/// [`spawn`], or [`Builder::spawn`].
///
/// [`block_on`]: fn.block_on.html
/// [`spawn`]: fn.spawn.html
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// println!("The name of this task is {:?}", task::current().name());
/// #
/// # })
/// ```
pub fn current() -> Task {
Task::get_current(|t| t.clone())
.expect("`task::current()` called outside the context of a task")
}

@ -0,0 +1,13 @@
//! Task executor.
//!
//! API bindings between `crate::task` and this module are very simple:
//!
//! * The only export is the `schedule` function.
//! * The only import is the `crate::task::Runnable` type.
pub(crate) use pool::schedule;
use sleepers::Sleepers;
mod pool;
mod sleepers;

@ -0,0 +1,140 @@
use std::cell::UnsafeCell;
use std::iter;
use std::thread;
use std::time::Duration;
use crossbeam_deque::{Injector, Stealer, Worker};
use once_cell::sync::Lazy;
use crate::task::executor::Sleepers;
use crate::task::Runnable;
use crate::utils::{abort_on_panic, random};
/// The state of an executor.
struct Pool {
/// The global queue of tasks.
injector: Injector<Runnable>,
/// Handles to local queues for stealing work from worker threads.
stealers: Vec<Stealer<Runnable>>,
/// Used for putting idle workers to sleep and notifying them when new tasks come in.
sleepers: Sleepers,
}
/// Global executor that runs spawned tasks.
static POOL: Lazy<Pool> = Lazy::new(|| {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
thread::Builder::new()
.name("async-std/executor".to_string())
.spawn(|| abort_on_panic(|| main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
});
thread_local! {
/// Local task queue associated with the current worker thread.
static QUEUE: UnsafeCell<Option<Worker<Runnable>>> = UnsafeCell::new(None);
}
/// Schedules a new runnable task for execution.
pub(crate) fn schedule(task: Runnable) {
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref() };
// If the current thread is a worker thread, push the task into its local task queue.
// Otherwise, push it into the global task queue.
match local {
None => POOL.injector.push(task),
Some(q) => q.push(task),
}
});
// Notify a sleeping worker that new work just came in.
POOL.sleepers.notify_one();
}
/// Main loop running a worker thread.
fn main_loop(local: Worker<Runnable>) {
// Initialize the local task queue.
QUEUE.with(|queue| unsafe { *queue.get() = Some(local) });
// The number of times the thread didn't find work in a row.
let mut step = 0;
loop {
// Try to find a runnable task.
match find_runnable() {
Some(task) => {
// Found. Now run the task.
task.run();
step = 0;
}
None => {
// Yield the current thread or put it to sleep.
match step {
0..=2 => {
thread::yield_now();
step += 1;
}
3 => {
thread::sleep(Duration::from_micros(10));
step += 1;
}
_ => {
POOL.sleepers.wait();
step = 0;
}
}
}
}
}
}
/// Find the next runnable task.
fn find_runnable() -> Option<Runnable> {
let pool = &*POOL;
QUEUE.with(|queue| {
let local = unsafe { (*queue.get()).as_ref().unwrap() };
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the global queue.
pool.injector
.steal_batch_and_pop(&local)
// Or try stealing a batch of tasks from one of the other threads.
.or_else(|| {
// First, pick a random starting point in the list of local queues.
let len = pool.stealers.len();
let start = random(len as u32) as usize;
// Try stealing a batch of tasks from each local queue starting from the
// chosen point.
let (l, r) = pool.stealers.split_at(start);
let rotated = r.iter().chain(l.iter());
rotated.map(|s| s.steal_batch_and_pop(&local)).collect()
})
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
})
}

@ -0,0 +1,56 @@
use std::future::Future;
use std::pin::Pin;
use crate::task::{Context, Poll, Task};
/// A handle that awaits the result of a task.
///
/// Dropping a [`JoinHandle`] will detach the task, meaning that there is no longer
/// a handle to the task and no way to `join` on it.
///
/// Created when a task is [spawned].
///
/// [spawned]: fn.spawn.html
#[derive(Debug)]
pub struct JoinHandle<T>(async_task::JoinHandle<T, Task>);
unsafe impl<T> Send for JoinHandle<T> {}
unsafe impl<T> Sync for JoinHandle<T> {}
impl<T> JoinHandle<T> {
/// Creates a new `JoinHandle`.
pub(crate) fn new(inner: async_task::JoinHandle<T, Task>) -> JoinHandle<T> {
JoinHandle(inner)
}
/// Returns a handle to the underlying task.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// let handle = task::spawn(async {
/// 1 + 2
/// });
/// println!("id = {}", handle.task().id());
/// #
/// # })
pub fn task(&self) -> &Task {
self.0.tag()
}
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => panic!("cannot await the result of a panicked task"),
Poll::Ready(Some(val)) => Poll::Ready(val),
}
}
}

@ -126,63 +126,35 @@ pub use async_macros::ready;
pub use block_on::block_on; pub use block_on::block_on;
pub use builder::Builder; pub use builder::Builder;
pub use pool::spawn; pub use current::current;
pub use join_handle::JoinHandle;
pub use sleep::sleep; pub use sleep::sleep;
pub use task::{JoinHandle, Task, TaskId}; pub use spawn::spawn;
pub use task::Task;
pub use task_id::TaskId;
pub use task_local::{AccessError, LocalKey}; pub use task_local::{AccessError, LocalKey};
pub use worker::current;
#[cfg(any(feature = "unstable", test))]
pub use spawn_blocking::spawn_blocking;
#[cfg(not(any(feature = "unstable", test)))]
pub(crate) use spawn_blocking::spawn_blocking;
use builder::Runnable;
use task_local::LocalsMap;
mod block_on; mod block_on;
mod builder; mod builder;
mod pool; mod current;
mod executor;
mod join_handle;
mod sleep; mod sleep;
mod sleepers; mod spawn;
mod spawn_blocking;
mod task; mod task;
mod task_id;
mod task_local; mod task_local;
mod worker;
pub(crate) mod blocking;
cfg_unstable! { cfg_unstable! {
mod yield_now;
pub use yield_now::yield_now; pub use yield_now::yield_now;
} mod yield_now;
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This
/// is useful to prevent long-running synchronous operations from blocking the main futures
/// executor.
///
/// See also: [`task::block_on`], [`task::spawn`].
///
/// [`task::block_on`]: fn.block_on.html
/// [`task::spawn`]: fn.spawn.html
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// task::spawn_blocking(|| {
/// println!("long-running task here");
/// }).await;
/// #
/// # })
/// ```
// Once this function stabilizes we should merge `blocking::spawn` into this so
// all code in our crate uses `task::blocking` too.
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn spawn_blocking<F, R>(f: F) -> task::JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
blocking::spawn(f)
} }

@ -1,138 +0,0 @@
use std::iter;
use std::thread;
use crossbeam_deque::{Injector, Stealer, Worker};
use kv_log_macro::trace;
use lazy_static::lazy_static;
use super::sleepers::Sleepers;
use super::task;
use super::task_local;
use super::worker;
use super::{Builder, JoinHandle};
use crate::future::Future;
use crate::utils::abort_on_panic;
/// Spawns a task.
///
/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task.
///
/// [`std::thread`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// let handle = task::spawn(async {
/// 1 + 2
/// });
///
/// assert_eq!(handle.await, 3);
/// #
/// # })
/// ```
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(future).expect("cannot spawn future")
}
pub(crate) struct Pool {
pub injector: Injector<task::Runnable>,
pub stealers: Vec<Stealer<task::Runnable>>,
pub sleepers: Sleepers,
}
impl Pool {
/// Spawn a future onto the pool.
pub fn spawn<F, T>(&self, future: F, builder: Builder) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let tag = task::Tag::new(builder.name);
// Log this `spawn` operation.
let child_id = tag.task_id().as_u64();
let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0);
trace!("spawn", {
parent_id: parent_id,
child_id: child_id,
});
// Wrap the future into one that drops task-local variables on exit.
let future = unsafe { task_local::add_finalizer(future) };
// Wrap the future into one that logs completion on exit.
let future = async move {
let res = future.await;
trace!("spawn completed", {
parent_id: parent_id,
child_id: child_id,
});
res
};
let (task, handle) = async_task::spawn(future, worker::schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
/// Find the next runnable task to run.
pub fn find_task(&self, local: &Worker<task::Runnable>) -> Option<task::Runnable> {
// Pop a task from the local queue, if not empty.
local.pop().or_else(|| {
// Otherwise, we need to look for a task elsewhere.
iter::repeat_with(|| {
// Try stealing a batch of tasks from the injector queue.
self.injector
.steal_batch_and_pop(local)
// Or try stealing a bach of tasks from one of the other threads.
.or_else(|| {
self.stealers
.iter()
.map(|s| s.steal_batch_and_pop(local))
.collect()
})
})
// Loop while no task was stolen and any steal operation needs to be retried.
.find(|s| !s.is_retry())
// Extract the stolen task, if there is one.
.and_then(|s| s.success())
})
}
}
#[inline]
pub(crate) fn get() -> &'static Pool {
lazy_static! {
static ref POOL: Pool = {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
};
}
&*POOL
}

@ -0,0 +1,31 @@
use crate::future::Future;
use crate::task::{Builder, JoinHandle};
/// Spawns a task.
///
/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task.
///
/// [`std::thread`]: https://doc.rust-lang.org/std/thread/fn.spawn.html
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// let handle = task::spawn(async {
/// 1 + 2
/// });
///
/// assert_eq!(handle.await, 3);
/// #
/// # })
/// ```
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
Builder::new().spawn(future).expect("cannot spawn task")
}

@ -1,34 +1,74 @@
//! A thread pool for running blocking functions asynchronously.
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender}; use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static; use once_cell::sync::Lazy;
use crate::task::{JoinHandle, Task};
use crate::utils::{abort_on_panic, random};
type Runnable = async_task::Task<Task>;
use crate::task::task::{JoinHandle, Tag}; /// Spawns a blocking task.
use crate::utils::abort_on_panic; ///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This
/// is useful to prevent long-running synchronous operations from blocking the main futures
/// executor.
///
/// See also: [`task::block_on`], [`task::spawn`].
///
/// [`task::block_on`]: fn.block_on.html
/// [`task::spawn`]: fn.spawn.html
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # #[cfg(feature = "unstable")]
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// task::spawn_blocking(|| {
/// println!("long-running task here");
/// }).await;
/// #
/// # })
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn spawn_blocking<F, T>(f: F) -> JoinHandle<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let (task, handle) = async_task::spawn(async { f() }, schedule, Task::new(None));
task.schedule();
JoinHandle::new(handle)
}
const MAX_THREADS: u64 = 10_000; const MAX_THREADS: u64 = 10_000;
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
struct Pool { struct Pool {
sender: Sender<async_task::Task<Tag>>, sender: Sender<Runnable>,
receiver: Receiver<async_task::Task<Tag>>, receiver: Receiver<Runnable>,
} }
lazy_static! { static POOL: Lazy<Pool> = Lazy::new(|| {
static ref POOL: Pool = {
for _ in 0..2 { for _ in 0..2 {
thread::Builder::new() thread::Builder::new()
.name("async-blocking-driver".to_string()) .name("async-std/blocking".to_string())
.spawn(|| abort_on_panic(|| { .spawn(|| {
abort_on_panic(|| {
for task in &POOL.receiver { for task in &POOL.receiver {
task.run(); task.run();
} }
})) })
})
.expect("cannot start a thread driving blocking tasks"); .expect("cannot start a thread driving blocking tasks");
} }
@ -41,8 +81,7 @@ lazy_static! {
// reducing bufferbloat. // reducing bufferbloat.
let (sender, receiver) = bounded(0); let (sender, receiver) = bounded(0);
Pool { sender, receiver } Pool { sender, receiver }
}; });
}
// Create up to MAX_THREADS dynamic blocking task worker threads. // Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't // Dynamic threads will terminate themselves if they don't
@ -66,7 +105,7 @@ fn maybe_create_another_blocking_thread() {
let rand_sleep_ms = u64::from(random(10_000)); let rand_sleep_ms = u64::from(random(10_000));
thread::Builder::new() thread::Builder::new()
.name("async-blocking-driver-dynamic".to_string()) .name("async-std/blocking".to_string())
.spawn(move || { .spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
@ -82,8 +121,8 @@ fn maybe_create_another_blocking_thread() {
// Enqueues work, attempting to send to the threadpool in a // Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if // nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work. // there is not a thread ready to accept the work.
fn schedule(t: async_task::Task<Tag>) { pub(crate) fn schedule(task: Runnable) {
if let Err(err) = POOL.sender.try_send(t) { if let Err(err) = POOL.sender.try_send(task) {
// We were not able to send to the channel without // We were not able to send to the channel without
// blocking. Try to spin up another thread and then // blocking. Try to spin up another thread and then
// retry sending while blocking. // retry sending while blocking.
@ -91,45 +130,3 @@ fn schedule(t: async_task::Task<Tag>) {
POOL.sender.send(err.into_inner()).unwrap(); POOL.sender.send(err.into_inner()).unwrap();
} }
} }
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub(crate) fn spawn<F, R>(f: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let tag = Tag::new(None);
let future = async move { f() };
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule();
JoinHandle::new(handle)
}
/// Generates a random number in `0..n`.
fn random(n: u32) -> u32 {
use std::cell::Cell;
use std::num::Wrapping;
thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647));
}
RNG.with(|rng| {
// This is the 32-bit variant of Xorshift.
//
// Source: https://en.wikipedia.org/wiki/Xorshift
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);
// This is a fast alternative to `x % n`.
//
// Author: Daniel Lemire
// Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32
})
}

@ -1,31 +1,74 @@
use std::cell::Cell;
use std::fmt; use std::fmt;
use std::future::Future; use std::mem::ManuallyDrop;
use std::i64; use std::ptr;
use std::mem; use std::sync::atomic::{AtomicPtr, Ordering};
use std::num::NonZeroU64;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use super::task_local; use crate::task::{LocalsMap, TaskId};
use crate::task::{Context, Poll}; use crate::utils::abort_on_panic;
thread_local! {
/// A pointer to the currently running task.
static CURRENT: Cell<*const Task> = Cell::new(ptr::null_mut());
}
/// The inner representation of a task handle.
struct Inner {
/// The task ID.
id: TaskId,
/// The optional task name.
name: Option<Box<str>>,
/// The map holding task-local values.
locals: LocalsMap,
}
impl Inner {
#[inline]
fn new(name: Option<String>) -> Inner {
Inner {
id: TaskId::generate(),
name: name.map(String::into_boxed_str),
locals: LocalsMap::new(),
}
}
}
/// A handle to a task. /// A handle to a task.
#[derive(Clone)] pub struct Task {
pub struct Task(Arc<Metadata>); /// The inner representation.
///
/// This pointer is lazily initialized on first use. In most cases, the inner representation is
/// never touched and therefore we don't allocate it unless it's really needed.
inner: AtomicPtr<Inner>,
}
unsafe impl Send for Task {} unsafe impl Send for Task {}
unsafe impl Sync for Task {} unsafe impl Sync for Task {}
impl Task { impl Task {
/// Returns a reference to task metadata. /// Creates a new task handle.
pub(crate) fn metadata(&self) -> &Metadata { ///
&self.0 /// If the task is unnamed, the inner representation of the task will be lazily allocated on
/// demand.
#[inline]
pub(crate) fn new(name: Option<String>) -> Task {
let inner = match name {
None => AtomicPtr::default(),
Some(name) => {
let raw = Arc::into_raw(Arc::new(Inner::new(Some(name))));
AtomicPtr::new(raw as *mut Inner)
}
};
Task { inner }
} }
/// Gets the task's unique identifier. /// Gets the task's unique identifier.
#[inline]
pub fn id(&self) -> TaskId { pub fn id(&self) -> TaskId {
self.metadata().task_id self.inner().id
} }
/// Returns the name of this task. /// Returns the name of this task.
@ -34,178 +77,101 @@ impl Task {
/// ///
/// [`Builder::name`]: struct.Builder.html#method.name /// [`Builder::name`]: struct.Builder.html#method.name
pub fn name(&self) -> Option<&str> { pub fn name(&self) -> Option<&str> {
self.metadata().name.as_ref().map(|s| s.as_str()) self.inner().name.as_ref().map(|s| &**s)
} }
}
impl fmt::Debug for Task { /// Returns the map holding task-local values.
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { pub(crate) fn locals(&self) -> &LocalsMap {
f.debug_struct("Task").field("name", &self.name()).finish() &self.inner().locals
}
}
/// A handle that awaits the result of a task.
///
/// Dropping a [`JoinHandle`] will detach the task, meaning that there is no longer
/// a handle to the task and no way to `join` on it.
///
/// Created when a task is [spawned].
///
/// [spawned]: fn.spawn.html
#[derive(Debug)]
pub struct JoinHandle<T>(async_task::JoinHandle<T, Tag>);
unsafe impl<T> Send for JoinHandle<T> {}
unsafe impl<T> Sync for JoinHandle<T> {}
impl<T> JoinHandle<T> {
pub(crate) fn new(inner: async_task::JoinHandle<T, Tag>) -> JoinHandle<T> {
JoinHandle(inner)
} }
/// Returns a handle to the underlying task. /// Drops all task-local values.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
/// ///
/// let handle = task::spawn(async { /// This method is only safe to call at the end of the task.
/// 1 + 2 #[inline]
/// }); pub(crate) unsafe fn drop_locals(&self) {
/// println!("id = {}", handle.task().id()); let raw = self.inner.load(Ordering::Acquire);
/// # if let Some(inner) = raw.as_mut() {
/// # }) // Abort the process if dropping task-locals panics.
pub fn task(&self) -> &Task { abort_on_panic(|| {
self.0.tag().task() inner.locals.clear();
} });
}
impl<T> Future for JoinHandle<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => panic!("task has panicked"),
Poll::Ready(Some(val)) => Poll::Ready(val),
} }
} }
}
/// A unique identifier for a task.
///
/// # Examples
///
/// ```
/// #
/// use async_std::task;
///
/// task::block_on(async {
/// println!("id = {:?}", task::current().id());
/// })
/// ```
#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
pub struct TaskId(NonZeroU64);
impl TaskId { /// Returns the inner representation, initializing it on first use.
pub(crate) fn new() -> TaskId { fn inner(&self) -> &Inner {
static COUNTER: AtomicU64 = AtomicU64::new(1); loop {
let raw = self.inner.load(Ordering::Acquire);
let id = COUNTER.fetch_add(1, Ordering::Relaxed); if !raw.is_null() {
return unsafe { &*raw };
}
if id > i64::MAX as u64 { let new = Arc::into_raw(Arc::new(Inner::new(None))) as *mut Inner;
std::process::abort(); if self.inner.compare_and_swap(raw, new, Ordering::AcqRel) != raw {
unsafe {
drop(Arc::from_raw(new));
} }
unsafe { TaskId(NonZeroU64::new_unchecked(id)) }
} }
pub(crate) fn as_u64(self) -> u64 {
self.0.get()
} }
}
impl fmt::Display for TaskId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
} }
}
pub(crate) type Runnable = async_task::Task<Tag>;
pub(crate) struct Metadata {
pub task_id: TaskId,
pub name: Option<String>,
pub local_map: task_local::Map,
}
pub(crate) struct Tag {
task_id: TaskId,
raw_metadata: AtomicUsize,
}
impl Tag {
pub fn new(name: Option<String>) -> Tag {
let task_id = TaskId::new();
let opt_task = name.map(|name| {
Task(Arc::new(Metadata {
task_id,
name: Some(name),
local_map: task_local::Map::new(),
}))
});
Tag { /// Set a reference to the current task.
task_id, pub(crate) unsafe fn set_current<F, R>(task: *const Task, f: F) -> R
raw_metadata: AtomicUsize::new(unsafe { where
mem::transmute::<Option<Task>, usize>(opt_task) F: FnOnce() -> R,
}), {
CURRENT.with(|current| {
let old_task = current.replace(task);
defer! {
current.set(old_task);
} }
f()
})
} }
pub fn task(&self) -> &Task { /// Gets a reference to the current task.
#[allow(clippy::transmute_ptr_to_ptr)] pub(crate) fn get_current<F, R>(f: F) -> Option<R>
unsafe { where
let raw = self.raw_metadata.load(Ordering::Acquire); F: FnOnce(&Task) -> R,
if mem::transmute::<&usize, &Option<Task>>(&raw).is_none() {
let new = Some(Task(Arc::new(Metadata {
task_id: TaskId::new(),
name: None,
local_map: task_local::Map::new(),
})));
let new_raw = mem::transmute::<Option<Task>, usize>(new);
if self
.raw_metadata
.compare_exchange(raw, new_raw, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{ {
let new = mem::transmute::<usize, Option<Task>>(new_raw); let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) });
drop(new); match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
} }
} }
}
#[allow(clippy::transmute_ptr_to_ptr)] impl Drop for Task {
mem::transmute::<&AtomicUsize, &Option<Task>>(&self.raw_metadata) fn drop(&mut self) {
.as_ref() // Deallocate the inner representation if it was initialized.
.unwrap() let raw = *self.inner.get_mut();
if !raw.is_null() {
unsafe {
drop(Arc::from_raw(raw));
}
} }
} }
}
pub fn task_id(&self) -> TaskId { impl Clone for Task {
self.task_id fn clone(&self) -> Task {
// We need to make sure the inner representation is initialized now so that this instance
// and the clone have raw pointers that point to the same `Arc<Inner>`.
let arc = unsafe { ManuallyDrop::new(Arc::from_raw(self.inner())) };
let raw = Arc::into_raw(Arc::clone(&arc));
Task {
inner: AtomicPtr::new(raw as *mut Inner),
}
} }
} }
impl Drop for Tag { impl fmt::Debug for Task {
fn drop(&mut self) { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let raw = *self.raw_metadata.get_mut(); f.debug_struct("Task")
let opt_task = unsafe { mem::transmute::<usize, Option<Task>>(raw) }; .field("id", &self.id())
drop(opt_task); .field("name", &self.name())
.finish()
} }
} }

@ -0,0 +1,35 @@
use std::fmt;
use std::sync::atomic::{AtomicU64, Ordering};
/// A unique identifier for a task.
///
/// # Examples
///
/// ```
/// use async_std::task;
///
/// task::block_on(async {
/// println!("id = {:?}", task::current().id());
/// })
/// ```
#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)]
pub struct TaskId(pub(crate) u64);
impl TaskId {
/// Generates a new `TaskId`.
pub(crate) fn generate() -> TaskId {
static COUNTER: AtomicU64 = AtomicU64::new(1);
let id = COUNTER.fetch_add(1, Ordering::Relaxed);
if id > u64::max_value() / 2 {
std::process::abort();
}
TaskId(id)
}
}
impl fmt::Display for TaskId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

@ -1,14 +1,9 @@
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::future::Future; use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use lazy_static::lazy_static; use crate::task::Task;
use super::worker;
use crate::utils::abort_on_panic;
/// Declares task-local values. /// Declares task-local values.
/// ///
@ -50,7 +45,7 @@ macro_rules! task_local {
$crate::task::LocalKey { $crate::task::LocalKey {
__init, __init,
__key: ::std::sync::atomic::AtomicUsize::new(0), __key: ::std::sync::atomic::AtomicU32::new(0),
} }
}; };
); );
@ -71,7 +66,7 @@ pub struct LocalKey<T: Send + 'static> {
pub __init: fn() -> T, pub __init: fn() -> T,
#[doc(hidden)] #[doc(hidden)]
pub __key: AtomicUsize, pub __key: AtomicU32,
} }
impl<T: Send + 'static> LocalKey<T> { impl<T: Send + 'static> LocalKey<T> {
@ -154,14 +149,13 @@ impl<T: Send + 'static> LocalKey<T> {
where where
F: FnOnce(&T) -> R, F: FnOnce(&T) -> R,
{ {
worker::get_task(|task| unsafe { Task::get_current(|task| unsafe {
// Prepare the numeric key, initialization function, and the map of task-locals. // Prepare the numeric key, initialization function, and the map of task-locals.
let key = self.key(); let key = self.key();
let init = || Box::new((self.__init)()) as Box<dyn Send>; let init = || Box::new((self.__init)()) as Box<dyn Send>;
let map = &task.metadata().local_map;
// Get the value in the map of task-locals, or initialize and insert one. // Get the value in the map of task-locals, or initialize and insert one.
let value: *const dyn Send = map.get_or_insert(key, init); let value: *const dyn Send = task.locals().get_or_insert(key, init);
// Call the closure with the value passed as an argument. // Call the closure with the value passed as an argument.
f(&*(value as *const T)) f(&*(value as *const T))
@ -171,26 +165,26 @@ impl<T: Send + 'static> LocalKey<T> {
/// Returns the numeric key associated with this task-local. /// Returns the numeric key associated with this task-local.
#[inline] #[inline]
fn key(&self) -> usize { fn key(&self) -> u32 {
#[cold] #[cold]
fn init(key: &AtomicUsize) -> usize { fn init(key: &AtomicU32) -> u32 {
lazy_static! { static COUNTER: AtomicU32 = AtomicU32::new(1);
static ref COUNTER: Mutex<usize> = Mutex::new(1);
}
let mut counter = COUNTER.lock().unwrap(); let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel); if counter > u32::max_value() / 2 {
std::process::abort();
}
if prev == 0 { match key.compare_and_swap(0, counter, Ordering::AcqRel) {
*counter += 1; 0 => counter,
*counter - 1 k => k,
} else {
prev
} }
} }
let key = self.__key.load(Ordering::Acquire); match self.__key.load(Ordering::Acquire) {
if key == 0 { init(&self.__key) } else { key } 0 => init(&self.__key),
k => k,
}
} }
} }
@ -216,51 +210,55 @@ impl fmt::Display for AccessError {
impl Error for AccessError {} impl Error for AccessError {}
/// A key-value entry in a map of task-locals.
struct Entry {
/// Key identifying the task-local variable.
key: u32,
/// Value stored in this entry.
value: Box<dyn Send>,
}
/// A map that holds task-locals. /// A map that holds task-locals.
pub(crate) struct Map { pub(crate) struct LocalsMap {
/// A list of `(key, value)` entries sorted by the key. /// A list of key-value entries sorted by the key.
entries: UnsafeCell<Vec<(usize, Box<dyn Send>)>>, entries: UnsafeCell<Option<Vec<Entry>>>,
} }
impl Map { impl LocalsMap {
/// Creates an empty map of task-locals. /// Creates an empty map of task-locals.
pub fn new() -> Map { pub fn new() -> LocalsMap {
Map { LocalsMap {
entries: UnsafeCell::new(Vec::new()), entries: UnsafeCell::new(Some(Vec::new())),
} }
} }
/// Returns a thread-local value associated with `key` or inserts one constructed by `init`. /// Returns a task-local value associated with `key` or inserts one constructed by `init`.
#[inline] #[inline]
pub fn get_or_insert(&self, key: usize, init: impl FnOnce() -> Box<dyn Send>) -> &dyn Send { pub fn get_or_insert(&self, key: u32, init: impl FnOnce() -> Box<dyn Send>) -> &dyn Send {
let entries = unsafe { &mut *self.entries.get() }; match unsafe { (*self.entries.get()).as_mut() } {
None => panic!("can't access task-locals while the task is being dropped"),
let index = match entries.binary_search_by_key(&key, |e| e.0) { Some(entries) => {
let index = match entries.binary_search_by_key(&key, |e| e.key) {
Ok(i) => i, Ok(i) => i,
Err(i) => { Err(i) => {
entries.insert(i, (key, init())); let value = init();
entries.insert(i, Entry { key, value });
i i
} }
}; };
&*entries[index].value
&*entries[index].1 }
} }
/// Clears the map and drops all task-locals.
pub fn clear(&self) {
let entries = unsafe { &mut *self.entries.get() };
entries.clear();
} }
}
// Wrap the future into one that drops task-local variables on exit.
pub(crate) unsafe fn add_finalizer<T>(f: impl Future<Output = T>) -> impl Future<Output = T> {
async move {
let res = f.await;
// Abort on panic because thread-local variables behave the same way.
abort_on_panic(|| worker::get_task(|task| task.metadata().local_map.clear()));
res /// Clears the map and drops all task-locals.
///
/// This method is only safe to call at the end of the task.
pub unsafe fn clear(&self) {
// Since destructors may attempt to access task-locals, we musnt't hold a mutable reference
// to the `Vec` while dropping them. Instead, we first take the `Vec` out and then drop it.
let entries = (*self.entries.get()).take();
drop(entries);
} }
} }

@ -1,110 +0,0 @@
use std::cell::Cell;
use std::ptr;
use crossbeam_deque::Worker;
use super::pool;
use super::task;
use super::Task;
use crate::utils::abort_on_panic;
/// Returns a handle to the current task.
///
/// # Panics
///
/// This function will panic if not called within the context of a task created by [`block_on`],
/// [`spawn`], or [`Builder::spawn`].
///
/// [`block_on`]: fn.block_on.html
/// [`spawn`]: fn.spawn.html
/// [`Builder::spawn`]: struct.Builder.html#method.spawn
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::task;
///
/// println!("The name of this task is {:?}", task::current().name());
/// #
/// # })
/// ```
pub fn current() -> Task {
get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task")
}
thread_local! {
static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut());
}
pub(crate) fn set_tag<F, R>(tag: *const task::Tag, f: F) -> R
where
F: FnOnce() -> R,
{
struct ResetTag<'a>(&'a Cell<*const task::Tag>);
impl Drop for ResetTag<'_> {
fn drop(&mut self) {
self.0.set(ptr::null());
}
}
TAG.with(|t| {
t.set(tag);
let _guard = ResetTag(t);
f()
})
}
pub(crate) fn get_task<F, R>(f: F) -> Option<R>
where
F: FnOnce(&Task) -> R,
{
let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) });
match res {
Ok(Some(val)) => Some(val),
Ok(None) | Err(_) => None,
}
}
thread_local! {
static IS_WORKER: Cell<bool> = Cell::new(false);
static QUEUE: Cell<Option<Worker<task::Runnable>>> = Cell::new(None);
}
pub(crate) fn is_worker() -> bool {
IS_WORKER.with(|is_worker| is_worker.get())
}
fn get_queue<F: FnOnce(&Worker<task::Runnable>) -> T, T>(f: F) -> T {
QUEUE.with(|queue| {
let q = queue.take().unwrap();
let ret = f(&q);
queue.set(Some(q));
ret
})
}
pub(crate) fn schedule(task: task::Runnable) {
if is_worker() {
get_queue(|q| q.push(task));
} else {
pool::get().injector.push(task);
}
pool::get().sleepers.notify_one();
}
pub(crate) fn main_loop(worker: Worker<task::Runnable>) {
IS_WORKER.with(|is_worker| is_worker.set(true));
QUEUE.with(|queue| queue.set(Some(worker)));
loop {
match get_queue(|q| pool::get().find_task(q)) {
Some(task) => set_tag(task.tag(), || abort_on_panic(|| task.run())),
None => pool::get().sleepers.wait(),
}
}
}

@ -1,8 +1,8 @@
use std::pin::Pin;
use crate::future::Future; use crate::future::Future;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
use std::pin::Pin;
/// Cooperatively gives up a timeslice to the task scheduler. /// Cooperatively gives up a timeslice to the task scheduler.
/// ///
/// Calling this function will move the currently executing future to the back /// Calling this function will move the currently executing future to the back

@ -1,5 +1,4 @@
use std::mem; use std::mem;
use std::process;
/// Calls a function and aborts if it panics. /// Calls a function and aborts if it panics.
/// ///
@ -10,7 +9,7 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
impl Drop for Bomb { impl Drop for Bomb {
fn drop(&mut self) { fn drop(&mut self) {
process::abort(); std::process::abort();
} }
} }
@ -20,6 +19,53 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
t t
} }
/// Generates a random number in `0..n`.
pub fn random(n: u32) -> u32 {
use std::cell::Cell;
use std::num::Wrapping;
thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647));
}
RNG.with(|rng| {
// This is the 32-bit variant of Xorshift.
//
// Source: https://en.wikipedia.org/wiki/Xorshift
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);
// This is a fast alternative to `x % n`.
//
// Author: Daniel Lemire
// Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32
})
}
/// Defers evaluation of a block of code until the end of the scope.
#[doc(hidden)]
macro_rules! defer {
($($body:tt)*) => {
let _guard = {
pub struct Guard<F: FnOnce()>(Option<F>);
impl<F: FnOnce()> Drop for Guard<F> {
fn drop(&mut self) {
(self.0).take().map(|f| f());
}
}
Guard(Some(|| {
let _ = { $($body)* };
}))
};
};
}
/// Declares unstable items. /// Declares unstable items.
#[doc(hidden)] #[doc(hidden)]
macro_rules! cfg_unstable { macro_rules! cfg_unstable {

@ -1,3 +1,5 @@
#![cfg(feature = "unstable")]
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;

Loading…
Cancel
Save