diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 0e0ee912..ec15728b 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -18,6 +18,7 @@ pin_project! { stream: S, #[pin] delay: Timer, + duration: Duration, } } @@ -25,7 +26,7 @@ impl Timeout { pub(crate) fn new(stream: S, dur: Duration) -> Self { let delay = timer_after(dur); - Self { stream, delay } + Self { stream, delay, duration: dur } } } @@ -33,16 +34,20 @@ impl Stream for Timeout { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); + let mut this = self.project(); - match this.stream.poll_next(cx) { + let r = match this.stream.poll_next(cx) { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => match this.delay.poll(cx) { + Poll::Pending => match this.delay.as_mut().poll(cx) { Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))), - Poll::Pending => Poll::Pending, + Poll::Pending => return Poll::Pending, }, - } + }; + + *this.delay.as_mut() = timer_after(*this.duration); + + r } }