2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-04-14 04:16:44 +00:00
async-std/docs/src/tutorial/connecting_readers_and_writers.md
bors[bot] 33ff41df48
Merge #224
224: Re-export IO traits from futures r=stjepang a=stjepang

Sorry for the big PR!

Instead of providing our own traits `async_std::io::{Read, Write, Seek, BufRead}`, we now re-export `futures::io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncRead}`. While re-exporting we rename them to strip away the "Async" prefix.

The documentation will display the contents of the original traits from the `futures` crate together with our own extension methods. There's a note in the docs saying the extenion methods become available only when `async_std::prelude::*` is imported.

Our extension traits are re-exported into the prelude, but are marked with `#[doc(hidden)]` so they're completely invisible to users.

The benefit of this is that people can now implement traits from `async_std::io` for their types and stay compatible with `futures`. This will also simplify some trait bounds in our APIs - for example, things like `where Self: futures_io::AsyncRead`.

At the same time, I cleaned up some trait bounds in our stream interfaces, but haven't otherwise fiddled with them much.

I intend to follow up with another PR doing the same change for `Stream` so that we re-export the stream trait from `futures`.

Co-authored-by: Stjepan Glavina <stjepang@gmail.com>
2019-09-22 13:50:53 +00:00

3.7 KiB

Connecting Readers and Writers

So how do we make sure that messages read in connection_loop flow into the relevant connection_writer_loop? We should somehow maintain an peers: HashMap<String, Sender<String>> map which allows a client to find destination channels. However, this map would be a bit of shared mutable state, so we'll have to wrap an RwLock over it and answer tough questions of what should happen if the client joins at the same moment as it receives a message.

One trick to make reasoning about state simpler comes from the actor model. We can create a dedicated broker tasks which owns the peers map and communicates with other tasks by channels. By hiding peers inside such an "actor" task, we remove the need for mutxes and also make serialization point explicit. The order of events "Bob sends message to Alice" and "Alice joins" is determined by the order of the corresponding events in the broker's event queue.

# extern crate async_std;
# extern crate futures_channel;
# extern crate futures_util;
# use async_std::{
#     net::TcpStream,
#     prelude::*,
#     task,
# };
# use futures_channel::mpsc;
# use futures_util::sink::SinkExt;
# use std::sync::Arc;
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
# type Sender<T> = mpsc::UnboundedSender<T>;
# type Receiver<T> = mpsc::UnboundedReceiver<T>;
#
# async fn connection_writer_loop(
#     mut messages: Receiver<String>,
#     stream: Arc<TcpStream>,
# ) -> Result<()> {
#     let mut stream = &*stream;
#     while let Some(msg) = messages.next().await {
#         stream.write_all(msg.as_bytes()).await?;
#     }
#     Ok(())
# }
#
# fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
# where
#     F: Future<Output = Result<()>> + Send + 'static,
# {
#     task::spawn(async move {
#         if let Err(e) = fut.await {
#             eprintln!("{}", e)
#         }
#     })
# }
#
use std::collections::hash_map::{Entry, HashMap};

#[derive(Debug)]
enum Event { // 1
    NewPeer {
        name: String,
        stream: Arc<TcpStream>,
    },
    Message {
        from: String,
        to: Vec<String>,
        msg: String,
    },
}

async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
    let mut peers: HashMap<String, Sender<String>> = HashMap::new(); // 2

    while let Some(event) = events.next().await {
        match event {
            Event::Message { from, to, msg } => {  // 3
                for addr in to {
                    if let Some(peer) = peers.get_mut(&addr) {
                        let msg = format!("from {}: {}\n", from, msg);
                        peer.send(msg).await?
                    }
                }
            }
            Event::NewPeer { name, stream } => {
                match peers.entry(name) {
                    Entry::Occupied(..) => (),
                    Entry::Vacant(entry) => {
                        let (client_sender, client_receiver) = mpsc::unbounded();
                        entry.insert(client_sender); // 4
                        spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
                    }
                }
            }
        }
    }
    Ok(())
}
  1. Broker should handle two types of events: a message or an arrival of a new peer.
  2. Internal state of the broker is a HashMap. Note how we don't need a Mutex here and can confidently say, at each iteration of the broker's loop, what is the current set of peers
  3. To handle a message, we send it over a channel to each destination
  4. To handle new peer, we first register it in the peer's map ...
  5. ... and then spawn a dedicated task to actually write the messages to the socket.