diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs new file mode 100644 index 0000000..221b0f0 --- /dev/null +++ b/src/stream/stream/count.rs @@ -0,0 +1,44 @@ +use std::future::Future; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct CountFuture { + #[pin] + stream: S, + count: usize, + } +} + +impl CountFuture { + pub(crate) fn new(stream: S) -> Self { + CountFuture { stream, count: 0 } + } +} + +impl Future for CountFuture +where + S: Stream, +{ + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(_) => { + cx.waker().wake_by_ref(); + *this.count += 1; + Poll::Pending + } + None => Poll::Ready(*this.count), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d6292c3..51ac857 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -26,6 +26,7 @@ mod any; mod chain; mod cloned; mod cmp; +mod count; mod copied; mod cycle; mod enumerate; @@ -68,6 +69,7 @@ mod zip; use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; +use count::CountFuture; use cycle::Cycle; use enumerate::Enumerate; use eq::EqFuture; @@ -1889,6 +1891,33 @@ extension_trait! { CmpFuture::new(self, other) } + #[doc = r#" + Counts the number of elements in the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s1 = stream::from_iter(vec![0]); + let s2 = stream::from_iter(vec![1, 2, 3]); + + assert_eq!(s1.count().await, 1); + assert_eq!(s2.count().await, 3); + # + # }) } + ``` + "#] + fn count(self) -> impl Future [CountFuture] + where + Self: Sized, + { + CountFuture::new(self) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically not equal to those of another.