forked from mirror/async-std
Replace select!/try_select! with Future::{race,try_race} (#405)
* init Future::select Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * implement Future::select Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * try_select Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * fixes Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * works Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * pass clippy Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * please clippy Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * implement feedback from stjepan Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * rename select to race Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com> * fmt Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
This commit is contained in:
parent
3a2e6d5b92
commit
735fa6954e
5 changed files with 234 additions and 17 deletions
|
@ -1,9 +1,13 @@
|
||||||
cfg_unstable! {
|
cfg_unstable! {
|
||||||
mod delay;
|
mod delay;
|
||||||
|
mod race;
|
||||||
|
mod try_race;
|
||||||
|
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use delay::DelayFuture;
|
use delay::DelayFuture;
|
||||||
|
use race::Race;
|
||||||
|
use try_race::TryRace;
|
||||||
}
|
}
|
||||||
|
|
||||||
extension_trait! {
|
extension_trait! {
|
||||||
|
@ -129,6 +133,94 @@ extension_trait! {
|
||||||
{
|
{
|
||||||
DelayFuture::new(self, dur)
|
DelayFuture::new(self, dur)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[doc = r#"
|
||||||
|
Waits for one of two similarly-typed futures to complete.
|
||||||
|
|
||||||
|
Awaits multiple futures simultaneously, returning the output of the
|
||||||
|
first future that completes.
|
||||||
|
|
||||||
|
This function will return a new future which awaits for either one of both
|
||||||
|
futures to complete. If multiple futures are completed at the same time,
|
||||||
|
resolution will occur in the order that they have been passed.
|
||||||
|
|
||||||
|
Note that this macro consumes all futures passed, and once a future is
|
||||||
|
completed, all other futures are dropped.
|
||||||
|
|
||||||
|
This macro is only usable inside of async functions, closures, and blocks.
|
||||||
|
|
||||||
|
# Examples
|
||||||
|
|
||||||
|
```
|
||||||
|
# async_std::task::block_on(async {
|
||||||
|
use async_std::prelude::*;
|
||||||
|
use async_std::future;
|
||||||
|
|
||||||
|
let a = future::pending();
|
||||||
|
let b = future::ready(1u8);
|
||||||
|
let c = future::ready(2u8);
|
||||||
|
|
||||||
|
let f = a.race(b).race(c);
|
||||||
|
assert_eq!(f.await, 1u8);
|
||||||
|
# });
|
||||||
|
```
|
||||||
|
"#]
|
||||||
|
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||||
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
|
fn race<F>(
|
||||||
|
self,
|
||||||
|
other: F
|
||||||
|
) -> impl Future<Output = <Self as std::future::Future>::Output> [Race<Self, F>]
|
||||||
|
where
|
||||||
|
Self: std::future::Future + Sized,
|
||||||
|
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
|
||||||
|
{
|
||||||
|
Race::new(self, other)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[doc = r#"
|
||||||
|
Waits for one of two similarly-typed fallible futures to complete.
|
||||||
|
|
||||||
|
Awaits multiple futures simultaneously, returning all results once complete.
|
||||||
|
|
||||||
|
`try_race` is similar to [`race`], but keeps going if a future
|
||||||
|
resolved to an error until all futures have been resolved. In which case
|
||||||
|
an error is returned.
|
||||||
|
|
||||||
|
The ordering of which value is yielded when two futures resolve
|
||||||
|
simultaneously is intentionally left unspecified.
|
||||||
|
|
||||||
|
# Examples
|
||||||
|
|
||||||
|
```
|
||||||
|
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||||
|
#
|
||||||
|
use async_std::prelude::*;
|
||||||
|
use async_std::future;
|
||||||
|
use std::io::{Error, ErrorKind};
|
||||||
|
|
||||||
|
let a = future::pending::<Result<_, Error>>();
|
||||||
|
let b = future::ready(Err(Error::from(ErrorKind::Other)));
|
||||||
|
let c = future::ready(Ok(1u8));
|
||||||
|
|
||||||
|
let f = a.try_race(b).try_race(c);
|
||||||
|
assert_eq!(f.await?, 1u8);
|
||||||
|
#
|
||||||
|
# Ok(()) }) }
|
||||||
|
```
|
||||||
|
"#]
|
||||||
|
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||||
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
|
fn try_race<F: std::future::Future, T, E>(
|
||||||
|
self,
|
||||||
|
other: F
|
||||||
|
) -> impl Future<Output = <Self as std::future::Future>::Output> [TryRace<Self, F>]
|
||||||
|
where
|
||||||
|
Self: std::future::Future<Output = Result<T, E>> + Sized,
|
||||||
|
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
|
||||||
|
{
|
||||||
|
TryRace::new(self, other)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<F: Future + Unpin + ?Sized> Future for Box<F> {
|
impl<F: Future + Unpin + ?Sized> Future for Box<F> {
|
57
src/future/future/race.rs
Normal file
57
src/future/future/race.rs
Normal file
|
@ -0,0 +1,57 @@
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use async_macros::MaybeDone;
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
use std::future::Future;
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
#[allow(missing_docs)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct Race<L, R>
|
||||||
|
where
|
||||||
|
L: Future,
|
||||||
|
R: Future<Output = L::Output>
|
||||||
|
{
|
||||||
|
#[pin] left: MaybeDone<L>,
|
||||||
|
#[pin] right: MaybeDone<R>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L, R> Race<L, R>
|
||||||
|
where
|
||||||
|
L: Future,
|
||||||
|
R: Future<Output = L::Output>,
|
||||||
|
{
|
||||||
|
pub(crate) fn new(left: L, right: R) -> Self {
|
||||||
|
Self {
|
||||||
|
left: MaybeDone::new(left),
|
||||||
|
right: MaybeDone::new(right),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L, R> Future for Race<L, R>
|
||||||
|
where
|
||||||
|
L: Future,
|
||||||
|
R: Future<Output = L::Output>,
|
||||||
|
{
|
||||||
|
type Output = L::Output;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let this = self.project();
|
||||||
|
|
||||||
|
let mut left = this.left;
|
||||||
|
if Future::poll(Pin::new(&mut left), cx).is_ready() {
|
||||||
|
return Poll::Ready(left.take().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut right = this.right;
|
||||||
|
if Future::poll(Pin::new(&mut right), cx).is_ready() {
|
||||||
|
return Poll::Ready(right.take().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
66
src/future/future/try_race.rs
Normal file
66
src/future/future/try_race.rs
Normal file
|
@ -0,0 +1,66 @@
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use async_macros::MaybeDone;
|
||||||
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
|
use crate::task::{Context, Poll};
|
||||||
|
use std::future::Future;
|
||||||
|
|
||||||
|
pin_project! {
|
||||||
|
#[allow(missing_docs)]
|
||||||
|
#[allow(missing_debug_implementations)]
|
||||||
|
pub struct TryRace<L, R>
|
||||||
|
where
|
||||||
|
L: Future,
|
||||||
|
R: Future<Output = L::Output>
|
||||||
|
{
|
||||||
|
#[pin] left: MaybeDone<L>,
|
||||||
|
#[pin] right: MaybeDone<R>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L, R> TryRace<L, R>
|
||||||
|
where
|
||||||
|
L: Future,
|
||||||
|
R: Future<Output = L::Output>,
|
||||||
|
{
|
||||||
|
pub(crate) fn new(left: L, right: R) -> Self {
|
||||||
|
Self {
|
||||||
|
left: MaybeDone::new(left),
|
||||||
|
right: MaybeDone::new(right),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<L, R, T, E> Future for TryRace<L, R>
|
||||||
|
where
|
||||||
|
L: Future<Output = Result<T, E>>,
|
||||||
|
R: Future<Output = L::Output>,
|
||||||
|
{
|
||||||
|
type Output = L::Output;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
|
let this = self.project();
|
||||||
|
let mut left_errored = false;
|
||||||
|
|
||||||
|
// Check if the left future is ready & successful. Continue if not.
|
||||||
|
let mut left = this.left;
|
||||||
|
if Future::poll(Pin::new(&mut left), cx).is_ready() {
|
||||||
|
if left.as_ref().output().unwrap().is_ok() {
|
||||||
|
return Poll::Ready(left.take().unwrap());
|
||||||
|
} else {
|
||||||
|
left_errored = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if the right future is ready & successful. Return err if left
|
||||||
|
// future also resolved to err. Continue if not.
|
||||||
|
let mut right = this.right;
|
||||||
|
let is_ready = Future::poll(Pin::new(&mut right), cx).is_ready();
|
||||||
|
if is_ready && (right.as_ref().output().unwrap().is_ok() || left_errored) {
|
||||||
|
return Poll::Ready(right.take().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,7 +4,7 @@
|
||||||
//!
|
//!
|
||||||
//! Often it's desireable to await multiple futures as if it was a single
|
//! Often it's desireable to await multiple futures as if it was a single
|
||||||
//! future. The `join` family of operations converts multiple futures into a
|
//! future. The `join` family of operations converts multiple futures into a
|
||||||
//! single future that returns all of their outputs. The `select` family of
|
//! single future that returns all of their outputs. The `race` family of
|
||||||
//! operations converts multiple future into a single future that returns the
|
//! operations converts multiple future into a single future that returns the
|
||||||
//! first output.
|
//! first output.
|
||||||
//!
|
//!
|
||||||
|
@ -12,8 +12,8 @@
|
||||||
//!
|
//!
|
||||||
//! | Name | Return signature | When does it return? |
|
//! | Name | Return signature | When does it return? |
|
||||||
//! | --- | --- | --- |
|
//! | --- | --- | --- |
|
||||||
//! | `future::join` | `(T1, T2)` | Wait for all to complete
|
//! | [`future::join!`] | `(T1, T2)` | Wait for all to complete
|
||||||
//! | `future::select` | `T` | Return on first value
|
//! | [`Future::race`] | `T` | Return on first value
|
||||||
//!
|
//!
|
||||||
//! ## Fallible Futures Concurrency
|
//! ## Fallible Futures Concurrency
|
||||||
//!
|
//!
|
||||||
|
@ -25,9 +25,9 @@
|
||||||
//! futures are dropped and an error is returned. This is referred to as
|
//! futures are dropped and an error is returned. This is referred to as
|
||||||
//! "short-circuiting".
|
//! "short-circuiting".
|
||||||
//!
|
//!
|
||||||
//! In the case of `try_select`, instead of returning the first future that
|
//! In the case of `try_race`, instead of returning the first future that
|
||||||
//! completes it returns the first future that _successfully_ completes. This
|
//! completes it returns the first future that _successfully_ completes. This
|
||||||
//! means `try_select` will keep going until any one of the futures returns
|
//! means `try_race` will keep going until any one of the futures returns
|
||||||
//! `Ok`, or _all_ futures have returned `Err`.
|
//! `Ok`, or _all_ futures have returned `Err`.
|
||||||
//!
|
//!
|
||||||
//! However sometimes it can be useful to use the base variants of the macros
|
//! However sometimes it can be useful to use the base variants of the macros
|
||||||
|
@ -36,10 +36,15 @@
|
||||||
//!
|
//!
|
||||||
//! | Name | Return signature | When does it return? |
|
//! | Name | Return signature | When does it return? |
|
||||||
//! | --- | --- | --- |
|
//! | --- | --- | --- |
|
||||||
//! | `future::join` | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
|
//! | [`future::join!`] | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
|
||||||
//! | `future::try_join` | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
|
//! | [`future::try_join!`] | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
|
||||||
//! | `future::select` | `Result<T, E>` | Return on first value
|
//! | [`Future::race`] | `Result<T, E>` | Return on first value
|
||||||
//! | `future::try_select` | `Result<T, E>` | Return on first `Ok`, reject on last Err
|
//! | [`Future::try_race`] | `Result<T, E>` | Return on first `Ok`, reject on last Err
|
||||||
|
//!
|
||||||
|
//! [`future::join!`]: macro.join.html
|
||||||
|
//! [`future::try_join!`]: macro.try_join.html
|
||||||
|
//! [`Future::race`]: trait.Future.html#method.race
|
||||||
|
//! [`Future::try_race`]: trait.Future.html#method.try_race
|
||||||
|
|
||||||
#[doc(inline)]
|
#[doc(inline)]
|
||||||
pub use async_macros::{join, try_join};
|
pub use async_macros::{join, try_join};
|
||||||
|
@ -57,9 +62,6 @@ mod ready;
|
||||||
mod timeout;
|
mod timeout;
|
||||||
|
|
||||||
cfg_unstable! {
|
cfg_unstable! {
|
||||||
#[doc(inline)]
|
|
||||||
pub use async_macros::{select, try_select};
|
|
||||||
|
|
||||||
pub use into_future::IntoFuture;
|
pub use into_future::IntoFuture;
|
||||||
mod into_future;
|
mod into_future;
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,7 +190,7 @@ macro_rules! extension_trait {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse the return type in an extension method.
|
// Parse the return type in an extension method.
|
||||||
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> [$f:ty] $($tail:tt)*) => {
|
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
|
||||||
extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*);
|
extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*);
|
||||||
};
|
};
|
||||||
(@ext ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
|
(@ext ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
|
||||||
|
|
Loading…
Reference in a new issue