fix token handling

This commit is contained in:
dignifiedquire 2020-04-08 17:08:44 +02:00
parent 121bc7d726
commit 56769d8b6c
4 changed files with 24 additions and 11 deletions

View file

@ -9,7 +9,6 @@ use crate::io;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::rt::Watcher; use crate::rt::Watcher;
use crate::task::spawn_blocking;
/// A Unix datagram socket. /// A Unix datagram socket.
/// ///
@ -65,8 +64,8 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> { pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let socket = spawn_blocking(move || mio::net::UnixDatagram::bind(path)).await?; let mio_socket = mio::net::UnixDatagram::bind(path)?;
Ok(UnixDatagram::new(socket)) Ok(UnixDatagram::new(mio_socket))
} }
/// Creates a Unix datagram which is not bound to any address. /// Creates a Unix datagram which is not bound to any address.

View file

@ -12,7 +12,7 @@ use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::rt::Watcher; use crate::rt::Watcher;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{spawn_blocking, Context, Poll}; use crate::task::{Context, Poll};
/// A Unix domain socket server, listening for connections. /// A Unix domain socket server, listening for connections.
/// ///
@ -67,10 +67,10 @@ impl UnixListener {
/// ``` /// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> { pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let listener = spawn_blocking(move || mio::net::UnixListener::bind(path)).await?; let mio_listener = mio::net::UnixListener::bind(path)?;
Ok(UnixListener { Ok(UnixListener {
watcher: Watcher::new(listener), watcher: Watcher::new(mio_listener),
}) })
} }
@ -92,10 +92,9 @@ impl UnixListener {
/// ``` /// ```
pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> { pub async fn accept(&self) -> io::Result<(UnixStream, mio::net::SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
let res = let (mio_stream, addr) =
futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() })); futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { inner.accept() }))?;
let (mio_stream, addr) = res?;
let stream = UnixStream { let stream = UnixStream {
watcher: Watcher::new(mio_stream), watcher: Watcher::new(mio_stream),
}; };

View file

@ -211,6 +211,7 @@ impl Write for &UnixStream {
} }
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.shutdown(std::net::Shutdown::Write)?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }

View file

@ -67,15 +67,29 @@ impl Reactor {
/// Creates a new reactor for polling I/O events. /// Creates a new reactor for polling I/O events.
pub fn new() -> io::Result<Reactor> { pub fn new() -> io::Result<Reactor> {
let poller = mio::Poll::new()?; let poller = mio::Poll::new()?;
let mut entries = Slab::new();
// Register a waker for waking up the polling thread. // Register a waker for waking up the polling thread.
let notify_token = mio::Token(0); // TODO: is this being 0 okay? let vacant = entries.vacant_entry();
let notify_token = mio::Token(vacant.key());
let notify_waker = mio::Waker::new(poller.registry(), notify_token)?; let notify_waker = mio::Waker::new(poller.registry(), notify_token)?;
// dumy entry to avoid reusing the same token
vacant.insert(Arc::new(Entry {
token: notify_token.clone(),
readers: Mutex::new(Readers {
ready: false,
wakers: Vec::new(),
}),
writers: Mutex::new(Writers {
ready: false,
wakers: Vec::new(),
}),
}));
let reactor = Reactor { let reactor = Reactor {
poller: Mutex::new(poller), poller: Mutex::new(poller),
events: Mutex::new(mio::Events::with_capacity(1000)), events: Mutex::new(mio::Events::with_capacity(1000)),
entries: Mutex::new(Slab::new()), entries: Mutex::new(entries),
notify_waker, notify_waker,
notify_token, notify_token,
}; };