245: feat: missing Read and Write methods r=yoshuawuyts a=dignifiedquire

Ref: #131 

- [x] Read::by_ref
- [x] Read::bytes
- [x] Read::chain
- [x] Read::take
- [ ] Write::by_ref
- [ ] ~~Write::write_fmt~~ postponed until https://github.com/async-rs/async-std/issues/247 is solved

Needs fixing:

- [x] `BufRead` for `Take`
- [x] `BufRead` for `Chain`
- [ ] `by_ref` conflict between `Read` and `Write`, unable to add both, as they conflict, and the current state of things does not allow to differentiate between the two.


Co-authored-by: dignifiedquire <dignifiedquire@users.noreply.github.com>
This commit is contained in:
bors[bot] 2019-09-28 00:51:06 +00:00 committed by GitHub
commit 383f7e9322
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 680 additions and 0 deletions

60
src/io/read/bytes.rs Normal file
View file

@ -0,0 +1,60 @@
use std::pin::Pin;
use crate::io::{self, Read};
use crate::stream::stream::Stream;
use crate::task::{Context, Poll};
/// A stream over `u8` values of a reader.
///
/// This struct is generally created by calling [`bytes`] on a reader.
/// Please see the documentation of [`bytes`] for more details.
///
/// [`bytes`]: trait.Read.html#method.bytes
#[derive(Debug)]
pub struct Bytes<T> {
pub(crate) inner: T,
}
impl<T: Read + Unpin> Stream for Bytes<T> {
type Item = io::Result<u8>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut byte = 0;
let rd = Pin::new(&mut self.inner);
match futures_core::ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
Ok(0) => Poll::Ready(None),
Ok(..) => Poll::Ready(Some(Ok(byte))),
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Poll::Pending,
Err(e) => Poll::Ready(Some(Err(e))),
}
}
}
#[cfg(test)]
mod tests {
use crate::io;
use crate::prelude::*;
use crate::task;
#[test]
fn test_bytes_basics() -> std::io::Result<()> {
task::block_on(async move {
let raw: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8];
let source: io::Cursor<Vec<u8>> = io::Cursor::new(raw.clone());
let mut s = source.bytes();
// TODO(@dignifiedquire): Use collect, once it is stable.
let mut result = Vec::new();
while let Some(byte) = s.next().await {
result.push(byte?);
}
assert_eq!(result, raw);
Ok(())
})
}
}

197
src/io/read/chain.rs Normal file
View file

