From 6c4c958abc827c7f9b9d846ecaac86988b5e1249 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Thu, 29 Aug 2019 11:32:43 +0200 Subject: [PATCH] from/into stream Signed-off-by: Yoshua Wuyts update examples Signed-off-by: Yoshua Wuyts impl collect Signed-off-by: Yoshua Wuyts compiles! Signed-off-by: Yoshua Wuyts layout base for collect into vec Signed-off-by: Yoshua Wuyts fmt Signed-off-by: Yoshua Wuyts progress Signed-off-by: Yoshua Wuyts compiles! Signed-off-by: Yoshua Wuyts define failing test Signed-off-by: Yoshua Wuyts cargo fmt Signed-off-by: Yoshua Wuyts stuck again Signed-off-by: Yoshua Wuyts fix trait bounds! Signed-off-by: Yoshua Wuyts cargo fmt Signed-off-by: Yoshua Wuyts hide dyn fut impl Signed-off-by: Yoshua Wuyts dyn ret for vec Signed-off-by: Yoshua Wuyts cargo fmt Signed-off-by: Yoshua Wuyts collect docs Signed-off-by: Yoshua Wuyts remove macro from vec::from_stream Signed-off-by: Yoshua Wuyts shorten collect trait bound Signed-off-by: Yoshua Wuyts Remove some Unpin and Send bounds Signed-off-by: Yoshua Wuyts --- src/lib.rs | 1 + src/stream/from_stream.rs | 28 +++++++++++++++++ src/stream/into_stream.rs | 35 ++++++++++++++++++++++ src/stream/mod.rs | 4 +++ src/stream/stream/mod.rs | 63 +++++++++++++++++++++++++++++++++++++-- src/vec/from_stream.rs | 25 ++++++++++++++++ src/vec/mod.rs | 9 ++++++ 7 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 src/stream/from_stream.rs create mode 100644 src/stream/into_stream.rs create mode 100644 src/vec/from_stream.rs create mode 100644 src/vec/mod.rs diff --git a/src/lib.rs b/src/lib.rs index 3bb35c0..8336517 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,6 +51,7 @@ pub mod prelude; pub mod stream; pub mod sync; pub mod task; +mod vec; #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(feature = "unstable")] diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs new file mode 100644 index 0000000..8b9ad26 --- /dev/null +++ b/src/stream/from_stream.rs @@ -0,0 +1,28 @@ +use super::IntoStream; + +use std::pin::Pin; + +/// Conversion from a `Stream`. +/// +/// By implementing `FromStream` for a type, you define how it will be created from a stream. +/// This is common for types which describe a collection of some kind. +/// +/// See also: [`IntoStream`]. +/// +/// [`IntoStream`]: trait.IntoStream.html +pub trait FromStream { + /// Creates a value from a stream. + /// + /// # Examples + /// + /// Basic usage: + /// + /// ``` + /// // use async_std::stream::FromStream; + /// + /// // let _five_fives = async_std::stream::repeat(5).take(5); + /// ``` + fn from_stream<'a, S: IntoStream + 'a>( + stream: S, + ) -> Pin + 'a>>; +} diff --git a/src/stream/into_stream.rs b/src/stream/into_stream.rs new file mode 100644 index 0000000..63fb558 --- /dev/null +++ b/src/stream/into_stream.rs @@ -0,0 +1,35 @@ +use futures_core::stream::Stream; + +/// Conversion into a `Stream`. +/// +/// By implementing `IntoIterator` for a type, you define how it will be +/// converted to an iterator. This is common for types which describe a +/// collection of some kind. +/// +/// [`from_stream`]: #tymethod.from_stream +/// [`Stream`]: trait.Stream.html +/// [`collect`]: trait.Stream.html#method.collect +/// +/// See also: [`FromStream`]. +/// +/// [`FromStream`]: trait.FromStream.html +pub trait IntoStream { + /// The type of the elements being iterated over. + type Item; + + /// Which kind of stream are we turning this into? + type IntoStream: Stream; + + /// Creates a stream from a value. + fn into_stream(self) -> Self::IntoStream; +} + +impl IntoStream for I { + type Item = I::Item; + type IntoStream = I; + + #[inline] + fn into_stream(self) -> I { + self + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index f749ebe..a08c827 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -23,12 +23,16 @@ pub use double_ended_stream::DoubleEndedStream; pub use empty::{empty, Empty}; +pub use from_stream::FromStream; +pub use into_stream::IntoStream; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use stream::{Scan, Stream, Take, Zip}; mod double_ended_stream; mod empty; +mod from_stream; +mod into_stream; mod once; mod repeat; mod stream; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 578dd56..88b5452 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -50,14 +50,15 @@ use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; +use super::from_stream::FromStream; +use crate::future::Future; +use crate::task::{Context, Poll}; use std::cmp::Ordering; use std::marker::PhantomData; use std::pin::Pin; use cfg_if::cfg_if; -use crate::task::{Context, Poll}; - cfg_if! { if #[cfg(feature = "docs")] { #[doc(hidden)] @@ -80,6 +81,21 @@ cfg_if! { } } +cfg_if! { + if #[cfg(feature = "docs")] { + #[doc(hidden)] + pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>); + + macro_rules! dyn_ret { + ($a:lifetime, $o:ty) => (DynFuture<$a, $o>); + } + } else { + macro_rules! dyn_ret { + ($a:lifetime, $o:ty) => (Pin + 'a>>) + } + } +} + /// An asynchronous stream of values. /// /// This trait is an async version of [`std::iter::Iterator`]. @@ -528,7 +544,6 @@ pub trait Stream { /// /// Basic usage: /// - /// ``` /// # fn main() { async_std::task::block_on(async { /// # /// use async_std::prelude::*; @@ -652,6 +667,48 @@ pub trait Stream { { Zip::new(self, other) } + + /// Transforms a stream into a collection. + /// + /// `collect()` can take anything streamable, and turn it into a relevant + /// collection. This is one of the more powerful methods in the async + /// standard library, used in a variety of contexts. + /// + /// The most basic pattern in which `collect()` is used is to turn one + /// collection into another. You take a collection, call [`stream`] on it, + /// do a bunch of transformations, and then `collect()` at the end. + /// + /// Because `collect()` is so general, it can cause problems with type + /// inference. As such, `collect()` is one of the few times you'll see + /// the syntax affectionately known as the 'turbofish': `::<>`. This + /// helps the inference algorithm understand specifically which collection + /// you're trying to collect into. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let s = stream::repeat(9u8).take(3); + /// let buf: Vec = s.collect().await; + /// + /// assert_eq!(buf, vec![9; 3]); + /// # + /// # }) } + /// ``` + /// + /// [`stream`]: trait.Stream.html#tymethod.next + #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] + fn collect<'a, B>(self) -> dyn_ret!('a, B) + where + Self: futures_core::stream::Stream + Sized + 'a, + B: FromStream<::Item>, + { + FromStream::from_stream(self) + } } impl Stream for T { diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs new file mode 100644 index 0000000..ce2afc5 --- /dev/null +++ b/src/vec/from_stream.rs @@ -0,0 +1,25 @@ +use crate::stream::{FromStream, IntoStream, Stream}; + +use std::pin::Pin; + +impl FromStream for Vec { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Pin::from(Box::new(async move { + pin_utils::pin_mut!(stream); + + let mut out = vec![]; + while let Some(item) = stream.next().await { + out.push(item); + } + out + })) + } +} diff --git a/src/vec/mod.rs b/src/vec/mod.rs new file mode 100644 index 0000000..725fc8f --- /dev/null +++ b/src/vec/mod.rs @@ -0,0 +1,9 @@ +//! The Rust core allocation and collections library +//! +//! This library provides smart pointers and collections for managing +//! heap-allocated values. + +mod from_stream; + +#[doc(inline)] +pub use std::vec::Vec;