diff --git a/src/stream/min_by.rs b/src/stream/min_by.rs new file mode 100644 index 0000000..b21de77 --- /dev/null +++ b/src/stream/min_by.rs @@ -0,0 +1,54 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::stream::Stream; +use crate::future::Future; +use crate::task::{Context, Poll}; + +/// A future that yields the minimum item in a stream by a given comparison function. +#[derive(Clone, Debug)] +pub struct MinBy { + stream: S, + compare: F, + min: Option, +} + +impl Unpin for MinBy {} + +impl MinBy { + pub(super) fn new(stream: S, compare: F) -> Self { + MinBy { + stream, + compare, + min: None, + } + } +} + +impl Future for MinBy +where + S: futures_core::stream::Stream + Unpin, + S::Item: Copy, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match self.as_mut().min.take() { + None => self.as_mut().min = Some(new), + Some(old) => match (&mut self.as_mut().compare)(&new, &old) { + Ordering::Less => self.as_mut().min = Some(new), + _ => self.as_mut().min = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(self.min), + } + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 8dcc6d5..5eed9c2 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -27,6 +27,7 @@ pub use repeat::{repeat, Repeat}; pub use stream::{Stream, Take}; mod empty; +mod min_by; mod once; mod repeat; mod stream; diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 698eb32..95b4a61 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -21,10 +21,12 @@ //! # }) } //! ``` +use std::cmp::Ordering; use std::pin::Pin; use cfg_if::cfg_if; +use super::min_by::MinBy; use crate::future::Future; use crate::task::{Context, Poll}; use std::marker::PhantomData; @@ -118,6 +120,39 @@ pub trait Stream { } } + /// Returns the element that gives the minimum value with respect to the + /// specified comparison function. If several elements are equally minimum, + /// the first element is returned. If the stream is empty, `None` is returned. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let min = Stream::min_by(s.clone(), |x, y| x.cmp(y)).await; + /// assert_eq!(min, Some(1)); + /// + /// let min = Stream::min_by(s, |x, y| y.cmp(x)).await; + /// assert_eq!(min, Some(3)); + /// + /// let min = Stream::min_by(VecDeque::::new(), |x, y| x.cmp(y)).await; + /// assert_eq!(min, None); + /// # + /// # }) } + /// ``` + fn min_by(self, compare: F) -> MinBy + where + Self: Sized + Unpin, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MinBy::new(self, compare) + } + /// Tests if every element of the stream matches a predicate. /// /// `all()` takes a closure that returns `true` or `false`. It applies