diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs new file mode 100644 index 0000000..fcd75f6 --- /dev/null +++ b/src/stream/stream/count.rs @@ -0,0 +1,41 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct CountFuture { + stream: S, + count: usize, +} + +impl CountFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(count: usize); + + pub(crate) fn new(stream: S) -> Self { + CountFuture { stream, count: 0 } + } +} + +impl Future for CountFuture +where + S: Sized + Stream, +{ + type Output = usize; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(_) => { + cx.waker().wake_by_ref(); + *self.as_mut().count() += 1; + Poll::Pending + } + None => Poll::Ready(self.count), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..b9d4bc8 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -25,6 +25,7 @@ mod all; mod any; mod chain; mod cmp; +mod count; mod enumerate; mod filter; mod filter_map; @@ -57,6 +58,7 @@ mod zip; use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; +use count::CountFuture; use enumerate::Enumerate; use filter_map::FilterMap; use find::FindFuture; @@ -1392,6 +1394,33 @@ extension_trait! { CmpFuture::new(self, other) } + #[doc = r#" + Counts the number of elements in the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s1 = VecDeque::from(vec![0]); + let s2 = VecDeque::from(vec![1, 2, 3]); + + assert_eq!(s1.count().await, 1); + assert_eq!(s2.count().await, 3); + # + # }) } + ``` + "#] + fn count(self) -> impl Future [CountFuture] + where + Self: Sized + Stream, + { + CountFuture::new(self) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than or equal to those of another.