forked from mirror/async-std
Merge branch 'master' into fix_doc_test
commit
d30603affe
@ -1,380 +0,0 @@
|
|||||||
use std::fmt;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
|
|
||||||
use mio::{self, Evented};
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use slab::Slab;
|
|
||||||
|
|
||||||
use crate::io;
|
|
||||||
use crate::task::{Context, Poll, Waker};
|
|
||||||
use crate::utils::abort_on_panic;
|
|
||||||
|
|
||||||
/// Data associated with a registered I/O handle.
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Entry {
|
|
||||||
/// A unique identifier.
|
|
||||||
token: mio::Token,
|
|
||||||
|
|
||||||
/// Tasks that are blocked on reading from this I/O handle.
|
|
||||||
readers: Mutex<Readers>,
|
|
||||||
|
|
||||||
/// Thasks that are blocked on writing to this I/O handle.
|
|
||||||
writers: Mutex<Writers>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The set of `Waker`s interested in read readiness.
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Readers {
|
|
||||||
/// Flag indicating read readiness.
|
|
||||||
/// (cf. `Watcher::poll_read_ready`)
|
|
||||||
ready: bool,
|
|
||||||
/// The `Waker`s blocked on reading.
|
|
||||||
wakers: Vec<Waker>
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The set of `Waker`s interested in write readiness.
|
|
||||||
#[derive(Debug)]
|
|
||||||
struct Writers {
|
|
||||||
/// Flag indicating write readiness.
|
|
||||||
/// (cf. `Watcher::poll_write_ready`)
|
|
||||||
ready: bool,
|
|
||||||
/// The `Waker`s blocked on writing.
|
|
||||||
wakers: Vec<Waker>
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The state of a networking driver.
|
|
||||||
struct Reactor {
|
|
||||||
/// A mio instance that polls for new events.
|
|
||||||
poller: mio::Poll,
|
|
||||||
|
|
||||||
/// A collection of registered I/O handles.
|
|
||||||
entries: Mutex<Slab<Arc<Entry>>>,
|
|
||||||
|
|
||||||
/// Dummy I/O handle that is only used to wake up the polling thread.
|
|
||||||
notify_reg: (mio::Registration, mio::SetReadiness),
|
|
||||||
|
|
||||||
/// An identifier for the notification handle.
|
|
||||||
notify_token: mio::Token,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Reactor {
|
|
||||||
/// Creates a new reactor for polling I/O events.
|
|
||||||
fn new() -> io::Result<Reactor> {
|
|
||||||
let poller = mio::Poll::new()?;
|
|
||||||
let notify_reg = mio::Registration::new2();
|
|
||||||
|
|
||||||
let mut reactor = Reactor {
|
|
||||||
poller,
|
|
||||||
entries: Mutex::new(Slab::new()),
|
|
||||||
notify_reg,
|
|
||||||
notify_token: mio::Token(0),
|
|
||||||
};
|
|
||||||
|
|
||||||
// Register a dummy I/O handle for waking up the polling thread.
|
|
||||||
let entry = reactor.register(&reactor.notify_reg.0)?;
|
|
||||||
reactor.notify_token = entry.token;
|
|
||||||
|
|
||||||
Ok(reactor)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Registers an I/O event source and returns its associated entry.
|
|
||||||
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
|
|
||||||
let mut entries = self.entries.lock().unwrap();
|
|
||||||
|
|
||||||
// Reserve a vacant spot in the slab and use its key as the token value.
|
|
||||||
let vacant = entries.vacant_entry();
|
|
||||||
let token = mio::Token(vacant.key());
|
|
||||||
|
|
||||||
// Allocate an entry and insert it into the slab.
|
|
||||||
let entry = Arc::new(Entry {
|
|
||||||
token,
|
|
||||||
readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }),
|
|
||||||
writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }),
|
|
||||||
});
|
|
||||||
vacant.insert(entry.clone());
|
|
||||||
|
|
||||||
// Register the I/O event source in the poller.
|
|
||||||
let interest = mio::Ready::all();
|
|
||||||
let opts = mio::PollOpt::edge();
|
|
||||||
self.poller.register(source, token, interest, opts)?;
|
|
||||||
|
|
||||||
Ok(entry)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Deregisters an I/O event source associated with an entry.
|
|
||||||
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
|
|
||||||
// Deregister the I/O object from the mio instance.
|
|
||||||
self.poller.deregister(source)?;
|
|
||||||
|
|
||||||
// Remove the entry associated with the I/O object.
|
|
||||||
self.entries.lock().unwrap().remove(entry.token.0);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// fn notify(&self) {
|
|
||||||
// self.notify_reg
|
|
||||||
// .1
|
|
||||||
// .set_readiness(mio::Ready::readable())
|
|
||||||
// .unwrap();
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The state of the global networking driver.
|
|
||||||
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
|
|
||||||
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
|
|
||||||
// handles.
|
|
||||||
std::thread::Builder::new()
|
|
||||||
.name("async-std/net".to_string())
|
|
||||||
.spawn(move || {
|
|
||||||
// If the driver thread panics, there's not much we can do. It is not a
|
|
||||||
// recoverable error and there is no place to propagate it into so we just abort.
|
|
||||||
abort_on_panic(|| {
|
|
||||||
main_loop().expect("async networking thread has panicked");
|
|
||||||
})
|
|
||||||
})
|
|
||||||
.expect("cannot start a thread driving blocking tasks");
|
|
||||||
|
|
||||||
Reactor::new().expect("cannot initialize reactor")
|
|
||||||
});
|
|
||||||
|
|
||||||
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
|
|
||||||
fn main_loop() -> io::Result<()> {
|
|
||||||
let reactor = &REACTOR;
|
|
||||||
let mut events = mio::Events::with_capacity(1000);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// Block on the poller until at least one new event comes in.
|
|
||||||
reactor.poller.poll(&mut events, None)?;
|
|
||||||
|
|
||||||
// Lock the entire entry table while we're processing new events.
|
|
||||||
let entries = reactor.entries.lock().unwrap();
|
|
||||||
|
|
||||||
for event in events.iter() {
|
|
||||||
let token = event.token();
|
|
||||||
|
|
||||||
if token == reactor.notify_token {
|
|
||||||
// If this is the notification token, we just need the notification state.
|
|
||||||
reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
|
|
||||||
} else {
|
|
||||||
// Otherwise, look for the entry associated with this token.
|
|
||||||
if let Some(entry) = entries.get(token.0) {
|
|
||||||
// Set the readiness flags from this I/O event.
|
|
||||||
let readiness = event.readiness();
|
|
||||||
|
|
||||||
// Wake up reader tasks blocked on this I/O handle.
|
|
||||||
if !(readiness & reader_interests()).is_empty() {
|
|
||||||
let mut readers = entry.readers.lock().unwrap();
|
|
||||||
readers.ready = true;
|
|
||||||
for w in readers.wakers.drain(..) {
|
|
||||||
w.wake();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wake up writer tasks blocked on this I/O handle.
|
|
||||||
if !(readiness & writer_interests()).is_empty() {
|
|
||||||
let mut writers = entry.writers.lock().unwrap();
|
|
||||||
writers.ready = true;
|
|
||||||
for w in writers.wakers.drain(..) {
|
|
||||||
w.wake();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// An I/O handle powered by the networking driver.
|
|
||||||
///
|
|
||||||
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
|
|
||||||
/// implementing traits `AsyncRead` and `AsyncWrite`.
|
|
||||||
pub struct Watcher<T: Evented> {
|
|
||||||
/// Data associated with the I/O handle.
|
|
||||||
entry: Arc<Entry>,
|
|
||||||
|
|
||||||
/// The I/O event source.
|
|
||||||
source: Option<T>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Evented> Watcher<T> {
|
|
||||||
/// Creates a new I/O handle.
|
|
||||||
///
|
|
||||||
/// The provided I/O event source will be kept registered inside the reactor's poller for the
|
|
||||||
/// lifetime of the returned I/O handle.
|
|
||||||
pub fn new(source: T) -> Watcher<T> {
|
|
||||||
Watcher {
|
|
||||||
entry: REACTOR
|
|
||||||
.register(&source)
|
|
||||||
.expect("cannot register an I/O event source"),
|
|
||||||
source: Some(source),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a reference to the inner I/O event source.
|
|
||||||
pub fn get_ref(&self) -> &T {
|
|
||||||
self.source.as_ref().unwrap()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Polls the inner I/O source for a non-blocking read operation.
|
|
||||||
///
|
|
||||||
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
|
||||||
/// will be registered for wakeup when the I/O source becomes readable.
|
|
||||||
pub fn poll_read_with<'a, F, R>(&'a self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
|
|
||||||
where
|
|
||||||
F: FnMut(&'a T) -> io::Result<R>,
|
|
||||||
{
|
|
||||||
// If the operation isn't blocked, return its result.
|
|
||||||
match f(self.source.as_ref().unwrap()) {
|
|
||||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
||||||
res => return Poll::Ready(res),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock the waker list.
|
|
||||||
let mut readers = self.entry.readers.lock().unwrap();
|
|
||||||
|
|
||||||
// Try running the operation again.
|
|
||||||
match f(self.source.as_ref().unwrap()) {
|
|
||||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
||||||
res => return Poll::Ready(res),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register the task if it isn't registered already.
|
|
||||||
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
|
||||||
readers.wakers.push(cx.waker().clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
readers.ready = false;
|
|
||||||
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Polls the inner I/O source for a non-blocking write operation.
|
|
||||||
///
|
|
||||||
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
|
||||||
/// will be registered for wakeup when the I/O source becomes writable.
|
|
||||||
pub fn poll_write_with<'a, F, R>(
|
|
||||||
&'a self,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
mut f: F,
|
|
||||||
) -> Poll<io::Result<R>>
|
|
||||||
where
|
|
||||||
F: FnMut(&'a T) -> io::Result<R>,
|
|
||||||
{
|
|
||||||
// If the operation isn't blocked, return its result.
|
|
||||||
match f(self.source.as_ref().unwrap()) {
|
|
||||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
||||||
res => return Poll::Ready(res),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lock the waker list.
|
|
||||||
let mut writers = self.entry.writers.lock().unwrap();
|
|
||||||
|
|
||||||
// Try running the operation again.
|
|
||||||
match f(self.source.as_ref().unwrap()) {
|
|
||||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
|
||||||
res => return Poll::Ready(res),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register the task if it isn't registered already.
|
|
||||||
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
|
||||||
writers.wakers.push(cx.waker().clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
writers.ready = false;
|
|
||||||
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Polls the inner I/O source until a non-blocking read can be performed.
|
|
||||||
///
|
|
||||||
/// If non-blocking reads are currently not possible, the `Waker`
|
|
||||||
/// will be saved and notified when it can read non-blocking
|
|
||||||
/// again.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
|
|
||||||
// Lock the waker list.
|
|
||||||
let mut readers = self.entry.readers.lock().unwrap();
|
|
||||||
if readers.ready {
|
|
||||||
return Poll::Ready(())
|
|
||||||
}
|
|
||||||
// Register the task if it isn't registered already.
|
|
||||||
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
|
||||||
readers.wakers.push(cx.waker().clone());
|
|
||||||
}
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Polls the inner I/O source until a non-blocking write can be performed.
|
|
||||||
///
|
|
||||||
/// If non-blocking writes are currently not possible, the `Waker`
|
|
||||||
/// will be saved and notified when it can write non-blocking
|
|
||||||
/// again.
|
|
||||||
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
|
|
||||||
// Lock the waker list.
|
|
||||||
let mut writers = self.entry.writers.lock().unwrap();
|
|
||||||
if writers.ready {
|
|
||||||
return Poll::Ready(())
|
|
||||||
}
|
|
||||||
// Register the task if it isn't registered already.
|
|
||||||
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
|
||||||
writers.wakers.push(cx.waker().clone());
|
|
||||||
}
|
|
||||||
Poll::Pending
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Deregisters and returns the inner I/O source.
|
|
||||||
///
|
|
||||||
/// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
|
|
||||||
#[allow(dead_code)]
|
|
||||||
pub fn into_inner(mut self) -> T {
|
|
||||||
let source = self.source.take().unwrap();
|
|
||||||
REACTOR
|
|
||||||
.deregister(&source, &self.entry)
|
|
||||||
.expect("cannot deregister I/O event source");
|
|
||||||
source
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Evented> Drop for Watcher<T> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
if let Some(ref source) = self.source {
|
|
||||||
REACTOR
|
|
||||||
.deregister(source, &self.entry)
|
|
||||||
.expect("cannot deregister I/O event source");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
f.debug_struct("Watcher")
|
|
||||||
.field("entry", &self.entry)
|
|
||||||
.field("source", &self.source)
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a mask containing flags that interest tasks reading from I/O handles.
|
|
||||||
#[inline]
|
|
||||||
fn reader_interests() -> mio::Ready {
|
|
||||||
mio::Ready::all() - mio::Ready::writable()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a mask containing flags that interest tasks writing into I/O handles.
|
|
||||||
#[inline]
|
|
||||||
fn writer_interests() -> mio::Ready {
|
|
||||||
mio::Ready::writable() | hup()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns a flag containing the hangup status.
|
|
||||||
#[inline]
|
|
||||||
fn hup() -> mio::Ready {
|
|
||||||
#[cfg(unix)]
|
|
||||||
let ready = mio::unix::UnixReady::hup().into();
|
|
||||||
|
|
||||||
#[cfg(not(unix))]
|
|
||||||
let ready = mio::Ready::empty();
|
|
||||||
|
|
||||||
ready
|
|
||||||
}
|
|
@ -0,0 +1,34 @@
|
|||||||
|
//! The runtime.
|
||||||
|
|
||||||
|
use std::env;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
|
||||||
|
use crate::future;
|
||||||
|
|
||||||
|
/// Dummy runtime struct.
|
||||||
|
pub struct Runtime {}
|
||||||
|
|
||||||
|
/// The global runtime.
|
||||||
|
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||||
|
// Create an executor thread pool.
|
||||||
|
|
||||||
|
let thread_count = env::var("ASYNC_STD_THREAD_COUNT")
|
||||||
|
.map(|env| {
|
||||||
|
env.parse()
|
||||||
|
.expect("ASYNC_STD_THREAD_COUNT must be a number")
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|_| num_cpus::get())
|
||||||
|
.max(1);
|
||||||
|
|
||||||
|
let thread_name = env::var("ASYNC_STD_THREAD_NAME").unwrap_or("async-std/runtime".to_string());
|
||||||
|
|
||||||
|
for _ in 0..thread_count {
|
||||||
|
thread::Builder::new()
|
||||||
|
.name(thread_name.clone())
|
||||||
|
.spawn(|| smol::run(future::pending::<()>()))
|
||||||
|
.expect("cannot start a runtime thread");
|
||||||
|
}
|
||||||
|
Runtime {}
|
||||||
|
});
|
@ -0,0 +1,417 @@
|
|||||||
|
use std::fmt;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use super::mutex::{guard_lock, MutexGuard};
|
||||||
|
use crate::future::{timeout, Future};
|
||||||
|
use crate::sync::WakerSet;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
|
||||||
|
pub struct WaitTimeoutResult(bool);
|
||||||
|
|
||||||
|
/// A type indicating whether a timed wait on a condition variable returned due to a time out or
|
||||||
|
/// not
|
||||||
|
impl WaitTimeoutResult {
|
||||||
|
/// Returns `true` if the wait was known to have timed out.
|
||||||
|
pub fn timed_out(self) -> bool {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A Condition Variable
|
||||||
|
///
|
||||||
|
/// This type is an async version of [`std::sync::Mutex`].
|
||||||
|
///
|
||||||
|
/// [`std::sync::Condvar`]: https://doc.rust-lang.org/std/sync/struct.Condvar.html
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use std::sync::Arc;
|
||||||
|
///
|
||||||
|
/// use async_std::sync::{Mutex, Condvar};
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
/// let pair2 = pair.clone();
|
||||||
|
///
|
||||||
|
/// // Inside of our lock, spawn a new thread, and then wait for it to start.
|
||||||
|
/// task::spawn(async move {
|
||||||
|
/// let (lock, cvar) = &*pair2;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// *started = true;
|
||||||
|
/// // We notify the condvar that the value has changed.
|
||||||
|
/// cvar.notify_one();
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // Wait for the thread to start up.
|
||||||
|
/// let (lock, cvar) = &*pair;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// while !*started {
|
||||||
|
/// started = cvar.wait(started).await;
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// # })
|
||||||
|
/// ```
|
||||||
|
pub struct Condvar {
|
||||||
|
wakers: WakerSet,
|
||||||
|
}
|
||||||
|
|
||||||
|
unsafe impl Send for Condvar {}
|
||||||
|
unsafe impl Sync for Condvar {}
|
||||||
|
|
||||||
|
impl Default for Condvar {
|
||||||
|
fn default() -> Self {
|
||||||
|
Condvar::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Condvar {
|
||||||
|
/// Creates a new condition variable
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use async_std::sync::Condvar;
|
||||||
|
///
|
||||||
|
/// let cvar = Condvar::new();
|
||||||
|
/// ```
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Condvar {
|
||||||
|
wakers: WakerSet::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Blocks the current task until this condition variable receives a notification.
|
||||||
|
///
|
||||||
|
/// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
|
||||||
|
/// However, as a best practice avoid using with multiple mutexes.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// use std::sync::Arc;
|
||||||
|
///
|
||||||
|
/// use async_std::sync::{Mutex, Condvar};
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
/// let pair2 = pair.clone();
|
||||||
|
///
|
||||||
|
/// task::spawn(async move {
|
||||||
|
/// let (lock, cvar) = &*pair2;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// *started = true;
|
||||||
|
/// // We notify the condvar that the value has changed.
|
||||||
|
/// cvar.notify_one();
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // Wait for the thread to start up.
|
||||||
|
/// let (lock, cvar) = &*pair;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// while !*started {
|
||||||
|
/// started = cvar.wait(started).await;
|
||||||
|
/// }
|
||||||
|
/// # })
|
||||||
|
/// ```
|
||||||
|
#[allow(clippy::needless_lifetimes)]
|
||||||
|
pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
|
||||||
|
let mutex = guard_lock(&guard);
|
||||||
|
|
||||||
|
self.await_notify(guard).await;
|
||||||
|
|
||||||
|
mutex.lock().await
|
||||||
|
}
|
||||||
|
|
||||||
|
fn await_notify<'a, T>(&self, guard: MutexGuard<'a, T>) -> AwaitNotify<'_, 'a, T> {
|
||||||
|
AwaitNotify {
|
||||||
|
cond: self,
|
||||||
|
guard: Some(guard),
|
||||||
|
key: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Blocks the current taks until this condition variable receives a notification and the
|
||||||
|
/// required condition is met. Spurious wakeups are ignored and this function will only
|
||||||
|
/// return once the condition has been met.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use std::sync::Arc;
|
||||||
|
///
|
||||||
|
/// use async_std::sync::{Mutex, Condvar};
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
/// let pair2 = pair.clone();
|
||||||
|
///
|
||||||
|
/// task::spawn(async move {
|
||||||
|
/// let (lock, cvar) = &*pair2;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// *started = true;
|
||||||
|
/// // We notify the condvar that the value has changed.
|
||||||
|
/// cvar.notify_one();
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // Wait for the thread to start up.
|
||||||
|
/// let (lock, cvar) = &*pair;
|
||||||
|
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
|
||||||
|
/// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await;
|
||||||
|
/// #
|
||||||
|
/// # })
|
||||||
|
/// ```
|
||||||
|
#[allow(clippy::needless_lifetimes)]
|
||||||
|
pub async fn wait_until<'a, T, F>(
|
||||||
|
&self,
|
||||||
|
mut guard: MutexGuard<'a, T>,
|
||||||
|
mut condition: F,
|
||||||
|
) -> MutexGuard<'a, T>
|
||||||
|
where
|
||||||
|
F: FnMut(&mut T) -> bool,
|
||||||
|
{
|
||||||
|
while !condition(&mut *guard) {
|
||||||
|
guard = self.wait(guard).await;
|
||||||
|
}
|
||||||
|
guard
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Waits on this condition variable for a notification, timing out after a specified duration.
|
||||||
|
///
|
||||||
|
/// For these reasons `Condvar::wait_timeout_until` is recommended in most cases.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use std::sync::Arc;
|
||||||
|
/// use std::time::Duration;
|
||||||
|
///
|
||||||
|
/// use async_std::sync::{Mutex, Condvar};
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
/// let pair2 = pair.clone();
|
||||||
|
///
|
||||||
|
/// task::spawn(async move {
|
||||||
|
/// let (lock, cvar) = &*pair2;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// *started = true;
|
||||||
|
/// // We notify the condvar that the value has changed.
|
||||||
|
/// cvar.notify_one();
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // wait for the thread to start up
|
||||||
|
/// let (lock, cvar) = &*pair;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// loop {
|
||||||
|
/// let result = cvar.wait_timeout(started, Duration::from_millis(10)).await;
|
||||||
|
/// started = result.0;
|
||||||
|
/// if *started == true {
|
||||||
|
/// // We received the notification and the value has been updated, we can leave.
|
||||||
|
/// break
|
||||||
|
/// }
|
||||||
|
/// }
|
||||||
|
/// #
|
||||||
|
/// # })
|
||||||
|
/// ```
|
||||||
|
#[allow(clippy::needless_lifetimes)]
|
||||||
|
pub async fn wait_timeout<'a, T>(
|
||||||
|
&self,
|
||||||
|
guard: MutexGuard<'a, T>,
|
||||||
|
dur: Duration,
|
||||||
|
) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
|
||||||
|
let mutex = guard_lock(&guard);
|
||||||
|
match timeout(dur, self.wait(guard)).await {
|
||||||
|
Ok(guard) => (guard, WaitTimeoutResult(false)),
|
||||||
|
Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Waits on this condition variable for a notification, timing out after a specified duration.
|
||||||
|
/// Spurious wakes will not cause this function to return.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// use std::sync::Arc;
|
||||||
|
/// use std::time::Duration;
|
||||||
|
///
|
||||||
|
/// use async_std::sync::{Mutex, Condvar};
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
/// let pair2 = pair.clone();
|
||||||
|
///
|
||||||
|
/// task::spawn(async move {
|
||||||
|
/// let (lock, cvar) = &*pair2;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// *started = true;
|
||||||
|
/// // We notify the condvar that the value has changed.
|
||||||
|
/// cvar.notify_one();
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // wait for the thread to start up
|
||||||
|
/// let (lock, cvar) = &*pair;
|
||||||
|
/// let result = cvar.wait_timeout_until(
|
||||||
|
/// lock.lock().await,
|
||||||
|
/// Duration::from_millis(100),
|
||||||
|
/// |&mut started| started,
|
||||||
|
/// ).await;
|
||||||
|
/// if result.1.timed_out() {
|
||||||
|
/// // timed-out without the condition ever evaluating to true.
|
||||||
|
/// }
|
||||||
|
/// // access the locked mutex via result.0
|
||||||
|
/// # });
|
||||||
|
/// ```
|
||||||
|
#[allow(clippy::needless_lifetimes)]
|
||||||
|
pub async fn wait_timeout_until<'a, T, F>(
|
||||||
|
&self,
|
||||||
|
guard: MutexGuard<'a, T>,
|
||||||
|
dur: Duration,
|
||||||
|
condition: F,
|
||||||
|
) -> (MutexGuard<'a, T>, WaitTimeoutResult)
|
||||||
|
where
|
||||||
|
F: FnMut(&mut T) -> bool,
|
||||||
|
{
|
||||||
|
let mutex = guard_lock(&guard);
|
||||||
|
match timeout(dur, self.wait_until(guard, condition)).await {
|
||||||
|
Ok(guard) => (guard, WaitTimeoutResult(false)),
|
||||||
|
Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wakes up one blocked task on this condvar.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # fn main() { async_std::task::block_on(async {
|
||||||
|
/// use std::sync::Arc;
|
||||||
|
///
|
||||||
|
/// use async_std::sync::{Mutex, Condvar};
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
/// let pair2 = pair.clone();
|
||||||
|
///
|
||||||
|
/// task::spawn(async move {
|
||||||
|
/// let (lock, cvar) = &*pair2;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// *started = true;
|
||||||
|
/// // We notify the condvar that the value has changed.
|
||||||
|
/// cvar.notify_one();
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // Wait for the thread to start up.
|
||||||
|
/// let (lock, cvar) = &*pair;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// while !*started {
|
||||||
|
/// started = cvar.wait(started).await;
|
||||||
|
/// }
|
||||||
|
/// # }) }
|
||||||
|
/// ```
|
||||||
|
pub fn notify_one(&self) {
|
||||||
|
self.wakers.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wakes up all blocked tasks on this condvar.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
/// ```
|
||||||
|
/// # fn main() { async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use std::sync::Arc;
|
||||||
|
///
|
||||||
|
/// use async_std::sync::{Mutex, Condvar};
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
/// let pair2 = pair.clone();
|
||||||
|
///
|
||||||
|
/// task::spawn(async move {
|
||||||
|
/// let (lock, cvar) = &*pair2;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// *started = true;
|
||||||
|
/// // We notify the condvar that the value has changed.
|
||||||
|
/// cvar.notify_all();
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// // Wait for the thread to start up.
|
||||||
|
/// let (lock, cvar) = &*pair;
|
||||||
|
/// let mut started = lock.lock().await;
|
||||||
|
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
|
||||||
|
/// while !*started {
|
||||||
|
/// started = cvar.wait(started).await;
|
||||||
|
/// }
|
||||||
|
/// #
|
||||||
|
/// # }) }
|
||||||
|
/// ```
|
||||||
|
pub fn notify_all(&self) {
|
||||||
|
self.wakers.notify_all();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for Condvar {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.pad("Condvar { .. }")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A future that waits for another task to notify the condition variable.
|
||||||
|
///
|
||||||
|
/// This is an internal future that `wait` and `wait_until` await on.
|
||||||
|
struct AwaitNotify<'a, 'b, T> {
|
||||||
|
/// The condition variable that we are waiting on
|
||||||
|
cond: &'a Condvar,
|
||||||
|
/// The lock used with `cond`.
|
||||||
|
/// This will be released the first time the future is polled,
|
||||||
|
/// after registering the context to be notified.
|
||||||
|
guard: Option<MutexGuard<'b, T>>,
|
||||||
|
/// A key into the conditions variable's `WakerSet`.
|
||||||
|
/// This is set to the index of the `Waker` for the context each time
|
||||||
|
/// the future is polled and not completed.
|
||||||
|
key: Option<usize>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
|
||||||
|
type Output = ();
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
match self.guard.take() {
|
||||||
|
Some(_) => {
|
||||||
|
self.key = Some(self.cond.wakers.insert(cx));
|
||||||
|
// the guard is dropped when we return, which frees the lock
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
if let Some(key) = self.key {
|
||||||
|
if self.cond.wakers.remove_if_notified(key, cx) {
|
||||||
|
self.key = None;
|
||||||
|
Poll::Ready(())
|
||||||
|
} else {
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// This should only happen if it is polled twice after receiving a notification
|
||||||
|
Poll::Ready(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(key) = self.key {
|
||||||
|
self.cond.wakers.cancel(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,13 +0,0 @@
|
|||||||
//! Task executor.
|
|
||||||
//!
|
|
||||||
//! API bindings between `crate::task` and this module are very simple:
|
|
||||||
//!
|
|
||||||
//! * The only export is the `schedule` function.
|
|
||||||
//! * The only import is the `crate::task::Runnable` type.
|
|
||||||
|
|
||||||
pub(crate) use pool::schedule;
|
|
||||||
|
|
||||||
use sleepers::Sleepers;
|
|
||||||
|
|
||||||
mod pool;
|
|
||||||
mod sleepers;
|
|
@ -1,179 +0,0 @@
|
|||||||
use std::cell::Cell;
|
|
||||||
use std::iter;
|
|
||||||
use std::thread;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use crossbeam_deque::{Injector, Stealer, Worker};
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
use once_cell::unsync::OnceCell;
|
|
||||||
|
|
||||||
use crate::task::executor::Sleepers;
|
|
||||||
use crate::task::Runnable;
|
|
||||||
use crate::utils::{abort_on_panic, random};
|
|
||||||
|
|
||||||
/// The state of an executor.
|
|
||||||
struct Pool {
|
|
||||||
/// The global queue of tasks.
|
|
||||||
injector: Injector<Runnable>,
|
|
||||||
|
|
||||||
/// Handles to local queues for stealing work from worker threads.
|
|
||||||
stealers: Vec<Stealer<Runnable>>,
|
|
||||||
|
|
||||||
/// Used for putting idle workers to sleep and notifying them when new tasks come in.
|
|
||||||
sleepers: Sleepers,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Global executor that runs spawned tasks.
|
|
||||||
static POOL: Lazy<Pool> = Lazy::new(|| {
|
|
||||||
let num_threads = num_cpus::get().max(1);
|
|
||||||
let mut stealers = Vec::new();
|
|
||||||
|
|
||||||
// Spawn worker threads.
|
|
||||||
for _ in 0..num_threads {
|
|
||||||
let worker = Worker::new_fifo();
|
|
||||||
stealers.push(worker.stealer());
|
|
||||||
|
|
||||||
let proc = Processor {
|
|
||||||
worker,
|
|
||||||
slot: Cell::new(None),
|
|
||||||
slot_runs: Cell::new(0),
|
|
||||||
};
|
|
||||||
|
|
||||||
thread::Builder::new()
|
|
||||||
.name("async-std/executor".to_string())
|
|
||||||
.spawn(|| {
|
|
||||||
let _ = PROCESSOR.with(|p| p.set(proc));
|
|
||||||
abort_on_panic(main_loop);
|
|
||||||
})
|
|
||||||
.expect("cannot start a thread driving tasks");
|
|
||||||
}
|
|
||||||
|
|
||||||
Pool {
|
|
||||||
injector: Injector::new(),
|
|
||||||
stealers,
|
|
||||||
sleepers: Sleepers::new(),
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
/// The state of a worker thread.
|
|
||||||
struct Processor {
|
|
||||||
/// The local task queue.
|
|
||||||
worker: Worker<Runnable>,
|
|
||||||
|
|
||||||
/// Contains the next task to run as an optimization that skips queues.
|
|
||||||
slot: Cell<Option<Runnable>>,
|
|
||||||
|
|
||||||
/// How many times in a row tasks have been taked from the slot rather than the queue.
|
|
||||||
slot_runs: Cell<u32>,
|
|
||||||
}
|
|
||||||
|
|
||||||
thread_local! {
|
|
||||||
/// Worker thread state.
|
|
||||||
static PROCESSOR: OnceCell<Processor> = OnceCell::new();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Schedules a new runnable task for execution.
|
|
||||||
pub(crate) fn schedule(task: Runnable) {
|
|
||||||
PROCESSOR.with(|proc| {
|
|
||||||
// If the current thread is a worker thread, store it into its task slot or push it into
|
|
||||||
// its local task queue. Otherwise, push it into the global task queue.
|
|
||||||
match proc.get() {
|
|
||||||
Some(proc) => {
|
|
||||||
// Replace the task in the slot.
|
|
||||||
if let Some(task) = proc.slot.replace(Some(task)) {
|
|
||||||
// If the slot already contained a task, push it into the local task queue.
|
|
||||||
proc.worker.push(task);
|
|
||||||
POOL.sleepers.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
POOL.injector.push(task);
|
|
||||||
POOL.sleepers.notify_one();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Main loop running a worker thread.
|
|
||||||
fn main_loop() {
|
|
||||||
/// Number of yields when no runnable task is found.
|
|
||||||
const YIELDS: u32 = 3;
|
|
||||||
/// Number of short sleeps when no runnable task in found.
|
|
||||||
const SLEEPS: u32 = 1;
|
|
||||||
|
|
||||||
// The number of times the thread didn't find work in a row.
|
|
||||||
let mut fails = 0;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// Try to find a runnable task.
|
|
||||||
match find_runnable() {
|
|
||||||
Some(task) => {
|
|
||||||
fails = 0;
|
|
||||||
|
|
||||||
// Run the found task.
|
|
||||||
task.run();
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
fails += 1;
|
|
||||||
|
|
||||||
// Yield the current thread or put it to sleep.
|
|
||||||
if fails <= YIELDS {
|
|
||||||
thread::yield_now();
|
|
||||||
} else if fails <= YIELDS + SLEEPS {
|
|
||||||
thread::sleep(Duration::from_micros(10));
|
|
||||||
} else {
|
|
||||||
POOL.sleepers.wait();
|
|
||||||
fails = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Find the next runnable task.
|
|
||||||
fn find_runnable() -> Option<Runnable> {
|
|
||||||
/// Maximum number of times the slot can be used in a row.
|
|
||||||
const SLOT_LIMIT: u32 = 16;
|
|
||||||
|
|
||||||
PROCESSOR.with(|proc| {
|
|
||||||
let proc = proc.get().unwrap();
|
|
||||||
|
|
||||||
// Try taking a task from the slot.
|
|
||||||
let runs = proc.slot_runs.get();
|
|
||||||
if runs < SLOT_LIMIT {
|
|
||||||
if let Some(task) = proc.slot.take() {
|
|
||||||
proc.slot_runs.set(runs + 1);
|
|
||||||
return Some(task);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
proc.slot_runs.set(0);
|
|
||||||
|
|
||||||
// Pop a task from the local queue, if not empty.
|
|
||||||
proc.worker.pop().or_else(|| {
|
|
||||||
// Otherwise, we need to look for a task elsewhere.
|
|
||||||
iter::repeat_with(|| {
|
|
||||||
// Try stealing a batch of tasks from the global queue.
|
|
||||||
POOL.injector
|
|
||||||
.steal_batch_and_pop(&proc.worker)
|
|
||||||
// Or try stealing a batch of tasks from one of the other threads.
|
|
||||||
.or_else(|| {
|
|
||||||
// First, pick a random starting point in the list of local queues.
|
|
||||||
let len = POOL.stealers.len();
|
|
||||||
let start = random(len as u32) as usize;
|
|
||||||
|
|
||||||
// Try stealing a batch of tasks from each local queue starting from the
|
|
||||||
// chosen point.
|
|
||||||
let (l, r) = POOL.stealers.split_at(start);
|
|
||||||
let stealers = r.iter().chain(l.iter());
|
|
||||||
stealers
|
|
||||||
.map(|s| s.steal_batch_and_pop(&proc.worker))
|
|
||||||
.collect()
|
|
||||||
})
|
|
||||||
})
|
|
||||||
// Loop while no task was stolen and any steal operation needs to be retried.
|
|
||||||
.find(|s| !s.is_retry())
|
|
||||||
// Extract the stolen task, if there is one.
|
|
||||||
.and_then(|s| s.success())
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
@ -1,52 +0,0 @@
|
|||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::{Condvar, Mutex};
|
|
||||||
|
|
||||||
/// The place where worker threads go to sleep.
|
|
||||||
///
|
|
||||||
/// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
|
|
||||||
/// the next thread that attempts to go to sleep will pick up the notification immediately.
|
|
||||||
pub struct Sleepers {
|
|
||||||
/// How many threads are currently a sleep.
|
|
||||||
sleep: Mutex<usize>,
|
|
||||||
|
|
||||||
/// A condvar for notifying sleeping threads.
|
|
||||||
wake: Condvar,
|
|
||||||
|
|
||||||
/// Set to `true` if a notification came up while nobody was sleeping.
|
|
||||||
notified: AtomicBool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Sleepers {
|
|
||||||
/// Creates a new `Sleepers`.
|
|
||||||
pub fn new() -> Sleepers {
|
|
||||||
Sleepers {
|
|
||||||
sleep: Mutex::new(0),
|
|
||||||
wake: Condvar::new(),
|
|
||||||
notified: AtomicBool::new(false),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Puts the current thread to sleep.
|
|
||||||
pub fn wait(&self) {
|
|
||||||
let mut sleep = self.sleep.lock().unwrap();
|
|
||||||
|
|
||||||
if !self.notified.swap(false, Ordering::SeqCst) {
|
|
||||||
*sleep += 1;
|
|
||||||
let _ = self.wake.wait(sleep).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Notifies one thread.
|
|
||||||
pub fn notify_one(&self) {
|
|
||||||
if !self.notified.load(Ordering::SeqCst) {
|
|
||||||
let mut sleep = self.sleep.lock().unwrap();
|
|
||||||
|
|
||||||
if *sleep > 0 {
|
|
||||||
*sleep -= 1;
|
|
||||||
self.wake.notify_one();
|
|
||||||
} else {
|
|
||||||
self.notified.store(true, Ordering::SeqCst);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,28 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
|
||||||
|
use crate::task::{Builder, JoinHandle};
|
||||||
|
|
||||||
|
/// Spawns a task onto the thread-local executor.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// # async_std::task::block_on(async {
|
||||||
|
/// #
|
||||||
|
/// use async_std::task;
|
||||||
|
///
|
||||||
|
/// let handle = task::spawn_local(async {
|
||||||
|
/// 1 + 2
|
||||||
|
/// });
|
||||||
|
///
|
||||||
|
/// assert_eq!(handle.await, 3);
|
||||||
|
/// #
|
||||||
|
/// # })
|
||||||
|
/// ```
|
||||||
|
pub fn spawn_local<F, T>(future: F) -> JoinHandle<T>
|
||||||
|
where
|
||||||
|
F: Future<Output = T> + 'static,
|
||||||
|
T: 'static,
|
||||||
|
{
|
||||||
|
Builder::new().local(future).expect("cannot spawn task")
|
||||||
|
}
|
@ -0,0 +1,84 @@
|
|||||||
|
use std::cell::Cell;
|
||||||
|
use std::ptr;
|
||||||
|
|
||||||
|
use crate::task::{LocalsMap, Task, TaskId};
|
||||||
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
|
thread_local! {
|
||||||
|
/// A pointer to the currently running task.
|
||||||
|
static CURRENT: Cell<*const TaskLocalsWrapper> = Cell::new(ptr::null_mut());
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A wrapper to store task local data.
|
||||||
|
pub(crate) struct TaskLocalsWrapper {
|
||||||
|
/// The actual task details.
|
||||||
|
task: Task,
|
||||||
|
|
||||||
|
/// The map holding task-local values.
|
||||||
|
locals: LocalsMap,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskLocalsWrapper {
|
||||||
|
/// Creates a new task handle.
|
||||||
|
///
|
||||||
|
/// If the task is unnamed, the inner representation of the task will be lazily allocated on
|
||||||
|
/// demand.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn new(task: Task) -> Self {
|
||||||
|
Self {
|
||||||
|
task,
|
||||||
|
locals: LocalsMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the task's unique identifier.
|
||||||
|
#[inline]
|
||||||
|
pub fn id(&self) -> TaskId {
|
||||||
|
self.task.id()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a reference to the inner `Task`.
|
||||||
|
pub(crate) fn task(&self) -> &Task {
|
||||||
|
&self.task
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the map holding task-local values.
|
||||||
|
pub(crate) fn locals(&self) -> &LocalsMap {
|
||||||
|
&self.locals
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set a reference to the current task.
|
||||||
|
pub(crate) unsafe fn set_current<F, R>(task: *const TaskLocalsWrapper, f: F) -> R
|
||||||
|
where
|
||||||
|
F: FnOnce() -> R,
|
||||||
|
{
|
||||||
|
CURRENT.with(|current| {
|
||||||
|
let old_task = current.replace(task);
|
||||||
|
defer! {
|
||||||
|
current.set(old_task);
|
||||||
|
}
|
||||||
|
f()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets a reference to the current task.
|
||||||
|
pub(crate) fn get_current<F, R>(f: F) -> Option<R>
|
||||||
|
where
|
||||||
|
F: FnOnce(&TaskLocalsWrapper) -> R,
|
||||||
|
{
|
||||||
|
let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) });
|
||||||
|
match res {
|
||||||
|
Ok(Some(val)) => Some(val),
|
||||||
|
Ok(None) | Err(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for TaskLocalsWrapper {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
// Abort the process if dropping task-locals panics.
|
||||||
|
abort_on_panic(|| {
|
||||||
|
unsafe { self.locals.clear() };
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,103 @@
|
|||||||
|
#![cfg(feature = "unstable")]
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_std::sync::{Condvar, Mutex};
|
||||||
|
use async_std::task::{self, JoinHandle};
|
||||||
|
|
||||||
|
#[cfg(not(target_os = "unknown"))]
|
||||||
|
use async_std::task::spawn;
|
||||||
|
#[cfg(target_os = "unknown")]
|
||||||
|
use async_std::task::spawn_local as spawn;
|
||||||
|
|
||||||
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||||
|
fn wait_timeout_with_lock() {
|
||||||
|
task::block_on(async {
|
||||||
|
let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||||
|
let pair2 = pair.clone();
|
||||||
|
|
||||||
|
spawn(async move {
|
||||||
|
let (m, c) = &*pair2;
|
||||||
|
let _g = m.lock().await;
|
||||||
|
task::sleep(Duration::from_millis(200)).await;
|
||||||
|
c.notify_one();
|
||||||
|
});
|
||||||
|
|
||||||
|
let (m, c) = &*pair;
|
||||||
|
let (_, wait_result) = c
|
||||||
|
.wait_timeout(m.lock().await, Duration::from_millis(50))
|
||||||
|
.await;
|
||||||
|
assert!(wait_result.timed_out());
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||||
|
fn wait_timeout_without_lock() {
|
||||||
|
task::block_on(async {
|
||||||
|
let m = Mutex::new(false);
|
||||||
|
let c = Condvar::new();
|
||||||
|
|
||||||
|
let (_, wait_result) = c
|
||||||
|
.wait_timeout(m.lock().await, Duration::from_millis(10))
|
||||||
|
.await;
|
||||||
|
assert!(wait_result.timed_out());
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||||
|
fn wait_timeout_until_timed_out() {
|
||||||
|
task::block_on(async {
|
||||||
|
let m = Mutex::new(false);
|
||||||
|
let c = Condvar::new();
|
||||||
|
|
||||||
|
let (_, wait_result) = c
|
||||||
|
.wait_timeout_until(m.lock().await, Duration::from_millis(100), |&mut started| {
|
||||||
|
started
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
assert!(wait_result.timed_out());
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||||
|
fn notify_all() {
|
||||||
|
task::block_on(async {
|
||||||
|
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
|
||||||
|
let pair = Arc::new((Mutex::new(0u32), Condvar::new()));
|
||||||
|
|
||||||
|
for _ in 0..10 {
|
||||||
|
let pair = pair.clone();
|
||||||
|
tasks.push(spawn(async move {
|
||||||
|
let (m, c) = &*pair;
|
||||||
|
let mut count = m.lock().await;
|
||||||
|
while *count == 0 {
|
||||||
|
count = c.wait(count).await;
|
||||||
|
}
|
||||||
|
*count += 1;
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Give some time for tasks to start up
|
||||||
|
task::sleep(Duration::from_millis(50)).await;
|
||||||
|
|
||||||
|
let (m, c) = &*pair;
|
||||||
|
{
|
||||||
|
let mut count = m.lock().await;
|
||||||
|
*count += 1;
|
||||||
|
c.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
|
for t in tasks {
|
||||||
|
t.await;
|
||||||
|
}
|
||||||
|
let count = m.lock().await;
|
||||||
|
assert_eq!(11, *count);
|
||||||
|
})
|
||||||
|
}
|
@ -0,0 +1,26 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use async_std::future::timeout;
|
||||||
|
use async_std::task;
|
||||||
|
|
||||||
|
#[cfg(target_arch = "wasm32")]
|
||||||
|
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||||
|
fn timeout_future_many() {
|
||||||
|
task::block_on(async {
|
||||||
|
let futures = (0..100)
|
||||||
|
.map(|i| {
|
||||||
|
timeout(Duration::from_millis(i * 20), async move {
|
||||||
|
task::sleep(Duration::from_millis(i)).await;
|
||||||
|
Ok::<(), async_std::future::TimeoutError>(())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
for future in futures {
|
||||||
|
future.await.unwrap().unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
@ -0,0 +1,10 @@
|
|||||||
|
#!/bin/sh
|
||||||
|
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test buf_writer
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test channel
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test condvar
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test mutex
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test rwlock
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test stream
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test task_local
|
||||||
|
wasm-pack test --chrome --headless -- --features unstable --test timeout
|
Loading…
Reference in New Issue