From 6c4c958abc827c7f9b9d846ecaac86988b5e1249 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 29 Aug 2019 11:32:43 +0200 Subject: [PATCH 1/5] from/into stream Signed-off-by: Yoshua Wuyts update examples Signed-off-by: Yoshua Wuyts impl collect Signed-off-by: Yoshua Wuyts compiles! Signed-off-by: Yoshua Wuyts layout base for collect into vec Signed-off-by: Yoshua Wuyts fmt Signed-off-by: Yoshua Wuyts progress Signed-off-by: Yoshua Wuyts compiles! Signed-off-by: Yoshua Wuyts define failing test Signed-off-by: Yoshua Wuyts cargo fmt Signed-off-by: Yoshua Wuyts stuck again Signed-off-by: Yoshua Wuyts fix trait bounds! Signed-off-by: Yoshua Wuyts cargo fmt Signed-off-by: Yoshua Wuyts hide dyn fut impl Signed-off-by: Yoshua Wuyts dyn ret for vec Signed-off-by: Yoshua Wuyts cargo fmt Signed-off-by: Yoshua Wuyts collect docs Signed-off-by: Yoshua Wuyts remove macro from vec::from_stream Signed-off-by: Yoshua Wuyts shorten collect trait bound Signed-off-by: Yoshua Wuyts Remove some Unpin and Send bounds Signed-off-by: Yoshua Wuyts --- src/lib.rs | 1 + src/stream/from_stream.rs | 28 +++++++++++++++++ src/stream/into_stream.rs | 35 ++++++++++++++++++++++ src/stream/mod.rs | 4 +++ src/stream/stream/mod.rs | 63 +++++++++++++++++++++++++++++++++++++-- src/vec/from_stream.rs | 25 ++++++++++++++++ src/vec/mod.rs | 9 ++++++ 7 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 src/stream/from_stream.rs create mode 100644 src/stream/into_stream.rs create mode 100644 src/vec/from_stream.rs create mode 100644 src/vec/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 3bb35c01..83365170 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,7 @@ pub mod prelude; pub mod stream; pub mod sync; pub mod task; +mod vec; #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(feature = "unstable")] diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs new file mode 100644 index 00000000..8b9ad262 --- /dev/null +++ b/src/stream/from_stream.rs @@ -0,0 +1,28 @@ +use super::IntoStream; + +use std::pin::Pin; + +/// Conversion from a `Stream`. +/// +/// By implementing `FromStream` for a type, you define how it will be created from a stream. +/// This is common for types which describe a collection of some kind. +/// +/// See also: [`IntoStream`]. +/// +/// [`IntoStream`]: trait.IntoStream.html +pub trait FromStream { + /// Creates a value from a stream. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// // use async_std::stream::FromStream; + /// + /// // let _five_fives = async_std::stream::repeat(5).take(5); + /// ``` + fn from_stream<'a, S: IntoStream + 'a>( + stream: S, + ) -> Pin + 'a>>; +} diff --git a/src/stream/into_stream.rs b/src/stream/into_stream.rs new file mode 100644 index 00000000..63fb558d --- /dev/null +++ b/src/stream/into_stream.rs @@ -0,0 +1,35 @@ +use futures_core::stream::Stream; + +/// Conversion into a `Stream`. +/// +/// By implementing `IntoIterator` for a type, you define how it will be +/// converted to an iterator. This is common for types which describe a +/// collection of some kind. +/// +/// [`from_stream`]: #tymethod.from_stream +/// [`Stream`]: trait.Stream.html +/// [`collect`]: trait.Stream.html#method.collect +/// +/// See also: [`FromStream`]. +/// +/// [`FromStream`]: trait.FromStream.html +pub trait IntoStream { + /// The type of the elements being iterated over. + type Item; + + /// Which kind of stream are we turning this into? + type IntoStream: Stream; + + /// Creates a stream from a value. + fn into_stream(self) -> Self::IntoStream; +} + +impl IntoStream for I { + type Item = I::Item; + type IntoStream = I; + + #[inline] + fn into_stream(self) -> I { + self + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f749ebe5..a08c8278 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -23,12 +23,16 @@ pub use double_ended_stream::DoubleEndedStream; pub use empty::{empty, Empty}; +pub use from_stream::FromStream; +pub use into_stream::IntoStream; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use stream::{Scan, Stream, Take, Zip}; mod double_ended_stream; mod empty; +mod from_stream; +mod into_stream; mod once; mod repeat; mod stream; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 578dd567..88b54520 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -50,14 +50,15 @@ use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use super::from_stream::FromStream; +use crate::future::Future; +use crate::task::{Context, Poll}; use std::cmp::Ordering; use std::marker::PhantomData; use std::pin::Pin; use cfg_if::cfg_if; -use crate::task::{Context, Poll}; - cfg_if! { if #[cfg(feature = "docs")] { #[doc(hidden)] @@ -80,6 +81,21 @@ cfg_if! { } } +cfg_if! { + if #[cfg(feature = "docs")] { + #[doc(hidden)] + pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>); + + macro_rules! dyn_ret { + ($a:lifetime, $o:ty) => (DynFuture<$a, $o>); + } + } else { + macro_rules! dyn_ret { + ($a:lifetime, $o:ty) => (Pin + 'a>>) + } + } +} + /// An asynchronous stream of values. /// /// This trait is an async version of [`std::iter::Iterator`]. @@ -528,7 +544,6 @@ pub trait Stream { /// /// Basic usage: /// - /// ``` /// # fn main() { async_std::task::block_on(async { /// # /// use async_std::prelude::*; @@ -652,6 +667,48 @@ pub trait Stream { { Zip::new(self, other) } + + /// Transforms a stream into a collection. + /// + /// `collect()` can take anything streamable, and turn it into a relevant + /// collection. This is one of the more powerful methods in the async + /// standard library, used in a variety of contexts. + /// + /// The most basic pattern in which `collect()` is used is to turn one + /// collection into another. You take a collection, call [`stream`] on it, + /// do a bunch of transformations, and then `collect()` at the end. + /// + /// Because `collect()` is so general, it can cause problems with type + /// inference. As such, `collect()` is one of the few times you'll see + /// the syntax affectionately known as the 'turbofish': `::<>`. This + /// helps the inference algorithm understand specifically which collection + /// you're trying to collect into. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let s = stream::repeat(9u8).take(3); + /// let buf: Vec = s.collect().await; + /// + /// assert_eq!(buf, vec![9; 3]); + /// # + /// # }) } + /// ``` + /// + /// [`stream`]: trait.Stream.html#tymethod.next + #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] + fn collect<'a, B>(self) -> dyn_ret!('a, B) + where + Self: futures_core::stream::Stream + Sized + 'a, + B: FromStream<::Item>, + { + FromStream::from_stream(self) + } } impl Stream for T { diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs new file mode 100644 index 00000000..ce2afc50 --- /dev/null +++ b/src/vec/from_stream.rs @@ -0,0 +1,25 @@ +use crate::stream::{FromStream, IntoStream, Stream}; + +use std::pin::Pin; + +impl FromStream for Vec { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Pin::from(Box::new(async move { + pin_utils::pin_mut!(stream); + + let mut out = vec![]; + while let Some(item) = stream.next().await { + out.push(item); + } + out + })) + } +} diff --git a/src/vec/mod.rs b/src/vec/mod.rs new file mode 100644 index 00000000..725fc8f7 --- /dev/null +++ b/src/vec/mod.rs @@ -0,0 +1,9 @@ +//! The Rust core allocation and collections library +//! +//! This library provides smart pointers and collections for managing +//! heap-allocated values. + +mod from_stream; + +#[doc(inline)] +pub use std::vec::Vec; From 6ee3f6cf9c9fece9f36af390f869b324dfd95663 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sat, 7 Sep 2019 03:19:47 +0200 Subject: [PATCH 2/5] tests pass again Signed-off-by: Yoshua Wuyts --- src/stream/from_stream.rs | 6 +++--- src/stream/into_stream.rs | 4 ++-- src/stream/stream/mod.rs | 7 ++++--- src/vec/from_stream.rs | 6 +++--- 4 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs index 8b9ad262..7a262f7a 100644 --- a/src/stream/from_stream.rs +++ b/src/stream/from_stream.rs @@ -10,7 +10,7 @@ use std::pin::Pin; /// See also: [`IntoStream`]. /// /// [`IntoStream`]: trait.IntoStream.html -pub trait FromStream { +pub trait FromStream { /// Creates a value from a stream. /// /// # Examples @@ -22,7 +22,7 @@ pub trait FromStream { /// /// // let _five_fives = async_std::stream::repeat(5).take(5); /// ``` - fn from_stream<'a, S: IntoStream + 'a>( + fn from_stream<'a, S: IntoStream + Send + 'a>( stream: S, - ) -> Pin + 'a>>; + ) -> Pin + Send + 'a>>; } diff --git a/src/stream/into_stream.rs b/src/stream/into_stream.rs index 63fb558d..42f95299 100644 --- a/src/stream/into_stream.rs +++ b/src/stream/into_stream.rs @@ -18,13 +18,13 @@ pub trait IntoStream { type Item; /// Which kind of stream are we turning this into? - type IntoStream: Stream; + type IntoStream: Stream + Send; /// Creates a stream from a value. fn into_stream(self) -> Self::IntoStream; } -impl IntoStream for I { +impl IntoStream for I { type Item = I::Item; type IntoStream = I; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 88b54520..529b7cfb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -91,7 +91,7 @@ cfg_if! { } } else { macro_rules! dyn_ret { - ($a:lifetime, $o:ty) => (Pin + 'a>>) + ($a:lifetime, $o:ty) => (Pin + Send + 'a>>) } } } @@ -544,6 +544,7 @@ pub trait Stream { /// /// Basic usage: /// + /// ``` /// # fn main() { async_std::task::block_on(async { /// # /// use async_std::prelude::*; @@ -551,7 +552,6 @@ pub trait Stream { /// /// let mut s = stream::repeat::(42).take(3); /// assert!(s.any(|x| x == 42).await); - /// /// # /// # }) } /// ``` @@ -704,7 +704,8 @@ pub trait Stream { #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] fn collect<'a, B>(self) -> dyn_ret!('a, B) where - Self: futures_core::stream::Stream + Sized + 'a, + Self: futures_core::stream::Stream + Sized + Send + 'a, + ::Item: Send, B: FromStream<::Item>, { FromStream::from_stream(self) diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index ce2afc50..f603d0dc 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -2,13 +2,13 @@ use crate::stream::{FromStream, IntoStream, Stream}; use std::pin::Pin; -impl FromStream for Vec { +impl FromStream for Vec { #[inline] fn from_stream<'a, S: IntoStream>( stream: S, - ) -> Pin + 'a>> + ) -> Pin + Send + 'a>> where - ::IntoStream: 'a, + ::IntoStream: Send + 'a, { let stream = stream.into_stream(); From cb7f3dd3767dd2467684b1ff1db60deda828d2c7 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 10 Sep 2019 19:38:36 +0200 Subject: [PATCH 3/5] remove unused types Signed-off-by: Yoshua Wuyts --- src/stream/stream/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 529b7cfb..29b51d99 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -51,8 +51,6 @@ use next::NextFuture; use nth::NthFuture; use super::from_stream::FromStream; -use crate::future::Future; -use crate::task::{Context, Poll}; use std::cmp::Ordering; use std::marker::PhantomData; use std::pin::Pin; From e6a3160c8b2c86ebfc687feef35ddeb3baedf75c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Fri, 13 Sep 2019 02:49:48 +0200 Subject: [PATCH 4/5] add unstable cfg to FromStream/IntoStream Signed-off-by: Yoshua Wuyts --- src/stream/from_stream.rs | 1 + src/stream/into_stream.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs index 7a262f7a..91d3e24b 100644 --- a/src/stream/from_stream.rs +++ b/src/stream/from_stream.rs @@ -10,6 +10,7 @@ use std::pin::Pin; /// See also: [`IntoStream`]. /// /// [`IntoStream`]: trait.IntoStream.html +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait FromStream { /// Creates a value from a stream. /// diff --git a/src/stream/into_stream.rs b/src/stream/into_stream.rs index 42f95299..b2913170 100644 --- a/src/stream/into_stream.rs +++ b/src/stream/into_stream.rs @@ -13,6 +13,7 @@ use futures_core::stream::Stream; /// See also: [`FromStream`]. /// /// [`FromStream`]: trait.FromStream.html +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait IntoStream { /// The type of the elements being iterated over. type Item; From 98927a79a9bf6984680a505bb32522e108e86f33 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 17 Sep 2019 18:00:40 +0200 Subject: [PATCH 5/5] rebase Signed-off-by: Yoshua Wuyts --- src/stream/stream/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 29b51d99..ca83fbb8 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -54,6 +54,7 @@ use super::from_stream::FromStream; use std::cmp::Ordering; use std::marker::PhantomData; use std::pin::Pin; +use std::task::{Context, Poll}; use cfg_if::cfg_if;