diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 91b111e..1d6c3fe 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -25,6 +25,7 @@ mod all; mod any; mod min_by; mod next; +mod nth; mod take; pub use take::Take; @@ -33,6 +34,7 @@ use all::AllFuture; use any::AnyFuture; use min_by::MinByFuture; use next::NextFuture; +use nth::NthFuture; use std::cmp::Ordering; use std::marker::PhantomData; @@ -161,6 +163,64 @@ pub trait Stream { MinByFuture::new(self, compare) } + /// Returns the nth element of the stream. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let second = s.nth(1).await; + /// assert_eq!(second, Some(2)); + /// # + /// # }) } + /// ``` + /// Calling `nth()` multiple times: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let second = s.nth(0).await; + /// assert_eq!(second, Some(1)); + /// + /// let second = s.nth(0).await; + /// assert_eq!(second, Some(2)); + /// # + /// # }) } + /// ``` + /// Returning `None` if the stream finished before returning `n` elements: + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let fourth = s.nth(4).await; + /// assert_eq!(fourth, None); + /// # + /// # }) } + /// ``` + fn nth(&mut self, n: usize) -> ret!('_, NthFuture, Option) + where + Self: Sized, + { + NthFuture::new(self, n) + } + /// Tests if every element of the stream matches a predicate. /// /// `all()` takes a closure that returns `true` or `false`. It applies diff --git a/src/stream/stream/nth.rs b/src/stream/stream/nth.rs new file mode 100644 index 0000000..346fa1a --- /dev/null +++ b/src/stream/stream/nth.rs @@ -0,0 +1,41 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +#[derive(Debug)] +pub struct NthFuture<'a, S> { + stream: &'a mut S, + n: usize, +} + +impl<'a, S> NthFuture<'a, S> { + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(n: usize); + + pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { + NthFuture { stream, n } + } +} + +impl<'a, S> futures_core::future::Future for NthFuture<'a, S> +where + S: futures_core::stream::Stream + Unpin + Sized, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use futures_core::stream::Stream; + + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => match self.n { + 0 => Poll::Ready(Some(v)), + _ => { + *self.as_mut().n() -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +}