diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 653834a..dd8ec89 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,8 +59,10 @@ jobs: - uses: actions-rs/toolchain@v1 with: + profile: minimal toolchain: ${{ steps.component.outputs.toolchain }} override: true + components: rustfmt - name: setup run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index f0e735a..eda6038 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,63 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] +# [0.99.11] - 2019-10-29 + +This patch introduces `async_std::sync::channel`, a novel asynchronous port of +the ultra-fast Crossbeam channels. This has been one of the most anticipated +features for async-std, and we're excited to be providing a first version of +this! + +In addition to channels, this patch has the regular list of new methods, types, +and doc fixes. + +## Examples + +__Send and receive items from a channel__ +```rust +// Create a bounded channel with a max-size of 1 +let (s, r) = channel(1); + +// This call returns immediately because there is enough space in the channel. +s.send(1).await; + +task::spawn(async move { + // This call blocks the current task because the channel is full. + // It will be able to complete only after the first message is received. + s.send(2).await; +}); + +// Receive items from the channel +task::sleep(Duration::from_secs(1)).await; +assert_eq!(r.recv().await, Some(1)); +assert_eq!(r.recv().await, Some(2)); +``` + +## Added +- Added `Future::delay` as "unstable" +- Added `Stream::flat_map` as "unstable" +- Added `Stream::flatten` as "unstable" +- Added `Stream::product` as "unstable" +- Added `Stream::sum` as "unstable" +- Added `Stream::min_by_key` +- Added `Stream::max_by` +- Added `Stream::timeout` as "unstable" +- Added `sync::channel` as "unstable". +- Added doc links from instantiated structs to the methods that create them. +- Implemented `Extend` + `FromStream` for `PathBuf`. + +## Changed +- Fixed an issue with `block_on` so it works even when nested. +- Fixed issues with our Clippy check on CI. +- Replaced our uses of `cfg_if` with our own macros, simplifying the codebase. +- Updated the homepage link in `Cargo.toml` to point to [async.rs](https://async.rs). +- Updated the module-level documentation for `stream` and `sync`. +- Various typos and grammar fixes. +- Removed redundant file flushes, improving the performance of `File` operations + +## Removed +Nothing was removed in this release. + # [0.99.10] - 2019-10-16 This patch stabilizes several core concurrency macros, introduces async versions @@ -281,7 +338,8 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.10...HEAD +[Unreleased]: https://github.com/async-rs/async-std/compare/v0.99.11...HEAD +[0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.10...v0.99.11 [0.99.10]: https://github.com/async-rs/async-std/compare/v0.99.9...v0.99.10 [0.99.9]: https://github.com/async-rs/async-std/compare/v0.99.8...v0.99.9 [0.99.8]: https://github.com/async-rs/async-std/compare/v0.99.7...v0.99.8 diff --git a/Cargo.toml b/Cargo.toml index ad88730..dcf2c7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "0.99.10" +version = "0.99.11" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", @@ -33,12 +33,12 @@ crossbeam-utils = "0.6.6" futures-core-preview = "=0.3.0-alpha.19" futures-io-preview = "=0.3.0-alpha.19" futures-timer = "1.0.2" -lazy_static = "1.4.0" log = { version = "0.4.8", features = ["kv_unstable"] } memchr = "2.2.1" mio = "0.6.19" mio-uds = "0.6.7" num_cpus = "1.10.1" +once_cell = "1.2.0" pin-utils = "0.1.0-alpha.4" slab = "0.4.2" kv-log-macro = "1.0.4" diff --git a/README.md b/README.md index 7aeaed8..9af20a3 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ syntax. ## Features - __Modern:__ Built from the ground up for `std::future` and `async/await` with - blazing fast compilation times. + blazing fast compilation time. - __Fast:__ Our robust allocator and threadpool designs provide ultra-high throughput with predictably low latency. - __Intuitive:__ Complete parity with the stdlib means you only need to learn diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 806acdb..40e0abb 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -1,8 +1,8 @@ use std::fmt; use std::sync::{Arc, Mutex}; -use lazy_static::lazy_static; use mio::{self, Evented}; +use once_cell::sync::Lazy; use slab::Slab; use crate::io; @@ -100,25 +100,23 @@ impl Reactor { // } } -lazy_static! { - /// The state of the global networking driver. - static ref REACTOR: Reactor = { - // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O - // handles. - std::thread::Builder::new() - .name("async-net-driver".to_string()) - .spawn(move || { - // If the driver thread panics, there's not much we can do. It is not a - // recoverable error and there is no place to propagate it into so we just abort. - abort_on_panic(|| { - main_loop().expect("async networking thread has panicked"); - }) +/// The state of the global networking driver. +static REACTOR: Lazy = Lazy::new(|| { + // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O + // handles. + std::thread::Builder::new() + .name("async-net-driver".to_string()) + .spawn(move || { + // If the driver thread panics, there's not much we can do. It is not a + // recoverable error and there is no place to propagate it into so we just abort. + abort_on_panic(|| { + main_loop().expect("async networking thread has panicked"); }) - .expect("cannot start a thread driving blocking tasks"); + }) + .expect("cannot start a thread driving blocking tasks"); - Reactor::new().expect("cannot initialize reactor") - }; -} + Reactor::new().expect("cannot initialize reactor") +}); /// Waits on the poller for new events and wakes up tasks blocked on I/O handles. fn main_loop() -> io::Result<()> { diff --git a/src/net/mod.rs b/src/net/mod.rs index b3ae287..29e4309 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,14 +1,42 @@ //! Networking primitives for TCP/UDP communication. //! -//! For OS-specific networking primitives like Unix domain sockets, refer to the [`async_std::os`] -//! module. +//! This module provides networking functionality for the Transmission Control and User +//! Datagram Protocols, as well as types for IP and socket addresses. //! //! This module is an async version of [`std::net`]. //! +//! # Organization +//! +//! * [`TcpListener`] and [`TcpStream`] provide functionality for communication over TCP +//! * [`UdpSocket`] provides functionality for communication over UDP +//! * [`IpAddr`] represents IP addresses of either IPv4 or IPv6; [`Ipv4Addr`] and +//! [`Ipv6Addr`] are respectively IPv4 and IPv6 addresses +//! * [`SocketAddr`] represents socket addresses of either IPv4 or IPv6; [`SocketAddrV4`] +//! and [`SocketAddrV6`] are respectively IPv4 and IPv6 socket addresses +//! * [`ToSocketAddrs`] is a trait that used for generic address resolution when interacting +//! with networking objects like [`TcpListener`], [`TcpStream`] or [`UdpSocket`] +//! * Other types are return or parameter types for various methods in this module +//! +//! [`IpAddr`]: enum.IpAddr.html +//! [`Ipv4Addr`]: struct.Ipv4Addr.html +//! [`Ipv6Addr`]: struct.Ipv6Addr.html +//! [`SocketAddr`]: enum.SocketAddr.html +//! [`SocketAddrV4`]: struct.SocketAddrV4.html +//! [`SocketAddrV6`]: struct.SocketAddrV6.html +//! [`TcpListener`]: struct.TcpListener.html +//! [`TcpStream`]: struct.TcpStream.html +//! [`ToSocketAddrs`]: trait.ToSocketAddrs.html +//! [`UdpSocket`]: struct.UdpSocket.html +//! +//! # Platform-specific extensions +//! +//! APIs such as Unix domain sockets are available on certain platforms only. You can find +//! platform-specific extensions in the [`async_std::os`] module. +//! //! [`async_std::os`]: ../os/index.html //! [`std::net`]: https://doc.rust-lang.org/std/net/index.html //! -//! ## Examples +//! # Examples //! //! A simple UDP echo server: //! diff --git a/src/stream/mod.rs b/src/stream/mod.rs index e796510..6db0dbe 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -2,24 +2,303 @@ //! //! This module is an async version of [`std::iter`]. //! -//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html +//! If you've found yourself with an asynchronous collection of some kind, +//! and needed to perform an operation on the elements of said collection, +//! you'll quickly run into 'streams'. Streams are heavily used in idiomatic +//! asynchronous Rust code, so it's worth becoming familiar with them. //! -//! # Examples +//! Before explaining more, let's talk about how this module is structured: +//! +//! # Organization +//! +//! This module is largely organized by type: +//! +//! * [Traits] are the core portion: these traits define what kind of streams +//! exist and what you can do with them. The methods of these traits are worth +//! putting some extra study time into. +//! * [Functions] provide some helpful ways to create some basic streams. +//! * [Structs] are often the return types of the various methods on this +//! module's traits. You'll usually want to look at the method that creates +//! the `struct`, rather than the `struct` itself. For more detail about why, +//! see '[Implementing Stream](#implementing-stream)'. +//! +//! [Traits]: #traits +//! [Functions]: #functions +//! [Structs]: #structs +//! +//! That's it! Let's dig into streams. +//! +//! # Stream +//! +//! The heart and soul of this module is the [`Stream`] trait. The core of +//! [`Stream`] looks like this: //! //! ``` -//! # async_std::task::block_on(async { +//! # use async_std::task::{Context, Poll}; +//! # use std::pin::Pin; +//! trait Stream { +//! type Item; +//! fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +//! } +//! ``` +//! +//! A stream has a method, [`next`], which when called, returns an +//! [`Poll`]<[`Option`]`>`. [`next`] will return `Ready(Some(Item))` +//! as long as there are elements, and once they've all been exhausted, will +//! return `None` to indicate that iteration is finished. If we're waiting on +//! something asynchronous to resolve `Pending` is returned. +//! +//! Individual streams may choose to resume iteration, and so calling +//! [`next`] again may or may not eventually start returning `Ready(Some(Item))` +//! again at some point. +//! +//! [`Stream`]'s full definition includes a number of other methods as well, +//! but they are default methods, built on top of [`next`], and so you get +//! them for free. +//! +//! Streams are also composable, and it's common to chain them together to do +//! more complex forms of processing. See the [Adapters](#adapters) section +//! below for more details. +//! +//! [`Poll`]: ../task/enum.Poll.html +//! [`Stream`]: trait.Stream.html +//! [`next`]: trait.Stream.html#tymethod.next +//! [`Option`]: ../../std/option/enum.Option.html +//! +//! # The three forms of streaming +//! +//! There are three common methods which can create streams from a collection: +//! +//! * `stream()`, which iterates over `&T`. +//! * `stream_mut()`, which iterates over `&mut T`. +//! * `into_stream()`, which iterates over `T`. +//! +//! Various things in async-std may implement one or more of the +//! three, where appropriate. +//! +//! # Implementing Stream +//! +//! Creating a stream of your own involves two steps: creating a `struct` to +//! hold the stream's state, and then `impl`ementing [`Stream`] for that +//! `struct`. This is why there are so many `struct`s in this module: there is +//! one for each stream and iterator adapter. +//! +//! Let's make a stream named `Counter` which counts from `1` to `5`: +//! +//! ``` +//! # use async_std::prelude::*; +//! # use async_std::task::{Context, Poll}; +//! # use std::pin::Pin; +//! // First, the struct: +//! +//! /// A stream which counts from one to five +//! struct Counter { +//! count: usize, +//! } +//! +//! // we want our count to start at one, so let's add a new() method to help. +//! // This isn't strictly necessary, but is convenient. Note that we start +//! // `count` at zero, we'll see why in `next()`'s implementation below. +//! impl Counter { +//! fn new() -> Counter { +//! Counter { count: 0 } +//! } +//! } +//! +//! // Then, we implement `Stream` for our `Counter`: +//! +//! impl Stream for Counter { +//! // we will be counting with usize +//! type Item = usize; +//! +//! // poll_next() is the only required method +//! fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +//! // Increment our count. This is why we started at zero. +//! self.count += 1; +//! +//! // Check to see if we've finished counting or not. +//! if self.count < 6 { +//! Poll::Ready(Some(self.count)) +//! } else { +//! Poll::Ready(None) +//! } +//! } +//! } +//! +//! // And now we can use it! +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # -//! use async_std::prelude::*; -//! use async_std::stream; +//! let mut counter = Counter::new(); //! -//! let mut s = stream::repeat(9).take(3); +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); //! -//! while let Some(v) = s.next().await { -//! assert_eq!(v, 9); +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! +//! let x = counter.next().await.unwrap(); +//! println!("{}", x); +//! # +//! # Ok(()) }) } +//! ``` +//! +//! This will print `1` through `5`, each on their own line. +//! +//! Calling `next().await` this way gets repetitive. Rust has a construct which +//! can call `next()` on your stream, until it reaches `None`. Let's go over +//! that next. +//! +//! # while let Loops and IntoStream +//! +//! Rust's `while let` loop syntax is an idiomatic way to iterate over streams. Here's a basic +//! example of `while let`: +//! +//! ``` +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let mut values = stream::repeat(1u8).take(5); +//! +//! while let Some(x) = values.next().await { +//! println!("{}", x); //! } //! # -//! # }) +//! # Ok(()) }) } //! ``` +//! +//! This will print the numbers one through five, each on their own line. But +//! you'll notice something here: we never called anything on our vector to +//! produce a stream. What gives? +//! +//! There's a trait in the standard library for converting something into an +//! stream: [`IntoStream`]. This trait has one method, [`into_stream], +//! which converts the thing implementing [`IntoStream`] into a stream. +//! +//! Unlike `std::iter::IntoIterator`, `IntoStream` does not have compiler +//! support yet. This means that automatic conversions like with `for` loops +//! doesn't occur yet, and `into_stream` will always have to be called manually. +//! +//! [`IntoStream`]: trait.IntoStream.html +//! [`into_stream`]: trait.IntoStream.html#tymethod.into_stream +//! +//! # Adapters +//! +//! Functions which take an [`Stream`] and return another [`Stream`] are +//! often called 'stream adapters', as they are a form of the 'adapter +//! pattern'. +//! +//! Common stream adapters include [`map`], [`take`], and [`filter`]. +//! For more, see their documentation. +//! +//! [`map`]: trait.Stream.html#method.map +//! [`take`]: trait.Stream.html#method.take +//! [`filter`]: trait.Stream.html#method.filter +//! +//! # Laziness +//! +//! Streams (and stream [adapters](#adapters)) are *lazy*. This means that +//! just creating a stream doesn't _do_ a whole lot. Nothing really happens +//! until you call [`next`]. This is sometimes a source of confusion when +//! creating a stream solely for its side effects. For example, the [`map`] +//! method calls a closure on each element it iterates over: +//! +//! ``` +//! # #![allow(unused_must_use)] +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let v = stream::repeat(1u8).take(5); +//! v.map(|x| println!("{}", x)); +//! # +//! # Ok(()) }) } +//! ``` +//! +//! This will not print any values, as we only created a stream, rather than +//! using it. The compiler will warn us about this kind of behavior: +//! +//! ```text +//! warning: unused result that must be used: streams are lazy and +//! do nothing unless consumed +//! ``` +//! +//! The idiomatic way to write a [`map`] for its side effects is to use a +//! `while let` loop instead: +//! +//! ``` +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let mut v = stream::repeat(1u8).take(5); +//! +//! while let Some(x) = &v.next().await { +//! println!("{}", x); +//! } +//! # +//! # Ok(()) }) } +//! ``` +//! +//! [`map`]: trait.Stream.html#method.map +//! +//! The two most common ways to evaluate a stream are to use a `while let` loop +//! like this, or using the [`collect`] method to produce a new collection. +//! +//! [`collect`]: trait.Stream.html#method.collect +//! +//! # Infinity +//! +//! Streams do not have to be finite. As an example, an repeat stream is +//! an infinite stream: +//! +//! ``` +//! # use async_std::stream; +//! let numbers = stream::repeat(1u8); +//! ``` +//! +//! It is common to use the [`take`] stream adapter to turn an infinite +//! stream into a finite one: +//! +//! ``` +//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { +//! # +//! # use async_std::prelude::*; +//! # use async_std::stream; +//! let numbers = stream::repeat(1u8); +//! let mut five_numbers = numbers.take(5); +//! +//! while let Some(number) = five_numbers.next().await { +//! println!("{}", number); +//! } +//! # +//! # Ok(()) }) } +//! ``` +//! +//! This will print the numbers `0` through `4`, each on their own line. +//! +//! Bear in mind that methods on infinite streams, even those for which a +//! result can be determined mathematically in finite time, may not terminate. +//! Specifically, methods such as [`min`], which in the general case require +//! traversing every element in the stream, are likely not to return +//! successfully for any infinite streams. +//! +//! ```ignore +//! let ones = async_std::stream::repeat(1); +//! let least = ones.min().await.unwrap(); // Oh no! An infinite loop! +//! // `ones.min()` causes an infinite loop, so we won't reach this point! +//! println!("The smallest number one is {}.", least); +//! ``` +//! +//! [`std::iter`]: https://doc.rust-lang.org/std/iter/index.html +//! [`take`]: trait.Stream.html#method.take +//! [`min`]: trait.Stream.html#method.min pub use empty::{empty, Empty}; pub use from_fn::{from_fn, FromFn}; diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs new file mode 100644 index 0000000..5343c1a --- /dev/null +++ b/src/stream/stream/eq.rs @@ -0,0 +1,63 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + // Lexicographically compares the elements of this `Stream` with those + // of another. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct EqFuture { + #[pin] + l: Fuse, + #[pin] + r: Fuse, + } +} + +impl EqFuture +where + L::Item: PartialEq, +{ + pub(super) fn new(l: L, r: R) -> Self { + EqFuture { + l: l.fuse(), + r: r.fuse(), + } + } +} + +impl Future for EqFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx)); + let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx)); + + if this.l.done && this.r.done { + return Poll::Ready(true); + } + + match (l_val, r_val) { + (Some(l), Some(r)) if l != r => { + return Poll::Ready(false); + } + _ => {} + } + } + } +} diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 39af9cb..6297bef 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -6,8 +6,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { - /// A `Stream` that is permanently closed once a single call to `poll` results in - /// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. + /// A stream that yields `None` forever after the underlying stream yields `None` once. /// /// This `struct` is created by the [`fuse`] method on [`Stream`]. See its /// documentation for more. diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs new file mode 100644 index 0000000..a626b28 --- /dev/null +++ b/src/stream/stream/max_by.rs @@ -0,0 +1,57 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MaxByFuture { + #[pin] + stream: S, + compare: F, + max: Option, + } +} + +impl MaxByFuture { + pub(super) fn new(stream: S, compare: F) -> Self { + MaxByFuture { + stream, + compare, + max: None, + } + } +} + +impl Future for MaxByFuture +where + S: Stream, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match this.max.take() { + None => *this.max = Some(new), + Some(old) => match (this.compare)(&new, &old) { + Ordering::Greater => *this.max = Some(new), + _ => *this.max = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(this.max.take()), + } + } +} diff --git a/src/stream/stream/min.rs b/src/stream/stream/min.rs new file mode 100644 index 0000000..1ab5606 --- /dev/null +++ b/src/stream/stream/min.rs @@ -0,0 +1,60 @@ +use std::marker::PhantomData; +use std::cmp::{Ordering, Ord}; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MinFuture { + #[pin] + stream: S, + _compare: PhantomData, + min: Option, + } +} + +impl MinFuture { + pub(super) fn new(stream: S) -> Self { + Self { + stream, + _compare: PhantomData, + min: None, + } + } +} + +impl Future for MinFuture +where + S: Stream, + S::Item: Ord, + F: FnMut(&S::Item, &S::Item) -> Ordering, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(new) => { + cx.waker().wake_by_ref(); + match this.min.take() { + None => *this.min = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Less => *this.min = Some(new), + _ => *this.min = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(this.min.take()), + } + } +} diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs new file mode 100644 index 0000000..6557f22 --- /dev/null +++ b/src/stream/stream/min_by_key.rs @@ -0,0 +1,60 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct MinByKeyFuture { + #[pin] + stream: S, + min: Option, + key_by: K, + } +} + +impl MinByKeyFuture { + pub(super) fn new(stream: S, key_by: K) -> Self { + MinByKeyFuture { + stream, + min: None, + key_by, + } + } +} + +impl Future for MinByKeyFuture +where + S: Stream, + K: FnMut(&S::Item) -> S::Item, + S::Item: Ord, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(new) => { + let new = (this.key_by)(&new); + cx.waker().wake_by_ref(); + match this.min.take() { + None => *this.min = Some(new), + + Some(old) => match new.cmp(&old) { + Ordering::Less => *this.min = Some(new), + _ => *this.min = Some(old), + }, + } + Poll::Pending + } + None => Poll::Ready(this.min.take()), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e1a51bf..71139f4 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -26,6 +26,7 @@ mod any; mod chain; mod cmp; mod enumerate; +mod eq; mod filter; mod filter_map; mod find; @@ -40,10 +41,15 @@ mod last; mod le; mod lt; mod map; +mod max_by; +mod min; mod min_by; +mod min_by_key; +mod ne; mod next; mod nth; mod partial_cmp; +mod position; mod scan; mod skip; mod skip_while; @@ -58,6 +64,7 @@ use all::AllFuture; use any::AnyFuture; use cmp::CmpFuture; use enumerate::Enumerate; +use eq::EqFuture; use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; @@ -68,10 +75,15 @@ use gt::GtFuture; use last::LastFuture; use le::LeFuture; use lt::LtFuture; +use max_by::MaxByFuture; +use min::MinFuture; use min_by::MinByFuture; +use min_by_key::MinByKeyFuture; +use ne::NeFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; +use position::PositionFuture; use try_fold::TryFoldFuture; use try_for_each::TryForEeachFuture; @@ -505,9 +517,11 @@ extension_trait! { } #[doc = r#" - Transforms this `Stream` into a "fused" `Stream` such that after the first time - `poll` returns `Poll::Ready(None)`, all future calls to `poll` will also return - `Poll::Ready(None)`. + Creates a stream which ends after the first `None`. + + After a stream returns `None`, future calls may or may not yield `Some(T)` again. + `fuse()` adapts an iterator, ensuring that after a `None` is given, it will always + return `None` forever. # Examples @@ -678,6 +692,43 @@ extension_trait! { FilterMap::new(self, f) } + #[doc = r#" + Returns the element that gives the minimum value with respect to the + specified key function. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, -3].into_iter().collect(); + + let min = s.clone().min_by_key(|x| x.abs()).await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().min_by_key(|x| x.abs()).await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn min_by_key( + self, + key_by: K, + ) -> impl Future> [MinByKeyFuture] + where + Self: Sized, + Self::Item: Ord, + K: FnMut(&Self::Item) -> Self::Item, + { + MinByKeyFuture::new(self, key_by) + } + #[doc = r#" Returns the element that gives the minimum value with respect to the specified comparison function. If several elements are equally minimum, @@ -717,6 +768,79 @@ extension_trait! { MinByFuture::new(self, compare) } + #[doc = r#" + Returns the element that gives the minimum value. If several elements are equally minimum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let min = s.clone().min().await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().min().await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn min( + self, + ) -> impl Future> [MinFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MinFuture::new(self) + } + + #[doc = r#" + Returns the element that gives the maximum value with respect to the + specified comparison function. If several elements are equally maximum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let max = s.clone().max_by(|x, y| x.cmp(y)).await; + assert_eq!(max, Some(3)); + + let max = s.max_by(|x, y| y.cmp(x)).await; + assert_eq!(max, Some(1)); + + let max = VecDeque::::new().max_by(|x, y| x.cmp(y)).await; + assert_eq!(max, None); + # + # }) } + ``` + "#] + fn max_by( + self, + compare: F, + ) -> impl Future> [MaxByFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MaxByFuture::new(self, compare) + } + #[doc = r#" Returns the nth element of the stream. @@ -1466,6 +1590,45 @@ extension_trait! { PartialCmpFuture::new(self, other) } + #[doc = r#" + Searches for an element in a Stream that satisfies a predicate, returning + its index. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let res = s.clone().position(|x| *x == 1).await; + assert_eq!(res, Some(0)); + + let res = s.clone().position(|x| *x == 2).await; + assert_eq!(res, Some(1)); + + let res = s.clone().position(|x| *x == 3).await; + assert_eq!(res, Some(2)); + + let res = s.clone().position(|x| *x == 4).await; + assert_eq!(res, None); + # + # }) } + ``` + "#] + fn position

