From 735fa6954ee62602c2aa34c3b125830be403645c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 2 Nov 2019 23:00:03 +0100 Subject: [PATCH] Replace select!/try_select! with Future::{race,try_race} (#405) * init Future::select Signed-off-by: Yoshua Wuyts * implement Future::select Signed-off-by: Yoshua Wuyts * try_select Signed-off-by: Yoshua Wuyts * fixes Signed-off-by: Yoshua Wuyts * works Signed-off-by: Yoshua Wuyts * pass clippy Signed-off-by: Yoshua Wuyts * please clippy Signed-off-by: Yoshua Wuyts * implement feedback from stjepan Signed-off-by: Yoshua Wuyts * rename select to race Signed-off-by: Yoshua Wuyts * fmt Signed-off-by: Yoshua Wuyts --- src/future/{future.rs => future/mod.rs} | 92 +++++++++++++++++++++++++ src/future/future/race.rs | 57 +++++++++++++++ src/future/future/try_race.rs | 66 ++++++++++++++++++ src/future/mod.rs | 34 ++++----- src/utils.rs | 2 +- 5 files changed, 234 insertions(+), 17 deletions(-) rename src/future/{future.rs => future/mod.rs} (67%) create mode 100644 src/future/future/race.rs create mode 100644 src/future/future/try_race.rs diff --git a/src/future/future.rs b/src/future/future/mod.rs similarity index 67% rename from src/future/future.rs rename to src/future/future/mod.rs index fe68517..dad94da 100644 --- a/src/future/future.rs +++ b/src/future/future/mod.rs @@ -1,9 +1,13 @@ cfg_unstable! { mod delay; + mod race; + mod try_race; use std::time::Duration; use delay::DelayFuture; + use race::Race; + use try_race::TryRace; } extension_trait! { @@ -129,6 +133,94 @@ extension_trait! { { DelayFuture::new(self, dur) } + + #[doc = r#" + Waits for one of two similarly-typed futures to complete. + + Awaits multiple futures simultaneously, returning the output of the + first future that completes. + + This function will return a new future which awaits for either one of both + futures to complete. If multiple futures are completed at the same time, + resolution will occur in the order that they have been passed. + + Note that this macro consumes all futures passed, and once a future is + completed, all other futures are dropped. + + This macro is only usable inside of async functions, closures, and blocks. + + # Examples + + ``` + # async_std::task::block_on(async { + use async_std::prelude::*; + use async_std::future; + + let a = future::pending(); + let b = future::ready(1u8); + let c = future::ready(2u8); + + let f = a.race(b).race(c); + assert_eq!(f.await, 1u8); + # }); + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn race( + self, + other: F + ) -> impl Future::Output> [Race] + where + Self: std::future::Future + Sized, + F: std::future::Future::Output>, + { + Race::new(self, other) + } + + #[doc = r#" + Waits for one of two similarly-typed fallible futures to complete. + + Awaits multiple futures simultaneously, returning all results once complete. + + `try_race` is similar to [`race`], but keeps going if a future + resolved to an error until all futures have been resolved. In which case + an error is returned. + + The ordering of which value is yielded when two futures resolve + simultaneously is intentionally left unspecified. + + # Examples + + ``` + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::future; + use std::io::{Error, ErrorKind}; + + let a = future::pending::>(); + let b = future::ready(Err(Error::from(ErrorKind::Other))); + let c = future::ready(Ok(1u8)); + + let f = a.try_race(b).try_race(c); + assert_eq!(f.await?, 1u8); + # + # Ok(()) }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn try_race( + self, + other: F + ) -> impl Future::Output> [TryRace] + where + Self: std::future::Future> + Sized, + F: std::future::Future::Output>, + { + TryRace::new(self, other) + } } impl Future for Box { diff --git a/src/future/future/race.rs b/src/future/future/race.rs new file mode 100644 index 0000000..2fd604a --- /dev/null +++ b/src/future/future/race.rs @@ -0,0 +1,57 @@ +use std::pin::Pin; + +use async_macros::MaybeDone; +use pin_project_lite::pin_project; + +use crate::task::{Context, Poll}; +use std::future::Future; + +pin_project! { + #[allow(missing_docs)] + #[allow(missing_debug_implementations)] + pub struct Race + where + L: Future, + R: Future + { + #[pin] left: MaybeDone, + #[pin] right: MaybeDone, + } +} + +impl Race +where + L: Future, + R: Future, +{ + pub(crate) fn new(left: L, right: R) -> Self { + Self { + left: MaybeDone::new(left), + right: MaybeDone::new(right), + } + } +} + +impl Future for Race +where + L: Future, + R: Future, +{ + type Output = L::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let mut left = this.left; + if Future::poll(Pin::new(&mut left), cx).is_ready() { + return Poll::Ready(left.take().unwrap()); + } + + let mut right = this.right; + if Future::poll(Pin::new(&mut right), cx).is_ready() { + return Poll::Ready(right.take().unwrap()); + } + + Poll::Pending + } +} diff --git a/src/future/future/try_race.rs b/src/future/future/try_race.rs new file mode 100644 index 0000000..d0ca4a9 --- /dev/null +++ b/src/future/future/try_race.rs @@ -0,0 +1,66 @@ +use std::pin::Pin; + +use async_macros::MaybeDone; +use pin_project_lite::pin_project; + +use crate::task::{Context, Poll}; +use std::future::Future; + +pin_project! { + #[allow(missing_docs)] + #[allow(missing_debug_implementations)] + pub struct TryRace + where + L: Future, + R: Future + { + #[pin] left: MaybeDone, + #[pin] right: MaybeDone, + } +} + +impl TryRace +where + L: Future, + R: Future, +{ + pub(crate) fn new(left: L, right: R) -> Self { + Self { + left: MaybeDone::new(left), + right: MaybeDone::new(right), + } + } +} + +impl Future for TryRace +where + L: Future>, + R: Future, +{ + type Output = L::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let mut left_errored = false; + + // Check if the left future is ready & successful. Continue if not. + let mut left = this.left; + if Future::poll(Pin::new(&mut left), cx).is_ready() { + if left.as_ref().output().unwrap().is_ok() { + return Poll::Ready(left.take().unwrap()); + } else { + left_errored = true; + } + } + + // Check if the right future is ready & successful. Return err if left + // future also resolved to err. Continue if not. + let mut right = this.right; + let is_ready = Future::poll(Pin::new(&mut right), cx).is_ready(); + if is_ready && (right.as_ref().output().unwrap().is_ok() || left_errored) { + return Poll::Ready(right.take().unwrap()); + } + + Poll::Pending + } +} diff --git a/src/future/mod.rs b/src/future/mod.rs index a45bf96..dd28f28 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -4,16 +4,16 @@ //! //! Often it's desireable to await multiple futures as if it was a single //! future. The `join` family of operations converts multiple futures into a -//! single future that returns all of their outputs. The `select` family of +//! single future that returns all of their outputs. The `race` family of //! operations converts multiple future into a single future that returns the //! first output. //! //! For operating on futures the following macros can be used: //! -//! | Name | Return signature | When does it return? | -//! | --- | --- | --- | -//! | `future::join` | `(T1, T2)` | Wait for all to complete -//! | `future::select` | `T` | Return on first value +//! | Name | Return signature | When does it return? | +//! | --- | --- | --- | +//! | [`future::join!`] | `(T1, T2)` | Wait for all to complete +//! | [`Future::race`] | `T` | Return on first value //! //! ## Fallible Futures Concurrency //! @@ -25,21 +25,26 @@ //! futures are dropped and an error is returned. This is referred to as //! "short-circuiting". //! -//! In the case of `try_select`, instead of returning the first future that +//! In the case of `try_race`, instead of returning the first future that //! completes it returns the first future that _successfully_ completes. This -//! means `try_select` will keep going until any one of the futures returns +//! means `try_race` will keep going until any one of the futures returns //! `Ok`, or _all_ futures have returned `Err`. //! //! However sometimes it can be useful to use the base variants of the macros //! even on futures that return `Result`. Here is an overview of operations that //! work on `Result`, and their respective semantics: //! -//! | Name | Return signature | When does it return? | -//! | --- | --- | --- | -//! | `future::join` | `(Result, Result)` | Wait for all to complete -//! | `future::try_join` | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete -//! | `future::select` | `Result` | Return on first value -//! | `future::try_select` | `Result` | Return on first `Ok`, reject on last Err +//! | Name | Return signature | When does it return? | +//! | --- | --- | --- | +//! | [`future::join!`] | `(Result, Result)` | Wait for all to complete +//! | [`future::try_join!`] | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete +//! | [`Future::race`] | `Result` | Return on first value +//! | [`Future::try_race`] | `Result` | Return on first `Ok`, reject on last Err +//! +//! [`future::join!`]: macro.join.html +//! [`future::try_join!`]: macro.try_join.html +//! [`Future::race`]: trait.Future.html#method.race +//! [`Future::try_race`]: trait.Future.html#method.try_race #[doc(inline)] pub use async_macros::{join, try_join}; @@ -57,9 +62,6 @@ mod ready; mod timeout; cfg_unstable! { - #[doc(inline)] - pub use async_macros::{select, try_select}; - pub use into_future::IntoFuture; mod into_future; } diff --git a/src/utils.rs b/src/utils.rs index 4f3ffe4..a1d8510 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -190,7 +190,7 @@ macro_rules! extension_trait { }; // Parse the return type in an extension method. - (@doc ($($head:tt)*) -> impl Future [$f:ty] $($tail:tt)*) => { + (@doc ($($head:tt)*) -> impl Future $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => { extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*); }; (@ext ($($head:tt)*) -> impl Future $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {