2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-01-19 20:13:51 +00:00

Merge pull request #409 from yjhmelody/stream-min

Add Stream min
This commit is contained in:
Yoshua Wuyts 2019-11-01 00:41:54 +01:00 committed by GitHub
commit cc75b65b8c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 96 additions and 0 deletions

60
src/stream/stream/min.rs Normal file
View file

@ -0,0 +1,60 @@
use std::marker::PhantomData;
use std::cmp::{Ordering, Ord};
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinFuture<S, F, T> {
#[pin]
stream: S,
_compare: PhantomData<F>,
min: Option<T>,
}
}
impl<S, F, T> MinFuture<S, F, T> {
pub(super) fn new(stream: S) -> Self {
Self {
stream,
_compare: PhantomData,
min: None,
}
}
}
impl<S, F> Future for MinFuture<S, F, S::Item>
where
S: Stream,
S::Item: Ord,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
cx.waker().wake_by_ref();
match this.min.take() {
None => *this.min = Some(new),
Some(old) => match new.cmp(&old) {
Ordering::Less => *this.min = Some(new),
_ => *this.min = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.min.take()),
}
}
}

View file

@ -42,6 +42,7 @@ mod le;
mod lt; mod lt;
mod map; mod map;
mod max_by; mod max_by;
mod min;
mod min_by; mod min_by;
mod min_by_key; mod min_by_key;
mod ne; mod ne;
@ -75,6 +76,7 @@ use last::LastFuture;
use le::LeFuture; use le::LeFuture;
use lt::LtFuture; use lt::LtFuture;
use max_by::MaxByFuture; use max_by::MaxByFuture;
use min::MinFuture;
use min_by::MinByFuture; use min_by::MinByFuture;
use min_by_key::MinByKeyFuture; use min_by_key::MinByKeyFuture;
use ne::NeFuture; use ne::NeFuture;
@ -766,6 +768,40 @@ extension_trait! {
MinByFuture::new(self, compare) MinByFuture::new(self, compare)
} }
#[doc = r#"
Returns the element that gives the minimum value. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let min = s.clone().min().await;
assert_eq!(min, Some(1));
let min = VecDeque::<usize>::new().min().await;
assert_eq!(min, None);
#
# }) }
```
"#]
fn min<F>(
self,
) -> impl Future<Output = Option<Self::Item>> [MinFuture<Self, F, Self::Item>]
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{
MinFuture::new(self)
}
#[doc = r#" #[doc = r#"
Returns the element that gives the maximum value with respect to the Returns the element that gives the maximum value with respect to the
specified comparison function. If several elements are equally maximum, specified comparison function. If several elements are equally maximum,