Cleanup, docs, fmt

integrate-threads-example
Stjepan Glavina 5 years ago
parent 0ab2c8f49b
commit a430e27819

@ -1,10 +1,10 @@
use std::fs; use std::fs;
use std::future::Future;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use crate::future::Future;
use crate::task::blocking; use crate::task::blocking;
/// A builder for creating directories in various manners. /// A builder for creating directories in various manners.

@ -1,16 +1,15 @@
use std::ffi::OsString; use std::ffi::OsString;
use std::fs; use std::fs;
use std::future::Future;
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::Poll;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::prelude::*; use futures::future::{self, FutureExt, TryFutureExt};
use crate::task::blocking; use crate::future::Future;
use crate::task::{blocking, Poll};
/// An entry inside a directory. /// An entry inside a directory.
/// ///

@ -1,18 +1,17 @@
//! Types for working with files. //! Types for working with files.
use std::fs; use std::fs;
use std::future::Future;
use std::io::{self, SeekFrom}; use std::io::{self, SeekFrom};
use std::path::Path; use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::Initializer; use futures::future::{self, FutureExt, TryFutureExt};
use futures::prelude::*; use futures::io::{AsyncSeek, Initializer};
use crate::task::blocking; use crate::future::Future;
use crate::task::{blocking, Context, Poll};
/// A reference to a file on the filesystem. /// A reference to a file on the filesystem.
/// ///

@ -1,11 +1,11 @@
use std::fs; use std::fs;
use std::future::Future;
use std::io; use std::io;
use std::path::Path; use std::path::Path;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use super::File; use super::File;
use crate::future::Future;
use crate::task::blocking; use crate::task::blocking;
/// Options and flags which for configuring how a file is opened. /// Options and flags which for configuring how a file is opened.

@ -1,15 +1,12 @@
use std::fs; use std::fs;
use std::future::Future;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{Context, Poll};
use futures::Stream;
use super::DirEntry; use super::DirEntry;
use crate::task::blocking; use crate::future::Future;
use crate::task::{blocking, Context, Poll};
/// A stream over entries in a directory. /// A stream over entries in a directory.
/// ///
@ -55,7 +52,7 @@ impl ReadDir {
} }
} }
impl Stream for ReadDir { impl futures::Stream for ReadDir {
type Item = io::Result<DirEntry>; type Item = io::Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

@ -1,18 +1,18 @@
use std::future::Future; use std::io;
use std::io::{self};
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::str; use std::str;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncBufRead; use futures::io::AsyncBufRead;
use futures::Stream;
use crate::future::Future;
use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs.rs")] { if #[cfg(feature = "docs.rs")] {
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret { macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
@ -245,7 +245,7 @@ pub struct Lines<R> {
read: usize, read: usize,
} }
impl<R: AsyncBufRead> Stream for Lines<R> { impl<R: AsyncBufRead> futures::Stream for Lines<R> {
type Item = io::Result<String>; type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

@ -1,33 +1,46 @@
use std::io::{self, IoSliceMut, Read, SeekFrom}; use std::io::{self, IoSliceMut, Read as _, SeekFrom};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, fmt}; use std::{cmp, fmt};
use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer};
// used by `BufReader` and `BufWriter` use crate::task::{Context, Poll};
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
/// The `BufReader` struct adds buffering to any reader. const DEFAULT_CAPACITY: usize = 8 * 1024;
/// Adds buffering to any reader.
///
/// It can be excessively inefficient to work directly with a [`Read`] instance. A `BufReader`
/// performs large, infrequent reads on the underlying [`Read`] and maintains an in-memory buffer
/// of the incoming byte stream.
///
/// `BufReader` can improve the speed of programs that make *small* and *repeated* read calls to
/// the same file or network socket. It does not help when reading very large amounts at once, or
/// reading just one or a few times. It also provides no advantage when reading from a source that
/// is already in memory, like a `Vec<u8>`.
///
/// When the `BufReader` is dropped, the contents of its buffer will be discarded. Creating
/// multiple instances of a `BufReader` on the same stream can cause data loss.
/// ///
/// It can be excessively inefficient to work directly with a [`AsyncRead`] /// [`Read`]: trait.Read.html
/// instance. A `BufReader` performs large, infrequent reads on the underlying
/// [`AsyncRead`] and maintains an in-memory buffer of the results.
/// ///
/// `BufReader` can improve the speed of programs that make *small* and /// # Examples
/// *repeated* read calls to the same file or network socket. It does not
/// help when reading very large amounts at once, or reading just one or a few
/// times. It also provides no advantage when reading from a source that is
/// already in memory, like a `Vec<u8>`.
/// ///
/// When the `BufReader` is dropped, the contents of its buffer will be /// ```no_run
/// discarded. Creating multiple instances of a `BufReader` on the same /// # #![feature(async_await)]
/// stream can cause data loss. /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
/// ///
/// [`AsyncRead`]: futures_io::AsyncRead /// let mut f = BufReader::new(File::open("a.txt").await?);
/// ///
// TODO: Examples /// let mut line = String::new();
/// f.read_line(&mut line).await?;
/// #
/// # Ok(()) }) }
/// ```
pub struct BufReader<R> { pub struct BufReader<R> {
inner: R, inner: R,
buf: Box<[u8]>, buf: Box<[u8]>,
@ -36,19 +49,49 @@ pub struct BufReader<R> {
} }
impl<R: AsyncRead> BufReader<R> { impl<R: AsyncRead> BufReader<R> {
/// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, /// Creates a buffered reader with default buffer capacity.
/// but may change in the future. ///
pub fn new(inner: R) -> Self { /// The default capacity is currently 8 KB, but may change in the future.
Self::with_capacity(DEFAULT_BUF_SIZE, inner) ///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
///
/// let f = BufReader::new(File::open("a.txt").await?);
/// #
/// # Ok(()) }) }
/// ```
pub fn new(inner: R) -> BufReader<R> {
BufReader::with_capacity(DEFAULT_CAPACITY, inner)
} }
/// Creates a new `BufReader` with the specified buffer capacity. /// Creates a new buffered reader with the specified capacity.
pub fn with_capacity(capacity: usize, inner: R) -> Self { ///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
///
/// let f = BufReader::with_capacity(1024, File::open("a.txt").await?);
/// #
/// # Ok(()) }) }
/// ```
pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
unsafe { unsafe {
let mut buffer = Vec::with_capacity(capacity); let mut buffer = Vec::with_capacity(capacity);
buffer.set_len(capacity); buffer.set_len(capacity);
inner.initializer().initialize(&mut buffer); inner.initializer().initialize(&mut buffer);
Self {
BufReader {
inner, inner,
buf: buffer.into_boxed_slice(), buf: buffer.into_boxed_slice(),
pos: 0, pos: 0,
@ -66,6 +109,21 @@ impl<R> BufReader<R> {
/// Gets a reference to the underlying reader. /// Gets a reference to the underlying reader.
/// ///
/// It is inadvisable to directly read from the underlying reader. /// It is inadvisable to directly read from the underlying reader.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
///
/// let f = BufReader::new(File::open("a.txt").await?);
/// let inner = f.get_ref();
/// #
/// # Ok(()) }) }
/// ```
pub fn get_ref(&self) -> &R { pub fn get_ref(&self) -> &R {
&self.inner &self.inner
} }
@ -73,31 +131,69 @@ impl<R> BufReader<R> {
/// Gets a mutable reference to the underlying reader. /// Gets a mutable reference to the underlying reader.
/// ///
/// It is inadvisable to directly read from the underlying reader. /// It is inadvisable to directly read from the underlying reader.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
///
/// let mut f = BufReader::new(File::open("a.txt").await?);
/// let inner = f.get_mut();
/// #
/// # Ok(()) }) }
/// ```
pub fn get_mut(&mut self) -> &mut R { pub fn get_mut(&mut self) -> &mut R {
&mut self.inner &mut self.inner
} }
/// Gets a pinned mutable reference to the underlying reader. /// Returns a reference to the internal buffer.
/// ///
/// It is inadvisable to directly read from the underlying reader. /// This function will not attempt to fill the buffer if it is empty.
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut R> { ///
self.inner() /// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
///
/// let f = BufReader::new(File::open("a.txt").await?);
/// let buffer = f.buffer();
/// #
/// # Ok(()) }) }
/// ```
pub fn buffer(&self) -> &[u8] {
&self.buf[self.pos..self.cap]
} }
/// Consumes this `BufWriter`, returning the underlying reader. /// Unwraps the buffered reader, returning the underlying reader.
/// ///
/// Note that any leftover data in the internal buffer is lost. /// Note that any leftover data in the internal buffer is lost.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
///
/// let f = BufReader::new(File::open("a.txt").await?);
/// let inner = f.into_inner();
/// #
/// # Ok(()) }) }
/// ```
pub fn into_inner(self) -> R { pub fn into_inner(self) -> R {
self.inner self.inner
} }
/// Returns a reference to the internally buffered data.
///
/// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
pub fn buffer(&self) -> &[u8] {
&self.buf[self.pos..self.cap]
}
/// Invalidates all data in the internal buffer. /// Invalidates all data in the internal buffer.
#[inline] #[inline]
fn discard_buffer(mut self: Pin<&mut Self>) { fn discard_buffer(mut self: Pin<&mut Self>) {
@ -192,27 +288,23 @@ impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
} }
impl<R: AsyncSeek> AsyncSeek for BufReader<R> { impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
/// Seek to an offset, in bytes, in the underlying reader. /// Seeks to an offset, in bytes, in the underlying reader.
/// ///
/// The position used for seeking with `SeekFrom::Current(_)` is the /// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying
/// position the underlying reader would be at if the `BufReader` had no /// reader would be at if the `BufReader` had no internal buffer.
/// internal buffer.
/// ///
/// Seeking always discards the internal buffer, even if the seek position /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
/// would otherwise fall within it. This guarantees that calling /// within it. This guarantees that calling `.into_inner()` immediately after a seek yields the
/// `.into_inner()` immediately after a seek yields the underlying reader /// underlying reader at the same position.
/// at the same position.
/// ///
/// To seek without discarding the internal buffer, use /// See [`Seek`] for more details.
/// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
/// ///
/// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
/// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
/// the second seek returns `Err`, the underlying reader will be left at the same position it
/// would have if you called `seek` with `SeekFrom::Current(0)`.
/// ///
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` /// [`Seek`]: trait.Seek.html
/// where `n` minus the internal buffer length overflows an `i64`, two
/// seeks will be performed instead of one. If the second seek returns
/// `Err`, the underlying reader will be left at the same position it would
/// have if you called `seek` with `SeekFrom::Current(0)`.
fn poll_seek( fn poll_seek(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -1,6 +1,7 @@
use futures::prelude::*;
use std::io; use std::io;
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite};
/// Copies the entire contents of a reader into a writer. /// Copies the entire contents of a reader into a writer.
/// ///
/// This function will continuously read data from `reader` and then /// This function will continuously read data from `reader` and then

@ -1,17 +1,18 @@
use std::future::Future;
use std::io::{self, IoSliceMut}; use std::io::{self, IoSliceMut};
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::str; use std::str;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncRead; use futures::io::AsyncRead;
use crate::future::Future;
use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs.rs")] { if #[cfg(feature = "docs.rs")] {
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret { macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);

@ -1,15 +1,16 @@
use std::future::Future;
use std::io::{self, SeekFrom}; use std::io::{self, SeekFrom};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncSeek; use futures::io::AsyncSeek;
use crate::future::Future;
use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs.rs")] { if #[cfg(feature = "docs.rs")] {
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret { macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);

@ -1,13 +1,11 @@
use std::future::Future;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::prelude::*;
use crate::task::blocking; use crate::future::Future;
use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard error of the current process. /// Constructs a new handle to the standard error of the current process.
/// ///

@ -1,14 +1,13 @@
use std::future::Future;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future;
use futures::io::Initializer; use futures::io::Initializer;
use futures::prelude::*;
use crate::task::blocking; use crate::future::Future;
use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard input of the current process. /// Constructs a new handle to the standard input of the current process.
/// ///

@ -1,13 +1,11 @@
use std::future::Future;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::prelude::*;
use crate::task::blocking; use crate::future::Future;
use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard output of the current process. /// Constructs a new handle to the standard output of the current process.
/// ///

@ -1,16 +1,17 @@
use std::future::Future;
use std::io::{self, IoSlice}; use std::io::{self, IoSlice};
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncWrite; use futures::io::AsyncWrite;
use crate::future::Future;
use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs.rs")] { if #[cfg(feature = "docs.rs")] {
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret { macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);

@ -3,13 +3,13 @@ use std::io::{self, prelude::*};
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use futures::{prelude::*, ready}; use futures::io::{AsyncRead, AsyncWrite};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use mio::{self, Evented}; use mio::{self, Evented};
use slab::Slab; use slab::Slab;
use crate::task::{Context, Poll, Waker};
use crate::utils::abort_on_panic; use crate::utils::abort_on_panic;
/// Data associated with a registered I/O handle. /// Data associated with a registered I/O handle.
@ -302,7 +302,7 @@ impl<T: Evented + Unpin + Read> AsyncRead for IoHandle<T> {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(Pin::new(&mut *self).poll_readable(cx)?); futures::ready!(Pin::new(&mut *self).poll_readable(cx)?);
match self.source.read(buf) { match self.source.read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -323,7 +323,7 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(Pin::new(&mut *self).poll_readable(cx)?); futures::ready!(Pin::new(&mut *self).poll_readable(cx)?);
match (&self.source).read(buf) { match (&self.source).read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -341,7 +341,7 @@ impl<T: Evented + Unpin + Write> AsyncWrite for IoHandle<T> {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(self.poll_writable(cx)?); futures::ready!(self.poll_writable(cx)?);
match self.source.write(buf) { match self.source.write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -353,7 +353,7 @@ impl<T: Evented + Unpin + Write> AsyncWrite for IoHandle<T> {
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.poll_writable(cx)?); futures::ready!(self.poll_writable(cx)?);
match self.source.flush() { match self.source.flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -378,7 +378,7 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
ready!(self.poll_writable(cx)?); futures::ready!(self.poll_writable(cx)?);
match (&self.source).write(buf) { match (&self.source).write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -390,7 +390,7 @@ where
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
ready!(self.poll_writable(cx)?); futures::ready!(self.poll_writable(cx)?);
match (&self.source).flush() { match (&self.source).flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {

@ -2,13 +2,13 @@ use std::io::{self, IoSlice, IoSliceMut};
use std::mem; use std::mem;
use std::net::{self, SocketAddr, ToSocketAddrs}; use std::net::{self, SocketAddr, ToSocketAddrs};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::{prelude::*, ready}; use futures::future;
use futures::stream::FusedStream;
use crate::future::Future;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::task::{Context, Poll};
/// A TCP stream between a local and a remote socket. /// A TCP stream between a local and a remote socket.
/// ///
@ -260,7 +260,8 @@ impl TcpStream {
/// ``` /// ```
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
let res = future::poll_fn(|cx| { let res = future::poll_fn(|cx| {
ready!(self.io_handle.poll_readable(cx)?); futures::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().peek(buf) { match self.io_handle.get_ref().peek(buf) {
Ok(len) => Poll::Ready(Ok(len)), Ok(len) => Poll::Ready(Ok(len)),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -559,7 +560,7 @@ impl TcpListener {
/// ``` /// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_readable(cx)?); futures::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().accept_std() { match self.io_handle.get_ref().accept_std() {
Ok((io, addr)) => { Ok((io, addr)) => {
@ -663,15 +664,11 @@ impl<'a> futures::Stream for Incoming<'a> {
let future = self.0.accept(); let future = self.0.accept();
pin_utils::pin_mut!(future); pin_utils::pin_mut!(future);
let (socket, _) = ready!(future.poll(cx))?; let (socket, _) = futures::ready!(future.poll(cx))?;
Poll::Ready(Some(Ok(socket))) Poll::Ready(Some(Ok(socket)))
} }
} }
impl<'a> FusedStream for Incoming<'a> {
fn is_terminated(&self) -> bool { false }
}
impl From<net::TcpStream> for TcpStream { impl From<net::TcpStream> for TcpStream {
/// Converts a `std::net::TcpStream` into its asynchronous equivalent. /// Converts a `std::net::TcpStream` into its asynchronous equivalent.
fn from(stream: net::TcpStream) -> TcpStream { fn from(stream: net::TcpStream) -> TcpStream {

@ -1,11 +1,11 @@
use std::io; use std::io;
use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs}; use std::net::{self, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};
use std::task::Poll;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::{prelude::*, ready}; use futures::future;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::task::Poll;
/// A UDP socket. /// A UDP socket.
/// ///
@ -168,7 +168,7 @@ impl UdpSocket {
}; };
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_writable(cx)?); futures::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send_to(buf, &addr) { match self.io_handle.get_ref().send_to(buf, &addr) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -204,7 +204,7 @@ impl UdpSocket {
/// ``` /// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_readable(cx)?); futures::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().recv_from(buf) { match self.io_handle.get_ref().recv_from(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -288,7 +288,7 @@ impl UdpSocket {
/// ``` /// ```
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_writable(cx)?); futures::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send(buf) { match self.io_handle.get_ref().send(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -324,7 +324,7 @@ impl UdpSocket {
/// ``` /// ```
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_readable(cx)?); futures::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().recv(buf) { match self.io_handle.get_ref().recv(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),

@ -6,15 +6,15 @@ use std::mem;
use std::net::Shutdown; use std::net::Shutdown;
use std::path::Path; use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::{prelude::*, ready}; use futures::future;
use mio_uds; use mio_uds;
use crate::future::Future;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::task::blocking; use crate::task::{blocking, Context, Poll};
/// A Unix datagram socket. /// A Unix datagram socket.
/// ///
@ -214,7 +214,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_readable(cx)?); futures::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().recv_from(buf) { match self.io_handle.get_ref().recv_from(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -248,7 +248,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_writable(cx)?); futures::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().recv(buf) { match self.io_handle.get_ref().recv(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -281,7 +281,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> { pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_writable(cx)?); futures::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send_to(buf, path.as_ref()) { match self.io_handle.get_ref().send_to(buf, path.as_ref()) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -315,7 +315,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_writable(cx)?); futures::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send(buf) { match self.io_handle.get_ref().send(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -457,7 +457,7 @@ impl UnixListener {
/// ``` /// ```
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
ready!(self.io_handle.poll_readable(cx)?); futures::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().accept_std() { match self.io_handle.get_ref().accept_std() {
Ok(Some((io, addr))) => { Ok(Some((io, addr))) => {
@ -560,14 +560,14 @@ impl fmt::Debug for UnixListener {
#[derive(Debug)] #[derive(Debug)]
pub struct Incoming<'a>(&'a UnixListener); pub struct Incoming<'a>(&'a UnixListener);
impl Stream for Incoming<'_> { impl futures::Stream for Incoming<'_> {
type Item = io::Result<UnixStream>; type Item = io::Result<UnixStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let future = self.0.accept(); let future = self.0.accept();
futures::pin_mut!(future); futures::pin_mut!(future);
let (socket, _) = ready!(future.poll(cx))?; let (socket, _) = futures::ready!(future.poll(cx))?;
Poll::Ready(Some(Ok(socket))) Poll::Ready(Some(Ok(socket)))
} }
} }
@ -639,7 +639,7 @@ impl UnixStream {
future::poll_fn(|cx| { future::poll_fn(|cx| {
match &mut state { match &mut state {
State::Waiting(stream) => { State::Waiting(stream) => {
ready!(stream.io_handle.poll_writable(cx)?); futures::ready!(stream.io_handle.poll_writable(cx)?);
if let Some(err) = stream.io_handle.get_ref().take_error()? { if let Some(err) = stream.io_handle.get_ref().take_error()? {
return Poll::Ready(Err(err)); return Poll::Ready(Err(err));

@ -1,6 +1,7 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use crate::task::{Context, Poll};
/// Creates a stream that doesn't yield any items. /// Creates a stream that doesn't yield any items.
/// ///

@ -1,5 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use crate::task::{Context, Poll};
/// Creates a stream that yields a single item. /// Creates a stream that yields a single item.
/// ///

@ -1,5 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use crate::task::{Context, Poll};
/// Creates a stream that yields the same item repeatedly. /// Creates a stream that yields the same item repeatedly.
/// ///

@ -21,12 +21,13 @@
//! # }) } //! # }) }
//! ``` //! ```
use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use crate::future::Future;
use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs.rs")] { if #[cfg(feature = "docs.rs")] {
#[doc(hidden)] #[doc(hidden)]

@ -1,13 +1,14 @@
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::fmt; use std::fmt;
use std::future::Future;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use slab::Slab; use slab::Slab;
use crate::future::Future;
use crate::task::{Context, Poll, Waker};
/// Set if the mutex is locked. /// Set if the mutex is locked.
const LOCK: usize = 1 << 0; const LOCK: usize = 1 << 0;

@ -1,13 +1,14 @@
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::fmt; use std::fmt;
use std::future::Future;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
use slab::Slab; use slab::Slab;
use crate::future::Future;
use crate::task::{Context, Poll, Waker};
/// Set if a write lock is held. /// Set if a write lock is held.
const WRITE_LOCK: usize = 1 << 0; const WRITE_LOCK: usize = 1 << 0;

@ -1,14 +1,14 @@
//! A thread pool for running blocking functions asynchronously. //! A thread pool for running blocking functions asynchronously.
use std::fmt; use std::fmt;
use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread; use std::thread;
use crossbeam::channel::{unbounded, Receiver, Sender}; use crossbeam::channel::{unbounded, Receiver, Sender};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use crate::future::Future;
use crate::task::{Context, Poll};
use crate::utils::abort_on_panic; use crate::utils::abort_on_panic;
struct Pool { struct Pool {

@ -1,6 +1,5 @@
use std::cell::{Cell, UnsafeCell}; use std::cell::{Cell, UnsafeCell};
use std::fmt::Arguments; use std::fmt::Arguments;
use std::future::Future;
use std::io; use std::io;
use std::mem; use std::mem;
use std::panic::{self, AssertUnwindSafe}; use std::panic::{self, AssertUnwindSafe};
@ -9,11 +8,12 @@ use std::ptr;
use std::thread; use std::thread;
use crossbeam::channel::{unbounded, Sender}; use crossbeam::channel::{unbounded, Sender};
use futures::prelude::*; use futures::future::FutureExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use super::task; use super::task;
use super::{JoinHandle, Task}; use super::{JoinHandle, Task};
use crate::future::Future;
/// Returns a handle to the current task. /// Returns a handle to the current task.
/// ///

@ -1,6 +1,6 @@
use std::time::Duration; use std::time::Duration;
use futures::prelude::*; use futures::future;
use crate::time::Timeout; use crate::time::Timeout;

@ -1,14 +1,14 @@
use std::fmt; use std::fmt;
use std::future::Future;
use std::i64; use std::i64;
use std::mem; use std::mem;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll};
use super::local; use super::local;
use crate::future::Future;
use crate::task::{Context, Poll};
/// A handle to a task. /// A handle to a task.
#[derive(Clone)] #[derive(Clone)]

@ -28,16 +28,17 @@
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::future::Future;
use std::io; use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_timer::Delay; use futures_timer::Delay;
use pin_utils::unsafe_pinned; use pin_utils::unsafe_pinned;
use crate::future::Future;
use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs.rs")] { if #[cfg(feature = "docs.rs")] {
#[doc(hidden)] #[doc(hidden)]

@ -2,10 +2,10 @@
use std::sync::Arc; use std::sync::Arc;
use async_std::prelude::*;
use async_std::sync::Mutex; use async_std::sync::Mutex;
use async_std::task; use async_std::task;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::prelude::*;
#[test] #[test]
fn smoke() { fn smoke() {

@ -7,10 +7,10 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use async_std::prelude::*;
use async_std::sync::RwLock; use async_std::sync::RwLock;
use async_std::task; use async_std::task;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::prelude::*;
/// Generates a random number in `0..n`. /// Generates a random number in `0..n`.
pub fn random(n: u32) -> u32 { pub fn random(n: u32) -> u32 {

Loading…
Cancel
Save