From 48d4c9b18d7269b4e1314f1ef2a986bb45c3d47a Mon Sep 17 00:00:00 2001 From: Kirill Mironov Date: Tue, 20 Aug 2019 18:30:33 +0300 Subject: [PATCH] begin implementing BufWriter --- src/io/buf_writer.rs | 310 ++++++++++++++++++++++++++++++++++++------- src/io/mod.rs | 2 +- 2 files changed, 264 insertions(+), 48 deletions(-) diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 5755063..9775a07 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -7,35 +7,138 @@ use crate::io::Write; const DEFAULT_CAPACITY: usize = 8 * 1024; - -pub struct BufWriter { - inner: Option, +/// Wraps a writer and buffers its output. +/// +/// It can be excessively inefficient to work directly with something that +/// implements [`Write`]. For example, every call to +/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A +/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying +/// writer in large, infrequent batches. +/// +/// `BufWriter` can improve the speed of programs that make *small* and +/// *repeated* write calls to the same file or network socket. It does not +/// help when writing very large amounts at once, or writing just one or a few +/// times. It also provides no advantage when writing to a destination that is +/// in memory, like a `Vec`. +/// +/// When the `BufWriter` is dropped, the contents of its buffer will be written +/// out. However, any errors that happen in the process of flushing the buffer +/// when the writer is dropped will be ignored. Code that wishes to handle such +/// errors must manually call [`flush`] before the writer is dropped. +/// +/// This type is an async version of [`std::io::BufWriter`](https://doc.rust-lang.org/std/io/struct.BufWriter.html).
+/// +/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html +/// +/// # Examples +/// +/// Let's write the numbers one through ten to a [`TcpStream`]: +/// +/*/ ```no_run +/ use std::io::prelude::*; +/ use std::net::TcpStream; +/ +/ let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); +/ +/ for i in 0..10 { +/ stream.write(&[i+1]).unwrap(); +/ } +/ ```*/ +/// +/// Because we're not buffering, we write each one in turn, incurring the +/// overhead of a system call per byte written. We can fix this with a +/// `BufWriter`: +/// +/*/ ```no_run +/ use std::io::prelude::*; +/ use std::io::BufWriter; +/ use std::net::TcpStream; +/ +/ let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); +/ +/ for i in 0..10 { +/ stream.write(&[i+1]).unwrap(); +/ } +/ ```*/ +/// +/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped +/// together by the buffer, and will all be written out in one system call when +/// the `stream` is dropped. +/// +/// [`Write`]: ../../std/io/trait.Write.html +/// [`TcpStream::write`]: ../../std/net/struct.TcpStream.html#method.write +/// [`TcpStream`]: ../../std/net/struct.TcpStream.html +/// [`flush`]: #method.flush +pub struct BufWriter { + inner: W, buf: Vec, - panicked: bool, + written: usize, } impl BufWriter { - pin_utils::unsafe_pinned!(inner: Option); - pin_utils::unsafe_unpinned!(panicked: bool); + pin_utils::unsafe_pinned!(inner: W); + pin_utils::unsafe_unpinned!(buf: Vec); + /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. 
+ /// + /// # Examples + /// + /// ```no_run + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// ``` pub fn new(inner: W) -> BufWriter { BufWriter::with_capacity(DEFAULT_CAPACITY, inner) } + /// Creates a new `BufWriter` with the specified buffer capacity. + /// + /// # Examples + /// + /// Creating a writer with a buffer of a hundred bytes. + /// + /// ```no_run + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap(); + /// let mut buffer = BufWriter::with_capacity(100, stream); + /// ``` pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { BufWriter { - inner: Some(inner), + inner, buf: Vec::with_capacity(capacity), - panicked: false, + written: 0, } } pub fn get_ref(&self) -> &W { - self.inner.as_ref().unwrap() + &self.inner } pub fn get_mut(&mut self) -> &mut W { - self.inner.as_mut().unwrap() + &mut self.inner + } + + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.inner() + } + + /// Consumes the `BufWriter`, returning the underlying writer. + /// + /// This method does not write out any buffered data; that data is lost.
+ /// For a method that attempts to write out the buffered data before returning the writer, see [`poll_into_inner`]. + /// + /// [`poll_into_inner`]: #method.poll_into_inner + pub fn into_inner(self) -> W { + self.inner + } + + pub fn poll_into_inner(mut self: Pin<&mut Self>, cx: Context<'_>) -> Poll> { + unimplemented!("poll into inner method") } pub fn buffer(&self) -> &[u8] { @@ -46,17 +149,13 @@ impl BufWriter { let Self { inner, buf, - panicked + written } = Pin::get_mut(self); - let mut panicked = Pin::new(panicked); - let mut written = 0; + let mut inner = Pin::new(inner); let len = buf.len(); let mut ret = Ok(()); - while written < len { - *panicked = true; - let r = Pin::new(inner.as_mut().unwrap()); - *panicked = false; - match r.poll_write(cx, &buf[written..]) { + while *written < len { + match inner.as_mut().poll_write(cx, &buf[*written..]) { Poll::Ready(Ok(0)) => { ret = Err(io::Error::new( io::ErrorKind::WriteZero, @@ -64,7 +163,7 @@ impl BufWriter { )); break; } - Poll::Ready(Ok(n)) => written += n, + Poll::Ready(Ok(n)) => *written += n, Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {} Poll::Ready(Err(e)) => { ret = Err(e); @@ -73,22 +172,12 @@ impl BufWriter { Poll::Pending => return Poll::Pending, } } - if written > 0 { - buf.drain(..written); + if *written > 0 { + buf.drain(..*written); } + *written = 0; Poll::Ready(ret) } - - pub fn poll_into_inner( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - //TODO: Fix 'expected function, found struct `IntoInnerError`' compiler error - ) -> Poll> { - match ready!(self.as_mut().poll_flush_buf(cx)) { - Ok(()) => Poll::Ready(Ok(self.inner().take().unwrap())), - Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, ""))) - } - } } impl AsyncWrite for BufWriter { @@ -97,33 +186,28 @@ impl AsyncWrite for BufWriter { cx: &mut Context, buf: &[u8], ) -> Poll> { - let panicked = self.as_mut().panicked(); - if self.as_ref().buf.len() + buf.len() > self.as_ref().buf.capacity() { - match 
ready!(self.as_mut().poll_flush_buf(cx)) { - Ok(()) => {}, - Err(e) => return Poll::Ready(Err(e)) - } + if self.buf.len() + buf.len() > self.buf.capacity() { + ready!(self.as_mut().poll_flush_buf(cx))?; } - if buf.len() >= self.as_ref().buf.capacity() { - *panicked = true; - let r = ready!(self.as_mut().poll_write(cx, buf)); - *panicked = false; - return Poll::Ready(r) + if buf.len() >= self.buf.capacity() { + self.inner().poll_write(cx, buf) } else { - return Poll::Ready(ready!(self.as_ref().buf.write(buf).poll())) + self.buf().write(buf).poll() } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - unimplemented!() + ready!(self.as_mut().poll_flush_buf(cx))?; + self.inner().poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - unimplemented!() + ready!(self.as_mut().poll_flush_buf(cx))?; + self.inner().poll_close(cx) } } -impl fmt::Debug for BufWriter { +impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("writer", &self.inner) @@ -135,6 +219,138 @@ impl fmt::Debug for BufWriter { } } +pub struct LineWriter { + inner: BufWriter, + need_flush: bool, +} + +impl LineWriter { + pin_utils::unsafe_pinned!(inner: BufWriter); + pin_utils::unsafe_unpinned!(need_flush: bool); + /// Creates a new `LineWriter`. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::fs::File; + /// use async_std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// async_std::task::block_on(async { + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::new(file); + /// Ok(()) + /// }) + /// } + /// ``` + pub fn new(inner: W) -> LineWriter { + // Lines typically aren't that long, don't use a giant buffer + LineWriter::with_capacity(1024, inner) + } + + /// Creates a new `LineWriter` with a specified capacity for the internal + /// buffer. 
+ /// + /// # Examples + /// + /// ```no_run + /// use async_std::fs::File; + /// use async_std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// async_std::task::block_on(async { + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::with_capacity(100, file); + /// Ok(()) + /// }) + /// } + /// ``` + pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { + LineWriter { + inner: BufWriter::with_capacity(capacity, inner), + need_flush: false, + } + } + + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + + pub fn into_inner(self) -> W { + self.inner.into_inner() + } +} + +impl AsyncWrite for LineWriter { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + if self.need_flush { + self.as_mut().poll_flush(cx)?; + } + + let i = match memchr::memrchr(b'\n', buf) { + Some(i) => i, + None => return self.as_mut().inner().as_mut().poll_write(cx, buf) + }; + + let n = ready!(self.as_mut().inner().as_mut().poll_write(cx, &buf[..=i])?); + *self.as_mut().need_flush() = true; + if ready!(self.as_mut().poll_flush(cx)).is_err() || n != 1 + 1 { + return Poll::Ready(Ok(n)) + } + match ready!(self.inner().poll_write(cx, &buf[i + 1..])) { + Ok(i) => Poll::Ready(Ok(n + 1)), + Err(_) => Poll::Ready(Ok(n)) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.as_mut().inner().poll_flush(cx)?; + *self.need_flush() = false; + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.as_mut().inner().poll_flush(cx)?; + self.inner().poll_close(cx) + } +} + +impl fmt::Debug for LineWriter where W: fmt::Debug { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("LineWriter") + .field("writer", &self.inner.inner) + .field("buffer", + &format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity())) + .finish() + } +} + 
+ mod tests { + use crate::prelude::*; + use crate::task; + use super::LineWriter; + #[test] + fn test_line_buffer() { + task::block_on(async { + let mut writer = LineWriter::new(Vec::new()); + writer.write(&[0]).await.unwrap(); + assert_eq!(*writer.get_ref(), []); + writer.write(&[1]).await.unwrap(); + assert_eq!(*writer.get_ref(), []); + writer.flush().await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1]); + writer.write(&[0, b'\n', 1, b'\n', 2]).await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + writer.flush().await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); + writer.write(&[3, b'\n']).await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); + }) + } } \ No newline at end of file diff --git a/src/io/mod.rs b/src/io/mod.rs index 53dab90..7584b68 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -28,7 +28,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom}; pub use buf_read::{BufRead, Lines}; pub use buf_reader::BufReader; -pub use buf_writer::BufWriter; +pub use buf_writer::{BufWriter, LineWriter}; pub use copy::copy; pub use empty::{empty, Empty}; pub use read::Read;