2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-01-19 12:03:50 +00:00

adds stream::find combinator

This commit is contained in:
Fedor Sakharov 2019-09-10 23:38:11 +03:00
parent 6f9ec665a2
commit 97a5f9b50c
No known key found for this signature in database
GPG key ID: 93D436E666BF0FEE
2 changed files with 95 additions and 0 deletions

49
src/stream/stream/find.rs Normal file
View file

@ -0,0 +1,49 @@
use crate::task::{Context, Poll};
use std::marker::PhantomData;
use std::pin::Pin;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FindFuture<'a, S, P, T> {
stream: &'a mut S,
p: P,
__t: PhantomData<T>,
}
impl<'a, S, P, T> FindFuture<'a, S, P, T> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(p: P);
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
FindFuture {
stream,
p,
__t: PhantomData,
}
}
}
impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item>
where
S: futures_core::stream::Stream + Unpin + Sized,
P: FnMut(&S::Item) -> bool,
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;
let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match item {
Some(v) => match (self.as_mut().p())(&v) {
true => Poll::Ready(Some(v)),
false => {
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

View file

@ -24,6 +24,7 @@
mod all; mod all;
mod any; mod any;
mod filter_map; mod filter_map;
mod find;
mod find_map; mod find_map;
mod min_by; mod min_by;
mod next; mod next;
@ -35,6 +36,7 @@ pub use take::Take;
use all::AllFuture; use all::AllFuture;
use any::AnyFuture; use any::AnyFuture;
use filter_map::FilterMap; use filter_map::FilterMap;
use find::FindFuture;
use find_map::FindMapFuture; use find_map::FindMapFuture;
use min_by::MinByFuture; use min_by::MinByFuture;
use next::NextFuture; use next::NextFuture;
@ -321,6 +323,50 @@ pub trait Stream {
} }
} }
/// Searches for an element in a stream that satisfies a predicate.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use std::collections::VecDeque;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
/// let res = s.find(|x| *x == 2).await;
/// assert_eq!(res, Some(2));
/// #
/// # }) }
/// ```
///
/// Resuming after a first find:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use std::collections::VecDeque;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
/// let res = s.find(|x| *x == 2).await;
/// assert_eq!(res, Some(2));
///
/// let next = s.next().await;
/// assert_eq!(next, Some(3));
/// #
/// # }) }
/// ```
fn find<P>(&mut self, p: P) -> ret!('_, FindFuture, Option<Self::Item>, P, Self::Item)
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
FindFuture::new(self, p)
}
/// Applies function to the elements of stream and returns the first non-none result. /// Applies function to the elements of stream and returns the first non-none result.
/// ///
/// ``` /// ```