From 78c49f92b6307d0fbc504dbb6a9414ad87f93818 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 16 Aug 2019 15:35:04 +0200 Subject: [PATCH] Add initial Fuse implementation for Stream Signed-off-by: Yoshua Wuyts --- src/stream/mod.rs | 2 +- src/stream/stream/fuse.rs | 54 +++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 34 ++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 src/stream/stream/fuse.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a08c827..ec1c23b 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -27,7 +27,7 @@ pub use from_stream::FromStream; pub use into_stream::IntoStream; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Scan, Stream, Take, Zip}; +pub use stream::{Fuse, Scan, Stream, Take, Zip}; mod double_ended_stream; mod empty; diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs new file mode 100644 index 0000000..7cfe43a --- /dev/null +++ b/src/stream/stream/fuse.rs @@ -0,0 +1,54 @@ +use std::pin::Pin; + +use crate::task::{Context, Poll}; + +/// A `Stream` that is permanently closed +/// once a single call to `poll` results in +/// `Poll::Ready(None)`, returning `Poll::Ready(None)` +/// for all future calls to `poll`. +#[derive(Clone, Debug)] +pub struct Fuse { + stream: S, + done: bool, +} + +impl Unpin for Fuse {} + +impl Fuse { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(done: bool); + + /// Returns `true` if the underlying stream is fused. + /// + /// If this `Stream` is fused, all future calls to + /// `poll` will return `Poll::Ready(None)`. + pub fn is_done(&self) -> bool { + self.done + } + + /// Consumes this `Fuse` and returns the inner + /// `Stream`, unfusing it if it had become + /// fused. + pub fn into_inner(self) -> S + where + S: Sized, + { + self.stream + } +} + +impl futures::Stream for Fuse { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done { + Poll::Ready(None) + } else { + let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + if next.is_none() { + *self.as_mut().done() = true; + } + Poll::Ready(next) + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index ca83fbb..f72b7bc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -28,6 +28,7 @@ mod filter_map; mod find; mod find_map; mod fold; +mod fuse; mod min_by; mod next; mod nth; @@ -35,6 +36,7 @@ mod scan; mod take; mod zip; +pub use fuse::Fuse; pub use scan::Scan; pub use take::Take; pub use zip::Zip; @@ -246,6 +248,38 @@ pub trait Stream { Enumerate::new(self) } + /// Transforms this `Stream` into a "fused" `Stream` + /// such that after the first time `poll` returns + /// `Poll::Ready(None)`, all future calls to + /// `poll` will also return `Poll::Ready(None)`. + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat(9).take(3); + /// + /// while let Some(v) = s.next().await { + /// assert_eq!(v, 9); + /// } + /// # + /// # }) } + /// ``` + fn fuse(self) -> Fuse + where + Self: Sized, + { + Fuse { + stream: self, + done: false, + } + } + /// Both filters and maps a stream. /// /// # Examples