Merge pull request #299 from async-rs/blocking-updates

Blocking updates
yoshuawuyts-patch-1
Yoshua Wuyts 5 years ago committed by GitHub
commit d46364c834
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -32,5 +32,5 @@ use crate::task::blocking;
/// ```
pub async fn canonicalize<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::canonicalize(&path).map(Into::into) }).await
blocking::spawn(move || std::fs::canonicalize(&path).map(Into::into)).await
}

@ -41,5 +41,5 @@ use crate::task::blocking;
pub async fn copy<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<u64> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
blocking::spawn(async move { std::fs::copy(&from, &to) }).await
blocking::spawn(move || std::fs::copy(&from, &to)).await
}

@ -34,5 +34,5 @@ use crate::task::blocking;
/// ```
pub async fn create_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::create_dir(path) }).await
blocking::spawn(move || std::fs::create_dir(path)).await
}

@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::create_dir_all(path) }).await
blocking::spawn(move || std::fs::create_dir_all(path)).await
}

@ -108,7 +108,7 @@ impl DirBuilder {
}
let path = path.as_ref().to_owned();
async move { blocking::spawn(async move { builder.create(path) }).await }
async move { blocking::spawn(move || builder.create(path)).await }
}
}

@ -89,7 +89,7 @@ impl DirEntry {
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
let inner = self.0.clone();
blocking::spawn(async move { inner.metadata() }).await
blocking::spawn(move || inner.metadata()).await
}
/// Reads the file type for this entry.
@ -127,7 +127,7 @@ impl DirEntry {
/// ```
pub async fn file_type(&self) -> io::Result<FileType> {
let inner = self.0.clone();
blocking::spawn(async move { inner.file_type() }).await
blocking::spawn(move || inner.file_type()).await
}
/// Returns the bare name of this entry without the leading path.

@ -97,7 +97,7 @@ impl File {
/// ```
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned();
let file = blocking::spawn(async move { std::fs::File::open(&path) }).await?;
let file = blocking::spawn(move || std::fs::File::open(&path)).await?;
Ok(file.into())
}
@ -132,7 +132,7 @@ impl File {
/// ```
pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned();
let file = blocking::spawn(async move { std::fs::File::create(&path) }).await?;
let file = blocking::spawn(move || std::fs::File::create(&path)).await?;
Ok(file.into())
}
@ -165,7 +165,7 @@ impl File {
})
.await?;
blocking::spawn(async move { state.file.sync_all() }).await
blocking::spawn(move || state.file.sync_all()).await
}
/// Synchronizes OS-internal buffered contents to disk.
@ -201,7 +201,7 @@ impl File {
})
.await?;
blocking::spawn(async move { state.file.sync_data() }).await
blocking::spawn(move || state.file.sync_data()).await
}
/// Truncates or extends the file.
@ -234,7 +234,7 @@ impl File {
})
.await?;
blocking::spawn(async move { state.file.set_len(size) }).await
blocking::spawn(move || state.file.set_len(size)).await
}
/// Reads the file's metadata.
@ -253,7 +253,7 @@ impl File {
/// ```
pub async fn metadata(&self) -> io::Result<Metadata> {
let file = self.file.clone();
blocking::spawn(async move { file.metadata() }).await
blocking::spawn(move || file.metadata()).await
}
/// Changes the permissions on the file.
@ -282,7 +282,7 @@ impl File {
/// ```
pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
let file = self.file.clone();
blocking::spawn(async move { file.set_permissions(perm) }).await
blocking::spawn(move || file.set_permissions(perm)).await
}
}
@ -702,7 +702,7 @@ impl LockGuard<State> {
self.register(cx);
// Start a read operation asynchronously.
blocking::spawn(async move {
blocking::spawn(move || {
// Read some data from the file into the cache.
let res = {
let State { file, cache, .. } = &mut *self;
@ -811,7 +811,7 @@ impl LockGuard<State> {
self.register(cx);
// Start a write operation asynchronously.
blocking::spawn(async move {
blocking::spawn(move || {
match (&*self.file).write_all(&self.cache) {
Ok(_) => {
// Switch to idle mode.
@ -844,7 +844,7 @@ impl LockGuard<State> {
self.register(cx);
// Start a flush operation asynchronously.
blocking::spawn(async move {
blocking::spawn(move || {
match (&*self.file).flush() {
Ok(()) => {
// Mark the file as flushed.

@ -32,5 +32,5 @@ use crate::task::blocking;
pub async fn hard_link<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
blocking::spawn(async move { std::fs::hard_link(&from, &to) }).await
blocking::spawn(move || std::fs::hard_link(&from, &to)).await
}

@ -36,7 +36,7 @@ use crate::task::blocking;
/// ```
pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::metadata(path) }).await
blocking::spawn(move || std::fs::metadata(path)).await
}
cfg_if! {

@ -285,7 +285,7 @@ impl OpenOptions {
pub fn open<P: AsRef<Path>>(&self, path: P) -> impl Future<Output = io::Result<File>> {
let path = path.as_ref().to_owned();
let options = self.0.clone();
async move { blocking::spawn(async move { options.open(path).map(|f| f.into()) }).await }
async move { blocking::spawn(move || options.open(path).map(|f| f.into())).await }
}
}

@ -36,5 +36,5 @@ use crate::task::blocking;
/// ```
pub async fn read<P: AsRef<Path>>(path: P) -> io::Result<Vec<u8>> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::read(path) }).await
blocking::spawn(move || std::fs::read(path)).await
}

@ -45,7 +45,7 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
/// ```
pub async fn read_dir<P: AsRef<Path>>(path: P) -> io::Result<ReadDir> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::read_dir(path) })
blocking::spawn(move || std::fs::read_dir(path))
.await
.map(ReadDir::new)
}
@ -91,7 +91,7 @@ impl Stream for ReadDir {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
self.0 = State::Busy(blocking::spawn(async move {
self.0 = State::Busy(blocking::spawn(move || {
let next = inner.next();
(inner, next)
}));

@ -28,5 +28,5 @@ use crate::task::blocking;
/// ```
pub async fn read_link<P: AsRef<Path>>(path: P) -> io::Result<PathBuf> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::read_link(path).map(Into::into) }).await
blocking::spawn(move || std::fs::read_link(path).map(Into::into)).await
}

@ -37,5 +37,5 @@ use crate::task::blocking;
/// ```
pub async fn read_to_string<P: AsRef<Path>>(path: P) -> io::Result<String> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::read_to_string(path) }).await
blocking::spawn(move || std::fs::read_to_string(path)).await
}

@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::remove_dir(path) }).await
blocking::spawn(move || std::fs::remove_dir(path)).await
}

@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn remove_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::remove_dir_all(path) }).await
blocking::spawn(move || std::fs::remove_dir_all(path)).await
}

@ -29,5 +29,5 @@ use crate::task::blocking;
/// ```
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::remove_file(path) }).await
blocking::spawn(move || std::fs::remove_file(path)).await
}

@ -34,5 +34,5 @@ use crate::task::blocking;
pub async fn rename<P: AsRef<Path>, Q: AsRef<Path>>(from: P, to: Q) -> io::Result<()> {
let from = from.as_ref().to_owned();
let to = to.as_ref().to_owned();
blocking::spawn(async move { std::fs::rename(&from, &to) }).await
blocking::spawn(move || std::fs::rename(&from, &to)).await
}

@ -32,5 +32,5 @@ use crate::task::blocking;
/// ```
pub async fn set_permissions<P: AsRef<Path>>(path: P, perm: Permissions) -> io::Result<()> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::set_permissions(path, perm) }).await
blocking::spawn(move || std::fs::set_permissions(path, perm)).await
}

@ -34,5 +34,5 @@ use crate::task::blocking;
/// ```
pub async fn symlink_metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
let path = path.as_ref().to_owned();
blocking::spawn(async move { std::fs::symlink_metadata(path) }).await
blocking::spawn(move || std::fs::symlink_metadata(path)).await
}

@ -33,5 +33,5 @@ use crate::task::blocking;
pub async fn write<P: AsRef<Path>, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> {
let path = path.as_ref().to_owned();
let contents = contents.as_ref().to_owned();
blocking::spawn(async move { std::fs::write(path, contents) }).await
blocking::spawn(move || std::fs::write(path, contents)).await
}

@ -116,7 +116,7 @@ impl Write for Stderr {
inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
*state = State::Busy(blocking::spawn(move || {
let res = std::io::Write::write(&mut inner.stderr, &inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
@ -144,7 +144,7 @@ impl Write for Stderr {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
*state = State::Busy(blocking::spawn(move || {
let res = std::io::Write::flush(&mut inner.stderr);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))

@ -119,7 +119,7 @@ impl Stdin {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
*state = State::Busy(blocking::spawn(move || {
inner.line.clear();
let res = inner.stdin.read_line(&mut inner.line);
inner.last_op = Some(Operation::ReadLine(res));
@ -172,7 +172,7 @@ impl Read for Stdin {
}
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
*state = State::Busy(blocking::spawn(move || {
let res = std::io::Read::read(&mut inner.stdin, &mut inner.buf);
inner.last_op = Some(Operation::Read(res));
State::Idle(Some(inner))

@ -116,7 +116,7 @@ impl Write for Stdout {
inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
*state = State::Busy(blocking::spawn(move || {
let res = std::io::Write::write(&mut inner.stdout, &inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
@ -144,7 +144,7 @@ impl Write for Stdout {
let mut inner = opt.take().unwrap();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
*state = State::Busy(blocking::spawn(move || {
let res = std::io::Write::flush(&mut inner.stdout);
inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner))

@ -196,7 +196,7 @@ impl ToSocketAddrs for (&str, u16) {
}
let host = host.to_string();
let task = blocking::spawn(async move {
let task = blocking::spawn(move || {
std::net::ToSocketAddrs::to_socket_addrs(&(host.as_str(), port))
});
ToSocketAddrsFuture::Resolving(task)
@ -217,8 +217,7 @@ impl ToSocketAddrs for str {
}
let addr = self.to_string();
let task =
blocking::spawn(async move { std::net::ToSocketAddrs::to_socket_addrs(addr.as_str()) });
let task = blocking::spawn(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str()));
ToSocketAddrsFuture::Resolving(task)
}
}

@ -76,7 +76,7 @@ impl TcpStream {
let mut last_err = None;
for addr in addrs.to_socket_addrs().await? {
let res = blocking::spawn(async move {
let res = blocking::spawn(move || {
let std_stream = std::net::TcpStream::connect(addr)?;
let mio_stream = mio::net::TcpStream::from_stream(std_stream)?;
Ok(TcpStream {

@ -28,7 +28,7 @@ use crate::task::blocking;
pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Result<()> {
let src = src.as_ref().to_owned();
let dst = dst.as_ref().to_owned();
blocking::spawn(async move { std::os::unix::fs::symlink(&src, &dst) }).await
blocking::spawn(move || std::os::unix::fs::symlink(&src, &dst)).await
}
cfg_if! {

@ -67,7 +67,7 @@ impl UnixDatagram {
/// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
let path = path.as_ref().to_owned();
let socket = blocking::spawn(async move { mio_uds::UnixDatagram::bind(path) }).await?;
let socket = blocking::spawn(move || mio_uds::UnixDatagram::bind(path)).await?;
Ok(UnixDatagram::new(socket))
}

@ -68,7 +68,7 @@ impl UnixListener {
/// ```
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
let path = path.as_ref().to_owned();
let listener = blocking::spawn(async move { mio_uds::UnixListener::bind(path) }).await?;
let listener = blocking::spawn(move || mio_uds::UnixListener::bind(path)).await?;
Ok(UnixListener {
watcher: Watcher::new(listener),

@ -58,7 +58,7 @@ impl UnixStream {
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
let path = path.as_ref().to_owned();
blocking::spawn(async move {
blocking::spawn(move || {
let std_stream = std::os::unix::net::UnixStream::connect(path)?;
let mio_stream = mio_uds::UnixStream::from_stream(std_stream)?;
Ok(UnixStream {

@ -19,9 +19,9 @@ use kv_log_macro::trace;
/// Calling this function is similar to [spawning] a thread and immediately [joining] it, except an
/// asynchronous task will be spawned.
///
/// See also: [`task::blocking`].
/// See also: [`task::spawn_blocking`].
///
/// [`task::blocking`]: fn.blocking.html
/// [`task::spawn_blocking`]: fn.spawn_blocking.html
///
/// [spawning]: https://doc.rust-lang.org/std/thread/fn.spawn.html
/// [joining]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join

@ -7,7 +7,6 @@ use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;
use crate::future::Future;
use crate::task::task::{JoinHandle, Tag};
use crate::utils::abort_on_panic;
@ -96,12 +95,13 @@ fn schedule(t: async_task::Task<Tag>) {
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub(crate) fn spawn<F, R>(future: F) -> JoinHandle<R>
pub(crate) fn spawn<F, R>(f: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let tag = Tag::new(None);
let future = async move { f() };
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule();
JoinHandle::new(handle)

@ -60,9 +60,10 @@ cfg_if::cfg_if! {
/// is useful to prevent long-running synchronous operations from blocking the main futures
/// executor.
///
/// See also: [`task::block_on`].
/// See also: [`task::block_on`], [`task::spawn`].
///
/// [`task::block_on`]: fn.block_on.html
/// [`task::spawn`]: fn.spawn.html
///
/// # Examples
///
@ -73,7 +74,7 @@ cfg_if::cfg_if! {
/// #
/// use async_std::task;
///
/// task::blocking(async {
/// task::spawn_blocking(|| {
/// println!("long-running task here");
/// }).await;
/// #
@ -84,10 +85,10 @@ cfg_if::cfg_if! {
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn blocking<F, R>(future: F) -> task::JoinHandle<R>
pub fn spawn_blocking<F, R>(f: F) -> task::JoinHandle<R>
where
F: crate::future::Future<Output = R> + Send + 'static,
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
blocking::spawn(future)
blocking::spawn(f)
}

Loading…
Cancel
Save