forked from mirror/async-std
feat(io): implement Read::take
This commit is contained in:
parent
f6a2393fb5
commit
75dc819b2f
2 changed files with 256 additions and 0 deletions
|
@ -3,6 +3,7 @@ mod read_exact;
|
|||
mod read_to_end;
|
||||
mod read_to_string;
|
||||
mod read_vectored;
|
||||
mod take;
|
||||
|
||||
use read::ReadFuture;
|
||||
use read_exact::ReadExactFuture;
|
||||
|
@ -261,6 +262,47 @@ extension_trait! {
|
|||
{
|
||||
ReadExactFuture { reader: self, buf }
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Creates an adaptor which will read at most `limit` bytes from it.
|
||||
|
||||
This function returns a new instance of `Read` which will read at most
|
||||
`limit` bytes, after which it will always return EOF ([`Ok(0)`]). Any
|
||||
read errors will not count towards the number of bytes read and future
|
||||
calls to [`read()`] may succeed.
|
||||
|
||||
# Examples
|
||||
|
||||
[`File`]s implement `Read`:
|
||||
|
||||
[`File`]: ../fs/struct.File.html
|
||||
[`Ok(0)`]: ../../std/result/enum.Result.html#variant.Ok
|
||||
[`read()`]: tymethod.read
|
||||
|
||||
```no_run
|
||||
use async_std::io::prelude::*;
|
||||
use async_std::fs::File;
|
||||
|
||||
fn main() -> std::io::Result<()> {
|
||||
async_std::task::block_on(async {
|
||||
let f = File::open("foo.txt").await?;
|
||||
let mut buffer = [0; 5];
|
||||
|
||||
// read at most five bytes
|
||||
let mut handle = f.take(5);
|
||||
|
||||
handle.read(&mut buffer).await?;
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
```
|
||||
"#]
|
||||
fn take(self, limit: u64) -> take::Take<Self>
|
||||
where
|
||||
Self: Sized,
|
||||
{
|
||||
take::Take { inner: self, limit }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Read + Unpin + ?Sized> Read for Box<T> {
|
||||
|
|
214
src/io/read/take.rs
Normal file
214
src/io/read/take.rs
Normal file
|
@ -0,0 +1,214 @@
|
|||
use std::cmp;
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::io::{self, Read};
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// Reader adaptor which limits the bytes read from an underlying reader.
|
||||
///
|
||||
/// This struct is generally created by calling [`take`] on a reader.
|
||||
/// Please see the documentation of [`take`] for more details.
|
||||
///
|
||||
/// [`take`]: trait.Read.html#method.take
|
||||
#[derive(Debug)]
|
||||
pub struct Take<T> {
|
||||
pub(crate) inner: T,
|
||||
pub(crate) limit: u64,
|
||||
}
|
||||
|
||||
impl<T> Take<T> {
|
||||
/// Returns the number of bytes that can be read before this instance will
|
||||
/// return EOF.
|
||||
///
|
||||
/// # Note
|
||||
///
|
||||
/// This instance may reach `EOF` after reading fewer bytes than indicated by
|
||||
/// this method if the underlying [`Read`] instance reaches EOF.
|
||||
///
|
||||
/// [`Read`]: trait.Read.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_std::io;
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::fs::File;
|
||||
///
|
||||
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
|
||||
/// let f = File::open("foo.txt").await?;
|
||||
///
|
||||
/// // read at most five bytes
|
||||
/// let handle = f.take(5);
|
||||
///
|
||||
/// println!("limit: {}", handle.limit());
|
||||
/// Ok(())
|
||||
/// }) }
|
||||
/// ```
|
||||
pub fn limit(&self) -> u64 {
|
||||
self.limit
|
||||
}
|
||||
|
||||
/// Sets the number of bytes that can be read before this instance will
|
||||
/// return EOF. This is the same as constructing a new `Take` instance, so
|
||||
/// the amount of bytes read and the previous limit value don't matter when
|
||||
/// calling this method.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_std::io;
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::fs::File;
|
||||
///
|
||||
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
|
||||
/// let f = File::open("foo.txt").await?;
|
||||
///
|
||||
/// // read at most five bytes
|
||||
/// let mut handle = f.take(5);
|
||||
/// handle.set_limit(10);
|
||||
///
|
||||
/// assert_eq!(handle.limit(), 10);
|
||||
/// Ok(())
|
||||
/// }) }
|
||||
/// ```
|
||||
pub fn set_limit(&mut self, limit: u64) {
|
||||
self.limit = limit;
|
||||
}
|
||||
|
||||
/// Consumes the `Take`, returning the wrapped reader.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_std::io;
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::fs::File;
|
||||
///
|
||||
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
|
||||
/// let file = File::open("foo.txt").await?;
|
||||
///
|
||||
/// let mut buffer = [0; 5];
|
||||
/// let mut handle = file.take(5);
|
||||
/// handle.read(&mut buffer).await?;
|
||||
///
|
||||
/// let file = handle.into_inner();
|
||||
/// Ok(())
|
||||
/// }) }
|
||||
/// ```
|
||||
pub fn into_inner(self) -> T {
|
||||
self.inner
|
||||
}
|
||||
|
||||
/// Gets a reference to the underlying reader.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_std::io;
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::fs::File;
|
||||
///
|
||||
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
|
||||
/// let file = File::open("foo.txt").await?;
|
||||
///
|
||||
/// let mut buffer = [0; 5];
|
||||
/// let mut handle = file.take(5);
|
||||
/// handle.read(&mut buffer).await?;
|
||||
///
|
||||
/// let file = handle.get_ref();
|
||||
/// Ok(())
|
||||
/// }) }
|
||||
/// ```
|
||||
pub fn get_ref(&self) -> &T {
|
||||
&self.inner
|
||||
}
|
||||
|
||||
/// Gets a mutable reference to the underlying reader.
|
||||
///
|
||||
/// Care should be taken to avoid modifying the internal I/O state of the
|
||||
/// underlying reader as doing so may corrupt the internal limit of this
|
||||
/// `Take`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
/// use async_std::io;
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::fs::File;
|
||||
///
|
||||
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
|
||||
/// let file = File::open("foo.txt").await?;
|
||||
///
|
||||
/// let mut buffer = [0; 5];
|
||||
/// let mut handle = file.take(5);
|
||||
/// handle.read(&mut buffer).await?;
|
||||
///
|
||||
/// let file = handle.get_mut();
|
||||
/// Ok(())
|
||||
/// }) }
|
||||
/// ```
|
||||
pub fn get_mut(&mut self) -> &mut T {
|
||||
&mut self.inner
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Read + Unpin> Read for Take<T> {
|
||||
/// Attempt to read from the `AsyncRead` into `buf`.
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let Self { inner, limit } = &mut *self;
|
||||
take_read_internal(Pin::new(inner), cx, buf, limit)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn take_read_internal<R: Read + ?Sized>(
|
||||
mut rd: Pin<&mut R>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut [u8],
|
||||
limit: &mut u64,
|
||||
) -> Poll<io::Result<usize>> {
|
||||
// Don't call into inner reader at all at EOF because it may still block
|
||||
if *limit == 0 {
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
|
||||
let max = cmp::min(buf.len() as u64, *limit) as usize;
|
||||
|
||||
match futures_core::ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
|
||||
Ok(n) => {
|
||||
*limit -= n as u64;
|
||||
Poll::Ready(Ok(n))
|
||||
}
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::io;
|
||||
use crate::prelude::*;
|
||||
use crate::task;
|
||||
|
||||
#[test]
|
||||
fn test_take_basics() -> std::io::Result<()> {
|
||||
let source: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
|
||||
|
||||
task::block_on(async move {
|
||||
let mut buffer = [0u8; 5];
|
||||
|
||||
// read at most five bytes
|
||||
let mut handle = source.take(5);
|
||||
|
||||
handle.read(&mut buffer).await?;
|
||||
assert_eq!(buffer, [0, 1, 2, 3, 4]);
|
||||
|
||||
// check that the we are actually at the end
|
||||
assert_eq!(handle.read(&mut buffer).await.unwrap(), 0);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue