From 56769d8b6c39f560229ec354452ba802aefc6755 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Wed, 8 Apr 2020 17:08:44 +0200 Subject: [PATCH] fix token handling --- src/os/unix/net/datagram.rs | 5 ++--- src/os/unix/net/listener.rs | 11 +++++------ src/os/unix/net/stream.rs | 1 + src/rt/reactor.rs | 18 ++++++++++++++++-- 4 files changed, 24 insertions(+), 11 deletions(-) diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index c7c9403..1c4ea7d 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -9,7 +9,6 @@ use crate::io; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::rt::Watcher; -use crate::task::spawn_blocking; /// A Unix datagram socket. /// @@ -65,8 +64,8 @@ impl UnixDatagram { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let socket = spawn_blocking(move || mio::net::UnixDatagram::bind(path)).await?; - Ok(UnixDatagram::new(socket)) + let mio_socket = mio::net::UnixDatagram::bind(path)?; + Ok(UnixDatagram::new(mio_socket)) } /// Creates a Unix datagram which is not bound to any address. diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index e77d423..5791808 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -12,7 +12,7 @@ use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::path::Path; use crate::rt::Watcher; use crate::stream::Stream; -use crate::task::{spawn_blocking, Context, Poll}; +use crate::task::{Context, Poll}; /// A Unix domain socket server, listening for connections. /// @@ -67,10 +67,10 @@ impl UnixListener { /// ``` pub async fn bind>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - let listener = spawn_blocking(move || mio::net::UnixListener::bind(path)).await?; + let mio_listener = mio::net::UnixListener::bind(path)?; Ok(UnixListener { - watcher: Watcher::new(listener), + watcher: Watcher::new(mio_listener), }) } @@ -92,10 +92,9 @@ impl UnixListener { /// ``` pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> { future::poll_fn(|cx| { - let res = - futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() })); + let (mio_stream, addr) = + futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() }))?; - let (mio_stream, addr) = res?; let stream = UnixStream { watcher: Watcher::new(mio_stream), }; diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 3cd8fc9..86063c2 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -211,6 +211,7 @@ impl Write for &UnixStream { } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + self.shutdown(std::net::Shutdown::Write)?; Poll::Ready(Ok(())) } } diff --git a/src/rt/reactor.rs b/src/rt/reactor.rs index 1ea52ca..afef99e 100644 --- a/src/rt/reactor.rs +++ b/src/rt/reactor.rs @@ -67,15 +67,29 @@ impl Reactor { /// Creates a new reactor for polling I/O events. pub fn new() -> io::Result { let poller = mio::Poll::new()?; + let mut entries = Slab::new(); // Register a waker for waking up the polling thread. - let notify_token = mio::Token(0); // TODO: is this being 0 okay? + let vacant = entries.vacant_entry(); + let notify_token = mio::Token(vacant.key()); let notify_waker = mio::Waker::new(poller.registry(), notify_token)?; + // dumy entry to avoid reusing the same token + vacant.insert(Arc::new(Entry { + token: notify_token.clone(), + readers: Mutex::new(Readers { + ready: false, + wakers: Vec::new(), + }), + writers: Mutex::new(Writers { + ready: false, + wakers: Vec::new(), + }), + })); let reactor = Reactor { poller: Mutex::new(poller), events: Mutex::new(mio::Events::with_capacity(1000)), - entries: Mutex::new(Slab::new()), + entries: Mutex::new(entries), notify_waker, notify_token, };