From 7a87dea085884eebbe7472be5b4658218b58cd32 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 15:31:07 +0900 Subject: [PATCH] feat: Add Stream::timeout --- src/stream/stream/mod.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d582d70..c44917d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -104,13 +104,16 @@ cfg_if! { cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod merge; + mod timeout; use std::pin::Pin; + use std::time::Duration; use crate::future::Future; use crate::stream::FromStream; pub use merge::Merge; + pub use timeout::TimeoutStream; } } @@ -1044,6 +1047,40 @@ extension_trait! { Skip::new(self, n) } + #[doc=r#" + Await a stream or times out after a duration of time. + + If you want to await an I/O future consider using + [`io::timeout`](../io/fn.timeout.html) instead. + + # Examples + + ``` + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use std::time::Duration; + + use async_std::stream; + use async_std::prelude::*; + + let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1)); + + while let Some(v) = s.next().await { + assert_eq!(v, Ok(1)); + } + # + # Ok(()) }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn timeout(self, dur: Duration) -> TimeoutStream + where + Self: Stream + Sized, + { + TimeoutStream::new(self, dur) + } + #[doc = r#" A combinator that applies a function as long as it returns successfully, producing a single, final value. Immediately returns the error when the function returns unsuccessfully.