forked from mirror/async-std
		
	
		
			
				
	
	
		
			184 lines
		
	
	
	
		
			5.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
			
		
		
	
	
			184 lines
		
	
	
	
		
			5.3 KiB
		
	
	
	
		
			Rust
		
	
	
	
	
	
| use std::{
 | |
|     collections::hash_map::{Entry, HashMap},
 | |
|     sync::Arc,
 | |
| };
 | |
| 
 | |
| use futures::{channel::mpsc, select, FutureExt, SinkExt};
 | |
| 
 | |
| use async_std::{
 | |
|     io::BufReader,
 | |
|     net::{TcpListener, TcpStream, ToSocketAddrs},
 | |
|     prelude::*,
 | |
|     task,
 | |
| };
 | |
| 
 | |
| type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
 | |
| type Sender<T> = mpsc::UnboundedSender<T>;
 | |
| type Receiver<T> = mpsc::UnboundedReceiver<T>;
 | |
| 
 | |
| #[derive(Debug)]
 | |
| enum Void {}
 | |
| 
 | |
| pub(crate) fn main() -> Result<()> {
 | |
|     task::block_on(accept_loop("127.0.0.1:8080"))
 | |
| }
 | |
| 
 | |
| async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
 | |
|     let listener = TcpListener::bind(addr).await?;
 | |
| 
 | |
|     let (broker_sender, broker_receiver) = mpsc::unbounded();
 | |
|     let broker = task::spawn(broker_loop(broker_receiver));
 | |
|     let mut incoming = listener.incoming();
 | |
|     while let Some(stream) = incoming.next().await {
 | |
|         let stream = stream?;
 | |
|         println!("Accepting from: {}", stream.peer_addr()?);
 | |
|         spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
 | |
|     }
 | |
|     drop(broker_sender);
 | |
|     broker.await;
 | |
|     Ok(())
 | |
| }
 | |
| 
 | |
| async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
 | |
|     let stream = Arc::new(stream);
 | |
|     let reader = BufReader::new(&*stream);
 | |
|     let mut lines = reader.lines();
 | |
| 
 | |
|     let name = match lines.next().await {
 | |
|         None => return Err("peer disconnected immediately".into()),
 | |
|         Some(line) => line?,
 | |
|     };
 | |
|     let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
 | |
|     broker
 | |
|         .send(Event::NewPeer {
 | |
|             name: name.clone(),
 | |
|             stream: Arc::clone(&stream),
 | |
|             shutdown: shutdown_receiver,
 | |
|         })
 | |
|         .await
 | |
|         .unwrap();
 | |
| 
 | |
|     while let Some(line) = lines.next().await {
 | |
|         let line = line?;
 | |
|         let (dest, msg) = match line.find(':') {
 | |
|             None => continue,
 | |
|             Some(idx) => (&line[..idx], line[idx + 1..].trim()),
 | |
|         };
 | |
|         let dest: Vec<String> = dest
 | |
|             .split(',')
 | |
|             .map(|name| name.trim().to_string())
 | |
|             .collect();
 | |
|         let msg: String = msg.trim().to_string();
 | |
| 
 | |
|         broker
 | |
|             .send(Event::Message {
 | |
|                 from: name.clone(),
 | |
|                 to: dest,
 | |
|                 msg,
 | |
|             })
 | |
|             .await
 | |
|             .unwrap();
 | |
|     }
 | |
| 
 | |
|     Ok(())
 | |
| }
 | |
| 
 | |
| async fn connection_writer_loop(
 | |
|     messages: &mut Receiver<String>,
 | |
|     stream: Arc<TcpStream>,
 | |
|     mut shutdown: Receiver<Void>,
 | |
| ) -> Result<()> {
 | |
|     let mut stream = &*stream;
 | |
|     loop {
 | |
|         select! {
 | |
|             msg = messages.next().fuse() => match msg {
 | |
|                 Some(msg) => stream.write_all(msg.as_bytes()).await?,
 | |
|                 None => break,
 | |
|             },
 | |
|             void = shutdown.next().fuse() => match void {
 | |
|                 Some(void) => match void {},
 | |
|                 None => break,
 | |
|             }
 | |
|         }
 | |
|     }
 | |
|     Ok(())
 | |
| }
 | |
| 
 | |
| #[derive(Debug)]
 | |
| enum Event {
 | |
|     NewPeer {
 | |
|         name: String,
 | |
|         stream: Arc<TcpStream>,
 | |
|         shutdown: Receiver<Void>,
 | |
|     },
 | |
|     Message {
 | |
|         from: String,
 | |
|         to: Vec<String>,
 | |
|         msg: String,
 | |
|     },
 | |
| }
 | |
| 
 | |
| async fn broker_loop(mut events: Receiver<Event>) {
 | |
|     let (disconnect_sender, mut disconnect_receiver) =
 | |
|         mpsc::unbounded::<(String, Receiver<String>)>();
 | |
|     let mut peers: HashMap<String, Sender<String>> = HashMap::new();
 | |
| 
 | |
|     loop {
 | |
|         let event = select! {
 | |
|             event = events.next().fuse() => match event {
 | |
|                 None => break,
 | |
|                 Some(event) => event,
 | |
|             },
 | |
|             disconnect = disconnect_receiver.next().fuse() => {
 | |
|                 let (name, _pending_messages) = disconnect.unwrap();
 | |
|                 assert!(peers.remove(&name).is_some());
 | |
|                 continue;
 | |
|             },
 | |
|         };
 | |
|         match event {
 | |
|             Event::Message { from, to, msg } => {
 | |
|                 for addr in to {
 | |
|                     if let Some(peer) = peers.get_mut(&addr) {
 | |
|                         let msg = format!("from {}: {}\n", from, msg);
 | |
|                         peer.send(msg).await.unwrap();
 | |
|                     }
 | |
|                 }
 | |
|             }
 | |
|             Event::NewPeer {
 | |
|                 name,
 | |
|                 stream,
 | |
|                 shutdown,
 | |
|             } => match peers.entry(name.clone()) {
 | |
|                 Entry::Occupied(..) => (),
 | |
|                 Entry::Vacant(entry) => {
 | |
|                     let (client_sender, mut client_receiver) = mpsc::unbounded();
 | |
|                     entry.insert(client_sender);
 | |
|                     let mut disconnect_sender = disconnect_sender.clone();
 | |
|                     spawn_and_log_error(async move {
 | |
|                         let res =
 | |
|                             connection_writer_loop(&mut client_receiver, stream, shutdown).await;
 | |
|                         disconnect_sender
 | |
|                             .send((name, client_receiver))
 | |
|                             .await
 | |
|                             .unwrap();
 | |
|                         res
 | |
|                     });
 | |
|                 }
 | |
|             },
 | |
|         }
 | |
|     }
 | |
|     drop(peers);
 | |
|     drop(disconnect_sender);
 | |
|     while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {}
 | |
| }
 | |
| 
 | |
| fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
 | |
| where
 | |
|     F: Future<Output = Result<()>> + Send + 'static,
 | |
| {
 | |
|     task::spawn(async move {
 | |
|         if let Err(e) = fut.await {
 | |
|             eprintln!("{}", e)
 | |
|         }
 | |
|     })
 | |
| }
 |