From f00d32ee7d5c6afd1abc9db5a0e60081af7296ea Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 15:30:52 +0900 Subject: [PATCH 01/55] Add TimeoutStream struct --- src/stream/stream/timeout.rs | 57 ++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 src/stream/stream/timeout.rs diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs new file mode 100644 index 0000000..7a8cf47 --- /dev/null +++ b/src/stream/stream/timeout.rs @@ -0,0 +1,57 @@ +use std::error::Error; +use std::fmt; +use std::pin::Pin; +use std::time::Duration; + +use futures_timer::Delay; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[derive(Debug)] +pub struct TimeoutStream { + stream: S, + delay: Delay, +} + +impl TimeoutStream { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_pinned!(delay: Delay); + + pub fn new(stream: S, dur: Duration) -> TimeoutStream { + let delay = Delay::new(dur); + + TimeoutStream { stream, delay } + } +} + +impl Stream for TimeoutStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.as_mut().stream().poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => match self.delay().poll(cx) { + Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError))), + Poll::Pending => Poll::Pending, + }, + } + } +} + +/// An error returned when a stream times out. +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[cfg(any(feature = "unstable", feature = "docs"))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TimeoutError; + +impl Error for TimeoutError {} + +impl fmt::Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "stream has timed out".fmt(f) + } +} From 7a87dea085884eebbe7472be5b4658218b58cd32 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 15:31:07 +0900 Subject: [PATCH 02/55] feat: Add Stream::timeout --- src/stream/stream/mod.rs | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d582d70..c44917d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -104,13 +104,16 @@ cfg_if! { cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod merge; + mod timeout; use std::pin::Pin; + use std::time::Duration; use crate::future::Future; use crate::stream::FromStream; pub use merge::Merge; + pub use timeout::TimeoutStream; } } @@ -1044,6 +1047,40 @@ extension_trait! { Skip::new(self, n) } + #[doc=r#" + Await a stream or times out after a duration of time. + + If you want to await an I/O future consider using + [`io::timeout`](../io/fn.timeout.html) instead. + + # Examples + + ``` + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use std::time::Duration; + + use async_std::stream; + use async_std::prelude::*; + + let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1)); + + while let Some(v) = s.next().await { + assert_eq!(v, Ok(1)); + } + # + # Ok(()) }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn timeout(self, dur: Duration) -> TimeoutStream + where + Self: Stream + Sized, + { + TimeoutStream::new(self, dur) + } + #[doc = r#" A combinator that applies a function as long as it returns successfully, producing a single, final value. Immediately returns the error when the function returns unsuccessfully. From 054f4fac740daaf5c2c6fcaccef9f374fc6c71ec Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 16:53:33 +0900 Subject: [PATCH 03/55] feat: Add future::delay --- src/future/delay.rs | 61 +++++++++++++++++++++++++++++++++++++++++++++ src/future/mod.rs | 2 ++ 2 files changed, 63 insertions(+) create mode 100644 src/future/delay.rs diff --git a/src/future/delay.rs b/src/future/delay.rs new file mode 100644 index 0000000..723c7c1 --- /dev/null +++ b/src/future/delay.rs @@ -0,0 +1,61 @@ +use std::pin::Pin; +use std::time::Duration; + +use futures_timer::Delay; + +use crate::future::Future; +use crate::task::{Context, Poll}; + +/// Creates a future that is delayed before it starts yielding items. +/// +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// use async_std::future; +/// use std::time::Duration; + +/// let a = future::delay(future::ready(1) ,Duration::from_millis(2000)); +/// dbg!(a.await); +/// # }) +/// ``` +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[cfg(any(feature = "unstable", feature = "docs"))] +pub fn delay(f: F, dur: Duration) -> DelayFuture +where + F: Future, +{ + DelayFuture::new(f, dur) +} + +#[doc(hidden)] +#[derive(Debug)] +pub struct DelayFuture { + future: F, + delay: Delay, +} + +impl DelayFuture { + pin_utils::unsafe_pinned!(future: F); + pin_utils::unsafe_pinned!(delay: Delay); + + pub fn new(future: F, dur: Duration) -> DelayFuture { + let delay = Delay::new(dur); + + DelayFuture { future, delay } + } +} + +impl Future for DelayFuture { + type Output = F::Output; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().delay().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(_) => match self.future().poll(cx) { + Poll::Ready(v) => Poll::Ready(v), + Poll::Pending => Poll::Pending, + }, + } + } +} diff --git a/src/future/mod.rs b/src/future/mod.rs index cc3b7a5..d1a0f31 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -65,7 +65,9 @@ mod timeout; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod into_future; + mod delay; pub use into_future::IntoFuture; + pub use delay::delay; } } From b251fc999a2faaeac6107af60f6dcf7ad077a5c3 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 19:18:05 +0900 Subject: [PATCH 04/55] Move delay method to FutureExt::delay --- src/future/future.rs | 22 ++++++++++++++++++++++ src/future/{ => future}/delay.rs | 22 ---------------------- src/future/mod.rs | 3 +-- 3 files changed, 23 insertions(+), 24 deletions(-) rename src/future/{ => future}/delay.rs (64%) diff --git a/src/future/future.rs b/src/future/future.rs index 556dc1a..38f3d70 100644 --- a/src/future/future.rs +++ b/src/future/future.rs @@ -105,6 +105,28 @@ extension_trait! { } pub trait FutureExt: std::future::Future { + /// Creates a future that is delayed before it starts yielding items. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// use async_std::future; + /// use std::time::Duration; + /// use async_std::future::FutureExt; + /// + /// let a = future::ready(1).delay(Duration::from_millis(2000)); + /// dbg!(a.await); + /// # }) + /// ``` + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + #[cfg(any(feature = "unstable", feature = "docs"))] + fn delay(self, dur: Duration) -> DelayFuture + where + Self: Future + Sized + { + DelayFuture::new(self, dur) + } } impl Future for Box { diff --git a/src/future/delay.rs b/src/future/future/delay.rs similarity index 64% rename from src/future/delay.rs rename to src/future/future/delay.rs index 723c7c1..319b4ff 100644 --- a/src/future/delay.rs +++ b/src/future/future/delay.rs @@ -6,28 +6,6 @@ use futures_timer::Delay; use crate::future::Future; use crate::task::{Context, Poll}; -/// Creates a future that is delayed before it starts yielding items. -/// -/// # Examples -/// -/// ``` -/// # async_std::task::block_on(async { -/// use async_std::future; -/// use std::time::Duration; - -/// let a = future::delay(future::ready(1) ,Duration::from_millis(2000)); -/// dbg!(a.await); -/// # }) -/// ``` -#[cfg_attr(feature = "docs", doc(cfg(unstable)))] -#[cfg(any(feature = "unstable", feature = "docs"))] -pub fn delay(f: F, dur: Duration) -> DelayFuture -where - F: Future, -{ - DelayFuture::new(f, dur) -} - #[doc(hidden)] #[derive(Debug)] pub struct DelayFuture { diff --git a/src/future/mod.rs b/src/future/mod.rs index d1a0f31..6bfd630 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -51,6 +51,7 @@ pub use async_macros::{select, try_select}; use cfg_if::cfg_if; pub use future::Future; +pub use future::FutureExt; pub use pending::pending; pub use poll_fn::poll_fn; pub use ready::ready; @@ -65,9 +66,7 @@ mod timeout; cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod into_future; - mod delay; pub use into_future::IntoFuture; - pub use delay::delay; } } From 358d2bc038f8894794ad71b6003938452859093b Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 19:20:07 +0900 Subject: [PATCH 05/55] Add import crate --- src/future/future.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/future/future.rs b/src/future/future.rs index 38f3d70..5254ac0 100644 --- a/src/future/future.rs +++ b/src/future/future.rs @@ -9,6 +9,15 @@ cfg_if::cfg_if! { } } +cfg_if::cfg_if! { + if #[cfg(any(feature = "unstable", feature = "docs"))] { + mod delay; + + use std::time::Duration; + use delay::DelayFuture; + } +} + extension_trait! { #[doc = r#" A future represents an asynchronous computation. From 10f32ca817551565d6602911a79ad6a9736fde95 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 19:49:07 +0900 Subject: [PATCH 06/55] Fix TimeoutError --- src/stream/stream/timeout.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 7a8cf47..f73ae87 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -35,7 +35,7 @@ impl Stream for TimeoutStream { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => match self.delay().poll(cx) { - Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError))), + Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))), Poll::Pending => Poll::Pending, }, } @@ -46,7 +46,9 @@ impl Stream for TimeoutStream { #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(any(feature = "unstable", feature = "docs"))] #[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub struct TimeoutError; +pub struct TimeoutError { + _private: (), +} impl Error for TimeoutError {} From f1ed034600f17eb6ab2756e22e2fdc0f2944dd87 Mon Sep 17 00:00:00 2001 From: nasa Date: Wed, 16 Oct 2019 22:21:32 +0900 Subject: [PATCH 07/55] Update src/stream/stream/mod.rs Co-Authored-By: Yoshua Wuyts --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c44917d..47b3f83 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -113,7 +113,7 @@ cfg_if! { use crate::stream::FromStream; pub use merge::Merge; - pub use timeout::TimeoutStream; + pub use timeout::{TimeoutError, TimeoutStream}; } } From 9d55fff81da0d4f76a9266df399fd8084406d4d2 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 22:38:28 +0900 Subject: [PATCH 08/55] fix export FutureExt --- src/future/future.rs | 2 +- src/future/mod.rs | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/future/future.rs b/src/future/future.rs index 5254ac0..4849772 100644 --- a/src/future/future.rs +++ b/src/future/future.rs @@ -120,9 +120,9 @@ extension_trait! { /// /// ``` /// # async_std::task::block_on(async { + /// use async_std::prelude::*; /// use async_std::future; /// use std::time::Duration; - /// use async_std::future::FutureExt; /// /// let a = future::ready(1).delay(Duration::from_millis(2000)); /// dbg!(a.await); diff --git a/src/future/mod.rs b/src/future/mod.rs index 6bfd630..cc3b7a5 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -51,7 +51,6 @@ pub use async_macros::{select, try_select}; use cfg_if::cfg_if; pub use future::Future; -pub use future::FutureExt; pub use pending::pending; pub use poll_fn::poll_fn; pub use ready::ready; From 53fa132d136cb3a47f6aa3c5ba7249c3d6b401a6 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 22:45:18 +0900 Subject: [PATCH 09/55] fix type Declaration --- src/future/future.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/future/future.rs b/src/future/future.rs index 4849772..abf7c18 100644 --- a/src/future/future.rs +++ b/src/future/future.rs @@ -130,7 +130,7 @@ extension_trait! { /// ``` #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg(any(feature = "unstable", feature = "docs"))] - fn delay(self, dur: Duration) -> DelayFuture + fn delay(self, dur: Duration) -> impl Future [DelayFuture] where Self: Future + Sized { From c3f6f969c51de4717ae4ef6639bb7f1ad916a6e8 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 22:56:17 +0900 Subject: [PATCH 10/55] fix: Rename TimeoutStream to Timeout --- src/stream/stream/mod.rs | 6 +++--- src/stream/stream/timeout.rs | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c44917d..a881a7a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -113,7 +113,7 @@ cfg_if! { use crate::stream::FromStream; pub use merge::Merge; - pub use timeout::TimeoutStream; + pub use timeout::Timeout; } } @@ -1074,11 +1074,11 @@ extension_trait! { "#] #[cfg(any(feature = "unstable", feature = "docs"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn timeout(self, dur: Duration) -> TimeoutStream + fn timeout(self, dur: Duration) -> Timeout where Self: Stream + Sized, { - TimeoutStream::new(self, dur) + Timeout::new(self, dur) } #[doc = r#" diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index f73ae87..042dc12 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -11,23 +11,23 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[derive(Debug)] -pub struct TimeoutStream { +pub struct Timeout { stream: S, delay: Delay, } -impl TimeoutStream { +impl Timeout { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_pinned!(delay: Delay); - pub fn new(stream: S, dur: Duration) -> TimeoutStream { + pub fn new(stream: S, dur: Duration) -> Timeout { let delay = Delay::new(dur); - TimeoutStream { stream, delay } + Timeout { stream, delay } } } -impl Stream for TimeoutStream { +impl Stream for Timeout { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { From 0a4073449beac09b0bb2492a3754b57171147d5f Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 16 Oct 2019 22:56:48 +0900 Subject: [PATCH 11/55] doc: Add Stream::Timeout doc --- src/stream/stream/timeout.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 042dc12..7e0270e 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -9,7 +9,7 @@ use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -#[doc(hidden)] +/// A stream with timeout time set #[derive(Debug)] pub struct Timeout { stream: S, From ec98b41c85bb8c2891755fc6b86366fb85413a5e Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 17 Oct 2019 23:56:01 +0900 Subject: [PATCH 12/55] feat: Add FlattenCompat struct --- src/lib.rs | 1 + src/stream/stream/flatten.rs | 24 ++++++++++++++++++++++++ src/stream/stream/mod.rs | 1 + 3 files changed, 26 insertions(+) create mode 100644 src/stream/stream/flatten.rs diff --git a/src/lib.rs b/src/lib.rs index e138c87..afbad31 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,7 @@ #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] #![recursion_limit = "1024"] +#![feature(associated_type_bounds)] use cfg_if::cfg_if; diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs new file mode 100644 index 0000000..fffda4d --- /dev/null +++ b/src/stream/stream/flatten.rs @@ -0,0 +1,24 @@ +use std::pin::Pin; + +use crate::stream::{IntoStream, Stream}; +use crate::task::{Context, Poll}; + +/// Real logic of both `Flatten` and `FlatMap` which simply delegate to +/// this type. +#[derive(Clone, Debug)] +struct FlattenCompat { + stream: S, + frontiter: Option, +} +impl FlattenCompat { + pin_utils::unsafe_unpinned!(stream: S); + pin_utils::unsafe_unpinned!(frontiter: Option); + + /// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`. + pub fn new(stream: S) -> FlattenCompat { + FlattenCompat { + stream, + frontiter: None, + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 764dc97..7047b03 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -106,6 +106,7 @@ cfg_if! { cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod merge; + mod flatten; use std::pin::Pin; From 8bef2e9e95e8a7c4a90bb334afdbc7d2f67122b0 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Thu, 17 Oct 2019 21:28:38 +0200 Subject: [PATCH 13/55] Don't flush files if they weren't written to --- src/fs/file.rs | 35 ++++++++++++++++++++--------------- src/fs/open_options.rs | 5 ++++- 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index 3129e96..745a584 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -66,6 +66,23 @@ pub struct File { } impl File { + /// Creates an async file handle. + pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File { + let file = Arc::new(file); + + File { + file: file.clone(), + lock: Lock::new(State { + file, + mode: Mode::Idle, + cache: Vec::new(), + is_flushed, + last_read_err: None, + last_write_err: None, + }), + } + } + /// Opens a file in read-only mode. /// /// See the [`OpenOptions::open`] function for more options. @@ -96,7 +113,7 @@ impl File { pub async fn open>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = blocking::spawn(move || std::fs::File::open(&path)).await?; - Ok(file.into()) + Ok(File::new(file, true)) } /// Opens a file in write-only mode. @@ -131,7 +148,7 @@ impl File { pub async fn create>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = blocking::spawn(move || std::fs::File::create(&path)).await?; - Ok(file.into()) + Ok(File::new(file, true)) } /// Synchronizes OS-internal buffered contents and metadata to disk. @@ -383,19 +400,7 @@ impl Seek for &File { impl From for File { fn from(file: std::fs::File) -> File { - let file = Arc::new(file); - - File { - file: file.clone(), - lock: Lock::new(State { - file, - mode: Mode::Idle, - cache: Vec::new(), - is_flushed: false, - last_read_err: None, - last_write_err: None, - }), - } + File::new(file, false) } } diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs index a2eb9e7..7f70073 100644 --- a/src/fs/open_options.rs +++ b/src/fs/open_options.rs @@ -284,7 +284,10 @@ impl OpenOptions { pub fn open>(&self, path: P) -> impl Future> { let path = path.as_ref().to_owned(); let options = self.0.clone(); - async move { blocking::spawn(move || options.open(path).map(|f| f.into())).await } + async move { + let file = blocking::spawn(move || options.open(path)).await?; + Ok(File::new(file, true)) + } } } From bb1416420d047638d2cc8be21920206c3053870a Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 17 Oct 2019 23:56:32 +0900 Subject: [PATCH 14/55] feat: Add Stream trait for FlattenCompat --- src/stream/stream/flatten.rs | 49 ++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index fffda4d..7265d17 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -22,3 +22,52 @@ impl FlattenCompat { } } } + +impl Stream for FlattenCompat +where + S: Stream> + std::marker::Unpin, + U: Stream + std::marker::Unpin, +{ + type Item = U::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + if let Some(ref mut inner) = self.as_mut().frontiter() { + if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) { + return Poll::Ready(item); + } + } + + match futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)) { + None => return Poll::Ready(None), + Some(inner) => *self.as_mut().frontiter() = Some(inner.into_stream()), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::FlattenCompat; + + use crate::prelude::*; + use crate::task; + + use std::collections::VecDeque; + + #[test] + fn test_poll_next() -> std::io::Result<()> { + let inner1: VecDeque = vec![1, 2, 3].into_iter().collect(); + let inner2: VecDeque = vec![4, 5, 6].into_iter().collect(); + + let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); + + task::block_on(async move { + let flat = FlattenCompat::new(s); + let v: Vec = flat.collect().await; + + assert_eq!(v, vec![1, 2, 3, 4, 5, 6]); + Ok(()) + }) + } +} From 2dee2897509f0ac5c1a8e26d768bfdf3cbe54099 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 18 Oct 2019 10:43:21 +0900 Subject: [PATCH 15/55] Add FlatMap struct --- src/stream/stream/flatten.rs | 37 ++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 7265d17..e922e94 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,8 +1,44 @@ use std::pin::Pin; +use crate::prelude::*; +use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; +#[allow(missing_debug_implementations)] +pub struct FlatMap { + inner: FlattenCompat, U>, +} + +impl FlatMap +where + S: Stream, + U: IntoStream, + F: FnMut(S::Item) -> U, +{ + pin_utils::unsafe_pinned!(inner: FlattenCompat, U>); + + pub fn new(stream: S, f: F) -> FlatMap { + FlatMap { + inner: FlattenCompat::new(stream.map(f)), + } + } +} + +impl Stream for FlatMap +where + S: Stream> + std::marker::Unpin, + S::Item: std::marker::Unpin, + U: Stream + std::marker::Unpin, + F: FnMut(S::Item) -> U + std::marker::Unpin, +{ + type Item = U::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.as_mut().inner().poll_next(cx) + } +} + /// Real logic of both `Flatten` and `FlatMap` which simply delegate to /// this type. #[derive(Clone, Debug)] @@ -10,6 +46,7 @@ struct FlattenCompat { stream: S, frontiter: Option, } + impl FlattenCompat { pin_utils::unsafe_unpinned!(stream: S); pin_utils::unsafe_unpinned!(frontiter: Option); From 2187a2a31d5f9c72e8ee32d2beae3c129bd8e80f Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 18 Oct 2019 10:43:36 +0900 Subject: [PATCH 16/55] feat: Add Stream::flat_map --- src/stream/stream/mod.rs | 41 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 39 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 7047b03..2f1a89f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,6 +30,7 @@ mod filter; mod filter_map; mod find; mod find_map; +mod flatten; mod fold; mod for_each; mod fuse; @@ -77,6 +78,7 @@ use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; +pub use flatten::FlatMap; pub use fuse::Fuse; pub use inspect::Inspect; pub use map::Map; @@ -93,6 +95,7 @@ use std::marker::PhantomData; use cfg_if::cfg_if; +use crate::stream::IntoStream; use crate::utils::extension_trait; cfg_if! { @@ -106,7 +109,6 @@ cfg_if! { cfg_if! { if #[cfg(any(feature = "unstable", feature = "docs"))] { mod merge; - mod flatten; use std::pin::Pin; @@ -496,7 +498,6 @@ extension_trait! { # # }) } ``` - "#] fn last( self, @@ -570,6 +571,42 @@ extension_trait! { Filter::new(self, predicate) } + #[doc= r#" + Creates an stream that works like map, but flattens nested structure. + + # Examples + + Basic usage: + + ``` + # async_std::task::block_on(async { + + use std::collections::VecDeque; + use async_std::prelude::*; + use async_std::stream::IntoStream; + + let inner1: VecDeque = vec![1,2,3].into_iter().collect(); + let inner2: VecDeque = vec![4,5,6].into_iter().collect(); + + let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); + + let flat= s.flat_map(|s| s.into_stream() ); + let v: Vec = flat.collect().await; + + assert_eq!(v, vec![1,2,3,4,5,6]); + + # }); + ``` + "#] + fn flat_map(self, f: F) -> FlatMap + where + Self: Sized, + U: IntoStream, + F: FnMut(Self::Item) -> U, + { + FlatMap::new(self, f) + } + #[doc = r#" Both filters and maps a stream. From cd862083a5db3d04c4ff2cbbaa3eb96e50f04ccd Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 18 Oct 2019 12:19:38 +0900 Subject: [PATCH 17/55] Add Flatten struct --- src/stream/stream/flatten.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index e922e94..06e9ec2 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -39,6 +39,21 @@ where } } +pub struct Flatten +where + S::Item: IntoStream, +{ + inner: FlattenCompat::IntoStream>, +} + +impl> Flatten { + pin_utils::unsafe_pinned!(inner: FlattenCompat::IntoStream>); + + pub fn new(stream: S) -> Flatten { + Flatten { inner: FlattenCompat::new(stream) } + } +} + /// Real logic of both `Flatten` and `FlatMap` which simply delegate to /// this type. #[derive(Clone, Debug)] From 8138afbfadd0b7f2640a28d9c0af0f59a59e3967 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 18 Oct 2019 12:20:02 +0900 Subject: [PATCH 18/55] feat: Add Stream trait for Flatten --- src/stream/stream/flatten.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 06e9ec2..5c76167 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -54,6 +54,19 @@ impl> Flatten { } } +impl Stream for Flatten +where + S: Stream> + std::marker::Unpin, + U: Stream + std::marker::Unpin, +{ + type Item = U::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.as_mut().inner().poll_next(cx) + } +} + + /// Real logic of both `Flatten` and `FlatMap` which simply delegate to /// this type. #[derive(Clone, Debug)] From 176359afae6adf0f4202fde938f0857c9759c9d9 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 18 Oct 2019 12:20:28 +0900 Subject: [PATCH 19/55] Add Stream::flatten --- src/stream/stream/mod.rs | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2f1a89f..63d1d8c 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -78,7 +78,7 @@ use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; -pub use flatten::FlatMap; +pub use flatten::{FlatMap, Flatten}; pub use fuse::Fuse; pub use inspect::Inspect; pub use map::Map; @@ -590,8 +590,7 @@ extension_trait! { let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); - let flat= s.flat_map(|s| s.into_stream() ); - let v: Vec = flat.collect().await; + let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await; assert_eq!(v, vec![1,2,3,4,5,6]); @@ -607,6 +606,37 @@ extension_trait! { FlatMap::new(self, f) } + #[doc = r#" + Creates an stream that flattens nested structure. + + # Examples + + Basic usage: + + ``` + # async_std::task::block_on(async { + + use std::collections::VecDeque; + use async_std::prelude::*; + + let inner1: VecDeque = vec![1,2,3].into_iter().collect(); + let inner2: VecDeque = vec![4,5,6].into_iter().collect(); + let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); + + let v: Vec<_> = s.flatten().collect().await; + + assert_eq!(v, vec![1,2,3,4,5,6]); + + # }); + "#] + fn flatten(self) -> Flatten + where + Self: Sized, + Self::Item: IntoStream, + { + Flatten::new(self) + } + #[doc = r#" Both filters and maps a stream. From 410d16eaf6f5933bc5516a709c27134f227111e4 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 18 Oct 2019 13:20:44 +0900 Subject: [PATCH 20/55] Add docs + To unstable feature --- src/stream/stream/flatten.rs | 11 +++++++++++ src/stream/stream/mod.rs | 11 +++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 5c76167..b970087 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -5,6 +5,11 @@ use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; +/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its +/// documentation for more. +/// +/// [`flat_map`]: trait.Stream.html#method.flat_map +/// [`Stream`]: trait.Stream.html #[allow(missing_debug_implementations)] pub struct FlatMap { inner: FlattenCompat, U>, @@ -39,6 +44,12 @@ where } } +/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its +/// documentation for more. +/// +/// [`flatten`]: trait.Stream.html#method.flatten +/// [`Stream`]: trait.Stream.html +#[allow(missing_debug_implementations)] pub struct Flatten where S::Item: IntoStream, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 6802de9..8ad8cdc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,7 +30,6 @@ mod filter; mod filter_map; mod find; mod find_map; -mod flatten; mod fold; mod for_each; mod fuse; @@ -78,7 +77,6 @@ use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; -pub use flatten::{FlatMap, Flatten}; pub use fuse::Fuse; pub use inspect::Inspect; pub use map::Map; @@ -93,17 +91,18 @@ pub use zip::Zip; use std::cmp::Ordering; use std::marker::PhantomData; -use crate::stream::IntoStream; - cfg_unstable! { use std::pin::Pin; use crate::future::Future; use crate::stream::FromStream; + use crate::stream::into_stream::IntoStream; pub use merge::Merge; + pub use flatten::{FlatMap, Flatten}; mod merge; + mod flatten; } extension_trait! { @@ -589,6 +588,8 @@ extension_trait! { # }); ``` "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn flat_map(self, f: F) -> FlatMap where Self: Sized, @@ -621,6 +622,8 @@ extension_trait! { # }); "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn flatten(self) -> Flatten where Self: Sized, From 0d4a907335beec5472f03c28c204e039fcc5e6fb Mon Sep 17 00:00:00 2001 From: Sunjay Varma Date: Sun, 20 Oct 2019 19:18:37 -0400 Subject: [PATCH 21/55] Added Extend + FromStream for PathBuf --- src/path/pathbuf.rs | 49 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/src/path/pathbuf.rs b/src/path/pathbuf.rs index 64744e1..5d81f1c 100644 --- a/src/path/pathbuf.rs +++ b/src/path/pathbuf.rs @@ -1,6 +1,12 @@ use std::ffi::{OsStr, OsString}; +#[cfg(feature = "unstable")] +use std::pin::Pin; use crate::path::Path; +#[cfg(feature = "unstable")] +use crate::prelude::*; +#[cfg(feature = "unstable")] +use crate::stream::{Extend, FromStream, IntoStream}; /// This struct is an async version of [`std::path::PathBuf`]. /// @@ -206,7 +212,7 @@ impl From for PathBuf { impl Into for PathBuf { fn into(self) -> std::path::PathBuf { - self.inner.into() + self.inner } } @@ -233,3 +239,44 @@ impl AsRef for PathBuf { self.inner.as_ref() } } + +#[cfg(feature = "unstable")] +impl> Extend

for PathBuf { + fn stream_extend<'a, S: IntoStream>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> + where + P: 'a, + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + //TODO: This can be added back in once this issue is resolved: + // https://github.com/rust-lang/rust/issues/58234 + //self.reserve(stream.size_hint().0); + + Box::pin(stream.for_each(move |item| self.push(item.as_ref()))) + } +} + +#[cfg(feature = "unstable")] +impl<'b, P: AsRef + 'b> FromStream

for PathBuf { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = Self::new(); + out.stream_extend(stream).await; + out + }) + } +} From 88558eae6ebffaf77e999551b9f12b1cc883946d Mon Sep 17 00:00:00 2001 From: Andre Zanellato Date: Mon, 21 Oct 2019 19:47:14 -0300 Subject: [PATCH 22/55] Typos and sentence structure fixes --- docs/src/concepts/futures.md | 10 +++------- docs/src/concepts/tasks.md | 2 +- docs/src/overview/async-std.md | 2 +- docs/src/overview/stability-guarantees.md | 2 +- 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/docs/src/concepts/futures.md b/docs/src/concepts/futures.md index 67db720..7d9cc63 100644 --- a/docs/src/concepts/futures.md +++ b/docs/src/concepts/futures.md @@ -24,11 +24,7 @@ To sum up: Rust gives us the ability to safely abstract over important propertie ## An easy view of computation -While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us: - -- computation is a sequence of composable operations -- they can branch based on a decision -- they either run to succession and yield a result, or they can yield an error +While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us: A sequence of composable operations which can branch based on a decision, run to succession and yield a result or yield an error ## Deferring computation @@ -136,11 +132,11 @@ When executing 2 or more of these functions at the same time, our runtime system ## Conclusion -Working from values, we searched for something that expresses *working towards a value available sometime later*. From there, we talked about the concept of polling. +Working from values, we searched for something that expresses *working towards a value available later*. From there, we talked about the concept of polling. A `Future` is any data type that does not represent a value, but the ability to *produce a value at some point in the future*. Implementations of this are very varied and detailed depending on use-case, but the interface is simple. -Next, we will introduce you to `tasks`, which we need to actually *run* Futures. +Next, we will introduce you to `tasks`, which we will use to actually *run* Futures. [^1]: Two parties reading while it is guaranteed that no one is writing is always safe. diff --git a/docs/src/concepts/tasks.md b/docs/src/concepts/tasks.md index d4037a3..2142cac 100644 --- a/docs/src/concepts/tasks.md +++ b/docs/src/concepts/tasks.md @@ -80,7 +80,7 @@ Tasks in `async_std` are one of the core abstractions. Much like Rust's `thread` ## Blocking -`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and by itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this: +`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and of itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this: ```rust,edition2018 # extern crate async_std; diff --git a/docs/src/overview/async-std.md b/docs/src/overview/async-std.md index 2b59ffb..0086599 100644 --- a/docs/src/overview/async-std.md +++ b/docs/src/overview/async-std.md @@ -4,4 +4,4 @@ `async-std` provides an interface to all important primitives: filesystem operations, network operations and concurrency basics like timers. It also exposes a `task` in a model similar to the `thread` module found in the Rust standard lib. But it does not only include I/O primitives, but also `async/await` compatible versions of primitives like `Mutex`. -[organization]: https://github.com/async-rs/async-std +[organization]: https://github.com/async-rs diff --git a/docs/src/overview/stability-guarantees.md b/docs/src/overview/stability-guarantees.md index 84bb68d..d84b64a 100644 --- a/docs/src/overview/stability-guarantees.md +++ b/docs/src/overview/stability-guarantees.md @@ -31,7 +31,7 @@ In general, this crate will be conservative with respect to the minimum supporte ## Security fixes -Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give at least _3 month_ of ahead notice. +Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give at least _3 months_ of ahead. ## Credits From faad4c8c263181ade0fddf105fb875bb411959f8 Mon Sep 17 00:00:00 2001 From: Andre Zanellato Date: Mon, 21 Oct 2019 19:50:57 -0300 Subject: [PATCH 23/55] Sentence structure on notice --- docs/src/overview/stability-guarantees.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/overview/stability-guarantees.md b/docs/src/overview/stability-guarantees.md index d84b64a..8c14e20 100644 --- a/docs/src/overview/stability-guarantees.md +++ b/docs/src/overview/stability-guarantees.md @@ -31,7 +31,7 @@ In general, this crate will be conservative with respect to the minimum supporte ## Security fixes -Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give at least _3 months_ of ahead. +Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give a notice at least _3 months_ ahead. ## Credits From e26eb7a719015f3d8fa6804d2159ec0845093226 Mon Sep 17 00:00:00 2001 From: Kyle Tomsic Date: Sun, 20 Oct 2019 10:05:55 -0400 Subject: [PATCH 24/55] Add `Stream::sum()` and `Stream::product()` implementations These are the stream equivalents to `std::iter::Iterator::sum()` and `std::iter::Iterator::product()`. Note that this changeset tweaks the `Stream::Sum` and `Stream::Product` traits a little: rather than returning a generic future `F`, they return a pinned, boxed, `Future` trait object now. This is in line with other traits that return a future, e.g. `FromStream`. --- src/option/mod.rs | 5 +++ src/option/product.rs | 66 +++++++++++++++++++++++++++++ src/option/sum.rs | 63 ++++++++++++++++++++++++++++ src/result/mod.rs | 5 +++ src/result/product.rs | 64 ++++++++++++++++++++++++++++ src/result/sum.rs | 64 ++++++++++++++++++++++++++++ src/stream/product.rs | 64 ++++++++++++++++++++++++++-- src/stream/stream/mod.rs | 90 ++++++++++++++++++++++++++++++++++++++++ src/stream/sum.rs | 62 +++++++++++++++++++++++++-- 9 files changed, 476 insertions(+), 7 deletions(-) create mode 100644 src/option/product.rs create mode 100644 src/option/sum.rs create mode 100644 src/result/product.rs create mode 100644 src/result/sum.rs diff --git a/src/option/mod.rs b/src/option/mod.rs index afb29ad..76f096b 100644 --- a/src/option/mod.rs +++ b/src/option/mod.rs @@ -7,3 +7,8 @@ mod from_stream; #[doc(inline)] pub use std::option::Option; + +cfg_unstable! { + mod product; + mod sum; +} diff --git a/src/option/product.rs b/src/option/product.rs new file mode 100644 index 0000000..8b66bc6 --- /dev/null +++ b/src/option/product.rs @@ -0,0 +1,66 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Product}; + +impl Product> for Option +where + T: Product, +{ + #[doc = r#" + Takes each element in the `Stream`: if it is a `None`, no further + elements are taken, and the `None` is returned. Should no `None` occur, + the product of all elements is returned. + + # Examples + + This multiplies every integer in a vector, rejecting the product if a negative element is + encountered: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect(); + let prod: Option = v.map(|x| + if x < 0 { + None + } else { + Some(x) + }).product().await; + assert_eq!(prod, Some(8)); + # + # }) } + ``` + "#] + fn product<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_none = false; + let out = >::product(stream + .scan((), |_, elem| { + match elem { + Some(elem) => Some(elem), + None => { + found_none = true; + // Stop processing the stream on error + None + } + } + })).await; + + if found_none { + None + } else { + Some(out) + } + }) + } +} diff --git a/src/option/sum.rs b/src/option/sum.rs new file mode 100644 index 0000000..25dc920 --- /dev/null +++ b/src/option/sum.rs @@ -0,0 +1,63 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Sum}; + +impl Sum> for Option +where + T: Sum, +{ + #[doc = r#" + Takes each element in the `Iterator`: if it is a `None`, no further + elements are taken, and the `None` is returned. Should no `None` occur, + the sum of all elements is returned. + + # Examples + + This sums up the position of the character 'a' in a vector of strings, + if a word did not have the character 'a' the operation returns `None`: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let words: VecDeque<_> = vec!["have", "a", "great", "day"] + .into_iter() + .collect(); + let total: Option = words.map(|w| w.find('a')).sum().await; + assert_eq!(total, Some(5)); + # + # }) } + ``` + "#] + fn sum<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_none = false; + let out = >::sum(stream + .scan((), |_, elem| { + match elem { + Some(elem) => Some(elem), + None => { + found_none = true; + // Stop processing the stream on error + None + } + } + })).await; + + if found_none { + None + } else { + Some(out) + } + }) + } +} diff --git a/src/result/mod.rs b/src/result/mod.rs index 908f9c4..cae0ebd 100644 --- a/src/result/mod.rs +++ b/src/result/mod.rs @@ -7,3 +7,8 @@ mod from_stream; #[doc(inline)] pub use std::result::Result; + +cfg_unstable! { + mod product; + mod sum; +} diff --git a/src/result/product.rs b/src/result/product.rs new file mode 100644 index 0000000..17afa94 --- /dev/null +++ b/src/result/product.rs @@ -0,0 +1,64 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Product}; + +impl Product> for Result +where + T: Product, +{ + #[doc = r#" + Takes each element in the `Stream`: if it is an `Err`, no further + elements are taken, and the `Err` is returned. Should no `Err` occur, + the product of all elements is returned. + + # Examples + + This multiplies every integer in a vector, rejecting the product if a negative element is + encountered: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect(); + let res: Result = v.map(|x| + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + }).product().await; + assert_eq!(res, Ok(8)); + # + # }) } + ``` + "#] + fn product<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = None; + let out = >::product(stream + .scan((), |_, elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None + } + } + })).await; + match found_error { + Some(err) => Err(err), + None => Ok(out) + } + }) + } +} diff --git a/src/result/sum.rs b/src/result/sum.rs new file mode 100644 index 0000000..caca4f6 --- /dev/null +++ b/src/result/sum.rs @@ -0,0 +1,64 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Sum}; + +impl Sum> for Result +where + T: Sum, +{ + #[doc = r#" + Takes each element in the `Stream`: if it is an `Err`, no further + elements are taken, and the `Err` is returned. Should no `Err` occur, + the sum of all elements is returned. + + # Examples + + This sums up every integer in a vector, rejecting the sum if a negative + element is encountered: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let v: VecDeque<_> = vec![1, 2].into_iter().collect(); + let res: Result = v.map(|x| + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + }).sum().await; + assert_eq!(res, Ok(3)); + # + # }) } + ``` + "#] + fn sum<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = None; + let out = >::sum(stream + .scan((), |_, elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None + } + } + })).await; + match found_error { + Some(err) => Err(err), + None => Ok(out) + } + }) + } +} diff --git a/src/stream/product.rs b/src/stream/product.rs index 5799990..71b14c7 100644 --- a/src/stream/product.rs +++ b/src/stream/product.rs @@ -1,7 +1,9 @@ +use std::pin::Pin; + use crate::future::Future; use crate::stream::Stream; -/// Trait to represent types that can be created by productming up a stream. +/// Trait to represent types that can be created by multiplying the elements of a stream. /// /// This trait is used to implement the [`product`] method on streams. Types which /// implement the trait can be generated by the [`product`] method. Like @@ -16,8 +18,62 @@ use crate::stream::Stream; pub trait Product: Sized { /// Method which takes a stream and generates `Self` from the elements by /// multiplying the items. - fn product(stream: S) -> F + fn product<'a, S>(stream: S) -> Pin + 'a>> where - S: Stream, - F: Future; + S: Stream + 'a; +} + +use core::ops::Mul; +use core::num::Wrapping; +use crate::stream::stream::StreamExt; + +macro_rules! integer_product { + (@impls $one: expr, $($a:ty)*) => ($( + impl Product for $a { + fn product<'a, S>(stream: S) -> Pin+ 'a>> + where + S: Stream + 'a, + { + Box::pin(async move { stream.fold($one, Mul::mul).await } ) + } + } + impl<'a> Product<&'a $a> for $a { + fn product<'b, S>(stream: S) -> Pin + 'b>> + where + S: Stream + 'b, + { + Box::pin(async move { stream.fold($one, Mul::mul).await } ) + } + } + )*); + ($($a:ty)*) => ( + integer_product!(@impls 1, $($a)*); + integer_product!(@impls Wrapping(1), $(Wrapping<$a>)*); + ); } + +macro_rules! float_product { + ($($a:ty)*) => ($( + impl Product for $a { + fn product<'a, S>(stream: S) -> Pin+ 'a>> + where S: Stream + 'a, + { + Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + } + } + impl<'a> Product<&'a $a> for $a { + fn product<'b, S>(stream: S) -> Pin+ 'b>> + where S: Stream + 'b, + { + Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + } + } + )*); + ($($a:ty)*) => ( + float_product!($($a)*); + float_product!($(Wrapping<$a>)*); + ); +} + +integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize } +float_product!{ f32 f64 } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..f2d3c6e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -96,6 +96,7 @@ cfg_unstable! { use crate::future::Future; use crate::stream::FromStream; + use crate::stream::{Product, Sum}; pub use merge::Merge; @@ -1536,6 +1537,95 @@ extension_trait! { { LtFuture::new(self, other) } + + #[doc = r#" + Sums the elements of an iterator. + + Takes each element, adds them together, and returns the result. + + An empty iterator returns the zero value of the type. + + # Panics + + When calling `sum()` and a primitive integer type is being returned, this + method will panic if the computation overflows and debug assertions are + enabled. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect(); + let sum: u8 = s.sum().await; + + assert_eq!(sum, 10); + # + # }) } + ``` + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn sum<'a, S>( + self, + ) -> impl Future + 'a [Pin + 'a>>] + where + Self: Sized + Stream + 'a, + S: Sum, + { + Sum::sum(self) + } + + #[doc = r#" + Iterates over the entire iterator, multiplying all the elements + + An empty iterator returns the one value of the type. + + # Panics + + When calling `product()` and a primitive integer type is being returned, + method will panic if the computation overflows and debug assertions are + enabled. + + # Examples + + This example calculates the factorial of n (i.e. the product of the numbers from 1 to + n, inclusive): + + ``` + # fn main() { async_std::task::block_on(async { + # + async fn factorial(n: u32) -> u32 { + use std::collections::VecDeque; + use async_std::prelude::*; + + let s: VecDeque<_> = (1..=n).collect(); + s.product().await + } + + assert_eq!(factorial(0).await, 1); + assert_eq!(factorial(1).await, 1); + assert_eq!(factorial(5).await, 120); + # + # }) } + ``` + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn product<'a, P>( + self, + ) -> impl Future + 'a [Pin + 'a>>] + where + Self: Sized + Stream + 'a, + P: Product, + { + Product::product(self) + } } impl Stream for Box { diff --git a/src/stream/sum.rs b/src/stream/sum.rs index a87ade1..dadbc34 100644 --- a/src/stream/sum.rs +++ b/src/stream/sum.rs @@ -1,3 +1,5 @@ +use std::pin::Pin; + use crate::future::Future; use crate::stream::Stream; @@ -16,8 +18,62 @@ use crate::stream::Stream; pub trait Sum: Sized { /// Method which takes a stream and generates `Self` from the elements by /// "summing up" the items. - fn sum(stream: S) -> F + fn sum<'a, S>(stream: S) -> Pin + 'a>> where - S: Stream, - F: Future; + S: Stream + 'a; +} + +use core::ops::Add; +use core::num::Wrapping; +use crate::stream::stream::StreamExt; + +macro_rules! integer_sum { + (@impls $zero: expr, $($a:ty)*) => ($( + impl Sum for $a { + fn sum<'a, S>(stream: S) -> Pin+ 'a>> + where + S: Stream + 'a, + { + Box::pin(async move { stream.fold($zero, Add::add).await } ) + } + } + impl<'a> Sum<&'a $a> for $a { + fn sum<'b, S>(stream: S) -> Pin + 'b>> + where + S: Stream + 'b, + { + Box::pin(async move { stream.fold($zero, Add::add).await } ) + } + } + )*); + ($($a:ty)*) => ( + integer_sum!(@impls 0, $($a)*); + integer_sum!(@impls Wrapping(0), $(Wrapping<$a>)*); + ); } + +macro_rules! float_sum { + ($($a:ty)*) => ($( + impl Sum for $a { + fn sum<'a, S>(stream: S) -> Pin + 'a>> + where S: Stream + 'a, + { + Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + } + } + impl<'a> Sum<&'a $a> for $a { + fn sum<'b, S>(stream: S) -> Pin + 'b>> + where S: Stream + 'b, + { + Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + } + } + )*); + ($($a:ty)*) => ( + float_sum!(@impls 0.0, $($a)*); + float_sum!(@impls Wrapping(0.0), $(Wrapping<$a>)*); + ); +} + +integer_sum!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize } +float_sum!{ f32 f64 } From d97b3dfdf3a159608023aea7a197b522514d817a Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 24 Oct 2019 08:29:05 +0900 Subject: [PATCH 25/55] fix: Remove Pin API related unsafe code --- src/future/future/delay.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/future/future/delay.rs b/src/future/future/delay.rs index 319b4ff..53a4d75 100644 --- a/src/future/future/delay.rs +++ b/src/future/future/delay.rs @@ -2,21 +2,23 @@ use std::pin::Pin; use std::time::Duration; use futures_timer::Delay; +use pin_project_lite::pin_project; use crate::future::Future; use crate::task::{Context, Poll}; +pin_project! { #[doc(hidden)] #[derive(Debug)] -pub struct DelayFuture { - future: F, - delay: Delay, + pub struct DelayFuture { + #[pin] + future: F, + #[pin] + delay: Delay, + } } impl DelayFuture { - pin_utils::unsafe_pinned!(future: F); - pin_utils::unsafe_pinned!(delay: Delay); - pub fn new(future: F, dur: Duration) -> DelayFuture { let delay = Delay::new(dur); @@ -27,10 +29,12 @@ impl DelayFuture { impl Future for DelayFuture { type Output = F::Output; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.as_mut().delay().poll(cx) { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + match this.delay.poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(_) => match self.future().poll(cx) { + Poll::Ready(_) => match this.future.poll(cx) { Poll::Ready(v) => Poll::Ready(v), Poll::Pending => Poll::Pending, }, From feeb3c10df6725b35f67bc9c8cc73e4a68e46e4d Mon Sep 17 00:00:00 2001 From: k-nasa Date: Thu, 24 Oct 2019 08:41:01 +0900 Subject: [PATCH 26/55] fix: Remove Pin API related unsafe code --- src/stream/stream/timeout.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index 7e0270e..3c14811 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -4,22 +4,24 @@ use std::pin::Pin; use std::time::Duration; use futures_timer::Delay; +use pin_project_lite::pin_project; use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -/// A stream with timeout time set -#[derive(Debug)] -pub struct Timeout { - stream: S, - delay: Delay, +pin_project! { + /// A stream with timeout time set + #[derive(Debug)] + pub struct Timeout { + #[pin] + stream: S, + #[pin] + delay: Delay, + } } impl Timeout { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_pinned!(delay: Delay); - pub fn new(stream: S, dur: Duration) -> Timeout { let delay = Delay::new(dur); @@ -30,11 +32,13 @@ impl Timeout { impl Stream for Timeout { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match self.as_mut().stream().poll_next(cx) { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match this.stream.poll_next(cx) { Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => match self.delay().poll(cx) { + Poll::Pending => match this.delay.poll(cx) { Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))), Poll::Pending => Poll::Pending, }, From 271b6f4a1c6a8096ba68682430a984001de19d56 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 25 Oct 2019 23:58:08 +0900 Subject: [PATCH 27/55] fix: Using pin_project! --- src/stream/stream/flatten.rs | 85 +++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 41 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index b970087..d44fab0 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,18 +1,22 @@ use std::pin::Pin; +use pin_project_lite::pin_project; use crate::prelude::*; use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; -/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its -/// documentation for more. -/// -/// [`flat_map`]: trait.Stream.html#method.flat_map -/// [`Stream`]: trait.Stream.html -#[allow(missing_debug_implementations)] -pub struct FlatMap { - inner: FlattenCompat, U>, +pin_project! { + /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flat_map`]: trait.Stream.html#method.flat_map + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct FlatMap { + #[pin] + inner: FlattenCompat, U>, + } } impl FlatMap @@ -21,7 +25,6 @@ where U: IntoStream, F: FnMut(S::Item) -> U, { - pin_utils::unsafe_pinned!(inner: FlattenCompat, U>); pub fn new(stream: S, f: F) -> FlatMap { FlatMap { @@ -33,33 +36,33 @@ where impl Stream for FlatMap where S: Stream> + std::marker::Unpin, - S::Item: std::marker::Unpin, U: Stream + std::marker::Unpin, - F: FnMut(S::Item) -> U + std::marker::Unpin, + F: FnMut(S::Item) -> U, { type Item = U::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().inner().poll_next(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) } } -/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its -/// documentation for more. -/// -/// [`flatten`]: trait.Stream.html#method.flatten -/// [`Stream`]: trait.Stream.html -#[allow(missing_debug_implementations)] -pub struct Flatten -where - S::Item: IntoStream, -{ - inner: FlattenCompat::IntoStream>, +pin_project!{ + /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flatten`]: trait.Stream.html#method.flatten + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct Flatten + where + S::Item: IntoStream, + { + #[pin] + inner: FlattenCompat::IntoStream>, + } } impl> Flatten { - pin_utils::unsafe_pinned!(inner: FlattenCompat::IntoStream>); - pub fn new(stream: S) -> Flatten { Flatten { inner: FlattenCompat::new(stream) } } @@ -72,24 +75,23 @@ where { type Item = U::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.as_mut().inner().poll_next(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) } } -/// Real logic of both `Flatten` and `FlatMap` which simply delegate to -/// this type. -#[derive(Clone, Debug)] -struct FlattenCompat { - stream: S, - frontiter: Option, +pin_project! { + /// Real logic of both `Flatten` and `FlatMap` which simply delegate to + /// this type. + #[derive(Clone, Debug)] + struct FlattenCompat { + stream: S, + frontiter: Option, + } } impl FlattenCompat { - pin_utils::unsafe_unpinned!(stream: S); - pin_utils::unsafe_unpinned!(frontiter: Option); - /// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`. pub fn new(stream: S) -> FlattenCompat { FlattenCompat { @@ -106,17 +108,18 @@ where { type Item = U::Item; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); loop { - if let Some(ref mut inner) = self.as_mut().frontiter() { + if let Some(inner) = this.frontiter { if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) { return Poll::Ready(item); } } - match futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx)) { + match futures_core::ready!(Pin::new(&mut this.stream).poll_next(cx)) { None => return Poll::Ready(None), - Some(inner) => *self.as_mut().frontiter() = Some(inner.into_stream()), + Some(inner) => *this.frontiter = Some(inner.into_stream()), } } } From 00e7e58bf3194c8a9e0b912061cdf0741a1834be Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 26 Oct 2019 00:26:53 +0900 Subject: [PATCH 28/55] fix type def --- src/stream/stream/flatten.rs | 23 ++++++++++------------- src/stream/stream/mod.rs | 12 ++++-------- 2 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index d44fab0..bc1bee3 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -13,27 +13,27 @@ pin_project! { /// [`flat_map`]: trait.Stream.html#method.flat_map /// [`Stream`]: trait.Stream.html #[allow(missing_debug_implementations)] - pub struct FlatMap { + pub struct FlatMap { #[pin] - inner: FlattenCompat, U>, + inner: FlattenCompat, U>, } } -impl FlatMap +impl FlatMap where S: Stream, U: IntoStream, F: FnMut(S::Item) -> U, { - pub fn new(stream: S, f: F) -> FlatMap { + pub fn new(stream: S, f: F) -> FlatMap { FlatMap { inner: FlattenCompat::new(stream.map(f)), } } } -impl Stream for FlatMap +impl Stream for FlatMap where S: Stream> + std::marker::Unpin, U: Stream + std::marker::Unpin, @@ -53,22 +53,19 @@ pin_project!{ /// [`flatten`]: trait.Stream.html#method.flatten /// [`Stream`]: trait.Stream.html #[allow(missing_debug_implementations)] - pub struct Flatten - where - S::Item: IntoStream, - { + pub struct Flatten { #[pin] - inner: FlattenCompat::IntoStream>, + inner: FlattenCompat } } -impl> Flatten { - pub fn new(stream: S) -> Flatten { +impl> Flatten { + pub fn new(stream: S) -> Flatten { Flatten { inner: FlattenCompat::new(stream) } } } -impl Stream for Flatten +impl Stream for Flatten::IntoStream> where S: Stream> + std::marker::Unpin, U: Stream + std::marker::Unpin, diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8ad8cdc..c994a8f 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,6 +30,7 @@ mod filter; mod filter_map; mod find; mod find_map; +mod flatten; mod fold; mod for_each; mod fuse; @@ -77,6 +78,7 @@ use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; +pub use flatten::{FlatMap, Flatten}; pub use fuse::Fuse; pub use inspect::Inspect; pub use map::Map; @@ -99,10 +101,8 @@ cfg_unstable! { use crate::stream::into_stream::IntoStream; pub use merge::Merge; - pub use flatten::{FlatMap, Flatten}; mod merge; - mod flatten; } extension_trait! { @@ -588,9 +588,7 @@ extension_trait! { # }); ``` "#] - #[cfg(feature = "unstable")] - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn flat_map(self, f: F) -> FlatMap + fn flat_map(self, f: F) -> FlatMap where Self: Sized, U: IntoStream, @@ -622,9 +620,7 @@ extension_trait! { # }); "#] - #[cfg(feature = "unstable")] - #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn flatten(self) -> Flatten + fn flatten(self) -> Flatten where Self: Sized, Self::Item: IntoStream, From 001368d3dfa4f9f0962c52c357a776e9e66e8780 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 26 Oct 2019 00:29:10 +0900 Subject: [PATCH 29/55] $cargo fmt --- src/stream/stream/flatten.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index bc1bee3..fc3592e 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; use pin_project_lite::pin_project; +use std::pin::Pin; use crate::prelude::*; use crate::stream::stream::map::Map; @@ -25,7 +25,6 @@ where U: IntoStream, F: FnMut(S::Item) -> U, { - pub fn new(stream: S, f: F) -> FlatMap { FlatMap { inner: FlattenCompat::new(stream.map(f)), @@ -46,7 +45,7 @@ where } } -pin_project!{ +pin_project! { /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its /// documentation for more. /// @@ -61,7 +60,9 @@ pin_project!{ impl> Flatten { pub fn new(stream: S) -> Flatten { - Flatten { inner: FlattenCompat::new(stream) } + Flatten { + inner: FlattenCompat::new(stream), + } } } @@ -77,7 +78,6 @@ where } } - pin_project! { /// Real logic of both `Flatten` and `FlatMap` which simply delegate to /// this type. From 0c5abee284eae2136ab1b795d086e69761b8a913 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 26 Oct 2019 00:36:04 +0900 Subject: [PATCH 30/55] to unstable stream::flat_map, stream::flatten --- src/stream/stream/mod.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c994a8f..596dc41 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,7 +30,6 @@ mod filter; mod filter_map; mod find; mod find_map; -mod flatten; mod fold; mod for_each; mod fuse; @@ -78,7 +77,6 @@ use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; -pub use flatten::{FlatMap, Flatten}; pub use fuse::Fuse; pub use inspect::Inspect; pub use map::Map; @@ -101,8 +99,10 @@ cfg_unstable! { use crate::stream::into_stream::IntoStream; pub use merge::Merge; + pub use flatten::{FlatMap, Flatten}; mod merge; + mod flatten; } extension_trait! { @@ -588,6 +588,8 @@ extension_trait! { # }); ``` "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn flat_map(self, f: F) -> FlatMap where Self: Sized, @@ -620,6 +622,8 @@ extension_trait! { # }); "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn flatten(self) -> Flatten where Self: Sized, From b66ffa670eb91356f49cc0ceb56a3fd95df3489a Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 26 Oct 2019 00:42:27 +0900 Subject: [PATCH 31/55] update recursion_limit --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 7f04e08..1e33c8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,7 +46,7 @@ #![doc(test(attr(deny(rust_2018_idioms, warnings))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] -#![recursion_limit = "1024"] +#![recursion_limit = "2048"] #![feature(associated_type_bounds)] #[macro_use] From 7ce721f562e17db4eb7953ec5612c5edff8675d6 Mon Sep 17 00:00:00 2001 From: nasa Date: Sat, 26 Oct 2019 01:42:19 +0900 Subject: [PATCH 32/55] Update src/lib.rs Co-Authored-By: Taiki Endo --- src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 1e33c8f..22481b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,6 @@ #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] #![recursion_limit = "2048"] -#![feature(associated_type_bounds)] #[macro_use] mod utils; From b7b5df13aa7843f42fe01c31fa8758727646dbd8 Mon Sep 17 00:00:00 2001 From: nasa Date: Sat, 26 Oct 2019 01:42:39 +0900 Subject: [PATCH 33/55] Update src/stream/stream/flatten.rs Co-Authored-By: Taiki Endo --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index fc3592e..5627733 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -108,7 +108,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { - if let Some(inner) = this.frontiter { + if let Some(inner) = this.frontiter.as_mut().as_pin_mut() { if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) { return Poll::Ready(item); } From 6168952d6f262ee04d4fc06533db2665e207edb3 Mon Sep 17 00:00:00 2001 From: nasa Date: Sat, 26 Oct 2019 01:42:57 +0900 Subject: [PATCH 34/55] Update src/stream/stream/flatten.rs Co-Authored-By: Taiki Endo --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 5627733..b244f03 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -109,7 +109,7 @@ where let mut this = self.project(); loop { if let Some(inner) = this.frontiter.as_mut().as_pin_mut() { - if let item @ Some(_) = futures_core::ready!(Pin::new(inner).poll_next(cx)) { + if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { return Poll::Ready(item); } } From bf3508ffb254db07a187293e8cdef7df651bce6e Mon Sep 17 00:00:00 2001 From: nasa Date: Sat, 26 Oct 2019 01:43:07 +0900 Subject: [PATCH 35/55] Update src/stream/stream/flatten.rs Co-Authored-By: Taiki Endo --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index b244f03..1b21746 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -114,7 +114,7 @@ where } } - match futures_core::ready!(Pin::new(&mut this.stream).poll_next(cx)) { + match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { None => return Poll::Ready(None), Some(inner) => *this.frontiter = Some(inner.into_stream()), } From 8932cecec75b78058de240d2a4c72d90b07759b3 Mon Sep 17 00:00:00 2001 From: nasa Date: Sat, 26 Oct 2019 01:43:14 +0900 Subject: [PATCH 36/55] Update src/stream/stream/flatten.rs Co-Authored-By: Taiki Endo --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 1b21746..5933085 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -116,7 +116,7 @@ where match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { None => return Poll::Ready(None), - Some(inner) => *this.frontiter = Some(inner.into_stream()), + Some(inner) => this.frontiter.set(Some(inner.into_stream())), } } } From 61b7a09c70d78329131dedc528baea5d6cda7942 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 26 Oct 2019 01:44:48 +0900 Subject: [PATCH 37/55] Fix type declaration --- src/stream/stream/flatten.rs | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 5933085..3bf6fe8 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -34,8 +34,9 @@ where impl Stream for FlatMap where - S: Stream> + std::marker::Unpin, - U: Stream + std::marker::Unpin, + S: Stream, + S::Item: IntoStream, + U: Stream, F: FnMut(S::Item) -> U, { type Item = U::Item; @@ -58,7 +59,11 @@ pin_project! { } } -impl> Flatten { +impl Flatten +where + S: Stream, + S::Item: IntoStream, +{ pub fn new(stream: S) -> Flatten { Flatten { inner: FlattenCompat::new(stream), @@ -68,8 +73,9 @@ impl> Flatten { impl Stream for Flatten::IntoStream> where - S: Stream> + std::marker::Unpin, - U: Stream + std::marker::Unpin, + S: Stream, + S::Item: IntoStream, + U: Stream, { type Item = U::Item; @@ -83,7 +89,9 @@ pin_project! { /// this type. #[derive(Clone, Debug)] struct FlattenCompat { + #[pin] stream: S, + #[pin] frontiter: Option, } } @@ -100,8 +108,9 @@ impl FlattenCompat { impl Stream for FlattenCompat where - S: Stream> + std::marker::Unpin, - U: Stream + std::marker::Unpin, + S: Stream, + S::Item: IntoStream, + U: Stream, { type Item = U::Item; From 81e3cab00db42c2084fba9734a53e7074048bfd9 Mon Sep 17 00:00:00 2001 From: nasa Date: Sat, 26 Oct 2019 06:14:57 +0900 Subject: [PATCH 38/55] Change homepage link (#389) --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7aaba68..ad88730 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ authors = [ edition = "2018" license = "Apache-2.0/MIT" repository = "https://github.com/async-rs/async-std" -homepage = "https://github.com/async-rs/async-std" +homepage = "https://async.rs" documentation = "https://docs.rs/async-std" description = "Async version of the Rust standard library" keywords = ["async", "await", "future", "std", "task"] From 610c66e774d1f70292f73111810feab1e990a055 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 27 Oct 2019 02:22:26 +0900 Subject: [PATCH 39/55] Remove usage of actions-rs/clippy-check --- .github/workflows/ci.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d47eabc..c79630a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -95,6 +95,5 @@ jobs: toolchain: ${{ steps.component.outputs.toolchain }} override: true - run: rustup component add clippy - - uses: actions-rs/clippy-check@v1 - with: - token: ${{ secrets.GITHUB_TOKEN }} + - name: clippy + run: cargo clippy --all --features unstable From 6549b66ad2e9ab0a6ea89dc1855b844de357f704 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 27 Oct 2019 03:28:20 +0900 Subject: [PATCH 40/55] run clippy check on beta & address clippy warnings --- .github/workflows/ci.yml | 23 +++++++---------------- src/lib.rs | 1 + src/stream/exact_size_stream.rs | 1 + src/task/task.rs | 2 ++ 4 files changed, 11 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c79630a..653834a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,12 +7,13 @@ on: - staging - trying +env: + RUSTFLAGS: -Dwarnings + jobs: build_and_test: name: Build and test runs-on: ${{ matrix.os }} - env: - RUSTFLAGS: -Dwarnings strategy: matrix: os: [ubuntu-latest, windows-latest, macOS-latest] @@ -48,8 +49,6 @@ jobs: check_fmt_and_docs: name: Checking fmt and docs runs-on: ubuntu-latest - env: - RUSTFLAGS: -Dwarnings steps: - uses: actions/checkout@master @@ -81,19 +80,11 @@ jobs: clippy_check: name: Clippy check runs-on: ubuntu-latest - # TODO: There is a lot of warnings - # env: - # RUSTFLAGS: -Dwarnings steps: - uses: actions/checkout@v1 - - id: component - uses: actions-rs/components-nightly@v1 - with: - component: clippy - - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ steps.component.outputs.toolchain }} - override: true - - run: rustup component add clippy + - name: Install rust + run: rustup update beta && rustup default beta + - name: Install clippy + run: rustup component add clippy - name: clippy run: cargo clippy --all --features unstable diff --git a/src/lib.rs b/src/lib.rs index 7f888a1..ad5aa8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ #![cfg_attr(feature = "docs", feature(doc_cfg))] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![allow(clippy::mutex_atomic, clippy::module_inception)] #![doc(test(attr(deny(rust_2018_idioms, warnings))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] diff --git a/src/stream/exact_size_stream.rs b/src/stream/exact_size_stream.rs index 32a1eb3..8b6ba97 100644 --- a/src/stream/exact_size_stream.rs +++ b/src/stream/exact_size_stream.rs @@ -75,6 +75,7 @@ pub use crate::stream::Stream; /// assert_eq!(5, counter.len()); /// # }); /// ``` +#[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait ExactSizeStream: Stream { diff --git a/src/task/task.rs b/src/task/task.rs index ca3cac1..3d8e108 100644 --- a/src/task/task.rs +++ b/src/task/task.rs @@ -167,6 +167,7 @@ impl Tag { } pub fn task(&self) -> &Task { + #[allow(clippy::transmute_ptr_to_ptr)] unsafe { let raw = self.raw_metadata.load(Ordering::Acquire); @@ -189,6 +190,7 @@ impl Tag { } } + #[allow(clippy::transmute_ptr_to_ptr)] mem::transmute::<&AtomicUsize, &Option>(&self.raw_metadata) .as_ref() .unwrap() From 13a08b0d54f5c983883ce3efca60278d38a8b6d7 Mon Sep 17 00:00:00 2001 From: nasa Date: Sun, 27 Oct 2019 12:35:14 +0900 Subject: [PATCH 41/55] Narrow the disclosure range of FlatMap::new Co-Authored-By: Taiki Endo --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 3bf6fe8..3efb352 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -25,7 +25,7 @@ where U: IntoStream, F: FnMut(S::Item) -> U, { - pub fn new(stream: S, f: F) -> FlatMap { + pub(super) fn new(stream: S, f: F) -> FlatMap { FlatMap { inner: FlattenCompat::new(stream.map(f)), } From 37f14b0195e63838f3e4485ba14bb8e54cc59789 Mon Sep 17 00:00:00 2001 From: nasa Date: Sun, 27 Oct 2019 12:35:32 +0900 Subject: [PATCH 42/55] Narrow the disclosure range of Flatten::new Co-Authored-By: Taiki Endo --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 3efb352..d81e79f 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -64,7 +64,7 @@ where S: Stream, S::Item: IntoStream, { - pub fn new(stream: S) -> Flatten { + pub(super) fn new(stream: S) -> Flatten { Flatten { inner: FlattenCompat::new(stream), } From a42ae2f3d925499ea8732a017baf7f68f146cf27 Mon Sep 17 00:00:00 2001 From: nasa Date: Sun, 27 Oct 2019 12:35:51 +0900 Subject: [PATCH 43/55] Narrow the disclosure range of FlattenCompat::new Co-Authored-By: Taiki Endo --- src/stream/stream/flatten.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index d81e79f..2533f37 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -98,7 +98,7 @@ pin_project! { impl FlattenCompat { /// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`. - pub fn new(stream: S) -> FlattenCompat { + fn new(stream: S) -> FlattenCompat { FlattenCompat { stream, frontiter: None, From c9d958d30974daec880edb90af1f5a85191df8e7 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sun, 27 Oct 2019 18:46:24 +0900 Subject: [PATCH 44/55] $cargo fix -Z unstable-options --clippy --features unstable --- examples/a-chat/main.rs | 2 +- examples/a-chat/server.rs | 2 +- tests/rwlock.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/a-chat/main.rs b/examples/a-chat/main.rs index ced7cac..89e5e2b 100644 --- a/examples/a-chat/main.rs +++ b/examples/a-chat/main.rs @@ -8,6 +8,6 @@ fn main() -> Result<()> { match (args.nth(1).as_ref().map(String::as_str), args.next()) { (Some("client"), None) => client::main(), (Some("server"), None) => server::main(), - _ => Err("Usage: a-chat [client|server]")?, + _ => Err("Usage: a-chat [client|server]".into()), } } diff --git a/examples/a-chat/server.rs b/examples/a-chat/server.rs index 77ebfd1..e049a49 100644 --- a/examples/a-chat/server.rs +++ b/examples/a-chat/server.rs @@ -45,7 +45,7 @@ async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result let mut lines = reader.lines(); let name = match lines.next().await { - None => Err("peer disconnected immediately")?, + None => return Err("peer disconnected immediately".into()), Some(line) => line?, }; let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); diff --git a/tests/rwlock.rs b/tests/rwlock.rs index ff25e86..370dcb9 100644 --- a/tests/rwlock.rs +++ b/tests/rwlock.rs @@ -13,7 +13,7 @@ use futures::channel::mpsc; /// Generates a random number in `0..n`. pub fn random(n: u32) -> u32 { thread_local! { - static RNG: Cell> = Cell::new(Wrapping(1406868647)); + static RNG: Cell> = Cell::new(Wrapping(1_406_868_647)); } RNG.with(|rng| { From 7c293d37f7ffc2127f9dbc48c668a3eaa8204eba Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sun, 27 Oct 2019 18:47:09 +0900 Subject: [PATCH 45/55] fix clippy::comparison_chain --- src/stream/interval.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/stream/interval.rs b/src/stream/interval.rs index 2f7fe9e..016f71a 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -111,6 +111,7 @@ fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant { #[cfg(test)] mod test { use super::next_interval; + use std::cmp::Ordering; use std::time::{Duration, Instant}; struct Timeline(Instant); @@ -134,12 +135,10 @@ mod test { // The math around Instant/Duration isn't 100% precise due to rounding // errors, see #249 for more info fn almost_eq(a: Instant, b: Instant) -> bool { - if a == b { - true - } else if a > b { - a - b < Duration::from_millis(1) - } else { - b - a < Duration::from_millis(1) + match a.cmp(&b) { + Ordering::Equal => true, + Ordering::Greater => a - b < Duration::from_millis(1), + Ordering::Less => b - a < Duration::from_millis(1), } } From 7fe2a1bbce08b1741aa47432d50e25032a0e041c Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sun, 27 Oct 2019 18:47:22 +0900 Subject: [PATCH 46/55] fix clippy::cognitive_complexity --- tests/buf_writer.rs | 1 + tests/channel.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/buf_writer.rs b/tests/buf_writer.rs index cb2368a..5df90e0 100644 --- a/tests/buf_writer.rs +++ b/tests/buf_writer.rs @@ -4,6 +4,7 @@ use async_std::task; #[test] fn test_buffered_writer() { + #![allow(clippy::cognitive_complexity)] task::block_on(async { let inner = Vec::new(); let mut writer = BufWriter::with_capacity(2, inner); diff --git a/tests/channel.rs b/tests/channel.rs index 91622b0..0c40f5a 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -37,6 +37,7 @@ fn capacity() { #[test] fn len_empty_full() { + #![allow(clippy::cognitive_complexity)] task::block_on(async { let (s, r) = channel(2); From fe49f2618fc30c6bf7dd001935053687f6c18c3d Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sun, 27 Oct 2019 20:34:44 +0900 Subject: [PATCH 47/55] fix clippy::redundant_clone --- src/task/block_on.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/task/block_on.rs b/src/task/block_on.rs index b0adc38..c10303d 100644 --- a/src/task/block_on.rs +++ b/src/task/block_on.rs @@ -154,6 +154,7 @@ where fn vtable() -> &'static RawWakerVTable { unsafe fn clone_raw(ptr: *const ()) -> RawWaker { + #![allow(clippy::redundant_clone)] let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); mem::forget(arc.clone()); RawWaker::new(ptr, vtable()) From a3a740c14ae75464c0170c326dc6eeddf63a331c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 27 Oct 2019 22:21:27 +0100 Subject: [PATCH 48/55] backlink all docs Signed-off-by: Yoshua Wuyts --- src/stream/empty.rs | 5 +++++ src/stream/from_fn.rs | 3 ++- src/stream/interval.rs | 4 ++++ src/stream/once.rs | 3 ++- src/stream/repeat.rs | 5 +++-- src/stream/repeat_with.rs | 3 ++- src/stream/stream/chain.rs | 6 ++++++ src/stream/stream/filter.rs | 6 ++++++ src/stream/stream/fuse.rs | 6 ++++++ src/stream/stream/inspect.rs | 6 ++++++ src/stream/stream/merge.rs | 6 ++++-- src/stream/stream/scan.rs | 6 ++++++ src/stream/stream/skip.rs | 6 ++++++ src/stream/stream/skip_while.rs | 6 ++++++ src/stream/stream/step_by.rs | 6 ++++++ src/stream/stream/take.rs | 6 ++++++ src/stream/stream/take_while.rs | 6 ++++++ src/stream/stream/zip.rs | 6 ++++++ 18 files changed, 88 insertions(+), 7 deletions(-) diff --git a/src/stream/empty.rs b/src/stream/empty.rs index ceb91fe..4909070 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -6,6 +6,11 @@ use crate::task::{Context, Poll}; /// Creates a stream that doesn't yield any items. /// +/// This `struct` is created by the [`empty`] function. See its +/// documentation for more. +/// +/// [`empty`]: fn.empty.html +/// /// # Examples /// /// ``` diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 7fee892..5260d87 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -10,7 +10,8 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that yields elements by calling a closure. /// - /// This stream is constructed by [`from_fn`] function. + /// This stream is created by the [`from_fn`] function. See its + /// documentation for more. /// /// [`from_fn`]: fn.from_fn.html #[derive(Debug)] diff --git a/src/stream/interval.rs b/src/stream/interval.rs index 2f7fe9e..c3b8294 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -52,6 +52,10 @@ pub fn interval(dur: Duration) -> Interval { /// A stream representing notifications at fixed interval /// +/// This stream is created by the [`interval`] function. See its +/// documentation for more. +/// +/// [`interval`]: fn.interval.html #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] diff --git a/src/stream/once.rs b/src/stream/once.rs index ae90d63..d993c16 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -29,7 +29,8 @@ pub fn once(t: T) -> Once { pin_project! { /// A stream that yields a single item. /// - /// This stream is constructed by the [`once`] function. + /// This stream is created by the [`once`] function. See its + /// documentation for more. /// /// [`once`]: fn.once.html #[derive(Debug)] diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index 75fd697..abccb43 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -29,9 +29,10 @@ where /// A stream that yields the same item repeatedly. /// -/// This stream is constructed by the [`repeat`] function. +/// This stream is created by the [`repeat`] function. See its +/// documentation for more. /// -/// [`repeat`]: fn.repeat.html +/// [`repeat`]: fn.once.html #[derive(Debug)] pub struct Repeat { item: T, diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index 15d4aa1..de53bc9 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -10,7 +10,8 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that repeats elements of type `T` endlessly by applying a provided closure. /// - /// This stream is constructed by the [`repeat_with`] function. + /// This stream is created by the [`repeat_with`] function. See its + /// documentation for more. /// /// [`repeat_with`]: fn.repeat_with.html #[derive(Debug)] diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index df31615..5e0eeb4 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// Chains two streams one after another. + /// + /// This `struct` is created by the [`chain`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`chain`]: trait.Stream.html#method.chain + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Chain { #[pin] diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index eb4153f..a2562e7 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream to filter elements of another stream with a predicate. + /// + /// This `struct` is created by the [`filter`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`filter`]: trait.Stream.html#method.filter + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Filter { #[pin] diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 1162970..39af9cb 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A `Stream` that is permanently closed once a single call to `poll` results in /// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. + /// + /// This `struct` is created by the [`fuse`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`fuse`]: trait.Stream.html#method.fuse + /// [`Stream`]: trait.Stream.html #[derive(Clone, Debug)] pub struct Fuse { #[pin] diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs index 5de60fb..ba60b0c 100644 --- a/src/stream/stream/inspect.rs +++ b/src/stream/stream/inspect.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that does something with each element of another stream. + /// + /// This `struct` is created by the [`inspect`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`inspect`]: trait.Stream.html#method.inspect + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Inspect { #[pin] diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index 3ccc223..d926ec4 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -7,9 +7,11 @@ use pin_project_lite::pin_project; pin_project! { /// A stream that merges two other streams into a single stream. /// - /// This stream is returned by [`Stream::merge`]. + /// This `struct` is created by the [`merge`] method on [`Stream`]. See its + /// documentation for more. /// - /// [`Stream::merge`]: trait.Stream.html#method.merge + /// [`merge`]: trait.Stream.html#method.merge + /// [`Stream`]: trait.Stream.html #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index c4771d8..385edf8 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -7,6 +7,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream to maintain state while polling another stream. + /// + /// This `struct` is created by the [`scan`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`scan`]: trait.Stream.html#method.scan + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Scan { #[pin] diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index 6562b99..cc2ba90 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -7,6 +7,12 @@ use crate::stream::Stream; pin_project! { /// A stream to skip first n elements of another stream. + /// + /// This `struct` is created by the [`skip`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`skip`]: trait.Stream.html#method.skip + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Skip { #[pin] diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 0499df2..6435d81 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream to skip elements of another stream based on a predicate. + /// + /// This `struct` is created by the [`skip_while`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`skip_while`]: trait.Stream.html#method.skip_while + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct SkipWhile { #[pin] diff --git a/src/stream/stream/step_by.rs b/src/stream/stream/step_by.rs index ab9e45b..1302098 100644 --- a/src/stream/stream/step_by.rs +++ b/src/stream/stream/step_by.rs @@ -7,6 +7,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that steps a given amount of elements of another stream. + /// + /// This `struct` is created by the [`step_by`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`step_by`]: trait.Stream.html#method.step_by + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct StepBy { #[pin] diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 835bc44..e680b42 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -7,6 +7,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that yields the first `n` items of another stream. + /// + /// This `struct` is created by the [`take`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`take`]: trait.Stream.html#method.take + /// [`Stream`]: trait.Stream.html #[derive(Clone, Debug)] pub struct Take { #[pin] diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index bf89458..35978b4 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that yields elements based on a predicate. + /// + /// This `struct` is created by the [`take_while`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`take_while`]: trait.Stream.html#method.take_while + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct TakeWhile { #[pin] diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 9b7c299..27681f3 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// An iterator that iterates two other iterators simultaneously. + /// + /// This `struct` is created by the [`zip`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`zip`]: trait.Stream.html#method.zip + /// [`Stream`]: trait.Stream.html pub struct Zip { item_slot: Option, #[pin] From 4475a229d66ab1b7b862019e59421ad0dad456b1 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Sun, 27 Oct 2019 22:40:49 +0100 Subject: [PATCH 49/55] backlink io docs Signed-off-by: Yoshua Wuyts --- src/io/empty.rs | 5 +++-- src/io/repeat.rs | 3 ++- src/io/sink.rs | 3 ++- src/io/stderr.rs | 16 +++++++++++++--- src/io/stdin.rs | 16 +++++++++++++--- src/io/stdout.rs | 16 +++++++++++++--- src/stream/repeat.rs | 2 +- 7 files changed, 47 insertions(+), 14 deletions(-) diff --git a/src/io/empty.rs b/src/io/empty.rs index d8d768e..9044267 100644 --- a/src/io/empty.rs +++ b/src/io/empty.rs @@ -28,9 +28,10 @@ pub fn empty() -> Empty { /// A reader that contains no data. /// -/// This reader is constructed by the [`sink`] function. +/// This reader is created by the [`empty`] function. See its +/// documentation for more. /// -/// [`sink`]: fn.sink.html +/// [`empty`]: fn.empty.html pub struct Empty { _private: (), } diff --git a/src/io/repeat.rs b/src/io/repeat.rs index a82e21b..5636817 100644 --- a/src/io/repeat.rs +++ b/src/io/repeat.rs @@ -29,7 +29,8 @@ pub fn repeat(byte: u8) -> Repeat { /// A reader which yields one byte over and over and over and over and over and... /// -/// This reader is constructed by the [`repeat`] function. +/// This reader is created by the [`repeat`] function. See its +/// documentation for more. /// /// [`repeat`]: fn.repeat.html pub struct Repeat { diff --git a/src/io/sink.rs b/src/io/sink.rs index faa763c..86aeb0a 100644 --- a/src/io/sink.rs +++ b/src/io/sink.rs @@ -25,7 +25,8 @@ pub fn sink() -> Sink { /// A writer that consumes and drops all data. /// -/// This writer is constructed by the [`sink`] function. +/// This writer is constructed by the [`sink`] function. See its documentation +/// for more. /// /// [`sink`]: fn.sink.html pub struct Sink { diff --git a/src/io/stderr.rs b/src/io/stderr.rs index 1ec28b2..0a8c470 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll}; /// /// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html /// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// /// # Examples /// /// ```no_run @@ -34,12 +40,16 @@ pub fn stderr() -> Stderr { /// A handle to the standard error of the current process. /// -/// Created by the [`stderr`] function. +/// This writer is created by the [`stderr`] function. See its documentation for +/// more. +/// +/// ### Note: Windows Portability Consideration /// -/// This type is an async version of [`std::io::Stderr`]. +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. /// /// [`stderr`]: fn.stderr.html -/// [`std::io::Stderr`]: https://doc.rust-lang.org/std/io/struct.Stderr.html #[derive(Debug)] pub struct Stderr(Mutex); diff --git a/src/io/stdin.rs b/src/io/stdin.rs index dd3991f..22b9cf3 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll}; /// /// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html /// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// /// # Examples /// /// ```no_run @@ -35,12 +41,16 @@ pub fn stdin() -> Stdin { /// A handle to the standard input of the current process. /// -/// Created by the [`stdin`] function. +/// This reader is created by the [`stdin`] function. See its documentation for +/// more. +/// +/// ### Note: Windows Portability Consideration /// -/// This type is an async version of [`std::io::Stdin`]. +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. /// /// [`stdin`]: fn.stdin.html -/// [`std::io::Stdin`]: https://doc.rust-lang.org/std/io/struct.Stdin.html #[derive(Debug)] pub struct Stdin(Mutex); diff --git a/src/io/stdout.rs b/src/io/stdout.rs index 7945bfd..1e9340f 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll}; /// /// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html /// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// /// # Examples /// /// ```no_run @@ -34,12 +40,16 @@ pub fn stdout() -> Stdout { /// A handle to the standard output of the current process. /// -/// Created by the [`stdout`] function. +/// This writer is created by the [`stdout`] function. See its documentation +/// for more. +/// +/// ### Note: Windows Portability Consideration /// -/// This type is an async version of [`std::io::Stdout`]. +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. /// /// [`stdout`]: fn.stdout.html -/// [`std::io::Stdout`]: https://doc.rust-lang.org/std/io/struct.Stdout.html #[derive(Debug)] pub struct Stdout(Mutex); diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index abccb43..aaaff0c 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -32,7 +32,7 @@ where /// This stream is created by the [`repeat`] function. See its /// documentation for more. /// -/// [`repeat`]: fn.once.html +/// [`repeat`]: fn.repeat.html #[derive(Debug)] pub struct Repeat { item: T, From 613895d6be615430beb322c766b940387bd2992c Mon Sep 17 00:00:00 2001 From: k-nasa Date: Mon, 28 Oct 2019 13:58:54 +0900 Subject: [PATCH 50/55] doc: fix documantation text --- src/future/future.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/future/future.rs b/src/future/future.rs index e8075f1..fe68517 100644 --- a/src/future/future.rs +++ b/src/future/future.rs @@ -107,7 +107,7 @@ extension_trait! { } pub trait FutureExt: std::future::Future { - /// Creates a future that is delayed before it starts yielding items. + /// Returns a Future that delays execution for a specified time. /// /// # Examples /// From 434638661027d596b200da305fecda0b3ff48190 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 28 Oct 2019 12:42:23 +0100 Subject: [PATCH 51/55] fix doc recursion limit Signed-off-by: Yoshua Wuyts --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index ad5aa8f..b659c39 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,7 @@ #![doc(test(attr(deny(rust_2018_idioms, warnings))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] -#![recursion_limit = "1024"] +#![recursion_limit = "2048"] #[macro_use] mod utils; From c7dc147f739d3be5917b254cf85fd0733bd4f014 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Tue, 29 Oct 2019 09:27:35 +0900 Subject: [PATCH 52/55] fix indent --- src/future/future/delay.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/future/future/delay.rs b/src/future/future/delay.rs index 53a4d75..d672541 100644 --- a/src/future/future/delay.rs +++ b/src/future/future/delay.rs @@ -8,8 +8,8 @@ use crate::future::Future; use crate::task::{Context, Poll}; pin_project! { -#[doc(hidden)] -#[derive(Debug)] + #[doc(hidden)] + #[derive(Debug)] pub struct DelayFuture { #[pin] future: F, From 688976203e0ff6f564647f2f21b89fea7fafae88 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Tue, 29 Oct 2019 09:52:50 +0900 Subject: [PATCH 53/55] fix: Split FlattenCompat logic to Flatten and FlatMap --- src/stream/stream/flat_map.rs | 62 +++++++++++++++++++ src/stream/stream/flatten.rs | 112 +++------------------------------- src/stream/stream/mod.rs | 4 +- 3 files changed, 72 insertions(+), 106 deletions(-) create mode 100644 src/stream/stream/flat_map.rs diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs new file mode 100644 index 0000000..ed3268e --- /dev/null +++ b/src/stream/stream/flat_map.rs @@ -0,0 +1,62 @@ +use pin_project_lite::pin_project; +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::stream::map::Map; +use crate::stream::{IntoStream, Stream}; +use crate::task::{Context, Poll}; + +pin_project! { + /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`flat_map`]: trait.Stream.html#method.flat_map + /// [`Stream`]: trait.Stream.html + #[allow(missing_debug_implementations)] + pub struct FlatMap { + #[pin] + stream: Map, + #[pin] + inner_stream: Option, + } +} + +impl FlatMap +where + S: Stream, + U: IntoStream, + F: FnMut(S::Item) -> U, +{ + pub(super) fn new(stream: S, f: F) -> FlatMap { + FlatMap { + stream: stream.map(f), + inner_stream: None, + } + } +} + +impl Stream for FlatMap +where + S: Stream, + S::Item: IntoStream, + U: Stream, + F: FnMut(S::Item) -> U, +{ + type Item = U::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + loop { + if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { + if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { + return Poll::Ready(item); + } + } + + match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { + None => return Poll::Ready(None), + Some(inner) => this.inner_stream.set(Some(inner.into_stream())), + } + } + } +} diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 2533f37..2ea0673 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -6,46 +6,6 @@ use crate::stream::stream::map::Map; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; -pin_project! { - /// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its - /// documentation for more. - /// - /// [`flat_map`]: trait.Stream.html#method.flat_map - /// [`Stream`]: trait.Stream.html - #[allow(missing_debug_implementations)] - pub struct FlatMap { - #[pin] - inner: FlattenCompat, U>, - } -} - -impl FlatMap -where - S: Stream, - U: IntoStream, - F: FnMut(S::Item) -> U, -{ - pub(super) fn new(stream: S, f: F) -> FlatMap { - FlatMap { - inner: FlattenCompat::new(stream.map(f)), - } - } -} - -impl Stream for FlatMap -where - S: Stream, - S::Item: IntoStream, - U: Stream, - F: FnMut(S::Item) -> U, -{ - type Item = U::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(cx) - } -} - pin_project! { /// This `struct` is created by the [`flatten`] method on [`Stream`]. See its /// documentation for more. @@ -55,7 +15,9 @@ pin_project! { #[allow(missing_debug_implementations)] pub struct Flatten { #[pin] - inner: FlattenCompat + stream: S, + #[pin] + inner_stream: Option, } } @@ -66,47 +28,13 @@ where { pub(super) fn new(stream: S) -> Flatten { Flatten { - inner: FlattenCompat::new(stream), - } - } -} - -impl Stream for Flatten::IntoStream> -where - S: Stream, - S::Item: IntoStream, - U: Stream, -{ - type Item = U::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().inner.poll_next(cx) - } -} - -pin_project! { - /// Real logic of both `Flatten` and `FlatMap` which simply delegate to - /// this type. - #[derive(Clone, Debug)] - struct FlattenCompat { - #[pin] - stream: S, - #[pin] - frontiter: Option, - } -} - -impl FlattenCompat { - /// Adapts an iterator by flattening it, for use in `flatten()` and `flat_map()`. - fn new(stream: S) -> FlattenCompat { - FlattenCompat { stream, - frontiter: None, + inner_stream: None, } } } -impl Stream for FlattenCompat +impl Stream for Flatten::IntoStream> where S: Stream, S::Item: IntoStream, @@ -117,7 +45,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); loop { - if let Some(inner) = this.frontiter.as_mut().as_pin_mut() { + if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() { if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) { return Poll::Ready(item); } @@ -125,34 +53,8 @@ where match futures_core::ready!(this.stream.as_mut().poll_next(cx)) { None => return Poll::Ready(None), - Some(inner) => this.frontiter.set(Some(inner.into_stream())), + Some(inner) => this.inner_stream.set(Some(inner.into_stream())), } } } } - -#[cfg(test)] -mod tests { - use super::FlattenCompat; - - use crate::prelude::*; - use crate::task; - - use std::collections::VecDeque; - - #[test] - fn test_poll_next() -> std::io::Result<()> { - let inner1: VecDeque = vec![1, 2, 3].into_iter().collect(); - let inner2: VecDeque = vec![4, 5, 6].into_iter().collect(); - - let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect(); - - task::block_on(async move { - let flat = FlattenCompat::new(s); - let v: Vec = flat.collect().await; - - assert_eq!(v, vec![1, 2, 3, 4, 5, 6]); - Ok(()) - }) - } -} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 596dc41..29e68fc 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -99,10 +99,12 @@ cfg_unstable! { use crate::stream::into_stream::IntoStream; pub use merge::Merge; - pub use flatten::{FlatMap, Flatten}; + pub use flatten::Flatten; + pub use flat_map::FlatMap; mod merge; mod flatten; + mod flat_map; } extension_trait! { From ae7adf2c366e388a59ef3417dc7726892dfcf14c Mon Sep 17 00:00:00 2001 From: k-nasa Date: Tue, 29 Oct 2019 10:01:41 +0900 Subject: [PATCH 54/55] fix: Remove unused import --- src/stream/stream/flatten.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 2ea0673..f004986 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,8 +1,8 @@ use pin_project_lite::pin_project; use std::pin::Pin; -use crate::prelude::*; -use crate::stream::stream::map::Map; + + use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; From 1554b0440743a7551ed3c23a88586a5832dca592 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Tue, 29 Oct 2019 10:12:22 +0900 Subject: [PATCH 55/55] $cargo fmt --- src/stream/stream/flatten.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index f004986..5e791cd 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,8 +1,6 @@ use pin_project_lite::pin_project; use std::pin::Pin; - - use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll};