diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index c6f80fe..9b4269d 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -5,7 +5,7 @@ use crate::fs::DirEntry; use crate::future::Future; use crate::io; use crate::stream::Stream; -use crate::task::{blocking, Context, Poll}; +use crate::task::{blocking, Context, JoinHandle, Poll}; /// Returns a stream of entries in a directory. /// @@ -71,7 +71,7 @@ pub struct ReadDir(State); #[derive(Debug)] enum State { Idle(Option), - Busy(blocking::JoinHandle<(std::fs::ReadDir, Option>)>), + Busy(JoinHandle<(std::fs::ReadDir, Option>)>), } impl ReadDir { diff --git a/src/io/stderr.rs b/src/io/stderr.rs index e743e00..5706aa2 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -5,7 +5,7 @@ use cfg_if::cfg_if; use crate::future::Future; use crate::io::{self, Write}; -use crate::task::{blocking, Context, Poll}; +use crate::task::{blocking, Context, JoinHandle, Poll}; /// Constructs a new handle to the standard error of the current process. /// @@ -56,7 +56,7 @@ enum State { /// The stderr is blocked on an asynchronous operation. /// /// Awaiting this operation will result in the new state of the stderr. - Busy(blocking::JoinHandle), + Busy(JoinHandle), } /// Inner representation of the asynchronous stderr. diff --git a/src/io/stdin.rs b/src/io/stdin.rs index d2e9ec0..95a77b8 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -5,7 +5,7 @@ use cfg_if::cfg_if; use crate::future::{self, Future}; use crate::io::{self, Read}; -use crate::task::{blocking, Context, Poll}; +use crate::task::{blocking, Context, JoinHandle, Poll}; /// Constructs a new handle to the standard input of the current process. /// @@ -57,7 +57,7 @@ enum State { /// The stdin is blocked on an asynchronous operation. /// /// Awaiting this operation will result in the new state of the stdin. - Busy(blocking::JoinHandle), + Busy(JoinHandle), } /// Inner representation of the asynchronous stdin. diff --git a/src/io/stdout.rs b/src/io/stdout.rs index faf0c79..7849f1c 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -5,7 +5,7 @@ use cfg_if::cfg_if; use crate::future::Future; use crate::io::{self, Write}; -use crate::task::{blocking, Context, Poll}; +use crate::task::{blocking, Context, JoinHandle, Poll}; /// Constructs a new handle to the standard output of the current process. /// @@ -56,7 +56,7 @@ enum State { /// The stdout is blocked on an asynchronous operation. /// /// Awaiting this operation will result in the new state of the stdout. - Busy(blocking::JoinHandle), + Busy(JoinHandle), } /// Inner representation of the asynchronous stdout. diff --git a/src/net/addr.rs b/src/net/addr.rs index 71f43a5..baa41c8 100644 --- a/src/net/addr.rs +++ b/src/net/addr.rs @@ -7,8 +7,7 @@ use cfg_if::cfg_if; use crate::future::Future; use crate::io; -use crate::task::blocking; -use crate::task::{Context, Poll}; +use crate::task::{blocking, Context, JoinHandle, Poll}; cfg_if! { if #[cfg(feature = "docs")] { @@ -48,7 +47,7 @@ pub trait ToSocketAddrs { #[allow(missing_debug_implementations)] pub enum ToSocketAddrsFuture<'a, I> { Phantom(PhantomData<&'a ()>), - Join(blocking::JoinHandle>), + Join(JoinHandle>), Ready(Option>), } diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 8f4277c..53b52c8 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -1,7 +1,5 @@ //! A thread pool for running blocking functions asynchronously. -use std::fmt; -use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering}; use std::thread; use std::time::Duration; @@ -10,7 +8,7 @@ use crossbeam_channel::{bounded, Receiver, Sender}; use lazy_static::lazy_static; use crate::future::Future; -use crate::task::{Context, Poll}; +use crate::task::task::{JoinHandle, Tag}; use crate::utils::abort_on_panic; const MAX_THREADS: u64 = 10_000; @@ -18,8 +16,8 @@ const MAX_THREADS: u64 = 10_000; static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); struct Pool { - sender: Sender>, - receiver: Receiver>, + sender: Sender>, + receiver: Receiver>, } lazy_static! { @@ -85,7 +83,7 @@ fn maybe_create_another_blocking_thread() { // Enqueues work, attempting to send to the threadpool in a // nonblocking way and spinning up another worker thread if // there is not a thread ready to accept the work. -fn schedule(t: async_task::Task<()>) { +fn schedule(t: async_task::Task) { if let Err(err) = POOL.sender.try_send(t) { // We were not able to send to the channel without // blocking. Try to spin up another thread and then @@ -98,35 +96,15 @@ fn schedule(t: async_task::Task<()>) { /// Spawns a blocking task. /// /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. -pub fn spawn(future: F) -> JoinHandle +pub(crate) fn spawn(future: F) -> JoinHandle where F: Future + Send + 'static, R: Send + 'static, { - let (task, handle) = async_task::spawn(future, schedule, ()); + let tag = Tag::new(None); + let (task, handle) = async_task::spawn(future, schedule, tag); task.schedule(); - JoinHandle(handle) -} - -/// A handle to a blocking task. -pub struct JoinHandle(async_task::JoinHandle); - -impl Unpin for JoinHandle {} - -impl Future for JoinHandle { - type Output = R; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.0).poll(cx).map(|out| out.unwrap()) - } -} - -impl fmt::Debug for JoinHandle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("JoinHandle") - .field("handle", &self.0) - .finish() - } + JoinHandle::new(handle) } /// Generates a random number in `0..n`. diff --git a/src/task/mod.rs b/src/task/mod.rs index 15eb7b1..a23a7c8 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -48,3 +48,19 @@ mod task_local; mod worker; pub(crate) mod blocking; + +/// Spawns a blocking task. +/// +/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. +// Once this function stabilizes we should merge `blocking::spawn` into this so +// all code in our crate uses `task::blocking` too. +#[cfg(any(feature = "unstable", feature = "docs"))] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[inline] +pub fn blocking(future: F) -> task::JoinHandle +where + F: crate::future::Future + Send + 'static, + R: Send + 'static, +{ + blocking::spawn(future) +}