From 352f18bc2a404b755dd7cd021ca50b3a79a7cc14 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 11 Nov 2019 11:10:36 +0100 Subject: [PATCH 01/12] Use async_std::sync::Arc in examples (#501) --- src/stream/from_fn.rs | 3 +-- src/sync/mod.rs | 4 +--- src/sync/mutex.rs | 12 +++--------- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index a28a901..f7a421f 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -34,8 +34,7 @@ pin_project! { /// # async_std::task::block_on(async { /// # /// use async_std::prelude::*; -/// use async_std::sync::Mutex; -/// use std::sync::Arc; +/// use async_std::sync::{Arc, Mutex}; /// use async_std::stream; /// /// let count = Arc::new(Mutex::new(0u8)); diff --git a/src/sync/mod.rs b/src/sync/mod.rs index fdeb48c..088c520 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -155,9 +155,7 @@ //! ``` //! # async_std::task::block_on(async { //! # -//! use std::sync::Arc; -//! -//! use async_std::sync::Mutex; +//! use async_std::sync::{Arc, Mutex}; //! use async_std::task; //! //! let m1 = Arc::new(Mutex::new(0)); diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 5bec6a2..2c0ac0c 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -19,9 +19,7 @@ use crate::task::{Context, Poll}; /// ``` /// # async_std::task::block_on(async { /// # -/// use std::sync::Arc; -/// -/// use async_std::sync::Mutex; +/// use async_std::sync::{Arc, Mutex}; /// use async_std::task; /// /// let m = Arc::new(Mutex::new(0)); @@ -77,9 +75,7 @@ impl Mutex { /// ``` /// # async_std::task::block_on(async { /// # - /// use std::sync::Arc; - /// - /// use async_std::sync::Mutex; + /// use async_std::sync::{Arc, Mutex}; /// use async_std::task; /// /// let m1 = Arc::new(Mutex::new(10)); @@ -155,9 +151,7 @@ impl Mutex { /// ``` /// # async_std::task::block_on(async { /// # - /// use std::sync::Arc; - /// - /// use async_std::sync::Mutex; + /// use async_std::sync::{Arc, Mutex}; /// use async_std::task; /// /// let m1 = Arc::new(Mutex::new(10)); From c2f750d2882b4215ae6962d419056ba00b000f6d Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 11 Nov 2019 10:21:42 +0100 Subject: [PATCH 02/12] Cleanup stream module --- src/io/mod.rs | 18 ++-- src/stream/extend.rs | 6 +- src/stream/from_fn.rs | 81 ++++++------------ src/stream/from_iter.rs | 2 +- src/stream/mod.rs | 4 +- src/stream/once.rs | 2 +- src/stream/repeat.rs | 2 +- src/stream/repeat_with.rs | 79 +++++++---------- src/stream/stream/chain.rs | 2 +- src/stream/stream/cloned.rs | 1 + src/stream/stream/copied.rs | 4 +- src/stream/stream/cycle.rs | 81 +++++++----------- src/stream/stream/enumerate.rs | 3 +- src/stream/stream/filter.rs | 9 +- src/stream/stream/filter_map.rs | 19 ++--- src/stream/stream/find.rs | 18 ++-- src/stream/stream/find_map.rs | 20 ++--- src/stream/stream/flat_map.rs | 17 ++-- src/stream/stream/flatten.rs | 35 ++++++-- src/stream/stream/fold.rs | 14 ++-- src/stream/stream/for_each.rs | 9 +- src/stream/stream/inspect.rs | 9 +- src/stream/stream/map.rs | 15 ++-- src/stream/stream/mod.rs | 135 +++++++++++++++--------------- src/stream/stream/position.rs | 49 ++++++----- src/stream/stream/skip_while.rs | 9 +- src/stream/stream/take_while.rs | 9 +- src/stream/stream/timeout.rs | 2 +- src/stream/stream/try_fold.rs | 45 +++++----- src/stream/stream/try_for_each.rs | 43 ++++------ src/stream/stream/zip.rs | 2 +- 31 files changed, 315 insertions(+), 429 deletions(-) diff --git a/src/io/mod.rs b/src/io/mod.rs index c471159..4e83230 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -19,8 +19,8 @@ //! [`File`]s: //! //! ```no_run -//! use async_std::prelude::*; //! use async_std::fs::File; +//! use async_std::prelude::*; //! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # @@ -47,9 +47,9 @@ //! coming from: //! //! ```no_run -//! use async_std::io::prelude::*; -//! use async_std::io::SeekFrom; //! use async_std::fs::File; +//! use async_std::io::SeekFrom; +//! use async_std::prelude::*; //! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # @@ -82,9 +82,9 @@ //! methods to any reader: //! //! ```no_run -//! use async_std::io::prelude::*; -//! use async_std::io::BufReader; //! use async_std::fs::File; +//! use async_std::io::BufReader; +//! use async_std::prelude::*; //! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # @@ -104,9 +104,9 @@ //! to [`write`][`Write::write`]: //! //! ```no_run -//! use async_std::io::prelude::*; -//! use async_std::io::BufWriter; //! use async_std::fs::File; +//! use async_std::io::BufWriter; +//! use async_std::io::prelude::*; //! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # @@ -179,9 +179,9 @@ //! lines: //! //! ```no_run -//! use async_std::prelude::*; -//! use async_std::io::BufReader; //! use async_std::fs::File; +//! use async_std::io::BufReader; +//! use async_std::prelude::*; //! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # diff --git a/src/stream/extend.rs b/src/stream/extend.rs index 7bdfd34..c48fe1e 100644 --- a/src/stream/extend.rs +++ b/src/stream/extend.rs @@ -65,10 +65,10 @@ pub trait Extend { /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub async fn extend<'a, C, A, T>(collection: &mut C, stream: T) +pub async fn extend<'a, C, T, S>(collection: &mut C, stream: S) where - C: Extend, - T: IntoStream + 'a, + C: Extend, + S: IntoStream + 'a, { Extend::extend(collection, stream).await } diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index a28a901..24432c7 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -1,28 +1,21 @@ -use std::marker::PhantomData; use std::pin::Pin; -use std::future::Future; - -use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -pin_project! { - /// A stream that yields elements by calling a closure. - /// - /// This stream is created by the [`from_fn`] function. See its - /// documentation for more. - /// - /// [`from_fn`]: fn.from_fn.html - #[derive(Debug)] - pub struct FromFn { - f: F, - #[pin] - future: Option, - __t: PhantomData, - } +/// A stream that yields elements by calling a closure. +/// +/// This stream is created by the [`from_fn`] function. See its +/// documentation for more. +/// +/// [`from_fn`]: fn.from_fn.html +#[derive(Clone, Debug)] +pub struct FromFn { + f: F, } +impl Unpin for FromFn {} + /// Creates a new stream where to produce each new element a provided closure is called. /// /// This allows creating a custom stream with any behaviour without using the more verbose @@ -34,22 +27,15 @@ pin_project! { /// # async_std::task::block_on(async { /// # /// use async_std::prelude::*; -/// use async_std::sync::Mutex; -/// use std::sync::Arc; /// use async_std::stream; /// -/// let count = Arc::new(Mutex::new(0u8)); +/// let mut count = 0u8; /// let s = stream::from_fn(|| { -/// let count = Arc::clone(&count); -/// -/// async move { -/// *count.lock().await += 1; -/// -/// if *count.lock().await > 3 { -/// None -/// } else { -/// Some(*count.lock().await) -/// } +/// count += 1; +/// if count > 3 { +/// None +/// } else { +/// Some(count) /// } /// }); /// @@ -61,38 +47,21 @@ pin_project! { /// # /// # }) /// ``` -pub fn from_fn(f: F) -> FromFn +pub fn from_fn(f: F) -> FromFn where - F: FnMut() -> Fut, - Fut: Future>, + F: FnMut() -> Option, { - FromFn { - f, - future: None, - __t: PhantomData, - } + FromFn { f } } -impl Stream for FromFn +impl Stream for FromFn where - F: FnMut() -> Fut, - Fut: Future>, + F: FnMut() -> Option, { type Item = T; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - loop { - if this.future.is_some() { - let next = - futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); - this.future.set(None); - - return Poll::Ready(next); - } else { - let fut = (this.f)(); - this.future.set(Some(fut)); - } - } + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + let item = (&mut self.f)(); + Poll::Ready(item) } } diff --git a/src/stream/from_iter.rs b/src/stream/from_iter.rs index a83afce..d7a31d6 100644 --- a/src/stream/from_iter.rs +++ b/src/stream/from_iter.rs @@ -6,7 +6,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - /// A stream that created from iterator + /// A stream that was created from iterator. /// /// This stream is created by the [`from_iter`] function. /// See it documentation for more. diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 692c5de..f782882 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -306,9 +306,7 @@ pub use from_iter::{from_iter, FromIter}; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; pub use repeat_with::{repeat_with, RepeatWith}; -pub use stream::{ - Chain, Filter, Fuse, Inspect, Scan, Skip, SkipWhile, StepBy, Stream, Take, TakeWhile, Zip, -}; +pub use stream::*; pub(crate) mod stream; diff --git a/src/stream/once.rs b/src/stream/once.rs index d993c16..a33bd6a 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -33,7 +33,7 @@ pin_project! { /// documentation for more. /// /// [`once`]: fn.once.html - #[derive(Debug)] + #[derive(Clone, Debug)] pub struct Once { value: Option, } diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index aaaff0c..f3dfdbd 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -33,7 +33,7 @@ where /// documentation for more. /// /// [`repeat`]: fn.repeat.html -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Repeat { item: T, } diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index 6e7cfa3..e183a77 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -1,28 +1,21 @@ -use std::marker::PhantomData; use std::pin::Pin; -use std::future::Future; - -use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; -pin_project! { - /// A stream that repeats elements of type `T` endlessly by applying a provided closure. - /// - /// This stream is created by the [`repeat_with`] function. See its - /// documentation for more. - /// - /// [`repeat_with`]: fn.repeat_with.html - #[derive(Debug)] - pub struct RepeatWith { - f: F, - #[pin] - future: Option, - __a: PhantomData, - } +/// A stream that repeats elements of type `T` endlessly by applying a provided closure. +/// +/// This stream is created by the [`repeat_with`] function. See its +/// documentation for more. +/// +/// [`repeat_with`]: fn.repeat_with.html +#[derive(Clone, Debug)] +pub struct RepeatWith { + f: F, } +impl Unpin for RepeatWith {} + /// Creates a new stream that repeats elements of type `A` endlessly by applying the provided closure. /// /// # Examples @@ -35,7 +28,7 @@ pin_project! { /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::repeat_with(|| async { 1 }); +/// let s = stream::repeat_with(|| 1); /// /// pin_utils::pin_mut!(s); /// @@ -54,48 +47,38 @@ pin_project! { /// use async_std::prelude::*; /// use async_std::stream; /// -/// let s = stream::repeat_with(|| async { 1u8 }).take(2); +/// let mut n = 1; +/// let s = stream::repeat_with(|| { +/// let item = n; +/// n *= 2; +/// item +/// }) +/// .take(4); /// /// pin_utils::pin_mut!(s); /// /// assert_eq!(s.next().await, Some(1)); -/// assert_eq!(s.next().await, Some(1)); +/// assert_eq!(s.next().await, Some(2)); +/// assert_eq!(s.next().await, Some(4)); +/// assert_eq!(s.next().await, Some(8)); /// assert_eq!(s.next().await, None); /// # }) /// ``` -pub fn repeat_with(repeater: F) -> RepeatWith +pub fn repeat_with(repeater: F) -> RepeatWith where - F: FnMut() -> Fut, - Fut: Future, + F: FnMut() -> T, { - RepeatWith { - f: repeater, - future: None, - __a: PhantomData, - } + RepeatWith { f: repeater } } -impl Stream for RepeatWith +impl Stream for RepeatWith where - F: FnMut() -> Fut, - Fut: Future, + F: FnMut() -> T, { - type Item = A; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - loop { - if this.future.is_some() { - let res = futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); - - this.future.set(None); - - return Poll::Ready(Some(res)); - } else { - let fut = (this.f)(); + type Item = T; - this.future.set(Some(fut)); - } - } + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + let item = (&mut self.f)(); + Poll::Ready(Some(item)) } } diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index 5e0eeb4..f6d9cf6 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -7,7 +7,7 @@ use crate::prelude::*; use crate::task::{Context, Poll}; pin_project! { - /// Chains two streams one after another. + /// A stream that chains two streams one after another. /// /// This `struct` is created by the [`chain`] method on [`Stream`]. See its /// documentation for more. diff --git a/src/stream/stream/cloned.rs b/src/stream/stream/cloned.rs index 19dfbc8..4c77c5c 100644 --- a/src/stream/stream/cloned.rs +++ b/src/stream/stream/cloned.rs @@ -4,6 +4,7 @@ use pin_project_lite::pin_project; use std::pin::Pin; pin_project! { + /// A stream that clones the elements of an underlying stream. #[derive(Debug)] pub struct Cloned { #[pin] diff --git a/src/stream/stream/copied.rs b/src/stream/stream/copied.rs index 477d59d..e3c8367 100644 --- a/src/stream/stream/copied.rs +++ b/src/stream/stream/copied.rs @@ -4,8 +4,8 @@ use pin_project_lite::pin_project; use std::pin::Pin; pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] + /// A stream that copies the elements of an underlying stream. + #[derive(Debug)] pub struct Copied { #[pin] stream: S, diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs index 8a31cc1..7f01a61 100644 --- a/src/stream/stream/cycle.rs +++ b/src/stream/stream/cycle.rs @@ -1,73 +1,54 @@ +use std::mem::ManuallyDrop; use std::pin::Pin; -use pin_project_lite::pin_project; - use crate::stream::Stream; use crate::task::{Context, Poll}; -pin_project! { - /// A stream that will repeatedly yield the same list of elements - pub struct Cycle { - #[pin] - source: S, - index: usize, - buffer: Vec, - state: CycleState, - } -} - -#[derive(Eq, PartialEq)] -enum CycleState { - FromStream, - FromBuffer, +/// A stream that will repeatedly yield the same list of elements. +#[derive(Debug)] +pub struct Cycle { + orig: S, + source: ManuallyDrop, } -impl Cycle +impl Cycle where - S: Stream, - S::Item: Clone, + S: Stream + Clone, { - pub fn new(source: S) -> Cycle { + pub fn new(source: S) -> Cycle { Cycle { - source, - index: 0, - buffer: Vec::new(), - state: CycleState::FromStream, + orig: source.clone(), + source: ManuallyDrop::new(source), } } } -impl Stream for Cycle +impl Drop for Cycle { + fn drop(&mut self) { + unsafe { + ManuallyDrop::drop(&mut self.source); + } + } +} + +impl Stream for Cycle where - S: Stream, - S::Item: Clone, + S: Stream + Clone, { type Item = S::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - let mut next; - if *this.state == CycleState::FromStream { - next = futures_core::ready!(this.source.poll_next(cx)); - - if let Some(val) = next { - this.buffer.push(val.clone()); - next = Some(val) - } else { - *this.state = CycleState::FromBuffer; - next = this.buffer.get(*this.index).cloned(); + unsafe { + let this = self.get_unchecked_mut(); + + match futures_core::ready!(Pin::new_unchecked(&mut *this.source).poll_next(cx)) { + Some(item) => Poll::Ready(Some(item)), + None => { + ManuallyDrop::drop(&mut this.source); + this.source = ManuallyDrop::new(this.orig.clone()); + Pin::new_unchecked(&mut *this.source).poll_next(cx) + } } - } else { - let mut index = *this.index; - if index == this.buffer.len() { - index = 0 - } - next = Some(this.buffer[index].clone()); - - *this.index = index + 1; } - - Poll::Ready(next) } } diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index 2a3afa8..a758010 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -6,8 +6,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] + #[derive(Debug)] pub struct Enumerate { #[pin] stream: S, diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index a2562e7..594b094 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use pin_project_lite::pin_project; @@ -15,25 +14,23 @@ pin_project! { /// [`filter`]: trait.Stream.html#method.filter /// [`Stream`]: trait.Stream.html #[derive(Debug)] - pub struct Filter { + pub struct Filter { #[pin] stream: S, predicate: P, - __t: PhantomData, } } -impl Filter { +impl Filter { pub(super) fn new(stream: S, predicate: P) -> Self { Filter { stream, predicate, - __t: PhantomData, } } } -impl Stream for Filter +impl Stream for Filter where S: Stream, P: FnMut(&S::Item) -> bool, diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 6a4593f..e110f51 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -7,29 +6,21 @@ use pin_project_lite::pin_project; use crate::stream::Stream; pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct FilterMap { + #[derive(Debug)] + pub struct FilterMap { #[pin] stream: S, f: F, - __from: PhantomData, - __to: PhantomData, } } -impl FilterMap { +impl FilterMap { pub(crate) fn new(stream: S, f: F) -> Self { - FilterMap { - stream, - f, - __from: PhantomData, - __to: PhantomData, - } + FilterMap { stream, f } } } -impl Stream for FilterMap +impl Stream for FilterMap where S: Stream, F: FnMut(S::Item) -> Option, diff --git a/src/stream/stream/find.rs b/src/stream/stream/find.rs index b37a6a4..0c5ad62 100644 --- a/src/stream/stream/find.rs +++ b/src/stream/stream/find.rs @@ -1,31 +1,25 @@ -use std::marker::PhantomData; -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct FindFuture<'a, S, P, T> { +pub struct FindFuture<'a, S, P> { stream: &'a mut S, p: P, - __t: PhantomData, } -impl<'a, S, P, T> FindFuture<'a, S, P, T> { +impl<'a, S, P> FindFuture<'a, S, P> { pub(super) fn new(stream: &'a mut S, p: P) -> Self { - FindFuture { - stream, - p, - __t: PhantomData, - } + FindFuture { stream, p } } } -impl Unpin for FindFuture<'_, S, P, T> {} +impl Unpin for FindFuture<'_, S, P> {} -impl<'a, S, P> Future for FindFuture<'a, S, P, S::Item> +impl<'a, S, P> Future for FindFuture<'a, S, P> where S: Stream + Unpin + Sized, P: FnMut(&S::Item) -> bool, diff --git a/src/stream/stream/find_map.rs b/src/stream/stream/find_map.rs index 16993fc..b10bd9c 100644 --- a/src/stream/stream/find_map.rs +++ b/src/stream/stream/find_map.rs @@ -1,33 +1,25 @@ -use std::marker::PhantomData; +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use std::future::Future; use crate::stream::Stream; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct FindMapFuture<'a, S, F, T, B> { +pub struct FindMapFuture<'a, S, F> { stream: &'a mut S, f: F, - __b: PhantomData, - __t: PhantomData, } -impl<'a, S, B, F, T> FindMapFuture<'a, S, F, T, B> { +impl<'a, S, F> FindMapFuture<'a, S, F> { pub(super) fn new(stream: &'a mut S, f: F) -> Self { - FindMapFuture { - stream, - f, - __b: PhantomData, - __t: PhantomData, - } + FindMapFuture { stream, f } } } -impl Unpin for FindMapFuture<'_, S, F, T, B> {} +impl Unpin for FindMapFuture<'_, S, F> {} -impl<'a, S, B, F> Future for FindMapFuture<'a, S, F, S::Item, B> +impl<'a, S, B, F> Future for FindMapFuture<'a, S, F> where S: Stream + Unpin + Sized, F: FnMut(S::Item) -> Option, diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index ed3268e..ab45c9c 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -1,33 +1,36 @@ -use pin_project_lite::pin_project; use std::pin::Pin; +use pin_project_lite::pin_project; + use crate::prelude::*; use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; pin_project! { + /// A stream that maps each element to a stream, and yields the elements of the produced + /// streams. + /// /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its /// documentation for more. /// /// [`flat_map`]: trait.Stream.html#method.flat_map /// [`Stream`]: trait.Stream.html - #[allow(missing_debug_implementations)] - pub struct FlatMap { + pub struct FlatMap { #[pin] - stream: Map, + stream: Map, #[pin] inner_stream: Option, } } -impl FlatMap +impl FlatMap where S: Stream, U: IntoStream, F: FnMut(S::Item) -> U, { - pub(super) fn new(stream: S, f: F) -> FlatMap { + pub(super) fn new(stream: S, f: F) -> FlatMap { FlatMap { stream: stream.map(f), inner_stream: None, @@ -35,7 +38,7 @@ where } } -impl Stream for FlatMap +impl Stream for FlatMap where S: Stream, S::Item: IntoStream, diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 5e791cd..edaffd0 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,30 +1,38 @@ -use pin_project_lite::pin_project; +use std::fmt; use std::pin::Pin; +use pin_project_lite::pin_project; + use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; pin_project! { + /// A stream that flattens one level of nesting in an stream of things that can be turned into + /// streams. + /// /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its /// documentation for more. /// /// [`flatten`]: trait.Stream.html#method.flatten /// [`Stream`]: trait.Stream.html - #[allow(missing_debug_implementations)] - pub struct Flatten { + pub struct Flatten + where + S: Stream, + S::Item: IntoStream, + { #[pin] stream: S, #[pin] - inner_stream: Option, + inner_stream: Option<::IntoStream>, } } -impl Flatten +impl Flatten where S: Stream, S::Item: IntoStream, { - pub(super) fn new(stream: S) -> Flatten { + pub(super) fn new(stream: S) -> Flatten { Flatten { stream, inner_stream: None, @@ -32,7 +40,7 @@ where } } -impl Stream for Flatten::IntoStream> +impl Stream for Flatten where S: Stream, S::Item: IntoStream, @@ -56,3 +64,16 @@ where } } } + +impl fmt::Debug for Flatten +where + S: fmt::Debug + Stream, + S::Item: IntoStream, + U: fmt::Debug + Stream, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Flatten") + .field("inner", &self.stream) + .finish() + } +} diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index 66a7672..c4da591 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -1,6 +1,5 @@ -use std::marker::PhantomData; -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; use pin_project_lite::pin_project; @@ -8,29 +7,26 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct FoldFuture { + #[derive(Debug)] + pub struct FoldFuture { #[pin] stream: S, f: F, acc: Option, - __t: PhantomData, } } -impl FoldFuture { +impl FoldFuture { pub(super) fn new(stream: S, init: B, f: F) -> Self { FoldFuture { stream, f, acc: Some(init), - __t: PhantomData, } } } -impl Future for FoldFuture +impl Future for FoldFuture where S: Stream + Sized, F: FnMut(B, S::Item) -> B, diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index 6383ed7..01833fd 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use std::future::Future; @@ -10,25 +9,23 @@ use crate::task::{Context, Poll}; pin_project! { #[doc(hidden)] #[allow(missing_debug_implementations)] - pub struct ForEachFuture { + pub struct ForEachFuture { #[pin] stream: S, f: F, - __t: PhantomData, } } -impl ForEachFuture { +impl ForEachFuture { pub(super) fn new(stream: S, f: F) -> Self { ForEachFuture { stream, f, - __t: PhantomData, } } } -impl Future for ForEachFuture +impl Future for ForEachFuture where S: Stream + Sized, F: FnMut(S::Item), diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs index ba60b0c..acf2246 100644 --- a/src/stream/stream/inspect.rs +++ b/src/stream/stream/inspect.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use pin_project_lite::pin_project; @@ -15,25 +14,23 @@ pin_project! { /// [`inspect`]: trait.Stream.html#method.inspect /// [`Stream`]: trait.Stream.html #[derive(Debug)] - pub struct Inspect { + pub struct Inspect { #[pin] stream: S, f: F, - __t: PhantomData, } } -impl Inspect { +impl Inspect { pub(super) fn new(stream: S, f: F) -> Self { Inspect { stream, f, - __t: PhantomData, } } } -impl Stream for Inspect +impl Stream for Inspect where S: Stream, F: FnMut(&S::Item), diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index a1fafc3..7accb6f 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use pin_project_lite::pin_project; @@ -7,29 +6,25 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct Map { + /// A stream that maps value of another stream with a function. + #[derive(Debug)] + pub struct Map { #[pin] stream: S, f: F, - __from: PhantomData, - __to: PhantomData, } } -impl Map { +impl Map { pub(crate) fn new(stream: S, f: F) -> Self { Map { stream, f, - __from: PhantomData, - __to: PhantomData, } } } -impl Stream for Map +impl Stream for Map where S: Stream, F: FnMut(S::Item) -> B, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ea1459..5756a21 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -288,6 +288,7 @@ extension_trait! { Creates a stream that yields elements based on a predicate. # Examples + ``` # fn main() { async_std::task::block_on(async { # @@ -300,12 +301,11 @@ extension_trait! { assert_eq!(s.next().await, Some(1)); assert_eq!(s.next().await, Some(2)); assert_eq!(s.next().await, None); - # # }) } ``` "#] - fn take_while

(self, predicate: P) -> TakeWhile + fn take_while

(self, predicate: P) -> TakeWhile where Self: Sized, P: FnMut(&Self::Item) -> bool, @@ -410,10 +410,10 @@ extension_trait! { # }) } ``` "#] - fn cloned<'a,T>(self) -> Cloned + fn cloned<'a, T>(self) -> Cloned where Self: Sized + Stream, - T : 'a + Clone, + T: Clone + 'a, { Cloned::new(self) } @@ -443,10 +443,10 @@ extension_trait! { # }) } ``` "#] - fn copied<'a,T>(self) -> Copied + fn copied<'a, T>(self) -> Copied where Self: Sized + Stream, - T : 'a + Copy, + T: Copy + 'a, { Copied::new(self) } @@ -475,10 +475,9 @@ extension_trait! { # }) ``` "#] - fn cycle(self) -> Cycle - where - Self: Sized, - Self::Item: Clone, + fn cycle(self) -> Cycle + where + Self: Clone + Sized, { Cycle::new(self) } @@ -505,7 +504,6 @@ extension_trait! { assert_eq!(s.next().await, Some((1, 'b'))); assert_eq!(s.next().await, Some((2, 'c'))); assert_eq!(s.next().await, None); - # # }) } ``` @@ -540,7 +538,7 @@ extension_trait! { # }) } ``` "#] - fn map(self, f: F) -> Map + fn map(self, f: F) -> Map where Self: Sized, F: FnMut(Self::Item) -> B, @@ -565,17 +563,18 @@ extension_trait! { let s = stream::from_iter(vec![1, 2, 3, 4, 5]); let sum = s - .inspect(|x| println!("about to filter {}", x)) - .filter(|x| x % 2 == 0) - .inspect(|x| println!("made it through filter: {}", x)) - .fold(0, |sum, i| sum + i).await; + .inspect(|x| println!("about to filter {}", x)) + .filter(|x| x % 2 == 0) + .inspect(|x| println!("made it through filter: {}", x)) + .fold(0, |sum, i| sum + i) + .await; assert_eq!(sum, 6); # # }) } ``` "#] - fn inspect(self, f: F) -> Inspect + fn inspect(self, f: F) -> Inspect where Self: Sized, F: FnMut(&Self::Item), @@ -618,7 +617,6 @@ extension_trait! { # # }) } ``` - "#] fn last( self, @@ -685,7 +683,7 @@ extension_trait! { # }) } ``` "#] - fn filter

(self, predicate: P) -> Filter + fn filter

(self, predicate: P) -> Filter where Self: Sized, P: FnMut(&Self::Item) -> bool, @@ -721,7 +719,7 @@ extension_trait! { "#] #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn flat_map(self, f: F) -> FlatMap + fn flat_map(self, f: F) -> FlatMap where Self: Sized, U: IntoStream, @@ -755,7 +753,7 @@ extension_trait! { "#] #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn flatten(self) -> Flatten + fn flatten(self) -> Flatten where Self: Sized, Self::Item: IntoStream, @@ -796,7 +794,7 @@ extension_trait! { # }) } ``` "#] - fn filter_map(self, f: F) -> FilterMap + fn filter_map(self, f: F) -> FilterMap where Self: Sized, F: FnMut(Self::Item) -> Option, @@ -804,7 +802,7 @@ extension_trait! { FilterMap::new(self, f) } - #[doc = r#" + #[doc = r#" Returns the element that gives the minimum value with respect to the specified key function. If several elements are equally minimum, the first element is returned. If the stream is empty, `None` is returned. @@ -828,19 +826,19 @@ extension_trait! { # }) } ``` "#] - fn min_by_key( + fn min_by_key( self, - key_by: K, - ) -> impl Future> [MinByKeyFuture] + key_by: F, + ) -> impl Future> [MinByKeyFuture] where Self: Sized, - Self::Item: Ord, - K: FnMut(&Self::Item) -> Self::Item, + B: Ord, + F: FnMut(&Self::Item) -> B, { MinByKeyFuture::new(self, key_by) } - #[doc = r#" + #[doc = r#" Returns the element that gives the maximum value with respect to the specified key function. If several elements are equally maximum, the first element is returned. If the stream is empty, `None` is returned. @@ -864,14 +862,14 @@ extension_trait! { # }) } ``` "#] - fn max_by_key( + fn max_by_key( self, - key_by: K, - ) -> impl Future> [MaxByKeyFuture] + key_by: F, + ) -> impl Future> [MaxByKeyFuture] where Self: Sized, - Self::Item: Ord, - K: FnMut(&Self::Item) -> Self::Item, + B: Ord, + F: FnMut(&Self::Item) -> B, { MaxByKeyFuture::new(self, key_by) } @@ -1043,7 +1041,7 @@ extension_trait! { n: usize, ) -> impl Future> + '_ [NthFuture<'_, Self>] where - Self: Sized, + Self: Unpin + Sized, { NthFuture::new(self, n) } @@ -1151,9 +1149,9 @@ extension_trait! { fn find

( &mut self, p: P, - ) -> impl Future> + '_ [FindFuture<'_, Self, P, Self::Item>] + ) -> impl Future> + '_ [FindFuture<'_, Self, P>] where - Self: Sized, + Self: Unpin + Sized, P: FnMut(&Self::Item) -> bool, { FindFuture::new(self, p) @@ -1179,9 +1177,9 @@ extension_trait! { fn find_map( &mut self, f: F, - ) -> impl Future> + '_ [FindMapFuture<'_, Self, F, Self::Item, B>] + ) -> impl Future> + '_ [FindMapFuture<'_, Self, F>] where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> Option, { FindMapFuture::new(self, f) @@ -1213,7 +1211,7 @@ extension_trait! { self, init: B, f: F, - ) -> impl Future [FoldFuture] + ) -> impl Future [FoldFuture] where Self: Sized, F: FnMut(B, Self::Item) -> B, @@ -1248,7 +1246,7 @@ extension_trait! { fn for_each( self, f: F, - ) -> impl Future [ForEachFuture] + ) -> impl Future [ForEachFuture] where Self: Sized, F: FnMut(Self::Item), @@ -1389,7 +1387,7 @@ extension_trait! { # }) } ``` "#] - fn skip_while

(self, predicate: P) -> SkipWhile + fn skip_while

(self, predicate: P) -> SkipWhile where Self: Sized, P: FnMut(&Self::Item) -> bool, @@ -1472,7 +1470,7 @@ extension_trait! { use async_std::prelude::*; use async_std::stream; - let s = stream::from_iter(vec![1usize, 2, 3]); + let mut s = stream::from_iter(vec![1usize, 2, 3]); let sum = s.try_fold(0, |acc, v| { if (acc+v) % 2 == 1 { Ok(v+3) @@ -1487,12 +1485,12 @@ extension_trait! { ``` "#] fn try_fold( - self, + &mut self, init: T, f: F, - ) -> impl Future> [TryFoldFuture] + ) -> impl Future> + '_ [TryFoldFuture<'_, Self, F, T>] where - Self: Sized, + Self: Unpin + Sized, F: FnMut(B, Self::Item) -> Result, { TryFoldFuture::new(self, init, f) @@ -1512,7 +1510,7 @@ extension_trait! { let (tx, rx) = channel(); - let s = stream::from_iter(vec![1u8, 2, 3]); + let mut s = stream::from_iter(vec![1u8, 2, 3]); let s = s.try_for_each(|v| { if v % 2 == 1 { tx.clone().send(v).unwrap(); @@ -1533,11 +1531,11 @@ extension_trait! { ``` "#] fn try_for_each( - self, + &mut self, f: F, - ) -> impl Future [TryForEachFuture] + ) -> impl Future + 'a [TryForEachFuture<'_, Self, F>] where - Self: Sized, + Self: Unpin + Sized, F: FnMut(Self::Item) -> Result<(), E>, { TryForEachFuture::new(self, f) @@ -1582,7 +1580,7 @@ extension_trait! { #[inline] fn zip(self, other: U) -> Zip where - Self: Sized + Stream, + Self: Sized, U: Stream, { Zip::new(self, other) @@ -1641,7 +1639,6 @@ extension_trait! { "#] #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - #[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"] fn collect<'a, B>( self, ) -> impl Future + 'a [Pin + 'a>>] @@ -1740,28 +1737,28 @@ extension_trait! { use async_std::stream; let s = stream::from_iter(vec![1usize, 2, 3]); - let res = s.clone().position(|x| *x == 1).await; + let res = s.clone().position(|x| x == 1).await; assert_eq!(res, Some(0)); - let res = s.clone().position(|x| *x == 2).await; + let res = s.clone().position(|x| x == 2).await; assert_eq!(res, Some(1)); - let res = s.clone().position(|x| *x == 3).await; + let res = s.clone().position(|x| x == 3).await; assert_eq!(res, Some(2)); - let res = s.clone().position(|x| *x == 4).await; + let res = s.clone().position(|x| x == 4).await; assert_eq!(res, None); # # }) } ``` "#] fn position

( - self, - predicate: P - ) -> impl Future> [PositionFuture] + &mut self, + predicate: P, + ) -> impl Future> + '_ [PositionFuture<'_, Self, P>] where - Self: Sized, - P: FnMut(&Self::Item) -> bool, + Self: Unpin + Sized, + P: FnMut(Self::Item) -> bool, { PositionFuture::new(self, predicate) } @@ -1805,10 +1802,12 @@ extension_trait! { CmpFuture::new(self, other) } - #[doc = r#" + #[doc = r#" Determines if the elements of this `Stream` are lexicographically not equal to those of another. + # Examples + ``` # fn main() { async_std::task::block_on(async { # @@ -1833,7 +1832,7 @@ extension_trait! { other: S ) -> impl Future [NeFuture] where - Self: Sized + Stream, + Self: Sized, S: Sized + Stream, ::Item: PartialEq, { @@ -2026,11 +2025,11 @@ extension_trait! { } #[doc = r#" - Sums the elements of an iterator. + Sums the elements of a stream. Takes each element, adds them together, and returns the result. - An empty iterator returns the zero value of the type. + An empty streams returns the zero value of the type. # Panics @@ -2063,15 +2062,15 @@ extension_trait! { ) -> impl Future + 'a [Pin + 'a>>] where Self: Sized + Stream + 'a, - S: Sum, + S: Sum, { Sum::sum(self) } #[doc = r#" - Iterates over the entire iterator, multiplying all the elements + Multiplies all elements of the stream. - An empty iterator returns the one value of the type. + An empty stream returns the one value of the type. # Panics diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs index 3d8f40d..5a51d7a 100644 --- a/src/stream/stream/position.rs +++ b/src/stream/stream/position.rs @@ -1,24 +1,21 @@ -use std::pin::Pin; use std::future::Future; - -use pin_project_lite::pin_project; +use std::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; -pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct PositionFuture { - #[pin] - stream: S, - predicate: P, - index:usize, - } +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct PositionFuture<'a, S, P> { + stream: &'a mut S, + predicate: P, + index: usize, } -impl PositionFuture { - pub(super) fn new(stream: S, predicate: P) -> Self { +impl<'a, S, P> Unpin for PositionFuture<'a, S, P> {} + +impl<'a, S, P> PositionFuture<'a, S, P> { + pub(super) fn new(stream: &'a mut S, predicate: P) -> Self { PositionFuture { stream, predicate, @@ -27,23 +24,25 @@ impl PositionFuture { } } -impl Future for PositionFuture +impl<'a, S, P> Future for PositionFuture<'a, S, P> where - S: Stream, - P: FnMut(&S::Item) -> bool, + S: Stream + Unpin, + P: FnMut(S::Item) -> bool, { type Output = Option; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let next = futures_core::ready!(this.stream.poll_next(cx)); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); match next { - Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)), - Some(_) => { - cx.waker().wake_by_ref(); - *this.index += 1; - Poll::Pending + Some(v) => { + if (&mut self.predicate)(v) { + Poll::Ready(Some(self.index)) + } else { + cx.waker().wake_by_ref(); + self.index += 1; + Poll::Pending + } } None => Poll::Ready(None), } diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 6435d81..5cb273e 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use pin_project_lite::pin_project; @@ -15,25 +14,23 @@ pin_project! { /// [`skip_while`]: trait.Stream.html#method.skip_while /// [`Stream`]: trait.Stream.html #[derive(Debug)] - pub struct SkipWhile { + pub struct SkipWhile { #[pin] stream: S, predicate: Option

, - __t: PhantomData, } } -impl SkipWhile { +impl SkipWhile { pub(crate) fn new(stream: S, predicate: P) -> Self { SkipWhile { stream, predicate: Some(predicate), - __t: PhantomData, } } } -impl Stream for SkipWhile +impl Stream for SkipWhile where S: Stream, P: FnMut(&S::Item) -> bool, diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 9b2945f..08b5a86 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use pin_project_lite::pin_project; @@ -15,25 +14,23 @@ pin_project! { /// [`take_while`]: trait.Stream.html#method.take_while /// [`Stream`]: trait.Stream.html #[derive(Debug)] - pub struct TakeWhile { + pub struct TakeWhile { #[pin] stream: S, predicate: P, - __t: PhantomData, } } -impl TakeWhile { +impl TakeWhile { pub(super) fn new(stream: S, predicate: P) -> Self { TakeWhile { stream, predicate, - __t: PhantomData, } } } -impl Stream for TakeWhile +impl Stream for TakeWhile where S: Stream, P: FnMut(&S::Item) -> bool, diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 636e406..560a0e4 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -22,7 +22,7 @@ pin_project! { } impl Timeout { - pub fn new(stream: S, dur: Duration) -> Timeout { + pub(crate) fn new(stream: S, dur: Duration) -> Timeout { let delay = Delay::new(dur); Timeout { stream, delay } diff --git a/src/stream/stream/try_fold.rs b/src/stream/stream/try_fold.rs index bf92ff5..efb9e33 100644 --- a/src/stream/stream/try_fold.rs +++ b/src/stream/stream/try_fold.rs @@ -1,58 +1,51 @@ -use std::marker::PhantomData; use std::pin::Pin; -use std::future::Future; - -use pin_project_lite::pin_project; +use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct TryFoldFuture { - #[pin] - stream: S, - f: F, - acc: Option, - __t: PhantomData, - } +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct TryFoldFuture<'a, S, F, T> { + stream: &'a mut S, + f: F, + acc: Option, } -impl TryFoldFuture { - pub(super) fn new(stream: S, init: T, f: F) -> Self { +impl<'a, S, F, T> Unpin for TryFoldFuture<'a, S, F, T> {} + +impl<'a, S, F, T> TryFoldFuture<'a, S, F, T> { + pub(super) fn new(stream: &'a mut S, init: T, f: F) -> Self { TryFoldFuture { stream, f, acc: Some(init), - __t: PhantomData, } } } -impl Future for TryFoldFuture +impl<'a, S, F, T, E> Future for TryFoldFuture<'a, S, F, T> where - S: Stream + Sized, + S: Stream + Unpin, F: FnMut(T, S::Item) -> Result, { type Output = Result; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); match next { Some(v) => { - let old = this.acc.take().unwrap(); - let new = (this.f)(old, v); + let old = self.acc.take().unwrap(); + let new = (&mut self.f)(old, v); match new { - Ok(o) => *this.acc = Some(o), + Ok(o) => self.acc = Some(o), Err(e) => return Poll::Ready(Err(e)), } } - None => return Poll::Ready(Ok(this.acc.take().unwrap())), + None => return Poll::Ready(Ok(self.acc.take().unwrap())), } } } diff --git a/src/stream/stream/try_for_each.rs b/src/stream/stream/try_for_each.rs index 36198f9..30e3185 100644 --- a/src/stream/stream/try_for_each.rs +++ b/src/stream/stream/try_for_each.rs @@ -1,52 +1,39 @@ use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; -use pin_project_lite::pin_project; - use crate::stream::Stream; use crate::task::{Context, Poll}; -pin_project! { - #[doc(hidden)] - #[allow(missing_debug_implementations)] - pub struct TryForEachFuture { - #[pin] - stream: S, - f: F, - __from: PhantomData, - __to: PhantomData, - } +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct TryForEachFuture<'a, S, F> { + stream: &'a mut S, + f: F, } -impl TryForEachFuture { - pub(crate) fn new(stream: S, f: F) -> Self { - TryForEachFuture { - stream, - f, - __from: PhantomData, - __to: PhantomData, - } +impl<'a, S, F> Unpin for TryForEachFuture<'a, S, F> {} + +impl<'a, S, F> TryForEachFuture<'a, S, F> { + pub(crate) fn new(stream: &'a mut S, f: F) -> Self { + TryForEachFuture { stream, f } } } -impl Future for TryForEachFuture +impl<'a, S, F, E> Future for TryForEachFuture<'a, S, F> where - S: Stream, - S::Item: std::fmt::Debug, + S: Stream + Unpin, F: FnMut(S::Item) -> Result<(), E>, { type Output = Result<(), E>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - let item = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + let item = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)); match item { None => return Poll::Ready(Ok(())), Some(v) => { - let res = (this.f)(v); + let res = (&mut self.f)(v); if let Err(e) = res { return Poll::Ready(Err(e)); } diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 27681f3..f57d735 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -7,7 +7,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - /// An iterator that iterates two other iterators simultaneously. + /// A stream that takes items from two other streams simultaneously. /// /// This `struct` is created by the [`zip`] method on [`Stream`]. See its /// documentation for more. From 5438258ceec7932fa407c5c7e0697c7ccec7d6f2 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Mon, 11 Nov 2019 13:19:59 +0100 Subject: [PATCH 03/12] Remove unused import --- src/stream/from_fn.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index e407866..24432c7 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -28,7 +28,6 @@ impl Unpin for FromFn {} /// # /// use async_std::prelude::*; /// use async_std::stream; -/// use async_std::sync::{Arc, Mutex}; /// /// let mut count = 0u8; /// let s = stream::from_fn(|| { From eea7af24db69585c7b51e3ff363cbc473a8ed84d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 10 Nov 2019 17:56:12 +0100 Subject: [PATCH 04/12] fix bugs in changelog Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3568abf..3747143 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -388,8 +388,9 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.11...HEAD -[0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11 +[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.12...HEAD +[0.99.12]: https://github.com/async-rs/async-std/compare/v0.99.11...v0.99.12 +[0.99.11]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11 [0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.9...v0.99.10 [0.99.9]: https://github.com/async-rs/async-std/compare/v0.99.8...v0.99.9 [0.99.8]: https://github.com/async-rs/async-std/compare/v0.99.7...v0.99.8 From 4aa9928ece3d968ea229842cbdaf53dd81fc11c2 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 10 Nov 2019 18:14:46 +0100 Subject: [PATCH 05/12] v1.0.0 Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 44 +++++++++++++++++++++++++++++- Cargo.toml | 2 +- docs/src/tutorial/specification.md | 4 +-- src/lib.rs | 6 ++-- 4 files changed, 49 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3747143..c890f7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,47 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] +# [1.0.0] - 2019-11-11 + +[API Documentation](https://docs.rs/async-std/1.0.0/async-std) + +This release marks the `1.0.0` release of async-std; a major milestone for our +development. This release itself mostly includes quality of life improvements +for all of modules, including more consistent API bounds for a lot of our +submodules. + +The biggest change is that we're now using the full semver range, +`major.minor.patch`, and any breaking changes to our "stable" APIs will require +an update of the `major` number. + +We're excited we've hit this milestone together with you all. Thank you! + +## Added + +- Added `Future::join` as "unstable", replacing `future::join!`. +- Added `Future::try_join` as "unstable", replacing `future::try_join!`. +- Enabled `stable` and `beta` channel testing on CI. +- Implemented `FromIterator` and `Extend` for `PathBuf`. +- Implemented `FromStream` for `PathBuf`. +- Loosened the trait bounds of `io::copy` on "unstable". + +## Changed + +- Added a `Sync` bound to `RwLock`, resolving a memory safety issue. +- Fixed a bug in `Stream::take_while` where it could continue after it should've + ended. +- Fixed a bug where our `attributes` Cargo feature wasn't working as intended. +- Improved documentation of `Stream::merge`, documenting ordering guarantees. +- Update doc imports in examples to prefer async-std's types. +- Various quality of life improvements to the `future` submodule. +- Various quality of life improvements to the `path` submodule. +- Various quality of life improvements to the `stream` submodule. + +## Removed + +- Removed `future::join!` in favor of `Future::join`. +- Removed `future::try_join!` in favor of `Future::try_join`. + # [0.99.12] - 2019-11-07 [API Documentation](https://docs.rs/async-std/0.99.12/async-std) @@ -388,7 +429,8 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.12...HEAD +[Unreleased]: https://github.com/async-rs/async-std/compare/v1.0.0...HEAD +[1.0.0]: https://github.com/async-rs/async-std/compare/v0.99.12...v1.0.0 [0.99.12]: https://github.com/async-rs/async-std/compare/v0.99.11...v0.99.12 [0.99.11]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11 [0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.9...v0.99.10 diff --git a/Cargo.toml b/Cargo.toml index bdbeafa..c1c6865 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "0.99.12" +version = "1.0.0" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", diff --git a/docs/src/tutorial/specification.md b/docs/src/tutorial/specification.md index 322c49f..c384ec2 100644 --- a/docs/src/tutorial/specification.md +++ b/docs/src/tutorial/specification.md @@ -50,6 +50,6 @@ Add the following lines to `Cargo.toml`: ```toml [dependencies] -futures-preview = { version = "0.3.0-alpha.19", features = [ "async-await" ] } -async-std = "0.99" +futures = "0.3.0" +async-std = "1.00" ``` diff --git a/src/lib.rs b/src/lib.rs index 04ed8fb..ddc6462 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -154,7 +154,7 @@ //! //! ```toml //! [dependencies.async-std] -//! version = "0.99" +//! version = "1.0.0" //! features = ["unstable"] //! ``` //! @@ -167,7 +167,7 @@ //! //! ```toml //! [dependencies.async-std] -//! version = "0.99" +//! version = "1.0.0" //! features = ["attributes"] //! ``` //! @@ -176,7 +176,7 @@ //! //! ```toml //! [dependencies.async-std] -//! version = "0.99" +//! version = "1.0.0" //! default-features = false //! features = ["std"] //! ``` From 9ad0cf9f800e33776e99b7d5a4ded4c36d62a1df Mon Sep 17 00:00:00 2001 From: CosciaDiPollo <57640603+CosciaDiPollo@users.noreply.github.com> Date: Mon, 11 Nov 2019 21:14:55 +0100 Subject: [PATCH 06/12] Correct a typo on the async-std version (#508) Correct a typo on the async-std version in the Cargo.toml file of the documentation. --- docs/src/tutorial/specification.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/tutorial/specification.md b/docs/src/tutorial/specification.md index c384ec2..307c79e 100644 --- a/docs/src/tutorial/specification.md +++ b/docs/src/tutorial/specification.md @@ -51,5 +51,5 @@ Add the following lines to `Cargo.toml`: ```toml [dependencies] futures = "0.3.0" -async-std = "1.00" +async-std = "1.0.0" ``` From 6677d52c2df87eab7f06400f9fc1099d725c96f8 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 12 Nov 2019 00:35:29 +0100 Subject: [PATCH 07/12] Improve thread creating algorithm in spawn_blocking --- src/task/spawn_blocking.rs | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index 6076d1b..a93a68b 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -68,16 +68,13 @@ static POOL: Lazy = Lazy::new(|| { fn start_thread() { SLEEPING.fetch_add(1, Ordering::SeqCst); - - // Generate a random duration of time between 1 second and 10 seconds. If the thread doesn't - // receive the next task in this duration of time, it will stop running. - let timeout = Duration::from_millis(1000 + u64::from(random(9_000))); + let timeout = Duration::from_secs(10); thread::Builder::new() .name("async-std/blocking".to_string()) .spawn(move || { loop { - let task = match POOL.receiver.recv_timeout(timeout) { + let mut task = match POOL.receiver.recv_timeout(timeout) { Ok(task) => task, Err(_) => { // Check whether this is the last sleeping thread. @@ -100,8 +97,22 @@ fn start_thread() { start_thread(); } - // Run the task. - abort_on_panic(|| task.run()); + loop { + // Run the task. + abort_on_panic(|| task.run()); + + // Try taking another task if there are any available. + task = match POOL.receiver.try_recv() { + Ok(task) => task, + Err(_) => break, + }; + } + + // If there is at least one sleeping thread, stop this thread instead of putting it + // to sleep. + if SLEEPING.load(Ordering::SeqCst) > 0 { + return; + } SLEEPING.fetch_add(1, Ordering::SeqCst); } From 21c5c48cb6cbf2cd6ab687f4e7938bcb5e35cea3 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 12 Nov 2019 00:37:54 +0100 Subject: [PATCH 08/12] Lower the timeout to 1 second --- src/task/spawn_blocking.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index a93a68b..6cce082 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -68,7 +68,7 @@ static POOL: Lazy = Lazy::new(|| { fn start_thread() { SLEEPING.fetch_add(1, Ordering::SeqCst); - let timeout = Duration::from_secs(10); + let timeout = Duration::from_secs(1); thread::Builder::new() .name("async-std/blocking".to_string()) From 1a50ffd144684199325cbd8a38fc0953373dc5ee Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 12 Nov 2019 00:38:22 +0100 Subject: [PATCH 09/12] Delete unused import --- src/task/spawn_blocking.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index 6cce082..578afa4 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -6,7 +6,7 @@ use crossbeam_channel::{unbounded, Receiver, Sender}; use once_cell::sync::Lazy; use crate::task::{JoinHandle, Task}; -use crate::utils::{abort_on_panic, random}; +use crate::utils::abort_on_panic; /// Spawns a blocking task. /// From b5b2b5a0a34c6a4ebaf078aa8b8f8eb02a6593ac Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 12 Nov 2019 00:48:26 +0100 Subject: [PATCH 10/12] 1.0.1 Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 14 +++++++++++++- Cargo.toml | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c890f7a..37c14c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] +# [1.0.1] - 2019-11-12 + +We were seeing a regression in our fs performance, caused by too many +long-running tasks. This patch fixes that regression by being more proactive +about closing down idle threads. + +## Changes + +- Improved thread startup/shutdown algorithm in spawn_blocking. +- Fixed a typo in the tutorial. + # [1.0.0] - 2019-11-11 [API Documentation](https://docs.rs/async-std/1.0.0/async-std) @@ -429,7 +440,8 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v1.0.0...HEAD +[Unreleased]: https://github.com/async-rs/async-std/compare/v1.0.1...HEAD +[1.0.0]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1 [1.0.0]: https://github.com/async-rs/async-std/compare/v0.99.12...v1.0.0 [0.99.12]: https://github.com/async-rs/async-std/compare/v0.99.11...v0.99.12 [0.99.11]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11 diff --git a/Cargo.toml b/Cargo.toml index c1c6865..4d19a93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "1.0.0" +version = "1.0.1" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", From 9d7b2d6696ad369a5710700a7b4c3321fcd05cd8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 12 Nov 2019 01:34:07 +0100 Subject: [PATCH 11/12] fix changelog Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37c14c7..9d19fdd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,13 +9,15 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview # [1.0.1] - 2019-11-12 +[API Documentation](https://docs.rs/async-std/1.0.1/async-std) + We were seeing a regression in our fs performance, caused by too many long-running tasks. This patch fixes that regression by being more proactive about closing down idle threads. ## Changes -- Improved thread startup/shutdown algorithm in spawn_blocking. +- Improved thread startup/shutdown algorithm in `task::spawn_blocking`. - Fixed a typo in the tutorial. # [1.0.0] - 2019-11-11 @@ -441,7 +443,7 @@ task::blocking(async { - Initial beta release [Unreleased]: https://github.com/async-rs/async-std/compare/v1.0.1...HEAD -[1.0.0]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1 +[1.0.1]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1 [1.0.0]: https://github.com/async-rs/async-std/compare/v0.99.12...v1.0.0 [0.99.12]: https://github.com/async-rs/async-std/compare/v0.99.11...v0.99.12 [0.99.11]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11 From 0d5c7a217fac0ff611c4e0ad59d8480a4314c454 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 12 Nov 2019 02:09:38 +0100 Subject: [PATCH 12/12] stabilize task::yield_now Signed-off-by: Yoshua Wuyts --- src/task/mod.rs | 8 +++----- src/task/yield_now.rs | 2 -- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/src/task/mod.rs b/src/task/mod.rs index bcdea72..198e578 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -124,6 +124,9 @@ cfg_std! { #[doc(inline)] pub use async_macros::ready; + + pub use yield_now::yield_now; + mod yield_now; } cfg_default! { @@ -157,8 +160,3 @@ cfg_default! { #[cfg(not(any(feature = "unstable", test)))] pub(crate) use spawn_blocking::spawn_blocking; } - -cfg_unstable! { - pub use yield_now::yield_now; - mod yield_now; -} diff --git a/src/task/yield_now.rs b/src/task/yield_now.rs index 03f83e2..4030696 100644 --- a/src/task/yield_now.rs +++ b/src/task/yield_now.rs @@ -26,8 +26,6 @@ use crate::task::{Context, Poll}; /// # /// # }) /// ``` -#[cfg(feature = "unstable")] -#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[inline] pub async fn yield_now() { YieldNow(false).await