Split BufRead into multiple files

staging
Stjepan Glavina 5 years ago
parent 8c00cc53ce
commit 55550c6fc9

@ -0,0 +1,75 @@
use std::mem;
use std::pin::Pin;
use std::str;
use futures_io::AsyncBufRead;
use super::read_until_internal;
use crate::io;
use crate::task::{Context, Poll};
/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
pub(crate) reader: R,
pub(crate) buf: String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
}
impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
bytes,
read,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
}
}
pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}

@ -1,11 +1,17 @@
mod lines;
mod read_line;
mod read_until;
pub use lines::Lines;
use read_line::ReadLineFuture;
use read_until::ReadUntilFuture;
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::str;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures_io::AsyncBufRead; use futures_io::AsyncBufRead;
use crate::future::Future;
use crate::io; use crate::io;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
@ -191,134 +197,6 @@ pub trait BufRead {
impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T {} impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T {}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
byte: u8,
buf: &'a mut Vec<u8>,
read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
byte,
buf,
read,
} = &mut *self;
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
reader: &'a mut T,
buf: &'a mut String,
bytes: Vec<u8>,
read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
read,
} = &mut *self;
let reader = Pin::new(reader);
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
}
/// A stream of lines in a byte stream.
///
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
///
/// This type is an async version of [`std::io::Lines`].
///
/// [`lines`]: trait.BufRead.html#method.lines
/// [`BufRead`]: trait.BufRead.html
/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html
#[derive(Debug)]
pub struct Lines<R> {
reader: R,
buf: String,
bytes: Vec<u8>,
read: usize,
}
impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
reader,
buf,
bytes,
read,
} = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() {
return Poll::Ready(None);
}
if buf.ends_with('\n') {
buf.pop();
if buf.ends_with('\r') {
buf.pop();
}
}
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
}
}
pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
reader: Pin<&mut R>,
cx: &mut Context<'_>,
buf: &mut String,
bytes: &mut Vec<u8>,
read: &mut usize,
) -> Poll<io::Result<usize>> {
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
pub fn read_until_internal<R: AsyncBufRead + ?Sized>( pub fn read_until_internal<R: AsyncBufRead + ?Sized>(
mut reader: Pin<&mut R>, mut reader: Pin<&mut R>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -0,0 +1,49 @@
use std::mem;
use std::pin::Pin;
use std::str;
use futures_io::AsyncBufRead;
use super::read_until_internal;
use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
pub(crate) reader: &'a mut T,
pub(crate) buf: &'a mut String,
pub(crate) bytes: Vec<u8>,
pub(crate) read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
buf,
bytes,
read,
} = &mut *self;
let reader = Pin::new(reader);
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| {
Err(io::Error::new(
io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8",
))
}))
} else {
debug_assert!(buf.is_empty());
debug_assert_eq!(*read, 0);
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
Poll::Ready(ret)
}
}
}

@ -0,0 +1,31 @@
use std::pin::Pin;
use futures_io::AsyncBufRead;
use super::read_until_internal;
use crate::future::Future;
use crate::io;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
pub(crate) reader: &'a mut T,
pub(crate) byte: u8,
pub(crate) buf: &'a mut Vec<u8>,
pub(crate) read: usize,
}
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
type Output = io::Result<usize>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self {
reader,
byte,
buf,
read,
} = &mut *self;
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
}
}
Loading…
Cancel
Save