From 78c49f92b6307d0fbc504dbb6a9414ad87f93818 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 16 Aug 2019 15:35:04 +0200 Subject: [PATCH 1/4] Add initial Fuse implementation for Stream Signed-off-by: Yoshua Wuyts --- src/stream/mod.rs | 2 +- src/stream/stream/fuse.rs | 54 +++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 34 ++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 src/stream/stream/fuse.rs diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a08c827..ec1c23b 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -27,7 +27,7 @@ pub use from_stream::FromStream; pub use into_stream::IntoStream; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Scan, Stream, Take, Zip}; +pub use stream::{Fuse, Scan, Stream, Take, Zip}; mod double_ended_stream; mod empty; diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs new file mode 100644 index 0000000..7cfe43a --- /dev/null +++ b/src/stream/stream/fuse.rs @@ -0,0 +1,54 @@ +use std::pin::Pin; + +use crate::task::{Context, Poll}; + +/// A `Stream` that is permanently closed +/// once a single call to `poll` results in +/// `Poll::Ready(None)`, returning `Poll::Ready(None)` +/// for all future calls to `poll`. +#[derive(Clone, Debug)] +pub struct Fuse { + stream: S, + done: bool, +} + +impl Unpin for Fuse {} + +impl Fuse { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(done: bool); + + /// Returns `true` if the underlying stream is fused. + /// + /// If this `Stream` is fused, all future calls to + /// `poll` will return `Poll::Ready(None)`. + pub fn is_done(&self) -> bool { + self.done + } + + /// Consumes this `Fuse` and returns the inner + /// `Stream`, unfusing it if it had become + /// fused. + pub fn into_inner(self) -> S + where + S: Sized, + { + self.stream + } +} + +impl futures::Stream for Fuse { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done { + Poll::Ready(None) + } else { + let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + if next.is_none() { + *self.as_mut().done() = true; + } + Poll::Ready(next) + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index ca83fbb..f72b7bc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -28,6 +28,7 @@ mod filter_map; mod find; mod find_map; mod fold; +mod fuse; mod min_by; mod next; mod nth; @@ -35,6 +36,7 @@ mod scan; mod take; mod zip; +pub use fuse::Fuse; pub use scan::Scan; pub use take::Take; pub use zip::Zip; @@ -246,6 +248,38 @@ pub trait Stream { Enumerate::new(self) } + /// Transforms this `Stream` into a "fused" `Stream` + /// such that after the first time `poll` returns + /// `Poll::Ready(None)`, all future calls to + /// `poll` will also return `Poll::Ready(None)`. + /// + /// # Examples + /// + /// ``` + /// # #![feature(async_await)] + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat(9).take(3); + /// + /// while let Some(v) = s.next().await { + /// assert_eq!(v, 9); + /// } + /// # + /// # }) } + /// ``` + fn fuse(self) -> Fuse + where + Self: Sized, + { + Fuse { + stream: self, + done: false, + } + } + /// Both filters and maps a stream. /// /// # Examples From 44b3d3daddc5fbf71a56ad903625883c9a497bc0 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 16 Aug 2019 16:06:00 +0200 Subject: [PATCH 2/4] Remove irrelevant example Signed-off-by: Yoshua Wuyts --- src/stream/stream/fuse.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 7cfe43a..4d1a1b9 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -37,6 +37,7 @@ impl Fuse { } } + impl futures::Stream for Fuse { type Item = S::Item; From 7b4bb26c5c702b8d70d53febcefc616f98977184 Mon Sep 17 00:00:00 2001 From: Tyler Neely Date: Fri, 16 Aug 2019 16:12:02 +0200 Subject: [PATCH 3/4] Remove redundant Sized bound Signed-off-by: Yoshua Wuyts --- src/stream/stream/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index f72b7bc..5f7b900 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -270,10 +270,7 @@ pub trait Stream { /// # /// # }) } /// ``` - fn fuse(self) -> Fuse - where - Self: Sized, - { + fn fuse(self) -> Fuse { Fuse { stream: self, done: false, From aa94d450d6109a7a26ec2eeefde230db37ae9e41 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 18 Sep 2019 00:00:30 +0200 Subject: [PATCH 4/4] update stream::fuse Signed-off-by: Yoshua Wuyts --- src/stream/stream/fuse.rs | 38 ++++++++------------------------------ src/stream/stream/mod.rs | 22 +++++++++++----------- 2 files changed, 19 insertions(+), 41 deletions(-) diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 4d1a1b9..3541937 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -1,51 +1,29 @@ use std::pin::Pin; +use std::task::{Context, Poll}; -use crate::task::{Context, Poll}; - -/// A `Stream` that is permanently closed -/// once a single call to `poll` results in -/// `Poll::Ready(None)`, returning `Poll::Ready(None)` -/// for all future calls to `poll`. +/// A `Stream` that is permanently closed once a single call to `poll` results in +/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. #[derive(Clone, Debug)] pub struct Fuse { - stream: S, - done: bool, + pub(crate) stream: S, + pub(crate) done: bool, } impl Unpin for Fuse {} -impl Fuse { +impl Fuse { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(done: bool); - - /// Returns `true` if the underlying stream is fused. - /// - /// If this `Stream` is fused, all future calls to - /// `poll` will return `Poll::Ready(None)`. - pub fn is_done(&self) -> bool { - self.done - } - - /// Consumes this `Fuse` and returns the inner - /// `Stream`, unfusing it if it had become - /// fused. - pub fn into_inner(self) -> S - where - S: Sized, - { - self.stream - } } - -impl futures::Stream for Fuse { +impl futures_core::Stream for Fuse { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.done { Poll::Ready(None) } else { - let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); if next.is_none() { *self.as_mut().done() = true; } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 5f7b900..e13e2eb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -248,29 +248,29 @@ pub trait Stream { Enumerate::new(self) } - /// Transforms this `Stream` into a "fused" `Stream` - /// such that after the first time `poll` returns - /// `Poll::Ready(None)`, all future calls to - /// `poll` will also return `Poll::Ready(None)`. + /// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` + /// returns `Poll::Ready(None)`, all future calls to `poll` will also return + /// `Poll::Ready(None)`. /// /// # Examples /// /// ``` - /// # #![feature(async_await)] /// # fn main() { async_std::task::block_on(async { /// # /// use async_std::prelude::*; /// use async_std::stream; /// - /// let mut s = stream::repeat(9).take(3); - /// - /// while let Some(v) = s.next().await { - /// assert_eq!(v, 9); - /// } + /// let mut s = stream::once(1).fuse(); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// assert_eq!(s.next().await, None); /// # /// # }) } /// ``` - fn fuse(self) -> Fuse { + fn fuse(self) -> Fuse + where + Self: Sized, + { Fuse { stream: self, done: false,