forked from mirror/async-std
Compare commits
160 Commits
split-by-p
...
master
Author | SHA1 | Date |
---|---|---|
Yoshua Wuyts | 8aa5921dfa | 5 years ago |
Jonathas-Conceicao | cd7fb9dec2 | 5 years ago |
Yoshua Wuyts | c82b1efb69 | 5 years ago |
Friedel Ziegelmayer | 8c4b425136 | 5 years ago |
Thibault Martinez | 2ab08ebbbc | 5 years ago |
Friedel Ziegelmayer | 0e7650a421 | 5 years ago |
dignifiedquire | 8f17e9275b | 5 years ago |
dignifiedquire | 18dffe8b43 | 5 years ago |
Friedel Ziegelmayer | 43de93312c | 5 years ago |
Gary Guo | 2e7e804736 | 5 years ago |
Friedel Ziegelmayer | 17ab958ac2 | 5 years ago |
Friedel Ziegelmayer | caa76af745 | 5 years ago |
dignifiedquire | e495ba46b3 | 5 years ago |
Afirez | 0c2ce52ac4 | 5 years ago |
Friedel Ziegelmayer | 5f418f07ac | 5 years ago |
dignifiedquire | 06a2fb8c4f | 5 years ago |
dignifiedquire | 1c1c168e1b | 5 years ago |
Friedel Ziegelmayer | 5d55fa7a47 | 5 years ago |
dignifiedquire | 093d640ad7 | 5 years ago |
Oleg Nosov | 42425f6c1a | 5 years ago |
Yoshua Wuyts | a602a91d83 | 5 years ago |
Afirez | 9fa3ce3fd6 | 5 years ago |
Oleg Nosov | df22d87d09 | 5 years ago |
Oleg Nosov | 924e5a3f41 | 5 years ago |
Oleg Nosov | 2323ac9a8e | 5 years ago |
Friedel Ziegelmayer | 5c2a3de9e7 | 5 years ago |
dignifiedquire | e9c6ea873c | 5 years ago |
Friedel Ziegelmayer | 0d98aac8f7 | 5 years ago |
Thibault Martinez | 4555f193a5 | 5 years ago |
Yoshua Wuyts | 61fc2bae72 | 5 years ago |
dignifiedquire | 5a1a681d68 | 5 years ago |
Friedel Ziegelmayer | e12cf80ab0 | 5 years ago |
Friedel Ziegelmayer | 631105b650 | 5 years ago |
Friedel Ziegelmayer | 0897b9184a | 5 years ago |
Friedel Ziegelmayer | 6ca7b0977c | 5 years ago |
Konrad Borowski | 721760a7a6 | 5 years ago |
dignifiedquire | 8389041414 | 5 years ago |
dignifiedquire | 8943ba82dd | 5 years ago |
dignifiedquire | 52c72426c1 | 5 years ago |
Yoshua Wuyts | 0df3c02b81 | 5 years ago |
Yoshua Wuyts | 166c469d1c | 5 years ago |
Friedel Ziegelmayer | 0ec027dbff | 5 years ago |
jerry73204 | d60e7cc27d | 5 years ago |
Friedel Ziegelmayer | 6d2a43e336 | 5 years ago |
dignifiedquire | e1c8638173 | 5 years ago |
dignifiedquire | 06eea4225b | 5 years ago |
Friedel Ziegelmayer | 252140839b | 5 years ago |
Heinz N. Gies | 69806403c6 | 5 years ago |
Friedel Ziegelmayer | 955befd746 | 5 years ago |
nasa | 70dac51938 | 5 years ago |
k-nasa | d30603affe | 5 years ago |
dignifiedquire | c9ecb5bbbd | 5 years ago |
Jacob Rothstein | 9e6a76af04 | 5 years ago |
Friedel Ziegelmayer | 2b6c7fedff | 5 years ago |
Friedel Ziegelmayer | b3277954c7 | 5 years ago |
Azriel Hoh | baead51a28 | 5 years ago |
Azriel Hoh | e9621af345 | 5 years ago |
Azriel Hoh | d3e59370e7 | 5 years ago |
Jacob Rothstein | cd5e17fe87 | 5 years ago |
Friedel Ziegelmayer | e20b0f0d75 | 5 years ago |
dignifiedquire | 19170aead4 | 5 years ago |
dignifiedquire | 2762ec5800 | 5 years ago |
dignifiedquire | 247c94ca06 | 5 years ago |
Friedel Ziegelmayer | e404dcdd03 | 5 years ago |
dignifiedquire | bd6a7e200b | 5 years ago |
Friedel Ziegelmayer | e4c4c93d29 | 5 years ago |
Thayne McCombs | 6f6fced103 | 5 years ago |
Friedel Ziegelmayer | 10f7abb3b6 | 5 years ago |
dignifiedquire | 27c605b4c9 | 5 years ago |
dignifiedquire | faea222b9c | 5 years ago |
dignifiedquire | 1214bc2dee | 5 years ago |
dignifiedquire | 26f62aafd9 | 5 years ago |
dignifiedquire | e0928463b1 | 5 years ago |
dignifiedquire | 92532612b7 | 5 years ago |
dignifiedquire | 1a6d4f6a2f | 5 years ago |
dignifiedquire | 7a9afbd81c | 5 years ago |
dignifiedquire | 280b1a4344 | 5 years ago |
dignifiedquire | 48dd683535 | 5 years ago |
dignifiedquire | 804a52b7fd | 5 years ago |
dignifiedquire | e4df1405c1 | 5 years ago |
dignifiedquire | 2cd2ba3530 | 5 years ago |
dignifiedquire | 3161a4e449 | 5 years ago |
dignifiedquire | 228cc59b3b | 5 years ago |
dignifiedquire | 0a7a52aed5 | 5 years ago |
dignifiedquire | 10c8b9a6d8 | 5 years ago |
dignifiedquire | fd6ae40817 | 5 years ago |
dignifiedquire | ab9d6554aa | 5 years ago |
dignifiedquire | f5fa0d7e4e | 5 years ago |
dignifiedquire | b96afc41dc | 5 years ago |
dignifiedquire | 75ab7219df | 5 years ago |
dignifiedquire | e082634b5e | 5 years ago |
dignifiedquire | fc9ee0dfdd | 5 years ago |
dignifiedquire | 1308fbdf55 | 5 years ago |
dignifiedquire | 690ab16587 | 5 years ago |
Florian Gilcher | 370642ef3e | 5 years ago |
Sunli | 100c3423c1 | 5 years ago |
nasa | 7999e6bf4b | 5 years ago |
Fangdun Cai | e707ea96e0 | 5 years ago |
Friedel Ziegelmayer | b446cd0230 | 5 years ago |
Thayne McCombs | db438abb8f | 5 years ago |
dignifiedquire | a4e07e345c | 5 years ago |
Yoshua Wuyts | aebba2bd95 | 5 years ago |
dignifiedquire | 0c9a66c1f6 | 5 years ago |
Friedel Ziegelmayer | fc4e472599 | 5 years ago |
nasa | 6674dc0edf | 5 years ago |
k-nasa | 088aa5662c | 5 years ago |
Devashish Dixit | 68fa054517 | 5 years ago |
k-nasa | b88138b5d7 | 5 years ago |
k-nasa | 11ee2a8985 | 5 years ago |
k-nasa | 322911142c | 5 years ago |
k-nasa | cfaec2aa95 | 5 years ago |
sunli | 57c648cf01 | 5 years ago |
k-nasa | 6d3ca5a06f | 5 years ago |
k-nasa | f960776846 | 5 years ago |
k-nasa | 5c6741724f | 5 years ago |
k-nasa | 24c5dbf949 | 5 years ago |
nasa | 2dbebe54ed | 5 years ago |
k-nasa | d7ee29a03f | 5 years ago |
k-nasa | 2b44c1be2e | 5 years ago |
k-nasa | b1ec1ea930 | 5 years ago |
k-nasa | 2ab075d027 | 5 years ago |
k-nasa | c0f18600cf | 5 years ago |
k-nasa | 6c8237276b | 5 years ago |
k-nasa | 98cbf7f8eb | 5 years ago |
k-nasa | 84e5c5f351 | 5 years ago |
Yoshua Wuyts | 3ff9e98f20 | 5 years ago |
Yoshua Wuyts | b7c7efc797 | 5 years ago |
Yoshua Wuyts | 19fd7a4084 | 5 years ago |
Yoshua Wuyts | 7885c245c5 | 5 years ago |
Yoshua Wuyts | 7b7b959a6e | 5 years ago |
Yoshua Wuyts | 32dce319d3 | 5 years ago |
Yoshua Wuyts | 49dd02b4de | 5 years ago |
Yoshua Wuyts | bb11c676a1 | 5 years ago |
Yoshua Wuyts | e026b7579a | 5 years ago |
Yoshua Wuyts | 51dd7ceb72 | 5 years ago |
k-nasa | 8931d1464e | 5 years ago |
nasa | cc19592f80 | 5 years ago |
nasa | f69887a50d | 5 years ago |
k-nasa | 0b0531057d | 5 years ago |
Yoshua Wuyts | 61f9483cc5 | 5 years ago |
k-nasa | f33d7f40ab | 5 years ago |
k-nasa | e3bf89fc05 | 5 years ago |
k-nasa | ec4b09ecd0 | 5 years ago |
k-nasa | b95bd6c1fe | 5 years ago |
k-nasa | 1e18839f1f | 5 years ago |
k-nasa | f31878655e | 5 years ago |
k-nasa | 9a62df143f | 5 years ago |
k-nasa | 75223905bd | 5 years ago |
k-nasa | be60dd9fe7 | 5 years ago |
k-nasa | 23b7c174f3 | 5 years ago |
Katharina Fey | aae835cc14 | 5 years ago |
Oleg Nosov | 68063adddf | 5 years ago |
Oleg Nosov | d7cab38b67 | 5 years ago |
Oleg Nosov | 32068942a6 | 5 years ago |
Oleg Nosov | 85c32ef9d2 | 5 years ago |
Oleg Nosov | b68be72763 | 5 years ago |
Oleg Nosov | c80915e216 | 5 years ago |
Oleg Nosov | 303ac90b7c | 5 years ago |
Stjepan Glavina | ceba324bef | 5 years ago |
Stjepan Glavina | 36d24cd0e1 | 5 years ago |
@ -1,380 +0,0 @@
|
||||
use std::fmt;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use mio::{self, Evented};
|
||||
use once_cell::sync::Lazy;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::io;
|
||||
use crate::task::{Context, Poll, Waker};
|
||||
use crate::utils::abort_on_panic;
|
||||
|
||||
/// Data associated with a registered I/O handle.
|
||||
#[derive(Debug)]
|
||||
struct Entry {
|
||||
/// A unique identifier.
|
||||
token: mio::Token,
|
||||
|
||||
/// Tasks that are blocked on reading from this I/O handle.
|
||||
readers: Mutex<Readers>,
|
||||
|
||||
/// Thasks that are blocked on writing to this I/O handle.
|
||||
writers: Mutex<Writers>,
|
||||
}
|
||||
|
||||
/// The set of `Waker`s interested in read readiness.
|
||||
#[derive(Debug)]
|
||||
struct Readers {
|
||||
/// Flag indicating read readiness.
|
||||
/// (cf. `Watcher::poll_read_ready`)
|
||||
ready: bool,
|
||||
/// The `Waker`s blocked on reading.
|
||||
wakers: Vec<Waker>
|
||||
}
|
||||
|
||||
/// The set of `Waker`s interested in write readiness.
|
||||
#[derive(Debug)]
|
||||
struct Writers {
|
||||
/// Flag indicating write readiness.
|
||||
/// (cf. `Watcher::poll_write_ready`)
|
||||
ready: bool,
|
||||
/// The `Waker`s blocked on writing.
|
||||
wakers: Vec<Waker>
|
||||
}
|
||||
|
||||
/// The state of a networking driver.
|
||||
struct Reactor {
|
||||
/// A mio instance that polls for new events.
|
||||
poller: mio::Poll,
|
||||
|
||||
/// A collection of registered I/O handles.
|
||||
entries: Mutex<Slab<Arc<Entry>>>,
|
||||
|
||||
/// Dummy I/O handle that is only used to wake up the polling thread.
|
||||
notify_reg: (mio::Registration, mio::SetReadiness),
|
||||
|
||||
/// An identifier for the notification handle.
|
||||
notify_token: mio::Token,
|
||||
}
|
||||
|
||||
impl Reactor {
|
||||
/// Creates a new reactor for polling I/O events.
|
||||
fn new() -> io::Result<Reactor> {
|
||||
let poller = mio::Poll::new()?;
|
||||
let notify_reg = mio::Registration::new2();
|
||||
|
||||
let mut reactor = Reactor {
|
||||
poller,
|
||||
entries: Mutex::new(Slab::new()),
|
||||
notify_reg,
|
||||
notify_token: mio::Token(0),
|
||||
};
|
||||
|
||||
// Register a dummy I/O handle for waking up the polling thread.
|
||||
let entry = reactor.register(&reactor.notify_reg.0)?;
|
||||
reactor.notify_token = entry.token;
|
||||
|
||||
Ok(reactor)
|
||||
}
|
||||
|
||||
/// Registers an I/O event source and returns its associated entry.
|
||||
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
|
||||
let mut entries = self.entries.lock().unwrap();
|
||||
|
||||
// Reserve a vacant spot in the slab and use its key as the token value.
|
||||
let vacant = entries.vacant_entry();
|
||||
let token = mio::Token(vacant.key());
|
||||
|
||||
// Allocate an entry and insert it into the slab.
|
||||
let entry = Arc::new(Entry {
|
||||
token,
|
||||
readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }),
|
||||
writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }),
|
||||
});
|
||||
vacant.insert(entry.clone());
|
||||
|
||||
// Register the I/O event source in the poller.
|
||||
let interest = mio::Ready::all();
|
||||
let opts = mio::PollOpt::edge();
|
||||
self.poller.register(source, token, interest, opts)?;
|
||||
|
||||
Ok(entry)
|
||||
}
|
||||
|
||||
/// Deregisters an I/O event source associated with an entry.
|
||||
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
|
||||
// Deregister the I/O object from the mio instance.
|
||||
self.poller.deregister(source)?;
|
||||
|
||||
// Remove the entry associated with the I/O object.
|
||||
self.entries.lock().unwrap().remove(entry.token.0);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// fn notify(&self) {
|
||||
// self.notify_reg
|
||||
// .1
|
||||
// .set_readiness(mio::Ready::readable())
|
||||
// .unwrap();
|
||||
// }
|
||||
}
|
||||
|
||||
/// The state of the global networking driver.
|
||||
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
|
||||
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
|
||||
// handles.
|
||||
std::thread::Builder::new()
|
||||
.name("async-std/net".to_string())
|
||||
.spawn(move || {
|
||||
// If the driver thread panics, there's not much we can do. It is not a
|
||||
// recoverable error and there is no place to propagate it into so we just abort.
|
||||
abort_on_panic(|| {
|
||||
main_loop().expect("async networking thread has panicked");
|
||||
})
|
||||
})
|
||||
.expect("cannot start a thread driving blocking tasks");
|
||||
|
||||
Reactor::new().expect("cannot initialize reactor")
|
||||
});
|
||||
|
||||
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
|
||||
fn main_loop() -> io::Result<()> {
|
||||
let reactor = &REACTOR;
|
||||
let mut events = mio::Events::with_capacity(1000);
|
||||
|
||||
loop {
|
||||
// Block on the poller until at least one new event comes in.
|
||||
reactor.poller.poll(&mut events, None)?;
|
||||
|
||||
// Lock the entire entry table while we're processing new events.
|
||||
let entries = reactor.entries.lock().unwrap();
|
||||
|
||||
for event in events.iter() {
|
||||
let token = event.token();
|
||||
|
||||
if token == reactor.notify_token {
|
||||
// If this is the notification token, we just need the notification state.
|
||||
reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
|
||||
} else {
|
||||
// Otherwise, look for the entry associated with this token.
|
||||
if let Some(entry) = entries.get(token.0) {
|
||||
// Set the readiness flags from this I/O event.
|
||||
let readiness = event.readiness();
|
||||
|
||||
// Wake up reader tasks blocked on this I/O handle.
|
||||
if !(readiness & reader_interests()).is_empty() {
|
||||
let mut readers = entry.readers.lock().unwrap();
|
||||
readers.ready = true;
|
||||
for w in readers.wakers.drain(..) {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
|
||||
// Wake up writer tasks blocked on this I/O handle.
|
||||
if !(readiness & writer_interests()).is_empty() {
|
||||
let mut writers = entry.writers.lock().unwrap();
|
||||
writers.ready = true;
|
||||
for w in writers.wakers.drain(..) {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An I/O handle powered by the networking driver.
|
||||
///
|
||||
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
|
||||
/// implementing traits `AsyncRead` and `AsyncWrite`.
|
||||
pub struct Watcher<T: Evented> {
|
||||
/// Data associated with the I/O handle.
|
||||
entry: Arc<Entry>,
|
||||
|
||||
/// The I/O event source.
|
||||
source: Option<T>,
|
||||
}
|
||||
|
||||
impl<T: Evented> Watcher<T> {
|
||||
/// Creates a new I/O handle.
|
||||
///
|
||||
/// The provided I/O event source will be kept registered inside the reactor's poller for the
|
||||
/// lifetime of the returned I/O handle.
|
||||
pub fn new(source: T) -> Watcher<T> {
|
||||
Watcher {
|
||||
entry: REACTOR
|
||||
.register(&source)
|
||||
.expect("cannot register an I/O event source"),
|
||||
source: Some(source),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner I/O event source.
|
||||
pub fn get_ref(&self) -> &T {
|
||||
self.source.as_ref().unwrap()
|
||||
}
|
||||
|
||||
/// Polls the inner I/O source for a non-blocking read operation.
|
||||
///
|
||||
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
||||
/// will be registered for wakeup when the I/O source becomes readable.
|
||||
pub fn poll_read_with<'a, F, R>(&'a self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
|
||||
where
|
||||
F: FnMut(&'a T) -> io::Result<R>,
|
||||
{
|
||||
// If the operation isn't blocked, return its result.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Lock the waker list.
|
||||
let mut readers = self.entry.readers.lock().unwrap();
|
||||
|
||||
// Try running the operation again.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Register the task if it isn't registered already.
|
||||
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||
readers.wakers.push(cx.waker().clone());
|
||||
}
|
||||
|
||||
readers.ready = false;
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Polls the inner I/O source for a non-blocking write operation.
|
||||
///
|
||||
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
||||
/// will be registered for wakeup when the I/O source becomes writable.
|
||||
pub fn poll_write_with<'a, F, R>(
|
||||
&'a self,
|
||||
cx: &mut Context<'_>,
|
||||
mut f: F,
|
||||
) -> Poll<io::Result<R>>
|
||||
where
|
||||
F: FnMut(&'a T) -> io::Result<R>,
|
||||
{
|
||||
// If the operation isn't blocked, return its result.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Lock the waker list.
|
||||
let mut writers = self.entry.writers.lock().unwrap();
|
||||
|
||||
// Try running the operation again.
|
||||
match f(self.source.as_ref().unwrap()) {
|
||||
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||
res => return Poll::Ready(res),
|
||||
}
|
||||
|
||||
// Register the task if it isn't registered already.
|
||||
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||
writers.wakers.push(cx.waker().clone());
|
||||
}
|
||||
|
||||
writers.ready = false;
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Polls the inner I/O source until a non-blocking read can be performed.
|
||||
///
|
||||
/// If non-blocking reads are currently not possible, the `Waker`
|
||||
/// will be saved and notified when it can read non-blocking
|
||||
/// again.
|
||||
#[allow(dead_code)]
|
||||
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
// Lock the waker list.
|
||||
let mut readers = self.entry.readers.lock().unwrap();
|
||||
if readers.ready {
|
||||
return Poll::Ready(())
|
||||
}
|
||||
// Register the task if it isn't registered already.
|
||||
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||
readers.wakers.push(cx.waker().clone());
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Polls the inner I/O source until a non-blocking write can be performed.
|
||||
///
|
||||
/// If non-blocking writes are currently not possible, the `Waker`
|
||||
/// will be saved and notified when it can write non-blocking
|
||||
/// again.
|
||||
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
|
||||
// Lock the waker list.
|
||||
let mut writers = self.entry.writers.lock().unwrap();
|
||||
if writers.ready {
|
||||
return Poll::Ready(())
|
||||
}
|
||||
// Register the task if it isn't registered already.
|
||||
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||
writers.wakers.push(cx.waker().clone());
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Deregisters and returns the inner I/O source.
|
||||
///
|
||||
/// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
|
||||
#[allow(dead_code)]
|
||||
pub fn into_inner(mut self) -> T {
|
||||
let source = self.source.take().unwrap();
|
||||
REACTOR
|
||||
.deregister(&source, &self.entry)
|
||||
.expect("cannot deregister I/O event source");
|
||||
source
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Evented> Drop for Watcher<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(ref source) = self.source {
|
||||
REACTOR
|
||||
.deregister(source, &self.entry)
|
||||
.expect("cannot deregister I/O event source");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Watcher")
|
||||
.field("entry", &self.entry)
|
||||
.field("source", &self.source)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a mask containing flags that interest tasks reading from I/O handles.
|
||||
#[inline]
|
||||
fn reader_interests() -> mio::Ready {
|
||||
mio::Ready::all() - mio::Ready::writable()
|
||||
}
|
||||
|
||||
/// Returns a mask containing flags that interest tasks writing into I/O handles.
|
||||
#[inline]
|
||||
fn writer_interests() -> mio::Ready {
|
||||
mio::Ready::writable() | hup()
|
||||
}
|
||||
|
||||
/// Returns a flag containing the hangup status.
|
||||
#[inline]
|
||||
fn hup() -> mio::Ready {
|
||||
#[cfg(unix)]
|
||||
let ready = mio::unix::UnixReady::hup().into();
|
||||
|
||||
#[cfg(not(unix))]
|
||||
let ready = mio::Ready::empty();
|
||||
|
||||
ready
|
||||
}
|
@ -0,0 +1,34 @@
|
||||
//! The runtime.
|
||||
|
||||
use std::env;
|
||||
use std::thread;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::future;
|
||||
|
||||
/// Dummy runtime struct.
|
||||
pub struct Runtime {}
|
||||
|
||||
/// The global runtime.
|
||||
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
||||
// Create an executor thread pool.
|
||||
|
||||
let thread_count = env::var("ASYNC_STD_THREAD_COUNT")
|
||||
.map(|env| {
|
||||
env.parse()
|
||||
.expect("ASYNC_STD_THREAD_COUNT must be a number")
|
||||
})
|
||||
.unwrap_or_else(|_| num_cpus::get())
|
||||
.max(1);
|
||||
|
||||
let thread_name = env::var("ASYNC_STD_THREAD_NAME").unwrap_or("async-std/runtime".to_string());
|
||||
|
||||
for _ in 0..thread_count {
|
||||
thread::Builder::new()
|
||||
.name(thread_name.clone())
|
||||
.spawn(|| crate::task::block_on(future::pending::<()>()))
|
||||
.expect("cannot start a runtime thread");
|
||||
}
|
||||
Runtime {}
|
||||
});
|
@ -0,0 +1,417 @@
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
use super::MutexGuard;
|
||||
use crate::future::{timeout, Future};
|
||||
use crate::sync::WakerSet;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
|
||||
pub struct WaitTimeoutResult(bool);
|
||||
|
||||
/// A type indicating whether a timed wait on a condition variable returned due to a time out or
|
||||
/// not
|
||||
impl WaitTimeoutResult {
|
||||
/// Returns `true` if the wait was known to have timed out.
|
||||
pub fn timed_out(self) -> bool {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
/// A Condition Variable
|
||||
///
|
||||
/// This type is an async version of [`std::sync::Mutex`].
|
||||
///
|
||||
/// [`std::sync::Condvar`]: https://doc.rust-lang.org/std/sync/struct.Condvar.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// use async_std::sync::{Mutex, Condvar};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
/// let pair2 = pair.clone();
|
||||
///
|
||||
/// // Inside of our lock, spawn a new thread, and then wait for it to start.
|
||||
/// task::spawn(async move {
|
||||
/// let (lock, cvar) = &*pair2;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// *started = true;
|
||||
/// // We notify the condvar that the value has changed.
|
||||
/// cvar.notify_one();
|
||||
/// });
|
||||
///
|
||||
/// // Wait for the thread to start up.
|
||||
/// let (lock, cvar) = &*pair;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// while !*started {
|
||||
/// started = cvar.wait(started).await;
|
||||
/// }
|
||||
///
|
||||
/// # })
|
||||
/// ```
|
||||
pub struct Condvar {
|
||||
wakers: WakerSet,
|
||||
}
|
||||
|
||||
unsafe impl Send for Condvar {}
|
||||
unsafe impl Sync for Condvar {}
|
||||
|
||||
impl Default for Condvar {
|
||||
fn default() -> Self {
|
||||
Condvar::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl Condvar {
|
||||
/// Creates a new condition variable
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::sync::Condvar;
|
||||
///
|
||||
/// let cvar = Condvar::new();
|
||||
/// ```
|
||||
pub fn new() -> Self {
|
||||
Condvar {
|
||||
wakers: WakerSet::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks the current task until this condition variable receives a notification.
|
||||
///
|
||||
/// Unlike the std equivalent, this does not check that a single mutex is used at runtime.
|
||||
/// However, as a best practice avoid using with multiple mutexes.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// use async_std::sync::{Mutex, Condvar};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
/// let pair2 = pair.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// let (lock, cvar) = &*pair2;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// *started = true;
|
||||
/// // We notify the condvar that the value has changed.
|
||||
/// cvar.notify_one();
|
||||
/// });
|
||||
///
|
||||
/// // Wait for the thread to start up.
|
||||
/// let (lock, cvar) = &*pair;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// while !*started {
|
||||
/// started = cvar.wait(started).await;
|
||||
/// }
|
||||
/// # })
|
||||
/// ```
|
||||
#[allow(clippy::needless_lifetimes)]
|
||||
pub async fn wait<'a, T>(&self, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
|
||||
let mutex = MutexGuard::source(&guard);
|
||||
|
||||
self.await_notify(guard).await;
|
||||
|
||||
mutex.lock().await
|
||||
}
|
||||
|
||||
fn await_notify<'a, T>(&self, guard: MutexGuard<'a, T>) -> AwaitNotify<'_, 'a, T> {
|
||||
AwaitNotify {
|
||||
cond: self,
|
||||
guard: Some(guard),
|
||||
key: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks the current taks until this condition variable receives a notification and the
|
||||
/// required condition is met. Spurious wakeups are ignored and this function will only
|
||||
/// return once the condition has been met.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// use async_std::sync::{Mutex, Condvar};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
/// let pair2 = pair.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// let (lock, cvar) = &*pair2;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// *started = true;
|
||||
/// // We notify the condvar that the value has changed.
|
||||
/// cvar.notify_one();
|
||||
/// });
|
||||
///
|
||||
/// // Wait for the thread to start up.
|
||||
/// let (lock, cvar) = &*pair;
|
||||
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
|
||||
/// let _guard = cvar.wait_until(lock.lock().await, |started| { *started }).await;
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[allow(clippy::needless_lifetimes)]
|
||||
pub async fn wait_until<'a, T, F>(
|
||||
&self,
|
||||
mut guard: MutexGuard<'a, T>,
|
||||
mut condition: F,
|
||||
) -> MutexGuard<'a, T>
|
||||
where
|
||||
F: FnMut(&mut T) -> bool,
|
||||
{
|
||||
while !condition(&mut *guard) {
|
||||
guard = self.wait(guard).await;
|
||||
}
|
||||
guard
|
||||
}
|
||||
|
||||
/// Waits on this condition variable for a notification, timing out after a specified duration.
|
||||
///
|
||||
/// For these reasons `Condvar::wait_timeout_until` is recommended in most cases.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use std::sync::Arc;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// use async_std::sync::{Mutex, Condvar};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
/// let pair2 = pair.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// let (lock, cvar) = &*pair2;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// *started = true;
|
||||
/// // We notify the condvar that the value has changed.
|
||||
/// cvar.notify_one();
|
||||
/// });
|
||||
///
|
||||
/// // wait for the thread to start up
|
||||
/// let (lock, cvar) = &*pair;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// loop {
|
||||
/// let result = cvar.wait_timeout(started, Duration::from_millis(10)).await;
|
||||
/// started = result.0;
|
||||
/// if *started == true {
|
||||
/// // We received the notification and the value has been updated, we can leave.
|
||||
/// break
|
||||
/// }
|
||||
/// }
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[allow(clippy::needless_lifetimes)]
|
||||
pub async fn wait_timeout<'a, T>(
|
||||
&self,
|
||||
guard: MutexGuard<'a, T>,
|
||||
dur: Duration,
|
||||
) -> (MutexGuard<'a, T>, WaitTimeoutResult) {
|
||||
let mutex = MutexGuard::source(&guard);
|
||||
match timeout(dur, self.wait(guard)).await {
|
||||
Ok(guard) => (guard, WaitTimeoutResult(false)),
|
||||
Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Waits on this condition variable for a notification, timing out after a specified duration.
|
||||
/// Spurious wakes will not cause this function to return.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// use std::sync::Arc;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// use async_std::sync::{Mutex, Condvar};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
/// let pair2 = pair.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// let (lock, cvar) = &*pair2;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// *started = true;
|
||||
/// // We notify the condvar that the value has changed.
|
||||
/// cvar.notify_one();
|
||||
/// });
|
||||
///
|
||||
/// // wait for the thread to start up
|
||||
/// let (lock, cvar) = &*pair;
|
||||
/// let result = cvar.wait_timeout_until(
|
||||
/// lock.lock().await,
|
||||
/// Duration::from_millis(100),
|
||||
/// |&mut started| started,
|
||||
/// ).await;
|
||||
/// if result.1.timed_out() {
|
||||
/// // timed-out without the condition ever evaluating to true.
|
||||
/// }
|
||||
/// // access the locked mutex via result.0
|
||||
/// # });
|
||||
/// ```
|
||||
#[allow(clippy::needless_lifetimes)]
|
||||
pub async fn wait_timeout_until<'a, T, F>(
|
||||
&self,
|
||||
guard: MutexGuard<'a, T>,
|
||||
dur: Duration,
|
||||
condition: F,
|
||||
) -> (MutexGuard<'a, T>, WaitTimeoutResult)
|
||||
where
|
||||
F: FnMut(&mut T) -> bool,
|
||||
{
|
||||
let mutex = MutexGuard::source(&guard);
|
||||
match timeout(dur, self.wait_until(guard, condition)).await {
|
||||
Ok(guard) => (guard, WaitTimeoutResult(false)),
|
||||
Err(_) => (mutex.lock().await, WaitTimeoutResult(true)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Wakes up one blocked task on this condvar.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # fn main() { async_std::task::block_on(async {
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// use async_std::sync::{Mutex, Condvar};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
/// let pair2 = pair.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// let (lock, cvar) = &*pair2;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// *started = true;
|
||||
/// // We notify the condvar that the value has changed.
|
||||
/// cvar.notify_one();
|
||||
/// });
|
||||
///
|
||||
/// // Wait for the thread to start up.
|
||||
/// let (lock, cvar) = &*pair;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// while !*started {
|
||||
/// started = cvar.wait(started).await;
|
||||
/// }
|
||||
/// # }) }
|
||||
/// ```
|
||||
pub fn notify_one(&self) {
|
||||
self.wakers.notify_one();
|
||||
}
|
||||
|
||||
/// Wakes up all blocked tasks on this condvar.
|
||||
///
|
||||
/// # Examples
|
||||
/// ```
|
||||
/// # fn main() { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use std::sync::Arc;
|
||||
///
|
||||
/// use async_std::sync::{Mutex, Condvar};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
/// let pair2 = pair.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// let (lock, cvar) = &*pair2;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// *started = true;
|
||||
/// // We notify the condvar that the value has changed.
|
||||
/// cvar.notify_all();
|
||||
/// });
|
||||
///
|
||||
/// // Wait for the thread to start up.
|
||||
/// let (lock, cvar) = &*pair;
|
||||
/// let mut started = lock.lock().await;
|
||||
/// // As long as the value inside the `Mutex<bool>` is `false`, we wait.
|
||||
/// while !*started {
|
||||
/// started = cvar.wait(started).await;
|
||||
/// }
|
||||
/// #
|
||||
/// # }) }
|
||||
/// ```
|
||||
pub fn notify_all(&self) {
|
||||
self.wakers.notify_all();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Condvar {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.pad("Condvar { .. }")
|
||||
}
|
||||
}
|
||||
|
||||
/// A future that waits for another task to notify the condition variable.
|
||||
///
|
||||
/// This is an internal future that `wait` and `wait_until` await on.
|
||||
struct AwaitNotify<'a, 'b, T> {
|
||||
/// The condition variable that we are waiting on
|
||||
cond: &'a Condvar,
|
||||
/// The lock used with `cond`.
|
||||
/// This will be released the first time the future is polled,
|
||||
/// after registering the context to be notified.
|
||||
guard: Option<MutexGuard<'b, T>>,
|
||||
/// A key into the conditions variable's `WakerSet`.
|
||||
/// This is set to the index of the `Waker` for the context each time
|
||||
/// the future is polled and not completed.
|
||||
key: Option<usize>,
|
||||
}
|
||||
|
||||
impl<'a, 'b, T> Future for AwaitNotify<'a, 'b, T> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.guard.take() {
|
||||
Some(_) => {
|
||||
self.key = Some(self.cond.wakers.insert(cx));
|
||||
// the guard is dropped when we return, which frees the lock
|
||||
Poll::Pending
|
||||
}
|
||||
None => {
|
||||
if let Some(key) = self.key {
|
||||
if self.cond.wakers.remove_if_notified(key, cx) {
|
||||
self.key = None;
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
} else {
|
||||
// This should only happen if it is polled twice after receiving a notification
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a, 'b, T> Drop for AwaitNotify<'a, 'b, T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(key) = self.key {
|
||||
self.cond.wakers.cancel(key);
|
||||
}
|
||||
}
|
||||
}
|
@ -1,289 +0,0 @@
|
||||
use std::cell::UnsafeCell;
|
||||
use std::fmt;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::future::Future;
|
||||
|
||||
use crate::sync::WakerSet;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// A mutual exclusion primitive for protecting shared data.
|
||||
///
|
||||
/// This type is an async version of [`std::sync::Mutex`].
|
||||
///
|
||||
/// [`std::sync::Mutex`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::sync::{Arc, Mutex};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let m = Arc::new(Mutex::new(0));
|
||||
/// let mut tasks = vec![];
|
||||
///
|
||||
/// for _ in 0..10 {
|
||||
/// let m = m.clone();
|
||||
/// tasks.push(task::spawn(async move {
|
||||
/// *m.lock().await += 1;
|
||||
/// }));
|
||||
/// }
|
||||
///
|
||||
/// for t in tasks {
|
||||
/// t.await;
|
||||
/// }
|
||||
/// assert_eq!(*m.lock().await, 10);
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
pub struct Mutex<T: ?Sized> {
|
||||
locked: AtomicBool,
|
||||
wakers: WakerSet,
|
||||
value: UnsafeCell<T>,
|
||||
}
|
||||
|
||||
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
|
||||
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
|
||||
|
||||
impl<T> Mutex<T> {
|
||||
/// Creates a new mutex.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::sync::Mutex;
|
||||
///
|
||||
/// let mutex = Mutex::new(0);
|
||||
/// ```
|
||||
pub fn new(t: T) -> Mutex<T> {
|
||||
Mutex {
|
||||
locked: AtomicBool::new(false),
|
||||
wakers: WakerSet::new(),
|
||||
value: UnsafeCell::new(t),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> Mutex<T> {
|
||||
/// Acquires the lock.
|
||||
///
|
||||
/// Returns a guard that releases the lock when dropped.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::sync::{Arc, Mutex};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let m1 = Arc::new(Mutex::new(10));
|
||||
/// let m2 = m1.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// *m1.lock().await = 20;
|
||||
/// })
|
||||
/// .await;
|
||||
///
|
||||
/// assert_eq!(*m2.lock().await, 20);
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
pub async fn lock(&self) -> MutexGuard<'_, T> {
|
||||
pub struct LockFuture<'a, T: ?Sized> {
|
||||
mutex: &'a Mutex<T>,
|
||||
opt_key: Option<usize>,
|
||||
}
|
||||
|
||||
impl<'a, T: ?Sized> Future for LockFuture<'a, T> {
|
||||
type Output = MutexGuard<'a, T>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
loop {
|
||||
// If the current task is in the set, remove it.
|
||||
if let Some(key) = self.opt_key.take() {
|
||||
self.mutex.wakers.remove(key);
|
||||
}
|
||||
|
||||
// Try acquiring the lock.
|
||||
match self.mutex.try_lock() {
|
||||
Some(guard) => return Poll::Ready(guard),
|
||||
None => {
|
||||
// Insert this lock operation.
|
||||
self.opt_key = Some(self.mutex.wakers.insert(cx));
|
||||
|
||||
// If the mutex is still locked, return.
|
||||
if self.mutex.locked.load(Ordering::SeqCst) {
|
||||
return Poll::Pending;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> Drop for LockFuture<'_, T> {
|
||||
fn drop(&mut self) {
|
||||
// If the current task is still in the set, that means it is being cancelled now.
|
||||
if let Some(key) = self.opt_key {
|
||||
self.mutex.wakers.cancel(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LockFuture {
|
||||
mutex: self,
|
||||
opt_key: None,
|
||||
}
|
||||
.await
|
||||
}
|
||||
|
||||
/// Attempts to acquire the lock.
|
||||
///
|
||||
/// If the lock could not be acquired at this time, then [`None`] is returned. Otherwise, a
|
||||
/// guard is returned that releases the lock when dropped.
|
||||
///
|
||||
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::sync::{Arc, Mutex};
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let m1 = Arc::new(Mutex::new(10));
|
||||
/// let m2 = m1.clone();
|
||||
///
|
||||
/// task::spawn(async move {
|
||||
/// if let Some(mut guard) = m1.try_lock() {
|
||||
/// *guard = 20;
|
||||
/// } else {
|
||||
/// println!("try_lock failed");
|
||||
/// }
|
||||
/// })
|
||||
/// .await;
|
||||
///
|
||||
/// assert_eq!(*m2.lock().await, 20);
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[inline]
|
||||
pub fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
|
||||
if !self.locked.swap(true, Ordering::SeqCst) {
|
||||
Some(MutexGuard(self))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes the mutex, returning the underlying data.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::sync::Mutex;
|
||||
///
|
||||
/// let mutex = Mutex::new(10);
|
||||
/// assert_eq!(mutex.into_inner(), 10);
|
||||
/// ```
|
||||
pub fn into_inner(self) -> T where T: Sized {
|
||||
self.value.into_inner()
|
||||
}
|
||||
|
||||
/// Returns a mutable reference to the underlying data.
|
||||
///
|
||||
/// Since this call borrows the mutex mutably, no actual locking takes place -- the mutable
|
||||
/// borrow statically guarantees no locks exist.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::sync::Mutex;
|
||||
///
|
||||
/// let mut mutex = Mutex::new(0);
|
||||
/// *mutex.get_mut() = 10;
|
||||
/// assert_eq!(*mutex.lock().await, 10);
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
pub fn get_mut(&mut self) -> &mut T {
|
||||
unsafe { &mut *self.value.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized + fmt::Debug> fmt::Debug for Mutex<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
struct Locked;
|
||||
impl fmt::Debug for Locked {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str("<locked>")
|
||||
}
|
||||
}
|
||||
|
||||
match self.try_lock() {
|
||||
None => f.debug_struct("Mutex").field("data", &Locked).finish(),
|
||||
Some(guard) => f.debug_struct("Mutex").field("data", &&*guard).finish(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> From<T> for Mutex<T> {
|
||||
fn from(val: T) -> Mutex<T> {
|
||||
Mutex::new(val)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized + Default> Default for Mutex<T> {
|
||||
fn default() -> Mutex<T> {
|
||||
Mutex::new(Default::default())
|
||||
}
|
||||
}
|
||||
|
||||
/// A guard that releases the lock when dropped.
|
||||
pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex<T>);
|
||||
|
||||
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
|
||||
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
|
||||
|
||||
impl<T: ?Sized> Drop for MutexGuard<'_, T> {
|
||||
fn drop(&mut self) {
|
||||
// Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`.
|
||||
self.0.locked.store(false, Ordering::SeqCst);
|
||||
|
||||
// Notify a blocked `lock()` operation if none were notified already.
|
||||
self.0.wakers.notify_any();
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized +fmt::Debug> fmt::Debug for MutexGuard<'_, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
fmt::Debug::fmt(&**self, f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized + fmt::Display> fmt::Display for MutexGuard<'_, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
(**self).fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> Deref for MutexGuard<'_, T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
unsafe { &*self.0.value.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> DerefMut for MutexGuard<'_, T> {
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
unsafe { &mut *self.0.value.get() }
|
||||
}
|
||||
}
|
@ -1,13 +0,0 @@
|
||||
//! Task executor.
|
||||
//!
|
||||
//! API bindings between `crate::task` and this module are very simple:
|
||||
//!
|
||||
//! * The only export is the `schedule` function.
|
||||
//! * The only import is the `crate::task::Runnable` type.
|
||||
|
||||
pub(crate) use pool::schedule;
|
||||
|
||||
use sleepers::Sleepers;
|
||||
|
||||
mod pool;
|
||||
mod sleepers;
|
@ -1,179 +0,0 @@
|
||||
use std::cell::Cell;
|
||||
use std::iter;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use crossbeam_deque::{Injector, Stealer, Worker};
|
||||
use once_cell::sync::Lazy;
|
||||
use once_cell::unsync::OnceCell;
|
||||
|
||||
use crate::task::executor::Sleepers;
|
||||
use crate::task::Runnable;
|
||||
use crate::utils::{abort_on_panic, random};
|
||||
|
||||
/// The state of an executor.
|
||||
struct Pool {
|
||||
/// The global queue of tasks.
|
||||
injector: Injector<Runnable>,
|
||||
|
||||
/// Handles to local queues for stealing work from worker threads.
|
||||
stealers: Vec<Stealer<Runnable>>,
|
||||
|
||||
/// Used for putting idle workers to sleep and notifying them when new tasks come in.
|
||||
sleepers: Sleepers,
|
||||
}
|
||||
|
||||
/// Global executor that runs spawned tasks.
|
||||
static POOL: Lazy<Pool> = Lazy::new(|| {
|
||||
let num_threads = num_cpus::get().max(1);
|
||||
let mut stealers = Vec::new();
|
||||
|
||||
// Spawn worker threads.
|
||||
for _ in 0..num_threads {
|
||||
let worker = Worker::new_fifo();
|
||||
stealers.push(worker.stealer());
|
||||
|
||||
let proc = Processor {
|
||||
worker,
|
||||
slot: Cell::new(None),
|
||||
slot_runs: Cell::new(0),
|
||||
};
|
||||
|
||||
thread::Builder::new()
|
||||
.name("async-std/executor".to_string())
|
||||
.spawn(|| {
|
||||
let _ = PROCESSOR.with(|p| p.set(proc));
|
||||
abort_on_panic(main_loop);
|
||||
})
|
||||
.expect("cannot start a thread driving tasks");
|
||||
}
|
||||
|
||||
Pool {
|
||||
injector: Injector::new(),
|
||||
stealers,
|
||||
sleepers: Sleepers::new(),
|
||||
}
|
||||
});
|
||||
|
||||
/// The state of a worker thread.
|
||||
struct Processor {
|
||||
/// The local task queue.
|
||||
worker: Worker<Runnable>,
|
||||
|
||||
/// Contains the next task to run as an optimization that skips queues.
|
||||
slot: Cell<Option<Runnable>>,
|
||||
|
||||
/// How many times in a row tasks have been taked from the slot rather than the queue.
|
||||
slot_runs: Cell<u32>,
|
||||
}
|
||||
|
||||
thread_local! {
|
||||
/// Worker thread state.
|
||||
static PROCESSOR: OnceCell<Processor> = OnceCell::new();
|
||||
}
|
||||
|
||||
/// Schedules a new runnable task for execution.
|
||||
pub(crate) fn schedule(task: Runnable) {
|
||||
PROCESSOR.with(|proc| {
|
||||
// If the current thread is a worker thread, store it into its task slot or push it into
|
||||
// its local task queue. Otherwise, push it into the global task queue.
|
||||
match proc.get() {
|
||||
Some(proc) => {
|
||||
// Replace the task in the slot.
|
||||
if let Some(task) = proc.slot.replace(Some(task)) {
|
||||
// If the slot already contained a task, push it into the local task queue.
|
||||
proc.worker.push(task);
|
||||
POOL.sleepers.notify_one();
|
||||
}
|
||||
}
|
||||
None => {
|
||||
POOL.injector.push(task);
|
||||
POOL.sleepers.notify_one();
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Main loop running a worker thread.
|
||||
fn main_loop() {
|
||||
/// Number of yields when no runnable task is found.
|
||||
const YIELDS: u32 = 3;
|
||||
/// Number of short sleeps when no runnable task in found.
|
||||
const SLEEPS: u32 = 1;
|
||||
|
||||
// The number of times the thread didn't find work in a row.
|
||||
let mut fails = 0;
|
||||
|
||||
loop {
|
||||
// Try to find a runnable task.
|
||||
match find_runnable() {
|
||||
Some(task) => {
|
||||
fails = 0;
|
||||
|
||||
// Run the found task.
|
||||
task.run();
|
||||
}
|
||||
None => {
|
||||
fails += 1;
|
||||
|
||||
// Yield the current thread or put it to sleep.
|
||||
if fails <= YIELDS {
|
||||
thread::yield_now();
|
||||
} else if fails <= YIELDS + SLEEPS {
|
||||
thread::sleep(Duration::from_micros(10));
|
||||
} else {
|
||||
POOL.sleepers.wait();
|
||||
fails = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the next runnable task.
|
||||
fn find_runnable() -> Option<Runnable> {
|
||||
/// Maximum number of times the slot can be used in a row.
|
||||
const SLOT_LIMIT: u32 = 16;
|
||||
|
||||
PROCESSOR.with(|proc| {
|
||||
let proc = proc.get().unwrap();
|
||||
|
||||
// Try taking a task from the slot.
|
||||
let runs = proc.slot_runs.get();
|
||||
if runs < SLOT_LIMIT {
|
||||
if let Some(task) = proc.slot.take() {
|
||||
proc.slot_runs.set(runs + 1);
|
||||
return Some(task);
|
||||
}
|
||||
}
|
||||
proc.slot_runs.set(0);
|
||||
|
||||
// Pop a task from the local queue, if not empty.
|
||||
proc.worker.pop().or_else(|| {
|
||||
// Otherwise, we need to look for a task elsewhere.
|
||||
iter::repeat_with(|| {
|
||||
// Try stealing a batch of tasks from the global queue.
|
||||
POOL.injector
|
||||
.steal_batch_and_pop(&proc.worker)
|
||||
// Or try stealing a batch of tasks from one of the other threads.
|
||||
.or_else(|| {
|
||||
// First, pick a random starting point in the list of local queues.
|
||||
let len = POOL.stealers.len();
|
||||
let start = random(len as u32) as usize;
|
||||
|
||||
// Try stealing a batch of tasks from each local queue starting from the
|
||||
// chosen point.
|
||||
let (l, r) = POOL.stealers.split_at(start);
|
||||
let stealers = r.iter().chain(l.iter());
|
||||
stealers
|
||||
.map(|s| s.steal_batch_and_pop(&proc.worker))
|
||||
.collect()
|
||||
})
|
||||
})
|
||||
// Loop while no task was stolen and any steal operation needs to be retried.
|
||||
.find(|s| !s.is_retry())
|
||||
// Extract the stolen task, if there is one.
|
||||
.and_then(|s| s.success())
|
||||
})
|
||||
})
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Condvar, Mutex};
|
||||
|
||||
/// The place where worker threads go to sleep.
|
||||
///
|
||||
/// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
|
||||
/// the next thread that attempts to go to sleep will pick up the notification immediately.
|
||||
pub struct Sleepers {
|
||||
/// How many threads are currently a sleep.
|
||||
sleep: Mutex<usize>,
|
||||
|
||||
/// A condvar for notifying sleeping threads.
|
||||
wake: Condvar,
|
||||
|
||||
/// Set to `true` if a notification came up while nobody was sleeping.
|
||||
notified: AtomicBool,
|
||||
}
|
||||
|
||||
impl Sleepers {
|
||||
/// Creates a new `Sleepers`.
|
||||
pub fn new() -> Sleepers {
|
||||
Sleepers {
|
||||
sleep: Mutex::new(0),
|
||||
wake: Condvar::new(),
|
||||
notified: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Puts the current thread to sleep.
|
||||
pub fn wait(&self) {
|
||||
let mut sleep = self.sleep.lock().unwrap();
|
||||
|
||||
if !self.notified.swap(false, Ordering::SeqCst) {
|
||||
*sleep += 1;
|
||||
let _ = self.wake.wait(sleep).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies one thread.
|
||||
pub fn notify_one(&self) {
|
||||
if !self.notified.load(Ordering::SeqCst) {
|
||||
let mut sleep = self.sleep.lock().unwrap();
|
||||
|
||||
if *sleep > 0 {
|
||||
*sleep -= 1;
|
||||
self.wake.notify_one();
|
||||
} else {
|
||||
self.notified.store(true, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,31 @@
|
||||
use std::future::Future;
|
||||
|
||||
use crate::task::{Builder, JoinHandle};
|
||||
|
||||
/// Spawns a task onto the thread-local executor.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # #[cfg(feature = "unstable")]
|
||||
/// # async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::task;
|
||||
///
|
||||
/// let handle = task::spawn_local(async {
|
||||
/// 1 + 2
|
||||
/// });
|
||||
///
|
||||
/// assert_eq!(handle.await, 3);
|
||||
/// #
|
||||
/// # })
|
||||
/// ```
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[inline]
|
||||
pub fn spawn_local<F, T>(future: F) -> JoinHandle<T>
|
||||
where
|
||||
F: Future<Output = T> + 'static,
|
||||
T: 'static,
|
||||
{
|
||||
Builder::new().local(future).expect("cannot spawn task")
|
||||
}
|
@ -0,0 +1,84 @@
|
||||
use std::cell::Cell;
|
||||
use std::ptr;
|
||||
|
||||
use crate::task::{LocalsMap, Task, TaskId};
|
||||
use crate::utils::abort_on_panic;
|
||||
|
||||
thread_local! {
|
||||
/// A pointer to the currently running task.
|
||||
static CURRENT: Cell<*const TaskLocalsWrapper> = Cell::new(ptr::null_mut());
|
||||
}
|
||||
|
||||
/// A wrapper to store task local data.
|
||||
pub(crate) struct TaskLocalsWrapper {
|
||||
/// The actual task details.
|
||||
task: Task,
|
||||
|
||||
/// The map holding task-local values.
|
||||
locals: LocalsMap,
|
||||
}
|
||||
|
||||
impl TaskLocalsWrapper {
|
||||
/// Creates a new task handle.
|
||||
///
|
||||
/// If the task is unnamed, the inner representation of the task will be lazily allocated on
|
||||
/// demand.
|
||||
#[inline]
|
||||
pub(crate) fn new(task: Task) -> Self {
|
||||
Self {
|
||||
task,
|
||||
locals: LocalsMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the task's unique identifier.
|
||||
#[inline]
|
||||
pub fn id(&self) -> TaskId {
|
||||
self.task.id()
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner `Task`.
|
||||
pub(crate) fn task(&self) -> &Task {
|
||||
&self.task
|
||||
}
|
||||
|
||||
/// Returns the map holding task-local values.
|
||||
pub(crate) fn locals(&self) -> &LocalsMap {
|
||||
&self.locals
|
||||
}
|
||||
|
||||
/// Set a reference to the current task.
|
||||
pub(crate) unsafe fn set_current<F, R>(task: *const TaskLocalsWrapper, f: F) -> R
|
||||
where
|
||||
F: FnOnce() -> R,
|
||||
{
|
||||
CURRENT.with(|current| {
|
||||
let old_task = current.replace(task);
|
||||
defer! {
|
||||
current.set(old_task);
|
||||
}
|
||||
f()
|
||||
})
|
||||
}
|
||||
|
||||
/// Gets a reference to the current task.
|
||||
pub(crate) fn get_current<F, R>(f: F) -> Option<R>
|
||||
where
|
||||
F: FnOnce(&TaskLocalsWrapper) -> R,
|
||||
{
|
||||
let res = CURRENT.try_with(|current| unsafe { current.get().as_ref().map(f) });
|
||||
match res {
|
||||
Ok(Some(val)) => Some(val),
|
||||
Ok(None) | Err(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TaskLocalsWrapper {
|
||||
fn drop(&mut self) {
|
||||
// Abort the process if dropping task-locals panics.
|
||||
abort_on_panic(|| {
|
||||
unsafe { self.locals.clear() };
|
||||
});
|
||||
}
|
||||
}
|
@ -1,16 +1,63 @@
|
||||
use async_std::task;
|
||||
#![cfg(not(target_os = "unknown"))]
|
||||
|
||||
use async_std::{future::ready, task::block_on};
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
let res = task::block_on(async { 1 + 2 });
|
||||
let res = block_on(async { 1 + 2 });
|
||||
assert_eq!(res, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic = "boom"]
|
||||
fn panic() {
|
||||
task::block_on(async {
|
||||
block_on(async {
|
||||
// This panic should get propagated into the parent thread.
|
||||
panic!("boom");
|
||||
});
|
||||
}
|
||||
|
||||
#[cfg(feature = "unstable")]
|
||||
#[test]
|
||||
fn nested_block_on_local() {
|
||||
use async_std::task::spawn_local;
|
||||
|
||||
let x = block_on(async {
|
||||
let a = block_on(async { block_on(async { ready(3).await }) });
|
||||
let b = spawn_local(async { block_on(async { ready(2).await }) }).await;
|
||||
let c = block_on(async { block_on(async { ready(1).await }) });
|
||||
a + b + c
|
||||
});
|
||||
|
||||
assert_eq!(x, 3 + 2 + 1);
|
||||
|
||||
let y = block_on(async {
|
||||
let a = block_on(async { block_on(async { ready(3).await }) });
|
||||
let b = spawn_local(async { block_on(async { ready(2).await }) }).await;
|
||||
let c = block_on(async { block_on(async { ready(1).await }) });
|
||||
a + b + c
|
||||
});
|
||||
|
||||
assert_eq!(y, 3 + 2 + 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn nested_block_on() {
|
||||
let x = block_on(async {
|
||||
let a = block_on(async { block_on(async { ready(3).await }) });
|
||||
let b = block_on(async { block_on(async { ready(2).await }) });
|
||||
let c = block_on(async { block_on(async { ready(1).await }) });
|
||||
a + b + c
|
||||
});
|
||||
|
||||
assert_eq!(x, 3 + 2 + 1);
|
||||
|
||||
let y = block_on(async {
|
||||
let a = block_on(async { block_on(async { ready(3).await }) });
|
||||
let b = block_on(async { block_on(async { ready(2).await }) });
|
||||
let c = block_on(async { block_on(async { ready(1).await }) });
|
||||
a + b + c
|
||||
});
|
||||
|
||||
assert_eq!(y, 3 + 2 + 1);
|
||||
}
|
||||
|
@ -0,0 +1,20 @@
|
||||
#[cfg(feature = "unstable")]
|
||||
#[test]
|
||||
fn test_send() -> async_std::io::Result<()> {
|
||||
use async_std::prelude::*;
|
||||
use async_std::{stream, task};
|
||||
|
||||
task::block_on(async {
|
||||
fn test_send_trait<T: Send>(_: &T) {}
|
||||
|
||||
let stream = stream::repeat(1u8).take(10);
|
||||
test_send_trait(&stream);
|
||||
|
||||
let fut = stream.collect::<Vec<_>>();
|
||||
|
||||
// This line triggers a compilation error
|
||||
test_send_trait(&fut);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
@ -0,0 +1,103 @@
|
||||
#![cfg(feature = "unstable")]
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_std::sync::{Condvar, Mutex};
|
||||
use async_std::task::{self, JoinHandle};
|
||||
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
use async_std::task::spawn;
|
||||
#[cfg(target_os = "unknown")]
|
||||
use async_std::task::spawn_local as spawn;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||
fn wait_timeout_with_lock() {
|
||||
task::block_on(async {
|
||||
let pair = Arc::new((Mutex::new(false), Condvar::new()));
|
||||
let pair2 = pair.clone();
|
||||
|
||||
spawn(async move {
|
||||
let (m, c) = &*pair2;
|
||||
let _g = m.lock().await;
|
||||
task::sleep(Duration::from_millis(200)).await;
|
||||
c.notify_one();
|
||||
});
|
||||
|
||||
let (m, c) = &*pair;
|
||||
let (_, wait_result) = c
|
||||
.wait_timeout(m.lock().await, Duration::from_millis(50))
|
||||
.await;
|
||||
assert!(wait_result.timed_out());
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||
fn wait_timeout_without_lock() {
|
||||
task::block_on(async {
|
||||
let m = Mutex::new(false);
|
||||
let c = Condvar::new();
|
||||
|
||||
let (_, wait_result) = c
|
||||
.wait_timeout(m.lock().await, Duration::from_millis(10))
|
||||
.await;
|
||||
assert!(wait_result.timed_out());
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||
fn wait_timeout_until_timed_out() {
|
||||
task::block_on(async {
|
||||
let m = Mutex::new(false);
|
||||
let c = Condvar::new();
|
||||
|
||||
let (_, wait_result) = c
|
||||
.wait_timeout_until(m.lock().await, Duration::from_millis(100), |&mut started| {
|
||||
started
|
||||
})
|
||||
.await;
|
||||
assert!(wait_result.timed_out());
|
||||
})
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||
fn notify_all() {
|
||||
task::block_on(async {
|
||||
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
|
||||
let pair = Arc::new((Mutex::new(0u32), Condvar::new()));
|
||||
|
||||
for _ in 0..10 {
|
||||
let pair = pair.clone();
|
||||
tasks.push(spawn(async move {
|
||||
let (m, c) = &*pair;
|
||||
let mut count = m.lock().await;
|
||||
while *count == 0 {
|
||||
count = c.wait(count).await;
|
||||
}
|
||||
*count += 1;
|
||||
}));
|
||||
}
|
||||
|
||||
// Give some time for tasks to start up
|
||||
task::sleep(Duration::from_millis(50)).await;
|
||||
|
||||
let (m, c) = &*pair;
|
||||
{
|
||||
let mut count = m.lock().await;
|
||||
*count += 1;
|
||||
c.notify_all();
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await;
|
||||
}
|
||||
let count = m.lock().await;
|
||||
assert_eq!(11, *count);
|
||||
})
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_std::future::timeout;
|
||||
use async_std::task;
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
|
||||
|
||||
#[test]
|
||||
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
|
||||
fn timeout_future_many() {
|
||||
task::block_on(async {
|
||||
let futures = (0..10)
|
||||
.map(|i| {
|
||||
timeout(Duration::from_millis(i * 50), async move {
|
||||
task::sleep(Duration::from_millis(i)).await;
|
||||
Ok::<(), async_std::future::TimeoutError>(())
|
||||
})
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
for future in futures {
|
||||
future.await.unwrap().unwrap();
|
||||
}
|
||||
});
|
||||
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue