forked from mirror/async-std
		
	Changed semantics of throttle to non-dropping variant with backpressure
This commit is contained in:
		
							parent
							
								
									14d7d3bf9c
								
							
						
					
					
						commit
						b591fc68bd
					
				
					 3 changed files with 67 additions and 25 deletions
				
			
		
							
								
								
									
										27
									
								
								examples/throttle.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										27
									
								
								examples/throttle.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,27 @@ | ||||||
|  | //! Spawns a timed task which gets throttled.
 | ||||||
|  | 
 | ||||||
|  | fn main() { | ||||||
|  |     #[cfg(feature = "unstable")] | ||||||
|  |     { | ||||||
|  |         use async_std::prelude::*; | ||||||
|  |         use async_std::task; | ||||||
|  | 
 | ||||||
|  |         task::block_on(async { | ||||||
|  |             use async_std::stream; | ||||||
|  |             use std::time::Duration; | ||||||
|  | 
 | ||||||
|  |             // emit value every 1 second
 | ||||||
|  |             let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); | ||||||
|  | 
 | ||||||
|  |             // throttle for 2 seconds
 | ||||||
|  |             let s = s.throttle(Duration::from_secs(2)); | ||||||
|  | 
 | ||||||
|  |             s.for_each(|(n, _)| { | ||||||
|  |                 dbg!(n); | ||||||
|  |             }) | ||||||
|  |             .await; | ||||||
|  |             // => 0 .. 1 .. 2 .. 3
 | ||||||
|  |             // with a pause of 2 seconds between each print
 | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -117,7 +117,6 @@ use std::time::Duration; | ||||||
| cfg_unstable! { | cfg_unstable! { | ||||||
|     use std::future::Future; |     use std::future::Future; | ||||||
|     use std::pin::Pin; |     use std::pin::Pin; | ||||||
|     use std::time::Duration; |  | ||||||
| 
 | 
 | ||||||
|     use crate::stream::into_stream::IntoStream; |     use crate::stream::into_stream::IntoStream; | ||||||
|     use crate::stream::{FromStream, Product, Sum}; |     use crate::stream::{FromStream, Product, Sum}; | ||||||
|  | @ -316,7 +315,33 @@ extension_trait! { | ||||||
|             TakeWhile::new(self, predicate) |             TakeWhile::new(self, predicate) | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         fn throttle(self, d: Duration) -> Throttle<Self, Self::Item> |         #[doc = r#" | ||||||
|  |             Limit the amount of items yielded per timeslice in a stream. | ||||||
|  | 
 | ||||||
|  |             # Examples | ||||||
|  |             ```ignore | ||||||
|  |             # fn main() { async_std::task::block_on(async { | ||||||
|  |             # | ||||||
|  |             use async_std::stream; | ||||||
|  |             use std::time::Duration; | ||||||
|  | 
 | ||||||
|  |             // emit value every 1 second
 | ||||||
|  |             let s = stream::interval(Duration::from_nanos(1000000)).enumerate(); | ||||||
|  | 
 | ||||||
|  |             // throttle for 2 seconds
 | ||||||
|  |             let s = s.throttle(Duration::from_secs(2)); | ||||||
|  | 
 | ||||||
|  |             s.for_each(|(n, _)| { | ||||||
|  |                 dbg!(n); | ||||||
|  |             }) | ||||||
|  |             .await; | ||||||
|  |             // => 0 .. 1 .. 2 .. 3
 | ||||||
|  |             // with a pause of 2 seconds between each print
 | ||||||
|  |             # | ||||||
|  |             # }) } | ||||||
|  |             ``` | ||||||
|  |         "#]
 | ||||||
|  |         fn throttle(self, d: Duration) -> Throttle<Self> | ||||||
|         where |         where | ||||||
|             Self: Sized, |             Self: Sized, | ||||||
|         { |         { | ||||||
|  |  | ||||||
|  | @ -7,60 +7,50 @@ use futures_timer::Delay; | ||||||
| use crate::stream::Stream; | use crate::stream::Stream; | ||||||
| use crate::task::{Context, Poll}; | use crate::task::{Context, Poll}; | ||||||
| 
 | 
 | ||||||
| /// A stream that only yields one element once every `duration`, and drops all others.
 | /// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements.
 | ||||||
| /// #[doc(hidden)]
 | /// #[doc(hidden)]
 | ||||||
| #[allow(missing_debug_implementations)] | #[allow(missing_debug_implementations)] | ||||||
| pub struct Throttle<S, T> { | pub struct Throttle<S> { | ||||||
|     stream: S, |     stream: S, | ||||||
|     duration: Duration, |     duration: Duration, | ||||||
|     delay: Option<Delay>, |     delay: Option<Delay>, | ||||||
|     last: Option<T>, |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<S: Unpin, T> Unpin for Throttle<S, T> {} | impl<S: Unpin> Unpin for Throttle<S> {} | ||||||
| 
 | 
 | ||||||
| impl<S: Stream> Throttle<S, S::Item> { | impl<S: Stream> Throttle<S> { | ||||||
|     pin_utils::unsafe_pinned!(stream: S); |     pin_utils::unsafe_pinned!(stream: S); | ||||||
|     pin_utils::unsafe_unpinned!(duration: Duration); |     pin_utils::unsafe_unpinned!(duration: Duration); | ||||||
|     pin_utils::unsafe_pinned!(delay: Option<Delay>); |     pin_utils::unsafe_pinned!(delay: Option<Delay>); | ||||||
|     pin_utils::unsafe_unpinned!(last: Option<S::Item>); |  | ||||||
| 
 | 
 | ||||||
|     pub(super) fn new(stream: S, duration: Duration) -> Self { |     pub(super) fn new(stream: S, duration: Duration) -> Self { | ||||||
|         Throttle { |         Throttle { | ||||||
|             stream, |             stream, | ||||||
|             duration, |             duration, | ||||||
|             delay: None, |             delay: None, | ||||||
|             last: None, |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<S: Stream> Stream for Throttle<S, S::Item> { | impl<S: Stream> Stream for Throttle<S> { | ||||||
|     type Item = S::Item; |     type Item = S::Item; | ||||||
| 
 | 
 | ||||||
|     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { |     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { | ||||||
|         if let Some(d) = self.as_mut().delay().as_pin_mut() { |         if let Some(d) = self.as_mut().delay().as_pin_mut() { | ||||||
|             if d.poll(cx).is_ready() { |             if d.poll(cx).is_ready() { | ||||||
|                 if let Some(v) = self.as_mut().last().take() { |  | ||||||
|                     // Sets last to None.
 |  | ||||||
|                     *self.as_mut().delay() = Some(Delay::new(self.duration)); |  | ||||||
|                     return Poll::Ready(Some(v)); |  | ||||||
|                 } else { |  | ||||||
|                 *self.as_mut().delay() = None; |                 *self.as_mut().delay() = None; | ||||||
|                 } |             } else { | ||||||
|  |                 return Poll::Pending; | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         match self.as_mut().stream().poll_next(cx) { |         match self.as_mut().stream().poll_next(cx) { | ||||||
|             Poll::Pending => Poll::Pending, |             Poll::Pending => { | ||||||
|             Poll::Ready(None) => return Poll::Ready(None), |  | ||||||
|             Poll::Ready(Some(v)) => { |  | ||||||
|                 if self.as_mut().delay().is_some() { |  | ||||||
|                     *self.as_mut().last() = Some(v); |  | ||||||
|                 cx.waker().wake_by_ref(); // Continue driving even though emitting Pending
 |                 cx.waker().wake_by_ref(); // Continue driving even though emitting Pending
 | ||||||
|                     return Poll::Pending; |                 Poll::Pending | ||||||
|             } |             } | ||||||
| 
 |             Poll::Ready(None) => Poll::Ready(None), | ||||||
|  |             Poll::Ready(Some(v)) => { | ||||||
|                 *self.as_mut().delay() = Some(Delay::new(self.duration)); |                 *self.as_mut().delay() = Some(Delay::new(self.duration)); | ||||||
|                 Poll::Ready(Some(v)) |                 Poll::Ready(Some(v)) | ||||||
|             } |             } | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue