Make sure ownership is transferred in into_raw_fd

Previously all of the into_raw_fd implementations only returns a copy of
the inner RawFd, while still holding the ownership of the file
descriptor when returning for into_raw_fd. Since `self` is dropped at
the end of into_raw_fd, the returned file descriptor will actually be
closed, render the function unuseable.

The patch makes sure that into_raw_fd actually takes the ownership of
the file descriptor all the way from the inner IoHandle. To achieve
this, I have to use an Option in IoHandle to store the I/O source. It's
not pretty, but I cannot come up with a better way.
staging
Yuxuan Shui 5 years ago
parent 2ca9c46b4b
commit 876059cfe0
No known key found for this signature in database
GPG Key ID: D3A4405BE6CC17F4

@ -183,7 +183,7 @@ pub struct IoHandle<T: Evented> {
entry: Arc<Entry>, entry: Arc<Entry>,
/// The I/O event source. /// The I/O event source.
source: T, source: Option<T>,
} }
impl<T: Evented> IoHandle<T> { impl<T: Evented> IoHandle<T> {
@ -196,13 +196,13 @@ impl<T: Evented> IoHandle<T> {
entry: REACTOR entry: REACTOR
.register(&source) .register(&source)
.expect("cannot register an I/O event source"), .expect("cannot register an I/O event source"),
source, source: Some(source),
} }
} }
/// Returns a reference to the inner I/O event source. /// Returns a reference to the inner I/O event source.
pub fn get_ref(&self) -> &T { pub fn get_ref(&self) -> &T {
&self.source self.source.as_ref().unwrap()
} }
/// Polls the I/O handle for reading. /// Polls the I/O handle for reading.
@ -278,13 +278,26 @@ impl<T: Evented> IoHandle<T> {
Ok(()) Ok(())
} }
/// Deregister and return the I/O source
///
/// This method is to support IntoRawFd in struct that uses IoHandle
pub fn into_inner(mut self) -> T {
let source = self.source.take().unwrap();
REACTOR
.deregister(&source, &self.entry)
.expect("cannot deregister I/O event source");
source
}
} }
impl<T: Evented> Drop for IoHandle<T> { impl<T: Evented> Drop for IoHandle<T> {
fn drop(&mut self) { fn drop(&mut self) {
REACTOR if let Some(ref source) = self.source {
.deregister(&self.source, &self.entry) REACTOR
.expect("cannot deregister I/O event source"); .deregister(source, &self.entry)
.expect("cannot deregister I/O event source");
}
} }
} }
@ -305,7 +318,7 @@ impl<T: Evented + std::io::Read + Unpin> AsyncRead for IoHandle<T> {
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?); futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?);
match self.source.read(buf) { match self.source.as_mut().unwrap().read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_readable(cx)?; self.clear_readable(cx)?;
Poll::Pending Poll::Pending
@ -326,7 +339,7 @@ where
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?); futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?);
match (&self.source).read(buf) { match self.source.as_ref().unwrap().read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_readable(cx)?; self.clear_readable(cx)?;
Poll::Pending Poll::Pending
@ -344,7 +357,7 @@ impl<T: Evented + std::io::Write + Unpin> AsyncWrite for IoHandle<T> {
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures_core::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match self.source.write(buf) { match self.source.as_mut().unwrap().write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?; self.clear_writable(cx)?;
Poll::Pending Poll::Pending
@ -356,7 +369,7 @@ impl<T: Evented + std::io::Write + Unpin> AsyncWrite for IoHandle<T> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
futures_core::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match self.source.flush() { match self.source.as_mut().unwrap().flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?; self.clear_writable(cx)?;
Poll::Pending Poll::Pending
@ -381,7 +394,7 @@ where
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures_core::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match (&self.source).write(buf) { match self.get_ref().write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?; self.clear_writable(cx)?;
Poll::Pending Poll::Pending
@ -393,7 +406,7 @@ where
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
futures_core::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match (&self.source).flush() { match self.get_ref().flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
self.clear_writable(cx)?; self.clear_writable(cx)?;
Poll::Pending Poll::Pending

@ -50,9 +50,6 @@ use crate::task::{Context, Poll};
#[derive(Debug)] #[derive(Debug)]
pub struct TcpListener { pub struct TcpListener {
io_handle: IoHandle<mio::net::TcpListener>, io_handle: IoHandle<mio::net::TcpListener>,
#[cfg(unix)]
raw_fd: std::os::unix::io::RawFd,
// #[cfg(windows)] // #[cfg(windows)]
// raw_socket: std::os::windows::io::RawSocket, // raw_socket: std::os::windows::io::RawSocket,
} }
@ -87,7 +84,6 @@ impl TcpListener {
Ok(mio_listener) => { Ok(mio_listener) => {
#[cfg(unix)] #[cfg(unix)]
let listener = TcpListener { let listener = TcpListener {
raw_fd: mio_listener.as_raw_fd(),
io_handle: IoHandle::new(mio_listener), io_handle: IoHandle::new(mio_listener),
}; };
@ -136,7 +132,6 @@ impl TcpListener {
#[cfg(unix)] #[cfg(unix)]
let stream = TcpStream { let stream = TcpStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream), io_handle: IoHandle::new(mio_stream),
}; };
@ -243,7 +238,6 @@ impl From<std::net::TcpListener> for TcpListener {
#[cfg(unix)] #[cfg(unix)]
let listener = TcpListener { let listener = TcpListener {
raw_fd: mio_listener.as_raw_fd(),
io_handle: IoHandle::new(mio_listener), io_handle: IoHandle::new(mio_listener),
}; };
@ -273,7 +267,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] { if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for TcpListener { impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
self.raw_fd self.io_handle.get_ref().as_raw_fd()
} }
} }
@ -285,7 +279,7 @@ cfg_if! {
impl IntoRawFd for TcpListener { impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.raw_fd self.io_handle.into_inner().into_raw_fd()
} }
} }
} }

@ -51,9 +51,6 @@ use crate::task::{Context, Poll};
#[derive(Debug)] #[derive(Debug)]
pub struct TcpStream { pub struct TcpStream {
pub(super) io_handle: IoHandle<mio::net::TcpStream>, pub(super) io_handle: IoHandle<mio::net::TcpStream>,
#[cfg(unix)]
pub(super) raw_fd: std::os::unix::io::RawFd,
// #[cfg(windows)] // #[cfg(windows)]
// pub(super) raw_socket: std::os::windows::io::RawSocket, // pub(super) raw_socket: std::os::windows::io::RawSocket,
} }
@ -103,7 +100,6 @@ impl TcpStream {
let stream = mio::net::TcpStream::connect(&addr).map(|mio_stream| { let stream = mio::net::TcpStream::connect(&addr).map(|mio_stream| {
#[cfg(unix)] #[cfg(unix)]
let stream = TcpStream { let stream = TcpStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream), io_handle: IoHandle::new(mio_stream),
}; };
@ -445,7 +441,6 @@ impl From<std::net::TcpStream> for TcpStream {
#[cfg(unix)] #[cfg(unix)]
let stream = TcpStream { let stream = TcpStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream), io_handle: IoHandle::new(mio_stream),
}; };
@ -475,7 +470,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] { if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for TcpStream { impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
self.raw_fd self.io_handle.get_ref().as_raw_fd()
} }
} }
@ -487,7 +482,7 @@ cfg_if! {
impl IntoRawFd for TcpStream { impl IntoRawFd for TcpStream {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.raw_fd self.io_handle.into_inner().into_raw_fd()
} }
} }
} }

@ -48,9 +48,6 @@ use crate::task::Poll;
#[derive(Debug)] #[derive(Debug)]
pub struct UdpSocket { pub struct UdpSocket {
io_handle: IoHandle<mio::net::UdpSocket>, io_handle: IoHandle<mio::net::UdpSocket>,
#[cfg(unix)]
raw_fd: std::os::unix::io::RawFd,
// #[cfg(windows)] // #[cfg(windows)]
// raw_socket: std::os::windows::io::RawSocket, // raw_socket: std::os::windows::io::RawSocket,
} }
@ -82,7 +79,6 @@ impl UdpSocket {
Ok(mio_socket) => { Ok(mio_socket) => {
#[cfg(unix)] #[cfg(unix)]
let socket = UdpSocket { let socket = UdpSocket {
raw_fd: mio_socket.as_raw_fd(),
io_handle: IoHandle::new(mio_socket), io_handle: IoHandle::new(mio_socket),
}; };
@ -515,7 +511,6 @@ impl From<std::net::UdpSocket> for UdpSocket {
#[cfg(unix)] #[cfg(unix)]
let socket = UdpSocket { let socket = UdpSocket {
raw_fd: mio_socket.as_raw_fd(),
io_handle: IoHandle::new(mio_socket), io_handle: IoHandle::new(mio_socket),
}; };
@ -545,7 +540,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] { if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for UdpSocket { impl AsRawFd for UdpSocket {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
self.raw_fd self.io_handle.get_ref().as_raw_fd()
} }
} }
@ -557,7 +552,7 @@ cfg_if! {
impl IntoRawFd for UdpSocket { impl IntoRawFd for UdpSocket {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.raw_fd self.io_handle.into_inner().into_raw_fd()
} }
} }
} }

@ -44,15 +44,12 @@ use crate::task::{blocking, Poll};
pub struct UnixDatagram { pub struct UnixDatagram {
#[cfg(not(feature = "docs"))] #[cfg(not(feature = "docs"))]
io_handle: IoHandle<mio_uds::UnixDatagram>, io_handle: IoHandle<mio_uds::UnixDatagram>,
raw_fd: RawFd,
} }
impl UnixDatagram { impl UnixDatagram {
#[cfg(not(feature = "docs"))] #[cfg(not(feature = "docs"))]
fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram {
UnixDatagram { UnixDatagram {
raw_fd: socket.as_raw_fd(),
io_handle: IoHandle::new(socket), io_handle: IoHandle::new(socket),
} }
} }
@ -362,7 +359,6 @@ impl From<std::os::unix::net::UnixDatagram> for UnixDatagram {
fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram { fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram {
let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap(); let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap();
UnixDatagram { UnixDatagram {
raw_fd: mio_datagram.as_raw_fd(),
io_handle: IoHandle::new(mio_datagram), io_handle: IoHandle::new(mio_datagram),
} }
} }
@ -370,7 +366,7 @@ impl From<std::os::unix::net::UnixDatagram> for UnixDatagram {
impl AsRawFd for UnixDatagram { impl AsRawFd for UnixDatagram {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
self.raw_fd self.io_handle.get_ref().as_raw_fd()
} }
} }
@ -383,6 +379,6 @@ impl FromRawFd for UnixDatagram {
impl IntoRawFd for UnixDatagram { impl IntoRawFd for UnixDatagram {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.raw_fd self.io_handle.into_inner().into_raw_fd()
} }
} }

@ -50,8 +50,6 @@ use crate::task::{blocking, Context, Poll};
pub struct UnixListener { pub struct UnixListener {
#[cfg(not(feature = "docs"))] #[cfg(not(feature = "docs"))]
io_handle: IoHandle<mio_uds::UnixListener>, io_handle: IoHandle<mio_uds::UnixListener>,
raw_fd: RawFd,
} }
impl UnixListener { impl UnixListener {
@ -73,7 +71,6 @@ impl UnixListener {
let listener = blocking::spawn(async move { mio_uds::UnixListener::bind(path) }).await?; let listener = blocking::spawn(async move { mio_uds::UnixListener::bind(path) }).await?;
Ok(UnixListener { Ok(UnixListener {
raw_fd: listener.as_raw_fd(),
io_handle: IoHandle::new(listener), io_handle: IoHandle::new(listener),
}) })
} }
@ -102,7 +99,6 @@ impl UnixListener {
Ok(Some((io, addr))) => { Ok(Some((io, addr))) => {
let mio_stream = mio_uds::UnixStream::from_stream(io)?; let mio_stream = mio_uds::UnixStream::from_stream(io)?;
let stream = UnixStream { let stream = UnixStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream), io_handle: IoHandle::new(mio_stream),
}; };
Poll::Ready(Ok((stream, addr))) Poll::Ready(Ok((stream, addr)))
@ -214,7 +210,6 @@ impl From<std::os::unix::net::UnixListener> for UnixListener {
fn from(listener: std::os::unix::net::UnixListener) -> UnixListener { fn from(listener: std::os::unix::net::UnixListener) -> UnixListener {
let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap(); let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap();
UnixListener { UnixListener {
raw_fd: mio_listener.as_raw_fd(),
io_handle: IoHandle::new(mio_listener), io_handle: IoHandle::new(mio_listener),
} }
} }
@ -222,7 +217,7 @@ impl From<std::os::unix::net::UnixListener> for UnixListener {
impl AsRawFd for UnixListener { impl AsRawFd for UnixListener {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
self.raw_fd self.io_handle.get_ref().as_raw_fd()
} }
} }
@ -235,6 +230,6 @@ impl FromRawFd for UnixListener {
impl IntoRawFd for UnixListener { impl IntoRawFd for UnixListener {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.raw_fd self.io_handle.into_inner().into_raw_fd()
} }
} }

@ -42,8 +42,6 @@ use crate::task::{blocking, Context, Poll};
pub struct UnixStream { pub struct UnixStream {
#[cfg(not(feature = "docs"))] #[cfg(not(feature = "docs"))]
pub(super) io_handle: IoHandle<mio_uds::UnixStream>, pub(super) io_handle: IoHandle<mio_uds::UnixStream>,
pub(super) raw_fd: RawFd,
} }
impl UnixStream { impl UnixStream {
@ -71,7 +69,6 @@ impl UnixStream {
let mut state = { let mut state = {
match blocking::spawn(async move { mio_uds::UnixStream::connect(path) }).await { match blocking::spawn(async move { mio_uds::UnixStream::connect(path) }).await {
Ok(mio_stream) => State::Waiting(UnixStream { Ok(mio_stream) => State::Waiting(UnixStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream), io_handle: IoHandle::new(mio_stream),
}), }),
Err(err) => State::Error(err), Err(err) => State::Error(err),
@ -124,11 +121,9 @@ impl UnixStream {
pub fn pair() -> io::Result<(UnixStream, UnixStream)> { pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
let (a, b) = mio_uds::UnixStream::pair()?; let (a, b) = mio_uds::UnixStream::pair()?;
let a = UnixStream { let a = UnixStream {
raw_fd: a.as_raw_fd(),
io_handle: IoHandle::new(a), io_handle: IoHandle::new(a),
}; };
let b = UnixStream { let b = UnixStream {
raw_fd: b.as_raw_fd(),
io_handle: IoHandle::new(b), io_handle: IoHandle::new(b),
}; };
Ok((a, b)) Ok((a, b))
@ -271,7 +266,6 @@ impl From<std::os::unix::net::UnixStream> for UnixStream {
fn from(stream: std::os::unix::net::UnixStream) -> UnixStream { fn from(stream: std::os::unix::net::UnixStream) -> UnixStream {
let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap(); let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap();
UnixStream { UnixStream {
raw_fd: mio_stream.as_raw_fd(),
io_handle: IoHandle::new(mio_stream), io_handle: IoHandle::new(mio_stream),
} }
} }
@ -279,7 +273,7 @@ impl From<std::os::unix::net::UnixStream> for UnixStream {
impl AsRawFd for UnixStream { impl AsRawFd for UnixStream {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
self.raw_fd self.io_handle.get_ref().as_raw_fd()
} }
} }
@ -292,6 +286,6 @@ impl FromRawFd for UnixStream {
impl IntoRawFd for UnixStream { impl IntoRawFd for UnixStream {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.raw_fd self.io_handle.into_inner().into_raw_fd()
} }
} }

@ -38,5 +38,4 @@ fn into_raw_fd() -> io::Result<()> {
Ok(()) Ok(())
}) })
} }

Loading…
Cancel
Save