From 48c82a9668ec3f18246d7cb5066b72f1e5e3133d Mon Sep 17 00:00:00 2001 From: zhangguyu Date: Thu, 31 Oct 2019 22:33:17 +0800 Subject: [PATCH 1/2] Add stream position --- src/stream/stream/mod.rs | 41 ++++++++++++++++++++++++++++ src/stream/stream/position.rs | 51 +++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 src/stream/stream/position.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e8190387..c4abe32f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -46,6 +46,7 @@ mod min_by_key; mod next; mod nth; mod partial_cmp; +mod position; mod scan; mod skip; mod skip_while; @@ -76,6 +77,7 @@ use min_by_key::MinByKeyFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; @@ -1548,6 +1550,45 @@ extension_trait! { PartialCmpFuture::new(self, other) } + #[doc = r#" + Searches for an element in a Stream that satisfies a predicate, returning + its index. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let res = s.clone().position(|x| *x == 1).await; + assert_eq!(res, Some(0)); + + let res = s.clone().position(|x| *x == 2).await; + assert_eq!(res, Some(1)); + + let res = s.clone().position(|x| *x == 3).await; + assert_eq!(res, Some(2)); + + let res = s.clone().position(|x| *x == 4).await; + assert_eq!(res, None); + # + # }) } + ``` + "#] + fn position

( + self, + predicate: P + ) -> impl Future> [PositionFuture] + where + Self: Sized + Stream, + P: FnMut(&Self::Item) -> bool, + { + PositionFuture::new(self, predicate) + } + #[doc = r#" Lexicographically compares the elements of this `Stream` with those of another using 'Ord'. diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs new file mode 100644 index 00000000..3cd5b84c --- /dev/null +++ b/src/stream/stream/position.rs @@ -0,0 +1,51 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct PositionFuture { + #[pin] + stream: S, + predicate: P, + index:usize, + } +} + +impl PositionFuture { + pub(super) fn new(stream: S, predicate: P) -> Self { + PositionFuture { + stream, + predicate, + index: 0, + } + } +} + +impl Future for PositionFuture +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)), + Some(_) => { + cx.waker().wake_by_ref(); + *this.index += 1; + Poll::Pending + } + None => Poll::Ready(None), + } + } +} From 07d21e5eb37e6ddc2a1819dd0d27b83338f21299 Mon Sep 17 00:00:00 2001 From: zhangguyu Date: Thu, 31 Oct 2019 23:30:11 +0800 Subject: [PATCH 2/2] change trait bounds --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c4abe32f..0469c7ae 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1583,7 +1583,7 @@ extension_trait! { predicate: P ) -> impl Future> [PositionFuture] where - Self: Sized + Stream, + Self: Sized, P: FnMut(&Self::Item) -> bool, { PositionFuture::new(self, predicate)