From a430e2781920907996e7ae2e4227419580455185 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 12 Aug 2019 18:00:21 +0200 Subject: [PATCH] Cleanup, docs, fmt --- src/fs/dir_builder.rs | 2 +- src/fs/dir_entry.rs | 7 +- src/fs/file.rs | 9 +- src/fs/open_options.rs | 2 +- src/fs/read_dir.rs | 9 +- src/io/buf_read.rs | 12 +-- src/io/buf_reader.rs | 200 ++++++++++++++++++++++++++++++----------- src/io/copy.rs | 3 +- src/io/read.rs | 7 +- src/io/seek.rs | 7 +- src/io/stderr.rs | 6 +- src/io/stdin.rs | 7 +- src/io/stdout.rs | 6 +- src/io/write.rs | 7 +- src/net/driver.rs | 16 ++-- src/net/tcp.rs | 17 ++-- src/net/udp.rs | 12 +-- src/os/unix/net.rs | 22 ++--- src/stream/empty.rs | 3 +- src/stream/once.rs | 3 +- src/stream/repeat.rs | 3 +- src/stream/stream.rs | 5 +- src/sync/mutex.rs | 5 +- src/sync/rwlock.rs | 5 +- src/task/blocking.rs | 4 +- src/task/pool.rs | 4 +- src/task/sleep.rs | 2 +- src/task/task.rs | 4 +- src/time/mod.rs | 5 +- tests/mutex.rs | 2 +- tests/rwlock.rs | 2 +- 31 files changed, 244 insertions(+), 154 deletions(-) diff --git a/src/fs/dir_builder.rs b/src/fs/dir_builder.rs index 66edad8..a4a9568 100644 --- a/src/fs/dir_builder.rs +++ b/src/fs/dir_builder.rs @@ -1,10 +1,10 @@ use std::fs; -use std::future::Future; use std::io; use std::path::Path; use cfg_if::cfg_if; +use crate::future::Future; use crate::task::blocking; /// A builder for creating directories in various manners. diff --git a/src/fs/dir_entry.rs b/src/fs/dir_entry.rs index dc84117..f8bb9e7 100644 --- a/src/fs/dir_entry.rs +++ b/src/fs/dir_entry.rs @@ -1,16 +1,15 @@ use std::ffi::OsString; use std::fs; -use std::future::Future; use std::io; use std::path::PathBuf; use std::pin::Pin; use std::sync::Mutex; -use std::task::Poll; use cfg_if::cfg_if; -use futures::prelude::*; +use futures::future::{self, FutureExt, TryFutureExt}; -use crate::task::blocking; +use crate::future::Future; +use crate::task::{blocking, Poll}; /// An entry inside a directory. /// diff --git a/src/fs/file.rs b/src/fs/file.rs index fa6cc00..0332d68 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -1,18 +1,17 @@ //! Types for working with files. use std::fs; -use std::future::Future; use std::io::{self, SeekFrom}; use std::path::Path; use std::pin::Pin; use std::sync::Mutex; -use std::task::{Context, Poll}; use cfg_if::cfg_if; -use futures::io::Initializer; -use futures::prelude::*; +use futures::future::{self, FutureExt, TryFutureExt}; +use futures::io::{AsyncSeek, Initializer}; -use crate::task::blocking; +use crate::future::Future; +use crate::task::{blocking, Context, Poll}; /// A reference to a file on the filesystem. /// diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs index 07cfad4..b2ed401 100644 --- a/src/fs/open_options.rs +++ b/src/fs/open_options.rs @@ -1,11 +1,11 @@ use std::fs; -use std::future::Future; use std::io; use std::path::Path; use cfg_if::cfg_if; use super::File; +use crate::future::Future; use crate::task::blocking; /// Options and flags which for configuring how a file is opened. diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index e663c14..7c2830b 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -1,15 +1,12 @@ use std::fs; -use std::future::Future; use std::io; use std::pin::Pin; use std::sync::Mutex; -use std::task::{Context, Poll}; - -use futures::Stream; use super::DirEntry; -use crate::task::blocking; +use crate::future::Future; +use crate::task::{blocking, Context, Poll}; /// A stream over entries in a directory. /// @@ -55,7 +52,7 @@ impl ReadDir { } } -impl Stream for ReadDir { +impl futures::Stream for ReadDir { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/io/buf_read.rs b/src/io/buf_read.rs index 8e0e9a3..b29bca3 100644 --- a/src/io/buf_read.rs +++ b/src/io/buf_read.rs @@ -1,18 +1,18 @@ -use std::future::Future; -use std::io::{self}; +use std::io; use std::mem; use std::pin::Pin; use std::str; -use std::task::{Context, Poll}; use cfg_if::cfg_if; use futures::io::AsyncBufRead; -use futures::Stream; + +use crate::future::Future; +use crate::task::{Context, Poll}; cfg_if! { if #[cfg(feature = "docs.rs")] { #[doc(hidden)] - pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); macro_rules! ret { ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); @@ -245,7 +245,7 @@ pub struct Lines { read: usize, } -impl Stream for Lines { +impl futures::Stream for Lines { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 85eb2db..e681d35 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -1,33 +1,46 @@ -use std::io::{self, IoSliceMut, Read, SeekFrom}; +use std::io::{self, IoSliceMut, Read as _, SeekFrom}; use std::pin::Pin; -use std::task::{Context, Poll}; use std::{cmp, fmt}; use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; -// used by `BufReader` and `BufWriter` -// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 -const DEFAULT_BUF_SIZE: usize = 8 * 1024; +use crate::task::{Context, Poll}; -/// The `BufReader` struct adds buffering to any reader. +const DEFAULT_CAPACITY: usize = 8 * 1024; + +/// Adds buffering to any reader. +/// +/// It can be excessively inefficient to work directly with a [`Read`] instance. A `BufReader` +/// performs large, infrequent reads on the underlying [`Read`] and maintains an in-memory buffer +/// of the incoming byte stream. +/// +/// `BufReader` can improve the speed of programs that make *small* and *repeated* read calls to +/// the same file or network socket. It does not help when reading very large amounts at once, or +/// reading just one or a few times. It also provides no advantage when reading from a source that +/// is already in memory, like a `Vec`. +/// +/// When the `BufReader` is dropped, the contents of its buffer will be discarded. Creating +/// multiple instances of a `BufReader` on the same stream can cause data loss. /// -/// It can be excessively inefficient to work directly with a [`AsyncRead`] -/// instance. A `BufReader` performs large, infrequent reads on the underlying -/// [`AsyncRead`] and maintains an in-memory buffer of the results. +/// [`Read`]: trait.Read.html /// -/// `BufReader` can improve the speed of programs that make *small* and -/// *repeated* read calls to the same file or network socket. It does not -/// help when reading very large amounts at once, or reading just one or a few -/// times. It also provides no advantage when reading from a source that is -/// already in memory, like a `Vec`. +/// # Examples /// -/// When the `BufReader` is dropped, the contents of its buffer will be -/// discarded. Creating multiple instances of a `BufReader` on the same -/// stream can cause data loss. +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::fs::File; +/// use async_std::io::BufReader; +/// use async_std::prelude::*; /// -/// [`AsyncRead`]: futures_io::AsyncRead +/// let mut f = BufReader::new(File::open("a.txt").await?); /// -// TODO: Examples +/// let mut line = String::new(); +/// f.read_line(&mut line).await?; +/// # +/// # Ok(()) }) } +/// ``` pub struct BufReader { inner: R, buf: Box<[u8]>, @@ -36,19 +49,49 @@ pub struct BufReader { } impl BufReader { - /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, - /// but may change in the future. - pub fn new(inner: R) -> Self { - Self::with_capacity(DEFAULT_BUF_SIZE, inner) + /// Creates a buffered reader with default buffer capacity. + /// + /// The default capacity is currently 8 KB, but may change in the future. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// + /// let f = BufReader::new(File::open("a.txt").await?); + /// # + /// # Ok(()) }) } + /// ``` + pub fn new(inner: R) -> BufReader { + BufReader::with_capacity(DEFAULT_CAPACITY, inner) } - /// Creates a new `BufReader` with the specified buffer capacity. - pub fn with_capacity(capacity: usize, inner: R) -> Self { + /// Creates a new buffered reader with the specified capacity. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// + /// let f = BufReader::with_capacity(1024, File::open("a.txt").await?); + /// # + /// # Ok(()) }) } + /// ``` + pub fn with_capacity(capacity: usize, inner: R) -> BufReader { unsafe { let mut buffer = Vec::with_capacity(capacity); buffer.set_len(capacity); inner.initializer().initialize(&mut buffer); - Self { + + BufReader { inner, buf: buffer.into_boxed_slice(), pos: 0, @@ -66,6 +109,21 @@ impl BufReader { /// Gets a reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// + /// let f = BufReader::new(File::open("a.txt").await?); + /// let inner = f.get_ref(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_ref(&self) -> &R { &self.inner } @@ -73,31 +131,69 @@ impl BufReader { /// Gets a mutable reference to the underlying reader. /// /// It is inadvisable to directly read from the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// + /// let mut f = BufReader::new(File::open("a.txt").await?); + /// let inner = f.get_mut(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_mut(&mut self) -> &mut R { &mut self.inner } - /// Gets a pinned mutable reference to the underlying reader. + /// Returns a reference to the internal buffer. /// - /// It is inadvisable to directly read from the underlying reader. - pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut R> { - self.inner() + /// This function will not attempt to fill the buffer if it is empty. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// + /// let f = BufReader::new(File::open("a.txt").await?); + /// let buffer = f.buffer(); + /// # + /// # Ok(()) }) } + /// ``` + pub fn buffer(&self) -> &[u8] { + &self.buf[self.pos..self.cap] } - /// Consumes this `BufWriter`, returning the underlying reader. + /// Unwraps the buffered reader, returning the underlying reader. /// /// Note that any leftover data in the internal buffer is lost. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// + /// let f = BufReader::new(File::open("a.txt").await?); + /// let inner = f.into_inner(); + /// # + /// # Ok(()) }) } + /// ``` pub fn into_inner(self) -> R { self.inner } - /// Returns a reference to the internally buffered data. - /// - /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. - pub fn buffer(&self) -> &[u8] { - &self.buf[self.pos..self.cap] - } - /// Invalidates all data in the internal buffer. #[inline] fn discard_buffer(mut self: Pin<&mut Self>) { @@ -192,27 +288,23 @@ impl fmt::Debug for BufReader { } impl AsyncSeek for BufReader { - /// Seek to an offset, in bytes, in the underlying reader. + /// Seeks to an offset, in bytes, in the underlying reader. /// - /// The position used for seeking with `SeekFrom::Current(_)` is the - /// position the underlying reader would be at if the `BufReader` had no - /// internal buffer. + /// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying + /// reader would be at if the `BufReader` had no internal buffer. /// - /// Seeking always discards the internal buffer, even if the seek position - /// would otherwise fall within it. This guarantees that calling - /// `.into_inner()` immediately after a seek yields the underlying reader - /// at the same position. + /// Seeking always discards the internal buffer, even if the seek position would otherwise fall + /// within it. This guarantees that calling `.into_inner()` immediately after a seek yields the + /// underlying reader at the same position. /// - /// To seek without discarding the internal buffer, use - /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). + /// See [`Seek`] for more details. /// - /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. + /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the + /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If + /// the second seek returns `Err`, the underlying reader will be left at the same position it + /// would have if you called `seek` with `SeekFrom::Current(0)`. /// - /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` - /// where `n` minus the internal buffer length overflows an `i64`, two - /// seeks will be performed instead of one. If the second seek returns - /// `Err`, the underlying reader will be left at the same position it would - /// have if you called `seek` with `SeekFrom::Current(0)`. + /// [`Seek`]: trait.Seek.html fn poll_seek( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/copy.rs b/src/io/copy.rs index d930680..ed16660 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -1,6 +1,7 @@ -use futures::prelude::*; use std::io; +use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite}; + /// Copies the entire contents of a reader into a writer. /// /// This function will continuously read data from `reader` and then diff --git a/src/io/read.rs b/src/io/read.rs index fae2b1c..89190f7 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -1,17 +1,18 @@ -use std::future::Future; use std::io::{self, IoSliceMut}; use std::mem; use std::pin::Pin; use std::str; -use std::task::{Context, Poll}; use cfg_if::cfg_if; use futures::io::AsyncRead; +use crate::future::Future; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs.rs")] { #[doc(hidden)] - pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); macro_rules! ret { ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); diff --git a/src/io/seek.rs b/src/io/seek.rs index 43841d3..fd48fa8 100644 --- a/src/io/seek.rs +++ b/src/io/seek.rs @@ -1,15 +1,16 @@ -use std::future::Future; use std::io::{self, SeekFrom}; use std::pin::Pin; -use std::task::{Context, Poll}; use cfg_if::cfg_if; use futures::io::AsyncSeek; +use crate::future::Future; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs.rs")] { #[doc(hidden)] - pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); macro_rules! ret { ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); diff --git a/src/io/stderr.rs b/src/io/stderr.rs index 3c57f55..14ff2b7 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -1,13 +1,11 @@ -use std::future::Future; use std::io; use std::pin::Pin; use std::sync::Mutex; -use std::task::{Context, Poll}; use cfg_if::cfg_if; -use futures::prelude::*; -use crate::task::blocking; +use crate::future::Future; +use crate::task::{blocking, Context, Poll}; /// Constructs a new handle to the standard error of the current process. /// diff --git a/src/io/stdin.rs b/src/io/stdin.rs index eabdc51..84e8dd9 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -1,14 +1,13 @@ -use std::future::Future; use std::io; use std::pin::Pin; use std::sync::Mutex; -use std::task::{Context, Poll}; use cfg_if::cfg_if; +use futures::future; use futures::io::Initializer; -use futures::prelude::*; -use crate::task::blocking; +use crate::future::Future; +use crate::task::{blocking, Context, Poll}; /// Constructs a new handle to the standard input of the current process. /// diff --git a/src/io/stdout.rs b/src/io/stdout.rs index b078819..5626354 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -1,13 +1,11 @@ -use std::future::Future; use std::io; use std::pin::Pin; use std::sync::Mutex; -use std::task::{Context, Poll}; use cfg_if::cfg_if; -use futures::prelude::*; -use crate::task::blocking; +use crate::future::Future; +use crate::task::{blocking, Context, Poll}; /// Constructs a new handle to the standard output of the current process. /// diff --git a/src/io/write.rs b/src/io/write.rs index 9054add..f1f402d 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,16 +1,17 @@ -use std::future::Future; use std::io::{self, IoSlice}; use std::mem; use std::pin::Pin; -use std::task::{Context, Poll}; use cfg_if::cfg_if; use futures::io::AsyncWrite; +use crate::future::Future; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs.rs")] { #[doc(hidden)] - pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); macro_rules! ret { ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); diff --git a/src/net/driver.rs b/src/net/driver.rs index 337cb4e..b2c4e8a 100644 --- a/src/net/driver.rs +++ b/src/net/driver.rs @@ -3,13 +3,13 @@ use std::io::{self, prelude::*}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll, Waker}; -use futures::{prelude::*, ready}; +use futures::io::{AsyncRead, AsyncWrite}; use lazy_static::lazy_static; use mio::{self, Evented}; use slab::Slab; +use crate::task::{Context, Poll, Waker}; use crate::utils::abort_on_panic; /// Data associated with a registered I/O handle. @@ -302,7 +302,7 @@ impl AsyncRead for IoHandle { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - ready!(Pin::new(&mut *self).poll_readable(cx)?); + futures::ready!(Pin::new(&mut *self).poll_readable(cx)?); match self.source.read(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -323,7 +323,7 @@ where cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - ready!(Pin::new(&mut *self).poll_readable(cx)?); + futures::ready!(Pin::new(&mut *self).poll_readable(cx)?); match (&self.source).read(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -341,7 +341,7 @@ impl AsyncWrite for IoHandle { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - ready!(self.poll_writable(cx)?); + futures::ready!(self.poll_writable(cx)?); match self.source.write(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -353,7 +353,7 @@ impl AsyncWrite for IoHandle { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.poll_writable(cx)?); + futures::ready!(self.poll_writable(cx)?); match self.source.flush() { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -378,7 +378,7 @@ where cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - ready!(self.poll_writable(cx)?); + futures::ready!(self.poll_writable(cx)?); match (&self.source).write(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -390,7 +390,7 @@ where } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - ready!(self.poll_writable(cx)?); + futures::ready!(self.poll_writable(cx)?); match (&self.source).flush() { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 8e7bf3b..c184333 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -2,13 +2,13 @@ use std::io::{self, IoSlice, IoSliceMut}; use std::mem; use std::net::{self, SocketAddr, ToSocketAddrs}; use std::pin::Pin; -use std::task::{Context, Poll}; use cfg_if::cfg_if; -use futures::{prelude::*, ready}; -use futures::stream::FusedStream; +use futures::future; +use crate::future::Future; use crate::net::driver::IoHandle; +use crate::task::{Context, Poll}; /// A TCP stream between a local and a remote socket. /// @@ -260,7 +260,8 @@ impl TcpStream { /// ``` pub async fn peek(&self, buf: &mut [u8]) -> io::Result { let res = future::poll_fn(|cx| { - ready!(self.io_handle.poll_readable(cx)?); + futures::ready!(self.io_handle.poll_readable(cx)?); + match self.io_handle.get_ref().peek(buf) { Ok(len) => Poll::Ready(Ok(len)), Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -559,7 +560,7 @@ impl TcpListener { /// ``` pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { future::poll_fn(|cx| { - ready!(self.io_handle.poll_readable(cx)?); + futures::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().accept_std() { Ok((io, addr)) => { @@ -663,15 +664,11 @@ impl<'a> futures::Stream for Incoming<'a> { let future = self.0.accept(); pin_utils::pin_mut!(future); - let (socket, _) = ready!(future.poll(cx))?; + let (socket, _) = futures::ready!(future.poll(cx))?; Poll::Ready(Some(Ok(socket))) } } -impl<'a> FusedStream for Incoming<'a> { - fn is_terminated(&self) -> bool { false } -} - impl From for TcpStream { /// Converts a `std::net::TcpStream` into its asynchronous equivalent. fn from(stream: net::TcpStream) -> TcpStream { diff --git a/src/net/udp.rs b/src/net/udp.rs index 6f08b00..964eb14 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,11 +1,11 @@ use std::io; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}; -use std::task::Poll; use cfg_if::cfg_if; -use futures::{prelude::*, ready}; +use futures::future; use crate::net::driver::IoHandle; +use crate::task::Poll; /// A UDP socket. /// @@ -168,7 +168,7 @@ impl UdpSocket { }; future::poll_fn(|cx| { - ready!(self.io_handle.poll_writable(cx)?); + futures::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send_to(buf, &addr) { Ok(n) => Poll::Ready(Ok(n)), @@ -204,7 +204,7 @@ impl UdpSocket { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { future::poll_fn(|cx| { - ready!(self.io_handle.poll_readable(cx)?); + futures::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().recv_from(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -288,7 +288,7 @@ impl UdpSocket { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { future::poll_fn(|cx| { - ready!(self.io_handle.poll_writable(cx)?); + futures::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -324,7 +324,7 @@ impl UdpSocket { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { future::poll_fn(|cx| { - ready!(self.io_handle.poll_readable(cx)?); + futures::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().recv(buf) { Ok(n) => Poll::Ready(Ok(n)), diff --git a/src/os/unix/net.rs b/src/os/unix/net.rs index b8beb59..6b6810c 100644 --- a/src/os/unix/net.rs +++ b/src/os/unix/net.rs @@ -6,15 +6,15 @@ use std::mem; use std::net::Shutdown; use std::path::Path; use std::pin::Pin; -use std::task::{Context, Poll}; use cfg_if::cfg_if; -use futures::{prelude::*, ready}; +use futures::future; use mio_uds; +use crate::future::Future; use crate::net::driver::IoHandle; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use crate::task::blocking; +use crate::task::{blocking, Context, Poll}; /// A Unix datagram socket. /// @@ -214,7 +214,7 @@ impl UnixDatagram { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { future::poll_fn(|cx| { - ready!(self.io_handle.poll_readable(cx)?); + futures::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().recv_from(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -248,7 +248,7 @@ impl UnixDatagram { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { future::poll_fn(|cx| { - ready!(self.io_handle.poll_writable(cx)?); + futures::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().recv(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -281,7 +281,7 @@ impl UnixDatagram { /// ``` pub async fn send_to>(&self, buf: &[u8], path: P) -> io::Result { future::poll_fn(|cx| { - ready!(self.io_handle.poll_writable(cx)?); + futures::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send_to(buf, path.as_ref()) { Ok(n) => Poll::Ready(Ok(n)), @@ -315,7 +315,7 @@ impl UnixDatagram { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { future::poll_fn(|cx| { - ready!(self.io_handle.poll_writable(cx)?); + futures::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -457,7 +457,7 @@ impl UnixListener { /// ``` pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { future::poll_fn(|cx| { - ready!(self.io_handle.poll_readable(cx)?); + futures::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().accept_std() { Ok(Some((io, addr))) => { @@ -560,14 +560,14 @@ impl fmt::Debug for UnixListener { #[derive(Debug)] pub struct Incoming<'a>(&'a UnixListener); -impl Stream for Incoming<'_> { +impl futures::Stream for Incoming<'_> { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let future = self.0.accept(); futures::pin_mut!(future); - let (socket, _) = ready!(future.poll(cx))?; + let (socket, _) = futures::ready!(future.poll(cx))?; Poll::Ready(Some(Ok(socket))) } } @@ -639,7 +639,7 @@ impl UnixStream { future::poll_fn(|cx| { match &mut state { State::Waiting(stream) => { - ready!(stream.io_handle.poll_writable(cx)?); + futures::ready!(stream.io_handle.poll_writable(cx)?); if let Some(err) = stream.io_handle.get_ref().take_error()? { return Poll::Ready(Err(err)); diff --git a/src/stream/empty.rs b/src/stream/empty.rs index 30bcef1..eed6064 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; use std::pin::Pin; -use std::task::{Context, Poll}; + +use crate::task::{Context, Poll}; /// Creates a stream that doesn't yield any items. /// diff --git a/src/stream/once.rs b/src/stream/once.rs index 3d85304..36a6eba 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -1,5 +1,6 @@ use std::pin::Pin; -use std::task::{Context, Poll}; + +use crate::task::{Context, Poll}; /// Creates a stream that yields a single item. /// diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index 97a8514..84847c4 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -1,5 +1,6 @@ use std::pin::Pin; -use std::task::{Context, Poll}; + +use crate::task::{Context, Poll}; /// Creates a stream that yields the same item repeatedly. /// diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 1da6e05..dee195d 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -21,12 +21,13 @@ //! # }) } //! ``` -use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; use cfg_if::cfg_if; +use crate::future::Future; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs.rs")] { #[doc(hidden)] diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index b6af503..d629a29 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -1,13 +1,14 @@ use std::cell::UnsafeCell; use std::fmt; -use std::future::Future; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll, Waker}; use slab::Slab; +use crate::future::Future; +use crate::task::{Context, Poll, Waker}; + /// Set if the mutex is locked. const LOCK: usize = 1 << 0; diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index 71ac142..a345ff9 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -1,13 +1,14 @@ use std::cell::UnsafeCell; use std::fmt; -use std::future::Future; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::task::{Context, Poll, Waker}; use slab::Slab; +use crate::future::Future; +use crate::task::{Context, Poll, Waker}; + /// Set if a write lock is held. const WRITE_LOCK: usize = 1 << 0; diff --git a/src/task/blocking.rs b/src/task/blocking.rs index ccffa7d..da8338e 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -1,14 +1,14 @@ //! A thread pool for running blocking functions asynchronously. use std::fmt; -use std::future::Future; use std::pin::Pin; -use std::task::{Context, Poll}; use std::thread; use crossbeam::channel::{unbounded, Receiver, Sender}; use lazy_static::lazy_static; +use crate::future::Future; +use crate::task::{Context, Poll}; use crate::utils::abort_on_panic; struct Pool { diff --git a/src/task/pool.rs b/src/task/pool.rs index b68519f..4220298 100644 --- a/src/task/pool.rs +++ b/src/task/pool.rs @@ -1,6 +1,5 @@ use std::cell::{Cell, UnsafeCell}; use std::fmt::Arguments; -use std::future::Future; use std::io; use std::mem; use std::panic::{self, AssertUnwindSafe}; @@ -9,11 +8,12 @@ use std::ptr; use std::thread; use crossbeam::channel::{unbounded, Sender}; -use futures::prelude::*; +use futures::future::FutureExt; use lazy_static::lazy_static; use super::task; use super::{JoinHandle, Task}; +use crate::future::Future; /// Returns a handle to the current task. /// diff --git a/src/task/sleep.rs b/src/task/sleep.rs index 97c65b1..2739d87 100644 --- a/src/task/sleep.rs +++ b/src/task/sleep.rs @@ -1,6 +1,6 @@ use std::time::Duration; -use futures::prelude::*; +use futures::future; use crate::time::Timeout; diff --git a/src/task/task.rs b/src/task/task.rs index 18789cd..834b2ac 100644 --- a/src/task/task.rs +++ b/src/task/task.rs @@ -1,14 +1,14 @@ use std::fmt; -use std::future::Future; use std::i64; use std::mem; use std::num::NonZeroU64; use std::pin::Pin; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; -use std::task::{Context, Poll}; use super::local; +use crate::future::Future; +use crate::task::{Context, Poll}; /// A handle to a task. #[derive(Clone)] diff --git a/src/time/mod.rs b/src/time/mod.rs index 75a1ac8..1a2c8f9 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -28,16 +28,17 @@ use std::error::Error; use std::fmt; -use std::future::Future; use std::io; use std::pin::Pin; -use std::task::{Context, Poll}; use std::time::Duration; use cfg_if::cfg_if; use futures_timer::Delay; use pin_utils::unsafe_pinned; +use crate::future::Future; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs.rs")] { #[doc(hidden)] diff --git a/tests/mutex.rs b/tests/mutex.rs index 7e41894..8871f59 100644 --- a/tests/mutex.rs +++ b/tests/mutex.rs @@ -2,10 +2,10 @@ use std::sync::Arc; +use async_std::prelude::*; use async_std::sync::Mutex; use async_std::task; use futures::channel::mpsc; -use futures::prelude::*; #[test] fn smoke() { diff --git a/tests/rwlock.rs b/tests/rwlock.rs index 2e08383..2d1e0e1 100644 --- a/tests/rwlock.rs +++ b/tests/rwlock.rs @@ -7,10 +7,10 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use async_std::prelude::*; use async_std::sync::RwLock; use async_std::task; use futures::channel::mpsc; -use futures::prelude::*; /// Generates a random number in `0..n`. pub fn random(n: u32) -> u32 {