Merge pull request #251 from async-rs/blocking-unstable

add an unstable `task::blocking` function
write-by-ref
Yoshua Wuyts 5 years ago committed by GitHub
commit 9ab7b1ae6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,7 +5,7 @@ use crate::fs::DirEntry;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, JoinHandle, Poll};
/// Returns a stream of entries in a directory. /// Returns a stream of entries in a directory.
/// ///
@ -71,7 +71,7 @@ pub struct ReadDir(State);
#[derive(Debug)] #[derive(Debug)]
enum State { enum State {
Idle(Option<std::fs::ReadDir>), Idle(Option<std::fs::ReadDir>),
Busy(blocking::JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>), Busy(JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>),
} }
impl ReadDir { impl ReadDir {

@ -5,7 +5,7 @@ use cfg_if::cfg_if;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, Write}; use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard error of the current process. /// Constructs a new handle to the standard error of the current process.
/// ///
@ -56,7 +56,7 @@ enum State {
/// The stderr is blocked on an asynchronous operation. /// The stderr is blocked on an asynchronous operation.
/// ///
/// Awaiting this operation will result in the new state of the stderr. /// Awaiting this operation will result in the new state of the stderr.
Busy(blocking::JoinHandle<State>), Busy(JoinHandle<State>),
} }
/// Inner representation of the asynchronous stderr. /// Inner representation of the asynchronous stderr.

@ -5,7 +5,7 @@ use cfg_if::cfg_if;
use crate::future::{self, Future}; use crate::future::{self, Future};
use crate::io::{self, Read}; use crate::io::{self, Read};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard input of the current process. /// Constructs a new handle to the standard input of the current process.
/// ///
@ -57,7 +57,7 @@ enum State {
/// The stdin is blocked on an asynchronous operation. /// The stdin is blocked on an asynchronous operation.
/// ///
/// Awaiting this operation will result in the new state of the stdin. /// Awaiting this operation will result in the new state of the stdin.
Busy(blocking::JoinHandle<State>), Busy(JoinHandle<State>),
} }
/// Inner representation of the asynchronous stdin. /// Inner representation of the asynchronous stdin.

@ -5,7 +5,7 @@ use cfg_if::cfg_if;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, Write}; use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, JoinHandle, Poll};
/// Constructs a new handle to the standard output of the current process. /// Constructs a new handle to the standard output of the current process.
/// ///
@ -56,7 +56,7 @@ enum State {
/// The stdout is blocked on an asynchronous operation. /// The stdout is blocked on an asynchronous operation.
/// ///
/// Awaiting this operation will result in the new state of the stdout. /// Awaiting this operation will result in the new state of the stdout.
Busy(blocking::JoinHandle<State>), Busy(JoinHandle<State>),
} }
/// Inner representation of the asynchronous stdout. /// Inner representation of the asynchronous stdout.

@ -7,8 +7,7 @@ use cfg_if::cfg_if;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
use crate::task::blocking; use crate::task::{blocking, Context, JoinHandle, Poll};
use crate::task::{Context, Poll};
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(feature = "docs")] {
@ -48,7 +47,7 @@ pub trait ToSocketAddrs {
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub enum ToSocketAddrsFuture<'a, I> { pub enum ToSocketAddrsFuture<'a, I> {
Phantom(PhantomData<&'a ()>), Phantom(PhantomData<&'a ()>),
Join(blocking::JoinHandle<io::Result<I>>), Join(JoinHandle<io::Result<I>>),
Ready(Option<io::Result<I>>), Ready(Option<io::Result<I>>),
} }

@ -1,7 +1,5 @@
//! A thread pool for running blocking functions asynchronously. //! A thread pool for running blocking functions asynchronously.
use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
@ -10,7 +8,7 @@ use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use crate::future::Future; use crate::future::Future;
use crate::task::{Context, Poll}; use crate::task::task::{JoinHandle, Tag};
use crate::utils::abort_on_panic; use crate::utils::abort_on_panic;
const MAX_THREADS: u64 = 10_000; const MAX_THREADS: u64 = 10_000;
@ -18,8 +16,8 @@ const MAX_THREADS: u64 = 10_000;
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0); static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
struct Pool { struct Pool {
sender: Sender<async_task::Task<()>>, sender: Sender<async_task::Task<Tag>>,
receiver: Receiver<async_task::Task<()>>, receiver: Receiver<async_task::Task<Tag>>,
} }
lazy_static! { lazy_static! {
@ -85,7 +83,7 @@ fn maybe_create_another_blocking_thread() {
// Enqueues work, attempting to send to the threadpool in a // Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if // nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work. // there is not a thread ready to accept the work.
fn schedule(t: async_task::Task<()>) { fn schedule(t: async_task::Task<Tag>) {
if let Err(err) = POOL.sender.try_send(t) { if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without // We were not able to send to the channel without
// blocking. Try to spin up another thread and then // blocking. Try to spin up another thread and then
@ -98,35 +96,15 @@ fn schedule(t: async_task::Task<()>) {
/// Spawns a blocking task. /// Spawns a blocking task.
/// ///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. /// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub fn spawn<F, R>(future: F) -> JoinHandle<R> pub(crate) fn spawn<F, R>(future: F) -> JoinHandle<R>
where where
F: Future<Output = R> + Send + 'static, F: Future<Output = R> + Send + 'static,
R: Send + 'static, R: Send + 'static,
{ {
let (task, handle) = async_task::spawn(future, schedule, ()); let tag = Tag::new(None);
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule(); task.schedule();
JoinHandle(handle) JoinHandle::new(handle)
}
/// A handle to a blocking task.
pub struct JoinHandle<R>(async_task::JoinHandle<R, ()>);
impl<R> Unpin for JoinHandle<R> {}
impl<R> Future for JoinHandle<R> {
type Output = R;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx).map(|out| out.unwrap())
}
}
impl<R> fmt::Debug for JoinHandle<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JoinHandle")
.field("handle", &self.0)
.finish()
}
} }
/// Generates a random number in `0..n`. /// Generates a random number in `0..n`.

@ -48,3 +48,19 @@ mod task_local;
mod worker; mod worker;
pub(crate) mod blocking; pub(crate) mod blocking;
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
// Once this function stabilizes we should merge `blocking::spawn` into this so
// all code in our crate uses `task::blocking` too.
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn blocking<F, R>(future: F) -> task::JoinHandle<R>
where
F: crate::future::Future<Output = R> + Send + 'static,
R: Send + 'static,
{
blocking::spawn(future)
}

Loading…
Cancel
Save