2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-04-08 09:26:42 +00:00
This commit is contained in:
Florian Gilcher 2024-02-12 20:33:17 +00:00 committed by GitHub
commit dbef8f3690
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 81 additions and 75 deletions

View file

@ -1,13 +1,14 @@
use futures::select; use std::sync::Arc;
use futures::FutureExt;
use async_std::{ use async_std::{
io::{stdin, BufReader}, io::{stdin, BufReader},
net::{TcpStream, ToSocketAddrs}, net::{TcpStream, ToSocketAddrs},
prelude::*, prelude::*,
task, task,
future::select,
}; };
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
pub(crate) fn main() -> Result<()> { pub(crate) fn main() -> Result<()> {
@ -15,31 +16,28 @@ pub(crate) fn main() -> Result<()> {
} }
async fn try_main(addr: impl ToSocketAddrs) -> Result<()> { async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
let stream = TcpStream::connect(addr).await?; let stream = Arc::new(TcpStream::connect(addr).await?);
let (reader, mut writer) = (&stream, &stream); let (reader, writer) = (stream.clone(), stream.clone());
let reader = BufReader::new(reader);
let mut lines_from_server = futures::StreamExt::fuse(reader.lines());
let stdin = BufReader::new(stdin()); let incoming = task::spawn(async move {
let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines()); let mut messages = BufReader::new(&*reader).lines();
loop { while let Some(message) = messages.next().await {
select! { let message = message?;
line = lines_from_server.next().fuse() => match line { println!("{}", message);
Some(line) => { }
let line = line?; Ok(())
println!("{}", line); });
},
None => break, let outgoing = task::spawn(async move {
}, let mut stdin = BufReader::new(stdin()).lines();
line = lines_from_stdin.next().fuse() => match line {
Some(line) => { while let Some(line) = stdin.next().await {
let line = line?; let line = line?;
writer.write_all(line.as_bytes()).await?; let message = format!("{}\n", line);
writer.write_all(b"\n").await?; (&*writer).write_all(message.as_bytes()).await?;
} }
None => break, Ok(())
} });
}
} select!(incoming, outgoing).await
Ok(())
} }

View file

