diff --git a/src/io/read/bytes.rs b/src/io/read/bytes.rs new file mode 100644 index 00000000..42245243 --- /dev/null +++ b/src/io/read/bytes.rs @@ -0,0 +1,60 @@ +use std::pin::Pin; + +use crate::io::{self, Read}; +use crate::stream::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream over `u8` values of a reader. +/// +/// This struct is generally created by calling [`bytes`] on a reader. +/// Please see the documentation of [`bytes`] for more details. +/// +/// [`bytes`]: trait.Read.html#method.bytes +#[derive(Debug)] +pub struct Bytes { + pub(crate) inner: T, +} + +impl Stream for Bytes { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut byte = 0; + + let rd = Pin::new(&mut self.inner); + + match futures_core::ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) { + Ok(0) => Poll::Ready(None), + Ok(..) => Poll::Ready(Some(Ok(byte))), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Poll::Pending, + Err(e) => Poll::Ready(Some(Err(e))), + } + } +} + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + use crate::task; + + #[test] + fn test_bytes_basics() -> std::io::Result<()> { + task::block_on(async move { + let raw: Vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8]; + let source: io::Cursor> = io::Cursor::new(raw.clone()); + + let mut s = source.bytes(); + + // TODO(@dignifiedquire): Use collect, once it is stable. + let mut result = Vec::new(); + while let Some(byte) = s.next().await { + result.push(byte?); + } + + assert_eq!(result, raw); + + Ok(()) + }) + } +} diff --git a/src/io/read/chain.rs b/src/io/read/chain.rs new file mode 100644 index 00000000..09517cca --- /dev/null +++ b/src/io/read/chain.rs @@ -0,0 +1,197 @@ +use crate::io::IoSliceMut; +use std::fmt; +use std::pin::Pin; + +use crate::io::{self, BufRead, Read}; +use crate::task::{Context, Poll}; + +/// Adaptor to chain together two readers. +/// +/// This struct is generally created by calling [`chain`] on a reader. +/// Please see the documentation of [`chain`] for more details. +/// +/// [`chain`]: trait.Read.html#method.chain +pub struct Chain { + pub(crate) first: T, + pub(crate) second: U, + pub(crate) done_first: bool, +} + +impl Chain { + /// Consumes the `Chain`, returning the wrapped readers. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.into_inner(); + /// # + /// # Ok(()) }) } + /// ``` + pub fn into_inner(self) -> (T, U) { + (self.first, self.second) + } + + /// Gets references to the underlying readers in this `Chain`. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_ref(); + /// # + /// # Ok(()) }) } + /// ``` + pub fn get_ref(&self) -> (&T, &U) { + (&self.first, &self.second) + } + + /// Gets mutable references to the underlying readers in this `Chain`. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying readers as doing so may corrupt the internal state of this + /// `Chain`. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let mut chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_mut(); + /// # + /// # Ok(()) }) } + /// ``` + pub fn get_mut(&mut self) -> (&mut T, &mut U) { + (&mut self.first, &mut self.second) + } +} + +impl fmt::Debug for Chain { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Chain") + .field("t", &self.first) + .field("u", &self.second) + .finish() + } +} + +impl Read for Chain { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if !self.done_first { + let rd = Pin::new(&mut self.first); + + match futures_core::ready!(rd.poll_read(cx, buf)) { + Ok(0) if !buf.is_empty() => self.done_first = true, + Ok(n) => return Poll::Ready(Ok(n)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let rd = Pin::new(&mut self.second); + rd.poll_read(cx, buf) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + if !self.done_first { + let rd = Pin::new(&mut self.first); + + match futures_core::ready!(rd.poll_read_vectored(cx, bufs)) { + Ok(0) if !bufs.is_empty() => self.done_first = true, + Ok(n) => return Poll::Ready(Ok(n)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let rd = Pin::new(&mut self.second); + rd.poll_read_vectored(cx, bufs) + } +} + +impl BufRead for Chain { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + first, + second, + done_first, + } = unsafe { self.get_unchecked_mut() }; + + if !*done_first { + let first = unsafe { Pin::new_unchecked(first) }; + match futures_core::ready!(first.poll_fill_buf(cx)) { + Ok(buf) if buf.is_empty() => { + *done_first = true; + } + Ok(buf) => return Poll::Ready(Ok(buf)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let second = unsafe { Pin::new_unchecked(second) }; + second.poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + if !self.done_first { + let rd = Pin::new(&mut self.first); + rd.consume(amt) + } else { + let rd = Pin::new(&mut self.second); + rd.consume(amt) + } + } +} + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + use crate::task; + + #[test] + fn test_chain_basics() -> std::io::Result<()> { + let source1: io::Cursor> = io::Cursor::new(vec![0, 1, 2]); + let source2: io::Cursor> = io::Cursor::new(vec![3, 4, 5]); + + task::block_on(async move { + let mut buffer = Vec::new(); + + let mut source = source1.chain(source2); + + assert_eq!(6, source.read_to_end(&mut buffer).await?); + assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]); + + Ok(()) + }) + } +} diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index 5a2276e3..9c81f959 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -1,8 +1,11 @@ +mod bytes; +mod chain; mod read; mod read_exact; mod read_to_end; mod read_to_string; mod read_vectored; +mod take; use read::ReadFuture; use read_exact::ReadExactFuture; @@ -261,6 +264,156 @@ extension_trait! { { ReadExactFuture { reader: self, buf } } + + #[doc = r#" + Creates an adaptor which will read at most `limit` bytes from it. + + This function returns a new instance of `Read` which will read at most + `limit` bytes, after which it will always return EOF ([`Ok(0)`]). Any + read errors will not count towards the number of bytes read and future + calls to [`read()`] may succeed. + + # Examples + + [`File`]s implement `Read`: + + [`File`]: ../fs/struct.File.html + [`Ok(0)`]: ../../std/result/enum.Result.html#variant.Ok + [`read()`]: tymethod.read + + ```no_run + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use async_std::io::prelude::*; + use async_std::fs::File; + + let f = File::open("foo.txt").await?; + let mut buffer = [0; 5]; + + // read at most five bytes + let mut handle = f.take(5); + + handle.read(&mut buffer).await?; + # + # Ok(()) }) } + ``` + "#] + fn take(self, limit: u64) -> take::Take + where + Self: Sized, + { + take::Take { inner: self, limit } + } + + #[doc = r#" + Creates a "by reference" adaptor for this instance of `Read`. + + The returned adaptor also implements `Read` and will simply borrow this + current reader. + + # Examples + + [`File`][file]s implement `Read`: + + [file]: ../fs/struct.File.html + + ```no_run + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::fs::File; + + let mut f = File::open("foo.txt").await?; + let mut buffer = Vec::new(); + let mut other_buffer = Vec::new(); + + { + let reference = f.by_ref(); + + // read at most 5 bytes + reference.take(5).read_to_end(&mut buffer).await?; + + } // drop our &mut reference so we can use f again + + // original file still usable, read the rest + f.read_to_end(&mut other_buffer).await?; + # + # Ok(()) }) } + ``` + "#] + fn by_ref(&mut self) -> &mut Self where Self: Sized { self } + + + #[doc = r#" + Transforms this `Read` instance to a `Stream` over its bytes. + + The returned type implements `Stream` where the `Item` is + `Result`. + The yielded item is `Ok` if a byte was successfully read and `Err` + otherwise. EOF is mapped to returning `None` from this iterator. + + # Examples + + [`File`][file]s implement `Read`: + + [file]: ../fs/struct.File.html + + ```no_run + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::fs::File; + + let f = File::open("foo.txt").await?; + let mut s = f.bytes(); + + while let Some(byte) = s.next().await { + println!("{}", byte.unwrap()); + } + # + # Ok(()) }) } + ``` + "#] + fn bytes(self) -> bytes::Bytes where Self: Sized { + bytes::Bytes { inner: self } + } + + #[doc = r#" + Creates an adaptor which will chain this stream with another. + + The returned `Read` instance will first read all bytes from this object + until EOF is encountered. Afterwards the output is equivalent to the + output of `next`. + + # Examples + + [`File`][file]s implement `Read`: + + [file]: ../fs/struct.File.html + + ```no_run + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::fs::File; + + let f1 = File::open("foo.txt").await?; + let f2 = File::open("bar.txt").await?; + + let mut handle = f1.chain(f2); + let mut buffer = String::new(); + + // read the value into a String. We could use any Read method here, + // this is just one example. + handle.read_to_string(&mut buffer).await?; + # + # Ok(()) }) } + ``` + "#] + fn chain(self, next: R) -> chain::Chain where Self: Sized { + chain::Chain { first: self, second: next, done_first: false } + } + } impl Read for Box { @@ -307,3 +460,31 @@ extension_trait! { } } } + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + + #[test] + fn test_read_by_ref() -> io::Result<()> { + crate::task::block_on(async { + let mut f = io::Cursor::new(vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8]); + let mut buffer = Vec::new(); + let mut other_buffer = Vec::new(); + + { + let reference = f.by_ref(); + + // read at most 5 bytes + assert_eq!(reference.take(5).read_to_end(&mut buffer).await?, 5); + assert_eq!(&buffer, &[0, 1, 2, 3, 4]) + } // drop our &mut reference so we can use f again + + // original file still usable, read the rest + assert_eq!(f.read_to_end(&mut other_buffer).await?, 4); + assert_eq!(&other_buffer, &[5, 6, 7, 8]); + Ok(()) + }) + } +} diff --git a/src/io/read/take.rs b/src/io/read/take.rs new file mode 100644 index 00000000..def4e240 --- /dev/null +++ b/src/io/read/take.rs @@ -0,0 +1,242 @@ +use std::cmp; +use std::pin::Pin; + +use crate::io::{self, BufRead, Read}; +use crate::task::{Context, Poll}; + +/// Reader adaptor which limits the bytes read from an underlying reader. +/// +/// This struct is generally created by calling [`take`] on a reader. +/// Please see the documentation of [`take`] for more details. +/// +/// [`take`]: trait.Read.html#method.take +#[derive(Debug)] +pub struct Take { + pub(crate) inner: T, + pub(crate) limit: u64, +} + +impl Take { + /// Returns the number of bytes that can be read before this instance will + /// return EOF. + /// + /// # Note + /// + /// This instance may reach `EOF` after reading fewer bytes than indicated by + /// this method if the underlying [`Read`] instance reaches EOF. + /// + /// [`Read`]: trait.Read.html + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let f = File::open("foo.txt").await?; + /// + /// // read at most five bytes + /// let handle = f.take(5); + /// + /// println!("limit: {}", handle.limit()); + /// # + /// # Ok(()) }) } + /// ``` + pub fn limit(&self) -> u64 { + self.limit + } + + /// Sets the number of bytes that can be read before this instance will + /// return EOF. This is the same as constructing a new `Take` instance, so + /// the amount of bytes read and the previous limit value don't matter when + /// calling this method. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let f = File::open("foo.txt").await?; + /// + /// // read at most five bytes + /// let mut handle = f.take(5); + /// handle.set_limit(10); + /// + /// assert_eq!(handle.limit(), 10); + /// # + /// # Ok(()) }) } + /// ``` + pub fn set_limit(&mut self, limit: u64) { + self.limit = limit; + } + + /// Consumes the `Take`, returning the wrapped reader. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let file = File::open("foo.txt").await?; + /// + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; + /// + /// let file = handle.into_inner(); + /// # + /// # Ok(()) }) } + /// ``` + pub fn into_inner(self) -> T { + self.inner + } + + /// Gets a reference to the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let file = File::open("foo.txt").await?; + /// + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; + /// + /// let file = handle.get_ref(); + /// # + /// # Ok(()) }) } + /// ``` + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying reader as doing so may corrupt the internal limit of this + /// `Take`. + /// + /// # Examples + /// + /// ```no_run + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// let file = File::open("foo.txt").await?; + /// + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; + /// + /// let file = handle.get_mut(); + /// # + /// # Ok(()) }) } + /// ``` + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl Read for Take { + /// Attempt to read from the `AsyncRead` into `buf`. + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let Self { inner, limit } = &mut *self; + take_read_internal(Pin::new(inner), cx, buf, limit) + } +} + +pub fn take_read_internal( + mut rd: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut [u8], + limit: &mut u64, +) -> Poll> { + // Don't call into inner reader at all at EOF because it may still block + if *limit == 0 { + return Poll::Ready(Ok(0)); + } + + let max = cmp::min(buf.len() as u64, *limit) as usize; + + match futures_core::ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) { + Ok(n) => { + *limit -= n as u64; + Poll::Ready(Ok(n)) + } + Err(e) => Poll::Ready(Err(e)), + } +} + +impl BufRead for Take { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { inner, limit } = unsafe { self.get_unchecked_mut() }; + let inner = unsafe { Pin::new_unchecked(inner) }; + + if *limit == 0 { + return Poll::Ready(Ok(&[])); + } + + match futures_core::ready!(inner.poll_fill_buf(cx)) { + Ok(buf) => { + let cap = cmp::min(buf.len() as u64, *limit) as usize; + Poll::Ready(Ok(&buf[..cap])) + } + Err(e) => Poll::Ready(Err(e)), + } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + // Don't let callers reset the limit by passing an overlarge value + let amt = cmp::min(amt as u64, self.limit) as usize; + self.limit -= amt as u64; + + let rd = Pin::new(&mut self.inner); + rd.consume(amt); + } +} + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + use crate::task; + + #[test] + fn test_take_basics() -> std::io::Result<()> { + let source: io::Cursor> = io::Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]); + + task::block_on(async move { + let mut buffer = [0u8; 5]; + + // read at most five bytes + let mut handle = source.take(5); + + handle.read(&mut buffer).await?; + assert_eq!(buffer, [0, 1, 2, 3, 4]); + + // check that the we are actually at the end + assert_eq!(handle.read(&mut buffer).await.unwrap(), 0); + + Ok(()) + }) + } +}