Mirror of https://github.com/async-rs/async-std.git (synced 2025-01-16 02:39:55 +00:00)
Rename server functions to follow *_loop convention (#139)
Rename: server -> accept_loop, client -> connection_loop, client_writer -> connection_writer_loop
Parent: 4aceee1b61
Commit: 55ea367415

7 changed files with 54 additions and 53 deletions
@@ -34,7 +34,8 @@ Now we can write the server's accept loop:
 #
 # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 #
-async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
+async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { // 1
+
     let listener = TcpListener::bind(addr).await?; // 2
     let mut incoming = listener.incoming();
     while let Some(stream) = incoming.next().await { // 3
@@ -44,7 +45,7 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
 }
 ```
 
-1. We mark the `server` function as `async`, which allows us to use `.await` syntax inside.
+1. We mark the `accept_loop` function as `async`, which allows us to use `.await` syntax inside.
 2. `TcpListener::bind` call returns a future, which we `.await` to extract the `Result`, and then `?` to get a `TcpListener`.
    Note how `.await` and `?` work nicely together.
    This is exactly how `std::net::TcpListener` works, but with `.await` added.
@@ -72,7 +73,7 @@ Finally, let's add main:
 #
 # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 #
-# async fn server(addr: impl ToSocketAddrs) -> Result<()> { // 1
+# async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { // 1
 #     let listener = TcpListener::bind(addr).await?; // 2
 #     let mut incoming = listener.incoming();
 #     while let Some(stream) = incoming.next().await { // 3
@@ -83,7 +84,7 @@ Finally, let's add main:
 #
 // main
 fn run() -> Result<()> {
-    let fut = server("127.0.0.1:8080");
+    let fut = accept_loop("127.0.0.1:8080");
     task::block_on(fut)
 }
 ```
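For orientation, the pieces changed in this file assemble into roughly the following accept loop plus `main`. This is a sketch built from the hunks above, not a verbatim copy of the chapter's final listing; the `Result` alias is the one shown in the hidden `#` lines.

```rust
use async_std::net::{TcpListener, ToSocketAddrs};
use async_std::prelude::*;
use async_std::task;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

// Sketch: accept connections in a loop; each connection would be handled elsewhere.
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;

    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        // hand the stream off to a per-connection task here
    }
    Ok(())
}

fn main() -> Result<()> {
    task::block_on(accept_loop("127.0.0.1:8080"))
}
```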
@@ -25,7 +25,7 @@ type Receiver<T> = mpsc::UnboundedReceiver<T>;
 
 // main
 fn run() -> Result<()> {
-    task::block_on(server("127.0.0.1:8080"))
+    task::block_on(accept_loop("127.0.0.1:8080"))
 }
 
 fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
@@ -39,21 +39,21 @@ where
     })
 }
 
-async fn server(addr: impl ToSocketAddrs) -> Result<()> {
+async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
     let listener = TcpListener::bind(addr).await?;
 
     let (broker_sender, broker_receiver) = mpsc::unbounded(); // 1
-    let _broker_handle = task::spawn(broker(broker_receiver));
+    let _broker_handle = task::spawn(broker_loop(broker_receiver));
     let mut incoming = listener.incoming();
     while let Some(stream) = incoming.next().await {
         let stream = stream?;
         println!("Accepting from: {}", stream.peer_addr()?);
-        spawn_and_log_error(client(broker_sender.clone(), stream));
+        spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
     }
     Ok(())
 }
 
-async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
+async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
     let stream = Arc::new(stream); // 2
     let reader = BufReader::new(&*stream);
     let mut lines = reader.lines();
@@ -83,7 +83,7 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
     Ok(())
 }
 
-async fn client_writer(
+async fn connection_writer_loop(
     mut messages: Receiver<String>,
     stream: Arc<TcpStream>,
 ) -> Result<()> {
@@ -107,7 +107,7 @@ enum Event {
     },
 }
 
-async fn broker(mut events: Receiver<Event>) -> Result<()> {
+async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
     let mut peers: HashMap<String, Sender<String>> = HashMap::new();
 
     while let Some(event) = events.next().await {
@@ -126,7 +126,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
                     Entry::Vacant(entry) => {
                         let (client_sender, client_receiver) = mpsc::unbounded();
                         entry.insert(client_sender); // 4
-                        spawn_and_log_error(client_writer(client_receiver, stream)); // 5
+                        spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
                     }
                 }
             }
@@ -136,8 +136,8 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
 }
 ```
 
-1. Inside the `server`, we create the broker's channel and `task`.
-2. Inside `client`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `client_writer`.
+1. Inside the `accept_loop`, we create the broker's channel and `task`.
+2. Inside `connection_loop`, we need to wrap `TcpStream` into an `Arc`, to be able to share it with the `connection_writer_loop`.
 3. On login, we notify the broker.
    Note that we `.unwrap` on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well.
 4. Similarly, we forward parsed messages to the broker, assuming that it is alive.
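Note 2 above works because `&TcpStream` implements both reading and writing in async-std, so an `Arc<TcpStream>` can be cloned into a reading task and a writing task without a lock. A standalone sketch of that trick (the function and names here are illustrative, not taken from the chapter):

```rust
use std::sync::Arc;

use async_std::io::BufReader;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;

// Sketch: share one TcpStream between a reader task and a writer via Arc.
async fn split_demo(stream: TcpStream) -> std::io::Result<()> {
    let stream = Arc::new(stream);

    let read_half = Arc::clone(&stream);
    let reader: task::JoinHandle<std::io::Result<()>> = task::spawn(async move {
        // `&TcpStream` implements `Read`, so we can read through the Arc clone.
        let mut lines = BufReader::new(&*read_half).lines();
        while let Some(line) = lines.next().await {
            println!("read: {}", line?);
        }
        Ok(())
    });

    // `&TcpStream` also implements `Write`, so the other side writes through its own reference.
    let mut write_half = &*stream;
    write_half.write_all(b"hello\n").await?;

    reader.await
}
```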
@@ -53,7 +53,7 @@ Let's add waiting to the server:
 # }
 #
 #
-# async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
+# async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
 #     let stream = Arc::new(stream); // 2
 #     let reader = BufReader::new(&*stream);
 #     let mut lines = reader.lines();
@@ -83,7 +83,7 @@ Let's add waiting to the server:
 #     Ok(())
 # }
 #
-# async fn client_writer(
+# async fn connection_writer_loop(
 #     mut messages: Receiver<String>,
 #     stream: Arc<TcpStream>,
 # ) -> Result<()> {
@@ -107,7 +107,7 @@ Let's add waiting to the server:
 #     },
 # }
 #
-# async fn broker(mut events: Receiver<Event>) -> Result<()> {
+# async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
 #     let mut peers: HashMap<String, Sender<String>> = HashMap::new();
 #
 #     while let Some(event) = events.next().await {
@@ -126,7 +126,7 @@ Let's add waiting to the server:
 #                     Entry::Vacant(entry) => {
 #                         let (client_sender, client_receiver) = mpsc::unbounded();
 #                         entry.insert(client_sender); // 4
-#                         spawn_and_log_error(client_writer(client_receiver, stream)); // 5
+#                         spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
 #                     }
 #                 }
 #             }
@@ -135,16 +135,16 @@ Let's add waiting to the server:
 #     Ok(())
 # }
 #
-async fn server(addr: impl ToSocketAddrs) -> Result<()> {
+async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
     let listener = TcpListener::bind(addr).await?;
 
     let (broker_sender, broker_receiver) = mpsc::unbounded();
-    let broker_handle = task::spawn(broker(broker_receiver));
+    let broker_handle = task::spawn(broker_loop(broker_receiver));
     let mut incoming = listener.incoming();
     while let Some(stream) = incoming.next().await {
         let stream = stream?;
         println!("Accepting from: {}", stream.peer_addr()?);
-        spawn_and_log_error(client(broker_sender.clone(), stream));
+        spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
     }
     drop(broker_sender); // 1
     broker_handle.await?; // 5
@@ -175,7 +175,7 @@ And to the broker:
 # type Sender<T> = mpsc::UnboundedSender<T>;
 # type Receiver<T> = mpsc::UnboundedReceiver<T>;
 #
-# async fn client_writer(
+# async fn connection_writer_loop(
 #     mut messages: Receiver<String>,
 #     stream: Arc<TcpStream>,
 # ) -> Result<()> {
@@ -210,7 +210,7 @@ And to the broker:
 #     },
 # }
 #
-async fn broker(mut events: Receiver<Event>) -> Result<()> {
+async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
     let mut writers = Vec::new();
     let mut peers: HashMap<String, Sender<String>> = HashMap::new();
     while let Some(event) = events.next().await { // 2
@@ -229,7 +229,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
                     Entry::Vacant(entry) => {
                         let (client_sender, client_receiver) = mpsc::unbounded();
                         entry.insert(client_sender);
-                        let handle = spawn_and_log_error(client_writer(client_receiver, stream));
+                        let handle = spawn_and_log_error(connection_writer_loop(client_receiver, stream));
                         writers.push(handle); // 4
                     }
                 }
@@ -1,7 +1,7 @@
 
 ## Connecting Readers and Writers
 
-So how do we make sure that messages read in `client` flow into the relevant `client_writer`?
+So how do we make sure that messages read in `connection_loop` flow into the relevant `connection_writer_loop`?
 We should somehow maintain an `peers: HashMap<String, Sender<String>>` map which allows a client to find destination channels.
 However, this map would be a bit of shared mutable state, so we'll have to wrap an `RwLock` over it and answer tough questions of what should happen if the client joins at the same moment as it receives a message.
 
@@ -28,7 +28,7 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined
 # type Sender<T> = mpsc::UnboundedSender<T>;
 # type Receiver<T> = mpsc::UnboundedReceiver<T>;
 #
-# async fn client_writer(
+# async fn connection_writer_loop(
 #     mut messages: Receiver<String>,
 #     stream: Arc<TcpStream>,
 # ) -> Result<()> {
@@ -65,7 +65,7 @@ enum Event { // 1
     },
 }
 
-async fn broker(mut events: Receiver<Event>) -> Result<()> {
+async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
     let mut peers: HashMap<String, Sender<String>> = HashMap::new(); // 2
 
     while let Some(event) = events.next().await {
@@ -84,7 +84,7 @@ async fn broker(mut events: Receiver<Event>) -> Result<()> {
                     Entry::Vacant(entry) => {
                         let (client_sender, client_receiver) = mpsc::unbounded();
                         entry.insert(client_sender); // 4
-                        spawn_and_log_error(client_writer(client_receiver, stream)); // 5
+                        spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
                     }
                 }
             }
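For context on what `broker_loop` does: it is a single task that owns the `peers` map and routes messages, which is how the tutorial sidesteps the `RwLock` discussed above. A rough, simplified sketch of that shape (this is not the chapter's exact code; the `Event` variants are abbreviated and the writer task is elided):

```rust
use std::collections::hash_map::{Entry, HashMap};
use std::sync::Arc;

use async_std::net::TcpStream;
use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::stream::StreamExt;

type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

// Abbreviated events: a connection announces a new peer or asks to deliver a message.
enum Event {
    NewPeer { name: String, stream: Arc<TcpStream> },
    Message { from: String, to: Vec<String>, msg: String },
}

// Sketch: the broker owns `peers`, so no locking is needed; readers only send events in.
async fn broker_loop(mut events: Receiver<Event>) {
    let mut peers: HashMap<String, Sender<String>> = HashMap::new();

    while let Some(event) = events.next().await {
        match event {
            Event::NewPeer { name, stream } => {
                if let Entry::Vacant(entry) = peers.entry(name) {
                    let (sender, receiver) = mpsc::unbounded();
                    entry.insert(sender);
                    // A real implementation spawns connection_writer_loop(receiver, stream) here.
                    let _ = (receiver, stream);
                }
            }
            Event::Message { from, to, msg } => {
                for addr in to {
                    if let Some(peer) = peers.get_mut(&addr) {
                        let line = format!("from {}: {}\n", from, msg);
                        let _ = peer.send(line).await;
                    }
                }
            }
        }
    }
}
```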
@@ -15,7 +15,7 @@ There's a more minimal solution however, which makes clever use of RAII.
 Closing a channel is a synchronization event, so we don't need to send a shutdown message, we can just drop the sender.
 This way, we statically guarantee that we issue shutdown exactly once, even if we early return via `?` or panic.
 
-First, let's add a shutdown channel to the `client`:
+First, let's add a shutdown channel to the `connection_loop`:
 
 ```rust,edition2018
 # extern crate async_std;
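The RAII trick described above relies on one property of `futures::channel::mpsc`: once every sender has been dropped, the receiver's stream simply ends with `None`. A minimal standalone illustration (not part of the chapter):

```rust
use async_std::task;
use futures::channel::mpsc;
use futures::sink::SinkExt;
use futures::stream::StreamExt;

fn main() {
    task::block_on(async {
        let (mut sender, mut receiver) = mpsc::unbounded::<String>();
        sender.send(String::from("hello")).await.unwrap();
        drop(sender); // the last sender is gone, so the channel is now closed

        // The receiver still yields the buffered message, then `None`:
        // the loop ends without any explicit "please stop" message.
        while let Some(msg) = receiver.next().await {
            println!("got: {}", msg);
        }
        println!("channel closed, shutting down");
    });
}
```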
@@ -47,7 +47,7 @@ enum Event {
     },
 }
 
-async fn client(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()> {
+async fn connection_loop(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()> {
     // ...
 #     let name: String = unimplemented!();
     let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); // 3
@@ -65,7 +65,7 @@ async fn client(mut broker: Sender<Event>, stream: Arc<TcpStream>) -> Result<()>
 2. We pass the shutdown channel to the writer task
 3. In the reader, we create a `_shutdown_sender` whose only purpose is to get dropped.
 
-In the `client_writer`, we now need to choose between shutdown and message channels.
+In the `connection_writer_loop`, we now need to choose between shutdown and message channels.
 We use the `select` macro for this purpose:
 
 ```rust,edition2018
@@ -84,7 +84,7 @@ use futures_util::{select, FutureExt, StreamExt};
 # #[derive(Debug)]
 # enum Void {} // 1
 
-async fn client_writer(
+async fn connection_writer_loop(
     messages: &mut Receiver<String>,
     stream: Arc<TcpStream>,
     shutdown: Receiver<Void>, // 1
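To see the renamed function in one piece: below is a sketch of a `connection_writer_loop` that selects between the message channel and the shutdown channel. It follows the shape the chapter describes, but it is written out independently here, so the error type and imports are assumptions rather than the book's exact listing.

```rust
use std::sync::Arc;

use async_std::{net::TcpStream, prelude::*};
use futures::channel::mpsc;
use futures::{select, FutureExt};

type Receiver<T> = mpsc::UnboundedReceiver<T>;

#[derive(Debug)]
enum Void {} // a type with no values: the shutdown channel can only ever yield `None`

// Sketch: write messages to the socket until either channel closes.
async fn connection_writer_loop(
    messages: &mut Receiver<String>,
    stream: Arc<TcpStream>,
    mut shutdown: Receiver<Void>,
) -> std::io::Result<()> {
    let mut stream = &*stream;
    loop {
        select! {
            msg = messages.next().fuse() => match msg {
                Some(msg) => stream.write_all(msg.as_bytes()).await?,
                None => break, // every message sender is gone
            },
            void = shutdown.next().fuse() => match void {
                Some(void) => match void {}, // statically unreachable
                None => break,               // the reader dropped `_shutdown_sender`
            },
        }
    }
    Ok(())
}
```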
@@ -112,7 +112,7 @@ async fn client_writer(
 2. Because of `select`, we can't use a `while let` loop, so we desugar it further into a `loop`.
 3. In the shutdown case we use `match void {}` as a statically-checked `unreachable!()`.
 
-Another problem is that between the moment we detect disconnection in `client_writer` and the moment when we actually remove the peer from the `peers` map, new messages might be pushed into the peer's channel.
+Another problem is that between the moment we detect disconnection in `connection_writer_loop` and the moment when we actually remove the peer from the `peers` map, new messages might be pushed into the peer's channel.
 To not lose these messages completely, we'll return the messages channel back to the broker.
 This also allows us to establish a useful invariant that the message channel strictly outlives the peer in the `peers` map, and makes the broker itself infailable.
 
@@ -146,25 +146,25 @@ enum Void {}
 
 // main
 fn run() -> Result<()> {
-    task::block_on(server("127.0.0.1:8080"))
+    task::block_on(accept_loop("127.0.0.1:8080"))
 }
 
-async fn server(addr: impl ToSocketAddrs) -> Result<()> {
+async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
     let listener = TcpListener::bind(addr).await?;
     let (broker_sender, broker_receiver) = mpsc::unbounded();
-    let broker_handle = task::spawn(broker(broker_receiver));
+    let broker_handle = task::spawn(broker_loop(broker_receiver));
     let mut incoming = listener.incoming();
     while let Some(stream) = incoming.next().await {
         let stream = stream?;
         println!("Accepting from: {}", stream.peer_addr()?);
-        spawn_and_log_error(client(broker_sender.clone(), stream));
+        spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
     }
     drop(broker_sender);
     broker_handle.await;
     Ok(())
 }
 
-async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
+async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
     let stream = Arc::new(stream);
     let reader = BufReader::new(&*stream);
     let mut lines = reader.lines();
@@ -199,7 +199,7 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
     Ok(())
 }
 
-async fn client_writer(
+async fn connection_writer_loop(
     messages: &mut Receiver<String>,
     stream: Arc<TcpStream>,
     shutdown: Receiver<Void>,
@@ -236,7 +236,7 @@ enum Event {
     },
 }
 
-async fn broker(events: Receiver<Event>) {
+async fn broker_loop(events: Receiver<Event>) {
     let (disconnect_sender, mut disconnect_receiver) = // 1
         mpsc::unbounded::<(String, Receiver<String>)>();
     let mut peers: HashMap<String, Sender<String>> = HashMap::new();
@@ -271,7 +271,7 @@ async fn broker(events: Receiver<Event>) {
                         entry.insert(client_sender);
                         let mut disconnect_sender = disconnect_sender.clone();
                         spawn_and_log_error(async move {
-                            let res = client_writer(&mut client_receiver, stream, shutdown).await;
+                            let res = connection_writer_loop(&mut client_receiver, stream, shutdown).await;
                             disconnect_sender.send((name, client_receiver)).await // 4
                                 .unwrap();
                             res
@@ -18,18 +18,18 @@ We need to:
 #
 # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 #
-async fn server(addr: impl ToSocketAddrs) -> Result<()> {
+async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
     let listener = TcpListener::bind(addr).await?;
     let mut incoming = listener.incoming();
     while let Some(stream) = incoming.next().await {
         let stream = stream?;
         println!("Accepting from: {}", stream.peer_addr()?);
-        let _handle = task::spawn(client(stream)); // 1
+        let _handle = task::spawn(connection_loop(stream)); // 1
     }
     Ok(())
 }
 
-async fn client(stream: TcpStream) -> Result<()> {
+async fn connection_loop(stream: TcpStream) -> Result<()> {
     let reader = BufReader::new(&stream); // 2
     let mut lines = reader.lines();
 
@@ -53,7 +53,7 @@ async fn client(stream: TcpStream) -> Result<()> {
 ```
 
 1. We use `task::spawn` function to spawn an independent task for working with each client.
-   That is, after accepting the client the `server` loop immediately starts waiting for the next one.
+   That is, after accepting the client the `accept_loop` immediately starts waiting for the next one.
    This is the core benefit of event-driven architecture: we serve many clients concurrently, without spending many hardware threads.
 
 2. Luckily, the "split byte stream into lines" functionality is already implemented.
@@ -67,7 +67,7 @@ async fn client(stream: TcpStream) -> Result<()> {
 
 ## Managing Errors
 
-One serious problem in the above solution is that, while we correctly propagate errors in the `client`, we just drop the error on the floor afterwards!
+One serious problem in the above solution is that, while we correctly propagate errors in the `connection_loop`, we just drop the error on the floor afterwards!
 That is, `task::spawn` does not return an error immediately (it can't, it needs to run the future to completion first), only after it is joined.
 We can "fix" it by waiting for the task to be joined, like this:
 
@@ -83,7 +83,7 @@ We can "fix" it by waiting for the task to be joined, like this:
 #
 # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 #
-# async fn client(stream: TcpStream) -> Result<()> {
+# async fn connection_loop(stream: TcpStream) -> Result<()> {
 #     let reader = BufReader::new(&stream); // 2
 #     let mut lines = reader.lines();
 #
@@ -106,7 +106,7 @@ We can "fix" it by waiting for the task to be joined, like this:
 # }
 #
 # async move |stream| {
-let handle = task::spawn(client(stream));
+let handle = task::spawn(connection_loop(stream));
 handle.await
 # };
 ```
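The `spawn_and_log_error` helper whose signature appears earlier in this diff is the tutorial's eventual answer to the dropped-error problem described above. A self-contained sketch of such a helper, assuming the `Result` alias used throughout the book:

```rust
use std::future::Future;

use async_std::task;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

// Sketch: spawn a task, and log its error instead of silently dropping it.
// The handle is still returned, so callers may join the task if they want to.
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    task::spawn(async move {
        if let Err(e) = fut.await {
            eprintln!("{}", e)
        }
    })
}
```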
@@ -1,13 +1,13 @@
 ## Sending Messages
 
 Now it's time to implement the other half -- sending messages.
-A most obvious way to implement sending is to give each `client` access to the write half of `TcpStream` of each other clients.
+A most obvious way to implement sending is to give each `connection_loop` access to the write half of `TcpStream` of each other clients.
 That way, a client can directly `.write_all` a message to recipients.
 However, this would be wrong: if Alice sends `bob: foo`, and Charley sends `bob: bar`, Bob might actually receive `fobaor`.
 Sending a message over a socket might require several syscalls, so two concurrent `.write_all`'s might interfere with each other!
 
 As a rule of thumb, only a single task should write to each `TcpStream`.
-So let's create a `client_writer` task which receives messages over a channel and writes them to the socket.
+So let's create a `connection_writer_loop` task which receives messages over a channel and writes them to the socket.
 This task would be the point of serialization of messages.
 if Alice and Charley send two messages to Bob at the same time, Bob will see the messages in the same order as they arrive in the channel.
 
@@ -28,7 +28,7 @@ use std::sync::Arc;
 type Sender<T> = mpsc::UnboundedSender<T>; // 2
 type Receiver<T> = mpsc::UnboundedReceiver<T>;
 
-async fn client_writer(
+async fn connection_writer_loop(
     mut messages: Receiver<String>,
     stream: Arc<TcpStream>, // 3
 ) -> Result<()> {
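A minimal body for this writer loop, sketched here for illustration rather than copied verbatim from the chapter: it drains the channel and is the only task that ever touches the write half, so two messages can never interleave on the socket.

```rust
use std::sync::Arc;

use async_std::{net::TcpStream, prelude::*};
use futures::channel::mpsc;

type Receiver<T> = mpsc::UnboundedReceiver<T>;

// Sketch: one writer task per connection; everything destined for this client
// goes through its channel, and only this task writes to the socket.
async fn connection_writer_loop(
    mut messages: Receiver<String>,
    stream: Arc<TcpStream>,
) -> std::io::Result<()> {
    let mut stream = &*stream;
    while let Some(msg) = messages.next().await {
        stream.write_all(msg.as_bytes()).await?;
    }
    Ok(())
}
```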
@@ -42,5 +42,5 @@ async fn client_writer(
 
 1. We will use channels from the `futures` crate.
 2. For simplicity, we will use `unbounded` channels, and won't be discussing backpressure in this tutorial.
-3. As `client` and `client_writer` share the same `TcpStream`, we need to put it into an `Arc`.
-   Note that because `client` only reads from the stream and `client_writer` only writes to the stream, we don't get a race here.
+3. As `connection_loop` and `connection_writer_loop` share the same `TcpStream`, we need to put it into an `Arc`.
+   Note that because `client` only reads from the stream and `connection_writer_loop` only writes to the stream, we don't get a race here.