Merge pull request #264 from montekki/fs-stream-for-each

Adds for_each stream combinator
This commit is contained in:
Yoshua Wuyts 2019-10-01 15:32:36 +02:00 committed by GitHub
commit a97a1fffff
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 83 additions and 0 deletions

View file

@ -0,0 +1,46 @@
use std::marker::PhantomData;
use std::pin::Pin;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct ForEachFuture<S, F, T> {
stream: S,
f: F,
__t: PhantomData<T>,
}
impl<S, F, T> ForEachFuture<S, F, T> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(f: F);
pub(super) fn new(stream: S, f: F) -> Self {
ForEachFuture {
stream,
f,
__t: PhantomData,
}
}
}
impl<S, F> Future for ForEachFuture<S, F, S::Item>
where
S: Stream + Sized,
F: FnMut(S::Item),
{
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(v) => (self.as_mut().f())(v),
None => return Poll::Ready(()),
}
}
}
}

View file

@ -30,6 +30,7 @@ mod filter_map;
mod find; mod find;
mod find_map; mod find_map;
mod fold; mod fold;
mod for_each;
mod fuse; mod fuse;
mod inspect; mod inspect;
mod min_by; mod min_by;
@ -49,6 +50,7 @@ use filter_map::FilterMap;
use find::FindFuture; use find::FindFuture;
use find_map::FindMapFuture; use find_map::FindMapFuture;
use fold::FoldFuture; use fold::FoldFuture;
use for_each::ForEachFuture;
use min_by::MinByFuture; use min_by::MinByFuture;
use next::NextFuture; use next::NextFuture;
use nth::NthFuture; use nth::NthFuture;
@ -750,6 +752,41 @@ extension_trait! {
FoldFuture::new(self, init, f) FoldFuture::new(self, init, f)
} }
#[doc = r#"
Call a closure on each element of the stream.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use std::sync::mpsc::channel;
let (tx, rx) = channel();
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
let v: Vec<_> = rx.iter().collect();
assert_eq!(v, vec![1, 2, 3]);
#
# }) }
```
"#]
fn for_each<F>(
self,
f: F,
) -> impl Future<Output = ()> [ForEachFuture<Self, F, Self::Item>]
where
Self: Sized,
F: FnMut(Self::Item),
{
ForEachFuture::new(self, f)
}
#[doc = r#" #[doc = r#"
Tests if any element of the stream matches a predicate. Tests if any element of the stream matches a predicate.