Use our own Sink/Empty and fix compilation errors

pull/27/head
Stjepan Glavina 5 years ago
parent cefdf51fa3
commit 9b3e8b8f26

@ -8,7 +8,7 @@ use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future::{self, FutureExt, TryFutureExt}; use futures::future::{self, FutureExt, TryFutureExt};
use futures::io::{AsyncSeek, Initializer}; use futures::io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer};
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
@ -482,7 +482,7 @@ impl File {
} }
} }
impl futures::io::AsyncRead for File { impl AsyncRead for File {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -497,7 +497,7 @@ impl futures::io::AsyncRead for File {
} }
} }
impl futures::io::AsyncRead for &File { impl AsyncRead for &File {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -563,7 +563,7 @@ impl futures::io::AsyncRead for &File {
} }
} }
impl futures::io::AsyncWrite for File { impl AsyncWrite for File {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -581,7 +581,7 @@ impl futures::io::AsyncWrite for File {
} }
} }
impl futures::io::AsyncWrite for &File { impl AsyncWrite for &File {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -692,7 +692,7 @@ impl futures::io::AsyncWrite for &File {
} }
} }
impl futures::io::AsyncSeek for File { impl AsyncSeek for File {
fn poll_seek( fn poll_seek(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -0,0 +1,71 @@
use std::fmt;
use std::pin::Pin;
use futures::io::{AsyncBufRead, AsyncRead, Initializer};
use crate::io;
use crate::task::{Context, Poll};
/// Creates a reader that contains no data.
///
/// # Examples
///
/// ```rust
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
///
/// let mut buf = Vec::new();
/// let mut reader = io::empty();
/// reader.read_to_end(&mut buf).await?;
///
/// assert!(buf.is_empty());
/// #
/// # Ok(()) }) }
/// ```
pub fn empty() -> Empty {
Empty { _priv: () }
}
/// A reader that contains no data.
///
/// This reader is constructed by the [`sink`] function.
///
/// [`sink`]: fn.sink.html
pub struct Empty {
_priv: (),
}
impl fmt::Debug for Empty {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Empty { .. }")
}
}
impl AsyncRead for Empty {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
_: &mut [u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(0))
}
#[inline]
unsafe fn initializer(&self) -> Initializer {
Initializer::nop()
}
}
impl AsyncBufRead for Empty {
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Poll::Ready(Ok(&[]))
}
#[inline]
fn consume(self: Pin<&mut Self>, _: usize) {}
}

@ -22,13 +22,15 @@
//! ``` //! ```
#[doc(inline)] #[doc(inline)]
pub use std::io::{empty, sink, Cursor, Empty, Error, ErrorKind, Result, SeekFrom, Sink}; pub use std::io::{Error, ErrorKind, Result, SeekFrom};
pub use buf_read::{BufRead, Lines}; pub use buf_read::{BufRead, Lines};
pub use buf_reader::BufReader; pub use buf_reader::BufReader;
pub use copy::copy; pub use copy::copy;
pub use empty::{empty, Empty};
pub use read::Read; pub use read::Read;
pub use seek::Seek; pub use seek::Seek;
pub use sink::{sink, Sink};
pub use stderr::{stderr, Stderr}; pub use stderr::{stderr, Stderr};
pub use stdin::{stdin, Stdin}; pub use stdin::{stdin, Stdin};
pub use stdout::{stdout, Stdout}; pub use stdout::{stdout, Stdout};
@ -37,8 +39,10 @@ pub use write::Write;
mod buf_read; mod buf_read;
mod buf_reader; mod buf_reader;
mod copy; mod copy;
mod empty;
mod read; mod read;
mod seek; mod seek;
mod sink;
mod stderr; mod stderr;
mod stdin; mod stdin;
mod stdout; mod stdout;

@ -0,0 +1,63 @@
use std::fmt;
use std::pin::Pin;
use futures::io::AsyncWrite;
use crate::io;
use crate::task::{Context, Poll};
/// Creates a writer that consumes and drops all data.
///
/// # Examples
///
/// ```rust
/// # #![feature(async_await)]
/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
/// #
/// use async_std::io;
/// use async_std::prelude::*;
///
/// let mut writer = io::sink();
/// writer.write(b"hello world").await?;
/// #
/// # Ok(()) }) }
/// ```
pub fn sink() -> Sink {
Sink { _priv: () }
}
/// A writer that consumes and drops all data.
///
/// This writer is constructed by the [`sink`] function.
///
/// [`sink`]: fn.sink.html
pub struct Sink {
_priv: (),
}
impl fmt::Debug for Sink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Sink { .. }")
}
}
impl AsyncWrite for Sink {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Poll::Ready(Ok(buf.len()))
}
#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
#[inline]
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}

@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncWrite;
use crate::future::Future; use crate::future::Future;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
@ -80,7 +81,7 @@ enum Operation {
Flush(io::Result<()>), Flush(io::Result<()>),
} }
impl futures::io::AsyncWrite for Stderr { impl AsyncWrite for Stderr {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -4,7 +4,7 @@ use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future; use futures::future;
use futures::io::Initializer; use futures::io::{AsyncRead, Initializer};
use crate::future::Future; use crate::future::Future;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
@ -140,7 +140,7 @@ impl Stdin {
} }
} }
impl futures::io::AsyncRead for Stdin { impl AsyncRead for Stdin {
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncWrite;
use crate::future::Future; use crate::future::Future;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
@ -80,7 +81,7 @@ enum Operation {
Flush(io::Result<()>), Flush(io::Result<()>),
} }
impl futures::io::AsyncWrite for Stdout { impl AsyncWrite for Stdout {
fn poll_write( fn poll_write(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -5,6 +5,7 @@ use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future; use futures::future;
use futures::io::{AsyncRead, AsyncWrite};
use crate::io; use crate::io;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
@ -355,7 +356,7 @@ impl TcpStream {
} }
} }
impl futures::io::AsyncRead for TcpStream { impl AsyncRead for TcpStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -373,7 +374,7 @@ impl futures::io::AsyncRead for TcpStream {
} }
} }
impl futures::io::AsyncRead for &TcpStream { impl AsyncRead for &TcpStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -391,7 +392,7 @@ impl futures::io::AsyncRead for &TcpStream {
} }
} }
impl futures::io::AsyncWrite for TcpStream { impl AsyncWrite for TcpStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -417,7 +418,7 @@ impl futures::io::AsyncWrite for TcpStream {
} }
} }
impl futures::io::AsyncWrite for &TcpStream { impl AsyncWrite for &TcpStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -7,6 +7,7 @@ use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use futures::future; use futures::future;
use futures::io::{AsyncRead, AsyncWrite};
use mio_uds; use mio_uds;
use super::SocketAddr; use super::SocketAddr;
@ -198,7 +199,7 @@ impl UnixStream {
} }
} }
impl futures::io::AsyncRead for UnixStream { impl AsyncRead for UnixStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -208,7 +209,7 @@ impl futures::io::AsyncRead for UnixStream {
} }
} }
impl futures::io::AsyncRead for &UnixStream { impl AsyncRead for &UnixStream {
fn poll_read( fn poll_read(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -218,7 +219,7 @@ impl futures::io::AsyncRead for &UnixStream {
} }
} }
impl futures::io::AsyncWrite for UnixStream { impl AsyncWrite for UnixStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -236,7 +237,7 @@ impl futures::io::AsyncWrite for UnixStream {
} }
} }
impl futures::io::AsyncWrite for &UnixStream { impl AsyncWrite for &UnixStream {
fn poll_write( fn poll_write(
self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,

@ -27,11 +27,11 @@ lazy_static! {
for _ in 0..2 { for _ in 0..2 {
thread::Builder::new() thread::Builder::new()
.name("async-blocking-driver".to_string()) .name("async-blocking-driver".to_string())
.spawn(|| { .spawn(|| abort_on_panic(|| {
for task in &POOL.receiver { for task in &POOL.receiver {
abort_on_panic(|| task.run()); task.run();
} }
}) }))
.expect("cannot start a thread driving blocking tasks"); .expect("cannot start a thread driving blocking tasks");
} }
@ -77,24 +77,12 @@ fn maybe_create_another_blocking_thread() {
// nonblocking way and spinning up another worker thread if // nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work. // there is not a thread ready to accept the work.
fn schedule(t: async_task::Task<()>) { fn schedule(t: async_task::Task<()>) {
let first_try_result = POOL.sender.try_send(t); if let Err(err) = POOL.sender.try_send(t) {
match first_try_result {
Ok(()) => {
// NICEEEE
}
Err(crossbeam::channel::TrySendError::Full(t)) => {
// We were not able to send to the channel without // We were not able to send to the channel without
// blocking. Try to spin up another thread and then // blocking. Try to spin up another thread and then
// retry sending while blocking. // retry sending while blocking.
maybe_create_another_blocking_thread(); maybe_create_another_blocking_thread();
POOL.sender.send(t).unwrap() POOL.sender.send(err.into_inner()).unwrap();
}
Err(crossbeam::channel::TrySendError::Disconnected(_)) => {
panic!(
"unable to send to blocking threadpool \
due to receiver disconnection"
);
}
} }
} }

Loading…
Cancel
Save