forked from mirror/async-std
Compare commits
6 Commits
master
...
poc-serde-
Author | SHA1 | Date |
---|---|---|
James Munns | c70f00e915 | 5 years ago |
James Munns | 3c92ffeb5c | 5 years ago |
James Munns | 6b0a81012b | 5 years ago |
James Munns | c5a757fb60 | 5 years ago |
James Munns | bcc134cd02 | 5 years ago |
James Munns | 4e88f104c5 | 5 years ago |
@ -1,3 +0,0 @@
|
|||||||
Our contribution policy can be found at [async.rs/contribute][policy].
|
|
||||||
|
|
||||||
[policy]: https://async.rs/contribute/
|
|
@ -1,266 +0,0 @@
|
|||||||
# Production-Ready Accept Loop
|
|
||||||
|
|
||||||
A production-ready accept loop needs the following things:
|
|
||||||
1. Handling errors
|
|
||||||
2. Limiting the number of simultanteous connections to avoid deny-of-service
|
|
||||||
(DoS) attacks
|
|
||||||
|
|
||||||
|
|
||||||
## Handling errors
|
|
||||||
|
|
||||||
There are two kinds of errors in an accept loop:
|
|
||||||
1. Per-connection errors. The system uses them to notify that there was a
|
|
||||||
connection in the queue and it's dropped by the peer. Subsequent connections
|
|
||||||
can be already queued so next connection must be accepted immediately.
|
|
||||||
2. Resource shortages. When these are encountered it doesn't make sense to
|
|
||||||
accept the next socket immediately. But the listener stays active, so you server
|
|
||||||
should try to accept socket later.
|
|
||||||
|
|
||||||
Here is the example of a per-connection error (printed in normal and debug mode):
|
|
||||||
```
|
|
||||||
Error: Connection reset by peer (os error 104)
|
|
||||||
Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }
|
|
||||||
```
|
|
||||||
|
|
||||||
And the following is the most common example of a resource shortage error:
|
|
||||||
```
|
|
||||||
Error: Too many open files (os error 24)
|
|
||||||
Error: Os { code: 24, kind: Other, message: "Too many open files" }
|
|
||||||
```
|
|
||||||
|
|
||||||
### Testing Application
|
|
||||||
|
|
||||||
To test your application for these errors try the following (this works
|
|
||||||
on unixes only).
|
|
||||||
|
|
||||||
Lower limits and start the application:
|
|
||||||
```
|
|
||||||
$ ulimit -n 100
|
|
||||||
$ cargo run --example your_app
|
|
||||||
Compiling your_app v0.1.0 (/work)
|
|
||||||
Finished dev [unoptimized + debuginfo] target(s) in 5.47s
|
|
||||||
Running `target/debug/examples/your_app`
|
|
||||||
Server is listening on: http://127.0.0.1:1234
|
|
||||||
```
|
|
||||||
Then in another console run the [`wrk`] benchmark tool:
|
|
||||||
```
|
|
||||||
$ wrk -c 1000 http://127.0.0.1:1234
|
|
||||||
Running 10s test @ http://localhost:8080/
|
|
||||||
2 threads and 1000 connections
|
|
||||||
$ telnet localhost 1234
|
|
||||||
Trying ::1...
|
|
||||||
Connected to localhost.
|
|
||||||
```
|
|
||||||
|
|
||||||
Important is to check the following things:
|
|
||||||
|
|
||||||
1. The application doesn't crash on error (but may log errors, see below)
|
|
||||||
2. It's possible to connect to the application again once load is stopped
|
|
||||||
(few seconds after `wrk`). This is what `telnet` does in example above,
|
|
||||||
make sure it prints `Connected to <hostname>`.
|
|
||||||
3. The `Too many open files` error is logged in the appropriate log. This
|
|
||||||
requires to set "maximum number of simultaneous connections" parameter (see
|
|
||||||
below) of your application to a value greater then `100` for this example.
|
|
||||||
4. Check CPU usage of the app while doing a test. It should not occupy 100%
|
|
||||||
of a single CPU core (it's unlikely that you can exhaust CPU by 1000
|
|
||||||
connections in Rust, so this means error handling is not right).
|
|
||||||
|
|
||||||
#### Testing non-HTTP applications
|
|
||||||
|
|
||||||
If it's possible, use the appropriate benchmark tool and set the appropriate
|
|
||||||
number of connections. For example `redis-benchmark` has a `-c` parameter for
|
|
||||||
that, if you implement redis protocol.
|
|
||||||
|
|
||||||
Alternatively, can still use `wrk`, just make sure that connection is not
|
|
||||||
immediately closed. If it is, put a temporary timeout before handing
|
|
||||||
the connection to the protocol handler, like this:
|
|
||||||
|
|
||||||
```rust,edition2018
|
|
||||||
# extern crate async_std;
|
|
||||||
# use std::time::Duration;
|
|
||||||
# use async_std::{
|
|
||||||
# net::{TcpListener, ToSocketAddrs},
|
|
||||||
# prelude::*,
|
|
||||||
# };
|
|
||||||
#
|
|
||||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
|
||||||
#
|
|
||||||
#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
|
||||||
# let listener = TcpListener::bind(addr).await?;
|
|
||||||
# let mut incoming = listener.incoming();
|
|
||||||
while let Some(stream) = incoming.next().await {
|
|
||||||
task::spawn(async {
|
|
||||||
task::sleep(Duration::from_secs(10)).await; // 1
|
|
||||||
connection_loop(stream).await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
# Ok(())
|
|
||||||
# }
|
|
||||||
```
|
|
||||||
|
|
||||||
1. Make sure the sleep coroutine is inside the spawned task, not in the loop.
|
|
||||||
|
|
||||||
[`wrk`]: https://github.com/wg/wrk
|
|
||||||
|
|
||||||
|
|
||||||
### Handling Errors Manually
|
|
||||||
|
|
||||||
Here is how basic accept loop could look like:
|
|
||||||
|
|
||||||
```rust,edition2018
|
|
||||||
# extern crate async_std;
|
|
||||||
# use std::time::Duration;
|
|
||||||
# use async_std::{
|
|
||||||
# net::{TcpListener, ToSocketAddrs},
|
|
||||||
# prelude::*,
|
|
||||||
# };
|
|
||||||
#
|
|
||||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
|
||||||
#
|
|
||||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
|
||||||
let listener = TcpListener::bind(addr).await?;
|
|
||||||
let mut incoming = listener.incoming();
|
|
||||||
while let Some(result) = incoming.next().await {
|
|
||||||
let stream = match stream {
|
|
||||||
Err(ref e) if is_connection_error(e) => continue, // 1
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Error: {}. Pausing for 500ms."); // 3
|
|
||||||
task::sleep(Duration::from_millis(500)).await; // 2
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(s) => s,
|
|
||||||
};
|
|
||||||
// body
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
1. Ignore per-connection errors.
|
|
||||||
2. Sleep and continue on resource shortage.
|
|
||||||
3. It's important to log the message, because these errors commonly mean the
|
|
||||||
misconfiguration of the system and are helpful for operations people running
|
|
||||||
the application.
|
|
||||||
|
|
||||||
Be sure to [test your application](#testing-application).
|
|
||||||
|
|
||||||
|
|
||||||
### External Crates
|
|
||||||
|
|
||||||
The crate [`async-listen`] has a helper to achieve this task:
|
|
||||||
```rust,edition2018
|
|
||||||
# extern crate async_std;
|
|
||||||
# extern crate async_listen;
|
|
||||||
# use std::time::Duration;
|
|
||||||
# use async_std::{
|
|
||||||
# net::{TcpListener, ToSocketAddrs},
|
|
||||||
# prelude::*,
|
|
||||||
# };
|
|
||||||
#
|
|
||||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
|
||||||
#
|
|
||||||
use async_listen::{ListenExt, error_hint};
|
|
||||||
|
|
||||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
|
||||||
|
|
||||||
let listener = TcpListener::bind(addr).await?;
|
|
||||||
let mut incoming = listener
|
|
||||||
.incoming()
|
|
||||||
.log_warnings(log_accept_error) // 1
|
|
||||||
.handle_errors(Duration::from_millis(500));
|
|
||||||
while let Some(socket) = incoming.next().await { // 2
|
|
||||||
// body
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn log_accept_error(e: &io::Error) {
|
|
||||||
eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
1. Logs resource shortages (`async-listen` calls them warnings). If you use
|
|
||||||
`log` crate or any other in your app this should go to the log.
|
|
||||||
2. Stream yields sockets without `Result` wrapper after `handle_errors` because
|
|
||||||
all errors are already handled.
|
|
||||||
3. Together with the error we print a hint, which explains some errors for end
|
|
||||||
users. For example, it recommends increasing open file limit and gives
|
|
||||||
a link.
|
|
||||||
|
|
||||||
[`async-listen`]: https://crates.io/crates/async-listen/
|
|
||||||
|
|
||||||
Be sure to [test your application](#testing-application).
|
|
||||||
|
|
||||||
|
|
||||||
## Connections Limit
|
|
||||||
|
|
||||||
Even if you've applied everything described in
|
|
||||||
[Handling Errors](#handling-errors) section, there is still a problem.
|
|
||||||
|
|
||||||
Let's imagine you have a server that needs to open a file to process
|
|
||||||
client request. At some point, you might encounter the following situation:
|
|
||||||
|
|
||||||
1. There are as many client connection as max file descriptors allowed for
|
|
||||||
the application.
|
|
||||||
2. Listener gets `Too many open files` error so it sleeps.
|
|
||||||
3. Some client sends a request via the previously open connection.
|
|
||||||
4. Opening a file to serve request fails, because of the same
|
|
||||||
`Too many open files` error, until some other client drops a connection.
|
|
||||||
|
|
||||||
There are many more possible situations, this is just a small illustation that
|
|
||||||
limiting number of connections is very useful. Generally, it's one of the ways
|
|
||||||
to control resources used by a server and avoiding some kinds of deny of
|
|
||||||
service (DoS) attacks.
|
|
||||||
|
|
||||||
### `async-listen` crate
|
|
||||||
|
|
||||||
Limiting maximum number of simultaneous connections with [`async-listen`]
|
|
||||||
looks like the following:
|
|
||||||
|
|
||||||
```rust,edition2018
|
|
||||||
# extern crate async_std;
|
|
||||||
# extern crate async_listen;
|
|
||||||
# use std::time::Duration;
|
|
||||||
# use async_std::{
|
|
||||||
# net::{TcpListener, TcpStream, ToSocketAddrs},
|
|
||||||
# prelude::*,
|
|
||||||
# };
|
|
||||||
#
|
|
||||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
|
||||||
#
|
|
||||||
use async_listen::{ListenExt, Token, error_hint};
|
|
||||||
|
|
||||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
|
||||||
|
|
||||||
let listener = TcpListener::bind(addr).await?;
|
|
||||||
let mut incoming = listener
|
|
||||||
.incoming()
|
|
||||||
.log_warnings(log_accept_error)
|
|
||||||
.handle_errors(Duration::from_millis(500)) // 1
|
|
||||||
.backpressure(100);
|
|
||||||
while let Some((token, socket)) = incoming.next().await { // 2
|
|
||||||
task::spawn(async move {
|
|
||||||
connection_loop(&token, stream).await; // 3
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
async fn connection_loop(_token: &Token, stream: TcpStream) { // 4
|
|
||||||
// ...
|
|
||||||
}
|
|
||||||
# fn log_accept_error(e: &io::Error) {
|
|
||||||
# eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
|
|
||||||
# }
|
|
||||||
```
|
|
||||||
|
|
||||||
1. We need to handle errors first, because [`backpressure`] helper expects
|
|
||||||
stream of `TcpStream` rather than `Result`.
|
|
||||||
2. The token yielded by a new stream is what is counted by backpressure helper.
|
|
||||||
I.e. if you drop a token, new connection can be established.
|
|
||||||
3. We give the connection loop a reference to token to bind token's lifetime to
|
|
||||||
the lifetime of the connection.
|
|
||||||
4. The token itsellf in the function can be ignored, hence `_token`
|
|
||||||
|
|
||||||
[`backpressure`]: https://docs.rs/async-listen/0.1.2/async_listen/trait.ListenExt.html#method.backpressure
|
|
||||||
|
|
||||||
Be sure to [test this behavior](#testing-application).
|
|
@ -1,14 +1,11 @@
|
|||||||
# Tutorial: Writing a chat
|
# Tutorial: Writing a chat
|
||||||
|
|
||||||
Nothing is simpler than creating a chat server, right?
|
Nothing is as simple as a chat server, right? Not quite, chat servers
|
||||||
Not quite, chat servers expose you to all the fun of asynchronous programming:
|
already expose you to all the fun of asynchronous programming: how
|
||||||
|
do you handle clients connecting concurrently. How do you handle them disconnecting?
|
||||||
|
|
||||||
How will the server handle clients connecting concurrently?
|
How do you distribute the messages?
|
||||||
|
|
||||||
How will it handle them disconnecting?
|
In this tutorial, we will show you how to write one in `async-std`.
|
||||||
|
|
||||||
How will it distribute the messages?
|
|
||||||
|
|
||||||
This tutorial explains how to write a chat server in `async-std`.
|
|
||||||
|
|
||||||
You can also find the tutorial in [our repository](https://github.com/async-rs/async-std/blob/master/examples/a-chat).
|
You can also find the tutorial in [our repository](https://github.com/async-rs/async-std/blob/master/examples/a-chat).
|
||||||
|
@ -1,79 +0,0 @@
|
|||||||
//! A type that wraps a future to keep track of its completion status.
|
|
||||||
//!
|
|
||||||
//! This implementation was taken from the original `macro_rules` `join/try_join`
|
|
||||||
//! macros in the `futures-preview` crate.
|
|
||||||
|
|
||||||
use std::future::Future;
|
|
||||||
use std::mem;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::task::{Context, Poll};
|
|
||||||
|
|
||||||
use futures_core::ready;
|
|
||||||
|
|
||||||
/// A future that may have completed.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(crate) enum MaybeDone<Fut: Future> {
|
|
||||||
/// A not-yet-completed future
|
|
||||||
Future(Fut),
|
|
||||||
|
|
||||||
/// The output of the completed future
|
|
||||||
Done(Fut::Output),
|
|
||||||
|
|
||||||
/// The empty variant after the result of a [`MaybeDone`] has been
|
|
||||||
/// taken using the [`take`](MaybeDone::take) method.
|
|
||||||
Gone,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Fut: Future> MaybeDone<Fut> {
|
|
||||||
/// Create a new instance of `MaybeDone`.
|
|
||||||
pub(crate) fn new(future: Fut) -> MaybeDone<Fut> {
|
|
||||||
Self::Future(future)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns an [`Option`] containing a reference to the output of the future.
|
|
||||||
/// The output of this method will be [`Some`] if and only if the inner
|
|
||||||
/// future has been completed and [`take`](MaybeDone::take)
|
|
||||||
/// has not yet been called.
|
|
||||||
#[inline]
|
|
||||||
pub(crate) fn output(self: Pin<&Self>) -> Option<&Fut::Output> {
|
|
||||||
let this = self.get_ref();
|
|
||||||
match this {
|
|
||||||
MaybeDone::Done(res) => Some(res),
|
|
||||||
_ => None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Attempt to take the output of a `MaybeDone` without driving it
|
|
||||||
/// towards completion.
|
|
||||||
#[inline]
|
|
||||||
pub(crate) fn take(self: Pin<&mut Self>) -> Option<Fut::Output> {
|
|
||||||
unsafe {
|
|
||||||
let this = self.get_unchecked_mut();
|
|
||||||
match this {
|
|
||||||
MaybeDone::Done(_) => {}
|
|
||||||
MaybeDone::Future(_) | MaybeDone::Gone => return None,
|
|
||||||
};
|
|
||||||
if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
|
|
||||||
Some(output)
|
|
||||||
} else {
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<Fut: Future> Future for MaybeDone<Fut> {
|
|
||||||
type Output = ();
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let res = unsafe {
|
|
||||||
match Pin::as_mut(&mut self).get_unchecked_mut() {
|
|
||||||
MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
|
|
||||||
MaybeDone::Done(_) => return Poll::Ready(()),
|
|
||||||
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
self.set(MaybeDone::Done(res));
|
|
||||||
Poll::Ready(())
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,42 +0,0 @@
|
|||||||
use crate::utils::Context;
|
|
||||||
|
|
||||||
use std::{error::Error as StdError, fmt, io};
|
|
||||||
|
|
||||||
/// Wrap `std::io::Error` with additional message
|
|
||||||
///
|
|
||||||
/// Keeps the original error kind and stores the original I/O error as `source`.
|
|
||||||
impl<T> Context for Result<T, std::io::Error> {
|
|
||||||
fn context(self, message: impl Fn() -> String) -> Self {
|
|
||||||
self.map_err(|e| VerboseError::wrap(e, message()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(crate) struct VerboseError {
|
|
||||||
source: io::Error,
|
|
||||||
message: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl VerboseError {
|
|
||||||
pub(crate) fn wrap(source: io::Error, message: impl Into<String>) -> io::Error {
|
|
||||||
io::Error::new(
|
|
||||||
source.kind(),
|
|
||||||
VerboseError {
|
|
||||||
source,
|
|
||||||
message: message.into(),
|
|
||||||
},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for VerboseError {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
write!(f, "{}", self.message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StdError for VerboseError {
|
|
||||||
fn source(&self) -> Option<&(dyn StdError + 'static)> {
|
|
||||||
Some(&self.source)
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,315 @@
|
|||||||
|
use std::fmt;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
|
use mio::{self, Evented};
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use slab::Slab;
|
||||||
|
|
||||||
|
use crate::io;
|
||||||
|
use crate::task::{Context, Poll, Waker};
|
||||||
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
|
/// Data associated with a registered I/O handle.
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct Entry {
|
||||||
|
/// A unique identifier.
|
||||||
|
token: mio::Token,
|
||||||
|
|
||||||
|
/// Tasks that are blocked on reading from this I/O handle.
|
||||||
|
readers: Mutex<Vec<Waker>>,
|
||||||
|
|
||||||
|
/// Thasks that are blocked on writing to this I/O handle.
|
||||||
|
writers: Mutex<Vec<Waker>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The state of a networking driver.
|
||||||
|
struct Reactor {
|
||||||
|
/// A mio instance that polls for new events.
|
||||||
|
poller: mio::Poll,
|
||||||
|
|
||||||
|
/// A collection of registered I/O handles.
|
||||||
|
entries: Mutex<Slab<Arc<Entry>>>,
|
||||||
|
|
||||||
|
/// Dummy I/O handle that is only used to wake up the polling thread.
|
||||||
|
notify_reg: (mio::Registration, mio::SetReadiness),
|
||||||
|
|
||||||
|
/// An identifier for the notification handle.
|
||||||
|
notify_token: mio::Token,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Reactor {
|
||||||
|
/// Creates a new reactor for polling I/O events.
|
||||||
|
fn new() -> io::Result<Reactor> {
|
||||||
|
let poller = mio::Poll::new()?;
|
||||||
|
let notify_reg = mio::Registration::new2();
|
||||||
|
|
||||||
|
let mut reactor = Reactor {
|
||||||
|
poller,
|
||||||
|
entries: Mutex::new(Slab::new()),
|
||||||
|
notify_reg,
|
||||||
|
notify_token: mio::Token(0),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Register a dummy I/O handle for waking up the polling thread.
|
||||||
|
let entry = reactor.register(&reactor.notify_reg.0)?;
|
||||||
|
reactor.notify_token = entry.token;
|
||||||
|
|
||||||
|
Ok(reactor)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Registers an I/O event source and returns its associated entry.
|
||||||
|
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
|
||||||
|
let mut entries = self.entries.lock().unwrap();
|
||||||
|
|
||||||
|
// Reserve a vacant spot in the slab and use its key as the token value.
|
||||||
|
let vacant = entries.vacant_entry();
|
||||||
|
let token = mio::Token(vacant.key());
|
||||||
|
|
||||||
|
// Allocate an entry and insert it into the slab.
|
||||||
|
let entry = Arc::new(Entry {
|
||||||
|
token,
|
||||||
|
readers: Mutex::new(Vec::new()),
|
||||||
|
writers: Mutex::new(Vec::new()),
|
||||||
|
});
|
||||||
|
vacant.insert(entry.clone());
|
||||||
|
|
||||||
|
// Register the I/O event source in the poller.
|
||||||
|
let interest = mio::Ready::all();
|
||||||
|
let opts = mio::PollOpt::edge();
|
||||||
|
self.poller.register(source, token, interest, opts)?;
|
||||||
|
|
||||||
|
Ok(entry)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deregisters an I/O event source associated with an entry.
|
||||||
|
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
|
||||||
|
// Deregister the I/O object from the mio instance.
|
||||||
|
self.poller.deregister(source)?;
|
||||||
|
|
||||||
|
// Remove the entry associated with the I/O object.
|
||||||
|
self.entries.lock().unwrap().remove(entry.token.0);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// fn notify(&self) {
|
||||||
|
// self.notify_reg
|
||||||
|
// .1
|
||||||
|
// .set_readiness(mio::Ready::readable())
|
||||||
|
// .unwrap();
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The state of the global networking driver.
|
||||||
|
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
|
||||||
|
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
|
||||||
|
// handles.
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name("async-std/net".to_string())
|
||||||
|
.spawn(move || {
|
||||||
|
// If the driver thread panics, there's not much we can do. It is not a
|
||||||
|
// recoverable error and there is no place to propagate it into so we just abort.
|
||||||
|
abort_on_panic(|| {
|
||||||
|
main_loop().expect("async networking thread has panicked");
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.expect("cannot start a thread driving blocking tasks");
|
||||||
|
|
||||||
|
Reactor::new().expect("cannot initialize reactor")
|
||||||
|
});
|
||||||
|
|
||||||
|
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
|
||||||
|
fn main_loop() -> io::Result<()> {
|
||||||
|
let reactor = &REACTOR;
|
||||||
|
let mut events = mio::Events::with_capacity(1000);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// Block on the poller until at least one new event comes in.
|
||||||
|
reactor.poller.poll(&mut events, None)?;
|
||||||
|
|
||||||
|
// Lock the entire entry table while we're processing new events.
|
||||||
|
let entries = reactor.entries.lock().unwrap();
|
||||||
|
|
||||||
|
for event in events.iter() {
|
||||||
|
let token = event.token();
|
||||||
|
|
||||||
|
if token == reactor.notify_token {
|
||||||
|
// If this is the notification token, we just need the notification state.
|
||||||
|
reactor.notify_reg.1.set_readiness(mio::Ready::empty())?;
|
||||||
|
} else {
|
||||||
|
// Otherwise, look for the entry associated with this token.
|
||||||
|
if let Some(entry) = entries.get(token.0) {
|
||||||
|
// Set the readiness flags from this I/O event.
|
||||||
|
let readiness = event.readiness();
|
||||||
|
|
||||||
|
// Wake up reader tasks blocked on this I/O handle.
|
||||||
|
if !(readiness & reader_interests()).is_empty() {
|
||||||
|
for w in entry.readers.lock().unwrap().drain(..) {
|
||||||
|
w.wake();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wake up writer tasks blocked on this I/O handle.
|
||||||
|
if !(readiness & writer_interests()).is_empty() {
|
||||||
|
for w in entry.writers.lock().unwrap().drain(..) {
|
||||||
|
w.wake();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// An I/O handle powered by the networking driver.
|
||||||
|
///
|
||||||
|
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
|
||||||
|
/// implementing traits `AsyncRead` and `AsyncWrite`.
|
||||||
|
pub struct Watcher<T: Evented> {
|
||||||
|
/// Data associated with the I/O handle.
|
||||||
|
entry: Arc<Entry>,
|
||||||
|
|
||||||
|
/// The I/O event source.
|
||||||
|
source: Option<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Evented> Watcher<T> {
|
||||||
|
/// Creates a new I/O handle.
|
||||||
|
///
|
||||||
|
/// The provided I/O event source will be kept registered inside the reactor's poller for the
|
||||||
|
/// lifetime of the returned I/O handle.
|
||||||
|
pub fn new(source: T) -> Watcher<T> {
|
||||||
|
Watcher {
|
||||||
|
entry: REACTOR
|
||||||
|
.register(&source)
|
||||||
|
.expect("cannot register an I/O event source"),
|
||||||
|
source: Some(source),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a reference to the inner I/O event source.
|
||||||
|
pub fn get_ref(&self) -> &T {
|
||||||
|
self.source.as_ref().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Polls the inner I/O source for a non-blocking read operation.
|
||||||
|
///
|
||||||
|
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
||||||
|
/// will be registered for wakeup when the I/O source becomes readable.
|
||||||
|
pub fn poll_read_with<'a, F, R>(&'a self, cx: &mut Context<'_>, mut f: F) -> Poll<io::Result<R>>
|
||||||
|
where
|
||||||
|
F: FnMut(&'a T) -> io::Result<R>,
|
||||||
|
{
|
||||||
|
// If the operation isn't blocked, return its result.
|
||||||
|
match f(self.source.as_ref().unwrap()) {
|
||||||
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||||
|
res => return Poll::Ready(res),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock the waker list.
|
||||||
|
let mut list = self.entry.readers.lock().unwrap();
|
||||||
|
|
||||||
|
// Try running the operation again.
|
||||||
|
match f(self.source.as_ref().unwrap()) {
|
||||||
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||||
|
res => return Poll::Ready(res),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the task if it isn't registered already.
|
||||||
|
if list.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||||
|
list.push(cx.waker().clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Polls the inner I/O source for a non-blocking write operation.
|
||||||
|
///
|
||||||
|
/// If the operation returns an error of the `io::ErrorKind::WouldBlock` kind, the current task
|
||||||
|
/// will be registered for wakeup when the I/O source becomes writable.
|
||||||
|
pub fn poll_write_with<'a, F, R>(
|
||||||
|
&'a self,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
mut f: F,
|
||||||
|
) -> Poll<io::Result<R>>
|
||||||
|
where
|
||||||
|
F: FnMut(&'a T) -> io::Result<R>,
|
||||||
|
{
|
||||||
|
// If the operation isn't blocked, return its result.
|
||||||
|
match f(self.source.as_ref().unwrap()) {
|
||||||
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||||
|
res => return Poll::Ready(res),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lock the waker list.
|
||||||
|
let mut list = self.entry.writers.lock().unwrap();
|
||||||
|
|
||||||
|
// Try running the operation again.
|
||||||
|
match f(self.source.as_ref().unwrap()) {
|
||||||
|
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
|
||||||
|
res => return Poll::Ready(res),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the task if it isn't registered already.
|
||||||
|
if list.iter().all(|w| !w.will_wake(cx.waker())) {
|
||||||
|
list.push(cx.waker().clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deregisters and returns the inner I/O source.
|
||||||
|
///
|
||||||
|
/// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
|
||||||
|
#[allow(dead_code)]
|
||||||
|
pub fn into_inner(mut self) -> T {
|
||||||
|
let source = self.source.take().unwrap();
|
||||||
|
REACTOR
|
||||||
|
.deregister(&source, &self.entry)
|
||||||
|
.expect("cannot deregister I/O event source");
|
||||||
|
source
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Evented> Drop for Watcher<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if let Some(ref source) = self.source {
|
||||||
|
REACTOR
|
||||||
|
.deregister(source, &self.entry)
|
||||||
|
.expect("cannot deregister I/O event source");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("Watcher")
|
||||||
|
.field("entry", &self.entry)
|
||||||
|
.field("source", &self.source)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mask containing flags that interest tasks reading from I/O handles.
|
||||||
|
#[inline]
|
||||||
|
fn reader_interests() -> mio::Ready {
|
||||||
|
mio::Ready::all() - mio::Ready::writable()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a mask containing flags that interest tasks writing into I/O handles.
|
||||||
|
#[inline]
|
||||||
|
fn writer_interests() -> mio::Ready {
|
||||||
|
mio::Ready::writable() | hup()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a flag containing the hangup status.
|
||||||
|
#[inline]
|
||||||
|
fn hup() -> mio::Ready {
|
||||||
|
#[cfg(unix)]
|
||||||
|
let ready = mio::unix::UnixReady::hup().into();
|
||||||
|
|
||||||
|
#[cfg(not(unix))]
|
||||||
|
let ready = mio::Ready::empty();
|
||||||
|
|
||||||
|
ready
|
||||||
|
}
|
@ -1,55 +0,0 @@
|
|||||||
//! Windows-specific filesystem extensions.
|
|
||||||
|
|
||||||
use crate::io;
|
|
||||||
use crate::path::Path;
|
|
||||||
use crate::task::spawn_blocking;
|
|
||||||
|
|
||||||
/// Creates a new directory symbolic link on the filesystem.
|
|
||||||
///
|
|
||||||
/// The `dst` path will be a directory symbolic link pointing to the `src` path.
|
|
||||||
///
|
|
||||||
/// This function is an async version of [`std::os::windows::fs::symlink_dir`].
|
|
||||||
///
|
|
||||||
/// [`std::os::windows::fs::symlink_dir`]: https://doc.rust-lang.org/std/os/windows/fs/fn.symlink_dir.html
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```no_run
|
|
||||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
|
||||||
/// #
|
|
||||||
/// use async_std::os::windows::fs::symlink_dir;
|
|
||||||
///
|
|
||||||
/// symlink_dir("a", "b").await?;
|
|
||||||
/// #
|
|
||||||
/// # Ok(()) }) }
|
|
||||||
/// ```
|
|
||||||
pub async fn symlink_dir<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
|
|
||||||
let src = src.as_ref().to_owned();
|
|
||||||
let dst = dst.as_ref().to_owned();
|
|
||||||
spawn_blocking(move || std::os::windows::fs::symlink_dir(&src, &dst)).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new file symbolic link on the filesystem.
|
|
||||||
///
|
|
||||||
/// The `dst` path will be a file symbolic link pointing to the `src` path.
|
|
||||||
///
|
|
||||||
/// This function is an async version of [`std::os::windows::fs::symlink_file`].
|
|
||||||
///
|
|
||||||
/// [`std::os::windows::fs::symlink_file`]: https://doc.rust-lang.org/std/os/windows/fs/fn.symlink_file.html
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```no_run
|
|
||||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
|
||||||
/// #
|
|
||||||
/// use async_std::os::windows::fs::symlink_file;
|
|
||||||
///
|
|
||||||
/// symlink_file("a.txt", "b.txt").await?;
|
|
||||||
/// #
|
|
||||||
/// # Ok(()) }) }
|
|
||||||
/// ```
|
|
||||||
pub async fn symlink_file<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
|
|
||||||
let src = src.as_ref().to_owned();
|
|
||||||
let dst = dst.as_ref().to_owned();
|
|
||||||
spawn_blocking(move || std::os::windows::fs::symlink_file(&src, &dst)).await
|
|
||||||
}
|
|
@ -1,34 +0,0 @@
|
|||||||
//! The runtime.
|
|
||||||
|
|
||||||
use std::env;
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
use once_cell::sync::Lazy;
|
|
||||||
|
|
||||||
use crate::future;
|
|
||||||
|
|
||||||
/// Dummy runtime struct.
|
|
||||||
pub struct Runtime {}
|
|
||||||
|
|
||||||
/// The global runtime.
|
|
||||||
pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
|
|
||||||
// Create an executor thread pool.
|
|
||||||
|
|
||||||
let thread_count = env::var("ASYNC_STD_THREAD_COUNT")
|
|
||||||
.map(|env| {
|
|
||||||
env.parse()
|
|
||||||
.expect("ASYNC_STD_THREAD_COUNT must be a number")
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|_| num_cpus::get())
|
|
||||||
.max(1);
|
|
||||||
|
|
||||||
let thread_name = env::var("ASYNC_STD_THREAD_NAME").unwrap_or("async-std/runtime".to_string());
|
|
||||||
|
|
||||||
for _ in 0..thread_count {
|
|
||||||
thread::Builder::new()
|
|
||||||
.name(thread_name.clone())
|
|
||||||
.spawn(|| crate::task::block_on(future::pending::<()>()))
|
|
||||||
.expect("cannot start a runtime thread");
|
|
||||||
}
|
|
||||||
Runtime {}
|
|
||||||
});
|
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue