224: Re-export IO traits from futures r=stjepang a=stjepang

Sorry for the big PR!

Instead of providing our own traits `async_std::io::{Read, Write, Seek, BufRead}`, we now re-export `futures::io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncRead}`. While re-exporting we rename them to strip away the "Async" prefix.

The documentation will display the contents of the original traits from the `futures` crate together with our own extension methods. There's a note in the docs saying the extenion methods become available only when `async_std::prelude::*` is imported.

Our extension traits are re-exported into the prelude, but are marked with `#[doc(hidden)]` so they're completely invisible to users.

The benefit of this is that people can now implement traits from `async_std::io` for their types and stay compatible with `futures`. This will also simplify some trait bounds in our APIs - for example, things like `where Self: futures_io::AsyncRead`.

At the same time, I cleaned up some trait bounds in our stream interfaces, but haven't otherwise fiddled with them much.

I intend to follow up with another PR doing the same change for `Stream` so that we re-export the stream trait from `futures`.

Co-authored-by: Stjepan Glavina <stjepang@gmail.com>
staging
bors[bot] 5 years ago committed by GitHub
commit 33ff41df48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -63,5 +63,6 @@ matrix:
- mdbook test -L ./target/debug/deps docs - mdbook test -L ./target/debug/deps docs
script: script:
- cargo check --all --benches --bins --examples --tests
- cargo check --features unstable --all --benches --bins --examples --tests - cargo check --features unstable --all --benches --bins --examples --tests
- cargo test --all --doc --features unstable - cargo test --all --doc --features unstable

@ -51,9 +51,9 @@ Remember the talk about "deferred computation" in the intro? That's all it is. I
Let's have a look at a simple function, specifically the return value: Let's have a look at a simple function, specifically the return value:
```rust,edition2018 ```rust,edition2018
# use std::{fs::File, io::{self, Read}}; # use std::{fs::File, io, io::prelude::*};
# #
fn read_file(path: &str) -> Result<String, io::Error> { fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path)?; let mut file = File::open(path)?;
let mut contents = String::new(); let mut contents = String::new();
file.read_to_string(&mut contents)?; file.read_to_string(&mut contents)?;
@ -67,9 +67,9 @@ Note that this return value talks about the past. The past has a drawback: all d
But we wanted to abstract over *computation* and let someone else choose how to run it. That's fundamentally incompatible with looking at the results of previous computation all the time. So, let's find a type that *describes* a computation without running it. Let's look at the function again: But we wanted to abstract over *computation* and let someone else choose how to run it. That's fundamentally incompatible with looking at the results of previous computation all the time. So, let's find a type that *describes* a computation without running it. Let's look at the function again:
```rust,edition2018 ```rust,edition2018
# use std::{fs::File, io::{self, Read}}; # use std::{fs::File, io, io::prelude::*};
# #
fn read_file(path: &str) -> Result<String, io::Error> { fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path)?; let mut file = File::open(path)?;
let mut contents = String::new(); let mut contents = String::new();
file.read_to_string(&mut contents)?; file.read_to_string(&mut contents)?;
@ -112,10 +112,9 @@ While the `Future` trait has existed in Rust for a while, it was inconvenient to
```rust,edition2018 ```rust,edition2018
# extern crate async_std; # extern crate async_std;
# use async_std::{fs::File, io::Read}; # use async_std::{fs::File, io, io::prelude::*};
# use std::io;
# #
async fn read_file(path: &str) -> Result<String, io::Error> { async fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path).await?; let mut file = File::open(path).await?;
let mut contents = String::new(); let mut contents = String::new();
file.read_to_string(&mut contents).await?; file.read_to_string(&mut contents).await?;
@ -125,7 +124,7 @@ async fn read_file(path: &str) -> Result<String, io::Error> {
Amazingly little difference, right? All we did is label the function `async` and insert 2 special commands: `.await`. Amazingly little difference, right? All we did is label the function `async` and insert 2 special commands: `.await`.
This `async` function sets up a deferred computation. When this function is called, it will produce a `Future<Output=Result<String, io::Error>>` instead of immediately returning a `Result<String, io::Error>`. (Or, more precisely, generate a type for you that implements `Future<Output=Result<String, io::Error>>`.) This `async` function sets up a deferred computation. When this function is called, it will produce a `Future<Output = io::Result<String>>` instead of immediately returning a `io::Result<String>`. (Or, more precisely, generate a type for you that implements `Future<Output = io::Result<String>>`.)
## What does `.await` do? ## What does `.await` do?

@ -6,9 +6,9 @@ In `async-std`, the [`tasks`][tasks] module is responsible for this. The simples
```rust,edition2018 ```rust,edition2018
# extern crate async_std; # extern crate async_std;
use async_std::{io, task, fs::File, io::Read}; use async_std::{fs::File, io, prelude::*, task};
async fn read_file(path: &str) -> Result<String, io::Error> { async fn read_file(path: &str) -> io::Result<String> {
let mut file = File::open(path).await?; let mut file = File::open(path).await?;
let mut contents = String::new(); let mut contents = String::new();
file.read_to_string(&mut contents).await?; file.read_to_string(&mut contents).await?;
@ -33,10 +33,9 @@ This asks the runtime baked into `async_std` to execute the code that reads a fi
```rust,edition2018 ```rust,edition2018
# extern crate async_std; # extern crate async_std;
# use async_std::{fs::File, io::Read, task}; # use async_std::{fs::File, io, prelude::*, task};
# use std::io;
# #
# async fn read_file(path: &str) -> Result<String, io::Error> { # async fn read_file(path: &str) -> io::Result<String> {
# let mut file = File::open(path).await?; # let mut file = File::open(path).await?;
# let mut contents = String::new(); # let mut contents = String::new();
# file.read_to_string(&mut contents).await?; # file.read_to_string(&mut contents).await?;

@ -29,7 +29,7 @@ Now we can write the server's accept loop:
# extern crate async_std; # extern crate async_std;
# use async_std::{ # use async_std::{
# net::{TcpListener, ToSocketAddrs}, # net::{TcpListener, ToSocketAddrs},
# prelude::Stream, # prelude::*,
# }; # };
# #
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
@ -67,7 +67,7 @@ Finally, let's add main:
# extern crate async_std; # extern crate async_std;
# use async_std::{ # use async_std::{
# net::{TcpListener, ToSocketAddrs}, # net::{TcpListener, ToSocketAddrs},
# prelude::Stream, # prelude::*,
# task, # task,
# }; # };
# #

@ -15,9 +15,8 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined
# extern crate futures_channel; # extern crate futures_channel;
# extern crate futures_util; # extern crate futures_util;
# use async_std::{ # use async_std::{
# io::{Write},
# net::TcpStream, # net::TcpStream,
# prelude::{Future, Stream}, # prelude::*,
# task, # task,
# }; # };
# use futures_channel::mpsc; # use futures_channel::mpsc;

@ -72,9 +72,9 @@ We use the `select` macro for this purpose:
# extern crate async_std; # extern crate async_std;
# extern crate futures_channel; # extern crate futures_channel;
# extern crate futures_util; # extern crate futures_util;
# use async_std::{io::Write, net::TcpStream}; # use async_std::{net::TcpStream, prelude::*};
use futures_channel::mpsc; use futures_channel::mpsc;
use futures_util::{select, FutureExt, StreamExt}; use futures_util::{select, FutureExt};
# use std::sync::Arc; # use std::sync::Arc;
# type Receiver<T> = mpsc::UnboundedReceiver<T>; # type Receiver<T> = mpsc::UnboundedReceiver<T>;
@ -94,11 +94,11 @@ async fn connection_writer_loop(
let mut shutdown = shutdown.fuse(); let mut shutdown = shutdown.fuse();
loop { // 2 loop { // 2
select! { select! {
msg = messages.next() => match msg { msg = messages.next().fuse() => match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?, Some(msg) => stream.write_all(msg.as_bytes()).await?,
None => break, None => break,
}, },
void = shutdown.next() => match void { void = shutdown.next().fuse() => match void {
Some(void) => match void {}, // 3 Some(void) => match void {}, // 3
None => break, None => break,
} }
@ -125,12 +125,13 @@ The final code looks like this:
# extern crate futures_channel; # extern crate futures_channel;
# extern crate futures_util; # extern crate futures_util;
use async_std::{ use async_std::{
io::{BufReader, BufRead, Write}, io::BufReader,
net::{TcpListener, TcpStream, ToSocketAddrs}, net::{TcpListener, TcpStream, ToSocketAddrs},
prelude::*,
task, task,
}; };
use futures_channel::mpsc; use futures_channel::mpsc;
use futures_util::{select, FutureExt, SinkExt, StreamExt}; use futures_util::{select, FutureExt, SinkExt};
use std::{ use std::{
collections::hash_map::{Entry, HashMap}, collections::hash_map::{Entry, HashMap},
future::Future, future::Future,
@ -209,11 +210,11 @@ async fn connection_writer_loop(
let mut shutdown = shutdown.fuse(); let mut shutdown = shutdown.fuse();
loop { loop {
select! { select! {
msg = messages.next() => match msg { msg = messages.next().fuse() => match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?, Some(msg) => stream.write_all(msg.as_bytes()).await?,
None => break, None => break,
}, },
void = shutdown.next() => match void { void = shutdown.next().fuse() => match void {
Some(void) => match void {}, Some(void) => match void {},
None => break, None => break,
} }
@ -243,11 +244,11 @@ async fn broker_loop(events: Receiver<Event>) {
let mut events = events.fuse(); let mut events = events.fuse();
loop { loop {
let event = select! { let event = select! {
event = events.next() => match event { event = events.next().fuse() => match event {
None => break, // 2 None => break, // 2
Some(event) => event, Some(event) => event,
}, },
disconnect = disconnect_receiver.next() => { disconnect = disconnect_receiver.next().fuse() => {
let (name, _pending_messages) = disconnect.unwrap(); // 3 let (name, _pending_messages) = disconnect.unwrap(); // 3
assert!(peers.remove(&name).is_some()); assert!(peers.remove(&name).is_some());
continue; continue;

@ -18,11 +18,12 @@ With async, we can just use the `select!` macro.
# extern crate async_std; # extern crate async_std;
# extern crate futures_util; # extern crate futures_util;
use async_std::{ use async_std::{
io::{stdin, BufRead, BufReader, Write}, io::{stdin, BufReader},
net::{TcpStream, ToSocketAddrs}, net::{TcpStream, ToSocketAddrs},
prelude::*,
task, task,
}; };
use futures_util::{select, FutureExt, StreamExt}; use futures_util::{select, FutureExt};
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
@ -38,14 +39,14 @@ async fn try_run(addr: impl ToSocketAddrs) -> Result<()> {
let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse(); // 2 let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse(); // 2
loop { loop {
select! { // 3 select! { // 3
line = lines_from_server.next() => match line { line = lines_from_server.next().fuse() => match line {
Some(line) => { Some(line) => {
let line = line?; let line = line?;
println!("{}", line); println!("{}", line);
}, },
None => break, None => break,
}, },
line = lines_from_stdin.next() => match line { line = lines_from_stdin.next().fuse() => match line {
Some(line) => { Some(line) => {
let line = line?; let line = line?;
writer.write_all(line.as_bytes()).await?; writer.write_all(line.as_bytes()).await?;

@ -10,9 +10,9 @@ We need to:
```rust,edition2018 ```rust,edition2018
# extern crate async_std; # extern crate async_std;
# use async_std::{ # use async_std::{
# io::{BufRead, BufReader}, # io::BufReader,
# net::{TcpListener, TcpStream, ToSocketAddrs}, # net::{TcpListener, TcpStream, ToSocketAddrs},
# prelude::Stream, # prelude::*,
# task, # task,
# }; # };
# #
@ -75,9 +75,9 @@ We can "fix" it by waiting for the task to be joined, like this:
# #![feature(async_closure)] # #![feature(async_closure)]
# extern crate async_std; # extern crate async_std;
# use async_std::{ # use async_std::{
# io::{BufRead, BufReader}, # io::BufReader,
# net::{TcpListener, TcpStream, ToSocketAddrs}, # net::{TcpListener, TcpStream, ToSocketAddrs},
# prelude::Stream, # prelude::*,
# task, # task,
# }; # };
# #
@ -125,7 +125,7 @@ So let's use a helper function for this:
# extern crate async_std; # extern crate async_std;
# use async_std::{ # use async_std::{
# io, # io,
# prelude::Future, # prelude::*,
# task, # task,
# }; # };
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()> fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>

@ -16,9 +16,8 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the
# extern crate futures_channel; # extern crate futures_channel;
# extern crate futures_util; # extern crate futures_util;
# use async_std::{ # use async_std::{
# io::Write,
# net::TcpStream, # net::TcpStream,
# prelude::Stream, # prelude::*,
# }; # };
use futures_channel::mpsc; // 1 use futures_channel::mpsc; // 1
use futures_util::sink::SinkExt; use futures_util::sink::SinkExt;

@ -9,11 +9,11 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer};
use crate::fs::{Metadata, Permissions}; use crate::fs::{Metadata, Permissions};
use crate::future; use crate::future;
use crate::io::{self, SeekFrom, Write}; use crate::io::{self, Read, Seek, SeekFrom, Write};
use crate::prelude::*;
use crate::task::{self, blocking, Context, Poll, Waker}; use crate::task::{self, blocking, Context, Poll, Waker};
/// An open file on the filesystem. /// An open file on the filesystem.
@ -302,7 +302,7 @@ impl fmt::Debug for File {
} }
} }
impl AsyncRead for File { impl Read for File {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -310,14 +310,9 @@ impl AsyncRead for File {
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Pin::new(&mut &*self).poll_read(cx, buf) Pin::new(&mut &*self).poll_read(cx, buf)
} }
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
} }
impl AsyncRead for &File { impl Read for &File {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -326,14 +321,9 @@ impl AsyncRead for &File {
let state = futures_core::ready!(self.lock.poll_lock(cx)); let state = futures_core::ready!(self.lock.poll_lock(cx));
state.poll_read(cx, buf) state.poll_read(cx, buf)
} }
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
} }
impl AsyncWrite for File { impl Write for File {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -351,7 +341,7 @@ impl AsyncWrite for File {
} }
} }
impl AsyncWrite for &File { impl Write for &File {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -372,7 +362,7 @@ impl AsyncWrite for &File {
} }
} }
impl AsyncSeek for File { impl Seek for File {
fn poll_seek( fn poll_seek(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -382,7 +372,7 @@ impl AsyncSeek for File {
} }
} }
impl AsyncSeek for &File { impl Seek for &File {
fn poll_seek( fn poll_seek(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -1,32 +0,0 @@
use std::pin::Pin;
use futures_io::AsyncBufRead;
use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FillBufFuture<'a, R: ?Sized> {
reader: &'a mut R,
}
impl<'a, R: ?Sized> FillBufFuture<'a, R> {
pub(crate) fn new(reader: &'a mut R) -> Self {
Self { reader }
}
}
impl<'a, R: AsyncBufRead + Unpin + ?Sized> Future for FillBufFuture<'a, R> {
type Output = io::Result<&'a [u8]>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'a [u8]>> {
let Self { reader } = &mut *self;
let result = Pin::new(reader).poll_fill_buf(cx);
// This is safe because:
// 1. The buffer is valid for the lifetime of the reader.
// 2. Output is unrelated to the wrapper (Self).
result.map_ok(|buf| unsafe { std::mem::transmute::<&'_ [u8], &'a [u8]>(buf) })
}
}

@ -2,10 +2,8 @@ use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::str; use std::str;
use futures_io::AsyncBufRead;
use super::read_until_internal; use super::read_until_internal;
use crate::io; use crate::io::{self, BufRead};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// A stream of lines in a byte stream. /// A stream of lines in a byte stream.
@ -25,7 +23,7 @@ pub struct Lines<R> {
pub(crate) read: usize, pub(crate) read: usize,
} }
impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> { impl<R: BufRead> futures_core::stream::Stream for Lines<R> {
type Item = io::Result<String>; type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -50,7 +48,7 @@ impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
} }
} }
pub fn read_line_internal<R: AsyncBufRead + ?Sized>( pub fn read_line_internal<R: BufRead + ?Sized>(
reader: Pin<&mut R>, reader: Pin<&mut R>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut String, buf: &mut String,

@ -1,9 +1,7 @@
mod fill_buf;
mod lines; mod lines;
mod read_line; mod read_line;
mod read_until; mod read_until;
use fill_buf::FillBufFuture;
pub use lines::Lines; pub use lines::Lines;
use read_line::ReadLineFuture; use read_line::ReadLineFuture;
use read_until::ReadUntilFuture; use read_until::ReadUntilFuture;
@ -12,115 +10,259 @@ use std::mem;
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::AsyncBufRead;
use crate::io; use crate::io;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(feature = "docs")] {
use std::ops::{Deref, DerefMut};
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret { /// Allows reading from a buffered byte stream.
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ///
/// This trait is a re-export of [`futures::io::AsyncBufRead`] and is an async version of
/// [`std::io::BufRead`].
///
/// The [provided methods] do not really exist in the trait itself, but they become
/// available when the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
/// [`futures::io::AsyncBufRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html
/// [provided methods]: #provided-methods
pub trait BufRead {
/// Returns the contents of the internal buffer, filling it with more data from the
/// inner reader if it is empty.
///
/// This function is a lower-level call. It needs to be paired with the [`consume`]
/// method to function properly. When calling this method, none of the contents will be
/// "read" in the sense that later calling `read` may return the same contents. As
/// such, [`consume`] must be called with the number of bytes that are consumed from
/// this buffer to ensure that the bytes are never returned twice.
///
/// [`consume`]: #tymethod.consume
///
/// An empty buffer returned indicates that the stream has reached EOF.
// TODO: write a proper doctest with `consume`
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;
/// Tells this buffer that `amt` bytes have been consumed from the buffer, so they
/// should no longer be returned in calls to `read`.
fn consume(self: Pin<&mut Self>, amt: usize);
/// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
///
/// This function will read bytes from the underlying stream until the delimiter or EOF
/// is found. Once found, all bytes up to, and including, the delimiter (if found) will
/// be appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let mut file = BufReader::new(File::open("a.txt").await?);
///
/// let mut buf = Vec::with_capacity(1024);
/// let n = file.read_until(b'\n', &mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
///
/// Multiple successful calls to `read_until` append all bytes up to and including to
/// `buf`:
/// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let from: &[u8] = b"append\nexample\n";
/// let mut reader = BufReader::new(from);
/// let mut buf = vec![];
///
/// let mut size = reader.read_until(b'\n', &mut buf).await?;
/// assert_eq!(size, 7);
/// assert_eq!(buf, b"append\n");
///
/// size += reader.read_until(b'\n', &mut buf).await?;
/// assert_eq!(size, from.len());
///
/// assert_eq!(buf, from);
/// #
/// # Ok(()) }) }
/// ```
fn read_until<'a>(
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
unreachable!()
}
/// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is
/// reached.
///
/// This function will read bytes from the underlying stream until the newline
/// delimiter (the 0xA byte) or EOF is found. Once found, all bytes up to, and
/// including, the delimiter (if found) will be appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
/// If this function returns `Ok(0)`, the stream has reached EOF.
///
/// # Errors
///
/// This function has the same error semantics as [`read_until`] and will also return
/// an error if the read bytes are not valid UTF-8. If an I/O error is encountered then
/// `buf` may contain some bytes already read in the event that all data read so far
/// was valid UTF-8.
///
/// [`read_until`]: #method.read_until
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let mut file = BufReader::new(File::open("a.txt").await?);
///
/// let mut buf = String::new();
/// file.read_line(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_line<'a>(
&'a mut self,
buf: &'a mut String,
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
unreachable!()
}
/// Returns a stream over the lines of this byte stream.
///
/// The stream returned from this function will yield instances of
/// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte
/// (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
///
/// [`io::Result`]: type.Result.html
/// [`String`]: https://doc.rust-lang.org/std/string/struct.String.html
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let file = File::open("a.txt").await?;
/// let mut lines = BufReader::new(file).lines();
/// let mut count = 0;
///
/// while let Some(line) = lines.next().await {
/// line?;
/// count += 1;
/// }
/// #
/// # Ok(()) }) }
/// ```
fn lines(self) -> Lines<Self>
where
Self: Unpin + Sized,
{
unreachable!()
}
} }
} else {
macro_rules! ret { impl<T: BufRead + Unpin + ?Sized> BufRead for Box<T> {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!()
}
fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!()
}
} }
}
}
/// Allows reading from a buffered byte stream. impl<T: BufRead + Unpin + ?Sized> BufRead for &mut T {
/// fn poll_fill_buf(
/// This trait is an async version of [`std::io::BufRead`]. self: Pin<&mut Self>,
/// cx: &mut Context<'_>,
/// While it is currently not possible to implement this trait directly, it gets implemented ) -> Poll<io::Result<&[u8]>> {
/// automatically for all types that implement [`futures::io::AsyncBufRead`]. unreachable!()
/// }
/// [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
/// [`futures::io::AsyncBufRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html
pub trait BufRead {
/// Tells this buffer that `amt` bytes have been consumed from the buffer, so they should no
/// longer be returned in calls to `read`.
fn consume(&mut self, amt: usize)
where
Self: Unpin;
/// Returns the contents of the internal buffer, filling it with more data from the inner fn consume(self: Pin<&mut Self>, amt: usize) {
/// reader if it is empty. unreachable!()
/// }
/// This function is a lower-level call. It needs to be paired with the [`consume`] method to }
/// function properly. When calling this method, none of the contents will be "read" in the
/// sense that later calling `read` may return the same contents. As such, [`consume`] must be impl<P> BufRead for Pin<P>
/// called with the number of bytes that are consumed from this buffer to ensure that the bytes where
/// are never returned twice. P: DerefMut + Unpin,
/// <P as Deref>::Target: BufRead,
/// [`consume`]: #tymethod.consume {
/// fn poll_fill_buf(
/// An empty buffer returned indicates that the stream has reached EOF. self: Pin<&mut Self>,
// TODO: write a proper doctest with `consume` cx: &mut Context<'_>,
fn fill_buf<'a>(&'a mut self) -> ret!('a, FillBufFuture, io::Result<&'a [u8]>) ) -> Poll<io::Result<&[u8]>> {
where unreachable!()
Self: Unpin, }
{
FillBufFuture::new(self) fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!()
}
}
impl BufRead for &[u8] {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!()
}
fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!()
}
}
} else {
pub use futures_io::AsyncBufRead as BufRead;
} }
}
/// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached. #[doc(hidden)]
/// pub trait BufReadExt: futures_io::AsyncBufRead {
/// This function will read bytes from the underlying stream until the delimiter or EOF is fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'a, Self>
/// found. Once found, all bytes up to, and including, the delimiter (if found) will be
/// appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let mut file = BufReader::new(File::open("a.txt").await?);
///
/// let mut buf = Vec::with_capacity(1024);
/// let n = file.read_until(b'\n', &mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
///
/// Multiple successful calls to `read_until` append all bytes up to and including to `buf`:
/// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let from: &[u8] = b"append\nexample\n";
/// let mut reader = BufReader::new(from);
/// let mut buf = vec![];
///
/// let mut size = reader.read_until(b'\n', &mut buf).await?;
/// assert_eq!(size, 7);
/// assert_eq!(buf, b"append\n");
///
/// size += reader.read_until(b'\n', &mut buf).await?;
/// assert_eq!(size, from.len());
///
/// assert_eq!(buf, from);
/// #
/// # Ok(()) }) }
/// ```
fn read_until<'a>(
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> ret!('a, ReadUntilFuture, io::Result<usize>)
where where
Self: Unpin, Self: Unpin,
{ {
@ -132,44 +274,7 @@ pub trait BufRead {
} }
} }
/// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is reached. fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'a, Self>
///
/// This function will read bytes from the underlying stream until the newline delimiter (the
/// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if
/// found) will be appended to `buf`.
///
/// If successful, this function will return the total number of bytes read.
///
/// If this function returns `Ok(0)`, the stream has reached EOF.
///
/// # Errors
///
/// This function has the same error semantics as [`read_until`] and will also return an error
/// if the read bytes are not valid UTF-8. If an I/O error is encountered then `buf` may
/// contain some bytes already read in the event that all data read so far was valid UTF-8.
///
/// [`read_until`]: #method.read_until
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let mut file = BufReader::new(File::open("a.txt").await?);
///
/// let mut buf = String::new();
/// file.read_line(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_line<'a>(
&'a mut self,
buf: &'a mut String,
) -> ret!('a, ReadLineFuture, io::Result<usize>)
where where
Self: Unpin, Self: Unpin,
{ {
@ -181,35 +286,6 @@ pub trait BufRead {
} }
} }
/// Returns a stream over the lines of this byte stream.
///
/// The stream returned from this function will yield instances of
/// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte (the
/// 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
///
/// [`io::Result`]: type.Result.html
/// [`String`]: https://doc.rust-lang.org/std/string/struct.String.html
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::BufReader;
/// use async_std::prelude::*;
///
/// let file = File::open("a.txt").await?;
/// let mut lines = BufReader::new(file).lines();
/// let mut count = 0;
///
/// while let Some(line) = lines.next().await {
/// line?;
/// count += 1;
/// }
/// #
/// # Ok(()) }) }
/// ```
fn lines(self) -> Lines<Self> fn lines(self) -> Lines<Self>
where where
Self: Unpin + Sized, Self: Unpin + Sized,
@ -223,13 +299,9 @@ pub trait BufRead {
} }
} }
impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T { impl<T: futures_io::AsyncBufRead + ?Sized> BufReadExt for T {}
fn consume(&mut self, amt: usize) {
AsyncBufRead::consume(Pin::new(self), amt)
}
}
pub fn read_until_internal<R: AsyncBufRead + ?Sized>( pub fn read_until_internal<R: BufReadExt + ?Sized>(
mut reader: Pin<&mut R>, mut reader: Pin<&mut R>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
byte: u8, byte: u8,

@ -2,11 +2,9 @@ use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::str; use std::str;
use futures_io::AsyncBufRead;
use super::read_until_internal; use super::read_until_internal;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io::{self, BufRead};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
@ -18,7 +16,7 @@ pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
pub(crate) read: usize, pub(crate) read: usize,
} }
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> { impl<T: BufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,10 +1,8 @@
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncBufRead;
use super::read_until_internal; use super::read_until_internal;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io::{self, BufRead};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
@ -16,7 +14,7 @@ pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
pub(crate) read: usize, pub(crate) read: usize,
} }
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> { impl<T: BufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -2,9 +2,7 @@ use std::io::{IoSliceMut, Read as _};
use std::pin::Pin; use std::pin::Pin;
use std::{cmp, fmt}; use std::{cmp, fmt};
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; use crate::io::{self, BufRead, Read, Seek, SeekFrom};
use crate::io::{self, SeekFrom};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
const DEFAULT_CAPACITY: usize = 8 * 1024; const DEFAULT_CAPACITY: usize = 8 * 1024;
@ -193,7 +191,7 @@ impl<R> BufReader<R> {
} }
} }
impl<R: AsyncRead> AsyncRead for BufReader<R> { impl<R: Read> Read for BufReader<R> {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -229,14 +227,9 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
self.consume(nread); self.consume(nread);
Poll::Ready(Ok(nread)) Poll::Ready(Ok(nread))
} }
// we can't skip unconditionally because of the large buffer case in read.
unsafe fn initializer(&self) -> Initializer {
self.inner.initializer()
}
} }
impl<R: AsyncRead> AsyncBufRead for BufReader<R> { impl<R: Read> BufRead for BufReader<R> {
fn poll_fill_buf<'a>( fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>, self: Pin<&'a mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -278,7 +271,7 @@ impl<R: io::Read + fmt::Debug> fmt::Debug for BufReader<R> {
} }
} }
impl<R: AsyncSeek> AsyncSeek for BufReader<R> { impl<R: Seek> Seek for BufReader<R> {
/// Seeks to an offset, in bytes, in the underlying reader. /// Seeks to an offset, in bytes, in the underlying reader.
/// ///
/// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying /// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying

@ -1,9 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use crate::future::Future; use crate::future::Future;
use crate::io::{self, BufReader}; use crate::io::{self, BufRead, BufReader, Read, Write};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Copies the entire contents of a reader into a writer. /// Copies the entire contents of a reader into a writer.
@ -45,8 +43,8 @@ use crate::task::{Context, Poll};
/// ``` /// ```
pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64> pub async fn copy<R, W>(reader: &mut R, writer: &mut W) -> io::Result<u64>
where where
R: AsyncRead + Unpin + ?Sized, R: Read + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized, W: Write + Unpin + ?Sized,
{ {
pub struct CopyFuture<'a, R, W: ?Sized> { pub struct CopyFuture<'a, R, W: ?Sized> {
reader: R, reader: R,
@ -69,8 +67,8 @@ where
impl<R, W> Future for CopyFuture<'_, R, W> impl<R, W> Future for CopyFuture<'_, R, W>
where where
R: AsyncBufRead, R: BufRead,
W: AsyncWrite + Unpin + ?Sized, W: Write + Unpin + ?Sized,
{ {
type Output = io::Result<u64>; type Output = io::Result<u64>;

@ -1,8 +1,7 @@
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
use std::io::{self, IoSlice, IoSliceMut, SeekFrom};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use crate::io::{self, BufRead, IoSlice, IoSliceMut, Read, Seek, SeekFrom, Write};
use crate::task::{Context, Poll};
/// A `Cursor` wraps an in-memory buffer and provides it with a /// A `Cursor` wraps an in-memory buffer and provides it with a
/// [`Seek`] implementation. /// [`Seek`] implementation.
@ -153,7 +152,7 @@ impl<T> Cursor<T> {
} }
} }
impl<T> AsyncSeek for Cursor<T> impl<T> Seek for Cursor<T>
where where
T: AsRef<[u8]> + Unpin, T: AsRef<[u8]> + Unpin,
{ {
@ -162,11 +161,11 @@ where
_: &mut Context<'_>, _: &mut Context<'_>,
pos: SeekFrom, pos: SeekFrom,
) -> Poll<io::Result<u64>> { ) -> Poll<io::Result<u64>> {
Poll::Ready(io::Seek::seek(&mut self.inner, pos)) Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
} }
} }
impl<T> AsyncRead for Cursor<T> impl<T> Read for Cursor<T>
where where
T: AsRef<[u8]> + Unpin, T: AsRef<[u8]> + Unpin,
{ {
@ -175,7 +174,7 @@ where
_cx: &mut Context<'_>, _cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Poll::Ready(io::Read::read(&mut self.inner, buf)) Poll::Ready(std::io::Read::read(&mut self.inner, buf))
} }
fn poll_read_vectored( fn poll_read_vectored(
@ -183,30 +182,30 @@ where
_: &mut Context<'_>, _: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>], bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Poll::Ready(io::Read::read_vectored(&mut self.inner, bufs)) Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
} }
} }
impl<T> AsyncBufRead for Cursor<T> impl<T> BufRead for Cursor<T>
where where
T: AsRef<[u8]> + Unpin, T: AsRef<[u8]> + Unpin,
{ {
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> { fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Poll::Ready(io::BufRead::fill_buf(&mut self.get_mut().inner)) Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
} }
fn consume(mut self: Pin<&mut Self>, amt: usize) { fn consume(mut self: Pin<&mut Self>, amt: usize) {
io::BufRead::consume(&mut self.inner, amt) std::io::BufRead::consume(&mut self.inner, amt)
} }
} }
impl AsyncWrite for Cursor<&mut [u8]> { impl Write for Cursor<&mut [u8]> {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_: &mut Context<'_>, _: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write(&mut self.inner, buf)) Poll::Ready(std::io::Write::write(&mut self.inner, buf))
} }
fn poll_write_vectored( fn poll_write_vectored(
@ -214,11 +213,11 @@ impl AsyncWrite for Cursor<&mut [u8]> {
_: &mut Context<'_>, _: &mut Context<'_>,
bufs: &[IoSlice<'_>], bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(&mut self.inner, bufs)) Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
} }
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut self.inner)) Poll::Ready(std::io::Write::flush(&mut self.inner))
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -226,13 +225,13 @@ impl AsyncWrite for Cursor<&mut [u8]> {
} }
} }
impl AsyncWrite for Cursor<&mut Vec<u8>> { impl Write for Cursor<&mut Vec<u8>> {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_: &mut Context<'_>, _: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write(&mut self.inner, buf)) Poll::Ready(std::io::Write::write(&mut self.inner, buf))
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -240,17 +239,17 @@ impl AsyncWrite for Cursor<&mut Vec<u8>> {
} }
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut self.inner)) Poll::Ready(std::io::Write::flush(&mut self.inner))
} }
} }
impl AsyncWrite for Cursor<Vec<u8>> { impl Write for Cursor<Vec<u8>> {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
_: &mut Context<'_>, _: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write(&mut self.inner, buf)) Poll::Ready(std::io::Write::write(&mut self.inner, buf))
} }
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -258,6 +257,6 @@ impl AsyncWrite for Cursor<Vec<u8>> {
} }
fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut self.inner)) Poll::Ready(std::io::Write::flush(&mut self.inner))
} }
} }

