From b942d0a40580e1df63ddcbf7505df0fe625c5a77 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Tue, 29 Oct 2019 21:44:56 +0800 Subject: [PATCH 01/15] add stream-min --- src/stream/stream/min.rs | 60 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 37 +++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/stream/stream/min.rs diff --git a/src/stream/stream/min.rs b/src/stream/stream/min.rs new file mode 100644 index 00000000..1ab56065 --- /dev/null +++ b/src/stream/stream/min.rs @@ -0,0 +1,60 @@ +use std::marker::PhantomData; +use std::cmp::{Ordering, Ord}; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MinFuture { + #[pin] + stream: S, + _compare: PhantomData, + min: Option, + } +} + +impl MinFuture { + pub(super) fn new(stream: S) -> Self { + Self { + stream, + _compare: PhantomData, + min: None, + } + } +} + +impl Future for MinFuture +where + S: Stream, + S::Item: Ord, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match this.min.take() { + None => *this.min = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Less => *this.min = Some(new), + _ => *this.min = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(this.min.take()), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e8190387..27090a5b 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -41,6 +41,7 @@ mod le; mod lt; mod map; mod max_by; +mod min; mod min_by; mod min_by_key; mod next; @@ -71,6 +72,7 @@ use last::LastFuture; use le::LeFuture; use lt::LtFuture; use max_by::MaxByFuture; +use min::MinFuture; use min_by::MinByFuture; use min_by_key::MinByKeyFuture; use next::NextFuture; @@ -753,6 +755,41 @@ extension_trait! { self, compare: F, ) -> impl Future> [MinByFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MinByFuture::new(self, compare) + } + + #[doc = r#" + Returns the element that gives the minimum value. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let min = s.clone().min().await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().min().await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn min_by( + self, + compare: F, + ) -> impl Future> [MinByFuture] where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, From 021862dcc88e6bdda67f010cd0d127e741efae1e Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Tue, 29 Oct 2019 21:49:30 +0800 Subject: [PATCH 02/15] fix min --- src/stream/stream/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 27090a5b..5c42989f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -786,15 +786,14 @@ extension_trait! { # }) } ``` "#] - fn min_by( + fn min( self, - compare: F, - ) -> impl Future> [MinByFuture] + ) -> impl Future> [MinFuture] where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, { - MinByFuture::new(self, compare) + MinFuture::new(self) } #[doc = r#" From 3620b2b6abcc42ef1955803805a2ffd322bd61ef Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 30 Oct 2019 09:17:12 +0900 Subject: [PATCH 03/15] fix: Add only rustfmt on Checking fmt and docs actions --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 653834a7..dd8ec899 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,8 +59,10 @@ jobs: - uses: actions-rs/toolchain@v1 with: + profile: minimal toolchain: ${{ steps.component.outputs.toolchain }} override: true + components: rustfmt - name: setup run: | From ff6a44fcd5e6b122ca42ae7563b7d155bcde6f66 Mon Sep 17 00:00:00 2001 From: Wu Yu Wei Date: Wed, 30 Oct 2019 19:23:08 +0800 Subject: [PATCH 04/15] Use once_cell instead of lazy_static (#416) `once_cell` provides a neat way of initializing lazy singletons without macro. This PR use `sync::Lazy` to streamline same pattern proposed in related rust RFC. Resolve #406 --- Cargo.toml | 2 +- src/net/driver/mod.rs | 34 +++++++++++++++----------------- src/task/blocking.rs | 44 +++++++++++++++++++++--------------------- src/task/pool.rs | 40 ++++++++++++++++++-------------------- src/task/task_local.rs | 6 ++---- 5 files changed, 60 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ab74dd01..dcf2c7d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,12 +33,12 @@ crossbeam-utils = "0.6.6" futures-core-preview = "=0.3.0-alpha.19" futures-io-preview = "=0.3.0-alpha.19" futures-timer = "1.0.2" -lazy_static = "1.4.0" log = { version = "0.4.8", features = ["kv_unstable"] } memchr = "2.2.1" mio = "0.6.19" mio-uds = "0.6.7" num_cpus = "1.10.1" +once_cell = "1.2.0" pin-utils = "0.1.0-alpha.4" slab = "0.4.2" kv-log-macro = "1.0.4" diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 806acdbe..40e0abb2 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -1,8 +1,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; -use lazy_static::lazy_static; use mio::{self, Evented}; +use once_cell::sync::Lazy; use slab::Slab; use crate::io; @@ -100,25 +100,23 @@ impl Reactor { // } } -lazy_static! { - /// The state of the global networking driver. - static ref REACTOR: Reactor = { - // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O - // handles. - std::thread::Builder::new() - .name("async-net-driver".to_string()) - .spawn(move || { - // If the driver thread panics, there's not much we can do. It is not a - // recoverable error and there is no place to propagate it into so we just abort. - abort_on_panic(|| { - main_loop().expect("async networking thread has panicked"); - }) +/// The state of the global networking driver. +static REACTOR: Lazy = Lazy::new(|| { + // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O + // handles. + std::thread::Builder::new() + .name("async-net-driver".to_string()) + .spawn(move || { + // If the driver thread panics, there's not much we can do. It is not a + // recoverable error and there is no place to propagate it into so we just abort. + abort_on_panic(|| { + main_loop().expect("async networking thread has panicked"); }) - .expect("cannot start a thread driving blocking tasks"); + }) + .expect("cannot start a thread driving blocking tasks"); - Reactor::new().expect("cannot initialize reactor") - }; -} + Reactor::new().expect("cannot initialize reactor") +}); /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. fn main_loop() -> io::Result<()> { diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 3216012a..1f1a222a 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -5,7 +5,7 @@ use std::thread; use std::time::Duration; use crossbeam_channel::{bounded, Receiver, Sender}; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use crate::task::task::{JoinHandle, Tag}; use crate::utils::abort_on_panic; @@ -19,30 +19,30 @@ struct Pool { receiver: Receiver>, } -lazy_static! { - static ref POOL: Pool = { - for _ in 0..2 { - thread::Builder::new() - .name("async-blocking-driver".to_string()) - .spawn(|| abort_on_panic(|| { +static POOL: Lazy = Lazy::new(|| { + for _ in 0..2 { + thread::Builder::new() + .name("async-blocking-driver".to_string()) + .spawn(|| { + abort_on_panic(|| { for task in &POOL.receiver { task.run(); } - })) - .expect("cannot start a thread driving blocking tasks"); - } - - // We want to use an unbuffered channel here to help - // us drive our dynamic control. In effect, the - // kernel's scheduler becomes the queue, reducing - // the number of buffers that work must flow through - // before being acted on by a core. This helps keep - // latency snappy in the overall async system by - // reducing bufferbloat. - let (sender, receiver) = bounded(0); - Pool { sender, receiver } - }; -} + }) + }) + .expect("cannot start a thread driving blocking tasks"); + } + + // We want to use an unbuffered channel here to help + // us drive our dynamic control. In effect, the + // kernel's scheduler becomes the queue, reducing + // the number of buffers that work must flow through + // before being acted on by a core. This helps keep + // latency snappy in the overall async system by + // reducing bufferbloat. + let (sender, receiver) = bounded(0); + Pool { sender, receiver } +}); // Create up to MAX_THREADS dynamic blocking task worker threads. // Dynamic threads will terminate themselves if they don't diff --git a/src/task/pool.rs b/src/task/pool.rs index bfaa17d4..3fd70470 100644 --- a/src/task/pool.rs +++ b/src/task/pool.rs @@ -3,7 +3,7 @@ use std::thread; use crossbeam_deque::{Injector, Stealer, Worker}; use kv_log_macro::trace; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use super::sleepers::Sleepers; use super::task; @@ -111,28 +111,26 @@ impl Pool { #[inline] pub(crate) fn get() -> &'static Pool { - lazy_static! { - static ref POOL: Pool = { - let num_threads = num_cpus::get().max(1); - let mut stealers = Vec::new(); + static POOL: Lazy = Lazy::new(|| { + let num_threads = num_cpus::get().max(1); + let mut stealers = Vec::new(); - // Spawn worker threads. - for _ in 0..num_threads { - let worker = Worker::new_fifo(); - stealers.push(worker.stealer()); + // Spawn worker threads. + for _ in 0..num_threads { + let worker = Worker::new_fifo(); + stealers.push(worker.stealer()); - thread::Builder::new() - .name("async-task-driver".to_string()) - .spawn(|| abort_on_panic(|| worker::main_loop(worker))) - .expect("cannot start a thread driving tasks"); - } + thread::Builder::new() + .name("async-task-driver".to_string()) + .spawn(|| abort_on_panic(|| worker::main_loop(worker))) + .expect("cannot start a thread driving tasks"); + } - Pool { - injector: Injector::new(), - stealers, - sleepers: Sleepers::new(), - } - }; - } + Pool { + injector: Injector::new(), + stealers, + sleepers: Sleepers::new(), + } + }); &*POOL } diff --git a/src/task/task_local.rs b/src/task/task_local.rs index c72937f6..e92f4f92 100644 --- a/src/task/task_local.rs +++ b/src/task/task_local.rs @@ -5,7 +5,7 @@ use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use super::worker; use crate::utils::abort_on_panic; @@ -174,9 +174,7 @@ impl LocalKey { fn key(&self) -> usize { #[cold] fn init(key: &AtomicUsize) -> usize { - lazy_static! { - static ref COUNTER: Mutex = Mutex::new(1); - } + static COUNTER: Lazy> = Lazy::new(|| Mutex::new(1)); let mut counter = COUNTER.lock().unwrap(); let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel); From 5fee91c0502cff2618649210928c50c116474d7a Mon Sep 17 00:00:00 2001 From: JayatiGoyal <44127709+JayatiGoyal@users.noreply.github.com> Date: Thu, 31 Oct 2019 00:36:42 +0530 Subject: [PATCH 05/15] corrected a typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7aeaed86..9af20a39 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ syntax. ## Features - __Modern:__ Built from the ground up for `std::future` and `async/await` with - blazing fast compilation times. + blazing fast compilation time. - __Fast:__ Our robust allocator and threadpool designs provide ultra-high throughput with predictably low latency. - __Intuitive:__ Complete parity with the stdlib means you only need to learn From f5efaaa7ba82e6a0707a82ffa6cda499fdb6d694 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 14:44:19 +0800 Subject: [PATCH 06/15] Add stream eq --- src/stream/stream/eq.rs | 61 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 38 +++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 src/stream/stream/eq.rs diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs new file mode 100644 index 00000000..42a37d84 --- /dev/null +++ b/src/stream/stream/eq.rs @@ -0,0 +1,61 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + // Lexicographically compares the elements of this `Stream` with those + // of another. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct EqFuture { + #[pin] + l: Fuse, + #[pin] + r: Fuse, + } +} + +impl EqFuture +where + L::Item: PartialEq, +{ + pub(super) fn new(l: L, r: R) -> Self { + EqFuture { + l: l.fuse(), + r: r.fuse(), + } + } +} + +impl Future for EqFuture + where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx)); + let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx)); + + if this.l.done && this.r.done { + return Poll::Ready(true); + } + + match (l_val, r_val) { + (Some(l), Some(r)) if l != r => {return Poll::Ready(false);}, + _ => {}, + } + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e8190387..07cd03a1 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -26,6 +26,7 @@ mod any; mod chain; mod cmp; mod enumerate; +mod eq; mod filter; mod filter_map; mod find; @@ -60,6 +61,7 @@ use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; use enumerate::Enumerate; +use eq::EqFuture; use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; @@ -1622,6 +1624,42 @@ extension_trait! { GeFuture::new(self, other) } + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + equal to those of another. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let single: VecDeque = vec![1].into_iter().collect(); + let single_eq: VecDeque = vec![10].into_iter().collect(); + let multi: VecDeque = vec![1,2].into_iter().collect(); + let multi_eq: VecDeque = vec![1,5].into_iter().collect(); + assert_eq!(single.clone().eq(single.clone()).await, true); + assert_eq!(single_eq.clone().eq(single.clone()).await, false); + assert_eq!(multi.clone().eq(single_eq.clone()).await, false); + assert_eq!(multi_eq.clone().eq(multi.clone()).await, false); + # + # }) } + ``` + "#] + fn eq( + self, + other: S + ) -> impl Future [EqFuture] + where + Self: Sized + Stream, + S: Sized + Stream, + ::Item: PartialEq, + { + EqFuture::new(self, other) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than those of another. From 17db7ffcd35e4c7e350adba6e3f39daddce52536 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 18:05:51 +0800 Subject: [PATCH 07/15] Add stream ne --- src/stream/stream/mod.rs | 35 +++++++++++++++++++++++ src/stream/stream/ne.rs | 62 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/stream/stream/ne.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e8190387..b8fac7e5 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -43,6 +43,7 @@ mod map; mod max_by; mod min_by; mod min_by_key; +mod ne; mod next; mod nth; mod partial_cmp; @@ -73,6 +74,7 @@ use lt::LtFuture; use max_by::MaxByFuture; use min_by::MinByFuture; use min_by_key::MinByKeyFuture; +use ne::NeFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; @@ -1586,6 +1588,39 @@ extension_trait! { CmpFuture::new(self, other) } + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + not equal to those of another. + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + let single: VecDeque = vec![1].into_iter().collect(); + let single_ne: VecDeque = vec![10].into_iter().collect(); + let multi: VecDeque = vec![1,2].into_iter().collect(); + let multi_ne: VecDeque = vec![1,5].into_iter().collect(); + assert_eq!(single.clone().ne(single.clone()).await, false); + assert_eq!(single_ne.clone().ne(single.clone()).await, true); + assert_eq!(multi.clone().ne(single_ne.clone()).await, true); + assert_eq!(multi_ne.clone().ne(multi.clone()).await, true); + # + # }) } + ``` + "#] + fn ne( + self, + other: S + ) -> impl Future [NeFuture] + where + Self: Sized + Stream, + S: Sized + Stream, + ::Item: PartialEq, + { + NeFuture::new(self, other) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than or equal to those of another. diff --git a/src/stream/stream/ne.rs b/src/stream/stream/ne.rs new file mode 100644 index 00000000..2f17ed0e --- /dev/null +++ b/src/stream/stream/ne.rs @@ -0,0 +1,62 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + // Lexicographically compares the elements of this `Stream` with those + // of another. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct NeFuture { + #[pin] + l: Fuse, + #[pin] + r: Fuse, + } +} + +impl NeFuture + where + L::Item: PartialEq, +{ + pub(super) fn new(l: L, r: R) -> Self { + Self { + l: l.fuse(), + r: r.fuse(), + } + } +} + +impl Future for NeFuture + where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx)); + let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx)); + + if this.l.done || this.r.done { + return Poll::Ready(false); + } + + match (l_val, r_val) { + (Some(l), Some(r)) if l == r => {continue;}, + _ => { return Poll::Ready(true); }, + } + + } + } +} \ No newline at end of file From 204da3339152596667ff6e6872afda73c997f517 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 21:16:13 +0800 Subject: [PATCH 08/15] fmt code --- src/stream/stream/ne.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/stream/stream/ne.rs b/src/stream/stream/ne.rs index 2f17ed0e..ffeaca81 100644 --- a/src/stream/stream/ne.rs +++ b/src/stream/stream/ne.rs @@ -22,8 +22,8 @@ pin_project! { } impl NeFuture - where - L::Item: PartialEq, +where + L::Item: PartialEq, { pub(super) fn new(l: L, r: R) -> Self { Self { @@ -34,10 +34,10 @@ impl NeFuture } impl Future for NeFuture - where - L: Stream + Sized, - R: Stream + Sized, - L::Item: PartialEq, +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, { type Output = bool; @@ -53,10 +53,13 @@ impl Future for NeFuture } match (l_val, r_val) { - (Some(l), Some(r)) if l == r => {continue;}, - _ => { return Poll::Ready(true); }, + (Some(l), Some(r)) if l == r => { + continue; + } + _ => { + return Poll::Ready(true); + } } - } } -} \ No newline at end of file +} From 1ab3d901e42fb7c2e9b44303c2b4cdf5116d68b9 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 21:17:07 +0800 Subject: [PATCH 09/15] fmt code --- src/stream/stream/eq.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs index 42a37d84..5343c1a0 100644 --- a/src/stream/stream/eq.rs +++ b/src/stream/stream/eq.rs @@ -34,10 +34,10 @@ where } impl Future for EqFuture - where - L: Stream + Sized, - R: Stream + Sized, - L::Item: PartialEq, +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, { type Output = bool; @@ -53,8 +53,10 @@ impl Future for EqFuture } match (l_val, r_val) { - (Some(l), Some(r)) if l != r => {return Poll::Ready(false);}, - _ => {}, + (Some(l), Some(r)) if l != r => { + return Poll::Ready(false); + } + _ => {} } } } From 48c82a9668ec3f18246d7cb5066b72f1e5e3133d Mon Sep 17 00:00:00 2001 From: zhangguyu Date: Thu, 31 Oct 2019 22:33:17 +0800 Subject: [PATCH 10/15] Add stream position --- src/stream/stream/mod.rs | 41 ++++++++++++++++++++++++++++ src/stream/stream/position.rs | 51 +++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 src/stream/stream/position.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e8190387..c4abe32f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -46,6 +46,7 @@ mod min_by_key; mod next; mod nth; mod partial_cmp; +mod position; mod scan; mod skip; mod skip_while; @@ -76,6 +77,7 @@ use min_by_key::MinByKeyFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; @@ -1548,6 +1550,45 @@ extension_trait! { PartialCmpFuture::new(self, other) } + #[doc = r#" + Searches for an element in a Stream that satisfies a predicate, returning + its index. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let res = s.clone().position(|x| *x == 1).await; + assert_eq!(res, Some(0)); + + let res = s.clone().position(|x| *x == 2).await; + assert_eq!(res, Some(1)); + + let res = s.clone().position(|x| *x == 3).await; + assert_eq!(res, Some(2)); + + let res = s.clone().position(|x| *x == 4).await; + assert_eq!(res, None); + # + # }) } + ``` + "#] + fn position

( + self, + predicate: P + ) -> impl Future> [PositionFuture] + where + Self: Sized + Stream, + P: FnMut(&Self::Item) -> bool, + { + PositionFuture::new(self, predicate) + } + #[doc = r#" Lexicographically compares the elements of this `Stream` with those of another using 'Ord'. diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs new file mode 100644 index 00000000..3cd5b84c --- /dev/null +++ b/src/stream/stream/position.rs @@ -0,0 +1,51 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct PositionFuture { + #[pin] + stream: S, + predicate: P, + index:usize, + } +} + +impl PositionFuture { + pub(super) fn new(stream: S, predicate: P) -> Self { + PositionFuture { + stream, + predicate, + index: 0, + } + } +} + +impl Future for PositionFuture +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)), + Some(_) => { + cx.waker().wake_by_ref(); + *this.index += 1; + Poll::Pending + } + None => Poll::Ready(None), + } + } +} From 07d21e5eb37e6ddc2a1819dd0d27b83338f21299 Mon Sep 17 00:00:00 2001 From: zhangguyu Date: Thu, 31 Oct 2019 23:30:11 +0800 Subject: [PATCH 11/15] change trait bounds --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c4abe32f..0469c7ae 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1583,7 +1583,7 @@ extension_trait! { predicate: P ) -> impl Future> [PositionFuture] where - Self: Sized + Stream, + Self: Sized, P: FnMut(&Self::Item) -> bool, { PositionFuture::new(self, predicate) From 3dd59d7056936c6ec66b6f5579bbd8ed90746038 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 1 Nov 2019 02:45:33 +0100 Subject: [PATCH 12/15] Refactor the task module (#421) * Refactor the task module * Fix clippy warning * Simplify task-local entries * Reduce the amount of future wrapping * Cleanup * Simplify stealing --- Cargo.toml | 6 +- src/fs/canonicalize.rs | 4 +- src/fs/copy.rs | 4 +- src/fs/create_dir.rs | 4 +- src/fs/create_dir_all.rs | 4 +- src/fs/dir_builder.rs | 4 +- src/fs/dir_entry.rs | 6 +- src/fs/file.rs | 22 +- src/fs/hard_link.rs | 4 +- src/fs/metadata.rs | 4 +- src/fs/open_options.rs | 4 +- src/fs/read.rs | 4 +- src/fs/read_dir.rs | 6 +- src/fs/read_link.rs | 4 +- src/fs/read_to_string.rs | 4 +- src/fs/remove_dir.rs | 4 +- src/fs/remove_dir_all.rs | 4 +- src/fs/remove_file.rs | 4 +- src/fs/rename.rs | 4 +- src/fs/set_permissions.rs | 4 +- src/fs/symlink_metadata.rs | 4 +- src/fs/write.rs | 4 +- src/io/stderr.rs | 6 +- src/io/stdin.rs | 6 +- src/io/stdout.rs | 6 +- src/net/addr.rs | 6 +- src/net/driver/mod.rs | 2 +- src/net/tcp/stream.rs | 5 +- src/os/unix/fs.rs | 4 +- src/os/unix/net/datagram.rs | 4 +- src/os/unix/net/listener.rs | 4 +- src/os/unix/net/stream.rs | 4 +- src/task/block_on.rs | 177 +++++------- src/task/builder.rs | 58 +++- src/task/current.rs | 28 ++ src/task/executor/mod.rs | 13 + src/task/executor/pool.rs | 140 ++++++++++ src/task/{ => executor}/sleepers.rs | 0 src/task/join_handle.rs | 56 ++++ src/task/mod.rs | 68 ++--- src/task/pool.rs | 136 --------- src/task/spawn.rs | 31 +++ src/task/{blocking.rs => spawn_blocking.rs} | 101 ++++--- src/task/task.rs | 294 +++++++++----------- src/task/task_id.rs | 35 +++ src/task/task_local.rs | 116 ++++---- src/task/worker.rs | 110 -------- src/task/yield_now.rs | 4 +- src/utils.rs | 50 +++- tests/channel.rs | 2 + 50 files changed, 817 insertions(+), 761 deletions(-) create mode 100644 src/task/current.rs create mode 100644 src/task/executor/mod.rs create mode 100644 src/task/executor/pool.rs rename src/task/{ => executor}/sleepers.rs (100%) create mode 100644 src/task/join_handle.rs delete mode 100644 src/task/pool.rs create mode 100644 src/task/spawn.rs rename src/task/{blocking.rs => spawn_blocking.rs} (67%) create mode 100644 src/task/task_id.rs delete mode 100644 src/task/worker.rs diff --git a/Cargo.toml b/Cargo.toml index dcf2c7d0..63897053 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,23 +27,23 @@ unstable = ["broadcaster"] [dependencies] async-macros = "1.0.0" async-task = "1.0.0" +broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] } crossbeam-channel = "0.3.9" crossbeam-deque = "0.7.1" crossbeam-utils = "0.6.6" futures-core-preview = "=0.3.0-alpha.19" futures-io-preview = "=0.3.0-alpha.19" futures-timer = "1.0.2" +kv-log-macro = "1.0.4" log = { version = "0.4.8", features = ["kv_unstable"] } memchr = "2.2.1" mio = "0.6.19" mio-uds = "0.6.7" num_cpus = "1.10.1" once_cell = "1.2.0" +pin-project-lite = "0.1" pin-utils = "0.1.0-alpha.4" slab = "0.4.2" -kv-log-macro = "1.0.4" -broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] } -pin-project-lite = "0.1" [dev-dependencies] femme = "1.2.0" diff --git a/src/fs/canonicalize.rs b/src/fs/canonicalize.rs index 601d477c..6eb6977d 100644 --- a/src/fs/canonicalize.rs +++ b/src/fs/canonicalize.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::{Path, PathBuf}; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Returns the canonical form of a path. /// @@ -32,5 +32,5 @@ use crate::task::blocking; /// ``` pub async fn canonicalize>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::canonicalize(&path).map(Into::into)).await + spawn_blocking(move || std::fs::canonicalize(&path).map(Into::into)).await } diff --git a/src/fs/copy.rs b/src/fs/copy.rs index 733fb64b..170b66ec 100644 --- a/src/fs/copy.rs +++ b/src/fs/copy.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Copies the contents and permissions of a file to a new location. /// @@ -41,5 +41,5 @@ use crate::task::blocking; pub async fn copy, Q: AsRef>(from: P, to: Q) -> io::Result { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - blocking::spawn(move || std::fs::copy(&from, &to)).await + spawn_blocking(move || std::fs::copy(&from, &to)).await } diff --git a/src/fs/create_dir.rs b/src/fs/create_dir.rs index 740d303c..03c24918 100644 --- a/src/fs/create_dir.rs +++ b/src/fs/create_dir.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Creates a new directory. /// @@ -34,5 +34,5 @@ use crate::task::blocking; /// ``` pub async fn create_dir>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::create_dir(path)).await + spawn_blocking(move || std::fs::create_dir(path)).await } diff --git a/src/fs/create_dir_all.rs b/src/fs/create_dir_all.rs index 76604de7..15241943 100644 --- a/src/fs/create_dir_all.rs +++ b/src/fs/create_dir_all.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Creates a new directory and all of its parents if they are missing. /// @@ -29,5 +29,5 @@ use crate::task::blocking; /// ``` pub async fn create_dir_all>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::create_dir_all(path)).await + spawn_blocking(move || std::fs::create_dir_all(path)).await } diff --git a/src/fs/dir_builder.rs b/src/fs/dir_builder.rs index a55a9a92..9ee6b55a 100644 --- a/src/fs/dir_builder.rs +++ b/src/fs/dir_builder.rs @@ -2,7 +2,7 @@ use std::future::Future; use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// A builder for creating directories with configurable options. /// @@ -107,7 +107,7 @@ impl DirBuilder { } let path = path.as_ref().to_owned(); - async move { blocking::spawn(move || builder.create(path)).await } + async move { spawn_blocking(move || builder.create(path)).await } } } diff --git a/src/fs/dir_entry.rs b/src/fs/dir_entry.rs index 959e2ada..527fab42 100644 --- a/src/fs/dir_entry.rs +++ b/src/fs/dir_entry.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use crate::fs::{FileType, Metadata}; use crate::io; use crate::path::PathBuf; -use crate::task::blocking; +use crate::task::spawn_blocking; /// An entry in a directory. /// @@ -87,7 +87,7 @@ impl DirEntry { /// ``` pub async fn metadata(&self) -> io::Result { let inner = self.0.clone(); - blocking::spawn(move || inner.metadata()).await + spawn_blocking(move || inner.metadata()).await } /// Reads the file type for this entry. @@ -125,7 +125,7 @@ impl DirEntry { /// ``` pub async fn file_type(&self) -> io::Result { let inner = self.0.clone(); - blocking::spawn(move || inner.file_type()).await + spawn_blocking(move || inner.file_type()).await } /// Returns the bare name of this entry without the leading path. diff --git a/src/fs/file.rs b/src/fs/file.rs index 745a5848..8bc6c2ce 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -12,7 +12,7 @@ use crate::future; use crate::io::{self, Read, Seek, SeekFrom, Write}; use crate::path::Path; use crate::prelude::*; -use crate::task::{self, blocking, Context, Poll, Waker}; +use crate::task::{self, spawn_blocking, Context, Poll, Waker}; /// An open file on the filesystem. /// @@ -112,7 +112,7 @@ impl File { /// ``` pub async fn open>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let file = blocking::spawn(move || std::fs::File::open(&path)).await?; + let file = spawn_blocking(move || std::fs::File::open(&path)).await?; Ok(File::new(file, true)) } @@ -147,7 +147,7 @@ impl File { /// ``` pub async fn create>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let file = blocking::spawn(move || std::fs::File::create(&path)).await?; + let file = spawn_blocking(move || std::fs::File::create(&path)).await?; Ok(File::new(file, true)) } @@ -180,7 +180,7 @@ impl File { }) .await?; - blocking::spawn(move || state.file.sync_all()).await + spawn_blocking(move || state.file.sync_all()).await } /// Synchronizes OS-internal buffered contents to disk. @@ -216,7 +216,7 @@ impl File { }) .await?; - blocking::spawn(move || state.file.sync_data()).await + spawn_blocking(move || state.file.sync_data()).await } /// Truncates or extends the file. @@ -249,7 +249,7 @@ impl File { }) .await?; - blocking::spawn(move || state.file.set_len(size)).await + spawn_blocking(move || state.file.set_len(size)).await } /// Reads the file's metadata. @@ -268,7 +268,7 @@ impl File { /// ``` pub async fn metadata(&self) -> io::Result { let file = self.file.clone(); - blocking::spawn(move || file.metadata()).await + spawn_blocking(move || file.metadata()).await } /// Changes the permissions on the file. @@ -297,7 +297,7 @@ impl File { /// ``` pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> { let file = self.file.clone(); - blocking::spawn(move || file.set_permissions(perm)).await + spawn_blocking(move || file.set_permissions(perm)).await } } @@ -692,7 +692,7 @@ impl LockGuard { self.register(cx); // Start a read operation asynchronously. - blocking::spawn(move || { + spawn_blocking(move || { // Read some data from the file into the cache. let res = { let State { file, cache, .. } = &mut *self; @@ -801,7 +801,7 @@ impl LockGuard { self.register(cx); // Start a write operation asynchronously. - blocking::spawn(move || { + spawn_blocking(move || { match (&*self.file).write_all(&self.cache) { Ok(_) => { // Switch to idle mode. @@ -834,7 +834,7 @@ impl LockGuard { self.register(cx); // Start a flush operation asynchronously. - blocking::spawn(move || { + spawn_blocking(move || { match (&*self.file).flush() { Ok(()) => { // Mark the file as flushed. diff --git a/src/fs/hard_link.rs b/src/fs/hard_link.rs index 8b09b5d1..e6e56cd5 100644 --- a/src/fs/hard_link.rs +++ b/src/fs/hard_link.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Creates a hard link on the filesystem. /// @@ -32,5 +32,5 @@ use crate::task::blocking; pub async fn hard_link, Q: AsRef>(from: P, to: Q) -> io::Result<()> { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - blocking::spawn(move || std::fs::hard_link(&from, &to)).await + spawn_blocking(move || std::fs::hard_link(&from, &to)).await } diff --git a/src/fs/metadata.rs b/src/fs/metadata.rs index 4afc5595..1383ec21 100644 --- a/src/fs/metadata.rs +++ b/src/fs/metadata.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Reads metadata for a path. /// @@ -34,7 +34,7 @@ use crate::task::blocking; /// ``` pub async fn metadata>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::metadata(path)).await + spawn_blocking(move || std::fs::metadata(path)).await } cfg_not_docs! { diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs index 7f700734..91ad8cab 100644 --- a/src/fs/open_options.rs +++ b/src/fs/open_options.rs @@ -3,7 +3,7 @@ use std::future::Future; use crate::fs::File; use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// A builder for opening files with configurable options. /// @@ -285,7 +285,7 @@ impl OpenOptions { let path = path.as_ref().to_owned(); let options = self.0.clone(); async move { - let file = blocking::spawn(move || options.open(path)).await?; + let file = spawn_blocking(move || options.open(path)).await?; Ok(File::new(file, true)) } } diff --git a/src/fs/read.rs b/src/fs/read.rs index a0eb130b..ab7d1756 100644 --- a/src/fs/read.rs +++ b/src/fs/read.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Reads the entire contents of a file as raw bytes. /// @@ -36,5 +36,5 @@ use crate::task::blocking; /// ``` pub async fn read>(path: P) -> io::Result> { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::read(path)).await + spawn_blocking(move || std::fs::read(path)).await } diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index 6e478019..fe12fa6d 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -5,7 +5,7 @@ use crate::future::Future; use crate::io; use crate::path::Path; use crate::stream::Stream; -use crate::task::{blocking, Context, JoinHandle, Poll}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; /// Returns a stream of entries in a directory. /// @@ -45,7 +45,7 @@ use crate::task::{blocking, Context, JoinHandle, Poll}; /// ``` pub async fn read_dir>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::read_dir(path)) + spawn_blocking(move || std::fs::read_dir(path)) .await .map(ReadDir::new) } @@ -91,7 +91,7 @@ impl Stream for ReadDir { let mut inner = opt.take().unwrap(); // Start the operation asynchronously. - self.0 = State::Busy(blocking::spawn(move || { + self.0 = State::Busy(spawn_blocking(move || { let next = inner.next(); (inner, next) })); diff --git a/src/fs/read_link.rs b/src/fs/read_link.rs index eaa7b624..7ec18a45 100644 --- a/src/fs/read_link.rs +++ b/src/fs/read_link.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::{Path, PathBuf}; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Reads a symbolic link and returns the path it points to. /// @@ -28,5 +28,5 @@ use crate::task::blocking; /// ``` pub async fn read_link>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::read_link(path).map(Into::into)).await + spawn_blocking(move || std::fs::read_link(path).map(Into::into)).await } diff --git a/src/fs/read_to_string.rs b/src/fs/read_to_string.rs index 40c4b6b8..d06aa614 100644 --- a/src/fs/read_to_string.rs +++ b/src/fs/read_to_string.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Reads the entire contents of a file as a string. /// @@ -37,5 +37,5 @@ use crate::task::blocking; /// ``` pub async fn read_to_string>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::read_to_string(path)).await + spawn_blocking(move || std::fs::read_to_string(path)).await } diff --git a/src/fs/remove_dir.rs b/src/fs/remove_dir.rs index d1fa7bf3..1a62db2e 100644 --- a/src/fs/remove_dir.rs +++ b/src/fs/remove_dir.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Removes an empty directory. /// @@ -29,5 +29,5 @@ use crate::task::blocking; /// ``` pub async fn remove_dir>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::remove_dir(path)).await + spawn_blocking(move || std::fs::remove_dir(path)).await } diff --git a/src/fs/remove_dir_all.rs b/src/fs/remove_dir_all.rs index 0a0fceb7..33667406 100644 --- a/src/fs/remove_dir_all.rs +++ b/src/fs/remove_dir_all.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Removes a directory and all of its contents. /// @@ -29,5 +29,5 @@ use crate::task::blocking; /// ``` pub async fn remove_dir_all>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::remove_dir_all(path)).await + spawn_blocking(move || std::fs::remove_dir_all(path)).await } diff --git a/src/fs/remove_file.rs b/src/fs/remove_file.rs index 5bc0608d..9a74ec11 100644 --- a/src/fs/remove_file.rs +++ b/src/fs/remove_file.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Removes a file. /// @@ -29,5 +29,5 @@ use crate::task::blocking; /// ``` pub async fn remove_file>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::remove_file(path)).await + spawn_blocking(move || std::fs::remove_file(path)).await } diff --git a/src/fs/rename.rs b/src/fs/rename.rs index c2aa77b7..ed7f39c9 100644 --- a/src/fs/rename.rs +++ b/src/fs/rename.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Renames a file or directory to a new location. /// @@ -34,5 +34,5 @@ use crate::task::blocking; pub async fn rename, Q: AsRef>(from: P, to: Q) -> io::Result<()> { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - blocking::spawn(move || std::fs::rename(&from, &to)).await + spawn_blocking(move || std::fs::rename(&from, &to)).await } diff --git a/src/fs/set_permissions.rs b/src/fs/set_permissions.rs index d14ced94..60a6d6f1 100644 --- a/src/fs/set_permissions.rs +++ b/src/fs/set_permissions.rs @@ -1,7 +1,7 @@ use crate::fs::Permissions; use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Changes the permissions of a file or directory. /// @@ -32,5 +32,5 @@ use crate::task::blocking; /// ``` pub async fn set_permissions>(path: P, perm: Permissions) -> io::Result<()> { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::set_permissions(path, perm)).await + spawn_blocking(move || std::fs::set_permissions(path, perm)).await } diff --git a/src/fs/symlink_metadata.rs b/src/fs/symlink_metadata.rs index bc5cce86..45be6d99 100644 --- a/src/fs/symlink_metadata.rs +++ b/src/fs/symlink_metadata.rs @@ -1,7 +1,7 @@ use crate::fs::Metadata; use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Reads metadata for a path without following symbolic links. /// @@ -34,5 +34,5 @@ use crate::task::blocking; /// ``` pub async fn symlink_metadata>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - blocking::spawn(move || std::fs::symlink_metadata(path)).await + spawn_blocking(move || std::fs::symlink_metadata(path)).await } diff --git a/src/fs/write.rs b/src/fs/write.rs index 3df56042..4e5d20bb 100644 --- a/src/fs/write.rs +++ b/src/fs/write.rs @@ -1,6 +1,6 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Writes a slice of bytes as the new contents of a file. /// @@ -33,5 +33,5 @@ use crate::task::blocking; pub async fn write, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> { let path = path.as_ref().to_owned(); let contents = contents.as_ref().to_owned(); - blocking::spawn(move || std::fs::write(path, contents)).await + spawn_blocking(move || std::fs::write(path, contents)).await } diff --git a/src/io/stderr.rs b/src/io/stderr.rs index 0a8c4700..76ca5239 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -3,7 +3,7 @@ use std::sync::Mutex; use crate::future::Future; use crate::io::{self, Write}; -use crate::task::{blocking, Context, JoinHandle, Poll}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; /// Constructs a new handle to the standard error of the current process. /// @@ -124,7 +124,7 @@ impl Write for Stderr { inner.buf[..buf.len()].copy_from_slice(buf); // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(move || { + *state = State::Busy(spawn_blocking(move || { let res = std::io::Write::write(&mut inner.stderr, &inner.buf); inner.last_op = Some(Operation::Write(res)); State::Idle(Some(inner)) @@ -152,7 +152,7 @@ impl Write for Stderr { let mut inner = opt.take().unwrap(); // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(move || { + *state = State::Busy(spawn_blocking(move || { let res = std::io::Write::flush(&mut inner.stderr); inner.last_op = Some(Operation::Flush(res)); State::Idle(Some(inner)) diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 22b9cf34..c99a88db 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -3,7 +3,7 @@ use std::sync::Mutex; use crate::future::{self, Future}; use crate::io::{self, Read}; -use crate::task::{blocking, Context, JoinHandle, Poll}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; /// Constructs a new handle to the standard input of the current process. /// @@ -127,7 +127,7 @@ impl Stdin { let mut inner = opt.take().unwrap(); // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(move || { + *state = State::Busy(spawn_blocking(move || { inner.line.clear(); let res = inner.stdin.read_line(&mut inner.line); inner.last_op = Some(Operation::ReadLine(res)); @@ -180,7 +180,7 @@ impl Read for Stdin { } // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(move || { + *state = State::Busy(spawn_blocking(move || { let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf); inner.last_op = Some(Operation::Read(res)); State::Idle(Some(inner)) diff --git a/src/io/stdout.rs b/src/io/stdout.rs index 1e9340fc..5455466a 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -3,7 +3,7 @@ use std::sync::Mutex; use crate::future::Future; use crate::io::{self, Write}; -use crate::task::{blocking, Context, JoinHandle, Poll}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; /// Constructs a new handle to the standard output of the current process. /// @@ -124,7 +124,7 @@ impl Write for Stdout { inner.buf[..buf.len()].copy_from_slice(buf); // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(move || { + *state = State::Busy(spawn_blocking(move || { let res = std::io::Write::write(&mut inner.stdout, &inner.buf); inner.last_op = Some(Operation::Write(res)); State::Idle(Some(inner)) @@ -152,7 +152,7 @@ impl Write for Stdout { let mut inner = opt.take().unwrap(); // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(move || { + *state = State::Busy(spawn_blocking(move || { let res = std::io::Write::flush(&mut inner.stdout); inner.last_op = Some(Operation::Flush(res)); State::Idle(Some(inner)) diff --git a/src/net/addr.rs b/src/net/addr.rs index 519b1846..c17ff498 100644 --- a/src/net/addr.rs +++ b/src/net/addr.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use crate::future::Future; use crate::io; -use crate::task::{blocking, Context, JoinHandle, Poll}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; cfg_not_docs! { macro_rules! ret { @@ -194,7 +194,7 @@ impl ToSocketAddrs for (&str, u16) { } let host = host.to_string(); - let task = blocking::spawn(move || { + let task = spawn_blocking(move || { std::net::ToSocketAddrs::to_socket_addrs(&(host.as_str(), port)) }); ToSocketAddrsFuture::Resolving(task) @@ -215,7 +215,7 @@ impl ToSocketAddrs for str { } let addr = self.to_string(); - let task = blocking::spawn(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str())); + let task = spawn_blocking(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str())); ToSocketAddrsFuture::Resolving(task) } } diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 40e0abb2..7f33e859 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -105,7 +105,7 @@ static REACTOR: Lazy = Lazy::new(|| { // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O // handles. std::thread::Builder::new() - .name("async-net-driver".to_string()) + .name("async-std/net".to_string()) .spawn(move || { // If the driver thread panics, there's not much we can do. It is not a // recoverable error and there is no place to propagate it into so we just abort. diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 5988194f..13a1752f 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -6,8 +6,7 @@ use crate::future; use crate::io::{self, Read, Write}; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; -use crate::task::blocking; -use crate::task::{Context, Poll}; +use crate::task::{spawn_blocking, Context, Poll}; /// A TCP stream between a local and a remote socket. /// @@ -74,7 +73,7 @@ impl TcpStream { let mut last_err = None; for addr in addrs.to_socket_addrs().await? { - let res = blocking::spawn(move || { + let res = spawn_blocking(move || { let std_stream = std::net::TcpStream::connect(addr)?; let mio_stream = mio::net::TcpStream::from_stream(std_stream)?; Ok(TcpStream { diff --git a/src/os/unix/fs.rs b/src/os/unix/fs.rs index d3e85234..498b3a97 100644 --- a/src/os/unix/fs.rs +++ b/src/os/unix/fs.rs @@ -2,7 +2,7 @@ use crate::io; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// Creates a new symbolic link on the filesystem. /// @@ -26,7 +26,7 @@ use crate::task::blocking; pub async fn symlink, Q: AsRef>(src: P, dst: Q) -> io::Result<()> { let src = src.as_ref().to_owned(); let dst = dst.as_ref().to_owned(); - blocking::spawn(move || std::os::unix::fs::symlink(&src, &dst)).await + spawn_blocking(move || std::os::unix::fs::symlink(&src, &dst)).await } cfg_not_docs! { diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index c96afd50..fc426b7c 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -11,7 +11,7 @@ use crate::io; use crate::net::driver::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; -use crate::task::blocking; +use crate::task::spawn_blocking; /// A Unix datagram socket. /// @@ -67,7 +67,7 @@ impl UnixDatagram { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let socket = blocking::spawn(move || mio_uds::UnixDatagram::bind(path)).await?; + let socket = spawn_blocking(move || mio_uds::UnixDatagram::bind(path)).await?; Ok(UnixDatagram::new(socket)) } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index b6e6a298..9bd86d38 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -13,7 +13,7 @@ use crate::net::driver::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::stream::Stream; -use crate::task::{blocking, Context, Poll}; +use crate::task::{spawn_blocking, Context, Poll}; /// A Unix domain socket server, listening for connections. /// @@ -68,7 +68,7 @@ impl UnixListener { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let listener = blocking::spawn(move || mio_uds::UnixListener::bind(path)).await?; + let listener = spawn_blocking(move || mio_uds::UnixListener::bind(path)).await?; Ok(UnixListener { watcher: Watcher::new(listener), diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index b16f2a3c..647edc96 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -12,7 +12,7 @@ use crate::io::{self, Read, Write}; use crate::net::driver::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; -use crate::task::{blocking, Context, Poll}; +use crate::task::{spawn_blocking, Context, Poll}; /// A Unix stream socket. /// @@ -58,7 +58,7 @@ impl UnixStream { pub async fn connect>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - blocking::spawn(move || { + spawn_blocking(move || { let std_stream = std::os::unix::net::UnixStream::connect(path)?; let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?; Ok(UnixStream { diff --git a/src/task/block_on.rs b/src/task/block_on.rs index c10303d7..54a415f5 100644 --- a/src/task/block_on.rs +++ b/src/task/block_on.rs @@ -1,21 +1,15 @@ -use std::cell::{Cell, UnsafeCell}; +use std::cell::Cell; use std::mem::{self, ManuallyDrop}; -use std::panic::{self, AssertUnwindSafe, UnwindSafe}; -use std::pin::Pin; use std::sync::Arc; use std::task::{RawWaker, RawWakerVTable}; use std::thread; use crossbeam_utils::sync::Parker; -use pin_project_lite::pin_project; +use kv_log_macro::trace; +use log::log_enabled; -use super::task; -use super::task_local; -use super::worker; use crate::future::Future; -use crate::task::{Context, Poll, Waker}; - -use kv_log_macro::trace; +use crate::task::{Context, Poll, Task, Waker}; /// Spawns a task and blocks the current thread on its result. /// @@ -42,81 +36,43 @@ pub fn block_on(future: F) -> T where F: Future, { - unsafe { - // A place on the stack where the result will be stored. - let out = &mut UnsafeCell::new(None); - - // Wrap the future into one that stores the result into `out`. - let future = { - let out = out.get(); - - async move { - let future = CatchUnwindFuture { - future: AssertUnwindSafe(future), - }; - *out = Some(future.await); - } - }; - - // Create a tag for the task. - let tag = task::Tag::new(None); - - // Log this `block_on` operation. - let child_id = tag.task_id().as_u64(); - let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0); + // Create a new task handle. + let task = Task::new(None); + // Log this `block_on` operation. + if log_enabled!(log::Level::Trace) { trace!("block_on", { - parent_id: parent_id, - child_id: child_id, + task_id: task.id().0, + parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), }); + } - // Wrap the future into one that drops task-local variables on exit. - let future = task_local::add_finalizer(future); - - let future = async move { - future.await; - trace!("block_on completed", { - parent_id: parent_id, - child_id: child_id, - }); - }; - - // Pin the future onto the stack. - pin_utils::pin_mut!(future); - - // Transmute the future into one that is futurestatic. - let future = mem::transmute::< - Pin<&'_ mut dyn Future>, - Pin<&'static mut dyn Future>, - >(future); - - // Block on the future and and wait for it to complete. - worker::set_tag(&tag, || block(future)); - - // Take out the result. - match (*out.get()).take().unwrap() { - Ok(v) => v, - Err(err) => panic::resume_unwind(err), + let future = async move { + // Drop task-locals on exit. + defer! { + Task::get_current(|t| unsafe { t.drop_locals() }); } - } -} -pin_project! { - struct CatchUnwindFuture { - #[pin] - future: F, - } -} + // Log completion on exit. + defer! { + if log_enabled!(log::Level::Trace) { + Task::get_current(|t| { + trace!("completed", { + task_id: t.id().0, + }); + }); + } + } -impl Future for CatchUnwindFuture { - type Output = thread::Result; + future.await + }; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - panic::catch_unwind(AssertUnwindSafe(|| self.project().future.poll(cx)))?.map(Ok) - } + // Run the future as a task. + unsafe { Task::set_current(&task, || run(future)) } } -fn block(f: F) -> T +/// Blocks the current thread on a future's result. +fn run(future: F) -> T where F: Future, { @@ -129,50 +85,59 @@ where static CACHE: Cell>> = Cell::new(None); } - pin_utils::pin_mut!(f); + // Virtual table for wakers based on `Arc`. + static VTABLE: RawWakerVTable = { + unsafe fn clone_raw(ptr: *const ()) -> RawWaker { + let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); + mem::forget(arc.clone()); + RawWaker::new(ptr, &VTABLE) + } + + unsafe fn wake_raw(ptr: *const ()) { + let arc = Arc::from_raw(ptr as *const Parker); + arc.unparker().unpark(); + } + + unsafe fn wake_by_ref_raw(ptr: *const ()) { + let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); + arc.unparker().unpark(); + } + + unsafe fn drop_raw(ptr: *const ()) { + drop(Arc::from_raw(ptr as *const Parker)) + } + + RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) + }; + + // Pin the future on the stack. + pin_utils::pin_mut!(future); CACHE.with(|cache| { // Reuse a cached parker or create a new one for this invocation of `block`. let arc_parker: Arc = cache.take().unwrap_or_else(|| Arc::new(Parker::new())); - let ptr = (&*arc_parker as *const Parker) as *const (); - let vt = vtable(); - let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) }; + // Create a waker and task context. + let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, &VTABLE))) }; let cx = &mut Context::from_waker(&waker); + let mut step = 0; loop { - if let Poll::Ready(t) = f.as_mut().poll(cx) { + if let Poll::Ready(t) = future.as_mut().poll(cx) { // Save the parker for the next invocation of `block`. cache.set(Some(arc_parker)); return t; } - arc_parker.park(); + + // Yield a few times or park the current thread. + if step < 3 { + thread::yield_now(); + step += 1; + } else { + arc_parker.park(); + step = 0; + } } }) } - -fn vtable() -> &'static RawWakerVTable { - unsafe fn clone_raw(ptr: *const ()) -> RawWaker { - #![allow(clippy::redundant_clone)] - let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); - mem::forget(arc.clone()); - RawWaker::new(ptr, vtable()) - } - - unsafe fn wake_raw(ptr: *const ()) { - let arc = Arc::from_raw(ptr as *const Parker); - arc.unparker().unpark(); - } - - unsafe fn wake_by_ref_raw(ptr: *const ()) { - let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); - arc.unparker().unpark(); - } - - unsafe fn drop_raw(ptr: *const ()) { - drop(Arc::from_raw(ptr as *const Parker)) - } - - &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) -} diff --git a/src/task/builder.rs b/src/task/builder.rs index a43b42bc..a61d7859 100644 --- a/src/task/builder.rs +++ b/src/task/builder.rs @@ -1,7 +1,11 @@ -use super::pool; -use super::JoinHandle; +use kv_log_macro::trace; +use log::log_enabled; + use crate::future::Future; use crate::io; +use crate::task::executor; +use crate::task::{JoinHandle, Task}; +use crate::utils::abort_on_panic; /// Task builder that configures the settings of a new task. #[derive(Debug, Default)] @@ -11,11 +15,13 @@ pub struct Builder { impl Builder { /// Creates a new builder. + #[inline] pub fn new() -> Builder { Builder { name: None } } /// Configures the name of the task. + #[inline] pub fn name(mut self, name: String) -> Builder { self.name = Some(name); self @@ -27,6 +33,52 @@ impl Builder { F: Future + Send + 'static, T: Send + 'static, { - Ok(pool::get().spawn(future, self)) + // Create a new task handle. + let task = Task::new(self.name); + + // Log this `spawn` operation. + if log_enabled!(log::Level::Trace) { + trace!("spawn", { + task_id: task.id().0, + parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), + }); + } + + let future = async move { + // Drop task-locals on exit. + defer! { + Task::get_current(|t| unsafe { t.drop_locals() }); + } + + // Log completion on exit. + defer! { + if log_enabled!(log::Level::Trace) { + Task::get_current(|t| { + trace!("completed", { + task_id: t.id().0, + }); + }); + } + } + + future.await + }; + + let schedule = move |t| executor::schedule(Runnable(t)); + let (task, handle) = async_task::spawn(future, schedule, task); + task.schedule(); + Ok(JoinHandle::new(handle)) + } +} + +/// A runnable task. +pub(crate) struct Runnable(async_task::Task); + +impl Runnable { + /// Runs the task by polling its future once. + pub fn run(self) { + unsafe { + Task::set_current(self.0.tag(), || abort_on_panic(|| self.0.run())); + } } } diff --git a/src/task/current.rs b/src/task/current.rs new file mode 100644 index 00000000..0dc36991 --- /dev/null +++ b/src/task/current.rs @@ -0,0 +1,28 @@ +use crate::task::Task; + +/// Returns a handle to the current task. +/// +/// # Panics +/// +/// This function will panic if not called within the context of a task created by [`block_on`], +/// [`spawn`], or [`Builder::spawn`]. +/// +/// [`block_on`]: fn.block_on.html +/// [`spawn`]: fn.spawn.html +/// [`Builder::spawn`]: struct.Builder.html#method.spawn +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::task; +/// +/// println!("The name of this task is {:?}", task::current().name()); +/// # +/// # }) +/// ``` +pub fn current() -> Task { + Task::get_current(|t| t.clone()) + .expect("`task::current()` called outside the context of a task") +} diff --git a/src/task/executor/mod.rs b/src/task/executor/mod.rs new file mode 100644 index 00000000..2a6a696e --- /dev/null +++ b/src/task/executor/mod.rs @@ -0,0 +1,13 @@ +//! Task executor. +//! +//! API bindings between `crate::task` and this module are very simple: +//! +//! * The only export is the `schedule` function. +//! * The only import is the `crate::task::Runnable` type. + +pub(crate) use pool::schedule; + +use sleepers::Sleepers; + +mod pool; +mod sleepers; diff --git a/src/task/executor/pool.rs b/src/task/executor/pool.rs new file mode 100644 index 00000000..1e743844 --- /dev/null +++ b/src/task/executor/pool.rs @@ -0,0 +1,140 @@ +use std::cell::UnsafeCell; +use std::iter; +use std::thread; +use std::time::Duration; + +use crossbeam_deque::{Injector, Stealer, Worker}; +use once_cell::sync::Lazy; + +use crate::task::executor::Sleepers; +use crate::task::Runnable; +use crate::utils::{abort_on_panic, random}; + +/// The state of an executor. +struct Pool { + /// The global queue of tasks. + injector: Injector, + + /// Handles to local queues for stealing work from worker threads. + stealers: Vec>, + + /// Used for putting idle workers to sleep and notifying them when new tasks come in. + sleepers: Sleepers, +} + +/// Global executor that runs spawned tasks. +static POOL: Lazy = Lazy::new(|| { + let num_threads = num_cpus::get().max(1); + let mut stealers = Vec::new(); + + // Spawn worker threads. + for _ in 0..num_threads { + let worker = Worker::new_fifo(); + stealers.push(worker.stealer()); + + thread::Builder::new() + .name("async-std/executor".to_string()) + .spawn(|| abort_on_panic(|| main_loop(worker))) + .expect("cannot start a thread driving tasks"); + } + + Pool { + injector: Injector::new(), + stealers, + sleepers: Sleepers::new(), + } +}); + +thread_local! { + /// Local task queue associated with the current worker thread. + static QUEUE: UnsafeCell>> = UnsafeCell::new(None); +} + +/// Schedules a new runnable task for execution. +pub(crate) fn schedule(task: Runnable) { + QUEUE.with(|queue| { + let local = unsafe { (*queue.get()).as_ref() }; + + // If the current thread is a worker thread, push the task into its local task queue. + // Otherwise, push it into the global task queue. + match local { + None => POOL.injector.push(task), + Some(q) => q.push(task), + } + }); + + // Notify a sleeping worker that new work just came in. + POOL.sleepers.notify_one(); +} + +/// Main loop running a worker thread. +fn main_loop(local: Worker) { + // Initialize the local task queue. + QUEUE.with(|queue| unsafe { *queue.get() = Some(local) }); + + // The number of times the thread didn't find work in a row. + let mut step = 0; + + loop { + // Try to find a runnable task. + match find_runnable() { + Some(task) => { + // Found. Now run the task. + task.run(); + step = 0; + } + None => { + // Yield the current thread or put it to sleep. + match step { + 0..=2 => { + thread::yield_now(); + step += 1; + } + 3 => { + thread::sleep(Duration::from_micros(10)); + step += 1; + } + _ => { + POOL.sleepers.wait(); + step = 0; + } + } + } + } + } +} + +/// Find the next runnable task. +fn find_runnable() -> Option { + let pool = &*POOL; + + QUEUE.with(|queue| { + let local = unsafe { (*queue.get()).as_ref().unwrap() }; + + // Pop a task from the local queue, if not empty. + local.pop().or_else(|| { + // Otherwise, we need to look for a task elsewhere. + iter::repeat_with(|| { + // Try stealing a batch of tasks from the global queue. + pool.injector + .steal_batch_and_pop(&local) + // Or try stealing a batch of tasks from one of the other threads. + .or_else(|| { + // First, pick a random starting point in the list of local queues. + let len = pool.stealers.len(); + let start = random(len as u32) as usize; + + // Try stealing a batch of tasks from each local queue starting from the + // chosen point. + let (l, r) = pool.stealers.split_at(start); + let rotated = r.iter().chain(l.iter()); + rotated.map(|s| s.steal_batch_and_pop(&local)).collect() + }) + }) + // Loop while no task was stolen and any steal operation needs to be retried. + .find(|s| !s.is_retry()) + // Extract the stolen task, if there is one. + .and_then(|s| s.success()) + }) + }) +} diff --git a/src/task/sleepers.rs b/src/task/executor/sleepers.rs similarity index 100% rename from src/task/sleepers.rs rename to src/task/executor/sleepers.rs diff --git a/src/task/join_handle.rs b/src/task/join_handle.rs new file mode 100644 index 00000000..9fefff2e --- /dev/null +++ b/src/task/join_handle.rs @@ -0,0 +1,56 @@ +use std::future::Future; +use std::pin::Pin; + +use crate::task::{Context, Poll, Task}; + +/// A handle that awaits the result of a task. +/// +/// Dropping a [`JoinHandle`] will detach the task, meaning that there is no longer +/// a handle to the task and no way to `join` on it. +/// +/// Created when a task is [spawned]. +/// +/// [spawned]: fn.spawn.html +#[derive(Debug)] +pub struct JoinHandle(async_task::JoinHandle); + +unsafe impl Send for JoinHandle {} +unsafe impl Sync for JoinHandle {} + +impl JoinHandle { + /// Creates a new `JoinHandle`. + pub(crate) fn new(inner: async_task::JoinHandle) -> JoinHandle { + JoinHandle(inner) + } + + /// Returns a handle to the underlying task. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::task; + /// + /// let handle = task::spawn(async { + /// 1 + 2 + /// }); + /// println!("id = {}", handle.task().id()); + /// # + /// # }) + pub fn task(&self) -> &Task { + self.0.tag() + } +} + +impl Future for JoinHandle { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.0).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => panic!("cannot await the result of a panicked task"), + Poll::Ready(Some(val)) => Poll::Ready(val), + } + } +} diff --git a/src/task/mod.rs b/src/task/mod.rs index 24eae081..72d559a7 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -126,63 +126,35 @@ pub use async_macros::ready; pub use block_on::block_on; pub use builder::Builder; -pub use pool::spawn; +pub use current::current; +pub use join_handle::JoinHandle; pub use sleep::sleep; -pub use task::{JoinHandle, Task, TaskId}; +pub use spawn::spawn; +pub use task::Task; +pub use task_id::TaskId; pub use task_local::{AccessError, LocalKey}; -pub use worker::current; + +#[cfg(any(feature = "unstable", test))] +pub use spawn_blocking::spawn_blocking; +#[cfg(not(any(feature = "unstable", test)))] +pub(crate) use spawn_blocking::spawn_blocking; + +use builder::Runnable; +use task_local::LocalsMap; mod block_on; mod builder; -mod pool; +mod current; +mod executor; +mod join_handle; mod sleep; -mod sleepers; +mod spawn; +mod spawn_blocking; mod task; +mod task_id; mod task_local; -mod worker; - -pub(crate) mod blocking; cfg_unstable! { - mod yield_now; pub use yield_now::yield_now; -} - -/// Spawns a blocking task. -/// -/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This -/// is useful to prevent long-running synchronous operations from blocking the main futures -/// executor. -/// -/// See also: [`task::block_on`], [`task::spawn`]. -/// -/// [`task::block_on`]: fn.block_on.html -/// [`task::spawn`]: fn.spawn.html -/// -/// # Examples -/// -/// Basic usage: -/// -/// ``` -/// # async_std::task::block_on(async { -/// # -/// use async_std::task; -/// -/// task::spawn_blocking(|| { -/// println!("long-running task here"); -/// }).await; -/// # -/// # }) -/// ``` -// Once this function stabilizes we should merge `blocking::spawn` into this so -// all code in our crate uses `task::blocking` too. -#[cfg(feature = "unstable")] -#[cfg_attr(feature = "docs", doc(cfg(unstable)))] -#[inline] -pub fn spawn_blocking(f: F) -> task::JoinHandle -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - blocking::spawn(f) + mod yield_now; } diff --git a/src/task/pool.rs b/src/task/pool.rs deleted file mode 100644 index 3fd70470..00000000 --- a/src/task/pool.rs +++ /dev/null @@ -1,136 +0,0 @@ -use std::iter; -use std::thread; - -use crossbeam_deque::{Injector, Stealer, Worker}; -use kv_log_macro::trace; -use once_cell::sync::Lazy; - -use super::sleepers::Sleepers; -use super::task; -use super::task_local; -use super::worker; -use super::{Builder, JoinHandle}; -use crate::future::Future; -use crate::utils::abort_on_panic; - -/// Spawns a task. -/// -/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task. -/// -/// [`std::thread`]: https://doc.rust-lang.org/std/thread/fn.spawn.html -/// -/// # Examples -/// -/// ``` -/// # async_std::task::block_on(async { -/// # -/// use async_std::task; -/// -/// let handle = task::spawn(async { -/// 1 + 2 -/// }); -/// -/// assert_eq!(handle.await, 3); -/// # -/// # }) -/// ``` -pub fn spawn(future: F) -> JoinHandle -where - F: Future + Send + 'static, - T: Send + 'static, -{ - Builder::new().spawn(future).expect("cannot spawn future") -} - -pub(crate) struct Pool { - pub injector: Injector, - pub stealers: Vec>, - pub sleepers: Sleepers, -} - -impl Pool { - /// Spawn a future onto the pool. - pub fn spawn(&self, future: F, builder: Builder) -> JoinHandle - where - F: Future + Send + 'static, - T: Send + 'static, - { - let tag = task::Tag::new(builder.name); - - // Log this `spawn` operation. - let child_id = tag.task_id().as_u64(); - let parent_id = worker::get_task(|t| t.id().as_u64()).unwrap_or(0); - - trace!("spawn", { - parent_id: parent_id, - child_id: child_id, - }); - - // Wrap the future into one that drops task-local variables on exit. - let future = unsafe { task_local::add_finalizer(future) }; - - // Wrap the future into one that logs completion on exit. - let future = async move { - let res = future.await; - trace!("spawn completed", { - parent_id: parent_id, - child_id: child_id, - }); - res - }; - - let (task, handle) = async_task::spawn(future, worker::schedule, tag); - task.schedule(); - JoinHandle::new(handle) - } - - /// Find the next runnable task to run. - pub fn find_task(&self, local: &Worker) -> Option { - // Pop a task from the local queue, if not empty. - local.pop().or_else(|| { - // Otherwise, we need to look for a task elsewhere. - iter::repeat_with(|| { - // Try stealing a batch of tasks from the injector queue. - self.injector - .steal_batch_and_pop(local) - // Or try stealing a bach of tasks from one of the other threads. - .or_else(|| { - self.stealers - .iter() - .map(|s| s.steal_batch_and_pop(local)) - .collect() - }) - }) - // Loop while no task was stolen and any steal operation needs to be retried. - .find(|s| !s.is_retry()) - // Extract the stolen task, if there is one. - .and_then(|s| s.success()) - }) - } -} - -#[inline] -pub(crate) fn get() -> &'static Pool { - static POOL: Lazy = Lazy::new(|| { - let num_threads = num_cpus::get().max(1); - let mut stealers = Vec::new(); - - // Spawn worker threads. - for _ in 0..num_threads { - let worker = Worker::new_fifo(); - stealers.push(worker.stealer()); - - thread::Builder::new() - .name("async-task-driver".to_string()) - .spawn(|| abort_on_panic(|| worker::main_loop(worker))) - .expect("cannot start a thread driving tasks"); - } - - Pool { - injector: Injector::new(), - stealers, - sleepers: Sleepers::new(), - } - }); - &*POOL -} diff --git a/src/task/spawn.rs b/src/task/spawn.rs new file mode 100644 index 00000000..da2957b0 --- /dev/null +++ b/src/task/spawn.rs @@ -0,0 +1,31 @@ +use crate::future::Future; +use crate::task::{Builder, JoinHandle}; + +/// Spawns a task. +/// +/// This function is similar to [`std::thread::spawn`], except it spawns an asynchronous task. +/// +/// [`std::thread`]: https://doc.rust-lang.org/std/thread/fn.spawn.html +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::task; +/// +/// let handle = task::spawn(async { +/// 1 + 2 +/// }); +/// +/// assert_eq!(handle.await, 3); +/// # +/// # }) +/// ``` +pub fn spawn(future: F) -> JoinHandle +where + F: Future + Send + 'static, + T: Send + 'static, +{ + Builder::new().spawn(future).expect("cannot spawn task") +} diff --git a/src/task/blocking.rs b/src/task/spawn_blocking.rs similarity index 67% rename from src/task/blocking.rs rename to src/task/spawn_blocking.rs index 1f1a222a..b6b5ea34 100644 --- a/src/task/blocking.rs +++ b/src/task/spawn_blocking.rs @@ -1,5 +1,3 @@ -//! A thread pool for running blocking functions asynchronously. - use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; use std::time::Duration; @@ -7,22 +5,63 @@ use std::time::Duration; use crossbeam_channel::{bounded, Receiver, Sender}; use once_cell::sync::Lazy; -use crate::task::task::{JoinHandle, Tag}; -use crate::utils::abort_on_panic; +use crate::task::{JoinHandle, Task}; +use crate::utils::{abort_on_panic, random}; + +type Runnable = async_task::Task; + +/// Spawns a blocking task. +/// +/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This +/// is useful to prevent long-running synchronous operations from blocking the main futures +/// executor. +/// +/// See also: [`task::block_on`], [`task::spawn`]. +/// +/// [`task::block_on`]: fn.block_on.html +/// [`task::spawn`]: fn.spawn.html +/// +/// # Examples +/// +/// Basic usage: +/// +/// ``` +/// # #[cfg(feature = "unstable")] +/// # async_std::task::block_on(async { +/// # +/// use async_std::task; +/// +/// task::spawn_blocking(|| { +/// println!("long-running task here"); +/// }).await; +/// # +/// # }) +/// ``` +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[inline] +pub fn spawn_blocking(f: F) -> JoinHandle +where + F: FnOnce() -> T + Send + 'static, + T: Send + 'static, +{ + let (task, handle) = async_task::spawn(async { f() }, schedule, Task::new(None)); + task.schedule(); + JoinHandle::new(handle) +} const MAX_THREADS: u64 = 10_000; static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); struct Pool { - sender: Sender>, - receiver: Receiver>, + sender: Sender, + receiver: Receiver, } static POOL: Lazy = Lazy::new(|| { for _ in 0..2 { thread::Builder::new() - .name("async-blocking-driver".to_string()) + .name("async-std/blocking".to_string()) .spawn(|| { abort_on_panic(|| { for task in &POOL.receiver { @@ -66,7 +105,7 @@ fn maybe_create_another_blocking_thread() { let rand_sleep_ms = u64::from(random(10_000)); thread::Builder::new() - .name("async-blocking-driver-dynamic".to_string()) + .name("async-std/blocking".to_string()) .spawn(move || { let wait_limit = Duration::from_millis(1000 + rand_sleep_ms); @@ -82,8 +121,8 @@ fn maybe_create_another_blocking_thread() { // Enqueues work, attempting to send to the threadpool in a // nonblocking way and spinning up another worker thread if // there is not a thread ready to accept the work. -fn schedule(t: async_task::Task) { - if let Err(err) = POOL.sender.try_send(t) { +pub(crate) fn schedule(task: Runnable) { + if let Err(err) = POOL.sender.try_send(task) { // We were not able to send to the channel without // blocking. Try to spin up another thread and then // retry sending while blocking. @@ -91,45 +130,3 @@ fn schedule(t: async_task::Task) { POOL.sender.send(err.into_inner()).unwrap(); } } - -/// Spawns a blocking task. -/// -/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. -pub(crate) fn spawn(f: F) -> JoinHandle -where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, -{ - let tag = Tag::new(None); - let future = async move { f() }; - let (task, handle) = async_task::spawn(future, schedule, tag); - task.schedule(); - JoinHandle::new(handle) -} - -/// Generates a random number in `0..n`. -fn random(n: u32) -> u32 { - use std::cell::Cell; - use std::num::Wrapping; - - thread_local! { - static RNG: Cell> = Cell::new(Wrapping(1_406_868_647)); - } - - RNG.with(|rng| { - // This is the 32-bit variant of Xorshift. - // - // Source: https://en.wikipedia.org/wiki/Xorshift - let mut x = rng.get(); - x ^= x << 13; - x ^= x >> 17; - x ^= x << 5; - rng.set(x); - - // This is a fast alternative to `x % n`. - // - // Author: Daniel Lemire - // Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ - ((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32 - }) -} diff --git a/src/task/task.rs b/src/task/task.rs index 3d8e1080..bcec2e0e 100644 --- a/src/task/task.rs +++ b/src/task/task.rs @@ -1,31 +1,74 @@ +use std::cell::Cell; use std::fmt; -use std::future::Future; -use std::i64; -use std::mem; -use std::num::NonZeroU64; -use std::pin::Pin; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; +use std::mem::ManuallyDrop; +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Arc; -use super::task_local; -use crate::task::{Context, Poll}; +use crate::task::{LocalsMap, TaskId}; +use crate::utils::abort_on_panic; + +thread_local! { + /// A pointer to the currently running task. + static CURRENT: Cell<*const Task> = Cell::new(ptr::null_mut()); +} + +/// The inner representation of a task handle. +struct Inner { + /// The task ID. + id: TaskId, + + /// The optional task name. + name: Option>, + + /// The map holding task-local values. + locals: LocalsMap, +} + +impl Inner { + #[inline] + fn new(name: Option) -> Inner { + Inner { + id: TaskId::generate(), + name: name.map(String::into_boxed_str), + locals: LocalsMap::new(), + } + } +} /// A handle to a task. -#[derive(Clone)] -pub struct Task(Arc); +pub struct Task { + /// The inner representation. + /// + /// This pointer is lazily initialized on first use. In most cases, the inner representation is + /// never touched and therefore we don't allocate it unless it's really needed. + inner: AtomicPtr, +} unsafe impl Send for Task {} unsafe impl Sync for Task {} impl Task { - /// Returns a reference to task metadata. - pub(crate) fn metadata(&self) -> &Metadata { - &self.0 + /// Creates a new task handle. + /// + /// If the task is unnamed, the inner representation of the task will be lazily allocated on + /// demand. + #[inline] + pub(crate) fn new(name: Option) -> Task { + let inner = match name { + None => AtomicPtr::default(), + Some(name) => { + let raw = Arc::into_raw(Arc::new(Inner::new(Some(name)))); + AtomicPtr::new(raw as *mut Inner) + } + }; + Task { inner } } /// Gets the task's unique identifier. + #[inline] pub fn id(&self) -> TaskId { - self.metadata().task_id + self.inner().id } /// Returns the name of this task. @@ -34,178 +77,101 @@ impl Task { /// /// [`Builder::name`]: struct.Builder.html#method.name pub fn name(&self) -> Option<&str> { - self.metadata().name.as_ref().map(|s| s.as_str()) + self.inner().name.as_ref().map(|s| &**s) } -} - -impl fmt::Debug for Task { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Task").field("name", &self.name()).finish() - } -} -/// A handle that awaits the result of a task. -/// -/// Dropping a [`JoinHandle`] will detach the task, meaning that there is no longer -/// a handle to the task and no way to `join` on it. -/// -/// Created when a task is [spawned]. -/// -/// [spawned]: fn.spawn.html -#[derive(Debug)] -pub struct JoinHandle(async_task::JoinHandle); - -unsafe impl Send for JoinHandle {} -unsafe impl Sync for JoinHandle {} - -impl JoinHandle { - pub(crate) fn new(inner: async_task::JoinHandle) -> JoinHandle { - JoinHandle(inner) + /// Returns the map holding task-local values. + pub(crate) fn locals(&self) -> &LocalsMap { + &self.inner().locals } - /// Returns a handle to the underlying task. + /// Drops all task-local values. /// - /// # Examples - /// - /// ``` - /// # async_std::task::block_on(async { - /// # - /// use async_std::task; - /// - /// let handle = task::spawn(async { - /// 1 + 2 - /// }); - /// println!("id = {}", handle.task().id()); - /// # - /// # }) - pub fn task(&self) -> &Task { - self.0.tag().task() - } -} - -impl Future for JoinHandle { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.0).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(None) => panic!("task has panicked"), - Poll::Ready(Some(val)) => Poll::Ready(val), + /// This method is only safe to call at the end of the task. + #[inline] + pub(crate) unsafe fn drop_locals(&self) { + let raw = self.inner.load(Ordering::Acquire); + if let Some(inner) = raw.as_mut() { + // Abort the process if dropping task-locals panics. + abort_on_panic(|| { + inner.locals.clear(); + }); } } -} -/// A unique identifier for a task. -/// -/// # Examples -/// -/// ``` -/// # -/// use async_std::task; -/// -/// task::block_on(async { -/// println!("id = {:?}", task::current().id()); -/// }) -/// ``` -#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)] -pub struct TaskId(NonZeroU64); - -impl TaskId { - pub(crate) fn new() -> TaskId { - static COUNTER: AtomicU64 = AtomicU64::new(1); - - let id = COUNTER.fetch_add(1, Ordering::Relaxed); - - if id > i64::MAX as u64 { - std::process::abort(); + /// Returns the inner representation, initializing it on first use. + fn inner(&self) -> &Inner { + loop { + let raw = self.inner.load(Ordering::Acquire); + if !raw.is_null() { + return unsafe { &*raw }; + } + + let new = Arc::into_raw(Arc::new(Inner::new(None))) as *mut Inner; + if self.inner.compare_and_swap(raw, new, Ordering::AcqRel) != raw { + unsafe { + drop(Arc::from_raw(new)); + } + } } - unsafe { TaskId(NonZeroU64::new_unchecked(id)) } } - pub(crate) fn as_u64(self) -> u64 { - self.0.get() - } -} - -impl fmt::Display for TaskId { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.0) + /// Set a reference to the current task. + pub(crate) unsafe fn set_current(task: *const Task, f: F) -> R + where + F: FnOnce() -> R, + { + CURRENT.with(|current| { + let old_task = current.replace(task); + defer! { + current.set(old_task); + } + f() + }) } -} - -pub(crate) type Runnable = async_task::Task; -pub(crate) struct Metadata { - pub task_id: TaskId, - pub name: Option, - pub local_map: task_local::Map, -} - -pub(crate) struct Tag { - task_id: TaskId, - raw_metadata: AtomicUsize, -} - -impl Tag { - pub fn new(name: Option) -> Tag { - let task_id = TaskId::new(); - - let opt_task = name.map(|name| { - Task(Arc::new(Metadata { - task_id, - name: Some(name), - local_map: task_local::Map::new(), - })) - }); - - Tag { - task_id, - raw_metadata: AtomicUsize::new(unsafe { - mem::transmute::, usize>(opt_task) - }), + /// Gets a reference to the current task. + pub(crate) fn get_current(f: F) -> Option + where + F: FnOnce(&Task) -> R, + { + let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) }); + match res { + Ok(Some(val)) => Some(val), + Ok(None) | Err(_) => None, } } +} - pub fn task(&self) -> &Task { - #[allow(clippy::transmute_ptr_to_ptr)] - unsafe { - let raw = self.raw_metadata.load(Ordering::Acquire); - - if mem::transmute::<&usize, &Option>(&raw).is_none() { - let new = Some(Task(Arc::new(Metadata { - task_id: TaskId::new(), - name: None, - local_map: task_local::Map::new(), - }))); - - let new_raw = mem::transmute::, usize>(new); - - if self - .raw_metadata - .compare_exchange(raw, new_raw, Ordering::AcqRel, Ordering::Acquire) - .is_err() - { - let new = mem::transmute::>(new_raw); - drop(new); - } +impl Drop for Task { + fn drop(&mut self) { + // Deallocate the inner representation if it was initialized. + let raw = *self.inner.get_mut(); + if !raw.is_null() { + unsafe { + drop(Arc::from_raw(raw)); } - - #[allow(clippy::transmute_ptr_to_ptr)] - mem::transmute::<&AtomicUsize, &Option>(&self.raw_metadata) - .as_ref() - .unwrap() } } +} - pub fn task_id(&self) -> TaskId { - self.task_id +impl Clone for Task { + fn clone(&self) -> Task { + // We need to make sure the inner representation is initialized now so that this instance + // and the clone have raw pointers that point to the same `Arc`. + let arc = unsafe { ManuallyDrop::new(Arc::from_raw(self.inner())) }; + let raw = Arc::into_raw(Arc::clone(&arc)); + Task { + inner: AtomicPtr::new(raw as *mut Inner), + } } } -impl Drop for Tag { - fn drop(&mut self) { - let raw = *self.raw_metadata.get_mut(); - let opt_task = unsafe { mem::transmute::>(raw) }; - drop(opt_task); +impl fmt::Debug for Task { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Task") + .field("id", &self.id()) + .field("name", &self.name()) + .finish() } } diff --git a/src/task/task_id.rs b/src/task/task_id.rs new file mode 100644 index 00000000..67eee154 --- /dev/null +++ b/src/task/task_id.rs @@ -0,0 +1,35 @@ +use std::fmt; +use std::sync::atomic::{AtomicU64, Ordering}; + +/// A unique identifier for a task. +/// +/// # Examples +/// +/// ``` +/// use async_std::task; +/// +/// task::block_on(async { +/// println!("id = {:?}", task::current().id()); +/// }) +/// ``` +#[derive(Eq, PartialEq, Clone, Copy, Hash, Debug)] +pub struct TaskId(pub(crate) u64); + +impl TaskId { + /// Generates a new `TaskId`. + pub(crate) fn generate() -> TaskId { + static COUNTER: AtomicU64 = AtomicU64::new(1); + + let id = COUNTER.fetch_add(1, Ordering::Relaxed); + if id > u64::max_value() / 2 { + std::process::abort(); + } + TaskId(id) + } +} + +impl fmt::Display for TaskId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} diff --git a/src/task/task_local.rs b/src/task/task_local.rs index e92f4f92..0869cab4 100644 --- a/src/task/task_local.rs +++ b/src/task/task_local.rs @@ -1,14 +1,9 @@ use std::cell::UnsafeCell; use std::error::Error; use std::fmt; -use std::future::Future; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Mutex; +use std::sync::atomic::{AtomicU32, Ordering}; -use once_cell::sync::Lazy; - -use super::worker; -use crate::utils::abort_on_panic; +use crate::task::Task; /// Declares task-local values. /// @@ -50,7 +45,7 @@ macro_rules! task_local { $crate::task::LocalKey { __init, - __key: ::std::sync::atomic::AtomicUsize::new(0), + __key: ::std::sync::atomic::AtomicU32::new(0), } }; ); @@ -71,7 +66,7 @@ pub struct LocalKey { pub __init: fn() -> T, #[doc(hidden)] - pub __key: AtomicUsize, + pub __key: AtomicU32, } impl LocalKey { @@ -154,14 +149,13 @@ impl LocalKey { where F: FnOnce(&T) -> R, { - worker::get_task(|task| unsafe { + Task::get_current(|task| unsafe { // Prepare the numeric key, initialization function, and the map of task-locals. let key = self.key(); let init = || Box::new((self.__init)()) as Box; - let map = &task.metadata().local_map; // Get the value in the map of task-locals, or initialize and insert one. - let value: *const dyn Send = map.get_or_insert(key, init); + let value: *const dyn Send = task.locals().get_or_insert(key, init); // Call the closure with the value passed as an argument. f(&*(value as *const T)) @@ -171,24 +165,26 @@ impl LocalKey { /// Returns the numeric key associated with this task-local. #[inline] - fn key(&self) -> usize { + fn key(&self) -> u32 { #[cold] - fn init(key: &AtomicUsize) -> usize { - static COUNTER: Lazy> = Lazy::new(|| Mutex::new(1)); + fn init(key: &AtomicU32) -> u32 { + static COUNTER: AtomicU32 = AtomicU32::new(1); - let mut counter = COUNTER.lock().unwrap(); - let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel); + let counter = COUNTER.fetch_add(1, Ordering::Relaxed); + if counter > u32::max_value() / 2 { + std::process::abort(); + } - if prev == 0 { - *counter += 1; - *counter - 1 - } else { - prev + match key.compare_and_swap(0, counter, Ordering::AcqRel) { + 0 => counter, + k => k, } } - let key = self.__key.load(Ordering::Acquire); - if key == 0 { init(&self.__key) } else { key } + match self.__key.load(Ordering::Acquire) { + 0 => init(&self.__key), + k => k, + } } } @@ -214,51 +210,55 @@ impl fmt::Display for AccessError { impl Error for AccessError {} +/// A key-value entry in a map of task-locals. +struct Entry { + /// Key identifying the task-local variable. + key: u32, + + /// Value stored in this entry. + value: Box, +} + /// A map that holds task-locals. -pub(crate) struct Map { - /// A list of `(key, value)` entries sorted by the key. - entries: UnsafeCell)>>, +pub(crate) struct LocalsMap { + /// A list of key-value entries sorted by the key. + entries: UnsafeCell>>, } -impl Map { +impl LocalsMap { /// Creates an empty map of task-locals. - pub fn new() -> Map { - Map { - entries: UnsafeCell::new(Vec::new()), + pub fn new() -> LocalsMap { + LocalsMap { + entries: UnsafeCell::new(Some(Vec::new())), } } - /// Returns a thread-local value associated with `key` or inserts one constructed by `init`. + /// Returns a task-local value associated with `key` or inserts one constructed by `init`. #[inline] - pub fn get_or_insert(&self, key: usize, init: impl FnOnce() -> Box) -> &dyn Send { - let entries = unsafe { &mut *self.entries.get() }; - - let index = match entries.binary_search_by_key(&key, |e| e.0) { - Ok(i) => i, - Err(i) => { - entries.insert(i, (key, init())); - i + pub fn get_or_insert(&self, key: u32, init: impl FnOnce() -> Box) -> &dyn Send { + match unsafe { (*self.entries.get()).as_mut() } { + None => panic!("can't access task-locals while the task is being dropped"), + Some(entries) => { + let index = match entries.binary_search_by_key(&key, |e| e.key) { + Ok(i) => i, + Err(i) => { + let value = init(); + entries.insert(i, Entry { key, value }); + i + } + }; + &*entries[index].value } - }; - - &*entries[index].1 + } } /// Clears the map and drops all task-locals. - pub fn clear(&self) { - let entries = unsafe { &mut *self.entries.get() }; - entries.clear(); - } -} - -// Wrap the future into one that drops task-local variables on exit. -pub(crate) unsafe fn add_finalizer(f: impl Future) -> impl Future { - async move { - let res = f.await; - - // Abort on panic because thread-local variables behave the same way. - abort_on_panic(|| worker::get_task(|task| task.metadata().local_map.clear())); - - res + /// + /// This method is only safe to call at the end of the task. + pub unsafe fn clear(&self) { + // Since destructors may attempt to access task-locals, we musnt't hold a mutable reference + // to the `Vec` while dropping them. Instead, we first take the `Vec` out and then drop it. + let entries = (*self.entries.get()).take(); + drop(entries); } } diff --git a/src/task/worker.rs b/src/task/worker.rs deleted file mode 100644 index f7b08e16..00000000 --- a/src/task/worker.rs +++ /dev/null @@ -1,110 +0,0 @@ -use std::cell::Cell; -use std::ptr; - -use crossbeam_deque::Worker; - -use super::pool; -use super::task; -use super::Task; -use crate::utils::abort_on_panic; - -/// Returns a handle to the current task. -/// -/// # Panics -/// -/// This function will panic if not called within the context of a task created by [`block_on`], -/// [`spawn`], or [`Builder::spawn`]. -/// -/// [`block_on`]: fn.block_on.html -/// [`spawn`]: fn.spawn.html -/// [`Builder::spawn`]: struct.Builder.html#method.spawn -/// -/// # Examples -/// -/// ``` -/// # async_std::task::block_on(async { -/// # -/// use async_std::task; -/// -/// println!("The name of this task is {:?}", task::current().name()); -/// # -/// # }) -/// ``` -pub fn current() -> Task { - get_task(|task| task.clone()).expect("`task::current()` called outside the context of a task") -} - -thread_local! { - static TAG: Cell<*const task::Tag> = Cell::new(ptr::null_mut()); -} - -pub(crate) fn set_tag(tag: *const task::Tag, f: F) -> R -where - F: FnOnce() -> R, -{ - struct ResetTag<'a>(&'a Cell<*const task::Tag>); - - impl Drop for ResetTag<'_> { - fn drop(&mut self) { - self.0.set(ptr::null()); - } - } - - TAG.with(|t| { - t.set(tag); - let _guard = ResetTag(t); - - f() - }) -} - -pub(crate) fn get_task(f: F) -> Option -where - F: FnOnce(&Task) -> R, -{ - let res = TAG.try_with(|tag| unsafe { tag.get().as_ref().map(task::Tag::task).map(f) }); - - match res { - Ok(Some(val)) => Some(val), - Ok(None) | Err(_) => None, - } -} - -thread_local! { - static IS_WORKER: Cell = Cell::new(false); - static QUEUE: Cell>> = Cell::new(None); -} - -pub(crate) fn is_worker() -> bool { - IS_WORKER.with(|is_worker| is_worker.get()) -} - -fn get_queue) -> T, T>(f: F) -> T { - QUEUE.with(|queue| { - let q = queue.take().unwrap(); - let ret = f(&q); - queue.set(Some(q)); - ret - }) -} - -pub(crate) fn schedule(task: task::Runnable) { - if is_worker() { - get_queue(|q| q.push(task)); - } else { - pool::get().injector.push(task); - } - pool::get().sleepers.notify_one(); -} - -pub(crate) fn main_loop(worker: Worker) { - IS_WORKER.with(|is_worker| is_worker.set(true)); - QUEUE.with(|queue| queue.set(Some(worker))); - - loop { - match get_queue(|q| pool::get().find_task(q)) { - Some(task) => set_tag(task.tag(), || abort_on_panic(|| task.run())), - None => pool::get().sleepers.wait(), - } - } -} diff --git a/src/task/yield_now.rs b/src/task/yield_now.rs index c11408ab..5cd0ce5b 100644 --- a/src/task/yield_now.rs +++ b/src/task/yield_now.rs @@ -1,8 +1,8 @@ +use std::pin::Pin; + use crate::future::Future; use crate::task::{Context, Poll}; -use std::pin::Pin; - /// Cooperatively gives up a timeslice to the task scheduler. /// /// Calling this function will move the currently executing future to the back diff --git a/src/utils.rs b/src/utils.rs index 60516d25..4f3ffe4f 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,5 +1,4 @@ use std::mem; -use std::process; /// Calls a function and aborts if it panics. /// @@ -10,7 +9,7 @@ pub fn abort_on_panic(f: impl FnOnce() -> T) -> T { impl Drop for Bomb { fn drop(&mut self) { - process::abort(); + std::process::abort(); } } @@ -20,6 +19,53 @@ pub fn abort_on_panic(f: impl FnOnce() -> T) -> T { t } +/// Generates a random number in `0..n`. +pub fn random(n: u32) -> u32 { + use std::cell::Cell; + use std::num::Wrapping; + + thread_local! { + static RNG: Cell> = Cell::new(Wrapping(1_406_868_647)); + } + + RNG.with(|rng| { + // This is the 32-bit variant of Xorshift. + // + // Source: https://en.wikipedia.org/wiki/Xorshift + let mut x = rng.get(); + x ^= x << 13; + x ^= x >> 17; + x ^= x << 5; + rng.set(x); + + // This is a fast alternative to `x % n`. + // + // Author: Daniel Lemire + // Source: https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ + ((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32 + }) +} + +/// Defers evaluation of a block of code until the end of the scope. +#[doc(hidden)] +macro_rules! defer { + ($($body:tt)*) => { + let _guard = { + pub struct Guard(Option); + + impl Drop for Guard { + fn drop(&mut self) { + (self.0).take().map(|f| f()); + } + } + + Guard(Some(|| { + let _ = { $($body)* }; + })) + }; + }; +} + /// Declares unstable items. #[doc(hidden)] macro_rules! cfg_unstable { diff --git a/tests/channel.rs b/tests/channel.rs index 0c40f5a7..f7938904 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -1,3 +1,5 @@ +#![cfg(feature = "unstable")] + use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; From 87de4e15982bbec890aa909600da5997c4f6edc7 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 1 Nov 2019 02:45:50 +0100 Subject: [PATCH 13/15] Add utility type WakerSet to the sync module (#390) * Add utility type Registry to the sync module * Remove unused import * Split unregister into complete and cancel * Refactoring and renaming * Split remove() into complete() and cancel() * Rename to WakerSet * Ignore clippy warning * Ignore another clippy warning * Use stronger SeqCst ordering --- benches/mutex.rs | 42 ++++++ benches/task.rs | 11 ++ src/sync/channel.rs | 336 ++++++++++-------------------------------- src/sync/mod.rs | 3 + src/sync/mutex.rs | 124 +++++----------- src/sync/rwlock.rs | 3 +- src/sync/waker_set.rs | 200 +++++++++++++++++++++++++ 7 files changed, 371 insertions(+), 348 deletions(-) create mode 100644 benches/mutex.rs create mode 100644 benches/task.rs create mode 100644 src/sync/waker_set.rs diff --git a/benches/mutex.rs b/benches/mutex.rs new file mode 100644 index 00000000..b159ba12 --- /dev/null +++ b/benches/mutex.rs @@ -0,0 +1,42 @@ +#![feature(test)] + +extern crate test; + +use std::sync::Arc; + +use async_std::sync::Mutex; +use async_std::task; +use test::Bencher; + +#[bench] +fn create(b: &mut Bencher) { + b.iter(|| Mutex::new(())); +} + +#[bench] +fn contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(10, 1000))); +} + +#[bench] +fn no_contention(b: &mut Bencher) { + b.iter(|| task::block_on(run(1, 10000))); +} + +async fn run(task: usize, iter: usize) { + let m = Arc::new(Mutex::new(())); + let mut tasks = Vec::new(); + + for _ in 0..task { + let m = m.clone(); + tasks.push(task::spawn(async move { + for _ in 0..iter { + let _ = m.lock().await; + } + })); + } + + for t in tasks { + t.await; + } +} diff --git a/benches/task.rs b/benches/task.rs new file mode 100644 index 00000000..b3144713 --- /dev/null +++ b/benches/task.rs @@ -0,0 +1,11 @@ +#![feature(test)] + +extern crate test; + +use async_std::task; +use test::Bencher; + +#[bench] +fn block_on(b: &mut Bencher) { + b.iter(|| task::block_on(async {})); +} diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 6751417a..403bee74 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -4,17 +4,17 @@ use std::future::Future; use std::isize; use std::marker::PhantomData; use std::mem; -use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::process; use std::ptr; -use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{self, AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll, Waker}; +use std::task::{Context, Poll}; use crossbeam_utils::{Backoff, CachePadded}; -use futures_core::stream::Stream; -use slab::Slab; + +use crate::stream::Stream; +use crate::sync::WakerSet; /// Creates a bounded multi-producer multi-consumer channel. /// @@ -128,7 +128,7 @@ impl Sender { /// ``` pub async fn send(&self, msg: T) { struct SendFuture<'a, T> { - sender: &'a Sender, + channel: &'a Channel, msg: Option, opt_key: Option, } @@ -142,23 +142,23 @@ impl Sender { let msg = self.msg.take().unwrap(); // Try sending the message. - let poll = match self.sender.channel.push(msg) { + let poll = match self.channel.try_send(msg) { Ok(()) => Poll::Ready(()), - Err(PushError::Disconnected(msg)) => { + Err(TrySendError::Disconnected(msg)) => { self.msg = Some(msg); Poll::Pending } - Err(PushError::Full(msg)) => { - // Register the current task. + Err(TrySendError::Full(msg)) => { + // Insert this send operation. match self.opt_key { - None => self.opt_key = Some(self.sender.channel.sends.register(cx)), - Some(key) => self.sender.channel.sends.reregister(key, cx), + None => self.opt_key = Some(self.channel.send_wakers.insert(cx)), + Some(key) => self.channel.send_wakers.update(key, cx), } // Try sending the message again. - match self.sender.channel.push(msg) { + match self.channel.try_send(msg) { Ok(()) => Poll::Ready(()), - Err(PushError::Disconnected(msg)) | Err(PushError::Full(msg)) => { + Err(TrySendError::Disconnected(msg)) | Err(TrySendError::Full(msg)) => { self.msg = Some(msg); Poll::Pending } @@ -167,10 +167,9 @@ impl Sender { }; if poll.is_ready() { - // If the current task was registered, unregister now. + // If the current task is in the set, remove it. if let Some(key) = self.opt_key.take() { - // `true` means the send operation is completed. - self.sender.channel.sends.unregister(key, true); + self.channel.send_wakers.complete(key); } } @@ -180,16 +179,16 @@ impl Sender { impl Drop for SendFuture<'_, T> { fn drop(&mut self) { - // If the current task was registered, unregister now. + // If the current task is still in the set, that means it is being cancelled now. + // Wake up another task instead. if let Some(key) = self.opt_key { - // `false` means the send operation is cancelled. - self.sender.channel.sends.unregister(key, false); + self.channel.send_wakers.cancel(key); } } } SendFuture { - sender: self, + channel: &self.channel, msg: Some(msg), opt_key: None, } @@ -340,7 +339,7 @@ pub struct Receiver { /// The inner channel. channel: Arc>, - /// The registration key for this receiver in the `channel.streams` registry. + /// The key for this receiver in the `channel.stream_wakers` set. opt_key: Option, } @@ -382,16 +381,20 @@ impl Receiver { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - poll_recv(&self.channel, &self.channel.recvs, &mut self.opt_key, cx) + poll_recv( + &self.channel, + &self.channel.recv_wakers, + &mut self.opt_key, + cx, + ) } } impl Drop for RecvFuture<'_, T> { fn drop(&mut self) { - // If the current task was registered, unregister now. + // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { - // `false` means the receive operation is cancelled. - self.channel.recvs.unregister(key, false); + self.channel.recv_wakers.cancel(key); } } } @@ -484,10 +487,9 @@ impl Receiver { impl Drop for Receiver { fn drop(&mut self) { - // If the current task was registered as blocked on this stream, unregister now. + // If the current task is still in the stream set, that means it is being cancelled now. if let Some(key) = self.opt_key { - // `false` means the last request for a stream item is cancelled. - self.channel.streams.unregister(key, false); + self.channel.stream_wakers.cancel(key); } // Decrement the receiver count and disconnect the channel if it drops down to zero. @@ -518,7 +520,12 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; - poll_recv(&this.channel, &this.channel.streams, &mut this.opt_key, cx) + poll_recv( + &this.channel, + &this.channel.stream_wakers, + &mut this.opt_key, + cx, + ) } } @@ -530,39 +537,38 @@ impl fmt::Debug for Receiver { /// Polls a receive operation on a channel. /// -/// If the receive operation is blocked, the current task will be registered in `registry` and its -/// registration key will then be stored in `opt_key`. +/// If the receive operation is blocked, the current task will be inserted into `wakers` and its +/// associated key will then be stored in `opt_key`. fn poll_recv( channel: &Channel, - registry: &Registry, + wakers: &WakerSet, opt_key: &mut Option, cx: &mut Context<'_>, ) -> Poll> { // Try receiving a message. - let poll = match channel.pop() { + let poll = match channel.try_recv() { Ok(msg) => Poll::Ready(Some(msg)), - Err(PopError::Disconnected) => Poll::Ready(None), - Err(PopError::Empty) => { - // Register the current task. + Err(TryRecvError::Disconnected) => Poll::Ready(None), + Err(TryRecvError::Empty) => { + // Insert this receive operation. match *opt_key { - None => *opt_key = Some(registry.register(cx)), - Some(key) => registry.reregister(key, cx), + None => *opt_key = Some(wakers.insert(cx)), + Some(key) => wakers.update(key, cx), } // Try receiving a message again. - match channel.pop() { + match channel.try_recv() { Ok(msg) => Poll::Ready(Some(msg)), - Err(PopError::Disconnected) => Poll::Ready(None), - Err(PopError::Empty) => Poll::Pending, + Err(TryRecvError::Disconnected) => Poll::Ready(None), + Err(TryRecvError::Empty) => Poll::Pending, } } }; if poll.is_ready() { - // If the current task was registered, unregister now. + // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { - // `true` means the receive operation is completed. - registry.unregister(key, true); + wakers.complete(key); } } @@ -612,13 +618,13 @@ struct Channel { mark_bit: usize, /// Send operations waiting while the channel is full. - sends: Registry, + send_wakers: WakerSet, /// Receive operations waiting while the channel is empty and not disconnected. - recvs: Registry, + recv_wakers: WakerSet, /// Streams waiting while the channel is empty and not disconnected. - streams: Registry, + stream_wakers: WakerSet, /// The number of currently active `Sender`s. sender_count: AtomicUsize, @@ -672,17 +678,17 @@ impl Channel { mark_bit, head: CachePadded::new(AtomicUsize::new(head)), tail: CachePadded::new(AtomicUsize::new(tail)), - sends: Registry::new(), - recvs: Registry::new(), - streams: Registry::new(), + send_wakers: WakerSet::new(), + recv_wakers: WakerSet::new(), + stream_wakers: WakerSet::new(), sender_count: AtomicUsize::new(1), receiver_count: AtomicUsize::new(1), _marker: PhantomData, } } - /// Attempts to push a message. - fn push(&self, msg: T) -> Result<(), PushError> { + /// Attempts to send a message. + fn try_send(&self, msg: T) -> Result<(), TrySendError> { let backoff = Backoff::new(); let mut tail = self.tail.load(Ordering::Relaxed); @@ -721,10 +727,10 @@ impl Channel { slot.stamp.store(stamp, Ordering::Release); // Wake a blocked receive operation. - self.recvs.notify_one(); + self.recv_wakers.notify_one(); // Wake all blocked streams. - self.streams.notify_all(); + self.stream_wakers.notify_all(); return Ok(()); } @@ -743,9 +749,9 @@ impl Channel { // Check if the channel is disconnected. if tail & self.mark_bit != 0 { - return Err(PushError::Disconnected(msg)); + return Err(TrySendError::Disconnected(msg)); } else { - return Err(PushError::Full(msg)); + return Err(TrySendError::Full(msg)); } } @@ -759,8 +765,8 @@ impl Channel { } } - /// Attempts to pop a message. - fn pop(&self) -> Result { + /// Attempts to receive a message. + fn try_recv(&self) -> Result { let backoff = Backoff::new(); let mut head = self.head.load(Ordering::Relaxed); @@ -799,7 +805,7 @@ impl Channel { slot.stamp.store(stamp, Ordering::Release); // Wake a blocked send operation. - self.sends.notify_one(); + self.send_wakers.notify_one(); return Ok(msg); } @@ -816,10 +822,10 @@ impl Channel { if (tail & !self.mark_bit) == head { // If the channel is disconnected... if tail & self.mark_bit != 0 { - return Err(PopError::Disconnected); + return Err(TryRecvError::Disconnected); } else { // Otherwise, the receive operation is not ready. - return Err(PopError::Empty); + return Err(TryRecvError::Empty); } } @@ -888,9 +894,9 @@ impl Channel { if tail & self.mark_bit == 0 { // Notify everyone blocked on this channel. - self.sends.notify_all(); - self.recvs.notify_all(); - self.streams.notify_all(); + self.send_wakers.notify_all(); + self.recv_wakers.notify_all(); + self.stream_wakers.notify_all(); } } } @@ -921,8 +927,8 @@ impl Drop for Channel { } } -/// An error returned from the `push()` method. -enum PushError { +/// An error returned from the `try_send()` method. +enum TrySendError { /// The channel is full but not disconnected. Full(T), @@ -930,203 +936,11 @@ enum PushError { Disconnected(T), } -/// An error returned from the `pop()` method. -enum PopError { +/// An error returned from the `try_recv()` method. +enum TryRecvError { /// The channel is empty but not disconnected. Empty, /// The channel is empty and disconnected. Disconnected, } - -/// A list of blocked channel operations. -struct Blocked { - /// A list of registered channel operations. - /// - /// Each entry has a waker associated with the task that is executing the operation. If the - /// waker is set to `None`, that means the task has been woken up but hasn't removed itself - /// from the registry yet. - entries: Slab>, - - /// The number of wakers in the entry list. - waker_count: usize, -} - -/// A registry of blocked channel operations. -/// -/// Blocked operations register themselves in a registry. Successful operations on the opposite -/// side of the channel wake blocked operations in the registry. -struct Registry { - /// A list of blocked channel operations. - blocked: Spinlock, - - /// Set to `true` if there are no wakers in the registry. - /// - /// Note that this either means there are no entries in the registry, or that all entries have - /// been notified. - is_empty: AtomicBool, -} - -impl Registry { - /// Creates a new registry. - fn new() -> Registry { - Registry { - blocked: Spinlock::new(Blocked { - entries: Slab::new(), - waker_count: 0, - }), - is_empty: AtomicBool::new(true), - } - } - - /// Registers a blocked channel operation and returns a key associated with it. - fn register(&self, cx: &Context<'_>) -> usize { - let mut blocked = self.blocked.lock(); - - // Insert a new entry into the list of blocked tasks. - let w = cx.waker().clone(); - let key = blocked.entries.insert(Some(w)); - - blocked.waker_count += 1; - if blocked.waker_count == 1 { - self.is_empty.store(false, Ordering::SeqCst); - } - - key - } - - /// Re-registers a blocked channel operation by filling in its waker. - fn reregister(&self, key: usize, cx: &Context<'_>) { - let mut blocked = self.blocked.lock(); - - let was_none = blocked.entries[key].is_none(); - let w = cx.waker().clone(); - blocked.entries[key] = Some(w); - - if was_none { - blocked.waker_count += 1; - if blocked.waker_count == 1 { - self.is_empty.store(false, Ordering::SeqCst); - } - } - } - - /// Unregisters a channel operation. - /// - /// If `completed` is `true`, the operation will be removed from the registry. If `completed` - /// is `false`, that means the operation was cancelled so another one will be notified. - fn unregister(&self, key: usize, completed: bool) { - let mut blocked = self.blocked.lock(); - let mut removed = false; - - match blocked.entries.remove(key) { - Some(_) => removed = true, - None => { - if !completed { - // This operation was cancelled. Notify another one. - if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - removed = true; - } - } - } - } - } - - if removed { - blocked.waker_count -= 1; - if blocked.waker_count == 0 { - self.is_empty.store(true, Ordering::SeqCst); - } - } - } - - /// Notifies one blocked channel operation. - #[inline] - fn notify_one(&self) { - if !self.is_empty.load(Ordering::SeqCst) { - let mut blocked = self.blocked.lock(); - - if let Some((_, opt_waker)) = blocked.entries.iter_mut().next() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - - blocked.waker_count -= 1; - if blocked.waker_count == 0 { - self.is_empty.store(true, Ordering::SeqCst); - } - } - } - } - } - - /// Notifies all blocked channel operations. - #[inline] - fn notify_all(&self) { - if !self.is_empty.load(Ordering::SeqCst) { - let mut blocked = self.blocked.lock(); - - for (_, opt_waker) in blocked.entries.iter_mut() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - } - } - - blocked.waker_count = 0; - self.is_empty.store(true, Ordering::SeqCst); - } - } -} - -/// A simple spinlock. -struct Spinlock { - flag: AtomicBool, - value: UnsafeCell, -} - -impl Spinlock { - /// Returns a new spinlock initialized with `value`. - fn new(value: T) -> Spinlock { - Spinlock { - flag: AtomicBool::new(false), - value: UnsafeCell::new(value), - } - } - - /// Locks the spinlock. - fn lock(&self) -> SpinlockGuard<'_, T> { - let backoff = Backoff::new(); - while self.flag.swap(true, Ordering::Acquire) { - backoff.snooze(); - } - SpinlockGuard { parent: self } - } -} - -/// A guard holding a spinlock locked. -struct SpinlockGuard<'a, T> { - parent: &'a Spinlock, -} - -impl<'a, T> Drop for SpinlockGuard<'a, T> { - fn drop(&mut self) { - self.parent.flag.store(false, Ordering::Release); - } -} - -impl<'a, T> Deref for SpinlockGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &T { - unsafe { &*self.parent.value.get() } - } -} - -impl<'a, T> DerefMut for SpinlockGuard<'a, T> { - fn deref_mut(&mut self) -> &mut T { - unsafe { &mut *self.parent.value.get() } - } -} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index d10e6bdf..ea991fe6 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -191,3 +191,6 @@ cfg_unstable! { mod barrier; mod channel; } + +pub(crate) mod waker_set; +pub(crate) use waker_set::WakerSet; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index cd7a3577..fcd030d8 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -2,18 +2,11 @@ use std::cell::UnsafeCell; use std::fmt; use std::ops::{Deref, DerefMut}; use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; - -use slab::Slab; +use std::sync::atomic::{AtomicBool, Ordering}; use crate::future::Future; -use crate::task::{Context, Poll, Waker}; - -/// Set if the mutex is locked. -const LOCK: usize = 1; - -/// Set if there are tasks blocked on the mutex. -const BLOCKED: usize = 1 << 1; +use crate::sync::WakerSet; +use crate::task::{Context, Poll}; /// A mutual exclusion primitive for protecting shared data. /// @@ -49,8 +42,8 @@ const BLOCKED: usize = 1 << 1; /// # }) /// ``` pub struct Mutex { - state: AtomicUsize, - blocked: std::sync::Mutex>>, + locked: AtomicBool, + wakers: WakerSet, value: UnsafeCell, } @@ -69,8 +62,8 @@ impl Mutex { /// ``` pub fn new(t: T) -> Mutex { Mutex { - state: AtomicUsize::new(0), - blocked: std::sync::Mutex::new(Slab::new()), + locked: AtomicBool::new(false), + wakers: WakerSet::new(), value: UnsafeCell::new(t), } } @@ -105,75 +98,46 @@ impl Mutex { pub struct LockFuture<'a, T> { mutex: &'a Mutex, opt_key: Option, - acquired: bool, } impl<'a, T> Future for LockFuture<'a, T> { type Output = MutexGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.mutex.try_lock() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + let poll = match self.mutex.try_lock() { + Some(guard) => Poll::Ready(guard), None => { - let mut blocked = self.mutex.blocked.lock().unwrap(); - - // Register the current task. + // Insert this lock operation. match self.opt_key { - None => { - // Insert a new entry into the list of blocked tasks. - let w = cx.waker().clone(); - let key = blocked.insert(Some(w)); - self.opt_key = Some(key); - - if blocked.len() == 1 { - self.mutex.state.fetch_or(BLOCKED, Ordering::Relaxed); - } - } - Some(key) => { - // There is already an entry in the list of blocked tasks. Just - // reset the waker if it was removed. - if blocked[key].is_none() { - let w = cx.waker().clone(); - blocked[key] = Some(w); - } - } + None => self.opt_key = Some(self.mutex.wakers.insert(cx)), + Some(key) => self.mutex.wakers.update(key, cx), } // Try locking again because it's possible the mutex got unlocked just - // before the current task was registered as a blocked task. + // before the current task was inserted into the waker set. match self.mutex.try_lock() { - Some(guard) => { - self.acquired = true; - Poll::Ready(guard) - } + Some(guard) => Poll::Ready(guard), None => Poll::Pending, } } + }; + + if poll.is_ready() { + // If the current task is in the set, remove it. + if let Some(key) = self.opt_key.take() { + self.mutex.wakers.complete(key); + } } + + poll } } impl Drop for LockFuture<'_, T> { fn drop(&mut self) { + // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { - let mut blocked = self.mutex.blocked.lock().unwrap(); - let opt_waker = blocked.remove(key); - - if opt_waker.is_none() && !self.acquired { - // We were awoken but didn't acquire the lock. Wake up another task. - if let Some((_, opt_waker)) = blocked.iter_mut().next() { - if let Some(w) = opt_waker.take() { - w.wake(); - } - } - } - - if blocked.is_empty() { - self.mutex.state.fetch_and(!BLOCKED, Ordering::Relaxed); - } + self.mutex.wakers.cancel(key); } } } @@ -181,7 +145,6 @@ impl Mutex { LockFuture { mutex: self, opt_key: None, - acquired: false, } .await } @@ -220,7 +183,7 @@ impl Mutex { /// # }) /// ``` pub fn try_lock(&self) -> Option> { - if self.state.fetch_or(LOCK, Ordering::Acquire) & LOCK == 0 { + if !self.locked.swap(true, Ordering::SeqCst) { Some(MutexGuard(self)) } else { None @@ -266,18 +229,15 @@ impl Mutex { impl fmt::Debug for Mutex { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.try_lock() { - None => { - struct LockedPlaceholder; - impl fmt::Debug for LockedPlaceholder { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("") - } - } - f.debug_struct("Mutex") - .field("data", &LockedPlaceholder) - .finish() + struct Locked; + impl fmt::Debug for Locked { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str("") } + } + + match self.try_lock() { + None => f.debug_struct("Mutex").field("data", &Locked).finish(), Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(), } } @@ -303,19 +263,11 @@ unsafe impl Sync for MutexGuard<'_, T> {} impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { - let state = self.0.state.fetch_and(!LOCK, Ordering::AcqRel); - - // If there are any blocked tasks, wake one of them up. - if state & BLOCKED != 0 { - let mut blocked = self.0.blocked.lock().unwrap(); + // Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`. + self.0.locked.store(false, Ordering::SeqCst); - if let Some((_, opt_waker)) = blocked.iter_mut().next() { - // If there is no waker in this entry, that means it was already woken. - if let Some(w) = opt_waker.take() { - w.wake(); - } - } - } + // Notify one blocked `lock()` operation. + self.0.wakers.notify_one(); } } diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index ed1d2185..a0d0f07a 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -10,7 +10,8 @@ use crate::future::Future; use crate::task::{Context, Poll, Waker}; /// Set if a write lock is held. -const WRITE_LOCK: usize = 1; +#[allow(clippy::identity_op)] +const WRITE_LOCK: usize = 1 << 0; /// Set if there are read operations blocked on the lock. const BLOCKED_READS: usize = 1 << 1; diff --git a/src/sync/waker_set.rs b/src/sync/waker_set.rs new file mode 100644 index 00000000..8fd1b621 --- /dev/null +++ b/src/sync/waker_set.rs @@ -0,0 +1,200 @@ +//! A common utility for building synchronization primitives. +//! +//! When an async operation is blocked, it needs to register itself somewhere so that it can be +//! notified later on. The `WakerSet` type helps with keeping track of such async operations and +//! notifying them when they may make progress. + +use std::cell::UnsafeCell; +use std::ops::{Deref, DerefMut}; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use crossbeam_utils::Backoff; +use slab::Slab; + +use crate::task::{Context, Waker}; + +/// Set when the entry list is locked. +#[allow(clippy::identity_op)] +const LOCKED: usize = 1 << 0; + +/// Set when there are tasks for `notify_one()` to wake. +const NOTIFY_ONE: usize = 1 << 1; + +/// Set when there are tasks for `notify_all()` to wake. +const NOTIFY_ALL: usize = 1 << 2; + +/// Inner representation of `WakerSet`. +struct Inner { + /// A list of entries in the set. + /// + /// Each entry has an optional waker associated with the task that is executing the operation. + /// If the waker is set to `None`, that means the task has been woken up but hasn't removed + /// itself from the `WakerSet` yet. + /// + /// The key of each entry is its index in the `Slab`. + entries: Slab>, + + /// The number of entries that have the waker set to `None`. + none_count: usize, +} + +/// A set holding wakers. +pub struct WakerSet { + /// Holds three bits: `LOCKED`, `NOTIFY_ONE`, and `NOTIFY_ALL`. + flag: AtomicUsize, + + /// A set holding wakers. + inner: UnsafeCell, +} + +impl WakerSet { + /// Creates a new `WakerSet`. + #[inline] + pub fn new() -> WakerSet { + WakerSet { + flag: AtomicUsize::new(0), + inner: UnsafeCell::new(Inner { + entries: Slab::new(), + none_count: 0, + }), + } + } + + /// Inserts a waker for a blocked operation and returns a key associated with it. + pub fn insert(&self, cx: &Context<'_>) -> usize { + let w = cx.waker().clone(); + self.lock().entries.insert(Some(w)) + } + + /// Updates the waker of a previously inserted entry. + pub fn update(&self, key: usize, cx: &Context<'_>) { + let mut inner = self.lock(); + + match &mut inner.entries[key] { + None => { + // Fill in the waker. + let w = cx.waker().clone(); + inner.entries[key] = Some(w); + inner.none_count -= 1; + } + Some(w) => { + // Replace the waker if the existing one is different. + if !w.will_wake(cx.waker()) { + *w = cx.waker().clone(); + } + } + } + } + + /// Removes the waker of a completed operation. + pub fn complete(&self, key: usize) { + let mut inner = self.lock(); + if inner.entries.remove(key).is_none() { + inner.none_count -= 1; + } + } + + /// Removes the waker of a cancelled operation. + pub fn cancel(&self, key: usize) { + let mut inner = self.lock(); + if inner.entries.remove(key).is_none() { + inner.none_count -= 1; + + // The operation was cancelled and notified so notify another operation instead. + if let Some((_, opt_waker)) = inner.entries.iter_mut().next() { + // If there is no waker in this entry, that means it was already woken. + if let Some(w) = opt_waker.take() { + w.wake(); + inner.none_count += 1; + } + } + } + } + + /// Notifies one blocked operation. + #[inline] + pub fn notify_one(&self) { + // Use `SeqCst` ordering to synchronize with `Lock::drop()`. + if self.flag.load(Ordering::SeqCst) & NOTIFY_ONE != 0 { + self.notify(false); + } + } + + /// Notifies all blocked operations. + // TODO: Delete this attribute when `crate::sync::channel()` is stabilized. + #[cfg(feature = "unstable")] + #[inline] + pub fn notify_all(&self) { + // Use `SeqCst` ordering to synchronize with `Lock::drop()`. + if self.flag.load(Ordering::SeqCst) & NOTIFY_ALL != 0 { + self.notify(true); + } + } + + /// Notifies blocked operations, either one or all of them. + fn notify(&self, all: bool) { + let mut inner = &mut *self.lock(); + + for (_, opt_waker) in inner.entries.iter_mut() { + // If there is no waker in this entry, that means it was already woken. + if let Some(w) = opt_waker.take() { + w.wake(); + inner.none_count += 1; + } + if !all { + break; + } + } + } + + /// Locks the list of entries. + #[cold] + fn lock(&self) -> Lock<'_> { + let backoff = Backoff::new(); + while self.flag.fetch_or(LOCKED, Ordering::Acquire) & LOCKED != 0 { + backoff.snooze(); + } + Lock { waker_set: self } + } +} + +/// A guard holding a `WakerSet` locked. +struct Lock<'a> { + waker_set: &'a WakerSet, +} + +impl Drop for Lock<'_> { + #[inline] + fn drop(&mut self) { + let mut flag = 0; + + // If there is at least one entry and all are `Some`, then `notify_one()` has work to do. + if !self.entries.is_empty() && self.none_count == 0 { + flag |= NOTIFY_ONE; + } + + // If there is at least one `Some` entry, then `notify_all()` has work to do. + if self.entries.len() - self.none_count > 0 { + flag |= NOTIFY_ALL; + } + + // Use `SeqCst` ordering to synchronize with `WakerSet::lock_to_notify()`. + self.waker_set.flag.store(flag, Ordering::SeqCst); + } +} + +impl Deref for Lock<'_> { + type Target = Inner; + + #[inline] + fn deref(&self) -> &Inner { + unsafe { &*self.waker_set.inner.get() } + } +} + +impl DerefMut for Lock<'_> { + #[inline] + fn deref_mut(&mut self) -> &mut Inner { + unsafe { &mut *self.waker_set.inner.get() } + } +} From 277fd521bc78c2d5502c7565cd5d0d8147318fdc Mon Sep 17 00:00:00 2001 From: Yuki Okushi Date: Fri, 1 Nov 2019 11:30:51 +0900 Subject: [PATCH 14/15] Remove deprecated action --- .github/workflows/ci.yml | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dd8ec899..ac49bd6e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -52,15 +52,10 @@ jobs: steps: - uses: actions/checkout@master - - id: component - uses: actions-rs/components-nightly@v1 - with: - component: rustfmt - - uses: actions-rs/toolchain@v1 with: profile: minimal - toolchain: ${{ steps.component.outputs.toolchain }} + toolchain: nightly override: true components: rustfmt From a3b742188d6ba9e5a3d20865527164a02898db2e Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 1 Nov 2019 12:54:43 +0100 Subject: [PATCH 15/15] fix doc tests (#431) * fix doc tests Signed-off-by: Yoshua Wuyts * cargo fmt Signed-off-by: Yoshua Wuyts --- src/stream/stream/min.rs | 2 +- src/stream/stream/mod.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/min.rs b/src/stream/stream/min.rs index 1ab56065..b4a8c7c1 100644 --- a/src/stream/stream/min.rs +++ b/src/stream/stream/min.rs @@ -1,5 +1,5 @@ +use std::cmp::{Ord, Ordering}; use std::marker::PhantomData; -use std::cmp::{Ordering, Ord}; use std::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 71139f4c..f7905ed2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -774,11 +774,10 @@ extension_trait! { # Examples - ``` + ```ignore # fn main() { async_std::task::block_on(async { # use std::collections::VecDeque; - use async_std::prelude::*; let s: VecDeque = vec![1, 2, 3].into_iter().collect();