From e44451a042b97c6bf7d0ab0578f0005923195b02 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 12 Aug 2019 12:43:44 +0200 Subject: [PATCH] Revamp IO traits and Stream trait --- Cargo.toml | 1 + src/fs/file.rs | 10 +- src/future/mod.rs | 44 +---- src/future/pending.rs | 19 ++ src/future/ready.rs | 21 +++ src/io/buf_read.rs | 321 ++++++++++++++++++++++++++++++++++ src/io/buf_reader.rs | 253 +++++++++++++++++++++++++++ src/io/mod.rs | 15 +- src/io/read.rs | 392 ++++++++++++++++++++++++++++++++++++++++++ src/io/seek.rs | 81 +++++++++ src/io/stderr.rs | 2 +- src/io/stdin.rs | 2 +- src/io/stdout.rs | 2 +- src/io/write.rs | 213 +++++++++++++++++++++++ src/net/tcp.rs | 10 +- src/os/unix/net.rs | 8 +- src/prelude.rs | 17 +- src/stream/empty.rs | 43 +++++ src/stream/mod.rs | 19 +- src/stream/once.rs | 41 +++++ src/stream/repeat.rs | 44 +++++ src/stream/stream.rs | 169 ++++++++++++++++++ src/task/mod.rs | 2 +- src/time/mod.rs | 31 ++-- 24 files changed, 1670 insertions(+), 90 deletions(-) create mode 100644 src/future/pending.rs create mode 100644 src/future/ready.rs create mode 100644 src/io/buf_read.rs create mode 100644 src/io/buf_reader.rs create mode 100644 src/io/read.rs create mode 100644 src/io/seek.rs create mode 100644 src/io/write.rs create mode 100644 src/stream/empty.rs create mode 100644 src/stream/once.rs create mode 100644 src/stream/repeat.rs create mode 100644 src/stream/stream.rs diff --git a/Cargo.toml b/Cargo.toml index 34da039..63e127a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ futures-preview = "0.3.0-alpha.17" futures-timer = "0.3.0" lazy_static = "1.3.0" log = { version = "0.4.8", features = ["kv_unstable"] } +memchr = "2.2.1" mio = "0.6.19" mio-uds = "0.6.7" num_cpus = "1.10.0" diff --git a/src/fs/file.rs b/src/fs/file.rs index bdf398a..fa6cc00 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -483,7 +483,7 @@ impl File { } } -impl AsyncRead for File { +impl futures::io::AsyncRead for File { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -498,7 +498,7 @@ impl AsyncRead for File { } } -impl AsyncRead for &File { +impl futures::io::AsyncRead for &File { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -564,7 +564,7 @@ impl AsyncRead for &File { } } -impl AsyncWrite for File { +impl futures::io::AsyncWrite for File { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -582,7 +582,7 @@ impl AsyncWrite for File { } } -impl AsyncWrite for &File { +impl futures::io::AsyncWrite for &File { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -693,7 +693,7 @@ impl AsyncWrite for &File { } } -impl AsyncSeek for File { +impl futures::io::AsyncSeek for File { fn poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/future/mod.rs b/src/future/mod.rs index 58e765a..29e2b04 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -3,44 +3,8 @@ #[doc(inline)] pub use std::future::Future; -/// Never resolves to a value. -/// -/// # Examples -/// ``` -/// # #![feature(async_await)] -/// # fn main() { async_std::task::block_on(async { -/// # -/// use async_std::future::pending; -/// use async_std::prelude::*; -/// use std::time::Duration; -/// -/// let dur = Duration::from_secs(1); -/// assert!(pending::<()>().timeout(dur).await.is_err()); -/// # -/// # }) } -/// ``` -pub async fn pending() -> T { - futures::future::pending::().await -} +pub use pending::pending; +pub use ready::ready; -/// Resolves to the provided value. -/// -/// This function is an async version of [`std::convert::identity`]. -/// -/// [`std::convert::identity`]: https://doc.rust-lang.org/std/convert/fn.identity.html -/// -/// # Examples -/// -/// ``` -/// # #![feature(async_await)] -/// # fn main() { async_std::task::block_on(async { -/// # -/// use async_std::future::ready; -/// -/// assert_eq!(ready(10).await, 10); -/// # -/// # }) } -/// ``` -pub async fn ready(val: T) -> T { - val -} +mod pending; +mod ready; diff --git a/src/future/pending.rs b/src/future/pending.rs new file mode 100644 index 0000000..f1ec26e --- /dev/null +++ b/src/future/pending.rs @@ -0,0 +1,19 @@ +/// Never resolves to a value. +/// +/// # Examples +/// ``` +/// # #![feature(async_await)] +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::future::pending; +/// use async_std::prelude::*; +/// use std::time::Duration; +/// +/// let dur = Duration::from_secs(1); +/// assert!(pending::<()>().timeout(dur).await.is_err()); +/// # +/// # }) } +/// ``` +pub async fn pending() -> T { + futures::future::pending::().await +} diff --git a/src/future/ready.rs b/src/future/ready.rs new file mode 100644 index 0000000..7535d50 --- /dev/null +++ b/src/future/ready.rs @@ -0,0 +1,21 @@ +/// Resolves to the provided value. +/// +/// This function is an async version of [`std::convert::identity`]. +/// +/// [`std::convert::identity`]: https://doc.rust-lang.org/std/convert/fn.identity.html +/// +/// # Examples +/// +/// ``` +/// # #![feature(async_await)] +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::future::ready; +/// +/// assert_eq!(ready(10).await, 10); +/// # +/// # }) } +/// ``` +pub async fn ready(val: T) -> T { + val +} diff --git a/src/io/buf_read.rs b/src/io/buf_read.rs new file mode 100644 index 0000000..90c1fff --- /dev/null +++ b/src/io/buf_read.rs @@ -0,0 +1,321 @@ +use std::future::Future; +use std::io::{self}; +use std::mem; +use std::pin::Pin; +use std::str; +use std::task::{Context, Poll}; + +use cfg_if::cfg_if; +use futures::io::AsyncBufRead; +use futures::Stream; + +cfg_if! { + if #[cfg(feature = "docs.rs")] { + #[doc(hidden)] + pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + } + } else { + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + } + } +} + +/// Allows reading from a buffered byte stream. +/// +/// This trait is an async version of [`std::io::BufRead`]. +/// +/// While it is currently not possible to implement this trait directly, it get implemented +/// automatically for all types that implement [`futures::io::AsyncBufRead`]. +/// +/// [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html +/// [`futures::io::AsyncBufRead`]: +/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html +pub trait BufRead { + /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached. + /// + /// This function will read bytes from the underlying stream until the delimiter or EOF is + /// found. Once found, all bytes up to, and including, the delimiter (if found) will be + /// appended to `buf`. + /// + /// If successful, this function will return the total number of bytes read. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, io::BufReader, prelude::*}; + /// + /// let mut f = BufReader::new(File::open("a.txt").await?); + /// + /// let mut buf = vec![0; 1024]; + /// let n = f.read_until(b'\n', &mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_until<'a>( + &'a mut self, + byte: u8, + buf: &'a mut Vec, + ) -> ret!('a, ReadUntilFuture, io::Result) + where + Self: Unpin, + { + ReadUntilFuture { + reader: self, + byte, + buf, + read: 0, + } + } + + /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is reached. + /// + /// This function will read bytes from the underlying stream until the newline delimiter (the + /// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if + /// found) will be appended to `buf`. + /// + /// If successful, this function will return the total number of bytes read. + /// + /// If this function returns `Ok(0)`, the stream has reached EOF. + /// + /// # Errors + /// + /// This function has the same error semantics as [`read_until`] and will also return an error + /// if the read bytes are not valid UTF-8. If an I/O error is encountered then `buf` may + /// contain some bytes already read in the event that all data read so far was valid UTF-8. + /// + /// [`read_until`]: #method.read_until + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, io::BufReader, prelude::*}; + /// + /// let mut f = BufReader::new(File::open("a.txt").await?); + /// + /// let mut buf = String::new(); + /// f.read_line(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_line<'a>( + &'a mut self, + buf: &'a mut String, + ) -> ret!('a, ReadLineFuture, io::Result) + where + Self: Unpin, + { + ReadLineFuture { + reader: self, + bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, + buf, + read: 0, + } + } + + /// Returns a stream over the lines of this byte stream. + /// + /// The stream returned from this function will yield instances of + /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte (the + /// 0xA byte) or CRLF (0xD, 0xA bytes) at the end. + /// + /// [`io::Result`]: type.Result.html + /// [`String`]: https://doc.rust-lang.org/std/string/struct.String.html + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, io::BufReader, prelude::*}; + /// + /// let mut f = BufReader::new(File::open("a.txt").await?); + /// + /// let mut lines = f.lines(); + /// let mut count = 0; + /// + /// for line in lines.next().await { + /// line?; + /// count += 1; + /// } + /// # + /// # Ok(()) }) } + /// ``` + fn lines(self) -> Lines + where + Self: Unpin + Sized, + { + Lines { + reader: self, + buf: String::new(), + bytes: Vec::new(), + read: 0, + } + } +} + +impl BufRead for T {} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> { + reader: &'a mut T, + byte: u8, + buf: &'a mut Vec, + read: usize, +} + +impl Future for ReadUntilFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + byte, + buf, + read, + } = &mut *self; + read_until_internal(Pin::new(reader), cx, *byte, buf, read) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadLineFuture<'a, T: Unpin + ?Sized> { + reader: &'a mut T, + buf: &'a mut String, + bytes: Vec, + read: usize, +} + +impl Future for ReadLineFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + buf, + bytes, + read, + } = &mut *self; + let reader = Pin::new(reader); + + let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + debug_assert_eq!(*read, 0); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } + } +} + +/// A stream of lines in a byte stream. +/// +/// This stream is created by the [`lines`] method on types that implement [`BufRead`]. +/// +/// This type is an async version of [`std::io::Lines`]. +/// +/// [`lines`]: trait.BufRead.html#method.lines +/// [`BufRead`]: trait.BufRead.html +/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html +#[derive(Debug)] +pub struct Lines { + reader: R, + buf: String, + bytes: Vec, + read: usize, +} + +impl Stream for Lines { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + reader, + buf, + bytes, + read, + } = unsafe { self.get_unchecked_mut() }; + let reader = unsafe { Pin::new_unchecked(reader) }; + let n = futures::ready!(read_line_internal(reader, cx, buf, bytes, read))?; + if n == 0 && buf.is_empty() { + return Poll::Ready(None); + } + if buf.ends_with('\n') { + buf.pop(); + if buf.ends_with('\r') { + buf.pop(); + } + } + Poll::Ready(Some(Ok(mem::replace(buf, String::new())))) + } +} + +pub fn read_line_internal( + reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut String, + bytes: &mut Vec, + read: &mut usize, +) -> Poll> { + let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + debug_assert_eq!(*read, 0); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } +} + +pub fn read_until_internal( + mut reader: Pin<&mut R>, + cx: &mut Context<'_>, + byte: u8, + buf: &mut Vec, + read: &mut usize, +) -> Poll> { + loop { + let (done, used) = { + let available = futures::ready!(reader.as_mut().poll_fill_buf(cx))?; + if let Some(i) = memchr::memchr(byte, available) { + buf.extend_from_slice(&available[..=i]); + (true, i + 1) + } else { + buf.extend_from_slice(available); + (false, available.len()) + } + }; + reader.as_mut().consume(used); + *read += used; + if done || used == 0 { + return Poll::Ready(Ok(mem::replace(read, 0))); + } + } +} diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs new file mode 100644 index 0000000..85eb2db --- /dev/null +++ b/src/io/buf_reader.rs @@ -0,0 +1,253 @@ +use std::io::{self, IoSliceMut, Read, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::{cmp, fmt}; + +use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; + +// used by `BufReader` and `BufWriter` +// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1 +const DEFAULT_BUF_SIZE: usize = 8 * 1024; + +/// The `BufReader` struct adds buffering to any reader. +/// +/// It can be excessively inefficient to work directly with a [`AsyncRead`] +/// instance. A `BufReader` performs large, infrequent reads on the underlying +/// [`AsyncRead`] and maintains an in-memory buffer of the results. +/// +/// `BufReader` can improve the speed of programs that make *small* and +/// *repeated* read calls to the same file or network socket. It does not +/// help when reading very large amounts at once, or reading just one or a few +/// times. It also provides no advantage when reading from a source that is +/// already in memory, like a `Vec`. +/// +/// When the `BufReader` is dropped, the contents of its buffer will be +/// discarded. Creating multiple instances of a `BufReader` on the same +/// stream can cause data loss. +/// +/// [`AsyncRead`]: futures_io::AsyncRead +/// +// TODO: Examples +pub struct BufReader { + inner: R, + buf: Box<[u8]>, + pos: usize, + cap: usize, +} + +impl BufReader { + /// Creates a new `BufReader` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + pub fn new(inner: R) -> Self { + Self::with_capacity(DEFAULT_BUF_SIZE, inner) + } + + /// Creates a new `BufReader` with the specified buffer capacity. + pub fn with_capacity(capacity: usize, inner: R) -> Self { + unsafe { + let mut buffer = Vec::with_capacity(capacity); + buffer.set_len(capacity); + inner.initializer().initialize(&mut buffer); + Self { + inner, + buf: buffer.into_boxed_slice(), + pos: 0, + cap: 0, + } + } + } +} + +impl BufReader { + pin_utils::unsafe_pinned!(inner: R); + pin_utils::unsafe_unpinned!(pos: usize); + pin_utils::unsafe_unpinned!(cap: usize); + + /// Gets a reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_ref(&self) -> &R { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_mut(&mut self) -> &mut R { + &mut self.inner + } + + /// Gets a pinned mutable reference to the underlying reader. + /// + /// It is inadvisable to directly read from the underlying reader. + pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut R> { + self.inner() + } + + /// Consumes this `BufWriter`, returning the underlying reader. + /// + /// Note that any leftover data in the internal buffer is lost. + pub fn into_inner(self) -> R { + self.inner + } + + /// Returns a reference to the internally buffered data. + /// + /// Unlike `fill_buf`, this will not attempt to fill the buffer if it is empty. + pub fn buffer(&self) -> &[u8] { + &self.buf[self.pos..self.cap] + } + + /// Invalidates all data in the internal buffer. + #[inline] + fn discard_buffer(mut self: Pin<&mut Self>) { + *self.as_mut().pos() = 0; + *self.cap() = 0; + } +} + +impl AsyncRead for BufReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + // If we don't have any buffered data and we're doing a massive read + // (larger than our internal buffer), bypass our internal buffer + // entirely. + if self.pos == self.cap && buf.len() >= self.buf.len() { + let res = futures::ready!(self.as_mut().inner().poll_read(cx, buf)); + self.discard_buffer(); + return Poll::Ready(res); + } + let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?; + let nread = rem.read(buf)?; + self.consume(nread); + Poll::Ready(Ok(nread)) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + let total_len = bufs.iter().map(|b| b.len()).sum::(); + if self.pos == self.cap && total_len >= self.buf.len() { + let res = futures::ready!(self.as_mut().inner().poll_read_vectored(cx, bufs)); + self.discard_buffer(); + return Poll::Ready(res); + } + let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?; + let nread = rem.read_vectored(bufs)?; + self.consume(nread); + Poll::Ready(Ok(nread)) + } + + // we can't skip unconditionally because of the large buffer case in read. + unsafe fn initializer(&self) -> Initializer { + self.inner.initializer() + } +} + +impl AsyncBufRead for BufReader { + fn poll_fill_buf<'a>( + self: Pin<&'a mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let Self { + inner, + buf, + cap, + pos, + } = unsafe { self.get_unchecked_mut() }; + let mut inner = unsafe { Pin::new_unchecked(inner) }; + + // If we've reached the end of our internal buffer then we need to fetch + // some more data from the underlying reader. + // Branch using `>=` instead of the more correct `==` + // to tell the compiler that the pos..cap slice is always valid. + if *pos >= *cap { + debug_assert!(*pos == *cap); + *cap = futures::ready!(inner.as_mut().poll_read(cx, buf))?; + *pos = 0; + } + Poll::Ready(Ok(&buf[*pos..*cap])) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + *self.as_mut().pos() = cmp::min(self.pos + amt, self.cap); + } +} + +impl fmt::Debug for BufReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufReader") + .field("reader", &self.inner) + .field( + "buffer", + &format_args!("{}/{}", self.cap - self.pos, self.buf.len()), + ) + .finish() + } +} + +impl AsyncSeek for BufReader { + /// Seek to an offset, in bytes, in the underlying reader. + /// + /// The position used for seeking with `SeekFrom::Current(_)` is the + /// position the underlying reader would be at if the `BufReader` had no + /// internal buffer. + /// + /// Seeking always discards the internal buffer, even if the seek position + /// would otherwise fall within it. This guarantees that calling + /// `.into_inner()` immediately after a seek yields the underlying reader + /// at the same position. + /// + /// To seek without discarding the internal buffer, use + /// [`BufReader::poll_seek_relative`](BufReader::poll_seek_relative). + /// + /// See [`AsyncSeek`](futures_io::AsyncSeek) for more details. + /// + /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` + /// where `n` minus the internal buffer length overflows an `i64`, two + /// seeks will be performed instead of one. If the second seek returns + /// `Err`, the underlying reader will be left at the same position it would + /// have if you called `seek` with `SeekFrom::Current(0)`. + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + let result: u64; + if let SeekFrom::Current(n) = pos { + let remainder = (self.cap - self.pos) as i64; + // it should be safe to assume that remainder fits within an i64 as the alternative + // means we managed to allocate 8 exbibytes and that's absurd. + // But it's not out of the realm of possibility for some weird underlying reader to + // support seeking by i64::min_value() so we need to handle underflow when subtracting + // remainder. + if let Some(offset) = n.checked_sub(remainder) { + result = futures::ready!( + self.as_mut() + .inner() + .poll_seek(cx, SeekFrom::Current(offset)) + )?; + } else { + // seek backwards by our remainder, and then by the offset + futures::ready!( + self.as_mut() + .inner() + .poll_seek(cx, SeekFrom::Current(-remainder)) + )?; + self.as_mut().discard_buffer(); + result = + futures::ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?; + } + } else { + // Seeking with Start/End doesn't care about our buffer length. + result = futures::ready!(self.as_mut().inner().poll_seek(cx, pos))?; + } + self.discard_buffer(); + Poll::Ready(Ok(result)) + } +} diff --git a/src/io/mod.rs b/src/io/mod.rs index c05a739..daf70b8 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -22,17 +22,24 @@ //! ``` #[doc(inline)] -pub use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, SeekFrom}; +pub use std::io::{empty, sink, Cursor, Empty, Error, ErrorKind, Result, SeekFrom, Sink}; +pub use buf_read::{BufRead, Lines}; +pub use buf_reader::BufReader; pub use copy::copy; +pub use read::Read; +pub use seek::Seek; pub use stderr::{stderr, Stderr}; pub use stdin::{stdin, Stdin}; pub use stdout::{stdout, Stdout}; +pub use write::Write; +mod buf_read; +mod buf_reader; mod copy; +mod read; +mod seek; mod stderr; mod stdin; mod stdout; - -#[doc(inline)] -pub use std::io::{empty, sink, Cursor, Empty, Error, ErrorKind, Result, Sink}; +mod write; diff --git a/src/io/read.rs b/src/io/read.rs new file mode 100644 index 0000000..7c1c2cb --- /dev/null +++ b/src/io/read.rs @@ -0,0 +1,392 @@ +use std::future::Future; +use std::io::{self, IoSliceMut}; +use std::mem; +use std::pin::Pin; +use std::str; +use std::task::{Context, Poll}; + +use cfg_if::cfg_if; +use futures::io::AsyncRead; + +cfg_if! { + if #[cfg(feature = "docs.rs")] { + #[doc(hidden)] + pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + } + } else { + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + } + } +} + +/// Allows reading from a byte stream. +/// +/// This trait is an async version of [`std::io::Read`]. +/// +/// While it is currently not possible to implement this trait directly, it get implemented +/// automatically for all types that implement [`futures::io::AsyncRead`]. +/// +/// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html +/// [`futures::io::AsyncRead`]: +/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncRead.html +pub trait Read { + /// Reads some bytes from the byte stream. + /// + /// Returns the number of bytes read from the start of the buffer. + /// + /// If the return value is `Ok(n)`, then it must be guaranteed that `0 <= n <= buf.len()`. A + /// nonzero `n` value indicates that the buffer has been filled in with `n` bytes of data. If + /// `n` is `0`, then it can indicate one of two scenarios: + /// + /// 1. This reader has reached its "end of file" and will likely no longer be able to produce + /// bytes. Note that this does not mean that the reader will always no longer be able to + /// produce bytes. + /// 2. The buffer specified was 0 bytes in length. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, prelude::*}; + /// + /// let mut f = File::open("a.txt").await?; + /// + /// let mut buf = vec![0; 1024]; + /// let n = f.read(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result) + where + Self: Unpin; + + /// Like [`read`], except that it reads into a slice of buffers. + /// + /// Data is copied to fill each buffer in order, with the final buffer written to possibly + /// being only partially filled. This method must behave as a single call to [`read`] with the + /// buffers concatenated would. + /// + /// The default implementation calls [`read`] with either the first nonempty buffer provided, + /// or an empty one if none exists. + /// + /// [`read`]: #tymethod.read + fn read_vectored<'a>( + &'a mut self, + bufs: &'a mut [IoSliceMut<'a>], + ) -> ret!('a, ReadVectoredFuture, io::Result) + where + Self: Unpin, + { + ReadVectoredFuture { reader: self, bufs } + } + + /// Reads all bytes from the byte stream. + /// + /// All bytes read from this stream will be appended to the specified buffer `buf`. This + /// function will continuously call [`read`] to append more data to `buf` until [`read`] + /// returns either `Ok(0)` or an error. + /// + /// If successful, this function will return the total number of bytes read. + /// + /// [`read`]: #tymethod.read + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, prelude::*}; + /// + /// let mut f = File::open("a.txt").await?; + /// + /// let mut buf = Vec::new(); + /// f.read_to_end(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_to_end<'a>( + &'a mut self, + buf: &'a mut Vec, + ) -> ret!('a, ReadToEndFuture, io::Result) + where + Self: Unpin, + { + let start_len = buf.len(); + ReadToEndFuture { + reader: self, + buf, + start_len, + } + } + + /// Reads all bytes from the byte stream and appends them into a string. + /// + /// If successful, this function will return the number of bytes read. + /// + /// If the data in this stream is not valid UTF-8 then an error will be returned and `buf` will + /// be left unmodified. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, prelude::*}; + /// + /// let mut f = File::open("a.txt").await?; + /// + /// let mut buf = String::new(); + /// f.read_to_string(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_to_string<'a>( + &'a mut self, + buf: &'a mut String, + ) -> ret!('a, ReadToStringFuture, io::Result) + where + Self: Unpin, + { + let start_len = buf.len(); + ReadToStringFuture { + reader: self, + bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) }, + buf, + start_len, + } + } + + /// Reads the exact number of bytes required to fill `buf`. + /// + /// This function reads as many bytes as necessary to completely fill the specified buffer + /// `buf`. + /// + /// No guarantees are provided about the contents of `buf` when this function is called, + /// implementations cannot rely on any property of the contents of `buf` being true. It is + /// recommended that implementations only write data to `buf` instead of reading its contents. + /// + /// If this function encounters an "end of file" before completely filling the buffer, it + /// returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of `buf` are + /// unspecified in this case. + /// + /// If any other read error is encountered then this function immediately returns. The contents + /// of `buf` are unspecified in this case. + /// + /// If this function returns an error, it is unspecified how many bytes it has read, but it + /// will never read more than would be necessary to completely fill the buffer. + /// + /// [`ErrorKind::UnexpectedEof`]: enum.ErrorKind.html#variant.UnexpectedEof + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, prelude::*}; + /// + /// let mut f = File::open("a.txt").await?; + /// + /// let mut buf = vec![0; 10]; + /// f.read_exact(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadExactFuture, io::Result<()>) + where + Self: Unpin, + { + ReadExactFuture { reader: self, buf } + } +} + +impl Read for T { + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result) { + ReadFuture { reader: self, buf } + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadFuture<'a, T: Unpin + ?Sized> { + reader: &'a mut T, + buf: &'a mut [u8], +} + +impl Future for ReadFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf } = &mut *self; + Pin::new(reader).poll_read(cx, buf) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> { + reader: &'a mut T, + bufs: &'a mut [IoSliceMut<'a>], +} + +impl Future for ReadVectoredFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, bufs } = &mut *self; + Pin::new(reader).poll_read_vectored(cx, bufs) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> { + reader: &'a mut T, + buf: &'a mut Vec, + start_len: usize, +} + +impl Future for ReadToEndFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + buf, + start_len, + } = &mut *self; + read_to_end_internal(Pin::new(reader), cx, buf, *start_len) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> { + reader: &'a mut T, + buf: &'a mut String, + bytes: Vec, + start_len: usize, +} + +impl Future for ReadToStringFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + buf, + bytes, + start_len, + } = &mut *self; + let reader = Pin::new(reader); + + let ret = futures::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadExactFuture<'a, T: Unpin + ?Sized> { + reader: &'a mut T, + buf: &'a mut [u8], +} + +impl Future for ReadExactFuture<'_, T> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf } = &mut *self; + + while !buf.is_empty() { + let n = futures::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; + let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n); + *buf = rest; + + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())); + } + } + + Poll::Ready(Ok(())) + } +} + +// This uses an adaptive system to extend the vector when it fills. We want to +// avoid paying to allocate and zero a huge chunk of memory if the reader only +// has 4 bytes while still making large reads if the reader does have a ton +// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every +// time is 4,500 times (!) slower than this if the reader has a very small +// amount of data to return. +// +// Because we're extending the buffer with uninitialized data for trusted +// readers, we need to make sure to truncate that if any of this panics. +pub fn read_to_end_internal( + mut rd: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec, + start_len: usize, +) -> Poll> { + struct Guard<'a> { + buf: &'a mut Vec, + len: usize, + } + + impl Drop for Guard<'_> { + fn drop(&mut self) { + unsafe { + self.buf.set_len(self.len); + } + } + } + + let mut g = Guard { + len: buf.len(), + buf, + }; + let ret; + loop { + if g.len == g.buf.len() { + unsafe { + g.buf.reserve(32); + let capacity = g.buf.capacity(); + g.buf.set_len(capacity); + rd.initializer().initialize(&mut g.buf[g.len..]); + } + } + + match futures::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { + Ok(0) => { + ret = Poll::Ready(Ok(g.len - start_len)); + break; + } + Ok(n) => g.len += n, + Err(e) => { + ret = Poll::Ready(Err(e)); + break; + } + } + } + + ret +} diff --git a/src/io/seek.rs b/src/io/seek.rs new file mode 100644 index 0000000..aff0970 --- /dev/null +++ b/src/io/seek.rs @@ -0,0 +1,81 @@ +use std::future::Future; +use std::io::{self, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use cfg_if::cfg_if; +use futures::io::AsyncSeek; + +cfg_if! { + if #[cfg(feature = "docs.rs")] { + #[doc(hidden)] + pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + } + } else { + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + } + } +} + +/// Allows seeking through a byte stream. +/// +/// This trait is an async version of [`std::io::Seek`]. +/// +/// While it is currently not possible to implement this trait directly, it get implemented +/// automatically for all types that implement [`futures::io::AsyncSeek`]. +/// +/// [`std::io::Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html +/// [`futures::io::AsyncSeek`]: +/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncSeek.html +pub trait Seek { + /// Seeks to a new position in a byte stream. + /// + /// Returns the new position in the byte stream. + /// + /// A seek beyond the end of stream is allowed, but behavior is defined by the + /// implementation. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, io::SeekFrom, prelude::*}; + /// + /// let mut f = File::open("a.txt").await?; + /// + /// let file_len = f.seek(SeekFrom::End(0)).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result) + where + Self: Unpin; +} + +impl Seek for T { + fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result) { + SeekFuture { seeker: self, pos } + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct SeekFuture<'a, T: Unpin + ?Sized> { + seeker: &'a mut T, + pos: SeekFrom, +} + +impl Future for SeekFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pos = self.pos; + Pin::new(&mut *self.seeker).poll_seek(cx, pos) + } +} diff --git a/src/io/stderr.rs b/src/io/stderr.rs index bfa6dec..3c57f55 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -81,7 +81,7 @@ enum Operation { Flush(io::Result<()>), } -impl AsyncWrite for Stderr { +impl futures::io::AsyncWrite for Stderr { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 8a517d3..eabdc51 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -140,7 +140,7 @@ impl Stdin { } } -impl AsyncRead for Stdin { +impl futures::io::AsyncRead for Stdin { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/stdout.rs b/src/io/stdout.rs index 796804b..b078819 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -81,7 +81,7 @@ enum Operation { Flush(io::Result<()>), } -impl AsyncWrite for Stdout { +impl futures::io::AsyncWrite for Stdout { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/write.rs b/src/io/write.rs new file mode 100644 index 0000000..95b64be --- /dev/null +++ b/src/io/write.rs @@ -0,0 +1,213 @@ +use std::future::Future; +use std::io::{self, IoSlice}; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use cfg_if::cfg_if; +use futures::io::AsyncWrite; + +cfg_if! { + if #[cfg(feature = "docs.rs")] { + #[doc(hidden)] + pub struct ImplFuture<'a, t>(std::marker::PhantomData<&'a t>); + + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + } + } else { + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + } + } +} + +/// Allows writing to a byte stream. +/// +/// This trait is an async version of [`std::io::Write`]. +/// +/// While it is currently not possible to implement this trait directly, it get implemented +/// automatically for all types that implement [`futures::io::AsyncWrite`]. +/// +/// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html +/// [`futures::io::AsyncWrite`]: +/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncWrite.html +pub trait Write { + /// Writes some bytes into the byte stream. + /// + /// Returns the number of bytes written from the start of the buffer. + /// + /// If the return value is `Ok(n)` then it must be guaranteed that `0 <= n <= buf.len()`. A + /// return value of `0` typically means that the underlying object is no longer able to accept + /// bytes and will likely not be able to in the future as well, or that the buffer provided is + /// empty. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, prelude::*}; + /// + /// let mut f = File::create("a.txt").await?; + /// + /// let n = f.write(b"hello world").await?; + /// # + /// # Ok(()) }) } + /// ``` + fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result) + where + Self: Unpin; + + /// Flushes the stream to ensure that all buffered contents reach their destination. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, prelude::*}; + /// + /// let mut f = File::create("a.txt").await?; + /// + /// f.write_all(b"hello world").await?; + /// f.flush().await?; + /// # + /// # Ok(()) }) } + /// ``` + fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>) + where + Self: Unpin; + + /// Like [`write`], except that it writes from a slice of buffers. + /// + /// Data is copied from each buffer in order, with the final buffer read from possibly being + /// only partially consumed. This method must behave as a call to [`write`] with the buffers + /// concatenated would. + /// + /// The default implementation calls [`write`] with either the first nonempty buffer provided, + /// or an empty one if none exists. + /// + /// [`write`]: #tymethod.write + fn write_vectored<'a>( + &'a mut self, + bufs: &'a [IoSlice<'a>], + ) -> ret!('a, WriteVectoredFuture, io::Result) + where + Self: Unpin, + { + WriteVectoredFuture { writer: self, bufs } + } + + /// Writes an entire buffer into the byte stream. + /// + /// This method will continuously call [`write`] until there is no more data to be written or + /// an error is returned. This method will not return until the entire buffer has been + /// successfully written or such an error occurs. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::{fs::File, prelude::*}; + /// + /// let mut f = File::create("a.txt").await?; + /// + /// f.write_all(b"hello world").await?; + /// # + /// # Ok(()) }) } + /// ``` + fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteAllFuture, io::Result<()>) + where + Self: Unpin, + { + WriteAllFuture { writer: self, buf } + } +} + +impl Write for T { + fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result) { + WriteFuture { writer: self, buf } + } + + fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>) { + FlushFuture { writer: self } + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct WriteFuture<'a, T: Unpin + ?Sized> { + writer: &'a mut T, + buf: &'a [u8], +} + +impl Future for WriteFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let buf = self.buf; + Pin::new(&mut *self.writer).poll_write(cx, buf) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct FlushFuture<'a, T: Unpin + ?Sized> { + writer: &'a mut T, +} + +impl Future for FlushFuture<'_, T> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.writer).poll_flush(cx) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct WriteVectoredFuture<'a, T: Unpin + ?Sized> { + writer: &'a mut T, + bufs: &'a [IoSlice<'a>], +} + +impl Future for WriteVectoredFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let bufs = self.bufs; + Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs) + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct WriteAllFuture<'a, T: Unpin + ?Sized> { + writer: &'a mut T, + buf: &'a [u8], +} + +impl Future for WriteAllFuture<'_, T> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { writer, buf } = &mut *self; + + while !buf.is_empty() { + let n = futures::ready!(Pin::new(&mut **writer).poll_write(cx, buf))?; + let (_, rest) = mem::replace(buf, &[]).split_at(n); + *buf = rest; + + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + } + + Poll::Ready(Ok(())) + } +} diff --git a/src/net/tcp.rs b/src/net/tcp.rs index 42580c4..8e7bf3b 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -352,7 +352,7 @@ impl TcpStream { } } -impl AsyncRead for TcpStream { +impl futures::io::AsyncRead for TcpStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -370,7 +370,7 @@ impl AsyncRead for TcpStream { } } -impl AsyncRead for &TcpStream { +impl futures::io::AsyncRead for &TcpStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -388,7 +388,7 @@ impl AsyncRead for &TcpStream { } } -impl AsyncWrite for TcpStream { +impl futures::io::AsyncWrite for TcpStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -414,7 +414,7 @@ impl AsyncWrite for TcpStream { } } -impl AsyncWrite for &TcpStream { +impl futures::io::AsyncWrite for &TcpStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -656,7 +656,7 @@ impl TcpListener { #[derive(Debug)] pub struct Incoming<'a>(&'a TcpListener); -impl<'a> Stream for Incoming<'a> { +impl<'a> futures::Stream for Incoming<'a> { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/os/unix/net.rs b/src/os/unix/net.rs index 2fac4b4..b8beb59 100644 --- a/src/os/unix/net.rs +++ b/src/os/unix/net.rs @@ -755,7 +755,7 @@ impl UnixStream { } } -impl AsyncRead for UnixStream { +impl futures::io::AsyncRead for UnixStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -765,7 +765,7 @@ impl AsyncRead for UnixStream { } } -impl AsyncRead for &UnixStream { +impl futures::io::AsyncRead for &UnixStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -775,7 +775,7 @@ impl AsyncRead for &UnixStream { } } -impl AsyncWrite for UnixStream { +impl futures::io::AsyncWrite for UnixStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -793,7 +793,7 @@ impl AsyncWrite for UnixStream { } } -impl AsyncWrite for &UnixStream { +impl futures::io::AsyncWrite for &UnixStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/prelude.rs b/src/prelude.rs index 0e147e5..1f35312 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -25,21 +25,16 @@ //! [`timeout`]: ../time/trait.Timeout.html#method.timeout #[doc(no_inline)] -pub use futures::future::FutureExt as _; +pub use crate::future::Future; #[doc(no_inline)] -pub use futures::future::TryFutureExt as _; +pub use crate::io::BufRead as _; #[doc(no_inline)] -pub use futures::io::AsyncBufReadExt as _; +pub use crate::io::Read as _; #[doc(no_inline)] -pub use futures::io::AsyncReadExt as _; +pub use crate::io::Seek as _; #[doc(no_inline)] -pub use futures::io::AsyncSeekExt as _; +pub use crate::io::Write as _; #[doc(no_inline)] -pub use futures::io::AsyncWriteExt as _; -#[doc(no_inline)] -pub use futures::stream::StreamExt as _; -#[doc(no_inline)] -pub use futures::stream::TryStreamExt as _; - +pub use crate::stream::Stream; #[doc(no_inline)] pub use crate::time::Timeout as _; diff --git a/src/stream/empty.rs b/src/stream/empty.rs new file mode 100644 index 0000000..30bcef1 --- /dev/null +++ b/src/stream/empty.rs @@ -0,0 +1,43 @@ +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Creates a stream that doesn't yield any items. +/// +/// # Examples +/// +/// ``` +/// # #![feature(async_await)] +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::{prelude::*, stream}; +/// +/// let mut s = stream::empty::(); +/// +/// assert_eq!(s.next().await, None); +/// # +/// # }) } +/// ``` +pub fn empty() -> Empty { + Empty { + _marker: PhantomData, + } +} + +/// A stream that doesn't yield any items. +/// +/// This stream is constructed by the [`empty`] function. +/// +/// [`empty`]: fn.empty.html +#[derive(Debug)] +pub struct Empty { + _marker: PhantomData, +} + +impl futures::Stream for Empty { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(None) + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 0ddef6e..5100ede 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -1,4 +1,4 @@ -//! Composable asynchronous iteration. +//! Asynchronous iteration. //! //! This module is an async version of [`std::iter`]. //! @@ -12,14 +12,21 @@ //! # //! use async_std::{prelude::*, stream}; //! -//! let mut stream = stream::repeat(9).take(3); +//! let mut s = stream::repeat(9).take(3); //! -//! while let Some(num) = stream.next().await { -//! assert_eq!(num, 9); +//! while let Some(v) = s.next().await { +//! assert_eq!(v, 9); //! } //! # //! # }) } //! ``` -#[doc(inline)] -pub use futures::stream::{empty, once, repeat, Empty, Once, Repeat, Stream}; +pub use empty::{empty, Empty}; +pub use once::{once, Once}; +pub use repeat::{repeat, Repeat}; +pub use stream::{Stream, Take}; + +mod empty; +mod once; +mod repeat; +mod stream; diff --git a/src/stream/once.rs b/src/stream/once.rs new file mode 100644 index 0000000..3d85304 --- /dev/null +++ b/src/stream/once.rs @@ -0,0 +1,41 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Creates a stream that yields a single item. +/// +/// # Examples +/// +/// ``` +/// # #![feature(async_await)] +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::{prelude::*, stream}; +/// +/// let mut s = stream::once(7); +/// +/// assert_eq!(s.next().await, Some(7)); +/// assert_eq!(s.next().await, None); +/// # +/// # }) } +/// ``` +pub fn once(t: T) -> Once { + Once { value: Some(t) } +} + +/// A stream that yields a single item. +/// +/// This stream is constructed by the [`once`] function. +/// +/// [`once`]: fn.once.html +#[derive(Debug)] +pub struct Once { + value: Option, +} + +impl futures::Stream for Once { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.value.take()) + } +} diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs new file mode 100644 index 0000000..97a8514 --- /dev/null +++ b/src/stream/repeat.rs @@ -0,0 +1,44 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Creates a stream that yields the same item repeatedly. +/// +/// # Examples +/// +/// ``` +/// # #![feature(async_await)] +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::{prelude::*, stream}; +/// +/// let mut s = stream::repeat(7); +/// +/// assert_eq!(s.next().await, Some(7)); +/// assert_eq!(s.next().await, Some(7)); +/// # +/// # }) } +/// ``` +pub fn repeat(item: T) -> Repeat +where + T: Clone, +{ + Repeat { item } +} + +/// A stream that yields the same item repeatedly. +/// +/// This stream is constructed by the [`repeat`] function. +/// +/// [`repeat`]: fn.repeat.html +#[derive(Debug)] +pub struct Repeat { + item: T, +} + +impl futures::Stream for Repeat { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Some(self.item.clone())) + } +} diff --git a/src/stream/stream.rs b/src/stream/stream.rs new file mode 100644 index 0000000..1da6e05 --- /dev/null +++ b/src/stream/stream.rs @@ -0,0 +1,169 @@ +//! Asynchronous iteration. +//! +//! This module is an async version of [`std::iter`]. +//! +//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html +//! +//! # Examples +//! +//! ``` +//! # #![feature(async_await)] +//! # fn main() { async_std::task::block_on(async { +//! # +//! use async_std::{prelude::*, stream}; +//! +//! let mut s = stream::repeat(9).take(3); +//! +//! while let Some(v) = s.next().await { +//! assert_eq!(v, 9); +//! } +//! # +//! # }) } +//! ``` + +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(feature = "docs.rs")] { + #[doc(hidden)] + pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); + + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + } + } else { + macro_rules! ret { + ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + } + } +} + +/// An asynchronous stream of values. +/// +/// This trait is an async version of [`std::iter::Iterator`]. +/// +/// While it is currently not possible to implement this trait directly, it gets implemented +/// automatically for all types that implement [`futures::stream::Stream`]. +/// +/// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html +/// [`futures::stream::Stream`]: +/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html +pub trait Stream { + /// The type of items yielded by this stream. + type Item; + + /// Advances the stream and returns the next value. + /// + /// Returns [`None`] when iteration is finished. Individual stream implementations may + /// choose to resume iteration, and so calling `next()` again may or may not eventually + /// start returning more values. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::{prelude::*, stream}; + /// + /// let mut s = stream::once(7); + /// + /// assert_eq!(s.next().await, Some(7)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option) + where + Self: Unpin; + + /// Creates a stream that yields its first `n` elements. + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::{prelude::*, stream}; + /// + /// let mut s = stream::repeat(9).take(3); + /// + /// while let Some(v) = s.next().await { + /// assert_eq!(v, 9); + /// } + /// # + /// # }) } + /// ``` + fn take(self, n: usize) -> Take + where + Self: Sized, + { + Take { + stream: self, + remaining: n, + } + } +} + +impl Stream for T { + type Item = ::Item; + + fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option) + where + Self: Unpin, + { + NextFuture { stream: self } + } +} + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct NextFuture<'a, T: Unpin + ?Sized> { + stream: &'a mut T, +} + +impl Future for NextFuture<'_, T> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.stream).poll_next(cx) + } +} + +/// A stream that yields the first `n` items of another stream. +#[derive(Clone, Debug)] +pub struct Take { + stream: S, + remaining: usize, +} + +impl Unpin for Take {} + +impl Take { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(remaining: usize); +} + +impl futures::Stream for Take { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.remaining == 0 { + Poll::Ready(None) + } else { + let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(_) => *self.as_mut().remaining() -= 1, + None => *self.as_mut().remaining() = 0, + } + Poll::Ready(next) + } + } +} diff --git a/src/task/mod.rs b/src/task/mod.rs index fb59919..4f572f3 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -23,7 +23,7 @@ //! ``` #[doc(inline)] -pub use futures::task::{Context, Poll, Waker}; +pub use std::task::{Context, Poll, Waker}; pub use local::{AccessError, LocalKey}; pub use pool::{block_on, current, spawn, Builder}; diff --git a/src/time/mod.rs b/src/time/mod.rs index 38473ff..75a1ac8 100644 --- a/src/time/mod.rs +++ b/src/time/mod.rs @@ -34,9 +34,25 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::Duration; +use cfg_if::cfg_if; use futures_timer::Delay; use pin_utils::unsafe_pinned; +cfg_if! { + if #[cfg(feature = "docs.rs")] { + #[doc(hidden)] + pub struct ImplFuture(std::marker::PhantomData); + + macro_rules! ret { + ($f:tt, $o:ty) => (ImplFuture<$o>); + } + } else { + macro_rules! ret { + ($f:tt, $o:ty) => ($f); + } + } +} + /// An error returned when a future times out. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub struct TimeoutError; @@ -57,9 +73,6 @@ impl From for io::Error { /// An extension trait that configures timeouts for futures. pub trait Timeout: Future + Sized { - /// TODO - type TimeoutFuture: Future::Output, TimeoutError>>; - /// Awaits a future to completion or times out after a duration of time. /// /// # Examples @@ -81,13 +94,7 @@ pub trait Timeout: Future + Sized { /// # /// # Ok(()) }) } /// ``` - fn timeout(self, dur: Duration) -> Self::TimeoutFuture; -} - -impl Timeout for F { - type TimeoutFuture = TimeoutFuture; - - fn timeout(self, dur: Duration) -> Self::TimeoutFuture { + fn timeout(self, dur: Duration) -> ret!(TimeoutFuture, Result) { TimeoutFuture { future: self, delay: Delay::new(dur), @@ -97,7 +104,7 @@ impl Timeout for F { /// A future that times out after a duration of time. #[doc(hidden)] -#[derive(Debug)] +#[allow(missing_debug_implementations)] pub struct TimeoutFuture { future: F, delay: Delay, @@ -121,3 +128,5 @@ impl Future for TimeoutFuture { } } } + +impl Timeout for F {}