@ -1,9 +1,7 @@
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use futures_io::{AsyncBufRead, AsyncRead, Initializer}; use crate::io::{self, BufRead, Read};
use crate::io;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Creates a reader that contains no data. /// Creates a reader that contains no data.
@ -43,7 +41,7 @@ impl fmt::Debug for Empty {
} }
} }
impl AsyncRead for Empty { impl Read for Empty {
#[inline] #[inline]
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
@ -52,14 +50,9 @@ impl AsyncRead for Empty {
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0)) Poll::Ready(Ok(0))
} }
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
} }
impl AsyncBufRead for Empty { impl BufRead for Empty {
#[inline] #[inline]
fn poll_fill_buf<'a>( fn poll_fill_buf<'a>(
self: Pin<&'a mut Self>, self: Pin<&'a mut Self>,

@ -20,8 +20,6 @@
//! # Ok(()) }) } //! # Ok(()) }) }
//! ``` //! ```
pub mod prelude;
#[doc(inline)] #[doc(inline)]
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
@ -40,17 +38,20 @@ pub use stdout::{stdout, Stdout};
pub use timeout::timeout; pub use timeout::timeout;
pub use write::Write; pub use write::Write;
mod buf_read; pub mod prelude;
pub(crate) mod buf_read;
pub(crate) mod read;
pub(crate) mod seek;
pub(crate) mod write;
mod buf_reader; mod buf_reader;
mod copy; mod copy;
mod cursor; mod cursor;
mod empty; mod empty;
mod read;
mod repeat; mod repeat;
mod seek;
mod sink; mod sink;
mod stderr; mod stderr;
mod stdin; mod stdin;
mod stdout; mod stdout;
mod timeout; mod timeout;
mod write;

@ -9,10 +9,19 @@
//! ``` //! ```
#[doc(no_inline)] #[doc(no_inline)]
pub use super::BufRead; pub use crate::io::BufRead;
#[doc(no_inline)] #[doc(no_inline)]
pub use super::Read; pub use crate::io::Read;
#[doc(no_inline)] #[doc(no_inline)]
pub use super::Seek; pub use crate::io::Seek;
#[doc(no_inline)] #[doc(no_inline)]
pub use super::Write; pub use crate::io::Write;
#[doc(hidden)]
pub use crate::io::buf_read::BufReadExt as _;
#[doc(hidden)]
pub use crate::io::read::ReadExt as _;
#[doc(hidden)]
pub use crate::io::seek::SeekExt as _;
#[doc(hidden)]
pub use crate::io::write::WriteExt as _;

@ -10,119 +10,294 @@ use read_to_end::{read_to_end_internal, ReadToEndFuture};
use read_to_string::ReadToStringFuture; use read_to_string::ReadToStringFuture;
use read_vectored::ReadVectoredFuture; use read_vectored::ReadVectoredFuture;
use std::io;
use std::mem; use std::mem;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::AsyncRead;
use crate::io::IoSliceMut;
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(feature = "docs")] {
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret { /// Allows reading from a byte stream.
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ///
/// This trait is a re-export of [`futures::io::AsyncRead`] and is an async version of
/// [`std::io::Read`].
///
/// Methods other than [`poll_read`] and [`poll_read_vectored`] do not really exist in the
/// trait itself, but they become available when the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`futures::io::AsyncRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncRead.html
/// [`poll_read`]: #tymethod.poll_read
/// [`poll_read_vectored`]: #method.poll_read_vectored
pub trait Read {
/// Attempt to read from the `AsyncRead` into `buf`.
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>>;
/// Attempt to read from the `AsyncRead` into `bufs` using vectored IO operations.
fn poll_read_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
unreachable!()
}
/// Reads some bytes from the byte stream.
///
/// Returns the number of bytes read from the start of the buffer.
///
/// If the return value is `Ok(n)`, then it must be guaranteed that
/// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
/// filled in with `n` bytes of data. If `n` is `0`, then it can indicate one of two
/// scenarios:
///
/// 1. This reader has reached its "end of file" and will likely no longer be able to
/// produce bytes. Note that this does not mean that the reader will always no
/// longer be able to produce bytes.
/// 2. The buffer specified was 0 bytes in length.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = vec![0; 1024];
/// let n = file.read(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin
{
unreachable!()
}
/// Like [`read`], except that it reads into a slice of buffers.
///
/// Data is copied to fill each buffer in order, with the final buffer written to
/// possibly being only partially filled. This method must behave as a single call to
/// [`read`] with the buffers concatenated would.
///
/// The default implementation calls [`read`] with either the first nonempty buffer
/// provided, or an empty one if none exists.
///
/// [`read`]: #tymethod.read
fn read_vectored<'a>(
&'a mut self,
bufs: &'a mut [IoSliceMut<'a>],
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
unreachable!()
}
/// Reads all bytes from the byte stream.
///
/// All bytes read from this stream will be appended to the specified buffer `buf`.
/// This function will continuously call [`read`] to append more data to `buf` until
/// [`read`] returns either `Ok(0)` or an error.
///
/// If successful, this function will return the total number of bytes read.
///
/// [`read`]: #tymethod.read
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = Vec::new();
/// file.read_to_end(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_to_end<'a>(
&'a mut self,
buf: &'a mut Vec<u8>,
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
unreachable!()
}
/// Reads all bytes from the byte stream and appends them into a string.
///
/// If successful, this function will return the number of bytes read.
///
/// If the data in this stream is not valid UTF-8 then an error will be returned and
/// `buf` will be left unmodified.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = String::new();
/// file.read_to_string(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_to_string<'a>(
&'a mut self,
buf: &'a mut String,
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
unreachable!()
}
/// Reads the exact number of bytes required to fill `buf`.
///
/// This function reads as many bytes as necessary to completely fill the specified
/// buffer `buf`.
///
/// No guarantees are provided about the contents of `buf` when this function is
/// called, implementations cannot rely on any property of the contents of `buf` being
/// true. It is recommended that implementations only write data to `buf` instead of
/// reading its contents.
///
/// If this function encounters an "end of file" before completely filling the buffer,
/// it returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of
/// `buf` are unspecified in this case.
///
/// If any other read error is encountered then this function immediately returns. The
/// contents of `buf` are unspecified in this case.
///
/// If this function returns an error, it is unspecified how many bytes it has read,
/// but it will never read more than would be necessary to completely fill the buffer.
///
/// [`ErrorKind::UnexpectedEof`]: enum.ErrorKind.html#variant.UnexpectedEof
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = vec![0; 10];
/// file.read_exact(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ImplFuture<'a, io::Result<()>>
where
Self: Unpin,
{
unreachable!()
}
} }
} else {
macro_rules! ret { impl<T: Read + Unpin + ?Sized> Read for Box<T> {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}
impl<T: Read + Unpin + ?Sized> Read for &mut T {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}
impl<P> Read for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Read,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
}
impl Read for &[u8] {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
} }
} else {
pub use futures_io::AsyncRead as Read;
} }
} }
/// Allows reading from a byte stream. #[doc(hidden)]
/// pub trait ReadExt: futures_io::AsyncRead {
/// This trait is an async version of [`std::io::Read`]. fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::io::AsyncRead`].
///
/// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
/// [`futures::io::AsyncRead`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncRead.html
pub trait Read {
/// Reads some bytes from the byte stream.
///
/// Returns the number of bytes read from the start of the buffer.
///
/// If the return value is `Ok(n)`, then it must be guaranteed that `0 <= n <= buf.len()`. A
/// nonzero `n` value indicates that the buffer has been filled in with `n` bytes of data. If
/// `n` is `0`, then it can indicate one of two scenarios:
///
/// 1. This reader has reached its "end of file" and will likely no longer be able to produce
/// bytes. Note that this does not mean that the reader will always no longer be able to
/// produce bytes.
/// 2. The buffer specified was 0 bytes in length.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = vec![0; 1024];
/// let n = file.read(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result<usize>)
where where
Self: Unpin; Self: Unpin,
{
/// Like [`read`], except that it reads into a slice of buffers. ReadFuture { reader: self, buf }
/// }
/// Data is copied to fill each buffer in order, with the final buffer written to possibly
/// being only partially filled. This method must behave as a single call to [`read`] with the
/// buffers concatenated would.
///
/// The default implementation calls [`read`] with either the first nonempty buffer provided,
/// or an empty one if none exists.
///
/// [`read`]: #tymethod.read
fn read_vectored<'a>( fn read_vectored<'a>(
&'a mut self, &'a mut self,
bufs: &'a mut [io::IoSliceMut<'a>], bufs: &'a mut [IoSliceMut<'a>],
) -> ret!('a, ReadVectoredFuture, io::Result<usize>) ) -> ReadVectoredFuture<'a, Self>
where where
Self: Unpin, Self: Unpin,
{ {
ReadVectoredFuture { reader: self, bufs } ReadVectoredFuture { reader: self, bufs }
} }
/// Reads all bytes from the byte stream. fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
///
/// All bytes read from this stream will be appended to the specified buffer `buf`. This
/// function will continuously call [`read`] to append more data to `buf` until [`read`]
/// returns either `Ok(0)` or an error.
///
/// If successful, this function will return the total number of bytes read.
///
/// [`read`]: #tymethod.read
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = Vec::new();
/// file.read_to_end(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_to_end<'a>(
&'a mut self,
buf: &'a mut Vec<u8>,
) -> ret!('a, ReadToEndFuture, io::Result<usize>)
where where
Self: Unpin, Self: Unpin,
{ {
@ -134,32 +309,7 @@ pub trait Read {
} }
} }
/// Reads all bytes from the byte stream and appends them into a string. fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
///
/// If successful, this function will return the number of bytes read.
///
/// If the data in this stream is not valid UTF-8 then an error will be returned and `buf` will
/// be left unmodified.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = String::new();
/// file.read_to_string(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_to_string<'a>(
&'a mut self,
buf: &'a mut String,
) -> ret!('a, ReadToStringFuture, io::Result<usize>)
where where
Self: Unpin, Self: Unpin,
{ {
@ -172,43 +322,7 @@ pub trait Read {
} }
} }
/// Reads the exact number of bytes required to fill `buf`. fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
///
/// This function reads as many bytes as necessary to completely fill the specified buffer
/// `buf`.
///
/// No guarantees are provided about the contents of `buf` when this function is called,
/// implementations cannot rely on any property of the contents of `buf` being true. It is
/// recommended that implementations only write data to `buf` instead of reading its contents.
///
/// If this function encounters an "end of file" before completely filling the buffer, it
/// returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of `buf` are
/// unspecified in this case.
///
/// If any other read error is encountered then this function immediately returns. The contents
/// of `buf` are unspecified in this case.
///
/// If this function returns an error, it is unspecified how many bytes it has read, but it
/// will never read more than would be necessary to completely fill the buffer.
///
/// [`ErrorKind::UnexpectedEof`]: enum.ErrorKind.html#variant.UnexpectedEof
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let mut buf = vec![0; 10];
/// file.read_exact(&mut buf).await?;
/// #
/// # Ok(()) }) }
/// ```
fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadExactFuture, io::Result<()>)
where where
Self: Unpin, Self: Unpin,
{ {
@ -216,8 +330,4 @@ pub trait Read {
} }
} }
impl<T: AsyncRead + Unpin + ?Sized> Read for T { impl<T: futures_io::AsyncRead + ?Sized> ReadExt for T {}
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result<usize>) {
ReadFuture { reader: self, buf }
}
}

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncRead; use crate::future::Future;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -13,7 +11,7 @@ pub struct ReadFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a mut [u8], pub(crate) buf: &'a mut [u8],
} }
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> { impl<T: Read + Unpin + ?Sized> Future for ReadFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,11 +1,9 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncRead; use crate::future::Future;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -14,7 +12,7 @@ pub struct ReadExactFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a mut [u8], pub(crate) buf: &'a mut [u8],
} }
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> { impl<T: Read + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncRead; use crate::future::Future;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -14,7 +12,7 @@ pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> {
pub(crate) start_len: usize, pub(crate) start_len: usize,
} }
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> { impl<T: Read + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -36,7 +34,7 @@ impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> {
// //
// Because we're extending the buffer with uninitialized data for trusted // Because we're extending the buffer with uninitialized data for trusted
// readers, we need to make sure to truncate that if any of this panics. // readers, we need to make sure to truncate that if any of this panics.
pub fn read_to_end_internal<R: AsyncRead + ?Sized>( pub fn read_to_end_internal<R: Read + ?Sized>(
mut rd: Pin<&mut R>, mut rd: Pin<&mut R>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut Vec<u8>, buf: &mut Vec<u8>,

@ -1,13 +1,11 @@
use super::read_to_end_internal;
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::str; use std::str;
use futures_io::AsyncRead; use super::read_to_end_internal;
use crate::future::Future;
use crate::io::{self, Read};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -18,7 +16,7 @@ pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> {
pub(crate) start_len: usize, pub(crate) start_len: usize,
} }
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> { impl<T: Read + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,12 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io::IoSliceMut;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncRead; use crate::future::Future;
use crate::io::{self, IoSliceMut, Read};
use crate::io; use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -15,7 +11,7 @@ pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> {
pub(crate) bufs: &'a mut [IoSliceMut<'a>], pub(crate) bufs: &'a mut [IoSliceMut<'a>],
} }
impl<T: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> { impl<T: Read + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,9 +1,7 @@
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use futures_io::{AsyncRead, Initializer}; use crate::io::{self, Read};
use crate::io;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Creates an instance of a reader that infinitely repeats one byte. /// Creates an instance of a reader that infinitely repeats one byte.
@ -44,7 +42,7 @@ impl fmt::Debug for Repeat {
} }
} }
impl AsyncRead for Repeat { impl Read for Repeat {
#[inline] #[inline]
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
@ -56,9 +54,4 @@ impl AsyncRead for Repeat {
} }
Poll::Ready(Ok(buf.len())) Poll::Ready(Ok(buf.len()))
} }
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
} }

@ -1,7 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::AsyncSeek;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, SeekFrom}; use crate::io::{self, SeekFrom};
@ -9,63 +8,116 @@ use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(feature = "docs")] {
use std::ops::{Deref, DerefMut};
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); pub struct ImplFuture<T>(std::marker::PhantomData<T>);
/// Allows seeking through a byte stream.
///
/// This trait is a re-export of [`futures::io::AsyncSeek`] and is an async version of
/// [`std::io::Seek`].
///
/// The [provided methods] do not really exist in the trait itself, but they become
/// available when the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
/// [`futures::io::AsyncSeek`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncSeek.html
/// [provided methods]: #provided-methods
pub trait Seek {
/// Attempt to seek to an offset, in bytes, in a stream.
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>>;
macro_rules! ret { /// Seeks to a new position in a byte stream.
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ///
/// Returns the new position in the byte stream.
///
/// A seek beyond the end of stream is allowed, but behavior is defined by the
/// implementation.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::SeekFrom;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let file_len = file.seek(SeekFrom::End(0)).await?;
/// #
/// # Ok(()) }) }
/// ```
fn seek(&mut self, pos: SeekFrom) -> ImplFuture<io::Result<u64>>
where
Self: Unpin
{
unreachable!()
}
} }
} else {
macro_rules! ret { impl<T: Seek + Unpin + ?Sized> Seek for Box<T> {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
unreachable!()
}
} }
impl<T: Seek + Unpin + ?Sized> Seek for &mut T {
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
unreachable!()
}
}
impl<P> Seek for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Seek,
{
fn poll_seek(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<io::Result<u64>> {
unreachable!()
}
}
} else {
pub use futures_io::AsyncSeek as Seek;
} }
} }
/// Allows seeking through a byte stream. #[doc(hidden)]
/// pub trait SeekExt: futures_io::AsyncSeek {
/// This trait is an async version of [`std::io::Seek`]. fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::io::AsyncSeek`].
///
/// [`std::io::Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html
/// [`futures::io::AsyncSeek`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncSeek.html
pub trait Seek {
/// Seeks to a new position in a byte stream.
///
/// Returns the new position in the byte stream.
///
/// A seek beyond the end of stream is allowed, but behavior is defined by the
/// implementation.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::io::SeekFrom;
/// use async_std::prelude::*;
///
/// let mut file = File::open("a.txt").await?;
///
/// let file_len = file.seek(SeekFrom::End(0)).await?;
/// #
/// # Ok(()) }) }
/// ```
fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result<u64>)
where where
Self: Unpin; Self: Unpin,
} {
impl<T: AsyncSeek + Unpin + ?Sized> Seek for T {
fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result<u64>) {
SeekFuture { seeker: self, pos } SeekFuture { seeker: self, pos }
} }
} }
impl<T: futures_io::AsyncSeek + ?Sized> SeekExt for T {}
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct SeekFuture<'a, T: Unpin + ?Sized> { pub struct SeekFuture<'a, T: Unpin + ?Sized> {
@ -73,7 +125,7 @@ pub struct SeekFuture<'a, T: Unpin + ?Sized> {
pos: SeekFrom, pos: SeekFrom,
} }
impl<T: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, T> { impl<T: SeekExt + Unpin + ?Sized> Future for SeekFuture<'_, T> {
type Output = io::Result<u64>; type Output = io::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,9 +1,7 @@
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncWrite; use crate::io::{self, Write};
use crate::io;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Creates a writer that consumes and drops all data. /// Creates a writer that consumes and drops all data.
@ -40,7 +38,7 @@ impl fmt::Debug for Sink {
} }
} }
impl AsyncWrite for Sink { impl Write for Sink {
#[inline] #[inline]
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,

@ -1,11 +1,10 @@
use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::AsyncWrite;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard error of the current process. /// Constructs a new handle to the standard error of the current process.
@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll};
/// ``` /// ```
pub fn stderr() -> Stderr { pub fn stderr() -> Stderr {
Stderr(Mutex::new(State::Idle(Some(Inner { Stderr(Mutex::new(State::Idle(Some(Inner {
stderr: io::stderr(), stderr: std::io::stderr(),
buf: Vec::new(), buf: Vec::new(),
last_op: None, last_op: None,
})))) }))))
@ -64,7 +63,7 @@ enum State {
#[derive(Debug)] #[derive(Debug)]
struct Inner { struct Inner {
/// The blocking stderr handle. /// The blocking stderr handle.
stderr: io::Stderr, stderr: std::io::Stderr,
/// The write buffer. /// The write buffer.
buf: Vec<u8>, buf: Vec<u8>,
@ -80,7 +79,7 @@ enum Operation {
Flush(io::Result<()>), Flush(io::Result<()>),
} }
impl AsyncWrite for Stderr { impl Write for Stderr {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -118,7 +117,7 @@ impl AsyncWrite for Stderr {
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
let res = io::Write::write(&mut inner.stderr, &mut inner.buf); let res = std::io::Write::write(&mut inner.stderr, &mut inner.buf);
inner.last_op = Some(Operation::Write(res)); inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
})); }));
@ -146,7 +145,7 @@ impl AsyncWrite for Stderr {
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
let res = io::Write::flush(&mut inner.stderr); let res = std::io::Write::flush(&mut inner.stderr);
inner.last_op = Some(Operation::Flush(res)); inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
})); }));
@ -179,7 +178,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] { if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stderr { impl AsRawFd for Stderr {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
io::stderr().as_raw_fd() std::io::stderr().as_raw_fd()
} }
} }
} }
@ -190,7 +189,7 @@ cfg_if! {
if #[cfg(any(windows, feature = "docs"))] { if #[cfg(any(windows, feature = "docs"))] {
impl AsRawHandle for Stderr { impl AsRawHandle for Stderr {
fn as_raw_handle(&self) -> RawHandle { fn as_raw_handle(&self) -> RawHandle {
io::stderr().as_raw_handle() std::io::stderr().as_raw_handle()
} }
} }
} }

@ -1,11 +1,10 @@
use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::{AsyncRead, Initializer};
use crate::future::{self, Future}; use crate::future::{self, Future};
use crate::io::{self, Read};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard input of the current process. /// Constructs a new handle to the standard input of the current process.
@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll};
/// ``` /// ```
pub fn stdin() -> Stdin { pub fn stdin() -> Stdin {
Stdin(Mutex::new(State::Idle(Some(Inner { Stdin(Mutex::new(State::Idle(Some(Inner {
stdin: io::stdin(), stdin: std::io::stdin(),
line: String::new(), line: String::new(),
buf: Vec::new(), buf: Vec::new(),
last_op: None, last_op: None,
@ -65,7 +64,7 @@ enum State {
#[derive(Debug)] #[derive(Debug)]
struct Inner { struct Inner {
/// The blocking stdin handle. /// The blocking stdin handle.
stdin: io::Stdin, stdin: std::io::Stdin,
/// The line buffer. /// The line buffer.
line: String, line: String,
@ -137,7 +136,7 @@ impl Stdin {
} }
} }
impl AsyncRead for Stdin { impl Read for Stdin {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -174,7 +173,7 @@ impl AsyncRead for Stdin {
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
let res = io::Read::read(&mut inner.stdin, &mut inner.buf); let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf);
inner.last_op = Some(Operation::Read(res)); inner.last_op = Some(Operation::Read(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
})); }));
@ -185,11 +184,6 @@ impl AsyncRead for Stdin {
} }
} }
} }
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
} }
cfg_if! { cfg_if! {
@ -208,7 +202,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] { if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stdin { impl AsRawFd for Stdin {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
io::stdin().as_raw_fd() std::io::stdin().as_raw_fd()
} }
} }
} }
@ -219,7 +213,7 @@ cfg_if! {
if #[cfg(any(windows, feature = "docs"))] { if #[cfg(any(windows, feature = "docs"))] {
impl AsRawHandle for Stdin { impl AsRawHandle for Stdin {
fn as_raw_handle(&self) -> RawHandle { fn as_raw_handle(&self) -> RawHandle {
io::stdin().as_raw_handle() std::io::stdin().as_raw_handle()
} }
} }
} }

@ -1,11 +1,10 @@
use std::io;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::AsyncWrite;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard output of the current process. /// Constructs a new handle to the standard output of the current process.
@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll};
/// ``` /// ```
pub fn stdout() -> Stdout { pub fn stdout() -> Stdout {
Stdout(Mutex::new(State::Idle(Some(Inner { Stdout(Mutex::new(State::Idle(Some(Inner {
stdout: io::stdout(), stdout: std::io::stdout(),
buf: Vec::new(), buf: Vec::new(),
last_op: None, last_op: None,
})))) }))))
@ -64,7 +63,7 @@ enum State {
#[derive(Debug)] #[derive(Debug)]
struct Inner { struct Inner {
/// The blocking stdout handle. /// The blocking stdout handle.
stdout: io::Stdout, stdout: std::io::Stdout,
/// The write buffer. /// The write buffer.
buf: Vec<u8>, buf: Vec<u8>,
@ -80,7 +79,7 @@ enum Operation {
Flush(io::Result<()>), Flush(io::Result<()>),
} }
impl AsyncWrite for Stdout { impl Write for Stdout {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -118,7 +117,7 @@ impl AsyncWrite for Stdout {
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
let res = io::Write::write(&mut inner.stdout, &mut inner.buf); let res = std::io::Write::write(&mut inner.stdout, &mut inner.buf);
inner.last_op = Some(Operation::Write(res)); inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
})); }));
@ -146,7 +145,7 @@ impl AsyncWrite for Stdout {
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
let res = io::Write::flush(&mut inner.stdout); let res = std::io::Write::flush(&mut inner.stdout);
inner.last_op = Some(Operation::Flush(res)); inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
})); }));
@ -179,7 +178,7 @@ cfg_if! {
if #[cfg(any(unix, feature = "docs"))] { if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stdout { impl AsRawFd for Stdout {
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
io::stdout().as_raw_fd() std::io::stdout().as_raw_fd()
} }
} }
} }
@ -190,7 +189,7 @@ cfg_if! {
if #[cfg(any(windows, feature = "docs"))] { if #[cfg(any(windows, feature = "docs"))] {
impl AsRawHandle for Stdout { impl AsRawHandle for Stdout {
fn as_raw_handle(&self) -> RawHandle { fn as_raw_handle(&self) -> RawHandle {
io::stdout().as_raw_handle() std::io::stdout().as_raw_handle()
} }
} }
} }

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncWrite; use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -12,7 +10,7 @@ pub struct FlushFuture<'a, T: Unpin + ?Sized> {
pub(crate) writer: &'a mut T, pub(crate) writer: &'a mut T,
} }
impl<T: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, T> { impl<T: Write + Unpin + ?Sized> Future for FlushFuture<'_, T> {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -8,130 +8,277 @@ use write::WriteFuture;
use write_all::WriteAllFuture; use write_all::WriteAllFuture;
use write_vectored::WriteVectoredFuture; use write_vectored::WriteVectoredFuture;
use std::io;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::AsyncWrite;
use crate::io::IoSlice;
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(feature = "docs")] {
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret { /// Allows writing to a byte stream.
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); ///
/// This trait is a re-export of [`futures::io::AsyncWrite`] and is an async version of
/// [`std::io::Write`].
///
/// Methods other than [`poll_write`], [`poll_write_vectored`], [`poll_flush`], and
/// [`poll_close`] do not really exist in the trait itself, but they become available when
/// the prelude is imported:
///
/// ```
/// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
///
/// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
/// [`futures::io::AsyncWrite`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncWrite.html
/// [`poll_write`]: #tymethod.poll_write
/// [`poll_write_vectored`]: #method.poll_write_vectored
/// [`poll_flush`]: #tymethod.poll_flush
/// [`poll_close`]: #tymethod.poll_close
pub trait Write {
/// Attempt to write bytes from `buf` into the object.
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>>;
/// Attempt to write bytes from `bufs` into the object using vectored
/// IO operations.
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>]
) -> Poll<io::Result<usize>> {
unreachable!()
}
/// Attempt to flush the object, ensuring that any buffered data reach
/// their destination.
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
/// Attempt to close the object.
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
/// Writes some bytes into the byte stream.
///
/// Returns the number of bytes written from the start of the buffer.
///
/// If the return value is `Ok(n)` then it must be guaranteed that
/// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
/// object is no longer able to accept bytes and will likely not be able to in the
/// future as well, or that the buffer provided is empty.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::create("a.txt").await?;
///
/// let n = file.write(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
unreachable!()
}
/// Flushes the stream to ensure that all buffered contents reach their destination.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::create("a.txt").await?;
///
/// file.write_all(b"hello world").await?;
/// file.flush().await?;
/// #
/// # Ok(()) }) }
/// ```
fn flush(&mut self) -> ImplFuture<'_, io::Result<()>>
where
Self: Unpin,
{
unreachable!()
}
/// Like [`write`], except that it writes from a slice of buffers.
///
/// Data is copied from each buffer in order, with the final buffer read from possibly
/// being only partially consumed. This method must behave as a call to [`write`] with
/// the buffers concatenated would.
///
/// The default implementation calls [`write`] with either the first nonempty buffer
/// provided, or an empty one if none exists.
///
/// [`write`]: #tymethod.write
fn write_vectored<'a>(
&'a mut self,
bufs: &'a [IoSlice<'a>],
) -> ImplFuture<'a, io::Result<usize>>
where
Self: Unpin,
{
unreachable!()
}
/// Writes an entire buffer into the byte stream.
///
/// This method will continuously call [`write`] until there is no more data to be
/// written or an error is returned. This method will not return until the entire
/// buffer has been successfully written or such an error occurs.
///
/// [`write`]: #tymethod.write
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::create("a.txt").await?;
///
/// file.write_all(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
///
/// [`write`]: #tymethod.write
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ImplFuture<'a, io::Result<()>>
where
Self: Unpin,
{
unreachable!()
}
} }
} else {
macro_rules! ret { impl<T: Write + Unpin + ?Sized> Write for Box<T> {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
impl<T: Write + Unpin + ?Sized> Write for &mut T {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
impl<P> Write for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Write,
{
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
}
impl Write for Vec<u8> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
unreachable!()
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
unreachable!()
}
} }
} else {
pub use futures_io::AsyncWrite as Write;
} }
} }
/// Allows writing to a byte stream. #[doc(hidden)]
/// pub trait WriteExt: Write {
/// This trait is an async version of [`std::io::Write`]. fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
///
/// While it is currently not possible to implement this trait directly, it gets implemented
/// automatically for all types that implement [`futures::io::AsyncWrite`].
///
/// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
/// [`futures::io::AsyncWrite`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncWrite.html
pub trait Write {
/// Writes some bytes into the byte stream.
///
/// Returns the number of bytes written from the start of the buffer.
///
/// If the return value is `Ok(n)` then it must be guaranteed that `0 <= n <= buf.len()`. A
/// return value of `0` typically means that the underlying object is no longer able to accept
/// bytes and will likely not be able to in the future as well, or that the buffer provided is
/// empty.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::create("a.txt").await?;
///
/// let n = file.write(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result<usize>)
where where
Self: Unpin; Self: Unpin,
{
/// Flushes the stream to ensure that all buffered contents reach their destination. WriteFuture { writer: self, buf }
/// }
/// # Examples
/// fn flush(&mut self) -> FlushFuture<'_, Self>
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::create("a.txt").await?;
///
/// file.write_all(b"hello world").await?;
/// file.flush().await?;
/// #
/// # Ok(()) }) }
/// ```
fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>)
where where
Self: Unpin; Self: Unpin,
{
/// Like [`write`], except that it writes from a slice of buffers. FlushFuture { writer: self }
/// }
/// Data is copied from each buffer in order, with the final buffer read from possibly being
/// only partially consumed. This method must behave as a call to [`write`] with the buffers fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
/// concatenated would.
///
/// The default implementation calls [`write`] with either the first nonempty buffer provided,
/// or an empty one if none exists.
///
/// [`write`]: #tymethod.write
fn write_vectored<'a>(
&'a mut self,
bufs: &'a [io::IoSlice<'a>],
) -> ret!('a, WriteVectoredFuture, io::Result<usize>)
where where
Self: Unpin, Self: Unpin,
{ {
WriteVectoredFuture { writer: self, bufs } WriteVectoredFuture { writer: self, bufs }
} }
/// Writes an entire buffer into the byte stream. fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
///
/// This method will continuously call [`write`] until there is no more data to be written or
/// an error is returned. This method will not return until the entire buffer has been
/// successfully written or such an error occurs.
///
/// [`write`]: #tymethod.write
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::fs::File;
/// use async_std::prelude::*;
///
/// let mut file = File::create("a.txt").await?;
///
/// file.write_all(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
///
/// [`write`]: #tymethod.write
fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteAllFuture, io::Result<()>)
where where
Self: Unpin, Self: Unpin,
{ {
@ -139,12 +286,4 @@ pub trait Write {
} }
} }
impl<T: AsyncWrite + Unpin + ?Sized> Write for T { impl<T: Write + ?Sized> WriteExt for T {}
fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result<usize>) {
WriteFuture { writer: self, buf }
}
fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>) {
FlushFuture { writer: self }
}
}

@ -1,10 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncWrite; use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -13,7 +11,7 @@ pub struct WriteFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a [u8], pub(crate) buf: &'a [u8],
} }
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, T> { impl<T: Write + Unpin + ?Sized> Future for WriteFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,11 +1,9 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncWrite; use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -14,7 +12,7 @@ pub struct WriteAllFuture<'a, T: Unpin + ?Sized> {
pub(crate) buf: &'a [u8], pub(crate) buf: &'a [u8],
} }
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, T> { impl<T: Write + Unpin + ?Sized> Future for WriteAllFuture<'_, T> {
type Output = io::Result<()>; type Output = io::Result<()>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -1,11 +1,8 @@
use crate::future::Future;
use crate::task::{Context, Poll};
use std::io;
use std::io::IoSlice;
use std::pin::Pin; use std::pin::Pin;
use futures_io::AsyncWrite; use crate::future::Future;
use crate::io::{self, IoSlice, Write};
use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
@ -14,7 +11,7 @@ pub struct WriteVectoredFuture<'a, T: Unpin + ?Sized> {
pub(crate) bufs: &'a [IoSlice<'a>], pub(crate) bufs: &'a [IoSlice<'a>],
} }
impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, T> { impl<T: Write + Unpin + ?Sized> Future for WriteVectoredFuture<'_, T> {
type Output = io::Result<usize>; type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {

@ -28,7 +28,12 @@
//! //!
//! # Features //! # Features
//! //!
//! Unstable APIs in this crate are available when the `unstable` Cargo feature is enabled: //! Items marked with
//! <span
//! class="module-item stab portability"
//! style="display: inline; border-radius: 3px; padding: 2px; font-size: 80%; line-height: 1.2;"
//! ><code>unstable</code></span>
//! are available only when the `unstable` Cargo feature is enabled:
//! //!
//! ```toml //! ```toml
//! [dependencies.async-std] //! [dependencies.async-std]
@ -58,6 +63,7 @@ cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] { if #[cfg(any(feature = "unstable", feature = "docs"))] {
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub mod pin; pub mod pin;
mod vec; mod vec;
mod result; mod result;
} }

@ -8,6 +8,7 @@ use crate::future::{self, Future};
use crate::io; use crate::io;
use crate::net::driver::Watcher; use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// A TCP socket server, listening for connections. /// A TCP socket server, listening for connections.
@ -190,7 +191,7 @@ impl TcpListener {
#[derive(Debug)] #[derive(Debug)]
pub struct Incoming<'a>(&'a TcpListener); pub struct Incoming<'a>(&'a TcpListener);
impl<'a> futures_core::stream::Stream for Incoming<'a> { impl<'a> Stream for Incoming<'a> {
type Item = io::Result<TcpStream>; type Item = io::Result<TcpStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

@ -3,10 +3,9 @@ use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::{AsyncRead, AsyncWrite};
use crate::future; use crate::future;
use crate::io; use crate::io::{self, Read, Write};
use crate::net::driver::Watcher; use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use crate::task::blocking; use crate::task::blocking;
@ -285,7 +284,7 @@ impl TcpStream {
} }
} }
impl AsyncRead for TcpStream { impl Read for TcpStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -303,7 +302,7 @@ impl AsyncRead for TcpStream {
} }
} }
impl AsyncRead for &TcpStream { impl Read for &TcpStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -313,7 +312,7 @@ impl AsyncRead for &TcpStream {
} }
} }
impl AsyncWrite for TcpStream { impl Write for TcpStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -339,7 +338,7 @@ impl AsyncWrite for TcpStream {
} }
} }
impl AsyncWrite for &TcpStream { impl Write for &TcpStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -6,11 +6,10 @@ use std::net::Shutdown;
use std::path::Path; use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use futures_io::{AsyncRead, AsyncWrite};
use mio_uds; use mio_uds;
use super::SocketAddr; use super::SocketAddr;
use crate::io; use crate::io::{self, Read, Write};
use crate::net::driver::Watcher; use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
@ -154,7 +153,7 @@ impl UnixStream {
} }
} }
impl AsyncRead for UnixStream { impl Read for UnixStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -164,7 +163,7 @@ impl AsyncRead for UnixStream {
} }
} }
impl AsyncRead for &UnixStream { impl Read for &UnixStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -174,7 +173,7 @@ impl AsyncRead for &UnixStream {
} }
} }
impl AsyncWrite for UnixStream { impl Write for UnixStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -192,7 +191,7 @@ impl AsyncWrite for UnixStream {
} }
} }
impl AsyncWrite for &UnixStream { impl Write for &UnixStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -21,7 +21,16 @@ pub use crate::io::Read as _;
pub use crate::io::Seek as _; pub use crate::io::Seek as _;
#[doc(no_inline)] #[doc(no_inline)]
pub use crate::io::Write as _; pub use crate::io::Write as _;
#[doc(no_inline)] #[doc(hidden)]
pub use crate::stream::Stream; pub use crate::stream::Stream;
#[doc(no_inline)] #[doc(no_inline)]
pub use crate::task_local; pub use crate::task_local;
#[doc(hidden)]
pub use crate::io::buf_read::BufReadExt as _;
#[doc(hidden)]
pub use crate::io::read::ReadExt as _;
#[doc(hidden)]
pub use crate::io::seek::SeekExt as _;
#[doc(hidden)]
pub use crate::io::write::WriteExt as _;

@ -1,8 +1,9 @@
use crate::stream::{FromStream, IntoStream, Stream};
use std::pin::Pin; use std::pin::Pin;
impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E> use crate::prelude::*;
use crate::stream::{FromStream, IntoStream};
impl<T, E, V> FromStream<Result<T, E>> for Result<V, E>
where where
V: FromStream<T>, V: FromStream<T>,
{ {
@ -12,9 +13,9 @@ where
#[inline] #[inline]
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>( fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
stream: S, stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>> ) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
where where
<S as IntoStream>::IntoStream: Send + 'a, <S as IntoStream>::IntoStream: 'a,
{ {
let stream = stream.into_stream(); let stream = stream.into_stream();

@ -12,7 +12,7 @@ use std::pin::Pin;
/// [`IntoStream`]: trait.IntoStream.html /// [`IntoStream`]: trait.IntoStream.html
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))] #[cfg(any(feature = "unstable", feature = "docs"))]
pub trait FromStream<T: Send> { pub trait FromStream<T> {
/// Creates a value from a stream. /// Creates a value from a stream.
/// ///
/// # Examples /// # Examples
@ -24,7 +24,7 @@ pub trait FromStream<T: Send> {
/// ///
/// // let _five_fives = async_std::stream::repeat(5).take(5); /// // let _five_fives = async_std::stream::repeat(5).take(5);
/// ``` /// ```
fn from_stream<'a, S: IntoStream<Item = T> + Send + 'a>( fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S, stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>; ) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>;
} }

@ -20,13 +20,13 @@ pub trait IntoStream {
type Item; type Item;
/// Which kind of stream are we turning this into? /// Which kind of stream are we turning this into?
type IntoStream: Stream<Item = Self::Item> + Send; type IntoStream: Stream<Item = Self::Item>;
/// Creates a stream from a value. /// Creates a stream from a value.
fn into_stream(self) -> Self::IntoStream; fn into_stream(self) -> Self::IntoStream;
} }
impl<I: Stream + Send> IntoStream for I { impl<I: Stream> IntoStream for I {
type Item = I::Item; type Item = I::Item;
type IntoStream = I; type IntoStream = I;

@ -72,8 +72,8 @@ use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
cfg_if! { cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] { if #[cfg(feature = "unstable")] {
use crate::stream::FromStream; use crate::future::Future;
} }
} }
@ -87,7 +87,7 @@ cfg_if! {
($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => (ImplFuture<$a, $o>);
($f:ty, $o:ty) => (ImplFuture<'static, $o>);
} }
} else { } else {
macro_rules! ret { macro_rules! ret {
@ -95,36 +95,34 @@ cfg_if! {
($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>); ($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => ($f<$a, Self, $t1, $t2, $t3>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => ($f<$a, Self, $t1, $t2, $t3>);
($f:ty, $o:ty) => ($f);
} }
} }
} }
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(any(feature = "unstable", feature = "docs"))] {
#[doc(hidden)] use crate::stream::FromStream;
pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (DynFuture<$a, $o>);
}
} else {
#[allow(unused_macros)]
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (Pin<Box<dyn core::future::Future<Output = $o> + Send + 'a>>)
}
} }
} }
/// An asynchronous stream of values. /// An asynchronous stream of values.
/// ///
/// This trait is an async version of [`std::iter::Iterator`]. /// This trait is a re-export of [`futures::stream::Stream`] and is an async version of
/// [`std::iter::Iterator`].
///
/// The [provided methods] do not really exist in the trait itself, but they become available when
/// the prelude is imported:
/// ///
/// While it is currently not possible to implement this trait directly, it gets implemented /// ```
/// automatically for all types that implement [`futures::stream::Stream`]. /// # #[allow(unused_imports)]
/// use async_std::prelude::*;
/// ```
/// ///
/// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html /// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html
/// [`futures::stream::Stream`]: /// [`futures::stream::Stream`]:
/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html
/// [provided methods]: #provided-methods
pub trait Stream { pub trait Stream {
/// The type of items yielded by this stream. /// The type of items yielded by this stream.
type Item; type Item;
@ -488,7 +486,7 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn min_by<F>(self, compare: F) -> MinByFuture<Self, F, Self::Item> fn min_by<F>(self, compare: F) -> ret!(MinByFuture<Self, F, Self::Item>, Self::Item)
where where
Self: Sized, Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering, F: FnMut(&Self::Item, &Self::Item) -> Ordering,
@ -698,7 +696,7 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, F, Self::Item, B> fn fold<B, F>(self, init: B, f: F) -> ret!(FoldFuture<Self, F, Self::Item, B>, Self::Item)
where where
Self: Sized, Self: Sized,
F: FnMut(B, Self::Item) -> B, F: FnMut(B, Self::Item) -> B,
@ -953,13 +951,12 @@ pub trait Stream {
/// ``` /// ```
/// ///
/// [`stream`]: trait.Stream.html#tymethod.next /// [`stream`]: trait.Stream.html#tymethod.next
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))] #[cfg(any(feature = "unstable", feature = "docs"))]
fn collect<'a, B>(self) -> dyn_ret!('a, B) #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> ret!(Pin<Box<dyn Future<Output = B> + 'a>>, B)
where where
Self: futures_core::stream::Stream + Sized + Send + 'a, Self: futures_core::stream::Stream + Sized + 'a,
<Self as futures_core::stream::Stream>::Item: Send,
B: FromStream<<Self as futures_core::stream::Stream>::Item>, B: FromStream<<Self as futures_core::stream::Stream>::Item>,
{ {
FromStream::from_stream(self) FromStream::from_stream(self)

@ -11,7 +11,7 @@ pub struct Zip<A: Stream, B> {
second: B, second: B,
} }
impl<A: fmt::Debug + Stream, B: fmt::Debug> fmt::Debug for Zip<A, B> { impl<A: Stream + fmt::Debug, B: fmt::Debug> fmt::Debug for Zip<A, B> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Zip") fmt.debug_struct("Zip")
.field("first", &self.first) .field("first", &self.first)
@ -20,7 +20,7 @@ impl<A: fmt::Debug + Stream, B: fmt::Debug> fmt::Debug for Zip<A, B> {
} }
} }
impl<A: Unpin + Stream, B: Unpin> Unpin for Zip<A, B> {} impl<A: Stream + Unpin, B: Unpin> Unpin for Zip<A, B> {}
impl<A: Stream, B> Zip<A, B> { impl<A: Stream, B> Zip<A, B> {
pub(crate) fn new(first: A, second: B) -> Self { pub(crate) fn new(first: A, second: B) -> Self {

@ -17,6 +17,7 @@
//! let handle = task::spawn(async { //! let handle = task::spawn(async {
//! 1 + 2 //! 1 + 2
//! }); //! });
//! assert_eq!(handle.await, 3);
//! # //! #
//! # }) } //! # }) }
//! ``` //! ```

@ -1,14 +1,15 @@
use crate::stream::{FromStream, IntoStream, Stream};
use std::pin::Pin; use std::pin::Pin;
impl<T: Send> FromStream<T> for Vec<T> { use crate::prelude::*;
use crate::stream::{FromStream, IntoStream};
impl<T> FromStream<T> for Vec<T> {
#[inline] #[inline]
fn from_stream<'a, S: IntoStream<Item = T>>( fn from_stream<'a, S: IntoStream<Item = T>>(
stream: S, stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>> ) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
where where
<S as IntoStream>::IntoStream: Send + 'a, <S as IntoStream>::IntoStream: 'a,
{ {
let stream = stream.into_stream(); let stream = stream.into_stream();

Loading…
Cancel
Save