From 91aeb39e4c83f6d0794fa6f40f43154544418673 Mon Sep 17 00:00:00 2001 From: Kirill Mironov Date: Tue, 20 Aug 2019 14:48:15 +0300 Subject: [PATCH 1/5] begin implementing BufWriter --- src/io/buf_writer.rs | 140 +++++++++++++++++++++++++++++++++++++++++++ src/io/mod.rs | 4 ++ 2 files changed, 144 insertions(+) create mode 100644 src/io/buf_writer.rs diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs new file mode 100644 index 00000000..57550637 --- /dev/null +++ b/src/io/buf_writer.rs @@ -0,0 +1,140 @@ +use crate::task::{Context, Poll}; +use futures::{ready, AsyncWrite, Future, Stream}; +use std::io::{self, IntoInnerError}; +use std::pin::Pin; +use std::fmt; +use crate::io::Write; + +const DEFAULT_CAPACITY: usize = 8 * 1024; + + +pub struct BufWriter { + inner: Option, + buf: Vec, + panicked: bool, +} + +impl BufWriter { + pin_utils::unsafe_pinned!(inner: Option); + pin_utils::unsafe_unpinned!(panicked: bool); + + pub fn new(inner: W) -> BufWriter { + BufWriter::with_capacity(DEFAULT_CAPACITY, inner) + } + + pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { + BufWriter { + inner: Some(inner), + buf: Vec::with_capacity(capacity), + panicked: false, + } + } + + pub fn get_ref(&self) -> &W { + self.inner.as_ref().unwrap() + } + + pub fn get_mut(&mut self) -> &mut W { + self.inner.as_mut().unwrap() + } + + pub fn buffer(&self) -> &[u8] { + &self.buf + } + + pub fn poll_flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + inner, + buf, + panicked + } = Pin::get_mut(self); + let mut panicked = Pin::new(panicked); + let mut written = 0; + let len = buf.len(); + let mut ret = Ok(()); + while written < len { + *panicked = true; + let r = Pin::new(inner.as_mut().unwrap()); + *panicked = false; + match r.poll_write(cx, &buf[written..]) { + Poll::Ready(Ok(0)) => { + ret = Err(io::Error::new( + io::ErrorKind::WriteZero, + "Failed to write buffered data", + )); + break; + } + Poll::Ready(Ok(n)) => written += n, + Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {} + Poll::Ready(Err(e)) => { + ret = Err(e); + break; + } + Poll::Pending => return Poll::Pending, + } + } + if written > 0 { + buf.drain(..written); + } + Poll::Ready(ret) + } + + pub fn poll_into_inner( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + //TODO: Fix 'expected function, found struct `IntoInnerError`' compiler error + ) -> Poll> { + match ready!(self.as_mut().poll_flush_buf(cx)) { + Ok(()) => Poll::Ready(Ok(self.inner().take().unwrap())), + Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, ""))) + } + } +} + +impl AsyncWrite for BufWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + let panicked = self.as_mut().panicked(); + if self.as_ref().buf.len() + buf.len() > self.as_ref().buf.capacity() { + match ready!(self.as_mut().poll_flush_buf(cx)) { + Ok(()) => {}, + Err(e) => return Poll::Ready(Err(e)) + } + } + if buf.len() >= self.as_ref().buf.capacity() { + *panicked = true; + let r = ready!(self.as_mut().poll_write(cx, buf)); + *panicked = false; + return Poll::Ready(r) + } else { + return Poll::Ready(ready!(self.as_ref().buf.write(buf).poll())) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unimplemented!() + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unimplemented!() + } +} + +impl fmt::Debug for BufWriter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufReader") + .field("writer", &self.inner) + .field( + "buf", + &self.buf + ) + .finish() + } +} + +mod tests { + +} \ No newline at end of file diff --git a/src/io/mod.rs b/src/io/mod.rs index 13b91d6c..53dab906 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -21,11 +21,14 @@ //! # Ok(()) }) } //! ``` +pub(crate) const DEFAULT_CAPACITY: usize = 8 * 1024; + #[doc(inline)] pub use std::io::{Error, ErrorKind, Result, SeekFrom}; pub use buf_read::{BufRead, Lines}; pub use buf_reader::BufReader; +pub use buf_writer::BufWriter; pub use copy::copy; pub use empty::{empty, Empty}; pub use read::Read; @@ -39,6 +42,7 @@ pub use write::Write; mod buf_read; mod buf_reader; +mod buf_writer; mod copy; mod empty; mod read; From 48d4c9b18d7269b4e1314f1ef2a986bb45c3d47a Mon Sep 17 00:00:00 2001 From: Kirill Mironov Date: Tue, 20 Aug 2019 18:30:33 +0300 Subject: [PATCH 2/5] begin implementing BufWriter --- src/io/buf_writer.rs | 312 ++++++++++++++++++++++++++++++++++++------- src/io/mod.rs | 2 +- 2 files changed, 265 insertions(+), 49 deletions(-) diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 57550637..9775a078 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -7,35 +7,138 @@ use crate::io::Write; const DEFAULT_CAPACITY: usize = 8 * 1024; - -pub struct BufWriter { - inner: Option, +/// Wraps a writer and buffers its output. +/// +/// It can be excessively inefficient to work directly with something that +/// implements [`Write`]. For example, every call to +/// [`write`][`TcpStream::write`] on [`TcpStream`] results in a system call. A +/// `BufWriter` keeps an in-memory buffer of data and writes it to an underlying +/// writer in large, infrequent batches. +/// +/// `BufWriter` can improve the speed of programs that make *small* and +/// *repeated* write calls to the same file or network socket. It does not +/// help when writing very large amounts at once, or writing just one or a few +/// times. It also provides no advantage when writing to a destination that is +/// in memory, like a `Vec`. +/// +/// When the `BufWriter` is dropped, the contents of its buffer will be written +/// out. However, any errors that happen in the process of flushing the buffer +/// when the writer is dropped will be ignored. Code that wishes to handle such +/// errors must manually call [`flush`] before the writer is dropped. +/// +/// This type is an async version of [`std::io::BufReader`]. +/// +/// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html +/// +/// # Examples +/// +/// Let's write the numbers one through ten to a [`TcpStream`]: +/// +/*/ ```no_run +/ use std::io::prelude::*; +/ use std::net::TcpStream; +/ +/ let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); +/ +/ for i in 0..10 { +/ stream.write(&[i+1]).unwrap(); +/ } +/ ```*/ +/// +/// Because we're not buffering, we write each one in turn, incurring the +/// overhead of a system call per byte written. We can fix this with a +/// `BufWriter`: +/// +/*/ ```no_run +/ use std::io::prelude::*; +/ use std::io::BufWriter; +/ use std::net::TcpStream; +/ +/ let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); +/ +/ for i in 0..10 { +/ stream.write(&[i+1]).unwrap(); +/ } +/ ```*/ +/// +/// By wrapping the stream with a `BufWriter`, these ten writes are all grouped +/// together by the buffer, and will all be written out in one system call when +/// the `stream` is dropped. +/// +/// [`Write`]: ../../std/io/trait.Write.html +/// [`TcpStream::write`]: ../../std/net/struct.TcpStream.html#method.write +/// [`TcpStream`]: ../../std/net/struct.TcpStream.html +/// [`flush`]: #method.flush +pub struct BufWriter { + inner: W, buf: Vec, - panicked: bool, + written: usize, } impl BufWriter { - pin_utils::unsafe_pinned!(inner: Option); - pin_utils::unsafe_unpinned!(panicked: bool); + pin_utils::unsafe_pinned!(inner: W); + pin_utils::unsafe_unpinned!(buf: Vec); + /// Creates a new `BufWriter` with a default buffer capacity. The default is currently 8 KB, + /// but may change in the future. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// ``` pub fn new(inner: W) -> BufWriter { BufWriter::with_capacity(DEFAULT_CAPACITY, inner) } + /// Creates a new `BufWriter` with the specified buffer capacity. + /// + /// # Examples + /// + /// Creating a buffer with a buffer of a hundred bytes. + /// + /// ```no_run + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap(); + /// let mut buffer = BufWriter::with_capacity(100, stream); + /// ``` pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { BufWriter { - inner: Some(inner), + inner, buf: Vec::with_capacity(capacity), - panicked: false, + written: 0, } } pub fn get_ref(&self) -> &W { - self.inner.as_ref().unwrap() + &self.inner } pub fn get_mut(&mut self) -> &mut W { - self.inner.as_mut().unwrap() + &mut self.inner + } + + pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + self.inner() + } + + /// Consumes BufWriter, returning the underlying writer + /// + /// This method will not write leftover data, it will be lost. + /// For method that will attempt to write before returning the writer see [`poll_into_inner`] + /// + /// [`poll_into_inner`]: #method.poll_into_inner + pub fn into_inner(self) -> W { + self.inner + } + + pub fn poll_into_inner(mut self: Pin<&mut Self>, cx: Context<'_>) -> Poll> { + unimplemented!("poll into inner method") } pub fn buffer(&self) -> &[u8] { @@ -46,17 +149,13 @@ impl BufWriter { let Self { inner, buf, - panicked + written } = Pin::get_mut(self); - let mut panicked = Pin::new(panicked); - let mut written = 0; + let mut inner = Pin::new(inner); let len = buf.len(); let mut ret = Ok(()); - while written < len { - *panicked = true; - let r = Pin::new(inner.as_mut().unwrap()); - *panicked = false; - match r.poll_write(cx, &buf[written..]) { + while *written < len { + match inner.as_mut().poll_write(cx, &buf[*written..]) { Poll::Ready(Ok(0)) => { ret = Err(io::Error::new( io::ErrorKind::WriteZero, @@ -64,7 +163,7 @@ impl BufWriter { )); break; } - Poll::Ready(Ok(n)) => written += n, + Poll::Ready(Ok(n)) => *written += n, Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {} Poll::Ready(Err(e)) => { ret = Err(e); @@ -73,22 +172,12 @@ impl BufWriter { Poll::Pending => return Poll::Pending, } } - if written > 0 { - buf.drain(..written); + if *written > 0 { + buf.drain(..*written); } + *written = 0; Poll::Ready(ret) } - - pub fn poll_into_inner( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - //TODO: Fix 'expected function, found struct `IntoInnerError`' compiler error - ) -> Poll> { - match ready!(self.as_mut().poll_flush_buf(cx)) { - Ok(()) => Poll::Ready(Ok(self.inner().take().unwrap())), - Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, ""))) - } - } } impl AsyncWrite for BufWriter { @@ -97,33 +186,28 @@ impl AsyncWrite for BufWriter { cx: &mut Context, buf: &[u8], ) -> Poll> { - let panicked = self.as_mut().panicked(); - if self.as_ref().buf.len() + buf.len() > self.as_ref().buf.capacity() { - match ready!(self.as_mut().poll_flush_buf(cx)) { - Ok(()) => {}, - Err(e) => return Poll::Ready(Err(e)) - } + if self.buf.len() + buf.len() > self.buf.capacity() { + ready!(self.as_mut().poll_flush_buf(cx))?; } - if buf.len() >= self.as_ref().buf.capacity() { - *panicked = true; - let r = ready!(self.as_mut().poll_write(cx, buf)); - *panicked = false; - return Poll::Ready(r) + if buf.len() >= self.buf.capacity() { + self.inner().poll_write(cx, buf) } else { - return Poll::Ready(ready!(self.as_ref().buf.write(buf).poll())) + self.buf().write(buf).poll() } } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - unimplemented!() + ready!(self.as_mut().poll_flush_buf(cx))?; + self.inner().poll_flush(cx) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - unimplemented!() + ready!(self.as_mut().poll_flush_buf(cx))?; + self.inner().poll_close(cx) } } -impl fmt::Debug for BufWriter { +impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("writer", &self.inner) @@ -135,6 +219,138 @@ impl fmt::Debug for BufWriter { } } -mod tests { +pub struct LineWriter { + inner: BufWriter, + need_flush: bool, +} +impl LineWriter { + pin_utils::unsafe_pinned!(inner: BufWriter); + pin_utils::unsafe_unpinned!(need_flush: bool); + /// Creates a new `LineWriter`. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::fs::File; + /// use async_std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// async_std::task::block_on(async { + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::new(file); + /// Ok(()) + /// }) + /// } + /// ``` + pub fn new(inner: W) -> LineWriter { + // Lines typically aren't that long, don't use a giant buffer + LineWriter::with_capacity(1024, inner) + } + + /// Creates a new `LineWriter` with a specified capacity for the internal + /// buffer. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::fs::File; + /// use async_std::io::LineWriter; + /// + /// fn main() -> std::io::Result<()> { + /// async_std::task::block_on(async { + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::with_capacity(100, file); + /// Ok(()) + /// }) + /// } + /// ``` + pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { + LineWriter { + inner: BufWriter::with_capacity(capacity, inner), + need_flush: false, + } + } + + pub fn get_ref(&self) -> &W { + self.inner.get_ref() + } + + pub fn get_mut(&mut self) -> &mut W { + self.inner.get_mut() + } + + pub fn into_inner(self) -> W { + self.inner.into_inner() + } +} + +impl AsyncWrite for LineWriter { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + if self.need_flush { + self.as_mut().poll_flush(cx)?; + } + + let i = match memchr::memrchr(b'\n', buf) { + Some(i) => i, + None => return self.as_mut().inner().as_mut().poll_write(cx, buf) + }; + + let n = ready!(self.as_mut().inner().as_mut().poll_write(cx, &buf[..=i])?); + *self.as_mut().need_flush() = true; + if ready!(self.as_mut().poll_flush(cx)).is_err() || n != 1 + 1 { + return Poll::Ready(Ok(n)) + } + match ready!(self.inner().poll_write(cx, &buf[i + 1..])) { + Ok(i) => Poll::Ready(Ok(n + 1)), + Err(_) => Poll::Ready(Ok(n)) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.as_mut().inner().poll_flush(cx)?; + *self.need_flush() = false; + Poll::Ready(Ok(())) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.as_mut().inner().poll_flush(cx)?; + self.inner().poll_close(cx) + } +} + +impl fmt::Debug for LineWriter where W: fmt::Debug { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("LineWriter") + .field("writer", &self.inner.inner) + .field("buffer", + &format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity())) + .finish() + } +} + + +mod tests { + use crate::prelude::*; + use crate::task; + use super::LineWriter; + + #[test] + fn test_line_buffer() { + task::block_on(async { + let mut writer = LineWriter::new(Vec::new()); + writer.write(&[0]).await.unwrap(); + assert_eq!(*writer.get_ref(), []); + writer.write(&[1]).await.unwrap(); + assert_eq!(*writer.get_ref(), []); + writer.flush().await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1]); + writer.write(&[0, b'\n', 1, b'\n', 2]).await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + writer.flush().await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); + writer.write(&[3, b'\n']).await.unwrap(); + assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); + }) + } } \ No newline at end of file diff --git a/src/io/mod.rs b/src/io/mod.rs index 53dab906..7584b68c 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -28,7 +28,7 @@ pub use std::io::{Error, ErrorKind, Result, SeekFrom}; pub use buf_read::{BufRead, Lines}; pub use buf_reader::BufReader; -pub use buf_writer::BufWriter; +pub use buf_writer::{BufWriter, LineWriter}; pub use copy::copy; pub use empty::{empty, Empty}; pub use read::Read; From a0759a6c53a5d542277b7ccab7039c82143f9606 Mon Sep 17 00:00:00 2001 From: Kirill Mironov Date: Tue, 20 Aug 2019 19:06:20 +0300 Subject: [PATCH 3/5] Implement LineWriter and BufWriter --- src/io/buf_writer.rs | 347 +++++++++++++++++++++++++++++++++---------- src/io/mod.rs | 2 - 2 files changed, 269 insertions(+), 80 deletions(-) diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 9775a078..415660e1 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -1,9 +1,8 @@ use crate::task::{Context, Poll}; -use futures::{ready, AsyncWrite, Future, Stream}; -use std::io::{self, IntoInnerError}; -use std::pin::Pin; +use futures::{ready, AsyncWrite}; use std::fmt; -use crate::io::Write; +use std::io; +use std::pin::Pin; const DEFAULT_CAPACITY: usize = 8 * 1024; @@ -34,41 +33,50 @@ const DEFAULT_CAPACITY: usize = 8 * 1024; /// /// Let's write the numbers one through ten to a [`TcpStream`]: /// -/*/ ```no_run -/ use std::io::prelude::*; -/ use std::net::TcpStream; -/ -/ let mut stream = TcpStream::connect("127.0.0.1:34254").unwrap(); -/ -/ for i in 0..10 { -/ stream.write(&[i+1]).unwrap(); -/ } -/ ```*/ +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// use async_std::net::TcpStream; +/// use async_std::io::Write; +/// +/// let mut stream = TcpStream::connect("127.0.0.1:34254").await?; +/// +/// for i in 0..10 { +/// let arr = [i+1]; +/// stream.write(&arr).await?; +/// } +/// # +/// # Ok(()) }) } +/// ``` /// /// Because we're not buffering, we write each one in turn, incurring the /// overhead of a system call per byte written. We can fix this with a /// `BufWriter`: /// -/*/ ```no_run -/ use std::io::prelude::*; -/ use std::io::BufWriter; -/ use std::net::TcpStream; -/ -/ let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); -/ -/ for i in 0..10 { -/ stream.write(&[i+1]).unwrap(); -/ } -/ ```*/ +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// use async_std::io::BufWriter; +/// use async_std::net::TcpStream; +/// use async_std::io::Write; +/// +/// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); +/// for i in 0..10 { +/// let arr = [i+1]; +/// stream.write(&arr).await?; +/// }; +/// # +/// # Ok(()) }) } +/// ``` /// /// By wrapping the stream with a `BufWriter`, these ten writes are all grouped /// together by the buffer, and will all be written out in one system call when /// the `stream` is dropped. /// -/// [`Write`]: ../../std/io/trait.Write.html -/// [`TcpStream::write`]: ../../std/net/struct.TcpStream.html#method.write -/// [`TcpStream`]: ../../std/net/struct.TcpStream.html -/// [`flush`]: #method.flush +/// [`Write`]: trait.Write.html +/// [`TcpStream::write`]: ../net/struct.TcpStream.html#method.write +/// [`TcpStream`]: ../net/struct.TcpStream.html +/// [`flush`]: trait.Write.html#tymethod.flush pub struct BufWriter { inner: W, buf: Vec, @@ -85,10 +93,15 @@ impl BufWriter { /// # Examples /// /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; /// use async_std::net::TcpStream; /// - /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").unwrap()); + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// # + /// # Ok(()) }) } /// ``` pub fn new(inner: W) -> BufWriter { BufWriter::with_capacity(DEFAULT_CAPACITY, inner) @@ -101,11 +114,16 @@ impl BufWriter { /// Creating a buffer with a buffer of a hundred bytes. /// /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; /// use async_std::net::TcpStream; /// - /// let stream = TcpStream::connect("127.0.0.1:34254").unwrap(); + /// let stream = TcpStream::connect("127.0.0.1:34254").await?; /// let mut buffer = BufWriter::with_capacity(100, stream); + /// # + /// # Ok(()) }) } /// ``` pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { BufWriter { @@ -115,17 +133,54 @@ impl BufWriter { } } + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// + /// // We can use reference just like buffer + /// let reference = buffer.get_ref(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_ref(&self) -> &W { &self.inner } + /// Gets a mutable reference to the underlying writer. + /// + /// It is inadvisable to directly write to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let mut buffer = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// + /// // We can use reference just like buffer + /// let reference = buffer.get_mut(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_mut(&mut self) -> &mut W { &mut self.inner } - pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { - self.inner() - } + // pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> { + // self.inner() + // } /// Consumes BufWriter, returning the underlying writer /// @@ -137,19 +192,41 @@ impl BufWriter { self.inner } - pub fn poll_into_inner(mut self: Pin<&mut Self>, cx: Context<'_>) -> Poll> { - unimplemented!("poll into inner method") - } + // pub fn poll_into_inner(self: Pin<&mut Self>, _cx: Context<'_>) -> Poll> { + // unimplemented!("poll into inner method") + // } + /// Returns a reference to the internally buffered data. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::BufWriter; + /// use async_std::net::TcpStream; + /// + /// let buf_writer = BufWriter::new(TcpStream::connect("127.0.0.1:34251").await?); + /// + /// // See how many bytes are currently buffered + /// let bytes_buffered = buf_writer.buffer().len(); + /// # + /// # Ok(()) }) } + /// ``` pub fn buffer(&self) -> &[u8] { &self.buf } - pub fn poll_flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + /// Poll buffer flushing until completion + /// + /// This is used in types that wrap around BufWrite, one such example: [`LineWriter`] + /// + /// [`LineWriter`]: struct.LineWriter.html + pub fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let Self { inner, buf, - written + written, } = Pin::get_mut(self); let mut inner = Pin::new(inner); let len = buf.len(); @@ -183,7 +260,7 @@ impl BufWriter { impl AsyncWrite for BufWriter { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut Context, + cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { if self.buf.len() + buf.len() > self.buf.capacity() { @@ -192,16 +269,16 @@ impl AsyncWrite for BufWriter { if buf.len() >= self.buf.capacity() { self.inner().poll_write(cx, buf) } else { - self.buf().write(buf).poll() + Pin::new(&mut *self.buf()).poll_write(cx, buf) } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll_flush_buf(cx))?; self.inner().poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll_flush_buf(cx))?; self.inner().poll_close(cx) } @@ -211,14 +288,78 @@ impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("writer", &self.inner) - .field( - "buf", - &self.buf - ) + .field("buf", &self.buf) .finish() } } +/// Wraps a writer and buffers output to it, flushing whenever a newline +/// (`0x0a`, `'\n'`) is detected. +/// +/// The [`BufWriter`][bufwriter] struct wraps a writer and buffers its output. +/// But it only does this batched write when it goes out of scope, or when the +/// internal buffer is full. Sometimes, you'd prefer to write each line as it's +/// completed, rather than the entire buffer at once. Enter `LineWriter`. It +/// does exactly that. +/// +/// Like [`BufWriter`][bufwriter], a `LineWriter`’s buffer will also be flushed when the +/// `LineWriter` goes out of scope or when its internal buffer is full. +/// +/// [bufwriter]: struct.BufWriter.html +/// +/// If there's still a partial line in the buffer when the `LineWriter` is +/// dropped, it will flush those contents. +/// +/// This type is an async version of [`std::io::LineWriter`] +/// +/// [`std::io::LineWriter`]: https://doc.rust-lang.org/std/io/struct.LineWriter.html +/// +/// # Examples +/// +/// We can use `LineWriter` to write one line at a time, significantly +/// reducing the number of actual writes to the file. +/// +/// ```no_run +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// use async_std::io::{LineWriter, Write}; +/// use async_std::fs::{File, self}; +/// let road_not_taken = b"I shall be telling this with a sigh +/// Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference."; +/// +/// let file = File::create("poem.txt").await?; +/// let mut file = LineWriter::new(file); +/// +/// file.write_all(b"I shall be telling this with a sigh").await?; +/// +/// // No bytes are written until a newline is encountered (or +/// // the internal buffer is filled). +/// assert_eq!(fs::read_to_string("poem.txt").await?, ""); +/// file.write_all(b"\n").await?; +/// assert_eq!( +/// fs::read_to_string("poem.txt").await?, +/// "I shall be telling this with a sigh\n", +/// ); +/// +/// // Write the rest of the poem. +/// file.write_all(b"Somewhere ages and ages hence: +/// Two roads diverged in a wood, and I - +/// I took the one less traveled by, +/// And that has made all the difference.").await?; +/// +/// // The last line of the poem doesn't end in a newline, so +/// // we have to flush or drop the `LineWriter` to finish +/// // writing. +/// file.flush().await?; +/// +/// // Confirm the whole poem was written. +/// assert_eq!(fs::read("poem.txt").await?, &road_not_taken[..]); +/// # +/// # Ok(()) }) } +/// ``` pub struct LineWriter { inner: BufWriter, need_flush: bool, @@ -232,16 +373,15 @@ impl LineWriter { /// # Examples /// /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::fs::File; /// use async_std::io::LineWriter; /// - /// fn main() -> std::io::Result<()> { - /// async_std::task::block_on(async { - /// let file = File::create("poem.txt").await?; - /// let file = LineWriter::new(file); - /// Ok(()) - /// }) - /// } + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::new(file); + /// # + /// # Ok(()) }) } /// ``` pub fn new(inner: W) -> LineWriter { // Lines typically aren't that long, don't use a giant buffer @@ -254,16 +394,15 @@ impl LineWriter { /// # Examples /// /// ```no_run + /// # #![feature(async_await)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::fs::File; /// use async_std::io::LineWriter; /// - /// fn main() -> std::io::Result<()> { - /// async_std::task::block_on(async { - /// let file = File::create("poem.txt").await?; - /// let file = LineWriter::with_capacity(100, file); - /// Ok(()) - /// }) - /// } + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::with_capacity(100, file); + /// # + /// # Ok(()) }) } /// ``` pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { LineWriter { @@ -272,68 +411,120 @@ impl LineWriter { } } + /// Gets a reference to the underlying writer. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::LineWriter; + /// use async_std::fs::File; + /// + /// let file = File::create("poem.txt").await?; + /// let file = LineWriter::new(file); + /// + /// // We can use reference just like buffer + /// let reference = file.get_ref(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_ref(&self) -> &W { self.inner.get_ref() } + /// Gets a mutable reference to the underlying writer. + /// + /// Caution must be taken when calling methods on the mutable reference + /// returned as extra writes could corrupt the output stream. + /// + /// # Examples + /// + /// ```no_run + /// # #![feature(async_await)] + /// # #![allow(unused_mut)] + /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + /// use async_std::io::LineWriter; + /// use async_std::fs::File; + /// + /// let file = File::create("poem.txt").await?; + /// let mut file = LineWriter::new(file); + /// + /// // We can use reference just like buffer + /// let reference = file.get_mut(); + /// # + /// # Ok(()) }) } + /// ``` pub fn get_mut(&mut self) -> &mut W { self.inner.get_mut() } + //TODO: Implement flushing before returning inner + #[allow(missing_docs)] pub fn into_inner(self) -> W { self.inner.into_inner() } } impl AsyncWrite for LineWriter { - fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll> { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { if self.need_flush { - self.as_mut().poll_flush(cx)?; + let _ = self.as_mut().poll_flush(cx)?; } let i = match memchr::memrchr(b'\n', buf) { Some(i) => i, - None => return self.as_mut().inner().as_mut().poll_write(cx, buf) + None => return self.as_mut().inner().as_mut().poll_write(cx, buf), }; let n = ready!(self.as_mut().inner().as_mut().poll_write(cx, &buf[..=i])?); *self.as_mut().need_flush() = true; - if ready!(self.as_mut().poll_flush(cx)).is_err() || n != 1 + 1 { - return Poll::Ready(Ok(n)) + if ready!(self.as_mut().poll_flush(cx)).is_err() || n != i + 1 { + return Poll::Ready(Ok(n)); } match ready!(self.inner().poll_write(cx, &buf[i + 1..])) { - Ok(i) => Poll::Ready(Ok(n + 1)), - Err(_) => Poll::Ready(Ok(n)) + Ok(_) => Poll::Ready(Ok(n + 1)), + Err(_) => Poll::Ready(Ok(n)), } } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.as_mut().inner().poll_flush(cx)?; + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let _ = self.as_mut().inner().poll_flush(cx)?; *self.need_flush() = false; Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.as_mut().inner().poll_flush(cx)?; + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let _ = self.as_mut().inner().poll_flush(cx)?; self.inner().poll_close(cx) } } -impl fmt::Debug for LineWriter where W: fmt::Debug { +impl fmt::Debug for LineWriter +where + W: fmt::Debug, +{ fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("LineWriter") .field("writer", &self.inner.inner) - .field("buffer", - &format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity())) + .field( + "buffer", + &format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity()), + ) .finish() } } - mod tests { + #![allow(unused_imports)] + use super::LineWriter; use crate::prelude::*; use crate::task; - use super::LineWriter; #[test] fn test_line_buffer() { @@ -353,4 +544,4 @@ mod tests { assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); }) } -} \ No newline at end of file +} diff --git a/src/io/mod.rs b/src/io/mod.rs index 7584b68c..35cf58ce 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -21,8 +21,6 @@ //! # Ok(()) }) } //! ``` -pub(crate) const DEFAULT_CAPACITY: usize = 8 * 1024; - #[doc(inline)] pub use std::io::{Error, ErrorKind, Result, SeekFrom}; From d23af83189f872e60baaccf17ca9783d2b7cb948 Mon Sep 17 00:00:00 2001 From: Kirill Mironov Date: Tue, 24 Sep 2019 15:59:46 +0300 Subject: [PATCH 4/5] removed LineWriter and implemented requested changes Signed-off-by: Kirill Mironov --- src/io/buf_writer.rs | 333 +++++++++++-------------------------------- src/io/mod.rs | 2 +- 2 files changed, 82 insertions(+), 253 deletions(-) diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 415660e1..62efbae6 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -1,5 +1,6 @@ use crate::task::{Context, Poll}; -use futures::{ready, AsyncWrite}; +use futures_core::ready; +use futures_io::{AsyncWrite, AsyncSeek, SeekFrom}; use std::fmt; use std::io; use std::pin::Pin; @@ -34,10 +35,9 @@ const DEFAULT_CAPACITY: usize = 8 * 1024; /// Let's write the numbers one through ten to a [`TcpStream`]: /// /// ```no_run -/// # #![feature(async_await)] /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::net::TcpStream; -/// use async_std::io::Write; +/// use async_std::prelude::*; /// /// let mut stream = TcpStream::connect("127.0.0.1:34254").await?; /// @@ -54,11 +54,10 @@ const DEFAULT_CAPACITY: usize = 8 * 1024; /// `BufWriter`: /// /// ```no_run -/// # #![feature(async_await)] /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; /// use async_std::net::TcpStream; -/// use async_std::io::Write; +/// use async_std::prelude::*; /// /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); /// for i in 0..10 { @@ -83,7 +82,7 @@ pub struct BufWriter { written: usize, } -impl BufWriter { +impl BufWriter { pin_utils::unsafe_pinned!(inner: W); pin_utils::unsafe_unpinned!(buf: Vec); @@ -93,7 +92,6 @@ impl BufWriter { /// # Examples /// /// ```no_run - /// # #![feature(async_await)] /// # #![allow(unused_mut)] /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; @@ -114,7 +112,6 @@ impl BufWriter { /// Creating a buffer with a buffer of a hundred bytes. /// /// ```no_run - /// # #![feature(async_await)] /// # #![allow(unused_mut)] /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; @@ -138,7 +135,6 @@ impl BufWriter { /// # Examples /// /// ```no_run - /// # #![feature(async_await)] /// # #![allow(unused_mut)] /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; @@ -162,7 +158,6 @@ impl BufWriter { /// # Examples /// /// ```no_run - /// # #![feature(async_await)] /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; /// use async_std::net::TcpStream; @@ -201,7 +196,6 @@ impl BufWriter { /// # Examples /// /// ```no_run - /// # #![feature(async_await)] /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// use async_std::io::BufWriter; /// use async_std::net::TcpStream; @@ -222,13 +216,13 @@ impl BufWriter { /// This is used in types that wrap around BufWrite, one such example: [`LineWriter`] /// /// [`LineWriter`]: struct.LineWriter.html - pub fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let Self { inner, buf, written, - } = Pin::get_mut(self); - let mut inner = Pin::new(inner); + } = unsafe { Pin::get_unchecked_mut(self) }; + let mut inner = unsafe { Pin::new_unchecked(inner) }; let len = buf.len(); let mut ret = Ok(()); while *written < len { @@ -257,7 +251,7 @@ impl BufWriter { } } -impl AsyncWrite for BufWriter { +impl AsyncWrite for BufWriter { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -284,7 +278,7 @@ impl AsyncWrite for BufWriter { } } -impl fmt::Debug for BufWriter { +impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("writer", &self.inner) @@ -293,255 +287,90 @@ impl fmt::Debug for BufWriter { } } -/// Wraps a writer and buffers output to it, flushing whenever a newline -/// (`0x0a`, `'\n'`) is detected. -/// -/// The [`BufWriter`][bufwriter] struct wraps a writer and buffers its output. -/// But it only does this batched write when it goes out of scope, or when the -/// internal buffer is full. Sometimes, you'd prefer to write each line as it's -/// completed, rather than the entire buffer at once. Enter `LineWriter`. It -/// does exactly that. -/// -/// Like [`BufWriter`][bufwriter], a `LineWriter`’s buffer will also be flushed when the -/// `LineWriter` goes out of scope or when its internal buffer is full. -/// -/// [bufwriter]: struct.BufWriter.html -/// -/// If there's still a partial line in the buffer when the `LineWriter` is -/// dropped, it will flush those contents. -/// -/// This type is an async version of [`std::io::LineWriter`] -/// -/// [`std::io::LineWriter`]: https://doc.rust-lang.org/std/io/struct.LineWriter.html -/// -/// # Examples -/// -/// We can use `LineWriter` to write one line at a time, significantly -/// reducing the number of actual writes to the file. -/// -/// ```no_run -/// # #![feature(async_await)] -/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { -/// use async_std::io::{LineWriter, Write}; -/// use async_std::fs::{File, self}; -/// let road_not_taken = b"I shall be telling this with a sigh -/// Somewhere ages and ages hence: -/// Two roads diverged in a wood, and I - -/// I took the one less traveled by, -/// And that has made all the difference."; -/// -/// let file = File::create("poem.txt").await?; -/// let mut file = LineWriter::new(file); -/// -/// file.write_all(b"I shall be telling this with a sigh").await?; -/// -/// // No bytes are written until a newline is encountered (or -/// // the internal buffer is filled). -/// assert_eq!(fs::read_to_string("poem.txt").await?, ""); -/// file.write_all(b"\n").await?; -/// assert_eq!( -/// fs::read_to_string("poem.txt").await?, -/// "I shall be telling this with a sigh\n", -/// ); -/// -/// // Write the rest of the poem. -/// file.write_all(b"Somewhere ages and ages hence: -/// Two roads diverged in a wood, and I - -/// I took the one less traveled by, -/// And that has made all the difference.").await?; -/// -/// // The last line of the poem doesn't end in a newline, so -/// // we have to flush or drop the `LineWriter` to finish -/// // writing. -/// file.flush().await?; -/// -/// // Confirm the whole poem was written. -/// assert_eq!(fs::read("poem.txt").await?, &road_not_taken[..]); -/// # -/// # Ok(()) }) } -/// ``` -pub struct LineWriter { - inner: BufWriter, - need_flush: bool, -} +impl AsyncSeek for BufWriter { + /// Seek to the offset, in bytes, in the underlying writer. + /// + /// Seeking always writes out the internal buffer before seeking. -impl LineWriter { - pin_utils::unsafe_pinned!(inner: BufWriter); - pin_utils::unsafe_unpinned!(need_flush: bool); - /// Creates a new `LineWriter`. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// use async_std::fs::File; - /// use async_std::io::LineWriter; - /// - /// let file = File::create("poem.txt").await?; - /// let file = LineWriter::new(file); - /// # - /// # Ok(()) }) } - /// ``` - pub fn new(inner: W) -> LineWriter { - // Lines typically aren't that long, don't use a giant buffer - LineWriter::with_capacity(1024, inner) - } - - /// Creates a new `LineWriter` with a specified capacity for the internal - /// buffer. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// use async_std::fs::File; - /// use async_std::io::LineWriter; - /// - /// let file = File::create("poem.txt").await?; - /// let file = LineWriter::with_capacity(100, file); - /// # - /// # Ok(()) }) } - /// ``` - pub fn with_capacity(capacity: usize, inner: W) -> LineWriter { - LineWriter { - inner: BufWriter::with_capacity(capacity, inner), - need_flush: false, - } - } - - /// Gets a reference to the underlying writer. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # #![allow(unused_mut)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// use async_std::io::LineWriter; - /// use async_std::fs::File; - /// - /// let file = File::create("poem.txt").await?; - /// let file = LineWriter::new(file); - /// - /// // We can use reference just like buffer - /// let reference = file.get_ref(); - /// # - /// # Ok(()) }) } - /// ``` - pub fn get_ref(&self) -> &W { - self.inner.get_ref() - } - - /// Gets a mutable reference to the underlying writer. - /// - /// Caution must be taken when calling methods on the mutable reference - /// returned as extra writes could corrupt the output stream. - /// - /// # Examples - /// - /// ```no_run - /// # #![feature(async_await)] - /// # #![allow(unused_mut)] - /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { - /// use async_std::io::LineWriter; - /// use async_std::fs::File; - /// - /// let file = File::create("poem.txt").await?; - /// let mut file = LineWriter::new(file); - /// - /// // We can use reference just like buffer - /// let reference = file.get_mut(); - /// # - /// # Ok(()) }) } - /// ``` - pub fn get_mut(&mut self) -> &mut W { - self.inner.get_mut() - } - - //TODO: Implement flushing before returning inner - #[allow(missing_docs)] - pub fn into_inner(self) -> W { - self.inner.into_inner() - } -} - -impl AsyncWrite for LineWriter { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - if self.need_flush { - let _ = self.as_mut().poll_flush(cx)?; - } - - let i = match memchr::memrchr(b'\n', buf) { - Some(i) => i, - None => return self.as_mut().inner().as_mut().poll_write(cx, buf), - }; - - let n = ready!(self.as_mut().inner().as_mut().poll_write(cx, &buf[..=i])?); - *self.as_mut().need_flush() = true; - if ready!(self.as_mut().poll_flush(cx)).is_err() || n != i + 1 { - return Poll::Ready(Ok(n)); - } - match ready!(self.inner().poll_write(cx, &buf[i + 1..])) { - Ok(_) => Poll::Ready(Ok(n + 1)), - Err(_) => Poll::Ready(Ok(n)), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let _ = self.as_mut().inner().poll_flush(cx)?; - *self.need_flush() = false; - Poll::Ready(Ok(())) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let _ = self.as_mut().inner().poll_flush(cx)?; - self.inner().poll_close(cx) - } -} - -impl fmt::Debug for LineWriter -where - W: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.debug_struct("LineWriter") - .field("writer", &self.inner.inner) - .field( - "buffer", - &format_args!("{}/{}", self.inner.buf.len(), self.inner.buf.capacity()), - ) - .finish() + fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + ready!(self.as_mut().poll_flush_buf(cx))?; + self.inner().poll_seek(cx, pos) } } mod tests { #![allow(unused_imports)] - use super::LineWriter; + + use super::BufWriter; use crate::prelude::*; use crate::task; + use crate::io::{self, SeekFrom}; #[test] - fn test_line_buffer() { + fn test_buffered_writer() { task::block_on(async { - let mut writer = LineWriter::new(Vec::new()); - writer.write(&[0]).await.unwrap(); - assert_eq!(*writer.get_ref(), []); - writer.write(&[1]).await.unwrap(); - assert_eq!(*writer.get_ref(), []); - writer.flush().await.unwrap(); + let inner = Vec::new(); + let mut writer = BufWriter::with_capacity(2, inner); + + writer.write(&[0, 1]).await.unwrap(); + assert_eq!(writer.buffer(), []); assert_eq!(*writer.get_ref(), [0, 1]); - writer.write(&[0, b'\n', 1, b'\n', 2]).await.unwrap(); - assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n']); + + writer.write(&[2]).await.unwrap(); + assert_eq!(writer.buffer(), [2]); + assert_eq!(*writer.get_ref(), [0, 1]); + + writer.write(&[3]).await.unwrap(); + assert_eq!(writer.buffer(), [2, 3]); + assert_eq!(*writer.get_ref(), [0, 1]); + writer.flush().await.unwrap(); - assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2]); - writer.write(&[3, b'\n']).await.unwrap(); - assert_eq!(*writer.get_ref(), [0, 1, 0, b'\n', 1, b'\n', 2, 3, b'\n']); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[4]).await.unwrap(); + writer.write(&[5]).await.unwrap(); + assert_eq!(writer.buffer(), [4, 5]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3]); + + writer.write(&[6]).await.unwrap(); + assert_eq!(writer.buffer(), [6]); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5]); + + writer.write(&[7, 8]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8]); + + writer.write(&[9, 10, 11]).await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + + writer.flush().await.unwrap(); + assert_eq!(writer.buffer(), []); + assert_eq!(*writer.get_ref(), [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]); + }) + } + + #[test] + fn test_buffered_writer_inner_into_inner_does_not_flush() { + task::block_on(async { + let mut w = BufWriter::with_capacity(3, Vec::new()); + w.write(&[0, 1]).await.unwrap(); + assert_eq!(*w.get_ref(), []); + let w = w.into_inner(); + assert_eq!(w, []); + }) + } + + #[test] + fn test_buffered_writer_seek() { + task::block_on(async { + let mut w = BufWriter::with_capacity(3, io::Cursor::new(Vec::new())); + w.write_all(&[0, 1, 2, 3, 4, 5]).await.unwrap(); + w.write_all(&[6, 7]).await.unwrap(); + assert_eq!(w.seek(SeekFrom::Current(0)).await.ok(), Some(8)); + assert_eq!(&w.get_ref().get_ref()[..], &[0, 1, 2, 3, 4, 5, 6, 7][..]); + assert_eq!(w.seek(SeekFrom::Start(2)).await.ok(), Some(2)); }) } } diff --git a/src/io/mod.rs b/src/io/mod.rs index 3e485ea7..7a942854 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -25,7 +25,7 @@ pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; pub use buf_read::{BufRead, Lines}; pub use buf_reader::BufReader; -pub use buf_writer::{BufWriter, LineWriter}; +pub use buf_writer::BufWriter; pub use copy::copy; pub use cursor::Cursor; pub use empty::{empty, Empty}; From 63154f5b7affe73585c340de19b071a60643784b Mon Sep 17 00:00:00 2001 From: Kirill Mironov Date: Tue, 24 Sep 2019 16:33:02 +0300 Subject: [PATCH 5/5] cargo fmt --- src/io/buf_writer.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index 62efbae6..2b7545a1 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -1,6 +1,6 @@ use crate::task::{Context, Poll}; use futures_core::ready; -use futures_io::{AsyncWrite, AsyncSeek, SeekFrom}; +use futures_io::{AsyncSeek, AsyncWrite, SeekFrom}; use std::fmt; use std::io; use std::pin::Pin; @@ -292,7 +292,11 @@ impl AsyncSeek for BufWriter { /// /// Seeking always writes out the internal buffer before seeking. - fn poll_seek(mut self: Pin<&mut Self>, cx: &mut Context<'_>, pos: SeekFrom) -> Poll> { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { ready!(self.as_mut().poll_flush_buf(cx))?; self.inner().poll_seek(cx, pos) } @@ -302,9 +306,9 @@ mod tests { #![allow(unused_imports)] use super::BufWriter; + use crate::io::{self, SeekFrom}; use crate::prelude::*; use crate::task; - use crate::io::{self, SeekFrom}; #[test] fn test_buffered_writer() {