Revamp IO traits and Stream trait

integrate-threads-example
Stjepan Glavina 5 years ago
parent 43d822cbc5
commit e44451a042

@ -26,6 +26,7 @@ futures-preview = "0.3.0-alpha.17"
futures-timer = "0.3.0" futures-timer = "0.3.0"
lazy_static = "1.3.0" lazy_static = "1.3.0"
log = { version = "0.4.8", features = ["kv_unstable"] } log = { version = "0.4.8", features = ["kv_unstable"] }
memchr = "2.2.1"
mio = "0.6.19" mio = "0.6.19"
mio-uds = "0.6.7" mio-uds = "0.6.7"
num_cpus = "1.10.0" num_cpus = "1.10.0"

@ -483,7 +483,7 @@ impl File {
} }
} }
impl AsyncRead for File { impl futures::io::AsyncRead for File {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -498,7 +498,7 @@ impl AsyncRead for File {
} }
} }
impl AsyncRead for &File { impl futures::io::AsyncRead for &File {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -564,7 +564,7 @@ impl AsyncRead for &File {
} }
} }
impl AsyncWrite for File { impl futures::io::AsyncWrite for File {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -582,7 +582,7 @@ impl AsyncWrite for File {
} }
} }
impl AsyncWrite for &File { impl futures::io::AsyncWrite for &File {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -693,7 +693,7 @@ impl AsyncWrite for &File {
} }
} }
impl AsyncSeek for File { impl futures::io::AsyncSeek for File {
fn poll_seek( fn poll_seek(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -3,44 +3,8 @@
#[doc(inline)] #[doc(inline)]
pub use std::future::Future; pub use std::future::Future;
/// Never resolves to a value. pub use pending::pending;
/// pub use ready::ready;
/// # Examples
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::future::pending;
/// use async_std::prelude::*;
/// use std::time::Duration;
///
/// let dur = Duration::from_secs(1);
/// assert!(pending::<()>().timeout(dur).await.is_err());
/// #
/// # }) }
/// ```
pub async fn pending<T>() -> T {
futures::future::pending::<T>().await
}
/// Resolves to the provided value. mod pending;
/// mod ready;
/// This function is an async version of [`std::convert::identity`].
///
/// [`std::convert::identity`]: https://doc.rust-lang.org/std/convert/fn.identity.html
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::future::ready;
///
/// assert_eq!(ready(10).await, 10);
/// #
/// # }) }
/// ```
pub async fn ready<T>(val: T) -> T {
val
}

@ -0,0 +1,19 @@
/// Never resolves to a value.
///
/// # Examples
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::future::pending;
/// use async_std::prelude::*;
/// use std::time::Duration;
///
/// let dur = Duration::from_secs(1);
/// assert!(pending::<()>().timeout(dur).await.is_err());
/// #
/// # }) }
/// ```
pub async fn pending<T>() -> T {
futures::future::pending::<T>().await
}

@ -0,0 +1,21 @@
/// Resolves to the provided value.
///
/// This function is an async version of [`std::convert::identity`].
///
/// [`std::convert::identity`]: https://doc.rust-lang.org/std/convert/fn.identity.html
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::future::ready;
///
/// assert_eq!(ready(10).await, 10);
/// #
/// # }) }
/// ```
pub async fn ready<T>(val: T) -> T {
val
}

@ -0,0 +1,321 @@
use std::future::Future;
use std::io::{self};
use std::mem;
use std::pin::Pin;
use std::str;
use std::task::{Context, Poll};
use cfg_if::cfg_if;
use futures::io::AsyncBufRead;
use futures::Stream;
cfg_if! {
if #[cfg(feature = "docs.rs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// Allows reading from a buffered byte stream.
///
/// This trait is an async version of [`std::io::BufRead`].
///
/// While it is currently not possible to implement this trait directly, it get implemented
/// automatically for all types that implement [`futures::io::AsyncBufRead`].
///
/// [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
/// [`futures::io::AsyncBufRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html
pub trait BufRead {
/// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
///
/// This function will read bytes from the underlying stream until the delimiter or EOF is
/// found. Once found, all bytes up to, and including, the delimiter (if found) will be
/// appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, io::BufReader, prelude::*};
///
/// let mut f = BufReader::new(File::open("a.txt").await?);
///
/// let mut buf = vec![0; 1024];
/// let n = f.read_until(b'\n', &mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_until<'a>(
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> ret!('a, ReadUntilFuture, io::Result<usize>)
where
Self: Unpin,
{
ReadUntilFuture {
reader: self,
byte,
buf,
read: 0,
}
}
/// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is reached.
///
/// This function will read bytes from the underlying stream until the newline delimiter (the
/// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if
/// found) will be appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
/// If this function returns `Ok(0)`, the stream has reached EOF.
///
/// # Errors
///
/// This function has the same error semantics as [`read_until`] and will also return an error
/// if the read bytes are not valid UTF-8. If an I/O error is encountered then `buf` may
/// contain some bytes already read in the event that all data read so far was valid UTF-8.
///
/// [`read_until`]: #method.read_until
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, io::BufReader, prelude::*};
///
/// let mut f = BufReader::new(File::open("a.txt").await?);
///
/// let mut buf = String::new();
/// f.read_line(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_line<'a>(
&'a mut self,
buf: &'a mut String,
) -> ret!('a, ReadLineFuture, io::Result<usize>)
where
Self: Unpin,
{
ReadLineFuture {
reader: self,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
read: 0,
}
}
/// Returns a stream over the lines of this byte stream.
///
/// The stream returned from this function will yield instances of
/// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte (the
/// 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
///
/// [`io::Result`]: type.Result.html
/// [`String`]: https://doc.rust-lang.org/std/string/struct.String.html
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, io::BufReader, prelude::*};
///
/// let mut f = BufReader::new(File::open("a.txt").await?);
///
/// let mut lines = f.lines();
/// let mut count = 0;
///
/// for line in lines.next().await {
/// line?;
/// count += 1;
/// }
/// #
/// # Ok(()) }) }
/// ```
fn lines(self) -> Lines<Self>
where
Self: Unpin + Sized,
{
Lines {
reader: self,
buf: String::new(),
bytes: Vec::new(),
read: 0,
}
}
}
impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T {}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
byte: u8,
buf: &'a mut Vec<u8>,
read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
byte,
buf,
read,
} = &mut *self;
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut String,
bytes: Vec<u8>,
read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
read,
} = &mut *self;
let reader = Pin::new(reader);
let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
}
/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
reader: R,
buf: String,
bytes: Vec<u8>,
read: usize,
}
impl<R: AsyncBufRead> Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
bytes,
read,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
}
}
pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
pub fn read_until_internal<R: AsyncBufRead + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
byte: u8,
buf: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
loop {
let (done, used) = {
let available = futures::ready!(reader.as_mut().poll_fill_buf(cx))?;
if let Some(i) = memchr::memchr(byte, available) {
buf.extend_from_slice(&available[..=i]);
(true, i + 1)
} else {
buf.extend_from_slice(available);
(false, available.len())
}
};
reader.as_mut().consume(used);
*read += used;
if done || used == 0 {
return Poll::Ready(Ok(mem::replace(read, 0)));
}
}
}

@ -0,0 +1,253 @@
use std::io::{self, IoSliceMut, Read, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, fmt};
use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer};
// used by `BufReader` and `BufWriter`
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
/// The `BufReader` struct adds buffering to any reader.
///
/// It can be excessively inefficient to work directly with a [`AsyncRead`]
/// instance. A `BufReader` performs large, infrequent reads on the underlying
/// [`AsyncRead`] and maintains an in-memory buffer of the results.
///
/// `BufReader` can improve the speed of programs that make *small* and
/// *repeated* read calls to the same file or network socket. It does not
/// help when reading very large amounts at once, or reading just one or a few
/// times. It also provides no advantage when reading from a source that is
/// already in memory, like a `Vec<u8>`.
///
/// When the `BufReader` is dropped, the contents of its buffer will be
/// discarded. Creating multiple instances of a `BufReader` on the same
/// stream can cause data loss.
///
/// [`AsyncRead`]: futures_io::AsyncRead
///
// TODO: Examples
pub struct BufReader<R> {
inner: R,
buf: Box<[u8]>,
pos: usize,
cap: usize,
}
impl<R: AsyncRead> BufReader<R> {
/// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB,
/// but may change in the future.
pub fn new(inner: R) -> Self {
Self::with_capacity(DEFAULT_BUF_SIZE, inner)
}
/// Creates a new `BufReader` with the specified buffer capacity.
pub fn with_capacity(capacity: usize, inner: R) -> Self {
unsafe {
let mut buffer = Vec::with_capacity(capacity);
buffer.set_len(capacity);
inner.initializer().initialize(&mut buffer);
Self {
inner,
buf: buffer.into_boxed_slice(),
pos: 0,
cap: 0,
}
}
}
}
impl<R> BufReader<R> {
pin_utils::unsafe_pinned!(inner: R);
pin_utils::unsafe_unpinned!(pos: usize);
pin_utils::unsafe_unpinned!(cap: usize);
/// Gets a reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_ref(&self) -> &R {
&self.inner
}
/// Gets a mutable reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
&mut self.inner
}
/// Gets a pinned mutable reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut R> {
self.inner()
}
/// Consumes this `BufWriter`, returning the underlying reader.
///
/// Note that any leftover data in the internal buffer is lost.
pub fn into_inner(self) -> R {
self.inner
}
/// Returns a reference to the internally buffered data.
///
/// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty.
pub fn buffer(&self) -> &[u8] {
&self.buf[self.pos..self.cap]
}
/// Invalidates all data in the internal buffer.
#[inline]
fn discard_buffer(mut self: Pin<&mut Self>) {
*self.as_mut().pos() = 0;
*self.cap() = 0;
}
}
impl<R: AsyncRead> AsyncRead for BufReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
// If we don't have any buffered data and we're doing a massive read
// (larger than our internal buffer), bypass our internal buffer
// entirely.
if self.pos == self.cap && buf.len() >= self.buf.len() {
let res = futures::ready!(self.as_mut().inner().poll_read(cx, buf));
self.discard_buffer();
return Poll::Ready(res);
}
let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = rem.read(buf)?;
self.consume(nread);
Poll::Ready(Ok(nread))
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
if self.pos == self.cap && total_len >= self.buf.len() {
let res = futures::ready!(self.as_mut().inner().poll_read_vectored(cx, bufs));
self.discard_buffer();
return Poll::Ready(res);
}
let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = rem.read_vectored(bufs)?;
self.consume(nread);
Poll::Ready(Ok(nread))
}
// we can't skip unconditionally because of the large buffer case in read.
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
}
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&'a [u8]>> {
let Self {
inner,
buf,
cap,
pos,
} = unsafe { self.get_unchecked_mut() };
let mut inner = unsafe { Pin::new_unchecked(inner) };
// If we've reached the end of our internal buffer then we need to fetch
// some more data from the underlying reader.
// Branch using `>=` instead of the more correct `==`
// to tell the compiler that the pos..cap slice is always valid.
if *pos >= *cap {
debug_assert!(*pos == *cap);
*cap = futures::ready!(inner.as_mut().poll_read(cx, buf))?;
*pos = 0;
}
Poll::Ready(Ok(&buf[*pos..*cap]))
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
*self.as_mut().pos() = cmp::min(self.pos + amt, self.cap);
}
}
impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader")
.field("reader", &self.inner)
.field(
"buffer",
&format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
)
.finish()
}
}
impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
/// Seek to an offset, in bytes, in the underlying reader.
///
/// The position used for seeking with `SeekFrom::Current(_)` is the
/// position the underlying reader would be at if the `BufReader` had no
/// internal buffer.
///
/// Seeking always discards the internal buffer, even if the seek position
/// would otherwise fall within it. This guarantees that calling
/// `.into_inner()` immediately after a seek yields the underlying reader
/// at the same position.
///
/// To seek without discarding the internal buffer, use
/// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative).
///
/// See [`AsyncSeek`](futures_io::AsyncSeek) for more details.
///
/// Note: In the edge case where you're seeking with `SeekFrom::Current(n)`
/// where `n` minus the internal buffer length overflows an `i64`, two
/// seeks will be performed instead of one. If the second seek returns
/// `Err`, the underlying reader will be left at the same position it would
/// have if you called `seek` with `SeekFrom::Current(0)`.
fn poll_seek(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
let result: u64;
if let SeekFrom::Current(n) = pos {
let remainder = (self.cap - self.pos) as i64;
// it should be safe to assume that remainder fits within an i64 as the alternative
// means we managed to allocate 8 exbibytes and that's absurd.
// But it's not out of the realm of possibility for some weird underlying reader to
// support seeking by i64::min_value() so we need to handle underflow when subtracting
// remainder.
if let Some(offset) = n.checked_sub(remainder) {
result = futures::ready!(
self.as_mut()
.inner()
.poll_seek(cx, SeekFrom::Current(offset))
)?;
} else {
// seek backwards by our remainder, and then by the offset
futures::ready!(
self.as_mut()
.inner()
.poll_seek(cx, SeekFrom::Current(-remainder))
)?;
self.as_mut().discard_buffer();
result =
futures::ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?;
}
} else {
// Seeking with Start/End doesn't care about our buffer length.
result = futures::ready!(self.as_mut().inner().poll_seek(cx, pos))?;
}
self.discard_buffer();
Poll::Ready(Ok(result))
}
}

@ -22,17 +22,24 @@
//! ``` //! ```
#[doc(inline)] #[doc(inline)]
pub use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, SeekFrom}; pub use std::io::{empty, sink, Cursor, Empty, Error, ErrorKind, Result, SeekFrom, Sink};
pub use buf_read::{BufRead, Lines};
pub use buf_reader::BufReader;
pub use copy::copy; pub use copy::copy;
pub use read::Read;
pub use seek::Seek;
pub use stderr::{stderr, Stderr}; pub use stderr::{stderr, Stderr};
pub use stdin::{stdin, Stdin}; pub use stdin::{stdin, Stdin};
pub use stdout::{stdout, Stdout}; pub use stdout::{stdout, Stdout};
pub use write::Write;
mod buf_read;
mod buf_reader;
mod copy; mod copy;
mod read;
mod seek;
mod stderr; mod stderr;
mod stdin; mod stdin;
mod stdout; mod stdout;
mod write;
#[doc(inline)]
pub use std::io::{empty, sink, Cursor, Empty, Error, ErrorKind, Result, Sink};

@ -0,0 +1,392 @@
use std::future::Future;
use std::io::{self, IoSliceMut};
use std::mem;
use std::pin::Pin;
use std::str;
use std::task::{Context, Poll};
use cfg_if::cfg_if;
use futures::io::AsyncRead;
cfg_if! {
if #[cfg(feature = "docs.rs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// Allows reading from a byte stream.
///
/// This trait is an async version of [`std::io::Read`].
///
/// While it is currently not possible to implement this trait directly, it get implemented
/// automatically for all types that implement [`futures::io::AsyncRead`].
///
/// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`futures::io::AsyncRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncRead.html
pub trait Read {
/// Reads some bytes from the byte stream.
///
/// Returns the number of bytes read from the start of the buffer.
///
/// If the return value is `Ok(n)`, then it must be guaranteed that `0 <= n <= buf.len()`. A
/// nonzero `n` value indicates that the buffer has been filled in with `n` bytes of data. If
/// `n` is `0`, then it can indicate one of two scenarios:
///
/// 1. This reader has reached its "end of file" and will likely no longer be able to produce
/// bytes. Note that this does not mean that the reader will always no longer be able to
/// produce bytes.
/// 2. The buffer specified was 0 bytes in length.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, prelude::*};
///
/// let mut f = File::open("a.txt").await?;
///
/// let mut buf = vec![0; 1024];
/// let n = f.read(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result<usize>)
where
Self: Unpin;
/// Like [`read`], except that it reads into a slice of buffers.
///
/// Data is copied to fill each buffer in order, with the final buffer written to possibly
/// being only partially filled. This method must behave as a single call to [`read`] with the
/// buffers concatenated would.
///
/// The default implementation calls [`read`] with either the first nonempty buffer provided,
/// or an empty one if none exists.
///
/// [`read`]: #tymethod.read
fn read_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSliceMut<'a>],
) -> ret!('a, ReadVectoredFuture, io::Result<usize>)
where
Self: Unpin,
{
ReadVectoredFuture { reader: self, bufs }
}
/// Reads all bytes from the byte stream.
///
/// All bytes read from this stream will be appended to the specified buffer `buf`. This
/// function will continuously call [`read`] to append more data to `buf` until [`read`]
/// returns either `Ok(0)` or an error.
///
/// If successful, this function will return the total number of bytes read.
///
/// [`read`]: #tymethod.read
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, prelude::*};
///
/// let mut f = File::open("a.txt").await?;
///
/// let mut buf = Vec::new();
/// f.read_to_end(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_to_end<'a>(
&'a mut self,
buf: &'a mut Vec<u8>,
) -> ret!('a, ReadToEndFuture, io::Result<usize>)
where
Self: Unpin,
{
let start_len = buf.len();
ReadToEndFuture {
reader: self,
buf,
start_len,
}
}
/// Reads all bytes from the byte stream and appends them into a string.
///
/// If successful, this function will return the number of bytes read.
///
/// If the data in this stream is not valid UTF-8 then an error will be returned and `buf` will
/// be left unmodified.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, prelude::*};
///
/// let mut f = File::open("a.txt").await?;
///
/// let mut buf = String::new();
/// f.read_to_string(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_to_string<'a>(
&'a mut self,
buf: &'a mut String,
) -> ret!('a, ReadToStringFuture, io::Result<usize>)
where
Self: Unpin,
{
let start_len = buf.len();
ReadToStringFuture {
reader: self,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
start_len,
}
}
/// Reads the exact number of bytes required to fill `buf`.
///
/// This function reads as many bytes as necessary to completely fill the specified buffer
/// `buf`.
///
/// No guarantees are provided about the contents of `buf` when this function is called,
/// implementations cannot rely on any property of the contents of `buf` being true. It is
/// recommended that implementations only write data to `buf` instead of reading its contents.
///
/// If this function encounters an "end of file" before completely filling the buffer, it
/// returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of `buf` are
/// unspecified in this case.
///
/// If any other read error is encountered then this function immediately returns. The contents
/// of `buf` are unspecified in this case.
///
/// If this function returns an error, it is unspecified how many bytes it has read, but it
/// will never read more than would be necessary to completely fill the buffer.
///
/// [`ErrorKind::UnexpectedEof`]: enum.ErrorKind.html#variant.UnexpectedEof
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, prelude::*};
///
/// let mut f = File::open("a.txt").await?;
///
/// let mut buf = vec![0; 10];
/// f.read_exact(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadExactFuture, io::Result<()>)
where
Self: Unpin,
{
ReadExactFuture { reader: self, buf }
}
}
impl<T: AsyncRead + Unpin + ?Sized> Read for T {
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result<usize>) {
ReadFuture { reader: self, buf }
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut [u8],
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;
Pin::new(reader).poll_read(cx, buf)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
bufs: &'a mut [IoSliceMut<'a>],
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, bufs } = &mut *self;
Pin::new(reader).poll_read_vectored(cx, bufs)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut Vec<u8>,
start_len: usize,
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
start_len,
} = &mut *self;
read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut String,
bytes: Vec<u8>,
start_len: usize,
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
start_len,
} = &mut *self;
let reader = Pin::new(reader);
let ret = futures::ready!(read_to_end_internal(reader, cx, bytes, *start_len));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadExactFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut [u8],
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf } = &mut *self;
while !buf.is_empty() {
let n = futures::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
*buf = rest;
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into()));
}
}
Poll::Ready(Ok(()))
}
}
// This uses an adaptive system to extend the vector when it fills. We want to
// avoid paying to allocate and zero a huge chunk of memory if the reader only
// has 4 bytes while still making large reads if the reader does have a ton
// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
// time is 4,500 times (!) slower than this if the reader has a very small
// amount of data to return.
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
pub fn read_to_end_internal<R: AsyncRead + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,
start_len: usize,
) -> Poll<io::Result<usize>> {
struct Guard<'a> {
buf: &'a mut Vec<u8>,
len: usize,
}
impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe {
self.buf.set_len(self.len);
}
}
}
let mut g = Guard {
len: buf.len(),
buf,
};
let ret;
loop {
if g.len == g.buf.len() {
unsafe {
g.buf.reserve(32);
let capacity = g.buf.capacity();
g.buf.set_len(capacity);
rd.initializer().initialize(&mut g.buf[g.len..]);
}
}
match futures::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len));
break;
}
Ok(n) => g.len += n,
Err(e) => {
ret = Poll::Ready(Err(e));
break;
}
}
}
ret
}

