From 659b44dfe6c8541226c1159af77ab8df29697a04 Mon Sep 17 00:00:00 2001 From: eater <=@eater.me> Date: Wed, 1 Jan 2020 01:44:56 +0100 Subject: [PATCH] replace singal queue with command bus --- Cargo.lock | 121 +++++++++++++++ sysf-unitmgr/Cargo.toml | 3 +- sysf-unitmgr/src/main.rs | 15 +- sysf-unitmgr/src/manager.rs | 67 ++------ sysf/Cargo.toml | 2 + sysf/src/bus/client.rs | 47 ++++++ sysf/src/bus/connection.rs | 38 +++++ sysf/src/bus/connection_process.rs | 99 ++++++++++++ sysf/src/bus/connection_unix.rs | 0 sysf/src/bus/mod.rs | 41 +++++ sysf/src/bus/server.rs | 240 +++++++++++++++++++++++++++++ sysf/src/lib.rs | 2 + sysf/src/utils/asyn.rs | 71 ++++++++- 13 files changed, 685 insertions(+), 61 deletions(-) create mode 100644 sysf/src/bus/client.rs create mode 100644 sysf/src/bus/connection.rs create mode 100644 sysf/src/bus/connection_process.rs create mode 100644 sysf/src/bus/connection_unix.rs create mode 100644 sysf/src/bus/mod.rs create mode 100644 sysf/src/bus/server.rs diff --git a/Cargo.lock b/Cargo.lock index e33b0b4..21c646f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,6 +67,15 @@ name = "autocfg" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "bincode" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "bitflags" version = "1.2.1" @@ -85,6 +94,11 @@ dependencies = [ "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "byteorder" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "cfg-if" version = "0.1.10" @@ -152,6 +166,29 @@ name = "fuchsia-zircon-sys" version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-channel" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "futures-channel-preview" version = "0.3.0-alpha.19" @@ -171,21 +208,70 @@ name = "futures-core-preview" version = "0.3.0-alpha.19" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-executor" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "futures-io" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-macro" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "futures-sink" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures-sink-preview" version = "0.3.0-alpha.19" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-task" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "futures-timer" version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "futures-util" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "memchr 2.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", + "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "futures-util-preview" version = "0.3.0-alpha.19" @@ -371,6 +457,21 @@ name = "pin-utils" version = "0.1.0-alpha.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "proc-macro-hack" +version = "0.5.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "proc-macro2 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "proc-macro-nested" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "proc-macro2" version = "1.0.6" @@ -418,6 +519,11 @@ name = "semver-parser" version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "serde" +version = "1.0.104" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "slab" version = "0.4.2" @@ -448,6 +554,8 @@ dependencies = [ "async-macros 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "async-std 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "async-trait 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", + "bincode 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -455,6 +563,7 @@ name = "sysf-unitmgr" version = "0.1.0" dependencies = [ "async-std 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "async-trait 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)", "sysf 0.1.0", ] @@ -508,8 +617,10 @@ dependencies = [ "checksum async-task 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "abdb6048336bef96e8c8fc5573536c5cc5b391fbfd0980349959b7c3f7a40d19" "checksum async-trait 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)" = "c8df72488e87761e772f14ae0c2480396810e51b2c2ade912f97f0f7e5b95e3c" "checksum autocfg 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "1d49d90015b3c36167a20fe2810c5cd875ad504b39cff3d4eae7977e6b7c1cb2" +"checksum bincode 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5753e2a71534719bf3f4e57006c3a4f0d2c672a4b676eec84161f763eca87dbf" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum broadcaster 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)" = "07a1446420a56f1030271649ba0da46d23239b3a68c73591cea5247f15a788a0" +"checksum byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c3dd8985a7111efc5c80b44e23ecdd8c007de8ade3b96595387e812b957cf5" "checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" "checksum crossbeam-channel 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "acec9a3b0b3559f15aee4f90746c4e5e293b701c0f7d3925d24e01645267b68c" @@ -518,12 +629,19 @@ dependencies = [ "checksum crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ce446db02cdc3165b94ae73111e570793400d0794e46125cc4056c81cbb039f4" "checksum fuchsia-zircon 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" "checksum fuchsia-zircon-sys 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" +"checksum futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b6f16056ecbb57525ff698bb955162d0cd03bee84e6241c27ff75c08d8ca5987" +"checksum futures-channel 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fcae98ca17d102fd8a3603727b9259fcf7fa4239b603d2142926189bc8999b86" "checksum futures-channel-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "d5e5f4df964fa9c1c2f8bddeb5c3611631cacd93baf810fc8bb2fb4b495c263a" "checksum futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "79564c427afefab1dfb3298535b21eda083ef7935b4f0ecbfcb121f0aec10866" "checksum futures-core-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "b35b6263fb1ef523c3056565fa67b1d16f0a8604ff12b11b08c25f28a734c60a" +"checksum futures-executor 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "1e274736563f686a837a0568b478bdabfeaec2dca794b5649b04e2fe1627c231" "checksum futures-io 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e676577d229e70952ab25f3945795ba5b16d63ca794ca9d2c860e5595d20b5ff" +"checksum futures-macro 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "52e7c56c15537adb4f76d0b7a76ad131cb4d2f4f32d3b0bcabcbe1c7c5e87764" +"checksum futures-sink 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "171be33efae63c2d59e6dbba34186fe0d6394fb378069a76dfd80fdcffd43c16" "checksum futures-sink-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "86f148ef6b69f75bb610d4f9a2336d4fc88c4b5b67129d1a340dd0fd362efeec" +"checksum futures-task 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "0bae52d6b29cf440e298856fec3965ee6fa71b06aa7495178615953fd669e5f9" "checksum futures-timer 2.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a1de7508b218029b0f01662ed8f61b1c964b3ae99d6f25462d0f55a595109df6" +"checksum futures-util 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c0d66274fb76985d3c62c886d1da7ac4c0903a8c9f754e8fe0f35a6a6cc39e76" "checksum futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)" = "5ce968633c17e5f97936bd2797b6e38fb56cf16a7422319f7ec2e30d3c470e8d" "checksum hermit-abi 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f629dc602392d3ec14bfc8a09b5e644d7ffd725102b48b81e59f90f2633621d7" "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" @@ -546,6 +664,8 @@ dependencies = [ "checksum parking_lot_core 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b876b1b9e7ac6e1a74a6da34d25c42e17e8862aa409cbbbdcfc8d86c6f3bc62b" "checksum pin-project-lite 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f0af6cbca0e6e3ce8692ee19fb8d734b641899e07b68eb73e9bbbd32f1703991" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" +"checksum proc-macro-hack 0.5.11 (registry+https://github.com/rust-lang/crates.io-index)" = "ecd45702f76d6d3c75a80564378ae228a85f0b59d2f3ed43c91b4a69eb2ebfc5" +"checksum proc-macro-nested 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "369a6ed065f249a159e06c45752c780bda2fb53c995718f9e484d08daa9eb42e" "checksum proc-macro2 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "9c9e470a8dc4aeae2dee2f335e8f533e2d4b347e1434e5671afc49b054592f27" "checksum quote 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "053a8c8bcc71fcce321828dc897a98ab9760bef03a4fc36693c231e5b3216cfe" "checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" @@ -553,6 +673,7 @@ dependencies = [ "checksum scopeguard 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b42e15e59b18a828bbf5c58ea01debb36b9b096346de35d941dcb89009f24a0d" "checksum semver 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" "checksum semver-parser 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" +"checksum serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)" = "414115f25f818d7dfccec8ee535d76949ae78584fc4f79a6f45a904bf8ab4449" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum smallvec 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)" = "f7b0758c52e15a8b5e3691eae6cc559f08eee9406e548a4477ba4e67770a82b6" "checksum syn 1.0.11 (registry+https://github.com/rust-lang/crates.io-index)" = "dff0acdb207ae2fe6d5976617f887eb1e35a2ba52c13c7234c790960cdad9238" diff --git a/sysf-unitmgr/Cargo.toml b/sysf-unitmgr/Cargo.toml index 28be3f4..5795cae 100644 --- a/sysf-unitmgr/Cargo.toml +++ b/sysf-unitmgr/Cargo.toml @@ -8,4 +8,5 @@ edition = "2018" [dependencies] async-std = { version = "1.4.0", features = ["default", "unstable", "attributes"] } -sysf = { path = "../sysf" } \ No newline at end of file +sysf = { path = "../sysf" } +async-trait = "0.1.22" \ No newline at end of file diff --git a/sysf-unitmgr/src/main.rs b/sysf-unitmgr/src/main.rs index e9d5b59..db75682 100644 --- a/sysf-unitmgr/src/main.rs +++ b/sysf-unitmgr/src/main.rs @@ -1,10 +1,11 @@ extern crate sysf; extern crate async_std; +#[macro_use] +extern crate async_trait; -use sysf::config::loader::Loader; use async_std::task; -use crate::manager::{Manager, ManagerSignal}; -use std::time::Duration; +use sysf::config::loader::Loader; +use crate::manager::Manager; mod manager; @@ -12,13 +13,11 @@ mod manager; async fn main() { let loader = Loader::with_root("/home/eater/projects/systemf/resources/systemd-root"); let mut manager = Manager::new(loader); - manager.target("sysinit.target"); - - let mut remote = manager.get_remote(); + let mut manager_clone = manager.clone(); task::spawn(async move { - task::sleep(Duration::from_secs(10)).await; - remote.signal(ManagerSignal::Noop); + manager_clone.target("sysinit.target").await; + println!("Target set."); }); manager.run().await; diff --git a/sysf-unitmgr/src/manager.rs b/sysf-unitmgr/src/manager.rs index 5f3041a..d195ddf 100644 --- a/sysf-unitmgr/src/manager.rs +++ b/sysf-unitmgr/src/manager.rs @@ -2,28 +2,13 @@ use sysf::registry::{DependencyTree, Registry}; use sysf::unit::UnitState; use std::collections::HashMap; use sysf::config::loader::Loader; -use sysf::utils::asyn::AsyncQueue; -use async_std::prelude::StreamExt; -use async_std::task; -use crate::manager::ManagerSignal::*; use async_std::sync::{Arc, Mutex, MutexGuard}; +use sysf::bus::{BusServer, BusClient, Request, MessageHandler, IncomingMessage}; #[derive(Clone, Debug)] -pub enum ManagerSignal { - TargetUpdate(String), - Noop, -} - -impl Default for ManagerSignal { - fn default() -> Self { - Noop - } -} - -#[derive(Clone, Default, Debug)] pub struct Manager { state: Arc>, - queue: AsyncQueue, + bus_server: BusServer, } #[derive(Clone, Default, Debug)] @@ -31,21 +16,19 @@ pub struct ManagerState { dependency_tree: DependencyTree, unit_status: HashMap, registry: Registry, - queue: AsyncQueue, } impl Manager { pub fn new(loader: Loader) -> Manager { - let queue = AsyncQueue::new(); + let bus_server = BusServer::new(); Manager { state: Arc::new(Mutex::new(ManagerState { dependency_tree: DependencyTree::default(), unit_status: HashMap::new(), registry: Registry::with_loader(loader), - queue: queue.clone(), })), - queue, + bus_server, } } @@ -56,47 +39,31 @@ impl Manager { block(self.state.lock().await) } - pub fn target(&mut self, target: &str) { - self.queue.push(ManagerSignal::TargetUpdate(target.to_string())); - } - - #[allow(dead_code)] - pub fn signal(&mut self, signal: ManagerSignal) { - self.queue.push(signal); + pub async fn target(&mut self, _target: &str) { + self.get_client().send(Request).await; } pub async fn run(&mut self) { - while let Some(signal) = self.queue.next().await { - let copy: Manager = self.clone(); - task::spawn(async { - match signal { - TargetUpdate(target) => { - copy.update_target(target).await; - } - - _ => {} - } - }); - } + self.bus_server.attach_process_receiver().await; + self.bus_server.run(self.clone()).await; } - pub fn get_remote(&self) -> ManagerRemote { - ManagerRemote(self.queue.clone()) - } - async fn update_target(self, target: String) { + + async fn _update_target(&self, target: String) { let mut me = self.state.lock().await; let tree = me.registry.get_dependency_tree(&target).await; me.dependency_tree = tree; + } - + fn get_client(&self) -> BusClient { + self.bus_server.get_process_client() } } -pub struct ManagerRemote(AsyncQueue); - -impl ManagerRemote { - pub fn signal(&mut self, signal: ManagerSignal) { - self.0.push(signal); +#[async_trait] +impl MessageHandler for Manager { + async fn handle_message(&mut self, mut message: IncomingMessage, _server: BusServer) { + message.ack().await; } } \ No newline at end of file diff --git a/sysf/Cargo.toml b/sysf/Cargo.toml index 2785880..75e03c8 100644 --- a/sysf/Cargo.toml +++ b/sysf/Cargo.toml @@ -8,3 +8,5 @@ edition = "2018" async-std = "1.4.0" async-macros = "2.0.0" async-trait = "0.1.22" +bincode = "1.2.1" +futures = "0.3.1" diff --git a/sysf/src/bus/client.rs b/sysf/src/bus/client.rs new file mode 100644 index 0000000..f5c1ec7 --- /dev/null +++ b/sysf/src/bus/client.rs @@ -0,0 +1,47 @@ +use async_std::sync::{Arc, Mutex}; +use crate::bus::{Request, RequestEnvelope, ResponseEnvelope}; +use std::fmt::{Debug, Formatter, Error}; + +#[async_trait] +pub trait RawBusClient: Send { + async fn send(&mut self, request: RequestEnvelope) -> ResponseEnvelope; +} + +#[derive(Clone)] +pub struct BusClient(Arc>); + +impl Debug for BusClient { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + write!(f, "BusClient()") + } +} + +pub struct BusClientState { + id: u64, + raw_client: Box, +} + +impl BusClient { + pub fn new(raw_client: Box) -> BusClient { + BusClient(Arc::new(Mutex::new( + BusClientState { + id: 0, + raw_client, + } + ))) + } + + pub async fn send(&mut self, request: Request) -> ResponseEnvelope { + let mut state = self.0.lock().await; + let id = state.id; + state.id += 1; + + let envelope = RequestEnvelope { + id, + request, + }; + + state.raw_client.send(envelope).await + } +} + diff --git a/sysf/src/bus/connection.rs b/sysf/src/bus/connection.rs new file mode 100644 index 0000000..5d1c59e --- /dev/null +++ b/sysf/src/bus/connection.rs @@ -0,0 +1,38 @@ +use crate::bus::{Response, RequestEnvelope}; +use async_std::sync::{Arc, Mutex}; +use std::fmt::{Formatter, Debug, Error}; +use futures::Stream; + +#[async_trait] +pub trait BusConnection: Send + Stream + Unpin { + async fn close(&mut self); + async fn ack(&mut self, id: u64); + async fn send(&mut self, id: u64, message: Response); +} + +#[derive(Clone)] +pub struct Receiver(Arc>>, String); + +impl Debug for Receiver { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + write!(f, "Receiver {{ {} }}", self.1) + } +} + +impl Receiver { + pub fn new(receiver: Box) -> Receiver { + let name = receiver.name(); + Receiver(Arc::new(Mutex::new(receiver)), name.to_string()) + } + + pub async fn get(&mut self) -> Option> { + self.0.lock().await.get().await + } +} + +#[async_trait] +pub trait RawReceiver: Send { + fn name<'s>(&self) -> &'s str; + async fn disconnect(&mut self); + async fn get(&mut self) -> Option>; +} diff --git a/sysf/src/bus/connection_process.rs b/sysf/src/bus/connection_process.rs new file mode 100644 index 0000000..843c188 --- /dev/null +++ b/sysf/src/bus/connection_process.rs @@ -0,0 +1,99 @@ +use std::collections::HashMap; +use crate::bus::{Response, BusConnection, RawReceiver, RequestEnvelope, ResponseEnvelope, RawBusClient}; +use crate::utils::asyn::{AsyncQueue, Deferred}; +use futures::Stream; +use async_std::sync::{Arc, Mutex}; +use async_macros::utils::task::{Context, Poll}; +use async_macros::utils::pin::Pin; + +#[derive(Clone, Debug)] +pub struct ProcessReceiver { + state: Arc>, + request_queue: AsyncQueue, +} + +impl ProcessReceiver { + pub fn new() -> Self { + ProcessReceiver { + state: Arc::new(Mutex::new(ProcessReceiverState { + id: 0, + callbacks: Default::default(), + registered_client: false, + })), + request_queue: Default::default(), + } + } + + async fn run_callback(&mut self, id: u64, response: Option) { + let mut state = self.state.lock().await; + if let Some(mut deferred) = state.callbacks.remove(&id) { + deferred.complete(ResponseEnvelope { + id, + response, + }) + } + } +} + +#[async_trait] +impl RawReceiver for ProcessReceiver { + fn name<'s>(&self) -> &'s str { + "in-process" + } + + async fn disconnect(&mut self) {} + + async fn get(&mut self) -> Option> { + let mut state = self.state.lock().await; + if state.registered_client { + None + } else { + state.registered_client = true; + let copy = Box::new(self.clone()) as Box; + Some(copy) + } + } +} + +#[derive(Debug)] +struct ProcessReceiverState { + id: u64, + callbacks: HashMap>, + registered_client: bool, +} + +#[async_trait] +impl BusConnection for ProcessReceiver { + async fn close(&mut self) {} + + async fn ack(&mut self, id: u64) { + self.run_callback(id, None).await; + } + + async fn send(&mut self, id: u64, message: Response) { + self.run_callback(id, Some(message)).await; + } +} + +impl Stream for ProcessReceiver { + type Item = RequestEnvelope; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Stream::poll_next(unsafe { Pin::new_unchecked(&mut self.request_queue) }, cx) + } +} + +#[async_trait] +impl RawBusClient for ProcessReceiver { + async fn send(&mut self, request: RequestEnvelope) -> ResponseEnvelope { + let deferred = Deferred::new(); + let def_clone = deferred.clone(); + { + let mut state = self.state.lock().await; + state.callbacks.insert(request.id, def_clone); + } + + self.request_queue.push(request); + deferred.await + } +} \ No newline at end of file diff --git a/sysf/src/bus/connection_unix.rs b/sysf/src/bus/connection_unix.rs new file mode 100644 index 0000000..e69de29 diff --git a/sysf/src/bus/mod.rs b/sysf/src/bus/mod.rs new file mode 100644 index 0000000..a016f06 --- /dev/null +++ b/sysf/src/bus/mod.rs @@ -0,0 +1,41 @@ +mod client; +mod server; +mod connection; +mod connection_process; +mod connection_unix; + +pub use client::*; +pub use server::*; +pub use connection::*; +pub use connection_unix::*; +pub use connection_process::*; + +#[derive(Clone, Default, Debug)] +pub struct Response; + +#[derive(Clone, Default, Debug)] +pub struct ResponseEnvelope { + id: u64, + response: Option, +} + +#[derive(Clone, Default, Debug)] +pub struct Request; + +#[derive(Clone, Default, Debug)] +pub struct RequestEnvelope { + id: u64, + request: Request, +} + +#[derive(Clone, Debug)] +pub struct IncomingMessage { + client: Client, + request: RequestEnvelope, +} + +impl IncomingMessage { + pub async fn ack(&mut self) { + self.client.ack(self.request.id); + } +} \ No newline at end of file diff --git a/sysf/src/bus/server.rs b/sysf/src/bus/server.rs new file mode 100644 index 0000000..09f8179 --- /dev/null +++ b/sysf/src/bus/server.rs @@ -0,0 +1,240 @@ +use async_std::sync::{Arc, Mutex}; +use crate::bus::{BusConnection, IncomingMessage, Receiver, BusClient, ProcessReceiver, RequestEnvelope, RawReceiver, Response}; +use std::collections::HashMap; +use crate::utils::asyn::AsyncQueue; +use async_std::task; +use std::fmt::{Debug, Formatter, Error}; +use futures::{Stream, StreamExt}; +use std::task::{Context, Poll}; +use std::future::Future; +use std::pin::Pin; + + +#[derive(Clone, Debug)] +pub struct BusServer { + process_connection: ProcessReceiver, + process_client: BusClient, + state: Arc>, +} + +#[derive(Clone, Debug)] +pub struct BusServerState { + receivers: Vec, + clients: ClientPool, + message_queue: AsyncQueue, +} + +impl BusServer { + pub fn new() -> Self { + let receiver = ProcessReceiver::new(); + BusServer { + process_connection: receiver.clone(), + process_client: BusClient::new(Box::new(receiver)), + state: Arc::new(Mutex::new(BusServerState { + receivers: vec![], + clients: ClientPool::new(), + message_queue: AsyncQueue::new(), + })), + } + } + + pub async fn attach_process_receiver(&mut self) { + let copy = Box::new(self.process_connection.clone()) as Box; + self.add_receiver(Receiver::new(copy)).await; + } + + pub async fn add_receiver(&mut self, receiver: Receiver) { + let mut state = self.state.lock().await; + state.receivers.push(receiver); + } + + pub async fn run(&mut self, handler: impl MessageHandler) { + for receiver in &self.state.lock().await.receivers { + let mut cloned_receiver = receiver.clone(); + let mut cloned_server = self.clone(); + + task::spawn(async move { + while let Some(client) = cloned_receiver.get().await { + cloned_server.register_client(client).await; + } + }); + } + + self.message_handler(handler).await; + } + + async fn message_handler(&mut self, mut handler: impl MessageHandler) { + let mut queue = { + let state = self.state.lock().await; + state.message_queue.clone() + }; + + while let Some(message) = queue.next().await { + handler.handle_message(message, self.clone()).await; + } + } + + async fn register_client(&mut self, client: Box) { + let mut state = self.state.lock().await; + let mut client = state.clients.register_client(client, self.clone()).await; + task::spawn(async move { + client.listen().await + }); + } + + async fn queue(&self, message: IncomingMessage) { + let mut queue = { + let state = self.state.lock().await; + state.message_queue.clone() + }; + queue.push(message); + } + + pub fn get_process_client(&self) -> BusClient { + self.process_client.clone() + } +} + +#[async_trait] +pub trait MessageHandler { + async fn handle_message(&mut self, message: IncomingMessage, server: BusServer); +} + +#[derive(Clone, Debug)] +pub struct ClientPool { + state: Arc> +} + +impl ClientPool { + async fn register_client(&mut self, client: Box, server: BusServer) -> Client { + self.state.lock().await.register_client(client, server) + } + + pub fn new() -> Self { + ClientPool { state: Arc::new(Mutex::new(ClientMap::new())) } + } +} + +#[derive(Debug)] +pub struct ClientMap { + id: u64, + clients: HashMap, +} + +impl ClientMap { + fn new() -> ClientMap { + ClientMap { + id: 0, + clients: Default::default(), + } + } + + fn register_client(&mut self, client: Box, server: BusServer) -> Client { + let client_id = self.id; + self.id += 1; + self.clients.insert(client_id, Client { + raw_client: Arc::new(Mutex::new(RawClient { + _id: client_id, + connection: client, + })), + bus_server: server, + message_queue: AsyncQueue::new(), + id: client_id, + }); + + let client = self.clients.get(&client_id).unwrap(); + + client.clone() + } +} + + +impl Debug for Client { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error> { + write!(f, "Client {{ #{} }}", self.id) + } +} + +#[derive(Clone)] +pub struct Client { + raw_client: Arc>, + bus_server: BusServer, + message_queue: AsyncQueue, + id: u64, +} + + +#[derive(Clone)] +pub enum Message { + In(RequestEnvelope), + Ack(u64), + Response(u64, Response), + Close, +} + +impl Client { + async fn listen(&mut self) { + let client = self.raw_client.clone(); + + while let Some(message) = self.next().await { + match message { + Message::In(message) => self.handle_request(message).await, + Message::Ack(id) => client.lock().await.connection.ack(id).await, + _ => {} + } + } + + self.raw_client.lock().await.close().await; + } + + async fn handle_request(&self, request: RequestEnvelope) { + self.bus_server.queue(IncomingMessage { + client: self.clone(), + request, + }).await; + } + + pub fn ack(&mut self, id: u64) { + self.message_queue.push(Message::Ack(id)); + } + + pub fn send(&mut self, id: u64, response: Response) { + self.message_queue.push(Message::Response(id, response)); + } +} + +impl Stream for Client { + type Item = Message; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let queue_pin = unsafe { Pin::new_unchecked(&mut self.message_queue) }; + match Stream::poll_next(queue_pin, cx) { + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(message)) => return Poll::Ready(Some(message)), + _ => {} + } + + let mut lock = self.raw_client.lock(); + if let Poll::Ready(mut client) = Future::poll(unsafe { Pin::new_unchecked(&mut lock) }, cx) { + let connection_pin = unsafe { Pin::new_unchecked(&mut client.connection) }; + match Stream::poll_next(connection_pin, cx) { + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(message)) => return Poll::Ready(Some(Message::In(message))), + _ => {} + } + } + + Poll::Pending + } +} + +struct RawClient { + _id: u64, + connection: Box, +} + +impl RawClient { + async fn close(&mut self) { + self.connection.close().await; + } +} \ No newline at end of file diff --git a/sysf/src/lib.rs b/sysf/src/lib.rs index f6c6d1c..9c297af 100644 --- a/sysf/src/lib.rs +++ b/sysf/src/lib.rs @@ -2,7 +2,9 @@ extern crate async_std; extern crate async_macros; #[macro_use] extern crate async_trait; +extern crate futures; +pub mod bus; pub mod registry; pub mod config; pub mod unit; diff --git a/sysf/src/utils/asyn.rs b/sysf/src/utils/asyn.rs index f6caf05..29524d7 100644 --- a/sysf/src/utils/asyn.rs +++ b/sysf/src/utils/asyn.rs @@ -7,6 +7,7 @@ use async_std::stream::Stream; use std::collections::VecDeque; use std::sync::Mutex; use async_std::sync::Arc; +use std::mem; #[async_trait] pub trait IntoResults { @@ -40,9 +41,19 @@ impl + Send> IntoResults> for Vec { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] pub struct AsyncQueue(Arc>>); +impl Default for AsyncQueue { + fn default() -> Self { + AsyncQueue(Arc::new(Mutex::new(AsyncQueueState { + closed: false, + waker: None, + queue: Default::default(), + }))) + } +} + impl AsyncQueue { pub fn push(&mut self, value: T) { let mut res = self.0.lock().unwrap(); @@ -69,13 +80,23 @@ impl AsyncQueue { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug)] struct AsyncQueueState { closed: bool, waker: Option, queue: VecDeque, } +impl Default for AsyncQueueState { + fn default() -> Self { + AsyncQueueState { + closed: false, + waker: None, + queue: Default::default(), + } + } +} + impl Stream for AsyncQueue { type Item = T; @@ -93,4 +114,50 @@ impl Stream for AsyncQueue { Poll::Pending } +} + +#[derive(Clone, Debug)] +pub struct Deferred { + state: Arc>>, +} + +impl Deferred { + pub fn new() -> Self { + Deferred { + state: Arc::new(Mutex::new(DeferredState { + waker: None, + result: None, + })) + } + } + + pub fn complete(&mut self, result: T) { + let mut state = self.state.lock().unwrap(); + state.result = Some(result); + if let Some(waker) = &state.waker { + waker.wake_by_ref() + } + } +} + +#[derive(Clone, Debug)] +struct DeferredState { + waker: Option, + result: Option, +} + +impl Future for Deferred { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut state = self.state.lock().unwrap(); + state.waker = Some(cx.waker().clone()); + + if let Some(_) = &state.result { + let result = mem::replace(&mut state.result, None); + Poll::Ready(result.unwrap()) + } else { + Poll::Pending + } + } } \ No newline at end of file