From aa94d450d6109a7a26ec2eeefde230db37ae9e41 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 18 Sep 2019 00:00:30 +0200 Subject: [PATCH] update stream::fuse Signed-off-by: Yoshua Wuyts --- src/stream/stream/fuse.rs | 38 ++++++++------------------------------ src/stream/stream/mod.rs | 22 +++++++++++----------- 2 files changed, 19 insertions(+), 41 deletions(-) diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 4d1a1b9a..35419370 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -1,51 +1,29 @@ use std::pin::Pin; +use std::task::{Context, Poll}; -use crate::task::{Context, Poll}; - -/// A `Stream` that is permanently closed -/// once a single call to `poll` results in -/// `Poll::Ready(None)`, returning `Poll::Ready(None)` -/// for all future calls to `poll`. +/// A `Stream` that is permanently closed once a single call to `poll` results in +/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. #[derive(Clone, Debug)] pub struct Fuse { - stream: S, - done: bool, + pub(crate) stream: S, + pub(crate) done: bool, } impl Unpin for Fuse {} -impl Fuse { +impl Fuse { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(done: bool); - - /// Returns `true` if the underlying stream is fused. - /// - /// If this `Stream` is fused, all future calls to - /// `poll` will return `Poll::Ready(None)`. - pub fn is_done(&self) -> bool { - self.done - } - - /// Consumes this `Fuse` and returns the inner - /// `Stream`, unfusing it if it had become - /// fused. - pub fn into_inner(self) -> S - where - S: Sized, - { - self.stream - } } - -impl futures::Stream for Fuse { +impl futures_core::Stream for Fuse { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.done { Poll::Ready(None) } else { - let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); if next.is_none() { *self.as_mut().done() = true; } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 5f7b9008..e13e2ebc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -248,29 +248,29 @@ pub trait Stream { Enumerate::new(self) } - /// Transforms this `Stream` into a "fused" `Stream` - /// such that after the first time `poll` returns - /// `Poll::Ready(None)`, all future calls to - /// `poll` will also return `Poll::Ready(None)`. + /// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` + /// returns `Poll::Ready(None)`, all future calls to `poll` will also return + /// `Poll::Ready(None)`. /// /// # Examples /// /// ``` - /// # #![feature(async_await)] /// # fn main() { async_std::task::block_on(async { /// # /// use async_std::prelude::*; /// use async_std::stream; /// - /// let mut s = stream::repeat(9).take(3); - /// - /// while let Some(v) = s.next().await { - /// assert_eq!(v, 9); - /// } + /// let mut s = stream::once(1).fuse(); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// assert_eq!(s.next().await, None); /// # /// # }) } /// ``` - fn fuse(self) -> Fuse { + fn fuse(self) -> Fuse + where + Self: Sized, + { Fuse { stream: self, done: false,