forked from mirror/async-std
fix(stream): add send guards on collect
Closes #639 Co-authored-by: dignifiedquire <me@dignifiedquire.com>
This commit is contained in:
parent
8c4b425136
commit
c82b1efb69
27 changed files with 210 additions and 72 deletions
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, IntoStream};
|
use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<T: Ord> stream::Extend<T> for BinaryHeap<T> {
|
impl<T: Ord + Send> stream::Extend<T> for BinaryHeap<T> {
|
||||||
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
self.reserve(stream.size_hint().0);
|
self.reserve(stream.size_hint().0);
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, FromStream, IntoStream};
|
use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<T: Ord> FromStream<T> for BinaryHeap<T> {
|
impl<T: Ord + Send> FromStream<T> for BinaryHeap<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, IntoStream};
|
use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<K: Ord, V> stream::Extend<(K, V)> for BTreeMap<K, V> {
|
impl<K: Ord + Send, V: Send> stream::Extend<(K, V)> for BTreeMap<K, V> {
|
||||||
fn extend<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
fn extend<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
Box::pin(stream.into_stream().for_each(move |(k, v)| {
|
Box::pin(stream.into_stream().for_each(move |(k, v)| {
|
||||||
self.insert(k, v);
|
self.insert(k, v);
|
||||||
}))
|
}))
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, FromStream, IntoStream};
|
use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<K: Ord, V> FromStream<(K, V)> for BTreeMap<K, V> {
|
impl<K: Ord + Send, V: Send> FromStream<(K, V)> for BTreeMap<K, V> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, IntoStream};
|
use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<T: Ord> stream::Extend<T> for BTreeSet<T> {
|
impl<T: Ord + Send> stream::Extend<T> for BTreeSet<T> {
|
||||||
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
Box::pin(stream.into_stream().for_each(move |item| {
|
Box::pin(stream.into_stream().for_each(move |item| {
|
||||||
self.insert(item);
|
self.insert(item);
|
||||||
}))
|
}))
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, FromStream, IntoStream};
|
use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<T: Ord> FromStream<T> for BTreeSet<T> {
|
impl<T: Ord + Send> FromStream<T> for BTreeSet<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -7,13 +7,17 @@ use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<K, V, H> stream::Extend<(K, V)> for HashMap<K, V, H>
|
impl<K, V, H> stream::Extend<(K, V)> for HashMap<K, V, H>
|
||||||
where
|
where
|
||||||
K: Eq + Hash,
|
K: Eq + Hash + Send,
|
||||||
H: BuildHasher + Default,
|
V: Send,
|
||||||
|
H: BuildHasher + Default + Send,
|
||||||
{
|
{
|
||||||
fn extend<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
fn extend<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
// The following is adapted from the hashbrown source code:
|
// The following is adapted from the hashbrown source code:
|
||||||
|
|
|
@ -7,13 +7,17 @@ use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<K, V, H> FromStream<(K, V)> for HashMap<K, V, H>
|
impl<K, V, H> FromStream<(K, V)> for HashMap<K, V, H>
|
||||||
where
|
where
|
||||||
K: Eq + Hash,
|
K: Eq + Hash + Send,
|
||||||
H: BuildHasher + Default,
|
H: BuildHasher + Default + Send,
|
||||||
|
V: Send,
|
||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = (K, V)> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -7,13 +7,16 @@ use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<T, H> stream::Extend<T> for HashSet<T, H>
|
impl<T, H> stream::Extend<T> for HashSet<T, H>
|
||||||
where
|
where
|
||||||
T: Eq + Hash,
|
T: Eq + Hash + Send,
|
||||||
H: BuildHasher + Default,
|
H: BuildHasher + Default + Send,
|
||||||
{
|
{
|
||||||
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
// The Extend impl for HashSet in the standard library delegates to the internal HashMap.
|
// The Extend impl for HashSet in the standard library delegates to the internal HashMap.
|
||||||
// Thus, this impl is just a copy of the async Extend impl for HashMap in this crate.
|
// Thus, this impl is just a copy of the async Extend impl for HashMap in this crate.
|
||||||
|
|
||||||
|
|
|
@ -7,13 +7,16 @@ use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<T, H> FromStream<T> for HashSet<T, H>
|
impl<T, H> FromStream<T> for HashSet<T, H>
|
||||||
where
|
where
|
||||||
T: Eq + Hash,
|
T: Eq + Hash + Send,
|
||||||
H: BuildHasher + Default,
|
H: BuildHasher + Default + Send,
|
||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, IntoStream};
|
use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<T> stream::Extend<T> for LinkedList<T> {
|
impl<T: Send> stream::Extend<T> for LinkedList<T> {
|
||||||
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
Box::pin(stream.for_each(move |item| self.push_back(item)))
|
Box::pin(stream.for_each(move |item| self.push_back(item)))
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, FromStream, IntoStream};
|
use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<T> FromStream<T> for LinkedList<T> {
|
impl<T: Send> FromStream<T> for LinkedList<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, IntoStream};
|
use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<T> stream::Extend<T> for VecDeque<T> {
|
impl<T: Send> stream::Extend<T> for VecDeque<T> {
|
||||||
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
self.reserve(stream.size_hint().0);
|
self.reserve(stream.size_hint().0);
|
||||||
|
|
|
@ -4,11 +4,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, FromStream, IntoStream};
|
use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<T> FromStream<T> for VecDeque<T> {
|
impl<T: Send> FromStream<T> for VecDeque<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -4,7 +4,7 @@ use crate::prelude::*;
|
||||||
use crate::stream::{FromStream, IntoStream};
|
use crate::stream::{FromStream, IntoStream};
|
||||||
use std::convert::identity;
|
use std::convert::identity;
|
||||||
|
|
||||||
impl<T, V> FromStream<Option<T>> for Option<V>
|
impl<T: Send, V> FromStream<Option<T>> for Option<V>
|
||||||
where
|
where
|
||||||
V: FromStream<T>,
|
V: FromStream<T>,
|
||||||
{
|
{
|
||||||
|
@ -14,7 +14,10 @@ where
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = Option<T>> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = Option<T>> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -323,7 +323,10 @@ impl<P: AsRef<Path>> stream::Extend<P> for PathBuf {
|
||||||
fn extend<'a, S: IntoStream<Item = P> + 'a>(
|
fn extend<'a, S: IntoStream<Item = P> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -337,11 +340,14 @@ impl<P: AsRef<Path>> stream::Extend<P> for PathBuf {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "unstable")]
|
#[cfg(feature = "unstable")]
|
||||||
impl<'b, P: AsRef<Path> + 'b> FromStream<P> for PathBuf {
|
impl<'b, P: AsRef<Path> + 'b + Send> FromStream<P> for PathBuf {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = P> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = P> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -5,6 +5,8 @@ use crate::stream::{FromStream, IntoStream};
|
||||||
|
|
||||||
impl<T, E, V> FromStream<Result<T, E>> for Result<V, E>
|
impl<T, E, V> FromStream<Result<T, E>> for Result<V, E>
|
||||||
where
|
where
|
||||||
|
T: Send,
|
||||||
|
E: Send,
|
||||||
V: FromStream<T>,
|
V: FromStream<T>,
|
||||||
{
|
{
|
||||||
/// Takes each element in the stream: if it is an `Err`, no further
|
/// Takes each element in the stream: if it is an `Err`, no further
|
||||||
|
@ -30,7 +32,10 @@ where
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = Result<T, E>> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = Result<T, E>> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -34,7 +34,9 @@ pub trait Extend<A> {
|
||||||
fn extend<'a, T: IntoStream<Item = A> + 'a>(
|
fn extend<'a, T: IntoStream<Item = A> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: T,
|
stream: T,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>>;
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<T as IntoStream>::IntoStream: Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extends a collection with the contents of a stream.
|
/// Extends a collection with the contents of a stream.
|
||||||
|
@ -69,6 +71,7 @@ pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S)
|
||||||
where
|
where
|
||||||
C: Extend<T>,
|
C: Extend<T>,
|
||||||
S: IntoStream<Item = T> + 'a,
|
S: IntoStream<Item = T> + 'a,
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
{
|
{
|
||||||
Extend::extend(collection, stream).await
|
Extend::extend(collection, stream).await
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,7 +72,10 @@ use crate::stream::IntoStream;
|
||||||
/// impl FromStream<i32> for MyCollection {
|
/// impl FromStream<i32> for MyCollection {
|
||||||
/// fn from_stream<'a, S: IntoStream<Item = i32> + 'a>(
|
/// fn from_stream<'a, S: IntoStream<Item = i32> + 'a>(
|
||||||
/// stream: S,
|
/// stream: S,
|
||||||
/// ) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
/// ) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
/// where
|
||||||
|
/// <S as IntoStream>::IntoStream: Send,
|
||||||
|
/// {
|
||||||
/// let stream = stream.into_stream();
|
/// let stream = stream.into_stream();
|
||||||
///
|
///
|
||||||
/// Box::pin(async move {
|
/// Box::pin(async move {
|
||||||
|
@ -107,12 +110,12 @@ use crate::stream::IntoStream;
|
||||||
/// assert_eq!(c.0, vec![5, 5, 5, 5, 5]);
|
/// assert_eq!(c.0, vec![5, 5, 5, 5, 5]);
|
||||||
/// #
|
/// #
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
///```
|
/// ```
|
||||||
///
|
///
|
||||||
/// [`IntoStream`]: trait.IntoStream.html
|
/// [`IntoStream`]: trait.IntoStream.html
|
||||||
#[cfg(feature = "unstable")]
|
#[cfg(feature = "unstable")]
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
pub trait FromStream<T> {
|
pub trait FromStream<T: Send> {
|
||||||
/// Creates a value from a stream.
|
/// Creates a value from a stream.
|
||||||
///
|
///
|
||||||
/// # Examples
|
/// # Examples
|
||||||
|
@ -135,5 +138,7 @@ pub trait FromStream<T> {
|
||||||
/// ```
|
/// ```
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>>;
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1888,10 +1888,11 @@ extension_trait! {
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
fn collect<'a, B>(
|
fn collect<'a, B>(
|
||||||
self,
|
self,
|
||||||
) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a>>]
|
) -> impl Future<Output = B> + 'a [Pin<Box<dyn Future<Output = B> + 'a + Send>>]
|
||||||
where
|
where
|
||||||
Self: Sized + 'a,
|
Self: Sized + 'a + Send,
|
||||||
B: FromStream<Self::Item>,
|
B: FromStream<Self::Item>,
|
||||||
|
Self::Item: Send,
|
||||||
{
|
{
|
||||||
FromStream::from_stream(self)
|
FromStream::from_stream(self)
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,10 @@ impl stream::Extend<char> for String {
|
||||||
fn extend<'a, S: IntoStream<Item = char> + 'a>(
|
fn extend<'a, S: IntoStream<Item = char> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
self.reserve(stream.size_hint().0);
|
self.reserve(stream.size_hint().0);
|
||||||
|
|
||||||
|
@ -26,7 +29,10 @@ impl<'b> stream::Extend<&'b char> for String {
|
||||||
fn extend<'a, S: IntoStream<Item = &'b char> + 'a>(
|
fn extend<'a, S: IntoStream<Item = &'b char> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -43,7 +49,10 @@ impl<'b> stream::Extend<&'b str> for String {
|
||||||
fn extend<'a, S: IntoStream<Item = &'b str> + 'a>(
|
fn extend<'a, S: IntoStream<Item = &'b str> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -60,7 +69,10 @@ impl stream::Extend<String> for String {
|
||||||
fn extend<'a, S: IntoStream<Item = String> + 'a>(
|
fn extend<'a, S: IntoStream<Item = String> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -77,7 +89,10 @@ impl<'b> stream::Extend<Cow<'b, str>> for String {
|
||||||
fn extend<'a, S: IntoStream<Item = Cow<'b, str>> + 'a>(
|
fn extend<'a, S: IntoStream<Item = Cow<'b, str>> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -8,7 +8,10 @@ impl FromStream<char> for String {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = char> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = char> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -23,7 +26,10 @@ impl<'b> FromStream<&'b char> for String {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = &'b char> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = &'b char> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -38,7 +44,10 @@ impl<'b> FromStream<&'b str> for String {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = &'b str> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = &'b str> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -53,7 +62,10 @@ impl FromStream<String> for String {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = String> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = String> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -68,7 +80,10 @@ impl<'b> FromStream<Cow<'b, str>> for String {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = Cow<'b, str>> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = Cow<'b, str>> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -4,10 +4,13 @@ use crate::prelude::*;
|
||||||
use crate::stream::{self, IntoStream};
|
use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl stream::Extend<()> for () {
|
impl stream::Extend<()> for () {
|
||||||
fn extend<'a, T: IntoStream<Item = ()> + 'a>(
|
fn extend<'a, S: IntoStream<Item = ()> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: T,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
|
@ -7,7 +7,10 @@ impl FromStream<()> for () {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = ()> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = ()> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
Box::pin(stream.into_stream().for_each(drop))
|
Box::pin(stream.into_stream().for_each(drop))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,11 +3,14 @@ use std::pin::Pin;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, IntoStream};
|
use crate::stream::{self, IntoStream};
|
||||||
|
|
||||||
impl<T> stream::Extend<T> for Vec<T> {
|
impl<T: Send> stream::Extend<T> for Vec<T> {
|
||||||
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
fn extend<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
&'a mut self,
|
&'a mut self,
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = ()> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send,
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
self.reserve(stream.size_hint().0);
|
self.reserve(stream.size_hint().0);
|
||||||
|
|
|
@ -6,13 +6,13 @@ use std::sync::Arc;
|
||||||
use crate::prelude::*;
|
use crate::prelude::*;
|
||||||
use crate::stream::{self, FromStream, IntoStream};
|
use crate::stream::{self, FromStream, IntoStream};
|
||||||
|
|
||||||
impl<T> FromStream<T> for Vec<T> {
|
impl<T: Send> FromStream<T> for Vec<T> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T>>(
|
fn from_stream<'a, S: IntoStream<Item = T>>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>>
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
where
|
where
|
||||||
<S as IntoStream>::IntoStream: 'a,
|
<S as IntoStream>::IntoStream: 'a + Send,
|
||||||
{
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
|
@ -24,11 +24,14 @@ impl<T> FromStream<T> for Vec<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'b, T: Clone> FromStream<T> for Cow<'b, [T]> {
|
impl<'b, T: Clone + Send> FromStream<T> for Cow<'b, [T]> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -37,11 +40,14 @@ impl<'b, T: Clone> FromStream<T> for Cow<'b, [T]> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> FromStream<T> for Box<[T]> {
|
impl<T: Send> FromStream<T> for Box<[T]> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -50,11 +56,14 @@ impl<T> FromStream<T> for Box<[T]> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> FromStream<T> for Rc<[T]> {
|
impl<T: Send> FromStream<T> for Rc<[T]> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
@ -63,11 +72,14 @@ impl<T> FromStream<T> for Rc<[T]> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T> FromStream<T> for Arc<[T]> {
|
impl<T: Send> FromStream<T> for Arc<[T]> {
|
||||||
#[inline]
|
#[inline]
|
||||||
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
fn from_stream<'a, S: IntoStream<Item = T> + 'a>(
|
||||||
stream: S,
|
stream: S,
|
||||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
) -> Pin<Box<dyn Future<Output = Self> + 'a + Send>>
|
||||||
|
where
|
||||||
|
<S as IntoStream>::IntoStream: Send
|
||||||
|
{
|
||||||
let stream = stream.into_stream();
|
let stream = stream.into_stream();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
|
|
20
tests/collect.rs
Normal file
20
tests/collect.rs
Normal file
|
@ -0,0 +1,20 @@
|
||||||
|
#[cfg(feature = "unstable")]
|
||||||
|
#[test]
|
||||||
|
fn test_send() -> async_std::io::Result<()> {
|
||||||
|
use async_std::prelude::*;
|
||||||
|
use async_std::{stream, task};
|
||||||
|
|
||||||
|
task::block_on(async {
|
||||||
|
fn test_send_trait<T: Send>(_: &T) {}
|
||||||
|
|
||||||
|
let stream = stream::repeat(1u8).take(10);
|
||||||
|
test_send_trait(&stream);
|
||||||
|
|
||||||
|
let fut = stream.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
// This line triggers a compilation error
|
||||||
|
test_send_trait(&fut);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
}
|
Loading…
Reference in a new issue