forked from mirror/async-std
		
	Adds for_each stream combinator
This commit is contained in:
		
							parent
							
								
									33d2191cec
								
							
						
					
					
						commit
						6da7efc5ac
					
				
					 2 changed files with 83 additions and 0 deletions
				
			
		
							
								
								
									
										46
									
								
								src/stream/stream/for_each.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								src/stream/stream/for_each.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,46 @@ | ||||||
|  | use std::marker::PhantomData; | ||||||
|  | use std::pin::Pin; | ||||||
|  | 
 | ||||||
|  | use crate::future::Future; | ||||||
|  | use crate::stream::Stream; | ||||||
|  | use crate::task::{Context, Poll}; | ||||||
|  | 
 | ||||||
|  | #[doc(hidden)] | ||||||
|  | #[allow(missing_debug_implementations)] | ||||||
|  | pub struct ForEachFuture<S, F, T> { | ||||||
|  |     stream: S, | ||||||
|  |     f: F, | ||||||
|  |     __t: PhantomData<T>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<S, F, T> ForEachFuture<S, F, T> { | ||||||
|  |     pin_utils::unsafe_pinned!(stream: S); | ||||||
|  |     pin_utils::unsafe_unpinned!(f: F); | ||||||
|  | 
 | ||||||
|  |     pub(super) fn new(stream: S, f: F) -> Self { | ||||||
|  |         ForEachFuture { | ||||||
|  |             stream, | ||||||
|  |             f, | ||||||
|  |             __t: PhantomData, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<S, F> Future for ForEachFuture<S, F, S::Item> | ||||||
|  | where | ||||||
|  |     S: Stream + Sized, | ||||||
|  |     F: FnMut(S::Item), | ||||||
|  | { | ||||||
|  |     type Output = (); | ||||||
|  | 
 | ||||||
|  |     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||||
|  |         loop { | ||||||
|  |             let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); | ||||||
|  | 
 | ||||||
|  |             match next { | ||||||
|  |                 Some(v) => (self.as_mut().f())(v), | ||||||
|  |                 None => return Poll::Ready(()), | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -30,6 +30,7 @@ mod filter_map; | ||||||
| mod find; | mod find; | ||||||
| mod find_map; | mod find_map; | ||||||
| mod fold; | mod fold; | ||||||
|  | mod for_each; | ||||||
| mod fuse; | mod fuse; | ||||||
| mod inspect; | mod inspect; | ||||||
| mod min_by; | mod min_by; | ||||||
|  | @ -49,6 +50,7 @@ use filter_map::FilterMap; | ||||||
| use find::FindFuture; | use find::FindFuture; | ||||||
| use find_map::FindMapFuture; | use find_map::FindMapFuture; | ||||||
| use fold::FoldFuture; | use fold::FoldFuture; | ||||||
|  | use for_each::ForEachFuture; | ||||||
| use min_by::MinByFuture; | use min_by::MinByFuture; | ||||||
| use next::NextFuture; | use next::NextFuture; | ||||||
| use nth::NthFuture; | use nth::NthFuture; | ||||||
|  | @ -750,6 +752,41 @@ extension_trait! { | ||||||
|             FoldFuture::new(self, init, f) |             FoldFuture::new(self, init, f) | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  |         #[doc = r#" | ||||||
|  |             Call a closure on each element of the stream. | ||||||
|  | 
 | ||||||
|  |             # Examples | ||||||
|  | 
 | ||||||
|  |             ``` | ||||||
|  |             # fn main() { async_std::task::block_on(async { | ||||||
|  |             # | ||||||
|  |             use async_std::prelude::*; | ||||||
|  |             use std::collections::VecDeque; | ||||||
|  |             use std::sync::mpsc::channel; | ||||||
|  | 
 | ||||||
|  |             let (tx, rx) = channel(); | ||||||
|  | 
 | ||||||
|  |             let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect(); | ||||||
|  |             let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await; | ||||||
|  | 
 | ||||||
|  |             let v: Vec<_> = rx.iter().collect(); | ||||||
|  | 
 | ||||||
|  |             assert_eq!(v, vec![1, 2, 3]); | ||||||
|  |             # | ||||||
|  |             # }) } | ||||||
|  |             ``` | ||||||
|  |         "#]
 | ||||||
|  |         fn for_each<F>( | ||||||
|  |             self, | ||||||
|  |             f: F, | ||||||
|  |         ) -> impl Future<Output = ()> [ForEachFuture<Self, F, Self::Item>] | ||||||
|  |         where | ||||||
|  |             Self: Sized, | ||||||
|  |             F: FnMut(Self::Item), | ||||||
|  |         { | ||||||
|  |             ForEachFuture::new(self, f) | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         #[doc = r#" |         #[doc = r#" | ||||||
|             Tests if any element of the stream matches a predicate. |             Tests if any element of the stream matches a predicate. | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue