diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index ea0caec..c6f80fe 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use crate::fs::DirEntry; use crate::future::Future; use crate::io; +use crate::stream::Stream; use crate::task::{blocking, Context, Poll}; /// Returns a stream of entries in a directory. @@ -80,7 +81,7 @@ impl ReadDir { } } -impl futures_core::stream::Stream for ReadDir { +impl Stream for ReadDir { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/io/buf_read/lines.rs b/src/io/buf_read/lines.rs index a761eb4..6cb4a07 100644 --- a/src/io/buf_read/lines.rs +++ b/src/io/buf_read/lines.rs @@ -4,6 +4,7 @@ use std::str; use super::read_until_internal; use crate::io::{self, BufRead}; +use crate::stream::Stream; use crate::task::{Context, Poll}; /// A stream of lines in a byte stream. @@ -23,7 +24,7 @@ pub struct Lines { pub(crate) read: usize, } -impl futures_core::stream::Stream for Lines { +impl Stream for Lines { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 78142a4..ed4f1f4 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -12,6 +12,7 @@ use crate::future::{self, Future}; use crate::io; use crate::net::driver::Watcher; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use crate::stream::Stream; use crate::task::{blocking, Context, Poll}; /// A Unix domain socket server, listening for connections. @@ -185,7 +186,7 @@ impl fmt::Debug for UnixListener { #[derive(Debug)] pub struct Incoming<'a>(&'a UnixListener); -impl futures_core::stream::Stream for Incoming<'_> { +impl Stream for Incoming<'_> { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/prelude.rs b/src/prelude.rs index 6efd747..4c26a28 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -34,3 +34,5 @@ pub use crate::io::read::ReadExt as _; pub use crate::io::seek::SeekExt as _; #[doc(hidden)] pub use crate::io::write::WriteExt as _; +#[doc(hidden)] +pub use crate::stream::stream::StreamExt as _; diff --git a/src/stream/empty.rs b/src/stream/empty.rs index f4cf552..c9deea8 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -1,6 +1,7 @@ use std::marker::PhantomData; use std::pin::Pin; +use crate::stream::Stream; use crate::task::{Context, Poll}; /// Creates a stream that doesn't yield any items. @@ -35,7 +36,7 @@ pub struct Empty { _marker: PhantomData, } -impl futures_core::stream::Stream for Empty { +impl Stream for Empty { type Item = T; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/src/stream/into_stream.rs b/src/stream/into_stream.rs index 7d1be4a..9233181 100644 --- a/src/stream/into_stream.rs +++ b/src/stream/into_stream.rs @@ -1,4 +1,4 @@ -use futures_core::stream::Stream; +use crate::stream::Stream; /// Conversion into a `Stream`. /// diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 5e17aa5..c5da997 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -26,12 +26,13 @@ use cfg_if::cfg_if; pub use empty::{empty, Empty}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Fuse, Scan, Stream, Take, Zip}; +pub use stream::{Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, Zip}; + +pub(crate) mod stream; mod empty; mod once; mod repeat; -mod stream; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { diff --git a/src/stream/once.rs b/src/stream/once.rs index 09811d8..133a155 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -1,5 +1,6 @@ use std::pin::Pin; +use crate::stream::Stream; use crate::task::{Context, Poll}; /// Creates a stream that yields a single item. @@ -33,7 +34,7 @@ pub struct Once { value: Option, } -impl futures_core::stream::Stream for Once { +impl Stream for Once { type Item = T; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index c2ee97a..1a6da41 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -1,5 +1,6 @@ use std::pin::Pin; +use crate::stream::Stream; use crate::task::{Context, Poll}; /// Creates a stream that yields the same item repeatedly. @@ -36,7 +37,7 @@ pub struct Repeat { item: T, } -impl futures_core::stream::Stream for Repeat { +impl Stream for Repeat { type Item = T; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index 080d9b4..2693382 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use super::fuse::Fuse; -use crate::stream::Stream; +use crate::prelude::*; use crate::task::{Context, Poll}; /// Chains two streams one after another. diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index a29fc80..7d5a3d6 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -19,7 +19,7 @@ impl Enumerate { } } -impl futures_core::stream::Stream for Enumerate +impl Stream for Enumerate where S: Stream, { diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 68211f7..3fd5453 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -25,7 +25,7 @@ impl Filter { } } -impl futures_core::stream::Stream for Filter +impl Stream for Filter where S: Stream, P: FnMut(&S::Item) -> bool, diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 2f27515..756efff 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -27,7 +27,7 @@ impl FilterMap { } } -impl futures_core::stream::Stream for FilterMap +impl Stream for FilterMap where S: Stream, F: FnMut(S::Item) -> Option, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d74822a..0348b6a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -42,17 +42,6 @@ mod step_by; mod take; mod zip; -pub use chain::Chain; -pub use filter::Filter; -pub use fuse::Fuse; -pub use inspect::Inspect; -pub use scan::Scan; -pub use skip::Skip; -pub use skip_while::SkipWhile; -pub use step_by::StepBy; -pub use take::Take; -pub use zip::Zip; - use all::AllFuture; use any::AnyFuture; use enumerate::Enumerate; @@ -64,166 +53,941 @@ use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +pub use chain::Chain; +pub use filter::Filter; +pub use fuse::Fuse; +pub use inspect::Inspect; +pub use scan::Scan; +pub use skip::Skip; +pub use skip_while::SkipWhile; +pub use step_by::StepBy; +pub use take::Take; +pub use zip::Zip; + use std::cmp::Ordering; use std::marker::PhantomData; -use std::pin::Pin; -use std::task::{Context, Poll}; use cfg_if::cfg_if; cfg_if! { - if #[cfg(feature = "unstable")] { + if #[cfg(any(feature = "unstable", feature = "docs"))] { + use std::pin::Pin; + use crate::future::Future; + use crate::stream::FromStream; } } cfg_if! { if #[cfg(feature = "docs")] { + use std::ops::{Deref, DerefMut}; + + use crate::task::{Context, Poll}; + #[doc(hidden)] pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>); - ($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>); - ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>); - ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => (ImplFuture<$a, $o>); - ($f:ty, $o:ty) => (ImplFuture<'static, $o>); + /// An asynchronous stream of values. + /// + /// This trait is a re-export of [`futures::stream::Stream`] and is an async version of + /// [`std::iter::Iterator`]. + /// + /// The [provided methods] do not really exist in the trait itself, but they become + /// available when the prelude is imported: + /// + /// ``` + /// # #[allow(unused_imports)] + /// use async_std::prelude::*; + /// ``` + /// + /// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html + /// [`futures::stream::Stream`]: + /// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html + /// [provided methods]: #provided-methods + pub trait Stream { + /// The type of items yielded by this stream. + type Item; + + /// Attempts to receive the next item from the stream. + /// + /// There are several possible return values: + /// + /// * `Poll::Pending` means this stream's next value is not ready yet. + /// * `Poll::Ready(None)` means this stream has been exhausted. + /// * `Poll::Ready(Some(item))` means `item` was received out of the stream. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::pin::Pin; + /// + /// use async_std::prelude::*; + /// use async_std::stream; + /// use async_std::task::{Context, Poll}; + /// + /// fn increment( + /// s: impl Stream + Unpin, + /// ) -> impl Stream + Unpin { + /// struct Increment(S); + /// + /// impl + Unpin> Stream for Increment { + /// type Item = S::Item; + /// + /// fn poll_next( + /// mut self: Pin<&mut Self>, + /// cx: &mut Context<'_>, + /// ) -> Poll> { + /// match Pin::new(&mut self.0).poll_next(cx) { + /// Poll::Pending => Poll::Pending, + /// Poll::Ready(None) => Poll::Ready(None), + /// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + /// } + /// } + /// } + /// + /// Increment(s) + /// } + /// + /// let mut s = increment(stream::once(7)); + /// + /// assert_eq!(s.next().await, Some(8)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + /// Advances the stream and returns the next value. + /// + /// Returns [`None`] when iteration is finished. Individual stream implementations may + /// choose to resume iteration, and so calling `next()` again may or may not eventually + /// start returning more values. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::once(7); + /// + /// assert_eq!(s.next().await, Some(7)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn next(&mut self) -> ImplFuture<'_, Option> + where + Self: Unpin, + { + unreachable!() + } + + /// Creates a stream that yields its first `n` elements. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat(9).take(3); + /// + /// while let Some(v) = s.next().await { + /// assert_eq!(v, 9); + /// } + /// # + /// # }) } + /// ``` + fn take(self, n: usize) -> Take + where + Self: Sized, + { + unreachable!() + } + + /// Creates a stream that yields each `step`th element. + /// + /// # Panics + /// + /// This method will panic if the given step is `0`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect(); + /// let mut stepped = s.step_by(2); + /// + /// assert_eq!(stepped.next().await, Some(0)); + /// assert_eq!(stepped.next().await, Some(2)); + /// assert_eq!(stepped.next().await, Some(4)); + /// assert_eq!(stepped.next().await, None); + /// + /// # + /// # }) } + /// ``` + fn step_by(self, step: usize) -> StepBy + where + Self: Sized, + { + unreachable!() + } + + /// Takes two streams and creates a new stream over both in sequence. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let first: VecDeque<_> = vec![0u8, 1].into_iter().collect(); + /// let second: VecDeque<_> = vec![2, 3].into_iter().collect(); + /// let mut c = first.chain(second); + /// + /// assert_eq!(c.next().await, Some(0)); + /// assert_eq!(c.next().await, Some(1)); + /// assert_eq!(c.next().await, Some(2)); + /// assert_eq!(c.next().await, Some(3)); + /// assert_eq!(c.next().await, None); + /// + /// # + /// # }) } + /// ``` + fn chain(self, other: U) -> Chain + where + Self: Sized, + U: Stream + Sized, + { + unreachable!() + } + + /// Creates a stream that gives the current element's count as well as the next value. + /// + /// # Overflow behaviour. + /// + /// This combinator does no guarding against overflows. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque<_> = vec!['a', 'b', 'c'].into_iter().collect(); + /// let mut s = s.enumerate(); + /// + /// assert_eq!(s.next().await, Some((0, 'a'))); + /// assert_eq!(s.next().await, Some((1, 'b'))); + /// assert_eq!(s.next().await, Some((2, 'c'))); + /// assert_eq!(s.next().await, None); + /// + /// # + /// # }) } + /// ``` + fn enumerate(self) -> Enumerate + where + Self: Sized, + { + unreachable!() + } + + /// A combinator that does something with each element in the stream, passing the value + /// on. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let a: VecDeque<_> = vec![1u8, 2, 3, 4, 5].into_iter().collect(); + /// let sum = a + /// .inspect(|x| println!("about to filter {}", x)) + /// .filter(|x| x % 2 == 0) + /// .inspect(|x| println!("made it through filter: {}", x)) + /// .fold(0, |sum, i| sum + i).await; + /// + /// assert_eq!(sum, 6); + /// # + /// # }) } + /// ``` + fn inspect(self, f: F) -> Inspect + where + Self: Sized, + F: FnMut(&Self::Item), + { + unreachable!() + } + + /// Transforms this `Stream` into a "fused" `Stream` such that after the first time + /// `poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return + /// `Poll::Ready(None)`. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::once(1).fuse(); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn fuse(self) -> Fuse + where + Self: Sized, + { + unreachable!() + } + + /// Creates a stream that uses a predicate to determine if an element should be yielded. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque = vec![1, 2, 3, 4].into_iter().collect(); + /// let mut s = s.filter(|i| i % 2 == 0); + /// + /// assert_eq!(s.next().await, Some(2)); + /// assert_eq!(s.next().await, Some(4)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn filter

(self, predicate: P) -> Filter + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + unreachable!() + } + + /// Both filters and maps a stream. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect(); + /// + /// let mut parsed = s.filter_map(|a| a.parse::().ok()); + /// + /// let one = parsed.next().await; + /// assert_eq!(one, Some(1)); + /// + /// let three = parsed.next().await; + /// assert_eq!(three, Some(3)); + /// + /// let five = parsed.next().await; + /// assert_eq!(five, Some(5)); + /// + /// let end = parsed.next().await; + /// assert_eq!(end, None); + /// # + /// # }) } + /// ``` + fn filter_map(self, f: F) -> FilterMap + where + Self: Sized, + F: FnMut(Self::Item) -> Option, + { + unreachable!() + } + + /// Returns the element that gives the minimum value with respect to the + /// specified comparison function. If several elements are equally minimum, + /// the first element is returned. If the stream is empty, `None` is returned. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let min = Stream::min_by(s.clone(), |x, y| x.cmp(y)).await; + /// assert_eq!(min, Some(1)); + /// + /// let min = Stream::min_by(s, |x, y| y.cmp(x)).await; + /// assert_eq!(min, Some(3)); + /// + /// let min = Stream::min_by(VecDeque::::new(), |x, y| x.cmp(y)).await; + /// assert_eq!(min, None); + /// # + /// # }) } + /// ``` + fn min_by(self, compare: F) -> ImplFuture<'static, Option> + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + unreachable!() + } + + /// Returns the nth element of the stream. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let second = s.nth(1).await; + /// assert_eq!(second, Some(2)); + /// # + /// # }) } + /// ``` + /// Calling `nth()` multiple times: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let second = s.nth(0).await; + /// assert_eq!(second, Some(1)); + /// + /// let second = s.nth(0).await; + /// assert_eq!(second, Some(2)); + /// # + /// # }) } + /// ``` + /// Returning `None` if the stream finished before returning `n` elements: + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// + /// let fourth = s.nth(4).await; + /// assert_eq!(fourth, None); + /// # + /// # }) } + /// ``` + fn nth(&mut self, n: usize) -> ImplFuture<'_, Option> + where + Self: Sized, + { + unreachable!() + } + + /// Tests if every element of the stream matches a predicate. + /// + /// `all()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if they all return + /// `true`, then so does `all()`. If any of them return `false`, it + /// returns `false`. + /// + /// `all()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `false`, given that no matter what else happens, + /// the result will also be `false`. + /// + /// An empty stream returns `true`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat::(42).take(3); + /// assert!(s.all(|x| x == 42).await); + /// + /// # + /// # }) } + /// ``` + /// + /// Empty stream: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::empty::(); + /// assert!(s.all(|_| false).await); + /// # + /// # }) } + /// ``` + #[inline] + fn all(&mut self, f: F) -> ImplFuture<'_, bool> + where + Self: Unpin + Sized, + F: FnMut(Self::Item) -> bool, + { + unreachable!() + } + + /// Searches for an element in a stream that satisfies a predicate. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let res = s.find(|x| *x == 2).await; + /// assert_eq!(res, Some(2)); + /// # + /// # }) } + /// ``` + /// + /// Resuming after a first find: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let res = s.find(|x| *x == 2).await; + /// assert_eq!(res, Some(2)); + /// + /// let next = s.next().await; + /// assert_eq!(next, Some(3)); + /// # + /// # }) } + /// ``` + fn find

(&mut self, p: P) -> ImplFuture<'_, Option> + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + unreachable!() + } + + /// Applies function to the elements of stream and returns the first non-none result. + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let mut s: VecDeque<&str> = vec!["lol", "NaN", "2", "5"].into_iter().collect(); + /// let first_number = s.find_map(|s| s.parse().ok()).await; + /// + /// assert_eq!(first_number, Some(2)); + /// # + /// # }) } + /// ``` + fn find_map(&mut self, f: F) -> ImplFuture<'_, Option> + where + Self: Sized, + F: FnMut(Self::Item) -> Option, + { + unreachable!() + } + + /// A combinator that applies a function to every element in a stream + /// producing a single, final value. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use std::collections::VecDeque; + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let sum = s.fold(0, |acc, x| acc + x).await; + /// + /// assert_eq!(sum, 6); + /// # + /// # }) } + /// ``` + fn fold(self, init: B, f: F) -> ImplFuture<'static, B> + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + unreachable!() + } + + /// Tests if any element of the stream matches a predicate. + /// + /// `any()` takes a closure that returns `true` or `false`. It applies + /// this closure to each element of the stream, and if any of them return + /// `true`, then so does `any()`. If they all return `false`, it + /// returns `false`. + /// + /// `any()` is short-circuiting; in other words, it will stop processing + /// as soon as it finds a `true`, given that no matter what else happens, + /// the result will also be `true`. + /// + /// An empty stream returns `false`. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::repeat::(42).take(3); + /// assert!(s.any(|x| x == 42).await); + /// # + /// # }) } + /// ``` + /// + /// Empty stream: + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::empty::(); + /// assert!(!s.any(|_| false).await); + /// # + /// # }) } + /// ``` + #[inline] + fn any(&mut self, f: F) -> ImplFuture<'_, bool> + where + Self: Unpin + Sized, + F: FnMut(Self::Item) -> bool, + { + unreachable!() + } + + /// A stream adaptor similar to [`fold`] that holds internal state and produces a new + /// stream. + /// + /// [`fold`]: #method.fold + /// + /// `scan()` takes two arguments: an initial value which seeds the internal state, and + /// a closure with two arguments, the first being a mutable reference to the internal + /// state and the second a stream element. The closure can assign to the internal state + /// to share state between iterations. + /// + /// On iteration, the closure will be applied to each element of the stream and the + /// return value from the closure, an `Option`, is yielded by the stream. + /// + /// ## Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let mut s = s.scan(1, |state, x| { + /// *state = *state * x; + /// Some(-*state) + /// }); + /// + /// assert_eq!(s.next().await, Some(-1)); + /// assert_eq!(s.next().await, Some(-2)); + /// assert_eq!(s.next().await, Some(-6)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + #[inline] + fn scan(self, initial_state: St, f: F) -> Scan + where + Self: Sized, + F: FnMut(&mut St, Self::Item) -> Option, + { + unreachable!() + } + + /// Combinator that `skip`s elements based on a predicate. + /// + /// Takes a closure argument. It will call this closure on every element in + /// the stream and ignore elements until it returns `false`. + /// + /// After `false` is returned, `SkipWhile`'s job is over and all further + /// elements in the strem are yielded. + /// + /// ## Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect(); + /// let mut s = a.skip_while(|x| x.is_negative()); + /// + /// assert_eq!(s.next().await, Some(0)); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn skip_while

(self, predicate: P) -> SkipWhile + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + unreachable!() + } + + /// Creates a combinator that skips the first `n` elements. + /// + /// ## Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let mut skipped = s.skip(2); + /// + /// assert_eq!(skipped.next().await, Some(3)); + /// assert_eq!(skipped.next().await, None); + /// # + /// # }) } + /// ``` + fn skip(self, n: usize) -> Skip + where + Self: Sized, + { + unreachable!() + } + + /// 'Zips up' two streams into a single stream of pairs. + /// + /// `zip()` returns a new stream that will iterate over two other streams, returning a + /// tuple where the first element comes from the first stream, and the second element + /// comes from the second stream. + /// + /// In other words, it zips two streams together, into a single one. + /// + /// If either stream returns [`None`], [`poll_next`] from the zipped stream will return + /// [`None`]. If the first stream returns [`None`], `zip` will short-circuit and + /// `poll_next` will not be called on the second stream. + /// + /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + /// [`poll_next`]: #tymethod.poll_next + /// + /// ## Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use std::collections::VecDeque; + /// use async_std::stream::Stream; + /// + /// let l: VecDeque = vec![1, 2, 3].into_iter().collect(); + /// let r: VecDeque = vec![4, 5, 6, 7].into_iter().collect(); + /// let mut s = l.zip(r); + /// + /// assert_eq!(s.next().await, Some((1, 4))); + /// assert_eq!(s.next().await, Some((2, 5))); + /// assert_eq!(s.next().await, Some((3, 6))); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + #[inline] + fn zip(self, other: U) -> Zip + where + Self: Sized, + U: Stream, + { + unreachable!() + } + + /// Transforms a stream into a collection. + /// + /// `collect()` can take anything streamable, and turn it into a relevant + /// collection. This is one of the more powerful methods in the async + /// standard library, used in a variety of contexts. + /// + /// The most basic pattern in which `collect()` is used is to turn one + /// collection into another. You take a collection, call [`stream`] on it, + /// do a bunch of transformations, and then `collect()` at the end. + /// + /// Because `collect()` is so general, it can cause problems with type + /// inference. As such, `collect()` is one of the few times you'll see + /// the syntax affectionately known as the 'turbofish': `::<>`. This + /// helps the inference algorithm understand specifically which collection + /// you're trying to collect into. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let s = stream::repeat(9u8).take(3); + /// let buf: Vec = s.collect().await; + /// + /// assert_eq!(buf, vec![9; 3]); + /// + /// // You can also collect streams of Result values + /// // into any collection that implements FromStream + /// let s = stream::repeat(Ok(9)).take(3); + /// // We are using Vec here, but other collections + /// // are supported as well + /// let buf: Result, ()> = s.collect().await; + /// + /// assert_eq!(buf, Ok(vec![9; 3])); + /// + /// // The stream will stop on the first Err and + /// // return that instead + /// let s = stream::repeat(Err(5)).take(3); + /// let buf: Result, u8> = s.collect().await; + /// + /// assert_eq!(buf, Err(5)); + /// # + /// # }) } + /// ``` + /// + /// [`stream`]: trait.Stream.html#tymethod.next + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] + fn collect<'a, B>(self) -> ImplFuture<'a, B> + where + Self: Sized + 'a, + B: FromStream, + { + unreachable!() + } } - } else { - macro_rules! ret { - ($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>); - ($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>); - ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>); - ($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty, $t3:ty) => ($f<$a, Self, $t1, $t2, $t3>); - ($f:ty, $o:ty) => ($f); + + impl Stream for Box { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } } - } -} -cfg_if! { - if #[cfg(any(feature = "unstable", feature = "docs"))] { - use crate::stream::FromStream; + impl Stream for &mut S { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + } + + impl

Stream for Pin

+ where + P: DerefMut + Unpin, +

::Target: Stream, + { + type Item = <

::Target as Stream>::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + } + + impl Stream for std::collections::VecDeque { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + } + + impl Stream for std::panic::AssertUnwindSafe { + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unreachable!() + } + } + } else { + pub use futures_core::stream::Stream; } } -/// An asynchronous stream of values. -/// -/// This trait is a re-export of [`futures::stream::Stream`] and is an async version of -/// [`std::iter::Iterator`]. -/// -/// The [provided methods] do not really exist in the trait itself, but they become available when -/// the prelude is imported: -/// -/// ``` -/// # #[allow(unused_imports)] -/// use async_std::prelude::*; -/// ``` -/// -/// [`std::iter::Iterator`]: https://doc.rust-lang.org/std/iter/trait.Iterator.html -/// [`futures::stream::Stream`]: -/// https://docs.rs/futures-preview/0.3.0-alpha.17/futures/stream/trait.Stream.html -/// [provided methods]: #provided-methods -pub trait Stream { - /// The type of items yielded by this stream. - type Item; - - /// Attempts to receive the next item from the stream. - /// - /// There are several possible return values: - /// - /// * `Poll::Pending` means this stream's next value is not ready yet. - /// * `Poll::Ready(None)` means this stream has been exhausted. - /// * `Poll::Ready(Some(item))` means `item` was received out of the stream. - /// - /// # Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::pin::Pin; - /// - /// use async_std::prelude::*; - /// use async_std::stream; - /// use async_std::task::{Context, Poll}; - /// - /// fn increment(s: impl Stream + Unpin) -> impl Stream + Unpin { - /// struct Increment(S); - /// - /// impl + Unpin> Stream for Increment { - /// type Item = S::Item; - /// - /// fn poll_next( - /// mut self: Pin<&mut Self>, - /// cx: &mut Context<'_>, - /// ) -> Poll> { - /// match Pin::new(&mut self.0).poll_next(cx) { - /// Poll::Pending => Poll::Pending, - /// Poll::Ready(None) => Poll::Ready(None), - /// Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), - /// } - /// } - /// } - /// - /// Increment(s) - /// } - /// - /// let mut s = increment(stream::once(7)); - /// - /// assert_eq!(s.next().await, Some(8)); - /// assert_eq!(s.next().await, None); - /// # - /// # }) } - /// ``` - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// Advances the stream and returns the next value. - /// - /// Returns [`None`] when iteration is finished. Individual stream implementations may - /// choose to resume iteration, and so calling `next()` again may or may not eventually - /// start returning more values. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// - /// # Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let mut s = stream::once(7); - /// - /// assert_eq!(s.next().await, Some(7)); - /// assert_eq!(s.next().await, None); - /// # - /// # }) } - /// ``` - fn next(&mut self) -> ret!('_, NextFuture, Option) +#[doc(hidden)] +pub trait StreamExt: futures_core::stream::Stream { + fn next(&mut self) -> NextFuture<'_, Self> where Self: Unpin, { NextFuture { stream: self } } - /// Creates a stream that yields its first `n` elements. - /// - /// # Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let mut s = stream::repeat(9).take(3); - /// - /// while let Some(v) = s.next().await { - /// assert_eq!(v, 9); - /// } - /// # - /// # }) } - /// ``` fn take(self, n: usize) -> Take where Self: Sized, @@ -234,33 +998,6 @@ pub trait Stream { } } - /// Creates a stream that yields each `step`th element. - /// - /// # Panics - /// - /// This method will panic if the given step is `0`. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect(); - /// let mut stepped = s.step_by(2); - /// - /// assert_eq!(stepped.next().await, Some(0)); - /// assert_eq!(stepped.next().await, Some(2)); - /// assert_eq!(stepped.next().await, Some(4)); - /// assert_eq!(stepped.next().await, None); - /// - /// # - /// # }) } - /// ``` fn step_by(self, step: usize) -> StepBy where Self: Sized, @@ -268,31 +1005,6 @@ pub trait Stream { StepBy::new(self, step) } - /// Takes two streams and creates a new stream over both in sequence. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let first: VecDeque<_> = vec![0u8, 1].into_iter().collect(); - /// let second: VecDeque<_> = vec![2, 3].into_iter().collect(); - /// let mut c = first.chain(second); - /// - /// assert_eq!(c.next().await, Some(0)); - /// assert_eq!(c.next().await, Some(1)); - /// assert_eq!(c.next().await, Some(2)); - /// assert_eq!(c.next().await, Some(3)); - /// assert_eq!(c.next().await, None); - /// - /// # - /// # }) } - /// ``` fn chain(self, other: U) -> Chain where Self: Sized, @@ -301,31 +1013,6 @@ pub trait Stream { Chain::new(self, other) } - /// Creates a stream that gives the current element's count as well as the next value. - /// - /// # Overflow behaviour. - /// - /// This combinator does no guarding against overflows. - /// - /// # Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let s: VecDeque<_> = vec!['a', 'b', 'c'].into_iter().collect(); - /// let mut s = s.enumerate(); - /// - /// assert_eq!(s.next().await, Some((0, 'a'))); - /// assert_eq!(s.next().await, Some((1, 'b'))); - /// assert_eq!(s.next().await, Some((2, 'c'))); - /// assert_eq!(s.next().await, None); - /// - /// # - /// # }) } - /// ``` fn enumerate(self) -> Enumerate where Self: Sized, @@ -333,29 +1020,6 @@ pub trait Stream { Enumerate::new(self) } - /// A combinator that does something with each element in the stream, passing the value on. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let a: VecDeque<_> = vec![1u8, 2, 3, 4, 5].into_iter().collect(); - /// let sum = a - /// .inspect(|x| println!("about to filter {}", x)) - /// .filter(|x| x % 2 == 0) - /// .inspect(|x| println!("made it through filter: {}", x)) - /// .fold(0, |sum, i| sum + i).await; - /// - /// assert_eq!(sum, 6); - /// # - /// # }) } - /// ``` fn inspect(self, f: F) -> Inspect where Self: Sized, @@ -364,25 +1028,6 @@ pub trait Stream { Inspect::new(self, f) } - /// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` - /// returns `Poll::Ready(None)`, all future calls to `poll` will also return - /// `Poll::Ready(None)`. - /// - /// # Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let mut s = stream::once(1).fuse(); - /// assert_eq!(s.next().await, Some(1)); - /// assert_eq!(s.next().await, None); - /// assert_eq!(s.next().await, None); - /// # - /// # }) } - /// ``` fn fuse(self) -> Fuse where Self: Sized, @@ -393,28 +1038,6 @@ pub trait Stream { } } - /// Creates a stream that uses a predicate to determine if an element - /// should be yeilded. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let s: VecDeque = vec![1, 2, 3, 4].into_iter().collect(); - /// let mut s = s.filter(|i| i % 2 == 0); - /// - /// assert_eq!(s.next().await, Some(2)); - /// assert_eq!(s.next().await, Some(4)); - /// assert_eq!(s.next().await, None); - /// # - /// # }) } - /// ``` fn filter

(self, predicate: P) -> Filter where Self: Sized, @@ -423,36 +1046,6 @@ pub trait Stream { Filter::new(self, predicate) } - /// Both filters and maps a stream. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect(); - /// - /// let mut parsed = s.filter_map(|a| a.parse::().ok()); - /// - /// let one = parsed.next().await; - /// assert_eq!(one, Some(1)); - /// - /// let three = parsed.next().await; - /// assert_eq!(three, Some(3)); - /// - /// let five = parsed.next().await; - /// assert_eq!(five, Some(5)); - /// - /// let end = parsed.next().await; - /// assert_eq!(end, None); - /// # - /// # }) } - /// ``` fn filter_map(self, f: F) -> FilterMap where Self: Sized, @@ -461,32 +1054,7 @@ pub trait Stream { FilterMap::new(self, f) } - /// Returns the element that gives the minimum value with respect to the - /// specified comparison function. If several elements are equally minimum, - /// the first element is returned. If the stream is empty, `None` is returned. - /// - /// # Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// - /// let min = Stream::min_by(s.clone(), |x, y| x.cmp(y)).await; - /// assert_eq!(min, Some(1)); - /// - /// let min = Stream::min_by(s, |x, y| y.cmp(x)).await; - /// assert_eq!(min, Some(3)); - /// - /// let min = Stream::min_by(VecDeque::::new(), |x, y| x.cmp(y)).await; - /// assert_eq!(min, None); - /// # - /// # }) } - /// ``` - fn min_by(self, compare: F) -> ret!(MinByFuture, Self::Item) + fn min_by(self, compare: F) -> MinByFuture where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, @@ -494,109 +1062,15 @@ pub trait Stream { MinByFuture::new(self, compare) } - /// Returns the nth element of the stream. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// - /// let second = s.nth(1).await; - /// assert_eq!(second, Some(2)); - /// # - /// # }) } - /// ``` - /// Calling `nth()` multiple times: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// - /// let second = s.nth(0).await; - /// assert_eq!(second, Some(1)); - /// - /// let second = s.nth(0).await; - /// assert_eq!(second, Some(2)); - /// # - /// # }) } - /// ``` - /// Returning `None` if the stream finished before returning `n` elements: - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// - /// let fourth = s.nth(4).await; - /// assert_eq!(fourth, None); - /// # - /// # }) } - /// ``` - fn nth(&mut self, n: usize) -> ret!('_, NthFuture, Option) + fn nth(&mut self, n: usize) -> NthFuture<'_, Self> where Self: Sized, { NthFuture::new(self, n) } - /// Tests if every element of the stream matches a predicate. - /// - /// `all()` takes a closure that returns `true` or `false`. It applies - /// this closure to each element of the stream, and if they all return - /// `true`, then so does `all()`. If any of them return `false`, it - /// returns `false`. - /// - /// `all()` is short-circuiting; in other words, it will stop processing - /// as soon as it finds a `false`, given that no matter what else happens, - /// the result will also be `false`. - /// - /// An empty stream returns `true`. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let mut s = stream::repeat::(42).take(3); - /// assert!(s.all(|x| x == 42).await); - /// - /// # - /// # }) } - /// ``` - /// - /// Empty stream: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let mut s = stream::empty::(); - /// assert!(s.all(|_| false).await); - /// # - /// # }) } - /// ``` #[inline] - fn all(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item) + fn all(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item> where Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, @@ -609,43 +1083,7 @@ pub trait Stream { } } - /// Searches for an element in a stream that satisfies a predicate. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// let res = s.find(|x| *x == 2).await; - /// assert_eq!(res, Some(2)); - /// # - /// # }) } - /// ``` - /// - /// Resuming after a first find: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let mut s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// let res = s.find(|x| *x == 2).await; - /// assert_eq!(res, Some(2)); - /// - /// let next = s.next().await; - /// assert_eq!(next, Some(3)); - /// # - /// # }) } - /// ``` - fn find

(&mut self, p: P) -> ret!('_, FindFuture, Option, P, Self::Item) + fn find

(&mut self, p: P) -> FindFuture<'_, Self, P, Self::Item> where Self: Sized, P: FnMut(&Self::Item) -> bool, @@ -653,22 +1091,7 @@ pub trait Stream { FindFuture::new(self, p) } - /// Applies function to the elements of stream and returns the first non-none result. - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let mut s: VecDeque<&str> = vec!["lol", "NaN", "2", "5"].into_iter().collect(); - /// let first_number = s.find_map(|s| s.parse().ok()).await; - /// - /// assert_eq!(first_number, Some(2)); - /// # - /// # }) } - /// ``` - fn find_map(&mut self, f: F) -> ret!('_, FindMapFuture, Option, F, Self::Item, B) + fn find_map(&mut self, f: F) -> FindMapFuture<'_, Self, F, Self::Item, B> where Self: Sized, F: FnMut(Self::Item) -> Option, @@ -676,27 +1099,7 @@ pub trait Stream { FindMapFuture::new(self, f) } - /// A combinator that applies a function to every element in a stream - /// producing a single, final value. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use std::collections::VecDeque; - /// - /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// let sum = s.fold(0, |acc, x| acc + x).await; - /// - /// assert_eq!(sum, 6); - /// # - /// # }) } - /// ``` - fn fold(self, init: B, f: F) -> ret!(FoldFuture, Self::Item) + fn fold(self, init: B, f: F) -> FoldFuture where Self: Sized, F: FnMut(B, Self::Item) -> B, @@ -704,50 +1107,7 @@ pub trait Stream { FoldFuture::new(self, init, f) } - /// Tests if any element of the stream matches a predicate. - /// - /// `any()` takes a closure that returns `true` or `false`. It applies - /// this closure to each element of the stream, and if any of them return - /// `true`, then so does `any()`. If they all return `false`, it - /// returns `false`. - /// - /// `any()` is short-circuiting; in other words, it will stop processing - /// as soon as it finds a `true`, given that no matter what else happens, - /// the result will also be `true`. - /// - /// An empty stream returns `false`. - /// - /// # Examples - /// - /// Basic usage: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let mut s = stream::repeat::(42).take(3); - /// assert!(s.any(|x| x == 42).await); - /// # - /// # }) } - /// ``` - /// - /// Empty stream: - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let mut s = stream::empty::(); - /// assert!(!s.any(|_| false).await); - /// # - /// # }) } - /// ``` - #[inline] - fn any(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item) + fn any(&mut self, f: F) -> AnyFuture<'_, Self, F, Self::Item> where Self: Unpin + Sized, F: FnMut(Self::Item) -> bool, @@ -760,40 +1120,6 @@ pub trait Stream { } } - /// A stream adaptor similar to [`fold`] that holds internal state and produces a new stream. - /// - /// [`fold`]: #method.fold - /// - /// `scan()` takes two arguments: an initial value which seeds the internal state, and a - /// closure with two arguments, the first being a mutable reference to the internal state and - /// the second a stream element. The closure can assign to the internal state to share state - /// between iterations. - /// - /// On iteration, the closure will be applied to each element of the stream and the return - /// value from the closure, an `Option`, is yielded by the stream. - /// - /// ## Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// let mut s = s.scan(1, |state, x| { - /// *state = *state * x; - /// Some(-*state) - /// }); - /// - /// assert_eq!(s.next().await, Some(-1)); - /// assert_eq!(s.next().await, Some(-2)); - /// assert_eq!(s.next().await, Some(-6)); - /// assert_eq!(s.next().await, None); - /// # - /// # }) } - /// ``` - #[inline] fn scan(self, initial_state: St, f: F) -> Scan where Self: Sized, @@ -802,31 +1128,6 @@ pub trait Stream { Scan::new(self, initial_state, f) } - /// Combinator that `skip`s elements based on a predicate. - /// - /// Takes a closure argument. It will call this closure on every element in - /// the stream and ignore elements until it returns `false`. - /// - /// After `false` is returned, `SkipWhile`'s job is over and all further - /// elements in the strem are yeilded. - /// - /// ## Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect(); - /// let mut s = a.skip_while(|x| x.is_negative()); - /// - /// assert_eq!(s.next().await, Some(0)); - /// assert_eq!(s.next().await, Some(1)); - /// assert_eq!(s.next().await, None); - /// # - /// # }) } - /// ``` fn skip_while

(self, predicate: P) -> SkipWhile where Self: Sized, @@ -835,23 +1136,6 @@ pub trait Stream { SkipWhile::new(self, predicate) } - /// Creates a combinator that skips the first `n` elements. - /// - /// ## Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// let mut skipped = s.skip(2); - /// - /// assert_eq!(skipped.next().await, Some(3)); - /// assert_eq!(skipped.next().await, None); - /// # - /// # }) } - /// ``` fn skip(self, n: usize) -> Skip where Self: Sized, @@ -859,114 +1143,24 @@ pub trait Stream { Skip::new(self, n) } - /// 'Zips up' two streams into a single stream of pairs. - /// - /// `zip()` returns a new stream that will iterate over two other streams, returning a tuple - /// where the first element comes from the first stream, and the second element comes from the - /// second stream. - /// - /// In other words, it zips two streams together, into a single one. - /// - /// If either stream returns [`None`], [`poll_next`] from the zipped stream will return - /// [`None`]. If the first stream returns [`None`], `zip` will short-circuit and `poll_next` - /// will not be called on the second stream. - /// - /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None - /// [`poll_next`]: #tymethod.poll_next - /// - /// ## Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use std::collections::VecDeque; - /// use async_std::stream::Stream; - /// - /// let l: VecDeque = vec![1, 2, 3].into_iter().collect(); - /// let r: VecDeque = vec![4, 5, 6, 7].into_iter().collect(); - /// let mut s = l.zip(r); - /// - /// assert_eq!(s.next().await, Some((1, 4))); - /// assert_eq!(s.next().await, Some((2, 5))); - /// assert_eq!(s.next().await, Some((3, 6))); - /// assert_eq!(s.next().await, None); - /// # - /// # }) } - /// ``` - #[inline] fn zip(self, other: U) -> Zip where - Self: Sized, + Self: Stream + Sized, U: Stream, { Zip::new(self, other) } - /// Transforms a stream into a collection. - /// - /// `collect()` can take anything streamable, and turn it into a relevant - /// collection. This is one of the more powerful methods in the async - /// standard library, used in a variety of contexts. - /// - /// The most basic pattern in which `collect()` is used is to turn one - /// collection into another. You take a collection, call [`stream`] on it, - /// do a bunch of transformations, and then `collect()` at the end. - /// - /// Because `collect()` is so general, it can cause problems with type - /// inference. As such, `collect()` is one of the few times you'll see - /// the syntax affectionately known as the 'turbofish': `::<>`. This - /// helps the inference algorithm understand specifically which collection - /// you're trying to collect into. - /// - /// # Examples - /// - /// ``` - /// # fn main() { async_std::task::block_on(async { - /// # - /// use async_std::prelude::*; - /// use async_std::stream; - /// - /// let s = stream::repeat(9u8).take(3); - /// let buf: Vec = s.collect().await; - /// - /// assert_eq!(buf, vec![9; 3]); - /// - /// // You can also collect streams of Result values - /// // into any collection that implements FromStream - /// let s = stream::repeat(Ok(9)).take(3); - /// // We are using Vec here, but other collections - /// // are supported as well - /// let buf: Result, ()> = s.collect().await; - /// - /// assert_eq!(buf, Ok(vec![9; 3])); - /// - /// // The stream will stop on the first Err and - /// // return that instead - /// let s = stream::repeat(Err(5)).take(3); - /// let buf: Result, u8> = s.collect().await; - /// - /// assert_eq!(buf, Err(5)); - /// # - /// # }) } - /// ``` - /// - /// [`stream`]: trait.Stream.html#tymethod.next #[cfg(any(feature = "unstable", feature = "docs"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] - fn collect<'a, B>(self) -> ret!(Pin + 'a>>, B) + fn collect<'a, B>(self) -> Pin + 'a>> where - Self: futures_core::stream::Stream + Sized + 'a, - B: FromStream<::Item>, + Self: Sized + 'a, + B: FromStream, { FromStream::from_stream(self) } } -impl Stream for T { - type Item = ::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - futures_core::stream::Stream::poll_next(self, cx) - } -} +impl StreamExt for T {} diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 222022b..7897516 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -24,7 +24,7 @@ impl Scan { impl Unpin for Scan {} -impl futures_core::stream::Stream for Scan +impl Stream for Scan where S: Stream, F: FnMut(&mut St, S::Item) -> Option, diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 0dea1d0..81d48d2 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -17,7 +17,7 @@ impl Take { pin_utils::unsafe_unpinned!(remaining: usize); } -impl futures_core::stream::Stream for Take { +impl Stream for Take { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 05f9967..4c66aef 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -36,7 +36,7 @@ impl Zip { pin_utils::unsafe_pinned!(second: B); } -impl futures_core::stream::Stream for Zip { +impl Stream for Zip { type Item = (A::Item, B::Item); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> {