From 724a9f4eb03e59b4873954bde45949e6d71a92c6 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 11 Sep 2019 17:06:02 +0200 Subject: [PATCH] Add Stream::poll_next --- src/stream/stream/all.rs | 37 ++++++++----------- src/stream/stream/any.rs | 37 ++++++++----------- src/stream/stream/min_by.rs | 5 +-- src/stream/stream/mod.rs | 72 +++++++++++++++++++++++++++++++------ src/stream/stream/next.rs | 6 ++-- src/stream/stream/take.rs | 9 ++--- 6 files changed, 104 insertions(+), 62 deletions(-) diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index 6271024..3b65fc7 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -1,43 +1,36 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - use std::marker::PhantomData; use std::pin::Pin; -#[derive(Debug)] -pub struct AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct AllFuture<'a, S, F, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, - pub(crate) __item: PhantomData, + pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AllFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} +impl Unpin for AllFuture<'_, S, F, T> {} impl Future for AllFuture<'_, S, F, S::Item> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + match next { Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; + let result = (&mut self.f)(v); + self.result = result; + if result { // don't forget to wake this task again to pull the next item from stream cx.waker().wake_by_ref(); diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index f1f551a..a23adf4 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -1,43 +1,36 @@ -use crate::future::Future; -use crate::task::{Context, Poll}; - use std::marker::PhantomData; use std::pin::Pin; -#[derive(Debug)] -pub struct AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct AnyFuture<'a, S, F, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, pub(crate) result: bool, - pub(crate) __item: PhantomData, + pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AnyFuture<'a, S, F, T> -where - F: FnMut(T) -> bool, -{ - pin_utils::unsafe_pinned!(stream: &'a mut S); - pin_utils::unsafe_unpinned!(result: bool); - pin_utils::unsafe_unpinned!(f: F); -} +impl Unpin for AnyFuture<'_, S, F, T> {} impl Future for AnyFuture<'_, S, F, S::Item> where - S: futures_core::stream::Stream + Unpin + Sized, + S: Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures_core::stream::Stream; - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); + match next { Some(v) => { - let result = (self.as_mut().f())(v); - *self.as_mut().result() = result; + let result = (&mut self.f)(v); + self.result = result; + if result { Poll::Ready(true) } else { diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index b65d88d..3223af9 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -6,7 +6,8 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; /// A future that yields the minimum item in a stream by a given comparison function. -#[derive(Clone, Debug)] +#[doc(hidden)] +#[allow(missing_debug_implementations)] pub struct MinByFuture { stream: S, compare: F, @@ -27,7 +28,7 @@ impl MinByFuture { impl Future for MinByFuture where - S: futures_core::stream::Stream + Unpin, + S: Stream + Unpin, S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 91b111e..17ba524 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -36,9 +36,12 @@ use next::NextFuture; use std::cmp::Ordering; use std::marker::PhantomData; +use std::pin::Pin; use cfg_if::cfg_if; +use crate::task::{Context, Poll}; + cfg_if! { if #[cfg(feature = "docs")] { #[doc(hidden)] @@ -73,6 +76,55 @@ pub trait Stream { /// The type of items yielded by this stream. type Item; + /// Attempts to receive the next item from the stream. + /// + /// There are several possible return values: + /// + /// * `Poll::Pending` means this stream's next value is not ready yet. + /// * `Poll::Ready(None)` means this stream has been exhausted. + /// * `Poll::Ready(Some(item))` means `item` was received out of the stream. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::pin::Pin; + /// + /// use async_std::prelude::*; + /// use async_std::stream; + /// use async_std::task::{Context, Poll}; + /// + /// fn increment(s: impl Stream + Unpin) -> impl Stream + Unpin { + /// struct Increment(S); + /// + /// impl + Unpin> Stream for Increment { + /// type Item = S::Item; + /// + /// fn poll_next( + /// mut self: Pin<&mut Self>, + /// cx: &mut Context<'_>, + /// ) -> Poll> { + /// match Pin::new(&mut self.0).poll_next(cx) { + /// Poll::Pending => Poll::Pending, + /// Poll::Ready(None) => Poll::Ready(None), + /// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + /// } + /// } + /// } + /// + /// Increment(s) + /// } + /// + /// let mut s = increment(stream::once(7)); + /// + /// assert_eq!(s.next().await, Some(8)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + /// Advances the stream and returns the next value. /// /// Returns [`None`] when iteration is finished. Individual stream implementations may @@ -98,7 +150,10 @@ pub trait Stream { /// ``` fn next(&mut self) -> ret!('_, NextFuture, Option) where - Self: Unpin; + Self: Unpin, + { + NextFuture { stream: self } + } /// Creates a stream that yields its first `n` elements. /// @@ -207,13 +262,13 @@ pub trait Stream { #[inline] fn all(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item) where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { AllFuture { stream: self, result: true, // the default if the empty stream - __item: PhantomData, + _marker: PhantomData, f, } } @@ -264,13 +319,13 @@ pub trait Stream { #[inline] fn any(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item) where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, { AnyFuture { stream: self, result: false, // the default if the empty stream - __item: PhantomData, + _marker: PhantomData, f, } } @@ -279,10 +334,7 @@ pub trait Stream { impl Stream for T { type Item = ::Item; - fn next(&mut self) -> ret!('_, NextFuture, Option) - where - Self: Unpin, - { - NextFuture { stream: self } + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + futures_core::stream::Stream::poll_next(self, cx) } } diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs index b64750d..de75f5e 100644 --- a/src/stream/stream/next.rs +++ b/src/stream/stream/next.rs @@ -1,6 +1,8 @@ +use std::pin::Pin; + use crate::future::Future; +use crate::stream::Stream; use crate::task::{Context, Poll}; -use std::pin::Pin; #[doc(hidden)] #[allow(missing_debug_implementations)] @@ -8,7 +10,7 @@ pub struct NextFuture<'a, T: Unpin + ?Sized> { pub(crate) stream: &'a mut T, } -impl Future for NextFuture<'_, T> { +impl Future for NextFuture<'_, T> { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 0499a6a..0dea1d0 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -1,7 +1,8 @@ -use crate::task::{Context, Poll}; - use std::pin::Pin; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + /// A stream that yields the first `n` items of another stream. #[derive(Clone, Debug)] pub struct Take { @@ -11,12 +12,12 @@ pub struct Take { impl Unpin for Take {} -impl Take { +impl Take { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(remaining: usize); } -impl futures_core::stream::Stream for Take { +impl futures_core::stream::Stream for Take { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {