From 020eb85093d714b25015980ed0aeeb148ae25ec3 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 18:59:06 +0800 Subject: [PATCH 1/5] add stream::min_by_key method --- src/stream/stream/min_by_key.rs | 58 +++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 38 +++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 src/stream/stream/min_by_key.rs diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs new file mode 100644 index 0000000..2cc34a0 --- /dev/null +++ b/src/stream/stream/min_by_key.rs @@ -0,0 +1,58 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct MinByKeyFuture { + stream: S, + min: Option, + key_by: K, +} + +impl MinByKeyFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(min: Option); + pin_utils::unsafe_unpinned!(key_by: K); + + pub(super) fn new(stream: S, key_by: K) -> Self { + MinByKeyFuture { + stream, + min: None, + key_by, + } + } +} + +impl Future for MinByKeyFuture +where + S: Stream + Unpin + Sized, + K: FnMut(&S::Item) -> S::Item, + S::Item: Ord + Copy, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(new) => { + let new = self.as_mut().key_by()(&new); + cx.waker().wake_by_ref(); + match self.as_mut().min().take() { + None => *self.as_mut().min() = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Less => *self.as_mut().min() = Some(new), + _ => *self.as_mut().min() = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(self.min), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..56dda48 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -53,6 +53,7 @@ mod take_while; mod try_fold; mod try_for_each; mod zip; +mod min_by_key; use all::AllFuture; use any::AnyFuture; @@ -74,6 +75,7 @@ use nth::NthFuture; use partial_cmp::PartialCmpFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; +use min_by_key::MinByKeyFuture; pub use chain::Chain; pub use filter::Filter; @@ -600,6 +602,42 @@ extension_trait! { FilterMap::new(self, f) } + #[doc = r#" + Returns the element that gives the minimum value with respect to the + specified key function. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, -3].into_iter().collect(); + + let min = s.clone().min_by_key(|x| x.abs()).await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().min_by_key(|x| x.abs()).await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn min_by_key( + self, + key_by: K, + ) -> impl Future> [MinByKeyFuture] + where + Self: Sized, + K: FnMut(&Self::Item) -> Self::Item, + { + MinByKeyFuture::new(self, key_by) + } + #[doc = r#" Returns the element that gives the minimum value with respect to the specified comparison function. If several elements are equally minimum, From f5a0a0ba86e93b0d1dd7f4b091fef814b5e30849 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 19:17:24 +0800 Subject: [PATCH 2/5] fmt --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 56dda48..b5011c2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -41,6 +41,7 @@ mod le; mod lt; mod map; mod min_by; +mod min_by_key; mod next; mod nth; mod partial_cmp; @@ -53,7 +54,6 @@ mod take_while; mod try_fold; mod try_for_each; mod zip; -mod min_by_key; use all::AllFuture; use any::AnyFuture; @@ -70,12 +70,12 @@ use last::LastFuture; use le::LeFuture; use lt::LtFuture; use min_by::MinByFuture; +use min_by_key::MinByKeyFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; -use min_by_key::MinByKeyFuture; pub use chain::Chain; pub use filter::Filter; From 5a4fdeb1cd6ce25192298e01fa5a5792279570cd Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:18:18 +0800 Subject: [PATCH 3/5] Update src/stream/stream/min_by_key.rs Co-Authored-By: Taiki Endo --- src/stream/stream/min_by_key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index cac6073..d6dda2f 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -30,7 +30,7 @@ impl MinByKeyFuture { impl Future for MinByKeyFuture where - S: Stream + Unpin + Sized, + S: Stream, K: FnMut(&S::Item) -> S::Item, S::Item: Ord + Copy, { From fb78ed18124780164a9cc4b1986103649dec6979 Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:19:49 +0800 Subject: [PATCH 4/5] Update src/stream/stream/min_by_key.rs Co-Authored-By: Taiki Endo --- src/stream/stream/min_by_key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index d6dda2f..a482c63 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -32,7 +32,7 @@ impl Future for MinByKeyFuture where S: Stream, K: FnMut(&S::Item) -> S::Item, - S::Item: Ord + Copy, + S::Item: Ord, { type Output = Option; From 7cfec4e8cec531792330e4caeb72a9145ea7a941 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Sun, 27 Oct 2019 00:26:19 +0800 Subject: [PATCH 5/5] use take and remove Copy --- src/stream/stream/min_by_key.rs | 2 +- src/stream/stream/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index a482c63..6557f22 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -54,7 +54,7 @@ where } Poll::Pending } - None => Poll::Ready(*this.min), + None => Poll::Ready(this.min.take()), } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b5011c2..ca43e7d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -633,6 +633,7 @@ extension_trait! { ) -> impl Future> [MinByKeyFuture] where Self: Sized, + Self::Item: Ord, K: FnMut(&Self::Item) -> Self::Item, { MinByKeyFuture::new(self, key_by)