@ -3,18 +3,16 @@ use std::{
sync::Arc, sync::Arc,
}; };
use futures::{channel::mpsc, select, FutureExt, SinkExt};
use async_std::{ use async_std::{
io::BufReader, io::BufReader,
net::{TcpListener, TcpStream, ToSocketAddrs}, net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*, prelude::*,
task, task,
sync::{channel, Sender, Receiver},
stream,
}; };
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;
#[derive(Debug)] #[derive(Debug)]
enum Void {} enum Void {}
@ -26,7 +24,7 @@ pub(crate) fn main() -> Result<()> {
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?; let listener = TcpListener::bind(addr).await?;
let (broker_sender, broker_receiver) = mpsc::unbounded(); let (broker_sender, broker_receiver) = channel(10);
let broker = task::spawn(broker_loop(broker_receiver)); let broker = task::spawn(broker_loop(broker_receiver));
let mut incoming = listener.incoming(); let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await { while let Some(stream) = incoming.next().await {
@ -39,7 +37,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
Ok(()) Ok(())
} }
async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> { async fn connection_loop(broker: Sender<Event>, stream: TcpStream) -> Result<()> {
let stream = Arc::new(stream); let stream = Arc::new(stream);
let reader = BufReader::new(&*stream); let reader = BufReader::new(&*stream);
let mut lines = reader.lines(); let mut lines = reader.lines();
@ -48,15 +46,14 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
None => return Err("peer disconnected immediately".into()), None => return Err("peer disconnected immediately".into()),
Some(line) => line?, Some(line) => line?,
}; };
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); let (_shutdown_sender, shutdown_receiver) = channel::<Void>(0);
broker broker
.send(Event::NewPeer { .send(Event::NewPeer {
name: name.clone(), name: name.clone(),
stream: Arc::clone(&stream), stream: Arc::clone(&stream),
shutdown: shutdown_receiver, shutdown: shutdown_receiver,
}) })
.await .await;
.unwrap();
while let Some(line) = lines.next().await { while let Some(line) = lines.next().await {
let line = line?; let line = line?;
@ -76,28 +73,36 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
to: dest, to: dest,
msg, msg,
}) })
.await .await;
.unwrap();
} }
Ok(()) Ok(())
} }
#[derive(Debug)]
enum ConnectionWriterEvent {
Message(String),
Shutdown
}
async fn connection_writer_loop( async fn connection_writer_loop(
messages: &mut Receiver<String>, messages: &mut Receiver<String>,
stream: Arc<TcpStream>, stream: Arc<TcpStream>,
mut shutdown: Receiver<Void>, shutdown: Receiver<Void>,
) -> Result<()> { ) -> Result<()> {
let mut stream = &*stream; let mut stream = &*stream;
loop { let messages = messages.map(ConnectionWriterEvent::Message);
select! { let shutdown = shutdown.map(|_| ConnectionWriterEvent::Shutdown).chain(stream::once(ConnectionWriterEvent::Shutdown));
msg = messages.next().fuse() => match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?, let mut events = shutdown.merge(messages);
None => break,
}, while let Some(event) = events.next().await {
void = shutdown.next().fuse() => match void { match event {
Some(void) => match void {}, ConnectionWriterEvent::Message(msg) => {
None => break, stream.write_all(msg.as_bytes()).await?;
}
ConnectionWriterEvent::Shutdown => {
break
} }
} }
} }
@ -118,58 +123,61 @@ enum Event {
}, },
} }
async fn broker_loop(mut events: Receiver<Event>) { #[derive(Debug)]
let (disconnect_sender, mut disconnect_receiver) = enum BrokerEvent {
mpsc::unbounded::<(String, Receiver<String>)>(); ClientEvent(Event),
let mut peers: HashMap<String, Sender<String>> = HashMap::new(); Disconnection((String, Receiver<String>)),
Shutdown,
}
loop { async fn broker_loop(events: Receiver<Event>) {
let event = select! { let (disconnect_sender, disconnect_receiver) = channel(10);
event = events.next().fuse() => match event {
None => break, let mut peers: HashMap<String, Sender<String>> = HashMap::new();
Some(event) => event, let disconnect_receiver = disconnect_receiver.map(BrokerEvent::Disconnection);
}, let events = events.map(BrokerEvent::ClientEvent).chain(stream::once(BrokerEvent::Shutdown));
disconnect = disconnect_receiver.next().fuse() => {
let (name, _pending_messages) = disconnect.unwrap(); let mut stream = disconnect_receiver.merge(events);
assert!(peers.remove(&name).is_some());
continue; while let Some(event) = stream.next().await {
},
};
match event { match event {
Event::Message { from, to, msg } => { BrokerEvent::ClientEvent(Event::Message { from, to, msg }) => {
for addr in to { for addr in to {
if let Some(peer) = peers.get_mut(&addr) { if let Some(peer) = peers.get_mut(&addr) {
let msg = format!("from {}: {}\n", from, msg); let msg = format!("from {}: {}\n", from, msg);
peer.send(msg).await.unwrap(); peer.send(msg).await;
} }
} }
} }
Event::NewPeer { BrokerEvent::ClientEvent(Event::NewPeer {
name, name,
stream, stream,
shutdown, shutdown,
} => match peers.entry(name.clone()) { }) => match peers.entry(name.clone()) {
Entry::Occupied(..) => (), Entry::Occupied(..) => (),
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let (client_sender, mut client_receiver) = mpsc::unbounded(); let (client_sender, mut client_receiver) = channel(10);
entry.insert(client_sender); entry.insert(client_sender);
let mut disconnect_sender = disconnect_sender.clone(); let disconnect_sender = disconnect_sender.clone();
spawn_and_log_error(async move { spawn_and_log_error(async move {
let res = let res =
connection_writer_loop(&mut client_receiver, stream, shutdown).await; connection_writer_loop(&mut client_receiver, stream, shutdown).await;
disconnect_sender disconnect_sender
.send((name, client_receiver)) .send((name, client_receiver))
.await .await;
.unwrap();
res res
}); });
} }
}, }
BrokerEvent::Disconnection((name, _pending_messages)) => {
assert!(peers.remove(&name).is_some());
}
BrokerEvent::Shutdown => break,
} }
} }
drop(peers); drop(peers);
drop(disconnect_sender); drop(disconnect_sender);
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {} while let Some(BrokerEvent::Disconnection((_name, _pending_messages))) = stream.next().await {}
} }
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()> fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>