forked from mirror/async-std
		
	split io::read into multiple files
Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
This commit is contained in:
		
							parent
							
								
									98d9284e64
								
							
						
					
					
						commit
						a90100962d
					
				
					 6 changed files with 230 additions and 183 deletions
				
			
		|  | @ -1,15 +1,21 @@ | |||
| mod read; | ||||
| mod read_vectored; | ||||
| mod read_to_end; | ||||
| mod read_exact; | ||||
| mod read_to_string; | ||||
| 
 | ||||
| use read_to_string::ReadToStringFuture; | ||||
| use read_to_end::{ReadToEndFuture, read_to_end_internal}; | ||||
| use read::ReadFuture; | ||||
| use read_vectored::ReadVectoredFuture; | ||||
| use read_exact::ReadExactFuture; | ||||
| 
 | ||||
| use std::io::IoSliceMut; | ||||
| use std::mem; | ||||
| use std::pin::Pin; | ||||
| use std::str; | ||||
| 
 | ||||
| use cfg_if::cfg_if; | ||||
| use futures_io::AsyncRead; | ||||
| 
 | ||||
| use crate::future::Future; | ||||
| use crate::io; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| cfg_if! { | ||||
|     if #[cfg(feature = "docs")] { | ||||
|         #[doc(hidden)] | ||||
|  | @ -215,180 +221,3 @@ impl<T: AsyncRead + Unpin + ?Sized> Read for T { | |||
|         ReadFuture { reader: self, buf } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadFuture<'a, T: Unpin + ?Sized> { | ||||
|     reader: &'a mut T, | ||||
|     buf: &'a mut [u8], | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { reader, buf } = &mut *self; | ||||
|         Pin::new(reader).poll_read(cx, buf) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> { | ||||
|     reader: &'a mut T, | ||||
|     bufs: &'a mut [IoSliceMut<'a>], | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { reader, bufs } = &mut *self; | ||||
|         Pin::new(reader).poll_read_vectored(cx, bufs) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> { | ||||
|     reader: &'a mut T, | ||||
|     buf: &'a mut Vec<u8>, | ||||
|     start_len: usize, | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { | ||||
|             reader, | ||||
|             buf, | ||||
|             start_len, | ||||
|         } = &mut *self; | ||||
|         read_to_end_internal(Pin::new(reader), cx, buf, *start_len) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> { | ||||
|     reader: &'a mut T, | ||||
|     buf: &'a mut String, | ||||
|     bytes: Vec<u8>, | ||||
|     start_len: usize, | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { | ||||
|             reader, | ||||
|             buf, | ||||
|             bytes, | ||||
|             start_len, | ||||
|         } = &mut *self; | ||||
|         let reader = Pin::new(reader); | ||||
| 
 | ||||
|         let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); | ||||
|         if str::from_utf8(&bytes).is_err() { | ||||
|             Poll::Ready(ret.and_then(|_| { | ||||
|                 Err(io::Error::new( | ||||
|                     io::ErrorKind::InvalidData, | ||||
|                     "stream did not contain valid UTF-8", | ||||
|                 )) | ||||
|             })) | ||||
|         } else { | ||||
|             debug_assert!(buf.is_empty()); | ||||
|             // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
 | ||||
|             mem::swap(unsafe { buf.as_mut_vec() }, bytes); | ||||
|             Poll::Ready(ret) | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadExactFuture<'a, T: Unpin + ?Sized> { | ||||
|     reader: &'a mut T, | ||||
|     buf: &'a mut [u8], | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> { | ||||
|     type Output = io::Result<()>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { reader, buf } = &mut *self; | ||||
| 
 | ||||
|         while !buf.is_empty() { | ||||
|             let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; | ||||
|             let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n); | ||||
|             *buf = rest; | ||||
| 
 | ||||
|             if n == 0 { | ||||
|                 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Poll::Ready(Ok(())) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| // This uses an adaptive system to extend the vector when it fills. We want to
 | ||||
| // avoid paying to allocate and zero a huge chunk of memory if the reader only
 | ||||
| // has 4 bytes while still making large reads if the reader does have a ton
 | ||||
| // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
 | ||||
| // time is 4,500 times (!) slower than this if the reader has a very small
 | ||||
| // amount of data to return.
 | ||||
| //
 | ||||
| // Because we're extending the buffer with uninitialized data for trusted
 | ||||
| // readers, we need to make sure to truncate that if any of this panics.
 | ||||
| pub fn read_to_end_internal<R: AsyncRead + ?Sized>( | ||||
|     mut rd: Pin<&mut R>, | ||||
|     cx: &mut Context<'_>, | ||||
|     buf: &mut Vec<u8>, | ||||
|     start_len: usize, | ||||
| ) -> Poll<io::Result<usize>> { | ||||
|     struct Guard<'a> { | ||||
|         buf: &'a mut Vec<u8>, | ||||
|         len: usize, | ||||
|     } | ||||
| 
 | ||||
|     impl Drop for Guard<'_> { | ||||
|         fn drop(&mut self) { | ||||
|             unsafe { | ||||
|                 self.buf.set_len(self.len); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let mut g = Guard { | ||||
|         len: buf.len(), | ||||
|         buf, | ||||
|     }; | ||||
|     let ret; | ||||
|     loop { | ||||
|         if g.len == g.buf.len() { | ||||
|             unsafe { | ||||
|                 g.buf.reserve(32); | ||||
|                 let capacity = g.buf.capacity(); | ||||
|                 g.buf.set_len(capacity); | ||||
|                 rd.initializer().initialize(&mut g.buf[g.len..]); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { | ||||
|             Ok(0) => { | ||||
|                 ret = Poll::Ready(Ok(g.len - start_len)); | ||||
|                 break; | ||||
|             } | ||||
|             Ok(n) => g.len += n, | ||||
|             Err(e) => { | ||||
|                 ret = Poll::Ready(Err(e)); | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     ret | ||||
| } | ||||
							
								
								
									
										23
									
								
								src/io/read/read.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								src/io/read/read.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,23 @@ | |||
| use crate::future::Future; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| use std::pin::Pin; | ||||
| use std::io; | ||||
| 
 | ||||
| use futures_io::AsyncRead; | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadFuture<'a, T: Unpin + ?Sized> { | ||||
|     pub(crate) reader: &'a mut T, | ||||
|     pub(crate) buf: &'a mut [u8], | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { reader, buf } = &mut *self; | ||||
|         Pin::new(reader).poll_read(cx, buf) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										35
									
								
								src/io/read/read_exact.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										35
									
								
								src/io/read/read_exact.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,35 @@ | |||
| use crate::future::Future; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| use std::io; | ||||
| use std::pin::Pin; | ||||
| use std::mem; | ||||
| 
 | ||||
| use futures_io::AsyncRead; | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadExactFuture<'a, T: Unpin + ?Sized> { | ||||
|     pub(crate) reader: &'a mut T, | ||||
|     pub(crate) buf: &'a mut [u8], | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> { | ||||
|     type Output = io::Result<()>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { reader, buf } = &mut *self; | ||||
| 
 | ||||
|         while !buf.is_empty() { | ||||
|             let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; | ||||
|             let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n); | ||||
|             *buf = rest; | ||||
| 
 | ||||
|             if n == 0 { | ||||
|                 return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Poll::Ready(Ok(())) | ||||
|     } | ||||
| } | ||||
							
								
								
									
										87
									
								
								src/io/read/read_to_end.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										87
									
								
								src/io/read/read_to_end.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,87 @@ | |||
| use crate::future::Future; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| use std::io; | ||||
| use std::pin::Pin; | ||||
| 
 | ||||
| use futures_io::AsyncRead; | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadToEndFuture<'a, T: Unpin + ?Sized> { | ||||
|     pub(crate) reader: &'a mut T, | ||||
|     pub(crate) buf: &'a mut Vec<u8>, | ||||
|     pub(crate) start_len: usize, | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { | ||||
|             reader, | ||||
|             buf, | ||||
|             start_len, | ||||
|         } = &mut *self; | ||||
|         read_to_end_internal(Pin::new(reader), cx, buf, *start_len) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| // This uses an adaptive system to extend the vector when it fills. We want to
 | ||||
| // avoid paying to allocate and zero a huge chunk of memory if the reader only
 | ||||
| // has 4 bytes while still making large reads if the reader does have a ton
 | ||||
| // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
 | ||||
| // time is 4,500 times (!) slower than this if the reader has a very small
 | ||||
| // amount of data to return.
 | ||||
| //
 | ||||
| // Because we're extending the buffer with uninitialized data for trusted
 | ||||
| // readers, we need to make sure to truncate that if any of this panics.
 | ||||
| pub fn read_to_end_internal<R: AsyncRead + ?Sized>( | ||||
|     mut rd: Pin<&mut R>, | ||||
|     cx: &mut Context<'_>, | ||||
|     buf: &mut Vec<u8>, | ||||
|     start_len: usize, | ||||
| ) -> Poll<io::Result<usize>> { | ||||
|     struct Guard<'a> { | ||||
|         buf: &'a mut Vec<u8>, | ||||
|         len: usize, | ||||
|     } | ||||
| 
 | ||||
|     impl Drop for Guard<'_> { | ||||
|         fn drop(&mut self) { | ||||
|             unsafe { | ||||
|                 self.buf.set_len(self.len); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     let mut g = Guard { | ||||
|         len: buf.len(), | ||||
|         buf, | ||||
|     }; | ||||
|     let ret; | ||||
|     loop { | ||||
|         if g.len == g.buf.len() { | ||||
|             unsafe { | ||||
|                 g.buf.reserve(32); | ||||
|                 let capacity = g.buf.capacity(); | ||||
|                 g.buf.set_len(capacity); | ||||
|                 rd.initializer().initialize(&mut g.buf[g.len..]); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { | ||||
|             Ok(0) => { | ||||
|                 ret = Poll::Ready(Ok(g.len - start_len)); | ||||
|                 break; | ||||
|             } | ||||
|             Ok(n) => g.len += n, | ||||
|             Err(e) => { | ||||
|                 ret = Poll::Ready(Err(e)); | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     ret | ||||
| } | ||||
							
								
								
									
										48
									
								
								src/io/read/read_to_string.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								src/io/read/read_to_string.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,48 @@ | |||
| use crate::future::Future; | ||||
| use crate::task::{Context, Poll}; | ||||
| use super::read_to_end_internal; | ||||
| 
 | ||||
| use std::io; | ||||
| use std::pin::Pin; | ||||
| use std::str; | ||||
| use std::mem; | ||||
| 
 | ||||
| use futures_io::AsyncRead; | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadToStringFuture<'a, T: Unpin + ?Sized> { | ||||
|     pub(crate) reader: &'a mut T, | ||||
|     pub(crate) buf: &'a mut String, | ||||
|     pub(crate) bytes: Vec<u8>, | ||||
|     pub(crate) start_len: usize, | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { | ||||
|             reader, | ||||
|             buf, | ||||
|             bytes, | ||||
|             start_len, | ||||
|         } = &mut *self; | ||||
|         let reader = Pin::new(reader); | ||||
| 
 | ||||
|         let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); | ||||
|         if str::from_utf8(&bytes).is_err() { | ||||
|             Poll::Ready(ret.and_then(|_| { | ||||
|                 Err(io::Error::new( | ||||
|                     io::ErrorKind::InvalidData, | ||||
|                     "stream did not contain valid UTF-8", | ||||
|                 )) | ||||
|             })) | ||||
|         } else { | ||||
|             debug_assert!(buf.is_empty()); | ||||
|             // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
 | ||||
|             mem::swap(unsafe { buf.as_mut_vec() }, bytes); | ||||
|             Poll::Ready(ret) | ||||
|         } | ||||
|     } | ||||
| } | ||||
							
								
								
									
										25
									
								
								src/io/read/read_vectored.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										25
									
								
								src/io/read/read_vectored.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,25 @@ | |||
| use crate::future::Future; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| use std::io::IoSliceMut; | ||||
| use std::pin::Pin; | ||||
| 
 | ||||
| use futures_io::AsyncRead; | ||||
| 
 | ||||
| use crate::io; | ||||
| 
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct ReadVectoredFuture<'a, T: Unpin + ?Sized> { | ||||
|     pub(crate) reader: &'a mut T, | ||||
|     pub(crate) bufs: &'a mut [IoSliceMut<'a>], | ||||
| } | ||||
| 
 | ||||
| impl<T: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, T> { | ||||
|     type Output = io::Result<usize>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         let Self { reader, bufs } = &mut *self; | ||||
|         Pin::new(reader).poll_read_vectored(cx, bufs) | ||||
|     } | ||||
| } | ||||
		Loading…
	
		Reference in a new issue