diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 95b0444..3584be3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -281,7 +281,7 @@ extension_trait! { TakeWhile::new(self, predicate) } - fn throttle(self, d: Duration) -> Throttle + fn throttle(self, d: Duration) -> Throttle where Self: Sized, { diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index c37c972..8e96fea 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -10,47 +10,58 @@ use crate::task::{Context, Poll}; /// A stream that only yields one element once every `duration`, and drops all others. /// #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Throttle { +pub struct Throttle { stream: S, duration: Duration, delay: Option, + last: Option, } -impl Unpin for Throttle {} +impl Unpin for Throttle {} -impl Throttle { +impl Throttle { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(duration: Duration); pin_utils::unsafe_pinned!(delay: Option); + pin_utils::unsafe_unpinned!(last: Option); pub(super) fn new(stream: S, duration: Duration) -> Self { Throttle { stream, duration, delay: None, + last: None, } } } -impl Stream for Throttle { +impl Stream for Throttle { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.as_mut().stream().poll_next(cx) { - Poll::Ready(v) => match self.as_mut().delay().as_pin_mut() { - None => { + if let Some(d) = self.as_mut().delay().as_pin_mut() { + if d.poll(cx).is_ready() { + if let Some(v) = self.as_mut().last().take() { + // Sets last to None. *self.as_mut().delay() = Some(Delay::new(self.duration)); - Poll::Ready(v) + return Poll::Ready(Some(v)); } - Some(d) => match d.poll(cx) { - Poll::Ready(_) => { - *self.as_mut().delay() = Some(Delay::new(self.duration)); - Poll::Ready(v) - } - Poll::Pending => Poll::Pending, - }, - }, + } + } + + match self.as_mut().stream().poll_next(cx) { Poll::Pending => Poll::Pending, + Poll::Ready(None) => return Poll::Ready(None), + Poll::Ready(Some(v)) => { + if self.as_mut().delay().is_some() { + *self.as_mut().last() = Some(v); + cx.waker().wake_by_ref(); // Continue driving even though emitting Pending + return Poll::Pending; + } + + *self.as_mut().delay() = Some(Delay::new(self.duration)); + Poll::Ready(Some(v)) + } } } }