mirror of
https://github.com/async-rs/async-std.git
synced 2025-01-16 10:49:55 +00:00
Merge branch 'book-test' of https://github.com/freebroccolo/async-std into freebroccolo-book-test
This commit is contained in:
commit
b2fc92e5d6
15 changed files with 494 additions and 142 deletions
18
.travis.yml
18
.travis.yml
|
@ -3,15 +3,12 @@ language: rust
|
||||||
env:
|
env:
|
||||||
- RUSTFLAGS="-D warnings"
|
- RUSTFLAGS="-D warnings"
|
||||||
|
|
||||||
before_script:
|
|
||||||
- rustup component add rustfmt
|
|
||||||
|
|
||||||
matrix:
|
matrix:
|
||||||
fast_finish: true
|
fast_finish: true
|
||||||
include:
|
include:
|
||||||
- rust: nightly
|
- rust: nightly
|
||||||
os: linux
|
os: linux
|
||||||
env: BUILD_DOCS=1
|
env: BUILD_DOCS=1 BUILD_BOOK=1
|
||||||
- rust: nightly
|
- rust: nightly
|
||||||
os: osx
|
os: osx
|
||||||
osx_image: xcode9.2
|
osx_image: xcode9.2
|
||||||
|
@ -19,8 +16,15 @@ matrix:
|
||||||
- rust: nightly-x86_64-pc-windows-msvc
|
- rust: nightly-x86_64-pc-windows-msvc
|
||||||
os: windows
|
os: windows
|
||||||
|
|
||||||
|
before_script:
|
||||||
|
- rustup component add rustfmt
|
||||||
|
- (test -x $HOME/.cargo/bin/cargo-install-update || cargo install cargo-update)
|
||||||
|
- (test -x $HOME/.cargo/bin/mdbook || cargo install --vers "^0.3" mdbook)
|
||||||
|
- cargo install-update -a
|
||||||
|
|
||||||
script:
|
script:
|
||||||
- cargo check --all --benches --bins --examples --tests
|
- if ![[ -n "$BUILD_BOOK" ]]; then cargo check --all --benches --bins --examples --tests && cargo test --all; fi
|
||||||
- cargo test --all
|
- if [[ -n "$BUILD_BOOK" ]]; then cargo test --all --benches --bins --examples --tests; fi
|
||||||
- cargo fmt --all -- --check
|
- cargo fmt --all -- --check
|
||||||
- if [[ -n "$BUILD_DOCS" ]]; then cargo doc --features docs; fi
|
- if [[ -n "$BUILD_DOCS" ]]; then cargo doc --features docs; fi
|
||||||
|
- if [[ -n "$BUILD_BOOK" ]]; then mdbook build docs && mdbook test -L ./target/debug/deps docs; fi
|
||||||
|
|
|
@ -26,7 +26,6 @@ docs = []
|
||||||
async-task = "1.0.0"
|
async-task = "1.0.0"
|
||||||
cfg-if = "0.1.9"
|
cfg-if = "0.1.9"
|
||||||
crossbeam-channel = "0.3.9"
|
crossbeam-channel = "0.3.9"
|
||||||
futures-preview = "0.3.0-alpha.17"
|
|
||||||
futures-timer = "0.3.0"
|
futures-timer = "0.3.0"
|
||||||
lazy_static = "1.3.0"
|
lazy_static = "1.3.0"
|
||||||
log = { version = "0.4.8", features = ["kv_unstable"] }
|
log = { version = "0.4.8", features = ["kv_unstable"] }
|
||||||
|
@ -37,6 +36,10 @@ num_cpus = "1.10.0"
|
||||||
pin-utils = "0.1.0-alpha.4"
|
pin-utils = "0.1.0-alpha.4"
|
||||||
slab = "0.4.2"
|
slab = "0.4.2"
|
||||||
|
|
||||||
|
[dependencies.futures-preview]
|
||||||
|
version = "0.3.0-alpha.17"
|
||||||
|
features = ["async-await", "nightly"]
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
femme = "1.1.0"
|
femme = "1.1.0"
|
||||||
tempdir = "0.3.7"
|
tempdir = "0.3.7"
|
||||||
|
|
0
docs/src/concepts/data.csv
Normal file
0
docs/src/concepts/data.csv
Normal file
|
|
@ -50,12 +50,14 @@ Remember the talk about "deferred computation" in the intro? That's all it is. I
|
||||||
|
|
||||||
Let's have a look at a simple function, specifically the return value:
|
Let's have a look at a simple function, specifically the return value:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# use std::{fs::File, io::{self, Read}};
|
||||||
|
#
|
||||||
fn read_file(path: &str) -> Result<String, io::Error> {
|
fn read_file(path: &str) -> Result<String, io::Error> {
|
||||||
let mut file = File.open(path)?;
|
let mut file = File::open(path)?;
|
||||||
let mut contents = String::new();
|
let mut contents = String::new();
|
||||||
file.read_to_string(&mut contents)?;
|
file.read_to_string(&mut contents)?;
|
||||||
contents
|
Ok(contents)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -64,12 +66,14 @@ Note that this return value talks about the past. The past has a drawback: all d
|
||||||
|
|
||||||
But we wanted to abstract over *computation* and let someone else choose how to run it. That's fundamentally incompatible with looking at the results of previous computation all the time. So, let's find a type that *describes* a computation without running it. Let's look at the function again:
|
But we wanted to abstract over *computation* and let someone else choose how to run it. That's fundamentally incompatible with looking at the results of previous computation all the time. So, let's find a type that *describes* a computation without running it. Let's look at the function again:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# use std::{fs::File, io::{self, Read}};
|
||||||
|
#
|
||||||
fn read_file(path: &str) -> Result<String, io::Error> {
|
fn read_file(path: &str) -> Result<String, io::Error> {
|
||||||
let mut file = File.open(path)?;
|
let mut file = File::open(path)?;
|
||||||
let mut contents = String::new();
|
let mut contents = String::new();
|
||||||
file.read_to_string(&mut contents)?;
|
file.read_to_string(&mut contents)?;
|
||||||
contents
|
Ok(contents)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -79,10 +83,11 @@ This is the moment where we could reach for [threads](https://en.wikipedia.org/w
|
||||||
|
|
||||||
What we are searching for is something that represents ongoing work towards a result in the future. Whenever we say "something" in Rust, we almost always mean a trait. Let's start with an incomplete definition of the `Future` trait:
|
What we are searching for is something that represents ongoing work towards a result in the future. Whenever we say "something" in Rust, we almost always mean a trait. Let's start with an incomplete definition of the `Future` trait:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# use std::{pin::Pin, task::{Context, Poll}};
|
||||||
|
#
|
||||||
trait Future {
|
trait Future {
|
||||||
type Output;
|
type Output;
|
||||||
|
|
||||||
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
@ -105,14 +110,16 @@ Note that calling `poll` again after case 1 happened may result in confusing beh
|
||||||
|
|
||||||
While the `Future` trait has existed in Rust for a while, it was inconvenient to build and describe them. For this, Rust now has a special syntax: `async`. The example from above, implemented with `async-std`, would look like this:
|
While the `Future` trait has existed in Rust for a while, it was inconvenient to build and describe them. For this, Rust now has a special syntax: `async`. The example from above, implemented with `async-std`, would look like this:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
use async_std::fs::File;
|
# extern crate async_std;
|
||||||
|
# use async_std::{fs::File, io::Read};
|
||||||
|
# use std::io;
|
||||||
|
#
|
||||||
async fn read_file(path: &str) -> Result<String, io::Error> {
|
async fn read_file(path: &str) -> Result<String, io::Error> {
|
||||||
let mut file = File.open(path).await?;
|
let mut file = File::open(path).await?;
|
||||||
let mut contents = String::new();
|
let mut contents = String::new();
|
||||||
file.read_to_string(&mut contents).await?;
|
file.read_to_string(&mut contents).await?;
|
||||||
contents
|
Ok(contents)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|
|
@ -4,15 +4,16 @@ Now that we know what Futures are, we want to run them!
|
||||||
|
|
||||||
In `async-std`, the [`tasks`][tasks] module is responsible for this. The simplest way is using the `block_on` function:
|
In `async-std`, the [`tasks`][tasks] module is responsible for this. The simplest way is using the `block_on` function:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
use async_std::fs::File;
|
# extern crate async_std;
|
||||||
use async_std::task;
|
# use async_std::{fs::File, io::Read, task};
|
||||||
|
# use std::io;
|
||||||
|
#
|
||||||
async fn read_file(path: &str) -> Result<String, io::Error> {
|
async fn read_file(path: &str) -> Result<String, io::Error> {
|
||||||
let mut file = File::open(path).await?;
|
let mut file = File::open(path).await?;
|
||||||
let mut contents = String::new();
|
let mut contents = String::new();
|
||||||
file.read_to_string(&mut contents).await?;
|
file.read_to_string(&mut contents).await?;
|
||||||
contents
|
Ok(contents)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
|
@ -31,24 +32,35 @@ fn main() {
|
||||||
|
|
||||||
This asks the runtime baked into `async_std` to execute the code that reads a file. Let's go one by one, though, inside to outside.
|
This asks the runtime baked into `async_std` to execute the code that reads a file. Let's go one by one, though, inside to outside.
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# use async_std::{fs::File, io::Read, task};
|
||||||
|
# use std::io;
|
||||||
|
#
|
||||||
|
# async fn read_file(path: &str) -> Result<String, io::Error> {
|
||||||
|
# let mut file = File::open(path).await?;
|
||||||
|
# let mut contents = String::new();
|
||||||
|
# file.read_to_string(&mut contents).await?;
|
||||||
|
# Ok(contents)
|
||||||
|
# }
|
||||||
|
#
|
||||||
async {
|
async {
|
||||||
let result = read_file("data.csv").await;
|
let result = read_file("data.csv").await;
|
||||||
match result {
|
match result {
|
||||||
Ok(s) => println!("{}", s),
|
Ok(s) => println!("{}", s),
|
||||||
Err(e) => println!("Error reading file: {:?}", e)
|
Err(e) => println!("Error reading file: {:?}", e)
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
```
|
```
|
||||||
|
|
||||||
This is an `async` *block*. Async blocks are necessary to call `async` functions, and will instruct the compiler to include all the relevant instructions to do so. In Rust, all blocks return a value and `async` blocks happen to return a value of the kind `Future`.
|
This is an `async` *block*. Async blocks are necessary to call `async` functions, and will instruct the compiler to include all the relevant instructions to do so. In Rust, all blocks return a value and `async` blocks happen to return a value of the kind `Future`.
|
||||||
|
|
||||||
But let's get to the interesting part:
|
But let's get to the interesting part:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
task::spawn(async { })
|
# use async_std::task;
|
||||||
|
task::spawn(async { });
|
||||||
```
|
```
|
||||||
|
|
||||||
`spawn` takes a `Future` and starts running it on a `Task`. It returns a `JoinHandle`. Futures in Rust are sometimes called *cold* Futures. You need something that starts running them. To run a Future, there may be some additional bookkeeping required, e.g. whether it's running or finished, where it is being placed in memory and what the current state is. This bookkeeping part is abstracted away in a `Task`.
|
`spawn` takes a `Future` and starts running it on a `Task`. It returns a `JoinHandle`. Futures in Rust are sometimes called *cold* Futures. You need something that starts running them. To run a Future, there may be some additional bookkeeping required, e.g. whether it's running or finished, where it is being placed in memory and what the current state is. This bookkeeping part is abstracted away in a `Task`.
|
||||||
|
@ -72,7 +84,9 @@ Tasks in `async_std` are one of the core abstractions. Much like Rust's `thread`
|
||||||
|
|
||||||
`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and by itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this:
|
`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and by itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# use async_std::task;
|
||||||
fn main() {
|
fn main() {
|
||||||
task::block_on(async {
|
task::block_on(async {
|
||||||
// this is std::fs, which blocks
|
// this is std::fs, which blocks
|
||||||
|
@ -91,7 +105,9 @@ In case of `panic`, behaviour differs depending on whether there's a reasonable
|
||||||
|
|
||||||
In practice, that means that `block_on` propagates panics to the blocking component:
|
In practice, that means that `block_on` propagates panics to the blocking component:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018,should_panic
|
||||||
|
# extern crate async_std;
|
||||||
|
# use async_std::task;
|
||||||
fn main() {
|
fn main() {
|
||||||
task::block_on(async {
|
task::block_on(async {
|
||||||
panic!("test");
|
panic!("test");
|
||||||
|
@ -106,7 +122,10 @@ note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace.
|
||||||
|
|
||||||
While panicing a spawned task will abort:
|
While panicing a spawned task will abort:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018,should_panic
|
||||||
|
# extern crate async_std;
|
||||||
|
# use async_std::task;
|
||||||
|
# use std::time::Duration;
|
||||||
task::spawn(async {
|
task::spawn(async {
|
||||||
panic!("test");
|
panic!("test");
|
||||||
});
|
});
|
||||||
|
|
|
@ -6,11 +6,11 @@ A collection of small, useful patterns.
|
||||||
|
|
||||||
`async-std` doesn't provide a `split()` method on `io` handles. Instead, splitting a stream into a read and write half can be done like this:
|
`async-std` doesn't provide a `split()` method on `io` handles. Instead, splitting a stream into a read and write half can be done like this:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
use async_std::io;
|
# extern crate async_std;
|
||||||
|
use async_std::{io, net::TcpStream};
|
||||||
async fn echo(stream: io::TcpStream) {
|
async fn echo(stream: TcpStream) {
|
||||||
let (reader, writer) = &mut (&stream, &stream);
|
let (reader, writer) = &mut (&stream, &stream);
|
||||||
io::copy(reader, writer).await?;
|
io::copy(reader, writer).await;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
|
@ -32,7 +32,7 @@ This policy is adapted from the [Rust project](https://www.rust-lang.org/policie
|
||||||
|
|
||||||
## PGP Key
|
## PGP Key
|
||||||
|
|
||||||
```
|
```text
|
||||||
-----BEGIN PGP PUBLIC KEY BLOCK-----
|
-----BEGIN PGP PUBLIC KEY BLOCK-----
|
||||||
|
|
||||||
mQENBF1Wu/ABCADJaGt4HwSlqKB9BGHWYKZj/6mTMbmc29vsEOcCSQKo6myCf9zc
|
mQENBF1Wu/ABCADJaGt4HwSlqKB9BGHWYKZj/6mTMbmc29vsEOcCSQKo6myCf9zc
|
||||||
|
|
|
@ -2,15 +2,14 @@
|
||||||
|
|
||||||
Let's implement the scaffold of the server: a loop that binds a TCP socket to an address and starts accepting connections.
|
Let's implement the scaffold of the server: a loop that binds a TCP socket to an address and starts accepting connections.
|
||||||
|
|
||||||
|
|
||||||
First of all, let's add required import boilerplate:
|
First of all, let's add required import boilerplate:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
use std::net::ToSocketAddrs; // 1
|
use std::net::ToSocketAddrs; // 1
|
||||||
|
|
||||||
use async_std::{
|
use async_std::{
|
||||||
prelude::*, // 2
|
prelude::*, // 2
|
||||||
task, // 3
|
task, // 3
|
||||||
net::TcpListener, // 4
|
net::TcpListener, // 4
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -18,7 +17,7 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>
|
||||||
```
|
```
|
||||||
|
|
||||||
1. `async_std` uses `std` types where appropriate.
|
1. `async_std` uses `std` types where appropriate.
|
||||||
We'll need `ToSocketAddrs` to specify address to listen on.
|
We'll need `ToSocketAddrs` to specify address to listen on.
|
||||||
2. `prelude` re-exports some traits required to work with futures and streams.
|
2. `prelude` re-exports some traits required to work with futures and streams.
|
||||||
3. The `task` module roughly corresponds to the `std::thread` module, but tasks are much lighter weight.
|
3. The `task` module roughly corresponds to the `std::thread` module, but tasks are much lighter weight.
|
||||||
A single thread can run many tasks.
|
A single thread can run many tasks.
|
||||||
|
@ -27,10 +26,18 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>
|
||||||
To propagate the errors, we will use a boxed error trait object.
|
To propagate the errors, we will use a boxed error trait object.
|
||||||
Do you know that there's `From<&'_ str> for Box<dyn Error>` implementation in stdlib, which allows you to use strings with `?` operator?
|
Do you know that there's `From<&'_ str> for Box<dyn Error>` implementation in stdlib, which allows you to use strings with `?` operator?
|
||||||
|
|
||||||
|
|
||||||
Now we can write the server's accept loop:
|
Now we can write the server's accept loop:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# use async_std::{
|
||||||
|
# net::TcpListener,
|
||||||
|
# prelude::Stream,
|
||||||
|
# };
|
||||||
|
# use std::net::ToSocketAddrs;
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
#
|
||||||
async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
|
async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
|
||||||
let listener = TcpListener::bind(addr).await?; // 2
|
let listener = TcpListener::bind(addr).await?; // 2
|
||||||
let mut incoming = listener.incoming();
|
let mut incoming = listener.incoming();
|
||||||
|
@ -48,20 +55,39 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
|
||||||
Mirroring API of `std` is an explicit design goal of `async_std`.
|
Mirroring API of `std` is an explicit design goal of `async_std`.
|
||||||
3. Here, we would like to iterate incoming sockets, just how one would do in `std`:
|
3. Here, we would like to iterate incoming sockets, just how one would do in `std`:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018,should_panic
|
||||||
let listener: std::net::TcpListener = unimplemented!();
|
let listener: std::net::TcpListener = unimplemented!();
|
||||||
for stream in listener.incoming() {
|
for stream in listener.incoming() {
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
}
|
Unfortunately this doesn't quite work with `async` yet, because there's no support for `async` for-loops in the language yet.
|
||||||
```
|
For this reason we have to implement the loop manually, by using `while let Some(item) = iter.next().await` pattern.
|
||||||
|
|
||||||
Unfortunately this doesn't quite work with `async` yet, because there's no support for `async` for-loops in the language yet.
|
|
||||||
For this reason we have to implement the loop manually, by using `while let Some(item) = iter.next().await` pattern.
|
|
||||||
|
|
||||||
Finally, let's add main:
|
Finally, let's add main:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
fn main() -> Result<()> {
|
# extern crate async_std;
|
||||||
|
# use async_std::{
|
||||||
|
# net::TcpListener,
|
||||||
|
# prelude::Stream,
|
||||||
|
# task,
|
||||||
|
# };
|
||||||
|
# use std::net::ToSocketAddrs;
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
#
|
||||||
|
# async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
|
||||||
|
# let listener = TcpListener::bind(addr).await?; // 2
|
||||||
|
# let mut incoming = listener.incoming();
|
||||||
|
# while let Some(stream) = incoming.next().await { // 3
|
||||||
|
# // TODO
|
||||||
|
# }
|
||||||
|
# Ok(())
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
// main
|
||||||
|
fn run() -> Result<()> {
|
||||||
let fut = server("127.0.0.1:8080");
|
let fut = server("127.0.0.1:8080");
|
||||||
task::block_on(fut)
|
task::block_on(fut)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,36 +1,46 @@
|
||||||
|
|
||||||
## All Together
|
## All Together
|
||||||
|
|
||||||
At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat:
|
At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
use std::{
|
# extern crate async_std;
|
||||||
net::ToSocketAddrs,
|
# extern crate futures;
|
||||||
sync::Arc,
|
use async_std::{
|
||||||
collections::hash_map::{HashMap, Entry},
|
io::{self, BufReader},
|
||||||
|
net::{TcpListener, TcpStream},
|
||||||
|
prelude::*,
|
||||||
|
task,
|
||||||
};
|
};
|
||||||
|
|
||||||
use futures::{
|
use futures::{
|
||||||
channel::mpsc,
|
channel::mpsc,
|
||||||
SinkExt,
|
SinkExt,
|
||||||
};
|
};
|
||||||
|
use std::{
|
||||||
use async_std::{
|
collections::hash_map::{HashMap, Entry},
|
||||||
io::BufReader,
|
net::ToSocketAddrs,
|
||||||
prelude::*,
|
sync::Arc,
|
||||||
task,
|
|
||||||
net::{TcpListener, TcpStream},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
type Sender<T> = mpsc::UnboundedSender<T>;
|
type Sender<T> = mpsc::UnboundedSender<T>;
|
||||||
type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
|
|
||||||
|
// main
|
||||||
fn main() -> Result<()> {
|
fn run() -> Result<()> {
|
||||||
task::block_on(server("127.0.0.1:8080"))
|
task::block_on(server("127.0.0.1:8080"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
|
||||||
|
where
|
||||||
|
F: Future<Output = Result<()>> + Send + 'static,
|
||||||
|
{
|
||||||
|
task::spawn(async move {
|
||||||
|
if let Err(e) = fut.await {
|
||||||
|
eprintln!("{}", e)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
||||||
let listener = TcpListener::bind(addr).await?;
|
let listener = TcpListener::bind(addr).await?;
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,122 @@ In `a-chat`, we already have an unidirectional flow of messages: `reader -> brok
|
||||||
However, we never wait for broker and writers, which might cause some messages to get dropped.
|
However, we never wait for broker and writers, which might cause some messages to get dropped.
|
||||||
Let's add waiting to the server:
|
Let's add waiting to the server:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# extern crate futures;
|
||||||
|
# use async_std::{
|
||||||
|
# io::{self, BufReader},
|
||||||
|
# net::{TcpListener, TcpStream},
|
||||||
|
# prelude::*,
|
||||||
|
# task,
|
||||||
|
# };
|
||||||
|
# use futures::{
|
||||||
|
# channel::mpsc,
|
||||||
|
# SinkExt,
|
||||||
|
# };
|
||||||
|
# use std::{
|
||||||
|
# collections::hash_map::{HashMap, Entry},
|
||||||
|
# net::ToSocketAddrs,
|
||||||
|
# sync::Arc,
|
||||||
|
# };
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
# type Sender<T> = mpsc::UnboundedSender<T>;
|
||||||
|
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
|
#
|
||||||
|
# fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
|
||||||
|
# where
|
||||||
|
# F: Future<Output = Result<()>> + Send + 'static,
|
||||||
|
# {
|
||||||
|
# task::spawn(async move {
|
||||||
|
# if let Err(e) = fut.await {
|
||||||
|
# eprintln!("{}", e)
|
||||||
|
# }
|
||||||
|
# })
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
#
|
||||||
|
# async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
|
||||||
|
# let stream = Arc::new(stream); // 2
|
||||||
|
# let reader = BufReader::new(&*stream);
|
||||||
|
# let mut lines = reader.lines();
|
||||||
|
#
|
||||||
|
# let name = match lines.next().await {
|
||||||
|
# None => Err("peer disconnected immediately")?,
|
||||||
|
# Some(line) => line?,
|
||||||
|
# };
|
||||||
|
# broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3
|
||||||
|
# .unwrap();
|
||||||
|
#
|
||||||
|
# while let Some(line) = lines.next().await {
|
||||||
|
# let line = line?;
|
||||||
|
# let (dest, msg) = match line.find(':') {
|
||||||
|
# None => continue,
|
||||||
|
# Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
|
||||||
|
# };
|
||||||
|
# let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
|
||||||
|
# let msg: String = msg.trim().to_string();
|
||||||
|
#
|
||||||
|
# broker.send(Event::Message { // 4
|
||||||
|
# from: name.clone(),
|
||||||
|
# to: dest,
|
||||||
|
# msg,
|
||||||
|
# }).await.unwrap();
|
||||||
|
# }
|
||||||
|
# Ok(())
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# async fn client_writer(
|
||||||
|
# mut messages: Receiver<String>,
|
||||||
|
# stream: Arc<TcpStream>,
|
||||||
|
# ) -> Result<()> {
|
||||||
|
# let mut stream = &*stream;
|
||||||
|
# while let Some(msg) = messages.next().await {
|
||||||
|
# stream.write_all(msg.as_bytes()).await?;
|
||||||
|
# }
|
||||||
|
# Ok(())
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# #[derive(Debug)]
|
||||||
|
# enum Event {
|
||||||
|
# NewPeer {
|
||||||
|
# name: String,
|
||||||
|
# stream: Arc<TcpStream>,
|
||||||
|
# },
|
||||||
|
# Message {
|
||||||
|
# from: String,
|
||||||
|
# to: Vec<String>,
|
||||||
|
# msg: String,
|
||||||
|
# },
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# async fn broker(mut events: Receiver<Event>) -> Result<()> {
|
||||||
|
# let mut peers: HashMap<String, Sender<String>> = HashMap::new();
|
||||||
|
#
|
||||||
|
# while let Some(event) = events.next().await {
|
||||||
|
# match event {
|
||||||
|
# Event::Message { from, to, msg } => {
|
||||||
|
# for addr in to {
|
||||||
|
# if let Some(peer) = peers.get_mut(&addr) {
|
||||||
|
# peer.send(format!("from {}: {}\n", from, msg)).await?
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# Event::NewPeer { name, stream} => {
|
||||||
|
# match peers.entry(name) {
|
||||||
|
# Entry::Occupied(..) => (),
|
||||||
|
# Entry::Vacant(entry) => {
|
||||||
|
# let (client_sender, client_receiver) = mpsc::unbounded();
|
||||||
|
# entry.insert(client_sender); // 4
|
||||||
|
# spawn_and_log_error(client_writer(client_receiver, stream)); // 5
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# }
|
||||||
|
# Ok(())
|
||||||
|
# }
|
||||||
|
#
|
||||||
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
||||||
let listener = TcpListener::bind(addr).await?;
|
let listener = TcpListener::bind(addr).await?;
|
||||||
|
|
||||||
|
@ -40,11 +155,67 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
||||||
|
|
||||||
And to the broker:
|
And to the broker:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# extern crate futures;
|
||||||
|
# use async_std::{
|
||||||
|
# io::{self, BufReader},
|
||||||
|
# net::{TcpListener, TcpStream},
|
||||||
|
# prelude::*,
|
||||||
|
# task,
|
||||||
|
# };
|
||||||
|
# use futures::{
|
||||||
|
# channel::mpsc,
|
||||||
|
# SinkExt,
|
||||||
|
# };
|
||||||
|
# use std::{
|
||||||
|
# collections::hash_map::{HashMap, Entry},
|
||||||
|
# net::ToSocketAddrs,
|
||||||
|
# sync::Arc,
|
||||||
|
# };
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
# type Sender<T> = mpsc::UnboundedSender<T>;
|
||||||
|
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
|
#
|
||||||
|
# async fn client_writer(
|
||||||
|
# mut messages: Receiver<String>,
|
||||||
|
# stream: Arc<TcpStream>,
|
||||||
|
# ) -> Result<()> {
|
||||||
|
# let mut stream = &*stream;
|
||||||
|
# while let Some(msg) = messages.next().await {
|
||||||
|
# stream.write_all(msg.as_bytes()).await?;
|
||||||
|
# }
|
||||||
|
# Ok(())
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
|
||||||
|
# where
|
||||||
|
# F: Future<Output = Result<()>> + Send + 'static,
|
||||||
|
# {
|
||||||
|
# task::spawn(async move {
|
||||||
|
# if let Err(e) = fut.await {
|
||||||
|
# eprintln!("{}", e)
|
||||||
|
# }
|
||||||
|
# })
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# #[derive(Debug)]
|
||||||
|
# enum Event {
|
||||||
|
# NewPeer {
|
||||||
|
# name: String,
|
||||||
|
# stream: Arc<TcpStream>,
|
||||||
|
# },
|
||||||
|
# Message {
|
||||||
|
# from: String,
|
||||||
|
# to: Vec<String>,
|
||||||
|
# msg: String,
|
||||||
|
# },
|
||||||
|
# }
|
||||||
|
#
|
||||||
async fn broker(mut events: Receiver<Event>) -> Result<()> {
|
async fn broker(mut events: Receiver<Event>) -> Result<()> {
|
||||||
let mut writers = Vec::new();
|
let mut writers = Vec::new();
|
||||||
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
|
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
|
||||||
|
|
||||||
while let Some(event) = events.next().await { // 2
|
while let Some(event) = events.next().await { // 2
|
||||||
match event {
|
match event {
|
||||||
Event::Message { from, to, msg } => {
|
Event::Message { from, to, msg } => {
|
||||||
|
|
|
@ -10,7 +10,48 @@ We can create a dedicated broker tasks which owns the `peers` map and communicat
|
||||||
By hiding `peers` inside such an "actor" task, we remove the need for mutxes and also make serialization point explicit.
|
By hiding `peers` inside such an "actor" task, we remove the need for mutxes and also make serialization point explicit.
|
||||||
The order of events "Bob sends message to Alice" and "Alice joins" is determined by the order of the corresponding events in the broker's event queue.
|
The order of events "Bob sends message to Alice" and "Alice joins" is determined by the order of the corresponding events in the broker's event queue.
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# extern crate futures;
|
||||||
|
# use async_std::{
|
||||||
|
# io::{Write},
|
||||||
|
# net::TcpStream,
|
||||||
|
# prelude::{Future, Stream},
|
||||||
|
# task,
|
||||||
|
# };
|
||||||
|
# use futures::channel::mpsc;
|
||||||
|
# use futures::SinkExt;
|
||||||
|
# use std::{
|
||||||
|
# collections::hash_map::{Entry, HashMap},
|
||||||
|
# sync::Arc,
|
||||||
|
# };
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
# type Sender<T> = mpsc::UnboundedSender<T>;
|
||||||
|
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
|
#
|
||||||
|
# async fn client_writer(
|
||||||
|
# mut messages: Receiver<String>,
|
||||||
|
# stream: Arc<TcpStream>,
|
||||||
|
# ) -> Result<()> {
|
||||||
|
# let mut stream = &*stream;
|
||||||
|
# while let Some(msg) = messages.next().await {
|
||||||
|
# stream.write_all(msg.as_bytes()).await?;
|
||||||
|
# }
|
||||||
|
# Ok(())
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
|
||||||
|
# where
|
||||||
|
# F: Future<Output = Result<()>> + Send + 'static,
|
||||||
|
# {
|
||||||
|
# task::spawn(async move {
|
||||||
|
# if let Err(e) = fut.await {
|
||||||
|
# eprintln!("{}", e)
|
||||||
|
# }
|
||||||
|
# })
|
||||||
|
# }
|
||||||
|
#
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Event { // 1
|
enum Event { // 1
|
||||||
NewPeer {
|
NewPeer {
|
||||||
|
|
|
@ -1,11 +1,11 @@
|
||||||
## Handling Disconnections
|
## Handling Disconnections
|
||||||
|
|
||||||
Currently, we only ever *add* new peers to the map.
|
Currently, we only ever _add_ new peers to the map.
|
||||||
This is clearly wrong: if a peer closes connection to the chat, we should not try to send any more messages to it.
|
This is clearly wrong: if a peer closes connection to the chat, we should not try to send any more messages to it.
|
||||||
|
|
||||||
One subtlety with handling disconnection is that we can detect it either in the reader's task, or in the writer's task.
|
One subtlety with handling disconnection is that we can detect it either in the reader's task, or in the writer's task.
|
||||||
The most obvious solution here is to just remove the peer from the `peers` map in both cases, but this would be wrong.
|
The most obvious solution here is to just remove the peer from the `peers` map in both cases, but this would be wrong.
|
||||||
If *both* read and write fail, we'll remove the peer twice, but it can be the case that the peer reconnected between the two failures!
|
If _both_ read and write fail, we'll remove the peer twice, but it can be the case that the peer reconnected between the two failures!
|
||||||
To fix this, we will only remove the peer when the write side finishes.
|
To fix this, we will only remove the peer when the write side finishes.
|
||||||
If the read side finishes we will notify the write side that it should stop as well.
|
If the read side finishes we will notify the write side that it should stop as well.
|
||||||
That is, we need to add an ability to signal shutdown for the writer task.
|
That is, we need to add an ability to signal shutdown for the writer task.
|
||||||
|
@ -17,7 +17,17 @@ This way, we statically guarantee that we issue shutdown exactly once, even if w
|
||||||
|
|
||||||
First, let's add a shutdown channel to the `client`:
|
First, let's add a shutdown channel to the `client`:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# extern crate futures;
|
||||||
|
# use async_std::net::TcpStream;
|
||||||
|
# use futures::{channel::mpsc, SinkExt};
|
||||||
|
# use std::sync::Arc;
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
# type Sender<T> = mpsc::UnboundedSender<T>;
|
||||||
|
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
|
#
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Void {} // 1
|
enum Void {} // 1
|
||||||
|
|
||||||
|
@ -35,17 +45,17 @@ enum Event {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
|
async fn client(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()> {
|
||||||
// ...
|
// ...
|
||||||
|
# let name: String = unimplemented!();
|
||||||
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); // 3
|
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); // 3
|
||||||
broker.send(Event::NewPeer {
|
broker.send(Event::NewPeer {
|
||||||
name: name.clone(),
|
name: name.clone(),
|
||||||
stream: Arc::clone(&stream),
|
stream: Arc::clone(&stream),
|
||||||
shutdown: shutdown_receiver,
|
shutdown: shutdown_receiver,
|
||||||
}).await.unwrap();
|
}).await.unwrap();
|
||||||
|
|
||||||
// ...
|
// ...
|
||||||
|
# unimplemented!()
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
@ -56,23 +66,35 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
|
||||||
In the `client_writer`, we now need to choose between shutdown and message channels.
|
In the `client_writer`, we now need to choose between shutdown and message channels.
|
||||||
We use the `select` macro for this purpose:
|
We use the `select` macro for this purpose:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
use futures::select;
|
# extern crate async_std;
|
||||||
use futures::FutureExt;
|
# extern crate futures;
|
||||||
|
# use async_std::{io::Write, net::TcpStream};
|
||||||
|
use futures::{channel::mpsc, select, FutureExt, StreamExt};
|
||||||
|
# use std::sync::Arc;
|
||||||
|
|
||||||
|
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
# type Sender<T> = mpsc::UnboundedSender<T>;
|
||||||
|
|
||||||
|
# #[derive(Debug)]
|
||||||
|
# enum Void {} // 1
|
||||||
|
|
||||||
async fn client_writer(
|
async fn client_writer(
|
||||||
messages: &mut Receiver<String>,
|
messages: &mut Receiver<String>,
|
||||||
stream: Arc<TcpStream>,
|
stream: Arc<TcpStream>,
|
||||||
mut shutdown: Receiver<Void>, // 1
|
shutdown: Receiver<Void>, // 1
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut stream = &*stream;
|
let mut stream = &*stream;
|
||||||
|
let mut messages = messages.fuse();
|
||||||
|
let mut shutdown = shutdown.fuse();
|
||||||
loop { // 2
|
loop { // 2
|
||||||
select! {
|
select! {
|
||||||
msg = messages.next().fuse() => match msg {
|
msg = messages.next() => match msg {
|
||||||
Some(msg) => stream.write_all(msg.as_bytes()).await?,
|
Some(msg) => stream.write_all(msg.as_bytes()).await?,
|
||||||
None => break,
|
None => break,
|
||||||
},
|
},
|
||||||
void = shutdown.next().fuse() => match void {
|
void = shutdown.next() => match void {
|
||||||
Some(void) => match void {}, // 3
|
Some(void) => match void {}, // 3
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
|
@ -94,25 +116,19 @@ This also allows us to establish a useful invariant that the message channel str
|
||||||
|
|
||||||
The final code looks like this:
|
The final code looks like this:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# extern crate futures;
|
||||||
|
use async_std::{
|
||||||
|
io::{BufReader, BufRead, Write},
|
||||||
|
net::{TcpListener, TcpStream},
|
||||||
|
task,
|
||||||
|
};
|
||||||
|
use futures::{channel::mpsc, future::Future, select, FutureExt, SinkExt, StreamExt};
|
||||||
use std::{
|
use std::{
|
||||||
|
collections::hash_map::{Entry, HashMap},
|
||||||
net::ToSocketAddrs,
|
net::ToSocketAddrs,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
collections::hash_map::{HashMap, Entry},
|
|
||||||
};
|
|
||||||
|
|
||||||
use futures::{
|
|
||||||
channel::mpsc,
|
|
||||||
SinkExt,
|
|
||||||
FutureExt,
|
|
||||||
select,
|
|
||||||
};
|
|
||||||
|
|
||||||
use async_std::{
|
|
||||||
io::BufReader,
|
|
||||||
prelude::*,
|
|
||||||
task,
|
|
||||||
net::{TcpListener, TcpStream},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
@ -122,13 +138,13 @@ type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Void {}
|
enum Void {}
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
// main
|
||||||
|
fn run() -> Result<()> {
|
||||||
task::block_on(server("127.0.0.1:8080"))
|
task::block_on(server("127.0.0.1:8080"))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
||||||
let listener = TcpListener::bind(addr).await?;
|
let listener = TcpListener::bind(addr).await?;
|
||||||
|
|
||||||
let (broker_sender, broker_receiver) = mpsc::unbounded();
|
let (broker_sender, broker_receiver) = mpsc::unbounded();
|
||||||
let broker_handle = task::spawn(broker(broker_receiver));
|
let broker_handle = task::spawn(broker(broker_receiver));
|
||||||
let mut incoming = listener.incoming();
|
let mut incoming = listener.incoming();
|
||||||
|
@ -180,16 +196,18 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
|
||||||
async fn client_writer(
|
async fn client_writer(
|
||||||
messages: &mut Receiver<String>,
|
messages: &mut Receiver<String>,
|
||||||
stream: Arc<TcpStream>,
|
stream: Arc<TcpStream>,
|
||||||
mut shutdown: Receiver<Void>,
|
shutdown: Receiver<Void>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut stream = &*stream;
|
let mut stream = &*stream;
|
||||||
|
let mut messages = messages.fuse();
|
||||||
|
let mut shutdown = shutdown.fuse();
|
||||||
loop {
|
loop {
|
||||||
select! {
|
select! {
|
||||||
msg = messages.next().fuse() => match msg {
|
msg = messages.next() => match msg {
|
||||||
Some(msg) => stream.write_all(msg.as_bytes()).await?,
|
Some(msg) => stream.write_all(msg.as_bytes()).await?,
|
||||||
None => break,
|
None => break,
|
||||||
},
|
},
|
||||||
void = shutdown.next().fuse() => match void {
|
void = shutdown.next() => match void {
|
||||||
Some(void) => match void {},
|
Some(void) => match void {},
|
||||||
None => break,
|
None => break,
|
||||||
}
|
}
|
||||||
|
@ -212,11 +230,11 @@ enum Event {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn broker(mut events: Receiver<Event>) {
|
async fn broker(events: Receiver<Event>) {
|
||||||
let (disconnect_sender, mut disconnect_receiver) = // 1
|
let (disconnect_sender, mut disconnect_receiver) = // 1
|
||||||
mpsc::unbounded::<(String, Receiver<String>)>();
|
mpsc::unbounded::<(String, Receiver<String>)>();
|
||||||
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
|
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
|
||||||
|
let mut events = events.fuse();
|
||||||
loop {
|
loop {
|
||||||
let event = select! {
|
let event = select! {
|
||||||
event = events.next() => match event {
|
event = events.next() => match event {
|
||||||
|
|
|
@ -14,44 +14,39 @@ Specifically, the client should *simultaneously* read from stdin and from the so
|
||||||
Programming this with threads is cumbersome, especially when implementing clean shutdown.
|
Programming this with threads is cumbersome, especially when implementing clean shutdown.
|
||||||
With async, we can just use the `select!` macro.
|
With async, we can just use the `select!` macro.
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
use std::net::ToSocketAddrs;
|
# extern crate async_std;
|
||||||
|
# extern crate futures;
|
||||||
use futures::select;
|
|
||||||
use futures::FutureExt;
|
|
||||||
|
|
||||||
use async_std::{
|
use async_std::{
|
||||||
prelude::*,
|
io::{stdin, BufRead, BufReader, Write},
|
||||||
net::TcpStream,
|
net::TcpStream,
|
||||||
task,
|
task,
|
||||||
io::{stdin, BufReader},
|
|
||||||
};
|
};
|
||||||
|
use futures::{select, FutureExt, StreamExt};
|
||||||
|
use std::net::ToSocketAddrs;
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
|
||||||
|
// main
|
||||||
fn main() -> Result<()> {
|
fn run() -> Result<()> {
|
||||||
task::block_on(try_main("127.0.0.1:8080"))
|
task::block_on(try_run("127.0.0.1:8080"))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
|
async fn try_run(addr: impl ToSocketAddrs) -> Result<()> {
|
||||||
let stream = TcpStream::connect(addr).await?;
|
let stream = TcpStream::connect(addr).await?;
|
||||||
let (reader, mut writer) = (&stream, &stream); // 1
|
let (reader, mut writer) = (&stream, &stream); // 1
|
||||||
let reader = BufReader::new(reader);
|
let mut lines_from_server = BufReader::new(reader).lines().fuse(); // 2
|
||||||
let mut lines_from_server = futures::StreamExt::fuse(reader.lines()); // 2
|
let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse(); // 2
|
||||||
|
|
||||||
let stdin = BufReader::new(stdin());
|
|
||||||
let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines()); // 2
|
|
||||||
loop {
|
loop {
|
||||||
select! { // 3
|
select! { // 3
|
||||||
line = lines_from_server.next().fuse() => match line {
|
line = lines_from_server.next() => match line {
|
||||||
Some(line) => {
|
Some(line) => {
|
||||||
let line = line?;
|
let line = line?;
|
||||||
println!("{}", line);
|
println!("{}", line);
|
||||||
},
|
},
|
||||||
None => break,
|
None => break,
|
||||||
},
|
},
|
||||||
line = lines_from_stdin.next().fuse() => match line {
|
line = lines_from_stdin.next() => match line {
|
||||||
Some(line) => {
|
Some(line) => {
|
||||||
let line = line?;
|
let line = line?;
|
||||||
writer.write_all(line.as_bytes()).await?;
|
writer.write_all(line.as_bytes()).await?;
|
||||||
|
|
|
@ -7,11 +7,18 @@ We need to:
|
||||||
2. interpret the first line as a login
|
2. interpret the first line as a login
|
||||||
3. parse the rest of the lines as a `login: message`
|
3. parse the rest of the lines as a `login: message`
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
use async_std::io::BufReader;
|
# extern crate async_std;
|
||||||
use async_std::net::TcpStream;
|
# use async_std::{
|
||||||
use async_std::io::BufReader;
|
# io::{BufRead, BufReader},
|
||||||
|
# net::{TcpListener, TcpStream},
|
||||||
|
# prelude::Stream,
|
||||||
|
# task,
|
||||||
|
# };
|
||||||
|
# use std::net::ToSocketAddrs;
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
#
|
||||||
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
async fn server(addr: impl ToSocketAddrs) -> Result<()> {
|
||||||
let listener = TcpListener::bind(addr).await?;
|
let listener = TcpListener::bind(addr).await?;
|
||||||
let mut incoming = listener.incoming();
|
let mut incoming = listener.incoming();
|
||||||
|
@ -65,9 +72,45 @@ One serious problem in the above solution is that, while we correctly propagate
|
||||||
That is, `task::spawn` does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined.
|
That is, `task::spawn` does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined.
|
||||||
We can "fix" it by waiting for the task to be joined, like this:
|
We can "fix" it by waiting for the task to be joined, like this:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# #![feature(async_closure)]
|
||||||
|
# extern crate async_std;
|
||||||
|
# use async_std::{
|
||||||
|
# io::{BufRead, BufReader},
|
||||||
|
# net::{TcpListener, TcpStream},
|
||||||
|
# prelude::Stream,
|
||||||
|
# task,
|
||||||
|
# };
|
||||||
|
# use std::net::ToSocketAddrs;
|
||||||
|
#
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
|
#
|
||||||
|
# async fn client(stream: TcpStream) -> Result<()> {
|
||||||
|
# let reader = BufReader::new(&stream); // 2
|
||||||
|
# let mut lines = reader.lines();
|
||||||
|
#
|
||||||
|
# let name = match lines.next().await { // 3
|
||||||
|
# None => Err("peer disconnected immediately")?,
|
||||||
|
# Some(line) => line?,
|
||||||
|
# };
|
||||||
|
# println!("name = {}", name);
|
||||||
|
#
|
||||||
|
# while let Some(line) = lines.next().await { // 4
|
||||||
|
# let line = line?;
|
||||||
|
# let (dest, msg) = match line.find(':') { // 5
|
||||||
|
# None => continue,
|
||||||
|
# Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
|
||||||
|
# };
|
||||||
|
# let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
|
||||||
|
# let msg: String = msg.trim().to_string();
|
||||||
|
# }
|
||||||
|
# Ok(())
|
||||||
|
# }
|
||||||
|
#
|
||||||
|
# async move |stream| {
|
||||||
let handle = task::spawn(client(stream));
|
let handle = task::spawn(client(stream));
|
||||||
handle.await?
|
handle.await
|
||||||
|
# };
|
||||||
```
|
```
|
||||||
|
|
||||||
The `.await` waits until the client finishes, and `?` propagates the result.
|
The `.await` waits until the client finishes, and `?` propagates the result.
|
||||||
|
@ -80,10 +123,16 @@ That is, a flaky internet connection of one peer brings down the whole chat room
|
||||||
A correct way to handle client errors in this case is log them, and continue serving other clients.
|
A correct way to handle client errors in this case is log them, and continue serving other clients.
|
||||||
So let's use a helper function for this:
|
So let's use a helper function for this:
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# use async_std::{
|
||||||
|
# io,
|
||||||
|
# prelude::Future,
|
||||||
|
# task,
|
||||||
|
# };
|
||||||
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
|
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
|
||||||
where
|
where
|
||||||
F: Future<Output = Result<()>> + Send + 'static,
|
F: Future<Output = io::Result<()>> + Send + 'static,
|
||||||
{
|
{
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
if let Err(e) = fut.await {
|
if let Err(e) = fut.await {
|
||||||
|
|
|
@ -11,11 +11,20 @@ So let's create a `client_writer` task which receives messages over a channel an
|
||||||
This task would be the point of serialization of messages.
|
This task would be the point of serialization of messages.
|
||||||
if Alice and Charley send two messages to Bob at the same time, Bob will see the messages in the same order as they arrive in the channel.
|
if Alice and Charley send two messages to Bob at the same time, Bob will see the messages in the same order as they arrive in the channel.
|
||||||
|
|
||||||
```rust
|
```rust,edition2018
|
||||||
|
# extern crate async_std;
|
||||||
|
# extern crate futures;
|
||||||
|
# use async_std::{
|
||||||
|
# io::Write,
|
||||||
|
# net::TcpStream,
|
||||||
|
# prelude::Stream,
|
||||||
|
# };
|
||||||
|
# use std::sync::Arc;
|
||||||
use futures::channel::mpsc; // 1
|
use futures::channel::mpsc; // 1
|
||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
|
||||||
type Sender<T> = mpsc::UnboundedSender<T>; // 2
|
type Sender<T> = mpsc::UnboundedSender<T>; // 2
|
||||||
type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
type Receiver<T> = mpsc::UnboundedReceiver<T>;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue