diff --git a/Cargo.toml b/Cargo.toml index c1479c2..2d4f602 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ default = [ "async-task", "crossbeam-channel", "crossbeam-deque", + "futures-timer", "kv-log-macro", "log", "mio", @@ -34,14 +35,13 @@ default = [ "pin-project-lite", ] docs = ["attributes", "unstable", "default"] -unstable = ["std", "broadcaster"] +unstable = ["std", "broadcaster", "futures-timer"] attributes = ["async-attributes"] std = [ "alloc", "crossbeam-utils", "futures-core/std", "futures-io", - "futures-timer", "memchr", "once_cell", "pin-utils", diff --git a/examples/a-chat/client.rs b/examples/a-chat/client.rs index 1964297..48634ba 100644 --- a/examples/a-chat/client.rs +++ b/examples/a-chat/client.rs @@ -1,12 +1,11 @@ use futures::select; use futures::FutureExt; -use std::io::{self, BufRead, BufReader as StdBufReader}; use async_std::{ - io::BufReader, + io::{stdin, BufReader}, net::{TcpStream, ToSocketAddrs}, prelude::*, - stream, task, + task, }; type Result = std::result::Result>; @@ -21,9 +20,8 @@ async fn try_main(addr: impl ToSocketAddrs) -> Result<()> { let reader = BufReader::new(reader); let mut lines_from_server = futures::StreamExt::fuse(reader.lines()); - let stdin = StdBufReader::new(io::stdin()); - let mut lines_from_stdin = stream::from_iter(stdin.lines()); - + let stdin = BufReader::new(stdin()); + let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines()); loop { select! { line = lines_from_server.next().fuse() => match line { diff --git a/examples/print-file.rs b/examples/print-file.rs index 794390b..e2cdde7 100644 --- a/examples/print-file.rs +++ b/examples/print-file.rs @@ -1,7 +1,6 @@ //! Prints a file given as an argument to stdout. use std::env::args; -use std::io::Write; use async_std::fs::File; use async_std::io; @@ -15,7 +14,7 @@ fn main() -> io::Result<()> { task::block_on(async { let mut file = File::open(&path).await?; - let mut stdout = std::io::stdout(); + let mut stdout = io::stdout(); let mut buf = vec![0u8; LEN]; loop { @@ -24,12 +23,12 @@ fn main() -> io::Result<()> { // If this is the end of file, clean up and return. if n == 0 { - stdout.flush()?; + stdout.flush().await?; return Ok(()); } // Write the buffer into stdout. - stdout.write_all(&buf[..n])?; + stdout.write_all(&buf[..n]).await?; } }) } diff --git a/examples/stdin-echo.rs b/examples/stdin-echo.rs index cf0674a..9514219 100644 --- a/examples/stdin-echo.rs +++ b/examples/stdin-echo.rs @@ -1,18 +1,18 @@ //! Echoes lines read on stdin to stdout. use async_std::io; +use async_std::prelude::*; use async_std::task; -use std::io::Write; fn main() -> io::Result<()> { task::block_on(async { - let stdin = std::io::stdin(); - let mut stdout = std::io::stdout(); + let stdin = io::stdin(); + let mut stdout = io::stdout(); let mut line = String::new(); loop { // Read a line from stdin. - let n = stdin.read_line(&mut line)?; + let n = stdin.read_line(&mut line).await?; // If this is the end of stdin, return. if n == 0 { @@ -20,8 +20,8 @@ fn main() -> io::Result<()> { } // Write the line to stdout. - stdout.write_all(line.as_bytes())?; - stdout.flush()?; + stdout.write_all(line.as_bytes()).await?; + stdout.flush().await?; line.clear(); } }) diff --git a/examples/stdin-timeout.rs b/examples/stdin-timeout.rs index 2bcab5e..f13c387 100644 --- a/examples/stdin-timeout.rs +++ b/examples/stdin-timeout.rs @@ -8,11 +8,11 @@ use async_std::task; fn main() -> io::Result<()> { // This async scope times out after 5 seconds. task::block_on(io::timeout(Duration::from_secs(5), async { - let stdin = std::io::stdin(); + let stdin = io::stdin(); // Read a line from the standard input and display it. let mut line = String::new(); - stdin.read_line(&mut line)?; + stdin.read_line(&mut line).await?; dbg!(line); Ok(()) diff --git a/src/future/into_future.rs b/src/future/into_future.rs index 473ed45..8e5e5e0 100644 --- a/src/future/into_future.rs +++ b/src/future/into_future.rs @@ -5,10 +5,9 @@ use std::future::Future; /// # Examples /// /// ``` -/// use std::pin::Pin; -/// /// use async_std::future::{Future, IntoFuture}; /// use async_std::io; +/// use async_std::pin::Pin; /// /// struct Client; /// diff --git a/src/future/mod.rs b/src/future/mod.rs index 56cd71d..9b75533 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -63,14 +63,12 @@ cfg_std! { cfg_default! { pub use timeout::{timeout, TimeoutError}; - mod timeout; } cfg_unstable! { pub use into_future::IntoFuture; pub(crate) use maybe_done::MaybeDone; - mod into_future; mod maybe_done; } diff --git a/src/io/copy.rs b/src/io/copy.rs index f946c8b..f05ed0e 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -32,14 +32,13 @@ use crate::utils::Context as _; /// /// # Examples /// -/// ```no_run +/// ``` /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # /// use async_std::io; -/// use async_std::fs::File; /// /// let mut reader: &[u8] = b"hello"; -/// let mut writer = File::open("foo.txt").await?; +/// let mut writer = io::stdout(); /// /// io::copy(&mut reader, &mut writer).await?; /// # @@ -120,14 +119,13 @@ where /// /// # Examples /// -/// ```no_run +/// ``` /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # /// use async_std::io; -/// use async_std::fs::File; /// /// let mut reader: &[u8] = b"hello"; -/// let mut writer = File::open("foo.txt").await?; +/// let mut writer = io::stdout(); /// /// io::copy(&mut reader, &mut writer).await?; /// # diff --git a/src/io/mod.rs b/src/io/mod.rs index 65c204c..51c473d 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -122,6 +122,56 @@ //! # Ok(()) }) } //! ``` //! +//! ## Standard input and output +//! +//! A very common source of input is standard input: +//! +//! ```no_run +//! use async_std::io; +//! +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! let mut input = String::new(); +//! +//! io::stdin().read_line(&mut input).await?; +//! +//! println!("You typed: {}", input.trim()); +//! # +//! # Ok(()) }) } +//! ``` +//! +//! Note that you cannot use the [`?` operator] in functions that do not return +//! a [`Result`][`Result`]. Instead, you can call [`.unwrap()`] +//! or `match` on the return value to catch any possible errors: +//! +//! ```no_run +//! use async_std::io; +//! +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! let mut input = String::new(); +//! +//! io::stdin().read_line(&mut input).await.unwrap(); +//! # +//! # Ok(()) }) } +//! ``` +//! +//! And a very common source of output is standard output: +//! +//! ```no_run +//! use async_std::io; +//! use async_std::io::prelude::*; +//! +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! io::stdout().write(&[42]).await?; +//! # +//! # Ok(()) }) } +//! ``` +//! +//! Of course, using [`io::stdout`] directly is less common than something like +//! [`println!`]. +//! //! ## Iterator types //! //! A large number of the structures provided by `std::io` are for various @@ -154,14 +204,10 @@ //! //! ```no_run //! use async_std::io; -//! use async_std::fs::File; //! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # -//! let mut reader: &[u8] = b"hello"; -//! let mut writer = File::open("foo.txt").await?; -//! -//! io::copy(&mut reader, &mut writer).await?; +//! io::copy(&mut io::stdin(), &mut io::stdout()).await?; //! # //! # Ok(()) }) } //! ``` @@ -178,14 +224,13 @@ //! ``` //! #![allow(dead_code)] //! use async_std::io; -//! use std::time::Duration; //! //! async fn read_input() -> io::Result<()> { -//! let f = io::timeout(Duration::from_secs(5), async { -//! Ok(()) -//! }); +//! let mut input = String::new(); +//! +//! io::stdin().read_line(&mut input).await?; //! -//! assert_eq!(f.await?, ()); +//! println!("You typed: {}", input.trim()); //! //! Ok(()) //! } @@ -215,6 +260,8 @@ //! [`BufReader`]: struct.BufReader.html //! [`BufWriter`]: struct.BufWriter.html //! [`Write::write`]: trait.Write.html#tymethod.write +//! [`io::stdout`]: fn.stdout.html +//! [`println!`]: ../macro.println.html //! [`Lines`]: struct.Lines.html //! [`io::Result`]: type.Result.html //! [`?` operator]: https://doc.rust-lang.org/stable/book/appendix-02-operators.html @@ -258,7 +305,24 @@ cfg_std! { } cfg_default! { + // For use in the print macros. + #[doc(hidden)] + pub use stdio::{_eprint, _print}; + + pub use stderr::{stderr, Stderr}; + pub use stdin::{stdin, Stdin}; + pub use stdout::{stdout, Stdout}; pub use timeout::timeout; mod timeout; + mod stderr; + mod stdin; + mod stdio; + mod stdout; +} + +cfg_unstable_default! { + pub use stderr::StderrLock; + pub use stdin::StdinLock; + pub use stdout::StdoutLock; } diff --git a/src/io/stderr.rs b/src/io/stderr.rs new file mode 100644 index 0000000..5ff8a02 --- /dev/null +++ b/src/io/stderr.rs @@ -0,0 +1,261 @@ +use std::pin::Pin; +use std::sync::Mutex; +use std::future::Future; + +use crate::io::{self, Write}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; + +cfg_unstable! { + use once_cell::sync::Lazy; + use std::io::Write as _; +} + +/// Constructs a new handle to the standard error of the current process. +/// +/// This function is an async version of [`std::io::stderr`]. +/// +/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html +/// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// +/// # Examples +/// +/// ```no_run +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::io; +/// use async_std::prelude::*; +/// +/// let mut stderr = io::stderr(); +/// stderr.write_all(b"Hello, world!").await?; +/// # +/// # Ok(()) }) } +/// ``` +pub fn stderr() -> Stderr { + Stderr(Mutex::new(State::Idle(Some(Inner { + stderr: std::io::stderr(), + buf: Vec::new(), + last_op: None, + })))) +} + +/// A handle to the standard error of the current process. +/// +/// This writer is created by the [`stderr`] function. See its documentation for +/// more. +/// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// +/// [`stderr`]: fn.stderr.html +#[derive(Debug)] +pub struct Stderr(Mutex); + +/// A locked reference to the Stderr handle. +/// +/// This handle implements the [`Write`] traits, and is constructed via the [`Stderr::lock`] +/// method. +/// +/// [`Write`]: trait.Read.html +/// [`Stderr::lock`]: struct.Stderr.html#method.lock +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub struct StderrLock<'a>(std::io::StderrLock<'a>); + +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +unsafe impl Send for StderrLock<'_> {} + +/// The state of the asynchronous stderr. +/// +/// The stderr can be either idle or busy performing an asynchronous operation. +#[derive(Debug)] +enum State { + /// The stderr is idle. + Idle(Option), + + /// The stderr is blocked on an asynchronous operation. + /// + /// Awaiting this operation will result in the new state of the stderr. + Busy(JoinHandle), +} + +/// Inner representation of the asynchronous stderr. +#[derive(Debug)] +struct Inner { + /// The blocking stderr handle. + stderr: std::io::Stderr, + + /// The write buffer. + buf: Vec, + + /// The result of the last asynchronous operation on the stderr. + last_op: Option, +} + +/// Possible results of an asynchronous operation on the stderr. +#[derive(Debug)] +enum Operation { + Write(io::Result), + Flush(io::Result<()>), +} + +impl Stderr { + /// Locks this handle to the standard error stream, returning a writable guard. + /// + /// The lock is released when the returned lock goes out of scope. The returned guard also implements the Write trait for writing data. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::io; + /// use async_std::prelude::*; + /// + /// let stderr = io::stderr(); + /// let mut handle = stderr.lock().await; + /// + /// handle.write_all(b"hello world").await?; + /// # + /// # Ok(()) }) } + /// ``` + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[cfg(any(feature = "unstable", feature = "docs"))] + pub async fn lock(&self) -> StderrLock<'static> { + static STDERR: Lazy = Lazy::new(std::io::stderr); + + spawn_blocking(move || StderrLock(STDERR.lock())).await + } +} + +impl Write for Stderr { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let state = &mut *self.0.lock().unwrap(); + + loop { + match state { + State::Idle(opt) => { + let inner = opt.as_mut().unwrap(); + + // Check if the operation has completed. + if let Some(Operation::Write(res)) = inner.last_op.take() { + let n = res?; + + // If more data was written than is available in the buffer, let's retry + // the write operation. + if n <= buf.len() { + return Poll::Ready(Ok(n)); + } + } else { + let mut inner = opt.take().unwrap(); + + // Set the length of the inner buffer to the length of the provided buffer. + if inner.buf.len() < buf.len() { + inner.buf.reserve(buf.len() - inner.buf.len()); + } + unsafe { + inner.buf.set_len(buf.len()); + } + + // Copy the data to write into the inner buffer. + inner.buf[..buf.len()].copy_from_slice(buf); + + // Start the operation asynchronously. + *state = State::Busy(spawn_blocking(move || { + let res = std::io::Write::write(&mut inner.stderr, &inner.buf); + inner.last_op = Some(Operation::Write(res)); + State::Idle(Some(inner)) + })); + } + } + // Poll the asynchronous operation the stderr is currently blocked on. + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let state = &mut *self.0.lock().unwrap(); + + loop { + match state { + State::Idle(opt) => { + let inner = opt.as_mut().unwrap(); + + // Check if the operation has completed. + if let Some(Operation::Flush(res)) = inner.last_op.take() { + return Poll::Ready(res); + } else { + let mut inner = opt.take().unwrap(); + + // Start the operation asynchronously. + *state = State::Busy(spawn_blocking(move || { + let res = std::io::Write::flush(&mut inner.stderr); + inner.last_op = Some(Operation::Flush(res)); + State::Idle(Some(inner)) + })); + } + } + // Poll the asynchronous operation the stderr is currently blocked on. + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), + } + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} + +cfg_unix! { + use crate::os::unix::io::{AsRawFd, RawFd}; + + impl AsRawFd for Stderr { + fn as_raw_fd(&self) -> RawFd { + std::io::stderr().as_raw_fd() + } + } +} + +cfg_windows! { + use crate::os::windows::io::{AsRawHandle, RawHandle}; + + impl AsRawHandle for Stderr { + fn as_raw_handle(&self) -> RawHandle { + std::io::stderr().as_raw_handle() + } + } +} + +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +impl io::Write for StderrLock<'_> { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Poll::Ready(self.0.write(buf)) + } + + fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.0.flush()) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} diff --git a/src/io/stdin.rs b/src/io/stdin.rs new file mode 100644 index 0000000..369ccae --- /dev/null +++ b/src/io/stdin.rs @@ -0,0 +1,279 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::Mutex; + +use crate::future; +use crate::io::{self, Read}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; +use crate::utils::Context as _; + +cfg_unstable! { + use once_cell::sync::Lazy; + use std::io::Read as _; +} + +/// Constructs a new handle to the standard input of the current process. +/// +/// This function is an async version of [`std::io::stdin`]. +/// +/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html +/// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// +/// # Examples +/// +/// ```no_run +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::io; +/// +/// let stdin = io::stdin(); +/// let mut line = String::new(); +/// stdin.read_line(&mut line).await?; +/// # +/// # Ok(()) }) } +/// ``` +pub fn stdin() -> Stdin { + Stdin(Mutex::new(State::Idle(Some(Inner { + stdin: std::io::stdin(), + line: String::new(), + buf: Vec::new(), + last_op: None, + })))) +} + +/// A handle to the standard input of the current process. +/// +/// This reader is created by the [`stdin`] function. See its documentation for +/// more. +/// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// +/// [`stdin`]: fn.stdin.html +#[derive(Debug)] +pub struct Stdin(Mutex); + +/// A locked reference to the Stdin handle. +/// +/// This handle implements the [`Read`] traits, and is constructed via the [`Stdin::lock`] method. +/// +/// [`Read`]: trait.Read.html +/// [`Stdin::lock`]: struct.Stdin.html#method.lock +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[cfg(feature = "unstable")] +#[derive(Debug)] +pub struct StdinLock<'a>(std::io::StdinLock<'a>); + +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +unsafe impl Send for StdinLock<'_> {} + +/// The state of the asynchronous stdin. +/// +/// The stdin can be either idle or busy performing an asynchronous operation. +#[derive(Debug)] +enum State { + /// The stdin is idle. + Idle(Option), + + /// The stdin is blocked on an asynchronous operation. + /// + /// Awaiting this operation will result in the new state of the stdin. + Busy(JoinHandle), +} + +/// Inner representation of the asynchronous stdin. +#[derive(Debug)] +struct Inner { + /// The blocking stdin handle. + stdin: std::io::Stdin, + + /// The line buffer. + line: String, + + /// The write buffer. + buf: Vec, + + /// The result of the last asynchronous operation on the stdin. + last_op: Option, +} + +/// Possible results of an asynchronous operation on the stdin. +#[derive(Debug)] +enum Operation { + ReadLine(io::Result), + Read(io::Result), +} + +impl Stdin { + /// Reads a line of input into the specified buffer. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::io; + /// + /// let stdin = io::stdin(); + /// let mut line = String::new(); + /// stdin.read_line(&mut line).await?; + /// # + /// # Ok(()) }) } + /// ``` + pub async fn read_line(&self, buf: &mut String) -> io::Result { + future::poll_fn(|cx| { + let state = &mut *self.0.lock().unwrap(); + + loop { + match state { + State::Idle(opt) => { + let inner = opt.as_mut().unwrap(); + + // Check if the operation has completed. + if let Some(Operation::ReadLine(res)) = inner.last_op.take() { + let n = res?; + + // Copy the read data into the buffer and return. + buf.push_str(&inner.line); + return Poll::Ready(Ok(n)); + } else { + let mut inner = opt.take().unwrap(); + + // Start the operation asynchronously. + *state = State::Busy(spawn_blocking(move || { + inner.line.clear(); + let res = inner.stdin.read_line(&mut inner.line); + inner.last_op = Some(Operation::ReadLine(res)); + State::Idle(Some(inner)) + })); + } + } + // Poll the asynchronous operation the stdin is currently blocked on. + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), + } + } + }) + .await + .context(|| String::from("could not read line on stdin")) + } + + /// Locks this handle to the standard input stream, returning a readable guard. + /// + /// The lock is released when the returned lock goes out of scope. The returned guard also implements the Read trait for accessing the underlying data. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::io; + /// use async_std::prelude::*; + /// + /// let mut buffer = String::new(); + /// + /// let stdin = io::stdin(); + /// let mut handle = stdin.lock().await; + /// + /// handle.read_to_string(&mut buffer).await?; + /// # + /// # Ok(()) }) } + /// ``` + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[cfg(any(feature = "unstable", feature = "docs"))] + pub async fn lock(&self) -> StdinLock<'static> { + static STDIN: Lazy = Lazy::new(std::io::stdin); + + spawn_blocking(move || StdinLock(STDIN.lock())).await + } +} + +impl Read for Stdin { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let state = &mut *self.0.lock().unwrap(); + + loop { + match state { + State::Idle(opt) => { + let inner = opt.as_mut().unwrap(); + + // Check if the operation has completed. + if let Some(Operation::Read(res)) = inner.last_op.take() { + let n = res?; + + // If more data was read than fits into the buffer, let's retry the read + // operation. + if n <= buf.len() { + // Copy the read data into the buffer and return. + buf[..n].copy_from_slice(&inner.buf[..n]); + return Poll::Ready(Ok(n)); + } + } else { + let mut inner = opt.take().unwrap(); + + // Set the length of the inner buffer to the length of the provided buffer. + if inner.buf.len() < buf.len() { + inner.buf.reserve(buf.len() - inner.buf.len()); + } + unsafe { + inner.buf.set_len(buf.len()); + } + + // Start the operation asynchronously. + *state = State::Busy(spawn_blocking(move || { + let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf); + inner.last_op = Some(Operation::Read(res)); + State::Idle(Some(inner)) + })); + } + } + // Poll the asynchronous operation the stdin is currently blocked on. + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), + } + } + } +} + +cfg_unix! { + use crate::os::unix::io::{AsRawFd, RawFd}; + + impl AsRawFd for Stdin { + fn as_raw_fd(&self) -> RawFd { + std::io::stdin().as_raw_fd() + } + } +} + +cfg_windows! { + use crate::os::windows::io::{AsRawHandle, RawHandle}; + + impl AsRawHandle for Stdin { + fn as_raw_handle(&self) -> RawHandle { + std::io::stdin().as_raw_handle() + } + } +} + +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +impl Read for StdinLock<'_> { + fn poll_read( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Poll::Ready(self.0.read(buf)) + } +} diff --git a/src/io/stdio.rs b/src/io/stdio.rs new file mode 100644 index 0000000..0e11e1a --- /dev/null +++ b/src/io/stdio.rs @@ -0,0 +1,21 @@ +//! Internal types for stdio. +//! +//! This module is a port of `libstd/io/stdio.rs`,and contains internal types for `print`/`eprint`. + +use crate::io::{stderr, stdout}; +use crate::prelude::*; +use std::fmt; + +#[doc(hidden)] +pub async fn _print(args: fmt::Arguments<'_>) { + if let Err(e) = stdout().write_fmt(args).await { + panic!("failed printing to stdout: {}", e); + } +} + +#[doc(hidden)] +pub async fn _eprint(args: fmt::Arguments<'_>) { + if let Err(e) = stderr().write_fmt(args).await { + panic!("failed printing to stderr: {}", e); + } +} diff --git a/src/io/stdout.rs b/src/io/stdout.rs new file mode 100644 index 0000000..1711c09 --- /dev/null +++ b/src/io/stdout.rs @@ -0,0 +1,261 @@ +use std::pin::Pin; +use std::sync::Mutex; +use std::future::Future; + +use crate::io::{self, Write}; +use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; + +cfg_unstable! { + use once_cell::sync::Lazy; + use std::io::Write as _; +} + +/// Constructs a new handle to the standard output of the current process. +/// +/// This function is an async version of [`std::io::stdout`]. +/// +/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html +/// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// +/// # Examples +/// +/// ```no_run +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::io; +/// use async_std::prelude::*; +/// +/// let mut stdout = io::stdout(); +/// stdout.write_all(b"Hello, world!").await?; +/// # +/// # Ok(()) }) } +/// ``` +pub fn stdout() -> Stdout { + Stdout(Mutex::new(State::Idle(Some(Inner { + stdout: std::io::stdout(), + buf: Vec::new(), + last_op: None, + })))) +} + +/// A handle to the standard output of the current process. +/// +/// This writer is created by the [`stdout`] function. See its documentation +/// for more. +/// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// +/// [`stdout`]: fn.stdout.html +#[derive(Debug)] +pub struct Stdout(Mutex); + +/// A locked reference to the Stderr handle. +/// +/// This handle implements the [`Write`] traits, and is constructed via the [`Stdout::lock`] +/// method. +/// +/// [`Write`]: trait.Read.html +/// [`Stdout::lock`]: struct.Stdout.html#method.lock +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub struct StdoutLock<'a>(std::io::StdoutLock<'a>); + +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +unsafe impl Send for StdoutLock<'_> {} + +/// The state of the asynchronous stdout. +/// +/// The stdout can be either idle or busy performing an asynchronous operation. +#[derive(Debug)] +enum State { + /// The stdout is idle. + Idle(Option), + + /// The stdout is blocked on an asynchronous operation. + /// + /// Awaiting this operation will result in the new state of the stdout. + Busy(JoinHandle), +} + +/// Inner representation of the asynchronous stdout. +#[derive(Debug)] +struct Inner { + /// The blocking stdout handle. + stdout: std::io::Stdout, + + /// The write buffer. + buf: Vec, + + /// The result of the last asynchronous operation on the stdout. + last_op: Option, +} + +/// Possible results of an asynchronous operation on the stdout. +#[derive(Debug)] +enum Operation { + Write(io::Result), + Flush(io::Result<()>), +} + +impl Stdout { + /// Locks this handle to the standard error stream, returning a writable guard. + /// + /// The lock is released when the returned lock goes out of scope. The returned guard also implements the Write trait for writing data. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::io; + /// use async_std::prelude::*; + /// + /// let stdout = io::stdout(); + /// let mut handle = stdout.lock().await; + /// + /// handle.write_all(b"hello world").await?; + /// # + /// # Ok(()) }) } + /// ``` + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[cfg(any(feature = "unstable", feature = "docs"))] + pub async fn lock(&self) -> StdoutLock<'static> { + static STDOUT: Lazy = Lazy::new(std::io::stdout); + + spawn_blocking(move || StdoutLock(STDOUT.lock())).await + } +} + +impl Write for Stdout { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let state = &mut *self.0.lock().unwrap(); + + loop { + match state { + State::Idle(opt) => { + let inner = opt.as_mut().unwrap(); + + // Check if the operation has completed. + if let Some(Operation::Write(res)) = inner.last_op.take() { + let n = res?; + + // If more data was written than is available in the buffer, let's retry + // the write operation. + if n <= buf.len() { + return Poll::Ready(Ok(n)); + } + } else { + let mut inner = opt.take().unwrap(); + + // Set the length of the inner buffer to the length of the provided buffer. + if inner.buf.len() < buf.len() { + inner.buf.reserve(buf.len() - inner.buf.len()); + } + unsafe { + inner.buf.set_len(buf.len()); + } + + // Copy the data to write into the inner buffer. + inner.buf[..buf.len()].copy_from_slice(buf); + + // Start the operation asynchronously. + *state = State::Busy(spawn_blocking(move || { + let res = std::io::Write::write(&mut inner.stdout, &inner.buf); + inner.last_op = Some(Operation::Write(res)); + State::Idle(Some(inner)) + })); + } + } + // Poll the asynchronous operation the stdout is currently blocked on. + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let state = &mut *self.0.lock().unwrap(); + + loop { + match state { + State::Idle(opt) => { + let inner = opt.as_mut().unwrap(); + + // Check if the operation has completed. + if let Some(Operation::Flush(res)) = inner.last_op.take() { + return Poll::Ready(res); + } else { + let mut inner = opt.take().unwrap(); + + // Start the operation asynchronously. + *state = State::Busy(spawn_blocking(move || { + let res = std::io::Write::flush(&mut inner.stdout); + inner.last_op = Some(Operation::Flush(res)); + State::Idle(Some(inner)) + })); + } + } + // Poll the asynchronous operation the stdout is currently blocked on. + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), + } + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} + +cfg_unix! { + use crate::os::unix::io::{AsRawFd, RawFd}; + + impl AsRawFd for Stdout { + fn as_raw_fd(&self) -> RawFd { + std::io::stdout().as_raw_fd() + } + } +} + +cfg_windows! { + use crate::os::windows::io::{AsRawHandle, RawHandle}; + + impl AsRawHandle for Stdout { + fn as_raw_handle(&self) -> RawHandle { + std::io::stdout().as_raw_handle() + } + } +} + +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +impl Write for StdoutLock<'_> { + fn poll_write( + mut self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Poll::Ready(self.0.write(buf)) + } + + fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.0.flush()) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_flush(cx) + } +} diff --git a/src/io/timeout.rs b/src/io/timeout.rs index 3627439..6e22dbf 100644 --- a/src/io/timeout.rs +++ b/src/io/timeout.rs @@ -23,9 +23,9 @@ use crate::io; /// use async_std::io; /// /// io::timeout(Duration::from_secs(5), async { -/// let stdin = std::io::stdin(); +/// let stdin = io::stdin(); /// let mut line = String::new(); -/// let n = stdin.read_line(&mut line)?; +/// let n = stdin.read_line(&mut line).await?; /// Ok(()) /// }) /// .await?; diff --git a/src/lib.rs b/src/lib.rs index 8cd0d30..d498792 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -273,6 +273,9 @@ cfg_default! { } cfg_unstable! { + pub mod pin; + pub mod process; + mod unit; mod vec; mod result; diff --git a/src/macros.rs b/src/macros.rs index 22cd00d..638a234 100644 --- a/src/macros.rs +++ b/src/macros.rs @@ -1,3 +1,171 @@ +/// Prints to the standard output. +/// +/// Equivalent to the [`println!`] macro except that a newline is not printed at +/// the end of the message. +/// +/// Note that stdout is frequently line-buffered by default so it may be +/// necessary to use [`io::stdout().flush()`][flush] to ensure the output is emitted +/// immediately. +/// +/// Use `print!` only for the primary output of your program. Use +/// [`eprint!`] instead to print error and progress messages. +/// +/// [`println!`]: macro.println.html +/// [flush]: io/trait.Write.html#tymethod.flush +/// [`eprint!`]: macro.eprint.html +/// +/// # Panics +/// +/// Panics if writing to `io::stdout()` fails. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::io; +/// use async_std::prelude::*; +/// use async_std::print; +/// +/// print!("this ").await; +/// print!("will ").await; +/// print!("be ").await; +/// print!("on ").await; +/// print!("the ").await; +/// print!("same ").await; +/// print!("line ").await; +/// +/// io::stdout().flush().await.unwrap(); +/// +/// print!("this string has a newline, why not choose println! instead?\n").await; +/// +/// io::stdout().flush().await.unwrap(); +/// # +/// # }) +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[macro_export] +macro_rules! print { + ($($arg:tt)*) => ($crate::io::_print(format_args!($($arg)*))) +} + +/// Prints to the standard output, with a newline. +/// +/// On all platforms, the newline is the LINE FEED character (`\n`/`U+000A`) alone +/// (no additional CARRIAGE RETURN (`\r`/`U+000D`)). +/// +/// Use the [`format!`] syntax to write data to the standard output. +/// See [`std::fmt`] for more information. +/// +/// Use `println!` only for the primary output of your program. Use +/// [`eprintln!`] instead to print error and progress messages. +/// +/// [`format!`]: macro.format.html +/// [`std::fmt`]: https://doc.rust-lang.org/std/fmt/index.html +/// [`eprintln!`]: macro.eprintln.html +/// # Panics +/// +/// Panics if writing to `io::stdout` fails. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::println; +/// +/// println!().await; // prints just a newline +/// println!("hello there!").await; +/// println!("format {} arguments", "some").await; +/// # +/// # }) +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[macro_export] +macro_rules! println { + () => ($crate::print!("\n")); + ($($arg:tt)*) => (async { + $crate::io::_print(format_args!($($arg)*)).await; + $crate::io::_print(format_args!("\n")).await; + }) +} + +/// Prints to the standard error. +/// +/// Equivalent to the [`print!`] macro, except that output goes to +/// [`io::stderr`] instead of `io::stdout`. See [`print!`] for +/// example usage. +/// +/// Use `eprint!` only for error and progress messages. Use `print!` +/// instead for the primary output of your program. +/// +/// [`io::stderr`]: io/struct.Stderr.html +/// [`print!`]: macro.print.html +/// +/// # Panics +/// +/// Panics if writing to `io::stderr` fails. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::eprint; +/// +/// eprint!("Error: Could not complete task").await; +/// # +/// # }) +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[macro_export] +macro_rules! eprint { + ($($arg:tt)*) => ($crate::io::_eprint(format_args!($($arg)*))) +} + +/// Prints to the standard error, with a newline. +/// +/// Equivalent to the [`println!`] macro, except that output goes to +/// [`io::stderr`] instead of `io::stdout`. See [`println!`] for +/// example usage. +/// +/// Use `eprintln!` only for error and progress messages. Use `println!` +/// instead for the primary output of your program. +/// +/// [`io::stderr`]: io/struct.Stderr.html +/// [`println!`]: macro.println.html +/// +/// # Panics +/// +/// Panics if writing to `io::stderr` fails. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use async_std::eprintln; +/// +/// eprintln!("Error: Could not complete task").await; +/// # +/// # }) +/// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[macro_export] +macro_rules! eprintln { + () => (async { $crate::eprint!("\n").await; }); + ($($arg:tt)*) => ( + async { + $crate::io::_eprint(format_args!($($arg)*)).await; + $crate::io::_eprint(format_args!("\n")).await; + } + ); +} + /// Declares task-local values. /// /// The macro wraps any number of static declarations and makes them task-local. Attributes and diff --git a/src/prelude.rs b/src/prelude.rs index a522745..a2a14a1 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -38,14 +38,16 @@ cfg_std! { pub use crate::io::prelude::SeekExt as _; #[doc(no_inline)] pub use crate::io::prelude::WriteExt as _; - - #[doc(no_inline)] - pub use crate::stream::DoubleEndedStream; - #[doc(no_inline)] - pub use crate::stream::ExactSizeStream; } cfg_default! { #[doc(no_inline)] pub use crate::task_local; } + +cfg_unstable! { + #[doc(no_inline)] + pub use crate::stream::DoubleEndedStream; + #[doc(no_inline)] + pub use crate::stream::ExactSizeStream; +} diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs index 1563e1b..a177865 100644 --- a/src/stream/double_ended_stream/mod.rs +++ b/src/stream/double_ended_stream/mod.rs @@ -1,7 +1,7 @@ use crate::stream::Stream; -use core::pin::Pin; -use core::task::{Context, Poll}; +use std::pin::Pin; +use std::task::{Context, Poll}; mod next_back; mod nth_back; @@ -22,6 +22,8 @@ use try_rfold::TryRFoldFuture; /// `Item`s from the back, as well as the front. /// /// [`Stream`]: trait.Stream.html +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait DoubleEndedStream: Stream { #[doc = r#" Attempts to receive the next item from the back of the stream. diff --git a/src/stream/exact_size_stream.rs b/src/stream/exact_size_stream.rs index 28792c6..8b6ba97 100644 --- a/src/stream/exact_size_stream.rs +++ b/src/stream/exact_size_stream.rs @@ -76,6 +76,8 @@ pub use crate::stream::Stream; /// # }); /// ``` #[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait ExactSizeStream: Stream { /// Returns the exact number of times the stream will iterate. /// diff --git a/src/stream/extend.rs b/src/stream/extend.rs index 9f54007..702cbca 100644 --- a/src/stream/extend.rs +++ b/src/stream/extend.rs @@ -27,6 +27,8 @@ use crate::stream::IntoStream; /// # /// # }) /// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait Extend { /// Extends a collection with the contents of a stream. fn extend<'a, T: IntoStream + 'a>( diff --git a/src/stream/fused_stream.rs b/src/stream/fused_stream.rs index 5d02f1d..e14ab5b 100644 --- a/src/stream/fused_stream.rs +++ b/src/stream/fused_stream.rs @@ -14,6 +14,8 @@ use crate::stream::Stream; /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`Stream::fuse`]: trait.Stream.html#method.fuse /// [`Fuse`]: struct.Fuse.html +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait FusedStream: Stream {} impl FusedStream for &mut S {} diff --git a/src/stream/interval.rs b/src/stream/interval.rs index 8dd6b5e..7a0c174 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -41,6 +41,8 @@ use futures_timer::Delay; /// # /// # Ok(()) }) } /// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn interval(dur: Duration) -> Interval { Interval { delay: Delay::new(dur), @@ -54,6 +56,8 @@ pub fn interval(dur: Duration) -> Interval { /// documentation for more. /// /// [`interval`]: fn.interval.html +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] pub struct Interval { delay: Delay, diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f1c5fdf..0bfd4e8 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -317,32 +317,29 @@ mod once; mod repeat; mod repeat_with; -cfg_std! { - pub use double_ended_stream::DoubleEndedStream; - pub use exact_size_stream::ExactSizeStream; - pub use fused_stream::FusedStream; - pub use interval::{interval, Interval}; - pub use pending::{pending, Pending}; - pub use product::Product; - pub use successors::{successors, Successors}; - pub use sum::Sum; - +cfg_unstable! { mod double_ended_stream; mod exact_size_stream; + mod extend; + mod from_stream; mod fused_stream; mod interval; + mod into_stream; mod pending; mod product; mod successors; mod sum; -} - -cfg_unstable! { - mod from_stream; - mod into_stream; - mod extend; + pub use double_ended_stream::DoubleEndedStream; + pub use exact_size_stream::ExactSizeStream; pub use extend::{extend, Extend}; pub use from_stream::FromStream; + pub use fused_stream::FusedStream; + pub use interval::{interval, Interval}; pub use into_stream::IntoStream; + pub use pending::{pending, Pending}; + pub use product::Product; + pub use stream::Merge; + pub use successors::{successors, Successors}; + pub use sum::Sum; } diff --git a/src/stream/once.rs b/src/stream/once.rs index 9daeac5..b86f181 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -5,7 +5,7 @@ use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -#[cfg(feature = "std")] +#[cfg(feature = "unstable")] use crate::stream::DoubleEndedStream; /// Creates a stream that yields a single item. @@ -50,8 +50,8 @@ impl Stream for Once { } } -#[cfg(feature = "std")] -impl DoubleEndedStream for Once { +#[cfg(feature = "unstable")] +impl DoubleEndedStream for Once { fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(self.project().value.take()) } diff --git a/src/stream/product.rs b/src/stream/product.rs index 9794846..15497e8 100644 --- a/src/stream/product.rs +++ b/src/stream/product.rs @@ -1,6 +1,5 @@ -use alloc::boxed::Box; -use core::future::Future; use core::pin::Pin; +use core::future::Future; use crate::stream::Stream; @@ -14,6 +13,8 @@ use crate::stream::Stream; /// [`product`]: trait.Product.html#tymethod.product /// [`FromStream`]: trait.FromStream.html /// [`Stream::product`]: trait.Stream.html#method.product +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait Product: Sized { /// Method which takes a stream and generates `Self` from the elements by /// multiplying the items. @@ -22,9 +23,9 @@ pub trait Product: Sized { S: Stream + 'a; } -use crate::stream::stream::StreamExt; -use core::num::Wrapping; use core::ops::Mul; +use core::num::Wrapping; +use crate::stream::stream::StreamExt; macro_rules! integer_product { (@impls $one: expr, $($a:ty)*) => ($( @@ -74,5 +75,5 @@ macro_rules! float_product { ); } -integer_product! { i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize } -float_product! { f32 f64 } +integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize } +float_product!{ f32 f64 } diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs index ae6c78c..63e0449 100644 --- a/src/stream/stream/count.rs +++ b/src/stream/stream/count.rs @@ -9,6 +9,8 @@ use crate::task::{Context, Poll}; pin_project! { #[doc(hidden)] #[allow(missing_debug_implementations)] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct CountFuture { #[pin] stream: S, diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index d1eea9d..2320972 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -16,6 +16,8 @@ pin_project! { /// /// [`merge`]: trait.Stream.html#method.merge /// [`Stream`]: trait.Stream.html + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] pub struct Merge { #[pin] diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 5cdf530..d0cc718 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -112,43 +112,35 @@ pub use zip::Zip; use core::cmp::Ordering; -cfg_std! { - use core::time::Duration; - use crate::stream::{Product, Sum}; - use alloc::boxed::Box; +cfg_unstable! { use core::future::Future; use core::pin::Pin; + use core::time::Duration; - use unzip::UnzipFuture; - use count::CountFuture; - - pub use throttle::Throttle; - pub use merge::Merge; - pub use delay::Delay; - pub use timeout::{Timeout, TimeoutError}; - - mod timeout; - mod throttle; - mod merge; - mod delay; - mod unzip; - mod count; -} - -cfg_unstable! { - use crate::stream::FromStream; use crate::stream::into_stream::IntoStream; + use crate::stream::{FromStream, Product, Sum}; use crate::stream::Extend; + use count::CountFuture; use partition::PartitionFuture; + use unzip::UnzipFuture; + pub use merge::Merge; pub use flatten::Flatten; pub use flat_map::FlatMap; + pub use timeout::{TimeoutError, Timeout}; + pub use throttle::Throttle; + pub use delay::Delay; - + mod count; + mod merge; mod flatten; mod flat_map; mod partition; + mod timeout; + mod throttle; + mod delay; + mod unzip; } extension_trait! { @@ -363,7 +355,8 @@ extension_trait! { # }) } ``` "#] - #[cfg(feature = "std")] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn throttle(self, d: Duration) -> Throttle where Self: Sized, @@ -605,7 +598,8 @@ extension_trait! { # }) } ``` "#] - #[cfg(feature = "std")] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn delay(self, dur: std::time::Duration) -> Delay where Self: Sized, @@ -1517,6 +1511,8 @@ extension_trait! { # }) } ``` "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn by_ref(&mut self) -> &mut Self { self } @@ -1660,7 +1656,8 @@ extension_trait! { # Ok(()) }) } ``` "#] - #[cfg(feature = "std")] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn timeout(self, dur: Duration) -> Timeout where Self: Stream + Sized, @@ -1825,7 +1822,8 @@ extension_trait! { # }) } ``` "#] - #[cfg(feature = "std")] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn unzip(self) -> impl Future [UnzipFuture] where FromA: Default + Extend, @@ -1923,7 +1921,8 @@ extension_trait! { # }); ``` "#] - #[cfg(feature = "std")] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn merge(self, other: U) -> Merge where Self: Sized, @@ -2069,7 +2068,8 @@ extension_trait! { # }) } ``` "#] - #[cfg(feature = "std")] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn count(self) -> impl Future [CountFuture] where Self: Sized, @@ -2330,7 +2330,8 @@ extension_trait! { # }) } ``` "#] - #[cfg(feature = "std")] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn sum<'a, S>( self, ) -> impl Future + 'a [Pin + 'a>>] @@ -2375,7 +2376,8 @@ extension_trait! { # }) } ``` "#] - #[cfg(feature = "std")] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn product<'a, P>( self, ) -> impl Future + 'a [Pin + 'a>>] diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 411be7e..ce658c8 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -47,6 +47,8 @@ impl Stream for Timeout { } /// An error returned when a stream times out. +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[cfg(any(feature = "unstable", feature = "docs"))] #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct TimeoutError { _private: (), diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index 94cbc0a..7771509 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -8,6 +8,8 @@ use crate::task::{Context, Poll}; pin_project! { #[derive(Clone, Debug)] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct UnzipFuture { #[pin] stream: S, diff --git a/src/stream/successors.rs b/src/stream/successors.rs index fb450c6..a9ce40f 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -27,6 +27,8 @@ use pin_project_lite::pin_project; /// # /// # }) } /// ``` +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub fn successors(first: Option, succ: F) -> Successors where F: FnMut(&T) -> Option, @@ -41,6 +43,8 @@ pin_project! { /// This stream is constructed by [`successors`] function /// /// [`successors`]: fn.succssors.html + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] pub struct Successors where diff --git a/src/stream/sum.rs b/src/stream/sum.rs index dc41a0f..3b3144e 100644 --- a/src/stream/sum.rs +++ b/src/stream/sum.rs @@ -1,4 +1,3 @@ -use alloc::boxed::Box; use core::future::Future; use core::pin::Pin; @@ -14,6 +13,8 @@ use crate::stream::Stream; /// [`sum`]: trait.Sum.html#tymethod.sum /// [`FromStream`]: trait.FromStream.html /// [`Stream::sum`]: trait.Stream.html#method.sum +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait Sum: Sized { /// Method which takes a stream and generates `Self` from the elements by /// "summing up" the items. diff --git a/src/utils.rs b/src/utils.rs index a930a84..f18b74d 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -21,7 +21,7 @@ pub fn abort_on_panic(f: impl FnOnce() -> T) -> T { } /// Generates a random number in `0..n`. -#[cfg(any(feature = "std", feature = "default"))] +#[cfg(any(feature = "unstable", feature = "default"))] pub fn random(n: u32) -> u32 { use std::cell::Cell; use std::num::Wrapping; @@ -257,6 +257,11 @@ macro_rules! extension_trait { $(#[cfg(feature = "docs")] $imp)* }; + // Optimization: expand `$head` eagerly before starting a new method definition. + (@ext ($($head:tt)*) #[doc = $d:literal] $($tail:tt)*) => { + $($head)* extension_trait!(@ext (#[doc = $d]) $($tail)*); + }; + // Parse the return type in an extension method. (@doc ($($head:tt)*) -> impl Future $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => { extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*); diff --git a/tests/io_timeout.rs b/tests/io_timeout.rs index aa464f4..85a17ab 100644 --- a/tests/io_timeout.rs +++ b/tests/io_timeout.rs @@ -7,9 +7,10 @@ use async_std::task; #[should_panic(expected = "timed out")] fn io_timeout_timedout() { task::block_on(async { - io::timeout(Duration::from_millis(100), async { - task::sleep(Duration::from_secs(1)).await; - + io::timeout(Duration::from_secs(1), async { + let stdin = io::stdin(); + let mut line = String::new(); + let _n = stdin.read_line(&mut line).await?; Ok(()) }) .await