From 79bbf4938deeef223f405e1756945fea45965986 Mon Sep 17 00:00:00 2001 From: razican Date: Mon, 11 Nov 2019 10:44:12 +0100 Subject: [PATCH 1/6] Randomize Stream::merge to improve the throughput. Implements #490. --- src/stream/stream/merge.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index fe3579e..ababcf4 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -27,7 +27,10 @@ pin_project! { impl Merge { pub(crate) fn new(left: L, right: R) -> Self { - Self { left: left.fuse(), right: right.fuse() } + Self { + left: left.fuse(), + right: right.fuse(), + } } } @@ -40,14 +43,19 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - match this.left.poll_next(cx) { + let (first, second) = if (utils::random(1) == 1) { + (this.left, this.right) + } else { + (this.right, this.left) + }; + match first.poll_next(cx) { Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => this.right.poll_next(cx), - Poll::Pending => match this.right.poll_next(cx) { + Poll::Ready(None) => second.poll_next(cx), + Poll::Pending => match second.poll_next(cx) { Poll::Ready(Some(item)) => Poll::Ready(Some(item)), Poll::Ready(None) => Poll::Pending, Poll::Pending => Poll::Pending, - } + }, } } } From 0c37d4af106487f575d5fbd6f9a764ed25418c5f Mon Sep 17 00:00:00 2001 From: razican Date: Mon, 11 Nov 2019 11:25:50 +0100 Subject: [PATCH 2/6] Anonymous function to avoid type issues --- src/stream/stream/merge.rs | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index ababcf4..6c8c20b 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -5,6 +5,7 @@ use pin_project_lite::pin_project; use crate::prelude::*; use crate::stream::Fuse; +use crate::utils; pin_project! { /// A stream that merges two other streams into a single stream. @@ -43,19 +44,27 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - let (first, second) = if (utils::random(1) == 1) { - (this.left, this.right) + if utils::random(1) == 1 { + poll_next_in_order(cx, this.left, this.right) } else { - (this.right, this.left) - }; - match first.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => second.poll_next(cx), - Poll::Pending => match second.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Pending, - Poll::Pending => Poll::Pending, - }, + poll_next_in_order(cx, this.right, this.left) } } } + +/// Pools the next item, trying in order, first the first item, then the second one. +fn poll_next_in_order(cx: &mut Context<'_>, first: F, second: S) -> Poll> +where + F: Stream, + S: Stream, +{ + match first.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => second.poll_next(cx), + Poll::Pending => match second.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Pending, + Poll::Pending => Poll::Pending, + }, + } +} From e48e4637361c37cc18d447c5ec3fc2eb135c6fd5 Mon Sep 17 00:00:00 2001 From: razican Date: Mon, 11 Nov 2019 11:26:32 +0100 Subject: [PATCH 3/6] Duplicating code due to strange Rust error. --- src/stream/stream/merge.rs | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index 6c8c20b..b08b586 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -45,26 +45,25 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); if utils::random(1) == 1 { - poll_next_in_order(cx, this.left, this.right) + match this.left.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => this.right.poll_next(cx), + Poll::Pending => match this.right.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Pending, + Poll::Pending => Poll::Pending, + }, + } } else { - poll_next_in_order(cx, this.right, this.left) + match this.right.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => this.left.poll_next(cx), + Poll::Pending => match this.left.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Pending, + Poll::Pending => Poll::Pending, + }, + } } } } - -/// Pools the next item, trying in order, first the first item, then the second one. -fn poll_next_in_order(cx: &mut Context<'_>, first: F, second: S) -> Poll> -where - F: Stream, - S: Stream, -{ - match first.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => second.poll_next(cx), - Poll::Pending => match second.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Pending, - Poll::Pending => Poll::Pending, - }, - } -} From 5d558ca213327f465feb60ac96ad9ae8421002e2 Mon Sep 17 00:00:00 2001 From: razican Date: Mon, 11 Nov 2019 11:39:30 +0100 Subject: [PATCH 4/6] Fixed test, order is no longer guaranteed --- src/stream/stream/mod.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ea1459..7c4bceb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1670,11 +1670,14 @@ extension_trait! { let c = stream::once(3u8); let mut s = a.merge(b).merge(c); + let mut lst = Vec::new(); - assert_eq!(s.next().await, Some(1u8)); - assert_eq!(s.next().await, Some(2u8)); - assert_eq!(s.next().await, Some(3u8)); - assert_eq!(s.next().await, None); + while let Some(n) = s.next().await { + lst.push(n) + } + + lst.sort_unstable(); + assert_eq!(&lst, &[1u8, 2u8, 3u8]); # }); ``` "#] From f6829859fef56585498938f99ca2cec731abd1c1 Mon Sep 17 00:00:00 2001 From: Razican Date: Mon, 18 Nov 2019 16:39:21 +0100 Subject: [PATCH 5/6] Fixed deduplication of code --- src/stream/stream/merge.rs | 40 +++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index b08b586..bdf7a29 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -45,25 +45,29 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); if utils::random(1) == 1 { - match this.left.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => this.right.poll_next(cx), - Poll::Pending => match this.right.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Pending, - Poll::Pending => Poll::Pending, - }, - } + poll_next_in_order(this.left, this.right, cx) } else { - match this.right.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => this.left.poll_next(cx), - Poll::Pending => match this.left.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Pending, - Poll::Pending => Poll::Pending, - }, - } + poll_next_in_order(this.right, this.left, cx) } } } + +fn poll_next_in_order( + first: Pin<&mut F>, + second: Pin<&mut S>, + cx: &mut Context<'_>, +) -> Poll> +where + F: Stream, + S: Stream, +{ + match first.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => second.poll_next(cx), + Poll::Pending => match second.poll_next(cx) { + Poll::Ready(Some(item)) => Poll::Ready(Some(item)), + Poll::Ready(None) => Poll::Pending, + Poll::Pending => Poll::Pending, + }, + } +} From 72ca2c1a24ea535752401fbfb9628f30ec209efe Mon Sep 17 00:00:00 2001 From: razican Date: Tue, 19 Nov 2019 21:14:56 +0100 Subject: [PATCH 6/6] Improved the code with some minor changes --- src/stream/stream/merge.rs | 7 +++---- src/stream/stream/mod.rs | 10 +++------- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index bdf7a29..d9d2b0a 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -62,12 +62,11 @@ where S: Stream, { match first.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), Poll::Ready(None) => second.poll_next(cx), + Poll::Ready(item) => Poll::Ready(item), Poll::Pending => match second.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) => Poll::Pending, - Poll::Pending => Poll::Pending, + Poll::Ready(None) | Poll::Pending => Poll::Pending, + Poll::Ready(item) => Poll::Ready(item), }, } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 7c4bceb..223aea5 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1663,18 +1663,14 @@ extension_trait! { ``` # async_std::task::block_on(async { use async_std::prelude::*; - use async_std::stream; + use async_std::stream::{self, FromStream}; let a = stream::once(1u8); let b = stream::once(2u8); let c = stream::once(3u8); - let mut s = a.merge(b).merge(c); - let mut lst = Vec::new(); - - while let Some(n) = s.next().await { - lst.push(n) - } + let s = a.merge(b).merge(c); + let mut lst = Vec::from_stream(s).await; lst.sort_unstable(); assert_eq!(&lst, &[1u8, 2u8, 3u8]);