Revert "Stabilize most stream method and remove unnecessary macros"

mio-0-7
nasa 5 years ago committed by GitHub
parent 61f9483cc5
commit cc19592f80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -26,6 +26,7 @@ default = [
"async-task", "async-task",
"crossbeam-channel", "crossbeam-channel",
"crossbeam-deque", "crossbeam-deque",
"futures-timer",
"kv-log-macro", "kv-log-macro",
"log", "log",
"mio", "mio",
@ -34,14 +35,13 @@ default = [
"pin-project-lite", "pin-project-lite",
] ]
docs = ["attributes", "unstable", "default"] docs = ["attributes", "unstable", "default"]
unstable = ["std", "broadcaster"] unstable = ["std", "broadcaster", "futures-timer"]
attributes = ["async-attributes"] attributes = ["async-attributes"]
std = [ std = [
"alloc", "alloc",
"crossbeam-utils", "crossbeam-utils",
"futures-core/std", "futures-core/std",
"futures-io", "futures-io",
"futures-timer",
"memchr", "memchr",
"once_cell", "once_cell",
"pin-utils", "pin-utils",

@ -1,12 +1,11 @@
use futures::select; use futures::select;
use futures::FutureExt; use futures::FutureExt;
use std::io::{self, BufRead, BufReader as StdBufReader};
use async_std::{ use async_std::{
io::BufReader, io::{stdin, BufReader},
net::{TcpStream, ToSocketAddrs}, net::{TcpStream, ToSocketAddrs},
prelude::*, prelude::*,
stream, task, task,
}; };
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
@ -21,9 +20,8 @@ async fn try_main(addr: impl ToSocketAddrs) -> Result<()> {
let reader = BufReader::new(reader); let reader = BufReader::new(reader);
let mut lines_from_server = futures::StreamExt::fuse(reader.lines()); let mut lines_from_server = futures::StreamExt::fuse(reader.lines());
let stdin = StdBufReader::new(io::stdin()); let stdin = BufReader::new(stdin());
let mut lines_from_stdin = stream::from_iter(stdin.lines()); let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines());
loop { loop {
select! { select! {
line = lines_from_server.next().fuse() => match line { line = lines_from_server.next().fuse() => match line {

@ -1,7 +1,6 @@
//! Prints a file given as an argument to stdout. //! Prints a file given as an argument to stdout.
use std::env::args; use std::env::args;
use std::io::Write;
use async_std::fs::File; use async_std::fs::File;
use async_std::io; use async_std::io;
@ -15,7 +14,7 @@ fn main() -> io::Result<()> {
task::block_on(async { task::block_on(async {
let mut file = File::open(&path).await?; let mut file = File::open(&path).await?;
let mut stdout = std::io::stdout(); let mut stdout = io::stdout();
let mut buf = vec![0u8; LEN]; let mut buf = vec![0u8; LEN];
loop { loop {
@ -24,12 +23,12 @@ fn main() -> io::Result<()> {
// If this is the end of file, clean up and return. // If this is the end of file, clean up and return.
if n == 0 { if n == 0 {
stdout.flush()?; stdout.flush().await?;
return Ok(()); return Ok(());
} }
// Write the buffer into stdout. // Write the buffer into stdout.
stdout.write_all(&buf[..n])?; stdout.write_all(&buf[..n]).await?;
} }
}) })
} }

@ -1,18 +1,18 @@
//! Echoes lines read on stdin to stdout. //! Echoes lines read on stdin to stdout.
use async_std::io; use async_std::io;
use async_std::prelude::*;
use async_std::task; use async_std::task;
use std::io::Write;
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
task::block_on(async { task::block_on(async {
let stdin = std::io::stdin(); let stdin = io::stdin();
let mut stdout = std::io::stdout(); let mut stdout = io::stdout();
let mut line = String::new(); let mut line = String::new();
loop { loop {
// Read a line from stdin. // Read a line from stdin.
let n = stdin.read_line(&mut line)?; let n = stdin.read_line(&mut line).await?;
// If this is the end of stdin, return. // If this is the end of stdin, return.
if n == 0 { if n == 0 {
@ -20,8 +20,8 @@ fn main() -> io::Result<()> {
} }
// Write the line to stdout. // Write the line to stdout.
stdout.write_all(line.as_bytes())?; stdout.write_all(line.as_bytes()).await?;
stdout.flush()?; stdout.flush().await?;
line.clear(); line.clear();
} }
}) })

@ -8,11 +8,11 @@ use async_std::task;
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
// This async scope times out after 5 seconds. // This async scope times out after 5 seconds.
task::block_on(io::timeout(Duration::from_secs(5), async { task::block_on(io::timeout(Duration::from_secs(5), async {
let stdin = std::io::stdin(); let stdin = io::stdin();
// Read a line from the standard input and display it. // Read a line from the standard input and display it.
let mut line = String::new(); let mut line = String::new();
stdin.read_line(&mut line)?; stdin.read_line(&mut line).await?;
dbg!(line); dbg!(line);
Ok(()) Ok(())

@ -5,10 +5,9 @@ use std::future::Future;
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// use std::pin::Pin;
///
/// use async_std::future::{Future, IntoFuture}; /// use async_std::future::{Future, IntoFuture};
/// use async_std::io; /// use async_std::io;
/// use async_std::pin::Pin;
/// ///
/// struct Client; /// struct Client;
/// ///

@ -63,14 +63,12 @@ cfg_std! {
cfg_default! { cfg_default! {
pub use timeout::{timeout, TimeoutError}; pub use timeout::{timeout, TimeoutError};
mod timeout; mod timeout;
} }
cfg_unstable! { cfg_unstable! {
pub use into_future::IntoFuture; pub use into_future::IntoFuture;
pub(crate) use maybe_done::MaybeDone; pub(crate) use maybe_done::MaybeDone;
mod into_future; mod into_future;
mod maybe_done; mod maybe_done;
} }

@ -32,14 +32,13 @@ use crate::utils::Context as _;
/// ///
/// # Examples /// # Examples
/// ///
/// ```no_run /// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// # /// #
/// use async_std::io; /// use async_std::io;
/// use async_std::fs::File;
/// ///
/// let mut reader: &[u8] = b"hello"; /// let mut reader: &[u8] = b"hello";
/// let mut writer = File::open("foo.txt").await?; /// let mut writer = io::stdout();
/// ///
/// io::copy(&mut reader, &mut writer).await?; /// io::copy(&mut reader, &mut writer).await?;
/// # /// #
@ -120,14 +119,13 @@ where
/// ///
/// # Examples /// # Examples
/// ///
/// ```no_run /// ```
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// # /// #
/// use async_std::io; /// use async_std::io;
/// use async_std::fs::File;
/// ///
/// let mut reader: &[u8] = b"hello"; /// let mut reader: &[u8] = b"hello";
/// let mut writer = File::open("foo.txt").await?; /// let mut writer = io::stdout();
/// ///
/// io::copy(&mut reader, &mut writer).await?; /// io::copy(&mut reader, &mut writer).await?;
/// # /// #

@ -122,6 +122,56 @@
//! # Ok(()) }) } //! # Ok(()) }) }
//! ``` //! ```
//! //!
//! ## Standard input and output
//!
//! A very common source of input is standard input:
//!
//! ```no_run
//! use async_std::io;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! let mut input = String::new();
//!
//! io::stdin().read_line(&mut input).await?;
//!
//! println!("You typed: {}", input.trim());
//! #
//! # Ok(()) }) }
//! ```
//!
//! Note that you cannot use the [`?` operator] in functions that do not return
//! a [`Result<T, E>`][`Result`]. Instead, you can call [`.unwrap()`]
//! or `match` on the return value to catch any possible errors:
//!
//! ```no_run
//! use async_std::io;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! let mut input = String::new();
//!
//! io::stdin().read_line(&mut input).await.unwrap();
//! #
//! # Ok(()) }) }
//! ```
//!
//! And a very common source of output is standard output:
//!
//! ```no_run
//! use async_std::io;
//! use async_std::io::prelude::*;
//!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! io::stdout().write(&[42]).await?;
//! #
//! # Ok(()) }) }
//! ```
//!
//! Of course, using [`io::stdout`] directly is less common than something like
//! [`println!`].
//!
//! ## Iterator types //! ## Iterator types
//! //!
//! A large number of the structures provided by `std::io` are for various //! A large number of the structures provided by `std::io` are for various
@ -154,14 +204,10 @@
//! //!
//! ```no_run //! ```no_run
//! use async_std::io; //! use async_std::io;
//! use async_std::fs::File;
//! //!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! # //! #
//! let mut reader: &[u8] = b"hello"; //! io::copy(&mut io::stdin(), &mut io::stdout()).await?;
//! let mut writer = File::open("foo.txt").await?;
//!
//! io::copy(&mut reader, &mut writer).await?;
//! # //! #
//! # Ok(()) }) } //! # Ok(()) }) }
//! ``` //! ```
@ -178,14 +224,13 @@
//! ``` //! ```
//! #![allow(dead_code)] //! #![allow(dead_code)]
//! use async_std::io; //! use async_std::io;
//! use std::time::Duration;
//! //!
//! async fn read_input() -> io::Result<()> { //! async fn read_input() -> io::Result<()> {
//! let f = io::timeout(Duration::from_secs(5), async { //! let mut input = String::new();
//! Ok(()) //!
//! }); //! io::stdin().read_line(&mut input).await?;
//! //!
//! assert_eq!(f.await?, ()); //! println!("You typed: {}", input.trim());
//! //!
//! Ok(()) //! Ok(())
//! } //! }
@ -215,6 +260,8 @@
//! [`BufReader`]: struct.BufReader.html //! [`BufReader`]: struct.BufReader.html
//! [`BufWriter`]: struct.BufWriter.html //! [`BufWriter`]: struct.BufWriter.html
//! [`Write::write`]: trait.Write.html#tymethod.write //! [`Write::write`]: trait.Write.html#tymethod.write
//! [`io::stdout`]: fn.stdout.html
//! [`println!`]: ../macro.println.html
//! [`Lines`]: struct.Lines.html //! [`Lines`]: struct.Lines.html
//! [`io::Result`]: type.Result.html //! [`io::Result`]: type.Result.html
//! [`?` operator]: https://doc.rust-lang.org/stable/book/appendix-02-operators.html //! [`?` operator]: https://doc.rust-lang.org/stable/book/appendix-02-operators.html
@ -258,7 +305,24 @@ cfg_std! {
} }
cfg_default! { cfg_default! {
// For use in the print macros.
#[doc(hidden)]
pub use stdio::{_eprint, _print};
pub use stderr::{stderr, Stderr};
pub use stdin::{stdin, Stdin};
pub use stdout::{stdout, Stdout};
pub use timeout::timeout; pub use timeout::timeout;
mod timeout; mod timeout;
mod stderr;
mod stdin;
mod stdio;
mod stdout;
}
cfg_unstable_default! {
pub use stderr::StderrLock;
pub use stdin::StdinLock;
pub use stdout::StdoutLock;
} }

@ -0,0 +1,261 @@
use std::pin::Pin;
use std::sync::Mutex;
use std::future::Future;
use crate::io::{self, Write};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
cfg_unstable! {
use once_cell::sync::Lazy;
use std::io::Write as _;
}
/// Constructs a new handle to the standard error of the current process.
///
/// This function is an async version of [`std::io::stderr`].
///
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
///
/// let mut stderr = io::stderr();
/// stderr.write_all(b"Hello, world!").await?;
/// #
/// # Ok(()) }) }
/// ```
pub fn stderr() -> Stderr {
Stderr(Mutex::new(State::Idle(Some(Inner {
stderr: std::io::stderr(),
buf: Vec::new(),
last_op: None,
}))))
}
/// A handle to the standard error of the current process.
///
/// This writer is created by the [`stderr`] function. See its documentation for
/// more.
///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// [`stderr`]: fn.stderr.html
#[derive(Debug)]
pub struct Stderr(Mutex<State>);
/// A locked reference to the Stderr handle.
///
/// This handle implements the [`Write`] traits, and is constructed via the [`Stderr::lock`]
/// method.
///
/// [`Write`]: trait.Read.html
/// [`Stderr::lock`]: struct.Stderr.html#method.lock
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct StderrLock<'a>(std::io::StderrLock<'a>);
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
unsafe impl Send for StderrLock<'_> {}
/// The state of the asynchronous stderr.
///
/// The stderr can be either idle or busy performing an asynchronous operation.
#[derive(Debug)]
enum State {
/// The stderr is idle.
Idle(Option<Inner>),
/// The stderr is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stderr.
Busy(JoinHandle<State>),
}
/// Inner representation of the asynchronous stderr.
#[derive(Debug)]
struct Inner {
/// The blocking stderr handle.
stderr: std::io::Stderr,
/// The write buffer.
buf: Vec<u8>,
/// The result of the last asynchronous operation on the stderr.
last_op: Option<Operation>,
}
/// Possible results of an asynchronous operation on the stderr.
#[derive(Debug)]
enum Operation {
Write(io::Result<usize>),
Flush(io::Result<()>),
}
impl Stderr {
/// Locks this handle to the standard error stream, returning a writable guard.
///
/// The lock is released when the returned lock goes out of scope. The returned guard also implements the Write trait for writing data.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
///
/// let stderr = io::stderr();
/// let mut handle = stderr.lock().await;
///
/// handle.write_all(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub async fn lock(&self) -> StderrLock<'static> {
static STDERR: Lazy<std::io::Stderr> = Lazy::new(std::io::stderr);
spawn_blocking(move || StderrLock(STDERR.lock())).await
}
}
impl Write for Stderr {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let state = &mut *self.0.lock().unwrap();
loop {
match state {
State::Idle(opt) => {
let inner = opt.as_mut().unwrap();
// Check if the operation has completed.
if let Some(Operation::Write(res)) = inner.last_op.take() {
let n = res?;
// If more data was written than is available in the buffer, let's retry
// the write operation.
if n <= buf.len() {
return Poll::Ready(Ok(n));
}
} else {
let mut inner = opt.take().unwrap();
// Set the length of the inner buffer to the length of the provided buffer.
if inner.buf.len() < buf.len() {
inner.buf.reserve(buf.len() - inner.buf.len());
}
unsafe {
inner.buf.set_len(buf.len());
}
// Copy the data to write into the inner buffer.
inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously.
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::write(&mut inner.stderr, &inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
}));
}
}
// Poll the asynchronous operation the stderr is currently blocked on.
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let state = &mut *self.0.lock().unwrap();
loop {
match state {
State::Idle(opt) => {
let inner = opt.as_mut().unwrap();
// Check if the operation has completed.
if let Some(Operation::Flush(res)) = inner.last_op.take() {
return Poll::Ready(res);
} else {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::flush(&mut inner.stderr);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))
}));
}
}
// Poll the asynchronous operation the stderr is currently blocked on.
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
}
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}
}
cfg_unix! {
use crate::os::unix::io::{AsRawFd, RawFd};
impl AsRawFd for Stderr {
fn as_raw_fd(&self) -> RawFd {
std::io::stderr().as_raw_fd()
}
}
}
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, RawHandle};
impl AsRawHandle for Stderr {
fn as_raw_handle(&self) -> RawHandle {
std::io::stderr().as_raw_handle()
}
}
}
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
impl io::Write for StderrLock<'_> {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(self.0.write(buf))
}
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(self.0.flush())
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}
}

@ -0,0 +1,279 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;
use crate::future;
use crate::io::{self, Read};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
use crate::utils::Context as _;
cfg_unstable! {
use once_cell::sync::Lazy;
use std::io::Read as _;
}
/// Constructs a new handle to the standard input of the current process.
///
/// This function is an async version of [`std::io::stdin`].
///
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
///
/// let stdin = io::stdin();
/// let mut line = String::new();
/// stdin.read_line(&mut line).await?;
/// #
/// # Ok(()) }) }
/// ```
pub fn stdin() -> Stdin {
Stdin(Mutex::new(State::Idle(Some(Inner {
stdin: std::io::stdin(),
line: String::new(),
buf: Vec::new(),
last_op: None,
}))))
}
/// A handle to the standard input of the current process.
///
/// This reader is created by the [`stdin`] function. See its documentation for
/// more.
///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// [`stdin`]: fn.stdin.html
#[derive(Debug)]
pub struct Stdin(Mutex<State>);
/// A locked reference to the Stdin handle.
///
/// This handle implements the [`Read`] traits, and is constructed via the [`Stdin::lock`] method.
///
/// [`Read`]: trait.Read.html
/// [`Stdin::lock`]: struct.Stdin.html#method.lock
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(feature = "unstable")]
#[derive(Debug)]
pub struct StdinLock<'a>(std::io::StdinLock<'a>);
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
unsafe impl Send for StdinLock<'_> {}
/// The state of the asynchronous stdin.
///
/// The stdin can be either idle or busy performing an asynchronous operation.
#[derive(Debug)]
enum State {
/// The stdin is idle.
Idle(Option<Inner>),
/// The stdin is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stdin.
Busy(JoinHandle<State>),
}
/// Inner representation of the asynchronous stdin.
#[derive(Debug)]
struct Inner {
/// The blocking stdin handle.
stdin: std::io::Stdin,
/// The line buffer.
line: String,
/// The write buffer.
buf: Vec<u8>,
/// The result of the last asynchronous operation on the stdin.
last_op: Option<Operation>,
}
/// Possible results of an asynchronous operation on the stdin.
#[derive(Debug)]
enum Operation {
ReadLine(io::Result<usize>),
Read(io::Result<usize>),
}
impl Stdin {
/// Reads a line of input into the specified buffer.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
///
/// let stdin = io::stdin();
/// let mut line = String::new();
/// stdin.read_line(&mut line).await?;
/// #
/// # Ok(()) }) }
/// ```
pub async fn read_line(&self, buf: &mut String) -> io::Result<usize> {
future::poll_fn(|cx| {
let state = &mut *self.0.lock().unwrap();
loop {
match state {
State::Idle(opt) => {
let inner = opt.as_mut().unwrap();
// Check if the operation has completed.
if let Some(Operation::ReadLine(res)) = inner.last_op.take() {
let n = res?;
// Copy the read data into the buffer and return.
buf.push_str(&inner.line);
return Poll::Ready(Ok(n));
} else {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(spawn_blocking(move || {
inner.line.clear();
let res = inner.stdin.read_line(&mut inner.line);
inner.last_op = Some(Operation::ReadLine(res));
State::Idle(Some(inner))
}));
}
}
// Poll the asynchronous operation the stdin is currently blocked on.
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
}
}
})
.await
.context(|| String::from("could not read line on stdin"))
}
/// Locks this handle to the standard input stream, returning a readable guard.
///
/// The lock is released when the returned lock goes out of scope. The returned guard also implements the Read trait for accessing the underlying data.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
///
/// let mut buffer = String::new();
///
/// let stdin = io::stdin();
/// let mut handle = stdin.lock().await;
///
/// handle.read_to_string(&mut buffer).await?;
/// #
/// # Ok(()) }) }
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub async fn lock(&self) -> StdinLock<'static> {
static STDIN: Lazy<std::io::Stdin> = Lazy::new(std::io::stdin);
spawn_blocking(move || StdinLock(STDIN.lock())).await
}
}
impl Read for Stdin {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let state = &mut *self.0.lock().unwrap();
loop {
match state {
State::Idle(opt) => {
let inner = opt.as_mut().unwrap();
// Check if the operation has completed.
if let Some(Operation::Read(res)) = inner.last_op.take() {
let n = res?;
// If more data was read than fits into the buffer, let's retry the read
// operation.
if n <= buf.len() {
// Copy the read data into the buffer and return.
buf[..n].copy_from_slice(&inner.buf[..n]);
return Poll::Ready(Ok(n));
}
} else {
let mut inner = opt.take().unwrap();
// Set the length of the inner buffer to the length of the provided buffer.
if inner.buf.len() < buf.len() {
inner.buf.reserve(buf.len() - inner.buf.len());
}
unsafe {
inner.buf.set_len(buf.len());
}
// Start the operation asynchronously.
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf);
inner.last_op = Some(Operation::Read(res));
State::Idle(Some(inner))
}));
}
}
// Poll the asynchronous operation the stdin is currently blocked on.
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
}
}
}
}
cfg_unix! {
use crate::os::unix::io::{AsRawFd, RawFd};
impl AsRawFd for Stdin {
fn as_raw_fd(&self) -> RawFd {
std::io::stdin().as_raw_fd()
}
}
}
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, RawHandle};
impl AsRawHandle for Stdin {
fn as_raw_handle(&self) -> RawHandle {
std::io::stdin().as_raw_handle()
}
}
}
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
impl Read for StdinLock<'_> {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(self.0.read(buf))
}
}

@ -0,0 +1,21 @@
//! Internal types for stdio.
//!
//! This module is a port of `libstd/io/stdio.rs`,and contains internal types for `print`/`eprint`.
use crate::io::{stderr, stdout};
use crate::prelude::*;
use std::fmt;
#[doc(hidden)]
pub async fn _print(args: fmt::Arguments<'_>) {
if let Err(e) = stdout().write_fmt(args).await {
panic!("failed printing to stdout: {}", e);
}
}
#[doc(hidden)]
pub async fn _eprint(args: fmt::Arguments<'_>) {
if let Err(e) = stderr().write_fmt(args).await {
panic!("failed printing to stderr: {}", e);
}
}

@ -0,0 +1,261 @@
use std::pin::Pin;
use std::sync::Mutex;
use std::future::Future;
use crate::io::{self, Write};
use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
cfg_unstable! {
use once_cell::sync::Lazy;
use std::io::Write as _;
}
/// Constructs a new handle to the standard output of the current process.
///
/// This function is an async version of [`std::io::stdout`].
///
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
///
/// let mut stdout = io::stdout();
/// stdout.write_all(b"Hello, world!").await?;
/// #
/// # Ok(()) }) }
/// ```
pub fn stdout() -> Stdout {
Stdout(Mutex::new(State::Idle(Some(Inner {
stdout: std::io::stdout(),
buf: Vec::new(),
last_op: None,
}))))
}
/// A handle to the standard output of the current process.
///
/// This writer is created by the [`stdout`] function. See its documentation
/// for more.
///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// [`stdout`]: fn.stdout.html
#[derive(Debug)]
pub struct Stdout(Mutex<State>);
/// A locked reference to the Stderr handle.
///
/// This handle implements the [`Write`] traits, and is constructed via the [`Stdout::lock`]
/// method.
///
/// [`Write`]: trait.Read.html
/// [`Stdout::lock`]: struct.Stdout.html#method.lock
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct StdoutLock<'a>(std::io::StdoutLock<'a>);
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
unsafe impl Send for StdoutLock<'_> {}
/// The state of the asynchronous stdout.
///
/// The stdout can be either idle or busy performing an asynchronous operation.
#[derive(Debug)]
enum State {
/// The stdout is idle.
Idle(Option<Inner>),
/// The stdout is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stdout.
Busy(JoinHandle<State>),
}
/// Inner representation of the asynchronous stdout.
#[derive(Debug)]
struct Inner {
/// The blocking stdout handle.
stdout: std::io::Stdout,
/// The write buffer.
buf: Vec<u8>,
/// The result of the last asynchronous operation on the stdout.
last_op: Option<Operation>,
}
/// Possible results of an asynchronous operation on the stdout.
#[derive(Debug)]
enum Operation {
Write(io::Result<usize>),
Flush(io::Result<()>),
}
impl Stdout {
/// Locks this handle to the standard error stream, returning a writable guard.
///
/// The lock is released when the returned lock goes out of scope. The returned guard also implements the Write trait for writing data.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
///
/// let stdout = io::stdout();
/// let mut handle = stdout.lock().await;
///
/// handle.write_all(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub async fn lock(&self) -> StdoutLock<'static> {
static STDOUT: Lazy<std::io::Stdout> = Lazy::new(std::io::stdout);
spawn_blocking(move || StdoutLock(STDOUT.lock())).await
}
}
impl Write for Stdout {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let state = &mut *self.0.lock().unwrap();
loop {
match state {
State::Idle(opt) => {
let inner = opt.as_mut().unwrap();
// Check if the operation has completed.
if let Some(Operation::Write(res)) = inner.last_op.take() {
let n = res?;
// If more data was written than is available in the buffer, let's retry
// the write operation.
if n <= buf.len() {
return Poll::Ready(Ok(n));
}
} else {
let mut inner = opt.take().unwrap();
// Set the length of the inner buffer to the length of the provided buffer.
if inner.buf.len() < buf.len() {
inner.buf.reserve(buf.len() - inner.buf.len());
}
unsafe {
inner.buf.set_len(buf.len());
}
// Copy the data to write into the inner buffer.
inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously.
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::write(&mut inner.stdout, &inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
}));
}
}
// Poll the asynchronous operation the stdout is currently blocked on.
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
}
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
let state = &mut *self.0.lock().unwrap();
loop {
match state {
State::Idle(opt) => {
let inner = opt.as_mut().unwrap();
// Check if the operation has completed.
if let Some(Operation::Flush(res)) = inner.last_op.take() {
return Poll::Ready(res);
} else {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(spawn_blocking(move || {
let res = std::io::Write::flush(&mut inner.stdout);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))
}));
}
}
// Poll the asynchronous operation the stdout is currently blocked on.
State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
}
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}
}
cfg_unix! {
use crate::os::unix::io::{AsRawFd, RawFd};
impl AsRawFd for Stdout {
fn as_raw_fd(&self) -> RawFd {
std::io::stdout().as_raw_fd()
}
}
}
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, RawHandle};
impl AsRawHandle for Stdout {
fn as_raw_handle(&self) -> RawHandle {
std::io::stdout().as_raw_handle()
}
}
}
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
impl Write for StdoutLock<'_> {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(self.0.write(buf))
}
fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(self.0.flush())
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.poll_flush(cx)
}
}

@ -23,9 +23,9 @@ use crate::io;
/// use async_std::io; /// use async_std::io;
/// ///
/// io::timeout(Duration::from_secs(5), async { /// io::timeout(Duration::from_secs(5), async {
/// let stdin = std::io::stdin(); /// let stdin = io::stdin();
/// let mut line = String::new(); /// let mut line = String::new();
/// let n = stdin.read_line(&mut line)?; /// let n = stdin.read_line(&mut line).await?;
/// Ok(()) /// Ok(())
/// }) /// })
/// .await?; /// .await?;

@ -273,6 +273,9 @@ cfg_default! {
} }
cfg_unstable! { cfg_unstable! {
pub mod pin;
pub mod process;
mod unit; mod unit;
mod vec; mod vec;
mod result; mod result;

@ -1,3 +1,171 @@
/// Prints to the standard output.
///
/// Equivalent to the [`println!`] macro except that a newline is not printed at
/// the end of the message.
///
/// Note that stdout is frequently line-buffered by default so it may be
/// necessary to use [`io::stdout().flush()`][flush] to ensure the output is emitted
/// immediately.
///
/// Use `print!` only for the primary output of your program. Use
/// [`eprint!`] instead to print error and progress messages.
///
/// [`println!`]: macro.println.html
/// [flush]: io/trait.Write.html#tymethod.flush
/// [`eprint!`]: macro.eprint.html
///
/// # Panics
///
/// Panics if writing to `io::stdout()` fails.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
/// use async_std::print;
///
/// print!("this ").await;
/// print!("will ").await;
/// print!("be ").await;
/// print!("on ").await;
/// print!("the ").await;
/// print!("same ").await;
/// print!("line ").await;
///
/// io::stdout().flush().await.unwrap();
///
/// print!("this string has a newline, why not choose println! instead?\n").await;
///
/// io::stdout().flush().await.unwrap();
/// #
/// # })
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! print {
($($arg:tt)*) => ($crate::io::_print(format_args!($($arg)*)))
}
/// Prints to the standard output, with a newline.
///
/// On all platforms, the newline is the LINE FEED character (`\n`/`U+000A`) alone
/// (no additional CARRIAGE RETURN (`\r`/`U+000D`)).
///
/// Use the [`format!`] syntax to write data to the standard output.
/// See [`std::fmt`] for more information.
///
/// Use `println!` only for the primary output of your program. Use
/// [`eprintln!`] instead to print error and progress messages.
///
/// [`format!`]: macro.format.html
/// [`std::fmt`]: https://doc.rust-lang.org/std/fmt/index.html
/// [`eprintln!`]: macro.eprintln.html
/// # Panics
///
/// Panics if writing to `io::stdout` fails.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::println;
///
/// println!().await; // prints just a newline
/// println!("hello there!").await;
/// println!("format {} arguments", "some").await;
/// #
/// # })
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! println {
() => ($crate::print!("\n"));
($($arg:tt)*) => (async {
$crate::io::_print(format_args!($($arg)*)).await;
$crate::io::_print(format_args!("\n")).await;
})
}
/// Prints to the standard error.
///
/// Equivalent to the [`print!`] macro, except that output goes to
/// [`io::stderr`] instead of `io::stdout`. See [`print!`] for
/// example usage.
///
/// Use `eprint!` only for error and progress messages. Use `print!`
/// instead for the primary output of your program.
///
/// [`io::stderr`]: io/struct.Stderr.html
/// [`print!`]: macro.print.html
///
/// # Panics
///
/// Panics if writing to `io::stderr` fails.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::eprint;
///
/// eprint!("Error: Could not complete task").await;
/// #
/// # })
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! eprint {
($($arg:tt)*) => ($crate::io::_eprint(format_args!($($arg)*)))
}
/// Prints to the standard error, with a newline.
///
/// Equivalent to the [`println!`] macro, except that output goes to
/// [`io::stderr`] instead of `io::stdout`. See [`println!`] for
/// example usage.
///
/// Use `eprintln!` only for error and progress messages. Use `println!`
/// instead for the primary output of your program.
///
/// [`io::stderr`]: io/struct.Stderr.html
/// [`println!`]: macro.println.html
///
/// # Panics
///
/// Panics if writing to `io::stderr` fails.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use async_std::eprintln;
///
/// eprintln!("Error: Could not complete task").await;
/// #
/// # })
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! eprintln {
() => (async { $crate::eprint!("\n").await; });
($($arg:tt)*) => (
async {
$crate::io::_eprint(format_args!($($arg)*)).await;
$crate::io::_eprint(format_args!("\n")).await;
}
);
}
/// Declares task-local values. /// Declares task-local values.
/// ///
/// The macro wraps any number of static declarations and makes them task-local. Attributes and /// The macro wraps any number of static declarations and makes them task-local. Attributes and

@ -38,14 +38,16 @@ cfg_std! {
pub use crate::io::prelude::SeekExt as _; pub use crate::io::prelude::SeekExt as _;
#[doc(no_inline)] #[doc(no_inline)]
pub use crate::io::prelude::WriteExt as _; pub use crate::io::prelude::WriteExt as _;
#[doc(no_inline)]
pub use crate::stream::DoubleEndedStream;
#[doc(no_inline)]
pub use crate::stream::ExactSizeStream;
} }
cfg_default! { cfg_default! {
#[doc(no_inline)] #[doc(no_inline)]
pub use crate::task_local; pub use crate::task_local;
} }
cfg_unstable! {
#[doc(no_inline)]
pub use crate::stream::DoubleEndedStream;
#[doc(no_inline)]
pub use crate::stream::ExactSizeStream;
}

@ -1,7 +1,7 @@
use crate::stream::Stream; use crate::stream::Stream;
use core::pin::Pin; use std::pin::Pin;
use core::task::{Context, Poll}; use std::task::{Context, Poll};
mod next_back; mod next_back;
mod nth_back; mod nth_back;
@ -22,6 +22,8 @@ use try_rfold::TryRFoldFuture;
/// `Item`s from the back, as well as the front. /// `Item`s from the back, as well as the front.
/// ///
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait DoubleEndedStream: Stream { pub trait DoubleEndedStream: Stream {
#[doc = r#" #[doc = r#"
Attempts to receive the next item from the back of the stream. Attempts to receive the next item from the back of the stream.

@ -76,6 +76,8 @@ pub use crate::stream::Stream;
/// # }); /// # });
/// ``` /// ```
#[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable #[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait ExactSizeStream: Stream { pub trait ExactSizeStream: Stream {
/// Returns the exact number of times the stream will iterate. /// Returns the exact number of times the stream will iterate.
/// ///

@ -27,6 +27,8 @@ use crate::stream::IntoStream;
/// # /// #
/// # }) /// # })
/// ``` /// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait Extend<A> { pub trait Extend<A> {
/// Extends a collection with the contents of a stream. /// Extends a collection with the contents of a stream.
fn extend<'a, T: IntoStream<Item = A> + 'a>( fn extend<'a, T: IntoStream<Item = A> + 'a>(

@ -14,6 +14,8 @@ use crate::stream::Stream;
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
/// [`Stream::fuse`]: trait.Stream.html#method.fuse /// [`Stream::fuse`]: trait.Stream.html#method.fuse
/// [`Fuse`]: struct.Fuse.html /// [`Fuse`]: struct.Fuse.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait FusedStream: Stream {} pub trait FusedStream: Stream {}
impl<S: FusedStream + ?Sized + Unpin> FusedStream for &mut S {} impl<S: FusedStream + ?Sized + Unpin> FusedStream for &mut S {}

@ -41,6 +41,8 @@ use futures_timer::Delay;
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub fn interval(dur: Duration) -> Interval { pub fn interval(dur: Duration) -> Interval {
Interval { Interval {
delay: Delay::new(dur), delay: Delay::new(dur),
@ -54,6 +56,8 @@ pub fn interval(dur: Duration) -> Interval {
/// documentation for more. /// documentation for more.
/// ///
/// [`interval`]: fn.interval.html /// [`interval`]: fn.interval.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)] #[derive(Debug)]
pub struct Interval { pub struct Interval {
delay: Delay, delay: Delay,

@ -317,32 +317,29 @@ mod once;
mod repeat; mod repeat;
mod repeat_with; mod repeat_with;
cfg_std! { cfg_unstable! {
pub use double_ended_stream::DoubleEndedStream;
pub use exact_size_stream::ExactSizeStream;
pub use fused_stream::FusedStream;
pub use interval::{interval, Interval};
pub use pending::{pending, Pending};
pub use product::Product;
pub use successors::{successors, Successors};
pub use sum::Sum;
mod double_ended_stream; mod double_ended_stream;
mod exact_size_stream; mod exact_size_stream;
mod extend;
mod from_stream;
mod fused_stream; mod fused_stream;
mod interval; mod interval;
mod into_stream;
mod pending; mod pending;
mod product; mod product;
mod successors; mod successors;
mod sum; mod sum;
}
cfg_unstable! {
mod from_stream;
mod into_stream;
mod extend;
pub use double_ended_stream::DoubleEndedStream;
pub use exact_size_stream::ExactSizeStream;
pub use extend::{extend, Extend}; pub use extend::{extend, Extend};
pub use from_stream::FromStream; pub use from_stream::FromStream;
pub use fused_stream::FusedStream;
pub use interval::{interval, Interval};
pub use into_stream::IntoStream; pub use into_stream::IntoStream;
pub use pending::{pending, Pending};
pub use product::Product;
pub use stream::Merge;
pub use successors::{successors, Successors};
pub use sum::Sum;
} }

@ -5,7 +5,7 @@ use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
use crate::stream::DoubleEndedStream; use crate::stream::DoubleEndedStream;
/// Creates a stream that yields a single item. /// Creates a stream that yields a single item.
@ -50,8 +50,8 @@ impl<T> Stream for Once<T> {
} }
} }
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
impl<T> DoubleEndedStream for Once<T> { impl <T> DoubleEndedStream for Once<T> {
fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.project().value.take()) Poll::Ready(self.project().value.take())
} }

@ -1,6 +1,5 @@
use alloc::boxed::Box;
use core::future::Future;
use core::pin::Pin; use core::pin::Pin;
use core::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
@ -14,6 +13,8 @@ use crate::stream::Stream;
/// [`product`]: trait.Product.html#tymethod.product /// [`product`]: trait.Product.html#tymethod.product
/// [`FromStream`]: trait.FromStream.html /// [`FromStream`]: trait.FromStream.html
/// [`Stream::product`]: trait.Stream.html#method.product /// [`Stream::product`]: trait.Stream.html#method.product
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait Product<A = Self>: Sized { pub trait Product<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by /// Method which takes a stream and generates `Self` from the elements by
/// multiplying the items. /// multiplying the items.
@ -22,9 +23,9 @@ pub trait Product<A = Self>: Sized {
S: Stream<Item = A> + 'a; S: Stream<Item = A> + 'a;
} }
use crate::stream::stream::StreamExt;
use core::num::Wrapping;
use core::ops::Mul; use core::ops::Mul;
use core::num::Wrapping;
use crate::stream::stream::StreamExt;
macro_rules! integer_product { macro_rules! integer_product {
(@impls $one: expr, $($a:ty)*) => ($( (@impls $one: expr, $($a:ty)*) => ($(
@ -74,5 +75,5 @@ macro_rules! float_product {
); );
} }
integer_product! { i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize } integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
float_product! { f32 f64 } float_product!{ f32 f64 }

@ -9,6 +9,8 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub struct CountFuture<S> { pub struct CountFuture<S> {
#[pin] #[pin]
stream: S, stream: S,

@ -16,6 +16,8 @@ pin_project! {
/// ///
/// [`merge`]: trait.Stream.html#method.merge /// [`merge`]: trait.Stream.html#method.merge
/// [`Stream`]: trait.Stream.html /// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)] #[derive(Debug)]
pub struct Merge<L, R> { pub struct Merge<L, R> {
#[pin] #[pin]

@ -112,43 +112,35 @@ pub use zip::Zip;
use core::cmp::Ordering; use core::cmp::Ordering;
cfg_std! { cfg_unstable! {
use core::time::Duration;
use crate::stream::{Product, Sum};
use alloc::boxed::Box;
use core::future::Future; use core::future::Future;
use core::pin::Pin; use core::pin::Pin;
use core::time::Duration;
use unzip::UnzipFuture;
use count::CountFuture;
pub use throttle::Throttle;
pub use merge::Merge;
pub use delay::Delay;
pub use timeout::{Timeout, TimeoutError};
mod timeout;
mod throttle;
mod merge;
mod delay;
mod unzip;
mod count;
}
cfg_unstable! {
use crate::stream::FromStream;
use crate::stream::into_stream::IntoStream; use crate::stream::into_stream::IntoStream;
use crate::stream::{FromStream, Product, Sum};
use crate::stream::Extend; use crate::stream::Extend;
use count::CountFuture;
use partition::PartitionFuture; use partition::PartitionFuture;
use unzip::UnzipFuture;
pub use merge::Merge;
pub use flatten::Flatten; pub use flatten::Flatten;
pub use flat_map::FlatMap; pub use flat_map::FlatMap;
pub use timeout::{TimeoutError, Timeout};
pub use throttle::Throttle;
pub use delay::Delay;
mod count;
mod merge;
mod flatten; mod flatten;
mod flat_map; mod flat_map;
mod partition; mod partition;
mod timeout;
mod throttle;
mod delay;
mod unzip;
} }
extension_trait! { extension_trait! {
@ -363,7 +355,8 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn throttle(self, d: Duration) -> Throttle<Self> fn throttle(self, d: Duration) -> Throttle<Self>
where where
Self: Sized, Self: Sized,
@ -605,7 +598,8 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn delay(self, dur: std::time::Duration) -> Delay<Self> fn delay(self, dur: std::time::Duration) -> Delay<Self>
where where
Self: Sized, Self: Sized,
@ -1517,6 +1511,8 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn by_ref(&mut self) -> &mut Self { fn by_ref(&mut self) -> &mut Self {
self self
} }
@ -1660,7 +1656,8 @@ extension_trait! {
# Ok(()) }) } # Ok(()) }) }
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn timeout(self, dur: Duration) -> Timeout<Self> fn timeout(self, dur: Duration) -> Timeout<Self>
where where
Self: Stream + Sized, Self: Stream + Sized,
@ -1825,7 +1822,8 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>] fn unzip<A, B, FromA, FromB>(self) -> impl Future<Output = (FromA, FromB)> [UnzipFuture<Self, FromA, FromB>]
where where
FromA: Default + Extend<A>, FromA: Default + Extend<A>,
@ -1923,7 +1921,8 @@ extension_trait! {
# }); # });
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn merge<U>(self, other: U) -> Merge<Self, U> fn merge<U>(self, other: U) -> Merge<Self, U>
where where
Self: Sized, Self: Sized,
@ -2069,7 +2068,8 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn count(self) -> impl Future<Output = usize> [CountFuture<Self>] fn count(self) -> impl Future<Output = usize> [CountFuture<Self>]
where where
Self: Sized, Self: Sized,
@ -2330,7 +2330,8 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn sum<'a, S>( fn sum<'a, S>(
self, self,
) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>] ) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
@ -2375,7 +2376,8 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(feature = "std")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn product<'a, P>( fn product<'a, P>(
self, self,
) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>] ) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>]

@ -47,6 +47,8 @@ impl<S: Stream> Stream for Timeout<S> {
} }
/// An error returned when a stream times out. /// An error returned when a stream times out.
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeoutError { pub struct TimeoutError {
_private: (), _private: (),

@ -8,6 +8,8 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub struct UnzipFuture<S, FromA, FromB> { pub struct UnzipFuture<S, FromA, FromB> {
#[pin] #[pin]
stream: S, stream: S,

@ -27,6 +27,8 @@ use pin_project_lite::pin_project;
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub fn successors<F, T>(first: Option<T>, succ: F) -> Successors<F, T> pub fn successors<F, T>(first: Option<T>, succ: F) -> Successors<F, T>
where where
F: FnMut(&T) -> Option<T>, F: FnMut(&T) -> Option<T>,
@ -41,6 +43,8 @@ pin_project! {
/// This stream is constructed by [`successors`] function /// This stream is constructed by [`successors`] function
/// ///
/// [`successors`]: fn.succssors.html /// [`successors`]: fn.succssors.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)] #[derive(Debug)]
pub struct Successors<F, T> pub struct Successors<F, T>
where where

@ -1,4 +1,3 @@
use alloc::boxed::Box;
use core::future::Future; use core::future::Future;
use core::pin::Pin; use core::pin::Pin;
@ -14,6 +13,8 @@ use crate::stream::Stream;
/// [`sum`]: trait.Sum.html#tymethod.sum /// [`sum`]: trait.Sum.html#tymethod.sum
/// [`FromStream`]: trait.FromStream.html /// [`FromStream`]: trait.FromStream.html
/// [`Stream::sum`]: trait.Stream.html#method.sum /// [`Stream::sum`]: trait.Stream.html#method.sum
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait Sum<A = Self>: Sized { pub trait Sum<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by /// Method which takes a stream and generates `Self` from the elements by
/// "summing up" the items. /// "summing up" the items.

@ -21,7 +21,7 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
} }
/// Generates a random number in `0..n`. /// Generates a random number in `0..n`.
#[cfg(any(feature = "std", feature = "default"))] #[cfg(any(feature = "unstable", feature = "default"))]
pub fn random(n: u32) -> u32 { pub fn random(n: u32) -> u32 {
use std::cell::Cell; use std::cell::Cell;
use std::num::Wrapping; use std::num::Wrapping;
@ -257,6 +257,11 @@ macro_rules! extension_trait {
$(#[cfg(feature = "docs")] $imp)* $(#[cfg(feature = "docs")] $imp)*
}; };
// Optimization: expand `$head` eagerly before starting a new method definition.
(@ext ($($head:tt)*) #[doc = $d:literal] $($tail:tt)*) => {
$($head)* extension_trait!(@ext (#[doc = $d]) $($tail)*);
};
// Parse the return type in an extension method. // Parse the return type in an extension method.
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => { (@doc ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*); extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*);

@ -7,9 +7,10 @@ use async_std::task;
#[should_panic(expected = "timed out")] #[should_panic(expected = "timed out")]
fn io_timeout_timedout() { fn io_timeout_timedout() {
task::block_on(async { task::block_on(async {
io::timeout(Duration::from_millis(100), async { io::timeout(Duration::from_secs(1), async {
task::sleep(Duration::from_secs(1)).await; let stdin = io::stdin();
let mut line = String::new();
let _n = stdin.read_line(&mut line).await?;
Ok(()) Ok(())
}) })
.await .await

Loading…
Cancel
Save