forked from mirror/async-std
		
	add stream::count
This commit is contained in:
		
							parent
							
								
									ec23632f3e
								
							
						
					
					
						commit
						a9a7bdc290
					
				
					 2 changed files with 70 additions and 0 deletions
				
			
		
							
								
								
									
										41
									
								
								src/stream/stream/count.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										41
									
								
								src/stream/stream/count.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,41 @@ | ||||||
|  | use std::pin::Pin; | ||||||
|  | 
 | ||||||
|  | use crate::future::Future; | ||||||
|  | use crate::stream::Stream; | ||||||
|  | use crate::task::{Context, Poll}; | ||||||
|  | 
 | ||||||
|  | #[doc(hidden)] | ||||||
|  | #[allow(missing_debug_implementations)] | ||||||
|  | pub struct CountFuture<S> { | ||||||
|  |     stream: S, | ||||||
|  |     count: usize, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<S> CountFuture<S> { | ||||||
|  |     pin_utils::unsafe_pinned!(stream: S); | ||||||
|  |     pin_utils::unsafe_unpinned!(count: usize); | ||||||
|  | 
 | ||||||
|  |     pub(crate) fn new(stream: S) -> Self { | ||||||
|  |         CountFuture { stream, count: 0 } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<S> Future for CountFuture<S> | ||||||
|  | where | ||||||
|  |     S: Sized + Stream, | ||||||
|  | { | ||||||
|  |     type Output = usize; | ||||||
|  | 
 | ||||||
|  |     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||||||
|  |         let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); | ||||||
|  | 
 | ||||||
|  |         match next { | ||||||
|  |             Some(_) => { | ||||||
|  |                 cx.waker().wake_by_ref(); | ||||||
|  |                 *self.as_mut().count() += 1; | ||||||
|  |                 Poll::Pending | ||||||
|  |             } | ||||||
|  |             None => Poll::Ready(self.count), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -25,6 +25,7 @@ mod all; | ||||||
| mod any; | mod any; | ||||||
| mod chain; | mod chain; | ||||||
| mod cmp; | mod cmp; | ||||||
|  | mod count; | ||||||
| mod enumerate; | mod enumerate; | ||||||
| mod filter; | mod filter; | ||||||
| mod filter_map; | mod filter_map; | ||||||
|  | @ -57,6 +58,7 @@ mod zip; | ||||||
| use all::AllFuture; | use all::AllFuture; | ||||||
| use any::AnyFuture; | use any::AnyFuture; | ||||||
| use cmp::CmpFuture; | use cmp::CmpFuture; | ||||||
|  | use count::CountFuture; | ||||||
| use enumerate::Enumerate; | use enumerate::Enumerate; | ||||||
| use filter_map::FilterMap; | use filter_map::FilterMap; | ||||||
| use find::FindFuture; | use find::FindFuture; | ||||||
|  | @ -1392,6 +1394,33 @@ extension_trait! { | ||||||
|             CmpFuture::new(self, other) |             CmpFuture::new(self, other) | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|  |         #[doc = r#" | ||||||
|  |             Counts the number of elements in the stream. | ||||||
|  | 
 | ||||||
|  |             # Examples | ||||||
|  | 
 | ||||||
|  |             ``` | ||||||
|  |             # fn main() { async_std::task::block_on(async { | ||||||
|  |             # | ||||||
|  |             use async_std::prelude::*; | ||||||
|  |             use std::collections::VecDeque; | ||||||
|  | 
 | ||||||
|  |             let s1 = VecDeque::from(vec![0]); | ||||||
|  |             let s2 = VecDeque::from(vec![1, 2, 3]); | ||||||
|  | 
 | ||||||
|  |             assert_eq!(s1.count().await, 1); | ||||||
|  |             assert_eq!(s2.count().await, 3); | ||||||
|  |             # | ||||||
|  |             # }) } | ||||||
|  |             ``` | ||||||
|  |         "#]
 | ||||||
|  |         fn count(self) -> impl Future<Output = Ordering> [CountFuture<Self>] | ||||||
|  |         where | ||||||
|  |             Self: Sized + Stream, | ||||||
|  |         { | ||||||
|  |             CountFuture::new(self) | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|         #[doc = r#" |         #[doc = r#" | ||||||
|             Determines if the elements of this `Stream` are lexicographically |             Determines if the elements of this `Stream` are lexicographically | ||||||
|             greater than or equal to those of another. |             greater than or equal to those of another. | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue