diff --git a/src/io/read.rs b/src/io/read/mod.rs similarity index 57% rename from src/io/read.rs rename to src/io/read/mod.rs index 4b6bb1f..3cd2350 100644 --- a/src/io/read.rs +++ b/src/io/read/mod.rs @@ -1,15 +1,21 @@ +mod read; +mod read_vectored; +mod read_to_end; +mod read_exact; +mod read_to_string; + +use read_to_string::ReadToStringFuture; +use read_to_end::{ReadToEndFuture, read_to_end_internal}; +use read::ReadFuture; +use read_vectored::ReadVectoredFuture; +use read_exact::ReadExactFuture; + use std::io::IoSliceMut; use std::mem; -use std::pin::Pin; -use std::str; use cfg_if::cfg_if; use futures_io::AsyncRead; -use crate::future::Future; -use crate::io; -use crate::task::{Context, Poll}; - cfg_if! { if #[cfg(feature = "docs")] { #[doc(hidden)] @@ -215,180 +221,3 @@ impl Read for T { ReadFuture { reader: self, buf } } } - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct ReadFuture<'a, T: Unpin + ?Sized> { - reader: &'a mut T, - buf: &'a mut [u8], -} - -impl Future for ReadFuture<'_, T> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { reader, buf } = &mut *self; - Pin::new(reader).poll_read(cx, buf) - } -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> { - reader: &'a mut T, - bufs: &'a mut [IoSliceMut<'a>], -} - -impl Future for ReadVectoredFuture<'_, T> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { reader, bufs } = &mut *self; - Pin::new(reader).poll_read_vectored(cx, bufs) - } -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> { - reader: &'a mut T, - buf: &'a mut Vec, - start_len: usize, -} - -impl Future for ReadToEndFuture<'_, T> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - reader, - buf, - start_len, - } = &mut *self; - read_to_end_internal(Pin::new(reader), cx, buf, *start_len) - } -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> { - reader: &'a mut T, - buf: &'a mut String, - bytes: Vec, - start_len: usize, -} - -impl Future for ReadToStringFuture<'_, T> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - reader, - buf, - bytes, - start_len, - } = &mut *self; - let reader = Pin::new(reader); - - let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); - if str::from_utf8(&bytes).is_err() { - Poll::Ready(ret.and_then(|_| { - Err(io::Error::new( - io::ErrorKind::InvalidData, - "stream did not contain valid UTF-8", - )) - })) - } else { - debug_assert!(buf.is_empty()); - // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. - mem::swap(unsafe { buf.as_mut_vec() }, bytes); - Poll::Ready(ret) - } - } -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct ReadExactFuture<'a, T: Unpin + ?Sized> { - reader: &'a mut T, - buf: &'a mut [u8], -} - -impl Future for ReadExactFuture<'_, T> { - type Output = io::Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { reader, buf } = &mut *self; - - while !buf.is_empty() { - let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; - let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n); - *buf = rest; - - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())); - } - } - - Poll::Ready(Ok(())) - } -} - -// This uses an adaptive system to extend the vector when it fills. We want to -// avoid paying to allocate and zero a huge chunk of memory if the reader only -// has 4 bytes while still making large reads if the reader does have a ton -// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every -// time is 4,500 times (!) slower than this if the reader has a very small -// amount of data to return. -// -// Because we're extending the buffer with uninitialized data for trusted -// readers, we need to make sure to truncate that if any of this panics. -pub fn read_to_end_internal( - mut rd: Pin<&mut R>, - cx: &mut Context<'_>, - buf: &mut Vec, - start_len: usize, -) -> Poll> { - struct Guard<'a> { - buf: &'a mut Vec, - len: usize, - } - - impl Drop for Guard<'_> { - fn drop(&mut self) { - unsafe { - self.buf.set_len(self.len); - } - } - } - - let mut g = Guard { - len: buf.len(), - buf, - }; - let ret; - loop { - if g.len == g.buf.len() { - unsafe { - g.buf.reserve(32); - let capacity = g.buf.capacity(); - g.buf.set_len(capacity); - rd.initializer().initialize(&mut g.buf[g.len..]); - } - } - - match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { - Ok(0) => { - ret = Poll::Ready(Ok(g.len - start_len)); - break; - } - Ok(n) => g.len += n, - Err(e) => { - ret = Poll::Ready(Err(e)); - break; - } - } - } - - ret -} diff --git a/src/io/read/read.rs b/src/io/read/read.rs new file mode 100644 index 0000000..5f729e8 --- /dev/null +++ b/src/io/read/read.rs @@ -0,0 +1,23 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; + +use std::pin::Pin; +use std::io; + +use futures_io::AsyncRead; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadFuture<'a, T: Unpin + ?Sized> { + pub(crate) reader: &'a mut T, + pub(crate) buf: &'a mut [u8], +} + +impl Future for ReadFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf } = &mut *self; + Pin::new(reader).poll_read(cx, buf) + } +} diff --git a/src/io/read/read_exact.rs b/src/io/read/read_exact.rs new file mode 100644 index 0000000..1e960d2 --- /dev/null +++ b/src/io/read/read_exact.rs @@ -0,0 +1,35 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; + +use std::io; +use std::pin::Pin; +use std::mem; + +use futures_io::AsyncRead; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadExactFuture<'a, T: Unpin + ?Sized> { + pub(crate) reader: &'a mut T, + pub(crate) buf: &'a mut [u8], +} + +impl Future for ReadExactFuture<'_, T> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf } = &mut *self; + + while !buf.is_empty() { + let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; + let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n); + *buf = rest; + + if n == 0 { + return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())); + } + } + + Poll::Ready(Ok(())) + } +} diff --git a/src/io/read/read_to_end.rs b/src/io/read/read_to_end.rs new file mode 100644 index 0000000..d51f599 --- /dev/null +++ b/src/io/read/read_to_end.rs @@ -0,0 +1,87 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; + +use std::io; +use std::pin::Pin; + +use futures_io::AsyncRead; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> { + pub(crate) reader: &'a mut T, + pub(crate) buf: &'a mut Vec, + pub(crate) start_len: usize, +} + +impl Future for ReadToEndFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + buf, + start_len, + } = &mut *self; + read_to_end_internal(Pin::new(reader), cx, buf, *start_len) + } +} + +// This uses an adaptive system to extend the vector when it fills. We want to +// avoid paying to allocate and zero a huge chunk of memory if the reader only +// has 4 bytes while still making large reads if the reader does have a ton +// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every +// time is 4,500 times (!) slower than this if the reader has a very small +// amount of data to return. +// +// Because we're extending the buffer with uninitialized data for trusted +// readers, we need to make sure to truncate that if any of this panics. +pub fn read_to_end_internal( + mut rd: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut Vec, + start_len: usize, +) -> Poll> { + struct Guard<'a> { + buf: &'a mut Vec, + len: usize, + } + + impl Drop for Guard<'_> { + fn drop(&mut self) { + unsafe { + self.buf.set_len(self.len); + } + } + } + + let mut g = Guard { + len: buf.len(), + buf, + }; + let ret; + loop { + if g.len == g.buf.len() { + unsafe { + g.buf.reserve(32); + let capacity = g.buf.capacity(); + g.buf.set_len(capacity); + rd.initializer().initialize(&mut g.buf[g.len..]); + } + } + + match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { + Ok(0) => { + ret = Poll::Ready(Ok(g.len - start_len)); + break; + } + Ok(n) => g.len += n, + Err(e) => { + ret = Poll::Ready(Err(e)); + break; + } + } + } + + ret +} diff --git a/src/io/read/read_to_string.rs b/src/io/read/read_to_string.rs new file mode 100644 index 0000000..10daba5 --- /dev/null +++ b/src/io/read/read_to_string.rs @@ -0,0 +1,48 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; +use super::read_to_end_internal; + +use std::io; +use std::pin::Pin; +use std::str; +use std::mem; + +use futures_io::AsyncRead; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> { + pub(crate) reader: &'a mut T, + pub(crate) buf: &'a mut String, + pub(crate) bytes: Vec, + pub(crate) start_len: usize, +} + +impl Future for ReadToStringFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + buf, + bytes, + start_len, + } = &mut *self; + let reader = Pin::new(reader); + + let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } + } +} diff --git a/src/io/read/read_vectored.rs b/src/io/read/read_vectored.rs new file mode 100644 index 0000000..b65b79f --- /dev/null +++ b/src/io/read/read_vectored.rs @@ -0,0 +1,25 @@ +use crate::future::Future; +use crate::task::{Context, Poll}; + +use std::io::IoSliceMut; +use std::pin::Pin; + +use futures_io::AsyncRead; + +use crate::io; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> { + pub(crate) reader: &'a mut T, + pub(crate) bufs: &'a mut [IoSliceMut<'a>], +} + +impl Future for ReadVectoredFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, bufs } = &mut *self; + Pin::new(reader).poll_read_vectored(cx, bufs) + } +}