diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 7e0270e..3c14811 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -4,22 +4,24 @@ use std::pin::Pin; use std::time::Duration; use futures_timer::Delay; +use pin_project_lite::pin_project; use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream with timeout time set -#[derive(Debug)] -pub struct Timeout { - stream: S, - delay: Delay, +pin_project! { + /// A stream with timeout time set + #[derive(Debug)] + pub struct Timeout { + #[pin] + stream: S, + #[pin] + delay: Delay, + } } impl Timeout { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_pinned!(delay: Delay); - pub fn new(stream: S, dur: Duration) -> Timeout { let delay = Delay::new(dur); @@ -30,11 +32,13 @@ impl Timeout { impl Stream for Timeout { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.as_mut().stream().poll_next(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match this.stream.poll_next(cx) { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => match self.delay().poll(cx) { + Poll::Pending => match this.delay.poll(cx) { Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))), Poll::Pending => Poll::Pending, },