get reactor compiling

pull/742/head
dignifiedquire 5 years ago
parent 80870af3bb
commit 9d0f2addd3

@ -31,6 +31,7 @@ default = [
"kv-log-macro", "kv-log-macro",
"log", "log",
"mio", "mio",
"mio/os-poll",
"mio/tcp", "mio/tcp",
"mio/udp", "mio/udp",
"mio/uds", "mio/uds",

@ -9,6 +9,9 @@ use crate::io;
use crate::rt::RUNTIME; use crate::rt::RUNTIME;
use crate::task::{Context, Poll, Waker}; use crate::task::{Context, Poll, Waker};
// TODO: ADD AIO and LIO?
const INTEREST_ALL: Interest = Interest::READABLE.add(Interest::WRITABLE);
/// Data associated with a registered I/O handle. /// Data associated with a registered I/O handle.
#[derive(Debug)] #[derive(Debug)]
struct Entry { struct Entry {
@ -25,7 +28,7 @@ struct Entry {
/// The state of a networking driver. /// The state of a networking driver.
pub struct Reactor { pub struct Reactor {
/// A mio instance that polls for new events. /// A mio instance that polls for new events.
poller: mio::Poll, poller: Mutex<mio::Poll>,
/// A list into which mio stores events. /// A list into which mio stores events.
events: Mutex<mio::Events>, events: Mutex<mio::Events>,
@ -33,8 +36,8 @@ pub struct Reactor {
/// A collection of registered I/O handles. /// A collection of registered I/O handles.
entries: Mutex<Slab<Arc<Entry>>>, entries: Mutex<Slab<Arc<Entry>>>,
/// Dummy I/O handle that is only used to wake up the polling thread. /// Mio waker that is only used to wake up the polling thread.
notify_reg: (mio::Registration, mio::SetReadiness), notify_waker: mio::Waker,
/// An identifier for the notification handle. /// An identifier for the notification handle.
notify_token: mio::Token, notify_token: mio::Token,
@ -64,22 +67,20 @@ impl Reactor {
/// Creates a new reactor for polling I/O events. /// Creates a new reactor for polling I/O events.
pub fn new() -> io::Result<Reactor> { pub fn new() -> io::Result<Reactor> {
let poller = mio::Poll::new()?; let poller = mio::Poll::new()?;
todo!();
// let notify_reg = mio::Registration::new2(); // Register a waker for waking up the polling thread.
let notify_token = mio::Token(0); // TODO: is this being 0 okay?
// let mut reactor = Reactor { let notify_waker = mio::Waker::new(poller.registry(), notify_token)?;
// poller,
// events: Mutex::new(mio::Events::with_capacity(1000)), let reactor = Reactor {
// entries: Mutex::new(Slab::new()), poller: Mutex::new(poller),
// notify_reg, events: Mutex::new(mio::Events::with_capacity(1000)),
// notify_token: mio::Token(0), entries: Mutex::new(Slab::new()),
// }; notify_waker,
notify_token,
// // Register a dummy I/O handle for waking up the polling thread. };
// let entry = reactor.register(&reactor.notify_reg.0)?;
// reactor.notify_token = entry.token; Ok(reactor)
// Ok(reactor)
} }
/// Registers an I/O event source and returns its associated entry. /// Registers an I/O event source and returns its associated entry.
@ -105,9 +106,11 @@ impl Reactor {
vacant.insert(entry.clone()); vacant.insert(entry.clone());
// Register the I/O event source in the poller. // Register the I/O event source in the poller.
// TODO: ADD AIO and LIO? self.poller
const interest: Interest = Interest::READABLE.add(Interest::WRITABLE); .lock()
self.poller.registry().register(source, token, interest)?; .unwrap()
.registry()
.register(source, token, INTEREST_ALL)?;
Ok(entry) Ok(entry)
} }
@ -115,7 +118,7 @@ impl Reactor {
/// Deregisters an I/O event source associated with an entry. /// Deregisters an I/O event source associated with an entry.
fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> { fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> {
// Deregister the I/O object from the mio instance. // Deregister the I/O object from the mio instance.
self.poller.registry().deregister(source)?; self.poller.lock().unwrap().registry().deregister(source)?;
// Remove the entry associated with the I/O object. // Remove the entry associated with the I/O object.
self.entries.lock().unwrap().remove(entry.token.0); self.entries.lock().unwrap().remove(entry.token.0);
@ -125,8 +128,7 @@ impl Reactor {
/// Notifies the reactor so that polling stops blocking. /// Notifies the reactor so that polling stops blocking.
pub fn notify(&self) -> io::Result<()> { pub fn notify(&self) -> io::Result<()> {
todo!() self.notify_waker.wake()
// self.notify_reg.1.set_readiness(mio::Ready::readable())
} }
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles. /// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
@ -136,7 +138,7 @@ impl Reactor {
let mut events = self.events.lock().unwrap(); let mut events = self.events.lock().unwrap();
// Block on the poller until at least one new event comes in. // Block on the poller until at least one new event comes in.
self.poller.poll(&mut events, timeout)?; self.poller.lock().unwrap().poll(&mut events, timeout)?;
// Lock the entire entry table while we're processing new events. // Lock the entire entry table while we're processing new events.
let entries = self.entries.lock().unwrap(); let entries = self.entries.lock().unwrap();
@ -149,8 +151,7 @@ impl Reactor {
if token == self.notify_token { if token == self.notify_token {
// If this is the notification token, we just need the notification state. // If this is the notification token, we just need the notification state.
todo!() self.notify_waker.wake()?;
// self.notify_reg.1.set_readiness(mio::Ready::empty())?;
} else { } else {
// Otherwise, look for the entry associated with this token. // Otherwise, look for the entry associated with this token.
if let Some(entry) = entries.get(token.0) { if let Some(entry) = entries.get(token.0) {
@ -198,7 +199,7 @@ impl<T: Source> Watcher<T> {
/// ///
/// The provided I/O event source will be kept registered inside the reactor's poller for the /// The provided I/O event source will be kept registered inside the reactor's poller for the
/// lifetime of the returned I/O handle. /// lifetime of the returned I/O handle.
pub fn new(source: T) -> Watcher<T> { pub fn new(mut source: T) -> Watcher<T> {
Watcher { Watcher {
entry: RUNTIME entry: RUNTIME
.reactor() .reactor()
@ -322,7 +323,7 @@ impl<T: Source> Watcher<T> {
/// This method is typically used to convert `Watcher`s to raw file descriptors/handles. /// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
#[allow(dead_code)] #[allow(dead_code)]
pub fn into_inner(mut self) -> T { pub fn into_inner(mut self) -> T {
let source = self.source.take().unwrap(); let mut source = self.source.take().unwrap();
RUNTIME RUNTIME
.reactor() .reactor()
.deregister(&mut source, &self.entry) .deregister(&mut source, &self.entry)

Loading…
Cancel
Save