From 303ac90b7c3b22dbc5d04395d485d893ddd58d6c Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 7 Feb 2020 22:09:42 +0300 Subject: [PATCH 01/10] Fixed `flat_map` --- src/stream/stream/flat_map.rs | 17 ++++++--- tests/stream.rs | 70 +++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index 6c828c9..8d5a12f 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -51,14 +51,21 @@ where let mut this = self.project(); loop { if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { - if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { - return Poll::Ready(item); + let next_item = futures_core::ready!(inner.poll_next(cx)); + + if next_item.is_some() { + return Poll::Ready(next_item); + } else { + this.inner_stream.set(None); } } - match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { - None => return Poll::Ready(None), - Some(inner) => this.inner_stream.set(Some(inner.into_stream())), + let inner = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + if inner.is_some() { + this.inner_stream.set(inner.map(IntoStream::into_stream)); + } else { + return Poll::Ready(None); } } } diff --git a/tests/stream.rs b/tests/stream.rs index 42a6191..75c1b10 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -98,3 +98,73 @@ fn merge_works_with_unfused_streams() { }); assert_eq!(xs, vec![92, 92]); } + +#[test] +fn flat_map_doesnt_poll_completed_inner_stream() { + async_std::task::block_on(async { + use async_std::prelude::*; + use async_std::task::*; + use std::convert::identity; + use std::marker::Unpin; + use std::pin::Pin; + + struct S(T); + + impl Stream for S { + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + unsafe { Pin::new_unchecked(&mut self.0) }.poll_next(ctx) + } + } + + struct StrictOnce { + polled: bool, + }; + + impl Stream for StrictOnce { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll> { + if !self.polled { + self.polled = true; + Poll::Ready(None) + } else { + panic!("Polled after completion!"); + } + } + } + + struct Interchanger { + polled: bool, + }; + + impl Stream for Interchanger { + type Item = S + Unpin>>; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + if self.polled { + let waker = ctx.waker().clone(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(10)); + waker.wake_by_ref(); + }); + self.polled = false; + Poll::Pending + } else { + self.polled = true; + Poll::Ready(Some(S(Box::new(StrictOnce { polled: false })))) + } + } + } + + assert_eq!( + Interchanger { polled: false } + .take(2) + .flat_map(identity) + .count() + .await, + 0 + ); + }); +} From c80915e216de0c8820a8d58efda04b98b07a38c3 Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 7 Feb 2020 22:22:38 +0300 Subject: [PATCH 02/10] Dont spawn thread in tests --- tests/stream.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/stream.rs b/tests/stream.rs index 75c1b10..fec1466 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -144,12 +144,8 @@ fn flat_map_doesnt_poll_completed_inner_stream() { fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { if self.polled { - let waker = ctx.waker().clone(); - std::thread::spawn(move || { - std::thread::sleep(std::time::Duration::from_millis(10)); - waker.wake_by_ref(); - }); self.polled = false; + ctx.waker().wake_by_ref(); Poll::Pending } else { self.polled = true; From b68be72763931033dc2917838f7e0e84349e401e Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 7 Feb 2020 22:42:59 +0300 Subject: [PATCH 03/10] Use `assert` instead of `panic` --- tests/stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/stream.rs b/tests/stream.rs index fec1466..58a441c 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -130,7 +130,7 @@ fn flat_map_doesnt_poll_completed_inner_stream() { self.polled = true; Poll::Ready(None) } else { - panic!("Polled after completion!"); + assert!(false, "Polled after completion!"); } } } From 85c32ef9d2fc661d1f951a5297a65298bd1842ce Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 7 Feb 2020 22:45:15 +0300 Subject: [PATCH 04/10] Use `assert` without `if`-clause --- tests/stream.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/stream.rs b/tests/stream.rs index 58a441c..e80fc99 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -126,12 +126,9 @@ fn flat_map_doesnt_poll_completed_inner_stream() { type Item = (); fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll> { - if !self.polled { - self.polled = true; - Poll::Ready(None) - } else { - assert!(false, "Polled after completion!"); - } + assert!(!self.polled, "Polled after completion!"); + self.polled = true; + Poll::Ready(None) } } From 32068942a6130d12a7152706ae6e24637377339d Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Sat, 8 Feb 2020 15:41:33 +0300 Subject: [PATCH 05/10] Fixed `flatten` --- src/stream/stream/flat_map.rs | 2 +- src/stream/stream/flatten.rs | 21 +++++--- tests/stream.rs | 96 +++++++++++++++++++---------------- 3 files changed, 68 insertions(+), 51 deletions(-) diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index 8d5a12f..f9ceb86 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -69,4 +69,4 @@ where } } } -} +} \ No newline at end of file diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 1d6fcae..d0e0d20 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,5 +1,5 @@ -use std::fmt; -use std::pin::Pin; +use core::fmt; +use core::pin::Pin; use pin_project_lite::pin_project; @@ -52,14 +52,21 @@ where let mut this = self.project(); loop { if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { - if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { - return Poll::Ready(item); + let next_item = futures_core::ready!(inner.poll_next(cx)); + + if next_item.is_some() { + return Poll::Ready(next_item); + } else { + this.inner_stream.set(None); } } - match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { - None => return Poll::Ready(None), - Some(inner) => this.inner_stream.set(Some(inner.into_stream())), + let inner = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + if inner.is_some() { + this.inner_stream.set(inner.map(IntoStream::into_stream)); + } else { + return Poll::Ready(None); } } } diff --git a/tests/stream.rs b/tests/stream.rs index e80fc99..210ceae 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -1,3 +1,5 @@ +use std::convert::identity; +use std::marker::Unpin; use std::pin::Pin; use std::task::{Context, Poll}; @@ -99,58 +101,52 @@ fn merge_works_with_unfused_streams() { assert_eq!(xs, vec![92, 92]); } -#[test] -fn flat_map_doesnt_poll_completed_inner_stream() { - async_std::task::block_on(async { - use async_std::prelude::*; - use async_std::task::*; - use std::convert::identity; - use std::marker::Unpin; - use std::pin::Pin; +struct S(T); - struct S(T); +impl Stream for S { + type Item = T::Item; - impl Stream for S { - type Item = T::Item; + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + unsafe { Pin::new_unchecked(&mut self.0) }.poll_next(ctx) + } +} - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - unsafe { Pin::new_unchecked(&mut self.0) }.poll_next(ctx) - } - } +struct StrictOnce { + polled: bool, +} - struct StrictOnce { - polled: bool, - }; +impl Stream for StrictOnce { + type Item = (); - impl Stream for StrictOnce { - type Item = (); + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll> { + assert!(!self.polled, "Polled after completion!"); + self.polled = true; + Poll::Ready(None) + } +} - fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll> { - assert!(!self.polled, "Polled after completion!"); - self.polled = true; - Poll::Ready(None) - } - } +struct Interchanger { + polled: bool, +} - struct Interchanger { - polled: bool, - }; - - impl Stream for Interchanger { - type Item = S + Unpin>>; - - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { - if self.polled { - self.polled = false; - ctx.waker().wake_by_ref(); - Poll::Pending - } else { - self.polled = true; - Poll::Ready(Some(S(Box::new(StrictOnce { polled: false })))) - } - } +impl Stream for Interchanger { + type Item = S + Unpin>>; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll> { + if self.polled { + self.polled = false; + ctx.waker().wake_by_ref(); + Poll::Pending + } else { + self.polled = true; + Poll::Ready(Some(S(Box::new(StrictOnce { polled: false })))) } + } +} +#[test] +fn flat_map_doesnt_poll_completed_inner_stream() { + task::block_on(async { assert_eq!( Interchanger { polled: false } .take(2) @@ -161,3 +157,17 @@ fn flat_map_doesnt_poll_completed_inner_stream() { ); }); } + +#[test] +fn flatten_doesnt_poll_completed_inner_stream() { + task::block_on(async { + assert_eq!( + Interchanger { polled: false } + .take(2) + .flatten() + .count() + .await, + 0 + ); + }); +} From d7cab38b674109dae5804ee397a0cedd52bb467f Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Sat, 8 Feb 2020 15:49:01 +0300 Subject: [PATCH 06/10] `core` => `std` --- src/stream/stream/flatten.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index d0e0d20..13975f7 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,5 +1,5 @@ -use core::fmt; -use core::pin::Pin; +use std::fmt; +use std::pin::Pin; use pin_project_lite::pin_project; From 68063adddfc73ad9bd329610d4bd8cf258d11857 Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Sat, 8 Feb 2020 16:22:02 +0300 Subject: [PATCH 07/10] Add link to tests --- tests/stream.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/stream.rs b/tests/stream.rs index 210ceae..fdfa23c 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -144,6 +144,7 @@ impl Stream for Interchanger { } } +// https://github.com/async-rs/async-std/pull/701 #[test] fn flat_map_doesnt_poll_completed_inner_stream() { task::block_on(async { @@ -158,6 +159,7 @@ fn flat_map_doesnt_poll_completed_inner_stream() { }); } +// https://github.com/async-rs/async-std/pull/701 #[test] fn flatten_doesnt_poll_completed_inner_stream() { task::block_on(async { From 2323ac9a8eec2309073abc8861636351fbd7c28b Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 12 Jun 2020 18:03:07 +0300 Subject: [PATCH 08/10] Apply suggestions from code review Co-authored-by: nasa --- src/stream/stream/flat_map.rs | 20 +++++++------------- src/stream/stream/flatten.rs | 18 ++++++------------ 2 files changed, 13 insertions(+), 25 deletions(-) diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index f9ceb86..97f5737 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -51,22 +51,16 @@ where let mut this = self.project(); loop { if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { - let next_item = futures_core::ready!(inner.poll_next(cx)); - - if next_item.is_some() { - return Poll::Ready(next_item); - } else { - this.inner_stream.set(None); + match futures_core::ready!(inner.poll_next(cx)) { + item @ Some(_) => return Poll::Ready(item), + None => this.inner_stream.set(None), } } - let inner = futures_core::ready!(this.stream.as_mut().poll_next(cx)); - - if inner.is_some() { - this.inner_stream.set(inner.map(IntoStream::into_stream)); - } else { - return Poll::Ready(None); + match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { + inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)), + None => return Poll::Ready(None), } } } -} \ No newline at end of file +} diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 13975f7..5f8d000 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -52,21 +52,15 @@ where let mut this = self.project(); loop { if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { - let next_item = futures_core::ready!(inner.poll_next(cx)); - - if next_item.is_some() { - return Poll::Ready(next_item); - } else { - this.inner_stream.set(None); + match futures_core::ready!(inner.poll_next(cx)) { + item @ Some(_) => return Poll::Ready(next_item), + None => this.inner_stream.set(None), } } - let inner = futures_core::ready!(this.stream.as_mut().poll_next(cx)); - - if inner.is_some() { - this.inner_stream.set(inner.map(IntoStream::into_stream)); - } else { - return Poll::Ready(None); + match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { + inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)), + None => Poll::Ready(None), } } } From df22d87d098397149e33a61b3cc676de0ad97c0b Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Fri, 12 Jun 2020 18:18:40 +0300 Subject: [PATCH 09/10] Removed unnecessary links + hotfix --- src/stream/stream/flatten.rs | 2 +- tests/stream.rs | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 7a767e1..3523603 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -53,7 +53,7 @@ where loop { if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { match futures_core::ready!(inner.poll_next(cx)) { - item @ Some(_) => return Poll::Ready(next_item), + item @ Some(_) => return Poll::Ready(item), None => this.inner_stream.set(None), } } diff --git a/tests/stream.rs b/tests/stream.rs index 47ff228..3a19233 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -154,7 +154,6 @@ impl Stream for Interchanger { } } -// https://github.com/async-rs/async-std/pull/701 #[test] fn flat_map_doesnt_poll_completed_inner_stream() { task::block_on(async { @@ -169,7 +168,6 @@ fn flat_map_doesnt_poll_completed_inner_stream() { }); } -// https://github.com/async-rs/async-std/pull/701 #[test] fn flatten_doesnt_poll_completed_inner_stream() { task::block_on(async { From 42425f6c1a8a944a4af6e3b1ec8d50c2251c0f46 Mon Sep 17 00:00:00 2001 From: Oleg Nosov Date: Sun, 14 Jun 2020 18:42:18 +0300 Subject: [PATCH 10/10] Another hotfix --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 3523603..e7a498d 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -60,7 +60,7 @@ where match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { inner @ Some(_) => this.inner_stream.set(inner.map(IntoStream::into_stream)), - None => Poll::Ready(None), + None => return Poll::Ready(None), } } }