2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-04-10 18:36:43 +00:00
async-std/docs/src/tutorial/all_together.md
Alejandro Martinez Ruiz ba1ee2d204 Fix a-chat tutorial issues (#573)
* tutorial/receiving_messages: fix future output type bound

* tutorial/receiving_messages: remove unneeded message trimming

Trimming was done twice on messages, so one of the two instances can
be removed. I personally think removing the first instance, in which
we are splitting names from messages makes the code more readable
than removing the second instance, but other examples further in
the tutorial show the second instance removed.

* tutorial/receiving_messages: declare use of TcpStream and io::BufReader

Readers couldn't see the `use` lines corresponding to these two
structures.

* tutorial/connecting_readers_and_writers: typos and grammar fixes

* tutorial/all_together: remove unneeded use async_std::io

* tutorial: use SinkExt consistently from futures::sink::SinkExt

* tutorial/handling_disconnection: hide mpsc use clause and remove empty lines

The empty lines translate to the output making it look weird.

* tutorial/handling_disconnection: fix typos

* tutorial/handling_disconnection: use ? in broker_handle.await

We were happy to return an Err variant from the broker_handle before
and nothing has changed in this regard, so bubbling it up to run().
2019-11-21 18:03:10 +01:00

4.3 KiB

All Together

At this point, we only need to start the broker to get a fully-functioning (in the happy case!) chat:

# extern crate async_std;
# extern crate futures;
use async_std::{
    io::BufReader,
    net::{TcpListener, TcpStream, ToSocketAddrs},
    prelude::*,
    task,
};
use futures::channel::mpsc;
use futures::sink::SinkExt;
use std::{
    collections::hash_map::{HashMap, Entry},
    sync::Arc,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

// main
fn run() -> Result<()> {
    task::block_on(accept_loop("127.0.0.1:8080"))
}

fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
    F: Future<Output = Result<()>> + Send + 'static,
{
    task::spawn(async move {
        if let Err(e) = fut.await {
            eprintln!("{}", e)
        }
    })
}

async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
    let listener = TcpListener::bind(addr).await?;

    let (broker_sender, broker_receiver) = mpsc::unbounded(); // 1
    let _broker_handle = task::spawn(broker_loop(broker_receiver));
    let mut incoming = listener.incoming();
    while let Some(stream) = incoming.next().await {
        let stream = stream?;
        println!("Accepting from: {}", stream.peer_addr()?);
        spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
    }
    Ok(())
}

async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
    let stream = Arc::new(stream); // 2
    let reader = BufReader::new(&*stream);
    let mut lines = reader.lines();

    let name = match lines.next().await {
        None => Err("peer disconnected immediately")?,
        Some(line) => line?,
    };
    broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await // 3
        .unwrap();

    while let Some(line) = lines.next().await {
        let line = line?;
        let (dest, msg) = match line.find(':') {
            None => continue,
            Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
        };
        let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
        let msg: String = msg.to_string();

        broker.send(Event::Message { // 4
            from: name.clone(),
            to: dest,
            msg,
        }).await.unwrap();
    }
    Ok(())
}

async fn connection_writer_loop(
    mut messages: Receiver<String>,
    stream: Arc<TcpStream>,
) -> Result<()> {
    let mut stream = &*stream;
    while let Some(msg) = messages.next().await {
        stream.write_all(msg.as_bytes()).await?;
    }
    Ok(())
}

#[derive(Debug)]
enum Event {
    NewPeer {
        name: String,
        stream: Arc<TcpStream>,
    },
    Message {
        from: String,
        to: Vec<String>,
        msg: String,
    },
}

async fn broker_loop(mut events: Receiver<Event>) -> Result<()> {
    let mut peers: HashMap<String, Sender<String>> = HashMap::new();

    while let Some(event) = events.next().await {
        match event {
            Event::Message { from, to, msg } => {
                for addr in to {
                    if let Some(peer) = peers.get_mut(&addr) {
                        let msg = format!("from {}: {}\n", from, msg);
                        peer.send(msg).await?
                    }
                }
            }
            Event::NewPeer { name, stream} => {
                match peers.entry(name) {
                    Entry::Occupied(..) => (),
                    Entry::Vacant(entry) => {
                        let (client_sender, client_receiver) = mpsc::unbounded();
                        entry.insert(client_sender); // 4
                        spawn_and_log_error(connection_writer_loop(client_receiver, stream)); // 5
                    }
                }
            }
        }
    }
    Ok(())
}
  1. Inside the accept_loop, we create the broker's channel and task.
  2. Inside connection_loop, we need to wrap TcpStream into an Arc, to be able to share it with the connection_writer_loop.
  3. On login, we notify the broker. Note that we .unwrap on send: broker should outlive all the clients and if that's not the case the broker probably panicked, so we can escalate the panic as well.
  4. Similarly, we forward parsed messages to the broker, assuming that it is alive.