Merge pull request #520 from gierlachg/stream_pinning

Cleaning up stream pinning.
bench-mutexes
Florian Gilcher 5 years ago committed by GitHub
commit 6338341369
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -12,8 +12,6 @@ impl<T: Ord> FromStream<T> for BinaryHeap<T> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = BinaryHeap::new(); let mut out = BinaryHeap::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -12,8 +12,6 @@ impl<K: Ord, V> FromStream<(K, V)> for BTreeMap<K, V> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = BTreeMap::new(); let mut out = BTreeMap::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -12,8 +12,6 @@ impl<T: Ord> FromStream<T> for BTreeSet<T> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = BTreeSet::new(); let mut out = BTreeSet::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -17,8 +17,6 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = HashMap::with_hasher(Default::default()); let mut out = HashMap::with_hasher(Default::default());
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -17,8 +17,6 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = HashSet::with_hasher(Default::default()); let mut out = HashSet::with_hasher(Default::default());
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -12,8 +12,6 @@ impl<T> FromStream<T> for LinkedList<T> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = LinkedList::new(); let mut out = LinkedList::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -12,8 +12,6 @@ impl<T> FromStream<T> for VecDeque<T> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = VecDeque::new(); let mut out = VecDeque::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -17,8 +17,6 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = false; let mut found_error = false;

@ -39,8 +39,6 @@ where
where S: Stream<Item = Option<U>> + 'a where S: Stream<Item = Option<U>> + 'a
{ {
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;

@ -34,8 +34,6 @@ where
where S: Stream<Item = Option<U>> + 'a where S: Stream<Item = Option<U>> + 'a
{ {
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;

@ -342,10 +342,9 @@ impl<'b, P: AsRef<Path> + 'b> FromStream<P> for PathBuf {
fn from_stream<'a, S: IntoStream<Item = P> + 'a>( fn from_stream<'a, S: IntoStream<Item = P> + 'a>(
stream: S, stream: S,
) -> Pin<Box<dyn Future<Output = Self> + 'a>> { ) -> Pin<Box<dyn Future<Output = Self> + 'a>> {
Box::pin(async move { let stream = stream.into_stream();
let stream = stream.into_stream();
pin_utils::pin_mut!(stream);
Box::pin(async move {
let mut out = Self::new(); let mut out = Self::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -17,8 +17,6 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = None; let mut found_error = None;

@ -39,8 +39,6 @@ where
where S: Stream<Item = Result<U, E>> + 'a where S: Stream<Item = Result<U, E>> + 'a
{ {
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = None; let mut found_error = None;

@ -39,8 +39,6 @@ where
where S: Stream<Item = Result<U, E>> + 'a where S: Stream<Item = Result<U, E>> + 'a
{ {
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early // Using `scan` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = None; let mut found_error = None;

@ -40,6 +40,7 @@ impl<F> Unpin for FromFn<F> {}
/// }); /// });
/// ///
/// pin_utils::pin_mut!(s); /// pin_utils::pin_mut!(s);
///
/// assert_eq!(s.next().await, Some(1)); /// assert_eq!(s.next().await, Some(1));
/// assert_eq!(s.next().await, Some(2)); /// assert_eq!(s.next().await, Some(2));
/// assert_eq!(s.next().await, Some(3)); /// assert_eq!(s.next().await, Some(3));

@ -12,8 +12,6 @@ impl FromStream<char> for String {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = String::new(); let mut out = String::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out
@ -29,8 +27,6 @@ impl<'b> FromStream<&'b char> for String {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = String::new(); let mut out = String::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out
@ -46,8 +42,6 @@ impl<'b> FromStream<&'b str> for String {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = String::new(); let mut out = String::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out
@ -63,8 +57,6 @@ impl FromStream<String> for String {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = String::new(); let mut out = String::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out
@ -80,8 +72,6 @@ impl<'b> FromStream<Cow<'b, str>> for String {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = String::new(); let mut out = String::new();
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out

@ -9,8 +9,10 @@ impl stream::Extend<()> for () {
stream: T, stream: T,
) -> Pin<Box<dyn Future<Output = ()> + 'a>> { ) -> Pin<Box<dyn Future<Output = ()> + 'a>> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream); pin_utils::pin_mut!(stream);
while let Some(_) = stream.next().await {} while let Some(_) = stream.next().await {}
}) })
} }

@ -17,8 +17,6 @@ impl<T> FromStream<T> for Vec<T> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = vec![]; let mut out = vec![];
stream::extend(&mut out, stream).await; stream::extend(&mut out, stream).await;
out out
@ -34,8 +32,6 @@ impl<'b, T: Clone> FromStream<T> for Cow<'b, [T]> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
Cow::Owned(FromStream::from_stream(stream).await) Cow::Owned(FromStream::from_stream(stream).await)
}) })
} }
@ -49,8 +45,6 @@ impl<T> FromStream<T> for Box<[T]> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
Vec::from_stream(stream).await.into_boxed_slice() Vec::from_stream(stream).await.into_boxed_slice()
}) })
} }
@ -64,8 +58,6 @@ impl<T> FromStream<T> for Rc<[T]> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
Vec::from_stream(stream).await.into() Vec::from_stream(stream).await.into()
}) })
} }
@ -79,8 +71,6 @@ impl<T> FromStream<T> for Arc<[T]> {
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
pin_utils::pin_mut!(stream);
Vec::from_stream(stream).await.into() Vec::from_stream(stream).await.into()
}) })
} }

Loading…
Cancel
Save