Re-export IO traits from futures

This commit is contained in:
Stjepan Glavina 2019-09-21 10:39:56 +02:00
parent 8be7655672
commit edfa2358a4
43 changed files with 1075 additions and 772 deletions

View file

@ -63,5 +63,6 @@ matrix:
- mdbook test -L ./target/debug/deps docs
script:
- cargo check --all --benches --bins --examples --tests
- cargo check --features unstable --all --benches --bins --examples --tests
- cargo test --all --doc --features unstable

View file

@ -9,11 +9,11 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use cfg_if::cfg_if;
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer};
use crate::fs::{Metadata, Permissions};
use crate::future;
use crate::io::{self, SeekFrom, Write};
use crate::io::{self, Seek, SeekFrom, Read, Write};
use crate::prelude::*;
use crate::task::{self, blocking, Context, Poll, Waker};
/// An open file on the filesystem.
@ -302,7 +302,7 @@ impl fmt::Debug for File {
}
}
impl AsyncRead for File {
impl Read for File {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -310,14 +310,9 @@ impl AsyncRead for File {
) -> Poll<io::Result<usize>> {
Pin::new(&mut &*self).poll_read(cx, buf)
}
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}
impl AsyncRead for &File {
impl Read for &File {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -326,14 +321,9 @@ impl AsyncRead for &File {
let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_read(cx, buf)
}
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}
impl AsyncWrite for File {
impl Write for File {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -351,7 +341,7 @@ impl AsyncWrite for File {
}
}
impl AsyncWrite for &File {
impl Write for &File {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -372,7 +362,7 @@ impl AsyncWrite for &File {
}
}
impl AsyncSeek for File {
impl Seek for File {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -382,7 +372,7 @@ impl AsyncSeek for File {
}
}
impl AsyncSeek for &File {
impl Seek for &File {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,

View file

@ -1,32 +0,0 @@
use std::pin::Pin;
use futures_io::AsyncBufRead;
use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FillBufFuture<'a, R: ?Sized> {
reader: &'a mut R,
}
impl<'a, R: ?Sized> FillBufFuture<'a, R> {
pub(crate) fn new(reader: &'a mut R) -> Self {
Self { reader }
}
}
impl<'a, R: AsyncBufRead + Unpin + ?Sized> Future for FillBufFuture<'a, R> {
type Output = io::Result<&'a [u8]>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'a [u8]>> {
let Self { reader } = &mut *self;
let result = Pin::new(reader).poll_fill_buf(cx);
// This is safe because:
// 1. The buffer is valid for the lifetime of the reader.
// 2. Output is unrelated to the wrapper (Self).
result.map_ok(|buf| unsafe { std::mem::transmute::<&'_ [u8], &'a [u8]>(buf) })
}
}

View file

@ -2,10 +2,8 @@ use std::mem;
use std::pin::Pin;
use std::str;
use futures_io::AsyncBufRead;
use super::read_until_internal;
use crate::io;
use crate::io::{self, BufRead};
use crate::task::{Context, Poll};
/// A stream of lines in a byte stream.
@ -25,7 +23,7 @@ pub struct Lines<R> {
pub(crate) read: usize,
}
impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
impl<R: BufRead> futures_core::stream::Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -50,7 +48,7 @@ impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
}
}
pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
pub fn read_line_internal<R: BufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,

View file

@ -1,9 +1,7 @@
mod fill_buf;
mod lines;
mod read_line;
mod read_until;
use fill_buf::FillBufFuture;
pub use lines::Lines;
use read_line::ReadLineFuture;
use read_until::ReadUntilFuture;
@ -12,68 +10,59 @@ use std::mem;
use std::pin::Pin;
use cfg_if::cfg_if;
use futures_io::AsyncBufRead;
use crate::io;
use crate::task::{Context, Poll};
cfg_if! {
if #[cfg(feature = "docs")] {
use std::ops::{Deref, DerefMut};
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// Allows reading from a buffered byte stream.
///
/// This trait is an async version of [`std::io::BufRead`].
/// This trait is a re-export of [`futures::io::AsyncBufRead`] and is an async version of
/// [`std::io::BufRead`].
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::io::AsyncBufRead`].
/// The [provided methods] do not really exist in the trait itself, but they become
/// available when the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
/// [`futures::io::AsyncBufRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html
/// [provided methods]: #provided-methods
pub trait BufRead {
/// Tells this buffer that `amt` bytes have been consumed from the buffer, so they should no
/// longer be returned in calls to `read`.
fn consume(&mut self, amt: usize)
where
Self: Unpin;
/// Returns the contents of the internal buffer, filling it with more data from the inner
/// reader if it is empty.
/// Returns the contents of the internal buffer, filling it with more data from the
/// inner reader if it is empty.
///
/// This function is a lower-level call. It needs to be paired with the [`consume`] method to
/// function properly. When calling this method, none of the contents will be "read" in the
/// sense that later calling `read` may return the same contents. As such, [`consume`] must be
/// called with the number of bytes that are consumed from this buffer to ensure that the bytes
/// are never returned twice.
/// This function is a lower-level call. It needs to be paired with the [`consume`]
/// method to function properly. When calling this method, none of the contents will be
/// "read" in the sense that later calling `read` may return the same contents. As
/// such, [`consume`] must be called with the number of bytes that are consumed from
/// this buffer to ensure that the bytes are never returned twice.
///
/// [`consume`]: #tymethod.consume
///
/// An empty buffer returned indicates that the stream has reached EOF.
// TODO: write a proper doctest with `consume`
fn fill_buf<'a>(&'a mut self) -> ret!('a, FillBufFuture, io::Result<&'a [u8]>)
where
Self: Unpin,
{
FillBufFuture::new(self)
}
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;
/// Tells this buffer that `amt` bytes have been consumed from the buffer, so they
/// should no longer be returned in calls to `read`.
fn consume(self: Pin<&mut Self>, amt: usize);
/// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
///
/// This function will read bytes from the underlying stream until the delimiter or EOF is
/// found. Once found, all bytes up to, and including, the delimiter (if found) will be
/// appended to `buf`.
/// This function will read bytes from the underlying stream until the delimiter or EOF
/// is found. Once found, all bytes up to, and including, the delimiter (if found) will
/// be appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
@ -94,7 +83,8 @@ pub trait BufRead {
/// # Ok(()) }) }
/// ```
///
/// Multiple successful calls to `read_until` append all bytes up to and including to `buf`:
/// Multiple successful calls to `read_until` append all bytes up to and including to
/// `buf`:
/// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
@ -120,23 +110,19 @@ pub trait BufRead {
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> ret!('a, ReadUntilFuture, io::Result<usize>)
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
ReadUntilFuture {
reader: self,
byte,
buf,
read: 0,
}
unreachable!()
}
/// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is reached.
/// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is
/// reached.
///
/// This function will read bytes from the underlying stream until the newline delimiter (the
/// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if
/// found) will be appended to `buf`.
/// This function will read bytes from the underlying stream until the newline
/// delimiter (the 0xA byte) or EOF is found. Once found, all bytes up to, and
/// including, the delimiter (if found) will be appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
@ -144,9 +130,10 @@ pub trait BufRead {
///
/// # Errors
///
/// This function has the same error semantics as [`read_until`] and will also return an error
/// if the read bytes are not valid UTF-8. If an I/O error is encountered then `buf` may
/// contain some bytes already read in the event that all data read so far was valid UTF-8.
/// This function has the same error semantics as [`read_until`] and will also return
/// an error if the read bytes are not valid UTF-8. If an I/O error is encountered then
/// `buf` may contain some bytes already read in the event that all data read so far
/// was valid UTF-8.
///
/// [`read_until`]: #method.read_until
///
@ -169,23 +156,18 @@ pub trait BufRead {
fn read_line<'a>(
&'a mut self,
buf: &'a mut String,
) -> ret!('a, ReadLineFuture, io::Result<usize>)
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
ReadLineFuture {
reader: self,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
read: 0,
}
unreachable!()
}
/// Returns a stream over the lines of this byte stream.
///
/// The stream returned from this function will yield instances of
/// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte (the
/// 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
/// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte
/// (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
///
/// [`io::Result`]: type.Result.html
/// [`String`]: https://doc.rust-lang.org/std/string/struct.String.html
@ -210,6 +192,100 @@ pub trait BufRead {
/// #
/// # Ok(()) }) }
/// ```
fn lines(self) -> Lines<Self>
where
Self: Unpin + Sized,
{
unreachable!()
}
}
impl<T: BufRead + Unpin + ?Sized> BufRead for Box<T> {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!()
}
fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!()
}
}
impl<T: BufRead + Unpin + ?Sized> BufRead for &mut T {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!()
}
fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!()
}
}
impl<P> BufRead for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: BufRead,
{
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!()
}
fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!()
}
}
impl BufRead for &[u8] {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!()
}
fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!()
}
}
} else {
pub use futures_io::AsyncBufRead as BufRead;
}
}
#[doc(hidden)]
pub trait BufReadExt: futures_io::AsyncBufRead {
fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'a, Self>
where
Self: Unpin,
{
ReadUntilFuture {
reader: self,
byte,
buf,
read: 0,
}
}
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'a, Self>
where
Self: Unpin,
{
ReadLineFuture {
reader: self,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
read: 0,
}
}
fn lines(self) -> Lines<Self>
where
Self: Unpin + Sized,
@ -223,13 +299,9 @@ pub trait BufRead {
}
}
impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T {
fn consume(&mut self, amt: usize) {
AsyncBufRead::consume(Pin::new(self), amt)
}
}
impl<T: futures_io::AsyncBufRead + ?Sized> BufReadExt for T {}
pub fn read_until_internal<R: AsyncBufRead + ?Sized>(
pub fn read_until_internal<R: BufReadExt + ?Sized>(
mut reader: Pin<&mut R>,
cx: &mut Context<'_>,
byte: u8,

View file

@ -2,11 +2,9 @@ use std::mem;
use std::pin::Pin;
use std::str;
use futures_io::AsyncBufRead;
use super::read_until_internal;
use crate::future::Future;
use crate::io;
use crate::io::{self, BufRead};
use crate::task::{Context, Poll};
#[doc(hidden)]
@ -18,7 +16,7 @@ pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
pub(crate) read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
impl<T: BufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,10 +1,8 @@
use std::pin::Pin;
use futures_io::AsyncBufRead;
use super::read_until_internal;
use crate::future::Future;
use crate::io;
use crate::io::{self, BufRead};
use crate::task::{Context, Poll};
#[doc(hidden)]
@ -16,7 +14,7 @@ pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
pub(crate) read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
impl<T: BufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -2,9 +2,7 @@ use std::io::{IoSliceMut, Read as _};
use std::pin::Pin;
use std::{cmp, fmt};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer};
use crate::io::{self, SeekFrom};
use crate::io::{self, BufRead, Read, Seek, SeekFrom};
use crate::task::{Context, Poll};
const DEFAULT_CAPACITY: usize = 8 * 1024;
@ -193,7 +191,7 @@ impl<R> BufReader<R> {
}
}
impl<R: AsyncRead> AsyncRead for BufReader<R> {
impl<R: Read> Read for BufReader<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -229,14 +227,9 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
self.consume(nread);
Poll::Ready(Ok(nread))
}
// we can't skip unconditionally because of the large buffer case in read.
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
}
impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
impl<R: Read> BufRead for BufReader<R> {
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,
cx: &mut Context<'_>,
@ -278,7 +271,7 @@ impl<R: io::Read + fmt::Debug> fmt::Debug for BufReader<R> {
}
}
impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
impl<R: Seek> Seek for BufReader<R> {
/// Seeks to an offset, in bytes, in the underlying reader.
///
/// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying

View file

@ -1,9 +1,7 @@
use std::pin::Pin;
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use crate::future::Future;
use crate::io::{self, BufReader};
use crate::io::{self, BufRead, BufReader, Read, Write};
use crate::task::{Context, Poll};
/// Copies the entire contents of a reader into a writer.
@ -45,8 +43,8 @@ use crate::task::{Context, Poll};
/// ```
pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64>
where
R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
R: Read + Unpin + ?Sized,
W: Write + Unpin + ?Sized,
{
pub struct CopyFuture<'a, R, W: ?Sized> {
reader: R,
@ -69,8 +67,8 @@ where
impl<R, W> Future for CopyFuture<'_, R, W>
where
R: AsyncBufRead,
W: AsyncWrite + Unpin + ?Sized,
R: BufRead,
W: Write + Unpin + ?Sized,
{
type Output = io::Result<u64>;

View file

@ -1,8 +1,7 @@
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
use std::io::{self, IoSlice, IoSliceMut, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::io::{self, IoSlice, IoSliceMut, Seek, SeekFrom, BufRead, Read, Write};
use crate::task::{Context, Poll};
/// A `Cursor` wraps an in-memory buffer and provides it with a
/// [`Seek`] implementation.
@ -153,7 +152,7 @@ impl<T> Cursor<T> {
}
}
impl<T> AsyncSeek for Cursor<T>
impl<T> Seek for Cursor<T>
where
T: AsRef<[u8]> + Unpin,
{
@ -162,11 +161,11 @@ where
_: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
Poll::Ready(io::Seek::seek(&mut self.inner, pos))
Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
}
}
impl<T> AsyncRead for Cursor<T>
impl<T> Read for Cursor<T>
where
T: AsRef<[u8]> + Unpin,
{
@ -175,7 +174,7 @@ where
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Read::read(&mut self.inner, buf))
Poll::Ready(std::io::Read::read(&mut self.inner, buf))
}
fn poll_read_vectored(
@ -183,30 +182,30 @@ where
_: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Read::read_vectored(&mut self.inner, bufs))
Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
}
}
impl<T> AsyncBufRead for Cursor<T>
impl<T> BufRead for Cursor<T>
where
T: AsRef<[u8]> + Unpin,
{
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Poll::Ready(io::BufRead::fill_buf(&mut self.get_mut().inner))
Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
io::BufRead::consume(&mut self.inner, amt)
std::io::BufRead::consume(&mut self.inner, amt)
}
}
impl AsyncWrite for Cursor<&mut [u8]> {
impl Write for Cursor<&mut [u8]> {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write(&mut self.inner, buf))
Poll::Ready(std::io::Write::write(&mut self.inner, buf))
}
fn poll_write_vectored(
@ -214,11 +213,11 @@ impl AsyncWrite for Cursor<&mut [u8]> {
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(&mut self.inner, bufs))
Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
}
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut self.inner))
Poll::Ready(std::io::Write::flush(&mut self.inner))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -226,13 +225,13 @@ impl AsyncWrite for Cursor<&mut [u8]> {
}
}
impl AsyncWrite for Cursor<&mut Vec<u8>> {
impl Write for Cursor<&mut Vec<u8>> {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write(&mut self.inner, buf))
Poll::Ready(std::io::Write::write(&mut self.inner, buf))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -240,17 +239,17 @@ impl AsyncWrite for Cursor<&mut Vec<u8>> {
}
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut self.inner))
Poll::Ready(std::io::Write::flush(&mut self.inner))
}
}
impl AsyncWrite for Cursor<Vec<u8>> {
impl Write for Cursor<Vec<u8>> {
fn poll_write(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write(&mut self.inner, buf))
Poll::Ready(std::io::Write::write(&mut self.inner, buf))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -258,6 +257,6 @@ impl AsyncWrite for Cursor<Vec<u8>> {
}
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut self.inner))
Poll::Ready(std::io::Write::flush(&mut self.inner))
}
}

View file

@ -1,9 +1,7 @@
use std::fmt;
use std::pin::Pin;
use futures_io::{AsyncBufRead, AsyncRead, Initializer};
use crate::io;
use crate::io::{self, BufRead, Read};
use crate::task::{Context, Poll};
/// Creates a reader that contains no data.
@ -43,7 +41,7 @@ impl fmt::Debug for Empty {
}
}
impl AsyncRead for Empty {
impl Read for Empty {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
@ -52,14 +50,9 @@ impl AsyncRead for Empty {
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
}
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}
impl AsyncBufRead for Empty {
impl BufRead for Empty {
#[inline]
fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>,

View file

@ -20,25 +20,25 @@
//! # Ok(()) }) }
//! ```
pub mod prelude;
#[doc(inline)]
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
pub use buf_read::{BufRead, Lines};
pub use buf_read::{BufRead, BufReadExt, Lines};
pub use buf_reader::BufReader;
pub use copy::copy;
pub use cursor::Cursor;
pub use empty::{empty, Empty};
pub use read::Read;
pub use read::{Read, ReadExt};
pub use repeat::{repeat, Repeat};
pub use seek::Seek;
pub use seek::{Seek, SeekExt};
pub use sink::{sink, Sink};
pub use stderr::{stderr, Stderr};
pub use stdin::{stdin, Stdin};
pub use stdout::{stdout, Stdout};
pub use timeout::timeout;
pub use write::Write;
pub use write::{Write, WriteExt};
pub mod prelude;
mod buf_read;
mod buf_reader;

View file

@ -9,10 +9,19 @@
//! ```
#[doc(no_inline)]
pub use super::BufRead;
pub use crate::io::BufRead;
#[doc(no_inline)]
pub use super::Read;
pub use crate::io::Read;
#[doc(no_inline)]
pub use super::Seek;
pub use crate::io::Seek;
#[doc(no_inline)]
pub use super::Write;
pub use crate::io::Write;
#[doc(hidden)]
pub use crate::io::BufReadExt as _;
#[doc(hidden)]
pub use crate::io::ReadExt as _;
#[doc(hidden)]
pub use crate::io::SeekExt as _;
#[doc(hidden)]
pub use crate::io::WriteExt as _;

View file

@ -10,49 +10,70 @@ use read_to_end::{read_to_end_internal, ReadToEndFuture};
use read_to_string::ReadToStringFuture;
use read_vectored::ReadVectoredFuture;
use std::io;
use std::mem;
use cfg_if::cfg_if;
use futures_io::AsyncRead;
use crate::io::IoSliceMut;
cfg_if! {
if #[cfg(feature = "docs")] {
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// Allows reading from a byte stream.
///
/// This trait is an async version of [`std::io::Read`].
/// This trait is a re-export of [`futures::io::AsyncRead`] and is an async version of
/// [`std::io::Read`].
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::io::AsyncRead`].
/// Methods other than [`poll_read`] and [`poll_read_vectored`] do not really exist in the
/// trait itself, but they become available when the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`futures::io::AsyncRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncRead.html
/// [`poll_read`]: #tymethod.poll_read
/// [`poll_read_vectored`]: #method.poll_read_vectored
pub trait Read {
/// Attempt to read from the `AsyncRead` into `buf`.
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>>;
/// Attempt to read from the `AsyncRead` into `bufs` using vectored IO operations.
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
unreachable!()
}
/// Reads some bytes from the byte stream.
///
/// Returns the number of bytes read from the start of the buffer.
///
/// If the return value is `Ok(n)`, then it must be guaranteed that `0 <= n <= buf.len()`. A
/// nonzero `n` value indicates that the buffer has been filled in with `n` bytes of data. If
/// `n` is `0`, then it can indicate one of two scenarios:
/// If the return value is `Ok(n)`, then it must be guaranteed that
/// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
/// filled in with `n` bytes of data. If `n` is `0`, then it can indicate one of two
/// scenarios:
///
/// 1. This reader has reached its "end of file" and will likely no longer be able to produce
/// bytes. Note that this does not mean that the reader will always no longer be able to
/// produce bytes.
/// 1. This reader has reached its "end of file" and will likely no longer be able to
/// produce bytes. Note that this does not mean that the reader will always no
/// longer be able to produce bytes.
/// 2. The buffer specified was 0 bytes in length.
///
/// # Examples
@ -70,35 +91,38 @@ pub trait Read {
/// #
/// # Ok(()) }) }
/// ```
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result<usize>)
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin;
Self: Unpin
{
unreachable!()
}
/// Like [`read`], except that it reads into a slice of buffers.
///
/// Data is copied to fill each buffer in order, with the final buffer written to possibly
/// being only partially filled. This method must behave as a single call to [`read`] with the
/// buffers concatenated would.
/// Data is copied to fill each buffer in order, with the final buffer written to
/// possibly being only partially filled. This method must behave as a single call to
/// [`read`] with the buffers concatenated would.
///
/// The default implementation calls [`read`] with either the first nonempty buffer provided,
/// or an empty one if none exists.
/// The default implementation calls [`read`] with either the first nonempty buffer
/// provided, or an empty one if none exists.
///
/// [`read`]: #tymethod.read
fn read_vectored<'a>(
&'a mut self,
bufs: &'a mut [io::IoSliceMut<'a>],
) -> ret!('a, ReadVectoredFuture, io::Result<usize>)
bufs: &'a mut [IoSliceMut<'a>],
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
ReadVectoredFuture { reader: self, bufs }
unreachable!()
}
/// Reads all bytes from the byte stream.
///
/// All bytes read from this stream will be appended to the specified buffer `buf`. This
/// function will continuously call [`read`] to append more data to `buf` until [`read`]
/// returns either `Ok(0)` or an error.
/// All bytes read from this stream will be appended to the specified buffer `buf`.
/// This function will continuously call [`read`] to append more data to `buf` until
/// [`read`] returns either `Ok(0)` or an error.
///
/// If successful, this function will return the total number of bytes read.
///
@ -122,24 +146,19 @@ pub trait Read {
fn read_to_end<'a>(
&'a mut self,
buf: &'a mut Vec<u8>,
) -> ret!('a, ReadToEndFuture, io::Result<usize>)
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
let start_len = buf.len();
ReadToEndFuture {
reader: self,
buf,
start_len,
}
unreachable!()
}
/// Reads all bytes from the byte stream and appends them into a string.
///
/// If successful, this function will return the number of bytes read.
///
/// If the data in this stream is not valid UTF-8 then an error will be returned and `buf` will
/// be left unmodified.
/// If the data in this stream is not valid UTF-8 then an error will be returned and
/// `buf` will be left unmodified.
///
/// # Examples
///
@ -159,37 +178,32 @@ pub trait Read {
fn read_to_string<'a>(
&'a mut self,
buf: &'a mut String,
) -> ret!('a, ReadToStringFuture, io::Result<usize>)
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
let start_len = buf.len();
ReadToStringFuture {
reader: self,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
start_len,
}
unreachable!()
}
/// Reads the exact number of bytes required to fill `buf`.
///
/// This function reads as many bytes as necessary to completely fill the specified buffer
/// `buf`.
/// This function reads as many bytes as necessary to completely fill the specified
/// buffer `buf`.
///
/// No guarantees are provided about the contents of `buf` when this function is called,
/// implementations cannot rely on any property of the contents of `buf` being true. It is
/// recommended that implementations only write data to `buf` instead of reading its contents.
/// No guarantees are provided about the contents of `buf` when this function is
/// called, implementations cannot rely on any property of the contents of `buf` being
/// true. It is recommended that implementations only write data to `buf` instead of
/// reading its contents.
///
/// If this function encounters an "end of file" before completely filling the buffer, it
/// returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of `buf` are
/// unspecified in this case.
/// If this function encounters an "end of file" before completely filling the buffer,
/// it returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of
/// `buf` are unspecified in this case.
///
/// If any other read error is encountered then this function immediately returns. The contents
/// of `buf` are unspecified in this case.
/// If any other read error is encountered then this function immediately returns. The
/// contents of `buf` are unspecified in this case.
///
/// If this function returns an error, it is unspecified how many bytes it has read, but it
/// will never read more than would be necessary to completely fill the buffer.
/// If this function returns an error, it is unspecified how many bytes it has read,
/// but it will never read more than would be necessary to completely fill the buffer.
///
/// [`ErrorKind::UnexpectedEof`]: enum.ErrorKind.html#variant.UnexpectedEof
///
@ -208,7 +222,107 @@ pub trait Read {
/// #
/// # Ok(()) }) }
/// ```
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadExactFuture, io::Result<()>)
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ImplFuture<'a, io::Result<()>>
where
Self: Unpin,
{
unreachable!()
}
}
impl<T: Read + Unpin + ?Sized> Read for Box<T> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}
impl<T: Read + Unpin + ?Sized> Read for &mut T {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}
impl<P> Read for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}
impl Read for &[u8] {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}
} else {
pub use futures_io::AsyncRead as Read;
}
}
#[doc(hidden)]
pub trait ReadExt: futures_io::AsyncRead {
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
where
Self: Unpin,
{
ReadFuture { reader: self, buf }
}
fn read_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSliceMut<'a>],
) -> ReadVectoredFuture<'a, Self>
where
Self: Unpin,
{
ReadVectoredFuture { reader: self, bufs }
}
fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
where
Self: Unpin,
{
let start_len = buf.len();
ReadToEndFuture {
reader: self,
buf,
start_len,
}
}
fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
where
Self: Unpin,
{
let start_len = buf.len();
ReadToStringFuture {
reader: self,
bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
buf,
start_len,
}
}
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
where
Self: Unpin,
{
@ -216,8 +330,4 @@ pub trait Read {
}
}
impl<T: AsyncRead + Unpin + ?Sized> Read for T {
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result<usize>) {
ReadFuture { reader: self, buf }
}
}
impl<T: futures_io::AsyncRead + ?Sized> ReadExt for T {}

View file

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin;
use futures_io::AsyncRead;
use crate::future::Future;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -13,7 +11,7 @@ pub struct ReadFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a mut [u8],
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> {
impl<T: Read + Unpin + ?Sized> Future for ReadFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,11 +1,9 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::mem;
use std::pin::Pin;
use futures_io::AsyncRead;
use crate::future::Future;
use crate::task::{Context, Poll};
use crate::io::{self, Read};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -14,7 +12,7 @@ pub struct ReadExactFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a mut [u8],
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
impl<T: Read + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin;
use futures_io::AsyncRead;
use crate::future::Future;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -14,7 +12,7 @@ pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> {
pub(crate) start_len: usize,
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
impl<T: Read + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -36,7 +34,7 @@ impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
//
// Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics.
pub fn read_to_end_internal<R: AsyncRead + ?Sized>(
pub fn read_to_end_internal<R: Read + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut Vec<u8>,

View file

@ -1,13 +1,11 @@
use super::read_to_end_internal;
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::mem;
use std::pin::Pin;
use std::str;
use futures_io::AsyncRead;
use super::read_to_end_internal;
use crate::future::Future;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -18,7 +16,7 @@ pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> {
pub(crate) start_len: usize,
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> {
impl<T: Read + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,12 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io::IoSliceMut;
use std::pin::Pin;
use futures_io::AsyncRead;
use crate::io;
use crate::io::{self, Read, IoSliceMut};
use crate::future::Future;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -15,7 +11,7 @@ pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> {
pub(crate) bufs: &'a mut [IoSliceMut<'a>],
}
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> {
impl<T: Read + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,9 +1,7 @@
use std::fmt;
use std::pin::Pin;
use futures_io::{AsyncRead, Initializer};
use crate::io;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
/// Creates an instance of a reader that infinitely repeats one byte.
@ -44,7 +42,7 @@ impl fmt::Debug for Repeat {
}
}
impl AsyncRead for Repeat {
impl Read for Repeat {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
@ -56,9 +54,4 @@ impl AsyncRead for Repeat {
}
Poll::Ready(Ok(buf.len()))
}
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}

View file

@ -1,7 +1,6 @@
use std::pin::Pin;
use cfg_if::cfg_if;
use futures_io::AsyncSeek;
use crate::future::Future;
use crate::io::{self, SeekFrom};
@ -9,30 +8,36 @@ use crate::task::{Context, Poll};
cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
use std::ops::{Deref, DerefMut};
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
#[doc(hidden)]
pub struct ImplFuture<T>(std::marker::PhantomData<T>);
/// Allows seeking through a byte stream.
///
/// This trait is an async version of [`std::io::Seek`].
/// This trait is a re-export of [`futures::io::AsyncSeek`] and is an async version of
/// [`std::io::Seek`].
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::io::AsyncSeek`].
/// The [provided methods] do not really exist in the trait itself, but they become
/// available when the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
/// [`futures::io::AsyncSeek`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncSeek.html
/// [provided methods]: #provided-methods
pub trait Seek {
/// Attempt to seek to an offset, in bytes, in a stream.
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>>;
/// Seeks to a new position in a byte stream.
///
/// Returns the new position in the byte stream.
@ -55,17 +60,64 @@ pub trait Seek {
/// #
/// # Ok(()) }) }
/// ```
fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result<u64>)
fn seek(&mut self, pos: SeekFrom) -> ImplFuture<io::Result<u64>>
where
Self: Unpin;
Self: Unpin
{
unreachable!()
}
}
impl<T: AsyncSeek + Unpin + ?Sized> Seek for T {
fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result<u64>) {
impl<T: Seek + Unpin + ?Sized> Seek for Box<T> {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
unreachable!()
}
}
impl<T: Seek + Unpin + ?Sized> Seek for &mut T {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
unreachable!()
}
}
impl<P> Seek for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Seek,
{
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
unreachable!()
}
}
} else {
pub use futures_io::AsyncSeek as Seek;
}
}
#[doc(hidden)]
pub trait SeekExt: futures_io::AsyncSeek {
fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
where
Self: Unpin,
{
SeekFuture { seeker: self, pos }
}
}
impl<T: futures_io::AsyncSeek + ?Sized> SeekExt for T {}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct SeekFuture<'a, T: Unpin + ?Sized> {
@ -73,7 +125,7 @@ pub struct SeekFuture<'a, T: Unpin + ?Sized> {
pos: SeekFrom,
}
impl<T: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, T> {
impl<T: SeekExt + Unpin + ?Sized> Future for SeekFuture<'_, T> {
type Output = io::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,9 +1,7 @@
use std::fmt;
use std::pin::Pin;
use futures_io::AsyncWrite;
use crate::io;
use crate::io::{self, Write};
use crate::task::{Context, Poll};
/// Creates a writer that consumes and drops all data.
@ -40,7 +38,7 @@ impl fmt::Debug for Sink {
}
}
impl AsyncWrite for Sink {
impl Write for Sink {
#[inline]
fn poll_write(
self: Pin<&mut Self>,

View file

@ -1,11 +1,10 @@
use std::io;
use std::pin::Pin;
use std::sync::Mutex;
use cfg_if::cfg_if;
use futures_io::AsyncWrite;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard error of the current process.
@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll};
/// ```
pub fn stderr() -> Stderr {
Stderr(Mutex::new(State::Idle(Some(Inner {
stderr: io::stderr(),
stderr: std::io::stderr(),
buf: Vec::new(),
last_op: None,
}))))
@ -64,7 +63,7 @@ enum State {
#[derive(Debug)]
struct Inner {
/// The blocking stderr handle.
stderr: io::Stderr,
stderr: std::io::Stderr,
/// The write buffer.
buf: Vec<u8>,
@ -80,7 +79,7 @@ enum Operation {
Flush(io::Result<()>),
}
impl AsyncWrite for Stderr {
impl Write for Stderr {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -118,7 +117,7 @@ impl AsyncWrite for Stderr {
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = io::Write::write(&mut inner.stderr, &mut inner.buf);
let res = std::io::Write::write(&mut inner.stderr, &mut inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
}));
@ -146,7 +145,7 @@ impl AsyncWrite for Stderr {
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = io::Write::flush(&mut inner.stderr);
let res = std::io::Write::flush(&mut inner.stderr);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))
}));
@ -179,7 +178,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stderr {
fn as_raw_fd(&self) -> RawFd {
io::stderr().as_raw_fd()
std::io::stderr().as_raw_fd()
}
}
}
@ -190,7 +189,7 @@ cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
impl AsRawHandle for Stderr {
fn as_raw_handle(&self) -> RawHandle {
io::stderr().as_raw_handle()
std::io::stderr().as_raw_handle()
}
}
}

View file

@ -1,10 +1,9 @@
use std::io;
use std::pin::Pin;
use std::sync::Mutex;
use cfg_if::cfg_if;
use futures_io::{AsyncRead, Initializer};
use crate::io::{self, Read};
use crate::future::{self, Future};
use crate::task::{blocking, Context, Poll};
@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll};
/// ```
pub fn stdin() -> Stdin {
Stdin(Mutex::new(State::Idle(Some(Inner {
stdin: io::stdin(),
stdin: std::io::stdin(),
line: String::new(),
buf: Vec::new(),
last_op: None,
@ -65,7 +64,7 @@ enum State {
#[derive(Debug)]
struct Inner {
/// The blocking stdin handle.
stdin: io::Stdin,
stdin: std::io::Stdin,
/// The line buffer.
line: String,
@ -137,7 +136,7 @@ impl Stdin {
}
}
impl AsyncRead for Stdin {
impl Read for Stdin {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -174,7 +173,7 @@ impl AsyncRead for Stdin {
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = io::Read::read(&mut inner.stdin, &mut inner.buf);
let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf);
inner.last_op = Some(Operation::Read(res));
State::Idle(Some(inner))
}));
@ -185,11 +184,6 @@ impl AsyncRead for Stdin {
}
}
}
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}
cfg_if! {
@ -208,7 +202,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stdin {
fn as_raw_fd(&self) -> RawFd {
io::stdin().as_raw_fd()
std::io::stdin().as_raw_fd()
}
}
}
@ -219,7 +213,7 @@ cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
impl AsRawHandle for Stdin {
fn as_raw_handle(&self) -> RawHandle {
io::stdin().as_raw_handle()
std::io::stdin().as_raw_handle()
}
}
}

View file

@ -1,12 +1,11 @@
use std::io;
use std::pin::Pin;
use std::sync::Mutex;
use cfg_if::cfg_if;
use futures_io::AsyncWrite;
use crate::future::Future;
use crate::task::{blocking, Context, Poll};
use crate::io::{self, Write};
/// Constructs a new handle to the standard output of the current process.
///
@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll};
/// ```
pub fn stdout() -> Stdout {
Stdout(Mutex::new(State::Idle(Some(Inner {
stdout: io::stdout(),
stdout: std::io::stdout(),
buf: Vec::new(),
last_op: None,
}))))
@ -64,7 +63,7 @@ enum State {
#[derive(Debug)]
struct Inner {
/// The blocking stdout handle.
stdout: io::Stdout,
stdout: std::io::Stdout,
/// The write buffer.
buf: Vec<u8>,
@ -80,7 +79,7 @@ enum Operation {
Flush(io::Result<()>),
}
impl AsyncWrite for Stdout {
impl Write for Stdout {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -118,7 +117,7 @@ impl AsyncWrite for Stdout {
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = io::Write::write(&mut inner.stdout, &mut inner.buf);
let res = std::io::Write::write(&mut inner.stdout, &mut inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
}));
@ -146,7 +145,7 @@ impl AsyncWrite for Stdout {
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = io::Write::flush(&mut inner.stdout);
let res = std::io::Write::flush(&mut inner.stdout);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))
}));
@ -179,7 +178,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stdout {
fn as_raw_fd(&self) -> RawFd {
io::stdout().as_raw_fd()
std::io::stdout().as_raw_fd()
}
}
}
@ -190,7 +189,7 @@ cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
impl AsRawHandle for Stdout {
fn as_raw_handle(&self) -> RawHandle {
io::stdout().as_raw_handle()
std::io::stdout().as_raw_handle()
}
}
}

View file

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin;
use futures_io::AsyncWrite;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -12,7 +10,7 @@ pub struct FlushFuture<'a, T: Unpin + ?Sized> {
pub(crate) writer: &'a mut T,
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, T> {
impl<T: Write + Unpin + ?Sized> Future for FlushFuture<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -8,45 +8,75 @@ use write::WriteFuture;
use write_all::WriteAllFuture;
use write_vectored::WriteVectoredFuture;
use std::io;
use cfg_if::cfg_if;
use futures_io::AsyncWrite;
use crate::io::IoSlice;
cfg_if! {
if #[cfg(feature = "docs")] {
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
/// Allows writing to a byte stream.
///
/// This trait is an async version of [`std::io::Write`].
/// This trait is a re-export of [`futures::io::AsyncWrite`] and is an async version of
/// [`std::io::Write`].
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::io::AsyncWrite`].
/// Methods other than [`poll_write`], [`poll_write_vectored`], [`poll_flush`], and
/// [`poll_close`] do not really exist in the trait itself, but they become available when
/// the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
/// [`futures::io::AsyncWrite`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncWrite.html
/// [`poll_write`]: #tymethod.poll_write
/// [`poll_write_vectored`]: #method.poll_write_vectored
/// [`poll_flush`]: #tymethod.poll_flush
/// [`poll_close`]: #tymethod.poll_close
pub trait Write {
/// Attempt to write bytes from `buf` into the object.
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>>;
/// Attempt to write bytes from `bufs` into the object using vectored
/// IO operations.
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>]
) -> Poll<io::Result<usize>> {
unreachable!()
}
/// Attempt to flush the object, ensuring that any buffered data reach
/// their destination.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
/// Attempt to close the object.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
/// Writes some bytes into the byte stream.
///
/// Returns the number of bytes written from the start of the buffer.
///
/// If the return value is `Ok(n)` then it must be guaranteed that `0 <= n <= buf.len()`. A
/// return value of `0` typically means that the underlying object is no longer able to accept
/// bytes and will likely not be able to in the future as well, or that the buffer provided is
/// empty.
/// If the return value is `Ok(n)` then it must be guaranteed that
/// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
/// object is no longer able to accept bytes and will likely not be able to in the
/// future as well, or that the buffer provided is empty.
///
/// # Examples
///
@ -62,9 +92,12 @@ pub trait Write {
/// #
/// # Ok(()) }) }
/// ```
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result<usize>)
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin;
Self: Unpin,
{
unreachable!()
}
/// Flushes the stream to ensure that all buffered contents reach their destination.
///
@ -83,35 +116,38 @@ pub trait Write {
/// #
/// # Ok(()) }) }
/// ```
fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>)
fn flush(&mut self) -> ImplFuture<'_, io::Result<()>>
where
Self: Unpin;
Self: Unpin,
{
unreachable!()
}
/// Like [`write`], except that it writes from a slice of buffers.
///
/// Data is copied from each buffer in order, with the final buffer read from possibly being
/// only partially consumed. This method must behave as a call to [`write`] with the buffers
/// concatenated would.
/// Data is copied from each buffer in order, with the final buffer read from possibly
/// being only partially consumed. This method must behave as a call to [`write`] with
/// the buffers concatenated would.
///
/// The default implementation calls [`write`] with either the first nonempty buffer provided,
/// or an empty one if none exists.
/// The default implementation calls [`write`] with either the first nonempty buffer
/// provided, or an empty one if none exists.
///
/// [`write`]: #tymethod.write
fn write_vectored<'a>(
&'a mut self,
bufs: &'a [io::IoSlice<'a>],
) -> ret!('a, WriteVectoredFuture, io::Result<usize>)
bufs: &'a [IoSlice<'a>],
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
WriteVectoredFuture { writer: self, bufs }
unreachable!()
}
/// Writes an entire buffer into the byte stream.
///
/// This method will continuously call [`write`] until there is no more data to be written or
/// an error is returned. This method will not return until the entire buffer has been
/// successfully written or such an error occurs.
/// This method will continuously call [`write`] until there is no more data to be
/// written or an error is returned. This method will not return until the entire
/// buffer has been successfully written or such an error occurs.
///
/// [`write`]: #tymethod.write
///
@ -131,7 +167,118 @@ pub trait Write {
/// ```
///
/// [`write`]: #tymethod.write
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteAllFuture, io::Result<()>)
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ImplFuture<'a, io::Result<()>>
where
Self: Unpin,
{
unreachable!()
}
}
impl<T: Write + Unpin + ?Sized> Write for Box<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
impl<T: Write + Unpin + ?Sized> Write for &mut T {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
impl<P> Write for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
impl Write for Vec<u8> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
} else {
pub use futures_io::AsyncWrite as Write;
}
}
#[doc(hidden)]
pub trait WriteExt: Write {
fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
where
Self: Unpin,
{
WriteFuture { writer: self, buf }
}
fn flush(&mut self) -> FlushFuture<'_, Self>
where
Self: Unpin,
{
FlushFuture { writer: self }
}
fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
where
Self: Unpin,
{
WriteVectoredFuture { writer: self, bufs }
}
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
where
Self: Unpin,
{
@ -139,12 +286,4 @@ pub trait Write {
}
}
impl<T: AsyncWrite + Unpin + ?Sized> Write for T {
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result<usize>) {
WriteFuture { writer: self, buf }
}
fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>) {
FlushFuture { writer: self }
}
}
impl<T: Write + ?Sized> WriteExt for T {}

View file

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin;
use futures_io::AsyncWrite;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -13,7 +11,7 @@ pub struct WriteFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a [u8],
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, T> {
impl<T: Write + Unpin + ?Sized> Future for WriteFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,11 +1,9 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::mem;
use std::pin::Pin;
use futures_io::AsyncWrite;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -14,7 +12,7 @@ pub struct WriteAllFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a [u8],
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, T> {
impl<T: Write + Unpin + ?Sized> Future for WriteAllFuture<'_, T> {
type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -1,11 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::io::IoSlice;
use std::pin::Pin;
use futures_io::AsyncWrite;
use crate::future::Future;
use crate::task::{Context, Poll};
use crate::io::{self, Write, IoSlice};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
@ -14,7 +11,7 @@ pub struct WriteVectoredFuture<'a, T: Unpin + ?Sized> {
pub(crate) bufs: &'a [IoSlice<'a>],
}
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, T> {
impl<T: Write + Unpin + ?Sized> Future for WriteVectoredFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

View file

@ -28,7 +28,12 @@
//!
//! # Features
//!
//! Unstable APIs in this crate are available when the `unstable` Cargo feature is enabled:
//! Items marked with
//! <span
//! class="module-item stab portability"
//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
//! ><code>unstable</code></span>
//! are available only when the `unstable` Cargo feature is enabled:
//!
//! ```toml
//! [dependencies.async-std]
@ -58,6 +63,7 @@ cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub mod pin;
mod vec;
mod result;
}

View file

@ -8,6 +8,7 @@ use crate::future::{self, Future};
use crate::io;
use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs;
use crate::stream::Stream;
use crate::task::{Context, Poll};
/// A TCP socket server, listening for connections.
@ -190,7 +191,7 @@ impl TcpListener {
#[derive(Debug)]
pub struct Incoming<'a>(&'a TcpListener);
impl<'a> futures_core::stream::Stream for Incoming<'a> {
impl<'a> Stream for Incoming<'a> {
type Item = io::Result<TcpStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View file

@ -3,10 +3,9 @@ use std::net::SocketAddr;
use std::pin::Pin;
use cfg_if::cfg_if;
use futures_io::{AsyncRead, AsyncWrite};
use crate::future;
use crate::io;
use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs;
use crate::task::blocking;
@ -285,7 +284,7 @@ impl TcpStream {
}
}
impl AsyncRead for TcpStream {
impl Read for TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -303,7 +302,7 @@ impl AsyncRead for TcpStream {
}
}
impl AsyncRead for &TcpStream {
impl Read for &TcpStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -313,7 +312,7 @@ impl AsyncRead for &TcpStream {
}
}
impl AsyncWrite for TcpStream {
impl Write for TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -339,7 +338,7 @@ impl AsyncWrite for TcpStream {
}
}
impl AsyncWrite for &TcpStream {
impl Write for &TcpStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,

View file

@ -6,11 +6,10 @@ use std::net::Shutdown;
use std::path::Path;
use std::pin::Pin;
use futures_io::{AsyncRead, AsyncWrite};
use mio_uds;
use super::SocketAddr;
use crate::io;
use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::task::{blocking, Context, Poll};
@ -154,7 +153,7 @@ impl UnixStream {
}
}
impl AsyncRead for UnixStream {
impl Read for UnixStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -164,7 +163,7 @@ impl AsyncRead for UnixStream {
}
}
impl AsyncRead for &UnixStream {
impl Read for &UnixStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -174,7 +173,7 @@ impl AsyncRead for &UnixStream {
}
}
impl AsyncWrite for UnixStream {
impl Write for UnixStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
@ -192,7 +191,7 @@ impl AsyncWrite for UnixStream {
}
}
impl AsyncWrite for &UnixStream {
impl Write for &UnixStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,

View file

@ -25,3 +25,14 @@ pub use crate::io::Write as _;
pub use crate::stream::Stream;
#[doc(no_inline)]
pub use crate::task_local;
#[doc(hidden)]
pub use crate::io::BufReadExt as _;
#[doc(hidden)]
pub use crate::io::ReadExt as _;
#[doc(hidden)]
pub use crate::io::SeekExt as _;
#[doc(hidden)]
pub use crate::io::WriteExt as _;
#[doc(hidden)]
pub use crate::stream::stream::Stream as _;

View file

@ -1,8 +1,9 @@
use crate::stream::{FromStream, IntoStream, Stream};
use std::pin::Pin;
impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
use crate::prelude::*;
use crate::stream::{FromStream, IntoStream};
impl<T, E, V> FromStream<Result<T, E>> for Result<V, E>
where
V: FromStream<T>,
{
@ -12,9 +13,9 @@ where
#[inline]
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
where
<S as IntoStream>::IntoStream: Send + 'a,
<S as IntoStream>::IntoStream: 'a,
{
let stream = stream.into_stream();

View file

@ -12,7 +12,7 @@ use std::pin::Pin;
/// [`IntoStream`]: trait.IntoStream.html
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub trait FromStream<T: Send> {
pub trait FromStream<T> {
/// Creates a value from a stream.
///
/// # Examples
@ -24,7 +24,7 @@ pub trait FromStream<T: Send> {
///
/// // let _five_fives = async_std::stream::repeat(5).take(5);
/// ```
fn from_stream<'a, S: IntoStream<Item = T> + Send + 'a>(
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>;
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>;
}

View file

@ -20,13 +20,13 @@ pub trait IntoStream {
type Item;
/// Which kind of stream are we turning this into?
type IntoStream: Stream<Item = Self::Item> + Send;
type IntoStream: Stream<Item = Self::Item>;
/// Creates a stream from a value.
fn into_stream(self) -> Self::IntoStream;
}
impl<I: Stream + Send> IntoStream for I {
impl<I: Stream> IntoStream for I {
type Item = I::Item;
type IntoStream = I;

View file

@ -26,12 +26,20 @@ use cfg_if::cfg_if;
pub use empty::{empty, Empty};
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use stream::{Fuse, Scan, Stream, Take, Zip};
pub use stream::{Fuse, Scan, Take, Zip};
mod empty;
mod once;
mod repeat;
mod stream;
pub(crate) mod stream;
cfg_if! {
if #[cfg(feature = "docs")] {
pub use stream::Stream;
} else {
pub use futures_core::stream::Stream;
}
}
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {

View file

@ -60,8 +60,8 @@ use std::task::{Context, Poll};
use cfg_if::cfg_if;
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
use crate::stream::FromStream;
if #[cfg(feature = "unstable")] {
use crate::future::Future;
}
}
@ -75,7 +75,7 @@ cfg_if! {
($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => (ImplFuture<$a, $o>);
($f:ty, $o:ty) => (ImplFuture<'static, $o>);
}
} else {
macro_rules! ret {
@ -83,36 +83,34 @@ cfg_if! {
($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => ($f<$a, Self, $t1, $t2, $t3>);
($f:ty, $o:ty) => ($f);
}
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (DynFuture<$a, $o>);
}
} else {
#[allow(unused_macros)]
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (Pin<Box<dyn core::future::Future<Output = $o> + Send + 'a>>)
}
if #[cfg(any(feature = "unstable", feature = "docs"))] {
use crate::stream::FromStream;
}
}
/// An asynchronous stream of values.
///
/// This trait is an async version of [`std::iter::Iterator`].
/// This trait is a re-export of [`futures::stream::Stream`] and is an async version of
/// [`std::iter::Iterator`].
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::stream::Stream`].
/// The [provided methods] do not really exist in the trait itself, but they become available when
/// the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html
/// [`futures::stream::Stream`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html
/// [provided methods]: #provided-methods
pub trait Stream {
/// The type of items yielded by this stream.
type Item;
@ -345,7 +343,7 @@ pub trait Stream {
/// #
/// # }) }
/// ```
fn min_by<F>(self, compare: F) -> MinByFuture<Self, F, Self::Item>
fn min_by<F>(self, compare: F) -> ret!(MinByFuture<Self, F, Self::Item>, Self::Item)
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
@ -555,7 +553,7 @@ pub trait Stream {
/// #
/// # }) }
/// ```
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, F, Self::Item, B>
fn fold<B, F>(self, init: B, f: F) -> ret!(FoldFuture<Self, F, Self::Item, B>, Self::Item)
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
@ -753,13 +751,12 @@ pub trait Stream {
/// ```
///
/// [`stream`]: trait.Stream.html#tymethod.next
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
fn collect<'a, B>(self) -> dyn_ret!('a, B)
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> ret!(Pin<Box<dyn Future<Output = B> + 'a>>, B)
where
Self: futures_core::stream::Stream + Sized + Send + 'a,
<Self as futures_core::stream::Stream>::Item: Send,
Self: futures_core::stream::Stream + Sized + 'a,
B: FromStream<<Self as futures_core::stream::Stream>::Item>,
{
FromStream::from_stream(self)

View file

@ -24,9 +24,9 @@ impl<S, St, F> Scan<S, St, F> {
impl<S: Unpin, St, F> Unpin for Scan<S, St, F> {}
impl<S, St, F, B> futures_core::stream::Stream for Scan<S, St, F>
impl<S, St, F, B> Stream for Scan<S, St, F>
where
S: Stream,
S: crate::stream::Stream,
F: FnMut(&mut St, S::Item) -> Option<B>,
{
type Item = B;

View file

@ -5,7 +5,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};
/// An iterator that iterates two other iterators simultaneously.
pub struct Zip<A: Stream, B> {
pub struct Zip<A: crate::stream::stream::Stream, B> {
item_slot: Option<A::Item>,
first: A,
second: B,
@ -22,7 +22,7 @@ impl<A: fmt::Debug + Stream, B: fmt::Debug> fmt::Debug for Zip<A, B> {
impl<A: Unpin + Stream, B: Unpin> Unpin for Zip<A, B> {}
impl<A: Stream, B> Zip<A, B> {
impl<A: crate::stream::stream::Stream, B> Zip<A, B> {
pub(crate) fn new(first: A, second: B) -> Self {
Zip {
item_slot: None,

View file

@ -1,14 +1,15 @@
use crate::stream::{FromStream, IntoStream, Stream};
use std::pin::Pin;
impl<T: Send> FromStream<T> for Vec<T> {
use crate::prelude::*;
use crate::stream::{FromStream, IntoStream};
impl<T> FromStream<T> for Vec<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
where
<S as IntoStream>::IntoStream: Send + 'a,
<S as IntoStream>::IntoStream: 'a,
{
let stream = stream.into_stream();