Merge remote-tracking branch 'upstream/master'

poc-serde-support
Abhishek C. Sharma 5 years ago
commit e36172e808

@ -56,3 +56,11 @@ futures-preview = { version = "=0.3.0-alpha.19", features = ["async-await"] }
# These are used by the book for examples
futures-channel-preview = "=0.3.0-alpha.19"
futures-util-preview = "=0.3.0-alpha.19"
[[test]]
name = "stream"
required-features = ["unstable"]
[[example]]
name = "tcp-ipv4-and-6-echo"
required-features = ["unstable"]

@ -1,12 +1,16 @@
cfg_unstable! {
mod delay;
mod flatten;
mod race;
mod try_race;
use std::time::Duration;
use delay::DelayFuture;
use flatten::FlattenFuture;
use crate::future::IntoFuture;
use race::Race;
use try_race::TryRace;
}
extension_trait! {
@ -156,6 +160,94 @@ extension_trait! {
{
FlattenFuture::new(self)
}
#[doc = r#"
Waits for one of two similarly-typed futures to complete.
Awaits multiple futures simultaneously, returning the output of the
first future that completes.
This function will return a new future which awaits for either one of both
futures to complete. If multiple futures are completed at the same time,
resolution will occur in the order that they have been passed.
Note that this macro consumes all futures passed, and once a future is
completed, all other futures are dropped.
This macro is only usable inside of async functions, closures, and blocks.
# Examples
```
# async_std::task::block_on(async {
use async_std::prelude::*;
use async_std::future;
let a = future::pending();
let b = future::ready(1u8);
let c = future::ready(2u8);
let f = a.race(b).race(c);
assert_eq!(f.await, 1u8);
# });
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn race<F>(
self,
other: F
) -> impl Future<Output = <Self as std::future::Future>::Output> [Race<Self, F>]
where
Self: std::future::Future + Sized,
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
{
Race::new(self, other)
}
#[doc = r#"
Waits for one of two similarly-typed fallible futures to complete.
Awaits multiple futures simultaneously, returning all results once complete.
`try_race` is similar to [`race`], but keeps going if a future
resolved to an error until all futures have been resolved. In which case
an error is returned.
The ordering of which value is yielded when two futures resolve
simultaneously is intentionally left unspecified.
# Examples
```
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::future;
use std::io::{Error, ErrorKind};
let a = future::pending::<Result<_, Error>>();
let b = future::ready(Err(Error::from(ErrorKind::Other)));
let c = future::ready(Ok(1u8));
let f = a.try_race(b).try_race(c);
assert_eq!(f.await?, 1u8);
#
# Ok(()) }) }
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn try_race<F: std::future::Future, T, E>(
self,
other: F
) -> impl Future<Output = <Self as std::future::Future>::Output> [TryRace<Self, F>]
where
Self: std::future::Future<Output = Result<T, E>> + Sized,
F: std::future::Future<Output = <Self as std::future::Future>::Output>,
{
TryRace::new(self, other)
}
}
impl<F: Future + Unpin + ?Sized> Future for Box<F> {

@ -0,0 +1,57 @@
use std::pin::Pin;
use async_macros::MaybeDone;
use pin_project_lite::pin_project;
use crate::task::{Context, Poll};
use std::future::Future;
pin_project! {
#[allow(missing_docs)]
#[allow(missing_debug_implementations)]
pub struct Race<L, R>
where
L: Future,
R: Future<Output = L::Output>
{
#[pin] left: MaybeDone<L>,
#[pin] right: MaybeDone<R>,
}
}
impl<L, R> Race<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
pub(crate) fn new(left: L, right: R) -> Self {
Self {
left: MaybeDone::new(left),
right: MaybeDone::new(right),
}
}
}
impl<L, R> Future for Race<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
type Output = L::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut left = this.left;
if Future::poll(Pin::new(&mut left), cx).is_ready() {
return Poll::Ready(left.take().unwrap());
}
let mut right = this.right;
if Future::poll(Pin::new(&mut right), cx).is_ready() {
return Poll::Ready(right.take().unwrap());
}
Poll::Pending
}
}

@ -0,0 +1,66 @@
use std::pin::Pin;
use async_macros::MaybeDone;
use pin_project_lite::pin_project;
use crate::task::{Context, Poll};
use std::future::Future;
pin_project! {
#[allow(missing_docs)]
#[allow(missing_debug_implementations)]
pub struct TryRace<L, R>
where
L: Future,
R: Future<Output = L::Output>
{
#[pin] left: MaybeDone<L>,
#[pin] right: MaybeDone<R>,
}
}
impl<L, R> TryRace<L, R>
where
L: Future,
R: Future<Output = L::Output>,
{
pub(crate) fn new(left: L, right: R) -> Self {
Self {
left: MaybeDone::new(left),
right: MaybeDone::new(right),
}
}
}
impl<L, R, T, E> Future for TryRace<L, R>
where
L: Future<Output = Result<T, E>>,
R: Future<Output = L::Output>,
{
type Output = L::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut left_errored = false;
// Check if the left future is ready & successful. Continue if not.
let mut left = this.left;
if Future::poll(Pin::new(&mut left), cx).is_ready() {
if left.as_ref().output().unwrap().is_ok() {
return Poll::Ready(left.take().unwrap());
} else {
left_errored = true;
}
}
// Check if the right future is ready & successful. Return err if left
// future also resolved to err. Continue if not.
let mut right = this.right;
let is_ready = Future::poll(Pin::new(&mut right), cx).is_ready();
if is_ready && (right.as_ref().output().unwrap().is_ok() || left_errored) {
return Poll::Ready(right.take().unwrap());
}
Poll::Pending
}
}

@ -4,16 +4,16 @@
//!
//! Often it's desireable to await multiple futures as if it was a single
//! future. The `join` family of operations converts multiple futures into a
//! single future that returns all of their outputs. The `select` family of
//! single future that returns all of their outputs. The `race` family of
//! operations converts multiple future into a single future that returns the
//! first output.
//!
//! For operating on futures the following macros can be used:
//!
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | `future::join` | `(T1, T2)` | Wait for all to complete
//! | `future::select` | `T` | Return on first value
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | [`future::join!`] | `(T1, T2)` | Wait for all to complete
//! | [`Future::race`] | `T` | Return on first value
//!
//! ## Fallible Futures Concurrency
//!
@ -25,21 +25,26 @@
//! futures are dropped and an error is returned. This is referred to as
//! "short-circuiting".
//!
//! In the case of `try_select`, instead of returning the first future that
//! In the case of `try_race`, instead of returning the first future that
//! completes it returns the first future that _successfully_ completes. This
//! means `try_select` will keep going until any one of the futures returns
//! means `try_race` will keep going until any one of the futures returns
//! `Ok`, or _all_ futures have returned `Err`.
//!
//! However sometimes it can be useful to use the base variants of the macros
//! even on futures that return `Result`. Here is an overview of operations that
//! work on `Result`, and their respective semantics:
//!
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | `future::join` | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
//! | `future::try_join` | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
//! | `future::select` | `Result<T, E>` | Return on first value
//! | `future::try_select` | `Result<T, E>` | Return on first `Ok`, reject on last Err
//! | Name | Return signature | When does it return? |
//! | --- | --- | --- |
//! | [`future::join!`] | `(Result<T, E>, Result<T, E>)` | Wait for all to complete
//! | [`future::try_join!`] | `Result<(T1, T2), E>` | Return on first `Err`, wait for all to complete
//! | [`Future::race`] | `Result<T, E>` | Return on first value
//! | [`Future::try_race`] | `Result<T, E>` | Return on first `Ok`, reject on last Err
//!
//! [`future::join!`]: macro.join.html
//! [`future::try_join!`]: macro.try_join.html
//! [`Future::race`]: trait.Future.html#method.race
//! [`Future::try_race`]: trait.Future.html#method.try_race
#[doc(inline)]
pub use async_macros::{join, try_join};
@ -57,9 +62,6 @@ mod ready;
mod timeout;
cfg_unstable! {
#[doc(inline)]
pub use async_macros::{select, try_select};
pub use into_future::IntoFuture;
mod into_future;
}

@ -282,9 +282,9 @@ pub use read::Read;
pub use repeat::{repeat, Repeat};
pub use seek::Seek;
pub use sink::{sink, Sink};
pub use stderr::{stderr, Stderr, StderrLock};
pub use stdin::{stdin, Stdin, StdinLock};
pub use stdout::{stdout, Stdout, StdoutLock};
pub use stderr::{stderr, Stderr};
pub use stdin::{stdin, Stdin};
pub use stdout::{stdout, Stdout};
pub use timeout::timeout;
pub use write::Write;
@ -311,3 +311,9 @@ mod stdin;
mod stdio;
mod stdout;
mod timeout;
cfg_unstable! {
pub use stderr::StderrLock;
pub use stdin::StdinLock;
pub use stdout::StdoutLock;
}

@ -1,4 +1,3 @@
use std::io::Write as StdWrite;
use std::pin::Pin;
use std::sync::Mutex;
@ -8,6 +7,7 @@ use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
cfg_unstable! {
use once_cell::sync::Lazy;
use std::io::Write as _;
}
/// Constructs a new handle to the standard error of the current process.
@ -59,13 +59,19 @@ pub fn stderr() -> Stderr {
pub struct Stderr(Mutex<State>);
/// A locked reference to the Stderr handle.
/// This handle implements the [`Write`] traits, and is constructed via the [`Stderr::lock`] method.
///
/// This handle implements the [`Write`] traits, and is constructed via the [`Stderr::lock`]
/// method.
///
/// [`Write`]: trait.Read.html
/// [`Stderr::lock`]: struct.Stderr.html#method.lock
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct StderrLock<'a>(std::io::StderrLock<'a>);
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
unsafe impl Send for StderrLock<'_> {}
/// The state of the asynchronous stderr.
@ -234,7 +240,9 @@ cfg_windows! {
}
}
impl Write for StderrLock<'_> {
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
impl io::Write for StderrLock<'_> {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,

@ -7,6 +7,7 @@ use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
cfg_unstable! {
use once_cell::sync::Lazy;
use std::io::Read as _;
}
/// Constructs a new handle to the standard input of the current process.
@ -59,13 +60,18 @@ pub fn stdin() -> Stdin {
pub struct Stdin(Mutex<State>);
/// A locked reference to the Stdin handle.
///
/// This handle implements the [`Read`] traits, and is constructed via the [`Stdin::lock`] method.
///
/// [`Read`]: trait.Read.html
/// [`Stdin::lock`]: struct.Stdin.html#method.lock
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(feature = "unstable")]
#[derive(Debug)]
pub struct StdinLock<'a>(std::io::StdinLock<'a>);
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
unsafe impl Send for StdinLock<'_> {}
/// The state of the asynchronous stdin.
@ -257,14 +263,14 @@ cfg_windows! {
}
}
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
impl Read for StdinLock<'_> {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
use std::io::Read as StdRead;
Poll::Ready(self.0.read(buf))
}
}

@ -1,4 +1,3 @@
use std::io::Write as StdWrite;
use std::pin::Pin;
use std::sync::Mutex;
@ -8,6 +7,7 @@ use crate::task::{spawn_blocking, Context, JoinHandle, Poll};
cfg_unstable! {
use once_cell::sync::Lazy;
use std::io::Write as _;
}
/// Constructs a new handle to the standard output of the current process.
@ -59,13 +59,19 @@ pub fn stdout() -> Stdout {
pub struct Stdout(Mutex<State>);
/// A locked reference to the Stderr handle.
/// This handle implements the [`Write`] traits, and is constructed via the [`Stdout::lock`] method.
///
/// This handle implements the [`Write`] traits, and is constructed via the [`Stdout::lock`]
/// method.
///
/// [`Write`]: trait.Read.html
/// [`Stdout::lock`]: struct.Stdout.html#method.lock
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct StdoutLock<'a>(std::io::StdoutLock<'a>);
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
unsafe impl Send for StdoutLock<'_> {}
/// The state of the asynchronous stdout.
@ -234,6 +240,8 @@ cfg_windows! {
}
}
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
impl Write for StdoutLock<'_> {
fn poll_write(
mut self: Pin<&mut Self>,

@ -20,12 +20,10 @@ where
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let words: VecDeque<_> = vec!["have", "a", "great", "day"]
.into_iter()
.collect();
let words = stream::from_iter(vec!["have", "a", "great", "day"]);
let total: Option<usize> = words.map(|w| w.find('a')).sum().await;
assert_eq!(total, Some(5));
#

@ -20,10 +20,10 @@ where
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
let v = stream::from_iter(vec![1, 2, 4]);
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")

@ -20,10 +20,10 @@ where
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let v: VecDeque<_> = vec![1, 2].into_iter().collect();
let v = stream::from_iter(vec![1, 2]);
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")

@ -12,7 +12,7 @@ pin_project! {
/// See it documentation for more.
///
/// [`from_iter`]: fn.from_iter.html
#[derive(Debug)]
#[derive(Clone, Debug)]
pub struct FromIter<I> {
iter: I,
}

@ -0,0 +1,33 @@
use crate::stream::Stream;
use crate::task::{Context, Poll};
use pin_project_lite::pin_project;
use std::pin::Pin;
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct Copied<S> {
#[pin]
stream: S,
}
}
impl<S> Copied<S> {
pub(super) fn new(stream: S) -> Self {
Copied { stream }
}
}
impl<'a, S, T: 'a> Stream for Copied<S>
where
S: Stream<Item = &'a T>,
T: Copy,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
Poll::Ready(next.copied())
}
}

@ -0,0 +1,60 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MaxByKeyFuture<S, T, K> {
#[pin]
stream: S,
max: Option<T>,
key_by: K,
}
}
impl<S, T, K> MaxByKeyFuture<S, T, K> {
pub(super) fn new(stream: S, key_by: K) -> Self {
Self {
stream,
max: None,
key_by,
}
}
}
impl<S, K> Future for MaxByKeyFuture<S, S::Item, K>
where
S: Stream,
K: FnMut(&S::Item) -> S::Item,
S::Item: Ord,
{
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
let new = (this.key_by)(&new);
cx.waker().wake_by_ref();
match this.max.take() {
None => *this.max = Some(new),
Some(old) => match new.cmp(&old) {
Ordering::Greater => *this.max = Some(new),
_ => *this.max = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.max.take()),
}
}
}

@ -4,6 +4,9 @@ use std::task::{Context, Poll};
use futures_core::Stream;
use pin_project_lite::pin_project;
use crate::prelude::*;
use crate::stream::Fuse;
pin_project! {
/// A stream that merges two other streams into a single stream.
///
@ -17,15 +20,15 @@ pin_project! {
#[derive(Debug)]
pub struct Merge<L, R> {
#[pin]
left: L,
left: Fuse<L>,
#[pin]
right: R,
right: Fuse<R>,
}
}
impl<L, R> Merge<L, R> {
impl<L: Stream, R: Stream> Merge<L, R> {
pub(crate) fn new(left: L, right: R) -> Self {
Self { left, right }
Self { left: left.fuse(), right: right.fuse() }
}
}
@ -38,13 +41,14 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if let Poll::Ready(Some(item)) = this.left.poll_next(cx) {
// The first stream made progress. The Merge needs to be polled
// again to check the progress of the second stream.
cx.waker().wake_by_ref();
Poll::Ready(Some(item))
} else {
this.right.poll_next(cx)
match this.left.poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => this.right.poll_next(cx),
Poll::Pending => match this.right.poll_next(cx) {
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
Poll::Ready(None) => Poll::Pending,
Poll::Pending => Poll::Pending,
}
}
}
}

@ -25,6 +25,7 @@ mod all;
mod any;
mod chain;
mod cmp;
mod copied;
mod enumerate;
mod eq;
mod filter;
@ -42,6 +43,7 @@ mod le;
mod lt;
mod map;
mod max_by;
mod max_by_key;
mod min;
mod min_by;
mod min_by_key;
@ -76,6 +78,7 @@ use last::LastFuture;
use le::LeFuture;
use lt::LtFuture;
use max_by::MaxByFuture;
use max_by_key::MaxByKeyFuture;
use min::MinFuture;
use min_by::MinByFuture;
use min_by_key::MinByKeyFuture;
@ -88,6 +91,7 @@ use try_fold::TryFoldFuture;
use try_for_each::TryForEachFuture;
pub use chain::Chain;
pub use copied::Copied;
pub use filter::Filter;
pub use fuse::Fuse;
pub use inspect::Inspect;
@ -276,11 +280,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3, 4].into_iter().collect();
let s = stream::from_iter(vec![1, 2, 3, 4]);
let mut s = s.take_while(|x| x < &3 );
assert_eq!(s.next().await, Some(1));
@ -313,9 +316,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect();
let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
let mut stepped = s.step_by(2);
assert_eq!(stepped.next().await, Some(0));
@ -345,10 +348,10 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let first: VecDeque<_> = vec![0u8, 1].into_iter().collect();
let second: VecDeque<_> = vec![2, 3].into_iter().collect();
let first = stream::from_iter(vec![0u8, 1]);
let second = stream::from_iter(vec![2, 3]);
let mut c = first.chain(second);
assert_eq!(c.next().await, Some(0));
@ -369,6 +372,41 @@ extension_trait! {
Chain::new(self, other)
}
#[doc = r#"
Creates an stream which copies all of its elements.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let s = stream::from_iter(vec![&1, &2, &3]);
let second = stream::from_iter(vec![2, 3]);
let mut s_copied = s.copied();
assert_eq!(s_copied.next().await, Some(1));
assert_eq!(s_copied.next().await, Some(2));
assert_eq!(s_copied.next().await, Some(3));
assert_eq!(s_copied.next().await, None);
#
# }) }
```
"#]
fn copied<'a,T>(self) -> Copied<Self>
where
Self: Sized + Stream<Item = &'a T>,
T : 'a + Copy,
{
Copied::new(self)
}
#[doc = r#"
Creates a stream that gives the current element's count as well as the next value.
@ -382,9 +420,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let s: VecDeque<_> = vec!['a', 'b', 'c'].into_iter().collect();
let s = stream::from_iter(vec!['a', 'b', 'c']);
let mut s = s.enumerate();
assert_eq!(s.next().await, Some((0, 'a')));
@ -412,9 +450,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1, 2, 3]);
let mut s = s.map(|x| 2 * x);
assert_eq!(s.next().await, Some(2));
@ -446,10 +484,11 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let a: VecDeque<_> = vec![1u8, 2, 3, 4, 5].into_iter().collect();
let sum = a
let s = stream::from_iter(vec![1, 2, 3, 4, 5]);
let sum = s
.inspect(|x| println!("about to filter {}", x))
.filter(|x| x % 2 == 0)
.inspect(|x| println!("made it through filter: {}", x))
@ -478,11 +517,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1, 2, 3]);
let last = s.last().await;
assert_eq!(last, Some(3));
@ -494,11 +532,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
use crate::async_std::prelude::*;
let s: VecDeque<usize> = vec![].into_iter().collect();
let s = stream::empty::<()>();
let last = s.last().await;
assert_eq!(last, None);
@ -559,11 +596,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3, 4].into_iter().collect();
let s = stream::from_iter(vec![1, 2, 3, 4]);
let mut s = s.filter(|i| i % 2 == 0);
assert_eq!(s.next().await, Some(2));
@ -591,14 +627,14 @@ extension_trait! {
```
# async_std::task::block_on(async {
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream::IntoStream;
use async_std::stream;
let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
let inner1 = stream::from_iter(vec![1,2,3]);
let inner2 = stream::from_iter(vec![4,5,6]);
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
let s = stream::from_iter(vec![inner1, inner2]);
let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await;
@ -628,12 +664,12 @@ extension_trait! {
```
# async_std::task::block_on(async {
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
let inner1 = stream::from_iter(vec![1u8,2,3]);
let inner2 = stream::from_iter(vec![4u8,5,6]);
let s = stream::from_iter(vec![inner1, inner2]);
let v: Vec<_> = s.flatten().collect().await;
@ -661,11 +697,11 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<&str> = vec!["1", "lol", "3", "NaN", "5"].into_iter().collect();
let s = stream::from_iter(vec!["1", "lol", "3", "NaN", "5"]);
let mut parsed = s.filter_map(|a| a.parse::<u32>().ok());
@ -702,16 +738,15 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<i32> = vec![1, 2, -3].into_iter().collect();
let s = stream::from_iter(vec![1isize, 2, -3]);
let min = s.clone().min_by_key(|x| x.abs()).await;
assert_eq!(min, Some(1));
let min = VecDeque::<isize>::new().min_by_key(|x| x.abs()).await;
let min = stream::empty::<isize>().min_by_key(|x| x.abs()).await;
assert_eq!(min, None);
#
# }) }
@ -729,6 +764,42 @@ extension_trait! {
MinByKeyFuture::new(self, key_by)
}
#[doc = r#"
Returns the element that gives the maximum value with respect to the
specified key function. If several elements are equally maximum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let s = stream::from_iter(vec![-1isize, -2, -3]);
let max = s.clone().max_by_key(|x| x.abs()).await;
assert_eq!(max, Some(3));
let max = stream::empty::<isize>().min_by_key(|x| x.abs()).await;
assert_eq!(max, None);
#
# }) }
```
"#]
fn max_by_key<K>(
self,
key_by: K,
) -> impl Future<Output = Option<Self::Item>> [MaxByKeyFuture<Self, Self::Item, K>]
where
Self: Sized,
Self::Item: Ord,
K: FnMut(&Self::Item) -> Self::Item,
{
MaxByKeyFuture::new(self, key_by)
}
#[doc = r#"
Returns the element that gives the minimum value with respect to the
specified comparison function. If several elements are equally minimum,
@ -739,11 +810,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1u8, 2, 3]);
let min = s.clone().min_by(|x, y| x.cmp(y)).await;
assert_eq!(min, Some(1));
@ -751,7 +821,7 @@ extension_trait! {
let min = s.min_by(|x, y| y.cmp(x)).await;
assert_eq!(min, Some(3));
let min = VecDeque::<usize>::new().min_by(|x, y| x.cmp(y)).await;
let min = stream::empty::<u8>().min_by(|x, y| x.cmp(y)).await;
assert_eq!(min, None);
#
# }) }
@ -777,15 +847,15 @@ extension_trait! {
```ignore
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1usize, 2, 3]);
let min = s.clone().min().await;
assert_eq!(min, Some(1));
let min = VecDeque::<usize>::new().min().await;
let min = stream::empty::<usize>().min().await;
assert_eq!(min, None);
#
# }) }
@ -811,11 +881,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1u8, 2, 3]);
let max = s.clone().max_by(|x, y| x.cmp(y)).await;
assert_eq!(max, Some(3));
@ -823,7 +892,7 @@ extension_trait! {
let max = s.max_by(|x, y| y.cmp(x)).await;
assert_eq!(max, Some(1));
let max = VecDeque::<usize>::new().max_by(|x, y| x.cmp(y)).await;
let max = stream::empty::<usize>().max_by(|x, y| x.cmp(y)).await;
assert_eq!(max, None);
#
# }) }
@ -850,11 +919,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let mut s = stream::from_iter(vec![1u8, 2, 3]);
let second = s.nth(1).await;
assert_eq!(second, Some(2));
@ -866,11 +934,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::stream;
use async_std::prelude::*;
let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let mut s = stream::from_iter(vec![1u8, 2, 3]);
let second = s.nth(0).await;
assert_eq!(second, Some(1));
@ -884,11 +951,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let mut s = stream::from_iter(vec![1u8, 2, 3]);
let fourth = s.nth(4).await;
assert_eq!(fourth, None);
@ -979,9 +1045,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let mut s = stream::from_iter(vec![1u8, 2, 3]);
let res = s.find(|x| *x == 2).await;
assert_eq!(res, Some(2));
#
@ -994,9 +1060,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let mut s= stream::from_iter(vec![1, 2, 3]);
let res = s.find(|x| *x == 2).await;
assert_eq!(res, Some(2));
@ -1024,9 +1090,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let mut s: VecDeque<&str> = vec!["lol", "NaN", "2", "5"].into_iter().collect();
let mut s = stream::from_iter(vec!["lol", "NaN", "2", "5"]);
let first_number = s.find_map(|s| s.parse().ok()).await;
assert_eq!(first_number, Some(2));
@ -1057,9 +1123,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1u8, 2, 3]);
let sum = s.fold(0, |acc, x| acc + x).await;
assert_eq!(sum, 6);
@ -1088,12 +1154,12 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
use std::sync::mpsc::channel;
let (tx, rx) = channel();
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1usize, 2, 3]);
let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await;
let v: Vec<_> = rx.iter().collect();
@ -1194,11 +1260,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<isize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1isize, 2, 3]);
let mut s = s.scan(1, |state, x| {
*state = *state * x;
Some(-*state)
@ -1235,11 +1300,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let a: VecDeque<_> = vec![-1i32, 0, 1].into_iter().collect();
let a = stream::from_iter(vec![-1i32, 0, 1]);
let mut s = a.skip_while(|x| x.is_negative());
assert_eq!(s.next().await, Some(0));
@ -1265,11 +1329,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1u8, 2, 3]);
let mut skipped = s.skip(2);
assert_eq!(skipped.next().await, Some(3));
@ -1331,9 +1394,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1usize, 2, 3]);
let sum = s.try_fold(0, |acc, v| {
if (acc+v) % 2 == 1 {
Ok(v+3)
@ -1367,13 +1430,13 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use std::sync::mpsc::channel;
use async_std::prelude::*;
use async_std::stream;
let (tx, rx) = channel();
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1u8, 2, 3]);
let s = s.try_for_each(|v| {
if v % 2 == 1 {
tx.clone().send(v).unwrap();
@ -1425,12 +1488,11 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let l: VecDeque<isize> = vec![1, 2, 3].into_iter().collect();
let r: VecDeque<isize> = vec![4, 5, 6, 7].into_iter().collect();
let l = stream::from_iter(vec![1u8, 2, 3]);
let r = stream::from_iter(vec![4u8, 5, 6, 7]);
let mut s = l.zip(r);
assert_eq!(s.next().await, Some((1, 4)));
@ -1560,14 +1622,14 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
use std::cmp::Ordering;
let s1 = VecDeque::from(vec![1]);
let s2 = VecDeque::from(vec![1, 2]);
let s3 = VecDeque::from(vec![1, 2, 3]);
let s4 = VecDeque::from(vec![1, 2, 4]);
let s1 = stream::from_iter(vec![1]);
let s2 = stream::from_iter(vec![1, 2]);
let s3 = stream::from_iter(vec![1, 2, 3]);
let s4 = stream::from_iter(vec![1, 2, 4]);
assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal));
assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less));
assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater));
@ -1599,9 +1661,9 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let s = stream::from_iter(vec![1usize, 2, 3]);
let res = s.clone().position(|x| *x == 1).await;
assert_eq!(res, Some(0));
@ -1638,13 +1700,14 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
use std::cmp::Ordering;
let s1 = VecDeque::from(vec![1]);
let s2 = VecDeque::from(vec![1, 2]);
let s3 = VecDeque::from(vec![1, 2, 3]);
let s4 = VecDeque::from(vec![1, 2, 4]);
let s1 = stream::from_iter(vec![1]);
let s2 = stream::from_iter(vec![1, 2]);
let s3 = stream::from_iter(vec![1, 2, 3]);
let s4 = stream::from_iter(vec![1, 2, 4]);
assert_eq!(s1.clone().cmp(s1.clone()).await, Ordering::Equal);
assert_eq!(s1.clone().cmp(s2.clone()).await, Ordering::Less);
assert_eq!(s2.clone().cmp(s1.clone()).await, Ordering::Greater);
@ -1674,11 +1737,13 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_ne: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_ne: VecDeque<isize> = vec![1,5].into_iter().collect();
use async_std::stream;
let single = stream::from_iter(vec![1usize]);
let single_ne = stream::from_iter(vec![10usize]);
let multi = stream::from_iter(vec![1usize,2]);
let multi_ne = stream::from_iter(vec![1usize,5]);
assert_eq!(single.clone().ne(single.clone()).await, false);
assert_eq!(single_ne.clone().ne(single.clone()).await, true);
assert_eq!(multi.clone().ne(single_ne.clone()).await, true);
@ -1709,12 +1774,13 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let single = stream::from_iter(vec![1]);
let single_gt = stream::from_iter(vec![10]);
let multi = stream::from_iter(vec![1,2]);
let multi_gt = stream::from_iter(vec![1,5]);
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_gt: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_gt: VecDeque<isize> = vec![1,5].into_iter().collect();
assert_eq!(single.clone().ge(single.clone()).await, true);
assert_eq!(single_gt.clone().ge(single.clone()).await, true);
assert_eq!(multi.clone().ge(single_gt.clone()).await, false);
@ -1745,12 +1811,13 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let single = stream::from_iter(vec![1]);
let single_eq = stream::from_iter(vec![10]);
let multi = stream::from_iter(vec![1,2]);
let multi_eq = stream::from_iter(vec![1,5]);
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_eq: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_eq: VecDeque<isize> = vec![1,5].into_iter().collect();
assert_eq!(single.clone().eq(single.clone()).await, true);
assert_eq!(single_eq.clone().eq(single.clone()).await, false);
assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
@ -1781,12 +1848,13 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let single = stream::from_iter(vec![1]);
let single_gt = stream::from_iter(vec![10]);
let multi = stream::from_iter(vec![1,2]);
let multi_gt = stream::from_iter(vec![1,5]);
let single = VecDeque::from(vec![1]);
let single_gt = VecDeque::from(vec![10]);
let multi = VecDeque::from(vec![1,2]);
let multi_gt = VecDeque::from(vec![1,5]);
assert_eq!(single.clone().gt(single.clone()).await, false);
assert_eq!(single_gt.clone().gt(single.clone()).await, true);
assert_eq!(multi.clone().gt(single_gt.clone()).await, false);
@ -1817,12 +1885,13 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let single = stream::from_iter(vec![1]);
let single_gt = stream::from_iter(vec![10]);
let multi = stream::from_iter(vec![1,2]);
let multi_gt = stream::from_iter(vec![1,5]);
let single = VecDeque::from(vec![1]);
let single_gt = VecDeque::from(vec![10]);
let multi = VecDeque::from(vec![1,2]);
let multi_gt = VecDeque::from(vec![1,5]);
assert_eq!(single.clone().le(single.clone()).await, true);
assert_eq!(single.clone().le(single_gt.clone()).await, true);
assert_eq!(multi.clone().le(single_gt.clone()).await, true);
@ -1853,12 +1922,12 @@ extension_trait! {
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
use async_std::stream;
let single = VecDeque::from(vec![1]);
let single_gt = VecDeque::from(vec![10]);
let multi = VecDeque::from(vec![1,2]);
let multi_gt = VecDeque::from(vec![1,5]);
let single = stream::from_iter(vec![1]);
let single_gt = stream::from_iter(vec![10]);
let multi = stream::from_iter(vec![1,2]);
let multi_gt = stream::from_iter(vec![1,5]);
assert_eq!(single.clone().lt(single.clone()).await, false);
assert_eq!(single.clone().lt(single_gt.clone()).await, true);
@ -1900,10 +1969,10 @@ extension_trait! {
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream;
let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect();
let s = stream::from_iter(vec![0u8, 1, 2, 3, 4]);
let sum: u8 = s.sum().await;
assert_eq!(sum, 10);

@ -22,7 +22,7 @@ use crate::sync::WakerSet;
///
/// Senders and receivers can be cloned. When all senders associated with a channel get dropped, it
/// becomes closed. Receive operations on a closed and empty channel return `None` instead of
/// blocking.
/// trying to await a message.
///
/// # Panics
///
@ -44,7 +44,7 @@ use crate::sync::WakerSet;
/// s.send(1).await;
///
/// task::spawn(async move {
/// // This call blocks the current task because the channel is full.
/// // This call will have to wait because the channel is full.
/// // It will be able to complete only after the first message is received.
/// s.send(2).await;
/// });
@ -102,8 +102,7 @@ pub struct Sender<T> {
impl<T> Sender<T> {
/// Sends a message into the channel.
///
/// If the channel is full, this method will block the current task until the send operation
/// can proceed.
/// If the channel is full, this method will wait until there is space in the channel.
///
/// # Examples
///
@ -346,9 +345,8 @@ pub struct Receiver<T> {
impl<T> Receiver<T> {
/// Receives a message from the channel.
///
/// If the channel is empty and it still has senders, this method will block the current task
/// until the receive operation can proceed. If the channel is empty and there are no more
/// senders, this method returns `None`.
/// If the channel is empty and still has senders, this method will wait until a message is
/// sent into the channel or until all senders get dropped.
///
/// # Examples
///

@ -1,26 +1,21 @@
use std::cell::UnsafeCell;
use std::fmt;
use std::isize;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::process;
use std::sync::atomic::{AtomicUsize, Ordering};
use slab::Slab;
use crate::future::Future;
use crate::task::{Context, Poll, Waker};
use crate::sync::WakerSet;
use crate::task::{Context, Poll};
/// Set if a write lock is held.
#[allow(clippy::identity_op)]
const WRITE_LOCK: usize = 1 << 0;
/// Set if there are read operations blocked on the lock.
const BLOCKED_READS: usize = 1 << 1;
/// Set if there are write operations blocked on the lock.
const BLOCKED_WRITES: usize = 1 << 2;
/// The value of a single blocked read contributing to the read count.
const ONE_READ: usize = 1 << 3;
const ONE_READ: usize = 1 << 1;
/// The bits in which the read count is stored.
const READ_COUNT_MASK: usize = !(ONE_READ - 1);
@ -56,8 +51,8 @@ const READ_COUNT_MASK: usize = !(ONE_READ - 1);
/// ```
pub struct RwLock<T> {
state: AtomicUsize,
reads: std::sync::Mutex<Slab<Option<Waker>>>,
writes: std::sync::Mutex<Slab<Option<Waker>>>,
read_wakers: WakerSet,
write_wakers: WakerSet,
value: UnsafeCell<T>,
}
@ -77,8 +72,8 @@ impl<T> RwLock<T> {
pub fn new(t: T) -> RwLock<T> {
RwLock {
state: AtomicUsize::new(0),
reads: std::sync::Mutex::new(Slab::new()),
writes: std::sync::Mutex::new(Slab::new()),
read_wakers: WakerSet::new(),
write_wakers: WakerSet::new(),
value: UnsafeCell::new(t),
}
}
@ -104,100 +99,61 @@ impl<T> RwLock<T> {
/// # })
/// ```
pub async fn read(&self) -> RwLockReadGuard<'_, T> {
pub struct LockFuture<'a, T> {
pub struct ReadFuture<'a, T> {
lock: &'a RwLock<T>,
opt_key: Option<usize>,
acquired: bool,
}
impl<'a, T> Future for LockFuture<'a, T> {
impl<'a, T> Future for ReadFuture<'a, T> {
type Output = RwLockReadGuard<'a, T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.lock.try_read() {
Some(guard) => {
self.acquired = true;
Poll::Ready(guard)
}
let poll = match self.lock.try_read() {
Some(guard) => Poll::Ready(guard),
None => {
let mut reads = self.lock.reads.lock().unwrap();
// Register the current task.
// Insert this lock operation.
match self.opt_key {
None => {
// Insert a new entry into the list of blocked reads.
let w = cx.waker().clone();
let key = reads.insert(Some(w));
self.opt_key = Some(key);
if reads.len() == 1 {
self.lock.state.fetch_or(BLOCKED_READS, Ordering::Relaxed);
}
}
Some(key) => {
// There is already an entry in the list of blocked reads. Just
// reset the waker if it was removed.
if reads[key].is_none() {
let w = cx.waker().clone();
reads[key] = Some(w);
}
}
None => self.opt_key = Some(self.lock.read_wakers.insert(cx)),
Some(key) => self.lock.read_wakers.update(key, cx),
}
// Try locking again because it's possible the lock got unlocked just
// before the current task was registered as a blocked task.
// before the current task was inserted into the waker set.
match self.lock.try_read() {
Some(guard) => {
self.acquired = true;
Poll::Ready(guard)
}
Some(guard) => Poll::Ready(guard),
None => Poll::Pending,
}
}
};
if poll.is_ready() {
// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.lock.read_wakers.complete(key);
}
}
poll
}
}
impl<T> Drop for LockFuture<'_, T> {
impl<T> Drop for ReadFuture<'_, T> {
fn drop(&mut self) {
// If the current task is still in the set, that means it is being cancelled now.
if let Some(key) = self.opt_key {
let mut reads = self.lock.reads.lock().unwrap();
let opt_waker = reads.remove(key);
if reads.is_empty() {
self.lock.state.fetch_and(!BLOCKED_READS, Ordering::Relaxed);
}
self.lock.read_wakers.cancel(key);
if opt_waker.is_none() {
// We were awoken. Wake up another blocked read.
if let Some((_, opt_waker)) = reads.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
return;
}
}
drop(reads);
if !self.acquired {
// We didn't acquire the lock and didn't wake another blocked read.
// Wake a blocked write instead.
let mut writes = self.lock.writes.lock().unwrap();
if let Some((_, opt_waker)) = writes.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
return;
}
}
}
// If there are no active readers, wake one of the writers.
if self.lock.state.load(Ordering::SeqCst) & READ_COUNT_MASK == 0 {
self.lock.write_wakers.notify_one();
}
}
}
}
LockFuture {
ReadFuture {
lock: self,
opt_key: None,
acquired: false,
}
.await
}
@ -226,7 +182,7 @@ impl<T> RwLock<T> {
/// # })
/// ```
pub fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
let mut state = self.state.load(Ordering::Acquire);
let mut state = self.state.load(Ordering::SeqCst);
loop {
// If a write lock is currently held, then a read lock cannot be acquired.
@ -234,12 +190,17 @@ impl<T> RwLock<T> {
return None;
}
// Make sure the number of readers doesn't overflow.
if state > isize::MAX as usize {
process::abort();
}
// Increment the number of active reads.
match self.state.compare_exchange_weak(
state,
state + ONE_READ,
Ordering::AcqRel,
Ordering::Acquire,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => return Some(RwLockReadGuard(self)),
Err(s) => state = s,
@ -268,99 +229,59 @@ impl<T> RwLock<T> {
/// # })
/// ```
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
pub struct LockFuture<'a, T> {
pub struct WriteFuture<'a, T> {
lock: &'a RwLock<T>,
opt_key: Option<usize>,
acquired: bool,
}
impl<'a, T> Future for LockFuture<'a, T> {
impl<'a, T> Future for WriteFuture<'a, T> {
type Output = RwLockWriteGuard<'a, T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.lock.try_write() {
Some(guard) => {
self.acquired = true;
Poll::Ready(guard)
}
let poll = match self.lock.try_write() {
Some(guard) => Poll::Ready(guard),
None => {
let mut writes = self.lock.writes.lock().unwrap();
// Register the current task.
// Insert this lock operation.
match self.opt_key {
None => {
// Insert a new entry into the list of blocked writes.
let w = cx.waker().clone();
let key = writes.insert(Some(w));
self.opt_key = Some(key);
if writes.len() == 1 {
self.lock.state.fetch_or(BLOCKED_WRITES, Ordering::Relaxed);
}
}
Some(key) => {
// There is already an entry in the list of blocked writes. Just
// reset the waker if it was removed.
if writes[key].is_none() {
let w = cx.waker().clone();
writes[key] = Some(w);
}
}
None => self.opt_key = Some(self.lock.write_wakers.insert(cx)),
Some(key) => self.lock.write_wakers.update(key, cx),
}
// Try locking again because it's possible the lock got unlocked just
// before the current task was registered as a blocked task.
// before the current task was inserted into the waker set.
match self.lock.try_write() {
Some(guard) => {
self.acquired = true;
Poll::Ready(guard)
}
Some(guard) => Poll::Ready(guard),
None => Poll::Pending,
}
}
};
if poll.is_ready() {
// If the current task is in the set, remove it.
if let Some(key) = self.opt_key.take() {
self.lock.write_wakers.complete(key);
}
}
poll
}
}
impl<T> Drop for LockFuture<'_, T> {
impl<T> Drop for WriteFuture<'_, T> {
fn drop(&mut self) {
// If the current task is still in the set, that means it is being cancelled now.
if let Some(key) = self.opt_key {
let mut writes = self.lock.writes.lock().unwrap();
let opt_waker = writes.remove(key);
if writes.is_empty() {
self.lock
.state
.fetch_and(!BLOCKED_WRITES, Ordering::Relaxed);
}
if opt_waker.is_none() && !self.acquired {
// We were awoken but didn't acquire the lock. Wake up another write.
if let Some((_, opt_waker)) = writes.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
return;
}
}
drop(writes);
// There are no blocked writes. Wake a blocked read instead.
let mut reads = self.lock.reads.lock().unwrap();
if let Some((_, opt_waker)) = reads.iter_mut().next() {
if let Some(w) = opt_waker.take() {
w.wake();
return;
}
}
if !self.lock.write_wakers.cancel(key) {
// If no other blocked reader was notified, notify all readers.
self.lock.read_wakers.notify_all();
}
}
}
}
LockFuture {
WriteFuture {
lock: self,
opt_key: None,
acquired: false,
}
.await
}
@ -389,24 +310,10 @@ impl<T> RwLock<T> {
/// # })
/// ```
pub fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
let mut state = self.state.load(Ordering::Acquire);
loop {
// If any kind of lock is currently held, then a write lock cannot be acquired.
if state & (WRITE_LOCK | READ_COUNT_MASK) != 0 {
return None;
}
// Set the write lock.
match self.state.compare_exchange_weak(
state,
state | WRITE_LOCK,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return Some(RwLockWriteGuard(self)),
Err(s) => state = s,
}
if self.state.compare_and_swap(0, WRITE_LOCK, Ordering::SeqCst) == 0 {
Some(RwLockWriteGuard(self))
} else {
None
}
}
@ -449,18 +356,15 @@ impl<T> RwLock<T> {
impl<T: fmt::Debug> fmt::Debug for RwLock<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.try_read() {
None => {
struct LockedPlaceholder;
impl fmt::Debug for LockedPlaceholder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<locked>")
}
}
f.debug_struct("RwLock")
.field("data", &LockedPlaceholder)
.finish()
struct Locked;
impl fmt::Debug for Locked {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<locked>")
}
}
match self.try_read() {
None => f.debug_struct("RwLock").field("data", &Locked).finish(),
Some(guard) => f.debug_struct("RwLock").field("data", &&*guard).finish(),
}
}
@ -486,18 +390,11 @@ unsafe impl<T: Sync> Sync for RwLockReadGuard<'_, T> {}
impl<T> Drop for RwLockReadGuard<'_, T> {
fn drop(&mut self) {
let state = self.0.state.fetch_sub(ONE_READ, Ordering::AcqRel);
// If this was the last read and there are blocked writes, wake one of them up.
if (state & READ_COUNT_MASK) == ONE_READ && state & BLOCKED_WRITES != 0 {
let mut writes = self.0.writes.lock().unwrap();
let state = self.0.state.fetch_sub(ONE_READ, Ordering::SeqCst);
if let Some((_, opt_waker)) = writes.iter_mut().next() {
// If there is no waker in this entry, that means it was already woken.
if let Some(w) = opt_waker.take() {
w.wake();
}
}
// If this was the last read, wake one of the writers.
if state & READ_COUNT_MASK == ONE_READ {
self.0.write_wakers.notify_one();
}
}
}
@ -530,25 +427,12 @@ unsafe impl<T: Sync> Sync for RwLockWriteGuard<'_, T> {}
impl<T> Drop for RwLockWriteGuard<'_, T> {
fn drop(&mut self) {
let state = self.0.state.fetch_and(!WRITE_LOCK, Ordering::AcqRel);
let mut guard = None;
// Check if there are any blocked reads or writes.
if state & BLOCKED_READS != 0 {
guard = Some(self.0.reads.lock().unwrap());
} else if state & BLOCKED_WRITES != 0 {
guard = Some(self.0.writes.lock().unwrap());
}
self.0.state.store(0, Ordering::SeqCst);
// Wake up a single blocked task.
if let Some(mut guard) = guard {
if let Some((_, opt_waker)) = guard.iter_mut().next() {
// If there is no waker in this entry, that means it was already woken.
if let Some(w) = opt_waker.take() {
w.wake();
}
}
// Notify all blocked readers.
if !self.0.read_wakers.notify_all() {
// If there were no blocked readers, notify a blocked writer.
self.0.write_wakers.notify_one();
}
}
}

@ -95,8 +95,11 @@ impl WakerSet {
}
/// Removes the waker of a cancelled operation.
pub fn cancel(&self, key: usize) {
///
/// Returns `true` if another blocked operation from the set was notified.
pub fn cancel(&self, key: usize) -> bool {
let mut inner = self.lock();
if inner.entries.remove(key).is_none() {
inner.none_count -= 1;
@ -107,33 +110,45 @@ impl WakerSet {
w.wake();
inner.none_count += 1;
}
return true;
}
}
false
}
/// Notifies one blocked operation.
///
/// Returns `true` if an operation was notified.
#[inline]
pub fn notify_one(&self) {
pub fn notify_one(&self) -> bool {
// Use `SeqCst` ordering to synchronize with `Lock::drop()`.
if self.flag.load(Ordering::SeqCst) & NOTIFY_ONE != 0 {
self.notify(false);
self.notify(false)
} else {
false
}
}
/// Notifies all blocked operations.
// TODO: Delete this attribute when `crate::sync::channel()` is stabilized.
#[cfg(feature = "unstable")]
///
/// Returns `true` if at least one operation was notified.
#[inline]
pub fn notify_all(&self) {
pub fn notify_all(&self) -> bool {
// Use `SeqCst` ordering to synchronize with `Lock::drop()`.
if self.flag.load(Ordering::SeqCst) & NOTIFY_ALL != 0 {
self.notify(true);
self.notify(true)
} else {
false
}
}
/// Notifies blocked operations, either one or all of them.
fn notify(&self, all: bool) {
///
/// Returns `true` if at least one operation was notified.
fn notify(&self, all: bool) -> bool {
let mut inner = &mut *self.lock();
let mut notified = false;
for (_, opt_waker) in inner.entries.iter_mut() {
// If there is no waker in this entry, that means it was already woken.
@ -141,10 +156,15 @@ impl WakerSet {
w.wake();
inner.none_count += 1;
}
notified = true;
if !all {
break;
}
}
notified
}
/// Locks the list of entries.

@ -190,7 +190,7 @@ macro_rules! extension_trait {
};
// Parse the return type in an extension method.
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> [$f:ty] $($tail:tt)*) => {
(@doc ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {
extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*);
};
(@ext ($($head:tt)*) -> impl Future<Output = $out:ty> $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => {

@ -0,0 +1,100 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
use async_std::prelude::*;
use async_std::stream;
use async_std::sync::channel;
use async_std::task;
#[test]
/// Checks that streams are merged fully even if one of the components
/// experiences delay.
fn merging_delayed_streams_work() {
let (sender, receiver) = channel::<i32>(10);
let mut s = receiver.merge(stream::empty());
let t = task::spawn(async move {
let mut xs = Vec::new();
while let Some(x) = s.next().await {
xs.push(x);
}
xs
});
task::block_on(async move {
task::sleep(std::time::Duration::from_millis(500)).await;
sender.send(92).await;
drop(sender);
let xs = t.await;
assert_eq!(xs, vec![92])
});
let (sender, receiver) = channel::<i32>(10);
let mut s = stream::empty().merge(receiver);
let t = task::spawn(async move {
let mut xs = Vec::new();
while let Some(x) = s.next().await {
xs.push(x);
}
xs
});
task::block_on(async move {
task::sleep(std::time::Duration::from_millis(500)).await;
sender.send(92).await;
drop(sender);
let xs = t.await;
assert_eq!(xs, vec![92])
});
}
pin_project! {
/// The opposite of `Fuse`: makes the stream panic if polled after termination.
struct Explode<S> {
#[pin]
done: bool,
#[pin]
inner: S,
}
}
impl<S: Stream> Stream for Explode<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if *this.done {
panic!("KABOOM!")
}
let res = this.inner.poll_next(cx);
if let Poll::Ready(None) = &res {
*this.done = true;
}
res
}
}
fn explode<S: Stream>(s: S) -> Explode<S> {
Explode {
done: false,
inner: s,
}
}
#[test]
fn merge_works_with_unfused_streams() {
let s1 = explode(stream::once(92));
let s2 = explode(stream::once(92));
let mut s = s1.merge(s2);
let xs = task::block_on(async move {
let mut xs = Vec::new();
while let Some(x) = s.next().await {
xs.push(x)
}
xs
});
assert_eq!(xs, vec![92, 92]);
}
Loading…
Cancel
Save