adds stream::nth combinator

This commit is contained in:
Fedor Sakharov 2019-09-08 21:42:35 +03:00
parent ba43a05d01
commit 45cd3b0894
No known key found for this signature in database
GPG key ID: 93D436E666BF0FEE
2 changed files with 101 additions and 0 deletions

View file

@ -25,6 +25,7 @@ mod all;
mod any; mod any;
mod min_by; mod min_by;
mod next; mod next;
mod nth;
mod take; mod take;
pub use take::Take; pub use take::Take;
@ -33,6 +34,7 @@ use all::AllFuture;
use any::AnyFuture; use any::AnyFuture;
use min_by::MinByFuture; use min_by::MinByFuture;
use next::NextFuture; use next::NextFuture;
use nth::NthFuture;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -161,6 +163,64 @@ pub trait Stream {
MinByFuture::new(self, compare) MinByFuture::new(self, compare)
} }
/// Returns the nth element of the stream.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
///
/// let second = s.nth(1).await;
/// assert_eq!(second, Some(2));
/// #
/// # }) }
/// ```
/// Calling `nth()` multiple times:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
///
/// let second = s.nth(0).await;
/// assert_eq!(second, Some(1));
///
/// let second = s.nth(0).await;
/// assert_eq!(second, Some(2));
/// #
/// # }) }
/// ```
/// Returning `None` if the stream finished before returning `n` elements:
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use std::collections::VecDeque;
/// use async_std::stream::Stream;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
///
/// let fourth = s.nth(4).await;
/// assert_eq!(fourth, None);
/// #
/// # }) }
/// ```
fn nth(&mut self, n: usize) -> ret!('_, NthFuture, Option<Self::Item>)
where
Self: Sized,
{
NthFuture::new(self, n)
}
/// Tests if every element of the stream matches a predicate. /// Tests if every element of the stream matches a predicate.
/// ///
/// `all()` takes a closure that returns `true` or `false`. It applies /// `all()` takes a closure that returns `true` or `false`. It applies

41
src/stream/stream/nth.rs Normal file
View file

@ -0,0 +1,41 @@
use std::pin::Pin;
use std::task::{Context, Poll};
#[derive(Debug)]
pub struct NthFuture<'a, S> {
stream: &'a mut S,
n: usize,
}
impl<'a, S> NthFuture<'a, S> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(n: usize);
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
NthFuture { stream, n }
}
}
impl<'a, S> futures_core::future::Future for NthFuture<'a, S>
where
S: futures_core::stream::Stream + Unpin + Sized,
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => match self.n {
0 => Poll::Ready(Some(v)),
_ => {
*self.as_mut().n() -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}