diff --git a/src/fs/dir_builder.rs b/src/fs/dir_builder.rs index a4a95689..56476103 100644 --- a/src/fs/dir_builder.rs +++ b/src/fs/dir_builder.rs @@ -1,10 +1,10 @@ use std::fs; -use std::io; use std::path::Path; use cfg_if::cfg_if; use crate::future::Future; +use crate::io; use crate::task::blocking; /// A builder for creating directories in various manners. diff --git a/src/fs/dir_entry.rs b/src/fs/dir_entry.rs index f8bb9e7f..a16adaee 100644 --- a/src/fs/dir_entry.rs +++ b/src/fs/dir_entry.rs @@ -1,6 +1,5 @@ use std::ffi::OsString; use std::fs; -use std::io; use std::path::PathBuf; use std::pin::Pin; use std::sync::Mutex; @@ -9,6 +8,7 @@ use cfg_if::cfg_if; use futures::future::{self, FutureExt, TryFutureExt}; use crate::future::Future; +use crate::io; use crate::task::{blocking, Poll}; /// An entry inside a directory. diff --git a/src/fs/file.rs b/src/fs/file.rs index 0332d68a..0a0a8e57 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -1,7 +1,7 @@ //! Types for working with files. use std::fs; -use std::io::{self, SeekFrom}; +use std::io::{Read as _, Seek, SeekFrom, Write as _}; use std::path::Path; use std::pin::Pin; use std::sync::Mutex; @@ -11,6 +11,7 @@ use futures::future::{self, FutureExt, TryFutureExt}; use futures::io::{AsyncSeek, Initializer}; use crate::future::Future; +use crate::io; use crate::task::{blocking, Context, Poll}; /// A reference to a file on the filesystem. @@ -543,10 +544,10 @@ impl futures::io::AsyncRead for &File { *state = State::Busy(blocking::spawn(async move { if offset > 0 { let pos = SeekFrom::Current(-(offset as i64)); - let _ = io::Seek::seek(&mut inner.file, pos); + let _ = Seek::seek(&mut inner.file, pos); } - let res = io::Read::read(&mut inner.file, &mut inner.buf); + let res = inner.file.read(&mut inner.buf); inner.last_op = Some(Operation::Read(res)); State::Idle(Some(inner)) })); @@ -621,7 +622,7 @@ impl futures::io::AsyncWrite for &File { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Write::write(&mut inner.file, &mut inner.buf); + let res = inner.file.write(&mut inner.buf); inner.last_op = Some(Operation::Write(res)); State::Idle(Some(inner)) })); @@ -654,7 +655,7 @@ impl futures::io::AsyncWrite for &File { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Write::flush(&mut inner.file); + let res = inner.file.flush(); inner.last_op = Some(Operation::Flush(res)); State::Idle(Some(inner)) })); @@ -725,7 +726,7 @@ impl AsyncSeek for &File { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Seek::seek(&mut inner.file, pos); + let res = inner.file.seek(pos); inner.last_op = Some(Operation::Seek(res)); State::Idle(Some(inner)) })); diff --git a/src/fs/mod.rs b/src/fs/mod.rs index a11bba5f..19a98a82 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -22,10 +22,10 @@ //! ``` use std::fs; -use std::io; use std::path::{Path, PathBuf}; use crate::task::blocking; +use std::io; pub use dir_builder::DirBuilder; pub use dir_entry::DirEntry; diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index 7c2830b6..55dec157 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -1,11 +1,11 @@ use std::fs; -use std::io; use std::pin::Pin; use std::sync::Mutex; use super::DirEntry; use crate::future::Future; +use crate::io; use crate::task::{blocking, Context, Poll}; /// A stream over entries in a directory. diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index e681d351..fb90c8dd 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -1,9 +1,10 @@ -use std::io::{self, IoSliceMut, Read as _, SeekFrom}; +use std::io::{IoSliceMut, Read as _, SeekFrom}; use std::pin::Pin; use std::{cmp, fmt}; use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; +use crate::io; use crate::task::{Context, Poll}; const DEFAULT_CAPACITY: usize = 8 * 1024; @@ -22,7 +23,10 @@ const DEFAULT_CAPACITY: usize = 8 * 1024; /// When the `BufReader` is dropped, the contents of its buffer will be discarded. Creating /// multiple instances of a `BufReader` on the same stream can cause data loss. /// +/// This type is an async version of [`std::io::BufReader`]. +/// /// [`Read`]: trait.Read.html +/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html /// /// # Examples /// diff --git a/src/io/copy.rs b/src/io/copy.rs index ed166607..01894e80 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -1,7 +1,7 @@ -use std::io; - use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite}; +use crate::io; + /// Copies the entire contents of a reader into a writer. /// /// This function will continuously read data from `reader` and then diff --git a/src/io/read.rs b/src/io/read.rs index 89190f76..9c07dfc4 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,4 +1,4 @@ -use std::io::{self, IoSliceMut}; +use std::io::IoSliceMut; use std::mem; use std::pin::Pin; use std::str; @@ -7,6 +7,7 @@ use cfg_if::cfg_if; use futures::io::AsyncRead; use crate::future::Future; +use crate::io; use crate::task::{Context, Poll}; cfg_if! { diff --git a/src/io/seek.rs b/src/io/seek.rs index fd48fa8d..93a0b1c5 100644 --- a/src/io/seek.rs +++ b/src/io/seek.rs @@ -1,10 +1,11 @@ -use std::io::{self, SeekFrom}; +use std::io::SeekFrom; use std::pin::Pin; use cfg_if::cfg_if; use futures::io::AsyncSeek; use crate::future::Future; +use crate::io; use crate::task::{Context, Poll}; cfg_if! { diff --git a/src/io/write.rs b/src/io/write.rs index f1f402dd..01cbd3a4 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,4 +1,4 @@ -use std::io::{self, IoSlice}; +use std::io::IoSlice; use std::mem; use std::pin::Pin; @@ -6,6 +6,7 @@ use cfg_if::cfg_if; use futures::io::AsyncWrite; use crate::future::Future; +use crate::io; use crate::task::{Context, Poll}; cfg_if! { diff --git a/src/net/driver.rs b/src/net/driver/mod.rs similarity index 98% rename from src/net/driver.rs rename to src/net/driver/mod.rs index b2c4e8ae..4002254d 100644 --- a/src/net/driver.rs +++ b/src/net/driver/mod.rs @@ -1,5 +1,5 @@ use std::fmt; -use std::io::{self, prelude::*}; +use std::io::{Read as _, Write as _}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; @@ -9,6 +9,7 @@ use lazy_static::lazy_static; use mio::{self, Evented}; use slab::Slab; +use crate::io; use crate::task::{Context, Poll, Waker}; use crate::utils::abort_on_panic; @@ -296,7 +297,7 @@ impl fmt::Debug for IoHandle { } } -impl AsyncRead for IoHandle { +impl AsyncRead for IoHandle { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -316,7 +317,7 @@ impl AsyncRead for IoHandle { impl<'a, T: Evented + Unpin> AsyncRead for &'a IoHandle where - &'a T: Read, + &'a T: std::io::Read, { fn poll_read( mut self: Pin<&mut Self>, @@ -335,7 +336,7 @@ where } } -impl AsyncWrite for IoHandle { +impl AsyncWrite for IoHandle { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -371,7 +372,7 @@ impl AsyncWrite for IoHandle { impl<'a, T: Evented + Unpin> AsyncWrite for &'a IoHandle where - &'a T: Write, + &'a T: std::io::Write, { fn poll_write( self: Pin<&mut Self>, diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs new file mode 100644 index 00000000..90f3f638 --- /dev/null +++ b/src/net/tcp/listener.rs @@ -0,0 +1,317 @@ +use std::net::{self, SocketAddr, ToSocketAddrs}; +use std::pin::Pin; + +use cfg_if::cfg_if; +use futures::future; + +use super::TcpStream; +use crate::future::Future; +use crate::io; +use crate::net::driver::IoHandle; +use crate::task::{Context, Poll}; + +/// A TCP socket server, listening for connections. +/// +/// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming +/// TCP connections. These can be accepted by awaiting elements from the async stream of +/// [`incoming`] connections. +/// +/// The socket will be closed when the value is dropped. +/// +/// The Transmission Control Protocol is specified in [IETF RFC 793]. +/// +/// This type is an async version of [`std::net::TcpListener`]. +/// +/// [`bind`]: #method.bind +/// [`incoming`]: #method.incoming +/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 +/// [`std::net::TcpListener`]: https://doc.rust-lang.org/std/net/struct.TcpListener.html +/// +/// # Examples +/// +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::{io, net::TcpListener, prelude::*}; +/// +/// let listener = TcpListener::bind("127.0.0.1:8080").await?; +/// let mut incoming = listener.incoming(); +/// +/// while let Some(stream) = incoming.next().await { +/// let stream = stream?; +/// let (reader, writer) = &mut (&stream, &stream); +/// io::copy(reader, writer).await?; +/// } +/// # +/// # Ok(()) }) } +/// ``` +#[derive(Debug)] +pub struct TcpListener { + io_handle: IoHandle, + + #[cfg(unix)] + raw_fd: std::os::unix::io::RawFd, + // #[cfg(windows)] + // raw_socket: std::os::windows::io::RawSocket, +} + +impl TcpListener { + /// Creates a new `TcpListener` which will be bound to the specified address. + /// + /// The returned listener is ready for accepting connections. + /// + /// Binding with a port number of 0 will request that the OS assigns a port to this listener. + /// The port allocated can be queried via the [`local_addr`] method. + /// + /// # Examples + /// Create a TCP listener bound to 127.0.0.1:0: + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::net::TcpListener; + /// + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// # + /// # Ok(()) }) } + /// ``` + /// + /// [`local_addr`]: #method.local_addr + pub async fn bind(addrs: A) -> io::Result { + let mut last_err = None; + + for addr in addrs.to_socket_addrs()? { + match mio::net::TcpListener::bind(&addr) { + Ok(mio_listener) => { + #[cfg(unix)] + let listener = TcpListener { + raw_fd: mio_listener.as_raw_fd(), + io_handle: IoHandle::new(mio_listener), + }; + + #[cfg(windows)] + let listener = TcpListener { + // raw_socket: mio_listener.as_raw_socket(), + io_handle: IoHandle::new(mio_listener), + }; + return Ok(listener); + } + Err(err) => last_err = Some(err), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "could not resolve to any addresses", + ) + })) + } + + /// Accepts a new incoming connection to this listener. + /// + /// When a connection is established, the corresponding stream and address will be returned. + /// + /// ## Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::net::TcpListener; + /// + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// let (stream, addr) = listener.accept().await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { + future::poll_fn(|cx| { + futures::ready!(self.io_handle.poll_readable(cx)?); + + match self.io_handle.get_ref().accept_std() { + Ok((io, addr)) => { + let mio_stream = mio::net::TcpStream::from_stream(io)?; + + #[cfg(unix)] + let stream = TcpStream { + raw_fd: mio_stream.as_raw_fd(), + io_handle: IoHandle::new(mio_stream), + }; + + #[cfg(windows)] + let stream = TcpStream { + // raw_socket: mio_stream.as_raw_socket(), + io_handle: IoHandle::new(mio_stream), + }; + + Poll::Ready(Ok((stream, addr))) + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io_handle.clear_readable(cx)?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + }) + .await + } + + /// Returns a stream of incoming connections. + /// + /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of + /// connections is infinite, i.e awaiting the next connection will never result in [`None`]. + /// + /// [`accept`]: #method.accept + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// + /// ## Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{net::TcpListener, prelude::*}; + /// + /// let listener = TcpListener::bind("127.0.0.1:0").await?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello world").await?; + /// } + /// # + /// # Ok(()) }) } + /// ``` + pub fn incoming(&self) -> Incoming<'_> { + Incoming(self) + } + + /// Returns the local address that this listener is bound to. + /// + /// This can be useful, for example, to identify when binding to port 0 which port was assigned + /// by the OS. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::net::TcpListener; + /// + /// let listener = TcpListener::bind("127.0.0.1:8080").await?; + /// let addr = listener.local_addr()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn local_addr(&self) -> io::Result { + self.io_handle.get_ref().local_addr() + } +} + +/// A stream of incoming TCP connections. +/// +/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is +/// created by the [`incoming`] method on [`TcpListener`]. +/// +/// This type is an async version of [`std::net::Incoming`]. +/// +/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None +/// [`incoming`]: struct.TcpListener.html#method.incoming +/// [`TcpListener`]: struct.TcpListener.html +/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html +#[derive(Debug)] +pub struct Incoming<'a>(&'a TcpListener); + +impl<'a> futures::Stream for Incoming<'a> { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let future = self.0.accept(); + pin_utils::pin_mut!(future); + + let (socket, _) = futures::ready!(future.poll(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} + +impl From for TcpListener { + /// Converts a `std::net::TcpListener` into its asynchronous equivalent. + fn from(listener: net::TcpListener) -> TcpListener { + let mio_listener = mio::net::TcpListener::from_std(listener).unwrap(); + + #[cfg(unix)] + let listener = TcpListener { + raw_fd: mio_listener.as_raw_fd(), + io_handle: IoHandle::new(mio_listener), + }; + + #[cfg(windows)] + let listener = TcpListener { + // raw_socket: mio_listener.as_raw_socket(), + io_handle: IoHandle::new(mio_listener), + }; + + listener + } +} + +cfg_if! { + if #[cfg(feature = "docs.rs")] { + use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; + // use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; + } else if #[cfg(unix)] { + use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; + } else if #[cfg(windows)] { + // use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle}; + } +} + +#[cfg_attr(feature = "docs.rs", doc(cfg(unix)))] +cfg_if! { + if #[cfg(any(unix, feature = "docs.rs"))] { + impl AsRawFd for TcpListener { + fn as_raw_fd(&self) -> RawFd { + self.raw_fd + } + } + + impl FromRawFd for TcpListener { + unsafe fn from_raw_fd(fd: RawFd) -> TcpListener { + net::TcpListener::from_raw_fd(fd).into() + } + } + + impl IntoRawFd for TcpListener { + fn into_raw_fd(self) -> RawFd { + self.raw_fd + } + } + } +} + +#[cfg_attr(feature = "docs.rs", doc(cfg(windows)))] +cfg_if! { + if #[cfg(any(windows, feature = "docs.rs"))] { + // impl AsRawSocket for TcpListener { + // fn as_raw_socket(&self) -> RawSocket { + // self.raw_socket + // } + // } + // + // impl FromRawSocket for TcpListener { + // unsafe fn from_raw_socket(handle: RawSocket) -> TcpListener { + // net::TcpListener::from_raw_socket(handle).try_into().unwrap() + // } + // } + // + // impl IntoRawSocket for TcpListener { + // fn into_raw_socket(self) -> RawSocket { + // self.raw_socket + // } + // } + } +} diff --git a/src/net/tcp/mod.rs b/src/net/tcp/mod.rs new file mode 100644 index 00000000..71454405 --- /dev/null +++ b/src/net/tcp/mod.rs @@ -0,0 +1,5 @@ +pub use listener::{Incoming, TcpListener}; +pub use stream::TcpStream; + +mod listener; +mod stream; diff --git a/src/net/tcp.rs b/src/net/tcp/stream.rs similarity index 63% rename from src/net/tcp.rs rename to src/net/tcp/stream.rs index c1843331..a063caeb 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp/stream.rs @@ -1,4 +1,4 @@ -use std::io::{self, IoSlice, IoSliceMut}; +use std::io::{IoSlice, IoSliceMut}; use std::mem; use std::net::{self, SocketAddr, ToSocketAddrs}; use std::pin::Pin; @@ -6,7 +6,7 @@ use std::pin::Pin; use cfg_if::cfg_if; use futures::future; -use crate::future::Future; +use crate::io; use crate::net::driver::IoHandle; use crate::task::{Context, Poll}; @@ -48,12 +48,12 @@ use crate::task::{Context, Poll}; /// ``` #[derive(Debug)] pub struct TcpStream { - io_handle: IoHandle, + pub(super) io_handle: IoHandle, #[cfg(unix)] - raw_fd: std::os::unix::io::RawFd, + pub(super) raw_fd: std::os::unix::io::RawFd, // #[cfg(windows)] - // raw_socket: std::os::windows::io::RawSocket, + // pub(super) raw_socket: std::os::windows::io::RawSocket, } impl TcpStream { @@ -441,234 +441,6 @@ impl futures::io::AsyncWrite for &TcpStream { } } -/// A TCP socket server, listening for connections. -/// -/// After creating a `TcpListener` by [`bind`]ing it to a socket address, it listens for incoming -/// TCP connections. These can be accepted by awaiting elements from the async stream of -/// [`incoming`] connections. -/// -/// The socket will be closed when the value is dropped. -/// -/// The Transmission Control Protocol is specified in [IETF RFC 793]. -/// -/// This type is an async version of [`std::net::TcpListener`]. -/// -/// [`bind`]: #method.bind -/// [`incoming`]: #method.incoming -/// [IETF RFC 793]: https://tools.ietf.org/html/rfc793 -/// [`std::net::TcpListener`]: https://doc.rust-lang.org/std/net/struct.TcpListener.html -/// -/// # Examples -/// -/// ```no_run -/// # #![feature(async_await)] -/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { -/// # -/// use async_std::{io, net::TcpListener, prelude::*}; -/// -/// let listener = TcpListener::bind("127.0.0.1:8080").await?; -/// let mut incoming = listener.incoming(); -/// -/// while let Some(stream) = incoming.next().await { -/// let stream = stream?; -/// let (reader, writer) = &mut (&stream, &stream); -/// io::copy(reader, writer).await?; -/// } -/// # -/// # Ok(()) }) } -/// ``` -#[derive(Debug)] -pub struct TcpListener { - io_handle: IoHandle, - - #[cfg(unix)] - raw_fd: std::os::unix::io::RawFd, - // #[cfg(windows)] - // raw_socket: std::os::windows::io::RawSocket, -} - -impl TcpListener { - /// Creates a new `TcpListener` which will be bound to the specified address. - /// - /// The returned listener is ready for accepting connections. - /// - /// Binding with a port number of 0 will request that the OS assigns a port to this listener. - /// The port allocated can be queried via the [`local_addr`] method. - /// - /// # Examples - /// Create a TCP listener bound to 127.0.0.1:0: - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::net::TcpListener; - /// - /// let listener = TcpListener::bind("127.0.0.1:0").await?; - /// # - /// # Ok(()) }) } - /// ``` - /// - /// [`local_addr`]: #method.local_addr - pub async fn bind(addrs: A) -> io::Result { - let mut last_err = None; - - for addr in addrs.to_socket_addrs()? { - match mio::net::TcpListener::bind(&addr) { - Ok(mio_listener) => { - #[cfg(unix)] - let listener = TcpListener { - raw_fd: mio_listener.as_raw_fd(), - io_handle: IoHandle::new(mio_listener), - }; - - #[cfg(windows)] - let listener = TcpListener { - // raw_socket: mio_listener.as_raw_socket(), - io_handle: IoHandle::new(mio_listener), - }; - return Ok(listener); - } - Err(err) => last_err = Some(err), - } - } - - Err(last_err.unwrap_or_else(|| { - io::Error::new( - io::ErrorKind::InvalidInput, - "could not resolve to any addresses", - ) - })) - } - - /// Accepts a new incoming connection to this listener. - /// - /// When a connection is established, the corresponding stream and address will be returned. - /// - /// ## Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::net::TcpListener; - /// - /// let listener = TcpListener::bind("127.0.0.1:0").await?; - /// let (stream, addr) = listener.accept().await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { - future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); - - match self.io_handle.get_ref().accept_std() { - Ok((io, addr)) => { - let mio_stream = mio::net::TcpStream::from_stream(io)?; - - #[cfg(unix)] - let stream = TcpStream { - raw_fd: mio_stream.as_raw_fd(), - io_handle: IoHandle::new(mio_stream), - }; - - #[cfg(windows)] - let stream = TcpStream { - // raw_socket: mio_stream.as_raw_socket(), - io_handle: IoHandle::new(mio_stream), - }; - - Poll::Ready(Ok((stream, addr))) - } - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io_handle.clear_readable(cx)?; - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - }) - .await - } - - /// Returns a stream of incoming connections. - /// - /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of - /// connections is infinite, i.e awaiting the next connection will never result in [`None`]. - /// - /// [`accept`]: #method.accept - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// - /// ## Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::{net::TcpListener, prelude::*}; - /// - /// let listener = TcpListener::bind("127.0.0.1:0").await?; - /// let mut incoming = listener.incoming(); - /// - /// while let Some(stream) = incoming.next().await { - /// let mut stream = stream?; - /// stream.write_all(b"hello world").await?; - /// } - /// # - /// # Ok(()) }) } - /// ``` - pub fn incoming(&self) -> Incoming<'_> { - Incoming(self) - } - - /// Returns the local address that this listener is bound to. - /// - /// This can be useful, for example, to identify when binding to port 0 which port was assigned - /// by the OS. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::net::TcpListener; - /// - /// let listener = TcpListener::bind("127.0.0.1:8080").await?; - /// let addr = listener.local_addr()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn local_addr(&self) -> io::Result { - self.io_handle.get_ref().local_addr() - } -} - -/// A stream of incoming TCP connections. -/// -/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is -/// created by the [`incoming`] method on [`TcpListener`]. -/// -/// This type is an async version of [`std::net::Incoming`]. -/// -/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None -/// [`incoming`]: struct.TcpListener.html#method.incoming -/// [`TcpListener`]: struct.TcpListener.html -/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html -#[derive(Debug)] -pub struct Incoming<'a>(&'a TcpListener); - -impl<'a> futures::Stream for Incoming<'a> { - type Item = io::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let future = self.0.accept(); - pin_utils::pin_mut!(future); - - let (socket, _) = futures::ready!(future.poll(cx))?; - Poll::Ready(Some(Ok(socket))) - } -} - impl From for TcpStream { /// Converts a `std::net::TcpStream` into its asynchronous equivalent. fn from(stream: net::TcpStream) -> TcpStream { @@ -690,27 +462,6 @@ impl From for TcpStream { } } -impl From for TcpListener { - /// Converts a `std::net::TcpListener` into its asynchronous equivalent. - fn from(listener: net::TcpListener) -> TcpListener { - let mio_listener = mio::net::TcpListener::from_std(listener).unwrap(); - - #[cfg(unix)] - let listener = TcpListener { - raw_fd: mio_listener.as_raw_fd(), - io_handle: IoHandle::new(mio_listener), - }; - - #[cfg(windows)] - let listener = TcpListener { - // raw_socket: mio_listener.as_raw_socket(), - io_handle: IoHandle::new(mio_listener), - }; - - listener - } -} - cfg_if! { if #[cfg(feature = "docs.rs")] { use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -725,24 +476,6 @@ cfg_if! { #[cfg_attr(feature = "docs.rs", doc(cfg(unix)))] cfg_if! { if #[cfg(any(unix, feature = "docs.rs"))] { - impl AsRawFd for TcpListener { - fn as_raw_fd(&self) -> RawFd { - self.raw_fd - } - } - - impl FromRawFd for TcpListener { - unsafe fn from_raw_fd(fd: RawFd) -> TcpListener { - net::TcpListener::from_raw_fd(fd).into() - } - } - - impl IntoRawFd for TcpListener { - fn into_raw_fd(self) -> RawFd { - self.raw_fd - } - } - impl AsRawFd for TcpStream { fn as_raw_fd(&self) -> RawFd { self.raw_fd @@ -766,24 +499,6 @@ cfg_if! { #[cfg_attr(feature = "docs.rs", doc(cfg(windows)))] cfg_if! { if #[cfg(any(windows, feature = "docs.rs"))] { - // impl AsRawSocket for TcpListener { - // fn as_raw_socket(&self) -> RawSocket { - // self.raw_socket - // } - // } - // - // impl FromRawSocket for TcpListener { - // unsafe fn from_raw_socket(handle: RawSocket) -> TcpListener { - // net::TcpListener::from_raw_socket(handle).try_into().unwrap() - // } - // } - // - // impl IntoRawSocket for TcpListener { - // fn into_raw_socket(self) -> RawSocket { - // self.raw_socket - // } - // } - // // impl AsRawSocket for TcpStream { // fn as_raw_socket(&self) -> RawSocket { // self.raw_socket diff --git a/src/net/udp.rs b/src/net/udp/mod.rs similarity index 100% rename from src/net/udp.rs rename to src/net/udp/mod.rs diff --git a/src/os/unix/fs.rs b/src/os/unix/fs.rs index e58f7a3d..025760a7 100644 --- a/src/os/unix/fs.rs +++ b/src/os/unix/fs.rs @@ -1,10 +1,10 @@ //! Unix-specific filesystem extensions. -use std::io; use std::path::Path; use cfg_if::cfg_if; +use crate::io; use crate::task::blocking; /// Creates a new symbolic link on the filesystem. diff --git a/src/os/unix/net.rs b/src/os/unix/net.rs deleted file mode 100644 index 6b6810c1..00000000 --- a/src/os/unix/net.rs +++ /dev/null @@ -1,1007 +0,0 @@ -//! Unix-specific networking extensions. - -use std::fmt; -use std::io; -use std::mem; -use std::net::Shutdown; -use std::path::Path; -use std::pin::Pin; - -use cfg_if::cfg_if; -use futures::future; -use mio_uds; - -use crate::future::Future; -use crate::net::driver::IoHandle; -use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use crate::task::{blocking, Context, Poll}; - -/// A Unix datagram socket. -/// -/// After creating a `UnixDatagram` by [`bind`]ing it to a path, data can be [sent to] and -/// [received from] any other socket address. -/// -/// This type is an async version of [`std::os::unix::net::UnixDatagram`]. -/// -/// [`std::os::unix::net::UnixDatagram`]: -/// https://doc.rust-lang.org/std/os/unix/net/struct.UnixDatagram.html -/// [`bind`]: #method.bind -/// [received from]: #method.recv_from -/// [sent to]: #method.send_to -/// -/// ## Examples -/// -/// ```no_run -/// # #![feature(async_await)] -/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { -/// # -/// use async_std::os::unix::net::UnixDatagram; -/// -/// let socket = UnixDatagram::bind("/tmp/socket1").await?; -/// socket.send_to(b"hello world", "/tmp/socket2").await?; -/// -/// let mut buf = vec![0u8; 1024]; -/// let (n, peer) = socket.recv_from(&mut buf).await?; -/// # -/// # Ok(()) }) } -/// ``` -pub struct UnixDatagram { - #[cfg(not(feature = "docs.rs"))] - io_handle: IoHandle, - - raw_fd: RawFd, -} - -impl UnixDatagram { - #[cfg(not(feature = "docs.rs"))] - fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { - UnixDatagram { - raw_fd: socket.as_raw_fd(), - io_handle: IoHandle::new(socket), - } - } - - /// Creates a Unix datagram socket bound to the given path. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let socket = UnixDatagram::bind("/tmp/socket").await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn bind>(path: P) -> io::Result { - let path = path.as_ref().to_owned(); - let socket = blocking::spawn(async move { mio_uds::UnixDatagram::bind(path) }).await?; - Ok(UnixDatagram::new(socket)) - } - - /// Creates a Unix datagram which is not bound to any address. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let socket = UnixDatagram::unbound()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn unbound() -> io::Result { - let socket = mio_uds::UnixDatagram::unbound()?; - Ok(UnixDatagram::new(socket)) - } - - /// Creates an unnamed pair of connected sockets. - /// - /// Returns two sockets which are connected to each other. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let (socket1, socket2) = UnixDatagram::pair()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { - let (a, b) = mio_uds::UnixDatagram::pair()?; - let a = UnixDatagram::new(a); - let b = UnixDatagram::new(b); - Ok((a, b)) - } - - /// Connects the socket to the specified address. - /// - /// The [`send`] method may be used to send data to the specified address. [`recv`] and - /// [`recv_from`] will only receive data from that address. - /// - /// [`send`]: #method.send - /// [`recv`]: #method.recv - /// [`recv_from`]: #method.recv_from - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let socket = UnixDatagram::unbound()?; - /// socket.connect("/tmp/socket").await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn connect>(&self, path: P) -> io::Result<()> { - // TODO(stjepang): Connect the socket on a blocking pool. - let p = path.as_ref(); - self.io_handle.get_ref().connect(p) - } - - /// Returns the address of this socket. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let socket = UnixDatagram::bind("/tmp/socket").await?; - /// let addr = socket.local_addr()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn local_addr(&self) -> io::Result { - self.io_handle.get_ref().local_addr() - } - - /// Returns the address of this socket's peer. - /// - /// The [`connect`] method will connect the socket to a peer. - /// - /// [`connect`]: #method.connect - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let mut socket = UnixDatagram::unbound()?; - /// socket.connect("/tmp/socket").await?; - /// let peer = socket.peer_addr()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn peer_addr(&self) -> io::Result { - self.io_handle.get_ref().peer_addr() - } - - /// Receives data from the socket. - /// - /// On success, returns the number of bytes read and the address from where the data came. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let mut socket = UnixDatagram::unbound()?; - /// let mut buf = vec![0; 1024]; - /// let (n, peer) = socket.recv_from(&mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); - - match self.io_handle.get_ref().recv_from(buf) { - Ok(n) => Poll::Ready(Ok(n)), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io_handle.clear_readable(cx)?; - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - }) - .await - } - - /// Receives data from the socket. - /// - /// On success, returns the number of bytes read. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let socket = UnixDatagram::bind("/tmp/socket").await?; - /// let mut buf = vec![0; 1024]; - /// let n = socket.recv(&mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn recv(&self, buf: &mut [u8]) -> io::Result { - future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); - - match self.io_handle.get_ref().recv(buf) { - Ok(n) => Poll::Ready(Ok(n)), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io_handle.clear_writable(cx)?; - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - }) - .await - } - - /// Sends data on the socket to the specified address. - /// - /// On success, returns the number of bytes written. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let mut socket = UnixDatagram::unbound()?; - /// socket.send_to(b"hello world", "/tmp/socket").await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn send_to>(&self, buf: &[u8], path: P) -> io::Result { - future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); - - match self.io_handle.get_ref().send_to(buf, path.as_ref()) { - Ok(n) => Poll::Ready(Ok(n)), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io_handle.clear_writable(cx)?; - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - }) - .await - } - - /// Sends data on the socket to the socket's peer. - /// - /// On success, returns the number of bytes written. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let mut socket = UnixDatagram::unbound()?; - /// socket.connect("/tmp/socket").await?; - /// socket.send(b"hello world").await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn send(&self, buf: &[u8]) -> io::Result { - future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); - - match self.io_handle.get_ref().send(buf) { - Ok(n) => Poll::Ready(Ok(n)), - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io_handle.clear_writable(cx)?; - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - }) - .await - } - - /// Shut down the read, write, or both halves of this connection. - /// - /// This function will cause all pending and future I/O calls on the specified portions to - /// immediately return with an appropriate value (see the documentation of [`Shutdown`]). - /// - /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html - /// - /// ## Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixDatagram; - /// use std::net::Shutdown; - /// - /// let socket = UnixDatagram::unbound()?; - /// socket.shutdown(Shutdown::Both)?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io_handle.get_ref().shutdown(how) - } -} - -impl fmt::Debug for UnixDatagram { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut builder = f.debug_struct("UnixDatagram"); - builder.field("fd", &self.as_raw_fd()); - - if let Ok(addr) = self.local_addr() { - builder.field("local", &addr); - } - - if let Ok(addr) = self.peer_addr() { - builder.field("peer", &addr); - } - - builder.finish() - } -} - -/// A Unix domain socket server, listening for connections. -/// -/// After creating a `UnixListener` by [`bind`]ing it to a socket address, it listens for incoming -/// connections. These can be accepted by awaiting elements from the async stream of [`incoming`] -/// connections. -/// -/// The socket will be closed when the value is dropped. -/// -/// This type is an async version of [`std::os::unix::net::UnixListener`]. -/// -/// [`std::os::unix::net::UnixListener`]: -/// https://doc.rust-lang.org/std/os/unix/net/struct.UnixListener.html -/// [`bind`]: #method.bind -/// [`incoming`]: #method.incoming -/// -/// # Examples -/// -/// ```no_run -/// # #![feature(async_await)] -/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { -/// # -/// use async_std::os::unix::net::UnixListener; -/// use async_std::prelude::*; -/// -/// let listener = UnixListener::bind("/tmp/socket").await?; -/// let mut incoming = listener.incoming(); -/// -/// while let Some(stream) = incoming.next().await { -/// let mut stream = stream?; -/// stream.write_all(b"hello world").await?; -/// } -/// # -/// # Ok(()) }) } -/// ``` -pub struct UnixListener { - #[cfg(not(feature = "docs.rs"))] - io_handle: IoHandle, - - raw_fd: RawFd, -} - -impl UnixListener { - /// Creates a Unix datagram listener bound to the given path. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixListener; - /// - /// let listener = UnixListener::bind("/tmp/socket").await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn bind>(path: P) -> io::Result { - let path = path.as_ref().to_owned(); - let listener = blocking::spawn(async move { mio_uds::UnixListener::bind(path) }).await?; - - Ok(UnixListener { - raw_fd: listener.as_raw_fd(), - io_handle: IoHandle::new(listener), - }) - } - - /// Accepts a new incoming connection to this listener. - /// - /// When a connection is established, the corresponding stream and address will be returned. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixListener; - /// - /// let listener = UnixListener::bind("/tmp/socket").await?; - /// let (socket, addr) = listener.accept().await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { - future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); - - match self.io_handle.get_ref().accept_std() { - Ok(Some((io, addr))) => { - let mio_stream = mio_uds::UnixStream::from_stream(io)?; - let stream = UnixStream { - raw_fd: mio_stream.as_raw_fd(), - io_handle: IoHandle::new(mio_stream), - }; - Poll::Ready(Ok((stream, addr))) - } - Ok(None) => { - self.io_handle.clear_readable(cx)?; - Poll::Pending - } - Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { - self.io_handle.clear_readable(cx)?; - Poll::Pending - } - Err(err) => Poll::Ready(Err(err)), - } - }) - .await - } - - /// Returns a stream of incoming connections. - /// - /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of - /// connections is infinite, i.e awaiting the next connection will never result in [`None`]. - /// - /// [`accept`]: #method.accept - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixListener; - /// use async_std::prelude::*; - /// - /// let listener = UnixListener::bind("/tmp/socket").await?; - /// let mut incoming = listener.incoming(); - /// - /// while let Some(stream) = incoming.next().await { - /// let mut stream = stream?; - /// stream.write_all(b"hello world").await?; - /// } - /// # - /// # Ok(()) }) } - /// ``` - pub fn incoming(&self) -> Incoming<'_> { - Incoming(self) - } - - /// Returns the local socket address of this listener. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixListener; - /// - /// let listener = UnixListener::bind("/tmp/socket").await?; - /// let addr = listener.local_addr()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn local_addr(&self) -> io::Result { - self.io_handle.get_ref().local_addr() - } -} - -impl fmt::Debug for UnixListener { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut builder = f.debug_struct("UnixListener"); - builder.field("fd", &self.as_raw_fd()); - - if let Ok(addr) = self.local_addr() { - builder.field("local", &addr); - } - - builder.finish() - } -} - -/// A stream of incoming Unix domain socket connections. -/// -/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is -/// created by the [`incoming`] method on [`UnixListener`]. -/// -/// This type is an async version of [`std::os::unix::net::Incoming`]. -/// -/// [`std::os::unix::net::Incoming`]: https://doc.rust-lang.org/std/os/unix/net/struct.Incoming.html -/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None -/// [`incoming`]: struct.UnixListener.html#method.incoming -/// [`UnixListener`]: struct.UnixListener.html -#[derive(Debug)] -pub struct Incoming<'a>(&'a UnixListener); - -impl futures::Stream for Incoming<'_> { - type Item = io::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let future = self.0.accept(); - futures::pin_mut!(future); - - let (socket, _) = futures::ready!(future.poll(cx))?; - Poll::Ready(Some(Ok(socket))) - } -} - -/// A Unix stream socket. -/// -/// This type is an async version of [`std::os::unix::net::UnixStream`]. -/// -/// [`std::os::unix::net::UnixStream`]: -/// https://doc.rust-lang.org/std/os/unix/net/struct.UnixStream.html -/// -/// # Examples -/// -/// ```no_run -/// # #![feature(async_await)] -/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { -/// # -/// use async_std::os::unix::net::UnixStream; -/// use async_std::prelude::*; -/// -/// let mut stream = UnixStream::connect("/tmp/socket").await?; -/// stream.write_all(b"hello world").await?; -/// -/// let mut response = Vec::new(); -/// stream.read_to_end(&mut response).await?; -/// # -/// # Ok(()) }) } -/// ``` -pub struct UnixStream { - #[cfg(not(feature = "docs.rs"))] - io_handle: IoHandle, - - raw_fd: RawFd, -} - -impl UnixStream { - /// Connects to the socket to the specified address. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixStream; - /// - /// let stream = UnixStream::connect("/tmp/socket").await?; - /// # - /// # Ok(()) }) } - /// ``` - pub async fn connect>(path: P) -> io::Result { - enum State { - Waiting(UnixStream), - Error(io::Error), - Done, - } - - let path = path.as_ref().to_owned(); - let mut state = { - match blocking::spawn(async move { mio_uds::UnixStream::connect(path) }).await { - Ok(mio_stream) => State::Waiting(UnixStream { - raw_fd: mio_stream.as_raw_fd(), - io_handle: IoHandle::new(mio_stream), - }), - Err(err) => State::Error(err), - } - }; - - future::poll_fn(|cx| { - match &mut state { - State::Waiting(stream) => { - futures::ready!(stream.io_handle.poll_writable(cx)?); - - if let Some(err) = stream.io_handle.get_ref().take_error()? { - return Poll::Ready(Err(err)); - } - } - State::Error(_) => { - let err = match mem::replace(&mut state, State::Done) { - State::Error(err) => err, - _ => unreachable!(), - }; - - return Poll::Ready(Err(err)); - } - State::Done => panic!("`UnixStream::connect()` future polled after completion"), - } - - match mem::replace(&mut state, State::Done) { - State::Waiting(stream) => Poll::Ready(Ok(stream)), - _ => unreachable!(), - } - }) - .await - } - - /// Creates an unnamed pair of connected sockets. - /// - /// Returns two streams which are connected to each other. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixStream; - /// - /// let stream = UnixStream::pair()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn pair() -> io::Result<(UnixStream, UnixStream)> { - let (a, b) = mio_uds::UnixStream::pair()?; - let a = UnixStream { - raw_fd: a.as_raw_fd(), - io_handle: IoHandle::new(a), - }; - let b = UnixStream { - raw_fd: b.as_raw_fd(), - io_handle: IoHandle::new(b), - }; - Ok((a, b)) - } - - /// Returns the socket address of the local half of this connection. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixStream; - /// - /// let stream = UnixStream::connect("/tmp/socket").await?; - /// let addr = stream.local_addr()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn local_addr(&self) -> io::Result { - self.io_handle.get_ref().local_addr() - } - - /// Returns the socket address of the remote half of this connection. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixStream; - /// - /// let stream = UnixStream::connect("/tmp/socket").await?; - /// let peer = stream.peer_addr()?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn peer_addr(&self) -> io::Result { - self.io_handle.get_ref().peer_addr() - } - - /// Shuts down the read, write, or both halves of this connection. - /// - /// This function will cause all pending and future I/O calls on the specified portions to - /// immediately return with an appropriate value (see the documentation of [`Shutdown`]). - /// - /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::os::unix::net::UnixStream; - /// use std::net::Shutdown; - /// - /// let stream = UnixStream::connect("/tmp/socket").await?; - /// stream.shutdown(Shutdown::Both)?; - /// # - /// # Ok(()) }) } - /// ``` - pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { - self.io_handle.get_ref().shutdown(how) - } -} - -impl futures::io::AsyncRead for UnixStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut &*self).poll_read(cx, buf) - } -} - -impl futures::io::AsyncRead for &UnixStream { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut &self.io_handle).poll_read(cx, buf) - } -} - -impl futures::io::AsyncWrite for UnixStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut &*self).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &*self).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &*self).poll_close(cx) - } -} - -impl futures::io::AsyncWrite for &UnixStream { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut &self.io_handle).poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &self.io_handle).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut &self.io_handle).poll_close(cx) - } -} - -impl fmt::Debug for UnixStream { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let mut builder = f.debug_struct("UnixStream"); - builder.field("fd", &self.as_raw_fd()); - - if let Ok(addr) = self.local_addr() { - builder.field("local", &addr); - } - - if let Ok(addr) = self.peer_addr() { - builder.field("peer", &addr); - } - - builder.finish() - } -} - -#[cfg(unix)] -impl From for UnixStream { - /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. - fn from(stream: std::os::unix::net::UnixStream) -> UnixStream { - let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap(); - UnixStream { - raw_fd: mio_stream.as_raw_fd(), - io_handle: IoHandle::new(mio_stream), - } - } -} - -#[cfg(unix)] -impl From for UnixDatagram { - /// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent. - fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram { - let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap(); - UnixDatagram { - raw_fd: mio_datagram.as_raw_fd(), - io_handle: IoHandle::new(mio_datagram), - } - } -} - -#[cfg(unix)] -impl From for UnixListener { - /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. - fn from(listener: std::os::unix::net::UnixListener) -> UnixListener { - let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap(); - UnixListener { - raw_fd: mio_listener.as_raw_fd(), - io_handle: IoHandle::new(mio_listener), - } - } -} - -impl AsRawFd for UnixListener { - fn as_raw_fd(&self) -> RawFd { - self.raw_fd - } -} - -impl FromRawFd for UnixListener { - unsafe fn from_raw_fd(fd: RawFd) -> UnixListener { - let listener = std::os::unix::net::UnixListener::from_raw_fd(fd); - listener.into() - } -} - -impl IntoRawFd for UnixListener { - fn into_raw_fd(self) -> RawFd { - self.raw_fd - } -} - -impl AsRawFd for UnixStream { - fn as_raw_fd(&self) -> RawFd { - self.raw_fd - } -} - -impl FromRawFd for UnixStream { - unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { - let stream = std::os::unix::net::UnixStream::from_raw_fd(fd); - stream.into() - } -} - -impl IntoRawFd for UnixStream { - fn into_raw_fd(self) -> RawFd { - self.raw_fd - } -} - -impl AsRawFd for UnixDatagram { - fn as_raw_fd(&self) -> RawFd { - self.raw_fd - } -} - -impl FromRawFd for UnixDatagram { - unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram { - let datagram = std::os::unix::net::UnixDatagram::from_raw_fd(fd); - datagram.into() - } -} - -impl IntoRawFd for UnixDatagram { - fn into_raw_fd(self) -> RawFd { - self.raw_fd - } -} - -cfg_if! { - if #[cfg(feature = "docs.rs")] { - /// An address associated with a Unix socket. - /// - /// # Examples - /// - /// ``` - /// use async_std::os::unix::net::UnixListener; - /// - /// let socket = UnixListener::bind("/tmp/socket").await?; - /// let addr = socket.local_addr()?; - /// ``` - #[derive(Clone)] - pub struct SocketAddr { - _private: (), - } - - impl SocketAddr { - /// Returns `true` if the address is unnamed. - /// - /// # Examples - /// - /// A named address: - /// - /// ```no_run - /// use async_std::os::unix::net::UnixListener; - /// - /// let socket = UnixListener::bind("/tmp/socket").await?; - /// let addr = socket.local_addr()?; - /// assert_eq!(addr.is_unnamed(), false); - /// ``` - /// - /// An unnamed address: - /// - /// ```no_run - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let socket = UnixDatagram::unbound().await?; - /// let addr = socket.local_addr()?; - /// assert_eq!(addr.is_unnamed(), true); - /// ``` - pub fn is_unnamed(&self) -> bool { - unreachable!() - } - - /// Returns the contents of this address if it is a `pathname` address. - /// - /// # Examples - /// - /// With a pathname: - /// - /// ```no_run - /// use async_std::os::unix::net::UnixListener; - /// use std::path::Path; - /// - /// let socket = UnixListener::bind("/tmp/socket").await?; - /// let addr = socket.local_addr()?; - /// assert_eq!(addr.as_pathname(), Some(Path::new("/tmp/socket"))); - /// ``` - /// - /// Without a pathname: - /// - /// ``` - /// use async_std::os::unix::net::UnixDatagram; - /// - /// let socket = UnixDatagram::unbound()?; - /// let addr = socket.local_addr()?; - /// assert_eq!(addr.as_pathname(), None); - /// ``` - pub fn as_pathname(&self) -> Option<&Path> { - unreachable!() - } - } - - impl fmt::Debug for SocketAddr { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - unreachable!() - } - } - } else { - #[doc(inline)] - pub use std::os::unix::net::SocketAddr; - } -} diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs new file mode 100644 index 00000000..468f875a --- /dev/null +++ b/src/os/unix/net/datagram.rs @@ -0,0 +1,400 @@ +//! Unix-specific networking extensions. + +use std::fmt; +use std::net::Shutdown; +use std::path::Path; + +use futures::future; +use mio_uds; + +use super::SocketAddr; +use crate::io; +use crate::net::driver::IoHandle; +use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use crate::task::{blocking, Poll}; + +/// A Unix datagram socket. +/// +/// After creating a `UnixDatagram` by [`bind`]ing it to a path, data can be [sent to] and +/// [received from] any other socket address. +/// +/// This type is an async version of [`std::os::unix::net::UnixDatagram`]. +/// +/// [`std::os::unix::net::UnixDatagram`]: +/// https://doc.rust-lang.org/std/os/unix/net/struct.UnixDatagram.html +/// [`bind`]: #method.bind +/// [received from]: #method.recv_from +/// [sent to]: #method.send_to +/// +/// ## Examples +/// +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::os::unix::net::UnixDatagram; +/// +/// let socket = UnixDatagram::bind("/tmp/socket1").await?; +/// socket.send_to(b"hello world", "/tmp/socket2").await?; +/// +/// let mut buf = vec![0u8; 1024]; +/// let (n, peer) = socket.recv_from(&mut buf).await?; +/// # +/// # Ok(()) }) } +/// ``` +pub struct UnixDatagram { + #[cfg(not(feature = "docs.rs"))] + io_handle: IoHandle, + + raw_fd: RawFd, +} + +impl UnixDatagram { + #[cfg(not(feature = "docs.rs"))] + fn new(socket: mio_uds::UnixDatagram) -> UnixDatagram { + UnixDatagram { + raw_fd: socket.as_raw_fd(), + io_handle: IoHandle::new(socket), + } + } + + /// Creates a Unix datagram socket bound to the given path. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let socket = UnixDatagram::bind("/tmp/socket").await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn bind>(path: P) -> io::Result { + let path = path.as_ref().to_owned(); + let socket = blocking::spawn(async move { mio_uds::UnixDatagram::bind(path) }).await?; + Ok(UnixDatagram::new(socket)) + } + + /// Creates a Unix datagram which is not bound to any address. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let socket = UnixDatagram::unbound()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn unbound() -> io::Result { + let socket = mio_uds::UnixDatagram::unbound()?; + Ok(UnixDatagram::new(socket)) + } + + /// Creates an unnamed pair of connected sockets. + /// + /// Returns two sockets which are connected to each other. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let (socket1, socket2) = UnixDatagram::pair()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn pair() -> io::Result<(UnixDatagram, UnixDatagram)> { + let (a, b) = mio_uds::UnixDatagram::pair()?; + let a = UnixDatagram::new(a); + let b = UnixDatagram::new(b); + Ok((a, b)) + } + + /// Connects the socket to the specified address. + /// + /// The [`send`] method may be used to send data to the specified address. [`recv`] and + /// [`recv_from`] will only receive data from that address. + /// + /// [`send`]: #method.send + /// [`recv`]: #method.recv + /// [`recv_from`]: #method.recv_from + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket").await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn connect>(&self, path: P) -> io::Result<()> { + // TODO(stjepang): Connect the socket on a blocking pool. + let p = path.as_ref(); + self.io_handle.get_ref().connect(p) + } + + /// Returns the address of this socket. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let socket = UnixDatagram::bind("/tmp/socket").await?; + /// let addr = socket.local_addr()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn local_addr(&self) -> io::Result { + self.io_handle.get_ref().local_addr() + } + + /// Returns the address of this socket's peer. + /// + /// The [`connect`] method will connect the socket to a peer. + /// + /// [`connect`]: #method.connect + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let mut socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket").await?; + /// let peer = socket.peer_addr()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn peer_addr(&self) -> io::Result { + self.io_handle.get_ref().peer_addr() + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read and the address from where the data came. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let mut socket = UnixDatagram::unbound()?; + /// let mut buf = vec![0; 1024]; + /// let (n, peer) = socket.recv_from(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { + future::poll_fn(|cx| { + futures::ready!(self.io_handle.poll_readable(cx)?); + + match self.io_handle.get_ref().recv_from(buf) { + Ok(n) => Poll::Ready(Ok(n)), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io_handle.clear_readable(cx)?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + }) + .await + } + + /// Receives data from the socket. + /// + /// On success, returns the number of bytes read. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let socket = UnixDatagram::bind("/tmp/socket").await?; + /// let mut buf = vec![0; 1024]; + /// let n = socket.recv(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn recv(&self, buf: &mut [u8]) -> io::Result { + future::poll_fn(|cx| { + futures::ready!(self.io_handle.poll_writable(cx)?); + + match self.io_handle.get_ref().recv(buf) { + Ok(n) => Poll::Ready(Ok(n)), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io_handle.clear_writable(cx)?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + }) + .await + } + + /// Sends data on the socket to the specified address. + /// + /// On success, returns the number of bytes written. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let mut socket = UnixDatagram::unbound()?; + /// socket.send_to(b"hello world", "/tmp/socket").await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn send_to>(&self, buf: &[u8], path: P) -> io::Result { + future::poll_fn(|cx| { + futures::ready!(self.io_handle.poll_writable(cx)?); + + match self.io_handle.get_ref().send_to(buf, path.as_ref()) { + Ok(n) => Poll::Ready(Ok(n)), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io_handle.clear_writable(cx)?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + }) + .await + } + + /// Sends data on the socket to the socket's peer. + /// + /// On success, returns the number of bytes written. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let mut socket = UnixDatagram::unbound()?; + /// socket.connect("/tmp/socket").await?; + /// socket.send(b"hello world").await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn send(&self, buf: &[u8]) -> io::Result { + future::poll_fn(|cx| { + futures::ready!(self.io_handle.poll_writable(cx)?); + + match self.io_handle.get_ref().send(buf) { + Ok(n) => Poll::Ready(Ok(n)), + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io_handle.clear_writable(cx)?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + }) + .await + } + + /// Shut down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the specified portions to + /// immediately return with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html + /// + /// ## Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixDatagram; + /// use std::net::Shutdown; + /// + /// let socket = UnixDatagram::unbound()?; + /// socket.shutdown(Shutdown::Both)?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io_handle.get_ref().shutdown(how) + } +} + +impl fmt::Debug for UnixDatagram { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut builder = f.debug_struct("UnixDatagram"); + builder.field("fd", &self.as_raw_fd()); + + if let Ok(addr) = self.local_addr() { + builder.field("local", &addr); + } + + if let Ok(addr) = self.peer_addr() { + builder.field("peer", &addr); + } + + builder.finish() + } +} + +impl From for UnixDatagram { + /// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent. + fn from(datagram: std::os::unix::net::UnixDatagram) -> UnixDatagram { + let mio_datagram = mio_uds::UnixDatagram::from_datagram(datagram).unwrap(); + UnixDatagram { + raw_fd: mio_datagram.as_raw_fd(), + io_handle: IoHandle::new(mio_datagram), + } + } +} + +impl AsRawFd for UnixDatagram { + fn as_raw_fd(&self) -> RawFd { + self.raw_fd + } +} + +impl FromRawFd for UnixDatagram { + unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram { + let datagram = std::os::unix::net::UnixDatagram::from_raw_fd(fd); + datagram.into() + } +} + +impl IntoRawFd for UnixDatagram { + fn into_raw_fd(self) -> RawFd { + self.raw_fd + } +} diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs new file mode 100644 index 00000000..f39de1ad --- /dev/null +++ b/src/os/unix/net/listener.rs @@ -0,0 +1,246 @@ +//! Unix-specific networking extensions. + +use std::fmt; +use std::path::Path; +use std::pin::Pin; + +use futures::future; +use mio_uds; + +use super::SocketAddr; +use super::UnixStream; +use crate::future::Future; +use crate::io; +use crate::net::driver::IoHandle; +use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use crate::task::{blocking, Context, Poll}; + +/// A Unix domain socket server, listening for connections. +/// +/// After creating a `UnixListener` by [`bind`]ing it to a socket address, it listens for incoming +/// connections. These can be accepted by awaiting elements from the async stream of [`incoming`] +/// connections. +/// +/// The socket will be closed when the value is dropped. +/// +/// This type is an async version of [`std::os::unix::net::UnixListener`]. +/// +/// [`std::os::unix::net::UnixListener`]: +/// https://doc.rust-lang.org/std/os/unix/net/struct.UnixListener.html +/// [`bind`]: #method.bind +/// [`incoming`]: #method.incoming +/// +/// # Examples +/// +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::os::unix::net::UnixListener; +/// use async_std::prelude::*; +/// +/// let listener = UnixListener::bind("/tmp/socket").await?; +/// let mut incoming = listener.incoming(); +/// +/// while let Some(stream) = incoming.next().await { +/// let mut stream = stream?; +/// stream.write_all(b"hello world").await?; +/// } +/// # +/// # Ok(()) }) } +/// ``` +pub struct UnixListener { + #[cfg(not(feature = "docs.rs"))] + io_handle: IoHandle, + + raw_fd: RawFd, +} + +impl UnixListener { + /// Creates a Unix datagram listener bound to the given path. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixListener; + /// + /// let listener = UnixListener::bind("/tmp/socket").await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn bind>(path: P) -> io::Result { + let path = path.as_ref().to_owned(); + let listener = blocking::spawn(async move { mio_uds::UnixListener::bind(path) }).await?; + + Ok(UnixListener { + raw_fd: listener.as_raw_fd(), + io_handle: IoHandle::new(listener), + }) + } + + /// Accepts a new incoming connection to this listener. + /// + /// When a connection is established, the corresponding stream and address will be returned. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixListener; + /// + /// let listener = UnixListener::bind("/tmp/socket").await?; + /// let (socket, addr) = listener.accept().await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { + future::poll_fn(|cx| { + futures::ready!(self.io_handle.poll_readable(cx)?); + + match self.io_handle.get_ref().accept_std() { + Ok(Some((io, addr))) => { + let mio_stream = mio_uds::UnixStream::from_stream(io)?; + let stream = UnixStream { + raw_fd: mio_stream.as_raw_fd(), + io_handle: IoHandle::new(mio_stream), + }; + Poll::Ready(Ok((stream, addr))) + } + Ok(None) => { + self.io_handle.clear_readable(cx)?; + Poll::Pending + } + Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { + self.io_handle.clear_readable(cx)?; + Poll::Pending + } + Err(err) => Poll::Ready(Err(err)), + } + }) + .await + } + + /// Returns a stream of incoming connections. + /// + /// Iterating over this stream is equivalent to calling [`accept`] in a loop. The stream of + /// connections is infinite, i.e awaiting the next connection will never result in [`None`]. + /// + /// [`accept`]: #method.accept + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixListener; + /// use async_std::prelude::*; + /// + /// let listener = UnixListener::bind("/tmp/socket").await?; + /// let mut incoming = listener.incoming(); + /// + /// while let Some(stream) = incoming.next().await { + /// let mut stream = stream?; + /// stream.write_all(b"hello world").await?; + /// } + /// # + /// # Ok(()) }) } + /// ``` + pub fn incoming(&self) -> Incoming<'_> { + Incoming(self) + } + + /// Returns the local socket address of this listener. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixListener; + /// + /// let listener = UnixListener::bind("/tmp/socket").await?; + /// let addr = listener.local_addr()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn local_addr(&self) -> io::Result { + self.io_handle.get_ref().local_addr() + } +} + +impl fmt::Debug for UnixListener { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut builder = f.debug_struct("UnixListener"); + builder.field("fd", &self.as_raw_fd()); + + if let Ok(addr) = self.local_addr() { + builder.field("local", &addr); + } + + builder.finish() + } +} + +/// A stream of incoming Unix domain socket connections. +/// +/// This stream is infinite, i.e awaiting the next connection will never result in [`None`]. It is +/// created by the [`incoming`] method on [`UnixListener`]. +/// +/// This type is an async version of [`std::os::unix::net::Incoming`]. +/// +/// [`std::os::unix::net::Incoming`]: https://doc.rust-lang.org/std/os/unix/net/struct.Incoming.html +/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None +/// [`incoming`]: struct.UnixListener.html#method.incoming +/// [`UnixListener`]: struct.UnixListener.html +#[derive(Debug)] +pub struct Incoming<'a>(&'a UnixListener); + +impl futures::Stream for Incoming<'_> { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let future = self.0.accept(); + futures::pin_mut!(future); + + let (socket, _) = futures::ready!(future.poll(cx))?; + Poll::Ready(Some(Ok(socket))) + } +} + +impl From for UnixListener { + /// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent. + fn from(listener: std::os::unix::net::UnixListener) -> UnixListener { + let mio_listener = mio_uds::UnixListener::from_listener(listener).unwrap(); + UnixListener { + raw_fd: mio_listener.as_raw_fd(), + io_handle: IoHandle::new(mio_listener), + } + } +} + +impl AsRawFd for UnixListener { + fn as_raw_fd(&self) -> RawFd { + self.raw_fd + } +} + +impl FromRawFd for UnixListener { + unsafe fn from_raw_fd(fd: RawFd) -> UnixListener { + let listener = std::os::unix::net::UnixListener::from_raw_fd(fd); + listener.into() + } +} + +impl IntoRawFd for UnixListener { + fn into_raw_fd(self) -> RawFd { + self.raw_fd + } +} diff --git a/src/os/unix/net/mod.rs b/src/os/unix/net/mod.rs new file mode 100644 index 00000000..11c38b26 --- /dev/null +++ b/src/os/unix/net/mod.rs @@ -0,0 +1,96 @@ +//! Unix-specific networking extensions. + +use cfg_if::cfg_if; + +pub use datagram::UnixDatagram; +pub use listener::{Incoming, UnixListener}; +pub use stream::UnixStream; + +mod datagram; +mod listener; +mod stream; + +cfg_if! { + if #[cfg(feature = "docs.rs")] { + /// An address associated with a Unix socket. + /// + /// # Examples + /// + /// ``` + /// use async_std::os::unix::net::UnixListener; + /// + /// let socket = UnixListener::bind("/tmp/socket").await?; + /// let addr = socket.local_addr()?; + /// ``` + #[derive(Clone)] + pub struct SocketAddr { + _private: (), + } + + impl SocketAddr { + /// Returns `true` if the address is unnamed. + /// + /// # Examples + /// + /// A named address: + /// + /// ```no_run + /// use async_std::os::unix::net::UnixListener; + /// + /// let socket = UnixListener::bind("/tmp/socket").await?; + /// let addr = socket.local_addr()?; + /// assert_eq!(addr.is_unnamed(), false); + /// ``` + /// + /// An unnamed address: + /// + /// ```no_run + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let socket = UnixDatagram::unbound().await?; + /// let addr = socket.local_addr()?; + /// assert_eq!(addr.is_unnamed(), true); + /// ``` + pub fn is_unnamed(&self) -> bool { + unreachable!() + } + + /// Returns the contents of this address if it is a `pathname` address. + /// + /// # Examples + /// + /// With a pathname: + /// + /// ```no_run + /// use async_std::os::unix::net::UnixListener; + /// use std::path::Path; + /// + /// let socket = UnixListener::bind("/tmp/socket").await?; + /// let addr = socket.local_addr()?; + /// assert_eq!(addr.as_pathname(), Some(Path::new("/tmp/socket"))); + /// ``` + /// + /// Without a pathname: + /// + /// ``` + /// use async_std::os::unix::net::UnixDatagram; + /// + /// let socket = UnixDatagram::unbound()?; + /// let addr = socket.local_addr()?; + /// assert_eq!(addr.as_pathname(), None); + /// ``` + pub fn as_pathname(&self) -> Option<&Path> { + unreachable!() + } + } + + impl fmt::Debug for SocketAddr { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + unreachable!() + } + } + } else { + #[doc(inline)] + pub use std::os::unix::net::SocketAddr; + } +} diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs new file mode 100644 index 00000000..d92e2a3f --- /dev/null +++ b/src/os/unix/net/stream.rs @@ -0,0 +1,302 @@ +//! Unix-specific networking extensions. + +use std::fmt; +use std::mem; +use std::net::Shutdown; +use std::path::Path; +use std::pin::Pin; + +use futures::future; +use mio_uds; + +use super::SocketAddr; +use crate::io; +use crate::net::driver::IoHandle; +use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use crate::task::{blocking, Context, Poll}; + +/// A Unix stream socket. +/// +/// This type is an async version of [`std::os::unix::net::UnixStream`]. +/// +/// [`std::os::unix::net::UnixStream`]: +/// https://doc.rust-lang.org/std/os/unix/net/struct.UnixStream.html +/// +/// # Examples +/// +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::os::unix::net::UnixStream; +/// use async_std::prelude::*; +/// +/// let mut stream = UnixStream::connect("/tmp/socket").await?; +/// stream.write_all(b"hello world").await?; +/// +/// let mut response = Vec::new(); +/// stream.read_to_end(&mut response).await?; +/// # +/// # Ok(()) }) } +/// ``` +pub struct UnixStream { + #[cfg(not(feature = "docs.rs"))] + pub(super) io_handle: IoHandle, + + pub(super) raw_fd: RawFd, +} + +impl UnixStream { + /// Connects to the socket to the specified address. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixStream; + /// + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn connect>(path: P) -> io::Result { + enum State { + Waiting(UnixStream), + Error(io::Error), + Done, + } + + let path = path.as_ref().to_owned(); + let mut state = { + match blocking::spawn(async move { mio_uds::UnixStream::connect(path) }).await { + Ok(mio_stream) => State::Waiting(UnixStream { + raw_fd: mio_stream.as_raw_fd(), + io_handle: IoHandle::new(mio_stream), + }), + Err(err) => State::Error(err), + } + }; + + future::poll_fn(|cx| { + match &mut state { + State::Waiting(stream) => { + futures::ready!(stream.io_handle.poll_writable(cx)?); + + if let Some(err) = stream.io_handle.get_ref().take_error()? { + return Poll::Ready(Err(err)); + } + } + State::Error(_) => { + let err = match mem::replace(&mut state, State::Done) { + State::Error(err) => err, + _ => unreachable!(), + }; + + return Poll::Ready(Err(err)); + } + State::Done => panic!("`UnixStream::connect()` future polled after completion"), + } + + match mem::replace(&mut state, State::Done) { + State::Waiting(stream) => Poll::Ready(Ok(stream)), + _ => unreachable!(), + } + }) + .await + } + + /// Creates an unnamed pair of connected sockets. + /// + /// Returns two streams which are connected to each other. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixStream; + /// + /// let stream = UnixStream::pair()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn pair() -> io::Result<(UnixStream, UnixStream)> { + let (a, b) = mio_uds::UnixStream::pair()?; + let a = UnixStream { + raw_fd: a.as_raw_fd(), + io_handle: IoHandle::new(a), + }; + let b = UnixStream { + raw_fd: b.as_raw_fd(), + io_handle: IoHandle::new(b), + }; + Ok((a, b)) + } + + /// Returns the socket address of the local half of this connection. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixStream; + /// + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// let addr = stream.local_addr()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn local_addr(&self) -> io::Result { + self.io_handle.get_ref().local_addr() + } + + /// Returns the socket address of the remote half of this connection. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixStream; + /// + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// let peer = stream.peer_addr()?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn peer_addr(&self) -> io::Result { + self.io_handle.get_ref().peer_addr() + } + + /// Shuts down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the specified portions to + /// immediately return with an appropriate value (see the documentation of [`Shutdown`]). + /// + /// [`Shutdown`]: https://doc.rust-lang.org/std/net/enum.Shutdown.html + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::os::unix::net::UnixStream; + /// use std::net::Shutdown; + /// + /// let stream = UnixStream::connect("/tmp/socket").await?; + /// stream.shutdown(Shutdown::Both)?; + /// # + /// # Ok(()) }) } + /// ``` + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.io_handle.get_ref().shutdown(how) + } +} + +impl futures::io::AsyncRead for UnixStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut &*self).poll_read(cx, buf) + } +} + +impl futures::io::AsyncRead for &UnixStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut &self.io_handle).poll_read(cx, buf) + } +} + +impl futures::io::AsyncWrite for UnixStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut &*self).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &*self).poll_close(cx) + } +} + +impl futures::io::AsyncWrite for &UnixStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut &self.io_handle).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &self.io_handle).poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut &self.io_handle).poll_close(cx) + } +} + +impl fmt::Debug for UnixStream { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut builder = f.debug_struct("UnixStream"); + builder.field("fd", &self.as_raw_fd()); + + if let Ok(addr) = self.local_addr() { + builder.field("local", &addr); + } + + if let Ok(addr) = self.peer_addr() { + builder.field("peer", &addr); + } + + builder.finish() + } +} + +impl From for UnixStream { + /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent. + fn from(stream: std::os::unix::net::UnixStream) -> UnixStream { + let mio_stream = mio_uds::UnixStream::from_stream(stream).unwrap(); + UnixStream { + raw_fd: mio_stream.as_raw_fd(), + io_handle: IoHandle::new(mio_stream), + } + } +} + +impl AsRawFd for UnixStream { + fn as_raw_fd(&self) -> RawFd { + self.raw_fd + } +} + +impl FromRawFd for UnixStream { + unsafe fn from_raw_fd(fd: RawFd) -> UnixStream { + let stream = std::os::unix::net::UnixStream::from_raw_fd(fd); + stream.into() + } +} + +impl IntoRawFd for UnixStream { + fn into_raw_fd(self) -> RawFd { + self.raw_fd + } +} diff --git a/src/task/pool.rs b/src/task/pool.rs index 42202987..0f70f14a 100644 --- a/src/task/pool.rs +++ b/src/task/pool.rs @@ -1,6 +1,5 @@ use std::cell::{Cell, UnsafeCell}; use std::fmt::Arguments; -use std::io; use std::mem; use std::panic::{self, AssertUnwindSafe}; use std::pin::Pin; @@ -14,6 +13,7 @@ use lazy_static::lazy_static; use super::task; use super::{JoinHandle, Task}; use crate::future::Future; +use crate::io; /// Returns a handle to the current task. /// diff --git a/src/time/mod.rs b/src/time/mod.rs index 1a2c8f99..83dee9cf 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -28,7 +28,6 @@ use std::error::Error; use std::fmt; -use std::io; use std::pin::Pin; use std::time::Duration; @@ -37,6 +36,7 @@ use futures_timer::Delay; use pin_utils::unsafe_pinned; use crate::future::Future; +use crate::io; use crate::task::{Context, Poll}; cfg_if! {