diff --git a/src/fs/file.rs b/src/fs/file.rs index 330c61f..c61a249 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -8,7 +8,7 @@ use std::sync::Mutex; use cfg_if::cfg_if; use futures::future::{self, FutureExt, TryFutureExt}; -use futures::io::{AsyncSeek, Initializer}; +use futures::io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer}; use crate::future::Future; use crate::io; @@ -482,7 +482,7 @@ impl File { } } -impl futures::io::AsyncRead for File { +impl AsyncRead for File { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -497,7 +497,7 @@ impl futures::io::AsyncRead for File { } } -impl futures::io::AsyncRead for &File { +impl AsyncRead for &File { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -563,7 +563,7 @@ impl futures::io::AsyncRead for &File { } } -impl futures::io::AsyncWrite for File { +impl AsyncWrite for File { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -581,7 +581,7 @@ impl futures::io::AsyncWrite for File { } } -impl futures::io::AsyncWrite for &File { +impl AsyncWrite for &File { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -692,7 +692,7 @@ impl futures::io::AsyncWrite for &File { } } -impl futures::io::AsyncSeek for File { +impl AsyncSeek for File { fn poll_seek( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/empty.rs b/src/io/empty.rs new file mode 100644 index 0000000..41978bc --- /dev/null +++ b/src/io/empty.rs @@ -0,0 +1,71 @@ +use std::fmt; +use std::pin::Pin; + +use futures::io::{AsyncBufRead, AsyncRead, Initializer}; + +use crate::io; +use crate::task::{Context, Poll}; + +/// Creates a reader that contains no data. +/// +/// # Examples +/// +/// ```rust +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::io; +/// use async_std::prelude::*; +/// +/// let mut buf = Vec::new(); +/// let mut reader = io::empty(); +/// reader.read_to_end(&mut buf).await?; +/// +/// assert!(buf.is_empty()); +/// # +/// # Ok(()) }) } +/// ``` +pub fn empty() -> Empty { + Empty { _priv: () } +} + +/// A reader that contains no data. +/// +/// This reader is constructed by the [`sink`] function. +/// +/// [`sink`]: fn.sink.html +pub struct Empty { + _priv: (), +} + +impl fmt::Debug for Empty { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Empty { .. }") + } +} + +impl AsyncRead for Empty { + #[inline] + fn poll_read( + self: Pin<&mut Self>, + _: &mut Context<'_>, + _: &mut [u8], + ) -> Poll> { + Poll::Ready(Ok(0)) + } + + #[inline] + unsafe fn initializer(&self) -> Initializer { + Initializer::nop() + } +} + +impl AsyncBufRead for Empty { + #[inline] + fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(&[])) + } + + #[inline] + fn consume(self: Pin<&mut Self>, _: usize) {} +} diff --git a/src/io/mod.rs b/src/io/mod.rs index daf70b8..0355031 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -22,13 +22,15 @@ //! ``` #[doc(inline)] -pub use std::io::{empty, sink, Cursor, Empty, Error, ErrorKind, Result, SeekFrom, Sink}; +pub use std::io::{Error, ErrorKind, Result, SeekFrom}; pub use buf_read::{BufRead, Lines}; pub use buf_reader::BufReader; pub use copy::copy; +pub use empty::{empty, Empty}; pub use read::Read; pub use seek::Seek; +pub use sink::{sink, Sink}; pub use stderr::{stderr, Stderr}; pub use stdin::{stdin, Stdin}; pub use stdout::{stdout, Stdout}; @@ -37,8 +39,10 @@ pub use write::Write; mod buf_read; mod buf_reader; mod copy; +mod empty; mod read; mod seek; +mod sink; mod stderr; mod stdin; mod stdout; diff --git a/src/io/sink.rs b/src/io/sink.rs new file mode 100644 index 0000000..f4f5fdd --- /dev/null +++ b/src/io/sink.rs @@ -0,0 +1,63 @@ +use std::fmt; +use std::pin::Pin; + +use futures::io::AsyncWrite; + +use crate::io; +use crate::task::{Context, Poll}; + +/// Creates a writer that consumes and drops all data. +/// +/// # Examples +/// +/// ```rust +/// # #![feature(async_await)] +/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +/// # +/// use async_std::io; +/// use async_std::prelude::*; +/// +/// let mut writer = io::sink(); +/// writer.write(b"hello world").await?; +/// # +/// # Ok(()) }) } +/// ``` +pub fn sink() -> Sink { + Sink { _priv: () } +} + +/// A writer that consumes and drops all data. +/// +/// This writer is constructed by the [`sink`] function. +/// +/// [`sink`]: fn.sink.html +pub struct Sink { + _priv: (), +} + +impl fmt::Debug for Sink { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Sink { .. }") + } +} + +impl AsyncWrite for Sink { + #[inline] + fn poll_write( + self: Pin<&mut Self>, + _: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Poll::Ready(Ok(buf.len())) + } + + #[inline] + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + #[inline] + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } +} diff --git a/src/io/stderr.rs b/src/io/stderr.rs index 9983f61..4c81abb 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; +use futures::io::AsyncWrite; use crate::future::Future; use crate::task::{blocking, Context, Poll}; @@ -80,7 +81,7 @@ enum Operation { Flush(io::Result<()>), } -impl futures::io::AsyncWrite for Stderr { +impl AsyncWrite for Stderr { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 8747e7e..a3b9368 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -4,7 +4,7 @@ use std::sync::Mutex; use cfg_if::cfg_if; use futures::future; -use futures::io::Initializer; +use futures::io::{AsyncRead, Initializer}; use crate::future::Future; use crate::task::{blocking, Context, Poll}; @@ -140,7 +140,7 @@ impl Stdin { } } -impl futures::io::AsyncRead for Stdin { +impl AsyncRead for Stdin { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/io/stdout.rs b/src/io/stdout.rs index 9100f24..abf42ef 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; +use futures::io::AsyncWrite; use crate::future::Future; use crate::task::{blocking, Context, Poll}; @@ -80,7 +81,7 @@ enum Operation { Flush(io::Result<()>), } -impl futures::io::AsyncWrite for Stdout { +impl AsyncWrite for Stdout { fn poll_write( mut self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 234467a..27df560 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -5,6 +5,7 @@ use std::pin::Pin; use cfg_if::cfg_if; use futures::future; +use futures::io::{AsyncRead, AsyncWrite}; use crate::io; use crate::net::driver::IoHandle; @@ -355,7 +356,7 @@ impl TcpStream { } } -impl futures::io::AsyncRead for TcpStream { +impl AsyncRead for TcpStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -373,7 +374,7 @@ impl futures::io::AsyncRead for TcpStream { } } -impl futures::io::AsyncRead for &TcpStream { +impl AsyncRead for &TcpStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -391,7 +392,7 @@ impl futures::io::AsyncRead for &TcpStream { } } -impl futures::io::AsyncWrite for TcpStream { +impl AsyncWrite for TcpStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -417,7 +418,7 @@ impl futures::io::AsyncWrite for TcpStream { } } -impl futures::io::AsyncWrite for &TcpStream { +impl AsyncWrite for &TcpStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 607c234..c7bdedf 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -7,6 +7,7 @@ use std::path::Path; use std::pin::Pin; use futures::future; +use futures::io::{AsyncRead, AsyncWrite}; use mio_uds; use super::SocketAddr; @@ -198,7 +199,7 @@ impl UnixStream { } } -impl futures::io::AsyncRead for UnixStream { +impl AsyncRead for UnixStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -208,7 +209,7 @@ impl futures::io::AsyncRead for UnixStream { } } -impl futures::io::AsyncRead for &UnixStream { +impl AsyncRead for &UnixStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -218,7 +219,7 @@ impl futures::io::AsyncRead for &UnixStream { } } -impl futures::io::AsyncWrite for UnixStream { +impl AsyncWrite for UnixStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -236,7 +237,7 @@ impl futures::io::AsyncWrite for UnixStream { } } -impl futures::io::AsyncWrite for &UnixStream { +impl AsyncWrite for &UnixStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/task/blocking.rs b/src/task/blocking.rs index a7a610a..90c6e35 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -27,11 +27,11 @@ lazy_static! { for _ in 0..2 { thread::Builder::new() .name("async-blocking-driver".to_string()) - .spawn(|| { + .spawn(|| abort_on_panic(|| { for task in &POOL.receiver { - abort_on_panic(|| task.run()); + task.run(); } - }) + })) .expect("cannot start a thread driving blocking tasks"); } @@ -77,24 +77,12 @@ fn maybe_create_another_blocking_thread() { // nonblocking way and spinning up another worker thread if // there is not a thread ready to accept the work. fn schedule(t: async_task::Task<()>) { - let first_try_result = POOL.sender.try_send(t); - match first_try_result { - Ok(()) => { - // NICEEEE - } - Err(crossbeam::channel::TrySendError::Full(t)) => { - // We were not able to send to the channel without - // blocking. Try to spin up another thread and then - // retry sending while blocking. - maybe_create_another_blocking_thread(); - POOL.sender.send(t).unwrap() - } - Err(crossbeam::channel::TrySendError::Disconnected(_)) => { - panic!( - "unable to send to blocking threadpool \ - due to receiver disconnection" - ); - } + if let Err(err) = POOL.sender.try_send(t) { + // We were not able to send to the channel without + // blocking. Try to spin up another thread and then + // retry sending while blocking. + maybe_create_another_blocking_thread(); + POOL.sender.send(err.into_inner()).unwrap(); } }