Compare commits

...

6 Commits

Author SHA1 Message Date
dignifiedquire 4fd8fe03f3 cleanup imports 5 years ago
dignifiedquire 1e99e3453f always call shutdown 5 years ago
dignifiedquire 56769d8b6c fix token handling 5 years ago
dignifiedquire 121bc7d726 set nonblocking mode on conversion from std 5 years ago
dignifiedquire 9d0f2addd3 get reactor compiling 5 years ago
dignifiedquire 80870af3bb mechanical first pass at updating to mio 0.7 5 years ago

@ -31,7 +31,10 @@ default = [
"kv-log-macro", "kv-log-macro",
"log", "log",
"mio", "mio",
"mio-uds", "mio/os-poll",
"mio/tcp",
"mio/udp",
"mio/uds",
"num_cpus", "num_cpus",
"pin-project-lite", "pin-project-lite",
] ]
@ -67,8 +70,7 @@ futures-timer = { version = "3.0.2", optional = true }
kv-log-macro = { version = "1.0.4", optional = true } kv-log-macro = { version = "1.0.4", optional = true }
log = { version = "0.4.8", features = ["kv_unstable"], optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true }
memchr = { version = "2.3.3", optional = true } memchr = { version = "2.3.3", optional = true }
mio = { version = "0.6.19", optional = true } mio = { version = "0.7.0", optional = true }
mio-uds = { version = "0.6.7", optional = true }
num_cpus = { version = "1.12.0", optional = true } num_cpus = { version = "1.12.0", optional = true }
once_cell = { version = "1.3.1", optional = true } once_cell = { version = "1.3.1", optional = true }
pin-project-lite = { version = "0.1.4", optional = true } pin-project-lite = { version = "0.1.4", optional = true }

@ -5,8 +5,8 @@ use std::sync::Arc;
use crate::future; use crate::future;
use crate::io; use crate::io;
use crate::rt::Watcher;
use crate::net::{TcpStream, ToSocketAddrs}; use crate::net::{TcpStream, ToSocketAddrs};
use crate::rt::Watcher;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
@ -79,7 +79,7 @@ impl TcpListener {
let addrs = addrs.to_socket_addrs().await?; let addrs = addrs.to_socket_addrs().await?;
for addr in addrs { for addr in addrs {
match mio::net::TcpListener::bind(&addr) { match mio::net::TcpListener::bind(addr) {
Ok(mio_listener) => { Ok(mio_listener) => {
return Ok(TcpListener { return Ok(TcpListener {
watcher: Watcher::new(mio_listener), watcher: Watcher::new(mio_listener),
@ -114,11 +114,9 @@ impl TcpListener {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (io, addr) = let (mio_stream, addr) =
future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept_std())) future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept())).await?;
.await?;
let mio_stream = mio::net::TcpStream::from_stream(io)?;
let stream = TcpStream { let stream = TcpStream {
watcher: Arc::new(Watcher::new(mio_stream)), watcher: Arc::new(Watcher::new(mio_stream)),
}; };
@ -206,7 +204,12 @@ impl<'a> Stream for Incoming<'a> {
impl From<std::net::TcpListener> for TcpListener { impl From<std::net::TcpListener> for TcpListener {
/// Converts a `std::net::TcpListener` into its asynchronous equivalent. /// Converts a `std::net::TcpListener` into its asynchronous equivalent.
fn from(listener: std::net::TcpListener) -> TcpListener { fn from(listener: std::net::TcpListener) -> TcpListener {
let mio_listener = mio::net::TcpListener::from_std(listener).unwrap(); // Make sure we are in nonblocking mode.
listener
.set_nonblocking(true)
.expect("failed to set nonblocking mode");
let mio_listener = mio::net::TcpListener::from_std(listener);
TcpListener { TcpListener {
watcher: Watcher::new(mio_listener), watcher: Watcher::new(mio_listener),
} }

@ -5,8 +5,8 @@ use std::sync::Arc;
use crate::future; use crate::future;
use crate::io::{self, Read, Write}; use crate::io::{self, Read, Write};
use crate::rt::Watcher;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use crate::rt::Watcher;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// A TCP stream between a local and a remote socket. /// A TCP stream between a local and a remote socket.
@ -79,7 +79,7 @@ impl TcpStream {
// when it returns with `Ok`. We therefore wait for write readiness to // when it returns with `Ok`. We therefore wait for write readiness to
// be sure the connection has either been established or there was an // be sure the connection has either been established or there was an
// error which we check for afterwards. // error which we check for afterwards.
let watcher = match mio::net::TcpStream::connect(&addr) { let watcher = match mio::net::TcpStream::connect(addr) {
Ok(s) => Watcher::new(s), Ok(s) => Watcher::new(s),
Err(e) => { Err(e) => {
last_err = Some(e); last_err = Some(e);
@ -343,6 +343,7 @@ impl Write for TcpStream {
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.shutdown(std::net::Shutdown::Write)?;
Pin::new(&mut &*self).poll_close(cx) Pin::new(&mut &*self).poll_close(cx)
} }
} }
@ -370,7 +371,12 @@ impl Write for &TcpStream {
impl From<std::net::TcpStream> for TcpStream { impl From<std::net::TcpStream> for TcpStream {
/// Converts a `std::net::TcpStream` into its asynchronous equivalent. /// Converts a `std::net::TcpStream` into its asynchronous equivalent.
fn from(stream: std::net::TcpStream) -> TcpStream { fn from(stream: std::net::TcpStream) -> TcpStream {
let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); // Make sure we are in nonblocking mode.
stream
.set_nonblocking(true)
.expect("failed to set nonblocking mode");
let mio_stream = mio::net::TcpStream::from_std(stream);
TcpStream { TcpStream {
watcher: Arc::new(Watcher::new(mio_stream)), watcher: Arc::new(Watcher::new(mio_stream)),
} }

@ -69,12 +69,10 @@ impl UdpSocket {
/// ``` /// ```
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<UdpSocket> { pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<UdpSocket> {
let mut last_err = None; let mut last_err = None;
let addrs = addrs let addrs = addrs.to_socket_addrs().await?;
.to_socket_addrs()
.await?;
for addr in addrs { for addr in addrs {
match mio::net::UdpSocket::bind(&addr) { match mio::net::UdpSocket::bind(addr) {
Ok(mio_socket) => { Ok(mio_socket) => {
return Ok(UdpSocket { return Ok(UdpSocket {
watcher: Watcher::new(mio_socket), watcher: Watcher::new(mio_socket),
@ -155,7 +153,7 @@ impl UdpSocket {
future::poll_fn(|cx| { future::poll_fn(|cx| {
self.watcher self.watcher
.poll_write_with(cx, |inner| inner.send_to(buf, &addr)) .poll_write_with(cx, |inner| inner.send_to(buf, addr))
}) })
.await .await
.context(|| format!("could not send packet to {}", addr)) .context(|| format!("could not send packet to {}", addr))
@ -498,7 +496,12 @@ impl UdpSocket {
impl From<std::net::UdpSocket> for UdpSocket { impl From<std::net::UdpSocket> for UdpSocket {
/// Converts a `std::net::UdpSocket` into its asynchronous equivalent. /// Converts a `std::net::UdpSocket` into its asynchronous equivalent.
fn from(socket: std::net::UdpSocket) -> UdpSocket { fn from(socket: std::net::UdpSocket) -> UdpSocket {
let mio_socket = mio::net::UdpSocket::from_socket(socket).unwrap(); // Make sure we are in nonblocking mode.
socket
.set_nonblocking(true)
.expect("failed to set nonblocking mode");
let mio_socket = mio::net::UdpSocket::from_std(socket);
UdpSocket { UdpSocket {
watcher: Watcher::new(mio_socket), watcher: Watcher::new(mio_socket),
} }

@ -3,15 +3,11 @@
use std::fmt; use std::fmt;
use std::net::Shutdown; use std::net::Shutdown;
use mio_uds;
use super::SocketAddr;
use crate::future; use crate::future;
use crate::io; use crate::io;
use crate::rt::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::task::spawn_blocking; use crate::rt::Watcher;
/// A Unix datagram socket. /// A Unix datagram socket.
/// ///
@ -42,11 +38,11 @@ use crate::task::spawn_blocking;
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub struct UnixDatagram { pub struct UnixDatagram {
watcher: Watcher<mio_uds::UnixDatagram>, watcher: Watcher<mio::net::UnixDatagram>,
} }
impl UnixDatagram { impl UnixDatagram {
fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { fn new(socket: mio::net::UnixDatagram) -> UnixDatagram {
UnixDatagram { UnixDatagram {
watcher: Watcher::new(socket), watcher: Watcher::new(socket),
} }
@ -67,8 +63,8 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> { pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let socket = spawn_blocking(move || mio_uds::UnixDatagram::bind(path)).await?; let mio_socket = mio::net::UnixDatagram::bind(path)?;
Ok(UnixDatagram::new(socket)) Ok(UnixDatagram::new(mio_socket))
} }
/// Creates a Unix datagram which is not bound to any address. /// Creates a Unix datagram which is not bound to any address.
@ -85,7 +81,7 @@ impl UnixDatagram {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn unbound() -> io::Result<UnixDatagram> { pub fn unbound() -> io::Result<UnixDatagram> {
let socket = mio_uds::UnixDatagram::unbound()?; let socket = mio::net::UnixDatagram::unbound()?;
Ok(UnixDatagram::new(socket)) Ok(UnixDatagram::new(socket))
} }
@ -105,7 +101,7 @@ impl UnixDatagram {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
let (a, b) = mio_uds::UnixDatagram::pair()?; let (a, b) = mio::net::UnixDatagram::pair()?;
let a = UnixDatagram::new(a); let a = UnixDatagram::new(a);
let b = UnixDatagram::new(b); let b = UnixDatagram::new(b);
Ok((a, b)) Ok((a, b))
@ -152,7 +148,7 @@ impl UnixDatagram {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> { pub fn local_addr(&self) -> io::Result<mio::net::SocketAddr> {
self.watcher.get_ref().local_addr() self.watcher.get_ref().local_addr()
} }
@ -175,7 +171,7 @@ impl UnixDatagram {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn peer_addr(&self) -> io::Result<SocketAddr> { pub fn peer_addr(&self) -> io::Result<mio::net::SocketAddr> {
self.watcher.get_ref().peer_addr() self.watcher.get_ref().peer_addr()
} }
@ -196,7 +192,7 @@ impl UnixDatagram {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, mio::net::SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
self.watcher self.watcher
.poll_read_with(cx, |inner| inner.recv_from(buf)) .poll_read_with(cx, |inner| inner.recv_from(buf))
@ -315,7 +311,12 @@ impl fmt::Debug for UnixDatagram {
impl From<std::os::unix::net::UnixDatagram> for UnixDatagram { impl From<std::os::unix::net::UnixDatagram> for UnixDatagram {
/// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent. /// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent.
fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram { fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram {
let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap(); // Make sure we are in nonblocking mode.
datagram
.set_nonblocking(true)
.expect("failed to set nonblocking mode");
let mio_datagram = mio::net::UnixDatagram::from_std(datagram);
UnixDatagram { UnixDatagram {
watcher: Watcher::new(mio_datagram), watcher: Watcher::new(mio_datagram),
} }

@ -1,20 +1,17 @@
//! Unix-specific networking extensions. //! Unix-specific networking extensions.
use std::fmt; use std::fmt;
use std::pin::Pin;
use std::future::Future; use std::future::Future;
use std::pin::Pin;
use mio_uds;
use super::SocketAddr;
use super::UnixStream; use super::UnixStream;
use crate::future; use crate::future;
use crate::io; use crate::io;
use crate::rt::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::rt::Watcher;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{spawn_blocking, Context, Poll}; use crate::task::{Context, Poll};
/// A Unix domain socket server, listening for connections. /// A Unix domain socket server, listening for connections.
/// ///
@ -50,7 +47,7 @@ use crate::task::{spawn_blocking, Context, Poll};
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub struct UnixListener { pub struct UnixListener {
watcher: Watcher<mio_uds::UnixListener>, watcher: Watcher<mio::net::UnixListener>,
} }
impl UnixListener { impl UnixListener {
@ -69,10 +66,10 @@ impl UnixListener {
/// ``` /// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> { pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let listener = spawn_blocking(move || mio_uds::UnixListener::bind(path)).await?; let mio_listener = mio::net::UnixListener::bind(path)?;
Ok(UnixListener { Ok(UnixListener {
watcher: Watcher::new(listener), watcher: Watcher::new(mio_listener),
}) })
} }
@ -92,28 +89,15 @@ impl UnixListener {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
let res = futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { let (mio_stream, addr) =
match inner.accept_std() { futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() }))?;
// Converting to `WouldBlock` so that the watcher will
// add the waker of this task to a list of readers. let stream = UnixStream {
Ok(None) => Err(io::ErrorKind::WouldBlock.into()), watcher: Watcher::new(mio_stream),
res => res, };
} Poll::Ready(Ok((stream, addr)))
}));
match res? {
Some((io, addr)) => {
let mio_stream = mio_uds::UnixStream::from_stream(io)?;
let stream = UnixStream {
watcher: Watcher::new(mio_stream),
};
Poll::Ready(Ok((stream, addr)))
}
// This should never happen since `None` is converted to `WouldBlock`
None => unreachable!(),
}
}) })
.await .await
} }
@ -162,7 +146,7 @@ impl UnixListener {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> { pub fn local_addr(&self) -> io::Result<mio::net::SocketAddr> {
self.watcher.get_ref().local_addr() self.watcher.get_ref().local_addr()
} }
} }
@ -209,7 +193,12 @@ impl Stream for Incoming<'_> {
impl From<std::os::unix::net::UnixListener> for UnixListener { impl From<std::os::unix::net::UnixListener> for UnixListener {
/// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent.
fn from(listener: std::os::unix::net::UnixListener) -> UnixListener { fn from(listener: std::os::unix::net::UnixListener) -> UnixListener {
let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap(); // Make sure we are in nonblocking mode.
listener
.set_nonblocking(true)
.expect("failed to set nonblocking mode");
let mio_listener = mio::net::UnixListener::from_std(listener);
UnixListener { UnixListener {
watcher: Watcher::new(mio_listener), watcher: Watcher::new(mio_listener),
} }

@ -5,14 +5,12 @@ use std::io::{Read as _, Write as _};
use std::net::Shutdown; use std::net::Shutdown;
use std::pin::Pin; use std::pin::Pin;
use mio_uds; use crate::future;
use super::SocketAddr;
use crate::io::{self, Read, Write}; use crate::io::{self, Read, Write};
use crate::rt::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::task::{spawn_blocking, Context, Poll}; use crate::rt::Watcher;
use crate::task::{Context, Poll};
/// A Unix stream socket. /// A Unix stream socket.
/// ///
@ -38,7 +36,7 @@ use crate::task::{spawn_blocking, Context, Poll};
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub struct UnixStream { pub struct UnixStream {
pub(super) watcher: Watcher<mio_uds::UnixStream>, pub(super) watcher: Watcher<mio::net::UnixStream>,
} }
impl UnixStream { impl UnixStream {
@ -58,14 +56,20 @@ impl UnixStream {
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
spawn_blocking(move || { // mio's UnixStream::connect is non-blocking and may just be in progress
let std_stream = std::os::unix::net::UnixStream::connect(path)?; // when it returns with `Ok`. We therefore wait for write readiness to
let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?; // be sure the connection has either been established or there was an
Ok(UnixStream { // error which we check for afterwards.
watcher: Watcher::new(mio_stream), let mio_stream = mio::net::UnixStream::connect(path)?;
}) let watcher = Watcher::new(mio_stream);
})
.await future::poll_fn(|cx| watcher.poll_write_ready(cx)).await;
match watcher.get_ref().take_error() {
Ok(None) => Ok(UnixStream { watcher }),
Ok(Some(e)) => Err(e),
Err(e) => Err(e),
}
} }
/// Creates an unnamed pair of connected sockets. /// Creates an unnamed pair of connected sockets.
@ -84,7 +88,7 @@ impl UnixStream {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn pair() -> io::Result<(UnixStream, UnixStream)> { pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
let (a, b) = mio_uds::UnixStream::pair()?; let (a, b) = mio::net::UnixStream::pair()?;
let a = UnixStream { let a = UnixStream {
watcher: Watcher::new(a), watcher: Watcher::new(a),
}; };
@ -108,7 +112,7 @@ impl UnixStream {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn local_addr(&self) -> io::Result<SocketAddr> { pub fn local_addr(&self) -> io::Result<mio::net::SocketAddr> {
self.watcher.get_ref().local_addr() self.watcher.get_ref().local_addr()
} }
@ -126,7 +130,7 @@ impl UnixStream {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn peer_addr(&self) -> io::Result<SocketAddr> { pub fn peer_addr(&self) -> io::Result<mio::net::SocketAddr> {
self.watcher.get_ref().peer_addr() self.watcher.get_ref().peer_addr()
} }
@ -187,6 +191,7 @@ impl Write for UnixStream {
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.shutdown(std::net::Shutdown::Write)?;
Pin::new(&mut &*self).poll_close(cx) Pin::new(&mut &*self).poll_close(cx)
} }
} }
@ -206,6 +211,7 @@ impl Write for &UnixStream {
} }
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.shutdown(std::net::Shutdown::Write)?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }
@ -230,7 +236,12 @@ impl fmt::Debug for UnixStream {
impl From<std::os::unix::net::UnixStream> for UnixStream { impl From<std::os::unix::net::UnixStream> for UnixStream {
/// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent.
fn from(stream: std::os::unix::net::UnixStream) -> UnixStream { fn from(stream: std::os::unix::net::UnixStream) -> UnixStream {
let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap(); // Make sure we are in nonblocking mode.
stream
.set_nonblocking(true)
.expect("failed to set nonblocking mode");
let mio_stream = mio::net::UnixStream::from_std(stream);
UnixStream { UnixStream {
watcher: Watcher::new(mio_stream), watcher: Watcher::new(mio_stream),
} }

@ -2,13 +2,16 @@ use std::fmt;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use mio::{self, Evented}; use mio::{self, event::Source, Interest};
use slab::Slab; use slab::Slab;
use crate::io; use crate::io;
use crate::rt::RUNTIME; use crate::rt::RUNTIME;
use crate::task::{Context, Poll, Waker}; use crate::task::{Context, Poll, Waker};
// TODO: ADD AIO and LIO?
const INTEREST_ALL: Interest = Interest::READABLE.add(Interest::WRITABLE);
/// Data associated with a registered I/O handle. /// Data associated with a registered I/O handle.
#[derive(Debug)] #[derive(Debug)]
struct Entry { struct Entry {
@ -25,7 +28,7 @@ struct Entry {
/// The state of a networking driver. /// The state of a networking driver.
pub struct Reactor { pub struct Reactor {
/// A mio instance that polls for new events. /// A mio instance that polls for new events.
poller: mio::Poll, poller: Mutex<mio::Poll>,
/// A list into which mio stores events. /// A list into which mio stores events.
events: Mutex<mio::Events>, events: Mutex<mio::Events>,
@ -33,8 +36,8 @@ pub struct Reactor {
/// A collection of registered I/O handles. /// A collection of registered I/O handles.
entries: Mutex<Slab<Arc<Entry>>>, entries: Mutex<Slab<Arc<Entry>>>,
/// Dummy I/O handle that is only used to wake up the polling thread. /// Mio waker that is only used to wake up the polling thread.
notify_reg: (mio::Registration, mio::SetReadiness), notify_waker: mio::Waker,
/// An identifier for the notification handle. /// An identifier for the notification handle.
notify_token: mio::Token, notify_token: mio::Token,
@ -64,25 +67,38 @@ impl Reactor {
/// Creates a new reactor for polling I/O events. /// Creates a new reactor for polling I/O events.
pub fn new() -> io::Result<Reactor> { pub fn new() -> io::Result<Reactor> {
let poller = mio::Poll::new()?; let poller = mio::Poll::new()?;
let notify_reg = mio::Registration::new2(); let mut entries = Slab::new();
let mut reactor = Reactor { // Register a waker for waking up the polling thread.
poller, let vacant = entries.vacant_entry();
let notify_token = mio::Token(vacant.key());
let notify_waker = mio::Waker::new(poller.registry(), notify_token)?;
// dumy entry to avoid reusing the same token
vacant.insert(Arc::new(Entry {
token: notify_token.clone(),
readers: Mutex::new(Readers {
ready: false,
wakers: Vec::new(),
}),
writers: Mutex::new(Writers {
ready: false,
wakers: Vec::new(),
}),
}));
let reactor = Reactor {
poller: Mutex::new(poller),
events: Mutex::new(mio::Events::with_capacity(1000)), events: Mutex::new(mio::Events::with_capacity(1000)),
entries: Mutex::new(Slab::new()), entries: Mutex::new(entries),
notify_reg, notify_waker,
notify_token: mio::Token(0), notify_token,
}; };
// Register a dummy I/O handle for waking up the polling thread.
let entry = reactor.register(&reactor.notify_reg.0)?;
reactor.notify_token = entry.token;
Ok(reactor) Ok(reactor)
} }
/// Registers an I/O event source and returns its associated entry. /// Registers an I/O event source and returns its associated entry.
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> { fn register(&self, source: &mut dyn Source) -> io::Result<Arc<Entry>> {
let mut entries = self.entries.lock().unwrap(); let mut entries = self.entries.lock().unwrap();
// Reserve a vacant spot in the slab and use its key as the token value. // Reserve a vacant spot in the slab and use its key as the token value.
@ -104,17 +120,19 @@ impl Reactor {
vacant.insert(entry.clone()); vacant.insert(entry.clone());
// Register the I/O event source in the poller. // Register the I/O event source in the poller.
let interest = mio::Ready::all(); self.poller
let opts = mio::PollOpt::edge(); .lock()
self.poller.register(source, token, interest, opts)?; .unwrap()
.registry()
.register(source, token, INTEREST_ALL)?;
Ok(entry) Ok(entry)
} }
/// Deregisters an I/O event source associated with an entry. /// Deregisters an I/O event source associated with an entry.
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> { fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> {
// Deregister the I/O object from the mio instance. // Deregister the I/O object from the mio instance.
self.poller.deregister(source)?; self.poller.lock().unwrap().registry().deregister(source)?;
// Remove the entry associated with the I/O object. // Remove the entry associated with the I/O object.
self.entries.lock().unwrap().remove(entry.token.0); self.entries.lock().unwrap().remove(entry.token.0);
@ -124,7 +142,7 @@ impl Reactor {
/// Notifies the reactor so that polling stops blocking. /// Notifies the reactor so that polling stops blocking.
pub fn notify(&self) -> io::Result<()> { pub fn notify(&self) -> io::Result<()> {
self.notify_reg.1.set_readiness(mio::Ready::readable()) self.notify_waker.wake()
} }
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles. /// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
@ -134,7 +152,7 @@ impl Reactor {
let mut events = self.events.lock().unwrap(); let mut events = self.events.lock().unwrap();
// Block on the poller until at least one new event comes in. // Block on the poller until at least one new event comes in.
self.poller.poll(&mut events, timeout)?; self.poller.lock().unwrap().poll(&mut events, timeout)?;
// Lock the entire entry table while we're processing new events. // Lock the entire entry table while we're processing new events.
let entries = self.entries.lock().unwrap(); let entries = self.entries.lock().unwrap();
@ -147,16 +165,12 @@ impl Reactor {
if token == self.notify_token { if token == self.notify_token {
// If this is the notification token, we just need the notification state. // If this is the notification token, we just need the notification state.
self.notify_reg.1.set_readiness(mio::Ready::empty())?; self.notify_waker.wake()?;
} else { } else {
// Otherwise, look for the entry associated with this token. // Otherwise, look for the entry associated with this token.
if let Some(entry) = entries.get(token.0) { if let Some(entry) = entries.get(token.0) {
// Set the readiness flags from this I/O event.
let readiness = event.readiness();
// Wake up reader tasks blocked on this I/O handle. // Wake up reader tasks blocked on this I/O handle.
let reader_interests = mio::Ready::all() - mio::Ready::writable(); if event.is_readable() {
if !(readiness & reader_interests).is_empty() {
let mut readers = entry.readers.lock().unwrap(); let mut readers = entry.readers.lock().unwrap();
readers.ready = true; readers.ready = true;
for w in readers.wakers.drain(..) { for w in readers.wakers.drain(..) {
@ -166,8 +180,7 @@ impl Reactor {
} }
// Wake up writer tasks blocked on this I/O handle. // Wake up writer tasks blocked on this I/O handle.
let writer_interests = mio::Ready::all() - mio::Ready::readable(); if event.is_writable() {
if !(readiness & writer_interests).is_empty() {
let mut writers = entry.writers.lock().unwrap(); let mut writers = entry.writers.lock().unwrap();
writers.ready = true; writers.ready = true;
for w in writers.wakers.drain(..) { for w in writers.wakers.drain(..) {
@ -187,7 +200,7 @@ impl Reactor {
/// ///
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it, /// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
/// implementing traits `AsyncRead` and `AsyncWrite`. /// implementing traits `AsyncRead` and `AsyncWrite`.
pub struct Watcher<T: Evented> { pub struct Watcher<T: Source> {
/// Data associated with the I/O handle. /// Data associated with the I/O handle.
entry: Arc<Entry>, entry: Arc<Entry>,
@ -195,16 +208,16 @@ pub struct Watcher<T: Evented> {
source: Option<T>, source: Option<T>,
} }
impl<T: Evented> Watcher<T> { impl<T: Source> Watcher<T> {
/// Creates a new I/O handle. /// Creates a new I/O handle.
/// ///
/// The provided I/O event source will be kept registered inside the reactor's poller for the /// The provided I/O event source will be kept registered inside the reactor's poller for the
/// lifetime of the returned I/O handle. /// lifetime of the returned I/O handle.
pub fn new(source: T) -> Watcher<T> { pub fn new(mut source: T) -> Watcher<T> {
Watcher { Watcher {
entry: RUNTIME entry: RUNTIME
.reactor() .reactor()
.register(&source) .register(&mut source)
.expect("cannot register an I/O event source"), .expect("cannot register an I/O event source"),
source: Some(source), source: Some(source),
} }
@ -324,18 +337,18 @@ impl<T: Evented> Watcher<T> {
/// This method is typically used to convert `Watcher`s to raw file descriptors/handles. /// This method is typically used to convert `Watcher`s to raw file descriptors/handles.
#[allow(dead_code)] #[allow(dead_code)]
pub fn into_inner(mut self) -> T { pub fn into_inner(mut self) -> T {
let source = self.source.take().unwrap(); let mut source = self.source.take().unwrap();
RUNTIME RUNTIME
.reactor() .reactor()
.deregister(&source, &self.entry) .deregister(&mut source, &self.entry)
.expect("cannot deregister I/O event source"); .expect("cannot deregister I/O event source");
source source
} }
} }
impl<T: Evented> Drop for Watcher<T> { impl<T: Source> Drop for Watcher<T> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(ref source) = self.source { if let Some(ref mut source) = self.source {
RUNTIME RUNTIME
.reactor() .reactor()
.deregister(source, &self.entry) .deregister(source, &self.entry)
@ -344,7 +357,7 @@ impl<T: Evented> Drop for Watcher<T> {
} }
} }
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> { impl<T: Source + fmt::Debug> fmt::Debug for Watcher<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Watcher") f.debug_struct("Watcher")
.field("entry", &self.entry) .field("entry", &self.entry)

Loading…
Cancel
Save