From 879af6dc857d37f51dc48c638b06ee1a922dade9 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 13 Nov 2019 10:50:09 +0800 Subject: [PATCH] Add Stream max --- src/stream/stream/max.rs | 60 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 35 +++++++++++++++++++++++ 2 files changed, 95 insertions(+) create mode 100644 src/stream/stream/max.rs diff --git a/src/stream/stream/max.rs b/src/stream/stream/max.rs new file mode 100644 index 00000000..d8ff119d --- /dev/null +++ b/src/stream/stream/max.rs @@ -0,0 +1,60 @@ +use std::cmp::{Ord, Ordering}; +use std::marker::PhantomData; +use std::pin::Pin; +use std::future::Future; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MaxFuture { + #[pin] + stream: S, + _compare: PhantomData, + max: Option, + } +} + +impl MaxFuture { + pub(super) fn new(stream: S) -> Self { + Self { + stream, + _compare: PhantomData, + max: None, + } + } +} + +impl Future for MaxFuture +where + S: Stream, + S::Item: Ord, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match this.max.take() { + None => *this.max = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Greater => *this.max = Some(new), + _ => *this.max = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(this.max.take()), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 5756a21e..a3cf0552 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -44,6 +44,7 @@ mod last; mod le; mod lt; mod map; +mod max; mod max_by; mod max_by_key; mod min; @@ -80,6 +81,7 @@ use gt::GtFuture; use last::LastFuture; use le::LeFuture; use lt::LtFuture; +use max::MaxFuture; use max_by::MaxByFuture; use max_by_key::MaxByKeyFuture; use min::MinFuture; @@ -913,6 +915,39 @@ extension_trait! { } #[doc = r#" + Returns the element that gives the maximum value. If several elements are equally maximum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ```ignore + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s = stream::from_iter(vec![1usize, 2, 3]); + + let max = s.clone().max().await; + assert_eq!(max, Some(3)); + + let max = stream::empty::().max().await; + assert_eq!(max, None); + # + # }) } + ``` + "#] + fn max( + self, + ) -> impl Future> [MaxFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MaxFuture::new(self) + } + + #[doc = r#" Returns the element that gives the minimum value. If several elements are equally minimum, the first element is returned. If the stream is empty, `None` is returned.