From 4a78f731b7510af1a5f7bfedfb78852cb40a0248 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Sat, 9 Nov 2019 00:00:03 +0100 Subject: [PATCH] fix: stream::take_while (#485) When the predicate is false, the stream should be ended. --- src/stream/stream/mod.rs | 5 +++-- src/stream/stream/take_while.rs | 10 ++++++---- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index f9a9e3a4..d0d69350 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -303,6 +303,7 @@ extension_trait! { # # }) } + ``` "#] fn take_while

(self, predicate: P) -> TakeWhile where @@ -397,9 +398,9 @@ extension_trait! { use async_std::stream; let v = stream::from_iter(vec![&1, &2, &3]); - + let mut v_cloned = v.cloned(); - + assert_eq!(v_cloned.next().await, Some(1)); assert_eq!(v_cloned.next().await, Some(2)); assert_eq!(v_cloned.next().await, Some(3)); diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 35978b47..9b2945fd 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -45,10 +45,12 @@ where let next = futures_core::ready!(this.stream.poll_next(cx)); match next { - Some(v) if (this.predicate)(&v) => Poll::Ready(Some(v)), - Some(_) => { - cx.waker().wake_by_ref(); - Poll::Pending + Some(v) => { + if (this.predicate)(&v) { + Poll::Ready(Some(v)) + } else { + Poll::Ready(None) + } } None => Poll::Ready(None), }