( + self, + predicate: P + ) -> impl Future> [PositionFuture] + where + Self: Sized, + P: FnMut(&Self::Item) -> bool, + { + PositionFuture::new(self, predicate) + } + #[doc = r#" Lexicographically compares the elements of this `Stream` with those of another using 'Ord'. @@ -1504,6 +1667,39 @@ extension_trait! { CmpFuture::new(self, other) } + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + not equal to those of another. + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + let single: VecDeque = vec![1].into_iter().collect(); + let single_ne: VecDeque = vec![10].into_iter().collect(); + let multi: VecDeque = vec![1,2].into_iter().collect(); + let multi_ne: VecDeque = vec![1,5].into_iter().collect(); + assert_eq!(single.clone().ne(single.clone()).await, false); + assert_eq!(single_ne.clone().ne(single.clone()).await, true); + assert_eq!(multi.clone().ne(single_ne.clone()).await, true); + assert_eq!(multi_ne.clone().ne(multi.clone()).await, true); + # + # }) } + ``` + "#] + fn ne( + self, + other: S + ) -> impl Future [NeFuture] + where + Self: Sized + Stream, + S: Sized + Stream, + ::Item: PartialEq, + { + NeFuture::new(self, other) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than or equal to those of another. @@ -1540,6 +1736,42 @@ extension_trait! { GeFuture::new(self, other) } + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + equal to those of another. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let single: VecDeque = vec![1].into_iter().collect(); + let single_eq: VecDeque = vec![10].into_iter().collect(); + let multi: VecDeque = vec![1,2].into_iter().collect(); + let multi_eq: VecDeque = vec![1,5].into_iter().collect(); + assert_eq!(single.clone().eq(single.clone()).await, true); + assert_eq!(single_eq.clone().eq(single.clone()).await, false); + assert_eq!(multi.clone().eq(single_eq.clone()).await, false); + assert_eq!(multi_eq.clone().eq(multi.clone()).await, false); + # + # }) } + ``` + "#] + fn eq( + self, + other: S + ) -> impl Future [EqFuture] + where + Self: Sized + Stream, + S: Sized + Stream, + ::Item: PartialEq, + { + EqFuture::new(self, other) + } + #[doc = r#" Determines if the elements of this `Stream` are lexicographically greater than those of another. diff --git a/src/stream/stream/ne.rs b/src/stream/stream/ne.rs new file mode 100644 index 0000000..ffeaca8 --- /dev/null +++ b/src/stream/stream/ne.rs @@ -0,0 +1,65 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + // Lexicographically compares the elements of this `Stream` with those + // of another. + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct NeFuture { + #[pin] + l: Fuse, + #[pin] + r: Fuse, + } +} + +impl NeFuture +where + L::Item: PartialEq, +{ + pub(super) fn new(l: L, r: R) -> Self { + Self { + l: l.fuse(), + r: r.fuse(), + } + } +} + +impl Future for NeFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialEq, +{ + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let l_val = futures_core::ready!(this.l.as_mut().poll_next(cx)); + let r_val = futures_core::ready!(this.r.as_mut().poll_next(cx)); + + if this.l.done || this.r.done { + return Poll::Ready(false); + } + + match (l_val, r_val) { + (Some(l), Some(r)) if l == r => { + continue; + } + _ => { + return Poll::Ready(true); + } + } + } + } +} diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs new file mode 100644 index 0000000..3cd5b84 --- /dev/null +++ b/src/stream/stream/position.rs @@ -0,0 +1,51 @@ +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct PositionFuture { + #[pin] + stream: S, + predicate: P, + index:usize, + } +} + +impl PositionFuture { + pub(super) fn new(stream: S, predicate: P) -> Self { + PositionFuture { + stream, + predicate, + index: 0, + } + } +} + +impl Future for PositionFuture +where + S: Stream, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let next = futures_core::ready!(this.stream.poll_next(cx)); + + match next { + Some(v) if (this.predicate)(&v) => Poll::Ready(Some(*this.index)), + Some(_) => { + cx.waker().wake_by_ref(); + *this.index += 1; + Poll::Pending + } + None => Poll::Ready(None), + } + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 3ad2776..d10e6bd 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -4,6 +4,152 @@ //! //! [`std::sync`]: https://doc.rust-lang.org/std/sync/index.html //! +//! ## The need for synchronization +//! +//! async-std's sync primitives are scheduler-aware, making it possible to +//! `.await` their operations - for example the locking of a [`Mutex`]. +//! +//! Conceptually, a Rust program is a series of operations which will +//! be executed on a computer. The timeline of events happening in the +//! program is consistent with the order of the operations in the code. +//! +//! Consider the following code, operating on some global static variables: +//! +//! ```rust +//! static mut A: u32 = 0; +//! static mut B: u32 = 0; +//! static mut C: u32 = 0; +//! +//! fn main() { +//! unsafe { +//! A = 3; +//! B = 4; +//! A = A + B; +//! C = B; +//! println!("{} {} {}", A, B, C); +//! C = A; +//! } +//! } +//! ``` +//! +//! It appears as if some variables stored in memory are changed, an addition +//! is performed, result is stored in `A` and the variable `C` is +//! modified twice. +//! +//! When only a single thread is involved, the results are as expected: +//! the line `7 4 4` gets printed. +//! +//! As for what happens behind the scenes, when optimizations are enabled the +//! final generated machine code might look very different from the code: +//! +//! - The first store to `C` might be moved before the store to `A` or `B`, +//! _as if_ we had written `C = 4; A = 3; B = 4`. +//! +//! - Assignment of `A + B` to `A` might be removed, since the sum can be stored +//! in a temporary location until it gets printed, with the global variable +//! never getting updated. +//! +//! - The final result could be determined just by looking at the code +//! at compile time, so [constant folding] might turn the whole +//! block into a simple `println!("7 4 4")`. +//! +//! The compiler is allowed to perform any combination of these +//! optimizations, as long as the final optimized code, when executed, +//! produces the same results as the one without optimizations. +//! +//! Due to the [concurrency] involved in modern computers, assumptions +//! about the program's execution order are often wrong. Access to +//! global variables can lead to nondeterministic results, **even if** +//! compiler optimizations are disabled, and it is **still possible** +//! to introduce synchronization bugs. +//! +//! Note that thanks to Rust's safety guarantees, accessing global (static) +//! variables requires `unsafe` code, assuming we don't use any of the +//! synchronization primitives in this module. +//! +//! [constant folding]: https://en.wikipedia.org/wiki/Constant_folding +//! [concurrency]: https://en.wikipedia.org/wiki/Concurrency_(computer_science) +//! +//! ## Out-of-order execution +//! +//! Instructions can execute in a different order from the one we define, due to +//! various reasons: +//! +//! - The **compiler** reordering instructions: If the compiler can issue an +//! instruction at an earlier point, it will try to do so. For example, it +//! might hoist memory loads at the top of a code block, so that the CPU can +//! start [prefetching] the values from memory. +//! +//! In single-threaded scenarios, this can cause issues when writing +//! signal handlers or certain kinds of low-level code. +//! Use [compiler fences] to prevent this reordering. +//! +//! - A **single processor** executing instructions [out-of-order]: +//! Modern CPUs are capable of [superscalar] execution, +//! i.e., multiple instructions might be executing at the same time, +//! even though the machine code describes a sequential process. +//! +//! This kind of reordering is handled transparently by the CPU. +//! +//! - A **multiprocessor** system executing multiple hardware threads +//! at the same time: In multi-threaded scenarios, you can use two +//! kinds of primitives to deal with synchronization: +//! - [memory fences] to ensure memory accesses are made visible to +//! other CPUs in the right order. +//! - [atomic operations] to ensure simultaneous access to the same +//! memory location doesn't lead to undefined behavior. +//! +//! [prefetching]: https://en.wikipedia.org/wiki/Cache_prefetching +//! [compiler fences]: https://doc.rust-lang.org/std/sync/atomic/fn.compiler_fence.html +//! [out-of-order]: https://en.wikipedia.org/wiki/Out-of-order_execution +//! [superscalar]: https://en.wikipedia.org/wiki/Superscalar_processor +//! [memory fences]: https://doc.rust-lang.org/std/sync/atomic/fn.fence.html +//! [atomic operations]: https://doc.rust-lang.org/std/sync/atomic/index.html +//! +//! ## Higher-level synchronization objects +//! +//! Most of the low-level synchronization primitives are quite error-prone and +//! inconvenient to use, which is why async-std also exposes some +//! higher-level synchronization objects. +//! +//! These abstractions can be built out of lower-level primitives. +//! For efficiency, the sync objects in async-std are usually +//! implemented with help from the scheduler, which is +//! able to reschedule the tasks while they are blocked on acquiring +//! a lock. +//! +//! The following is an overview of the available synchronization +//! objects: +//! +//! - [`Arc`]: Atomically Reference-Counted pointer, which can be used +//! in multithreaded environments to prolong the lifetime of some +//! data until all the threads have finished using it. +//! +//! - [`Barrier`]: Ensures multiple threads will wait for each other +//! to reach a point in the program, before continuing execution all +//! together. +//! +//! - [`channel`]: Multi-producer, multi-consumer queues, used for +//! message-based communication. Can provide a lightweight +//! inter-task synchronisation mechanism, at the cost of some +//! extra memory. +//! +//! - [`Mutex`]: Mutual Exclusion mechanism, which ensures that at +//! most one task at a time is able to access some data. +//! +//! - [`RwLock`]: Provides a mutual exclusion mechanism which allows +//! multiple readers at the same time, while allowing only one +//! writer at a time. In some cases, this can be more efficient than +//! a mutex. +//! +//! [`Arc`]: crate::sync::Arc +//! [`Barrier`]: crate::sync::Barrier +//! [`Condvar`]: crate::sync::Condvar +//! [`channel`]: fn.channel.html +//! [`Mutex`]: crate::sync::Mutex +//! [`Once`]: crate::sync::Once +//! [`RwLock`]: crate::sync::RwLock +//! //! # Examples //! //! Spawn a task that updates an integer protected by a mutex: diff --git a/src/task/blocking.rs b/src/task/blocking.rs index 3216012..1f1a222 100644 --- a/src/task/blocking.rs +++ b/src/task/blocking.rs @@ -5,7 +5,7 @@ use std::thread; use std::time::Duration; use crossbeam_channel::{bounded, Receiver, Sender}; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use crate::task::task::{JoinHandle, Tag}; use crate::utils::abort_on_panic; @@ -19,30 +19,30 @@ struct Pool { receiver: Receiver>, } -lazy_static! { - static ref POOL: Pool = { - for _ in 0..2 { - thread::Builder::new() - .name("async-blocking-driver".to_string()) - .spawn(|| abort_on_panic(|| { +static POOL: Lazy = Lazy::new(|| { + for _ in 0..2 { + thread::Builder::new() + .name("async-blocking-driver".to_string()) + .spawn(|| { + abort_on_panic(|| { for task in &POOL.receiver { task.run(); } - })) - .expect("cannot start a thread driving blocking tasks"); - } + }) + }) + .expect("cannot start a thread driving blocking tasks"); + } - // We want to use an unbuffered channel here to help - // us drive our dynamic control. In effect, the - // kernel's scheduler becomes the queue, reducing - // the number of buffers that work must flow through - // before being acted on by a core. This helps keep - // latency snappy in the overall async system by - // reducing bufferbloat. - let (sender, receiver) = bounded(0); - Pool { sender, receiver } - }; -} + // We want to use an unbuffered channel here to help + // us drive our dynamic control. In effect, the + // kernel's scheduler becomes the queue, reducing + // the number of buffers that work must flow through + // before being acted on by a core. This helps keep + // latency snappy in the overall async system by + // reducing bufferbloat. + let (sender, receiver) = bounded(0); + Pool { sender, receiver } +}); // Create up to MAX_THREADS dynamic blocking task worker threads. // Dynamic threads will terminate themselves if they don't diff --git a/src/task/pool.rs b/src/task/pool.rs index bfaa17d..3fd7047 100644 --- a/src/task/pool.rs +++ b/src/task/pool.rs @@ -3,7 +3,7 @@ use std::thread; use crossbeam_deque::{Injector, Stealer, Worker}; use kv_log_macro::trace; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use super::sleepers::Sleepers; use super::task; @@ -111,28 +111,26 @@ impl Pool { #[inline] pub(crate) fn get() -> &'static Pool { - lazy_static! { - static ref POOL: Pool = { - let num_threads = num_cpus::get().max(1); - let mut stealers = Vec::new(); + static POOL: Lazy = Lazy::new(|| { + let num_threads = num_cpus::get().max(1); + let mut stealers = Vec::new(); - // Spawn worker threads. - for _ in 0..num_threads { - let worker = Worker::new_fifo(); - stealers.push(worker.stealer()); + // Spawn worker threads. + for _ in 0..num_threads { + let worker = Worker::new_fifo(); + stealers.push(worker.stealer()); - thread::Builder::new() - .name("async-task-driver".to_string()) - .spawn(|| abort_on_panic(|| worker::main_loop(worker))) - .expect("cannot start a thread driving tasks"); - } + thread::Builder::new() + .name("async-task-driver".to_string()) + .spawn(|| abort_on_panic(|| worker::main_loop(worker))) + .expect("cannot start a thread driving tasks"); + } - Pool { - injector: Injector::new(), - stealers, - sleepers: Sleepers::new(), - } - }; - } + Pool { + injector: Injector::new(), + stealers, + sleepers: Sleepers::new(), + } + }); &*POOL } diff --git a/src/task/task_local.rs b/src/task/task_local.rs index c72937f..e92f4f9 100644 --- a/src/task/task_local.rs +++ b/src/task/task_local.rs @@ -5,7 +5,7 @@ use std::future::Future; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; -use lazy_static::lazy_static; +use once_cell::sync::Lazy; use super::worker; use crate::utils::abort_on_panic; @@ -174,9 +174,7 @@ impl LocalKey { fn key(&self) -> usize { #[cold] fn init(key: &AtomicUsize) -> usize { - lazy_static! { - static ref COUNTER: Mutex = Mutex::new(1); - } + static COUNTER: Lazy> = Lazy::new(|| Mutex::new(1)); let mut counter = COUNTER.lock().unwrap(); let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel);