mirror of
https://github.com/async-rs/async-std.git
synced 2025-01-16 10:49:55 +00:00
Add Stream::poll_next
This commit is contained in:
parent
2c02037673
commit
724a9f4eb0
6 changed files with 105 additions and 63 deletions
|
@ -1,43 +1,36 @@
|
|||
use crate::future::Future;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AllFuture<'a, S, F, T>
|
||||
where
|
||||
F: FnMut(T) -> bool,
|
||||
{
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct AllFuture<'a, S, F, T> {
|
||||
pub(crate) stream: &'a mut S,
|
||||
pub(crate) f: F,
|
||||
pub(crate) result: bool,
|
||||
pub(crate) __item: PhantomData<T>,
|
||||
pub(crate) _marker: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<'a, S, F, T> AllFuture<'a, S, F, T>
|
||||
where
|
||||
F: FnMut(T) -> bool,
|
||||
{
|
||||
pin_utils::unsafe_pinned!(stream: &'a mut S);
|
||||
pin_utils::unsafe_unpinned!(result: bool);
|
||||
pin_utils::unsafe_unpinned!(f: F);
|
||||
}
|
||||
impl<S: Unpin, F, T> Unpin for AllFuture<'_, S, F, T> {}
|
||||
|
||||
impl<S, F> Future for AllFuture<'_, S, F, S::Item>
|
||||
where
|
||||
S: futures_core::stream::Stream + Unpin + Sized,
|
||||
S: Stream + Unpin + Sized,
|
||||
F: FnMut(S::Item) -> bool,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
use futures_core::stream::Stream;
|
||||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
||||
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
|
||||
|
||||
match next {
|
||||
Some(v) => {
|
||||
let result = (self.as_mut().f())(v);
|
||||
*self.as_mut().result() = result;
|
||||
let result = (&mut self.f)(v);
|
||||
self.result = result;
|
||||
|
||||
if result {
|
||||
// don't forget to wake this task again to pull the next item from stream
|
||||
cx.waker().wake_by_ref();
|
||||
|
|
|
@ -1,43 +1,36 @@
|
|||
use crate::future::Future;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AnyFuture<'a, S, F, T>
|
||||
where
|
||||
F: FnMut(T) -> bool,
|
||||
{
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct AnyFuture<'a, S, F, T> {
|
||||
pub(crate) stream: &'a mut S,
|
||||
pub(crate) f: F,
|
||||
pub(crate) result: bool,
|
||||
pub(crate) __item: PhantomData<T>,
|
||||
pub(crate) _marker: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<'a, S, F, T> AnyFuture<'a, S, F, T>
|
||||
where
|
||||
F: FnMut(T) -> bool,
|
||||
{
|
||||
pin_utils::unsafe_pinned!(stream: &'a mut S);
|
||||
pin_utils::unsafe_unpinned!(result: bool);
|
||||
pin_utils::unsafe_unpinned!(f: F);
|
||||
}
|
||||
impl<S: Unpin, F, T> Unpin for AnyFuture<'_, S, F, T> {}
|
||||
|
||||
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
|
||||
where
|
||||
S: futures_core::stream::Stream + Unpin + Sized,
|
||||
S: Stream + Unpin + Sized,
|
||||
F: FnMut(S::Item) -> bool,
|
||||
{
|
||||
type Output = bool;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
use futures_core::stream::Stream;
|
||||
let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
|
||||
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx));
|
||||
|
||||
match next {
|
||||
Some(v) => {
|
||||
let result = (self.as_mut().f())(v);
|
||||
*self.as_mut().result() = result;
|
||||
let result = (&mut self.f)(v);
|
||||
self.result = result;
|
||||
|
||||
if result {
|
||||
Poll::Ready(true)
|
||||
} else {
|
||||
|
|
|
@ -6,7 +6,8 @@ use crate::stream::Stream;
|
|||
use crate::task::{Context, Poll};
|
||||
|
||||
/// A future that yields the minimum item in a stream by a given comparison function.
|
||||
#[derive(Clone, Debug)]
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct MinByFuture<S: Stream, F> {
|
||||
stream: S,
|
||||
compare: F,
|
||||
|
@ -27,7 +28,7 @@ impl<S: Stream + Unpin, F> MinByFuture<S, F> {
|
|||
|
||||
impl<S, F> Future for MinByFuture<S, F>
|
||||
where
|
||||
S: futures_core::stream::Stream + Unpin,
|
||||
S: Stream + Unpin,
|
||||
S::Item: Copy,
|
||||
F: FnMut(&S::Item, &S::Item) -> Ordering,
|
||||
{
|
||||
|
|
|
@ -36,9 +36,12 @@ use next::NextFuture;
|
|||
|
||||
use std::cmp::Ordering;
|
||||
use std::marker::PhantomData;
|
||||
use std::pin::Pin;
|
||||
|
||||
use cfg_if::cfg_if;
|
||||
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(feature = "docs")] {
|
||||
#[doc(hidden)]
|
||||
|
@ -73,6 +76,55 @@ pub trait Stream {
|
|||
/// The type of items yielded by this stream.
|
||||
type Item;
|
||||
|
||||
/// Attempts to receive the next item from the stream.
|
||||
///
|
||||
/// There are several possible return values:
|
||||
///
|
||||
/// * `Poll::Pending` means this stream's next value is not ready yet.
|
||||
/// * `Poll::Ready(None)` means this stream has been exhausted.
|
||||
/// * `Poll::Ready(Some(item))` means `item` was received out of the stream.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// # fn main() { async_std::task::block_on(async {
|
||||
/// #
|
||||
/// use std::pin::Pin;
|
||||
///
|
||||
/// use async_std::prelude::*;
|
||||
/// use async_std::stream;
|
||||
/// use async_std::task::{Context, Poll};
|
||||
///
|
||||
/// fn increment(s: impl Stream<Item = i32> + Unpin) -> impl Stream<Item = i32> + Unpin {
|
||||
/// struct Increment<S>(S);
|
||||
///
|
||||
/// impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
|
||||
/// type Item = S::Item;
|
||||
///
|
||||
/// fn poll_next(
|
||||
/// mut self: Pin<&mut Self>,
|
||||
/// cx: &mut Context<'_>,
|
||||
/// ) -> Poll<Option<Self::Item>> {
|
||||
/// match Pin::new(&mut self.0).poll_next(cx) {
|
||||
/// Poll::Pending => Poll::Pending,
|
||||
/// Poll::Ready(None) => Poll::Ready(None),
|
||||
/// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
|
||||
/// }
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// Increment(s)
|
||||
/// }
|
||||
///
|
||||
/// let mut s = increment(stream::once(7));
|
||||
///
|
||||
/// assert_eq!(s.next().await, Some(8));
|
||||
/// assert_eq!(s.next().await, None);
|
||||
/// #
|
||||
/// # }) }
|
||||
/// ```
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
|
||||
|
||||
/// Advances the stream and returns the next value.
|
||||
///
|
||||
/// Returns [`None`] when iteration is finished. Individual stream implementations may
|
||||
|
@ -98,7 +150,10 @@ pub trait Stream {
|
|||
/// ```
|
||||
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
|
||||
where
|
||||
Self: Unpin;
|
||||
Self: Unpin,
|
||||
{
|
||||
NextFuture { stream: self }
|
||||
}
|
||||
|
||||
/// Creates a stream that yields its first `n` elements.
|
||||
///
|
||||
|
@ -207,13 +262,13 @@ pub trait Stream {
|
|||
#[inline]
|
||||
fn all<F>(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item)
|
||||
where
|
||||
Self: Sized,
|
||||
Self: Unpin + Sized,
|
||||
F: FnMut(Self::Item) -> bool,
|
||||
{
|
||||
AllFuture {
|
||||
stream: self,
|
||||
result: true, // the default if the empty stream
|
||||
__item: PhantomData,
|
||||
_marker: PhantomData,
|
||||
f,
|
||||
}
|
||||
}
|
||||
|
@ -264,13 +319,13 @@ pub trait Stream {
|
|||
#[inline]
|
||||
fn any<F>(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item)
|
||||
where
|
||||
Self: Sized,
|
||||
Self: Unpin + Sized,
|
||||
F: FnMut(Self::Item) -> bool,
|
||||
{
|
||||
AnyFuture {
|
||||
stream: self,
|
||||
result: false, // the default if the empty stream
|
||||
__item: PhantomData,
|
||||
_marker: PhantomData,
|
||||
f,
|
||||
}
|
||||
}
|
||||
|
@ -279,10 +334,7 @@ pub trait Stream {
|
|||
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T {
|
||||
type Item = <Self as futures_core::stream::Stream>::Item;
|
||||
|
||||
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
|
||||
where
|
||||
Self: Unpin,
|
||||
{
|
||||
NextFuture { stream: self }
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
futures_core::stream::Stream::poll_next(self, cx)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,16 @@
|
|||
use crate::future::Future;
|
||||
use crate::task::{Context, Poll};
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
#[doc(hidden)]
|
||||
#[allow(missing_debug_implementations)]
|
||||
pub struct NextFuture<'a, T: Unpin + ?Sized> {
|
||||
pub(crate) stream: &'a mut T,
|
||||
}
|
||||
|
||||
impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
|
||||
impl<T: Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
|
||||
type Output = Option<T::Item>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use crate::task::{Context, Poll};
|
||||
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
/// A stream that yields the first `n` items of another stream.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Take<S> {
|
||||
|
@ -11,12 +12,12 @@ pub struct Take<S> {
|
|||
|
||||
impl<S: Unpin> Unpin for Take<S> {}
|
||||
|
||||
impl<S: futures_core::stream::Stream> Take<S> {
|
||||
impl<S: Stream> Take<S> {
|
||||
pin_utils::unsafe_pinned!(stream: S);
|
||||
pin_utils::unsafe_unpinned!(remaining: usize);
|
||||
}
|
||||
|
||||
impl<S: futures_core::stream::Stream> futures_core::stream::Stream for Take<S> {
|
||||
impl<S: Stream> futures_core::stream::Stream for Take<S> {
|
||||
type Item = S::Item;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
|
||||
|
|
Loading…
Reference in a new issue