From b14282457c9128fa86c2b98392d2cee3c48b1cdc Mon Sep 17 00:00:00 2001 From: "Abhishek C. Sharma" Date: Fri, 8 Nov 2019 11:19:52 +0530 Subject: [PATCH] Add Future::join and Future::try_join --- src/future/future/join.rs | 62 ++++++++++++++++++++++++ src/future/future/mod.rs | 88 +++++++++++++++++++++++++++++++++++ src/future/future/try_join.rs | 72 ++++++++++++++++++++++++++++ 3 files changed, 222 insertions(+) create mode 100644 src/future/future/join.rs create mode 100644 src/future/future/try_join.rs diff --git a/src/future/future/join.rs b/src/future/future/join.rs new file mode 100644 index 00000000..90ea3237 --- /dev/null +++ b/src/future/future/join.rs @@ -0,0 +1,62 @@ +use std::pin::Pin; + +use async_macros::MaybeDone; +use pin_project_lite::pin_project; + +use crate::task::{Context, Poll}; +use std::future::Future; + +pin_project! { + #[allow(missing_docs)] + #[allow(missing_debug_implementations)] + pub struct Join + where + L: Future, + R: Future + { + #[pin] left: MaybeDone, + #[pin] right: MaybeDone, + } +} + +impl Join +where + L: Future, + R: Future, +{ + pub(crate) fn new(left: L, right: R) -> Self { + Self { + left: MaybeDone::new(left), + right: MaybeDone::new(right), + } + } +} + +impl Future for Join +where + L: Future, + R: Future, +{ + type Output = (L::Output, R::Output); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let mut left = this.left; + let mut right = this.right; + + if Future::poll(Pin::new(&mut left), cx).is_ready() { + if right.as_ref().output().is_some() { + return Poll::Ready((left.take().unwrap(), right.take().unwrap())); + } + } + + if Future::poll(Pin::new(&mut right), cx).is_ready() { + if left.as_ref().output().is_some() { + return Poll::Ready((left.take().unwrap(), right.take().unwrap())); + } + } + + Poll::Pending + } +} diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index d712dc80..729ace7c 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -3,6 +3,8 @@ cfg_unstable! { mod flatten; mod race; mod try_race; + mod join; + mod try_join; use std::time::Duration; @@ -11,6 +13,8 @@ cfg_unstable! { use crate::future::IntoFuture; use race::Race; use try_race::TryRace; + use join::Join; + use try_join::TryJoin; } extension_trait! { @@ -264,6 +268,90 @@ extension_trait! { { TryRace::new(self, other) } + + #[doc = r#" + Waits for two similarly-typed futures to complete. + + Awaits multiple futures simultaneously, returning the output of the + futures once both complete. + + This function returns a new future which polls both futures + concurrently. + + # Examples + + ``` + # async_std::task::block_on(async { + use async_std::prelude::*; + use async_std::future; + + let a = future::ready(1u8); + let b = future::ready(2u8); + + let f = a.join(b); + assert_eq!(f.await, (1u8, 2u8)); + # }); + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn join( + self, + other: F + ) -> impl Future::Output, ::Output)> [Join] + where + Self: std::future::Future + Sized, + F: std::future::Future::Output>, + { + Join::new(self, other) + } + + #[doc = r#" + Waits for two similarly-typed fallible futures to complete. + + Awaits multiple futures simultaneously, returning all results once + complete. + + `try_join` is similar to [`join`], but returns an error immediately + if a future resolves to an error. + + [`join`]: #method.join + + # Examples + + ``` + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::future; + + let a = future::ready(Err("Error")); + let b = future::ready(Ok(1u8)); + + let f = a.try_join(b); + assert_eq!(f.await, Err("Error")); + + let a = future::ready(Ok::(1u8)); + let b = future::ready(Ok::(2u8)); + + let f = a.try_join(b); + assert_eq!(f.await, Ok((1u8, 2u8))); + # + # Ok(()) }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn try_join( + self, + other: F + ) -> impl Future> [TryJoin] + where + Self: std::future::Future> + Sized, + F: std::future::Future::Output>, + { + TryJoin::new(self, other) + } } impl Future for Box { diff --git a/src/future/future/try_join.rs b/src/future/future/try_join.rs new file mode 100644 index 00000000..58ae6d62 --- /dev/null +++ b/src/future/future/try_join.rs @@ -0,0 +1,72 @@ +use std::pin::Pin; + +use async_macros::MaybeDone; +use pin_project_lite::pin_project; + +use crate::task::{Context, Poll}; +use std::future::Future; + +pin_project! { + #[allow(missing_docs)] + #[allow(missing_debug_implementations)] + pub struct TryJoin + where + L: Future, + R: Future + { + #[pin] left: MaybeDone, + #[pin] right: MaybeDone, + } +} + +impl TryJoin +where + L: Future, + R: Future, +{ + pub(crate) fn new(left: L, right: R) -> Self { + Self { + left: MaybeDone::new(left), + right: MaybeDone::new(right), + } + } +} + +impl Future for TryJoin +where + L: Future>, + R: Future, +{ + type Output = Result<(T, T), E>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let mut left = this.left; + let mut right = this.right; + + if Future::poll(Pin::new(&mut left), cx).is_ready() { + if left.as_ref().output().unwrap().is_err() { + return Poll::Ready(Err(left.take().unwrap().err().unwrap())); + } else if right.as_ref().output().is_some() { + return Poll::Ready(Ok(( + left.take().unwrap().ok().unwrap(), + right.take().unwrap().ok().unwrap(), + ))); + } + } + + if Future::poll(Pin::new(&mut right), cx).is_ready() { + if right.as_ref().output().unwrap().is_err() { + return Poll::Ready(Err(right.take().unwrap().err().unwrap())); + } else if left.as_ref().output().is_some() { + return Poll::Ready(Ok(( + left.take().unwrap().ok().unwrap(), + right.take().unwrap().ok().unwrap(), + ))); + } + } + + Poll::Pending + } +}