From a5a6dc24c4cfd8ab2025890b29349382519acfb8 Mon Sep 17 00:00:00 2001 From: Wonwoo Choi Date: Wed, 18 Sep 2019 12:00:46 +0900 Subject: [PATCH 1/2] Add stream::Extend --- src/stream/extend.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 2 ++ src/vec/extend.rs | 19 +++++++++++++++++ src/vec/mod.rs | 1 + 4 files changed, 72 insertions(+) create mode 100644 src/stream/extend.rs create mode 100644 src/vec/extend.rs diff --git a/src/stream/extend.rs b/src/stream/extend.rs new file mode 100644 index 00000000..8e1c963a --- /dev/null +++ b/src/stream/extend.rs @@ -0,0 +1,50 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::{IntoStream, Stream}; + +/// Extend a collection with the contents of a stream. +/// +/// Streams produce a series of values asynchronously, and collections can also be thought of as a +/// series of values. The `Extend` trait bridges this gap, allowing you to extend a collection +/// asynchronously by including the contents of that stream. When extending a collection with an +/// already existing key, that entry is updated or, in the case of collections that permit multiple +/// entries with equal keys, that entry is inserted. +/// +/// ## Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::prelude::*; +/// use async_std::stream::{self, Extend}; +/// +/// let mut v: Vec = vec![1, 2]; +/// let s = stream::repeat(3usize).take(3); +/// v.extend_with_stream(s).await; +/// +/// assert_eq!(v, vec![1, 2, 3, 3, 3]); +/// # +/// # }) } +/// ``` +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub trait Extend { + /// Extends a collection with the contents of a stream. + fn extend_with_stream<'a, T: IntoStream + 'a>( + &'a mut self, + stream: T, + ) -> Pin + 'a>>; +} + +impl Extend<()> for () { + fn extend_with_stream<'a, T: IntoStream + 'a>( + &'a mut self, + stream: T, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(_) = stream.next().await {} + }) + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 36cf3a4e..6393afbe 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -36,10 +36,12 @@ mod stream; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod double_ended_stream; + mod extend; mod from_stream; mod into_stream; pub use double_ended_stream::DoubleEndedStream; + pub use extend::Extend; pub use from_stream::FromStream; pub use into_stream::IntoStream; } diff --git a/src/vec/extend.rs b/src/vec/extend.rs new file mode 100644 index 00000000..33df5a90 --- /dev/null +++ b/src/vec/extend.rs @@ -0,0 +1,19 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::{Extend, IntoStream, Stream}; + +impl Extend for Vec { + fn extend_with_stream<'a, S: IntoStream + 'a>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> { + let stream = stream.into_stream(); + Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { + self.push(item); + } + }) + } +} diff --git a/src/vec/mod.rs b/src/vec/mod.rs index 725fc8f7..77a0b746 100644 --- a/src/vec/mod.rs +++ b/src/vec/mod.rs @@ -3,6 +3,7 @@ //! This library provides smart pointers and collections for managing //! heap-allocated values. +mod extend; mod from_stream; #[doc(inline)] From 9c00d0b90391da247f138767a2d625f0c5bf99c4 Mon Sep 17 00:00:00 2001 From: Wonwoo Choi Date: Thu, 19 Sep 2019 18:34:31 +0900 Subject: [PATCH 2/2] Rename: extend_with_stream => stream_extend --- src/stream/extend.rs | 6 +++--- src/vec/extend.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/stream/extend.rs b/src/stream/extend.rs index 8e1c963a..35c90644 100644 --- a/src/stream/extend.rs +++ b/src/stream/extend.rs @@ -21,7 +21,7 @@ use crate::stream::{IntoStream, Stream}; /// /// let mut v: Vec = vec![1, 2]; /// let s = stream::repeat(3usize).take(3); -/// v.extend_with_stream(s).await; +/// v.stream_extend(s).await; /// /// assert_eq!(v, vec![1, 2, 3, 3, 3]); /// # @@ -30,14 +30,14 @@ use crate::stream::{IntoStream, Stream}; #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait Extend { /// Extends a collection with the contents of a stream. - fn extend_with_stream<'a, T: IntoStream + 'a>( + fn stream_extend<'a, T: IntoStream + 'a>( &'a mut self, stream: T, ) -> Pin + 'a>>; } impl Extend<()> for () { - fn extend_with_stream<'a, T: IntoStream + 'a>( + fn stream_extend<'a, T: IntoStream + 'a>( &'a mut self, stream: T, ) -> Pin + 'a>> { diff --git a/src/vec/extend.rs b/src/vec/extend.rs index 33df5a90..2ebcf344 100644 --- a/src/vec/extend.rs +++ b/src/vec/extend.rs @@ -4,7 +4,7 @@ use crate::future::Future; use crate::stream::{Extend, IntoStream, Stream}; impl Extend for Vec { - fn extend_with_stream<'a, S: IntoStream + 'a>( + fn stream_extend<'a, S: IntoStream + 'a>( &'a mut self, stream: S, ) -> Pin + 'a>> {