@ -0,0 +1,197 @@
use crate::io::IoSliceMut;
use std::fmt;
use std::pin::Pin;
use crate::io::{self, BufRead, Read};
use crate::task::{Context, Poll};
/// Adaptor to chain together two readers.
///
/// This struct is generally created by calling [`chain`] on a reader.
/// Please see the documentation of [`chain`] for more details.
///
/// [`chain`]: trait.Read.html#method.chain
pub struct Chain<T, U> {
pub(crate) first: T,
pub(crate) second: U,
pub(crate) done_first: bool,
}
impl<T, U> Chain<T, U> {
/// Consumes the `Chain`, returning the wrapped readers.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let foo_file = File::open("foo.txt").await?;
/// let bar_file = File::open("bar.txt").await?;
///
/// let chain = foo_file.chain(bar_file);
/// let (foo_file, bar_file) = chain.into_inner();
/// #
/// # Ok(()) }) }
/// ```
pub fn into_inner(self) -> (T, U) {
(self.first, self.second)
}
/// Gets references to the underlying readers in this `Chain`.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let foo_file = File::open("foo.txt").await?;
/// let bar_file = File::open("bar.txt").await?;
///
/// let chain = foo_file.chain(bar_file);
/// let (foo_file, bar_file) = chain.get_ref();
/// #
/// # Ok(()) }) }
/// ```
pub fn get_ref(&self) -> (&T, &U) {
(&self.first, &self.second)
}
/// Gets mutable references to the underlying readers in this `Chain`.
///
/// Care should be taken to avoid modifying the internal I/O state of the
/// underlying readers as doing so may corrupt the internal state of this
/// `Chain`.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let foo_file = File::open("foo.txt").await?;
/// let bar_file = File::open("bar.txt").await?;
///
/// let mut chain = foo_file.chain(bar_file);
/// let (foo_file, bar_file) = chain.get_mut();
/// #
/// # Ok(()) }) }
/// ```
pub fn get_mut(&mut self) -> (&mut T, &mut U) {
(&mut self.first, &mut self.second)
}
}
impl<T: fmt::Debug, U: fmt::Debug> fmt::Debug for Chain<T, U> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Chain")
.field("t", &self.first)
.field("u", &self.second)
.finish()
}
}
impl<T: Read + Unpin, U: Read + Unpin> Read for Chain<T, U> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
if !self.done_first {
let rd = Pin::new(&mut self.first);
match futures_core::ready!(rd.poll_read(cx, buf)) {
Ok(0) if !buf.is_empty() => self.done_first = true,
Ok(n) => return Poll::Ready(Ok(n)),
Err(err) => return Poll::Ready(Err(err)),
}
}
let rd = Pin::new(&mut self.second);
rd.poll_read(cx, buf)
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
if !self.done_first {
let rd = Pin::new(&mut self.first);
match futures_core::ready!(rd.poll_read_vectored(cx, bufs)) {
Ok(0) if !bufs.is_empty() => self.done_first = true,
Ok(n) => return Poll::Ready(Ok(n)),
Err(err) => return Poll::Ready(Err(err)),
}
}
let rd = Pin::new(&mut self.second);
rd.poll_read_vectored(cx, bufs)
}
}
impl<T: BufRead + Unpin, U: BufRead + Unpin> BufRead for Chain<T, U> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let Self {
first,
second,
done_first,
} = unsafe { self.get_unchecked_mut() };
if !*done_first {
let first = unsafe { Pin::new_unchecked(first) };
match futures_core::ready!(first.poll_fill_buf(cx)) {
Ok(buf) if buf.is_empty() => {
*done_first = true;
}
Ok(buf) => return Poll::Ready(Ok(buf)),
Err(err) => return Poll::Ready(Err(err)),
}
}
let second = unsafe { Pin::new_unchecked(second) };
second.poll_fill_buf(cx)
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
if !self.done_first {
let rd = Pin::new(&mut self.first);
rd.consume(amt)
} else {
let rd = Pin::new(&mut self.second);
rd.consume(amt)
}
}
}
#[cfg(test)]
mod tests {
use crate::io;
use crate::prelude::*;
use crate::task;
#[test]
fn test_chain_basics() -> std::io::Result<()> {
let source1: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2]);
let source2: io::Cursor<Vec<u8>> = io::Cursor::new(vec![3, 4, 5]);
task::block_on(async move {
let mut buffer = Vec::new();
let mut source = source1.chain(source2);
assert_eq!(6, source.read_to_end(&mut buffer).await?);
assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]);
Ok(())
})
}
}

View file

@ -1,8 +1,11 @@
mod bytes;
mod chain;
mod read;
mod read_exact;
mod read_to_end;
mod read_to_string;
mod read_vectored;
mod take;
use read::ReadFuture;
use read_exact::ReadExactFuture;
@ -261,6 +264,156 @@ extension_trait! {
{
ReadExactFuture { reader: self, buf }
}
#[doc = r#"
Creates an adaptor which will read at most `limit` bytes from it.
This function returns a new instance of `Read` which will read at most
`limit` bytes, after which it will always return EOF ([`Ok(0)`]). Any
read errors will not count towards the number of bytes read and future
calls to [`read()`] may succeed.
# Examples
[`File`]s implement `Read`:
[`File`]: ../fs/struct.File.html
[`Ok(0)`]: ../../std/result/enum.Result.html#variant.Ok
[`read()`]: tymethod.read
```no_run
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use async_std::io::prelude::*;
use async_std::fs::File;
let f = File::open("foo.txt").await?;
let mut buffer = [0; 5];
// read at most five bytes
let mut handle = f.take(5);
handle.read(&mut buffer).await?;
#
# Ok(()) }) }
```
"#]
fn take(self, limit: u64) -> take::Take<Self>
where
Self: Sized,
{
take::Take { inner: self, limit }
}
#[doc = r#"
Creates a "by reference" adaptor for this instance of `Read`.
The returned adaptor also implements `Read` and will simply borrow this
current reader.
# Examples
[`File`][file]s implement `Read`:
[file]: ../fs/struct.File.html
```no_run
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::fs::File;
let mut f = File::open("foo.txt").await?;
let mut buffer = Vec::new();
let mut other_buffer = Vec::new();
{
let reference = f.by_ref();
// read at most 5 bytes
reference.take(5).read_to_end(&mut buffer).await?;
} // drop our &mut reference so we can use f again
// original file still usable, read the rest
f.read_to_end(&mut other_buffer).await?;
#
# Ok(()) }) }
```
"#]
fn by_ref(&mut self) -> &mut Self where Self: Sized { self }
#[doc = r#"
Transforms this `Read` instance to a `Stream` over its bytes.
The returned type implements `Stream` where the `Item` is
`Result<u8, io::Error>`.
The yielded item is `Ok` if a byte was successfully read and `Err`
otherwise. EOF is mapped to returning `None` from this iterator.
# Examples
[`File`][file]s implement `Read`:
[file]: ../fs/struct.File.html
```no_run
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::fs::File;
let f = File::open("foo.txt").await?;
let mut s = f.bytes();
while let Some(byte) = s.next().await {
println!("{}", byte.unwrap());
}
#
# Ok(()) }) }
```
"#]
fn bytes(self) -> bytes::Bytes<Self> where Self: Sized {
bytes::Bytes { inner: self }
}
#[doc = r#"
Creates an adaptor which will chain this stream with another.
The returned `Read` instance will first read all bytes from this object
until EOF is encountered. Afterwards the output is equivalent to the
output of `next`.
# Examples
[`File`][file]s implement `Read`:
[file]: ../fs/struct.File.html
```no_run
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::fs::File;
let f1 = File::open("foo.txt").await?;
let f2 = File::open("bar.txt").await?;
let mut handle = f1.chain(f2);
let mut buffer = String::new();
// read the value into a String. We could use any Read method here,
// this is just one example.
handle.read_to_string(&mut buffer).await?;
#
# Ok(()) }) }
```
"#]
fn chain<R: Read>(self, next: R) -> chain::Chain<Self, R> where Self: Sized {
chain::Chain { first: self, second: next, done_first: false }
}
}
impl<T: Read + Unpin + ?Sized> Read for Box<T> {
@ -307,3 +460,31 @@ extension_trait! {
}
}
}
#[cfg(test)]
mod tests {
use crate::io;
use crate::prelude::*;
#[test]
fn test_read_by_ref() -> io::Result<()> {
crate::task::block_on(async {
let mut f = io::Cursor::new(vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8]);
let mut buffer = Vec::new();
let mut other_buffer = Vec::new();
{
let reference = f.by_ref();
// read at most 5 bytes
assert_eq!(reference.take(5).read_to_end(&mut buffer).await?, 5);
assert_eq!(&buffer, &[0, 1, 2, 3, 4])
} // drop our &mut reference so we can use f again
// original file still usable, read the rest
assert_eq!(f.read_to_end(&mut other_buffer).await?, 4);
assert_eq!(&other_buffer, &[5, 6, 7, 8]);
Ok(())
})
}
}

242
src/io/read/take.rs Normal file
View file

@ -0,0 +1,242 @@
use std::cmp;
use std::pin::Pin;
use crate::io::{self, BufRead, Read};
use crate::task::{Context, Poll};
/// Reader adaptor which limits the bytes read from an underlying reader.
///
/// This struct is generally created by calling [`take`] on a reader.
/// Please see the documentation of [`take`] for more details.
///
/// [`take`]: trait.Read.html#method.take
#[derive(Debug)]
pub struct Take<T> {
pub(crate) inner: T,
pub(crate) limit: u64,
}
impl<T> Take<T> {
/// Returns the number of bytes that can be read before this instance will
/// return EOF.
///
/// # Note
///
/// This instance may reach `EOF` after reading fewer bytes than indicated by
/// this method if the underlying [`Read`] instance reaches EOF.
///
/// [`Read`]: trait.Read.html
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let f = File::open("foo.txt").await?;
///
/// // read at most five bytes
/// let handle = f.take(5);
///
/// println!("limit: {}", handle.limit());
/// #
/// # Ok(()) }) }
/// ```
pub fn limit(&self) -> u64 {
self.limit
}
/// Sets the number of bytes that can be read before this instance will
/// return EOF. This is the same as constructing a new `Take` instance, so
/// the amount of bytes read and the previous limit value don't matter when
/// calling this method.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let f = File::open("foo.txt").await?;
///
/// // read at most five bytes
/// let mut handle = f.take(5);
/// handle.set_limit(10);
///
/// assert_eq!(handle.limit(), 10);
/// #
/// # Ok(()) }) }
/// ```
pub fn set_limit(&mut self, limit: u64) {
self.limit = limit;
}
/// Consumes the `Take`, returning the wrapped reader.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let file = File::open("foo.txt").await?;
///
/// let mut buffer = [0; 5];
/// let mut handle = file.take(5);
/// handle.read(&mut buffer).await?;
///
/// let file = handle.into_inner();
/// #
/// # Ok(()) }) }
/// ```
pub fn into_inner(self) -> T {
self.inner
}
/// Gets a reference to the underlying reader.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let file = File::open("foo.txt").await?;
///
/// let mut buffer = [0; 5];
/// let mut handle = file.take(5);
/// handle.read(&mut buffer).await?;
///
/// let file = handle.get_ref();
/// #
/// # Ok(()) }) }
/// ```
pub fn get_ref(&self) -> &T {
&self.inner
}
/// Gets a mutable reference to the underlying reader.
///
/// Care should be taken to avoid modifying the internal I/O state of the
/// underlying reader as doing so may corrupt the internal limit of this
/// `Take`.
///
/// # Examples
///
/// ```no_run
/// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::fs::File;
///
/// let file = File::open("foo.txt").await?;
///
/// let mut buffer = [0; 5];
/// let mut handle = file.take(5);
/// handle.read(&mut buffer).await?;
///
/// let file = handle.get_mut();
/// #
/// # Ok(()) }) }
/// ```
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}
}
impl<T: Read + Unpin> Read for Take<T> {
/// Attempt to read from the `AsyncRead` into `buf`.
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let Self { inner, limit } = &mut *self;
take_read_internal(Pin::new(inner), cx, buf, limit)
}
}
pub fn take_read_internal<R: Read + ?Sized>(
mut rd: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut [u8],
limit: &mut u64,
) -> Poll<io::Result<usize>> {
// Don't call into inner reader at all at EOF because it may still block
if *limit == 0 {
return Poll::Ready(Ok(0));
}
let max = cmp::min(buf.len() as u64, *limit) as usize;
match futures_core::ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
Ok(n) => {
*limit -= n as u64;
Poll::Ready(Ok(n))
}
Err(e) => Poll::Ready(Err(e)),
}
}
impl<T: BufRead + Unpin> BufRead for Take<T> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let Self { inner, limit } = unsafe { self.get_unchecked_mut() };
let inner = unsafe { Pin::new_unchecked(inner) };
if *limit == 0 {
return Poll::Ready(Ok(&[]));
}
match futures_core::ready!(inner.poll_fill_buf(cx)) {
Ok(buf) => {
let cap = cmp::min(buf.len() as u64, *limit) as usize;
Poll::Ready(Ok(&buf[..cap]))
}
Err(e) => Poll::Ready(Err(e)),
}
}
fn consume(mut self: Pin<&mut Self>, amt: usize) {
// Don't let callers reset the limit by passing an overlarge value
let amt = cmp::min(amt as u64, self.limit) as usize;
self.limit -= amt as u64;
let rd = Pin::new(&mut self.inner);
rd.consume(amt);
}
}
#[cfg(test)]
mod tests {
use crate::io;
use crate::prelude::*;
use crate::task;
#[test]
fn test_take_basics() -> std::io::Result<()> {
let source: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
task::block_on(async move {
let mut buffer = [0u8; 5];
// read at most five bytes
let mut handle = source.take(5);
handle.read(&mut buffer).await?;
assert_eq!(buffer, [0, 1, 2, 3, 4]);
// check that the we are actually at the end
assert_eq!(handle.read(&mut buffer).await.unwrap(), 0);
Ok(())
})
}
}