From 80870af3bb2e72d81eecc245e7a350fe5370febb Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 12:55:24 +0200 Subject: [PATCH] mechanical first pass at updating to mio 0.7 --- Cargo.toml | 7 ++-- src/net/tcp/listener.rs | 12 +++--- src/net/tcp/stream.rs | 6 +-- src/net/udp/mod.rs | 10 ++--- src/os/unix/net/datagram.rs | 22 +++++------ src/os/unix/net/listener.rs | 44 ++++++++-------------- src/os/unix/net/stream.rs | 39 ++++++++++--------- src/rt/reactor.rs | 74 ++++++++++++++++++------------------- 8 files changed, 99 insertions(+), 115 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d49eb95..d5bff22 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,9 @@ default = [ "kv-log-macro", "log", "mio", - "mio-uds", + "mio/tcp", + "mio/udp", + "mio/uds", "num_cpus", "pin-project-lite", ] @@ -67,8 +69,7 @@ futures-timer = { version = "3.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } memchr = { version = "2.3.3", optional = true } -mio = { version = "0.6.19", optional = true } -mio-uds = { version = "0.6.7", optional = true } +mio = { version = "0.7.0", optional = true } num_cpus = { version = "1.12.0", optional = true } once_cell = { version = "1.3.1", optional = true } pin-project-lite = { version = "0.1.4", optional = true } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 9e15d40..dc0b917 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use crate::future; use crate::io; -use crate::rt::Watcher; use crate::net::{TcpStream, ToSocketAddrs}; +use crate::rt::Watcher; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -79,7 +79,7 @@ impl TcpListener { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match mio::net::TcpListener::bind(&addr) { + match mio::net::TcpListener::bind(addr) { Ok(mio_listener) => { return Ok(TcpListener { watcher: Watcher::new(mio_listener), @@ -114,11 +114,9 @@ impl TcpListener { /// # Ok(()) }) } /// ``` pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - let (io, addr) = - future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept_std())) - .await?; + let (mio_stream, addr) = + future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept())).await?; - let mio_stream = mio::net::TcpStream::from_stream(io)?; let stream = TcpStream { watcher: Arc::new(Watcher::new(mio_stream)), }; @@ -206,7 +204,7 @@ impl<'a> Stream for Incoming<'a> { impl From for TcpListener { /// Converts a `std::net::TcpListener` into its asynchronous equivalent. fn from(listener: std::net::TcpListener) -> TcpListener { - let mio_listener = mio::net::TcpListener::from_std(listener).unwrap(); + let mio_listener = mio::net::TcpListener::from_std(listener); TcpListener { watcher: Watcher::new(mio_listener), } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 1f50e8f..e1bb067 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use crate::future; use crate::io::{self, Read, Write}; -use crate::rt::Watcher; use crate::net::ToSocketAddrs; +use crate::rt::Watcher; use crate::task::{Context, Poll}; /// A TCP stream between a local and a remote socket. @@ -79,7 +79,7 @@ impl TcpStream { // when it returns with `Ok`. We therefore wait for write readiness to // be sure the connection has either been established or there was an // error which we check for afterwards. - let watcher = match mio::net::TcpStream::connect(&addr) { + let watcher = match mio::net::TcpStream::connect(addr) { Ok(s) => Watcher::new(s), Err(e) => { last_err = Some(e); @@ -370,7 +370,7 @@ impl Write for &TcpStream { impl From for TcpStream { /// Converts a `std::net::TcpStream` into its asynchronous equivalent. fn from(stream: std::net::TcpStream) -> TcpStream { - let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); + let mio_stream = mio::net::TcpStream::from_std(stream); TcpStream { watcher: Arc::new(Watcher::new(mio_stream)), } diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 774478d..b1a7025 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -69,12 +69,10 @@ impl UdpSocket { /// ``` pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match mio::net::UdpSocket::bind(&addr) { + match mio::net::UdpSocket::bind(addr) { Ok(mio_socket) => { return Ok(UdpSocket { watcher: Watcher::new(mio_socket), @@ -155,7 +153,7 @@ impl UdpSocket { future::poll_fn(|cx| { self.watcher - .poll_write_with(cx, |inner| inner.send_to(buf, &addr)) + .poll_write_with(cx, |inner| inner.send_to(buf, addr)) }) .await .context(|| format!("could not send packet to {}", addr)) @@ -498,7 +496,7 @@ impl UdpSocket { impl From for UdpSocket { /// Converts a `std::net::UdpSocket` into its asynchronous equivalent. fn from(socket: std::net::UdpSocket) -> UdpSocket { - let mio_socket = mio::net::UdpSocket::from_socket(socket).unwrap(); + let mio_socket = mio::net::UdpSocket::from_std(socket); UdpSocket { watcher: Watcher::new(mio_socket), } diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 5a2d6ec..20b1046 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -3,14 +3,12 @@ use std::fmt; use std::net::Shutdown; -use mio_uds; - use super::SocketAddr; use crate::future; use crate::io; -use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; +use crate::rt::Watcher; use crate::task::spawn_blocking; /// A Unix datagram socket. @@ -42,11 +40,11 @@ use crate::task::spawn_blocking; /// # Ok(()) }) } /// ``` pub struct UnixDatagram { - watcher: Watcher, + watcher: Watcher, } impl UnixDatagram { - fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { + fn new(socket: mio::net::UnixDatagram) -> UnixDatagram { UnixDatagram { watcher: Watcher::new(socket), } @@ -67,7 +65,7 @@ impl UnixDatagram { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let socket = spawn_blocking(move || mio_uds::UnixDatagram::bind(path)).await?; + let socket = spawn_blocking(move || mio::net::UnixDatagram::bind(path)).await?; Ok(UnixDatagram::new(socket)) } @@ -85,7 +83,7 @@ impl UnixDatagram { /// # Ok(()) }) } /// ``` pub fn unbound() -> io::Result { - let socket = mio_uds::UnixDatagram::unbound()?; + let socket = mio::net::UnixDatagram::unbound()?; Ok(UnixDatagram::new(socket)) } @@ -105,7 +103,7 @@ impl UnixDatagram { /// # Ok(()) }) } /// ``` pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { - let (a, b) = mio_uds::UnixDatagram::pair()?; + let (a, b) = mio::net::UnixDatagram::pair()?; let a = UnixDatagram::new(a); let b = UnixDatagram::new(b); Ok((a, b)) @@ -152,7 +150,7 @@ impl UnixDatagram { /// # /// # Ok(()) }) } /// ``` - pub fn local_addr(&self) -> io::Result { + pub fn local_addr(&self) -> io::Result { self.watcher.get_ref().local_addr() } @@ -175,7 +173,7 @@ impl UnixDatagram { /// # /// # Ok(()) }) } /// ``` - pub fn peer_addr(&self) -> io::Result { + pub fn peer_addr(&self) -> io::Result { self.watcher.get_ref().peer_addr() } @@ -196,7 +194,7 @@ impl UnixDatagram { /// # /// # Ok(()) }) } /// ``` - pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, mio::net::SocketAddr)> { future::poll_fn(|cx| { self.watcher .poll_read_with(cx, |inner| inner.recv_from(buf)) @@ -315,7 +313,7 @@ impl fmt::Debug for UnixDatagram { impl From for UnixDatagram { /// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent. fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram { - let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap(); + let mio_datagram = mio::net::UnixDatagram::from_std(datagram); UnixDatagram { watcher: Watcher::new(mio_datagram), } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 9f6bdcb..ecced86 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -1,18 +1,16 @@ //! Unix-specific networking extensions. use std::fmt; -use std::pin::Pin; use std::future::Future; - -use mio_uds; +use std::pin::Pin; use super::SocketAddr; use super::UnixStream; use crate::future; use crate::io; -use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; +use crate::rt::Watcher; use crate::stream::Stream; use crate::task::{spawn_blocking, Context, Poll}; @@ -50,7 +48,7 @@ use crate::task::{spawn_blocking, Context, Poll}; /// # Ok(()) }) } /// ``` pub struct UnixListener { - watcher: Watcher, + watcher: Watcher, } impl UnixListener { @@ -69,7 +67,7 @@ impl UnixListener { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let listener = spawn_blocking(move || mio_uds::UnixListener::bind(path)).await?; + let listener = spawn_blocking(move || mio::net::UnixListener::bind(path)).await?; Ok(UnixListener { watcher: Watcher::new(listener), @@ -92,28 +90,16 @@ impl UnixListener { /// # /// # Ok(()) }) } /// ``` - pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> { future::poll_fn(|cx| { - let res = futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { - match inner.accept_std() { - // Converting to `WouldBlock` so that the watcher will - // add the waker of this task to a list of readers. - Ok(None) => Err(io::ErrorKind::WouldBlock.into()), - res => res, - } - })); - - match res? { - Some((io, addr)) => { - let mio_stream = mio_uds::UnixStream::from_stream(io)?; - let stream = UnixStream { - watcher: Watcher::new(mio_stream), - }; - Poll::Ready(Ok((stream, addr))) - } - // This should never happen since `None` is converted to `WouldBlock` - None => unreachable!(), - } + let res = + futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() })); + + let (mio_stream, addr) = res?; + let stream = UnixStream { + watcher: Watcher::new(mio_stream), + }; + Poll::Ready(Ok((stream, addr))) }) .await } @@ -162,7 +148,7 @@ impl UnixListener { /// # /// # Ok(()) }) } /// ``` - pub fn local_addr(&self) -> io::Result { + pub fn local_addr(&self) -> io::Result { self.watcher.get_ref().local_addr() } } @@ -209,7 +195,7 @@ impl Stream for Incoming<'_> { impl From for UnixListener { /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. fn from(listener: std::os::unix::net::UnixListener) -> UnixListener { - let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap(); + let mio_listener = mio::net::UnixListener::from_std(listener); UnixListener { watcher: Watcher::new(mio_listener), } diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index a1c83f1..169ad22 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -5,14 +5,13 @@ use std::io::{Read as _, Write as _}; use std::net::Shutdown; use std::pin::Pin; -use mio_uds; - use super::SocketAddr; +use crate::future; use crate::io::{self, Read, Write}; -use crate::rt::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; -use crate::task::{spawn_blocking, Context, Poll}; +use crate::rt::Watcher; +use crate::task::{Context, Poll}; /// A Unix stream socket. /// @@ -38,7 +37,7 @@ use crate::task::{spawn_blocking, Context, Poll}; /// # Ok(()) }) } /// ``` pub struct UnixStream { - pub(super) watcher: Watcher, + pub(super) watcher: Watcher, } impl UnixStream { @@ -58,14 +57,20 @@ impl UnixStream { pub async fn connect>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || { - let std_stream = std::os::unix::net::UnixStream::connect(path)?; - let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?; - Ok(UnixStream { - watcher: Watcher::new(mio_stream), - }) - }) - .await + // mio's UnixStream::connect is non-blocking and may just be in progress + // when it returns with `Ok`. We therefore wait for write readiness to + // be sure the connection has either been established or there was an + // error which we check for afterwards. + let mio_stream = mio::net::UnixStream::connect(path)?; + let watcher = Watcher::new(mio_stream); + + future::poll_fn(|cx| watcher.poll_write_ready(cx)).await; + + match watcher.get_ref().take_error() { + Ok(None) => Ok(UnixStream { watcher }), + Ok(Some(e)) => Err(e), + Err(e) => Err(e), + } } /// Creates an unnamed pair of connected sockets. @@ -84,7 +89,7 @@ impl UnixStream { /// # Ok(()) }) } /// ``` pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = mio_uds::UnixStream::pair()?; + let (a, b) = mio::net::UnixStream::pair()?; let a = UnixStream { watcher: Watcher::new(a), }; @@ -108,7 +113,7 @@ impl UnixStream { /// # /// # Ok(()) }) } /// ``` - pub fn local_addr(&self) -> io::Result { + pub fn local_addr(&self) -> io::Result { self.watcher.get_ref().local_addr() } @@ -126,7 +131,7 @@ impl UnixStream { /// # /// # Ok(()) }) } /// ``` - pub fn peer_addr(&self) -> io::Result { + pub fn peer_addr(&self) -> io::Result { self.watcher.get_ref().peer_addr() } @@ -230,7 +235,7 @@ impl fmt::Debug for UnixStream { impl From for UnixStream { /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. fn from(stream: std::os::unix::net::UnixStream) -> UnixStream { - let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap(); + let mio_stream = mio::net::UnixStream::from_std(stream); UnixStream { watcher: Watcher::new(mio_stream), } diff --git a/src/rt/reactor.rs b/src/rt/reactor.rs index 2a35b72..e03749f 100644 --- a/src/rt/reactor.rs +++ b/src/rt/reactor.rs @@ -2,7 +2,7 @@ use std::fmt; use std::sync::{Arc, Mutex}; use std::time::Duration; -use mio::{self, Evented}; +use mio::{self, event::Source, Interest}; use slab::Slab; use crate::io; @@ -64,25 +64,26 @@ impl Reactor { /// Creates a new reactor for polling I/O events. pub fn new() -> io::Result { let poller = mio::Poll::new()?; - let notify_reg = mio::Registration::new2(); - - let mut reactor = Reactor { - poller, - events: Mutex::new(mio::Events::with_capacity(1000)), - entries: Mutex::new(Slab::new()), - notify_reg, - notify_token: mio::Token(0), - }; - - // Register a dummy I/O handle for waking up the polling thread. - let entry = reactor.register(&reactor.notify_reg.0)?; - reactor.notify_token = entry.token; - - Ok(reactor) + todo!(); + // let notify_reg = mio::Registration::new2(); + + // let mut reactor = Reactor { + // poller, + // events: Mutex::new(mio::Events::with_capacity(1000)), + // entries: Mutex::new(Slab::new()), + // notify_reg, + // notify_token: mio::Token(0), + // }; + + // // Register a dummy I/O handle for waking up the polling thread. + // let entry = reactor.register(&reactor.notify_reg.0)?; + // reactor.notify_token = entry.token; + + // Ok(reactor) } /// Registers an I/O event source and returns its associated entry. - fn register(&self, source: &dyn Evented) -> io::Result> { + fn register(&self, source: &mut dyn Source) -> io::Result> { let mut entries = self.entries.lock().unwrap(); // Reserve a vacant spot in the slab and use its key as the token value. @@ -104,17 +105,17 @@ impl Reactor { vacant.insert(entry.clone()); // Register the I/O event source in the poller. - let interest = mio::Ready::all(); - let opts = mio::PollOpt::edge(); - self.poller.register(source, token, interest, opts)?; + // TODO: ADD AIO and LIO? + const interest: Interest = Interest::READABLE.add(Interest::WRITABLE); + self.poller.registry().register(source, token, interest)?; Ok(entry) } /// Deregisters an I/O event source associated with an entry. - fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> { + fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> { // Deregister the I/O object from the mio instance. - self.poller.deregister(source)?; + self.poller.registry().deregister(source)?; // Remove the entry associated with the I/O object. self.entries.lock().unwrap().remove(entry.token.0); @@ -124,7 +125,8 @@ impl Reactor { /// Notifies the reactor so that polling stops blocking. pub fn notify(&self) -> io::Result<()> { - self.notify_reg.1.set_readiness(mio::Ready::readable()) + todo!() + // self.notify_reg.1.set_readiness(mio::Ready::readable()) } /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. @@ -147,16 +149,13 @@ impl Reactor { if token == self.notify_token { // If this is the notification token, we just need the notification state. - self.notify_reg.1.set_readiness(mio::Ready::empty())?; + todo!() + // self.notify_reg.1.set_readiness(mio::Ready::empty())?; } else { // Otherwise, look for the entry associated with this token. if let Some(entry) = entries.get(token.0) { - // Set the readiness flags from this I/O event. - let readiness = event.readiness(); - // Wake up reader tasks blocked on this I/O handle. - let reader_interests = mio::Ready::all() - mio::Ready::writable(); - if !(readiness & reader_interests).is_empty() { + if event.is_readable() { let mut readers = entry.readers.lock().unwrap(); readers.ready = true; for w in readers.wakers.drain(..) { @@ -166,8 +165,7 @@ impl Reactor { } // Wake up writer tasks blocked on this I/O handle. - let writer_interests = mio::Ready::all() - mio::Ready::readable(); - if !(readiness & writer_interests).is_empty() { + if event.is_writable() { let mut writers = entry.writers.lock().unwrap(); writers.ready = true; for w in writers.wakers.drain(..) { @@ -187,7 +185,7 @@ impl Reactor { /// /// This handle wraps an I/O event source and exposes a "futurized" interface on top of it, /// implementing traits `AsyncRead` and `AsyncWrite`. -pub struct Watcher { +pub struct Watcher { /// Data associated with the I/O handle. entry: Arc, @@ -195,7 +193,7 @@ pub struct Watcher { source: Option, } -impl Watcher { +impl Watcher { /// Creates a new I/O handle. /// /// The provided I/O event source will be kept registered inside the reactor's poller for the @@ -204,7 +202,7 @@ impl Watcher { Watcher { entry: RUNTIME .reactor() - .register(&source) + .register(&mut source) .expect("cannot register an I/O event source"), source: Some(source), } @@ -327,15 +325,15 @@ impl Watcher { let source = self.source.take().unwrap(); RUNTIME .reactor() - .deregister(&source, &self.entry) + .deregister(&mut source, &self.entry) .expect("cannot deregister I/O event source"); source } } -impl Drop for Watcher { +impl Drop for Watcher { fn drop(&mut self) { - if let Some(ref source) = self.source { + if let Some(ref mut source) = self.source { RUNTIME .reactor() .deregister(source, &self.entry) @@ -344,7 +342,7 @@ impl Drop for Watcher { } } -impl fmt::Debug for Watcher { +impl fmt::Debug for Watcher { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Watcher") .field("entry", &self.entry)