diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 9775a07..415660e 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -1,9 +1,8 @@ use crate::task::{Context, Poll}; -use futures::{ready, AsyncWrite, Future, Stream}; -use std::io::{self, IntoInnerError}; -use std::pin::Pin; +use futures::{ready, AsyncWrite}; use std::fmt; -use crate::io::Write; +use std::io; +use std::pin::Pin; const DEFAULT_CAPACITY: usize = 8 * 1024; @@ -34,41 +33,50 @@ const DEFAULT_CAPACITY: usize = 8 * 1024; /// /// Let's write the numbers one through ten to a [`TcpStream`]: /// -/*/ ```no_run -/ use std::io::prelude::*; -/ use std::net::TcpStream; -/ -/ let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); -/ -/ for i in 0..10 { -/ stream.write(&[i+1]).unwrap(); -/ } -/ ```*/ +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// use async_std::net::TcpStream; +/// use async_std::io::Write; +/// +/// let mut stream = TcpStream::connect("127.0.0.1:34254").await?; +/// +/// for i in 0..10 { +/// let arr = [i+1]; +/// stream.write(&arr).await?; +/// } +/// # +/// # Ok(()) }) } +/// ``` /// /// Because we're not buffering, we write each one in turn, incurring the /// overhead of a system call per byte written. We can fix this with a /// `BufWriter`: /// -/*/ ```no_run -/ use std::io::prelude::*; -/ use std::io::BufWriter; -/ use std::net::TcpStream; -/ -/ let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); -/ -/ for i in 0..10 { -/ stream.write(&[i+1]).unwrap(); -/ } -/ ```*/ +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// use async_std::io::BufWriter; +/// use async_std::net::TcpStream; +/// use async_std::io::Write; +/// +/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); +/// for i in 0..10 { +/// let arr = [i+1]; +/// stream.write(&arr).await?; +/// }; +/// # +/// # Ok(()) }) } +/// ``` /// /// By wrapping the stream with a `BufWriter`, these ten writes are all grouped /// together by the buffer, and will all be written out in one system call when /// the `stream` is dropped. /// -/// [`Write`]: ../../std/io/trait.Write.html -/// [`TcpStream::write`]: ../../std/net/struct.TcpStream.html#method.write -/// [`TcpStream`]: ../../std/net/struct.TcpStream.html -/// [`flush`]: #method.flush +/// [`Write`]: trait.Write.html +/// [`TcpStream::write`]: ../net/struct.TcpStream.html#method.write +/// [`TcpStream`]: ../net/struct.TcpStream.html +/// [`flush`]: trait.Write.html#tymethod.flush pub struct BufWriter { inner: W, buf: Vec, @@ -85,10 +93,15 @@ impl BufWriter { /// # Examples /// /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; /// use async_std::net::TcpStream; /// - /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// # + /// # Ok(()) }) } /// ``` pub fn new(inner: W) -> BufWriter { BufWriter::with_capacity(DEFAULT_CAPACITY, inner) @@ -101,11 +114,16 @@ impl BufWriter { /// Creating a buffer with a buffer of a hundred bytes. /// /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; /// use async_std::net::TcpStream; /// - /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap(); + /// let stream = TcpStream::connect("127.0.0.1:34254").await?; /// let mut buffer = BufWriter::with_capacity(100, stream); + /// # + /// # Ok(()) }) } /// ``` pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { BufWriter { @@ -115,17 +133,54 @@ impl BufWriter { } } + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// + /// // We can use reference just like buffer + /// let reference = buffer.get_ref(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_ref(&self) -> &W { &self.inner } + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// + /// // We can use reference just like buffer + /// let reference = buffer.get_mut(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_mut(&mut self) -> &mut W { &mut self.inner } - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.inner() - } + // pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + // self.inner() + // } /// Consumes BufWriter, returning the underlying writer /// @@ -137,19 +192,41 @@ impl BufWriter { self.inner } - pub fn poll_into_inner(mut self: Pin<&mut Self>, cx: Context<'_>) -> Poll> { - unimplemented!("poll into inner method") - } + // pub fn poll_into_inner(self: Pin<&mut Self>, _cx: Context<'_>) -> Poll> { + // unimplemented!("poll into inner method") + // } + /// Returns a reference to the internally buffered data. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34251").await?); + /// + /// // See how many bytes are currently buffered + /// let bytes_buffered = buf_writer.buffer().len(); + /// # + /// # Ok(()) }) } + /// ``` pub fn buffer(&self) -> &[u8] { &self.buf } - pub fn poll_flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + /// Poll buffer flushing until completion + /// + /// This is used in types that wrap around BufWrite, one such example: [`LineWriter`] + /// + /// [`LineWriter`]: struct.LineWriter.html + pub fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let Self { inner, buf, - written + written, } = Pin::get_mut(self); let mut inner = Pin::new(inner); let len = buf.len(); @@ -183,7 +260,7 @@ impl BufWriter { impl AsyncWrite for BufWriter { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut Context, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { if self.buf.len() + buf.len() > self.buf.capacity() { @@ -192,16 +269,16 @@ impl AsyncWrite for BufWriter { if buf.len() >= self.buf.capacity() { self.inner().poll_write(cx, buf) } else { - self.buf().write(buf).poll() + Pin::new(&mut *self.buf()).poll_write(cx, buf) } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll_flush_buf(cx))?; self.inner().poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll_flush_buf(cx))?; self.inner().poll_close(cx) } @@ -211,14 +288,78 @@ impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("writer", &self.inner) - .field( - "buf", - &self.buf - ) + .field("buf", &self.buf) .finish() } } +/// Wraps a writer and buffers output to it, flushing whenever a newline +/// (`0x0a`, `'\n'`) is detected. +/// +/// The [`BufWriter`][bufwriter] struct wraps a writer and buffers its output. +/// But it only does this batched write when it goes out of scope, or when the +/// internal buffer is full. Sometimes, you'd prefer to write each line as it's +/// completed, rather than the entire buffer at once. Enter `LineWriter`. It +/// does exactly that. +/// +/// Like [`BufWriter`][bufwriter], a `LineWriter`’s buffer will also be flushed when the +/// `LineWriter` goes out of scope or when its internal buffer is full. +/// +/// [bufwriter]: struct.BufWriter.html +/// +/// If there's still a partial line in the buffer when the `LineWriter` is +/// dropped, it will flush those contents. +/// +/// This type is an async version of [`std::io::LineWriter`] +/// +/// [`std::io::LineWriter`]: https://doc.rust-lang.org/std/io/struct.LineWriter.html +/// +/// # Examples +/// +/// We can use `LineWriter` to write one line at a time, significantly +/// reducing the number of actual writes to the file. +/// +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// use async_std::io::{LineWriter, Write}; +/// use async_std::fs::{File, self}; +/// let road_not_taken = b"I shall be telling this with a sigh +/// Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference."; +/// +/// let file = File::create("poem.txt").await?; +/// let mut file = LineWriter::new(file); +/// +/// file.write_all(b"I shall be telling this with a sigh").await?; +/// +/// // No bytes are written until a newline is encountered (or +/// // the internal buffer is filled). +/// assert_eq!(fs::read_to_string("poem.txt").await?, ""); +/// file.write_all(b"\n").await?; +/// assert_eq!( +/// fs::read_to_string("poem.txt").await?, +/// "I shall be telling this with a sigh\n", +/// ); +/// +/// // Write the rest of the poem. +/// file.write_all(b"Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference.").await?; +/// +/// // The last line of the poem doesn't end in a newline, so +/// // we have to flush or drop the `LineWriter` to finish +/// // writing. +/// file.flush().await?; +/// +/// // Confirm the whole poem was written. +/// assert_eq!(fs::read("poem.txt").await?, &road_not_taken[..]); +/// # +/// # Ok(()) }) } +/// ``` pub struct LineWriter { inner: BufWriter, need_flush: bool, @@ -232,16 +373,15 @@ impl LineWriter { /// # Examples /// /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::fs::File; /// use async_std::io::LineWriter; /// - /// fn main() -> std::io::Result<()> { - /// async_std::task::block_on(async { - /// let file = File::create("poem.txt").await?; - /// let file = LineWriter::new(file); - /// Ok(()) - /// }) - /// } + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::new(file); + /// # + /// # Ok(()) }) } /// ``` pub fn new(inner: W) -> LineWriter { // Lines typically aren't that long, don't use a giant buffer @@ -254,16 +394,15 @@ impl LineWriter { /// # Examples /// /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::fs::File; /// use async_std::io::LineWriter; /// - /// fn main() -> std::io::Result<()> { - /// async_std::task::block_on(async { - /// let file = File::create("poem.txt").await?; - /// let file = LineWriter::with_capacity(100, file); - /// Ok(()) - /// }) - /// } + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::with_capacity(100, file); + /// # + /// # Ok(()) }) } /// ``` pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { LineWriter { @@ -272,68 +411,120 @@ impl LineWriter { } } + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::LineWriter; + /// use async_std::fs::File; + /// + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::new(file); + /// + /// // We can use reference just like buffer + /// let reference = file.get_ref(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_ref(&self) -> &W { self.inner.get_ref() } + /// Gets a mutable reference to the underlying writer. + /// + /// Caution must be taken when calling methods on the mutable reference + /// returned as extra writes could corrupt the output stream. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::LineWriter; + /// use async_std::fs::File; + /// + /// let file = File::create("poem.txt").await?; + /// let mut file = LineWriter::new(file); + /// + /// // We can use reference just like buffer + /// let reference = file.get_mut(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_mut(&mut self) -> &mut W { self.inner.get_mut() } + //TODO: Implement flushing before returning inner + #[allow(missing_docs)] pub fn into_inner(self) -> W { self.inner.into_inner() } } impl AsyncWrite for LineWriter { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { if self.need_flush { - self.as_mut().poll_flush(cx)?; + let _ = self.as_mut().poll_flush(cx)?; } let i = match memchr::memrchr(b'\n', buf) { Some(i) => i, - None => return self.as_mut().inner().as_mut().poll_write(cx, buf) + None => return self.as_mut().inner().as_mut().poll_write(cx, buf), }; let n = ready!(self.as_mut().inner().as_mut().poll_write(cx, &buf[..=i])?); *self.as_mut().need_flush() = true; - if ready!(self.as_mut().poll_flush(cx)).is_err() || n != 1 + 1 { - return Poll::Ready(Ok(n)) + if ready!(self.as_mut().poll_flush(cx)).is_err() || n != i + 1 { + return Poll::Ready(Ok(n)); } match ready!(self.inner().poll_write(cx, &buf[i + 1..])) { - Ok(i) => Poll::Ready(Ok(n + 1)), - Err(_) => Poll::Ready(Ok(n)) + Ok(_) => Poll::Ready(Ok(n + 1)), + Err(_) => Poll::Ready(Ok(n)), } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.as_mut().inner().poll_flush(cx)?; + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let _ = self.as_mut().inner().poll_flush(cx)?; *self.need_flush() = false; Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.as_mut().inner().poll_flush(cx)?; + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let _ = self.as_mut().inner().poll_flush(cx)?; self.inner().poll_close(cx) } } -impl fmt::Debug for LineWriter where W: fmt::Debug { +impl fmt::Debug for LineWriter +where + W: fmt::Debug, +{ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("LineWriter") .field("writer", &self.inner.inner) - .field("buffer", - &format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity())) + .field( + "buffer", + &format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity()), + ) .finish() } } - mod tests { + #![allow(unused_imports)] + use super::LineWriter; use crate::prelude::*; use crate::task; - use super::LineWriter; #[test] fn test_line_buffer() { @@ -353,4 +544,4 @@ mod tests { assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); }) } -} \ No newline at end of file +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 7584b68..35cf58c 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -21,8 +21,6 @@ //! # Ok(()) }) } //! ``` -pub(crate) const DEFAULT_CAPACITY: usize = 8 * 1024; - #[doc(inline)] pub use std::io::{Error, ErrorKind, Result, SeekFrom};