From f0875d2dca8428f140c709bfc45f651748692e7b Mon Sep 17 00:00:00 2001 From: Grzegorz Gierlach Date: Tue, 12 Nov 2019 19:34:08 +0100 Subject: [PATCH 1/2] Cleaning up stream pinning. --- src/collections/binary_heap/from_stream.rs | 2 -- src/collections/btree_map/from_stream.rs | 2 -- src/collections/btree_set/from_stream.rs | 2 -- src/collections/hash_map/from_stream.rs | 2 -- src/collections/hash_set/from_stream.rs | 2 -- src/collections/linked_list/from_stream.rs | 2 -- src/collections/vec_deque/from_stream.rs | 2 -- src/option/from_stream.rs | 2 -- src/option/product.rs | 2 -- src/option/sum.rs | 2 -- src/path/pathbuf.rs | 7 ++----- src/result/from_stream.rs | 2 -- src/result/product.rs | 2 -- src/result/sum.rs | 2 -- src/stream/from_fn.rs | 3 +-- src/stream/repeat_with.rs | 4 +--- src/string/extend.rs | 10 ---------- src/string/from_stream.rs | 10 ---------- src/unit/extend.rs | 2 +- src/vec/from_stream.rs | 10 ---------- 20 files changed, 5 insertions(+), 67 deletions(-) diff --git a/src/collections/binary_heap/from_stream.rs b/src/collections/binary_heap/from_stream.rs index 148a57f..6851948 100644 --- a/src/collections/binary_heap/from_stream.rs +++ b/src/collections/binary_heap/from_stream.rs @@ -12,8 +12,6 @@ impl FromStream for BinaryHeap { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = BinaryHeap::new(); stream::extend(&mut out, stream).await; out diff --git a/src/collections/btree_map/from_stream.rs b/src/collections/btree_map/from_stream.rs index e0653ab..8531223 100644 --- a/src/collections/btree_map/from_stream.rs +++ b/src/collections/btree_map/from_stream.rs @@ -12,8 +12,6 @@ impl FromStream<(K, V)> for BTreeMap { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = BTreeMap::new(); stream::extend(&mut out, stream).await; out diff --git a/src/collections/btree_set/from_stream.rs b/src/collections/btree_set/from_stream.rs index c4197df..318af9e 100644 --- a/src/collections/btree_set/from_stream.rs +++ b/src/collections/btree_set/from_stream.rs @@ -12,8 +12,6 @@ impl FromStream for BTreeSet { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = BTreeSet::new(); stream::extend(&mut out, stream).await; out diff --git a/src/collections/hash_map/from_stream.rs b/src/collections/hash_map/from_stream.rs index bf47d8e..d74a7cc 100644 --- a/src/collections/hash_map/from_stream.rs +++ b/src/collections/hash_map/from_stream.rs @@ -17,8 +17,6 @@ where let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = HashMap::with_hasher(Default::default()); stream::extend(&mut out, stream).await; out diff --git a/src/collections/hash_set/from_stream.rs b/src/collections/hash_set/from_stream.rs index 69b3853..dc5e61e 100644 --- a/src/collections/hash_set/from_stream.rs +++ b/src/collections/hash_set/from_stream.rs @@ -17,8 +17,6 @@ where let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = HashSet::with_hasher(Default::default()); stream::extend(&mut out, stream).await; out diff --git a/src/collections/linked_list/from_stream.rs b/src/collections/linked_list/from_stream.rs index 1226247..d93bbb7 100644 --- a/src/collections/linked_list/from_stream.rs +++ b/src/collections/linked_list/from_stream.rs @@ -12,8 +12,6 @@ impl FromStream for LinkedList { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = LinkedList::new(); stream::extend(&mut out, stream).await; out diff --git a/src/collections/vec_deque/from_stream.rs b/src/collections/vec_deque/from_stream.rs index 767ec06..241bd74 100644 --- a/src/collections/vec_deque/from_stream.rs +++ b/src/collections/vec_deque/from_stream.rs @@ -12,8 +12,6 @@ impl FromStream for VecDeque { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = VecDeque::new(); stream::extend(&mut out, stream).await; out diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index d2d53b6..8679114 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -17,8 +17,6 @@ where let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = false; diff --git a/src/option/product.rs b/src/option/product.rs index 9b7274f..abaab73 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -39,8 +39,6 @@ where where S: Stream> + 'a { Box::pin(async move { - pin_utils::pin_mut!(stream); - // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; diff --git a/src/option/sum.rs b/src/option/sum.rs index 5c154f4..d2e4483 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -34,8 +34,6 @@ where where S: Stream> + 'a { Box::pin(async move { - pin_utils::pin_mut!(stream); - // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; diff --git a/src/path/pathbuf.rs b/src/path/pathbuf.rs index 56a63a4..808acb2 100644 --- a/src/path/pathbuf.rs +++ b/src/path/pathbuf.rs @@ -327,8 +327,6 @@ impl> stream::Extend

for PathBuf { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - while let Some(item) = stream.next().await { self.push(item.as_ref()); } @@ -342,10 +340,9 @@ impl<'b, P: AsRef + 'b> FromStream

for PathBuf { fn from_stream<'a, S: IntoStream + 'a>( stream: S, ) -> Pin + 'a>> { - Box::pin(async move { - let stream = stream.into_stream(); - pin_utils::pin_mut!(stream); + let stream = stream.into_stream(); + Box::pin(async move { let mut out = Self::new(); stream::extend(&mut out, stream).await; out diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 9296797..a8490d6 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -17,8 +17,6 @@ where let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; diff --git a/src/result/product.rs b/src/result/product.rs index fd24216..ec9d94a 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -39,8 +39,6 @@ where where S: Stream> + 'a { Box::pin(async move { - pin_utils::pin_mut!(stream); - // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; diff --git a/src/result/sum.rs b/src/result/sum.rs index dd68772..ccc4240 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -39,8 +39,6 @@ where where S: Stream> + 'a { Box::pin(async move { - pin_utils::pin_mut!(stream); - // Using `scan` here because it is able to stop the stream early // if a failure occurs let mut found_error = None; diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 24432c7..3ace658 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -30,7 +30,7 @@ impl Unpin for FromFn {} /// use async_std::stream; /// /// let mut count = 0u8; -/// let s = stream::from_fn(|| { +/// let mut s = stream::from_fn(|| { /// count += 1; /// if count > 3 { /// None @@ -39,7 +39,6 @@ impl Unpin for FromFn {} /// } /// }); /// -/// pin_utils::pin_mut!(s); /// assert_eq!(s.next().await, Some(1)); /// assert_eq!(s.next().await, Some(2)); /// assert_eq!(s.next().await, Some(3)); diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index e183a77..954693d 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -28,9 +28,7 @@ impl Unpin for RepeatWith {} /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::repeat_with(|| 1); -/// -/// pin_utils::pin_mut!(s); +/// let mut s = stream::repeat_with(|| 1); /// /// assert_eq!(s.next().await, Some(1)); /// assert_eq!(s.next().await, Some(1)); diff --git a/src/string/extend.rs b/src/string/extend.rs index 55bec0c..43bd46d 100644 --- a/src/string/extend.rs +++ b/src/string/extend.rs @@ -13,8 +13,6 @@ impl stream::Extend for String { self.reserve(stream.size_hint().0); Box::pin(async move { - pin_utils::pin_mut!(stream); - while let Some(item) = stream.next().await { self.push(item); } @@ -30,8 +28,6 @@ impl<'b> stream::Extend<&'b char> for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - while let Some(item) = stream.next().await { self.push(*item); } @@ -47,8 +43,6 @@ impl<'b> stream::Extend<&'b str> for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - while let Some(item) = stream.next().await { self.push_str(item); } @@ -64,8 +58,6 @@ impl stream::Extend for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - while let Some(item) = stream.next().await { self.push_str(&item); } @@ -81,8 +73,6 @@ impl<'b> stream::Extend> for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - while let Some(item) = stream.next().await { self.push_str(&item); } diff --git a/src/string/from_stream.rs b/src/string/from_stream.rs index eb6818c..375ac37 100644 --- a/src/string/from_stream.rs +++ b/src/string/from_stream.rs @@ -12,8 +12,6 @@ impl FromStream for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = String::new(); stream::extend(&mut out, stream).await; out @@ -29,8 +27,6 @@ impl<'b> FromStream<&'b char> for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = String::new(); stream::extend(&mut out, stream).await; out @@ -46,8 +42,6 @@ impl<'b> FromStream<&'b str> for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = String::new(); stream::extend(&mut out, stream).await; out @@ -63,8 +57,6 @@ impl FromStream for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = String::new(); stream::extend(&mut out, stream).await; out @@ -80,8 +72,6 @@ impl<'b> FromStream> for String { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = String::new(); stream::extend(&mut out, stream).await; out diff --git a/src/unit/extend.rs b/src/unit/extend.rs index 27f5d4e..5b0bc1d 100644 --- a/src/unit/extend.rs +++ b/src/unit/extend.rs @@ -9,8 +9,8 @@ impl stream::Extend<()> for () { stream: T, ) -> Pin + 'a>> { let stream = stream.into_stream(); + Box::pin(async move { - pin_utils::pin_mut!(stream); while let Some(_) = stream.next().await {} }) } diff --git a/src/vec/from_stream.rs b/src/vec/from_stream.rs index cdd4767..e88e820 100644 --- a/src/vec/from_stream.rs +++ b/src/vec/from_stream.rs @@ -17,8 +17,6 @@ impl FromStream for Vec { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - let mut out = vec![]; stream::extend(&mut out, stream).await; out @@ -34,8 +32,6 @@ impl<'b, T: Clone> FromStream for Cow<'b, [T]> { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - Cow::Owned(FromStream::from_stream(stream).await) }) } @@ -49,8 +45,6 @@ impl FromStream for Box<[T]> { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - Vec::from_stream(stream).await.into_boxed_slice() }) } @@ -64,8 +58,6 @@ impl FromStream for Rc<[T]> { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - Vec::from_stream(stream).await.into() }) } @@ -79,8 +71,6 @@ impl FromStream for Arc<[T]> { let stream = stream.into_stream(); Box::pin(async move { - pin_utils::pin_mut!(stream); - Vec::from_stream(stream).await.into() }) } From e442eba625fb881dedc7572cf591e7f8b51ef0d0 Mon Sep 17 00:00:00 2001 From: Grzegorz Gierlach Date: Tue, 12 Nov 2019 19:51:58 +0100 Subject: [PATCH 2/2] Cleaning up stream pinning. --- src/path/pathbuf.rs | 2 ++ src/stream/from_fn.rs | 4 +++- src/stream/repeat_with.rs | 4 +++- src/string/extend.rs | 10 ++++++++++ src/unit/extend.rs | 2 ++ 5 files changed, 20 insertions(+), 2 deletions(-) diff --git a/src/path/pathbuf.rs b/src/path/pathbuf.rs index 808acb2..e684df8 100644 --- a/src/path/pathbuf.rs +++ b/src/path/pathbuf.rs @@ -327,6 +327,8 @@ impl> stream::Extend

for PathBuf { let stream = stream.into_stream(); Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { self.push(item.as_ref()); } diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 3ace658..8067176 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -30,7 +30,7 @@ impl Unpin for FromFn {} /// use async_std::stream; /// /// let mut count = 0u8; -/// let mut s = stream::from_fn(|| { +/// let s = stream::from_fn(|| { /// count += 1; /// if count > 3 { /// None @@ -39,6 +39,8 @@ impl Unpin for FromFn {} /// } /// }); /// +/// pin_utils::pin_mut!(s); +/// /// assert_eq!(s.next().await, Some(1)); /// assert_eq!(s.next().await, Some(2)); /// assert_eq!(s.next().await, Some(3)); diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index 954693d..e183a77 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -28,7 +28,9 @@ impl Unpin for RepeatWith {} /// use async_std::prelude::*; /// use async_std::stream; /// -/// let mut s = stream::repeat_with(|| 1); +/// let s = stream::repeat_with(|| 1); +/// +/// pin_utils::pin_mut!(s); /// /// assert_eq!(s.next().await, Some(1)); /// assert_eq!(s.next().await, Some(1)); diff --git a/src/string/extend.rs b/src/string/extend.rs index 43bd46d..55bec0c 100644 --- a/src/string/extend.rs +++ b/src/string/extend.rs @@ -13,6 +13,8 @@ impl stream::Extend for String { self.reserve(stream.size_hint().0); Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { self.push(item); } @@ -28,6 +30,8 @@ impl<'b> stream::Extend<&'b char> for String { let stream = stream.into_stream(); Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { self.push(*item); } @@ -43,6 +47,8 @@ impl<'b> stream::Extend<&'b str> for String { let stream = stream.into_stream(); Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { self.push_str(item); } @@ -58,6 +64,8 @@ impl stream::Extend for String { let stream = stream.into_stream(); Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { self.push_str(&item); } @@ -73,6 +81,8 @@ impl<'b> stream::Extend> for String { let stream = stream.into_stream(); Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(item) = stream.next().await { self.push_str(&item); } diff --git a/src/unit/extend.rs b/src/unit/extend.rs index 5b0bc1d..55c8e0d 100644 --- a/src/unit/extend.rs +++ b/src/unit/extend.rs @@ -11,6 +11,8 @@ impl stream::Extend<()> for () { let stream = stream.into_stream(); Box::pin(async move { + pin_utils::pin_mut!(stream); + while let Some(_) = stream.next().await {} }) }