fix: stream::take_while (#485)

When the predicate is false, the stream should be ended.
This commit is contained in:
Friedel Ziegelmayer 2019-11-09 00:00:03 +01:00 committed by Stjepan Glavina
parent d2d63348c7
commit 4a78f731b7
2 changed files with 9 additions and 6 deletions

View file

@ -303,6 +303,7 @@ extension_trait! {
# #
# }) } # }) }
```
"#] "#]
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P, Self::Item> fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P, Self::Item>
where where
@ -397,9 +398,9 @@ extension_trait! {
use async_std::stream; use async_std::stream;
let v = stream::from_iter(vec![&1, &2, &3]); let v = stream::from_iter(vec![&1, &2, &3]);
let mut v_cloned = v.cloned(); let mut v_cloned = v.cloned();
assert_eq!(v_cloned.next().await, Some(1)); assert_eq!(v_cloned.next().await, Some(1));
assert_eq!(v_cloned.next().await, Some(2)); assert_eq!(v_cloned.next().await, Some(2));
assert_eq!(v_cloned.next().await, Some(3)); assert_eq!(v_cloned.next().await, Some(3));

View file

@ -45,10 +45,12 @@ where
let next = futures_core::ready!(this.stream.poll_next(cx)); let next = futures_core::ready!(this.stream.poll_next(cx));
match next { match next {
Some(v) if (this.predicate)(&v) => Poll::Ready(Some(v)), Some(v) => {
Some(_) => { if (this.predicate)(&v) {
cx.waker().wake_by_ref(); Poll::Ready(Some(v))
Poll::Pending } else {
Poll::Ready(None)
}
} }
None => Poll::Ready(None), None => Poll::Ready(None),
} }