2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-01-19 12:03:50 +00:00

Add stream eq

This commit is contained in:
yjhmelody 2019-10-31 14:44:19 +08:00
parent cc949f48ea
commit f5efaaa7ba
2 changed files with 99 additions and 0 deletions

61
src/stream/stream/eq.rs Normal file
View file

@ -0,0 +1,61 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
// Lexicographically compares the elements of this `Stream` with those
// of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct EqFuture<L: Stream, R: Stream> {
#[pin]
l: Fuse<L>,
#[pin]
r: Fuse<R>,
}
}
impl<L: Stream, R: Stream> EqFuture<L, R>
where
L::Item: PartialEq<R::Item>,
{
pub(super) fn new(l: L, r: R) -> Self {
EqFuture {
l: l.fuse(),
r: r.fuse(),
}
}
}
impl<L: Stream, R: Stream> Future for EqFuture<L, R>
where
L: Stream + Sized,
R: Stream + Sized,
L::Item: PartialEq<R::Item>,
{
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
if this.l.done && this.r.done {
return Poll::Ready(true);
}
match (l_val, r_val) {
(Some(l), Some(r)) if l != r => {return Poll::Ready(false);},
_ => {},
}
}
}
}

View file

@ -26,6 +26,7 @@ mod any;
mod chain; mod chain;
mod cmp; mod cmp;
mod enumerate; mod enumerate;
mod eq;
mod filter; mod filter;
mod filter_map; mod filter_map;
mod find; mod find;
@ -60,6 +61,7 @@ use all::AllFuture;
use any::AnyFuture; use any::AnyFuture;
use cmp::CmpFuture; use cmp::CmpFuture;
use enumerate::Enumerate; use enumerate::Enumerate;
use eq::EqFuture;
use filter_map::FilterMap; use filter_map::FilterMap;
use find::FindFuture; use find::FindFuture;
use find_map::FindMapFuture; use find_map::FindMapFuture;
@ -1622,6 +1624,42 @@ extension_trait! {
GeFuture::new(self, other) GeFuture::new(self, other)
} }
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
equal to those of another.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_eq: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_eq: VecDeque<isize> = vec![1,5].into_iter().collect();
assert_eq!(single.clone().eq(single.clone()).await, true);
assert_eq!(single_eq.clone().eq(single.clone()).await, false);
assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
#
# }) }
```
"#]
fn eq<S>(
self,
other: S
) -> impl Future<Output = bool> [EqFuture<Self, S>]
where
Self: Sized + Stream,
S: Sized + Stream,
<Self as Stream>::Item: PartialEq<S::Item>,
{
EqFuture::new(self, other)
}
#[doc = r#" #[doc = r#"
Determines if the elements of this `Stream` are lexicographically Determines if the elements of this `Stream` are lexicographically
greater than those of another. greater than those of another.