@ -0,0 +1,81 @@
use std::future::Future;
use std::io::{self, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if;
use futures::io::AsyncSeek;
cfg_if! {
if #[cfg(feature = "docs.rs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// Allows seeking through a byte stream.
///
/// This trait is an async version of [`std::io::Seek`].
///
/// While it is currently not possible to implement this trait directly, it get implemented
/// automatically for all types that implement [`futures::io::AsyncSeek`].
///
/// [`std::io::Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
/// [`futures::io::AsyncSeek`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncSeek.html
pub trait Seek {
/// Seeks to a new position in a byte stream.
///
/// Returns the new position in the byte stream.
///
/// A seek beyond the end of stream is allowed, but behavior is defined by the
/// implementation.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, io::SeekFrom, prelude::*};
///
/// let mut f = File::open("a.txt").await?;
///
/// let file_len = f.seek(SeekFrom::End(0)).await?;
/// #
/// # Ok(()) }) }
/// ```
fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result<u64>)
where
Self: Unpin;
}
impl<T: AsyncSeek + Unpin + ?Sized> Seek for T {
fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result<u64>) {
SeekFuture { seeker: self, pos }
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct SeekFuture<'a, T: Unpin + ?Sized> {
seeker: &'a mut T,
pos: SeekFrom,
}
impl<T: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, T> {
type Output = io::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pos = self.pos;
Pin::new(&mut *self.seeker).poll_seek(cx, pos)
}
}

@ -81,7 +81,7 @@ enum Operation {
Flush(io::Result<()>), Flush(io::Result<()>),
} }
impl AsyncWrite for Stderr { impl futures::io::AsyncWrite for Stderr {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -140,7 +140,7 @@ impl Stdin {
} }
} }
impl AsyncRead for Stdin { impl futures::io::AsyncRead for Stdin {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -81,7 +81,7 @@ enum Operation {
Flush(io::Result<()>), Flush(io::Result<()>),
} }
impl AsyncWrite for Stdout { impl futures::io::AsyncWrite for Stdout {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -0,0 +1,213 @@
use std::future::Future;
use std::io::{self, IoSlice};
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if;
use futures::io::AsyncWrite;
cfg_if! {
if #[cfg(feature = "docs.rs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// Allows writing to a byte stream.
///
/// This trait is an async version of [`std::io::Write`].
///
/// While it is currently not possible to implement this trait directly, it get implemented
/// automatically for all types that implement [`futures::io::AsyncWrite`].
///
/// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
/// [`futures::io::AsyncWrite`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncWrite.html
pub trait Write {
/// Writes some bytes into the byte stream.
///
/// Returns the number of bytes written from the start of the buffer.
///
/// If the return value is `Ok(n)` then it must be guaranteed that `0 <= n <= buf.len()`. A
/// return value of `0` typically means that the underlying object is no longer able to accept
/// bytes and will likely not be able to in the future as well, or that the buffer provided is
/// empty.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, prelude::*};
///
/// let mut f = File::create("a.txt").await?;
///
/// let n = f.write(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result<usize>)
where
Self: Unpin;
/// Flushes the stream to ensure that all buffered contents reach their destination.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, prelude::*};
///
/// let mut f = File::create("a.txt").await?;
///
/// f.write_all(b"hello world").await?;
/// f.flush().await?;
/// #
/// # Ok(()) }) }
/// ```
fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>)
where
Self: Unpin;
/// Like [`write`], except that it writes from a slice of buffers.
///
/// Data is copied from each buffer in order, with the final buffer read from possibly being
/// only partially consumed. This method must behave as a call to [`write`] with the buffers
/// concatenated would.
///
/// The default implementation calls [`write`] with either the first nonempty buffer provided,
/// or an empty one if none exists.
///
/// [`write`]: #tymethod.write
fn write_vectored<'a>(
&'a mut self,
bufs: &'a [IoSlice<'a>],
) -> ret!('a, WriteVectoredFuture, io::Result<usize>)
where
Self: Unpin,
{
WriteVectoredFuture { writer: self, bufs }
}
/// Writes an entire buffer into the byte stream.
///
/// This method will continuously call [`write`] until there is no more data to be written or
/// an error is returned. This method will not return until the entire buffer has been
/// successfully written or such an error occurs.
///
/// # Examples
///
/// ```no_run
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::{fs::File, prelude::*};
///
/// let mut f = File::create("a.txt").await?;
///
/// f.write_all(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteAllFuture, io::Result<()>)
where
Self: Unpin,
{
WriteAllFuture { writer: self, buf }
}
}
impl<T: AsyncWrite + Unpin + ?Sized> Write for T {
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result<usize>) {
WriteFuture { writer: self, buf }
}
fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>) {
FlushFuture { writer: self }
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct WriteFuture<'a, T: Unpin + ?Sized> {
writer: &'a mut T,
buf: &'a [u8],
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let buf = self.buf;
Pin::new(&mut *self.writer).poll_write(cx, buf)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FlushFuture<'a, T: Unpin + ?Sized> {
writer: &'a mut T,
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.writer).poll_flush(cx)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct WriteVectoredFuture<'a, T: Unpin + ?Sized> {
writer: &'a mut T,
bufs: &'a [IoSlice<'a>],
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let bufs = self.bufs;
Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct WriteAllFuture<'a, T: Unpin + ?Sized> {
writer: &'a mut T,
buf: &'a [u8],
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { writer, buf } = &mut *self;
while !buf.is_empty() {
let n = futures::ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
let (_, rest) = mem::replace(buf, &[]).split_at(n);
*buf = rest;
if n == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
}
Poll::Ready(Ok(()))
}
}

@ -352,7 +352,7 @@ impl TcpStream {
} }
} }
impl AsyncRead for TcpStream { impl futures::io::AsyncRead for TcpStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -370,7 +370,7 @@ impl AsyncRead for TcpStream {
} }
} }
impl AsyncRead for &TcpStream { impl futures::io::AsyncRead for &TcpStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -388,7 +388,7 @@ impl AsyncRead for &TcpStream {
} }
} }
impl AsyncWrite for TcpStream { impl futures::io::AsyncWrite for TcpStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -414,7 +414,7 @@ impl AsyncWrite for TcpStream {
} }
} }
impl AsyncWrite for &TcpStream { impl futures::io::AsyncWrite for &TcpStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -656,7 +656,7 @@ impl TcpListener {
#[derive(Debug)] #[derive(Debug)]
pub struct Incoming<'a>(&'a TcpListener); pub struct Incoming<'a>(&'a TcpListener);
impl<'a> Stream for Incoming<'a> { impl<'a> futures::Stream for Incoming<'a> {
type Item = io::Result<TcpStream>; type Item = io::Result<TcpStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

@ -755,7 +755,7 @@ impl UnixStream {
} }
} }
impl AsyncRead for UnixStream { impl futures::io::AsyncRead for UnixStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -765,7 +765,7 @@ impl AsyncRead for UnixStream {
} }
} }
impl AsyncRead for &UnixStream { impl futures::io::AsyncRead for &UnixStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -775,7 +775,7 @@ impl AsyncRead for &UnixStream {
} }
} }
impl AsyncWrite for UnixStream { impl futures::io::AsyncWrite for UnixStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -793,7 +793,7 @@ impl AsyncWrite for UnixStream {
} }
} }
impl AsyncWrite for &UnixStream { impl futures::io::AsyncWrite for &UnixStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -25,21 +25,16 @@
//! [`timeout`]: ../time/trait.Timeout.html#method.timeout //! [`timeout`]: ../time/trait.Timeout.html#method.timeout
#[doc(no_inline)] #[doc(no_inline)]
pub use futures::future::FutureExt as _; pub use crate::future::Future;
#[doc(no_inline)] #[doc(no_inline)]
pub use futures::future::TryFutureExt as _; pub use crate::io::BufRead as _;
#[doc(no_inline)] #[doc(no_inline)]
pub use futures::io::AsyncBufReadExt as _; pub use crate::io::Read as _;
#[doc(no_inline)] #[doc(no_inline)]
pub use futures::io::AsyncReadExt as _; pub use crate::io::Seek as _;
#[doc(no_inline)] #[doc(no_inline)]
pub use futures::io::AsyncSeekExt as _; pub use crate::io::Write as _;
#[doc(no_inline)] #[doc(no_inline)]
pub use futures::io::AsyncWriteExt as _; pub use crate::stream::Stream;
#[doc(no_inline)]
pub use futures::stream::StreamExt as _;
#[doc(no_inline)]
pub use futures::stream::TryStreamExt as _;
#[doc(no_inline)] #[doc(no_inline)]
pub use crate::time::Timeout as _; pub use crate::time::Timeout as _;

@ -0,0 +1,43 @@
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Creates a stream that doesn't yield any items.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::{prelude::*, stream};
///
/// let mut s = stream::empty::<i32>();
///
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
/// ```
pub fn empty<T>() -> Empty<T> {
Empty {
_marker: PhantomData,
}
}
/// A stream that doesn't yield any items.
///
/// This stream is constructed by the [`empty`] function.
///
/// [`empty`]: fn.empty.html
#[derive(Debug)]
pub struct Empty<T> {
_marker: PhantomData<T>,
}
impl<T> futures::Stream for Empty<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}

@ -1,4 +1,4 @@
//! Composable asynchronous iteration. //! Asynchronous iteration.
//! //!
//! This module is an async version of [`std::iter`]. //! This module is an async version of [`std::iter`].
//! //!
@ -12,14 +12,21 @@
//! # //! #
//! use async_std::{prelude::*, stream}; //! use async_std::{prelude::*, stream};
//! //!
//! let mut stream = stream::repeat(9).take(3); //! let mut s = stream::repeat(9).take(3);
//! //!
//! while let Some(num) = stream.next().await { //! while let Some(v) = s.next().await {
//! assert_eq!(num, 9); //! assert_eq!(v, 9);
//! } //! }
//! # //! #
//! # }) } //! # }) }
//! ``` //! ```
#[doc(inline)] pub use empty::{empty, Empty};
pub use futures::stream::{empty, once, repeat, Empty, Once, Repeat, Stream}; pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use stream::{Stream, Take};
mod empty;
mod once;
mod repeat;
mod stream;

@ -0,0 +1,41 @@
use std::pin::Pin;
use std::task::{Context, Poll};
/// Creates a stream that yields a single item.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::{prelude::*, stream};
///
/// let mut s = stream::once(7);
///
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
/// ```
pub fn once<T>(t: T) -> Once<T> {
Once { value: Some(t) }
}
/// A stream that yields a single item.
///
/// This stream is constructed by the [`once`] function.
///
/// [`once`]: fn.once.html
#[derive(Debug)]
pub struct Once<T> {
value: Option<T>,
}
impl<T: Unpin> futures::Stream for Once<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Ready(self.value.take())
}
}

@ -0,0 +1,44 @@
use std::pin::Pin;
use std::task::{Context, Poll};
/// Creates a stream that yields the same item repeatedly.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::{prelude::*, stream};
///
/// let mut s = stream::repeat(7);
///
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, Some(7));
/// #
/// # }) }
/// ```
pub fn repeat<T>(item: T) -> Repeat<T>
where
T: Clone,
{
Repeat { item }
}
/// A stream that yields the same item repeatedly.
///
/// This stream is constructed by the [`repeat`] function.
///
/// [`repeat`]: fn.repeat.html
#[derive(Debug)]
pub struct Repeat<T> {
item: T,
}
impl<T: Clone> futures::Stream for Repeat<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(self.item.clone()))
}
}

@ -0,0 +1,169 @@
//! Asynchronous iteration.
//!
//! This module is an async version of [`std::iter`].
//!
//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
//!
//! # Examples
//!
//! ```
//! # #![feature(async_await)]
//! # fn main() { async_std::task::block_on(async {
//! #
//! use async_std::{prelude::*, stream};
//!
//! let mut s = stream::repeat(9).take(3);
//!
//! while let Some(v) = s.next().await {
//! assert_eq!(v, 9);
//! }
//! #
//! # }) }
//! ```
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if;
cfg_if! {
if #[cfg(feature = "docs.rs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// An asynchronous stream of values.
///
/// This trait is an async version of [`std::iter::Iterator`].
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::stream::Stream`].
///
/// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html
/// [`futures::stream::Stream`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html
pub trait Stream {
/// The type of items yielded by this stream.
type Item;
/// Advances the stream and returns the next value.
///
/// Returns [`None`] when iteration is finished. Individual stream implementations may
/// choose to resume iteration, and so calling `next()` again may or may not eventually
/// start returning more values.
///
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::{prelude::*, stream};
///
/// let mut s = stream::once(7);
///
/// assert_eq!(s.next().await, Some(7));
/// assert_eq!(s.next().await, None);
/// #
/// # }) }
/// ```
fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option<Self::Item>)
where
Self: Unpin;
/// Creates a stream that yields its first `n` elements.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::{prelude::*, stream};
///
/// let mut s = stream::repeat(9).take(3);
///
/// while let Some(v) = s.next().await {
/// assert_eq!(v, 9);
/// }
/// #
/// # }) }
/// ```
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take {
stream: self,
remaining: n,
}
}
}
impl<T: futures::Stream + Unpin + ?Sized> Stream for T {
type Item = <Self as futures::Stream>::Item;
fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option<Self::Item>)
where
Self: Unpin,
{
NextFuture { stream: self }
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NextFuture<'a, T: Unpin + ?Sized> {
stream: &'a mut T,
}
impl<T: futures::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
type Output = Option<T::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next(cx)
}
}
/// A stream that yields the first `n` items of another stream.
#[derive(Clone, Debug)]
pub struct Take<S> {
stream: S,
remaining: usize,
}
impl<S: Unpin> Unpin for Take<S> {}
impl<S: futures::Stream> Take<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(remaining: usize);
}
impl<S: futures::Stream> futures::Stream for Take<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
if self.remaining == 0 {
Poll::Ready(None)
} else {
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(_) => *self.as_mut().remaining() -= 1,
None => *self.as_mut().remaining() = 0,
}
Poll::Ready(next)
}
}
}

@ -23,7 +23,7 @@
//! ``` //! ```
#[doc(inline)] #[doc(inline)]
pub use futures::task::{Context, Poll, Waker}; pub use std::task::{Context, Poll, Waker};
pub use local::{AccessError, LocalKey}; pub use local::{AccessError, LocalKey};
pub use pool::{block_on, current, spawn, Builder}; pub use pool::{block_on, current, spawn, Builder};

@ -34,9 +34,25 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::Duration; use std::time::Duration;
use cfg_if::cfg_if;
use futures_timer::Delay; use futures_timer::Delay;
use pin_utils::unsafe_pinned; use pin_utils::unsafe_pinned;
cfg_if! {
if #[cfg(feature = "docs.rs")] {
#[doc(hidden)]
pub struct ImplFuture<T>(std::marker::PhantomData<T>);
macro_rules! ret {
($f:tt, $o:ty) => (ImplFuture<$o>);
}
} else {
macro_rules! ret {
($f:tt, $o:ty) => ($f<Self>);
}
}
}
/// An error returned when a future times out. /// An error returned when a future times out.
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeoutError; pub struct TimeoutError;
@ -57,9 +73,6 @@ impl From<TimeoutError> for io::Error {
/// An extension trait that configures timeouts for futures. /// An extension trait that configures timeouts for futures.
pub trait Timeout: Future + Sized { pub trait Timeout: Future + Sized {
/// TODO
type TimeoutFuture: Future<Output = Result<<Self as Future>::Output, TimeoutError>>;
/// Awaits a future to completion or times out after a duration of time. /// Awaits a future to completion or times out after a duration of time.
/// ///
/// # Examples /// # Examples
@ -81,13 +94,7 @@ pub trait Timeout: Future + Sized {
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
fn timeout(self, dur: Duration) -> Self::TimeoutFuture; fn timeout(self, dur: Duration) -> ret!(TimeoutFuture, Result<Self::Output, TimeoutError>) {
}
impl<F: Future> Timeout for F {
type TimeoutFuture = TimeoutFuture<F>;
fn timeout(self, dur: Duration) -> Self::TimeoutFuture {
TimeoutFuture { TimeoutFuture {
future: self, future: self,
delay: Delay::new(dur), delay: Delay::new(dur),
@ -97,7 +104,7 @@ impl<F: Future> Timeout for F {
/// A future that times out after a duration of time. /// A future that times out after a duration of time.
#[doc(hidden)] #[doc(hidden)]
#[derive(Debug)] #[allow(missing_debug_implementations)]
pub struct TimeoutFuture<F> { pub struct TimeoutFuture<F> {
future: F, future: F,
delay: Delay, delay: Delay,
@ -121,3 +128,5 @@ impl<F: Future> Future for TimeoutFuture<F> {
} }
} }
} }
impl<F: Future> Timeout for F {}

Loading…
Cancel
Save