from/into stream

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

update examples

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

impl collect

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

compiles!

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

layout base for collect into vec

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

fmt

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

progress

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

compiles!

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

define failing test

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

cargo fmt

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

stuck again

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

fix trait bounds!

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

cargo fmt

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

hide dyn fut impl

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

dyn ret for vec

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

cargo fmt

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

collect docs

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

remove macro from vec::from_stream

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

shorten collect trait bound

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>

Remove some Unpin and Send bounds

Signed-off-by: Yoshua Wuyts <yoshuawuyts@gmail.com>
This commit is contained in:
Yoshua Wuyts 2019-08-29 11:32:43 +02:00
parent 60a62f90fd
commit 6c4c958abc
Failed to extract signature
7 changed files with 162 additions and 3 deletions

View file

@ -51,6 +51,7 @@ pub mod prelude;
pub mod stream;
pub mod sync;
pub mod task;
mod vec;
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(feature = "unstable")]

28
src/stream/from_stream.rs Normal file
View file

@ -0,0 +1,28 @@
use super::IntoStream;
use std::pin::Pin;
/// Conversion from a `Stream`.
///
/// By implementing `FromStream` for a type, you define how it will be created from a stream.
/// This is common for types which describe a collection of some kind.
///
/// See also: [`IntoStream`].
///
/// [`IntoStream`]: trait.IntoStream.html
pub trait FromStream<T> {
/// Creates a value from a stream.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// // use async_std::stream::FromStream;
///
/// // let _five_fives = async_std::stream::repeat(5).take(5);
/// ```
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>;
}

35
src/stream/into_stream.rs Normal file
View file

@ -0,0 +1,35 @@
use futures_core::stream::Stream;
/// Conversion into a `Stream`.
///
/// By implementing `IntoIterator` for a type, you define how it will be
/// converted to an iterator. This is common for types which describe a
/// collection of some kind.
///
/// [`from_stream`]: #tymethod.from_stream
/// [`Stream`]: trait.Stream.html
/// [`collect`]: trait.Stream.html#method.collect
///
/// See also: [`FromStream`].
///
/// [`FromStream`]: trait.FromStream.html
pub trait IntoStream {
/// The type of the elements being iterated over.
type Item;
/// Which kind of stream are we turning this into?
type IntoStream: Stream<Item = Self::Item>;
/// Creates a stream from a value.
fn into_stream(self) -> Self::IntoStream;
}
impl<I: Stream> IntoStream for I {
type Item = I::Item;
type IntoStream = I;
#[inline]
fn into_stream(self) -> I {
self
}
}

View file

@ -23,12 +23,16 @@
pub use double_ended_stream::DoubleEndedStream;
pub use empty::{empty, Empty};
pub use from_stream::FromStream;
pub use into_stream::IntoStream;
pub use once::{once, Once};
pub use repeat::{repeat, Repeat};
pub use stream::{Scan, Stream, Take, Zip};
mod double_ended_stream;
mod empty;
mod from_stream;
mod into_stream;
mod once;
mod repeat;
mod stream;

View file

@ -50,14 +50,15 @@ use min_by::MinByFuture;
use next::NextFuture;
use nth::NthFuture;
use super::from_stream::FromStream;
use crate::future::Future;
use crate::task::{Context, Poll};
use std::cmp::Ordering;
use std::marker::PhantomData;
use std::pin::Pin;
use cfg_if::cfg_if;
use crate::task::{Context, Poll};
cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
@ -80,6 +81,21 @@ cfg_if! {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
pub struct DynFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (DynFuture<$a, $o>);
}
} else {
macro_rules! dyn_ret {
($a:lifetime, $o:ty) => (Pin<Box<dyn core::future::Future<Output = $o> + 'a>>)
}
}
}
/// An asynchronous stream of values.
///
/// This trait is an async version of [`std::iter::Iterator`].
@ -528,7 +544,6 @@ pub trait Stream {
///
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
@ -652,6 +667,48 @@ pub trait Stream {
{
Zip::new(self, other)
}
/// Transforms a stream into a collection.
///
/// `collect()` can take anything streamable, and turn it into a relevant
/// collection. This is one of the more powerful methods in the async
/// standard library, used in a variety of contexts.
///
/// The most basic pattern in which `collect()` is used is to turn one
/// collection into another. You take a collection, call [`stream`] on it,
/// do a bunch of transformations, and then `collect()` at the end.
///
/// Because `collect()` is so general, it can cause problems with type
/// inference. As such, `collect()` is one of the few times you'll see
/// the syntax affectionately known as the 'turbofish': `::<>`. This
/// helps the inference algorithm understand specifically which collection
/// you're trying to collect into.
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let s = stream::repeat(9u8).take(3);
/// let buf: Vec<u8> = s.collect().await;
///
/// assert_eq!(buf, vec![9; 3]);
/// #
/// # }) }
/// ```
///
/// [`stream`]: trait.Stream.html#tymethod.next
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(self) -> dyn_ret!('a, B)
where
Self: futures_core::stream::Stream + Sized + 'a,
B: FromStream<<Self as futures_core::stream::Stream>::Item>,
{
FromStream::from_stream(self)
}
}
impl<T: futures_core::stream::Stream + ?Sized> Stream for T {

25
src/vec/from_stream.rs Normal file
View file

@ -0,0 +1,25 @@
use crate::stream::{FromStream, IntoStream, Stream};
use std::pin::Pin;
impl<T> FromStream<T> for Vec<T> {
#[inline]
fn from_stream<'a, S: IntoStream<Item = T>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
where
<S as IntoStream>::IntoStream: 'a,
{
let stream = stream.into_stream();
Pin::from(Box::new(async move {
pin_utils::pin_mut!(stream);
let mut out = vec![];
while let Some(item) = stream.next().await {
out.push(item);
}
out
}))
}
}

9
src/vec/mod.rs Normal file
View file

@ -0,0 +1,9 @@
//! The Rust core allocation and collections library
//!
//! This library provides smart pointers and collections for managing
//! heap-allocated values.
mod from_stream;
#[doc(inline)]
pub use std::vec::Vec;