Merge pull request #524 from yjhmelody/stream-max

Add Stream max
This commit is contained in:
Yoshua Wuyts 2019-11-14 21:28:36 +01:00 committed by GitHub
commit 4e1d79adb1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 95 additions and 0 deletions

60
src/stream/stream/max.rs Normal file
View file

@ -0,0 +1,60 @@
use std::cmp::{Ord, Ordering};
use std::marker::PhantomData;
use std::pin::Pin;
use std::future::Future;
use pin_project_lite::pin_project;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MaxFuture<S, F, T> {
#[pin]
stream: S,
_compare: PhantomData<F>,
max: Option<T>,
}
}
impl<S, F, T> MaxFuture<S, F, T> {
pub(super) fn new(stream: S) -> Self {
Self {
stream,
_compare: PhantomData,
max: None,
}
}
}
impl<S, F> Future for MaxFuture<S, F, S::Item>
where
S: Stream,
S::Item: Ord,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
cx.waker().wake_by_ref();
match this.max.take() {
None => *this.max = Some(new),
Some(old) => match new.cmp(&old) {
Ordering::Greater => *this.max = Some(new),
_ => *this.max = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.max.take()),
}
}
}

View file

@ -44,6 +44,7 @@ mod last;
mod le; mod le;
mod lt; mod lt;
mod map; mod map;
mod max;
mod max_by; mod max_by;
mod max_by_key; mod max_by_key;
mod min; mod min;
@ -80,6 +81,7 @@ use gt::GtFuture;
use last::LastFuture; use last::LastFuture;
use le::LeFuture; use le::LeFuture;
use lt::LtFuture; use lt::LtFuture;
use max::MaxFuture;
use max_by::MaxByFuture; use max_by::MaxByFuture;
use max_by_key::MaxByKeyFuture; use max_by_key::MaxByKeyFuture;
use min::MinFuture; use min::MinFuture;
@ -964,6 +966,39 @@ extension_trait! {
MinByFuture::new(self, compare) MinByFuture::new(self, compare)
} }
#[doc = r#"
Returns the element that gives the maximum value. If several elements are equally maximum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```ignore
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let s = stream::from_iter(vec![1usize, 2, 3]);
let max = s.clone().max().await;
assert_eq!(max, Some(3));
let max = stream::empty::<usize>().max().await;
assert_eq!(max, None);
#
# }) }
```
"#]
fn max<F>(
self,
) -> impl Future<Output = Option<Self::Item>> [MaxFuture<Self, F, Self::Item>]
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{
MaxFuture::new(self)
}
#[doc = r#" #[doc = r#"
Returns the element that gives the minimum value. If several elements are equally minimum, Returns the element that gives the minimum value. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned. the first element is returned. If the stream is empty, `None` is returned.