From 020eb85093d714b25015980ed0aeeb148ae25ec3 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 18:59:06 +0800 Subject: [PATCH] add stream::min_by_key method --- src/stream/stream/min_by_key.rs | 58 +++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 38 +++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 src/stream/stream/min_by_key.rs diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs new file mode 100644 index 0000000..2cc34a0 --- /dev/null +++ b/src/stream/stream/min_by_key.rs @@ -0,0 +1,58 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct MinByKeyFuture { + stream: S, + min: Option, + key_by: K, +} + +impl MinByKeyFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(min: Option); + pin_utils::unsafe_unpinned!(key_by: K); + + pub(super) fn new(stream: S, key_by: K) -> Self { + MinByKeyFuture { + stream, + min: None, + key_by, + } + } +} + +impl Future for MinByKeyFuture +where + S: Stream + Unpin + Sized, + K: FnMut(&S::Item) -> S::Item, + S::Item: Ord + Copy, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(new) => { + let new = self.as_mut().key_by()(&new); + cx.waker().wake_by_ref(); + match self.as_mut().min().take() { + None => *self.as_mut().min() = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Less => *self.as_mut().min() = Some(new), + _ => *self.as_mut().min() = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(self.min), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..56dda48 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -53,6 +53,7 @@ mod take_while; mod try_fold; mod try_for_each; mod zip; +mod min_by_key; use all::AllFuture; use any::AnyFuture; @@ -74,6 +75,7 @@ use nth::NthFuture; use partial_cmp::PartialCmpFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; +use min_by_key::MinByKeyFuture; pub use chain::Chain; pub use filter::Filter; @@ -600,6 +602,42 @@ extension_trait! { FilterMap::new(self, f) } + #[doc = r#" + Returns the element that gives the minimum value with respect to the + specified key function. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, -3].into_iter().collect(); + + let min = s.clone().min_by_key(|x| x.abs()).await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().min_by_key(|x| x.abs()).await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn min_by_key( + self, + key_by: K, + ) -> impl Future> [MinByKeyFuture] + where + Self: Sized, + K: FnMut(&Self::Item) -> Self::Item, + { + MinByKeyFuture::new(self, key_by) + } + #[doc = r#" Returns the element that gives the minimum value with respect to the specified comparison function. If several elements are equally minimum,