use async_std::sync::{Arc, Mutex}; use crate::bus::{BusConnection, IncomingMessage, Receiver, BusClient, ProcessReceiver, RequestEnvelope, RawReceiver, Response}; use std::collections::HashMap; use crate::utils::asyn::AsyncQueue; use async_std::task; use std::fmt::{Debug, Formatter, Error}; use futures::{Stream, StreamExt}; use std::task::{Context, Poll}; use std::future::Future; use std::pin::Pin; #[derive(Clone, Debug)] pub struct BusServer { process_connection: ProcessReceiver, process_client: BusClient, state: Arc>, } #[derive(Clone, Debug)] pub struct BusServerState { receivers: Vec, clients: ClientPool, message_queue: AsyncQueue, } impl BusServer { pub fn new() -> Self { let receiver = ProcessReceiver::new(); BusServer { process_connection: receiver.clone(), process_client: BusClient::new(Box::new(receiver)), state: Arc::new(Mutex::new(BusServerState { receivers: vec![], clients: ClientPool::new(), message_queue: AsyncQueue::new(), })), } } pub async fn attach_process_receiver(&mut self) { let copy = Box::new(self.process_connection.clone()) as Box; self.add_receiver(Receiver::new(copy)).await; } pub async fn add_receiver(&mut self, receiver: Receiver) { let mut state = self.state.lock().await; state.receivers.push(receiver); } pub async fn run(&mut self, handler: impl MessageHandler) { for receiver in &self.state.lock().await.receivers { let mut cloned_receiver = receiver.clone(); let mut cloned_server = self.clone(); task::spawn(async move { while let Some(client) = cloned_receiver.get().await { cloned_server.register_client(client).await; } }); } self.message_handler(handler).await; } async fn message_handler(&mut self, mut handler: impl MessageHandler) { let mut queue = { let state = self.state.lock().await; state.message_queue.clone() }; while let Some(message) = queue.next().await { handler.handle_message(message, self.clone()).await; } } async fn register_client(&mut self, client: Box) { let mut state = self.state.lock().await; let mut client = state.clients.register_client(client, self.clone()).await; task::spawn(async move { client.listen().await }); } async fn queue(&self, message: IncomingMessage) { let mut queue = { let state = self.state.lock().await; state.message_queue.clone() }; queue.push(message); } pub fn get_process_client(&self) -> BusClient { self.process_client.clone() } } #[async_trait] pub trait MessageHandler { async fn handle_message(&mut self, message: IncomingMessage, server: BusServer); } #[derive(Clone, Debug)] pub struct ClientPool { state: Arc> } impl ClientPool { async fn register_client(&mut self, client: Box, server: BusServer) -> Client { self.state.lock().await.register_client(client, server) } pub fn new() -> Self { ClientPool { state: Arc::new(Mutex::new(ClientMap::new())) } } } #[derive(Debug)] pub struct ClientMap { id: u64, clients: HashMap, } impl ClientMap { fn new() -> ClientMap { ClientMap { id: 0, clients: Default::default(), } } fn register_client(&mut self, client: Box, server: BusServer) -> Client { let client_id = self.id; self.id += 1; self.clients.insert(client_id, Client { raw_client: Arc::new(Mutex::new(RawClient { _id: client_id, connection: client, })), bus_server: server, message_queue: AsyncQueue::new(), id: client_id, }); let client = self.clients.get(&client_id).unwrap(); client.clone() } } impl Debug for Client { fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { write!(f, "Client {{ #{} }}", self.id) } } #[derive(Clone)] pub struct Client { raw_client: Arc>, bus_server: BusServer, message_queue: AsyncQueue, id: u64, } #[derive(Clone)] pub enum Message { In(RequestEnvelope), Ack(u64), Response(u64, Response), Close, } impl Client { async fn listen(&mut self) { let client = self.raw_client.clone(); while let Some(message) = self.next().await { match message { Message::In(message) => self.handle_request(message).await, Message::Ack(id) => client.lock().await.connection.ack(id).await, _ => {} } } self.raw_client.lock().await.close().await; } async fn handle_request(&self, request: RequestEnvelope) { self.bus_server.queue(IncomingMessage { client: self.clone(), request, }).await; } pub fn ack(&mut self, id: u64) { self.message_queue.push(Message::Ack(id)); } pub fn send(&mut self, id: u64, response: Response) { self.message_queue.push(Message::Response(id, response)); } } impl Stream for Client { type Item = Message; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let queue_pin = unsafe { Pin::new_unchecked(&mut self.message_queue) }; match Stream::poll_next(queue_pin, cx) { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(message)) => return Poll::Ready(Some(message)), _ => {} } let mut lock = self.raw_client.lock(); if let Poll::Ready(mut client) = Future::poll(unsafe { Pin::new_unchecked(&mut lock) }, cx) { let connection_pin = unsafe { Pin::new_unchecked(&mut client.connection) }; match Stream::poll_next(connection_pin, cx) { Poll::Ready(None) => return Poll::Ready(None), Poll::Ready(Some(message)) => return Poll::Ready(Some(Message::In(message))), _ => {} } } Poll::Pending } } struct RawClient { _id: u64, connection: Box, } impl RawClient { async fn close(&mut self) { self.connection.close().await; } }