From 4942dc7f9fd7862f2dc3363f254c8f5065ae18d9 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Sun, 3 Nov 2019 19:19:52 +0800 Subject: [PATCH 1/5] Add Stream cloned --- src/stream/stream/cloned.rs | 33 +++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 37 ++++++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 src/stream/stream/cloned.rs diff --git a/src/stream/stream/cloned.rs b/src/stream/stream/cloned.rs new file mode 100644 index 00000000..779e49d1 --- /dev/null +++ b/src/stream/stream/cloned.rs @@ -0,0 +1,33 @@ +use crate::stream::Stream; +use crate::task::{Context, Poll}; +use pin_project_lite::pin_project; +use std::pin::Pin; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct Cloned { + #[pin] + stream: S, + } +} + +impl Cloned { + pub(super) fn new(stream: S) -> Self { + Self { stream } + } +} + +impl<'a, S, T: 'a> Stream for Cloned +where + S: Stream, + T: Clone, +{ + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + Poll::Ready(next.cloned()) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e182c033..9595f80f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod chain; +mod cloned; mod cmp; mod copied; mod enumerate; @@ -91,6 +92,7 @@ use try_fold::TryFoldFuture; use try_for_each::TryForEachFuture; pub use chain::Chain; +pub use cloned::Cloned; pub use copied::Copied; pub use filter::Filter; pub use fuse::Fuse; @@ -373,6 +375,40 @@ extension_trait! { Chain::new(self, other) } + #[doc = r#" + Creates an stream which copies all of its elements. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let v: VecDeque<_> = vec![&1, &2, &3].into_iter().collect(); + + let mut v_cloned = v.cloned(); + + assert_eq!(v_cloned.next().await, Some(1)); + assert_eq!(v_cloned.next().await, Some(2)); + assert_eq!(v_cloned.next().await, Some(3)); + assert_eq!(v_cloned.next().await, None); + + # + # }) } + ``` + "#] + fn cloned<'a,T>(self) -> Cloned + where + Self: Sized + Stream, + T : 'a + Clone, + { + Cloned::new(self) + } + #[doc = r#" Creates an stream which copies all of its elements. @@ -395,7 +431,6 @@ extension_trait! { assert_eq!(v_copied.next().await, Some(2)); assert_eq!(v_copied.next().await, Some(3)); assert_eq!(v_copied.next().await, None); - # # }) } From bf0cd5987aeb536a71ca09cdfbde7e01ffbbac6d Mon Sep 17 00:00:00 2001 From: yjh Date: Mon, 4 Nov 2019 11:49:43 +0800 Subject: [PATCH 2/5] Update src/stream/stream/cloned.rs Co-Authored-By: nasa --- src/stream/stream/cloned.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/stream/cloned.rs b/src/stream/stream/cloned.rs index 779e49d1..1d7eef47 100644 --- a/src/stream/stream/cloned.rs +++ b/src/stream/stream/cloned.rs @@ -4,7 +4,6 @@ use pin_project_lite::pin_project; use std::pin::Pin; pin_project! { - #[doc(hidden)] #[allow(missing_debug_implementations)] pub struct Cloned { #[pin] From 8bef812e78ab836491e904f2500f0c950e93ac2e Mon Sep 17 00:00:00 2001 From: yjh Date: Mon, 4 Nov 2019 11:49:50 +0800 Subject: [PATCH 3/5] Update src/stream/stream/cloned.rs Co-Authored-By: nasa --- src/stream/stream/cloned.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/cloned.rs b/src/stream/stream/cloned.rs index 1d7eef47..19dfbc87 100644 --- a/src/stream/stream/cloned.rs +++ b/src/stream/stream/cloned.rs @@ -4,7 +4,7 @@ use pin_project_lite::pin_project; use std::pin::Pin; pin_project! { - #[allow(missing_debug_implementations)] + #[derive(Debug)] pub struct Cloned { #[pin] stream: S, From a35602f375851fa82cdd0ffd73feea1ff06d4287 Mon Sep 17 00:00:00 2001 From: yjh Date: Tue, 5 Nov 2019 21:08:56 +0800 Subject: [PATCH 4/5] Update mod.rs --- src/stream/stream/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 01357b28..7112c381 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -387,10 +387,10 @@ extension_trait! { use async_std::prelude::*; use std::collections::VecDeque; - let v: VecDeque<_> = vec![&1, &2, &3].into_iter().collect(); - + let v = stream::from_iter(vec![&1, &2, &3]); + let mut v_cloned = v.cloned(); - + assert_eq!(v_cloned.next().await, Some(1)); assert_eq!(v_cloned.next().await, Some(2)); assert_eq!(v_cloned.next().await, Some(3)); From 5179f30d2d8adc39259e4dfee810d14c0c02a698 Mon Sep 17 00:00:00 2001 From: yjh Date: Tue, 5 Nov 2019 21:15:33 +0800 Subject: [PATCH 5/5] use async_std::stream --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 7112c381..12b258d6 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -385,7 +385,7 @@ extension_trait! { # fn main() { async_std::task::block_on(async { # use async_std::prelude::*; - use std::collections::VecDeque; + use async_std::stream; let v = stream::from_iter(vec![&1, &2, &3]);