From 91aeb39e4c83f6d0794fa6f40f43154544418673 Mon Sep 17 00:00:00 2001 From: Kirill Mironov Date: Tue, 20 Aug 2019 14:48:15 +0300 Subject: [PATCH] begin implementing BufWriter --- src/io/buf_writer.rs | 140 +++++++++++++++++++++++++++++++++++++++++++ src/io/mod.rs | 4 ++ 2 files changed, 144 insertions(+) create mode 100644 src/io/buf_writer.rs diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs new file mode 100644 index 0000000..5755063 --- /dev/null +++ b/src/io/buf_writer.rs @@ -0,0 +1,140 @@ +use crate::task::{Context, Poll}; +use futures::{ready, AsyncWrite, Future, Stream}; +use std::io::{self, IntoInnerError}; +use std::pin::Pin; +use std::fmt; +use crate::io::Write; + +const DEFAULT_CAPACITY: usize = 8 * 1024; + + +pub struct BufWriter { + inner: Option, + buf: Vec, + panicked: bool, +} + +impl BufWriter { + pin_utils::unsafe_pinned!(inner: Option); + pin_utils::unsafe_unpinned!(panicked: bool); + + pub fn new(inner: W) -> BufWriter { + BufWriter::with_capacity(DEFAULT_CAPACITY, inner) + } + + pub fn with_capacity(capacity: usize, inner: W) -> BufWriter { + BufWriter { + inner: Some(inner), + buf: Vec::with_capacity(capacity), + panicked: false, + } + } + + pub fn get_ref(&self) -> &W { + self.inner.as_ref().unwrap() + } + + pub fn get_mut(&mut self) -> &mut W { + self.inner.as_mut().unwrap() + } + + pub fn buffer(&self) -> &[u8] { + &self.buf + } + + pub fn poll_flush_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + inner, + buf, + panicked + } = Pin::get_mut(self); + let mut panicked = Pin::new(panicked); + let mut written = 0; + let len = buf.len(); + let mut ret = Ok(()); + while written < len { + *panicked = true; + let r = Pin::new(inner.as_mut().unwrap()); + *panicked = false; + match r.poll_write(cx, &buf[written..]) { + Poll::Ready(Ok(0)) => { + ret = Err(io::Error::new( + io::ErrorKind::WriteZero, + "Failed to write buffered data", + )); + break; + } + Poll::Ready(Ok(n)) => written += n, + Poll::Ready(Err(ref e)) if e.kind() == io::ErrorKind::Interrupted => {} + Poll::Ready(Err(e)) => { + ret = Err(e); + break; + } + Poll::Pending => return Poll::Pending, + } + } + if written > 0 { + buf.drain(..written); + } + Poll::Ready(ret) + } + + pub fn poll_into_inner( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + //TODO: Fix 'expected function, found struct `IntoInnerError`' compiler error + ) -> Poll> { + match ready!(self.as_mut().poll_flush_buf(cx)) { + Ok(()) => Poll::Ready(Ok(self.inner().take().unwrap())), + Err(e) => Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, ""))) + } + } +} + +impl AsyncWrite for BufWriter { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + let panicked = self.as_mut().panicked(); + if self.as_ref().buf.len() + buf.len() > self.as_ref().buf.capacity() { + match ready!(self.as_mut().poll_flush_buf(cx)) { + Ok(()) => {}, + Err(e) => return Poll::Ready(Err(e)) + } + } + if buf.len() >= self.as_ref().buf.capacity() { + *panicked = true; + let r = ready!(self.as_mut().poll_write(cx, buf)); + *panicked = false; + return Poll::Ready(r) + } else { + return Poll::Ready(ready!(self.as_ref().buf.write(buf).poll())) + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unimplemented!() + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unimplemented!() + } +} + +impl fmt::Debug for BufWriter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BufReader") + .field("writer", &self.inner) + .field( + "buf", + &self.buf + ) + .finish() + } +} + +mod tests { + +} \ No newline at end of file diff --git a/src/io/mod.rs b/src/io/mod.rs index 13b91d6..53dab90 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -21,11 +21,14 @@ //! # Ok(()) }) } //! ``` +pub(crate) const DEFAULT_CAPACITY: usize = 8 * 1024; + #[doc(inline)] pub use std::io::{Error, ErrorKind, Result, SeekFrom}; pub use buf_read::{BufRead, Lines}; pub use buf_reader::BufReader; +pub use buf_writer::BufWriter; pub use copy::copy; pub use empty::{empty, Empty}; pub use read::Read; @@ -39,6 +42,7 @@ pub use write::Write; mod buf_read; mod buf_reader; +mod buf_writer; mod copy; mod empty; mod read;