2
0
Fork 1
mirror of https://github.com/async-rs/async-std.git synced 2025-02-28 15:19:41 +00:00

Re-export Stream from futures

This commit is contained in:
Stjepan Glavina 2019-09-22 16:34:07 +02:00
parent 33ff41df48
commit 73d7fea937
17 changed files with 951 additions and 748 deletions

View file

@ -4,6 +4,7 @@ use std::pin::Pin;
use crate::fs::DirEntry; use crate::fs::DirEntry;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
use crate::stream::Stream;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
/// Returns a stream of entries in a directory. /// Returns a stream of entries in a directory.
@ -80,7 +81,7 @@ impl ReadDir {
} }
} }
impl futures_core::stream::Stream for ReadDir { impl Stream for ReadDir {
type Item = io::Result<DirEntry>; type Item = io::Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View file

@ -4,6 +4,7 @@ use std::str;
use super::read_until_internal; use super::read_until_internal;
use crate::io::{self, BufRead}; use crate::io::{self, BufRead};
use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// A stream of lines in a byte stream. /// A stream of lines in a byte stream.
@ -23,7 +24,7 @@ pub struct Lines<R> {
pub(crate) read: usize, pub(crate) read: usize,
} }
impl<R: BufRead> futures_core::stream::Stream for Lines<R> { impl<R: BufRead> Stream for Lines<R> {
type Item = io::Result<String>; type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View file

@ -12,6 +12,7 @@ use crate::future::{self, Future};
use crate::io; use crate::io;
use crate::net::driver::Watcher; use crate::net::driver::Watcher;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::stream::Stream;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
/// A Unix domain socket server, listening for connections. /// A Unix domain socket server, listening for connections.
@ -185,7 +186,7 @@ impl fmt::Debug for UnixListener {
#[derive(Debug)] #[derive(Debug)]
pub struct Incoming<'a>(&'a UnixListener); pub struct Incoming<'a>(&'a UnixListener);
impl futures_core::stream::Stream for Incoming<'_> { impl Stream for Incoming<'_> {
type Item = io::Result<UnixStream>; type Item = io::Result<UnixStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View file

@ -34,3 +34,5 @@ pub use crate::io::read::ReadExt as _;
pub use crate::io::seek::SeekExt as _; pub use crate::io::seek::SeekExt as _;
#[doc(hidden)] #[doc(hidden)]
pub use crate::io::write::WriteExt as _; pub use crate::io::write::WriteExt as _;
#[doc(hidden)]
pub use crate::stream::stream::StreamExt as _;

View file

@ -1,6 +1,7 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Creates a stream that doesn't yield any items. /// Creates a stream that doesn't yield any items.
@ -35,7 +36,7 @@ pub struct Empty<T> {
_marker: PhantomData<T>, _marker: PhantomData<T>,
} }
impl<T> futures_core::stream::Stream for Empty<T> { impl<T> Stream for Empty<T> {
type Item = T; type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View file

@ -1,4 +1,4 @@
use futures_core::stream::Stream; use crate::stream::Stream;
/// Conversion into a `Stream`. /// Conversion into a `Stream`.
/// ///

View file

@ -26,12 +26,13 @@ use cfg_if::cfg_if;
pub use empty::{empty, Empty}; pub use empty::{empty, Empty};
pub use once::{once, Once}; pub use once::{once, Once};
pub use repeat::{repeat, Repeat}; pub use repeat::{repeat, Repeat};
pub use stream::{Fuse, Scan, Stream, Take, Zip}; pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, Zip};
pub(crate) mod stream;
mod empty; mod empty;
mod once; mod once;
mod repeat; mod repeat;
mod stream;
cfg_if! { cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] { if #[cfg(any(feature = "unstable", feature = "docs"))] {

View file

@ -1,5 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Creates a stream that yields a single item. /// Creates a stream that yields a single item.
@ -33,7 +34,7 @@ pub struct Once<T> {
value: Option<T>, value: Option<T>,
} }
impl<T: Unpin> futures_core::stream::Stream for Once<T> { impl<T: Unpin> Stream for Once<T> {
type Item = T; type Item = T;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {

View file

@ -1,5 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Creates a stream that yields the same item repeatedly. /// Creates a stream that yields the same item repeatedly.
@ -36,7 +37,7 @@ pub struct Repeat<T> {
item: T, item: T,
} }
impl<T: Clone> futures_core::stream::Stream for Repeat<T> { impl<T: Clone> Stream for Repeat<T> {
type Item = T; type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {

View file

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use super::fuse::Fuse; use super::fuse::Fuse;
use crate::stream::Stream; use crate::prelude::*;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
/// Chains two streams one after another. /// Chains two streams one after another.

View file

@ -19,7 +19,7 @@ impl<S> Enumerate<S> {
} }
} }
impl<S> futures_core::stream::Stream for Enumerate<S> impl<S> Stream for Enumerate<S>
where where
S: Stream, S: Stream,
{ {

View file

@ -25,7 +25,7 @@ impl<S, P, T> Filter<S, P, T> {
} }
} }
impl<S, P> futures_core::stream::Stream for Filter<S, P, S::Item> impl<S, P> Stream for Filter<S, P, S::Item>
where where
S: Stream, S: Stream,
P: FnMut(&S::Item) -> bool, P: FnMut(&S::Item) -> bool,

View file

@ -27,7 +27,7 @@ impl<S, F, T, B> FilterMap<S, F, T, B> {
} }
} }
impl<S, F, B> futures_core::stream::Stream for FilterMap<S, F, S::Item, B> impl<S, F, B> Stream for FilterMap<S, F, S::Item, B>
where where
S: Stream, S: Stream,
F: FnMut(S::Item) -> Option<B>, F: FnMut(S::Item) -> Option<B>,

View file

@ -42,17 +42,6 @@ mod step_by;
mod take; mod take;
mod zip; mod zip;
pub use chain::Chain;
pub use filter::Filter;
pub use fuse::Fuse;
pub use inspect::Inspect;
pub use scan::Scan;
pub use skip::Skip;
pub use skip_while::SkipWhile;
pub use step_by::StepBy;
pub use take::Take;
pub use zip::Zip;
use all::AllFuture; use all::AllFuture;
use any::AnyFuture; use any::AnyFuture;
use enumerate::Enumerate; use enumerate::Enumerate;
@ -64,41 +53,24 @@ use min_by::MinByFuture;
use next::NextFuture; use next::NextFuture;
use nth::NthFuture; use nth::NthFuture;
pub use chain::Chain;
pub use filter::Filter;
pub use fuse::Fuse;
pub use inspect::Inspect;
pub use scan::Scan;
pub use skip::Skip;
pub use skip_while::SkipWhile;
pub use step_by::StepBy;
pub use take::Take;
pub use zip::Zip;
use std::cmp::Ordering; use std::cmp::Ordering;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll};
use cfg_if::cfg_if; use cfg_if::cfg_if;
cfg_if! {
if #[cfg(feature = "unstable")] {
use crate::future::Future; use crate::future::Future;
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => (ImplFuture<$a, $o>);
($f:ty, $o:ty) => (ImplFuture<'static, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>);
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => ($f<$a, Self, $t1, $t2, $t3>);
($f:ty, $o:ty) => ($f);
}
}
}
cfg_if! { cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] { if #[cfg(any(feature = "unstable", feature = "docs"))] {
@ -106,13 +78,22 @@ cfg_if! {
} }
} }
cfg_if! {
if #[cfg(feature = "docs")] {
use std::ops::{Deref, DerefMut};
use crate::task::{Context, Poll};
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
/// An asynchronous stream of values. /// An asynchronous stream of values.
/// ///
/// This trait is a re-export of [`futures::stream::Stream`] and is an async version of /// This trait is a re-export of [`futures::stream::Stream`] and is an async version of
/// [`std::iter::Iterator`]. /// [`std::iter::Iterator`].
/// ///
/// The [provided methods] do not really exist in the trait itself, but they become available when /// The [provided methods] do not really exist in the trait itself, but they become
/// the prelude is imported: /// available when the prelude is imported:
/// ///
/// ``` /// ```
/// # #[allow(unused_imports)] /// # #[allow(unused_imports)]
@ -146,7 +127,9 @@ pub trait Stream {
/// use async_std::stream; /// use async_std::stream;
/// use async_std::task::{Context, Poll}; /// use async_std::task::{Context, Poll};
/// ///
/// fn increment(s: impl Stream<Item = i32> + Unpin) -> impl Stream<Item = i32> + Unpin { /// fn increment(
/// s: impl Stream<Item = i32> + Unpin,
/// ) -> impl Stream<Item = i32> + Unpin {
/// struct Increment<S>(S); /// struct Increment<S>(S);
/// ///
/// impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> { /// impl<S: Stream<Item = i32> + Unpin> Stream for Increment<S> {
@ -199,11 +182,11 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>) fn next(&mut self) -> ImplFuture<'_, Option<Self::Item>>
where where
Self: Unpin, Self: Unpin,
{ {
NextFuture { stream: self } unreachable!()
} }
/// Creates a stream that yields its first `n` elements. /// Creates a stream that yields its first `n` elements.
@ -228,10 +211,7 @@ pub trait Stream {
where where
Self: Sized, Self: Sized,
{ {
Take { unreachable!()
stream: self,
remaining: n,
}
} }
/// Creates a stream that yields each `step`th element. /// Creates a stream that yields each `step`th element.
@ -265,7 +245,7 @@ pub trait Stream {
where where
Self: Sized, Self: Sized,
{ {
StepBy::new(self, step) unreachable!()
} }
/// Takes two streams and creates a new stream over both in sequence. /// Takes two streams and creates a new stream over both in sequence.
@ -298,7 +278,7 @@ pub trait Stream {
Self: Sized, Self: Sized,
U: Stream<Item = Self::Item> + Sized, U: Stream<Item = Self::Item> + Sized,
{ {
Chain::new(self, other) unreachable!()
} }
/// Creates a stream that gives the current element's count as well as the next value. /// Creates a stream that gives the current element's count as well as the next value.
@ -330,10 +310,11 @@ pub trait Stream {
where where
Self: Sized, Self: Sized,
{ {
Enumerate::new(self) unreachable!()
} }
/// A combinator that does something with each element in the stream, passing the value on. /// A combinator that does something with each element in the stream, passing the value
/// on.
/// ///
/// # Examples /// # Examples
/// ///
@ -361,11 +342,11 @@ pub trait Stream {
Self: Sized, Self: Sized,
F: FnMut(&Self::Item), F: FnMut(&Self::Item),
{ {
Inspect::new(self, f) unreachable!()
} }
/// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` /// Transforms this `Stream` into a "fused" `Stream` such that after the first time
/// returns `Poll::Ready(None)`, all future calls to `poll` will also return /// `poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return
/// `Poll::Ready(None)`. /// `Poll::Ready(None)`.
/// ///
/// # Examples /// # Examples
@ -387,14 +368,10 @@ pub trait Stream {
where where
Self: Sized, Self: Sized,
{ {
Fuse { unreachable!()
stream: self,
done: false,
}
} }
/// Creates a stream that uses a predicate to determine if an element /// Creates a stream that uses a predicate to determine if an element should be yielded.
/// should be yeilded.
/// ///
/// # Examples /// # Examples
/// ///
@ -420,7 +397,7 @@ pub trait Stream {
Self: Sized, Self: Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(&Self::Item) -> bool,
{ {
Filter::new(self, predicate) unreachable!()
} }
/// Both filters and maps a stream. /// Both filters and maps a stream.
@ -458,7 +435,7 @@ pub trait Stream {
Self: Sized, Self: Sized,
F: FnMut(Self::Item) -> Option<B>, F: FnMut(Self::Item) -> Option<B>,
{ {
FilterMap::new(self, f) unreachable!()
} }
/// Returns the element that gives the minimum value with respect to the /// Returns the element that gives the minimum value with respect to the
@ -486,12 +463,12 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn min_by<F>(self, compare: F) -> ret!(MinByFuture<Self, F, Self::Item>, Self::Item) fn min_by<F>(self, compare: F) -> ImplFuture<'static, Option<Self::Item>>
where where
Self: Sized, Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering, F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{ {
MinByFuture::new(self, compare) unreachable!()
} }
/// Returns the nth element of the stream. /// Returns the nth element of the stream.
@ -545,11 +522,11 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn nth(&mut self, n: usize) -> ret!('_, NthFuture, Option<Self::Item>) fn nth(&mut self, n: usize) -> ImplFuture<'_, Option<Self::Item>>
where where
Self: Sized, Self: Sized,
{ {
NthFuture::new(self, n) unreachable!()
} }
/// Tests if every element of the stream matches a predicate. /// Tests if every element of the stream matches a predicate.
@ -596,17 +573,12 @@ pub trait Stream {
/// # }) } /// # }) }
/// ``` /// ```
#[inline] #[inline]
fn all<F>(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item) fn all<F>(&mut self, f: F) -> ImplFuture<'_, bool>
where where
Self: Unpin + Sized, Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool, F: FnMut(Self::Item) -> bool,
{ {
AllFuture { unreachable!()
stream: self,
result: true, // the default if the empty stream
_marker: PhantomData,
f,
}
} }
/// Searches for an element in a stream that satisfies a predicate. /// Searches for an element in a stream that satisfies a predicate.
@ -645,12 +617,12 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn find<P>(&mut self, p: P) -> ret!('_, FindFuture, Option<Self::Item>, P, Self::Item) fn find<P>(&mut self, p: P) -> ImplFuture<'_, Option<Self::Item>>
where where
Self: Sized, Self: Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(&Self::Item) -> bool,
{ {
FindFuture::new(self, p) unreachable!()
} }
/// Applies function to the elements of stream and returns the first non-none result. /// Applies function to the elements of stream and returns the first non-none result.
@ -668,12 +640,12 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn find_map<F, B>(&mut self, f: F) -> ret!('_, FindMapFuture, Option<B>, F, Self::Item, B) fn find_map<F, B>(&mut self, f: F) -> ImplFuture<'_, Option<B>>
where where
Self: Sized, Self: Sized,
F: FnMut(Self::Item) -> Option<B>, F: FnMut(Self::Item) -> Option<B>,
{ {
FindMapFuture::new(self, f) unreachable!()
} }
/// A combinator that applies a function to every element in a stream /// A combinator that applies a function to every element in a stream
@ -696,12 +668,12 @@ pub trait Stream {
/// # /// #
/// # }) } /// # }) }
/// ``` /// ```
fn fold<B, F>(self, init: B, f: F) -> ret!(FoldFuture<Self, F, Self::Item, B>, Self::Item) fn fold<B, F>(self, init: B, f: F) -> ImplFuture<'static, B>
where where
Self: Sized, Self: Sized,
F: FnMut(B, Self::Item) -> B, F: FnMut(B, Self::Item) -> B,
{ {
FoldFuture::new(self, init, f) unreachable!()
} }
/// Tests if any element of the stream matches a predicate. /// Tests if any element of the stream matches a predicate.
@ -747,30 +719,26 @@ pub trait Stream {
/// # }) } /// # }) }
/// ``` /// ```
#[inline] #[inline]
fn any<F>(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item) fn any<F>(&mut self, f: F) -> ImplFuture<'_, bool>
where where
Self: Unpin + Sized, Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool, F: FnMut(Self::Item) -> bool,
{ {
AnyFuture { unreachable!()
stream: self,
result: false, // the default if the empty stream
_marker: PhantomData,
f,
}
} }
/// A stream adaptor similar to [`fold`] that holds internal state and produces a new stream. /// A stream adaptor similar to [`fold`] that holds internal state and produces a new
/// stream.
/// ///
/// [`fold`]: #method.fold /// [`fold`]: #method.fold
/// ///
/// `scan()` takes two arguments: an initial value which seeds the internal state, and a /// `scan()` takes two arguments: an initial value which seeds the internal state, and
/// closure with two arguments, the first being a mutable reference to the internal state and /// a closure with two arguments, the first being a mutable reference to the internal
/// the second a stream element. The closure can assign to the internal state to share state /// state and the second a stream element. The closure can assign to the internal state
/// between iterations. /// to share state between iterations.
/// ///
/// On iteration, the closure will be applied to each element of the stream and the return /// On iteration, the closure will be applied to each element of the stream and the
/// value from the closure, an `Option`, is yielded by the stream. /// return value from the closure, an `Option`, is yielded by the stream.
/// ///
/// ## Examples /// ## Examples
/// ///
@ -799,7 +767,7 @@ pub trait Stream {
Self: Sized, Self: Sized,
F: FnMut(&mut St, Self::Item) -> Option<B>, F: FnMut(&mut St, Self::Item) -> Option<B>,
{ {
Scan::new(self, initial_state, f) unreachable!()
} }
/// Combinator that `skip`s elements based on a predicate. /// Combinator that `skip`s elements based on a predicate.
@ -808,7 +776,7 @@ pub trait Stream {
/// the stream and ignore elements until it returns `false`. /// the stream and ignore elements until it returns `false`.
/// ///
/// After `false` is returned, `SkipWhile`'s job is over and all further /// After `false` is returned, `SkipWhile`'s job is over and all further
/// elements in the strem are yeilded. /// elements in the strem are yielded.
/// ///
/// ## Examples /// ## Examples
/// ///
@ -832,7 +800,7 @@ pub trait Stream {
Self: Sized, Self: Sized,
P: FnMut(&Self::Item) -> bool, P: FnMut(&Self::Item) -> bool,
{ {
SkipWhile::new(self, predicate) unreachable!()
} }
/// Creates a combinator that skips the first `n` elements. /// Creates a combinator that skips the first `n` elements.
@ -856,20 +824,20 @@ pub trait Stream {
where where
Self: Sized, Self: Sized,
{ {
Skip::new(self, n) unreachable!()
} }
/// 'Zips up' two streams into a single stream of pairs. /// 'Zips up' two streams into a single stream of pairs.
/// ///
/// `zip()` returns a new stream that will iterate over two other streams, returning a tuple /// `zip()` returns a new stream that will iterate over two other streams, returning a
/// where the first element comes from the first stream, and the second element comes from the /// tuple where the first element comes from the first stream, and the second element
/// second stream. /// comes from the second stream.
/// ///
/// In other words, it zips two streams together, into a single one. /// In other words, it zips two streams together, into a single one.
/// ///
/// If either stream returns [`None`], [`poll_next`] from the zipped stream will return /// If either stream returns [`None`], [`poll_next`] from the zipped stream will return
/// [`None`]. If the first stream returns [`None`], `zip` will short-circuit and `poll_next` /// [`None`]. If the first stream returns [`None`], `zip` will short-circuit and
/// will not be called on the second stream. /// `poll_next` will not be called on the second stream.
/// ///
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
/// [`poll_next`]: #tymethod.poll_next /// [`poll_next`]: #tymethod.poll_next
@ -899,7 +867,7 @@ pub trait Stream {
Self: Sized, Self: Sized,
U: Stream, U: Stream,
{ {
Zip::new(self, other) unreachable!()
} }
/// Transforms a stream into a collection. /// Transforms a stream into a collection.
@ -954,19 +922,245 @@ pub trait Stream {
#[cfg(any(feature = "unstable", feature = "docs"))] #[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> ret!(Pin<Box<dyn Future<Output = B> + 'a>>, B) fn collect<'a, B>(self) -> ImplFuture<'a, B>
where where
Self: futures_core::stream::Stream + Sized + 'a, Self: Sized + 'a,
B: FromStream<<Self as futures_core::stream::Stream>::Item>, B: FromStream<Self::Item>,
{
unreachable!()
}
}
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unreachable!()
}
}
impl<S: Stream + Unpin + ?Sized> Stream for &mut S {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unreachable!()
}
}
impl<P> Stream for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: Stream,
{
type Item = <<P as Deref>::Target as Stream>::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unreachable!()
}
}
impl<T: Unpin> Stream for std::collections::VecDeque<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unreachable!()
}
}
impl<S: Stream> Stream for std::panic::AssertUnwindSafe<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
unreachable!()
}
}
} else {
pub use futures_core::stream::Stream;
}
}
#[doc(hidden)]
pub trait StreamExt: futures_core::stream::Stream {
fn next(&mut self) -> NextFuture<'_, Self>
where
Self: Unpin,
{
NextFuture { stream: self }
}
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take {
stream: self,
remaining: n,
}
}
fn step_by(self, step: usize) -> StepBy<Self>
where
Self: Sized,
{
StepBy::new(self, step)
}
fn chain<U>(self, other: U) -> Chain<Self, U>
where
Self: Sized,
U: Stream<Item = Self::Item> + Sized,
{
Chain::new(self, other)
}
fn enumerate(self) -> Enumerate<Self>
where
Self: Sized,
{
Enumerate::new(self)
}
fn inspect<F>(self, f: F) -> Inspect<Self, F, Self::Item>
where
Self: Sized,
F: FnMut(&Self::Item),
{
Inspect::new(self, f)
}
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse {
stream: self,
done: false,
}
}
fn filter<P>(self, predicate: P) -> Filter<Self, P, Self::Item>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
Filter::new(self, predicate)
}
fn filter_map<B, F>(self, f: F) -> FilterMap<Self, F, Self::Item, B>
where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
{
FilterMap::new(self, f)
}
fn min_by<F>(self, compare: F) -> MinByFuture<Self, F, Self::Item>
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{
MinByFuture::new(self, compare)
}
fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
where
Self: Sized,
{
NthFuture::new(self, n)
}
#[inline]
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item>
where
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
{
AllFuture {
stream: self,
result: true, // the default if the empty stream
_marker: PhantomData,
f,
}
}
fn find<P>(&mut self, p: P) -> FindFuture<'_, Self, P, Self::Item>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
FindFuture::new(self, p)
}
fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F, Self::Item, B>
where
Self: Sized,
F: FnMut(Self::Item) -> Option<B>,
{
FindMapFuture::new(self, f)
}
fn fold<B, F>(self, init: B, f: F) -> FoldFuture<Self, F, Self::Item, B>
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
{
FoldFuture::new(self, init, f)
}
fn any<F>(&mut self, f: F) -> AnyFuture<'_, Self, F, Self::Item>
where
Self: Unpin + Sized,
F: FnMut(Self::Item) -> bool,
{
AnyFuture {
stream: self,
result: false, // the default if the empty stream
_marker: PhantomData,
f,
}
}
fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
where
Self: Sized,
F: FnMut(&mut St, Self::Item) -> Option<B>,
{
Scan::new(self, initial_state, f)
}
fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P, Self::Item>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
SkipWhile::new(self, predicate)
}
fn skip(self, n: usize) -> Skip<Self>
where
Self: Sized,
{
Skip::new(self, n)
}
fn zip<U>(self, other: U) -> Zip<Self, U>
where
Self: Stream + Sized,
U: Stream,
{
Zip::new(self, other)
}
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> Pin<Box<dyn Future<Output = B> + 'a>>
where
Self: Sized + 'a,
B: FromStream<Self::Item>,
{ {
FromStream::from_stream(self) FromStream::from_stream(self)
} }
} }
impl<T: futures_core::stream::Stream + ?Sized> Stream for T { impl<T: futures_core::stream::Stream + ?Sized> StreamExt for T {}
type Item = <Self as futures_core::stream::Stream>::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
futures_core::stream::Stream::poll_next(self, cx)
}
}

View file

@ -24,7 +24,7 @@ impl<S, St, F> Scan<S, St, F> {
impl<S: Unpin, St, F> Unpin for Scan<S, St, F> {} impl<S: Unpin, St, F> Unpin for Scan<S, St, F> {}
impl<S, St, F, B> futures_core::stream::Stream for Scan<S, St, F> impl<S, St, F, B> Stream for Scan<S, St, F>
where where
S: Stream, S: Stream,
F: FnMut(&mut St, S::Item) -> Option<B>, F: FnMut(&mut St, S::Item) -> Option<B>,

View file

@ -17,7 +17,7 @@ impl<S: Stream> Take<S> {
pin_utils::unsafe_unpinned!(remaining: usize); pin_utils::unsafe_unpinned!(remaining: usize);
} }
impl<S: Stream> futures_core::stream::Stream for Take<S> { impl<S: Stream> Stream for Take<S> {
type Item = S::Item; type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {

View file

@ -36,7 +36,7 @@ impl<A: Stream, B> Zip<A, B> {
pin_utils::unsafe_pinned!(second: B); pin_utils::unsafe_pinned!(second: B);
} }
impl<A: Stream, B: Stream> futures_core::stream::Stream for Zip<A, B> { impl<A: Stream, B: Stream> Stream for Zip<A, B> {
type Item = (A::Item, B::Item); type Item = (A::Item, B::Item);
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {