From d9aec105a1a5b3b18463bab32cf36d2fcd57a00a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 16:30:38 +0200 Subject: [PATCH] feat(io): implement Read::chain --- src/io/read/chain.rs | 199 +++++++++++++++++++++++++++++++++++++++++++ src/io/read/mod.rs | 40 ++++++++- 2 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 src/io/read/chain.rs diff --git a/src/io/read/chain.rs b/src/io/read/chain.rs new file mode 100644 index 0000000..c05a43c --- /dev/null +++ b/src/io/read/chain.rs @@ -0,0 +1,199 @@ +use crate::io::IoSliceMut; +use std::fmt; +use std::pin::Pin; + +use crate::io::{self, BufRead, Read}; +use crate::task::{Context, Poll}; + +/// Adaptor to chain together two readers. +/// +/// This struct is generally created by calling [`chain`] on a reader. +/// Please see the documentation of [`chain`] for more details. +/// +/// [`chain`]: trait.Read.html#method.chain +pub struct Chain { + pub(crate) first: T, + pub(crate) second: U, + pub(crate) done_first: bool, +} + +impl Chain { + /// Consumes the `Chain`, returning the wrapped readers. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.into_inner(); + /// Ok(()) + /// }) } + /// ``` + pub fn into_inner(self) -> (T, U) { + (self.first, self.second) + } + + /// Gets references to the underlying readers in this `Chain`. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_ref(); + /// Ok(()) + /// }) } + /// ``` + pub fn get_ref(&self) -> (&T, &U) { + (&self.first, &self.second) + } + + /// Gets mutable references to the underlying readers in this `Chain`. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying readers as doing so may corrupt the internal state of this + /// `Chain`. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let mut chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_mut(); + /// Ok(()) + /// }) } + /// ``` + pub fn get_mut(&mut self) -> (&mut T, &mut U) { + (&mut self.first, &mut self.second) + } +} + +impl fmt::Debug for Chain { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Chain") + .field("t", &self.first) + .field("u", &self.second) + .finish() + } +} + +impl Read for Chain { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if !self.done_first { + let rd = Pin::new(&mut self.first); + + match futures_core::ready!(rd.poll_read(cx, buf)) { + Ok(0) if !buf.is_empty() => self.done_first = true, + Ok(n) => return Poll::Ready(Ok(n)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let rd = Pin::new(&mut self.second); + rd.poll_read(cx, buf) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + if !self.done_first { + let rd = Pin::new(&mut self.first); + + match futures_core::ready!(rd.poll_read_vectored(cx, bufs)) { + Ok(0) if !bufs.is_empty() => self.done_first = true, + Ok(n) => return Poll::Ready(Ok(n)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let rd = Pin::new(&mut self.second); + rd.poll_read_vectored(cx, bufs) + } +} + +impl BufRead for Chain { + fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // FIXME: how to make this compile? + + // let Self { + // first, + // second, + // done_first + // } = &mut *self; + + // if !*done_first { + // let rd = Pin::new(first); + + // match futures_core::ready!(rd.poll_fill_buf(cx)) { + // Ok(buf) if buf.is_empty() => { *done_first = true; } + // Ok(buf) => return Poll::Ready(Ok(buf)), + // Err(err) => return Poll::Ready(Err(err)), + // } + // } + + // let rd = Pin::new(second); + // rd.poll_fill_buf(cx) + unimplemented!() + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + if !self.done_first { + let rd = Pin::new(&mut self.first); + rd.consume(amt) + } else { + let rd = Pin::new(&mut self.second); + rd.consume(amt) + } + } +} + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + use crate::task; + + #[test] + fn test_chain_basics() -> std::io::Result<()> { + let source1: io::Cursor> = io::Cursor::new(vec![0, 1, 2]); + let source2: io::Cursor> = io::Cursor::new(vec![3, 4, 5]); + + task::block_on(async move { + let mut buffer = Vec::new(); + + let mut source = source1.chain(source2); + + assert_eq!(6, source.read_to_end(&mut buffer).await?); + assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]); + + Ok(()) + }) + } +} diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index c6b5bad..6b8ad71 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -1,4 +1,5 @@ mod bytes; +mod chain; mod read; mod read_exact; mod read_to_end; @@ -344,7 +345,7 @@ extension_trait! { fn by_ref(&mut self) -> &mut Self where Self: Sized { self } - #[doc=r#" + #[doc = r#" Transforms this `Read` instance to a `Stream` over its bytes. The returned type implements `Stream` where the `Item` is @@ -377,6 +378,43 @@ extension_trait! { fn bytes(self) -> bytes::Bytes where Self: Sized { bytes::Bytes { inner: self } } + + #[doc = r#" + Creates an adaptor which will chain this stream with another. + + The returned `Read` instance will first read all bytes from this object + until EOF is encountered. Afterwards the output is equivalent to the + output of `next`. + + # Examples + + [`File`][file]s implement `Read`: + + [file]: ../fs/struct.File.html + + ```no_run + use async_std::io; + use async_std::prelude::*; + use async_std::fs::File; + + fn main() -> io::Result<()> { async_std::task::block_on(async { + let f1 = File::open("foo.txt").await?; + let f2 = File::open("bar.txt").await?; + + let mut handle = f1.chain(f2); + let mut buffer = String::new(); + + // read the value into a String. We could use any Read method here, + // this is just one example. + handle.read_to_string(&mut buffer).await?; + Ok(()) + }) } + ``` + "#] + fn chain(self, next: R) -> chain::Chain where Self: Sized { + chain::Chain { first: self, second: next, done_first: false } + } + } impl Read for Box {