forked from mirror/async-std
		
	feat: Add stream::delay
This commit is contained in:
		
							parent
							
								
									3b055f364e
								
							
						
					
					
						commit
						635c592950
					
				
					 1 changed files with 17 additions and 13 deletions
				
			
		|  | @ -2,22 +2,24 @@ use std::future::Future; | ||||||
| use std::pin::Pin; | use std::pin::Pin; | ||||||
| use std::time::Duration; | use std::time::Duration; | ||||||
| 
 | 
 | ||||||
|  | use pin_project_lite::pin_project; | ||||||
|  | 
 | ||||||
| use crate::stream::Stream; | use crate::stream::Stream; | ||||||
| use crate::task::{Context, Poll}; | use crate::task::{Context, Poll}; | ||||||
| 
 | 
 | ||||||
|  | pin_project! { | ||||||
| #[doc(hidden)] | #[doc(hidden)] | ||||||
| #[allow(missing_debug_implementations)] | #[allow(missing_debug_implementations)] | ||||||
|     pub struct Delay<S> { |     pub struct Delay<S> { | ||||||
|  |         #[pin] | ||||||
|         stream: S, |         stream: S, | ||||||
|  |         #[pin] | ||||||
|         delay: futures_timer::Delay, |         delay: futures_timer::Delay, | ||||||
|         delay_done: bool, |         delay_done: bool, | ||||||
|     } |     } | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
| impl<S> Delay<S> { | impl<S> Delay<S> { | ||||||
|     pin_utils::unsafe_pinned!(stream: S); |  | ||||||
|     pin_utils::unsafe_pinned!(delay: futures_timer::Delay); |  | ||||||
|     pin_utils::unsafe_unpinned!(delay_done: bool); |  | ||||||
| 
 |  | ||||||
|     pub(super) fn new(stream: S, dur: Duration) -> Self { |     pub(super) fn new(stream: S, dur: Duration) -> Self { | ||||||
|         Delay { |         Delay { | ||||||
|             stream, |             stream, | ||||||
|  | @ -33,12 +35,14 @@ where | ||||||
| { | { | ||||||
|     type Item = S::Item; |     type Item = S::Item; | ||||||
| 
 | 
 | ||||||
|     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { |     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||||||
|         if !self.delay_done { |         let this = self.project(); | ||||||
|             futures_core::ready!(self.as_mut().delay().poll(cx)); | 
 | ||||||
|             *self.as_mut().delay_done() = true; |         if !*this.delay_done { | ||||||
|  |             futures_core::ready!(this.delay.poll(cx)); | ||||||
|  |             *this.delay_done = true; | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         self.as_mut().stream().poll_next(cx) |         this.stream.poll_next(cx) | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue