diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 764dc97..8035769 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -50,6 +50,7 @@ mod skip_while; mod step_by; mod take; mod take_while; +mod throttle; mod try_fold; mod try_for_each; mod zip; @@ -86,10 +87,12 @@ pub use skip_while::SkipWhile; pub use step_by::StepBy; pub use take::Take; pub use take_while::TakeWhile; +pub use throttle::Throttle; pub use zip::Zip; use std::cmp::Ordering; use std::marker::PhantomData; +use std::time::Duration; use cfg_if::cfg_if; @@ -288,6 +291,13 @@ extension_trait! { TakeWhile::new(self, predicate) } + fn throttle(self, d: Duration) -> Throttle + where + Self: Sized, + { + Throttle::new(self, d) + } + #[doc = r#" Creates a stream that yields each `step`th element. diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs new file mode 100644 index 0000000..c37c972 --- /dev/null +++ b/src/stream/stream/throttle.rs @@ -0,0 +1,56 @@ +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +use futures_timer::Delay; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// A stream that only yields one element once every `duration`, and drops all others. +/// #[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Throttle { + stream: S, + duration: Duration, + delay: Option, +} + +impl Unpin for Throttle {} + +impl Throttle { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(duration: Duration); + pin_utils::unsafe_pinned!(delay: Option); + + pub(super) fn new(stream: S, duration: Duration) -> Self { + Throttle { + stream, + duration, + delay: None, + } + } +} + +impl Stream for Throttle { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.as_mut().stream().poll_next(cx) { + Poll::Ready(v) => match self.as_mut().delay().as_pin_mut() { + None => { + *self.as_mut().delay() = Some(Delay::new(self.duration)); + Poll::Ready(v) + } + Some(d) => match d.poll(cx) { + Poll::Ready(_) => { + *self.as_mut().delay() = Some(Delay::new(self.duration)); + Poll::Ready(v) + } + Poll::Pending => Poll::Pending, + }, + }, + Poll::Pending => Poll::Pending, + } + } +}