diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs new file mode 100644 index 0000000..7a8cf47 --- /dev/null +++ b/src/stream/stream/timeout.rs @@ -0,0 +1,57 @@ +use std::error::Error; +use std::fmt; +use std::pin::Pin; +use std::time::Duration; + +use futures_timer::Delay; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[derive(Debug)] +pub struct TimeoutStream { + stream: S, + delay: Delay, +} + +impl TimeoutStream { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(delay: Delay); + + pub fn new(stream: S, dur: Duration) -> TimeoutStream { + let delay = Delay::new(dur); + + TimeoutStream { stream, delay } + } +} + +impl Stream for TimeoutStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.as_mut().stream().poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => match self.delay().poll(cx) { + Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError))), + Poll::Pending => Poll::Pending, + }, + } + } +} + +/// An error returned when a stream times out. +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[cfg(any(feature = "unstable", feature = "docs"))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TimeoutError; + +impl Error for TimeoutError {} + +impl fmt::Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "stream has timed out".fmt(f) + } +}