From 80bee9a215bf574d5137f58b5d946751b0eb16c8 Mon Sep 17 00:00:00 2001 From: assemblaj Date: Mon, 14 Oct 2019 11:43:00 -0400 Subject: [PATCH] Adds Stream::partial_cmp --- src/stream/stream/mod.rs | 38 +++++++++++++ src/stream/stream/partial_cmp.rs | 92 ++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 src/stream/stream/partial_cmp.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 054d81b6..27539af3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -37,6 +37,7 @@ mod map; mod min_by; mod next; mod nth; +mod partial_cmp; mod scan; mod skip; mod skip_while; @@ -56,6 +57,7 @@ use for_each::ForEachFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use partial_cmp::PartialCmpFuture; use try_for_each::TryForEeachFuture; pub use chain::Chain; @@ -1187,6 +1189,42 @@ extension_trait! { { Merge::new(self, other) } + + #[doc = r#" + Lexicographically compares the elements of this `Stream` with those + of another. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + use std::cmp::Ordering; + let s1 = VecDeque::from(vec![1]); + let s2 = VecDeque::from(vec![1, 2]); + let s3 = VecDeque::from(vec![1, 2, 3]); + let s4 = VecDeque::from(vec![1, 2, 4]); + assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal)); + assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less)); + assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater)); + assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less)); + assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater)); + # + # }) } + ``` + "#] + fn partial_cmp( + self, + other: S + ) -> impl Future> [PartialCmpFuture] + where + Self: Sized + Stream, + S: Stream, + ::Item: PartialOrd, + { + PartialCmpFuture::new(self, other) + } } impl Stream for Box { diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs new file mode 100644 index 00000000..fac2705d --- /dev/null +++ b/src/stream/stream/partial_cmp.rs @@ -0,0 +1,92 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +// Lexicographically compares the elements of this `Stream` with those +// of another. +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct PartialCmpFuture { + l: Fuse, + r: Fuse, + l_cache: Option, + r_cache: Option, +} + +impl PartialCmpFuture { + pin_utils::unsafe_pinned!(l: Fuse); + pin_utils::unsafe_pinned!(r: Fuse); + pin_utils::unsafe_unpinned!(l_cache: Option); + pin_utils::unsafe_unpinned!(r_cache: Option); + + pub(super) fn new(l: L, r: R) -> Self { + PartialCmpFuture { + l: l.fuse(), + r: r.fuse(), + l_cache: None, + r_cache: None, + } + } +} + +impl Future for PartialCmpFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialOrd, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + // Short circuit logic + // Stream that completes earliest can be considered Less, etc + let l_complete = self.l.done && self.as_mut().l_cache.is_none(); + let r_complete = self.r.done && self.as_mut().r_cache.is_none(); + + if l_complete && r_complete { + return Poll::Ready(Some(Ordering::Equal)); + } else if l_complete { + return Poll::Ready(Some(Ordering::Less)); + } else if r_complete { + return Poll::Ready(Some(Ordering::Greater)); + } + + // Get next value if possible and necesary + if !self.l.done && self.as_mut().l_cache.is_none() { + let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); + if let Some(item) = l_next { + *self.as_mut().l_cache() = Some(item); + } + } + + if !self.r.done && self.as_mut().r_cache.is_none() { + let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx)); + if let Some(item) = r_next { + *self.as_mut().r_cache() = Some(item); + } + } + + // Compare if both values are available. + if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { + let l_value = self.as_mut().l_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); + let result = l_value.partial_cmp(&r_value); + + if let Some(Ordering::Equal) = result { + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; + *self.as_mut().r_cache() = None; + } else { + // Return non equal value + return Poll::Ready(result); + } + } + } + } +}