mirror of
https://github.com/async-rs/async-std.git
synced 2025-02-28 15:19:41 +00:00
impl feedback
Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
This commit is contained in:
parent
df15c04f28
commit
647aab819f
7 changed files with 21 additions and 42 deletions
|
@ -5,7 +5,7 @@ use crate::fs::DirEntry;
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::io;
|
use crate::io;
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::task::{blocking, Context, Poll};
|
use crate::task::{blocking, Context, Poll, JoinHandle};
|
||||||
|
|
||||||
/// Returns a stream of entries in a directory.
|
/// Returns a stream of entries in a directory.
|
||||||
///
|
///
|
||||||
|
@ -71,7 +71,7 @@ pub struct ReadDir(State);
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum State {
|
enum State {
|
||||||
Idle(Option<std::fs::ReadDir>),
|
Idle(Option<std::fs::ReadDir>),
|
||||||
Busy(blocking::JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>),
|
Busy(JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReadDir {
|
impl ReadDir {
|
||||||
|
|
|
@ -5,7 +5,7 @@ use cfg_if::cfg_if;
|
||||||
|
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::io::{self, Write};
|
use crate::io::{self, Write};
|
||||||
use crate::task::{blocking, Context, Poll};
|
use crate::task::{blocking, Context, Poll, JoinHandle};
|
||||||
|
|
||||||
/// Constructs a new handle to the standard error of the current process.
|
/// Constructs a new handle to the standard error of the current process.
|
||||||
///
|
///
|
||||||
|
@ -56,7 +56,7 @@ enum State {
|
||||||
/// The stderr is blocked on an asynchronous operation.
|
/// The stderr is blocked on an asynchronous operation.
|
||||||
///
|
///
|
||||||
/// Awaiting this operation will result in the new state of the stderr.
|
/// Awaiting this operation will result in the new state of the stderr.
|
||||||
Busy(blocking::JoinHandle<State>),
|
Busy(JoinHandle<State>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inner representation of the asynchronous stderr.
|
/// Inner representation of the asynchronous stderr.
|
||||||
|
|
|
@ -5,7 +5,7 @@ use cfg_if::cfg_if;
|
||||||
|
|
||||||
use crate::future::{self, Future};
|
use crate::future::{self, Future};
|
||||||
use crate::io::{self, Read};
|
use crate::io::{self, Read};
|
||||||
use crate::task::{blocking, Context, Poll};
|
use crate::task::{blocking, Context, JoinHandle, Poll};
|
||||||
|
|
||||||
/// Constructs a new handle to the standard input of the current process.
|
/// Constructs a new handle to the standard input of the current process.
|
||||||
///
|
///
|
||||||
|
@ -57,7 +57,7 @@ enum State {
|
||||||
/// The stdin is blocked on an asynchronous operation.
|
/// The stdin is blocked on an asynchronous operation.
|
||||||
///
|
///
|
||||||
/// Awaiting this operation will result in the new state of the stdin.
|
/// Awaiting this operation will result in the new state of the stdin.
|
||||||
Busy(blocking::JoinHandle<State>),
|
Busy(JoinHandle<State>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inner representation of the asynchronous stdin.
|
/// Inner representation of the asynchronous stdin.
|
||||||
|
|
|
@ -5,7 +5,7 @@ use cfg_if::cfg_if;
|
||||||
|
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::io::{self, Write};
|
use crate::io::{self, Write};
|
||||||
use crate::task::{blocking, Context, Poll};
|
use crate::task::{blocking, Context, JoinHandle, Poll};
|
||||||
|
|
||||||
/// Constructs a new handle to the standard output of the current process.
|
/// Constructs a new handle to the standard output of the current process.
|
||||||
///
|
///
|
||||||
|
@ -56,7 +56,7 @@ enum State {
|
||||||
/// The stdout is blocked on an asynchronous operation.
|
/// The stdout is blocked on an asynchronous operation.
|
||||||
///
|
///
|
||||||
/// Awaiting this operation will result in the new state of the stdout.
|
/// Awaiting this operation will result in the new state of the stdout.
|
||||||
Busy(blocking::JoinHandle<State>),
|
Busy(JoinHandle<State>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Inner representation of the asynchronous stdout.
|
/// Inner representation of the asynchronous stdout.
|
||||||
|
|
|
@ -7,8 +7,7 @@ use cfg_if::cfg_if;
|
||||||
|
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::io;
|
use crate::io;
|
||||||
use crate::task::blocking;
|
use crate::task::{blocking, Context, JoinHandle, Poll};
|
||||||
use crate::task::{Context, Poll};
|
|
||||||
|
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
if #[cfg(feature = "docs")] {
|
if #[cfg(feature = "docs")] {
|
||||||
|
@ -48,7 +47,7 @@ pub trait ToSocketAddrs {
|
||||||
#[allow(missing_debug_implementations)]
|
#[allow(missing_debug_implementations)]
|
||||||
pub enum ToSocketAddrsFuture<'a, I> {
|
pub enum ToSocketAddrsFuture<'a, I> {
|
||||||
Phantom(PhantomData<&'a ()>),
|
Phantom(PhantomData<&'a ()>),
|
||||||
Join(blocking::JoinHandle<io::Result<I>>),
|
Join(JoinHandle<io::Result<I>>),
|
||||||
Ready(Option<io::Result<I>>),
|
Ready(Option<io::Result<I>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
//! A thread pool for running blocking functions asynchronously.
|
//! A thread pool for running blocking functions asynchronously.
|
||||||
|
|
||||||
use std::fmt;
|
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::atomic::{AtomicU64, Ordering};
|
use std::sync::atomic::{AtomicU64, Ordering};
|
||||||
use std::thread;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -10,7 +8,7 @@ use crossbeam_channel::{bounded, Receiver, Sender};
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
|
|
||||||
use crate::future::Future;
|
use crate::future::Future;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::task::{JoinHandle, Tag};
|
||||||
use crate::utils::abort_on_panic;
|
use crate::utils::abort_on_panic;
|
||||||
|
|
||||||
const MAX_THREADS: u64 = 10_000;
|
const MAX_THREADS: u64 = 10_000;
|
||||||
|
@ -18,8 +16,8 @@ const MAX_THREADS: u64 = 10_000;
|
||||||
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
|
static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
|
||||||
|
|
||||||
struct Pool {
|
struct Pool {
|
||||||
sender: Sender<async_task::Task<()>>,
|
sender: Sender<async_task::Task<Tag>>,
|
||||||
receiver: Receiver<async_task::Task<()>>,
|
receiver: Receiver<async_task::Task<Tag>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
|
@ -85,7 +83,7 @@ fn maybe_create_another_blocking_thread() {
|
||||||
// Enqueues work, attempting to send to the threadpool in a
|
// Enqueues work, attempting to send to the threadpool in a
|
||||||
// nonblocking way and spinning up another worker thread if
|
// nonblocking way and spinning up another worker thread if
|
||||||
// there is not a thread ready to accept the work.
|
// there is not a thread ready to accept the work.
|
||||||
fn schedule(t: async_task::Task<()>) {
|
fn schedule(t: async_task::Task<Tag>) {
|
||||||
if let Err(err) = POOL.sender.try_send(t) {
|
if let Err(err) = POOL.sender.try_send(t) {
|
||||||
// We were not able to send to the channel without
|
// We were not able to send to the channel without
|
||||||
// blocking. Try to spin up another thread and then
|
// blocking. Try to spin up another thread and then
|
||||||
|
@ -98,35 +96,15 @@ fn schedule(t: async_task::Task<()>) {
|
||||||
/// Spawns a blocking task.
|
/// Spawns a blocking task.
|
||||||
///
|
///
|
||||||
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
|
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
|
||||||
pub fn spawn<F, R>(future: F) -> JoinHandle<R>
|
pub(crate) fn spawn<F, R>(future: F) -> JoinHandle<R>
|
||||||
where
|
where
|
||||||
F: Future<Output = R> + Send + 'static,
|
F: Future<Output = R> + Send + 'static,
|
||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
{
|
{
|
||||||
let (task, handle) = async_task::spawn(future, schedule, ());
|
let tag = Tag::new(None);
|
||||||
|
let (task, handle) = async_task::spawn(future, schedule, tag);
|
||||||
task.schedule();
|
task.schedule();
|
||||||
JoinHandle(handle)
|
JoinHandle::new(handle)
|
||||||
}
|
|
||||||
|
|
||||||
/// A handle to a blocking task.
|
|
||||||
pub struct JoinHandle<R>(async_task::JoinHandle<R, ()>);
|
|
||||||
|
|
||||||
impl<R> Unpin for JoinHandle<R> {}
|
|
||||||
|
|
||||||
impl<R> Future for JoinHandle<R> {
|
|
||||||
type Output = R;
|
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
|
||||||
Pin::new(&mut self.0).poll(cx).map(|out| out.unwrap())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<R> fmt::Debug for JoinHandle<R> {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
f.debug_struct("JoinHandle")
|
|
||||||
.field("handle", &self.0)
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generates a random number in `0..n`.
|
/// Generates a random number in `0..n`.
|
||||||
|
|
|
@ -52,10 +52,12 @@ pub(crate) mod blocking;
|
||||||
/// Spawns a blocking task.
|
/// Spawns a blocking task.
|
||||||
///
|
///
|
||||||
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
|
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
|
||||||
|
// Once this function stabilizes we should merge `blocking::spawn` into this so
|
||||||
|
// all code in our crate uses `task::blocking` too.
|
||||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn blocking<F, R>(future: F) -> blocking::JoinHandle<R>
|
pub fn blocking<F, R>(future: F) -> task::JoinHandle<R>
|
||||||
where
|
where
|
||||||
F: crate::future::Future<Output = R> + Send + 'static,
|
F: crate::future::Future<Output = R> + Send + 'static,
|
||||||
R: Send + 'static,
|
R: Send + 'static,
|
||||||
|
|
Loading…
Reference in a new issue