diff --git a/docs/src/tutorial/index.md b/docs/src/tutorial/index.md index 0136c1f..99ddf8e 100644 --- a/docs/src/tutorial/index.md +++ b/docs/src/tutorial/index.md @@ -8,5 +8,4 @@ How do you distribute the messages? In this tutorial, we will show you how to write one in `async-std`. -You can also find the tutorial in [our repository](https://github.com/async-rs/a-chat). - +You can also find the tutorial in [our repository](https://github.com/async-rs/async-std/blob/master/examples/a-chat). diff --git a/examples/a-chat/client.rs b/examples/a-chat/client.rs new file mode 100644 index 0000000..48634ba --- /dev/null +++ b/examples/a-chat/client.rs @@ -0,0 +1,45 @@ +use futures::select; +use futures::FutureExt; + +use async_std::{ + io::{stdin, BufReader}, + net::{TcpStream, ToSocketAddrs}, + prelude::*, + task, +}; + +type Result = std::result::Result>; + +pub(crate) fn main() -> Result<()> { + task::block_on(try_main("127.0.0.1:8080")) +} + +async fn try_main(addr: impl ToSocketAddrs) -> Result<()> { + let stream = TcpStream::connect(addr).await?; + let (reader, mut writer) = (&stream, &stream); + let reader = BufReader::new(reader); + let mut lines_from_server = futures::StreamExt::fuse(reader.lines()); + + let stdin = BufReader::new(stdin()); + let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines()); + loop { + select! { + line = lines_from_server.next().fuse() => match line { + Some(line) => { + let line = line?; + println!("{}", line); + }, + None => break, + }, + line = lines_from_stdin.next().fuse() => match line { + Some(line) => { + let line = line?; + writer.write_all(line.as_bytes()).await?; + writer.write_all(b"\n").await?; + } + None => break, + } + } + } + Ok(()) +} diff --git a/examples/a-chat/main.rs b/examples/a-chat/main.rs new file mode 100644 index 0000000..ced7cac --- /dev/null +++ b/examples/a-chat/main.rs @@ -0,0 +1,13 @@ +mod client; +mod server; + +type Result = std::result::Result>; + +fn main() -> Result<()> { + let mut args = std::env::args(); + match (args.nth(1).as_ref().map(String::as_str), args.next()) { + (Some("client"), None) => client::main(), + (Some("server"), None) => server::main(), + _ => Err("Usage: a-chat [client|server]")?, + } +} diff --git a/examples/a-chat/server.rs b/examples/a-chat/server.rs new file mode 100644 index 0000000..911d160 --- /dev/null +++ b/examples/a-chat/server.rs @@ -0,0 +1,185 @@ +use std::{ + collections::hash_map::{Entry, HashMap}, + sync::Arc, +}; + +use futures::{channel::mpsc, select, FutureExt, SinkExt}; + +use async_std::{ + io::BufReader, + net::{TcpListener, TcpStream, ToSocketAddrs}, + prelude::*, + task, +}; + +type Result = std::result::Result>; +type Sender = mpsc::UnboundedSender; +type Receiver = mpsc::UnboundedReceiver; + +#[derive(Debug)] +enum Void {} + +pub(crate) fn main() -> Result<()> { + task::block_on(accept_loop("127.0.0.1:8080")) +} + +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { + let listener = TcpListener::bind(addr).await?; + + let (broker_sender, broker_receiver) = mpsc::unbounded(); + let broker = task::spawn(broker_loop(broker_receiver)); + let mut incoming = listener.incoming(); + while let Some(stream) = incoming.next().await { + let stream = stream?; + println!("Accepting from: {}", stream.peer_addr()?); + spawn_and_log_error(connection_loop(broker_sender.clone(), stream)); + } + drop(broker_sender); + broker.await; + Ok(()) +} + +async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result<()> { + let stream = Arc::new(stream); + let reader = BufReader::new(&*stream); + let mut lines = reader.lines(); + + let name = match lines.next().await { + None => Err("peer disconnected immediately")?, + Some(line) => line?, + }; + let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); + broker + .send(Event::NewPeer { + name: name.clone(), + stream: Arc::clone(&stream), + shutdown: shutdown_receiver, + }) + .await + .unwrap(); + + while let Some(line) = lines.next().await { + let line = line?; + let (dest, msg) = match line.find(':') { + None => continue, + Some(idx) => (&line[..idx], line[idx + 1..].trim()), + }; + let dest: Vec = dest + .split(',') + .map(|name| name.trim().to_string()) + .collect(); + let msg: String = msg.trim().to_string(); + + broker + .send(Event::Message { + from: name.clone(), + to: dest, + msg, + }) + .await + .unwrap(); + } + + Ok(()) +} + +async fn connection_writer_loop( + messages: &mut Receiver, + stream: Arc, + mut shutdown: Receiver, +) -> Result<()> { + let mut stream = &*stream; + loop { + select! { + msg = messages.next().fuse() => match msg { + Some(msg) => stream.write_all(msg.as_bytes()).await?, + None => break, + }, + void = shutdown.next().fuse() => match void { + Some(void) => match void {}, + None => break, + } + } + } + Ok(()) +} + +#[derive(Debug)] +enum Event { + NewPeer { + name: String, + stream: Arc, + shutdown: Receiver, + }, + Message { + from: String, + to: Vec, + msg: String, + }, +} + +async fn broker_loop(mut events: Receiver) { + let (disconnect_sender, mut disconnect_receiver) = + mpsc::unbounded::<(String, Receiver)>(); + let mut peers: HashMap> = HashMap::new(); + + loop { + let event = select! { + event = events.next().fuse() => match event { + None => break, + Some(event) => event, + }, + disconnect = disconnect_receiver.next().fuse() => { + let (name, _pending_messages) = disconnect.unwrap(); + assert!(peers.remove(&name).is_some()); + continue; + }, + }; + match event { + Event::Message { from, to, msg } => { + for addr in to { + if let Some(peer) = peers.get_mut(&addr) { + peer.send(format!("from {}: {}\n", from, msg)) + .await + .unwrap() + } + } + } + Event::NewPeer { + name, + stream, + shutdown, + } => match peers.entry(name.clone()) { + Entry::Occupied(..) => (), + Entry::Vacant(entry) => { + let (client_sender, mut client_receiver) = mpsc::unbounded(); + entry.insert(client_sender); + let mut disconnect_sender = disconnect_sender.clone(); + spawn_and_log_error(async move { + let res = + connection_writer_loop(&mut client_receiver, stream, shutdown).await; + disconnect_sender + .send((name, client_receiver)) + .await + .unwrap(); + res + }); + } + }, + } + } + drop(peers); + drop(disconnect_sender); + while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {} +} + +fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> +where + F: Future> + Send + 'static, +{ + task::spawn(async move { + if let Err(e) = fut.await { + eprintln!("{}", e) + } + }) +}