diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 3bf95e3..3e5dfe4 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -48,6 +48,7 @@ mod ne; mod next; mod nth; mod partial_cmp; +mod position; mod scan; mod skip; mod skip_while; @@ -80,6 +81,7 @@ use ne::NeFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; @@ -1552,6 +1554,45 @@ extension_trait! { PartialCmpFuture::new(self, other) } + #[doc = r#" + Searches for an element in a Stream that satisfies a predicate, returning + its index. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let res = s.clone().position(|x| *x == 1).await; + assert_eq!(res, Some(0)); + + let res = s.clone().position(|x| *x == 2).await; + assert_eq!(res, Some(1)); + + let res = s.clone().position(|x| *x == 3).await; + assert_eq!(res, Some(2)); + + let res = s.clone().position(|x| *x == 4).await; + assert_eq!(res, None); + # + # }) } + ``` + "#] + fn position

( + self, + predicate: P + ) -> impl Future> [PositionFuture] + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + PositionFuture::new(self, predicate) + } + #[doc = r#" Lexicographically compares the elements of this `Stream` with those of another using 'Ord'. diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs new file mode 100644 index 0000000..3cd5b84 --- /dev/null +++ b/src/stream/stream/position.rs @@ -0,0 +1,51 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct PositionFuture { + #[pin] + stream: S, + predicate: P, + index:usize, + } +} + +impl PositionFuture { + pub(super) fn new(stream: S, predicate: P) -> Self { + PositionFuture { + stream, + predicate, + index: 0, + } + } +} + +impl Future for PositionFuture +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)), + Some(_) => { + cx.waker().wake_by_ref(); + *this.index += 1; + Poll::Pending + } + None => Poll::Ready(None), + } + } +}