forked from mirror/async-std
Compare commits
1 Commits
master
...
staging.tm
Author | SHA1 | Date |
---|---|---|
bors[bot] | 20dc66be38 | 5 years ago |
@ -1,3 +0,0 @@
|
||||
Our contribution policy can be found at [async.rs/contribute][policy].
|
||||
|
||||
[policy]: https://async.rs/contribute/
|
@ -0,0 +1,20 @@
|
||||
on: pull_request
|
||||
|
||||
name: Clippy check
|
||||
jobs:
|
||||
clippy_check:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- id: component
|
||||
uses: actions-rs/components-nightly@v1
|
||||
with:
|
||||
component: clippy
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ steps.component.outputs.toolchain }}
|
||||
override: true
|
||||
- run: rustup component add clippy
|
||||
- uses: actions-rs/clippy-check@v1
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
@ -0,0 +1,68 @@
|
||||
language: rust
|
||||
|
||||
env:
|
||||
- RUSTFLAGS="-D warnings"
|
||||
|
||||
# Cache the whole `~/.cargo` directory to keep `~/cargo/.crates.toml`.
|
||||
cache:
|
||||
directories:
|
||||
- /home/travis/.cargo
|
||||
|
||||
# Don't cache the cargo registry because it's too big.
|
||||
before_cache:
|
||||
- rm -rf /home/travis/.cargo/registry
|
||||
|
||||
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
- staging
|
||||
- trying
|
||||
|
||||
matrix:
|
||||
fast_finish: true
|
||||
include:
|
||||
- rust: nightly
|
||||
os: linux
|
||||
|
||||
- rust: nightly
|
||||
os: osx
|
||||
osx_image: xcode9.2
|
||||
|
||||
- rust: nightly-x86_64-pc-windows-msvc
|
||||
os: windows
|
||||
|
||||
- name: fmt
|
||||
rust: nightly
|
||||
os: linux
|
||||
before_script: |
|
||||
if ! rustup component add rustfmt; then
|
||||
target=`curl https://rust-lang.github.io/rustup-components-history/x86_64-unknown-linux-gnu/rustfmt`;
|
||||
echo "'rustfmt' is unavailable on the toolchain 'nightly', use the toolchain 'nightly-$target' instead";
|
||||
rustup toolchain install nightly-$target;
|
||||
rustup default nightly-$target;
|
||||
rustup component add rustfmt;
|
||||
fi
|
||||
script:
|
||||
- cargo fmt --all -- --check
|
||||
|
||||
- name: docs
|
||||
rust: nightly
|
||||
os: linux
|
||||
script:
|
||||
- cargo doc --features docs,unstable
|
||||
|
||||
- name: book
|
||||
rust: nightly
|
||||
os: linux
|
||||
before_script:
|
||||
- test -x $HOME/.cargo/bin/mdbook || ./ci/install-mdbook.sh
|
||||
- cargo build # to find 'extern crate async_std' by `mdbook test`
|
||||
script:
|
||||
- mdbook build docs
|
||||
- mdbook test -L ./target/debug/deps docs
|
||||
|
||||
script:
|
||||
- cargo check --all --benches --bins --examples --tests
|
||||
- cargo check --features unstable --all --benches --bins --examples --tests
|
||||
- cargo test --all --doc --features unstable
|
@ -1,159 +1,142 @@
|
||||
<h1 align="center">async-std</h1>
|
||||
<div align="center">
|
||||
<strong>
|
||||
Async version of the Rust standard library
|
||||
</strong>
|
||||
</div>
|
||||
|
||||
<br />
|
||||
|
||||
<div align="center">
|
||||
<!-- CI status -->
|
||||
<a href="https://github.com/async-rs/async-std/actions">
|
||||
<img src="https://github.com/async-rs/async-std/workflows/CI/badge.svg"
|
||||
alt="CI Status" />
|
||||
</a>
|
||||
<!-- Crates version -->
|
||||
<a href="https://crates.io/crates/async-std">
|
||||
<img src="https://img.shields.io/crates/v/async-std.svg?style=flat-square"
|
||||
alt="Crates.io version" />
|
||||
</a>
|
||||
<!-- Downloads -->
|
||||
<a href="https://crates.io/crates/async-std">
|
||||
<img src="https://img.shields.io/crates/d/async-std.svg?style=flat-square"
|
||||
alt="Download" />
|
||||
</a>
|
||||
<!-- docs.rs docs -->
|
||||
<a href="https://docs.rs/async-std">
|
||||
<img src="https://img.shields.io/badge/docs-latest-blue.svg?style=flat-square"
|
||||
alt="docs.rs docs" />
|
||||
</a>
|
||||
|
||||
<a href="https://discord.gg/JvZeVNe">
|
||||
<img src="https://img.shields.io/discord/598880689856970762.svg?logo=discord&style=flat-square"
|
||||
alt="chat" />
|
||||
</a>
|
||||
</div>
|
||||
|
||||
<div align="center">
|
||||
<h3>
|
||||
<a href="https://docs.rs/async-std">
|
||||
API Docs
|
||||
</a>
|
||||
<span> | </span>
|
||||
<a href="https://book.async.rs">
|
||||
Book
|
||||
</a>
|
||||
<span> | </span>
|
||||
<a href="https://github.com/async-rs/async-std/releases">
|
||||
Releases
|
||||
</a>
|
||||
<span> | </span>
|
||||
<a href="https://async.rs/contribute">
|
||||
Contributing
|
||||
</a>
|
||||
</h3>
|
||||
</div>
|
||||
|
||||
<br/>
|
||||
|
||||
This crate provides an async version of [`std`]. It provides all the interfaces
|
||||
you are used to, but in an async version and ready for Rust's `async`/`await`
|
||||
syntax.
|
||||
# Async version of the Rust standard library
|
||||
|
||||
[![Build Status](https://travis-ci.com/async-rs/async-std.svg?branch=master)](https://travis-ci.com/async-rs/async-std)
|
||||
[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)](https://github.com/async-rs/async-std)
|
||||
[![Cargo](https://img.shields.io/crates/v/async-std.svg)](https://crates.io/crates/async-std)
|
||||
[![Documentation](https://docs.rs/async-std/badge.svg)](https://docs.rs/async-std)
|
||||
[![chat](https://img.shields.io/discord/598880689856970762.svg?logo=discord)](https://discord.gg/JvZeVNe)
|
||||
|
||||
This crate provides an async version of [`std`]. It provides all the interfaces you
|
||||
are used to, but in an async version and ready for Rust's `async`/`await` syntax.
|
||||
|
||||
[`std`]: https://doc.rust-lang.org/std/index.html
|
||||
|
||||
## Features
|
||||
## Documentation
|
||||
|
||||
- __Modern:__ Built from the ground up for `std::future` and `async/await` with
|
||||
blazing fast compilation time.
|
||||
- __Fast:__ Our robust allocator and threadpool designs provide ultra-high
|
||||
throughput with predictably low latency.
|
||||
- __Intuitive:__ Complete parity with the stdlib means you only need to learn
|
||||
APIs once.
|
||||
- __Clear:__ [Detailed documentation][docs] and [accessible guides][book] mean
|
||||
using async Rust was never easier.
|
||||
`async-std` comes with [extensive API documentation][docs] and a [book][book].
|
||||
|
||||
[docs]: https://docs.rs/async-std
|
||||
[book]: https://book.async.rs
|
||||
|
||||
## Examples
|
||||
## Quickstart
|
||||
|
||||
Add the following lines to your `Cargo.toml`:
|
||||
|
||||
```toml
|
||||
[dependencies]
|
||||
async-std = "0.99"
|
||||
```
|
||||
|
||||
Or use [cargo add][cargo-add] if you have it installed:
|
||||
|
||||
```sh
|
||||
$ cargo add async-std
|
||||
```
|
||||
|
||||
[cargo-add]: https://github.com/killercup/cargo-edit
|
||||
|
||||
## Hello world
|
||||
|
||||
```rust
|
||||
use async_std::task;
|
||||
|
||||
async fn say_hello() {
|
||||
println!("Hello, world!");
|
||||
fn main() {
|
||||
task::block_on(async {
|
||||
println!("Hello, world!");
|
||||
})
|
||||
}
|
||||
```
|
||||
|
||||
## Low-Friction Sockets with Built-In Timeouts
|
||||
|
||||
```rust
|
||||
use std::time::Duration;
|
||||
|
||||
use async_std::{
|
||||
prelude::*,
|
||||
task,
|
||||
io,
|
||||
net::TcpStream,
|
||||
};
|
||||
|
||||
async fn get() -> io::Result<Vec<u8>> {
|
||||
let mut stream = TcpStream::connect("example.com:80").await?;
|
||||
stream.write_all(b"GET /index.html HTTP/1.0\r\n\r\n").await?;
|
||||
|
||||
let mut buf = vec![];
|
||||
|
||||
io::timeout(Duration::from_secs(5), async {
|
||||
stream.read_to_end(&mut buf).await?;
|
||||
Ok(buf)
|
||||
}).await
|
||||
}
|
||||
|
||||
fn main() {
|
||||
task::block_on(say_hello())
|
||||
task::block_on(async {
|
||||
let raw_response = get().await.expect("request");
|
||||
let response = String::from_utf8(raw_response)
|
||||
.expect("utf8 conversion");
|
||||
println!("received: {}", response);
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
More examples, including networking and file access, can be found in our
|
||||
[`examples`] directory and in our [documentation].
|
||||
## Features
|
||||
|
||||
[`examples`]: https://github.com/async-rs/async-std/tree/master/examples
|
||||
[documentation]: https://docs.rs/async-std#examples
|
||||
[`task::block_on`]: https://docs.rs/async-std/*/async_std/task/fn.block_on.html
|
||||
[`"attributes"` feature]: https://docs.rs/async-std/#features
|
||||
`async-std` is strongly commited to following semver. This means your code won't
|
||||
break unless _you_ decide to upgrade.
|
||||
|
||||
## Philosophy
|
||||
However every now and then we come up with something that we think will work
|
||||
_great_ for `async-std`, and we want to provide a sneak-peek so you can try it
|
||||
out. This is what we call _"unstable"_ features. You can try out the unstable
|
||||
features by enabling the `unstable` feature in your `Cargo.toml` file:
|
||||
|
||||
We believe Async Rust should be as easy to pick up as Sync Rust. We also believe
|
||||
that the best API is the one you already know. And finally, we believe that
|
||||
providing an asynchronous counterpart to the standard library is the best way
|
||||
stdlib provides a reliable basis for both performance and productivity.
|
||||
```toml
|
||||
[dependencies.async-std]
|
||||
version = "0.99"
|
||||
features = ["unstable"]
|
||||
```
|
||||
|
||||
Async-std is the embodiment of that vision. It combines single-allocation task
|
||||
creation, with an adaptive lock-free executor, threadpool and network driver to
|
||||
create a smooth system that processes work at a high pace with low latency,
|
||||
using Rust's familiar stdlib API.
|
||||
Just be careful when using these features, as they may change between
|
||||
versions.
|
||||
|
||||
## Installation
|
||||
## Take a look around
|
||||
|
||||
With [cargo add][cargo-add] installed run:
|
||||
Clone the repo:
|
||||
|
||||
```sh
|
||||
$ cargo add async-std
|
||||
```
|
||||
git clone git@github.com:async-rs/async-std.git && cd async-std
|
||||
```
|
||||
|
||||
We also provide a set of "unstable" features with async-std. See the [features
|
||||
documentation] on how to enable them.
|
||||
Generate docs:
|
||||
|
||||
```
|
||||
cargo doc --features docs.rs --open
|
||||
```
|
||||
|
||||
Check out the [examples](examples). To run an example:
|
||||
|
||||
```
|
||||
cargo run --example hello-world
|
||||
```
|
||||
|
||||
## Contributing
|
||||
|
||||
See [our contribution document][contribution].
|
||||
|
||||
[contribution]: https://async.rs/contribute
|
||||
|
||||
[cargo-add]: https://github.com/killercup/cargo-edit
|
||||
[features documentation]: https://docs.rs/async-std/#features
|
||||
|
||||
## Ecosystem
|
||||
|
||||
* [async-tls](https://crates.io/crates/async-tls) — Async TLS/SSL streams using **Rustls**.
|
||||
|
||||
* [async-native-tls](https://crates.io/crates/async-native-tls) — **Native TLS** for Async. Native TLS for futures and async-std.
|
||||
|
||||
* [async-tungstenite](https://crates.io/crates/async-tungstenite) — Asynchronous **WebSockets** for async-std, tokio, gio and any std Futures runtime.
|
||||
|
||||
* [Tide](https://crates.io/crates/tide) — Serve the web. A modular **web framework** built around async/await.
|
||||
|
||||
* [SQLx](https://crates.io/crates/sqlx) — The Rust **SQL** Toolkit. SQLx is a 100% safe Rust library for Postgres and MySQL with compile-time checked queries.
|
||||
|
||||
* [Surf](https://crates.io/crates/surf) — Surf the web. Surf is a friendly **HTTP client** built for casual Rustaceans and veterans alike.
|
||||
|
||||
* [Xactor](https://crates.io/crates/xactor) — Xactor is a rust actors framework based on async-std.
|
||||
|
||||
* [async-graphql](https://crates.io/crates/async-graphql) — A GraphQL server library implemented in rust, with full support for async/await.
|
||||
|
||||
## License
|
||||
|
||||
<sup>
|
||||
Licensed under either of <a href="LICENSE-APACHE">Apache License, Version
|
||||
2.0</a> or <a href="LICENSE-MIT">MIT license</a> at your option.
|
||||
</sup>
|
||||
Licensed under either of
|
||||
|
||||
* Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
|
||||
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
|
||||
|
||||
at your option.
|
||||
|
||||
<br/>
|
||||
#### Contribution
|
||||
|
||||
<sub>
|
||||
Unless you explicitly state otherwise, any contribution intentionally submitted
|
||||
for inclusion in this crate by you, as defined in the Apache-2.0 license, shall
|
||||
be dual licensed as above, without any additional terms or conditions.
|
||||
</sub>
|
||||
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
|
||||
dual licensed as above, without any additional terms or conditions.
|
||||
|
@ -1,40 +0,0 @@
|
||||
#![feature(test)]
|
||||
|
||||
extern crate test;
|
||||
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use async_std::task;
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
fn create(b: &mut Bencher) {
|
||||
b.iter(|| Mutex::new(()));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn contention(b: &mut Bencher) {
|
||||
b.iter(|| task::block_on(run(10, 1000)));
|
||||
}
|
||||
|
||||
#[bench]
|
||||
fn no_contention(b: &mut Bencher) {
|
||||
b.iter(|| task::block_on(run(1, 10000)));
|
||||
}
|
||||
|
||||
async fn run(task: usize, iter: usize) {
|
||||
let m = Arc::new(Mutex::new(()));
|
||||
let mut tasks = Vec::new();
|
||||
|
||||
for _ in 0..task {
|
||||
let m = m.clone();
|
||||
tasks.push(task::spawn(async move {
|
||||
for _ in 0..iter {
|
||||
let _ = m.lock().await;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
for t in tasks {
|
||||
t.await;
|
||||
}
|
||||
}
|
@ -1,11 +0,0 @@
|
||||
#![feature(test)]
|
||||
|
||||
extern crate test;
|
||||
|
||||
use async_std::task;
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
fn block_on(b: &mut Bencher) {
|
||||
b.iter(|| task::block_on(async {}));
|
||||
}
|
@ -1,7 +1 @@
|
||||
status = [
|
||||
"Build and test (ubuntu-latest, nightly)",
|
||||
"Build and test (windows-latest, nightly)",
|
||||
"Build and test (macOS-latest, nightly)",
|
||||
"Checking fmt and docs",
|
||||
"Clippy check",
|
||||
]
|
||||
status = ["continuous-integration/travis-ci/push"]
|
||||
|
@ -1,266 +0,0 @@
|
||||
# Production-Ready Accept Loop
|
||||
|
||||
A production-ready accept loop needs the following things:
|
||||
1. Handling errors
|
||||
2. Limiting the number of simultanteous connections to avoid deny-of-service
|
||||
(DoS) attacks
|
||||
|
||||
|
||||
## Handling errors
|
||||
|
||||
There are two kinds of errors in an accept loop:
|
||||
1. Per-connection errors. The system uses them to notify that there was a
|
||||
connection in the queue and it's dropped by the peer. Subsequent connections
|
||||
can be already queued so next connection must be accepted immediately.
|
||||
2. Resource shortages. When these are encountered it doesn't make sense to
|
||||
accept the next socket immediately. But the listener stays active, so you server
|
||||
should try to accept socket later.
|
||||
|
||||
Here is the example of a per-connection error (printed in normal and debug mode):
|
||||
```
|
||||
Error: Connection reset by peer (os error 104)
|
||||
Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }
|
||||
```
|
||||
|
||||
And the following is the most common example of a resource shortage error:
|
||||
```
|
||||
Error: Too many open files (os error 24)
|
||||
Error: Os { code: 24, kind: Other, message: "Too many open files" }
|
||||
```
|
||||
|
||||
### Testing Application
|
||||
|
||||
To test your application for these errors try the following (this works
|
||||
on unixes only).
|
||||
|
||||
Lower limits and start the application:
|
||||
```
|
||||
$ ulimit -n 100
|
||||
$ cargo run --example your_app
|
||||
Compiling your_app v0.1.0 (/work)
|
||||
Finished dev [unoptimized + debuginfo] target(s) in 5.47s
|
||||
Running `target/debug/examples/your_app`
|
||||
Server is listening on: http://127.0.0.1:1234
|
||||
```
|
||||
Then in another console run the [`wrk`] benchmark tool:
|
||||
```
|
||||
$ wrk -c 1000 http://127.0.0.1:1234
|
||||
Running 10s test @ http://localhost:8080/
|
||||
2 threads and 1000 connections
|
||||
$ telnet localhost 1234
|
||||
Trying ::1...
|
||||
Connected to localhost.
|
||||
```
|
||||
|
||||
Important is to check the following things:
|
||||
|
||||
1. The application doesn't crash on error (but may log errors, see below)
|
||||
2. It's possible to connect to the application again once load is stopped
|
||||
(few seconds after `wrk`). This is what `telnet` does in example above,
|
||||
make sure it prints `Connected to <hostname>`.
|
||||
3. The `Too many open files` error is logged in the appropriate log. This
|
||||
requires to set "maximum number of simultaneous connections" parameter (see
|
||||
below) of your application to a value greater then `100` for this example.
|
||||
4. Check CPU usage of the app while doing a test. It should not occupy 100%
|
||||
of a single CPU core (it's unlikely that you can exhaust CPU by 1000
|
||||
connections in Rust, so this means error handling is not right).
|
||||
|
||||
#### Testing non-HTTP applications
|
||||
|
||||
If it's possible, use the appropriate benchmark tool and set the appropriate
|
||||
number of connections. For example `redis-benchmark` has a `-c` parameter for
|
||||
that, if you implement redis protocol.
|
||||
|
||||
Alternatively, can still use `wrk`, just make sure that connection is not
|
||||
immediately closed. If it is, put a temporary timeout before handing
|
||||
the connection to the protocol handler, like this:
|
||||
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
# let listener = TcpListener::bind(addr).await?;
|
||||
# let mut incoming = listener.incoming();
|
||||
while let Some(stream) = incoming.next().await {
|
||||
task::spawn(async {
|
||||
task::sleep(Duration::from_secs(10)).await; // 1
|
||||
connection_loop(stream).await;
|
||||
});
|
||||
}
|
||||
# Ok(())
|
||||
# }
|
||||
```
|
||||
|
||||
1. Make sure the sleep coroutine is inside the spawned task, not in the loop.
|
||||
|
||||
[`wrk`]: https://github.com/wg/wrk
|
||||
|
||||
|
||||
### Handling Errors Manually
|
||||
|
||||
Here is how basic accept loop could look like:
|
||||
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let mut incoming = listener.incoming();
|
||||
while let Some(result) = incoming.next().await {
|
||||
let stream = match stream {
|
||||
Err(ref e) if is_connection_error(e) => continue, // 1
|
||||
Err(e) => {
|
||||
eprintln!("Error: {}. Pausing for 500ms."); // 3
|
||||
task::sleep(Duration::from_millis(500)).await; // 2
|
||||
continue;
|
||||
}
|
||||
Ok(s) => s,
|
||||
};
|
||||
// body
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
```
|
||||
|
||||
1. Ignore per-connection errors.
|
||||
2. Sleep and continue on resource shortage.
|
||||
3. It's important to log the message, because these errors commonly mean the
|
||||
misconfiguration of the system and are helpful for operations people running
|
||||
the application.
|
||||
|
||||
Be sure to [test your application](#testing-application).
|
||||
|
||||
|
||||
### External Crates
|
||||
|
||||
The crate [`async-listen`] has a helper to achieve this task:
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# extern crate async_listen;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
use async_listen::{ListenExt, error_hint};
|
||||
|
||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let mut incoming = listener
|
||||
.incoming()
|
||||
.log_warnings(log_accept_error) // 1
|
||||
.handle_errors(Duration::from_millis(500));
|
||||
while let Some(socket) = incoming.next().await { // 2
|
||||
// body
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn log_accept_error(e: &io::Error) {
|
||||
eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3
|
||||
}
|
||||
```
|
||||
|
||||
1. Logs resource shortages (`async-listen` calls them warnings). If you use
|
||||
`log` crate or any other in your app this should go to the log.
|
||||
2. Stream yields sockets without `Result` wrapper after `handle_errors` because
|
||||
all errors are already handled.
|
||||
3. Together with the error we print a hint, which explains some errors for end
|
||||
users. For example, it recommends increasing open file limit and gives
|
||||
a link.
|
||||
|
||||
[`async-listen`]: https://crates.io/crates/async-listen/
|
||||
|
||||
Be sure to [test your application](#testing-application).
|
||||
|
||||
|
||||
## Connections Limit
|
||||
|
||||
Even if you've applied everything described in
|
||||
[Handling Errors](#handling-errors) section, there is still a problem.
|
||||
|
||||
Let's imagine you have a server that needs to open a file to process
|
||||
client request. At some point, you might encounter the following situation:
|
||||
|
||||
1. There are as many client connection as max file descriptors allowed for
|
||||
the application.
|
||||
2. Listener gets `Too many open files` error so it sleeps.
|
||||
3. Some client sends a request via the previously open connection.
|
||||
4. Opening a file to serve request fails, because of the same
|
||||
`Too many open files` error, until some other client drops a connection.
|
||||
|
||||
There are many more possible situations, this is just a small illustation that
|
||||
limiting number of connections is very useful. Generally, it's one of the ways
|
||||
to control resources used by a server and avoiding some kinds of deny of
|
||||
service (DoS) attacks.
|
||||
|
||||
### `async-listen` crate
|
||||
|
||||
Limiting maximum number of simultaneous connections with [`async-listen`]
|
||||
looks like the following:
|
||||
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
# extern crate async_listen;
|
||||
# use std::time::Duration;
|
||||
# use async_std::{
|
||||
# net::{TcpListener, TcpStream, ToSocketAddrs},
|
||||
# prelude::*,
|
||||
# };
|
||||
#
|
||||
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||
#
|
||||
use async_listen::{ListenExt, Token, error_hint};
|
||||
|
||||
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
|
||||
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
let mut incoming = listener
|
||||
.incoming()
|
||||
.log_warnings(log_accept_error)
|
||||
.handle_errors(Duration::from_millis(500)) // 1
|
||||
.backpressure(100);
|
||||
while let Some((token, socket)) = incoming.next().await { // 2
|
||||
task::spawn(async move {
|
||||
connection_loop(&token, stream).await; // 3
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn connection_loop(_token: &Token, stream: TcpStream) { // 4
|
||||
// ...
|
||||
}
|
||||
# fn log_accept_error(e: &io::Error) {
|
||||
# eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
|
||||
# }
|
||||
```
|
||||
|
||||
1. We need to handle errors first, because [`backpressure`] helper expects
|
||||
stream of `TcpStream` rather than `Result`.
|
||||
2. The token yielded by a new stream is what is counted by backpressure helper.
|
||||
I.e. if you drop a token, new connection can be established.
|
||||
3. We give the connection loop a reference to token to bind token's lifetime to
|
||||
the lifetime of the connection.
|
||||
4. The token itsellf in the function can be ignored, hence `_token`
|
||||
|
||||
[`backpressure`]: https://docs.rs/async-listen/0.1.2/async_listen/trait.ListenExt.html#method.backpressure
|
||||
|
||||
Be sure to [test this behavior](#testing-application).
|
@ -1,14 +1,11 @@
|
||||
# Tutorial: Writing a chat
|
||||
|
||||
Nothing is simpler than creating a chat server, right?
|
||||
Not quite, chat servers expose you to all the fun of asynchronous programming:
|
||||
Nothing is as simple as a chat server, right? Not quite, chat servers
|
||||
already expose you to all the fun of asynchronous programming: how
|
||||
do you handle clients connecting concurrently. How do you handle them disconnecting?
|
||||
|
||||
How will the server handle clients connecting concurrently?
|
||||
How do you distribute the messages?
|
||||
|
||||
How will it handle them disconnecting?
|
||||
|
||||
How will it distribute the messages?
|
||||
|
||||
This tutorial explains how to write a chat server in `async-std`.
|
||||
In this tutorial, we will show you how to write one in `async-std`.
|
||||
|
||||
You can also find the tutorial in [our repository](https://github.com/async-rs/async-std/blob/master/examples/a-chat).
|
||||
|
@ -1,45 +0,0 @@
|
||||
//! TCP echo server, accepting connections both on both ipv4 and ipv6 sockets.
|
||||
//!
|
||||
//! To send messages, do:
|
||||
//!
|
||||
//! ```sh
|
||||
//! $ nc 127.0.0.1 8080
|
||||
//! $ nc ::1 8080
|
||||
//! ```
|
||||
|
||||
use async_std::io;
|
||||
use async_std::net::{TcpListener, TcpStream};
|
||||
use async_std::prelude::*;
|
||||
use async_std::task;
|
||||
|
||||
async fn process(stream: TcpStream) -> io::Result<()> {
|
||||
println!("Accepted from: {}", stream.peer_addr()?);
|
||||
|
||||
let mut reader = stream.clone();
|
||||
let mut writer = stream;
|
||||
io::copy(&mut reader, &mut writer).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> io::Result<()> {
|
||||
task::block_on(async {
|
||||
let ipv4_listener = TcpListener::bind("127.0.0.1:8080").await?;
|
||||
println!("Listening on {}", ipv4_listener.local_addr()?);
|
||||
let ipv6_listener = TcpListener::bind("[::1]:8080").await?;
|
||||
println!("Listening on {}", ipv6_listener.local_addr()?);
|
||||
|
||||
let ipv4_incoming = ipv4_listener.incoming();
|
||||
let ipv6_incoming = ipv6_listener.incoming();
|
||||
|
||||
let mut incoming = ipv4_incoming.merge(ipv6_incoming);
|
||||
|
||||
while let Some(stream) = incoming.next().await {
|
||||
let stream = stream?;
|
||||
task::spawn(async {
|
||||
process(stream).await.unwrap();
|
||||
});
|
||||
}
|
||||
Ok(())
|
||||
})
|
||||
}
|
@ -1,84 +1,86 @@
|
||||
cfg_not_docs! {
|
||||
pub use std::fs::FileType;
|
||||
}
|
||||
|
||||
cfg_docs! {
|
||||
/// The type of a file or directory.
|
||||
///
|
||||
/// A file type is returned by [`Metadata::file_type`].
|
||||
///
|
||||
/// Note that file types are mutually exclusive, i.e. at most one of methods [`is_dir`],
|
||||
/// [`is_file`], and [`is_symlink`] can return `true`.
|
||||
///
|
||||
/// This type is a re-export of [`std::fs::FileType`].
|
||||
///
|
||||
/// [`Metadata::file_type`]: struct.Metadata.html#method.file_type
|
||||
/// [`is_dir`]: #method.is_dir
|
||||
/// [`is_file`]: #method.is_file
|
||||
/// [`is_symlink`]: #method.is_symlink
|
||||
/// [`std::fs::FileType`]: https://doc.rust-lang.org/std/fs/struct.FileType.html
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
|
||||
pub struct FileType {
|
||||
_private: (),
|
||||
}
|
||||
use cfg_if::cfg_if;
|
||||
|
||||
impl FileType {
|
||||
/// Returns `true` if this file type represents a regular directory.
|
||||
cfg_if! {
|
||||
if #[cfg(feature = "docs")] {
|
||||
/// The type of a file or directory.
|
||||
///
|
||||
/// If this file type represents a symbolic link, this method returns `false`.
|
||||
/// A file type is returned by [`Metadata::file_type`].
|
||||
///
|
||||
/// # Examples
|
||||
/// Note that file types are mutually exclusive, i.e. at most one of methods [`is_dir`],
|
||||
/// [`is_file`], and [`is_symlink`] can return `true`.
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
/// This type is a re-export of [`std::fs::FileType`].
|
||||
///
|
||||
/// let file_type = fs::metadata(".").await?.file_type();
|
||||
/// println!("{:?}", file_type.is_dir());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn is_dir(&self) -> bool {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
/// [`Metadata::file_type`]: struct.Metadata.html#method.file_type
|
||||
/// [`is_dir`]: #method.is_dir
|
||||
/// [`is_file`]: #method.is_file
|
||||
/// [`is_symlink`]: #method.is_symlink
|
||||
/// [`std::fs::FileType`]: https://doc.rust-lang.org/std/fs/struct.FileType.html
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
|
||||
pub struct FileType {
|
||||
_private: (),
|
||||
}
|
||||
|
||||
/// Returns `true` if this file type represents a regular file.
|
||||
///
|
||||
/// If this file type represents a symbolic link, this method returns `false`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let file_type = fs::metadata("a.txt").await?.file_type();
|
||||
/// println!("{:?}", file_type.is_file());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn is_file(&self) -> bool {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
impl FileType {
|
||||
/// Returns `true` if this file type represents a regular directory.
|
||||
///
|
||||
/// If this file type represents a symbolic link, this method returns `false`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let file_type = fs::metadata(".").await?.file_type();
|
||||
/// println!("{:?}", file_type.is_dir());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn is_dir(&self) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
/// Returns `true` if this file type represents a symbolic link.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let file_type = fs::metadata("a.txt").await?.file_type();
|
||||
/// println!("{:?}", file_type.is_symlink());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn is_symlink(&self) -> bool {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
/// Returns `true` if this file type represents a regular file.
|
||||
///
|
||||
/// If this file type represents a symbolic link, this method returns `false`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let file_type = fs::metadata("a.txt").await?.file_type();
|
||||
/// println!("{:?}", file_type.is_file());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn is_file(&self) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
/// Returns `true` if this file type represents a symbolic link.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let file_type = fs::metadata("a.txt").await?.file_type();
|
||||
/// println!("{:?}", file_type.is_symlink());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn is_symlink(&self) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pub use std::fs::FileType;
|
||||
}
|
||||
}
|
||||
|
@ -1,56 +1,58 @@
|
||||
cfg_not_docs! {
|
||||
pub use std::fs::Permissions;
|
||||
}
|
||||
|
||||
cfg_docs! {
|
||||
/// A set of permissions on a file or directory.
|
||||
///
|
||||
/// This type is a re-export of [`std::fs::Permissions`].
|
||||
///
|
||||
/// [`std::fs::Permissions`]: https://doc.rust-lang.org/std/fs/struct.Permissions.html
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
pub struct Permissions {
|
||||
_private: (),
|
||||
}
|
||||
use cfg_if::cfg_if;
|
||||
|
||||
impl Permissions {
|
||||
/// Returns the read-only flag.
|
||||
cfg_if! {
|
||||
if #[cfg(feature = "docs")] {
|
||||
/// A set of permissions on a file or directory.
|
||||
///
|
||||
/// # Examples
|
||||
/// This type is a re-export of [`std::fs::Permissions`].
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let perm = fs::metadata("a.txt").await?.permissions();
|
||||
/// println!("{:?}", perm.readonly());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn readonly(&self) -> bool {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
/// [`std::fs::Permissions`]: https://doc.rust-lang.org/std/fs/struct.Permissions.html
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
pub struct Permissions {
|
||||
_private: (),
|
||||
}
|
||||
|
||||
/// Configures the read-only flag.
|
||||
///
|
||||
/// [`fs::set_permissions`]: fn.set_permissions.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let mut perm = fs::metadata("a.txt").await?.permissions();
|
||||
/// perm.set_readonly(true);
|
||||
/// fs::set_permissions("a.txt", perm).await?;
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn set_readonly(&mut self, readonly: bool) {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
impl Permissions {
|
||||
/// Returns the read-only flag.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let perm = fs::metadata("a.txt").await?.permissions();
|
||||
/// println!("{:?}", perm.readonly());
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn readonly(&self) -> bool {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
/// Configures the read-only flag.
|
||||
///
|
||||
/// [`fs::set_permissions`]: fn.set_permissions.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use async_std::fs;
|
||||
///
|
||||
/// let mut perm = fs::metadata("a.txt").await?.permissions();
|
||||
/// perm.set_readonly(true);
|
||||
/// fs::set_permissions("a.txt", perm).await?;
|
||||
/// #
|
||||
/// # Ok(()) }) }
|
||||
/// ```
|
||||
pub fn set_readonly(&mut self, readonly: bool) {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
pub use std::fs::Permissions;
|
||||
}
|
||||
}
|
||||
|
@ -1,43 +0,0 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
use crate::utils::{timer_after, Timer};
|
||||
|
||||
pin_project! {
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct DelayFuture<F> {
|
||||
#[pin]
|
||||
future: F,
|
||||
#[pin]
|
||||
delay: Timer,
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> DelayFuture<F> {
|
||||
pub fn new(future: F, dur: Duration) -> DelayFuture<F> {
|
||||
let delay = timer_after(dur);
|
||||
|
||||
DelayFuture { future, delay }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future> Future for DelayFuture<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
match this.delay.poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(_) => match this.future.poll(cx) {
|
||||
Poll::Ready(v) => Poll::Ready(v),
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
@ -1,52 +0,0 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::IntoFuture;
|
||||
use crate::task::{ready, Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct FlattenFuture<Fut1, Fut2> {
|
||||
state: State<Fut1, Fut2>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum State<Fut1, Fut2> {
|
||||
First(Fut1),
|
||||
Second(Fut2),
|
||||
Empty,
|
||||
}
|
||||
|
||||
impl<Fut1, Fut2> FlattenFuture<Fut1, Fut2> {
|
||||
pub(crate) fn new(future: Fut1) -> FlattenFuture<Fut1, Fut2> {
|
||||
FlattenFuture {
|
||||
state: State::First(future),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut1> Future for FlattenFuture<Fut1, <Fut1::Output as IntoFuture>::Future>
|
||||
where
|
||||
Fut1: Future,
|
||||
Fut1::Output: IntoFuture,
|
||||
{
|
||||
type Output = <Fut1::Output as IntoFuture>::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let Self { state } = unsafe { self.get_unchecked_mut() };
|
||||
loop {
|
||||
match state {
|
||||
State::First(fut1) => {
|
||||
let fut2 = ready!(unsafe { Pin::new_unchecked(fut1) }.poll(cx)).into_future();
|
||||
*state = State::Second(fut2);
|
||||
}
|
||||
State::Second(fut2) => {
|
||||
let v = ready!(unsafe { Pin::new_unchecked(fut2) }.poll(cx));
|
||||
*state = State::Empty;
|
||||
return Poll::Ready(v);
|
||||
}
|
||||
State::Empty => panic!("polled a completed future"),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::MaybeDone;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
use std::future::Future;
|
||||
|
||||
pin_project! {
|
||||
#[allow(missing_docs)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Join<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future,
|
||||
{
|
||||
#[pin] left: MaybeDone<L>,
|
||||
#[pin] right: MaybeDone<R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> Join<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future,
|
||||
{
|
||||
pub(crate) fn new(left: L, right: R) -> Self {
|
||||
Self {
|
||||
left: MaybeDone::new(left),
|
||||
right: MaybeDone::new(right),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> Future for Join<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future,
|
||||
{
|
||||
type Output = (L::Output, R::Output);
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
let mut left = this.left;
|
||||
let mut right = this.right;
|
||||
|
||||
let is_left_ready = Future::poll(Pin::new(&mut left), cx).is_ready();
|
||||
if is_left_ready && right.as_ref().output().is_some() {
|
||||
return Poll::Ready((left.take().unwrap(), right.take().unwrap()));
|
||||
}
|
||||
|
||||
let is_right_ready = Future::poll(Pin::new(&mut right), cx).is_ready();
|
||||
if is_right_ready && left.as_ref().output().is_some() {
|
||||
return Poll::Ready((left.take().unwrap(), right.take().unwrap()));
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
@ -1,432 +0,0 @@
|
||||
cfg_unstable! {
|
||||
mod delay;
|
||||
mod flatten;
|
||||
mod race;
|
||||
mod try_race;
|
||||
mod join;
|
||||
mod try_join;
|
||||
|
||||
use std::time::Duration;
|
||||
use delay::DelayFuture;
|
||||
use flatten::FlattenFuture;
|
||||
use crate::future::IntoFuture;
|
||||
use race::Race;
|
||||
use try_race::TryRace;
|
||||
use join::Join;
|
||||
use try_join::TryJoin;
|
||||
}
|
||||
|
||||
cfg_unstable_default! {
|
||||
use crate::future::timeout::TimeoutFuture;
|
||||
}
|
||||
|
||||
extension_trait! {
|
||||
use core::pin::Pin;
|
||||
use core::ops::{Deref, DerefMut};
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc = r#"
|
||||
A future represents an asynchronous computation.
|
||||
|
||||
A future is a value that may not have finished computing yet. This kind of
|
||||
"asynchronous value" makes it possible for a thread to continue doing useful
|
||||
work while it waits for the value to become available.
|
||||
|
||||
The [provided methods] do not really exist in the trait itself, but they become
|
||||
available when [`FutureExt`] from the [prelude] is imported:
|
||||
|
||||
```
|
||||
# #[allow(unused_imports)]
|
||||
use async_std::prelude::*;
|
||||
```
|
||||
|
||||
# The `poll` method
|
||||
|
||||
The core method of future, `poll`, *attempts* to resolve the future into a
|
||||
final value. This method does not block if the value is not ready. Instead,
|
||||
the current task is scheduled to be woken up when it's possible to make
|
||||
further progress by `poll`ing again. The `context` passed to the `poll`
|
||||
method can provide a [`Waker`], which is a handle for waking up the current
|
||||
task.
|
||||
|
||||
When using a future, you generally won't call `poll` directly, but instead
|
||||
`.await` the value.
|
||||
|
||||
[`Waker`]: ../task/struct.Waker.html
|
||||
[provided methods]: #provided-methods
|
||||
[`FutureExt`]: ../prelude/trait.FutureExt.html
|
||||
[prelude]: ../prelude/index.html
|
||||
"#]
|
||||
pub trait Future {
|
||||
#[doc = r#"
|
||||
The type of value produced on completion.
|
||||
"#]
|
||||
type Output;
|
||||
|
||||
#[doc = r#"
|
||||
Attempt to resolve the future to a final value, registering
|
||||
the current task for wakeup if the value is not yet available.
|
||||
|
||||
# Return value
|
||||
|
||||
This function returns:
|
||||
|
||||
- [`Poll::Pending`] if the future is not ready yet
|
||||
- [`Poll::Ready(val)`] with the result `val` of this future if it
|
||||
finished successfully.
|
||||
|
||||
Once a future has finished, clients should not `poll` it again.
|
||||
|
||||
When a future is not ready yet, `poll` returns `Poll::Pending` and
|
||||
stores a clone of the [`Waker`] copied from the current [`Context`].
|
||||
This [`Waker`] is then woken once the future can make progress.
|
||||
For example, a future waiting for a socket to become
|
||||
readable would call `.clone()` on the [`Waker`] and store it.
|
||||
When a signal arrives elsewhere indicating that the socket is readable,
|
||||
[`Waker::wake`] is called and the socket future's task is awoken.
|
||||
Once a task has been woken up, it should attempt to `poll` the future
|
||||
again, which may or may not produce a final value.
|
||||
|
||||
Note that on multiple calls to `poll`, only the [`Waker`] from the
|
||||
[`Context`] passed to the most recent call should be scheduled to
|
||||
receive a wakeup.
|
||||
|
||||
# Runtime characteristics
|
||||
|
||||
Futures alone are *inert*; they must be *actively* `poll`ed to make
|
||||
progress, meaning that each time the current task is woken up, it should
|
||||
actively re-`poll` pending futures that it still has an interest in.
|
||||
|
||||
The `poll` function is not called repeatedly in a tight loop -- instead,
|
||||
it should only be called when the future indicates that it is ready to
|
||||
make progress (by calling `wake()`). If you're familiar with the
|
||||
`poll(2)` or `select(2)` syscalls on Unix it's worth noting that futures
|
||||
typically do *not* suffer the same problems of "all wakeups must poll
|
||||
all events"; they are more like `epoll(4)`.
|
||||
|
||||
An implementation of `poll` should strive to return quickly, and should
|
||||
not block. Returning quickly prevents unnecessarily clogging up
|
||||
threads or event loops. If it is known ahead of time that a call to
|
||||
`poll` may end up taking awhile, the work should be offloaded to a
|
||||
thread pool (or something similar) to ensure that `poll` can return
|
||||
quickly.
|
||||
|
||||
# Panics
|
||||
|
||||
Once a future has completed (returned `Ready` from `poll`), calling its
|
||||
`poll` method again may panic, block forever, or cause other kinds of
|
||||
problems; the `Future` trait places no requirements on the effects of
|
||||
such a call. However, as the `poll` method is not marked `unsafe`,
|
||||
Rust's usual rules apply: calls must never cause undefined behavior
|
||||
(memory corruption, incorrect use of `unsafe` functions, or the like),
|
||||
regardless of the future's state.
|
||||
|
||||
[`Poll::Pending`]: ../task/enum.Poll.html#variant.Pending
|
||||
[`Poll::Ready(val)`]: ../task/enum.Poll.html#variant.Ready
|
||||
[`Context`]: ../task/struct.Context.html
|
||||
[`Waker`]: ../task/struct.Waker.html
|
||||
[`Waker::wake`]: ../task/struct.Waker.html#method.wake
|
||||
"#]
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Extension methods for [`Future`].
|
||||
|
||||
[`Future`]: ../future/trait.Future.html
|
||||
"#]
|
||||
pub trait FutureExt: core::future::Future {
|
||||
/// Returns a Future that delays execution for a specified time.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::future;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let a = future::ready(1).delay(Duration::from_millis(2000));
|
||||
/// dbg!(a.await);
|
||||
/// # })
|
||||
/// ```
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn delay(self, dur: Duration) -> impl Future<Output = Self::Output> [DelayFuture<Self>]
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
DelayFuture::new(self, dur)
|
||||
}
|
||||
|
||||
/// Flatten out the execution of this future when the result itself
|
||||
/// can be converted into another future.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # async_std::task::block_on(async {
|
||||
/// use async_std::prelude::*;
|
||||
///
|
||||
/// let nested_future = async { async { 1 } };
|
||||
/// let future = nested_future.flatten();
|
||||
/// assert_eq!(future.await, 1);
|
||||
/// # })
|
||||
/// ```
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn flatten(
|
||||
self,
|
||||
) -> impl Future<Output = <Self::Output as IntoFuture>::Output>
|
||||
[FlattenFuture<Self, <Self::Output as IntoFuture>::Future>]
|
||||
where
|
||||
Self: Sized,
|
||||
<Self as Future>::Output: IntoFuture,
|
||||
{
|
||||
FlattenFuture::new(self)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Waits for one of two similarly-typed futures to complete.
|
||||
|
||||
Awaits multiple futures simultaneously, returning the output of the
|
||||
first future that completes.
|
||||
|
||||
This function will return a new future which awaits for either one of both
|
||||
futures to complete. If multiple futures are completed at the same time,
|
||||
resolution will occur in the order that they have been passed.
|
||||
|
||||
Note that this function consumes all futures passed, and once a future is
|
||||
completed, all other futures are dropped.
|
||||
|
||||
# Examples
|
||||
|
||||
```
|
||||
# async_std::task::block_on(async {
|
||||
use async_std::prelude::*;
|
||||
use async_std::future;
|
||||
|
||||
let a = future::pending();
|
||||
let b = future::ready(1u8);
|
||||
let c = future::ready(2u8);
|
||||
|
||||
let f = a.race(b).race(c);
|
||||
assert_eq!(f.await, 1u8);
|
||||
# });
|
||||
```
|
||||
"#]
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn race<F>(
|
||||
self,
|
||||
other: F,
|
||||
) -> impl Future<Output = <Self as std::future::Future>::Output> [Race<Self, F>]
|
||||
where
|
||||
Self: std::future::Future + Sized,
|
||||
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
|
||||
{
|
||||
Race::new(self, other)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Waits for one of two similarly-typed fallible futures to complete.
|
||||
|
||||
Awaits multiple futures simultaneously, returning all results once complete.
|
||||
|
||||
`try_race` is similar to [`race`], but keeps going if a future
|
||||
resolved to an error until all futures have been resolved. In which case
|
||||
an error is returned.
|
||||
|
||||
The ordering of which value is yielded when two futures resolve
|
||||
simultaneously is intentionally left unspecified.
|
||||
|
||||
[`race`]: #method.race
|
||||
|
||||
# Examples
|
||||
|
||||
```
|
||||
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
#
|
||||
use async_std::prelude::*;
|
||||
use async_std::future;
|
||||
use std::io::{Error, ErrorKind};
|
||||
|
||||
let a = future::pending::<Result<_, Error>>();
|
||||
let b = future::ready(Err(Error::from(ErrorKind::Other)));
|
||||
let c = future::ready(Ok(1u8));
|
||||
|
||||
let f = a.try_race(b).try_race(c);
|
||||
assert_eq!(f.await?, 1u8);
|
||||
#
|
||||
# Ok(()) }) }
|
||||
```
|
||||
"#]
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn try_race<F, T, E>(
|
||||
self,
|
||||
other: F
|
||||
) -> impl Future<Output = <Self as std::future::Future>::Output> [TryRace<Self, F>]
|
||||
where
|
||||
Self: std::future::Future<Output = Result<T, E>> + Sized,
|
||||
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
|
||||
{
|
||||
TryRace::new(self, other)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Waits for two similarly-typed futures to complete.
|
||||
|
||||
Awaits multiple futures simultaneously, returning the output of the
|
||||
futures once both complete.
|
||||
|
||||
This function returns a new future which polls both futures
|
||||
concurrently.
|
||||
|
||||
# Examples
|
||||
|
||||
```
|
||||
# async_std::task::block_on(async {
|
||||
use async_std::prelude::*;
|
||||
use async_std::future;
|
||||
|
||||
let a = future::ready(1u8);
|
||||
let b = future::ready(2u16);
|
||||
|
||||
let f = a.join(b);
|
||||
assert_eq!(f.await, (1u8, 2u16));
|
||||
# });
|
||||
```
|
||||
"#]
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn join<F>(
|
||||
self,
|
||||
other: F
|
||||
) -> impl Future<Output = (<Self as std::future::Future>::Output, <F as std::future::Future>::Output)> [Join<Self, F>]
|
||||
where
|
||||
Self: std::future::Future + Sized,
|
||||
F: std::future::Future,
|
||||
{
|
||||
Join::new(self, other)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Waits for two similarly-typed fallible futures to complete.
|
||||
|
||||
Awaits multiple futures simultaneously, returning all results once
|
||||
complete.
|
||||
|
||||
`try_join` is similar to [`join`], but returns an error immediately
|
||||
if a future resolves to an error.
|
||||
|
||||
[`join`]: #method.join
|
||||
|
||||
# Examples
|
||||
|
||||
```
|
||||
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
#
|
||||
use async_std::prelude::*;
|
||||
use async_std::future;
|
||||
|
||||
let a = future::ready(Err::<u8, &str>("Error"));
|
||||
let b = future::ready(Ok(1u8));
|
||||
|
||||
let f = a.try_join(b);
|
||||
assert_eq!(f.await, Err("Error"));
|
||||
|
||||
let a = future::ready(Ok::<u8, String>(1u8));
|
||||
let b = future::ready(Ok::<u16, String>(2u16));
|
||||
|
||||
let f = a.try_join(b);
|
||||
assert_eq!(f.await, Ok((1u8, 2u16)));
|
||||
#
|
||||
# Ok(()) }) }
|
||||
```
|
||||
"#]
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn try_join<F, A, B, E>(
|
||||
self,
|
||||
other: F
|
||||
) -> impl Future<Output = Result<(A, B), E>> [TryJoin<Self, F>]
|
||||
where
|
||||
Self: std::future::Future<Output = Result<A, E>> + Sized,
|
||||
F: std::future::Future<Output = Result<B, E>>,
|
||||
{
|
||||
TryJoin::new(self, other)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Waits for both the future and a timeout, if the timeout completes before
|
||||
the future, it returns an TimeoutError.
|
||||
|
||||
# Example
|
||||
```
|
||||
# async_std::task::block_on(async {
|
||||
#
|
||||
use std::time::Duration;
|
||||
|
||||
use async_std::prelude::*;
|
||||
use async_std::future;
|
||||
|
||||
let fut = future::ready(0);
|
||||
let dur = Duration::from_millis(100);
|
||||
let res = fut.timeout(dur).await;
|
||||
assert!(res.is_ok());
|
||||
|
||||
let fut = future::pending::<()>();
|
||||
let dur = Duration::from_millis(100);
|
||||
let res = fut.timeout(dur).await;
|
||||
assert!(res.is_err())
|
||||
#
|
||||
# });
|
||||
```
|
||||
"#]
|
||||
#[cfg(any(all(feature = "default", feature = "unstable"), feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn timeout(self, dur: Duration) -> impl Future<Output = Self::Output> [TimeoutFuture<Self>]
|
||||
where Self: Sized
|
||||
{
|
||||
TimeoutFuture::new(self, dur)
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future + Unpin + ?Sized> Future for Box<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future + Unpin + ?Sized> Future for &mut F {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<P> Future for Pin<P>
|
||||
where
|
||||
P: DerefMut + Unpin,
|
||||
<P as Deref>::Target: Future,
|
||||
{
|
||||
type Output = <<P as Deref>::Target as Future>::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
|
||||
impl<F: Future> Future for std::panic::AssertUnwindSafe<F> {
|
||||
type Output = F::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
unreachable!("this impl only appears in the rendered docs")
|
||||
}
|
||||
}
|
||||
}
|
@ -1,57 +0,0 @@
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::MaybeDone;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
#[allow(missing_docs)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct Race<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future<Output = L::Output>
|
||||
{
|
||||
#[pin] left: MaybeDone<L>,
|
||||
#[pin] right: MaybeDone<R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> Race<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future<Output = L::Output>,
|
||||
{
|
||||
pub(crate) fn new(left: L, right: R) -> Self {
|
||||
Self {
|
||||
left: MaybeDone::new(left),
|
||||
right: MaybeDone::new(right),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> Future for Race<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future<Output = L::Output>,
|
||||
{
|
||||
type Output = L::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
let mut left = this.left;
|
||||
if Future::poll(Pin::new(&mut left), cx).is_ready() {
|
||||
return Poll::Ready(left.take().unwrap());
|
||||
}
|
||||
|
||||
let mut right = this.right;
|
||||
if Future::poll(Pin::new(&mut right), cx).is_ready() {
|
||||
return Poll::Ready(right.take().unwrap());
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
@ -1,72 +0,0 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::MaybeDone;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
use std::future::Future;
|
||||
|
||||
pin_project! {
|
||||
#[allow(missing_docs)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct TryJoin<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future,
|
||||
{
|
||||
#[pin] left: MaybeDone<L>,
|
||||
#[pin] right: MaybeDone<R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> TryJoin<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future,
|
||||
{
|
||||
pub(crate) fn new(left: L, right: R) -> Self {
|
||||
Self {
|
||||
left: MaybeDone::new(left),
|
||||
right: MaybeDone::new(right),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R, A, B, E> Future for TryJoin<L, R>
|
||||
where
|
||||
L: Future<Output = Result<A, E>>,
|
||||
R: Future<Output = Result<B, E>>,
|
||||
{
|
||||
type Output = Result<(A, B), E>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
let mut left = this.left;
|
||||
let mut right = this.right;
|
||||
|
||||
if Future::poll(Pin::new(&mut left), cx).is_ready() {
|
||||
if left.as_ref().output().unwrap().is_err() {
|
||||
return Poll::Ready(Err(left.take().unwrap().err().unwrap()));
|
||||
} else if right.as_ref().output().is_some() {
|
||||
return Poll::Ready(Ok((
|
||||
left.take().unwrap().ok().unwrap(),
|
||||
right.take().unwrap().ok().unwrap(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
if Future::poll(Pin::new(&mut right), cx).is_ready() {
|
||||
if right.as_ref().output().unwrap().is_err() {
|
||||
return Poll::Ready(Err(right.take().unwrap().err().unwrap()));
|
||||
} else if left.as_ref().output().is_some() {
|
||||
return Poll::Ready(Ok((
|
||||
left.take().unwrap().ok().unwrap(),
|
||||
right.take().unwrap().ok().unwrap(),
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
@ -1,66 +0,0 @@
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::MaybeDone;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
use std::future::Future;
|
||||
|
||||
pin_project! {
|
||||
#[allow(missing_docs)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct TryRace<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future<Output = L::Output>
|
||||
{
|
||||
#[pin] left: MaybeDone<L>,
|
||||
#[pin] right: MaybeDone<R>,
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R> TryRace<L, R>
|
||||
where
|
||||
L: Future,
|
||||
R: Future<Output = L::Output>,
|
||||
{
|
||||
pub(crate) fn new(left: L, right: R) -> Self {
|
||||
Self {
|
||||
left: MaybeDone::new(left),
|
||||
right: MaybeDone::new(right),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<L, R, T, E> Future for TryRace<L, R>
|
||||
where
|
||||
L: Future<Output = Result<T, E>>,
|
||||
R: Future<Output = L::Output>,
|
||||
{
|
||||
type Output = L::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
let mut left_errored = false;
|
||||
|
||||
// Check if the left future is ready & successful. Continue if not.
|
||||
let mut left = this.left;
|
||||
if Future::poll(Pin::new(&mut left), cx).is_ready() {
|
||||
if left.as_ref().output().unwrap().is_ok() {
|
||||
return Poll::Ready(left.take().unwrap());
|
||||
} else {
|
||||
left_errored = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the right future is ready & successful. Return err if left
|
||||
// future also resolved to err. Continue if not.
|
||||
let mut right = this.right;
|
||||
let is_ready = Future::poll(Pin::new(&mut right), cx).is_ready();
|
||||
if is_ready && (right.as_ref().output().unwrap().is_ok() || left_errored) {
|
||||
return Poll::Ready(right.take().unwrap());
|
||||
}
|
||||
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
@ -1,53 +0,0 @@
|
||||
use std::future::Future;
|
||||
|
||||
/// Convert a type into a `Future`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_std::future::{Future, IntoFuture};
|
||||
/// use async_std::io;
|
||||
/// use async_std::pin::Pin;
|
||||
///
|
||||
/// struct Client;
|
||||
///
|
||||
/// impl Client {
|
||||
/// pub async fn send(self) -> io::Result<()> {
|
||||
/// // Send a request
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// impl IntoFuture for Client {
|
||||
/// type Output = io::Result<()>;
|
||||
///
|
||||
/// type Future = Pin<Box<dyn Future<Output = Self::Output>>>;
|
||||
///
|
||||
/// fn into_future(self) -> Self::Future {
|
||||
/// Box::pin(async {
|
||||
/// self.send().await
|
||||
/// })
|
||||
/// }
|
||||
/// }
|
||||
/// ```
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
pub trait IntoFuture {
|
||||
/// The type of value produced on completion.
|
||||
type Output;
|
||||
|
||||
/// Which kind of future are we turning this into?
|
||||
type Future: Future<Output = Self::Output>;
|
||||
|
||||
/// Create a future from a value
|
||||
fn into_future(self) -> Self::Future;
|
||||
}
|
||||
|
||||
impl<T: Future> IntoFuture for T {
|
||||
type Output = T::Output;
|
||||
type Future = T;
|
||||
|
||||
fn into_future(self) -> Self::Future {
|
||||
self
|
||||
}
|
||||
}
|
@ -1,79 +0,0 @@
|
||||
//! A type that wraps a future to keep track of its completion status.
|
||||
//!
|
||||
//! This implementation was taken from the original `macro_rules` `join/try_join`
|
||||
//! macros in the `futures-preview` crate.
|
||||
|
||||
use std::future::Future;
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use futures_core::ready;
|
||||
|
||||
/// A future that may have completed.
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum MaybeDone<Fut: Future> {
|
||||
/// A not-yet-completed future
|
||||
Future(Fut),
|
||||
|
||||
/// The output of the completed future
|
||||
Done(Fut::Output),
|
||||
|
||||
/// The empty variant after the result of a [`MaybeDone`] has been
|
||||
/// taken using the [`take`](MaybeDone::take) method.
|
||||
Gone,
|
||||
}
|
||||
|
||||
impl<Fut: Future> MaybeDone<Fut> {
|
||||
/// Create a new instance of `MaybeDone`.
|
||||
pub(crate) fn new(future: Fut) -> MaybeDone<Fut> {
|
||||
Self::Future(future)
|
||||
}
|
||||
|
||||
/// Returns an [`Option`] containing a reference to the output of the future.
|
||||
/// The output of this method will be [`Some`] if and only if the inner
|
||||
/// future has been completed and [`take`](MaybeDone::take)
|
||||
/// has not yet been called.
|
||||
#[inline]
|
||||
pub(crate) fn output(self: Pin<&Self>) -> Option<&Fut::Output> {
|
||||
let this = self.get_ref();
|
||||
match this {
|
||||
MaybeDone::Done(res) => Some(res),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Attempt to take the output of a `MaybeDone` without driving it
|
||||
/// towards completion.
|
||||
#[inline]
|
||||
pub(crate) fn take(self: Pin<&mut Self>) -> Option<Fut::Output> {
|
||||
unsafe {
|
||||
let this = self.get_unchecked_mut();
|
||||
match this {
|
||||
MaybeDone::Done(_) => {}
|
||||
MaybeDone::Future(_) | MaybeDone::Gone => return None,
|
||||
};
|
||||
if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
|
||||
Some(output)
|
||||
} else {
|
||||
unreachable!()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<Fut: Future> Future for MaybeDone<Fut> {
|
||||
type Output = ();
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let res = unsafe {
|
||||
match Pin::as_mut(&mut self).get_unchecked_mut() {
|
||||
MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
|
||||
MaybeDone::Done(_) => return Poll::Ready(()),
|
||||
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
|
||||
}
|
||||
};
|
||||
self.set(MaybeDone::Done(res));
|
||||
Poll::Ready(())
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use super::read_until_internal;
|
||||
use crate::io::{self, BufRead};
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
/// A stream over the contents of an instance of [`BufRead`] split on a particular byte.
|
||||
///
|
||||
/// This stream is created by the [`split`] method on types that implement [`BufRead`].
|
||||
///
|
||||
/// This type is an async version of [`std::io::Split`].
|
||||
///
|
||||
/// [`split`]: trait.BufRead.html#method.lines
|
||||
/// [`BufRead`]: trait.BufRead.html
|
||||
/// [`std::io::Split`]: https://doc.rust-lang.org/std/io/struct.Split.html
|
||||
#[derive(Debug)]
|
||||
pub struct Split<R> {
|
||||
#[pin]
|
||||
pub(crate) reader: R,
|
||||
pub(crate) buf: Vec<u8>,
|
||||
pub(crate) read: usize,
|
||||
pub(crate) delim: u8,
|
||||
}
|
||||
}
|
||||
|
||||
impl<R: BufRead> Stream for Split<R> {
|
||||
type Item = io::Result<Vec<u8>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
let n = futures_core::ready!(read_until_internal(
|
||||
this.reader,
|
||||
cx,
|
||||
*this.delim,
|
||||
this.buf,
|
||||
this.read
|
||||
))?;
|
||||
if n == 0 && this.buf.is_empty() {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
if this.buf[this.buf.len() - 1] == *this.delim {
|
||||
this.buf.pop();
|
||||
}
|
||||
Poll::Ready(Some(Ok(mem::replace(this.buf, vec![]))))
|
||||
}
|
||||
}
|
@ -1,330 +1,59 @@
|
||||
//! Traits, helpers, and type definitions for core I/O functionality.
|
||||
//!
|
||||
//! The `async_std::io` module contains a number of common things you'll need
|
||||
//! when doing input and output. The most core part of this module is
|
||||
//! the [`Read`] and [`Write`] traits, which provide the
|
||||
//! most general interface for reading and writing input and output.
|
||||
//! Basic input and output.
|
||||
//!
|
||||
//! This module is an async version of [`std::io`].
|
||||
//!
|
||||
//! [`std::io`]: https://doc.rust-lang.org/std/io/index.html
|
||||
//!
|
||||
//! # Read and Write
|
||||
//!
|
||||
//! Because they are traits, [`Read`] and [`Write`] are implemented by a number
|
||||
//! of other types, and you can implement them for your types too. As such,
|
||||
//! you'll see a few different types of I/O throughout the documentation in
|
||||
//! this module: [`File`]s, [`TcpStream`]s, and sometimes even [`Vec<T>`]s. For
|
||||
//! example, [`Read`] adds a [`read`][`Read::read`] method, which we can use on
|
||||
//! [`File`]s:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::fs::File;
|
||||
//! use async_std::prelude::*;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! let mut f = File::open("foo.txt").await?;
|
||||
//! let mut buffer = [0; 10];
|
||||
//!
|
||||
//! // read up to 10 bytes
|
||||
//! let n = f.read(&mut buffer).await?;
|
||||
//!
|
||||
//! println!("The bytes: {:?}", &buffer[..n]);
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! [`Read`] and [`Write`] are so important, implementors of the two traits have a
|
||||
//! nickname: readers and writers. So you'll sometimes see 'a reader' instead
|
||||
//! of 'a type that implements the [`Read`] trait'. Much easier!
|
||||
//!
|
||||
//! ## Seek and BufRead
|
||||
//!
|
||||
//! Beyond that, there are two important traits that are provided: [`Seek`]
|
||||
//! and [`BufRead`]. Both of these build on top of a reader to control
|
||||
//! how the reading happens. [`Seek`] lets you control where the next byte is
|
||||
//! coming from:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::fs::File;
|
||||
//! use async_std::io::SeekFrom;
|
||||
//! use async_std::prelude::*;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! let mut f = File::open("foo.txt").await?;
|
||||
//! let mut buffer = [0; 10];
|
||||
//!
|
||||
//! // skip to the last 10 bytes of the file
|
||||
//! f.seek(SeekFrom::End(-10)).await?;
|
||||
//!
|
||||
//! // read up to 10 bytes
|
||||
//! let n = f.read(&mut buffer).await?;
|
||||
//!
|
||||
//! println!("The bytes: {:?}", &buffer[..n]);
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! [`BufRead`] uses an internal buffer to provide a number of other ways to read, but
|
||||
//! to show it off, we'll need to talk about buffers in general. Keep reading!
|
||||
//!
|
||||
//! ## BufReader and BufWriter
|
||||
//!
|
||||
//! Byte-based interfaces are unwieldy and can be inefficient, as we'd need to be
|
||||
//! making near-constant calls to the operating system. To help with this,
|
||||
//! `std::io` comes with two structs, [`BufReader`] and [`BufWriter`], which wrap
|
||||
//! readers and writers. The wrapper uses a buffer, reducing the number of
|
||||
//! calls and providing nicer methods for accessing exactly what you want.
|
||||
//!
|
||||
//! For example, [`BufReader`] works with the [`BufRead`] trait to add extra
|
||||
//! methods to any reader:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::fs::File;
|
||||
//! use async_std::io::BufReader;
|
||||
//! use async_std::prelude::*;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! let f = File::open("foo.txt").await?;
|
||||
//! let mut reader = BufReader::new(f);
|
||||
//! let mut buffer = String::new();
|
||||
//!
|
||||
//! // read a line into buffer
|
||||
//! reader.read_line(&mut buffer).await?;
|
||||
//!
|
||||
//! println!("{}", buffer);
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! [`BufWriter`] doesn't add any new ways of writing; it just buffers every call
|
||||
//! to [`write`][`Write::write`]:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::fs::File;
|
||||
//! use async_std::io::prelude::*;
|
||||
//! use async_std::io::BufWriter;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! let f = File::create("foo.txt").await?;
|
||||
//! {
|
||||
//! let mut writer = BufWriter::new(f);
|
||||
//!
|
||||
//! // write a byte to the buffer
|
||||
//! writer.write(&[42]).await?;
|
||||
//! } // the buffer is flushed once writer goes out of scope
|
||||
//! //
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Standard input and output
|
||||
//!
|
||||
//! A very common source of input is standard input:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::io;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! let mut input = String::new();
|
||||
//!
|
||||
//! io::stdin().read_line(&mut input).await?;
|
||||
//!
|
||||
//! println!("You typed: {}", input.trim());
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! Note that you cannot use the [`?` operator] in functions that do not return
|
||||
//! a [`Result<T, E>`][`Result`]. Instead, you can call [`.unwrap()`]
|
||||
//! or `match` on the return value to catch any possible errors:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::io;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! let mut input = String::new();
|
||||
//!
|
||||
//! io::stdin().read_line(&mut input).await.unwrap();
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! And a very common source of output is standard output:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::io;
|
||||
//! use async_std::io::prelude::*;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! io::stdout().write(&[42]).await?;
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! Of course, using [`io::stdout`] directly is less common than something like
|
||||
//! [`println!`].
|
||||
//!
|
||||
//! ## Iterator types
|
||||
//!
|
||||
//! A large number of the structures provided by `std::io` are for various
|
||||
//! ways of iterating over I/O. For example, [`Lines`] is used to split over
|
||||
//! lines:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::fs::File;
|
||||
//! use async_std::io::BufReader;
|
||||
//! use async_std::prelude::*;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! let f = File::open("foo.txt").await?;
|
||||
//! let reader = BufReader::new(f);
|
||||
//!
|
||||
//! let mut lines = reader.lines();
|
||||
//! while let Some(line) = lines.next().await {
|
||||
//! println!("{}", line?);
|
||||
//! }
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! ## Functions
|
||||
//! # Examples
|
||||
//!
|
||||
//! There are a number of [functions][functions-list] that offer access to various
|
||||
//! features. For example, we can use three of these functions to copy everything
|
||||
//! from standard input to standard output:
|
||||
//! Read a line from the standard input:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! use async_std::io;
|
||||
//!
|
||||
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
//! #
|
||||
//! io::copy(&mut io::stdin(), &mut io::stdout()).await?;
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
//!
|
||||
//! [functions-list]: #functions-1
|
||||
//!
|
||||
//! ## io::Result
|
||||
//!
|
||||
//! Last, but certainly not least, is [`io::Result`]. This type is used
|
||||
//! as the return type of many `std::io` functions that can cause an error, and
|
||||
//! can be returned from your own functions as well. Many of the examples in this
|
||||
//! module use the [`?` operator]:
|
||||
//!
|
||||
//! ```
|
||||
//! #![allow(dead_code)]
|
||||
//! use async_std::io;
|
||||
//!
|
||||
//! async fn read_input() -> io::Result<()> {
|
||||
//! let mut input = String::new();
|
||||
//!
|
||||
//! io::stdin().read_line(&mut input).await?;
|
||||
//!
|
||||
//! println!("You typed: {}", input.trim());
|
||||
//!
|
||||
//! Ok(())
|
||||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! The return type of `read_input`, [`io::Result<()>`][`io::Result`], is a very
|
||||
//! common type for functions which don't have a 'real' return value, but do want to
|
||||
//! return errors if they happen. In this case, the only purpose of this function is
|
||||
//! to read the line and print it, so we use `()`.
|
||||
//!
|
||||
//! ## Platform-specific behavior
|
||||
//!
|
||||
//! Many I/O functions throughout the standard library are documented to indicate
|
||||
//! what various library or syscalls they are delegated to. This is done to help
|
||||
//! applications both understand what's happening under the hood as well as investigate
|
||||
//! any possibly unclear semantics. Note, however, that this is informative, not a binding
|
||||
//! contract. The implementation of many of these functions are subject to change over
|
||||
//! time and may call fewer or more syscalls/library functions.
|
||||
//!
|
||||
//! [`Read`]: trait.Read.html
|
||||
//! [`Write`]: trait.Write.html
|
||||
//! [`Seek`]: trait.Seek.html
|
||||
//! [`BufRead`]: trait.BufRead.html
|
||||
//! [`File`]: ../fs/struct.File.html
|
||||
//! [`TcpStream`]: ../net/struct.TcpStream.html
|
||||
//! [`Vec<T>`]: ../vec/struct.Vec.html
|
||||
//! [`BufReader`]: struct.BufReader.html
|
||||
//! [`BufWriter`]: struct.BufWriter.html
|
||||
//! [`Write::write`]: trait.Write.html#tymethod.write
|
||||
//! [`io::stdout`]: fn.stdout.html
|
||||
//! [`println!`]: ../macro.println.html
|
||||
//! [`Lines`]: struct.Lines.html
|
||||
//! [`io::Result`]: type.Result.html
|
||||
//! [`?` operator]: https://doc.rust-lang.org/stable/book/appendix-02-operators.html
|
||||
//! [`Read::read`]: trait.Read.html#tymethod.read
|
||||
//! [`Result`]: https://doc.rust-lang.org/std/result/enum.Result.html
|
||||
//! [`.unwrap()`]: https://doc.rust-lang.org/std/result/enum.Result.html#method.unwrap
|
||||
|
||||
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
|
||||
|
||||
cfg_std! {
|
||||
#[doc(inline)]
|
||||
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
|
||||
|
||||
pub use buf_read::{BufRead, Lines, Split};
|
||||
pub use buf_reader::BufReader;
|
||||
pub use buf_writer::{BufWriter, IntoInnerError};
|
||||
pub use copy::copy;
|
||||
pub use cursor::Cursor;
|
||||
pub use empty::{empty, Empty};
|
||||
pub use read::*;
|
||||
pub use repeat::{repeat, Repeat};
|
||||
pub use seek::Seek;
|
||||
pub use sink::{sink, Sink};
|
||||
pub use write::Write;
|
||||
|
||||
pub mod prelude;
|
||||
|
||||
pub(crate) mod buf_read;
|
||||
pub(crate) mod read;
|
||||
pub(crate) mod seek;
|
||||
pub(crate) mod write;
|
||||
pub(crate) mod utils;
|
||||
|
||||
mod buf_reader;
|
||||
mod buf_writer;
|
||||
mod copy;
|
||||
mod cursor;
|
||||
mod empty;
|
||||
mod repeat;
|
||||
mod sink;
|
||||
}
|
||||
|
||||
cfg_default! {
|
||||
// For use in the print macros.
|
||||
#[doc(hidden)]
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
pub use stdio::{_eprint, _print};
|
||||
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
pub use stderr::{stderr, Stderr};
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
pub use stdin::{stdin, Stdin};
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
pub use stdout::{stdout, Stdout};
|
||||
pub use timeout::timeout;
|
||||
|
||||
mod timeout;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mod stderr;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mod stdin;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mod stdio;
|
||||
#[cfg(not(target_os = "unknown"))]
|
||||
mod stdout;
|
||||
}
|
||||
//! let stdin = io::stdin();
|
||||
//! let mut line = String::new();
|
||||
//! stdin.read_line(&mut line).await?;
|
||||
//! #
|
||||
//! # Ok(()) }) }
|
||||
//! ```
|
||||
|
||||
#[doc(inline)]
|
||||
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
|
||||
|
||||
pub use buf_read::{BufRead, Lines};
|
||||
pub use buf_reader::BufReader;
|
||||
pub use buf_writer::BufWriter;
|
||||
pub use copy::copy;
|
||||
pub use cursor::Cursor;
|
||||
pub use empty::{empty, Empty};
|
||||
pub use read::Read;
|
||||
pub use repeat::{repeat, Repeat};
|
||||
pub use seek::Seek;
|
||||
pub use sink::{sink, Sink};
|
||||
pub use stderr::{stderr, Stderr};
|
||||
pub use stdin::{stdin, Stdin};
|
||||
pub use stdout::{stdout, Stdout};
|
||||
pub use timeout::timeout;
|
||||
pub use write::Write;
|
||||
|
||||
pub mod prelude;
|
||||
|
||||
pub(crate) mod buf_read;
|
||||
pub(crate) mod read;
|
||||
pub(crate) mod seek;
|
||||
pub(crate) mod write;
|
||||
|
||||
mod buf_reader;
|
||||
mod buf_writer;
|
||||
mod copy;
|
||||
mod cursor;
|
||||
mod empty;
|
||||
mod repeat;
|
||||
mod sink;
|
||||
mod stderr;
|
||||
mod stdin;
|
||||
mod stdout;
|
||||
mod timeout;
|
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue