From edfa2358a42872af35024dd83e78ea80e8fe6a4a Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 21 Sep 2019 10:39:56 +0200 Subject: [PATCH 01/11] Re-export IO traits from futures --- .travis.yml | 1 + src/fs/file.rs | 26 +- src/io/buf_read/fill_buf.rs | 32 --- src/io/buf_read/lines.rs | 8 +- src/io/buf_read/mod.rs | 410 ++++++++++++++++++------------- src/io/buf_read/read_line.rs | 6 +- src/io/buf_read/read_until.rs | 6 +- src/io/buf_reader.rs | 15 +- src/io/copy.rs | 12 +- src/io/cursor.rs | 43 ++-- src/io/empty.rs | 13 +- src/io/mod.rs | 12 +- src/io/prelude.rs | 17 +- src/io/read/mod.rs | 426 +++++++++++++++++++++------------ src/io/read/read.rs | 10 +- src/io/read/read_exact.rs | 10 +- src/io/read/read_to_end.rs | 12 +- src/io/read/read_to_string.rs | 12 +- src/io/read/read_vectored.rs | 12 +- src/io/repeat.rs | 11 +- src/io/seek.rs | 146 +++++++---- src/io/sink.rs | 6 +- src/io/stderr.rs | 17 +- src/io/stdin.rs | 20 +- src/io/stdout.rs | 17 +- src/io/write/flush.rs | 10 +- src/io/write/mod.rs | 365 +++++++++++++++++++--------- src/io/write/write.rs | 10 +- src/io/write/write_all.rs | 10 +- src/io/write/write_vectored.rs | 11 +- src/lib.rs | 8 +- src/net/tcp/listener.rs | 3 +- src/net/tcp/stream.rs | 11 +- src/os/unix/net/stream.rs | 11 +- src/prelude.rs | 11 + src/result/from_stream.rs | 11 +- src/stream/from_stream.rs | 6 +- src/stream/into_stream.rs | 4 +- src/stream/mod.rs | 12 +- src/stream/stream/mod.rs | 47 ++-- src/stream/stream/scan.rs | 4 +- src/stream/stream/zip.rs | 4 +- src/vec/from_stream.rs | 11 +- 43 files changed, 1076 insertions(+), 773 deletions(-) delete mode 100644 src/io/buf_read/fill_buf.rs diff --git a/.travis.yml b/.travis.yml index d2862fc..e482b50 100644 --- a/.travis.yml +++ b/.travis.yml @@ -63,5 +63,6 @@ matrix: - mdbook test -L ./target/debug/deps docs script: + - cargo check --all --benches --bins --examples --tests - cargo check --features unstable --all --benches --bins --examples --tests - cargo test --all --doc --features unstable diff --git a/src/fs/file.rs b/src/fs/file.rs index 33022bc..b2a897e 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -9,11 +9,11 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex}; use cfg_if::cfg_if; -use futures_io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer}; use crate::fs::{Metadata, Permissions}; use crate::future; -use crate::io::{self, SeekFrom, Write}; +use crate::io::{self, Seek, SeekFrom, Read, Write}; +use crate::prelude::*; use crate::task::{self, blocking, Context, Poll, Waker}; /// An open file on the filesystem. @@ -302,7 +302,7 @@ impl fmt::Debug for File { } } -impl AsyncRead for File { +impl Read for File { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -310,14 +310,9 @@ impl AsyncRead for File { ) -> Poll> { Pin::new(&mut &*self).poll_read(cx, buf) } - - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } -impl AsyncRead for &File { +impl Read for &File { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -326,14 +321,9 @@ impl AsyncRead for &File { let state = futures_core::ready!(self.lock.poll_lock(cx)); state.poll_read(cx, buf) } - - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } -impl AsyncWrite for File { +impl Write for File { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -351,7 +341,7 @@ impl AsyncWrite for File { } } -impl AsyncWrite for &File { +impl Write for &File { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -372,7 +362,7 @@ impl AsyncWrite for &File { } } -impl AsyncSeek for File { +impl Seek for File { fn poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -382,7 +372,7 @@ impl AsyncSeek for File { } } -impl AsyncSeek for &File { +impl Seek for &File { fn poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/buf_read/fill_buf.rs b/src/io/buf_read/fill_buf.rs deleted file mode 100644 index 0ce58cf..0000000 --- a/src/io/buf_read/fill_buf.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::pin::Pin; - -use futures_io::AsyncBufRead; - -use crate::future::Future; -use crate::io; -use crate::task::{Context, Poll}; - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct FillBufFuture<'a, R: ?Sized> { - reader: &'a mut R, -} - -impl<'a, R: ?Sized> FillBufFuture<'a, R> { - pub(crate) fn new(reader: &'a mut R) -> Self { - Self { reader } - } -} - -impl<'a, R: AsyncBufRead + Unpin + ?Sized> Future for FillBufFuture<'a, R> { - type Output = io::Result<&'a [u8]>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let Self { reader } = &mut *self; - let result = Pin::new(reader).poll_fill_buf(cx); - // This is safe because: - // 1. The buffer is valid for the lifetime of the reader. - // 2. Output is unrelated to the wrapper (Self). - result.map_ok(|buf| unsafe { std::mem::transmute::<&'_ [u8], &'a [u8]>(buf) }) - } -} diff --git a/src/io/buf_read/lines.rs b/src/io/buf_read/lines.rs index 0536086..a761eb4 100644 --- a/src/io/buf_read/lines.rs +++ b/src/io/buf_read/lines.rs @@ -2,10 +2,8 @@ use std::mem; use std::pin::Pin; use std::str; -use futures_io::AsyncBufRead; - use super::read_until_internal; -use crate::io; +use crate::io::{self, BufRead}; use crate::task::{Context, Poll}; /// A stream of lines in a byte stream. @@ -25,7 +23,7 @@ pub struct Lines { pub(crate) read: usize, } -impl futures_core::stream::Stream for Lines { +impl futures_core::stream::Stream for Lines { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -50,7 +48,7 @@ impl futures_core::stream::Stream for Lines { } } -pub fn read_line_internal( +pub fn read_line_internal( reader: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut String, diff --git a/src/io/buf_read/mod.rs b/src/io/buf_read/mod.rs index b65d177..fa8e9eb 100644 --- a/src/io/buf_read/mod.rs +++ b/src/io/buf_read/mod.rs @@ -1,9 +1,7 @@ -mod fill_buf; mod lines; mod read_line; mod read_until; -use fill_buf::FillBufFuture; pub use lines::Lines; use read_line::ReadLineFuture; use read_until::ReadUntilFuture; @@ -12,115 +10,259 @@ use std::mem; use std::pin::Pin; use cfg_if::cfg_if; -use futures_io::AsyncBufRead; use crate::io; use crate::task::{Context, Poll}; cfg_if! { if #[cfg(feature = "docs")] { + use std::ops::{Deref, DerefMut}; + #[doc(hidden)] pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + /// Allows reading from a buffered byte stream. + /// + /// This trait is a re-export of [`futures::io::AsyncBufRead`] and is an async version of + /// [`std::io::BufRead`]. + /// + /// The [provided methods] do not really exist in the trait itself, but they become + /// available when the prelude is imported: + /// + /// ``` + /// # #[allow(unused_imports)] + /// use async_std::prelude::*; + /// ``` + /// + /// [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html + /// [`futures::io::AsyncBufRead`]: + /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html + /// [provided methods]: #provided-methods + pub trait BufRead { + /// Returns the contents of the internal buffer, filling it with more data from the + /// inner reader if it is empty. + /// + /// This function is a lower-level call. It needs to be paired with the [`consume`] + /// method to function properly. When calling this method, none of the contents will be + /// "read" in the sense that later calling `read` may return the same contents. As + /// such, [`consume`] must be called with the number of bytes that are consumed from + /// this buffer to ensure that the bytes are never returned twice. + /// + /// [`consume`]: #tymethod.consume + /// + /// An empty buffer returned indicates that the stream has reached EOF. + // TODO: write a proper doctest with `consume` + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Tells this buffer that `amt` bytes have been consumed from the buffer, so they + /// should no longer be returned in calls to `read`. + fn consume(self: Pin<&mut Self>, amt: usize); + + /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached. + /// + /// This function will read bytes from the underlying stream until the delimiter or EOF + /// is found. Once found, all bytes up to, and including, the delimiter (if found) will + /// be appended to `buf`. + /// + /// If successful, this function will return the total number of bytes read. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// use async_std::prelude::*; + /// + /// let mut file = BufReader::new(File::open("a.txt").await?); + /// + /// let mut buf = Vec::with_capacity(1024); + /// let n = file.read_until(b'\n', &mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + /// + /// Multiple successful calls to `read_until` append all bytes up to and including to + /// `buf`: + /// ``` + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::io::BufReader; + /// use async_std::prelude::*; + /// + /// let from: &[u8] = b"append\nexample\n"; + /// let mut reader = BufReader::new(from); + /// let mut buf = vec![]; + /// + /// let mut size = reader.read_until(b'\n', &mut buf).await?; + /// assert_eq!(size, 7); + /// assert_eq!(buf, b"append\n"); + /// + /// size += reader.read_until(b'\n', &mut buf).await?; + /// assert_eq!(size, from.len()); + /// + /// assert_eq!(buf, from); + /// # + /// # Ok(()) }) } + /// ``` + fn read_until<'a>( + &'a mut self, + byte: u8, + buf: &'a mut Vec, + ) -> ImplFuture<'a, io::Result> + where + Self: Unpin, + { + unreachable!() + } + + /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is + /// reached. + /// + /// This function will read bytes from the underlying stream until the newline + /// delimiter (the 0xA byte) or EOF is found. Once found, all bytes up to, and + /// including, the delimiter (if found) will be appended to `buf`. + /// + /// If successful, this function will return the total number of bytes read. + /// + /// If this function returns `Ok(0)`, the stream has reached EOF. + /// + /// # Errors + /// + /// This function has the same error semantics as [`read_until`] and will also return + /// an error if the read bytes are not valid UTF-8. If an I/O error is encountered then + /// `buf` may contain some bytes already read in the event that all data read so far + /// was valid UTF-8. + /// + /// [`read_until`]: #method.read_until + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// use async_std::prelude::*; + /// + /// let mut file = BufReader::new(File::open("a.txt").await?); + /// + /// let mut buf = String::new(); + /// file.read_line(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_line<'a>( + &'a mut self, + buf: &'a mut String, + ) -> ImplFuture<'a, io::Result> + where + Self: Unpin, + { + unreachable!() + } + + /// Returns a stream over the lines of this byte stream. + /// + /// The stream returned from this function will yield instances of + /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte + /// (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end. + /// + /// [`io::Result`]: type.Result.html + /// [`String`]: https://doc.rust-lang.org/std/string/struct.String.html + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::BufReader; + /// use async_std::prelude::*; + /// + /// let file = File::open("a.txt").await?; + /// let mut lines = BufReader::new(file).lines(); + /// let mut count = 0; + /// + /// while let Some(line) = lines.next().await { + /// line?; + /// count += 1; + /// } + /// # + /// # Ok(()) }) } + /// ``` + fn lines(self) -> Lines + where + Self: Unpin + Sized, + { + unreachable!() + } } - } else { - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + + impl BufRead for Box { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + unreachable!() + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + unreachable!() + } } - } -} -/// Allows reading from a buffered byte stream. -/// -/// This trait is an async version of [`std::io::BufRead`]. -/// -/// While it is currently not possible to implement this trait directly, it gets implemented -/// automatically for all types that implement [`futures::io::AsyncBufRead`]. -/// -/// [`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html -/// [`futures::io::AsyncBufRead`]: -/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncBufRead.html -pub trait BufRead { - /// Tells this buffer that `amt` bytes have been consumed from the buffer, so they should no - /// longer be returned in calls to `read`. - fn consume(&mut self, amt: usize) - where - Self: Unpin; + impl BufRead for &mut T { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + unreachable!() + } - /// Returns the contents of the internal buffer, filling it with more data from the inner - /// reader if it is empty. - /// - /// This function is a lower-level call. It needs to be paired with the [`consume`] method to - /// function properly. When calling this method, none of the contents will be "read" in the - /// sense that later calling `read` may return the same contents. As such, [`consume`] must be - /// called with the number of bytes that are consumed from this buffer to ensure that the bytes - /// are never returned twice. - /// - /// [`consume`]: #tymethod.consume - /// - /// An empty buffer returned indicates that the stream has reached EOF. - // TODO: write a proper doctest with `consume` - fn fill_buf<'a>(&'a mut self) -> ret!('a, FillBufFuture, io::Result<&'a [u8]>) - where - Self: Unpin, - { - FillBufFuture::new(self) + fn consume(self: Pin<&mut Self>, amt: usize) { + unreachable!() + } + } + + impl

BufRead for Pin

+ where + P: DerefMut + Unpin, +

::Target: BufRead, + { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + unreachable!() + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + unreachable!() + } + } + + impl BufRead for &[u8] { + fn poll_fill_buf( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + unreachable!() + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + unreachable!() + } + } + } else { + pub use futures_io::AsyncBufRead as BufRead; } +} - /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached. - /// - /// This function will read bytes from the underlying stream until the delimiter or EOF is - /// found. Once found, all bytes up to, and including, the delimiter (if found) will be - /// appended to `buf`. - /// - /// If successful, this function will return the total number of bytes read. - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::io::BufReader; - /// use async_std::prelude::*; - /// - /// let mut file = BufReader::new(File::open("a.txt").await?); - /// - /// let mut buf = Vec::with_capacity(1024); - /// let n = file.read_until(b'\n', &mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - /// - /// Multiple successful calls to `read_until` append all bytes up to and including to `buf`: - /// ``` - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::io::BufReader; - /// use async_std::prelude::*; - /// - /// let from: &[u8] = b"append\nexample\n"; - /// let mut reader = BufReader::new(from); - /// let mut buf = vec![]; - /// - /// let mut size = reader.read_until(b'\n', &mut buf).await?; - /// assert_eq!(size, 7); - /// assert_eq!(buf, b"append\n"); - /// - /// size += reader.read_until(b'\n', &mut buf).await?; - /// assert_eq!(size, from.len()); - /// - /// assert_eq!(buf, from); - /// # - /// # Ok(()) }) } - /// ``` - fn read_until<'a>( - &'a mut self, - byte: u8, - buf: &'a mut Vec, - ) -> ret!('a, ReadUntilFuture, io::Result) +#[doc(hidden)] +pub trait BufReadExt: futures_io::AsyncBufRead { + fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec) -> ReadUntilFuture<'a, Self> where Self: Unpin, { @@ -132,44 +274,7 @@ pub trait BufRead { } } - /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is reached. - /// - /// This function will read bytes from the underlying stream until the newline delimiter (the - /// 0xA byte) or EOF is found. Once found, all bytes up to, and including, the delimiter (if - /// found) will be appended to `buf`. - /// - /// If successful, this function will return the total number of bytes read. - /// - /// If this function returns `Ok(0)`, the stream has reached EOF. - /// - /// # Errors - /// - /// This function has the same error semantics as [`read_until`] and will also return an error - /// if the read bytes are not valid UTF-8. If an I/O error is encountered then `buf` may - /// contain some bytes already read in the event that all data read so far was valid UTF-8. - /// - /// [`read_until`]: #method.read_until - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::io::BufReader; - /// use async_std::prelude::*; - /// - /// let mut file = BufReader::new(File::open("a.txt").await?); - /// - /// let mut buf = String::new(); - /// file.read_line(&mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - fn read_line<'a>( - &'a mut self, - buf: &'a mut String, - ) -> ret!('a, ReadLineFuture, io::Result) + fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'a, Self> where Self: Unpin, { @@ -181,35 +286,6 @@ pub trait BufRead { } } - /// Returns a stream over the lines of this byte stream. - /// - /// The stream returned from this function will yield instances of - /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte (the - /// 0xA byte) or CRLF (0xD, 0xA bytes) at the end. - /// - /// [`io::Result`]: type.Result.html - /// [`String`]: https://doc.rust-lang.org/std/string/struct.String.html - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::io::BufReader; - /// use async_std::prelude::*; - /// - /// let file = File::open("a.txt").await?; - /// let mut lines = BufReader::new(file).lines(); - /// let mut count = 0; - /// - /// while let Some(line) = lines.next().await { - /// line?; - /// count += 1; - /// } - /// # - /// # Ok(()) }) } - /// ``` fn lines(self) -> Lines where Self: Unpin + Sized, @@ -223,13 +299,9 @@ pub trait BufRead { } } -impl BufRead for T { - fn consume(&mut self, amt: usize) { - AsyncBufRead::consume(Pin::new(self), amt) - } -} +impl BufReadExt for T {} -pub fn read_until_internal( +pub fn read_until_internal( mut reader: Pin<&mut R>, cx: &mut Context<'_>, byte: u8, diff --git a/src/io/buf_read/read_line.rs b/src/io/buf_read/read_line.rs index 7424637..29866be 100644 --- a/src/io/buf_read/read_line.rs +++ b/src/io/buf_read/read_line.rs @@ -2,11 +2,9 @@ use std::mem; use std::pin::Pin; use std::str; -use futures_io::AsyncBufRead; - use super::read_until_internal; use crate::future::Future; -use crate::io; +use crate::io::{self, BufRead}; use crate::task::{Context, Poll}; #[doc(hidden)] @@ -18,7 +16,7 @@ pub struct ReadLineFuture<'a, T: Unpin + ?Sized> { pub(crate) read: usize, } -impl Future for ReadLineFuture<'_, T> { +impl Future for ReadLineFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/buf_read/read_until.rs b/src/io/buf_read/read_until.rs index c57a820..72385ab 100644 --- a/src/io/buf_read/read_until.rs +++ b/src/io/buf_read/read_until.rs @@ -1,10 +1,8 @@ use std::pin::Pin; -use futures_io::AsyncBufRead; - use super::read_until_internal; use crate::future::Future; -use crate::io; +use crate::io::{self, BufRead}; use crate::task::{Context, Poll}; #[doc(hidden)] @@ -16,7 +14,7 @@ pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> { pub(crate) read: usize, } -impl Future for ReadUntilFuture<'_, T> { +impl Future for ReadUntilFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 6329a1c..13cb4cc 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -2,9 +2,7 @@ use std::io::{IoSliceMut, Read as _}; use std::pin::Pin; use std::{cmp, fmt}; -use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; - -use crate::io::{self, SeekFrom}; +use crate::io::{self, BufRead, Read, Seek, SeekFrom}; use crate::task::{Context, Poll}; const DEFAULT_CAPACITY: usize = 8 * 1024; @@ -193,7 +191,7 @@ impl BufReader { } } -impl AsyncRead for BufReader { +impl Read for BufReader { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -229,14 +227,9 @@ impl AsyncRead for BufReader { self.consume(nread); Poll::Ready(Ok(nread)) } - - // we can't skip unconditionally because of the large buffer case in read. - unsafe fn initializer(&self) -> Initializer { - self.inner.initializer() - } } -impl AsyncBufRead for BufReader { +impl BufRead for BufReader { fn poll_fill_buf<'a>( self: Pin<&'a mut Self>, cx: &mut Context<'_>, @@ -278,7 +271,7 @@ impl fmt::Debug for BufReader { } } -impl AsyncSeek for BufReader { +impl Seek for BufReader { /// Seeks to an offset, in bytes, in the underlying reader. /// /// The position used for seeking with `SeekFrom::Current(_)` is the position the underlying diff --git a/src/io/copy.rs b/src/io/copy.rs index d046b4b..3840d2a 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -1,9 +1,7 @@ use std::pin::Pin; -use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; - use crate::future::Future; -use crate::io::{self, BufReader}; +use crate::io::{self, BufRead, BufReader, Read, Write}; use crate::task::{Context, Poll}; /// Copies the entire contents of a reader into a writer. @@ -45,8 +43,8 @@ use crate::task::{Context, Poll}; /// ``` pub async fn copy(reader: &mut R, writer: &mut W) -> io::Result where - R: AsyncRead + Unpin + ?Sized, - W: AsyncWrite + Unpin + ?Sized, + R: Read + Unpin + ?Sized, + W: Write + Unpin + ?Sized, { pub struct CopyFuture<'a, R, W: ?Sized> { reader: R, @@ -69,8 +67,8 @@ where impl Future for CopyFuture<'_, R, W> where - R: AsyncBufRead, - W: AsyncWrite + Unpin + ?Sized, + R: BufRead, + W: Write + Unpin + ?Sized, { type Output = io::Result; diff --git a/src/io/cursor.rs b/src/io/cursor.rs index c890c2f..8975840 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs @@ -1,8 +1,7 @@ -use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite}; - -use std::io::{self, IoSlice, IoSliceMut, SeekFrom}; use std::pin::Pin; -use std::task::{Context, Poll}; + +use crate::io::{self, IoSlice, IoSliceMut, Seek, SeekFrom, BufRead, Read, Write}; +use crate::task::{Context, Poll}; /// A `Cursor` wraps an in-memory buffer and provides it with a /// [`Seek`] implementation. @@ -153,7 +152,7 @@ impl Cursor { } } -impl AsyncSeek for Cursor +impl Seek for Cursor where T: AsRef<[u8]> + Unpin, { @@ -162,11 +161,11 @@ where _: &mut Context<'_>, pos: SeekFrom, ) -> Poll> { - Poll::Ready(io::Seek::seek(&mut self.inner, pos)) + Poll::Ready(std::io::Seek::seek(&mut self.inner, pos)) } } -impl AsyncRead for Cursor +impl Read for Cursor where T: AsRef<[u8]> + Unpin, { @@ -175,7 +174,7 @@ where _cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - Poll::Ready(io::Read::read(&mut self.inner, buf)) + Poll::Ready(std::io::Read::read(&mut self.inner, buf)) } fn poll_read_vectored( @@ -183,30 +182,30 @@ where _: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll> { - Poll::Ready(io::Read::read_vectored(&mut self.inner, bufs)) + Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs)) } } -impl AsyncBufRead for Cursor +impl BufRead for Cursor where T: AsRef<[u8]> + Unpin, { fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(io::BufRead::fill_buf(&mut self.get_mut().inner)) + Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner)) } fn consume(mut self: Pin<&mut Self>, amt: usize) { - io::BufRead::consume(&mut self.inner, amt) + std::io::BufRead::consume(&mut self.inner, amt) } } -impl AsyncWrite for Cursor<&mut [u8]> { +impl Write for Cursor<&mut [u8]> { fn poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Poll::Ready(io::Write::write(&mut self.inner, buf)) + Poll::Ready(std::io::Write::write(&mut self.inner, buf)) } fn poll_write_vectored( @@ -214,11 +213,11 @@ impl AsyncWrite for Cursor<&mut [u8]> { _: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll> { - Poll::Ready(io::Write::write_vectored(&mut self.inner, bufs)) + Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs)) } fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(io::Write::flush(&mut self.inner)) + Poll::Ready(std::io::Write::flush(&mut self.inner)) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -226,13 +225,13 @@ impl AsyncWrite for Cursor<&mut [u8]> { } } -impl AsyncWrite for Cursor<&mut Vec> { +impl Write for Cursor<&mut Vec> { fn poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Poll::Ready(io::Write::write(&mut self.inner, buf)) + Poll::Ready(std::io::Write::write(&mut self.inner, buf)) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -240,17 +239,17 @@ impl AsyncWrite for Cursor<&mut Vec> { } fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(io::Write::flush(&mut self.inner)) + Poll::Ready(std::io::Write::flush(&mut self.inner)) } } -impl AsyncWrite for Cursor> { +impl Write for Cursor> { fn poll_write( mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8], ) -> Poll> { - Poll::Ready(io::Write::write(&mut self.inner, buf)) + Poll::Ready(std::io::Write::write(&mut self.inner, buf)) } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -258,6 +257,6 @@ impl AsyncWrite for Cursor> { } fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(io::Write::flush(&mut self.inner)) + Poll::Ready(std::io::Write::flush(&mut self.inner)) } } diff --git a/src/io/empty.rs b/src/io/empty.rs index 2668dcc..d8d768e 100644 --- a/src/io/empty.rs +++ b/src/io/empty.rs @@ -1,9 +1,7 @@ use std::fmt; use std::pin::Pin; -use futures_io::{AsyncBufRead, AsyncRead, Initializer}; - -use crate::io; +use crate::io::{self, BufRead, Read}; use crate::task::{Context, Poll}; /// Creates a reader that contains no data. @@ -43,7 +41,7 @@ impl fmt::Debug for Empty { } } -impl AsyncRead for Empty { +impl Read for Empty { #[inline] fn poll_read( self: Pin<&mut Self>, @@ -52,14 +50,9 @@ impl AsyncRead for Empty { ) -> Poll> { Poll::Ready(Ok(0)) } - - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } -impl AsyncBufRead for Empty { +impl BufRead for Empty { #[inline] fn poll_fill_buf<'a>( self: Pin<&'a mut Self>, diff --git a/src/io/mod.rs b/src/io/mod.rs index 97b1499..301cfa5 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -20,25 +20,25 @@ //! # Ok(()) }) } //! ``` -pub mod prelude; - #[doc(inline)] pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; -pub use buf_read::{BufRead, Lines}; +pub use buf_read::{BufRead, BufReadExt, Lines}; pub use buf_reader::BufReader; pub use copy::copy; pub use cursor::Cursor; pub use empty::{empty, Empty}; -pub use read::Read; +pub use read::{Read, ReadExt}; pub use repeat::{repeat, Repeat}; -pub use seek::Seek; +pub use seek::{Seek, SeekExt}; pub use sink::{sink, Sink}; pub use stderr::{stderr, Stderr}; pub use stdin::{stdin, Stdin}; pub use stdout::{stdout, Stdout}; pub use timeout::timeout; -pub use write::Write; +pub use write::{Write, WriteExt}; + +pub mod prelude; mod buf_read; mod buf_reader; diff --git a/src/io/prelude.rs b/src/io/prelude.rs index 490be04..eeb0ced 100644 --- a/src/io/prelude.rs +++ b/src/io/prelude.rs @@ -9,10 +9,19 @@ //! ``` #[doc(no_inline)] -pub use super::BufRead; +pub use crate::io::BufRead; #[doc(no_inline)] -pub use super::Read; +pub use crate::io::Read; #[doc(no_inline)] -pub use super::Seek; +pub use crate::io::Seek; #[doc(no_inline)] -pub use super::Write; +pub use crate::io::Write; + +#[doc(hidden)] +pub use crate::io::BufReadExt as _; +#[doc(hidden)] +pub use crate::io::ReadExt as _; +#[doc(hidden)] +pub use crate::io::SeekExt as _; +#[doc(hidden)] +pub use crate::io::WriteExt as _; diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index bc6671c..1983823 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -10,119 +10,294 @@ use read_to_end::{read_to_end_internal, ReadToEndFuture}; use read_to_string::ReadToStringFuture; use read_vectored::ReadVectoredFuture; -use std::io; use std::mem; use cfg_if::cfg_if; -use futures_io::AsyncRead; + +use crate::io::IoSliceMut; cfg_if! { if #[cfg(feature = "docs")] { + use std::pin::Pin; + use std::ops::{Deref, DerefMut}; + + use crate::io; + use crate::task::{Context, Poll}; + #[doc(hidden)] pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + /// Allows reading from a byte stream. + /// + /// This trait is a re-export of [`futures::io::AsyncRead`] and is an async version of + /// [`std::io::Read`]. + /// + /// Methods other than [`poll_read`] and [`poll_read_vectored`] do not really exist in the + /// trait itself, but they become available when the prelude is imported: + /// + /// ``` + /// # #[allow(unused_imports)] + /// use async_std::prelude::*; + /// ``` + /// + /// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html + /// [`futures::io::AsyncRead`]: + /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncRead.html + /// [`poll_read`]: #tymethod.poll_read + /// [`poll_read_vectored`]: #method.poll_read_vectored + pub trait Read { + /// Attempt to read from the `AsyncRead` into `buf`. + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll>; + + /// Attempt to read from the `AsyncRead` into `bufs` using vectored IO operations. + fn poll_read_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + unreachable!() + } + + /// Reads some bytes from the byte stream. + /// + /// Returns the number of bytes read from the start of the buffer. + /// + /// If the return value is `Ok(n)`, then it must be guaranteed that + /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been + /// filled in with `n` bytes of data. If `n` is `0`, then it can indicate one of two + /// scenarios: + /// + /// 1. This reader has reached its "end of file" and will likely no longer be able to + /// produce bytes. Note that this does not mean that the reader will always no + /// longer be able to produce bytes. + /// 2. The buffer specified was 0 bytes in length. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::prelude::*; + /// + /// let mut file = File::open("a.txt").await?; + /// + /// let mut buf = vec![0; 1024]; + /// let n = file.read(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ImplFuture<'a, io::Result> + where + Self: Unpin + { + unreachable!() + } + + /// Like [`read`], except that it reads into a slice of buffers. + /// + /// Data is copied to fill each buffer in order, with the final buffer written to + /// possibly being only partially filled. This method must behave as a single call to + /// [`read`] with the buffers concatenated would. + /// + /// The default implementation calls [`read`] with either the first nonempty buffer + /// provided, or an empty one if none exists. + /// + /// [`read`]: #tymethod.read + fn read_vectored<'a>( + &'a mut self, + bufs: &'a mut [IoSliceMut<'a>], + ) -> ImplFuture<'a, io::Result> + where + Self: Unpin, + { + unreachable!() + } + + /// Reads all bytes from the byte stream. + /// + /// All bytes read from this stream will be appended to the specified buffer `buf`. + /// This function will continuously call [`read`] to append more data to `buf` until + /// [`read`] returns either `Ok(0)` or an error. + /// + /// If successful, this function will return the total number of bytes read. + /// + /// [`read`]: #tymethod.read + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::prelude::*; + /// + /// let mut file = File::open("a.txt").await?; + /// + /// let mut buf = Vec::new(); + /// file.read_to_end(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_to_end<'a>( + &'a mut self, + buf: &'a mut Vec, + ) -> ImplFuture<'a, io::Result> + where + Self: Unpin, + { + unreachable!() + } + + /// Reads all bytes from the byte stream and appends them into a string. + /// + /// If successful, this function will return the number of bytes read. + /// + /// If the data in this stream is not valid UTF-8 then an error will be returned and + /// `buf` will be left unmodified. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::prelude::*; + /// + /// let mut file = File::open("a.txt").await?; + /// + /// let mut buf = String::new(); + /// file.read_to_string(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_to_string<'a>( + &'a mut self, + buf: &'a mut String, + ) -> ImplFuture<'a, io::Result> + where + Self: Unpin, + { + unreachable!() + } + + /// Reads the exact number of bytes required to fill `buf`. + /// + /// This function reads as many bytes as necessary to completely fill the specified + /// buffer `buf`. + /// + /// No guarantees are provided about the contents of `buf` when this function is + /// called, implementations cannot rely on any property of the contents of `buf` being + /// true. It is recommended that implementations only write data to `buf` instead of + /// reading its contents. + /// + /// If this function encounters an "end of file" before completely filling the buffer, + /// it returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of + /// `buf` are unspecified in this case. + /// + /// If any other read error is encountered then this function immediately returns. The + /// contents of `buf` are unspecified in this case. + /// + /// If this function returns an error, it is unspecified how many bytes it has read, + /// but it will never read more than would be necessary to completely fill the buffer. + /// + /// [`ErrorKind::UnexpectedEof`]: enum.ErrorKind.html#variant.UnexpectedEof + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::prelude::*; + /// + /// let mut file = File::open("a.txt").await?; + /// + /// let mut buf = vec![0; 10]; + /// file.read_exact(&mut buf).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ImplFuture<'a, io::Result<()>> + where + Self: Unpin, + { + unreachable!() + } } - } else { - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + + impl Read for Box { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + unreachable!() + } + } + + impl Read for &mut T { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + unreachable!() + } + } + + impl

Read for Pin

+ where + P: DerefMut + Unpin, +

::Target: Read, + { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + unreachable!() + } + } + + impl Read for &[u8] { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + unreachable!() + } } + } else { + pub use futures_io::AsyncRead as Read; } } -/// Allows reading from a byte stream. -/// -/// This trait is an async version of [`std::io::Read`]. -/// -/// While it is currently not possible to implement this trait directly, it gets implemented -/// automatically for all types that implement [`futures::io::AsyncRead`]. -/// -/// [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html -/// [`futures::io::AsyncRead`]: -/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncRead.html -pub trait Read { - /// Reads some bytes from the byte stream. - /// - /// Returns the number of bytes read from the start of the buffer. - /// - /// If the return value is `Ok(n)`, then it must be guaranteed that `0 <= n <= buf.len()`. A - /// nonzero `n` value indicates that the buffer has been filled in with `n` bytes of data. If - /// `n` is `0`, then it can indicate one of two scenarios: - /// - /// 1. This reader has reached its "end of file" and will likely no longer be able to produce - /// bytes. Note that this does not mean that the reader will always no longer be able to - /// produce bytes. - /// 2. The buffer specified was 0 bytes in length. - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::prelude::*; - /// - /// let mut file = File::open("a.txt").await?; - /// - /// let mut buf = vec![0; 1024]; - /// let n = file.read(&mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result) +#[doc(hidden)] +pub trait ReadExt: futures_io::AsyncRead { + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self> where - Self: Unpin; - - /// Like [`read`], except that it reads into a slice of buffers. - /// - /// Data is copied to fill each buffer in order, with the final buffer written to possibly - /// being only partially filled. This method must behave as a single call to [`read`] with the - /// buffers concatenated would. - /// - /// The default implementation calls [`read`] with either the first nonempty buffer provided, - /// or an empty one if none exists. - /// - /// [`read`]: #tymethod.read + Self: Unpin, + { + ReadFuture { reader: self, buf } + } + fn read_vectored<'a>( &'a mut self, - bufs: &'a mut [io::IoSliceMut<'a>], - ) -> ret!('a, ReadVectoredFuture, io::Result) + bufs: &'a mut [IoSliceMut<'a>], + ) -> ReadVectoredFuture<'a, Self> where Self: Unpin, { ReadVectoredFuture { reader: self, bufs } } - /// Reads all bytes from the byte stream. - /// - /// All bytes read from this stream will be appended to the specified buffer `buf`. This - /// function will continuously call [`read`] to append more data to `buf` until [`read`] - /// returns either `Ok(0)` or an error. - /// - /// If successful, this function will return the total number of bytes read. - /// - /// [`read`]: #tymethod.read - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::prelude::*; - /// - /// let mut file = File::open("a.txt").await?; - /// - /// let mut buf = Vec::new(); - /// file.read_to_end(&mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - fn read_to_end<'a>( - &'a mut self, - buf: &'a mut Vec, - ) -> ret!('a, ReadToEndFuture, io::Result) + fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec) -> ReadToEndFuture<'a, Self> where Self: Unpin, { @@ -134,32 +309,7 @@ pub trait Read { } } - /// Reads all bytes from the byte stream and appends them into a string. - /// - /// If successful, this function will return the number of bytes read. - /// - /// If the data in this stream is not valid UTF-8 then an error will be returned and `buf` will - /// be left unmodified. - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::prelude::*; - /// - /// let mut file = File::open("a.txt").await?; - /// - /// let mut buf = String::new(); - /// file.read_to_string(&mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - fn read_to_string<'a>( - &'a mut self, - buf: &'a mut String, - ) -> ret!('a, ReadToStringFuture, io::Result) + fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self> where Self: Unpin, { @@ -172,43 +322,7 @@ pub trait Read { } } - /// Reads the exact number of bytes required to fill `buf`. - /// - /// This function reads as many bytes as necessary to completely fill the specified buffer - /// `buf`. - /// - /// No guarantees are provided about the contents of `buf` when this function is called, - /// implementations cannot rely on any property of the contents of `buf` being true. It is - /// recommended that implementations only write data to `buf` instead of reading its contents. - /// - /// If this function encounters an "end of file" before completely filling the buffer, it - /// returns an error of the kind [`ErrorKind::UnexpectedEof`]. The contents of `buf` are - /// unspecified in this case. - /// - /// If any other read error is encountered then this function immediately returns. The contents - /// of `buf` are unspecified in this case. - /// - /// If this function returns an error, it is unspecified how many bytes it has read, but it - /// will never read more than would be necessary to completely fill the buffer. - /// - /// [`ErrorKind::UnexpectedEof`]: enum.ErrorKind.html#variant.UnexpectedEof - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::prelude::*; - /// - /// let mut file = File::open("a.txt").await?; - /// - /// let mut buf = vec![0; 10]; - /// file.read_exact(&mut buf).await?; - /// # - /// # Ok(()) }) } - /// ``` - fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadExactFuture, io::Result<()>) + fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self> where Self: Unpin, { @@ -216,8 +330,4 @@ pub trait Read { } } -impl Read for T { - fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ret!('a, ReadFuture, io::Result) { - ReadFuture { reader: self, buf } - } -} +impl ReadExt for T {} diff --git a/src/io/read/read.rs b/src/io/read/read.rs index 0f88353..c46aff6 100644 --- a/src/io/read/read.rs +++ b/src/io/read/read.rs @@ -1,10 +1,8 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; use std::pin::Pin; -use futures_io::AsyncRead; +use crate::future::Future; +use crate::io::{self, Read}; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -13,7 +11,7 @@ pub struct ReadFuture<'a, T: Unpin + ?Sized> { pub(crate) buf: &'a mut [u8], } -impl Future for ReadFuture<'_, T> { +impl Future for ReadFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/read/read_exact.rs b/src/io/read/read_exact.rs index ffc3600..d6f0218 100644 --- a/src/io/read/read_exact.rs +++ b/src/io/read/read_exact.rs @@ -1,11 +1,9 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; use std::mem; use std::pin::Pin; -use futures_io::AsyncRead; +use crate::future::Future; +use crate::task::{Context, Poll}; +use crate::io::{self, Read}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -14,7 +12,7 @@ pub struct ReadExactFuture<'a, T: Unpin + ?Sized> { pub(crate) buf: &'a mut [u8], } -impl Future for ReadExactFuture<'_, T> { +impl Future for ReadExactFuture<'_, T> { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/read/read_to_end.rs b/src/io/read/read_to_end.rs index d51f599..331f26f 100644 --- a/src/io/read/read_to_end.rs +++ b/src/io/read/read_to_end.rs @@ -1,10 +1,8 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; use std::pin::Pin; -use futures_io::AsyncRead; +use crate::future::Future; +use crate::io::{self, Read}; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -14,7 +12,7 @@ pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> { pub(crate) start_len: usize, } -impl Future for ReadToEndFuture<'_, T> { +impl Future for ReadToEndFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -36,7 +34,7 @@ impl Future for ReadToEndFuture<'_, T> { // // Because we're extending the buffer with uninitialized data for trusted // readers, we need to make sure to truncate that if any of this panics. -pub fn read_to_end_internal( +pub fn read_to_end_internal( mut rd: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut Vec, diff --git a/src/io/read/read_to_string.rs b/src/io/read/read_to_string.rs index 9a3bced..60773e0 100644 --- a/src/io/read/read_to_string.rs +++ b/src/io/read/read_to_string.rs @@ -1,13 +1,11 @@ -use super::read_to_end_internal; -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; use std::mem; use std::pin::Pin; use std::str; -use futures_io::AsyncRead; +use super::read_to_end_internal; +use crate::future::Future; +use crate::io::{self, Read}; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -18,7 +16,7 @@ pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> { pub(crate) start_len: usize, } -impl Future for ReadToStringFuture<'_, T> { +impl Future for ReadToStringFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/read/read_vectored.rs b/src/io/read/read_vectored.rs index b65b79f..15fecfa 100644 --- a/src/io/read/read_vectored.rs +++ b/src/io/read/read_vectored.rs @@ -1,12 +1,8 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io::IoSliceMut; use std::pin::Pin; -use futures_io::AsyncRead; - -use crate::io; +use crate::io::{self, Read, IoSliceMut}; +use crate::future::Future; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -15,7 +11,7 @@ pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> { pub(crate) bufs: &'a mut [IoSliceMut<'a>], } -impl Future for ReadVectoredFuture<'_, T> { +impl Future for ReadVectoredFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/repeat.rs b/src/io/repeat.rs index 7f8b3a6..a82e21b 100644 --- a/src/io/repeat.rs +++ b/src/io/repeat.rs @@ -1,9 +1,7 @@ use std::fmt; use std::pin::Pin; -use futures_io::{AsyncRead, Initializer}; - -use crate::io; +use crate::io::{self, Read}; use crate::task::{Context, Poll}; /// Creates an instance of a reader that infinitely repeats one byte. @@ -44,7 +42,7 @@ impl fmt::Debug for Repeat { } } -impl AsyncRead for Repeat { +impl Read for Repeat { #[inline] fn poll_read( self: Pin<&mut Self>, @@ -56,9 +54,4 @@ impl AsyncRead for Repeat { } Poll::Ready(Ok(buf.len())) } - - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } diff --git a/src/io/seek.rs b/src/io/seek.rs index b16da75..f2dd4d9 100644 --- a/src/io/seek.rs +++ b/src/io/seek.rs @@ -1,7 +1,6 @@ use std::pin::Pin; use cfg_if::cfg_if; -use futures_io::AsyncSeek; use crate::future::Future; use crate::io::{self, SeekFrom}; @@ -9,63 +8,116 @@ use crate::task::{Context, Poll}; cfg_if! { if #[cfg(feature = "docs")] { + use std::ops::{Deref, DerefMut}; + #[doc(hidden)] - pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); + pub struct ImplFuture(std::marker::PhantomData); + + /// Allows seeking through a byte stream. + /// + /// This trait is a re-export of [`futures::io::AsyncSeek`] and is an async version of + /// [`std::io::Seek`]. + /// + /// The [provided methods] do not really exist in the trait itself, but they become + /// available when the prelude is imported: + /// + /// ``` + /// # #[allow(unused_imports)] + /// use async_std::prelude::*; + /// ``` + /// + /// [`std::io::Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html + /// [`futures::io::AsyncSeek`]: + /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncSeek.html + /// [provided methods]: #provided-methods + pub trait Seek { + /// Attempt to seek to an offset, in bytes, in a stream. + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll>; - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + /// Seeks to a new position in a byte stream. + /// + /// Returns the new position in the byte stream. + /// + /// A seek beyond the end of stream is allowed, but behavior is defined by the + /// implementation. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::io::SeekFrom; + /// use async_std::prelude::*; + /// + /// let mut file = File::open("a.txt").await?; + /// + /// let file_len = file.seek(SeekFrom::End(0)).await?; + /// # + /// # Ok(()) }) } + /// ``` + fn seek(&mut self, pos: SeekFrom) -> ImplFuture> + where + Self: Unpin + { + unreachable!() + } } - } else { - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + + impl Seek for Box { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + unreachable!() + } } + + impl Seek for &mut T { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + unreachable!() + } + } + + impl

Seek for Pin

+ where + P: DerefMut + Unpin, +

::Target: Seek, + { + fn poll_seek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + unreachable!() + } + } + } else { + pub use futures_io::AsyncSeek as Seek; } } -/// Allows seeking through a byte stream. -/// -/// This trait is an async version of [`std::io::Seek`]. -/// -/// While it is currently not possible to implement this trait directly, it gets implemented -/// automatically for all types that implement [`futures::io::AsyncSeek`]. -/// -/// [`std::io::Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html -/// [`futures::io::AsyncSeek`]: -/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncSeek.html -pub trait Seek { - /// Seeks to a new position in a byte stream. - /// - /// Returns the new position in the byte stream. - /// - /// A seek beyond the end of stream is allowed, but behavior is defined by the - /// implementation. - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::io::SeekFrom; - /// use async_std::prelude::*; - /// - /// let mut file = File::open("a.txt").await?; - /// - /// let file_len = file.seek(SeekFrom::End(0)).await?; - /// # - /// # Ok(()) }) } - /// ``` - fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result) +#[doc(hidden)] +pub trait SeekExt: futures_io::AsyncSeek { + fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self> where - Self: Unpin; -} - -impl Seek for T { - fn seek(&mut self, pos: SeekFrom) -> ret!('_, SeekFuture, io::Result) { + Self: Unpin, + { SeekFuture { seeker: self, pos } } } +impl SeekExt for T {} + #[doc(hidden)] #[allow(missing_debug_implementations)] pub struct SeekFuture<'a, T: Unpin + ?Sized> { @@ -73,7 +125,7 @@ pub struct SeekFuture<'a, T: Unpin + ?Sized> { pos: SeekFrom, } -impl Future for SeekFuture<'_, T> { +impl Future for SeekFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/sink.rs b/src/io/sink.rs index 071f6ed..faa763c 100644 --- a/src/io/sink.rs +++ b/src/io/sink.rs @@ -1,9 +1,7 @@ use std::fmt; use std::pin::Pin; -use futures_io::AsyncWrite; - -use crate::io; +use crate::io::{self, Write}; use crate::task::{Context, Poll}; /// Creates a writer that consumes and drops all data. @@ -40,7 +38,7 @@ impl fmt::Debug for Sink { } } -impl AsyncWrite for Sink { +impl Write for Sink { #[inline] fn poll_write( self: Pin<&mut Self>, diff --git a/src/io/stderr.rs b/src/io/stderr.rs index bb2318f..64e0b18 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -1,11 +1,10 @@ -use std::io; use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; -use futures_io::AsyncWrite; use crate::future::Future; +use crate::io::{self, Write}; use crate::task::{blocking, Context, Poll}; /// Constructs a new handle to the standard error of the current process. @@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll}; /// ``` pub fn stderr() -> Stderr { Stderr(Mutex::new(State::Idle(Some(Inner { - stderr: io::stderr(), + stderr: std::io::stderr(), buf: Vec::new(), last_op: None, })))) @@ -64,7 +63,7 @@ enum State { #[derive(Debug)] struct Inner { /// The blocking stderr handle. - stderr: io::Stderr, + stderr: std::io::Stderr, /// The write buffer. buf: Vec, @@ -80,7 +79,7 @@ enum Operation { Flush(io::Result<()>), } -impl AsyncWrite for Stderr { +impl Write for Stderr { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -118,7 +117,7 @@ impl AsyncWrite for Stderr { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Write::write(&mut inner.stderr, &mut inner.buf); + let res = std::io::Write::write(&mut inner.stderr, &mut inner.buf); inner.last_op = Some(Operation::Write(res)); State::Idle(Some(inner)) })); @@ -146,7 +145,7 @@ impl AsyncWrite for Stderr { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Write::flush(&mut inner.stderr); + let res = std::io::Write::flush(&mut inner.stderr); inner.last_op = Some(Operation::Flush(res)); State::Idle(Some(inner)) })); @@ -179,7 +178,7 @@ cfg_if! { if #[cfg(any(unix, feature = "docs"))] { impl AsRawFd for Stderr { fn as_raw_fd(&self) -> RawFd { - io::stderr().as_raw_fd() + std::io::stderr().as_raw_fd() } } } @@ -190,7 +189,7 @@ cfg_if! { if #[cfg(any(windows, feature = "docs"))] { impl AsRawHandle for Stderr { fn as_raw_handle(&self) -> RawHandle { - io::stderr().as_raw_handle() + std::io::stderr().as_raw_handle() } } } diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 9fe432d..cbf7fc4 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -1,10 +1,9 @@ -use std::io; use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; -use futures_io::{AsyncRead, Initializer}; +use crate::io::{self, Read}; use crate::future::{self, Future}; use crate::task::{blocking, Context, Poll}; @@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll}; /// ``` pub fn stdin() -> Stdin { Stdin(Mutex::new(State::Idle(Some(Inner { - stdin: io::stdin(), + stdin: std::io::stdin(), line: String::new(), buf: Vec::new(), last_op: None, @@ -65,7 +64,7 @@ enum State { #[derive(Debug)] struct Inner { /// The blocking stdin handle. - stdin: io::Stdin, + stdin: std::io::Stdin, /// The line buffer. line: String, @@ -137,7 +136,7 @@ impl Stdin { } } -impl AsyncRead for Stdin { +impl Read for Stdin { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -174,7 +173,7 @@ impl AsyncRead for Stdin { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Read::read(&mut inner.stdin, &mut inner.buf); + let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf); inner.last_op = Some(Operation::Read(res)); State::Idle(Some(inner)) })); @@ -185,11 +184,6 @@ impl AsyncRead for Stdin { } } } - - #[inline] - unsafe fn initializer(&self) -> Initializer { - Initializer::nop() - } } cfg_if! { @@ -208,7 +202,7 @@ cfg_if! { if #[cfg(any(unix, feature = "docs"))] { impl AsRawFd for Stdin { fn as_raw_fd(&self) -> RawFd { - io::stdin().as_raw_fd() + std::io::stdin().as_raw_fd() } } } @@ -219,7 +213,7 @@ cfg_if! { if #[cfg(any(windows, feature = "docs"))] { impl AsRawHandle for Stdin { fn as_raw_handle(&self) -> RawHandle { - io::stdin().as_raw_handle() + std::io::stdin().as_raw_handle() } } } diff --git a/src/io/stdout.rs b/src/io/stdout.rs index f62f3df..b5fcccf 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -1,12 +1,11 @@ -use std::io; use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; -use futures_io::AsyncWrite; use crate::future::Future; use crate::task::{blocking, Context, Poll}; +use crate::io::{self, Write}; /// Constructs a new handle to the standard output of the current process. /// @@ -29,7 +28,7 @@ use crate::task::{blocking, Context, Poll}; /// ``` pub fn stdout() -> Stdout { Stdout(Mutex::new(State::Idle(Some(Inner { - stdout: io::stdout(), + stdout: std::io::stdout(), buf: Vec::new(), last_op: None, })))) @@ -64,7 +63,7 @@ enum State { #[derive(Debug)] struct Inner { /// The blocking stdout handle. - stdout: io::Stdout, + stdout: std::io::Stdout, /// The write buffer. buf: Vec, @@ -80,7 +79,7 @@ enum Operation { Flush(io::Result<()>), } -impl AsyncWrite for Stdout { +impl Write for Stdout { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -118,7 +117,7 @@ impl AsyncWrite for Stdout { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Write::write(&mut inner.stdout, &mut inner.buf); + let res = std::io::Write::write(&mut inner.stdout, &mut inner.buf); inner.last_op = Some(Operation::Write(res)); State::Idle(Some(inner)) })); @@ -146,7 +145,7 @@ impl AsyncWrite for Stdout { // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { - let res = io::Write::flush(&mut inner.stdout); + let res = std::io::Write::flush(&mut inner.stdout); inner.last_op = Some(Operation::Flush(res)); State::Idle(Some(inner)) })); @@ -179,7 +178,7 @@ cfg_if! { if #[cfg(any(unix, feature = "docs"))] { impl AsRawFd for Stdout { fn as_raw_fd(&self) -> RawFd { - io::stdout().as_raw_fd() + std::io::stdout().as_raw_fd() } } } @@ -190,7 +189,7 @@ cfg_if! { if #[cfg(any(windows, feature = "docs"))] { impl AsRawHandle for Stdout { fn as_raw_handle(&self) -> RawHandle { - io::stdout().as_raw_handle() + std::io::stdout().as_raw_handle() } } } diff --git a/src/io/write/flush.rs b/src/io/write/flush.rs index 21ada15..08f2b5b 100644 --- a/src/io/write/flush.rs +++ b/src/io/write/flush.rs @@ -1,10 +1,8 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; use std::pin::Pin; -use futures_io::AsyncWrite; +use crate::future::Future; +use crate::io::{self, Write}; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -12,7 +10,7 @@ pub struct FlushFuture<'a, T: Unpin + ?Sized> { pub(crate) writer: &'a mut T, } -impl Future for FlushFuture<'_, T> { +impl Future for FlushFuture<'_, T> { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/write/mod.rs b/src/io/write/mod.rs index 63a3cd8..f5f1602 100644 --- a/src/io/write/mod.rs +++ b/src/io/write/mod.rs @@ -8,130 +8,277 @@ use write::WriteFuture; use write_all::WriteAllFuture; use write_vectored::WriteVectoredFuture; -use std::io; - use cfg_if::cfg_if; -use futures_io::AsyncWrite; + +use crate::io::IoSlice; cfg_if! { if #[cfg(feature = "docs")] { + use std::pin::Pin; + use std::ops::{Deref, DerefMut}; + + use crate::io; + use crate::task::{Context, Poll}; + #[doc(hidden)] pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); + /// Allows writing to a byte stream. + /// + /// This trait is a re-export of [`futures::io::AsyncWrite`] and is an async version of + /// [`std::io::Write`]. + /// + /// Methods other than [`poll_write`], [`poll_write_vectored`], [`poll_flush`], and + /// [`poll_close`] do not really exist in the trait itself, but they become available when + /// the prelude is imported: + /// + /// ``` + /// # #[allow(unused_imports)] + /// use async_std::prelude::*; + /// ``` + /// + /// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html + /// [`futures::io::AsyncWrite`]: + /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncWrite.html + /// [`poll_write`]: #tymethod.poll_write + /// [`poll_write_vectored`]: #method.poll_write_vectored + /// [`poll_flush`]: #tymethod.poll_flush + /// [`poll_close`]: #tymethod.poll_close + pub trait Write { + /// Attempt to write bytes from `buf` into the object. + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll>; + + /// Attempt to write bytes from `bufs` into the object using vectored + /// IO operations. + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[IoSlice<'_>] + ) -> Poll> { + unreachable!() + } + + /// Attempt to flush the object, ensuring that any buffered data reach + /// their destination. + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Attempt to close the object. + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Writes some bytes into the byte stream. + /// + /// Returns the number of bytes written from the start of the buffer. + /// + /// If the return value is `Ok(n)` then it must be guaranteed that + /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying + /// object is no longer able to accept bytes and will likely not be able to in the + /// future as well, or that the buffer provided is empty. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::prelude::*; + /// + /// let mut file = File::create("a.txt").await?; + /// + /// let n = file.write(b"hello world").await?; + /// # + /// # Ok(()) }) } + /// ``` + fn write<'a>(&'a mut self, buf: &'a [u8]) -> ImplFuture<'a, io::Result> + where + Self: Unpin, + { + unreachable!() + } + + /// Flushes the stream to ensure that all buffered contents reach their destination. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::prelude::*; + /// + /// let mut file = File::create("a.txt").await?; + /// + /// file.write_all(b"hello world").await?; + /// file.flush().await?; + /// # + /// # Ok(()) }) } + /// ``` + fn flush(&mut self) -> ImplFuture<'_, io::Result<()>> + where + Self: Unpin, + { + unreachable!() + } + + /// Like [`write`], except that it writes from a slice of buffers. + /// + /// Data is copied from each buffer in order, with the final buffer read from possibly + /// being only partially consumed. This method must behave as a call to [`write`] with + /// the buffers concatenated would. + /// + /// The default implementation calls [`write`] with either the first nonempty buffer + /// provided, or an empty one if none exists. + /// + /// [`write`]: #tymethod.write + fn write_vectored<'a>( + &'a mut self, + bufs: &'a [IoSlice<'a>], + ) -> ImplFuture<'a, io::Result> + where + Self: Unpin, + { + unreachable!() + } + + /// Writes an entire buffer into the byte stream. + /// + /// This method will continuously call [`write`] until there is no more data to be + /// written or an error is returned. This method will not return until the entire + /// buffer has been successfully written or such an error occurs. + /// + /// [`write`]: #tymethod.write + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::fs::File; + /// use async_std::prelude::*; + /// + /// let mut file = File::create("a.txt").await?; + /// + /// file.write_all(b"hello world").await?; + /// # + /// # Ok(()) }) } + /// ``` + /// + /// [`write`]: #tymethod.write + fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ImplFuture<'a, io::Result<()>> + where + Self: Unpin, + { + unreachable!() + } } - } else { - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); + + impl Write for Box { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + unreachable!() + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + } + + impl Write for &mut T { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + unreachable!() + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + } + + impl

