From 4b96ea127366bca42f6143e7cdf681884e9d6857 Mon Sep 17 00:00:00 2001 From: assemblaj <7599535+assemblaj@users.noreply.github.com> Date: Tue, 15 Oct 2019 20:23:41 -0400 Subject: [PATCH] Adds Stream::cmp (#273) * Adds cmp * Fixes formatting * cleans up examples * attempts to fix rustdoc issue * formats with cargo fmt * Adds proper trait bounds for cmp --- src/stream/stream/cmp.rs | 91 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 39 +++++++++++++++++ 2 files changed, 130 insertions(+) create mode 100644 src/stream/stream/cmp.rs diff --git a/src/stream/stream/cmp.rs b/src/stream/stream/cmp.rs new file mode 100644 index 0000000..fc7161a --- /dev/null +++ b/src/stream/stream/cmp.rs @@ -0,0 +1,91 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +// Lexicographically compares the elements of this `Stream` with those +// of another using `Ord`. +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct CmpFuture { + l: Fuse, + r: Fuse, + l_cache: Option, + r_cache: Option, +} + +impl CmpFuture { + pin_utils::unsafe_pinned!(l: Fuse); + pin_utils::unsafe_pinned!(r: Fuse); + pin_utils::unsafe_unpinned!(l_cache: Option); + pin_utils::unsafe_unpinned!(r_cache: Option); + + pub(super) fn new(l: L, r: R) -> Self { + CmpFuture { + l: l.fuse(), + r: r.fuse(), + l_cache: None, + r_cache: None, + } + } +} + +impl Future for CmpFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: Ord, +{ + type Output = Ordering; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + // Stream that completes earliest can be considered Less, etc + let l_complete = self.l.done && self.as_mut().l_cache.is_none(); + let r_complete = self.r.done && self.as_mut().r_cache.is_none(); + + if l_complete && r_complete { + return Poll::Ready(Ordering::Equal); + } else if l_complete { + return Poll::Ready(Ordering::Less); + } else if r_complete { + return Poll::Ready(Ordering::Greater); + } + + // Get next value if possible and necesary + if !self.l.done && self.as_mut().l_cache.is_none() { + let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); + if let Some(item) = l_next { + *self.as_mut().l_cache() = Some(item); + } + } + + if !self.r.done && self.as_mut().r_cache.is_none() { + let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx)); + if let Some(item) = r_next { + *self.as_mut().r_cache() = Some(item); + } + } + + // Compare if both values are available. + if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { + let l_value = self.as_mut().l_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); + let result = l_value.cmp(&r_value); + + if let Ordering::Equal = result { + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; + *self.as_mut().r_cache() = None; + } else { + // Return non equal value + return Poll::Ready(result); + } + } + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index fbbd8f6..f2b9830 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod chain; +mod cmp; mod enumerate; mod filter; mod filter_map; @@ -53,6 +54,7 @@ mod zip; use all::AllFuture; use any::AnyFuture; +use cmp::CmpFuture; use enumerate::Enumerate; use filter_map::FilterMap; use find::FindFuture; @@ -1270,6 +1272,43 @@ extension_trait! { PartialCmpFuture::new(self, other) } + #[doc = r#" + Lexicographically compares the elements of this `Stream` with those + of another using 'Ord'. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + use std::cmp::Ordering; + let s1 = VecDeque::from(vec![1]); + let s2 = VecDeque::from(vec![1, 2]); + let s3 = VecDeque::from(vec![1, 2, 3]); + let s4 = VecDeque::from(vec![1, 2, 4]); + assert_eq!(s1.clone().cmp(s1.clone()).await, Ordering::Equal); + assert_eq!(s1.clone().cmp(s2.clone()).await, Ordering::Less); + assert_eq!(s2.clone().cmp(s1.clone()).await, Ordering::Greater); + assert_eq!(s3.clone().cmp(s4.clone()).await, Ordering::Less); + assert_eq!(s4.clone().cmp(s3.clone()).await, Ordering::Greater); + # + # }) } + ``` + "#] + fn cmp( + self, + other: S + ) -> impl Future [CmpFuture] + where + Self: Sized + Stream, + S: Stream, + ::Item: Ord + { + CmpFuture::new(self, other) + } #[doc = r#" Determines if the elements of this `Stream` are lexicographically