From 4e5828e64669516a28acb63f1fc11aaa2cbbefc5 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 16:46:11 +0800 Subject: [PATCH 1/6] add stream::max_by method --- src/stream/stream/max_by.rs | 56 +++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 41 +++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/stream/stream/max_by.rs diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs new file mode 100644 index 0000000..d25a869 --- /dev/null +++ b/src/stream/stream/max_by.rs @@ -0,0 +1,56 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct MaxByFuture { + stream: S, + compare: F, + max: Option, +} + +impl MaxByFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(compare: F); + pin_utils::unsafe_unpinned!(max: Option); + + pub(super) fn new(stream: S, compare: F) -> Self { + MaxByFuture { + stream, + compare, + max: None, + } + } +} + +impl Future for MaxByFuture +where + S: Stream + Unpin + Sized, + S::Item: Copy, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match self.as_mut().max().take() { + None => *self.as_mut().max() = Some(new), + Some(old) => match (&mut self.as_mut().compare())(&new, &old) { + Ordering::Greater => *self.as_mut().max() = Some(new), + _ => *self.as_mut().max() = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(self.max), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..2ac4d70 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -40,6 +40,7 @@ mod last; mod le; mod lt; mod map; +mod max_by; mod min_by; mod next; mod nth; @@ -68,6 +69,7 @@ use gt::GtFuture; use last::LastFuture; use le::LeFuture; use lt::LtFuture; +use max_by::MaxByFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -639,6 +641,45 @@ extension_trait! { MinByFuture::new(self, compare) } + #[doc = r#" + Returns the element that gives the minimum value with respect to the + specified comparison function. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let max = s.clone().max_by(|x, y| x.cmp(y)).await; + assert_eq!(max, Some(3)); + + let max = s.max_by(|x, y| y.cmp(x)).await; + assert_eq!(max, Some(1)); + + let max = VecDeque::::new().max_by(|x, y| x.cmp(y)).await; + assert_eq!(max, None); + # + # }) } + ``` + "#] + fn max_by( + self, + compare: F, + ) -> impl Future> [MaxByFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MaxByFuture::new(self, compare) + } + #[doc = r#" Returns the nth element of the stream. From d6f940110b2411ea06f2ed8b27f7d1b4e57bafe7 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 19:04:04 +0800 Subject: [PATCH 2/6] update doc --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2ac4d70..cccd8b2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -642,8 +642,8 @@ extension_trait! { } #[doc = r#" - Returns the element that gives the minimum value with respect to the - specified comparison function. If several elements are equally minimum, + Returns the element that gives the maximum value with respect to the + specified comparison function. If several elements are equally maximum, the first element is returned. If the stream is empty, `None` is returned. # Examples From 37a7eadf17c55d0cb524080d34c2f30d848ecd7e Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Sat, 26 Oct 2019 11:52:41 +0800 Subject: [PATCH 3/6] use pin_project_lite --- src/stream/stream/max_by.rs | 38 +++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index d25a869..d3d640b 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -1,23 +1,24 @@ use std::cmp::Ordering; use std::pin::Pin; +use pin_project_lite::pin_project; + use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct MaxByFuture { - stream: S, - compare: F, - max: Option, +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MaxByFuture { + #[pin] + stream: S, + compare: F, + max: Option, + } } impl MaxByFuture { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_unpinned!(compare: F); - pin_utils::unsafe_unpinned!(max: Option); - pub(super) fn new(stream: S, compare: F) -> Self { MaxByFuture { stream, @@ -35,22 +36,23 @@ where { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); match next { Some(new) => { cx.waker().wake_by_ref(); - match self.as_mut().max().take() { - None => *self.as_mut().max() = Some(new), - Some(old) => match (&mut self.as_mut().compare())(&new, &old) { - Ordering::Greater => *self.as_mut().max() = Some(new), - _ => *self.as_mut().max() = Some(old), + match this.max.take() { + None => *this.max = Some(new), + Some(old) => match (this.compare)(&new, &old) { + Ordering::Greater => *this.max = Some(new), + _ => *this.max = Some(old), }, } Poll::Pending } - None => Poll::Ready(self.max), + None => Poll::Ready(*this.max), } } } From 006fc7e9de2fca94dfa807ef477667a9695aa05d Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:17:42 +0800 Subject: [PATCH 4/6] Update src/stream/stream/max_by.rs Co-Authored-By: Taiki Endo --- src/stream/stream/max_by.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index d3d640b..a41f49a 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -30,7 +30,7 @@ impl MaxByFuture { impl Future for MaxByFuture where - S: Stream + Unpin + Sized, + S: Stream, S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { From a8d3d1483f29060b15ee4df80852a6a67603569f Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:17:50 +0800 Subject: [PATCH 5/6] Update src/stream/stream/max_by.rs Co-Authored-By: Taiki Endo --- src/stream/stream/max_by.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index a41f49a..6cd9e56 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -31,7 +31,6 @@ impl MaxByFuture { impl Future for MaxByFuture where S: Stream, - S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { type Output = Option; From b57849e1cb6a2d473c0a1a62ff807ef237b99ff0 Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:18:01 +0800 Subject: [PATCH 6/6] Update src/stream/stream/max_by.rs Co-Authored-By: Taiki Endo --- src/stream/stream/max_by.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index 6cd9e56..a626b28 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -51,7 +51,7 @@ where } Poll::Pending } - None => Poll::Ready(*this.max), + None => Poll::Ready(this.max.take()), } } }