2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-02-07 05:05:32 +00:00

Randomize Stream::merge to improve the throughput. Implements #490.

This commit is contained in:
razican 2019-11-11 10:44:12 +01:00
parent 417b548692
commit 79bbf4938d
No known key found for this signature in database
GPG key ID: 76E895FB1EDE827C

View file

@ -27,7 +27,10 @@ pin_project! {
impl<L: Stream, R: Stream> Merge<L, R> { impl<L: Stream, R: Stream> Merge<L, R> {
pub(crate) fn new(left: L, right: R) -> Self { pub(crate) fn new(left: L, right: R) -> Self {
Self { left: left.fuse(), right: right.fuse() } Self {
left: left.fuse(),
right: right.fuse(),
}
} }
} }
@ -40,14 +43,19 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project(); let this = self.project();
match this.left.poll_next(cx) { let (first, second) = if (utils::random(1) == 1) {
(this.left, this.right)
} else {
(this.right, this.left)
};
match first.poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)), Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => this.right.poll_next(cx), Poll::Ready(None) => second.poll_next(cx),
Poll::Pending => match this.right.poll_next(cx) { Poll::Pending => match second.poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)), Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Pending, Poll::Ready(None) => Poll::Pending,
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
} },
} }
} }
} }