From 2a2a473889f8a9905016cedb555c76cbcc0c3911 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Sun, 22 Sep 2019 09:46:26 +0300 Subject: [PATCH] adds stream::chain combinator --- src/stream/stream/chain.rs | 50 ++++++++++++++++++++++++++++++++++++++ src/stream/stream/fuse.rs | 8 +++--- src/stream/stream/mod.rs | 35 ++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 3 deletions(-) create mode 100644 src/stream/stream/chain.rs diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs new file mode 100644 index 0000000..080d9b4 --- /dev/null +++ b/src/stream/stream/chain.rs @@ -0,0 +1,50 @@ +use std::pin::Pin; + +use super::fuse::Fuse; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +/// Chains two streams one after another. +#[derive(Debug)] +pub struct Chain { + first: Fuse, + second: Fuse, +} + +impl Chain { + pin_utils::unsafe_pinned!(first: Fuse); + pin_utils::unsafe_pinned!(second: Fuse); + + pub(super) fn new(first: S, second: U) -> Self { + Chain { + first: first.fuse(), + second: second.fuse(), + } + } +} + +impl> Stream for Chain { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if !self.first.done { + let next = futures_core::ready!(self.as_mut().first().poll_next(cx)); + if let Some(next) = next { + return Poll::Ready(Some(next)); + } + } + + if !self.second.done { + let next = futures_core::ready!(self.as_mut().second().poll_next(cx)); + if let Some(next) = next { + return Poll::Ready(Some(next)); + } + } + + if self.first.done && self.second.done { + return Poll::Ready(None); + } + + Poll::Pending + } +} diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 3541937..ff5bdab 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -1,5 +1,7 @@ use std::pin::Pin; -use std::task::{Context, Poll}; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; /// A `Stream` that is permanently closed once a single call to `poll` results in /// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. @@ -11,12 +13,12 @@ pub struct Fuse { impl Unpin for Fuse {} -impl Fuse { +impl Fuse { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(done: bool); } -impl futures_core::Stream for Fuse { +impl Stream for Fuse { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 65992eb..0979d82 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -23,6 +23,7 @@ mod all; mod any; +mod chain; mod enumerate; mod filter; mod filter_map; @@ -41,6 +42,7 @@ mod step_by; mod take; mod zip; +pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; @@ -268,6 +270,39 @@ pub trait Stream { StepBy::new(self, step) } + /// Takes two streams and creates a new stream over both in sequence. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let first: VecDeque<_> = vec![0u8, 1].into_iter().collect(); + /// let second: VecDeque<_> = vec![2, 3].into_iter().collect(); + /// let mut c = first.chain(second); + /// + /// assert_eq!(c.next().await, Some(0)); + /// assert_eq!(c.next().await, Some(1)); + /// assert_eq!(c.next().await, Some(2)); + /// assert_eq!(c.next().await, Some(3)); + /// assert_eq!(c.next().await, None); + /// + /// # + /// # }) } + /// ``` + fn chain(self, other: U) -> Chain + where + Self: Sized, + U: Stream + Sized, + { + Chain::new(self, other) + } + /// Creates a stream that gives the current element's count as well as the next value. /// /// # Overflow behaviour.