diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 756e8e9..a91517d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -315,7 +315,6 @@ extension_trait! { TakeWhile::new(self, predicate) } - #[cfg(all(feature = "default", feature = "unstable"))] #[doc = r#" Limit the amount of items yielded per timeslice in a stream. @@ -342,6 +341,8 @@ extension_trait! { # }) } ``` "#] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn throttle(self, d: Duration) -> Throttle where Self: Sized, diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index 010839c..8813607 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -3,26 +3,25 @@ use std::pin::Pin; use std::time::Duration; use futures_timer::Delay; +use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements. -/// #[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct Throttle { - stream: S, - duration: Duration, - delay: Option, +pin_project! { + /// A stream that only yields one element once every `duration`, and applies backpressure. Does not drop any elements. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct Throttle { + #[pin] + stream: S, + duration: Duration, + #[pin] + delay: Option, + } } -impl Unpin for Throttle {} - impl Throttle { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_unpinned!(duration: Duration); - pin_utils::unsafe_pinned!(delay: Option); - pub(super) fn new(stream: S, duration: Duration) -> Self { Throttle { stream, @@ -35,23 +34,24 @@ impl Throttle { impl Stream for Throttle { type Item = S::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(d) = self.as_mut().delay().as_pin_mut() { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + if let Some(d) = this.delay.as_mut().as_pin_mut() { if d.poll(cx).is_ready() { - *self.as_mut().delay() = None; + this.delay.set(None); } else { return Poll::Pending; } } - match self.as_mut().stream().poll_next(cx) { + match this.stream.poll_next(cx) { Poll::Pending => { cx.waker().wake_by_ref(); // Continue driving even though emitting Pending Poll::Pending } Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { - *self.as_mut().delay() = Some(Delay::new(self.duration)); + this.delay.set(Some(Delay::new(*this.duration))); Poll::Ready(Some(v)) } }