forked from mirror/async-std
		
	Merge pull request #327 from assemblaj/assemblaj-partial_cmp_final
Adds Stream::partial_cmp
This commit is contained in:
		
						commit
						529a58a066
					
				
					 2 changed files with 130 additions and 0 deletions
				
			
		|  | @ -37,6 +37,7 @@ mod map; | |||
| mod min_by; | ||||
| mod next; | ||||
| mod nth; | ||||
| mod partial_cmp; | ||||
| mod scan; | ||||
| mod skip; | ||||
| mod skip_while; | ||||
|  | @ -56,6 +57,7 @@ use for_each::ForEachFuture; | |||
| use min_by::MinByFuture; | ||||
| use next::NextFuture; | ||||
| use nth::NthFuture; | ||||
| use partial_cmp::PartialCmpFuture; | ||||
| use try_for_each::TryForEeachFuture; | ||||
| 
 | ||||
| pub use chain::Chain; | ||||
|  | @ -1187,6 +1189,42 @@ extension_trait! { | |||
|         { | ||||
|             Merge::new(self, other) | ||||
|         } | ||||
| 
 | ||||
|         #[doc = r#" | ||||
|             Lexicographically compares the elements of this `Stream` with those | ||||
|             of another. | ||||
|             
 | ||||
|             # Examples | ||||
|             ``` | ||||
|             # fn main() { async_std::task::block_on(async { | ||||
|             # | ||||
|             use async_std::prelude::*; | ||||
|             use std::collections::VecDeque; | ||||
|             use std::cmp::Ordering; | ||||
|             let s1 = VecDeque::from(vec![1]); | ||||
|             let s2 = VecDeque::from(vec![1, 2]); | ||||
|             let s3 = VecDeque::from(vec![1, 2, 3]); | ||||
|             let s4 = VecDeque::from(vec![1, 2, 4]); | ||||
|             assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal)); | ||||
|             assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less)); | ||||
|             assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater));       
 | ||||
|             assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less)); | ||||
|             assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater));                             
 | ||||
|             # | ||||
|             # }) } | ||||
|             ``` | ||||
|         "#]
 | ||||
|         fn partial_cmp<S>( | ||||
|            self, | ||||
|            other: S | ||||
|         ) -> impl Future<Output = Option<Ordering>>  [PartialCmpFuture<Self, S>] | ||||
|         where | ||||
|             Self: Sized + Stream, | ||||
|             S: Stream, | ||||
|             <Self as Stream>::Item: PartialOrd<S::Item>, | ||||
|         { | ||||
|             PartialCmpFuture::new(self, other) | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     impl<S: Stream + Unpin + ?Sized> Stream for Box<S> { | ||||
|  |  | |||
							
								
								
									
										92
									
								
								src/stream/stream/partial_cmp.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								src/stream/stream/partial_cmp.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,92 @@ | |||
| use std::cmp::Ordering; | ||||
| use std::pin::Pin; | ||||
| 
 | ||||
| use super::fuse::Fuse; | ||||
| use crate::future::Future; | ||||
| use crate::prelude::*; | ||||
| use crate::stream::Stream; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| // Lexicographically compares the elements of this `Stream` with those
 | ||||
| // of another.
 | ||||
| #[doc(hidden)] | ||||
| #[allow(missing_debug_implementations)] | ||||
| pub struct PartialCmpFuture<L: Stream, R: Stream> { | ||||
|     l: Fuse<L>, | ||||
|     r: Fuse<R>, | ||||
|     l_cache: Option<L::Item>, | ||||
|     r_cache: Option<R::Item>, | ||||
| } | ||||
| 
 | ||||
| impl<L: Stream, R: Stream> PartialCmpFuture<L, R> { | ||||
|     pin_utils::unsafe_pinned!(l: Fuse<L>); | ||||
|     pin_utils::unsafe_pinned!(r: Fuse<R>); | ||||
|     pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>); | ||||
|     pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>); | ||||
| 
 | ||||
|     pub(super) fn new(l: L, r: R) -> Self { | ||||
|         PartialCmpFuture { | ||||
|             l: l.fuse(), | ||||
|             r: r.fuse(), | ||||
|             l_cache: None, | ||||
|             r_cache: None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R> | ||||
| where | ||||
|     L: Stream + Sized, | ||||
|     R: Stream + Sized, | ||||
|     L::Item: PartialOrd<R::Item>, | ||||
| { | ||||
|     type Output = Option<Ordering>; | ||||
| 
 | ||||
|     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||
|         loop { | ||||
|             // Short circuit logic
 | ||||
|             // Stream that completes earliest can be considered Less, etc
 | ||||
|             let l_complete = self.l.done && self.as_mut().l_cache.is_none(); | ||||
|             let r_complete = self.r.done && self.as_mut().r_cache.is_none(); | ||||
| 
 | ||||
|             if l_complete && r_complete { | ||||
|                 return Poll::Ready(Some(Ordering::Equal)); | ||||
|             } else if l_complete { | ||||
|                 return Poll::Ready(Some(Ordering::Less)); | ||||
|             } else if r_complete { | ||||
|                 return Poll::Ready(Some(Ordering::Greater)); | ||||
|             } | ||||
| 
 | ||||
|             // Get next value if possible and necesary
 | ||||
|             if !self.l.done && self.as_mut().l_cache.is_none() { | ||||
|                 let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); | ||||
|                 if let Some(item) = l_next { | ||||
|                     *self.as_mut().l_cache() = Some(item); | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             if !self.r.done && self.as_mut().r_cache.is_none() { | ||||
|                 let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx)); | ||||
|                 if let Some(item) = r_next { | ||||
|                     *self.as_mut().r_cache() = Some(item); | ||||
|                 } | ||||
|             } | ||||
| 
 | ||||
|             // Compare if both values are available.
 | ||||
|             if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { | ||||
|                 let l_value = self.as_mut().l_cache().take().unwrap(); | ||||
|                 let r_value = self.as_mut().r_cache().take().unwrap(); | ||||
|                 let result = l_value.partial_cmp(&r_value); | ||||
| 
 | ||||
|                 if let Some(Ordering::Equal) = result { | ||||
|                     // Reset cache to prepare for next comparison
 | ||||
|                     *self.as_mut().l_cache() = None; | ||||
|                     *self.as_mut().r_cache() = None; | ||||
|                 } else { | ||||
|                     // Return non equal value
 | ||||
|                     return Poll::Ready(result); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
		Loading…
	
		Reference in a new issue