From a5a6dc24c4cfd8ab2025890b29349382519acfb8 Mon Sep 17 00:00:00 2001 From: Wonwoo Choi Date: Wed, 18 Sep 2019 12:00:46 +0900 Subject: [PATCH] Add stream::Extend --- src/stream/extend.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 2 ++ src/vec/extend.rs | 19 +++++++++++++++++ src/vec/mod.rs | 1 + 4 files changed, 72 insertions(+) create mode 100644 src/stream/extend.rs create mode 100644 src/vec/extend.rs diff --git a/src/stream/extend.rs b/src/stream/extend.rs new file mode 100644 index 0000000..8e1c963 --- /dev/null +++ b/src/stream/extend.rs @@ -0,0 +1,50 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::{IntoStream, Stream}; + +/// Extend a collection with the contents of a stream. +/// +/// Streams produce a series of values asynchronously, and collections can also be thought of as a +/// series of values. The `Extend` trait bridges this gap, allowing you to extend a collection +/// asynchronously by including the contents of that stream. When extending a collection with an +/// already existing key, that entry is updated or, in the case of collections that permit multiple +/// entries with equal keys, that entry is inserted. +/// +/// ## Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream::{self, Extend}; +/// +/// let mut v: Vec = vec![1, 2]; +/// let s = stream::repeat(3usize).take(3); +/// v.extend_with_stream(s).await; +/// +/// assert_eq!(v, vec![1, 2, 3, 3, 3]); +/// # +/// # }) } +/// ``` +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub trait Extend { + /// Extends a collection with the contents of a stream. + fn extend_with_stream<'a, T: IntoStream + 'a>( + &'a mut self, + stream: T, + ) -> Pin + 'a>>; +} + +impl Extend<()> for () { + fn extend_with_stream<'a, T: IntoStream + 'a>( + &'a mut self, + stream: T, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(_) = stream.next().await {} + }) + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 36cf3a4..6393afb 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -36,10 +36,12 @@ mod stream; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod double_ended_stream; + mod extend; mod from_stream; mod into_stream; pub use double_ended_stream::DoubleEndedStream; + pub use extend::Extend; pub use from_stream::FromStream; pub use into_stream::IntoStream; } diff --git a/src/vec/extend.rs b/src/vec/extend.rs new file mode 100644 index 0000000..33df5a9 --- /dev/null +++ b/src/vec/extend.rs @@ -0,0 +1,19 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::{Extend, IntoStream, Stream}; + +impl Extend for Vec { + fn extend_with_stream<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { + self.push(item); + } + }) + } +} diff --git a/src/vec/mod.rs b/src/vec/mod.rs index 725fc8f..77a0b74 100644 --- a/src/vec/mod.rs +++ b/src/vec/mod.rs @@ -3,6 +3,7 @@ //! This library provides smart pointers and collections for managing //! heap-allocated values. +mod extend; mod from_stream; #[doc(inline)]