forked from mirror/async-std
Cleaning up stream pinning.
This commit is contained in:
parent
f611ceccc8
commit
f0875d2dca
20 changed files with 5 additions and 67 deletions
|
@ -12,8 +12,6 @@ impl<T: Ord> FromStream<T> for BinaryHeap<T> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = BinaryHeap::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -12,8 +12,6 @@ impl<K: Ord, V> FromStream<(K, V)> for BTreeMap<K, V> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = BTreeMap::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -12,8 +12,6 @@ impl<T: Ord> FromStream<T> for BTreeSet<T> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = BTreeSet::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -17,8 +17,6 @@ where
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = HashMap::with_hasher(Default::default());
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -17,8 +17,6 @@ where
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = HashSet::with_hasher(Default::default());
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -12,8 +12,6 @@ impl<T> FromStream<T> for LinkedList<T> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = LinkedList::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -12,8 +12,6 @@ impl<T> FromStream<T> for VecDeque<T> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = VecDeque::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -17,8 +17,6 @@ where
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_error = false;
|
||||
|
|
|
@ -39,8 +39,6 @@ where
|
|||
where S: Stream<Item = Option<U>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_none = false;
|
||||
|
|
|
@ -34,8 +34,6 @@ where
|
|||
where S: Stream<Item = Option<U>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_none = false;
|
||||
|
|
|
@ -327,8 +327,6 @@ impl<P: AsRef<Path>> stream::Extend<P> for PathBuf {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
self.push(item.as_ref());
|
||||
}
|
||||
|
@ -342,10 +340,9 @@ impl<'b, P: AsRef<Path> + 'b> FromStream<P> for PathBuf {
|
|||
fn from_stream<'a, S: IntoStream<Item = P> + 'a>(
|
||||
stream: S,
|
||||
) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
|
||||
Box::pin(async move {
|
||||
let stream = stream.into_stream();
|
||||
pin_utils::pin_mut!(stream);
|
||||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
let mut out = Self::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -17,8 +17,6 @@ where
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_error = None;
|
||||
|
|
|
@ -39,8 +39,6 @@ where
|
|||
where S: Stream<Item = Result<U, E>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_error = None;
|
||||
|
|
|
@ -39,8 +39,6 @@ where
|
|||
where S: Stream<Item = Result<U, E>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_error = None;
|
||||
|
|
|
@ -30,7 +30,7 @@ impl<F> Unpin for FromFn<F> {}
|
|||
/// use async_std::stream;
|
||||
///
|
||||
/// let mut count = 0u8;
|
||||
/// let s = stream::from_fn(|| {
|
||||
/// let mut s = stream::from_fn(|| {
|
||||
/// count += 1;
|
||||
/// if count > 3 {
|
||||
/// None
|
||||
|
@ -39,7 +39,6 @@ impl<F> Unpin for FromFn<F> {}
|
|||
/// }
|
||||
/// });
|
||||
///
|
||||
/// pin_utils::pin_mut!(s);
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(2));
|
||||
/// assert_eq!(s.next().await, Some(3));
|
||||
|
|
|
@ -28,9 +28,7 @@ impl<F> Unpin for RepeatWith<F> {}
|
|||
/// use async_std::prelude::*;
|
||||
/// use async_std::stream;
|
||||
///
|
||||
/// let s = stream::repeat_with(|| 1);
|
||||
///
|
||||
/// pin_utils::pin_mut!(s);
|
||||
/// let mut s = stream::repeat_with(|| 1);
|
||||
///
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
/// assert_eq!(s.next().await, Some(1));
|
||||
|
|
|
@ -13,8 +13,6 @@ impl stream::Extend<char> for String {
|
|||
self.reserve(stream.size_hint().0);
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
self.push(item);
|
||||
}
|
||||
|
@ -30,8 +28,6 @@ impl<'b> stream::Extend<&'b char> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
self.push(*item);
|
||||
}
|
||||
|
@ -47,8 +43,6 @@ impl<'b> stream::Extend<&'b str> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
self.push_str(item);
|
||||
}
|
||||
|
@ -64,8 +58,6 @@ impl stream::Extend<String> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
self.push_str(&item);
|
||||
}
|
||||
|
@ -81,8 +73,6 @@ impl<'b> stream::Extend<Cow<'b, str>> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
while let Some(item) = stream.next().await {
|
||||
self.push_str(&item);
|
||||
}
|
||||
|
|
|
@ -12,8 +12,6 @@ impl FromStream<char> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = String::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
@ -29,8 +27,6 @@ impl<'b> FromStream<&'b char> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = String::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
@ -46,8 +42,6 @@ impl<'b> FromStream<&'b str> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = String::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
@ -63,8 +57,6 @@ impl FromStream<String> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = String::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
@ -80,8 +72,6 @@ impl<'b> FromStream<Cow<'b, str>> for String {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = String::new();
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
|
|
@ -9,8 +9,8 @@ impl stream::Extend<()> for () {
|
|||
stream: T,
|
||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
|
||||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
while let Some(_) = stream.next().await {}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -17,8 +17,6 @@ impl<T> FromStream<T> for Vec<T> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = vec![];
|
||||
stream::extend(&mut out, stream).await;
|
||||
out
|
||||
|
@ -34,8 +32,6 @@ impl<'b, T: Clone> FromStream<T> for Cow<'b, [T]> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
Cow::Owned(FromStream::from_stream(stream).await)
|
||||
})
|
||||
}
|
||||
|
@ -49,8 +45,6 @@ impl<T> FromStream<T> for Box<[T]> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
Vec::from_stream(stream).await.into_boxed_slice()
|
||||
})
|
||||
}
|
||||
|
@ -64,8 +58,6 @@ impl<T> FromStream<T> for Rc<[T]> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
Vec::from_stream(stream).await.into()
|
||||
})
|
||||
}
|
||||
|
@ -79,8 +71,6 @@ impl<T> FromStream<T> for Arc<[T]> {
|
|||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
Vec::from_stream(stream).await.into()
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue