From a9a7bdc29039951c39290741f1e512afa4f70831 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Fri, 18 Oct 2019 07:23:52 +0200 Subject: [PATCH 1/6] add stream::count --- src/stream/stream/count.rs | 41 ++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 29 +++++++++++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 src/stream/stream/count.rs diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs new file mode 100644 index 0000000..fcd75f6 --- /dev/null +++ b/src/stream/stream/count.rs @@ -0,0 +1,41 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct CountFuture { + stream: S, + count: usize, +} + +impl CountFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(count: usize); + + pub(crate) fn new(stream: S) -> Self { + CountFuture { stream, count: 0 } + } +} + +impl Future for CountFuture +where + S: Sized + Stream, +{ + type Output = usize; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(_) => { + cx.waker().wake_by_ref(); + *self.as_mut().count() += 1; + Poll::Pending + } + None => Poll::Ready(self.count), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..b9d4bc8 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -25,6 +25,7 @@ mod all; mod any; mod chain; mod cmp; +mod count; mod enumerate; mod filter; mod filter_map; @@ -57,6 +58,7 @@ mod zip; use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; +use count::CountFuture; use enumerate::Enumerate; use filter_map::FilterMap; use find::FindFuture; @@ -1392,6 +1394,33 @@ extension_trait! { CmpFuture::new(self, other) } + #[doc = r#" + Counts the number of elements in the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s1 = VecDeque::from(vec![0]); + let s2 = VecDeque::from(vec![1, 2, 3]); + + assert_eq!(s1.count().await, 1); + assert_eq!(s2.count().await, 3); + # + # }) } + ``` + "#] + fn count(self) -> impl Future [CountFuture] + where + Self: Sized + Stream, + { + CountFuture::new(self) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than or equal to those of another. From 97094b2a1c78e20ed01bbc1a27776506907c49a9 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Sun, 20 Oct 2019 22:03:51 +0200 Subject: [PATCH 2/6] remove Sized constraint --- src/stream/stream/count.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs index fcd75f6..b6d53ca 100644 --- a/src/stream/stream/count.rs +++ b/src/stream/stream/count.rs @@ -22,7 +22,7 @@ impl CountFuture { impl Future for CountFuture where - S: Sized + Stream, + S: Stream, { type Output = usize; From 6608d39c59f508ae09a32410df14c2e623b9cf44 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Sat, 26 Oct 2019 21:58:34 +0200 Subject: [PATCH 3/6] remove Stream trait bound --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b9d4bc8..9e0c1ef 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1416,7 +1416,7 @@ extension_trait! { "#] fn count(self) -> impl Future [CountFuture] where - Self: Sized + Stream, + Self: Sized, { CountFuture::new(self) } From 7d2282dbd284c80a5ae32f1a4d1c24234755f147 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Mon, 11 Nov 2019 22:11:06 +0100 Subject: [PATCH 4/6] fix merge conflict --- src/stream/stream/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 653261f..2f21c38 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -68,11 +68,8 @@ mod zip; use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; -<<<<<<< HEAD use count::CountFuture; -======= use cycle::Cycle; ->>>>>>> master use enumerate::Enumerate; use eq::EqFuture; use filter_map::FilterMap; From 37922408e56560eec2049ba8b57ca4217a203211 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Mon, 11 Nov 2019 22:17:29 +0100 Subject: [PATCH 5/6] use pin_project --- src/stream/stream/count.rs | 29 ++++++++++++++++------------- src/stream/stream/mod.rs | 6 +++--- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs index b6d53ca..221b0f0 100644 --- a/src/stream/stream/count.rs +++ b/src/stream/stream/count.rs @@ -1,20 +1,22 @@ +use std::future::Future; use std::pin::Pin; -use crate::future::Future; +use pin_project_lite::pin_project; + use crate::stream::Stream; use crate::task::{Context, Poll}; -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct CountFuture { - stream: S, - count: usize, +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct CountFuture { + #[pin] + stream: S, + count: usize, + } } impl CountFuture { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_unpinned!(count: usize); - pub(crate) fn new(stream: S) -> Self { CountFuture { stream, count: 0 } } @@ -26,16 +28,17 @@ where { type Output = usize; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); match next { Some(_) => { cx.waker().wake_by_ref(); - *self.as_mut().count() += 1; + *this.count += 1; Poll::Pending } - None => Poll::Ready(self.count), + None => Poll::Ready(*this.count), } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2f21c38..d659227 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1813,10 +1813,10 @@ extension_trait! { # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; - use std::collections::VecDeque; + use async_std::stream; - let s1 = VecDeque::from(vec![0]); - let s2 = VecDeque::from(vec![1, 2, 3]); + let s1 = stream::from_iter(vec![0]); + let s2 = stream::from_iter(vec![1, 2, 3]); assert_eq!(s1.count().await, 1); assert_eq!(s2.count().await, 3); From 9ebe41f2d62f41aef481b2b4d780e6309080ade0 Mon Sep 17 00:00:00 2001 From: Johannes Weissmann Date: Thu, 14 Nov 2019 10:34:09 +0100 Subject: [PATCH 6/6] Update src/stream/stream/mod.rs Co-Authored-By: nasa --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d659227..98c63ff 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1824,7 +1824,7 @@ extension_trait! { # }) } ``` "#] - fn count(self) -> impl Future [CountFuture] + fn count(self) -> impl Future [CountFuture] where Self: Sized, {