From 635c592950e1d70ebf96336b285c4c43a88e9357 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 27 Nov 2019 14:26:04 +0900 Subject: [PATCH] feat: Add stream::delay --- src/stream/stream/delay.rs | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs index f6de5f2d..b0450104 100644 --- a/src/stream/stream/delay.rs +++ b/src/stream/stream/delay.rs @@ -2,22 +2,24 @@ use std::future::Future; use std::pin::Pin; use std::time::Duration; +use pin_project_lite::pin_project; + use crate::stream::Stream; use crate::task::{Context, Poll}; +pin_project! { #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct Delay { - stream: S, - delay: futures_timer::Delay, - delay_done: bool, + pub struct Delay { + #[pin] + stream: S, + #[pin] + delay: futures_timer::Delay, + delay_done: bool, + } } impl Delay { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_pinned!(delay: futures_timer::Delay); - pin_utils::unsafe_unpinned!(delay_done: bool); - pub(super) fn new(stream: S, dur: Duration) -> Self { Delay { stream, @@ -33,12 +35,14 @@ where { type Item = S::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if !self.delay_done { - futures_core::ready!(self.as_mut().delay().poll(cx)); - *self.as_mut().delay_done() = true; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + if !*this.delay_done { + futures_core::ready!(this.delay.poll(cx)); + *this.delay_done = true; } - self.as_mut().stream().poll_next(cx) + this.stream.poll_next(cx) } }