Merge branch 'master' into add_stdin_lock

This commit is contained in:
k-nasa 2019-11-01 10:10:49 +09:00
commit f8b8c9debe
19 changed files with 1179 additions and 85 deletions

View file

@ -59,8 +59,10 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: ${{ steps.component.outputs.toolchain }}
override: true
components: rustfmt
- name: setup
run: |

View file

@ -7,6 +7,63 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview
## [Unreleased]
# [0.99.11] - 2019-10-29
This patch introduces `async_std::sync::channel`, a novel asynchronous port of
the ultra-fast Crossbeam channels. This has been one of the most anticipated
features for async-std, and we're excited to be providing a first version of
this!
In addition to channels, this patch has the regular list of new methods, types,
and doc fixes.
## Examples
__Send and receive items from a channel__
```rust
// Create a bounded channel with a max-size of 1
let (s, r) = channel(1);
// This call returns immediately because there is enough space in the channel.
s.send(1).await;
task::spawn(async move {
// This call blocks the current task because the channel is full.
// It will be able to complete only after the first message is received.
s.send(2).await;
});
// Receive items from the channel
task::sleep(Duration::from_secs(1)).await;
assert_eq!(r.recv().await, Some(1));
assert_eq!(r.recv().await, Some(2));
```
## Added
- Added `Future::delay` as "unstable"
- Added `Stream::flat_map` as "unstable"
- Added `Stream::flatten` as "unstable"
- Added `Stream::product` as "unstable"
- Added `Stream::sum` as "unstable"
- Added `Stream::min_by_key`
- Added `Stream::max_by`
- Added `Stream::timeout` as "unstable"
- Added `sync::channel` as "unstable".
- Added doc links from instantiated structs to the methods that create them.
- Implemented `Extend` + `FromStream` for `PathBuf`.
## Changed
- Fixed an issue with `block_on` so it works even when nested.
- Fixed issues with our Clippy check on CI.
- Replaced our uses of `cfg_if` with our own macros, simplifying the codebase.
- Updated the homepage link in `Cargo.toml` to point to [async.rs](https://async.rs).
- Updated the module-level documentation for `stream` and `sync`.
- Various typos and grammar fixes.
- Removed redundant file flushes, improving the performance of `File` operations
## Removed
Nothing was removed in this release.
# [0.99.10] - 2019-10-16
This patch stabilizes several core concurrency macros, introduces async versions
@ -281,7 +338,8 @@ task::blocking(async {
- Initial beta release
[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.10...HEAD
[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.11...HEAD
[0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11
[0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.9...v0.99.10
[0.99.9]: https://github.com/async-rs/async-std/compare/v0.99.8...v0.99.9
[0.99.8]: https://github.com/async-rs/async-std/compare/v0.99.7...v0.99.8

View file

@ -1,6 +1,6 @@
[package]
name = "async-std"
version = "0.99.10"
version = "0.99.11"
authors = [
"Stjepan Glavina <stjepang@gmail.com>",
"Yoshua Wuyts <yoshuawuyts@gmail.com>",
@ -33,12 +33,12 @@ crossbeam-utils = "0.6.6"
futures-core-preview = "=0.3.0-alpha.19"
futures-io-preview = "=0.3.0-alpha.19"
futures-timer = "1.0.2"
lazy_static = "1.4.0"
log = { version = "0.4.8", features = ["kv_unstable"] }
memchr = "2.2.1"
mio = "0.6.19"
mio-uds = "0.6.7"
num_cpus = "1.10.1"
once_cell = "1.2.0"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
kv-log-macro = "1.0.4"

View file

@ -61,7 +61,7 @@ syntax.
## Features
- __Modern:__ Built from the ground up for `std::future` and `async/await` with
blazing fast compilation times.
blazing fast compilation time.
- __Fast:__ Our robust allocator and threadpool designs provide ultra-high
throughput with predictably low latency.
- __Intuitive:__ Complete parity with the stdlib means you only need to learn

View file

@ -1,8 +1,8 @@
use std::fmt;
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
use mio::{self, Evented};
use once_cell::sync::Lazy;
use slab::Slab;
use crate::io;
@ -100,25 +100,23 @@ impl Reactor {
// }
}
lazy_static! {
/// The state of the global networking driver.
static ref REACTOR: Reactor = {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles.
std::thread::Builder::new()
.name("async-net-driver".to_string())
.spawn(move || {
// If the driver thread panics, there's not much we can do. It is not a
// recoverable error and there is no place to propagate it into so we just abort.
abort_on_panic(|| {
main_loop().expect("async networking thread has panicked");
})
/// The state of the global networking driver.
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles.
std::thread::Builder::new()
.name("async-net-driver".to_string())
.spawn(move || {
// If the driver thread panics, there's not much we can do. It is not a
// recoverable error and there is no place to propagate it into so we just abort.
abort_on_panic(|| {
main_loop().expect("async networking thread has panicked");
})
.expect("cannot start a thread driving blocking tasks");
})
.expect("cannot start a thread driving blocking tasks");
Reactor::new().expect("cannot initialize reactor")
};
}
Reactor::new().expect("cannot initialize reactor")
});
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
fn main_loop() -> io::Result<()> {

View file

@ -1,14 +1,42 @@
//! Networking primitives for TCP/UDP communication.
//!
//! For OS-specific networking primitives like Unix domain sockets, refer to the [`async_std::os`]
//! module.
//! This module provides networking functionality for the Transmission Control and User
//! Datagram Protocols, as well as types for IP and socket addresses.
//!
//! This module is an async version of [`std::net`].
//!
//! # Organization
//!
//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP
//! * [`UdpSocket`] provides functionality for communication over UDP
//! * [`IpAddr`] represents IP addresses of either IPv4 or IPv6; [`Ipv4Addr`] and
//! [`Ipv6Addr`] are respectively IPv4 and IPv6 addresses
//! * [`SocketAddr`] represents socket addresses of either IPv4 or IPv6; [`SocketAddrV4`]
//! and [`SocketAddrV6`] are respectively IPv4 and IPv6 socket addresses
//! * [`ToSocketAddrs`] is a trait that used for generic address resolution when interacting
//! with networking objects like [`TcpListener`], [`TcpStream`] or [`UdpSocket`]
//! * Other types are return or parameter types for various methods in this module
//!
//! [`IpAddr`]: enum.IpAddr.html
//! [`Ipv4Addr`]: struct.Ipv4Addr.html
//! [`Ipv6Addr`]: struct.Ipv6Addr.html
//! [`SocketAddr`]: enum.SocketAddr.html
//! [`SocketAddrV4`]: struct.SocketAddrV4.html
//! [`SocketAddrV6`]: struct.SocketAddrV6.html
//! [`TcpListener`]: struct.TcpListener.html
//! [`TcpStream`]: struct.TcpStream.html
//! [`ToSocketAddrs`]: trait.ToSocketAddrs.html
//! [`UdpSocket`]: struct.UdpSocket.html
//!
//! # Platform-specific extensions
//!
//! APIs such as Unix domain sockets are available on certain platforms only. You can find
//! platform-specific extensions in the [`async_std::os`] module.
//!
//! [`async_std::os`]: ../os/index.html
//! [`std::net`]: https://doc.rust-lang.org/std/net/index.html
//!
//! ## Examples
//! # Examples
//!
//! A simple UDP echo server:
//!

View file

@ -2,24 +2,303 @@
//!
//! This module is an async version of [`std::iter`].
//!
//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
//! If you've found yourself with an asynchronous collection of some kind,
//! and needed to perform an operation on the elements of said collection,
//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic
//! asynchronous Rust code, so it's worth becoming familiar with them.
//!
//! # Examples
//! Before explaining more, let's talk about how this module is structured:
//!
//! # Organization
//!
//! This module is largely organized by type:
//!
//! * [Traits] are the core portion: these traits define what kind of streams
//! exist and what you can do with them. The methods of these traits are worth
//! putting some extra study time into.
//! * [Functions] provide some helpful ways to create some basic streams.
//! * [Structs] are often the return types of the various methods on this
//! module's traits. You'll usually want to look at the method that creates
//! the `struct`, rather than the `struct` itself. For more detail about why,
//! see '[Implementing Stream](#implementing-stream)'.
//!
//! [Traits]: #traits
//! [Functions]: #functions
//! [Structs]: #structs
//!
//! That's it! Let's dig into streams.
//!
//! # Stream
//!
//! The heart and soul of this module is the [`Stream`] trait. The core of
//! [`Stream`] looks like this:
//!
//! ```
//! # async_std::task::block_on(async {
//! # use async_std::task::{Context, Poll};
//! # use std::pin::Pin;
//! trait Stream {
//! type Item;
//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
//! }
//! ```
//!
//! A stream has a method, [`next`], which when called, returns an
//! [`Poll`]<[`Option`]`<Item>>`. [`next`] will return `Ready(Some(Item))`
//! as long as there are elements, and once they've all been exhausted, will
//! return `None` to indicate that iteration is finished. If we're waiting on
//! something asynchronous to resolve `Pending` is returned.
//!
//! Individual streams may choose to resume iteration, and so calling
//! [`next`] again may or may not eventually start returning `Ready(Some(Item))`
//! again at some point.
//!
//! [`Stream`]'s full definition includes a number of other methods as well,
//! but they are default methods, built on top of [`next`], and so you get
//! them for free.
//!
//! Streams are also composable, and it's common to chain them together to do
//! more complex forms of processing. See the [Adapters](#adapters) section
//! below for more details.
//!
//! [`Poll`]: ../task/enum.Poll.html
//! [`Stream`]: trait.Stream.html
//! [`next`]: trait.Stream.html#tymethod.next
//! [`Option`]: ../../std/option/enum.Option.html
//!
//! # The three forms of streaming
//!
//! There are three common methods which can create streams from a collection:
//!
//! * `stream()`, which iterates over `&T`.
//! * `stream_mut()`, which iterates over `&mut T`.
//! * `into_stream()`, which iterates over `T`.
//!
//! Various things in async-std may implement one or more of the
//! three, where appropriate.
//!
//! # Implementing Stream
//!
//! Creating a stream of your own involves two steps: creating a `struct` to
//! hold the stream's state, and then `impl`ementing [`Stream`] for that
//! `struct`. This is why there are so many `struct`s in this module: there is
//! one for each stream and iterator adapter.
//!
//! Let's make a stream named `Counter` which counts from `1` to `5`:
//!
//! ```
//! # use async_std::prelude::*;
//! # use async_std::task::{Context, Poll};
//! # use std::pin::Pin;
//! // First, the struct:
//!
//! /// A stream which counts from one to five
//! struct Counter {
//! count: usize,
//! }
//!
//! // we want our count to start at one, so let's add a new() method to help.
//! // This isn't strictly necessary, but is convenient. Note that we start
//! // `count` at zero, we'll see why in `next()`'s implementation below.
//! impl Counter {
//! fn new() -> Counter {
//! Counter { count: 0 }
//! }
//! }
//!
//! // Then, we implement `Stream` for our `Counter`:
//!
//! impl Stream for Counter {
//! // we will be counting with usize
//! type Item = usize;
//!
//! // poll_next() is the only required method
//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
//! // Increment our count. This is why we started at zero.
//! self.count += 1;
//!
//! // Check to see if we've finished counting or not.
//! if self.count < 6 {
//! Poll::Ready(Some(self.count))
//! } else {
//! Poll::Ready(None)
//! }
//! }
//! }
//!
//! // And now we can use it!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! use async_std::prelude::*;
//! use async_std::stream;
//! let mut counter = Counter::new();
//!
//! let mut s = stream::repeat(9).take(3);
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! while let Some(v) = s.next().await {
//! assert_eq!(v, 9);
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//!
//! let x = counter.next().await.unwrap();
//! println!("{}", x);
//! #
//! # Ok(()) }) }
//! ```
//!
//! This will print `1` through `5`, each on their own line.
//!
//! Calling `next().await` this way gets repetitive. Rust has a construct which
//! can call `next()` on your stream, until it reaches `None`. Let's go over
//! that next.
//!
//! # while let Loops and IntoStream
//!
//! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic
//! example of `while let`:
//!
//! ```
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let mut values = stream::repeat(1u8).take(5);
//!
//! while let Some(x) = values.next().await {
//! println!("{}", x);
//! }
//! #
//! # })
//! # Ok(()) }) }
//! ```
//!
//! This will print the numbers one through five, each on their own line. But
//! you'll notice something here: we never called anything on our vector to
//! produce a stream. What gives?
//!
//! There's a trait in the standard library for converting something into an
//! stream: [`IntoStream`]. This trait has one method, [`into_stream],
//! which converts the thing implementing [`IntoStream`] into a stream.
//!
//! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler
//! support yet. This means that automatic conversions like with `for` loops
//! doesn't occur yet, and `into_stream` will always have to be called manually.
//!
//! [`IntoStream`]: trait.IntoStream.html
//! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream
//!
//! # Adapters
//!
//! Functions which take an [`Stream`] and return another [`Stream`] are
//! often called 'stream adapters', as they are a form of the 'adapter
//! pattern'.
//!
//! Common stream adapters include [`map`], [`take`], and [`filter`].
//! For more, see their documentation.
//!
//! [`map`]: trait.Stream.html#method.map
//! [`take`]: trait.Stream.html#method.take
//! [`filter`]: trait.Stream.html#method.filter
//!
//! # Laziness
//!
//! Streams (and stream [adapters](#adapters)) are *lazy*. This means that
//! just creating a stream doesn't _do_ a whole lot. Nothing really happens
//! until you call [`next`]. This is sometimes a source of confusion when
//! creating a stream solely for its side effects. For example, the [`map`]
//! method calls a closure on each element it iterates over:
//!
//! ```
//! # #![allow(unused_must_use)]
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let v = stream::repeat(1u8).take(5);
//! v.map(|x| println!("{}", x));
//! #
//! # Ok(()) }) }
//! ```
//!
//! This will not print any values, as we only created a stream, rather than
//! using it. The compiler will warn us about this kind of behavior:
//!
//! ```text
//! warning: unused result that must be used: streams are lazy and
//! do nothing unless consumed
//! ```
//!
//! The idiomatic way to write a [`map`] for its side effects is to use a
//! `while let` loop instead:
//!
//! ```
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let mut v = stream::repeat(1u8).take(5);
//!
//! while let Some(x) = &v.next().await {
//! println!("{}", x);
//! }
//! #
//! # Ok(()) }) }
//! ```
//!
//! [`map`]: trait.Stream.html#method.map
//!
//! The two most common ways to evaluate a stream are to use a `while let` loop
//! like this, or using the [`collect`] method to produce a new collection.
//!
//! [`collect`]: trait.Stream.html#method.collect
//!
//! # Infinity
//!
//! Streams do not have to be finite. As an example, an repeat stream is
//! an infinite stream:
//!
//! ```
//! # use async_std::stream;
//! let numbers = stream::repeat(1u8);
//! ```
//!
//! It is common to use the [`take`] stream adapter to turn an infinite
//! stream into a finite one:
//!
//! ```
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! #
//! # use async_std::prelude::*;
//! # use async_std::stream;
//! let numbers = stream::repeat(1u8);
//! let mut five_numbers = numbers.take(5);
//!
//! while let Some(number) = five_numbers.next().await {
//! println!("{}", number);
//! }
//! #
//! # Ok(()) }) }
//! ```
//!
//! This will print the numbers `0` through `4`, each on their own line.
//!
//! Bear in mind that methods on infinite streams, even those for which a
//! result can be determined mathematically in finite time, may not terminate.
//! Specifically, methods such as [`min`], which in the general case require
//! traversing every element in the stream, are likely not to return
//! successfully for any infinite streams.
//!
//! ```ignore
//! let ones = async_std::stream::repeat(1);
//! let least = ones.min().await.unwrap(); // Oh no! An infinite loop!
//! // `ones.min()` causes an infinite loop, so we won't reach this point!
//! println!("The smallest number one is {}.", least);
//! ```
//!
//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html
//! [`take`]: trait.Stream.html#method.take
//! [`min`]: trait.Stream.html#method.min
pub use empty::{empty, Empty};
pub use from_fn::{from_fn, FromFn};

63
src/stream/stream/eq.rs Normal file
View file

@ -0,0 +1,63 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
// Lexicographically compares the elements of this `Stream` with those
// of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct EqFuture<L: Stream, R: Stream> {
#[pin]
l: Fuse<L>,
#[pin]
r: Fuse<R>,
}
}
impl<L: Stream, R: Stream> EqFuture<L, R>
where
L::Item: PartialEq<R::Item>,
{
pub(super) fn new(l: L, r: R) -> Self {
EqFuture {
l: l.fuse(),
r: r.fuse(),
}
}
}
impl<L: Stream, R: Stream> Future for EqFuture<L, R>
where
L: Stream + Sized,
R: Stream + Sized,
L::Item: PartialEq<R::Item>,
{
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
if this.l.done && this.r.done {
return Poll::Ready(true);
}
match (l_val, r_val) {
(Some(l), Some(r)) if l != r => {
return Poll::Ready(false);
}
_ => {}
}
}
}
}

View file

@ -6,8 +6,7 @@ use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
/// A `Stream` that is permanently closed once a single call to `poll` results in
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
/// A stream that yields `None` forever after the underlying stream yields `None` once.
///
/// This `struct` is created by the [`fuse`] method on [`Stream`]. See its
/// documentation for more.

View file

@ -0,0 +1,57 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MaxByFuture<S, F, T> {
#[pin]
stream: S,
compare: F,
max: Option<T>,
}
}
impl<S, F, T> MaxByFuture<S, F, T> {
pub(super) fn new(stream: S, compare: F) -> Self {
MaxByFuture {
stream,
compare,
max: None,
}
}
}
impl<S, F> Future for MaxByFuture<S, F, S::Item>
where
S: Stream,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
cx.waker().wake_by_ref();
match this.max.take() {
None => *this.max = Some(new),
Some(old) => match (this.compare)(&new, &old) {
Ordering::Greater => *this.max = Some(new),
_ => *this.max = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.max.take()),
}
}
}

60
src/stream/stream/min.rs Normal file
View file

@ -0,0 +1,60 @@
use std::marker::PhantomData;
use std::cmp::{Ordering, Ord};
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinFuture<S, F, T> {
#[pin]
stream: S,
_compare: PhantomData<F>,
min: Option<T>,
}
}
impl<S, F, T> MinFuture<S, F, T> {
pub(super) fn new(stream: S) -> Self {
Self {
stream,
_compare: PhantomData,
min: None,
}
}
}
impl<S, F> Future for MinFuture<S, F, S::Item>
where
S: Stream,
S::Item: Ord,
F: FnMut(&S::Item, &S::Item) -> Ordering,
{
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
cx.waker().wake_by_ref();
match this.min.take() {
None => *this.min = Some(new),
Some(old) => match new.cmp(&old) {
Ordering::Less => *this.min = Some(new),
_ => *this.min = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.min.take()),
}
}
}

View file

@ -0,0 +1,60 @@
use std::cmp::Ordering;
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct MinByKeyFuture<S, T, K> {
#[pin]
stream: S,
min: Option<T>,
key_by: K,
}
}
impl<S, T, K> MinByKeyFuture<S, T, K> {
pub(super) fn new(stream: S, key_by: K) -> Self {
MinByKeyFuture {
stream,
min: None,
key_by,
}
}
}
impl<S, K> Future for MinByKeyFuture<S, S::Item, K>
where
S: Stream,
K: FnMut(&S::Item) -> S::Item,
S::Item: Ord,
{
type Output = Option<S::Item>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(new) => {
let new = (this.key_by)(&new);
cx.waker().wake_by_ref();
match this.min.take() {
None => *this.min = Some(new),
Some(old) => match new.cmp(&old) {
Ordering::Less => *this.min = Some(new),
_ => *this.min = Some(old),
},
}
Poll::Pending
}
None => Poll::Ready(this.min.take()),
}
}
}

View file

@ -26,6 +26,7 @@ mod any;
mod chain;
mod cmp;
mod enumerate;
mod eq;
mod filter;
mod filter_map;
mod find;
@ -40,10 +41,15 @@ mod last;
mod le;
mod lt;
mod map;
mod max_by;
mod min;
mod min_by;
mod min_by_key;
mod ne;
mod next;
mod nth;
mod partial_cmp;
mod position;
mod scan;
mod skip;
mod skip_while;
@ -58,6 +64,7 @@ use all::AllFuture;
use any::AnyFuture;
use cmp::CmpFuture;
use enumerate::Enumerate;
use eq::EqFuture;
use filter_map::FilterMap;
use find::FindFuture;
use find_map::FindMapFuture;
@ -68,10 +75,15 @@ use gt::GtFuture;
use last::LastFuture;
use le::LeFuture;
use lt::LtFuture;
use max_by::MaxByFuture;
use min::MinFuture;
use min_by::MinByFuture;
use min_by_key::MinByKeyFuture;
use ne::NeFuture;
use next::NextFuture;
use nth::NthFuture;
use partial_cmp::PartialCmpFuture;
use position::PositionFuture;
use try_fold::TryFoldFuture;
use try_for_each::TryForEeachFuture;
@ -505,9 +517,11 @@ extension_trait! {
}
#[doc = r#"
Transforms this `Stream` into a "fused" `Stream` such that after the first time
`poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return
`Poll::Ready(None)`.
Creates a stream which ends after the first `None`.
After a stream returns `None`, future calls may or may not yield `Some(T)` again.
`fuse()` adapts an iterator, ensuring that after a `None` is given, it will always
return `None` forever.
# Examples
@ -678,6 +692,43 @@ extension_trait! {
FilterMap::new(self, f)
}
#[doc = r#"
Returns the element that gives the minimum value with respect to the
specified key function. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<i32> = vec![1, 2, -3].into_iter().collect();
let min = s.clone().min_by_key(|x| x.abs()).await;
assert_eq!(min, Some(1));
let min = VecDeque::<isize>::new().min_by_key(|x| x.abs()).await;
assert_eq!(min, None);
#
# }) }
```
"#]
fn min_by_key<K>(
self,
key_by: K,
) -> impl Future<Output = Option<Self::Item>> [MinByKeyFuture<Self, Self::Item, K>]
where
Self: Sized,
Self::Item: Ord,
K: FnMut(&Self::Item) -> Self::Item,
{
MinByKeyFuture::new(self, key_by)
}
#[doc = r#"
Returns the element that gives the minimum value with respect to the
specified comparison function. If several elements are equally minimum,
@ -717,6 +768,79 @@ extension_trait! {
MinByFuture::new(self, compare)
}
#[doc = r#"
Returns the element that gives the minimum value. If several elements are equally minimum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let min = s.clone().min().await;
assert_eq!(min, Some(1));
let min = VecDeque::<usize>::new().min().await;
assert_eq!(min, None);
#
# }) }
```
"#]
fn min<F>(
self,
) -> impl Future<Output = Option<Self::Item>> [MinFuture<Self, F, Self::Item>]
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{
MinFuture::new(self)
}
#[doc = r#"
Returns the element that gives the maximum value with respect to the
specified comparison function. If several elements are equally maximum,
the first element is returned. If the stream is empty, `None` is returned.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let max = s.clone().max_by(|x, y| x.cmp(y)).await;
assert_eq!(max, Some(3));
let max = s.max_by(|x, y| y.cmp(x)).await;
assert_eq!(max, Some(1));
let max = VecDeque::<usize>::new().max_by(|x, y| x.cmp(y)).await;
assert_eq!(max, None);
#
# }) }
```
"#]
fn max_by<F>(
self,
compare: F,
) -> impl Future<Output = Option<Self::Item>> [MaxByFuture<Self, F, Self::Item>]
where
Self: Sized,
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
{
MaxByFuture::new(self, compare)
}
#[doc = r#"
Returns the nth element of the stream.
@ -1466,6 +1590,45 @@ extension_trait! {
PartialCmpFuture::new(self, other)
}
#[doc = r#"
Searches for an element in a Stream that satisfies a predicate, returning
its index.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
let res = s.clone().position(|x| *x == 1).await;
assert_eq!(res, Some(0));
let res = s.clone().position(|x| *x == 2).await;
assert_eq!(res, Some(1));
let res = s.clone().position(|x| *x == 3).await;
assert_eq!(res, Some(2));
let res = s.clone().position(|x| *x == 4).await;
assert_eq!(res, None);
#
# }) }
```
"#]
fn position<P>(
self,
predicate: P
) -> impl Future<Output = Option<usize>> [PositionFuture<Self, P>]
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
PositionFuture::new(self, predicate)
}
#[doc = r#"
Lexicographically compares the elements of this `Stream` with those
of another using 'Ord'.
@ -1504,6 +1667,39 @@ extension_trait! {
CmpFuture::new(self, other)
}
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
not equal to those of another.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_ne: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_ne: VecDeque<isize> = vec![1,5].into_iter().collect();
assert_eq!(single.clone().ne(single.clone()).await, false);
assert_eq!(single_ne.clone().ne(single.clone()).await, true);
assert_eq!(multi.clone().ne(single_ne.clone()).await, true);
assert_eq!(multi_ne.clone().ne(multi.clone()).await, true);
#
# }) }
```
"#]
fn ne<S>(
self,
other: S
) -> impl Future<Output = bool> [NeFuture<Self, S>]
where
Self: Sized + Stream,
S: Sized + Stream,
<Self as Stream>::Item: PartialEq<S::Item>,
{
NeFuture::new(self, other)
}
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
greater than or equal to those of another.
@ -1540,6 +1736,42 @@ extension_trait! {
GeFuture::new(self, other)
}
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
equal to those of another.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use std::collections::VecDeque;
let single: VecDeque<isize> = vec![1].into_iter().collect();
let single_eq: VecDeque<isize> = vec![10].into_iter().collect();
let multi: VecDeque<isize> = vec![1,2].into_iter().collect();
let multi_eq: VecDeque<isize> = vec![1,5].into_iter().collect();
assert_eq!(single.clone().eq(single.clone()).await, true);
assert_eq!(single_eq.clone().eq(single.clone()).await, false);
assert_eq!(multi.clone().eq(single_eq.clone()).await, false);
assert_eq!(multi_eq.clone().eq(multi.clone()).await, false);
#
# }) }
```
"#]
fn eq<S>(
self,
other: S
) -> impl Future<Output = bool> [EqFuture<Self, S>]
where
Self: Sized + Stream,
S: Sized + Stream,
<Self as Stream>::Item: PartialEq<S::Item>,
{
EqFuture::new(self, other)
}
#[doc = r#"
Determines if the elements of this `Stream` are lexicographically
greater than those of another.

65
src/stream/stream/ne.rs Normal file
View file

@ -0,0 +1,65 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use super::fuse::Fuse;
use crate::future::Future;
use crate::prelude::*;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
// Lexicographically compares the elements of this `Stream` with those
// of another.
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NeFuture<L: Stream, R: Stream> {
#[pin]
l: Fuse<L>,
#[pin]
r: Fuse<R>,
}
}
impl<L: Stream, R: Stream> NeFuture<L, R>
where
L::Item: PartialEq<R::Item>,
{
pub(super) fn new(l: L, r: R) -> Self {
Self {
l: l.fuse(),
r: r.fuse(),
}
}
}
impl<L: Stream, R: Stream> Future for NeFuture<L, R>
where
L: Stream + Sized,
R: Stream + Sized,
L::Item: PartialEq<R::Item>,
{
type Output = bool;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx));
let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx));
if this.l.done || this.r.done {
return Poll::Ready(false);
}
match (l_val, r_val) {
(Some(l), Some(r)) if l == r => {
continue;
}
_ => {
return Poll::Ready(true);
}
}
}
}
}

View file

@ -0,0 +1,51 @@
use std::pin::Pin;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct PositionFuture<S, P> {
#[pin]
stream: S,
predicate: P,
index:usize,
}
}
impl<S, P> PositionFuture<S, P> {
pub(super) fn new(stream: S, predicate: P) -> Self {
PositionFuture {
stream,
predicate,
index: 0,
}
}
}
impl<S, P> Future for PositionFuture<S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
{
type Output = Option<usize>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let next = futures_core::ready!(this.stream.poll_next(cx));
match next {
Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)),
Some(_) => {
cx.waker().wake_by_ref();
*this.index += 1;
Poll::Pending
}
None => Poll::Ready(None),
}
}
}

View file

@ -4,6 +4,152 @@
//!
//! [`std::sync`]: https://doc.rust-lang.org/std/sync/index.html
//!
//! ## The need for synchronization
//!
//! async-std's sync primitives are scheduler-aware, making it possible to
//! `.await` their operations - for example the locking of a [`Mutex`].
//!
//! Conceptually, a Rust program is a series of operations which will
//! be executed on a computer. The timeline of events happening in the
//! program is consistent with the order of the operations in the code.
//!
//! Consider the following code, operating on some global static variables:
//!
//! ```rust
//! static mut A: u32 = 0;
//! static mut B: u32 = 0;
//! static mut C: u32 = 0;
//!
//! fn main() {
//! unsafe {
//! A = 3;
//! B = 4;
//! A = A + B;
//! C = B;
//! println!("{} {} {}", A, B, C);
//! C = A;
//! }
//! }
//! ```
//!
//! It appears as if some variables stored in memory are changed, an addition
//! is performed, result is stored in `A` and the variable `C` is
//! modified twice.
//!
//! When only a single thread is involved, the results are as expected:
//! the line `7 4 4` gets printed.
//!
//! As for what happens behind the scenes, when optimizations are enabled the
//! final generated machine code might look very different from the code:
//!
//! - The first store to `C` might be moved before the store to `A` or `B`,
//! _as if_ we had written `C = 4; A = 3; B = 4`.
//!
//! - Assignment of `A + B` to `A` might be removed, since the sum can be stored
//! in a temporary location until it gets printed, with the global variable
//! never getting updated.
//!
//! - The final result could be determined just by looking at the code
//! at compile time, so [constant folding] might turn the whole
//! block into a simple `println!("7 4 4")`.
//!
//! The compiler is allowed to perform any combination of these
//! optimizations, as long as the final optimized code, when executed,
//! produces the same results as the one without optimizations.
//!
//! Due to the [concurrency] involved in modern computers, assumptions
//! about the program's execution order are often wrong. Access to
//! global variables can lead to nondeterministic results, **even if**
//! compiler optimizations are disabled, and it is **still possible**
//! to introduce synchronization bugs.
//!
//! Note that thanks to Rust's safety guarantees, accessing global (static)
//! variables requires `unsafe` code, assuming we don't use any of the
//! synchronization primitives in this module.
//!
//! [constant folding]: https://en.wikipedia.org/wiki/Constant_folding
//! [concurrency]: https://en.wikipedia.org/wiki/Concurrency_(computer_science)
//!
//! ## Out-of-order execution
//!
//! Instructions can execute in a different order from the one we define, due to
//! various reasons:
//!
//! - The **compiler** reordering instructions: If the compiler can issue an
//! instruction at an earlier point, it will try to do so. For example, it
//! might hoist memory loads at the top of a code block, so that the CPU can
//! start [prefetching] the values from memory.
//!
//! In single-threaded scenarios, this can cause issues when writing
//! signal handlers or certain kinds of low-level code.
//! Use [compiler fences] to prevent this reordering.
//!
//! - A **single processor** executing instructions [out-of-order]:
//! Modern CPUs are capable of [superscalar] execution,
//! i.e., multiple instructions might be executing at the same time,
//! even though the machine code describes a sequential process.
//!
//! This kind of reordering is handled transparently by the CPU.
//!
//! - A **multiprocessor** system executing multiple hardware threads
//! at the same time: In multi-threaded scenarios, you can use two
//! kinds of primitives to deal with synchronization:
//! - [memory fences] to ensure memory accesses are made visible to
//! other CPUs in the right order.
//! - [atomic operations] to ensure simultaneous access to the same
//! memory location doesn't lead to undefined behavior.
//!
//! [prefetching]: https://en.wikipedia.org/wiki/Cache_prefetching
//! [compiler fences]: https://doc.rust-lang.org/std/sync/atomic/fn.compiler_fence.html
//! [out-of-order]: https://en.wikipedia.org/wiki/Out-of-order_execution
//! [superscalar]: https://en.wikipedia.org/wiki/Superscalar_processor
//! [memory fences]: https://doc.rust-lang.org/std/sync/atomic/fn.fence.html
//! [atomic operations]: https://doc.rust-lang.org/std/sync/atomic/index.html
//!
//! ## Higher-level synchronization objects
//!
//! Most of the low-level synchronization primitives are quite error-prone and
//! inconvenient to use, which is why async-std also exposes some
//! higher-level synchronization objects.
//!
//! These abstractions can be built out of lower-level primitives.
//! For efficiency, the sync objects in async-std are usually
//! implemented with help from the scheduler, which is
//! able to reschedule the tasks while they are blocked on acquiring
//! a lock.
//!
//! The following is an overview of the available synchronization
//! objects:
//!
//! - [`Arc`]: Atomically Reference-Counted pointer, which can be used
//! in multithreaded environments to prolong the lifetime of some
//! data until all the threads have finished using it.
//!
//! - [`Barrier`]: Ensures multiple threads will wait for each other
//! to reach a point in the program, before continuing execution all
//! together.
//!
//! - [`channel`]: Multi-producer, multi-consumer queues, used for
//! message-based communication. Can provide a lightweight
//! inter-task synchronisation mechanism, at the cost of some
//! extra memory.
//!
//! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at
//! most one task at a time is able to access some data.
//!
//! - [`RwLock`]: Provides a mutual exclusion mechanism which allows
//! multiple readers at the same time, while allowing only one
//! writer at a time. In some cases, this can be more efficient than
//! a mutex.
//!
//! [`Arc`]: crate::sync::Arc
//! [`Barrier`]: crate::sync::Barrier
//! [`Condvar`]: crate::sync::Condvar
//! [`channel`]: fn.channel.html
//! [`Mutex`]: crate::sync::Mutex
//! [`Once`]: crate::sync::Once
//! [`RwLock`]: crate::sync::RwLock
//!
//! # Examples
//!
//! Spawn a task that updates an integer protected by a mutex:

