diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index 8179fb31..3cd00143 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -1,6 +1,6 @@ use std::cmp::Ordering; -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; use pin_project_lite::pin_project; @@ -13,7 +13,7 @@ pin_project! { pub struct MinByKeyFuture { #[pin] stream: S, - min: Option, + min: Option<(T, T)>, key_by: K, } } @@ -37,24 +37,32 @@ where type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn key(mut f: impl FnMut(&T) -> B) -> impl FnMut(T) -> (B, T) { + move |x| (f(&x), x) + } + let this = self.project(); let next = futures_core::ready!(this.stream.poll_next(cx)); match next { Some(new) => { - let new = (this.key_by)(&new); + let (key, value) = key(this.key_by)(new); cx.waker().wake_by_ref(); - match this.min.take() { - None => *this.min = Some(new), - Some(old) => match new.cmp(&old) { - Ordering::Less => *this.min = Some(new), + match this.min.take() { + None => *this.min = Some((key, value)), + + Some(old) => match key.cmp(&old.0) { + Ordering::Less => *this.min = Some((key, value)), _ => *this.min = Some(old), }, } Poll::Pending } - None => Poll::Ready(this.min.take()), + None => Poll::Ready(match this.min.take() { + None => None, + Some(max) => Some(max.1), + }), } } }