forked from mirror/async-std
Merge #161
161: Split BufRead into multiple files r=stjepang a=stjepang Co-authored-by: Stjepan Glavina <stjepang@gmail.com>
This commit is contained in:
commit
63f3a809aa
4 changed files with 163 additions and 130 deletions
75
src/io/buf_read/lines.rs
Normal file
75
src/io/buf_read/lines.rs
Normal file
|
@ -0,0 +1,75 @@
|
||||||
|
use std::mem;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::str;
|
||||||
|
|
||||||
|
use futures_io::AsyncBufRead;
|
||||||
|
|
||||||
|
use super::read_until_internal;
|
||||||
|
use crate::io;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
/// A stream of lines in a byte stream.
|
||||||
|
///
|
||||||
|
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
|
||||||
|
///
|
||||||
|
/// This type is an async version of [`std::io::Lines`].
|
||||||
|
///
|
||||||
|
/// [`lines`]: trait.BufRead.html#method.lines
|
||||||
|
/// [`BufRead`]: trait.BufRead.html
|
||||||
|
/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Lines<R> {
|
||||||
|
pub(crate) reader: R,
|
||||||
|
pub(crate) buf: String,
|
||||||
|
pub(crate) bytes: Vec<u8>,
|
||||||
|
pub(crate) read: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
|
||||||
|
type Item = io::Result<String>;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
|
let Self {
|
||||||
|
reader,
|
||||||
|
buf,
|
||||||
|
bytes,
|
||||||
|
read,
|
||||||
|
} = unsafe { self.get_unchecked_mut() };
|
||||||
|
let reader = unsafe { Pin::new_unchecked(reader) };
|
||||||
|
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
|
||||||
|
if n == 0 && buf.is_empty() {
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
if buf.ends_with('\n') {
|
||||||
|
buf.pop();
|
||||||
|
if buf.ends_with('\r') {
|
||||||
|
buf.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
|
||||||
|
reader: Pin<&mut R>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
buf: &mut String,
|
||||||
|
bytes: &mut Vec<u8>,
|
||||||
|
read: &mut usize,
|
||||||
|
) -> Poll<io::Result<usize>> {
|
||||||
|
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
|
||||||
|
if str::from_utf8(&bytes).is_err() {
|
||||||
|
Poll::Ready(ret.and_then(|_| {
|
||||||
|
Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidData,
|
||||||
|
"stream did not contain valid UTF-8",
|
||||||
|
))
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
debug_assert!(buf.is_empty());
|
||||||
|
debug_assert_eq!(*read, 0);
|
||||||
|
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
|
||||||
|
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
|
||||||
|
Poll::Ready(ret)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,11 +1,17 @@
|
||||||
|
mod lines;
|
||||||
|
mod read_line;
|
||||||
|
mod read_until;
|
||||||
|
|
||||||
|
pub use lines::Lines;
|
||||||
|
use read_line::ReadLineFuture;
|
||||||
|
use read_until::ReadUntilFuture;
|
||||||
|
|
||||||
use std::mem;
|
use std::mem;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::str;
|
|
||||||
|
|
||||||
use cfg_if::cfg_if;
|
use cfg_if::cfg_if;
|
||||||
use futures_io::AsyncBufRead;
|
use futures_io::AsyncBufRead;
|
||||||
|
|
||||||
use crate::future::Future;
|
|
||||||
use crate::io;
|
use crate::io;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
@ -191,134 +197,6 @@ pub trait BufRead {
|
||||||
|
|
||||||
impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T {}
|
impl<T: AsyncBufRead + Unpin + ?Sized> BufRead for T {}
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
|
|
||||||
reader: &'a mut T,
|
|
||||||
byte: u8,
|
|
||||||
buf: &'a mut Vec<u8>,
|
|
||||||
read: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
|
|
||||||
type Output = io::Result<usize>;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let Self {
|
|
||||||
reader,
|
|
||||||
byte,
|
|
||||||
buf,
|
|
||||||
read,
|
|
||||||
} = &mut *self;
|
|
||||||
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
#[allow(missing_debug_implementations)]
|
|
||||||
pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
|
|
||||||
reader: &'a mut T,
|
|
||||||
buf: &'a mut String,
|
|
||||||
bytes: Vec<u8>,
|
|
||||||
read: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
|
|
||||||
type Output = io::Result<usize>;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
let Self {
|
|
||||||
reader,
|
|
||||||
buf,
|
|
||||||
bytes,
|
|
||||||
read,
|
|
||||||
} = &mut *self;
|
|
||||||
let reader = Pin::new(reader);
|
|
||||||
|
|
||||||
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
|
|
||||||
if str::from_utf8(&bytes).is_err() {
|
|
||||||
Poll::Ready(ret.and_then(|_| {
|
|
||||||
Err(io::Error::new(
|
|
||||||
io::ErrorKind::InvalidData,
|
|
||||||
"stream did not contain valid UTF-8",
|
|
||||||
))
|
|
||||||
}))
|
|
||||||
} else {
|
|
||||||
debug_assert!(buf.is_empty());
|
|
||||||
debug_assert_eq!(*read, 0);
|
|
||||||
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
|
|
||||||
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
|
|
||||||
Poll::Ready(ret)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A stream of lines in a byte stream.
|
|
||||||
///
|
|
||||||
/// This stream is created by the [`lines`] method on types that implement [`BufRead`].
|
|
||||||
///
|
|
||||||
/// This type is an async version of [`std::io::Lines`].
|
|
||||||
///
|
|
||||||
/// [`lines`]: trait.BufRead.html#method.lines
|
|
||||||
/// [`BufRead`]: trait.BufRead.html
|
|
||||||
/// [`std::io::Lines`]: https://doc.rust-lang.org/nightly/std/io/struct.Lines.html
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Lines<R> {
|
|
||||||
reader: R,
|
|
||||||
buf: String,
|
|
||||||
bytes: Vec<u8>,
|
|
||||||
read: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
|
|
||||||
type Item = io::Result<String>;
|
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
|
||||||
let Self {
|
|
||||||
reader,
|
|
||||||
buf,
|
|
||||||
bytes,
|
|
||||||
read,
|
|
||||||
} = unsafe { self.get_unchecked_mut() };
|
|
||||||
let reader = unsafe { Pin::new_unchecked(reader) };
|
|
||||||
let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
|
|
||||||
if n == 0 && buf.is_empty() {
|
|
||||||
return Poll::Ready(None);
|
|
||||||
}
|
|
||||||
if buf.ends_with('\n') {
|
|
||||||
buf.pop();
|
|
||||||
if buf.ends_with('\r') {
|
|
||||||
buf.pop();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Poll::Ready(Some(Ok(mem::replace(buf, String::new()))))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
|
|
||||||
reader: Pin<&mut R>,
|
|
||||||
cx: &mut Context<'_>,
|
|
||||||
buf: &mut String,
|
|
||||||
bytes: &mut Vec<u8>,
|
|
||||||
read: &mut usize,
|
|
||||||
) -> Poll<io::Result<usize>> {
|
|
||||||
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
|
|
||||||
if str::from_utf8(&bytes).is_err() {
|
|
||||||
Poll::Ready(ret.and_then(|_| {
|
|
||||||
Err(io::Error::new(
|
|
||||||
io::ErrorKind::InvalidData,
|
|
||||||
"stream did not contain valid UTF-8",
|
|
||||||
))
|
|
||||||
}))
|
|
||||||
} else {
|
|
||||||
debug_assert!(buf.is_empty());
|
|
||||||
debug_assert_eq!(*read, 0);
|
|
||||||
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
|
|
||||||
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
|
|
||||||
Poll::Ready(ret)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn read_until_internal<R: AsyncBufRead + ?Sized>(
|
pub fn read_until_internal<R: AsyncBufRead + ?Sized>(
|
||||||
mut reader: Pin<&mut R>,
|
mut reader: Pin<&mut R>,
|
||||||
cx: &mut Context<'_>,
|
cx: &mut Context<'_>,
|
49
src/io/buf_read/read_line.rs
Normal file
49
src/io/buf_read/read_line.rs
Normal file
|
@ -0,0 +1,49 @@
|
||||||
|
use std::mem;
|
||||||
|
use std::pin::Pin;
|
||||||
|
use std::str;
|
||||||
|
|
||||||
|
use futures_io::AsyncBufRead;
|
||||||
|
|
||||||
|
use super::read_until_internal;
|
||||||
|
use crate::future::Future;
|
||||||
|
use crate::io;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct ReadLineFuture<'a, T: Unpin + ?Sized> {
|
||||||
|
pub(crate) reader: &'a mut T,
|
||||||
|
pub(crate) buf: &'a mut String,
|
||||||
|
pub(crate) bytes: Vec<u8>,
|
||||||
|
pub(crate) read: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
|
||||||
|
type Output = io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let Self {
|
||||||
|
reader,
|
||||||
|
buf,
|
||||||
|
bytes,
|
||||||
|
read,
|
||||||
|
} = &mut *self;
|
||||||
|
let reader = Pin::new(reader);
|
||||||
|
|
||||||
|
let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
|
||||||
|
if str::from_utf8(&bytes).is_err() {
|
||||||
|
Poll::Ready(ret.and_then(|_| {
|
||||||
|
Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidData,
|
||||||
|
"stream did not contain valid UTF-8",
|
||||||
|
))
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
debug_assert!(buf.is_empty());
|
||||||
|
debug_assert_eq!(*read, 0);
|
||||||
|
// Safety: `bytes` is a valid UTF-8 because `str::from_utf8` returned `Ok`.
|
||||||
|
mem::swap(unsafe { buf.as_mut_vec() }, bytes);
|
||||||
|
Poll::Ready(ret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
31
src/io/buf_read/read_until.rs
Normal file
31
src/io/buf_read/read_until.rs
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use futures_io::AsyncBufRead;
|
||||||
|
|
||||||
|
use super::read_until_internal;
|
||||||
|
use crate::future::Future;
|
||||||
|
use crate::io;
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct ReadUntilFuture<'a, T: Unpin + ?Sized> {
|
||||||
|
pub(crate) reader: &'a mut T,
|
||||||
|
pub(crate) byte: u8,
|
||||||
|
pub(crate) buf: &'a mut Vec<u8>,
|
||||||
|
pub(crate) read: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, T> {
|
||||||
|
type Output = io::Result<usize>;
|
||||||
|
|
||||||
|
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let Self {
|
||||||
|
reader,
|
||||||
|
byte,
|
||||||
|
buf,
|
||||||
|
read,
|
||||||
|
} = &mut *self;
|
||||||
|
read_until_internal(Pin::new(reader), cx, *byte, buf, read)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue