From 9d0f2addd3cd6be2481e893a71a4c726158e1d7a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 13:21:05 +0200 Subject: [PATCH] get reactor compiling --- Cargo.toml | 1 + src/rt/reactor.rs | 61 ++++++++++++++++++++++++----------------------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d5bff22d..c07911a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ default = [ "kv-log-macro", "log", "mio", + "mio/os-poll", "mio/tcp", "mio/udp", "mio/uds", diff --git a/src/rt/reactor.rs b/src/rt/reactor.rs index e03749fe..1ea52ca9 100644 --- a/src/rt/reactor.rs +++ b/src/rt/reactor.rs @@ -9,6 +9,9 @@ use crate::io; use crate::rt::RUNTIME; use crate::task::{Context, Poll, Waker}; +// TODO: ADD AIO and LIO? +const INTEREST_ALL: Interest = Interest::READABLE.add(Interest::WRITABLE); + /// Data associated with a registered I/O handle. #[derive(Debug)] struct Entry { @@ -25,7 +28,7 @@ struct Entry { /// The state of a networking driver. pub struct Reactor { /// A mio instance that polls for new events. - poller: mio::Poll, + poller: Mutex, /// A list into which mio stores events. events: Mutex, @@ -33,8 +36,8 @@ pub struct Reactor { /// A collection of registered I/O handles. entries: Mutex>>, - /// Dummy I/O handle that is only used to wake up the polling thread. - notify_reg: (mio::Registration, mio::SetReadiness), + /// Mio waker that is only used to wake up the polling thread. + notify_waker: mio::Waker, /// An identifier for the notification handle. notify_token: mio::Token, @@ -64,22 +67,20 @@ impl Reactor { /// Creates a new reactor for polling I/O events. pub fn new() -> io::Result { let poller = mio::Poll::new()?; - todo!(); - // let notify_reg = mio::Registration::new2(); - - // let mut reactor = Reactor { - // poller, - // events: Mutex::new(mio::Events::with_capacity(1000)), - // entries: Mutex::new(Slab::new()), - // notify_reg, - // notify_token: mio::Token(0), - // }; - - // // Register a dummy I/O handle for waking up the polling thread. - // let entry = reactor.register(&reactor.notify_reg.0)?; - // reactor.notify_token = entry.token; - - // Ok(reactor) + + // Register a waker for waking up the polling thread. + let notify_token = mio::Token(0); // TODO: is this being 0 okay? + let notify_waker = mio::Waker::new(poller.registry(), notify_token)?; + + let reactor = Reactor { + poller: Mutex::new(poller), + events: Mutex::new(mio::Events::with_capacity(1000)), + entries: Mutex::new(Slab::new()), + notify_waker, + notify_token, + }; + + Ok(reactor) } /// Registers an I/O event source and returns its associated entry. @@ -105,9 +106,11 @@ impl Reactor { vacant.insert(entry.clone()); // Register the I/O event source in the poller. - // TODO: ADD AIO and LIO? - const interest: Interest = Interest::READABLE.add(Interest::WRITABLE); - self.poller.registry().register(source, token, interest)?; + self.poller + .lock() + .unwrap() + .registry() + .register(source, token, INTEREST_ALL)?; Ok(entry) } @@ -115,7 +118,7 @@ impl Reactor { /// Deregisters an I/O event source associated with an entry. fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> { // Deregister the I/O object from the mio instance. - self.poller.registry().deregister(source)?; + self.poller.lock().unwrap().registry().deregister(source)?; // Remove the entry associated with the I/O object. self.entries.lock().unwrap().remove(entry.token.0); @@ -125,8 +128,7 @@ impl Reactor { /// Notifies the reactor so that polling stops blocking. pub fn notify(&self) -> io::Result<()> { - todo!() - // self.notify_reg.1.set_readiness(mio::Ready::readable()) + self.notify_waker.wake() } /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. @@ -136,7 +138,7 @@ impl Reactor { let mut events = self.events.lock().unwrap(); // Block on the poller until at least one new event comes in. - self.poller.poll(&mut events, timeout)?; + self.poller.lock().unwrap().poll(&mut events, timeout)?; // Lock the entire entry table while we're processing new events. let entries = self.entries.lock().unwrap(); @@ -149,8 +151,7 @@ impl Reactor { if token == self.notify_token { // If this is the notification token, we just need the notification state. - todo!() - // self.notify_reg.1.set_readiness(mio::Ready::empty())?; + self.notify_waker.wake()?; } else { // Otherwise, look for the entry associated with this token. if let Some(entry) = entries.get(token.0) { @@ -198,7 +199,7 @@ impl Watcher { /// /// The provided I/O event source will be kept registered inside the reactor's poller for the /// lifetime of the returned I/O handle. - pub fn new(source: T) -> Watcher { + pub fn new(mut source: T) -> Watcher { Watcher { entry: RUNTIME .reactor() @@ -322,7 +323,7 @@ impl Watcher { /// This method is typically used to convert `Watcher`s to raw file descriptors/handles. #[allow(dead_code)] pub fn into_inner(mut self) -> T { - let source = self.source.take().unwrap(); + let mut source = self.source.take().unwrap(); RUNTIME .reactor() .deregister(&mut source, &self.entry)