View file

@ -5,7 +5,7 @@ use std::thread;
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use crate::task::task::{JoinHandle, Tag};
use crate::utils::abort_on_panic;
@ -19,30 +19,30 @@ struct Pool {
receiver: Receiver<async_task::Task<Tag>>,
}
lazy_static! {
static ref POOL: Pool = {
for _ in 0..2 {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| abort_on_panic(|| {
static POOL: Lazy<Pool> = Lazy::new(|| {
for _ in 0..2 {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| {
abort_on_panic(|| {
for task in &POOL.receiver {
task.run();
}
}))
.expect("cannot start a thread driving blocking tasks");
}
})
})
.expect("cannot start a thread driving blocking tasks");
}
// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
// the number of buffers that work must flow through
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
};
}
// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
// the number of buffers that work must flow through
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
});
// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't

View file

@ -3,7 +3,7 @@ use std::thread;
use crossbeam_deque::{Injector, Stealer, Worker};
use kv_log_macro::trace;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use super::sleepers::Sleepers;
use super::task;
@ -111,28 +111,26 @@ impl Pool {
#[inline]
pub(crate) fn get() -> &'static Pool {
lazy_static! {
static ref POOL: Pool = {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
static POOL: Lazy<Pool> = Lazy::new(|| {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
};
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
});
&*POOL
}

View file

@ -5,7 +5,7 @@ use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use super::worker;
use crate::utils::abort_on_panic;
@ -174,9 +174,7 @@ impl<T: Send + 'static> LocalKey<T> {
fn key(&self) -> usize {
#[cold]
fn init(key: &AtomicUsize) -> usize {
lazy_static! {
static ref COUNTER: Mutex<usize> = Mutex::new(1);
}
static COUNTER: Lazy<Mutex<usize>> = Lazy::new(|| Mutex::new(1));
let mut counter = COUNTER.lock().unwrap();
let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel);