From 4e5828e64669516a28acb63f1fc11aaa2cbbefc5 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 16:46:11 +0800 Subject: [PATCH 01/31] add stream::max_by method --- src/stream/stream/max_by.rs | 56 +++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 41 +++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/stream/stream/max_by.rs diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs new file mode 100644 index 0000000..d25a869 --- /dev/null +++ b/src/stream/stream/max_by.rs @@ -0,0 +1,56 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct MaxByFuture { + stream: S, + compare: F, + max: Option, +} + +impl MaxByFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(compare: F); + pin_utils::unsafe_unpinned!(max: Option); + + pub(super) fn new(stream: S, compare: F) -> Self { + MaxByFuture { + stream, + compare, + max: None, + } + } +} + +impl Future for MaxByFuture +where + S: Stream + Unpin + Sized, + S::Item: Copy, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match self.as_mut().max().take() { + None => *self.as_mut().max() = Some(new), + Some(old) => match (&mut self.as_mut().compare())(&new, &old) { + Ordering::Greater => *self.as_mut().max() = Some(new), + _ => *self.as_mut().max() = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(self.max), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..2ac4d70 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -40,6 +40,7 @@ mod last; mod le; mod lt; mod map; +mod max_by; mod min_by; mod next; mod nth; @@ -68,6 +69,7 @@ use gt::GtFuture; use last::LastFuture; use le::LeFuture; use lt::LtFuture; +use max_by::MaxByFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; @@ -639,6 +641,45 @@ extension_trait! { MinByFuture::new(self, compare) } + #[doc = r#" + Returns the element that gives the minimum value with respect to the + specified comparison function. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let max = s.clone().max_by(|x, y| x.cmp(y)).await; + assert_eq!(max, Some(3)); + + let max = s.max_by(|x, y| y.cmp(x)).await; + assert_eq!(max, Some(1)); + + let max = VecDeque::::new().max_by(|x, y| x.cmp(y)).await; + assert_eq!(max, None); + # + # }) } + ``` + "#] + fn max_by( + self, + compare: F, + ) -> impl Future> [MaxByFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MaxByFuture::new(self, compare) + } + #[doc = r#" Returns the nth element of the stream. From 020eb85093d714b25015980ed0aeeb148ae25ec3 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 18:59:06 +0800 Subject: [PATCH 02/31] add stream::min_by_key method --- src/stream/stream/min_by_key.rs | 58 +++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 38 +++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 src/stream/stream/min_by_key.rs diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs new file mode 100644 index 0000000..2cc34a0 --- /dev/null +++ b/src/stream/stream/min_by_key.rs @@ -0,0 +1,58 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct MinByKeyFuture { + stream: S, + min: Option, + key_by: K, +} + +impl MinByKeyFuture { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(min: Option); + pin_utils::unsafe_unpinned!(key_by: K); + + pub(super) fn new(stream: S, key_by: K) -> Self { + MinByKeyFuture { + stream, + min: None, + key_by, + } + } +} + +impl Future for MinByKeyFuture +where + S: Stream + Unpin + Sized, + K: FnMut(&S::Item) -> S::Item, + S::Item: Ord + Copy, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match next { + Some(new) => { + let new = self.as_mut().key_by()(&new); + cx.waker().wake_by_ref(); + match self.as_mut().min().take() { + None => *self.as_mut().min() = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Less => *self.as_mut().min() = Some(new), + _ => *self.as_mut().min() = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(self.min), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1..56dda48 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -53,6 +53,7 @@ mod take_while; mod try_fold; mod try_for_each; mod zip; +mod min_by_key; use all::AllFuture; use any::AnyFuture; @@ -74,6 +75,7 @@ use nth::NthFuture; use partial_cmp::PartialCmpFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; +use min_by_key::MinByKeyFuture; pub use chain::Chain; pub use filter::Filter; @@ -600,6 +602,42 @@ extension_trait! { FilterMap::new(self, f) } + #[doc = r#" + Returns the element that gives the minimum value with respect to the + specified key function. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, -3].into_iter().collect(); + + let min = s.clone().min_by_key(|x| x.abs()).await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().min_by_key(|x| x.abs()).await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn min_by_key( + self, + key_by: K, + ) -> impl Future> [MinByKeyFuture] + where + Self: Sized, + K: FnMut(&Self::Item) -> Self::Item, + { + MinByKeyFuture::new(self, key_by) + } + #[doc = r#" Returns the element that gives the minimum value with respect to the specified comparison function. If several elements are equally minimum, From d6f940110b2411ea06f2ed8b27f7d1b4e57bafe7 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 19:04:04 +0800 Subject: [PATCH 03/31] update doc --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2ac4d70..cccd8b2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -642,8 +642,8 @@ extension_trait! { } #[doc = r#" - Returns the element that gives the minimum value with respect to the - specified comparison function. If several elements are equally minimum, + Returns the element that gives the maximum value with respect to the + specified comparison function. If several elements are equally maximum, the first element is returned. If the stream is empty, `None` is returned. # Examples From f5a0a0ba86e93b0d1dd7f4b091fef814b5e30849 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Wed, 23 Oct 2019 19:17:24 +0800 Subject: [PATCH 04/31] fmt --- src/stream/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 56dda48..b5011c2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -41,6 +41,7 @@ mod le; mod lt; mod map; mod min_by; +mod min_by_key; mod next; mod nth; mod partial_cmp; @@ -53,7 +54,6 @@ mod take_while; mod try_fold; mod try_for_each; mod zip; -mod min_by_key; use all::AllFuture; use any::AnyFuture; @@ -70,12 +70,12 @@ use last::LastFuture; use le::LeFuture; use lt::LtFuture; use min_by::MinByFuture; +use min_by_key::MinByKeyFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; -use min_by_key::MinByKeyFuture; pub use chain::Chain; pub use filter::Filter; From 37a7eadf17c55d0cb524080d34c2f30d848ecd7e Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Sat, 26 Oct 2019 11:52:41 +0800 Subject: [PATCH 05/31] use pin_project_lite --- src/stream/stream/max_by.rs | 38 +++++++++++++++++++------------------ 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index d25a869..d3d640b 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -1,23 +1,24 @@ use std::cmp::Ordering; use std::pin::Pin; +use pin_project_lite::pin_project; + use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; -#[doc(hidden)] -#[allow(missing_debug_implementations)] -pub struct MaxByFuture { - stream: S, - compare: F, - max: Option, +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MaxByFuture { + #[pin] + stream: S, + compare: F, + max: Option, + } } impl MaxByFuture { - pin_utils::unsafe_pinned!(stream: S); - pin_utils::unsafe_unpinned!(compare: F); - pin_utils::unsafe_unpinned!(max: Option); - pub(super) fn new(stream: S, compare: F) -> Self { MaxByFuture { stream, @@ -35,22 +36,23 @@ where { type Output = Option; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); match next { Some(new) => { cx.waker().wake_by_ref(); - match self.as_mut().max().take() { - None => *self.as_mut().max() = Some(new), - Some(old) => match (&mut self.as_mut().compare())(&new, &old) { - Ordering::Greater => *self.as_mut().max() = Some(new), - _ => *self.as_mut().max() = Some(old), + match this.max.take() { + None => *this.max = Some(new), + Some(old) => match (this.compare)(&new, &old) { + Ordering::Greater => *this.max = Some(new), + _ => *this.max = Some(old), }, } Poll::Pending } - None => Poll::Ready(self.max), + None => Poll::Ready(*this.max), } } } From 006fc7e9de2fca94dfa807ef477667a9695aa05d Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:17:42 +0800 Subject: [PATCH 06/31] Update src/stream/stream/max_by.rs Co-Authored-By: Taiki Endo --- src/stream/stream/max_by.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index d3d640b..a41f49a 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -30,7 +30,7 @@ impl MaxByFuture { impl Future for MaxByFuture where - S: Stream + Unpin + Sized, + S: Stream, S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { From a8d3d1483f29060b15ee4df80852a6a67603569f Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:17:50 +0800 Subject: [PATCH 07/31] Update src/stream/stream/max_by.rs Co-Authored-By: Taiki Endo --- src/stream/stream/max_by.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index a41f49a..6cd9e56 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -31,7 +31,6 @@ impl MaxByFuture { impl Future for MaxByFuture where S: Stream, - S::Item: Copy, F: FnMut(&S::Item, &S::Item) -> Ordering, { type Output = Option; From b57849e1cb6a2d473c0a1a62ff807ef237b99ff0 Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:18:01 +0800 Subject: [PATCH 08/31] Update src/stream/stream/max_by.rs Co-Authored-By: Taiki Endo --- src/stream/stream/max_by.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index 6cd9e56..a626b28 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -51,7 +51,7 @@ where } Poll::Pending } - None => Poll::Ready(*this.max), + None => Poll::Ready(this.max.take()), } } } From 5a4fdeb1cd6ce25192298e01fa5a5792279570cd Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:18:18 +0800 Subject: [PATCH 09/31] Update src/stream/stream/min_by_key.rs Co-Authored-By: Taiki Endo --- src/stream/stream/min_by_key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index cac6073..d6dda2f 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -30,7 +30,7 @@ impl MinByKeyFuture { impl Future for MinByKeyFuture where - S: Stream + Unpin + Sized, + S: Stream, K: FnMut(&S::Item) -> S::Item, S::Item: Ord + Copy, { From fb78ed18124780164a9cc4b1986103649dec6979 Mon Sep 17 00:00:00 2001 From: yjh Date: Sun, 27 Oct 2019 00:19:49 +0800 Subject: [PATCH 10/31] Update src/stream/stream/min_by_key.rs Co-Authored-By: Taiki Endo --- src/stream/stream/min_by_key.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index d6dda2f..a482c63 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -32,7 +32,7 @@ impl Future for MinByKeyFuture where S: Stream, K: FnMut(&S::Item) -> S::Item, - S::Item: Ord + Copy, + S::Item: Ord, { type Output = Option; From 7cfec4e8cec531792330e4caeb72a9145ea7a941 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Sun, 27 Oct 2019 00:26:19 +0800 Subject: [PATCH 11/31] use take and remove Copy --- src/stream/stream/min_by_key.rs | 2 +- src/stream/stream/mod.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index a482c63..6557f22 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -54,7 +54,7 @@ where } Poll::Pending } - None => Poll::Ready(*this.min), + None => Poll::Ready(this.min.take()), } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index b5011c2..ca43e7d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -633,6 +633,7 @@ extension_trait! { ) -> impl Future> [MinByKeyFuture] where Self: Sized, + Self::Item: Ord, K: FnMut(&Self::Item) -> Self::Item, { MinByKeyFuture::new(self, key_by) From 4c4604d63ec1305ae10481e956a5309b33c3f483 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 28 Oct 2019 00:08:32 +0100 Subject: [PATCH 12/31] add stream mod docs Signed-off-by: Yoshua Wuyts --- src/stream/mod.rs | 297 ++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 288 insertions(+), 9 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index e796510..a95e918 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -2,24 +2,303 @@ //! //! This module is an async version of [`std::iter`]. //! -//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html +//! If you've found yourself with an asynchronous collection of some kind, +//! and needed to perform an operation on the elements of said collection, +//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic +//! asynchronous Rust code, so it's worth becoming familiar with them. //! -//! # Examples +//! Before explaining more, let's talk about how this module is structured: +//! +//! # Organization +//! +//! This module is largely organized by type: +//! +//! * [Traits] are the core portion: these traits define what kind of streams +//! exist and what you can do with them. The methods of these traits are worth +//! putting some extra study time into. +//! * [Functions] provide some helpful ways to create some basic streams. +//! * [Structs] are often the return types of the various methods on this +//! module's traits. You'll usually want to look at the method that creates +//! the `struct`, rather than the `struct` itself. For more detail about why, +//! see '[Implementing Stream](#implementing-stream)'. +//! +//! [Traits]: #traits +//! [Functions]: #functions +//! [Structs]: #structs +//! +//! That's it! Let's dig into streams. +//! +//! # Stream +//! +//! The heart and soul of this module is the [`Stream`] trait. The core of +//! [`Stream`] looks like this: //! //! ``` -//! # async_std::task::block_on(async { +//! # use async_std::task::{Context, Poll}; +//! # use std::pin::Pin; +//! trait Stream { +//! type Item; +//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +//! } +//! ``` +//! +//! A stream has a method, [`next`], which when called, returns an +//! [`Poll`]<[`Option`]`>`. [`next`] will return `Ready(Some(Item))` +//! as long as there are elements, and once they've all been exhausted, will +//! return `None` to indicate that iteration is finished. If we're waiting on +//! something asynchronous to resolve `Pending` is returned. +//! +//! Individual streams may choose to resume iteration, and so calling +//! [`next`] again may or may not eventually start returning `Ready(Some(Item))` +//! again at some point. +//! +//! [`Stream`]'s full definition includes a number of other methods as well, +//! but they are default methods, built on top of [`next`], and so you get +//! them for free. +//! +//! Streams are also composable, and it's common to chain them together to do +//! more complex forms of processing. See the [Adapters](#adapters) section +//! below for more details. +//! +//! [`Poll`]: ../task/enum.Poll.html +//! [`Stream`]: trait.Stream.html +//! [`next`]: trait.Stream.html#tymethod.next +//! [`Option`]: ../../std/option/enum.Option.html +//! +//! # The three forms of streaming +//! +//! There are three common methods which can create streams from a collection: +//! +//! * `stream()`, which iterates over `&T`. +//! * `stream_mut()`, which iterates over `&mut T`. +//! * `into_stream()`, which iterates over `T`. +//! +//! Various things in async-std may implement one or more of the +//! three, where appropriate. +//! +//! # Implementing Stream +//! +//! Creating a stream of your own involves two steps: creating a `struct` to +//! hold the stream's state, and then `impl`ementing [`Stream`] for that +//! `struct`. This is why there are so many `struct`s in this module: there is +//! one for each stream and iterator adapter. +//! +//! Let's make a stream named `Counter` which counts from `1` to `5`: +//! +//! ``` +//! # use async_std::prelude::*; +//! # use async_std::task::{Context, Poll}; +//! # use std::pin::Pin; +//! // First, the struct: +//! +//! /// A stream which counts from one to five +//! struct Counter { +//! count: usize, +//! } +//! +//! // we want our count to start at one, so let's add a new() method to help. +//! // This isn't strictly necessary, but is convenient. Note that we start +//! // `count` at zero, we'll see why in `next()`'s implementation below. +//! impl Counter { +//! fn new() -> Counter { +//! Counter { count: 0 } +//! } +//! } +//! +//! // Then, we implement `Stream` for our `Counter`: +//! +//! impl Stream for Counter { +//! // we will be counting with usize +//! type Item = usize; +//! +//! // poll_next() is the only required method +//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +//! // Increment our count. This is why we started at zero. +//! self.count += 1; +//! +//! // Check to see if we've finished counting or not. +//! if self.count < 6 { +//! Poll::Ready(Some(self.count)) +//! } else { +//! Poll::Ready(None) +//! } +//! } +//! } +//! +//! // And now we can use it! +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # -//! use async_std::prelude::*; -//! use async_std::stream; +//! let mut counter = Counter::new(); //! -//! let mut s = stream::repeat(9).take(3); +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); //! -//! while let Some(v) = s.next().await { -//! assert_eq!(v, 9); +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! # +//! # Ok(()) }) } +//! ``` +//! +//! This will print `1` through `5`, each on their own line. +//! +//! Calling `next().await` this way gets repetitive. Rust has a construct which +//! can call `next()` on your stream, until it reaches `None`. Let's go over +//! that next. +//! +//! # while let Loops and IntoStream +//! +//! Rust's `while let` loop syntax is actually sugar for streams. Here's a basic +//! example of `while let`: +//! +//! ``` +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let mut values = stream::repeat(1u8).take(5); +//! +//! while let Some(x) = values.next().await { +//! println!("{}", x); //! } //! # -//! # }) +//! # Ok(()) }) } //! ``` +//! +//! This will print the numbers one through five, each on their own line. But +//! you'll notice something here: we never called anything on our vector to +//! produce a stream. What gives? +//! +//! There's a trait in the standard library for converting something into an +//! stream: [`IntoStream`]. This trait has one method, [`into_stream], +//! which converts the thing implementing [`IntoStream`] into a stream. +//! +//! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler +//! support yet. This means that automatic conversions like with `for` loops +//! doesn't occur yet, and `into_stream` will always have to be called manually. +//! +//! [`IntoStream`]: trait.IntoStream.html +//! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream +//! +//! # Adapters +//! +//! Functions which take an [`Stream`] and return another [`Stream`] are +//! often called 'stream adapters', as they're a form of the 'adapter +//! pattern'. +//! +//! Common stream adapters include [`map`], [`take`], and [`filter`]. +//! For more, see their documentation. +//! +//! [`map`]: trait.Stream.html#method.map +//! [`take`]: trait.Stream.html#method.take +//! [`filter`]: trait.Stream.html#method.filter +//! +//! # Laziness +//! +//! Streams (and stream [adapters](#adapters)) are *lazy*. This means that +//! just creating a stream doesn't _do_ a whole lot. Nothing really happens +//! until you call [`next`]. This is sometimes a source of confusion when +//! creating a stream solely for its side effects. For example, the [`map`] +//! method calls a closure on each element it iterates over: +//! +//! ``` +//! # #![allow(unused_must_use)] +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let v = stream::repeat(1u8).take(5); +//! v.map(|x| println!("{}", x)); +//! # +//! # Ok(()) }) } +//! ``` +//! +//! This will not print any values, as we only created a stream, rather than +//! using it. The compiler will warn us about this kind of behavior: +//! +//! ```text +//! warning: unused result that must be used: streams are lazy and +//! do nothing unless consumed +//! ``` +//! +//! The idiomatic way to write a [`map`] for its side effects is to use a +//! `while let` loop instead: +//! +//! ``` +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let mut v = stream::repeat(1u8).take(5); +//! +//! while let Some(x) = &v.next().await { +//! println!("{}", x); +//! } +//! # +//! # Ok(()) }) } +//! ``` +//! +//! [`map`]: trait.Stream.html#method.map +//! +//! The two most common ways to evaluate a stream are to use a `while let` loop +//! like this, or using the [`collect`] method to produce a new collection. +//! +//! [`collect`]: trait.Stream.html#method.collect +//! +//! # Infinity +//! +//! Streams do not have to be finite. As an example, an repeat stream is +//! an infinite stream: +//! +//! ``` +//! # use async_std::stream; +//! let numbers = stream::repeat(1u8); +//! ``` +//! +//! It is common to use the [`take`] stream adapter to turn an infinite +//! stream into a finite one: +//! +//! ``` +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let numbers = stream::repeat(1u8); +//! let mut five_numbers = numbers.take(5); +//! +//! while let Some(number) = five_numbers.next().await { +//! println!("{}", number); +//! } +//! # +//! # Ok(()) }) } +//! ``` +//! +//! This will print the numbers `0` through `4`, each on their own line. +//! +//! Bear in mind that methods on infinite streams, even those for which a +//! result can be determined mathematically in finite time, may not terminate. +//! Specifically, methods such as [`min`], which in the general case require +//! traversing every element in the stream, are likely not to return +//! successfully for any infinite streams. +//! +//! ```ignore +//! let ones = async_std::stream::repeat(1); +//! let least = ones.min().await.unwrap(); // Oh no! An infinite loop! +//! // `ones.min()` causes an infinite loop, so we won't reach this point! +//! println!("The smallest number one is {}.", least); +//! ``` +//! +//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html +//! [`take`]: trait.Stream.html#method.take +//! [`min`]: trait.Stream.html#method.min pub use empty::{empty, Empty}; pub use from_fn::{from_fn, FromFn}; From 20abd5cebfd7baf15108949ffac446af9d88d2b4 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 28 Oct 2019 00:15:13 +0100 Subject: [PATCH 13/31] standardize net docs Signed-off-by: Yoshua Wuyts --- src/net/mod.rs | 34 +++++++++++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/net/mod.rs b/src/net/mod.rs index b3ae287..29e4309 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,14 +1,42 @@ //! Networking primitives for TCP/UDP communication. //! -//! For OS-specific networking primitives like Unix domain sockets, refer to the [`async_std::os`] -//! module. +//! This module provides networking functionality for the Transmission Control and User +//! Datagram Protocols, as well as types for IP and socket addresses. //! //! This module is an async version of [`std::net`]. //! +//! # Organization +//! +//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP +//! * [`UdpSocket`] provides functionality for communication over UDP +//! * [`IpAddr`] represents IP addresses of either IPv4 or IPv6; [`Ipv4Addr`] and +//! [`Ipv6Addr`] are respectively IPv4 and IPv6 addresses +//! * [`SocketAddr`] represents socket addresses of either IPv4 or IPv6; [`SocketAddrV4`] +//! and [`SocketAddrV6`] are respectively IPv4 and IPv6 socket addresses +//! * [`ToSocketAddrs`] is a trait that used for generic address resolution when interacting +//! with networking objects like [`TcpListener`], [`TcpStream`] or [`UdpSocket`] +//! * Other types are return or parameter types for various methods in this module +//! +//! [`IpAddr`]: enum.IpAddr.html +//! [`Ipv4Addr`]: struct.Ipv4Addr.html +//! [`Ipv6Addr`]: struct.Ipv6Addr.html +//! [`SocketAddr`]: enum.SocketAddr.html +//! [`SocketAddrV4`]: struct.SocketAddrV4.html +//! [`SocketAddrV6`]: struct.SocketAddrV6.html +//! [`TcpListener`]: struct.TcpListener.html +//! [`TcpStream`]: struct.TcpStream.html +//! [`ToSocketAddrs`]: trait.ToSocketAddrs.html +//! [`UdpSocket`]: struct.UdpSocket.html +//! +//! # Platform-specific extensions +//! +//! APIs such as Unix domain sockets are available on certain platforms only. You can find +//! platform-specific extensions in the [`async_std::os`] module. +//! //! [`async_std::os`]: ../os/index.html //! [`std::net`]: https://doc.rust-lang.org/std/net/index.html //! -//! ## Examples +//! # Examples //! //! A simple UDP echo server: //! From 5f8e2cbd4a4917b0444447c1c73905bef9341c0d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 28 Oct 2019 00:34:27 +0100 Subject: [PATCH 14/31] add mod level docs for sync Signed-off-by: Yoshua Wuyts --- src/sync/mod.rs | 143 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3ad2776..0fe7322 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -4,6 +4,149 @@ //! //! [`std::sync`]: https://doc.rust-lang.org/std/sync/index.html //! +//! ## The need for synchronization +//! +//! Conceptually, a Rust program is a series of operations which will +//! be executed on a computer. The timeline of events happening in the +//! program is consistent with the order of the operations in the code. +//! +//! Consider the following code, operating on some global static variables: +//! +//! ```rust +//! static mut A: u32 = 0; +//! static mut B: u32 = 0; +//! static mut C: u32 = 0; +//! +//! fn main() { +//! unsafe { +//! A = 3; +//! B = 4; +//! A = A + B; +//! C = B; +//! println!("{} {} {}", A, B, C); +//! C = A; +//! } +//! } +//! ``` +//! +//! It appears as if some variables stored in memory are changed, an addition +//! is performed, result is stored in `A` and the variable `C` is +//! modified twice. +//! +//! When only a single thread is involved, the results are as expected: +//! the line `7 4 4` gets printed. +//! +//! As for what happens behind the scenes, when optimizations are enabled the +//! final generated machine code might look very different from the code: +//! +//! - The first store to `C` might be moved before the store to `A` or `B`, +//! _as if_ we had written `C = 4; A = 3; B = 4`. +//! +//! - Assignment of `A + B` to `A` might be removed, since the sum can be stored +//! in a temporary location until it gets printed, with the global variable +//! never getting updated. +//! +//! - The final result could be determined just by looking at the code +//! at compile time, so [constant folding] might turn the whole +//! block into a simple `println!("7 4 4")`. +//! +//! The compiler is allowed to perform any combination of these +//! optimizations, as long as the final optimized code, when executed, +//! produces the same results as the one without optimizations. +//! +//! Due to the [concurrency] involved in modern computers, assumptions +//! about the program's execution order are often wrong. Access to +//! global variables can lead to nondeterministic results, **even if** +//! compiler optimizations are disabled, and it is **still possible** +//! to introduce synchronization bugs. +//! +//! Note that thanks to Rust's safety guarantees, accessing global (static) +//! variables requires `unsafe` code, assuming we don't use any of the +//! synchronization primitives in this module. +//! +//! [constant folding]: https://en.wikipedia.org/wiki/Constant_folding +//! [concurrency]: https://en.wikipedia.org/wiki/Concurrency_(computer_science) +//! +//! ## Out-of-order execution +//! +//! Instructions can execute in a different order from the one we define, due to +//! various reasons: +//! +//! - The **compiler** reordering instructions: If the compiler can issue an +//! instruction at an earlier point, it will try to do so. For example, it +//! might hoist memory loads at the top of a code block, so that the CPU can +//! start [prefetching] the values from memory. +//! +//! In single-threaded scenarios, this can cause issues when writing +//! signal handlers or certain kinds of low-level code. +//! Use [compiler fences] to prevent this reordering. +//! +//! - A **single processor** executing instructions [out-of-order]: +//! Modern CPUs are capable of [superscalar] execution, +//! i.e., multiple instructions might be executing at the same time, +//! even though the machine code describes a sequential process. +//! +//! This kind of reordering is handled transparently by the CPU. +//! +//! - A **multiprocessor** system executing multiple hardware threads +//! at the same time: In multi-threaded scenarios, you can use two +//! kinds of primitives to deal with synchronization: +//! - [memory fences] to ensure memory accesses are made visible to +//! other CPUs in the right order. +//! - [atomic operations] to ensure simultaneous access to the same +//! memory location doesn't lead to undefined behavior. +//! +//! [prefetching]: https://en.wikipedia.org/wiki/Cache_prefetching +//! [compiler fences]: https://doc.rust-lang.org/std/sync/atomic/fn.compiler_fence.html +//! [out-of-order]: https://en.wikipedia.org/wiki/Out-of-order_execution +//! [superscalar]: https://en.wikipedia.org/wiki/Superscalar_processor +//! [memory fences]: https://doc.rust-lang.org/std/sync/atomic/fn.fence.html +//! [atomic operations]: https://doc.rust-lang.org/std/sync/atomic/index.html +//! +//! ## Higher-level synchronization objects +//! +//! Most of the low-level synchronization primitives are quite error-prone and +//! inconvenient to use, which is why async-std also exposes some +//! higher-level synchronization objects. +//! +//! These abstractions can be built out of lower-level primitives. +//! For efficiency, the sync objects in async-std are usually +//! implemented with help from the scheduler, which is +//! able to reschedule the tasks while they are blocked on acquiring +//! a lock. +//! +//! The following is an overview of the available synchronization +//! objects: +//! +//! - [`Arc`]: Atomically Reference-Counted pointer, which can be used +//! in multithreaded environments to prolong the lifetime of some +//! data until all the threads have finished using it. +//! +//! - [`Barrier`]: Ensures multiple threads will wait for each other +//! to reach a point in the program, before continuing execution all +//! together. +//! +//! - [`channel`]: Multi-producer, multi-consumer queues, used for +//! message-based communication. Can provide a lightweight +//! inter-task synchronisation mechanism, at the cost of some +//! extra memory. +//! +//! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at +//! most one task at a time is able to access some data. +//! +//! - [`RwLock`]: Provides a mutual exclusion mechanism which allows +//! multiple readers at the same time, while allowing only one +//! writer at a time. In some cases, this can be more efficient than +//! a mutex. +//! +//! [`Arc`]: crate::sync::Arc +//! [`Barrier`]: crate::sync::Barrier +//! [`Condvar`]: crate::sync::Condvar +//! [`channel`]: fn.channel.html +//! [`Mutex`]: crate::sync::Mutex +//! [`Once`]: crate::sync::Once +//! [`RwLock`]: crate::sync::RwLock +//! //! # Examples //! //! Spawn a task that updates an integer protected by a mutex: From b3ae6f2b03216ca88eca503d2834f0b1e2c9ce7f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 28 Oct 2019 13:00:25 +0100 Subject: [PATCH 15/31] update Stream::fuse docs Signed-off-by: Yoshua Wuyts --- src/stream/stream/fuse.rs | 3 +-- src/stream/stream/mod.rs | 8 +++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 39af9cb..6297bef 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -6,8 +6,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - /// A `Stream` that is permanently closed once a single call to `poll` results in - /// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. + /// A stream that yields `None` forever after the underlying stream yields `None` once. /// /// This `struct` is created by the [`fuse`] method on [`Stream`]. See its /// documentation for more. diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 2b237de..f8640c8 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -501,9 +501,11 @@ extension_trait! { } #[doc = r#" - Transforms this `Stream` into a "fused" `Stream` such that after the first time - `poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return - `Poll::Ready(None)`. + Creates a stream which ends after the first `None`. + + After a stream returns `None`, future calls may or may not yield `Some(T)` again. + `fuse()` adapts an iterator, ensuring that after a `None` is given, it will always + return `None` forever. # Examples From eb081b1948edf525ce459ef560eb4b13f8d49600 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 29 Oct 2019 10:23:54 +0100 Subject: [PATCH 16/31] Apply suggestions from code review Co-Authored-By: Florian Gilcher --- src/stream/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a95e918..6db0dbe 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -156,7 +156,7 @@ //! //! # while let Loops and IntoStream //! -//! Rust's `while let` loop syntax is actually sugar for streams. Here's a basic +//! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic //! example of `while let`: //! //! ``` @@ -191,7 +191,7 @@ //! # Adapters //! //! Functions which take an [`Stream`] and return another [`Stream`] are -//! often called 'stream adapters', as they're a form of the 'adapter +//! often called 'stream adapters', as they are a form of the 'adapter //! pattern'. //! //! Common stream adapters include [`map`], [`take`], and [`filter`]. From 3a06a1211b0f8787854d32e3cf5eb0d8fdd769c8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 29 Oct 2019 10:56:33 +0100 Subject: [PATCH 17/31] Add feedback from review Signed-off-by: Yoshua Wuyts --- src/sync/mod.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 0fe7322..d10e6bd 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -6,6 +6,9 @@ //! //! ## The need for synchronization //! +//! async-std's sync primitives are scheduler-aware, making it possible to +//! `.await` their operations - for example the locking of a [`Mutex`]. +//! //! Conceptually, a Rust program is a series of operations which will //! be executed on a computer. The timeline of events happening in the //! program is consistent with the order of the operations in the code. From b3d1fa9c98363c5dbc180e78407f4b29027f5fc4 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 28 Oct 2019 11:33:40 +0100 Subject: [PATCH 18/31] v0.99.11 Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 2 +- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f0e735a..19af02e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,54 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] +# [0.99.11] - 2019-10-29 + +This patch introduces `async_std::sync::channel`, a novel asynchronous port of +the ultra-fast Crossbeam channels. This has been one of the most anticipated +features for async-std, and we're excited to be providing a first version of +this! + +In addition to channels, this patch has the regular list of new methods, types, +and doc fixes. + +## Examples + +__Send and receive items from a channel__ +```rust +// Create a bounded channel with a max-size of 1 +let (s, r) = channel(1); + +// This call returns immediately because there is enough space in the channel. +s.send(1).await; + +task::spawn(async move { + // This call blocks the current task because the channel is full. + // It will be able to complete only after the first message is received. + s.send(2).await; +}); + +// Receive items from the channel +task::sleep(Duration::from_secs(1)).await; +assert_eq!(r.recv().await, Some(1)); +assert_eq!(r.recv().await, Some(2)); +``` + +## Added +- Added `sync::channel` as "unstable". +- Added doc links from instantiated structs to the methods that create them. +- Implemented `Extend` + `FromStream` for `PathBuf`. + +## Changed +- Fixed an issue with `block_on` so it works even when nested. +- Fixed issues with our Clippy check on CI. +- Replaced our uses of `cfg_if` with our own macros, simplifying the codebase. +- Updated the homepage link in `Cargo.toml` to point to [async.rs](https://async.rs). +- Updated the module-level documentation for `stream` and `sync`. +- Various typos and grammar fixes. + +## Removed +Nothing was removed in this release. + # [0.99.10] - 2019-10-16 This patch stabilizes several core concurrency macros, introduces async versions @@ -281,7 +329,8 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.10...HEAD +[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.11...HEAD +[0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11 [0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.9...v0.99.10 [0.99.9]: https://github.com/async-rs/async-std/compare/v0.99.8...v0.99.9 [0.99.8]: https://github.com/async-rs/async-std/compare/v0.99.7...v0.99.8 diff --git a/Cargo.toml b/Cargo.toml index ad88730..ab74dd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "0.99.10" +version = "0.99.11" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", From b10930207cde0afc6821521d7b1bdd2374b5398d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 29 Oct 2019 00:44:07 +0100 Subject: [PATCH 19/31] more Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 19af02e..c34fa24 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,9 @@ assert_eq!(r.recv().await, Some(2)); - Added `sync::channel` as "unstable". - Added doc links from instantiated structs to the methods that create them. - Implemented `Extend` + `FromStream` for `PathBuf`. +- Added `Stream::sum` as "unstable" +- Added `Stream::product` as "unstable" +- Added `Stream::timeout` as "unstable" ## Changed - Fixed an issue with `block_on` so it works even when nested. @@ -51,6 +54,7 @@ assert_eq!(r.recv().await, Some(2)); - Updated the homepage link in `Cargo.toml` to point to [async.rs](https://async.rs). - Updated the module-level documentation for `stream` and `sync`. - Various typos and grammar fixes. +- Removed redundant file flushes, improving the performance of `File` operations ## Removed Nothing was removed in this release. From 2adaaa9d3f1fadad44ec58be8af72cd0d839054f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 29 Oct 2019 02:24:14 +0100 Subject: [PATCH 20/31] more updates Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c34fa24..eda6038 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -40,12 +40,17 @@ assert_eq!(r.recv().await, Some(2)); ``` ## Added +- Added `Future::delay` as "unstable" +- Added `Stream::flat_map` as "unstable" +- Added `Stream::flatten` as "unstable" +- Added `Stream::product` as "unstable" +- Added `Stream::sum` as "unstable" +- Added `Stream::min_by_key` +- Added `Stream::max_by` +- Added `Stream::timeout` as "unstable" - Added `sync::channel` as "unstable". - Added doc links from instantiated structs to the methods that create them. - Implemented `Extend` + `FromStream` for `PathBuf`. -- Added `Stream::sum` as "unstable" -- Added `Stream::product` as "unstable" -- Added `Stream::timeout` as "unstable" ## Changed - Fixed an issue with `block_on` so it works even when nested. From b942d0a40580e1df63ddcbf7505df0fe625c5a77 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Tue, 29 Oct 2019 21:44:56 +0800 Subject: [PATCH 21/31] add stream-min --- src/stream/stream/min.rs | 60 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 37 +++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/stream/stream/min.rs diff --git a/src/stream/stream/min.rs b/src/stream/stream/min.rs new file mode 100644 index 0000000..1ab5606 --- /dev/null +++ b/src/stream/stream/min.rs @@ -0,0 +1,60 @@ +use std::marker::PhantomData; +use std::cmp::{Ordering, Ord}; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MinFuture { + #[pin] + stream: S, + _compare: PhantomData, + min: Option, + } +} + +impl MinFuture { + pub(super) fn new(stream: S) -> Self { + Self { + stream, + _compare: PhantomData, + min: None, + } + } +} + +impl Future for MinFuture +where + S: Stream, + S::Item: Ord, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match this.min.take() { + None => *this.min = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Less => *this.min = Some(new), + _ => *this.min = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(this.min.take()), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e819038..27090a5 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -41,6 +41,7 @@ mod le; mod lt; mod map; mod max_by; +mod min; mod min_by; mod min_by_key; mod next; @@ -71,6 +72,7 @@ use last::LastFuture; use le::LeFuture; use lt::LtFuture; use max_by::MaxByFuture; +use min::MinFuture; use min_by::MinByFuture; use min_by_key::MinByKeyFuture; use next::NextFuture; @@ -753,6 +755,41 @@ extension_trait! { self, compare: F, ) -> impl Future> [MinByFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MinByFuture::new(self, compare) + } + + #[doc = r#" + Returns the element that gives the minimum value. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let min = s.clone().min().await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().min().await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn min_by( + self, + compare: F, + ) -> impl Future> [MinByFuture] where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, From 021862dcc88e6bdda67f010cd0d127e741efae1e Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Tue, 29 Oct 2019 21:49:30 +0800 Subject: [PATCH 22/31] fix min --- src/stream/stream/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 27090a5..5c42989 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -786,15 +786,14 @@ extension_trait! { # }) } ``` "#] - fn min_by( + fn min( self, - compare: F, - ) -> impl Future> [MinByFuture] + ) -> impl Future> [MinFuture] where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, { - MinByFuture::new(self, compare) + MinFuture::new(self) } #[doc = r#" From 3620b2b6abcc42ef1955803805a2ffd322bd61ef Mon Sep 17 00:00:00 2001 From: k-nasa Date: Wed, 30 Oct 2019 09:17:12 +0900 Subject: [PATCH 23/31] fix: Add only rustfmt on Checking fmt and docs actions --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 653834a..dd8ec89 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,8 +59,10 @@ jobs: - uses: actions-rs/toolchain@v1 with: + profile: minimal toolchain: ${{ steps.component.outputs.toolchain }} override: true + components: rustfmt - name: setup run: | From ff6a44fcd5e6b122ca42ae7563b7d155bcde6f66 Mon Sep 17 00:00:00 2001 From: Wu Yu Wei Date: Wed, 30 Oct 2019 19:23:08 +0800 Subject: [PATCH 24/31] Use once_cell instead of lazy_static (#416) `once_cell` provides a neat way of initializing lazy singletons without macro. This PR use `sync::Lazy` to streamline same pattern proposed in related rust RFC. Resolve #406 --- Cargo.toml | 2 +- src/net/driver/mod.rs | 34 ++++++++++++++++------------------ src/task/blocking.rs | 42 +++++++++++++++++++++--------------------- src/task/pool.rs | 40 +++++++++++++++++++--------------------- src/task/task_local.rs | 6 ++---- 5 files changed, 59 insertions(+), 65 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ab74dd0..dcf2c7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,12 +33,12 @@ crossbeam-utils = "0.6.6" futures-core-preview = "=0.3.0-alpha.19" futures-io-preview = "=0.3.0-alpha.19" futures-timer = "1.0.2" -lazy_static = "1.4.0" log = { version = "0.4.8", features = ["kv_unstable"] } memchr = "2.2.1" mio = "0.6.19" mio-uds = "0.6.7" num_cpus = "1.10.1" +once_cell = "1.2.0" pin-utils = "0.1.0-alpha.4" slab = "0.4.2" kv-log-macro = "1.0.4" diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 806acdb..40e0abb 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -1,8 +1,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; -use lazy_static::lazy_static; use mio::{self, Evented}; +use once_cell::sync::Lazy; use slab::Slab; use crate::io; @@ -100,25 +100,23 @@ impl Reactor { // } } -lazy_static! { - /// The state of the global networking driver. - static ref REACTOR: Reactor = { - // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O - // handles. - std::thread::Builder::new() - .name("async-net-driver".to_string()) - .spawn(move || { - // If the driver thread panics, there's not much we can do. It is not a - // recoverable error and there is no place to propagate it into so we just abort. - abort_on_panic(|| { - main_loop().expect("async networking thread has panicked"); - }) +/// The state of the global networking driver. +static REACTOR: Lazy = Lazy::new(|| { + // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O + // handles. + std::thread::Builder::new() + .name("async-net-driver".to_string()) + .spawn(move || { + // If the driver thread panics, there's not much we can do. It is not a + // recoverable error and there is no place to propagate it into so we just abort. + abort_on_panic(|| { + main_loop().expect("async networking thread has panicked"); }) - .expect("cannot start a thread driving blocking tasks"); + }) + .expect("cannot start a thread driving blocking tasks"); - Reactor::new().expect("cannot initialize reactor") - }; -} + Reactor::new().expect("cannot initialize reactor") +}); /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. fn main_loop() -> io::Result<()> { diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 3216012..1f1a222 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -5,7 +5,7 @@ use std::thread; use std::time::Duration; use crossbeam_channel::{bounded, Receiver, Sender}; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use crate::task::task::{JoinHandle, Tag}; use crate::utils::abort_on_panic; @@ -19,30 +19,30 @@ struct Pool { receiver: Receiver>, } -lazy_static! { - static ref POOL: Pool = { - for _ in 0..2 { - thread::Builder::new() - .name("async-blocking-driver".to_string()) - .spawn(|| abort_on_panic(|| { +static POOL: Lazy = Lazy::new(|| { + for _ in 0..2 { + thread::Builder::new() + .name("async-blocking-driver".to_string()) + .spawn(|| { + abort_on_panic(|| { for task in &POOL.receiver { task.run(); } - })) - .expect("cannot start a thread driving blocking tasks"); - } + }) + }) + .expect("cannot start a thread driving blocking tasks"); + } - // We want to use an unbuffered channel here to help - // us drive our dynamic control. In effect, the - // kernel's scheduler becomes the queue, reducing - // the number of buffers that work must flow through - // before being acted on by a core. This helps keep - // latency snappy in the overall async system by - // reducing bufferbloat. - let (sender, receiver) = bounded(0); - Pool { sender, receiver } - }; -} + // We want to use an unbuffered channel here to help + // us drive our dynamic control. In effect, the + // kernel's scheduler becomes the queue, reducing + // the number of buffers that work must flow through + // before being acted on by a core. This helps keep + // latency snappy in the overall async system by + // reducing bufferbloat. + let (sender, receiver) = bounded(0); + Pool { sender, receiver } +}); // Create up to MAX_THREADS dynamic blocking task worker threads. // Dynamic threads will terminate themselves if they don't diff --git a/src/task/pool.rs b/src/task/pool.rs index bfaa17d..3fd7047 100644 --- a/src/task/pool.rs +++ b/src/task/pool.rs @@ -3,7 +3,7 @@ use std::thread; use crossbeam_deque::{Injector, Stealer, Worker}; use kv_log_macro::trace; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use super::sleepers::Sleepers; use super::task; @@ -111,28 +111,26 @@ impl Pool { #[inline] pub(crate) fn get() -> &'static Pool { - lazy_static! { - static ref POOL: Pool = { - let num_threads = num_cpus::get().max(1); - let mut stealers = Vec::new(); + static POOL: Lazy = Lazy::new(|| { + let num_threads = num_cpus::get().max(1); + let mut stealers = Vec::new(); - // Spawn worker threads. - for _ in 0..num_threads { - let worker = Worker::new_fifo(); - stealers.push(worker.stealer()); + // Spawn worker threads. + for _ in 0..num_threads { + let worker = Worker::new_fifo(); + stealers.push(worker.stealer()); - thread::Builder::new() - .name("async-task-driver".to_string()) - .spawn(|| abort_on_panic(|| worker::main_loop(worker))) - .expect("cannot start a thread driving tasks"); - } + thread::Builder::new() + .name("async-task-driver".to_string()) + .spawn(|| abort_on_panic(|| worker::main_loop(worker))) + .expect("cannot start a thread driving tasks"); + } - Pool { - injector: Injector::new(), - stealers, - sleepers: Sleepers::new(), - } - }; - } + Pool { + injector: Injector::new(), + stealers, + sleepers: Sleepers::new(), + } + }); &*POOL } diff --git a/src/task/task_local.rs b/src/task/task_local.rs index c72937f..e92f4f9 100644 --- a/src/task/task_local.rs +++ b/src/task/task_local.rs @@ -5,7 +5,7 @@ use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use super::worker; use crate::utils::abort_on_panic; @@ -174,9 +174,7 @@ impl LocalKey { fn key(&self) -> usize { #[cold] fn init(key: &AtomicUsize) -> usize { - lazy_static! { - static ref COUNTER: Mutex = Mutex::new(1); - } + static COUNTER: Lazy> = Lazy::new(|| Mutex::new(1)); let mut counter = COUNTER.lock().unwrap(); let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel); From 5fee91c0502cff2618649210928c50c116474d7a Mon Sep 17 00:00:00 2001 From: JayatiGoyal <44127709+JayatiGoyal@users.noreply.github.com> Date: Thu, 31 Oct 2019 00:36:42 +0530 Subject: [PATCH 25/31] corrected a typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 7aeaed8..9af20a3 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ syntax. ## Features - __Modern:__ Built from the ground up for `std::future` and `async/await` with - blazing fast compilation times. + blazing fast compilation time. - __Fast:__ Our robust allocator and threadpool designs provide ultra-high throughput with predictably low latency. - __Intuitive:__ Complete parity with the stdlib means you only need to learn From f5efaaa7ba82e6a0707a82ffa6cda499fdb6d694 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 14:44:19 +0800 Subject: [PATCH 26/31] Add stream eq --- src/stream/stream/eq.rs | 61 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 38 +++++++++++++++++++++++++ 2 files changed, 99 insertions(+) create mode 100644 src/stream/stream/eq.rs diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs new file mode 100644 index 0000000..42a37d8 --- /dev/null +++ b/src/stream/stream/eq.rs @@ -0,0 +1,61 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + // Lexicographically compares the elements of this `Stream` with those + // of another. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct EqFuture { + #[pin] + l: Fuse, + #[pin] + r: Fuse, + } +} + +impl EqFuture +where + L::Item: PartialEq, +{ + pub(super) fn new(l: L, r: R) -> Self { + EqFuture { + l: l.fuse(), + r: r.fuse(), + } + } +} + +impl Future for EqFuture + where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx)); + let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx)); + + if this.l.done && this.r.done { + return Poll::Ready(true); + } + + match (l_val, r_val) { + (Some(l), Some(r)) if l != r => {return Poll::Ready(false);}, + _ => {}, + } + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e819038..07cd03a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -26,6 +26,7 @@ mod any; mod chain; mod cmp; mod enumerate; +mod eq; mod filter; mod filter_map; mod find; @@ -60,6 +61,7 @@ use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; use enumerate::Enumerate; +use eq::EqFuture; use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; @@ -1622,6 +1624,42 @@ extension_trait! { GeFuture::new(self, other) } + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + equal to those of another. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let single: VecDeque = vec![1].into_iter().collect(); + let single_eq: VecDeque = vec![10].into_iter().collect(); + let multi: VecDeque = vec![1,2].into_iter().collect(); + let multi_eq: VecDeque = vec![1,5].into_iter().collect(); + assert_eq!(single.clone().eq(single.clone()).await, true); + assert_eq!(single_eq.clone().eq(single.clone()).await, false); + assert_eq!(multi.clone().eq(single_eq.clone()).await, false); + assert_eq!(multi_eq.clone().eq(multi.clone()).await, false); + # + # }) } + ``` + "#] + fn eq( + self, + other: S + ) -> impl Future [EqFuture] + where + Self: Sized + Stream, + S: Sized + Stream, + ::Item: PartialEq, + { + EqFuture::new(self, other) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than those of another. From 17db7ffcd35e4c7e350adba6e3f39daddce52536 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 18:05:51 +0800 Subject: [PATCH 27/31] Add stream ne --- src/stream/stream/mod.rs | 35 +++++++++++++++++++++++ src/stream/stream/ne.rs | 62 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 src/stream/stream/ne.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e819038..b8fac7e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -43,6 +43,7 @@ mod map; mod max_by; mod min_by; mod min_by_key; +mod ne; mod next; mod nth; mod partial_cmp; @@ -73,6 +74,7 @@ use lt::LtFuture; use max_by::MaxByFuture; use min_by::MinByFuture; use min_by_key::MinByKeyFuture; +use ne::NeFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; @@ -1586,6 +1588,39 @@ extension_trait! { CmpFuture::new(self, other) } + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + not equal to those of another. + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + let single: VecDeque = vec![1].into_iter().collect(); + let single_ne: VecDeque = vec![10].into_iter().collect(); + let multi: VecDeque = vec![1,2].into_iter().collect(); + let multi_ne: VecDeque = vec![1,5].into_iter().collect(); + assert_eq!(single.clone().ne(single.clone()).await, false); + assert_eq!(single_ne.clone().ne(single.clone()).await, true); + assert_eq!(multi.clone().ne(single_ne.clone()).await, true); + assert_eq!(multi_ne.clone().ne(multi.clone()).await, true); + # + # }) } + ``` + "#] + fn ne( + self, + other: S + ) -> impl Future [NeFuture] + where + Self: Sized + Stream, + S: Sized + Stream, + ::Item: PartialEq, + { + NeFuture::new(self, other) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than or equal to those of another. diff --git a/src/stream/stream/ne.rs b/src/stream/stream/ne.rs new file mode 100644 index 0000000..2f17ed0 --- /dev/null +++ b/src/stream/stream/ne.rs @@ -0,0 +1,62 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + // Lexicographically compares the elements of this `Stream` with those + // of another. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct NeFuture { + #[pin] + l: Fuse, + #[pin] + r: Fuse, + } +} + +impl NeFuture + where + L::Item: PartialEq, +{ + pub(super) fn new(l: L, r: R) -> Self { + Self { + l: l.fuse(), + r: r.fuse(), + } + } +} + +impl Future for NeFuture + where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx)); + let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx)); + + if this.l.done || this.r.done { + return Poll::Ready(false); + } + + match (l_val, r_val) { + (Some(l), Some(r)) if l == r => {continue;}, + _ => { return Poll::Ready(true); }, + } + + } + } +} \ No newline at end of file From 204da3339152596667ff6e6872afda73c997f517 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 21:16:13 +0800 Subject: [PATCH 28/31] fmt code --- src/stream/stream/ne.rs | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/stream/stream/ne.rs b/src/stream/stream/ne.rs index 2f17ed0..ffeaca8 100644 --- a/src/stream/stream/ne.rs +++ b/src/stream/stream/ne.rs @@ -22,8 +22,8 @@ pin_project! { } impl NeFuture - where - L::Item: PartialEq, +where + L::Item: PartialEq, { pub(super) fn new(l: L, r: R) -> Self { Self { @@ -34,10 +34,10 @@ impl NeFuture } impl Future for NeFuture - where - L: Stream + Sized, - R: Stream + Sized, - L::Item: PartialEq, +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, { type Output = bool; @@ -53,10 +53,13 @@ impl Future for NeFuture } match (l_val, r_val) { - (Some(l), Some(r)) if l == r => {continue;}, - _ => { return Poll::Ready(true); }, + (Some(l), Some(r)) if l == r => { + continue; + } + _ => { + return Poll::Ready(true); + } } - } } -} \ No newline at end of file +} From 1ab3d901e42fb7c2e9b44303c2b4cdf5116d68b9 Mon Sep 17 00:00:00 2001 From: yjhmelody <465402634@qq.com> Date: Thu, 31 Oct 2019 21:17:07 +0800 Subject: [PATCH 29/31] fmt code --- src/stream/stream/eq.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs index 42a37d8..5343c1a 100644 --- a/src/stream/stream/eq.rs +++ b/src/stream/stream/eq.rs @@ -34,10 +34,10 @@ where } impl Future for EqFuture - where - L: Stream + Sized, - R: Stream + Sized, - L::Item: PartialEq, +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, { type Output = bool; @@ -53,8 +53,10 @@ impl Future for EqFuture } match (l_val, r_val) { - (Some(l), Some(r)) if l != r => {return Poll::Ready(false);}, - _ => {}, + (Some(l), Some(r)) if l != r => { + return Poll::Ready(false); + } + _ => {} } } } From 48c82a9668ec3f18246d7cb5066b72f1e5e3133d Mon Sep 17 00:00:00 2001 From: zhangguyu Date: Thu, 31 Oct 2019 22:33:17 +0800 Subject: [PATCH 30/31] Add stream position --- src/stream/stream/mod.rs | 41 ++++++++++++++++++++++++++++ src/stream/stream/position.rs | 51 +++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) create mode 100644 src/stream/stream/position.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e819038..c4abe32 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -46,6 +46,7 @@ mod min_by_key; mod next; mod nth; mod partial_cmp; +mod position; mod scan; mod skip; mod skip_while; @@ -76,6 +77,7 @@ use min_by_key::MinByKeyFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; @@ -1548,6 +1550,45 @@ extension_trait! { PartialCmpFuture::new(self, other) } + #[doc = r#" + Searches for an element in a Stream that satisfies a predicate, returning + its index. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let res = s.clone().position(|x| *x == 1).await; + assert_eq!(res, Some(0)); + + let res = s.clone().position(|x| *x == 2).await; + assert_eq!(res, Some(1)); + + let res = s.clone().position(|x| *x == 3).await; + assert_eq!(res, Some(2)); + + let res = s.clone().position(|x| *x == 4).await; + assert_eq!(res, None); + # + # }) } + ``` + "#] + fn position

( + self, + predicate: P + ) -> impl Future> [PositionFuture] + where + Self: Sized + Stream, + P: FnMut(&Self::Item) -> bool, + { + PositionFuture::new(self, predicate) + } + #[doc = r#" Lexicographically compares the elements of this `Stream` with those of another using 'Ord'. diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs new file mode 100644 index 0000000..3cd5b84 --- /dev/null +++ b/src/stream/stream/position.rs @@ -0,0 +1,51 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct PositionFuture { + #[pin] + stream: S, + predicate: P, + index:usize, + } +} + +impl PositionFuture { + pub(super) fn new(stream: S, predicate: P) -> Self { + PositionFuture { + stream, + predicate, + index: 0, + } + } +} + +impl Future for PositionFuture +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)), + Some(_) => { + cx.waker().wake_by_ref(); + *this.index += 1; + Poll::Pending + } + None => Poll::Ready(None), + } + } +} From 07d21e5eb37e6ddc2a1819dd0d27b83338f21299 Mon Sep 17 00:00:00 2001 From: zhangguyu Date: Thu, 31 Oct 2019 23:30:11 +0800 Subject: [PATCH 31/31] change trait bounds --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index c4abe32..0469c7a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1583,7 +1583,7 @@ extension_trait! { predicate: P ) -> impl Future> [PositionFuture] where - Self: Sized + Stream, + Self: Sized, P: FnMut(&Self::Item) -> bool, { PositionFuture::new(self, predicate)