diff --git a/src/io/buf_read/lines.rs b/src/io/buf_read/lines.rs new file mode 100644 index 0000000..17ec447 --- /dev/null +++ b/src/io/buf_read/lines.rs @@ -0,0 +1,75 @@ +use std::mem; +use std::pin::Pin; +use std::str; + +use futures_io::AsyncBufRead; + +use super::read_until_internal; +use crate::io; +use crate::task::{Context, Poll}; + +/// A stream of lines in a byte stream. +/// +/// This stream is created by the [`lines`] method on types that implement [`BufRead`]. +/// +/// This type is an async version of [`std::io::Lines`]. +/// +/// [`lines`]: trait.BufRead.html#method.lines +/// [`BufRead`]: trait.BufRead.html +/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html +#[derive(Debug)] +pub struct Lines { + pub(crate) reader: R, + pub(crate) buf: String, + pub(crate) bytes: Vec, + pub(crate) read: usize, +} + +impl futures_core::stream::Stream for Lines { + type Item = io::Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + reader, + buf, + bytes, + read, + } = unsafe { self.get_unchecked_mut() }; + let reader = unsafe { Pin::new_unchecked(reader) }; + let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?; + if n == 0 && buf.is_empty() { + return Poll::Ready(None); + } + if buf.ends_with('\n') { + buf.pop(); + if buf.ends_with('\r') { + buf.pop(); + } + } + Poll::Ready(Some(Ok(mem::replace(buf, String::new())))) + } +} + +pub fn read_line_internal( + reader: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut String, + bytes: &mut Vec, + read: &mut usize, +) -> Poll> { + let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + debug_assert_eq!(*read, 0); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } +} diff --git a/src/io/buf_read.rs b/src/io/buf_read/mod.rs similarity index 63% rename from src/io/buf_read.rs rename to src/io/buf_read/mod.rs index 7e6b6fb..e320375 100644 --- a/src/io/buf_read.rs +++ b/src/io/buf_read/mod.rs @@ -1,11 +1,17 @@ +mod lines; +mod read_line; +mod read_until; + +pub use lines::Lines; +use read_line::ReadLineFuture; +use read_until::ReadUntilFuture; + use std::mem; use std::pin::Pin; -use std::str; use cfg_if::cfg_if; use futures_io::AsyncBufRead; -use crate::future::Future; use crate::io; use crate::task::{Context, Poll}; @@ -191,134 +197,6 @@ pub trait BufRead { impl BufRead for T {} -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> { - reader: &'a mut T, - byte: u8, - buf: &'a mut Vec, - read: usize, -} - -impl Future for ReadUntilFuture<'_, T> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - reader, - byte, - buf, - read, - } = &mut *self; - read_until_internal(Pin::new(reader), cx, *byte, buf, read) - } -} - -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct ReadLineFuture<'a, T: Unpin + ?Sized> { - reader: &'a mut T, - buf: &'a mut String, - bytes: Vec, - read: usize, -} - -impl Future for ReadLineFuture<'_, T> { - type Output = io::Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { - reader, - buf, - bytes, - read, - } = &mut *self; - let reader = Pin::new(reader); - - let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); - if str::from_utf8(&bytes).is_err() { - Poll::Ready(ret.and_then(|_| { - Err(io::Error::new( - io::ErrorKind::InvalidData, - "stream did not contain valid UTF-8", - )) - })) - } else { - debug_assert!(buf.is_empty()); - debug_assert_eq!(*read, 0); - // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. - mem::swap(unsafe { buf.as_mut_vec() }, bytes); - Poll::Ready(ret) - } - } -} - -/// A stream of lines in a byte stream. -/// -/// This stream is created by the [`lines`] method on types that implement [`BufRead`]. -/// -/// This type is an async version of [`std::io::Lines`]. -/// -/// [`lines`]: trait.BufRead.html#method.lines -/// [`BufRead`]: trait.BufRead.html -/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html -#[derive(Debug)] -pub struct Lines { - reader: R, - buf: String, - bytes: Vec, - read: usize, -} - -impl futures_core::stream::Stream for Lines { - type Item = io::Result; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let Self { - reader, - buf, - bytes, - read, - } = unsafe { self.get_unchecked_mut() }; - let reader = unsafe { Pin::new_unchecked(reader) }; - let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?; - if n == 0 && buf.is_empty() { - return Poll::Ready(None); - } - if buf.ends_with('\n') { - buf.pop(); - if buf.ends_with('\r') { - buf.pop(); - } - } - Poll::Ready(Some(Ok(mem::replace(buf, String::new())))) - } -} - -pub fn read_line_internal( - reader: Pin<&mut R>, - cx: &mut Context<'_>, - buf: &mut String, - bytes: &mut Vec, - read: &mut usize, -) -> Poll> { - let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); - if str::from_utf8(&bytes).is_err() { - Poll::Ready(ret.and_then(|_| { - Err(io::Error::new( - io::ErrorKind::InvalidData, - "stream did not contain valid UTF-8", - )) - })) - } else { - debug_assert!(buf.is_empty()); - debug_assert_eq!(*read, 0); - // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. - mem::swap(unsafe { buf.as_mut_vec() }, bytes); - Poll::Ready(ret) - } -} - pub fn read_until_internal( mut reader: Pin<&mut R>, cx: &mut Context<'_>, diff --git a/src/io/buf_read/read_line.rs b/src/io/buf_read/read_line.rs new file mode 100644 index 0000000..7424637 --- /dev/null +++ b/src/io/buf_read/read_line.rs @@ -0,0 +1,49 @@ +use std::mem; +use std::pin::Pin; +use std::str; + +use futures_io::AsyncBufRead; + +use super::read_until_internal; +use crate::future::Future; +use crate::io; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadLineFuture<'a, T: Unpin + ?Sized> { + pub(crate) reader: &'a mut T, + pub(crate) buf: &'a mut String, + pub(crate) bytes: Vec, + pub(crate) read: usize, +} + +impl Future for ReadLineFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + buf, + bytes, + read, + } = &mut *self; + let reader = Pin::new(reader); + + let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + if str::from_utf8(&bytes).is_err() { + Poll::Ready(ret.and_then(|_| { + Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + })) + } else { + debug_assert!(buf.is_empty()); + debug_assert_eq!(*read, 0); + // Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`. + mem::swap(unsafe { buf.as_mut_vec() }, bytes); + Poll::Ready(ret) + } + } +} diff --git a/src/io/buf_read/read_until.rs b/src/io/buf_read/read_until.rs new file mode 100644 index 0000000..c57a820 --- /dev/null +++ b/src/io/buf_read/read_until.rs @@ -0,0 +1,31 @@ +use std::pin::Pin; + +use futures_io::AsyncBufRead; + +use super::read_until_internal; +use crate::future::Future; +use crate::io; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> { + pub(crate) reader: &'a mut T, + pub(crate) byte: u8, + pub(crate) buf: &'a mut Vec, + pub(crate) read: usize, +} + +impl Future for ReadUntilFuture<'_, T> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { + reader, + byte, + buf, + read, + } = &mut *self; + read_until_internal(Pin::new(reader), cx, *byte, buf, read) + } +}