Write for Pin

+ where + P: DerefMut + Unpin, +

::Target: Write, + { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + unreachable!() + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + } + + impl Write for Vec { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + unreachable!() + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } } + } else { + pub use futures_io::AsyncWrite as Write; } } -/// Allows writing to a byte stream. -/// -/// This trait is an async version of [`std::io::Write`]. -/// -/// While it is currently not possible to implement this trait directly, it gets implemented -/// automatically for all types that implement [`futures::io::AsyncWrite`]. -/// -/// [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html -/// [`futures::io::AsyncWrite`]: -/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/io/trait.AsyncWrite.html -pub trait Write { - /// Writes some bytes into the byte stream. - /// - /// Returns the number of bytes written from the start of the buffer. - /// - /// If the return value is `Ok(n)` then it must be guaranteed that `0 <= n <= buf.len()`. A - /// return value of `0` typically means that the underlying object is no longer able to accept - /// bytes and will likely not be able to in the future as well, or that the buffer provided is - /// empty. - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::prelude::*; - /// - /// let mut file = File::create("a.txt").await?; - /// - /// let n = file.write(b"hello world").await?; - /// # - /// # Ok(()) }) } - /// ``` - fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result) +#[doc(hidden)] +pub trait WriteExt: Write { + fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self> where - Self: Unpin; - - /// Flushes the stream to ensure that all buffered contents reach their destination. - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::prelude::*; - /// - /// let mut file = File::create("a.txt").await?; - /// - /// file.write_all(b"hello world").await?; - /// file.flush().await?; - /// # - /// # Ok(()) }) } - /// ``` - fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>) + Self: Unpin, + { + WriteFuture { writer: self, buf } + } + + fn flush(&mut self) -> FlushFuture<'_, Self> where - Self: Unpin; - - /// Like [`write`], except that it writes from a slice of buffers. - /// - /// Data is copied from each buffer in order, with the final buffer read from possibly being - /// only partially consumed. This method must behave as a call to [`write`] with the buffers - /// concatenated would. - /// - /// The default implementation calls [`write`] with either the first nonempty buffer provided, - /// or an empty one if none exists. - /// - /// [`write`]: #tymethod.write - fn write_vectored<'a>( - &'a mut self, - bufs: &'a [io::IoSlice<'a>], - ) -> ret!('a, WriteVectoredFuture, io::Result) + Self: Unpin, + { + FlushFuture { writer: self } + } + + fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self> where Self: Unpin, { WriteVectoredFuture { writer: self, bufs } } - /// Writes an entire buffer into the byte stream. - /// - /// This method will continuously call [`write`] until there is no more data to be written or - /// an error is returned. This method will not return until the entire buffer has been - /// successfully written or such an error occurs. - /// - /// [`write`]: #tymethod.write - /// - /// # Examples - /// - /// ```no_run - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// # - /// use async_std::fs::File; - /// use async_std::prelude::*; - /// - /// let mut file = File::create("a.txt").await?; - /// - /// file.write_all(b"hello world").await?; - /// # - /// # Ok(()) }) } - /// ``` - /// - /// [`write`]: #tymethod.write - fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteAllFuture, io::Result<()>) + fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self> where Self: Unpin, { @@ -139,12 +286,4 @@ pub trait Write { } } -impl Write for T { - fn write<'a>(&'a mut self, buf: &'a [u8]) -> ret!('a, WriteFuture, io::Result) { - WriteFuture { writer: self, buf } - } - - fn flush(&mut self) -> ret!('_, FlushFuture, io::Result<()>) { - FlushFuture { writer: self } - } -} +impl WriteExt for T {} diff --git a/src/io/write/write.rs b/src/io/write/write.rs index 361e6af..da6e5c5 100644 --- a/src/io/write/write.rs +++ b/src/io/write/write.rs @@ -1,10 +1,8 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; use std::pin::Pin; -use futures_io::AsyncWrite; +use crate::future::Future; +use crate::io::{self, Write}; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -13,7 +11,7 @@ pub struct WriteFuture<'a, T: Unpin + ?Sized> { pub(crate) buf: &'a [u8], } -impl Future for WriteFuture<'_, T> { +impl Future for WriteFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/write/write_all.rs b/src/io/write/write_all.rs index d43854a..5353f7a 100644 --- a/src/io/write/write_all.rs +++ b/src/io/write/write_all.rs @@ -1,11 +1,9 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; use std::mem; use std::pin::Pin; -use futures_io::AsyncWrite; +use crate::future::Future; +use crate::io::{self, Write}; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -14,7 +12,7 @@ pub struct WriteAllFuture<'a, T: Unpin + ?Sized> { pub(crate) buf: &'a [u8], } -impl Future for WriteAllFuture<'_, T> { +impl Future for WriteAllFuture<'_, T> { type Output = io::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/io/write/write_vectored.rs b/src/io/write/write_vectored.rs index 0f3e49a..d4e2da7 100644 --- a/src/io/write/write_vectored.rs +++ b/src/io/write/write_vectored.rs @@ -1,11 +1,8 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - -use std::io; -use std::io::IoSlice; use std::pin::Pin; -use futures_io::AsyncWrite; +use crate::future::Future; +use crate::task::{Context, Poll}; +use crate::io::{self, Write, IoSlice}; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -14,7 +11,7 @@ pub struct WriteVectoredFuture<'a, T: Unpin + ?Sized> { pub(crate) bufs: &'a [IoSlice<'a>], } -impl Future for WriteVectoredFuture<'_, T> { +impl Future for WriteVectoredFuture<'_, T> { type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/lib.rs b/src/lib.rs index d4ed7c2..31635ef 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,12 @@ //! //! # Features //! -//! Unstable APIs in this crate are available when the `unstable` Cargo feature is enabled: +//! Items marked with +//! unstable +//! are available only when the `unstable` Cargo feature is enabled: //! //! ```toml //! [dependencies.async-std] @@ -58,6 +63,7 @@ cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub mod pin; + mod vec; mod result; } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 4afa574..26e19d7 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -8,6 +8,7 @@ use crate::future::{self, Future}; use crate::io; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; +use crate::stream::Stream; use crate::task::{Context, Poll}; /// A TCP socket server, listening for connections. @@ -190,7 +191,7 @@ impl TcpListener { #[derive(Debug)] pub struct Incoming<'a>(&'a TcpListener); -impl<'a> futures_core::stream::Stream for Incoming<'a> { +impl<'a> Stream for Incoming<'a> { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 7c10602..1056943 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -3,10 +3,9 @@ use std::net::SocketAddr; use std::pin::Pin; use cfg_if::cfg_if; -use futures_io::{AsyncRead, AsyncWrite}; use crate::future; -use crate::io; +use crate::io::{self, Read, Write}; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; use crate::task::blocking; @@ -285,7 +284,7 @@ impl TcpStream { } } -impl AsyncRead for TcpStream { +impl Read for TcpStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -303,7 +302,7 @@ impl AsyncRead for TcpStream { } } -impl AsyncRead for &TcpStream { +impl Read for &TcpStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -313,7 +312,7 @@ impl AsyncRead for &TcpStream { } } -impl AsyncWrite for TcpStream { +impl Write for TcpStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -339,7 +338,7 @@ impl AsyncWrite for TcpStream { } } -impl AsyncWrite for &TcpStream { +impl Write for &TcpStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 28f43eb..ae30b5b 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -6,11 +6,10 @@ use std::net::Shutdown; use std::path::Path; use std::pin::Pin; -use futures_io::{AsyncRead, AsyncWrite}; use mio_uds; use super::SocketAddr; -use crate::io; +use crate::io::{self, Read, Write}; use crate::net::driver::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::task::{blocking, Context, Poll}; @@ -154,7 +153,7 @@ impl UnixStream { } } -impl AsyncRead for UnixStream { +impl Read for UnixStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -164,7 +163,7 @@ impl AsyncRead for UnixStream { } } -impl AsyncRead for &UnixStream { +impl Read for &UnixStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -174,7 +173,7 @@ impl AsyncRead for &UnixStream { } } -impl AsyncWrite for UnixStream { +impl Write for UnixStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -192,7 +191,7 @@ impl AsyncWrite for UnixStream { } } -impl AsyncWrite for &UnixStream { +impl Write for &UnixStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/prelude.rs b/src/prelude.rs index a50e123..7ad894f 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -25,3 +25,14 @@ pub use crate::io::Write as _; pub use crate::stream::Stream; #[doc(no_inline)] pub use crate::task_local; + +#[doc(hidden)] +pub use crate::io::BufReadExt as _; +#[doc(hidden)] +pub use crate::io::ReadExt as _; +#[doc(hidden)] +pub use crate::io::SeekExt as _; +#[doc(hidden)] +pub use crate::io::WriteExt as _; +#[doc(hidden)] +pub use crate::stream::stream::Stream as _; diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 71cf61d..74cc567 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -1,8 +1,9 @@ -use crate::stream::{FromStream, IntoStream, Stream}; - use std::pin::Pin; -impl FromStream> for Result +use crate::prelude::*; +use crate::stream::{FromStream, IntoStream}; + +impl FromStream> for Result where V: FromStream, { @@ -12,9 +13,9 @@ where #[inline] fn from_stream<'a, S: IntoStream>>( stream: S, - ) -> Pin + Send + 'a>> + ) -> Pin + 'a>> where - ::IntoStream: Send + 'a, + ::IntoStream: 'a, { let stream = stream.into_stream(); diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs index d046e3b..984e5c8 100644 --- a/src/stream/from_stream.rs +++ b/src/stream/from_stream.rs @@ -12,7 +12,7 @@ use std::pin::Pin; /// [`IntoStream`]: trait.IntoStream.html #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(any(feature = "unstable", feature = "docs"))] -pub trait FromStream { +pub trait FromStream { /// Creates a value from a stream. /// /// # Examples @@ -24,7 +24,7 @@ pub trait FromStream { /// /// // let _five_fives = async_std::stream::repeat(5).take(5); /// ``` - fn from_stream<'a, S: IntoStream + Send + 'a>( + fn from_stream<'a, S: IntoStream + 'a>( stream: S, - ) -> Pin + Send + 'a>>; + ) -> Pin + 'a>>; } diff --git a/src/stream/into_stream.rs b/src/stream/into_stream.rs index 5e4311e..7d1be4a 100644 --- a/src/stream/into_stream.rs +++ b/src/stream/into_stream.rs @@ -20,13 +20,13 @@ pub trait IntoStream { type Item; /// Which kind of stream are we turning this into? - type IntoStream: Stream + Send; + type IntoStream: Stream; /// Creates a stream from a value. fn into_stream(self) -> Self::IntoStream; } -impl IntoStream for I { +impl IntoStream for I { type Item = I::Item; type IntoStream = I; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 5e17aa5..da79b19 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -26,12 +26,20 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Fuse, Scan, Stream, Take, Zip}; +pub use stream::{Fuse, Scan, Take, Zip}; mod empty; mod once; mod repeat; -mod stream; +pub(crate) mod stream; + +cfg_if! { + if #[cfg(feature = "docs")] { + pub use stream::Stream; + } else { + pub use futures_core::stream::Stream; + } +} cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ddab21..7527deb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -60,8 +60,8 @@ use std::task::{Context, Poll}; use cfg_if::cfg_if; cfg_if! { - if #[cfg(any(feature = "unstable", feature = "docs"))] { - use crate::stream::FromStream; + if #[cfg(feature = "unstable")] { + use crate::future::Future; } } @@ -75,7 +75,7 @@ cfg_if! { ($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => (ImplFuture<$a, $o>); - + ($f:ty, $o:ty) => (ImplFuture<'static, $o>); } } else { macro_rules! ret { @@ -83,36 +83,34 @@ cfg_if! { ($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>); ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => ($f<$a, Self, $t1, $t2, $t3>); + ($f:ty, $o:ty) => ($f); } } } cfg_if! { - if #[cfg(feature = "docs")] { - #[doc(hidden)] - pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>); - - macro_rules! dyn_ret { - ($a:lifetime, $o:ty) => (DynFuture<$a, $o>); - } - } else { - #[allow(unused_macros)] - macro_rules! dyn_ret { - ($a:lifetime, $o:ty) => (Pin + Send + 'a>>) - } + if #[cfg(any(feature = "unstable", feature = "docs"))] { + use crate::stream::FromStream; } } /// An asynchronous stream of values. /// -/// This trait is an async version of [`std::iter::Iterator`]. +/// This trait is a re-export of [`futures::stream::Stream`] and is an async version of +/// [`std::iter::Iterator`]. +/// +/// The [provided methods] do not really exist in the trait itself, but they become available when +/// the prelude is imported: /// -/// While it is currently not possible to implement this trait directly, it gets implemented -/// automatically for all types that implement [`futures::stream::Stream`]. +/// ``` +/// # #[allow(unused_imports)] +/// use async_std::prelude::*; +/// ``` /// /// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html /// [`futures::stream::Stream`]: /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html +/// [provided methods]: #provided-methods pub trait Stream { /// The type of items yielded by this stream. type Item; @@ -345,7 +343,7 @@ pub trait Stream { /// # /// # }) } /// ``` - fn min_by(self, compare: F) -> MinByFuture + fn min_by(self, compare: F) -> ret!(MinByFuture, Self::Item) where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, @@ -555,7 +553,7 @@ pub trait Stream { /// # /// # }) } /// ``` - fn fold(self, init: B, f: F) -> FoldFuture + fn fold(self, init: B, f: F) -> ret!(FoldFuture, Self::Item) where Self: Sized, F: FnMut(B, Self::Item) -> B, @@ -753,13 +751,12 @@ pub trait Stream { /// ``` /// /// [`stream`]: trait.Stream.html#tymethod.next - #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(any(feature = "unstable", feature = "docs"))] - fn collect<'a, B>(self) -> dyn_ret!('a, B) + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] + fn collect<'a, B>(self) -> ret!(Pin + 'a>>, B) where - Self: futures_core::stream::Stream + Sized + Send + 'a, - ::Item: Send, + Self: futures_core::stream::Stream + Sized + 'a, B: FromStream<::Item>, { FromStream::from_stream(self) diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 222022b..cd44788 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -24,9 +24,9 @@ impl Scan { impl Unpin for Scan {} -impl futures_core::stream::Stream for Scan +impl Stream for Scan where - S: Stream, + S: crate::stream::Stream, F: FnMut(&mut St, S::Item) -> Option, { type Item = B; diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 8f7c9ab..706dbfa 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -5,7 +5,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; /// An iterator that iterates two other iterators simultaneously. -pub struct Zip { +pub struct Zip { item_slot: Option, first: A, second: B, @@ -22,7 +22,7 @@ impl fmt::Debug for Zip { impl Unpin for Zip {} -impl Zip { +impl Zip { pub(crate) fn new(first: A, second: B) -> Self { Zip { item_slot: None, diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index f603d0d..ad82797 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -1,14 +1,15 @@ -use crate::stream::{FromStream, IntoStream, Stream}; - use std::pin::Pin; -impl FromStream for Vec { +use crate::prelude::*; +use crate::stream::{FromStream, IntoStream}; + +impl FromStream for Vec { #[inline] fn from_stream<'a, S: IntoStream>( stream: S, - ) -> Pin + Send + 'a>> + ) -> Pin + 'a>> where - ::IntoStream: Send + 'a, + ::IntoStream: 'a, { let stream = stream.into_stream(); From 1fa196812aec1697a55563a1dd4e929ad748ffa8 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 21 Sep 2019 15:05:57 +0200 Subject: [PATCH 02/11] Fix compilation errors around Stream --- src/fs/file.rs | 2 +- src/io/cursor.rs | 2 +- src/io/read/read_exact.rs | 2 +- src/io/read/read_vectored.rs | 2 +- src/io/stdin.rs | 2 +- src/io/stdout.rs | 2 +- src/io/write/write_vectored.rs | 2 +- src/prelude.rs | 4 +--- src/stream/mod.rs | 12 ++---------- src/stream/stream/scan.rs | 4 ++-- src/stream/stream/zip.rs | 8 ++++---- 11 files changed, 16 insertions(+), 26 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index b2a897e..bdf9347 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -12,7 +12,7 @@ use cfg_if::cfg_if; use crate::fs::{Metadata, Permissions}; use crate::future; -use crate::io::{self, Seek, SeekFrom, Read, Write}; +use crate::io::{self, Read, Seek, SeekFrom, Write}; use crate::prelude::*; use crate::task::{self, blocking, Context, Poll, Waker}; diff --git a/src/io/cursor.rs b/src/io/cursor.rs index 8975840..2dfc8a7 100644 --- a/src/io/cursor.rs +++ b/src/io/cursor.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use crate::io::{self, IoSlice, IoSliceMut, Seek, SeekFrom, BufRead, Read, Write}; +use crate::io::{self, BufRead, IoSlice, IoSliceMut, Read, Seek, SeekFrom, Write}; use crate::task::{Context, Poll}; /// A `Cursor` wraps an in-memory buffer and provides it with a diff --git a/src/io/read/read_exact.rs b/src/io/read/read_exact.rs index d6f0218..c970f43 100644 --- a/src/io/read/read_exact.rs +++ b/src/io/read/read_exact.rs @@ -2,8 +2,8 @@ use std::mem; use std::pin::Pin; use crate::future::Future; -use crate::task::{Context, Poll}; use crate::io::{self, Read}; +use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] diff --git a/src/io/read/read_vectored.rs b/src/io/read/read_vectored.rs index 15fecfa..8e52ba2 100644 --- a/src/io/read/read_vectored.rs +++ b/src/io/read/read_vectored.rs @@ -1,7 +1,7 @@ use std::pin::Pin; -use crate::io::{self, Read, IoSliceMut}; use crate::future::Future; +use crate::io::{self, IoSliceMut, Read}; use crate::task::{Context, Poll}; #[doc(hidden)] diff --git a/src/io/stdin.rs b/src/io/stdin.rs index cbf7fc4..d2e9ec0 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -3,8 +3,8 @@ use std::sync::Mutex; use cfg_if::cfg_if; -use crate::io::{self, Read}; use crate::future::{self, Future}; +use crate::io::{self, Read}; use crate::task::{blocking, Context, Poll}; /// Constructs a new handle to the standard input of the current process. diff --git a/src/io/stdout.rs b/src/io/stdout.rs index b5fcccf..b7fee70 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -4,8 +4,8 @@ use std::sync::Mutex; use cfg_if::cfg_if; use crate::future::Future; -use crate::task::{blocking, Context, Poll}; use crate::io::{self, Write}; +use crate::task::{blocking, Context, Poll}; /// Constructs a new handle to the standard output of the current process. /// diff --git a/src/io/write/write_vectored.rs b/src/io/write/write_vectored.rs index d4e2da7..5f8492b 100644 --- a/src/io/write/write_vectored.rs +++ b/src/io/write/write_vectored.rs @@ -1,8 +1,8 @@ use std::pin::Pin; use crate::future::Future; +use crate::io::{self, IoSlice, Write}; use crate::task::{Context, Poll}; -use crate::io::{self, Write, IoSlice}; #[doc(hidden)] #[allow(missing_debug_implementations)] diff --git a/src/prelude.rs b/src/prelude.rs index 7ad894f..d28ec64 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -21,7 +21,7 @@ pub use crate::io::Read as _; pub use crate::io::Seek as _; #[doc(no_inline)] pub use crate::io::Write as _; -#[doc(no_inline)] +#[doc(hidden)] pub use crate::stream::Stream; #[doc(no_inline)] pub use crate::task_local; @@ -34,5 +34,3 @@ pub use crate::io::ReadExt as _; pub use crate::io::SeekExt as _; #[doc(hidden)] pub use crate::io::WriteExt as _; -#[doc(hidden)] -pub use crate::stream::stream::Stream as _; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index da79b19..5e17aa5 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -26,20 +26,12 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Fuse, Scan, Take, Zip}; +pub use stream::{Fuse, Scan, Stream, Take, Zip}; mod empty; mod once; mod repeat; -pub(crate) mod stream; - -cfg_if! { - if #[cfg(feature = "docs")] { - pub use stream::Stream; - } else { - pub use futures_core::stream::Stream; - } -} +mod stream; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index cd44788..222022b 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -24,9 +24,9 @@ impl Scan { impl Unpin for Scan {} -impl Stream for Scan +impl futures_core::stream::Stream for Scan where - S: crate::stream::Stream, + S: Stream, F: FnMut(&mut St, S::Item) -> Option, { type Item = B; diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 706dbfa..05f9967 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -5,13 +5,13 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; /// An iterator that iterates two other iterators simultaneously. -pub struct Zip { +pub struct Zip { item_slot: Option, first: A, second: B, } -impl fmt::Debug for Zip { +impl fmt::Debug for Zip { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Zip") .field("first", &self.first) @@ -20,9 +20,9 @@ impl fmt::Debug for Zip { } } -impl Unpin for Zip {} +impl Unpin for Zip {} -impl Zip { +impl Zip { pub(crate) fn new(first: A, second: B) -> Self { Zip { item_slot: None, From a97d26ca13d8199440e15d8da954b611f68b7563 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 21 Sep 2019 15:22:50 +0200 Subject: [PATCH 03/11] Fix imports in the book --- docs/src/concepts/futures.md | 3 +-- docs/src/concepts/tasks.md | 5 ++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/docs/src/concepts/futures.md b/docs/src/concepts/futures.md index 02a37ba..a23ec07 100644 --- a/docs/src/concepts/futures.md +++ b/docs/src/concepts/futures.md @@ -112,8 +112,7 @@ While the `Future` trait has existed in Rust for a while, it was inconvenient to ```rust,edition2018 # extern crate async_std; -# use async_std::{fs::File, io::Read}; -# use std::io; +# use async_std::{fs::File, io, prelude::*}; # async fn read_file(path: &str) -> Result { let mut file = File::open(path).await?; diff --git a/docs/src/concepts/tasks.md b/docs/src/concepts/tasks.md index 0711c12..15bb82f 100644 --- a/docs/src/concepts/tasks.md +++ b/docs/src/concepts/tasks.md @@ -6,7 +6,7 @@ In `async-std`, the [`tasks`][tasks] module is responsible for this. The simples ```rust,edition2018 # extern crate async_std; -use async_std::{io, task, fs::File, io::Read}; +use async_std::{fs::File, io, prelude::*, task}; async fn read_file(path: &str) -> Result { let mut file = File::open(path).await?; @@ -33,8 +33,7 @@ This asks the runtime baked into `async_std` to execute the code that reads a fi ```rust,edition2018 # extern crate async_std; -# use async_std::{fs::File, io::Read, task}; -# use std::io; +# use async_std::{fs::File, prelude::*, task}; # # async fn read_file(path: &str) -> Result { # let mut file = File::open(path).await?; From f2ca3f37a96352ebabf1f88529e9779383ae5c13 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 11:47:39 +0200 Subject: [PATCH 04/11] Fix build errors in docs --- docs/src/concepts/futures.md | 12 ++++++------ docs/src/concepts/tasks.md | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/src/concepts/futures.md b/docs/src/concepts/futures.md index a23ec07..05cc040 100644 --- a/docs/src/concepts/futures.md +++ b/docs/src/concepts/futures.md @@ -51,9 +51,9 @@ Remember the talk about "deferred computation" in the intro? That's all it is. I Let's have a look at a simple function, specifically the return value: ```rust,edition2018 -# use std::{fs::File, io::{self, Read}}; +# use std::{fs::File, io, io::prelude::*}; # -fn read_file(path: &str) -> Result { +fn read_file(path: &str) -> io::Result { let mut file = File::open(path)?; let mut contents = String::new(); file.read_to_string(&mut contents)?; @@ -69,7 +69,7 @@ But we wanted to abstract over *computation* and let someone else choose how to ```rust,edition2018 # use std::{fs::File, io::{self, Read}}; # -fn read_file(path: &str) -> Result { +fn read_file(path: &str) -> io::Result { let mut file = File::open(path)?; let mut contents = String::new(); file.read_to_string(&mut contents)?; @@ -112,9 +112,9 @@ While the `Future` trait has existed in Rust for a while, it was inconvenient to ```rust,edition2018 # extern crate async_std; -# use async_std::{fs::File, io, prelude::*}; +# use async_std::{fs::File, io, io::prelude::*}; # -async fn read_file(path: &str) -> Result { +async fn read_file(path: &str) -> io::Result { let mut file = File::open(path).await?; let mut contents = String::new(); file.read_to_string(&mut contents).await?; @@ -124,7 +124,7 @@ async fn read_file(path: &str) -> Result { Amazingly little difference, right? All we did is label the function `async` and insert 2 special commands: `.await`. -This `async` function sets up a deferred computation. When this function is called, it will produce a `Future>` instead of immediately returning a `Result`. (Or, more precisely, generate a type for you that implements `Future>`.) +This `async` function sets up a deferred computation. When this function is called, it will produce a `Future>` instead of immediately returning a `io::Result`. (Or, more precisely, generate a type for you that implements `Future>`.) ## What does `.await` do? diff --git a/docs/src/concepts/tasks.md b/docs/src/concepts/tasks.md index 15bb82f..d4037a3 100644 --- a/docs/src/concepts/tasks.md +++ b/docs/src/concepts/tasks.md @@ -8,7 +8,7 @@ In `async-std`, the [`tasks`][tasks] module is responsible for this. The simples # extern crate async_std; use async_std::{fs::File, io, prelude::*, task}; -async fn read_file(path: &str) -> Result { +async fn read_file(path: &str) -> io::Result { let mut file = File::open(path).await?; let mut contents = String::new(); file.read_to_string(&mut contents).await?; @@ -33,9 +33,9 @@ This asks the runtime baked into `async_std` to execute the code that reads a fi ```rust,edition2018 # extern crate async_std; -# use async_std::{fs::File, prelude::*, task}; +# use async_std::{fs::File, io, prelude::*, task}; # -# async fn read_file(path: &str) -> Result { +# async fn read_file(path: &str) -> io::Result { # let mut file = File::open(path).await?; # let mut contents = String::new(); # file.read_to_string(&mut contents).await?; From 217e435e8e20dcff4852597f22b440d59bf628c5 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 12:03:56 +0200 Subject: [PATCH 05/11] Fix more compilation errors in the book --- docs/src/concepts/futures.md | 2 +- docs/src/tutorial/accept_loop.md | 4 ++-- docs/src/tutorial/connecting_readers_and_writers.md | 3 +-- docs/src/tutorial/handling_disconnection.md | 5 +++-- docs/src/tutorial/implementing_a_client.md | 3 ++- docs/src/tutorial/receiving_messages.md | 10 +++++----- docs/src/tutorial/sending_messages.md | 3 +-- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/docs/src/concepts/futures.md b/docs/src/concepts/futures.md index 05cc040..67db720 100644 --- a/docs/src/concepts/futures.md +++ b/docs/src/concepts/futures.md @@ -67,7 +67,7 @@ Note that this return value talks about the past. The past has a drawback: all d But we wanted to abstract over *computation* and let someone else choose how to run it. That's fundamentally incompatible with looking at the results of previous computation all the time. So, let's find a type that *describes* a computation without running it. Let's look at the function again: ```rust,edition2018 -# use std::{fs::File, io::{self, Read}}; +# use std::{fs::File, io, io::prelude::*}; # fn read_file(path: &str) -> io::Result { let mut file = File::open(path)?; diff --git a/docs/src/tutorial/accept_loop.md b/docs/src/tutorial/accept_loop.md index 96c15ba..50a6b5f 100644 --- a/docs/src/tutorial/accept_loop.md +++ b/docs/src/tutorial/accept_loop.md @@ -29,7 +29,7 @@ Now we can write the server's accept loop: # extern crate async_std; # use async_std::{ # net::{TcpListener, ToSocketAddrs}, -# prelude::Stream, +# prelude::*, # }; # # type Result = std::result::Result>; @@ -66,7 +66,7 @@ Finally, let's add main: # extern crate async_std; # use async_std::{ # net::{TcpListener, ToSocketAddrs}, -# prelude::Stream, +# prelude::*, # task, # }; # diff --git a/docs/src/tutorial/connecting_readers_and_writers.md b/docs/src/tutorial/connecting_readers_and_writers.md index d5da471..5f98ac2 100644 --- a/docs/src/tutorial/connecting_readers_and_writers.md +++ b/docs/src/tutorial/connecting_readers_and_writers.md @@ -15,9 +15,8 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined # extern crate futures_channel; # extern crate futures_util; # use async_std::{ -# io::{Write}, # net::TcpStream, -# prelude::{Future, Stream}, +# prelude::*, # task, # }; # use futures_channel::mpsc; diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 27c5052..9d7c572 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -72,7 +72,7 @@ We use the `select` macro for this purpose: # extern crate async_std; # extern crate futures_channel; # extern crate futures_util; -# use async_std::{io::Write, net::TcpStream}; +# use async_std::{net::TcpStream, prelude::*}; use futures_channel::mpsc; use futures_util::{select, FutureExt, StreamExt}; # use std::sync::Arc; @@ -125,8 +125,9 @@ The final code looks like this: # extern crate futures_channel; # extern crate futures_util; use async_std::{ - io::{BufReader, BufRead, Write}, + io::BufReader, net::{TcpListener, TcpStream, ToSocketAddrs}, + prelude::*, task, }; use futures_channel::mpsc; diff --git a/docs/src/tutorial/implementing_a_client.md b/docs/src/tutorial/implementing_a_client.md index a3ba93a..8eab22f 100644 --- a/docs/src/tutorial/implementing_a_client.md +++ b/docs/src/tutorial/implementing_a_client.md @@ -18,8 +18,9 @@ With async, we can just use the `select!` macro. # extern crate async_std; # extern crate futures_util; use async_std::{ - io::{stdin, BufRead, BufReader, Write}, + io::{stdin, BufReader}, net::{TcpStream, ToSocketAddrs}, + prelude::*, task, }; use futures_util::{select, FutureExt, StreamExt}; diff --git a/docs/src/tutorial/receiving_messages.md b/docs/src/tutorial/receiving_messages.md index 667cf1c..182e55c 100644 --- a/docs/src/tutorial/receiving_messages.md +++ b/docs/src/tutorial/receiving_messages.md @@ -10,9 +10,9 @@ We need to: ```rust,edition2018 # extern crate async_std; # use async_std::{ -# io::{BufRead, BufReader}, +# io::BufReader, # net::{TcpListener, TcpStream, ToSocketAddrs}, -# prelude::Stream, +# prelude::*, # task, # }; # @@ -75,9 +75,9 @@ We can "fix" it by waiting for the task to be joined, like this: # #![feature(async_closure)] # extern crate async_std; # use async_std::{ -# io::{BufRead, BufReader}, +# io::BufReader, # net::{TcpListener, TcpStream, ToSocketAddrs}, -# prelude::Stream, +# prelude::*, # task, # }; # @@ -125,7 +125,7 @@ So let's use a helper function for this: # extern crate async_std; # use async_std::{ # io, -# prelude::Future, +# prelude::*, # task, # }; fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> diff --git a/docs/src/tutorial/sending_messages.md b/docs/src/tutorial/sending_messages.md index 6fec8c9..028970e 100644 --- a/docs/src/tutorial/sending_messages.md +++ b/docs/src/tutorial/sending_messages.md @@ -16,9 +16,8 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the # extern crate futures_channel; # extern crate futures_util; # use async_std::{ -# io::Write, # net::TcpStream, -# prelude::Stream, +# prelude::*, # }; use futures_channel::mpsc; // 1 use futures_util::sink::SinkExt; From bfab20da03e903b0d8d08e73d16d77f2b2fb0853 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 12:08:08 +0200 Subject: [PATCH 06/11] Don't re-export ext traits in async_std::io --- src/io/mod.rs | 17 +++++++++-------- src/io/prelude.rs | 8 ++++---- src/prelude.rs | 8 ++++---- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index 301cfa5..8dd1ad1 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -23,34 +23,35 @@ #[doc(inline)] pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; -pub use buf_read::{BufRead, BufReadExt, Lines}; +pub use buf_read::{BufRead, Lines}; pub use buf_reader::BufReader; pub use copy::copy; pub use cursor::Cursor; pub use empty::{empty, Empty}; -pub use read::{Read, ReadExt}; +pub use read::Read; pub use repeat::{repeat, Repeat}; -pub use seek::{Seek, SeekExt}; +pub use seek::Seek; pub use sink::{sink, Sink}; pub use stderr::{stderr, Stderr}; pub use stdin::{stdin, Stdin}; pub use stdout::{stdout, Stdout}; pub use timeout::timeout; -pub use write::{Write, WriteExt}; +pub use write::Write; pub mod prelude; -mod buf_read; +pub(crate) mod buf_read; +pub(crate) mod read; +pub(crate) mod seek; +pub(crate) mod write; + mod buf_reader; mod copy; mod cursor; mod empty; -mod read; mod repeat; -mod seek; mod sink; mod stderr; mod stdin; mod stdout; mod timeout; -mod write; diff --git a/src/io/prelude.rs b/src/io/prelude.rs index eeb0ced..fb1b945 100644 --- a/src/io/prelude.rs +++ b/src/io/prelude.rs @@ -18,10 +18,10 @@ pub use crate::io::Seek; pub use crate::io::Write; #[doc(hidden)] -pub use crate::io::BufReadExt as _; +pub use crate::io::buf_read::BufReadExt as _; #[doc(hidden)] -pub use crate::io::ReadExt as _; +pub use crate::io::read::ReadExt as _; #[doc(hidden)] -pub use crate::io::SeekExt as _; +pub use crate::io::seek::SeekExt as _; #[doc(hidden)] -pub use crate::io::WriteExt as _; +pub use crate::io::write::WriteExt as _; diff --git a/src/prelude.rs b/src/prelude.rs index d28ec64..6efd747 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -27,10 +27,10 @@ pub use crate::stream::Stream; pub use crate::task_local; #[doc(hidden)] -pub use crate::io::BufReadExt as _; +pub use crate::io::buf_read::BufReadExt as _; #[doc(hidden)] -pub use crate::io::ReadExt as _; +pub use crate::io::read::ReadExt as _; #[doc(hidden)] -pub use crate::io::SeekExt as _; +pub use crate::io::seek::SeekExt as _; #[doc(hidden)] -pub use crate::io::WriteExt as _; +pub use crate::io::write::WriteExt as _; From 797a6b2d905ebfa3958966454caf9f48952f65b1 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 12:16:35 +0200 Subject: [PATCH 07/11] Add a missing assert in a doc example --- src/task/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/task/mod.rs b/src/task/mod.rs index 1844cca..7424502 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -17,6 +17,7 @@ //! let handle = task::spawn(async { //! 1 + 2 //! }); +//! assert_eq!(handle.await, 3); //! # //! # }) } //! ``` From 0e3c47c3bfd74959b336865c66ef208e77f56060 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 12:41:04 +0200 Subject: [PATCH 08/11] Fix imports in docs --- docs/src/tutorial/handling_disconnection.md | 4 ++-- docs/src/tutorial/implementing_a_client.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 9d7c572..21f67ab 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -74,7 +74,7 @@ We use the `select` macro for this purpose: # extern crate futures_util; # use async_std::{net::TcpStream, prelude::*}; use futures_channel::mpsc; -use futures_util::{select, FutureExt, StreamExt}; +use futures_util::{select, FutureExt}; # use std::sync::Arc; # type Receiver = mpsc::UnboundedReceiver; @@ -131,7 +131,7 @@ use async_std::{ task, }; use futures_channel::mpsc; -use futures_util::{select, FutureExt, SinkExt, StreamExt}; +use futures_util::{select, FutureExt, SinkExt}; use std::{ collections::hash_map::{Entry, HashMap}, future::Future, diff --git a/docs/src/tutorial/implementing_a_client.md b/docs/src/tutorial/implementing_a_client.md index 8eab22f..4510398 100644 --- a/docs/src/tutorial/implementing_a_client.md +++ b/docs/src/tutorial/implementing_a_client.md @@ -23,7 +23,7 @@ use async_std::{ prelude::*, task, }; -use futures_util::{select, FutureExt, StreamExt}; +use futures_util::{select, FutureExt}; type Result = std::result::Result>; From d55cfb1da82a247d3a3d52b7a3e957e9d9bae2cd Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 12:44:46 +0200 Subject: [PATCH 09/11] impl FusedStream for Fuse --- src/stream/stream/fuse.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 3541937..13850c5 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -31,3 +31,9 @@ impl futures_core::Stream for Fuse { } } } + +impl futures_core::stream::FusedStream for Fuse { + fn is_terminated(&self) -> bool { + self.done + } +} From 17534cfffc72f6022d8ffd3bb0e709eba2734acc Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 15:19:54 +0200 Subject: [PATCH 10/11] Fuse next() future --- docs/src/tutorial/handling_disconnection.md | 12 ++++++------ src/stream/stream/fuse.rs | 6 ------ 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 21f67ab..e1eb8e9 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -94,11 +94,11 @@ async fn client_writer( let mut shutdown = shutdown.fuse(); loop { // 2 select! { - msg = messages.next() => match msg { + msg = messages.next().fuse() => match msg { Some(msg) => stream.write_all(msg.as_bytes()).await?, None => break, }, - void = shutdown.next() => match void { + void = shutdown.next().fuse() => match void { Some(void) => match void {}, // 3 None => break, } @@ -210,11 +210,11 @@ async fn client_writer( let mut shutdown = shutdown.fuse(); loop { select! { - msg = messages.next() => match msg { + msg = messages.next().fuse() => match msg { Some(msg) => stream.write_all(msg.as_bytes()).await?, None => break, }, - void = shutdown.next() => match void { + void = shutdown.next().fuse() => match void { Some(void) => match void {}, None => break, } @@ -244,11 +244,11 @@ async fn broker(events: Receiver) { let mut events = events.fuse(); loop { let event = select! { - event = events.next() => match event { + event = events.next().fuse() => match event { None => break, // 2 Some(event) => event, }, - disconnect = disconnect_receiver.next() => { + disconnect = disconnect_receiver.next().fuse() => { let (name, _pending_messages) = disconnect.unwrap(); // 3 assert!(peers.remove(&name).is_some()); continue; diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 13850c5..3541937 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -31,9 +31,3 @@ impl futures_core::Stream for Fuse { } } } - -impl futures_core::stream::FusedStream for Fuse { - fn is_terminated(&self) -> bool { - self.done - } -} From 85b80cfe9a0a20a6e4d9091ad48027dcbf7ff656 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 22 Sep 2019 15:35:53 +0200 Subject: [PATCH 11/11] Fuse futures in select! --- docs/src/tutorial/implementing_a_client.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/src/tutorial/implementing_a_client.md b/docs/src/tutorial/implementing_a_client.md index 4510398..3aac67f 100644 --- a/docs/src/tutorial/implementing_a_client.md +++ b/docs/src/tutorial/implementing_a_client.md @@ -39,14 +39,14 @@ async fn try_run(addr: impl ToSocketAddrs) -> Result<()> { let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse(); // 2 loop { select! { // 3 - line = lines_from_server.next() => match line { + line = lines_from_server.next().fuse() => match line { Some(line) => { let line = line?; println!("{}", line); }, None => break, }, - line = lines_from_stdin.next() => match line { + line = lines_from_stdin.next().fuse() => match line { Some(line) => { let line = line?; writer.write_all(line.as_bytes()).await?;