mirror of
https://github.com/async-rs/async-std.git
synced 2025-04-09 01:46:42 +00:00
mechanical first pass at updating to mio 0.7
This commit is contained in:
parent
fc4e472599
commit
80870af3bb
8 changed files with 95 additions and 111 deletions
|
@ -31,7 +31,9 @@ default = [
|
||||||
"kv-log-macro",
|
"kv-log-macro",
|
||||||
"log",
|
"log",
|
||||||
"mio",
|
"mio",
|
||||||
"mio-uds",
|
"mio/tcp",
|
||||||
|
"mio/udp",
|
||||||
|
"mio/uds",
|
||||||
"num_cpus",
|
"num_cpus",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
]
|
]
|
||||||
|
@ -67,8 +69,7 @@ futures-timer = { version = "3.0.2", optional = true }
|
||||||
kv-log-macro = { version = "1.0.4", optional = true }
|
kv-log-macro = { version = "1.0.4", optional = true }
|
||||||
log = { version = "0.4.8", features = ["kv_unstable"], optional = true }
|
log = { version = "0.4.8", features = ["kv_unstable"], optional = true }
|
||||||
memchr = { version = "2.3.3", optional = true }
|
memchr = { version = "2.3.3", optional = true }
|
||||||
mio = { version = "0.6.19", optional = true }
|
mio = { version = "0.7.0", optional = true }
|
||||||
mio-uds = { version = "0.6.7", optional = true }
|
|
||||||
num_cpus = { version = "1.12.0", optional = true }
|
num_cpus = { version = "1.12.0", optional = true }
|
||||||
once_cell = { version = "1.3.1", optional = true }
|
once_cell = { version = "1.3.1", optional = true }
|
||||||
pin-project-lite = { version = "0.1.4", optional = true }
|
pin-project-lite = { version = "0.1.4", optional = true }
|
||||||
|
|
|
@ -5,8 +5,8 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use crate::future;
|
use crate::future;
|
||||||
use crate::io;
|
use crate::io;
|
||||||
use crate::rt::Watcher;
|
|
||||||
use crate::net::{TcpStream, ToSocketAddrs};
|
use crate::net::{TcpStream, ToSocketAddrs};
|
||||||
|
use crate::rt::Watcher;
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ impl TcpListener {
|
||||||
let addrs = addrs.to_socket_addrs().await?;
|
let addrs = addrs.to_socket_addrs().await?;
|
||||||
|
|
||||||
for addr in addrs {
|
for addr in addrs {
|
||||||
match mio::net::TcpListener::bind(&addr) {
|
match mio::net::TcpListener::bind(addr) {
|
||||||
Ok(mio_listener) => {
|
Ok(mio_listener) => {
|
||||||
return Ok(TcpListener {
|
return Ok(TcpListener {
|
||||||
watcher: Watcher::new(mio_listener),
|
watcher: Watcher::new(mio_listener),
|
||||||
|
@ -114,11 +114,9 @@ impl TcpListener {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
|
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
|
||||||
let (io, addr) =
|
let (mio_stream, addr) =
|
||||||
future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept_std()))
|
future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.accept())).await?;
|
||||||
.await?;
|
|
||||||
|
|
||||||
let mio_stream = mio::net::TcpStream::from_stream(io)?;
|
|
||||||
let stream = TcpStream {
|
let stream = TcpStream {
|
||||||
watcher: Arc::new(Watcher::new(mio_stream)),
|
watcher: Arc::new(Watcher::new(mio_stream)),
|
||||||
};
|
};
|
||||||
|
@ -206,7 +204,7 @@ impl<'a> Stream for Incoming<'a> {
|
||||||
impl From<std::net::TcpListener> for TcpListener {
|
impl From<std::net::TcpListener> for TcpListener {
|
||||||
/// Converts a `std::net::TcpListener` into its asynchronous equivalent.
|
/// Converts a `std::net::TcpListener` into its asynchronous equivalent.
|
||||||
fn from(listener: std::net::TcpListener) -> TcpListener {
|
fn from(listener: std::net::TcpListener) -> TcpListener {
|
||||||
let mio_listener = mio::net::TcpListener::from_std(listener).unwrap();
|
let mio_listener = mio::net::TcpListener::from_std(listener);
|
||||||
TcpListener {
|
TcpListener {
|
||||||
watcher: Watcher::new(mio_listener),
|
watcher: Watcher::new(mio_listener),
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,8 +5,8 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use crate::future;
|
use crate::future;
|
||||||
use crate::io::{self, Read, Write};
|
use crate::io::{self, Read, Write};
|
||||||
use crate::rt::Watcher;
|
|
||||||
use crate::net::ToSocketAddrs;
|
use crate::net::ToSocketAddrs;
|
||||||
|
use crate::rt::Watcher;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
/// A TCP stream between a local and a remote socket.
|
/// A TCP stream between a local and a remote socket.
|
||||||
|
@ -79,7 +79,7 @@ impl TcpStream {
|
||||||
// when it returns with `Ok`. We therefore wait for write readiness to
|
// when it returns with `Ok`. We therefore wait for write readiness to
|
||||||
// be sure the connection has either been established or there was an
|
// be sure the connection has either been established or there was an
|
||||||
// error which we check for afterwards.
|
// error which we check for afterwards.
|
||||||
let watcher = match mio::net::TcpStream::connect(&addr) {
|
let watcher = match mio::net::TcpStream::connect(addr) {
|
||||||
Ok(s) => Watcher::new(s),
|
Ok(s) => Watcher::new(s),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
last_err = Some(e);
|
last_err = Some(e);
|
||||||
|
@ -370,7 +370,7 @@ impl Write for &TcpStream {
|
||||||
impl From<std::net::TcpStream> for TcpStream {
|
impl From<std::net::TcpStream> for TcpStream {
|
||||||
/// Converts a `std::net::TcpStream` into its asynchronous equivalent.
|
/// Converts a `std::net::TcpStream` into its asynchronous equivalent.
|
||||||
fn from(stream: std::net::TcpStream) -> TcpStream {
|
fn from(stream: std::net::TcpStream) -> TcpStream {
|
||||||
let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap();
|
let mio_stream = mio::net::TcpStream::from_std(stream);
|
||||||
TcpStream {
|
TcpStream {
|
||||||
watcher: Arc::new(Watcher::new(mio_stream)),
|
watcher: Arc::new(Watcher::new(mio_stream)),
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,12 +69,10 @@ impl UdpSocket {
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<UdpSocket> {
|
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<UdpSocket> {
|
||||||
let mut last_err = None;
|
let mut last_err = None;
|
||||||
let addrs = addrs
|
let addrs = addrs.to_socket_addrs().await?;
|
||||||
.to_socket_addrs()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
for addr in addrs {
|
for addr in addrs {
|
||||||
match mio::net::UdpSocket::bind(&addr) {
|
match mio::net::UdpSocket::bind(addr) {
|
||||||
Ok(mio_socket) => {
|
Ok(mio_socket) => {
|
||||||
return Ok(UdpSocket {
|
return Ok(UdpSocket {
|
||||||
watcher: Watcher::new(mio_socket),
|
watcher: Watcher::new(mio_socket),
|
||||||
|
@ -155,7 +153,7 @@ impl UdpSocket {
|
||||||
|
|
||||||
future::poll_fn(|cx| {
|
future::poll_fn(|cx| {
|
||||||
self.watcher
|
self.watcher
|
||||||
.poll_write_with(cx, |inner| inner.send_to(buf, &addr))
|
.poll_write_with(cx, |inner| inner.send_to(buf, addr))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.context(|| format!("could not send packet to {}", addr))
|
.context(|| format!("could not send packet to {}", addr))
|
||||||
|
@ -498,7 +496,7 @@ impl UdpSocket {
|
||||||
impl From<std::net::UdpSocket> for UdpSocket {
|
impl From<std::net::UdpSocket> for UdpSocket {
|
||||||
/// Converts a `std::net::UdpSocket` into its asynchronous equivalent.
|
/// Converts a `std::net::UdpSocket` into its asynchronous equivalent.
|
||||||
fn from(socket: std::net::UdpSocket) -> UdpSocket {
|
fn from(socket: std::net::UdpSocket) -> UdpSocket {
|
||||||
let mio_socket = mio::net::UdpSocket::from_socket(socket).unwrap();
|
let mio_socket = mio::net::UdpSocket::from_std(socket);
|
||||||
UdpSocket {
|
UdpSocket {
|
||||||
watcher: Watcher::new(mio_socket),
|
watcher: Watcher::new(mio_socket),
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,14 +3,12 @@
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::net::Shutdown;
|
use std::net::Shutdown;
|
||||||
|
|
||||||
use mio_uds;
|
|
||||||
|
|
||||||
use super::SocketAddr;
|
use super::SocketAddr;
|
||||||
use crate::future;
|
use crate::future;
|
||||||
use crate::io;
|
use crate::io;
|
||||||
use crate::rt::Watcher;
|
|
||||||
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||||
use crate::path::Path;
|
use crate::path::Path;
|
||||||
|
use crate::rt::Watcher;
|
||||||
use crate::task::spawn_blocking;
|
use crate::task::spawn_blocking;
|
||||||
|
|
||||||
/// A Unix datagram socket.
|
/// A Unix datagram socket.
|
||||||
|
@ -42,11 +40,11 @@ use crate::task::spawn_blocking;
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub struct UnixDatagram {
|
pub struct UnixDatagram {
|
||||||
watcher: Watcher<mio_uds::UnixDatagram>,
|
watcher: Watcher<mio::net::UnixDatagram>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UnixDatagram {
|
impl UnixDatagram {
|
||||||
fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram {
|
fn new(socket: mio::net::UnixDatagram) -> UnixDatagram {
|
||||||
UnixDatagram {
|
UnixDatagram {
|
||||||
watcher: Watcher::new(socket),
|
watcher: Watcher::new(socket),
|
||||||
}
|
}
|
||||||
|
@ -67,7 +65,7 @@ impl UnixDatagram {
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
|
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
|
||||||
let path = path.as_ref().to_owned();
|
let path = path.as_ref().to_owned();
|
||||||
let socket = spawn_blocking(move || mio_uds::UnixDatagram::bind(path)).await?;
|
let socket = spawn_blocking(move || mio::net::UnixDatagram::bind(path)).await?;
|
||||||
Ok(UnixDatagram::new(socket))
|
Ok(UnixDatagram::new(socket))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,7 +83,7 @@ impl UnixDatagram {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn unbound() -> io::Result<UnixDatagram> {
|
pub fn unbound() -> io::Result<UnixDatagram> {
|
||||||
let socket = mio_uds::UnixDatagram::unbound()?;
|
let socket = mio::net::UnixDatagram::unbound()?;
|
||||||
Ok(UnixDatagram::new(socket))
|
Ok(UnixDatagram::new(socket))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +103,7 @@ impl UnixDatagram {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
|
pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> {
|
||||||
let (a, b) = mio_uds::UnixDatagram::pair()?;
|
let (a, b) = mio::net::UnixDatagram::pair()?;
|
||||||
let a = UnixDatagram::new(a);
|
let a = UnixDatagram::new(a);
|
||||||
let b = UnixDatagram::new(b);
|
let b = UnixDatagram::new(b);
|
||||||
Ok((a, b))
|
Ok((a, b))
|
||||||
|
@ -152,7 +150,7 @@ impl UnixDatagram {
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
pub fn local_addr(&self) -> io::Result<mio::net::SocketAddr> {
|
||||||
self.watcher.get_ref().local_addr()
|
self.watcher.get_ref().local_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -175,7 +173,7 @@ impl UnixDatagram {
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
|
pub fn peer_addr(&self) -> io::Result<mio::net::SocketAddr> {
|
||||||
self.watcher.get_ref().peer_addr()
|
self.watcher.get_ref().peer_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -196,7 +194,7 @@ impl UnixDatagram {
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
|
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, mio::net::SocketAddr)> {
|
||||||
future::poll_fn(|cx| {
|
future::poll_fn(|cx| {
|
||||||
self.watcher
|
self.watcher
|
||||||
.poll_read_with(cx, |inner| inner.recv_from(buf))
|
.poll_read_with(cx, |inner| inner.recv_from(buf))
|
||||||
|
@ -315,7 +313,7 @@ impl fmt::Debug for UnixDatagram {
|
||||||
impl From<std::os::unix::net::UnixDatagram> for UnixDatagram {
|
impl From<std::os::unix::net::UnixDatagram> for UnixDatagram {
|
||||||
/// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent.
|
/// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent.
|
||||||
fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram {
|
fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram {
|
||||||
let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap();
|
let mio_datagram = mio::net::UnixDatagram::from_std(datagram);
|
||||||
UnixDatagram {
|
UnixDatagram {
|
||||||
watcher: Watcher::new(mio_datagram),
|
watcher: Watcher::new(mio_datagram),
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,18 +1,16 @@
|
||||||
//! Unix-specific networking extensions.
|
//! Unix-specific networking extensions.
|
||||||
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::pin::Pin;
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
use std::pin::Pin;
|
||||||
use mio_uds;
|
|
||||||
|
|
||||||
use super::SocketAddr;
|
use super::SocketAddr;
|
||||||
use super::UnixStream;
|
use super::UnixStream;
|
||||||
use crate::future;
|
use crate::future;
|
||||||
use crate::io;
|
use crate::io;
|
||||||
use crate::rt::Watcher;
|
|
||||||
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||||
use crate::path::Path;
|
use crate::path::Path;
|
||||||
|
use crate::rt::Watcher;
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::task::{spawn_blocking, Context, Poll};
|
use crate::task::{spawn_blocking, Context, Poll};
|
||||||
|
|
||||||
|
@ -50,7 +48,7 @@ use crate::task::{spawn_blocking, Context, Poll};
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub struct UnixListener {
|
pub struct UnixListener {
|
||||||
watcher: Watcher<mio_uds::UnixListener>,
|
watcher: Watcher<mio::net::UnixListener>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UnixListener {
|
impl UnixListener {
|
||||||
|
@ -69,7 +67,7 @@ impl UnixListener {
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
|
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
|
||||||
let path = path.as_ref().to_owned();
|
let path = path.as_ref().to_owned();
|
||||||
let listener = spawn_blocking(move || mio_uds::UnixListener::bind(path)).await?;
|
let listener = spawn_blocking(move || mio::net::UnixListener::bind(path)).await?;
|
||||||
|
|
||||||
Ok(UnixListener {
|
Ok(UnixListener {
|
||||||
watcher: Watcher::new(listener),
|
watcher: Watcher::new(listener),
|
||||||
|
@ -92,28 +90,16 @@ impl UnixListener {
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
|
pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> {
|
||||||
future::poll_fn(|cx| {
|
future::poll_fn(|cx| {
|
||||||
let res = futures_core::ready!(self.watcher.poll_read_with(cx, |inner| {
|
let res =
|
||||||
match inner.accept_std() {
|
futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() }));
|
||||||
// Converting to `WouldBlock` so that the watcher will
|
|
||||||
// add the waker of this task to a list of readers.
|
|
||||||
Ok(None) => Err(io::ErrorKind::WouldBlock.into()),
|
|
||||||
res => res,
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
|
|
||||||
match res? {
|
let (mio_stream, addr) = res?;
|
||||||
Some((io, addr)) => {
|
|
||||||
let mio_stream = mio_uds::UnixStream::from_stream(io)?;
|
|
||||||
let stream = UnixStream {
|
let stream = UnixStream {
|
||||||
watcher: Watcher::new(mio_stream),
|
watcher: Watcher::new(mio_stream),
|
||||||
};
|
};
|
||||||
Poll::Ready(Ok((stream, addr)))
|
Poll::Ready(Ok((stream, addr)))
|
||||||
}
|
|
||||||
// This should never happen since `None` is converted to `WouldBlock`
|
|
||||||
None => unreachable!(),
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -162,7 +148,7 @@ impl UnixListener {
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
pub fn local_addr(&self) -> io::Result<mio::net::SocketAddr> {
|
||||||
self.watcher.get_ref().local_addr()
|
self.watcher.get_ref().local_addr()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -209,7 +195,7 @@ impl Stream for Incoming<'_> {
|
||||||
impl From<std::os::unix::net::UnixListener> for UnixListener {
|
impl From<std::os::unix::net::UnixListener> for UnixListener {
|
||||||
/// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent.
|
/// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent.
|
||||||
fn from(listener: std::os::unix::net::UnixListener) -> UnixListener {
|
fn from(listener: std::os::unix::net::UnixListener) -> UnixListener {
|
||||||
let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap();
|
let mio_listener = mio::net::UnixListener::from_std(listener);
|
||||||
UnixListener {
|
UnixListener {
|
||||||
watcher: Watcher::new(mio_listener),
|
watcher: Watcher::new(mio_listener),
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,14 +5,13 @@ use std::io::{Read as _, Write as _};
|
||||||
use std::net::Shutdown;
|
use std::net::Shutdown;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
|
|
||||||
use mio_uds;
|
|
||||||
|
|
||||||
use super::SocketAddr;
|
use super::SocketAddr;
|
||||||
|
use crate::future;
|
||||||
use crate::io::{self, Read, Write};
|
use crate::io::{self, Read, Write};
|
||||||
use crate::rt::Watcher;
|
|
||||||
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||||
use crate::path::Path;
|
use crate::path::Path;
|
||||||
use crate::task::{spawn_blocking, Context, Poll};
|
use crate::rt::Watcher;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
/// A Unix stream socket.
|
/// A Unix stream socket.
|
||||||
///
|
///
|
||||||
|
@ -38,7 +37,7 @@ use crate::task::{spawn_blocking, Context, Poll};
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub struct UnixStream {
|
pub struct UnixStream {
|
||||||
pub(super) watcher: Watcher<mio_uds::UnixStream>,
|
pub(super) watcher: Watcher<mio::net::UnixStream>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl UnixStream {
|
impl UnixStream {
|
||||||
|
@ -58,14 +57,20 @@ impl UnixStream {
|
||||||
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
|
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
|
||||||
let path = path.as_ref().to_owned();
|
let path = path.as_ref().to_owned();
|
||||||
|
|
||||||
spawn_blocking(move || {
|
// mio's UnixStream::connect is non-blocking and may just be in progress
|
||||||
let std_stream = std::os::unix::net::UnixStream::connect(path)?;
|
// when it returns with `Ok`. We therefore wait for write readiness to
|
||||||
let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?;
|
// be sure the connection has either been established or there was an
|
||||||
Ok(UnixStream {
|
// error which we check for afterwards.
|
||||||
watcher: Watcher::new(mio_stream),
|
let mio_stream = mio::net::UnixStream::connect(path)?;
|
||||||
})
|
let watcher = Watcher::new(mio_stream);
|
||||||
})
|
|
||||||
.await
|
future::poll_fn(|cx| watcher.poll_write_ready(cx)).await;
|
||||||
|
|
||||||
|
match watcher.get_ref().take_error() {
|
||||||
|
Ok(None) => Ok(UnixStream { watcher }),
|
||||||
|
Ok(Some(e)) => Err(e),
|
||||||
|
Err(e) => Err(e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates an unnamed pair of connected sockets.
|
/// Creates an unnamed pair of connected sockets.
|
||||||
|
@ -84,7 +89,7 @@ impl UnixStream {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
|
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
|
||||||
let (a, b) = mio_uds::UnixStream::pair()?;
|
let (a, b) = mio::net::UnixStream::pair()?;
|
||||||
let a = UnixStream {
|
let a = UnixStream {
|
||||||
watcher: Watcher::new(a),
|
watcher: Watcher::new(a),
|
||||||
};
|
};
|
||||||
|
@ -108,7 +113,7 @@ impl UnixStream {
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn local_addr(&self) -> io::Result<SocketAddr> {
|
pub fn local_addr(&self) -> io::Result<mio::net::SocketAddr> {
|
||||||
self.watcher.get_ref().local_addr()
|
self.watcher.get_ref().local_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,7 +131,7 @@ impl UnixStream {
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
|
pub fn peer_addr(&self) -> io::Result<mio::net::SocketAddr> {
|
||||||
self.watcher.get_ref().peer_addr()
|
self.watcher.get_ref().peer_addr()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -230,7 +235,7 @@ impl fmt::Debug for UnixStream {
|
||||||
impl From<std::os::unix::net::UnixStream> for UnixStream {
|
impl From<std::os::unix::net::UnixStream> for UnixStream {
|
||||||
/// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent.
|
/// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent.
|
||||||
fn from(stream: std::os::unix::net::UnixStream) -> UnixStream {
|
fn from(stream: std::os::unix::net::UnixStream) -> UnixStream {
|
||||||
let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap();
|
let mio_stream = mio::net::UnixStream::from_std(stream);
|
||||||
UnixStream {
|
UnixStream {
|
||||||
watcher: Watcher::new(mio_stream),
|
watcher: Watcher::new(mio_stream),
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::fmt;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use mio::{self, Evented};
|
use mio::{self, event::Source, Interest};
|
||||||
use slab::Slab;
|
use slab::Slab;
|
||||||
|
|
||||||
use crate::io;
|
use crate::io;
|
||||||
|
@ -64,25 +64,26 @@ impl Reactor {
|
||||||
/// Creates a new reactor for polling I/O events.
|
/// Creates a new reactor for polling I/O events.
|
||||||
pub fn new() -> io::Result<Reactor> {
|
pub fn new() -> io::Result<Reactor> {
|
||||||
let poller = mio::Poll::new()?;
|
let poller = mio::Poll::new()?;
|
||||||
let notify_reg = mio::Registration::new2();
|
todo!();
|
||||||
|
// let notify_reg = mio::Registration::new2();
|
||||||
|
|
||||||
let mut reactor = Reactor {
|
// let mut reactor = Reactor {
|
||||||
poller,
|
// poller,
|
||||||
events: Mutex::new(mio::Events::with_capacity(1000)),
|
// events: Mutex::new(mio::Events::with_capacity(1000)),
|
||||||
entries: Mutex::new(Slab::new()),
|
// entries: Mutex::new(Slab::new()),
|
||||||
notify_reg,
|
// notify_reg,
|
||||||
notify_token: mio::Token(0),
|
// notify_token: mio::Token(0),
|
||||||
};
|
// };
|
||||||
|
|
||||||
// Register a dummy I/O handle for waking up the polling thread.
|
// // Register a dummy I/O handle for waking up the polling thread.
|
||||||
let entry = reactor.register(&reactor.notify_reg.0)?;
|
// let entry = reactor.register(&reactor.notify_reg.0)?;
|
||||||
reactor.notify_token = entry.token;
|
// reactor.notify_token = entry.token;
|
||||||
|
|
||||||
Ok(reactor)
|
// Ok(reactor)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Registers an I/O event source and returns its associated entry.
|
/// Registers an I/O event source and returns its associated entry.
|
||||||
fn register(&self, source: &dyn Evented) -> io::Result<Arc<Entry>> {
|
fn register(&self, source: &mut dyn Source) -> io::Result<Arc<Entry>> {
|
||||||
let mut entries = self.entries.lock().unwrap();
|
let mut entries = self.entries.lock().unwrap();
|
||||||
|
|
||||||
// Reserve a vacant spot in the slab and use its key as the token value.
|
// Reserve a vacant spot in the slab and use its key as the token value.
|
||||||
|
@ -104,17 +105,17 @@ impl Reactor {
|
||||||
vacant.insert(entry.clone());
|
vacant.insert(entry.clone());
|
||||||
|
|
||||||
// Register the I/O event source in the poller.
|
// Register the I/O event source in the poller.
|
||||||
let interest = mio::Ready::all();
|
// TODO: ADD AIO and LIO?
|
||||||
let opts = mio::PollOpt::edge();
|
const interest: Interest = Interest::READABLE.add(Interest::WRITABLE);
|
||||||
self.poller.register(source, token, interest, opts)?;
|
self.poller.registry().register(source, token, interest)?;
|
||||||
|
|
||||||
Ok(entry)
|
Ok(entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Deregisters an I/O event source associated with an entry.
|
/// Deregisters an I/O event source associated with an entry.
|
||||||
fn deregister(&self, source: &dyn Evented, entry: &Entry) -> io::Result<()> {
|
fn deregister(&self, source: &mut dyn Source, entry: &Entry) -> io::Result<()> {
|
||||||
// Deregister the I/O object from the mio instance.
|
// Deregister the I/O object from the mio instance.
|
||||||
self.poller.deregister(source)?;
|
self.poller.registry().deregister(source)?;
|
||||||
|
|
||||||
// Remove the entry associated with the I/O object.
|
// Remove the entry associated with the I/O object.
|
||||||
self.entries.lock().unwrap().remove(entry.token.0);
|
self.entries.lock().unwrap().remove(entry.token.0);
|
||||||
|
@ -124,7 +125,8 @@ impl Reactor {
|
||||||
|
|
||||||
/// Notifies the reactor so that polling stops blocking.
|
/// Notifies the reactor so that polling stops blocking.
|
||||||
pub fn notify(&self) -> io::Result<()> {
|
pub fn notify(&self) -> io::Result<()> {
|
||||||
self.notify_reg.1.set_readiness(mio::Ready::readable())
|
todo!()
|
||||||
|
// self.notify_reg.1.set_readiness(mio::Ready::readable())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
|
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
|
||||||
|
@ -147,16 +149,13 @@ impl Reactor {
|
||||||
|
|
||||||
if token == self.notify_token {
|
if token == self.notify_token {
|
||||||
// If this is the notification token, we just need the notification state.
|
// If this is the notification token, we just need the notification state.
|
||||||
self.notify_reg.1.set_readiness(mio::Ready::empty())?;
|
todo!()
|
||||||
|
// self.notify_reg.1.set_readiness(mio::Ready::empty())?;
|
||||||
} else {
|
} else {
|
||||||
// Otherwise, look for the entry associated with this token.
|
// Otherwise, look for the entry associated with this token.
|
||||||
if let Some(entry) = entries.get(token.0) {
|
if let Some(entry) = entries.get(token.0) {
|
||||||
// Set the readiness flags from this I/O event.
|
|
||||||
let readiness = event.readiness();
|
|
||||||
|
|
||||||
// Wake up reader tasks blocked on this I/O handle.
|
// Wake up reader tasks blocked on this I/O handle.
|
||||||
let reader_interests = mio::Ready::all() - mio::Ready::writable();
|
if event.is_readable() {
|
||||||
if !(readiness & reader_interests).is_empty() {
|
|
||||||
let mut readers = entry.readers.lock().unwrap();
|
let mut readers = entry.readers.lock().unwrap();
|
||||||
readers.ready = true;
|
readers.ready = true;
|
||||||
for w in readers.wakers.drain(..) {
|
for w in readers.wakers.drain(..) {
|
||||||
|
@ -166,8 +165,7 @@ impl Reactor {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wake up writer tasks blocked on this I/O handle.
|
// Wake up writer tasks blocked on this I/O handle.
|
||||||
let writer_interests = mio::Ready::all() - mio::Ready::readable();
|
if event.is_writable() {
|
||||||
if !(readiness & writer_interests).is_empty() {
|
|
||||||
let mut writers = entry.writers.lock().unwrap();
|
let mut writers = entry.writers.lock().unwrap();
|
||||||
writers.ready = true;
|
writers.ready = true;
|
||||||
for w in writers.wakers.drain(..) {
|
for w in writers.wakers.drain(..) {
|
||||||
|
@ -187,7 +185,7 @@ impl Reactor {
|
||||||
///
|
///
|
||||||
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
|
/// This handle wraps an I/O event source and exposes a "futurized" interface on top of it,
|
||||||
/// implementing traits `AsyncRead` and `AsyncWrite`.
|
/// implementing traits `AsyncRead` and `AsyncWrite`.
|
||||||
pub struct Watcher<T: Evented> {
|
pub struct Watcher<T: Source> {
|
||||||
/// Data associated with the I/O handle.
|
/// Data associated with the I/O handle.
|
||||||
entry: Arc<Entry>,
|
entry: Arc<Entry>,
|
||||||
|
|
||||||
|
@ -195,7 +193,7 @@ pub struct Watcher<T: Evented> {
|
||||||
source: Option<T>,
|
source: Option<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Evented> Watcher<T> {
|
impl<T: Source> Watcher<T> {
|
||||||
/// Creates a new I/O handle.
|
/// Creates a new I/O handle.
|
||||||
///
|
///
|
||||||
/// The provided I/O event source will be kept registered inside the reactor's poller for the
|
/// The provided I/O event source will be kept registered inside the reactor's poller for the
|
||||||
|
@ -204,7 +202,7 @@ impl<T: Evented> Watcher<T> {
|
||||||
Watcher {
|
Watcher {
|
||||||
entry: RUNTIME
|
entry: RUNTIME
|
||||||
.reactor()
|
.reactor()
|
||||||
.register(&source)
|
.register(&mut source)
|
||||||
.expect("cannot register an I/O event source"),
|
.expect("cannot register an I/O event source"),
|
||||||
source: Some(source),
|
source: Some(source),
|
||||||
}
|
}
|
||||||
|
@ -327,15 +325,15 @@ impl<T: Evented> Watcher<T> {
|
||||||
let source = self.source.take().unwrap();
|
let source = self.source.take().unwrap();
|
||||||
RUNTIME
|
RUNTIME
|
||||||
.reactor()
|
.reactor()
|
||||||
.deregister(&source, &self.entry)
|
.deregister(&mut source, &self.entry)
|
||||||
.expect("cannot deregister I/O event source");
|
.expect("cannot deregister I/O event source");
|
||||||
source
|
source
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Evented> Drop for Watcher<T> {
|
impl<T: Source> Drop for Watcher<T> {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if let Some(ref source) = self.source {
|
if let Some(ref mut source) = self.source {
|
||||||
RUNTIME
|
RUNTIME
|
||||||
.reactor()
|
.reactor()
|
||||||
.deregister(source, &self.entry)
|
.deregister(source, &self.entry)
|
||||||
|
@ -344,7 +342,7 @@ impl<T: Evented> Drop for Watcher<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> {
|
impl<T: Source + fmt::Debug> fmt::Debug for Watcher<T> {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
f.debug_struct("Watcher")
|
f.debug_struct("Watcher")
|
||||||
.field("entry", &self.entry)
|
.field("entry", &self.entry)
|
||||||
|
|
Loading…
Reference in a new issue