diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 27f03187..bcda9906 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -40,6 +40,7 @@ jobs: with: command: check args: --features unstable --all --bins --examples --tests + - name: check bench uses: actions-rs/cargo@v1 if: matrix.rust == 'nightly' @@ -63,7 +64,31 @@ jobs: uses: actions-rs/cargo@v1 with: command: test - args: --all --features unstable attributes + args: --all --features "unstable attributes" + + - name: documentation test + uses: actions-rs/cargo@v1 + with: + command: test + args: --doc --features "unstable attributes" + + build__with_no_std: + name: Build with no-std + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@master + + - name: setup + run: | + rustup default nightly + rustup target add thumbv7m-none-eabi + + - name: check no_std + uses: actions-rs/cargo@v1 + with: + command: check + args: --no-default-features --features alloc --target thumbv7m-none-eabi -Z avoid-dev-deps check_fmt_and_docs: name: Checking fmt and docs diff --git a/CHANGELOG.md b/CHANGELOG.md index fe32794c..e464ed76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,130 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] +# [1.5.0] - 2020-02-03 + +[API Documentation](https://docs.rs/async-std/1.5.0/async-std) + +This patch includes various quality of life improvements to async-std. +Including improved performance, stability, and the addition of various +`Clone` impls that replace the use of `Arc` in many cases. + +## Added + +- Added links to various ecosystem projects from the README ([#660](https://github.com/async-rs/async-std/pull/660)) +- Added an example on `FromStream` for `Result` ([#643](https://github.com/async-rs/async-std/pull/643)) +- Added `stream::pending` as "unstable" ([#615](https://github.com/async-rs/async-std/pull/615)) +- Added an example of `stream::timeout` to document the error flow ([#675](https://github.com/async-rs/async-std/pull/675)) +- Implement `Clone` for `DirEntry` ([#682](https://github.com/async-rs/async-std/pull/682)) +- Implement `Clone` for `TcpStream` ([#689](https://github.com/async-rs/async-std/pull/689)) + +## Changed + +- Removed internal comment on `stream::Interval` ([#645](https://github.com/async-rs/async-std/pull/645)) +- The "unstable" feature can now be used without requiring the "default" feature ([#647](https://github.com/async-rs/async-std/pull/647)) +- Removed unnecessary trait bound on `stream::FlatMap` ([#651](https://github.com/async-rs/async-std/pull/651)) +- Updated the "broadcaster" dependency used by "unstable" to `1.0.0` ([#681](https://github.com/async-rs/async-std/pull/681)) +- Updated `async-task` to 1.2.1 ([#676](https://github.com/async-rs/async-std/pull/676)) +- `task::block_on` now parks after a single poll, improving performance in many cases ([#684](https://github.com/async-rs/async-std/pull/684)) +- Improved reading flow of the "client" part of the async-std tutorial ([#550](https://github.com/async-rs/async-std/pull/550)) +- Use `take_while` instead of `scan` in `impl` of `Product`, `Sum` and `FromStream` ([#667](https://github.com/async-rs/async-std/pull/667)) +- `TcpStream::connect` no longer uses a thread from the threadpool, improving performance ([#687](https://github.com/async-rs/async-std/pull/687)) + +## Fixed + +- Fixed crate documentation typo ([#655](https://github.com/async-rs/async-std/pull/655)) +- Fixed documentation for `UdpSocket::recv` ([#648](https://github.com/async-rs/async-std/pull/648)) +- Fixed documentation for `UdpSocket::send` ([#671](https://github.com/async-rs/async-std/pull/671)) +- Fixed typo in stream documentation ([#650](https://github.com/async-rs/async-std/pull/650)) +- Fixed typo on `sync::JoinHandle` documentation ([#659](https://github.com/async-rs/async-std/pull/659)) +- Removed use of `std::error::Error::description` which failed CI ([#661](https://github.com/async-rs/async-std/pull/662)) +- Removed the use of rustfmt's unstable `format_code_in_doc_comments` option which failed CI ([#685](https://github.com/async-rs/async-std/pull/685)) +- Fixed a code typo in the `task::sleep` example ([#688](https://github.com/async-rs/async-std/pull/688)) + +# [1.4.0] - 2019-12-20 + +[API Documentation](https://docs.rs/async-std/1.4.0/async-std) + +This patch adds `Future::timeout`, providing a method counterpart to the +`future::timeout` free function. And includes several bug fixes around missing +APIs. Notably we're not shipping our new executor yet, first announced [on our +blog](https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/). + +## Examples + +```rust +use async_std::prelude::*; +use async_std::future; +use std::time::Duration; + +let fut = future::pending::<()>(); // This future will never resolve. +let res = fut.timeout(Duration::from_millis(100)).await; +assert!(res.is_err()); // The future timed out, returning an err. +``` + +## Added + +- Added `Future::timeout` as "unstable" [(#600)](https://github.com/async-rs/async-std/pull/600) + +## Fixes + +- Fixed a doc test and enabled it on CI [(#597)](https://github.com/async-rs/async-std/pull/597) +- Fixed a rendering issue with the `stream` submodule documentation [(#621)](https://github.com/async-rs/async-std/pull/621) +- `Write::write_fmt`'s future is now correctly marked as `#[must_use]` [(#628)](https://github.com/async-rs/async-std/pull/628) +- Fixed the missing `io::Bytes` export [(#633)](https://github.com/async-rs/async-std/pull/633) +- Fixed the missing `io::Chain` export [(#633)](https://github.com/async-rs/async-std/pull/633) +- Fixed the missing `io::Take` export [(#633)](https://github.com/async-rs/async-std/pull/633) + +# [1.3.0] - 2019-12-12 + +[API Documentation](https://docs.rs/async-std/1.3.0/async-std) + +This patch introduces `Stream::delay`, more methods on `DoubleEndedStream`, +and improves compile times. `Stream::delay` is a new API that's similar to +[`task::sleep`](https://docs.rs/async-std/1.2.0/async_std/task/fn.sleep.html), +but can be passed as part of as stream, rather than as a separate block. This is +useful for examples, or when manually debugging race conditions. + +## Examples + +```rust +let start = Instant::now(); +let mut s = stream::from_iter(vec![0u8, 1]).delay(Duration::from_millis(200)); + +// The first time will take more than 200ms due to delay. +s.next().await; +assert!(start.elapsed().as_millis() >= 200); + +// There will be no delay after the first time. +s.next().await; +assert!(start.elapsed().as_millis() <= 210); +``` + +## Added + +- Added `Stream::delay` as "unstable" [(#309)](https://github.com/async-rs/async-std/pull/309) +- Added `DoubleEndedStream::next_back` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562) +- Added `DoubleEndedStream::nth_back` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562) +- Added `DoubleEndedStream::rfind` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562) +- Added `DoubleEndedStream::rfold` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562) +- Added `DoubleEndedStream::try_rfold` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562) +- `stream::Once` now implements `DoubleEndedStream` [(#562)](https://github.com/async-rs/async-std/pull/562) +- `stream::FromIter` now implements `DoubleEndedStream` [(#562)](https://github.com/async-rs/async-std/pull/562) + +## Changed + +- Removed our dependency on `async-macros`, speeding up compilation [(#610)](https://github.com/async-rs/async-std/pull/610) + +## Fixes + +- Fixed a link in the task docs [(#598)](https://github.com/async-rs/async-std/pull/598) +- Fixed the `UdpSocket::recv` example [(#603)](https://github.com/async-rs/async-std/pull/603) +- Fixed a link to `task::block_on` [(#608)](https://github.com/async-rs/async-std/pull/608) +- Fixed an incorrect API mention in `task::Builder` [(#612)](https://github.com/async-rs/async-std/pull/612) +- Fixed leftover mentions of `futures-preview` [(#595)](https://github.com/async-rs/async-std/pull/595) +- Fixed a typo in the tutorial [(#614)](https://github.com/async-rs/async-std/pull/614) +- `::poll_close` now closes the write half of the stream [(#618)](https://github.com/async-rs/async-std/pull/618) + # [1.2.0] - 2019-11-27 [API Documentation](https://docs.rs/async-std/1.2.0/async-std) @@ -553,7 +677,10 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v1.2.0...HEAD +[Unreleased]: https://github.com/async-rs/async-std/compare/v1.5.0...HEAD +[1.5.0]: https://github.com/async-rs/async-std/compare/v1.4.0...v1.5.0 +[1.4.0]: https://github.com/async-rs/async-std/compare/v1.3.0...v1.4.0 +[1.3.0]: https://github.com/async-rs/async-std/compare/v1.2.0...v1.3.0 [1.2.0]: https://github.com/async-rs/async-std/compare/v1.1.0...v1.2.0 [1.1.0]: https://github.com/async-rs/async-std/compare/v1.0.1...v1.1.0 [1.0.1]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1 diff --git a/Cargo.toml b/Cargo.toml index e5ae02d2..5928a062 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "1.2.0" +version = "1.5.0" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", @@ -35,47 +35,49 @@ default = [ "num_cpus", "pin-project-lite", ] -docs = ["attributes", "unstable"] -unstable = ["default", "broadcaster"] +docs = ["attributes", "unstable", "default"] +unstable = ["std", "broadcaster", "futures-timer"] attributes = ["async-attributes"] std = [ - "async-macros", + "alloc", "crossbeam-utils", - "futures-core", + "futures-core/std", "futures-io", "memchr", "once_cell", - "pin-project-lite", "pin-utils", "slab", ] +alloc = [ + "futures-core/alloc", + "pin-project-lite", +] [dependencies] async-attributes = { version = "1.1.1", optional = true } -async-macros = { version = "2.0.0", optional = true } -async-task = { version = "1.0.0", optional = true } -broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] } +async-task = { version = "1.2.1", optional = true } +broadcaster = { version = "1.0.0", optional = true } crossbeam-channel = { version = "0.4.0", optional = true } crossbeam-deque = { version = "0.7.2", optional = true } crossbeam-queue = { version = "0.2.0", optional = true } crossbeam-utils = { version = "0.7.0", optional = true } -futures-core = { version = "0.3.1", optional = true } +futures-core = { version = "0.3.1", optional = true, default-features = false } futures-io = { version = "0.3.1", optional = true } futures-timer = { version = "2.0.2", optional = true } kv-log-macro = { version = "1.0.4", optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true } -memchr = { version = "2.2.1", optional = true } +memchr = { version = "2.3.0", optional = true } mio = { version = "0.6.19", optional = true } mio-uds = { version = "0.6.7", optional = true } num_cpus = { version = "1.11.1", optional = true } once_cell = { version = "1.2.0", optional = true } -pin-project-lite = { version = "0.1.1", optional = true } +pin-project-lite = { version = "0.1.2", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true } slab = { version = "0.4.2", optional = true } [dev-dependencies] femme = "1.3.0" -rand = "0.7.2" +rand = "0.7.3" surf = "1.0.3" tempdir = "0.3.7" futures = "0.3.1" diff --git a/README.md b/README.md index 769b4296..4ebf5924 100644 --- a/README.md +++ b/README.md @@ -74,18 +74,15 @@ syntax. ## Examples -All examples require the [`"attributes"` feature] to be enabled. This feature -is not enabled by default because it significantly impacts compile times. See -[`task::block_on`] for an alternative way to start executing tasks. - ```rust +use async_std::task; + async fn say_hello() { println!("Hello, world!"); } -#[async_std::main] -async fn main() { - say_hello().await; +fn main() { + task::block_on(say_hello()) } ``` @@ -123,6 +120,22 @@ documentation] on how to enable them. [cargo-add]: https://github.com/killercup/cargo-edit [features documentation]: https://docs.rs/async-std/#features +## Ecosystem + + * [async-tls](https://crates.io/crates/async-tls) — Async TLS/SSL streams using **Rustls**. + + * [async-native-tls](https://crates.io/crates/async-native-tls) — **Native TLS** for Async. Native TLS for futures and async-std. + + * [async-tungstenite](https://crates.io/crates/async-tungstenite) — Asynchronous **WebSockets** for async-std, tokio, gio and any std Futures runtime. + + * [Tide](https://crates.io/crates/tide) — Serve the web. A modular **web framework** built around async/await. + + * [SQLx](https://crates.io/crates/sqlx) — The Rust **SQL** Toolkit. SQLx is a 100% safe Rust library for Postgres and MySQL with compile-time checked queries. + + * [Surf](https://crates.io/crates/surf) — Surf the web. Surf is a friendly **HTTP client** built for casual Rustaceans and veterans alike. + + * [Xactor](https://crates.io/crates/xactor) — Xactor is a rust actors framework based on async-std. + ## License diff --git a/docs/src/SUMMARY.md b/docs/src/SUMMARY.md index d679825e..8b49ce7a 100644 --- a/docs/src/SUMMARY.md +++ b/docs/src/SUMMARY.md @@ -19,8 +19,9 @@ - [Clean Shutdown](./tutorial/clean_shutdown.md) - [Handling Disconnection](./tutorial/handling_disconnection.md) - [Implementing a Client](./tutorial/implementing_a_client.md) -- [TODO: Async Patterns](./patterns.md) +- [Async Patterns](./patterns.md) - [TODO: Collected Small Patterns](./patterns/small-patterns.md) + - [Production-Ready Accept Loop](./patterns/accept-loop.md) - [Security practices](./security/index.md) - [Security Disclosures and Policy](./security/policy.md) - [Glossary](./glossary.md) diff --git a/docs/src/patterns/accept-loop.md b/docs/src/patterns/accept-loop.md new file mode 100644 index 00000000..4508baf4 --- /dev/null +++ b/docs/src/patterns/accept-loop.md @@ -0,0 +1,266 @@ +# Production-Ready Accept Loop + +A production-ready accept loop needs the following things: +1. Handling errors +2. Limiting the number of simultanteous connections to avoid deny-of-service + (DoS) attacks + + +## Handling errors + +There are two kinds of errors in an accept loop: +1. Per-connection errors. The system uses them to notify that there was a + connection in the queue and it's dropped by the peer. Subsequent connections + can be already queued so next connection must be accepted immediately. +2. Resource shortages. When these are encountered it doesn't make sense to + accept the next socket immediately. But the listener stays active, so you server + should try to accept socket later. + +Here is the example of a per-connection error (printed in normal and debug mode): +``` +Error: Connection reset by peer (os error 104) +Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" } +``` + +And the following is the most common example of a resource shortage error: +``` +Error: Too many open files (os error 24) +Error: Os { code: 24, kind: Other, message: "Too many open files" } +``` + +### Testing Application + +To test your application for these errors try the following (this works +on unixes only). + +Lower limits and start the application: +``` +$ ulimit -n 100 +$ cargo run --example your_app + Compiling your_app v0.1.0 (/work) + Finished dev [unoptimized + debuginfo] target(s) in 5.47s + Running `target/debug/examples/your_app` +Server is listening on: http://127.0.0.1:1234 +``` +Then in another console run the [`wrk`] benchmark tool: +``` +$ wrk -c 1000 http://127.0.0.1:1234 +Running 10s test @ http://localhost:8080/ + 2 threads and 1000 connections +$ telnet localhost 1234 +Trying ::1... +Connected to localhost. +``` + +Important is to check the following things: + +1. The application doesn't crash on error (but may log errors, see below) +2. It's possible to connect to the application again once load is stopped + (few seconds after `wrk`). This is what `telnet` does in example above, + make sure it prints `Connected to `. +3. The `Too many open files` error is logged in the appropriate log. This + requires to set "maximum number of simultaneous connections" parameter (see + below) of your application to a value greater then `100` for this example. +4. Check CPU usage of the app while doing a test. It should not occupy 100% + of a single CPU core (it's unlikely that you can exhaust CPU by 1000 + connections in Rust, so this means error handling is not right). + +#### Testing non-HTTP applications + +If it's possible, use the appropriate benchmark tool and set the appropriate +number of connections. For example `redis-benchmark` has a `-c` parameter for +that, if you implement redis protocol. + +Alternatively, can still use `wrk`, just make sure that connection is not +immediately closed. If it is, put a temporary timeout before handing +the connection to the protocol handler, like this: + +```rust,edition2018 +# extern crate async_std; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { +# let listener = TcpListener::bind(addr).await?; +# let mut incoming = listener.incoming(); +while let Some(stream) = incoming.next().await { + task::spawn(async { + task::sleep(Duration::from_secs(10)).await; // 1 + connection_loop(stream).await; + }); +} +# Ok(()) +# } +``` + +1. Make sure the sleep coroutine is inside the spawned task, not in the loop. + +[`wrk`]: https://github.com/wg/wrk + + +### Handling Errors Manually + +Here is how basic accept loop could look like: + +```rust,edition2018 +# extern crate async_std; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { + let listener = TcpListener::bind(addr).await?; + let mut incoming = listener.incoming(); + while let Some(result) = incoming.next().await { + let stream = match stream { + Err(ref e) if is_connection_error(e) => continue, // 1 + Err(e) => { + eprintln!("Error: {}. Pausing for 500ms."); // 3 + task::sleep(Duration::from_millis(500)).await; // 2 + continue; + } + Ok(s) => s, + }; + // body + } + Ok(()) +} +``` + +1. Ignore per-connection errors. +2. Sleep and continue on resource shortage. +3. It's important to log the message, because these errors commonly mean the + misconfiguration of the system and are helpful for operations people running + the application. + +Be sure to [test your application](#testing-application). + + +### External Crates + +The crate [`async-listen`] has a helper to achieve this task: +```rust,edition2018 +# extern crate async_std; +# extern crate async_listen; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +use async_listen::{ListenExt, error_hint}; + +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { + + let listener = TcpListener::bind(addr).await?; + let mut incoming = listener + .incoming() + .log_warnings(log_accept_error) // 1 + .handle_errors(Duration::from_millis(500)); + while let Some(socket) = incoming.next().await { // 2 + // body + } + Ok(()) +} + +fn log_accept_error(e: &io::Error) { + eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3 +} +``` + +1. Logs resource shortages (`async-listen` calls them warnings). If you use + `log` crate or any other in your app this should go to the log. +2. Stream yields sockets without `Result` wrapper after `handle_errors` because + all errors are already handled. +3. Together with the error we print a hint, which explains some errors for end + users. For example, it recommends increasing open file limit and gives + a link. + +[`async-listen`]: https://crates.io/crates/async-listen/ + +Be sure to [test your application](#testing-application). + + +## Connections Limit + +Even if you've applied everything described in +[Handling Errors](#handling-errors) section, there is still a problem. + +Let's imagine you have a server that needs to open a file to process +client request. At some point, you might encounter the following situation: + +1. There are as many client connection as max file descriptors allowed for + the application. +2. Listener gets `Too many open files` error so it sleeps. +3. Some client sends a request via the previously open connection. +4. Opening a file to serve request fails, because of the same + `Too many open files` error, until some other client drops a connection. + +There are many more possible situations, this is just a small illustation that +limiting number of connections is very useful. Generally, it's one of the ways +to control resources used by a server and avoiding some kinds of deny of +service (DoS) attacks. + +### `async-listen` crate + +Limiting maximum number of simultaneous connections with [`async-listen`] +looks like the following: + +```rust,edition2018 +# extern crate async_std; +# extern crate async_listen; +# use std::time::Duration; +# use async_std::{ +# net::{TcpListener, TcpStream, ToSocketAddrs}, +# prelude::*, +# }; +# +# type Result = std::result::Result>; +# +use async_listen::{ListenExt, Token, error_hint}; + +async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { + + let listener = TcpListener::bind(addr).await?; + let mut incoming = listener + .incoming() + .log_warnings(log_accept_error) + .handle_errors(Duration::from_millis(500)) // 1 + .backpressure(100); + while let Some((token, socket)) = incoming.next().await { // 2 + task::spawn(async move { + connection_loop(&token, stream).await; // 3 + }); + } + Ok(()) +} +async fn connection_loop(_token: &Token, stream: TcpStream) { // 4 + // ... +} +# fn log_accept_error(e: &io::Error) { +# eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)); +# } +``` + +1. We need to handle errors first, because [`backpressure`] helper expects + stream of `TcpStream` rather than `Result`. +2. The token yielded by a new stream is what is counted by backpressure helper. + I.e. if you drop a token, new connection can be established. +3. We give the connection loop a reference to token to bind token's lifetime to + the lifetime of the connection. +4. The token itsellf in the function can be ignored, hence `_token` + +[`backpressure`]: https://docs.rs/async-listen/0.1.2/async_listen/trait.ListenExt.html#method.backpressure + +Be sure to [test this behavior](#testing-application). diff --git a/docs/src/tutorial/handling_disconnection.md b/docs/src/tutorial/handling_disconnection.md index 9db9abd2..87a6ac66 100644 --- a/docs/src/tutorial/handling_disconnection.md +++ b/docs/src/tutorial/handling_disconnection.md @@ -157,7 +157,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { spawn_and_log_error(connection_loop(broker_sender.clone(), stream)); } drop(broker_sender); - broker_handle.await?; + broker_handle.await; Ok(()) } diff --git a/docs/src/tutorial/implementing_a_client.md b/docs/src/tutorial/implementing_a_client.md index fd728555..ba9d6f33 100644 --- a/docs/src/tutorial/implementing_a_client.md +++ b/docs/src/tutorial/implementing_a_client.md @@ -1,18 +1,16 @@ ## Implementing a client -Let's now implement the client for the chat. -Because the protocol is line-based, the implementation is pretty straightforward: +Since the protocol is line-based, implementing a client for the chat is straightforward: * Lines read from stdin should be sent over the socket. * Lines read from the socket should be echoed to stdout. -Unlike the server, the client needs only limited concurrency, as it interacts with only a single user. -For this reason, async doesn't bring a lot of performance benefits in this case. +Although async does not significantly affect client performance (as unlike the server, the client interacts solely with one user and only needs limited concurrency), async is still useful for managing concurrency! + +The client has to read from stdin and the socket *simultaneously*. +Programming this with threads is cumbersome, especially when implementing a clean shutdown. +With async, the `select!` macro is all that is needed. -However, async is still useful for managing concurrency! -Specifically, the client should *simultaneously* read from stdin and from the socket. -Programming this with threads is cumbersome, especially when implementing clean shutdown. -With async, we can just use the `select!` macro. ```rust,edition2018 # extern crate async_std; diff --git a/docs/src/tutorial/receiving_messages.md b/docs/src/tutorial/receiving_messages.md index 4f705294..f62b65d9 100644 --- a/docs/src/tutorial/receiving_messages.md +++ b/docs/src/tutorial/receiving_messages.md @@ -111,7 +111,7 @@ We can "fix" it by waiting for the task to be joined, like this: # # async move |stream| { let handle = task::spawn(connection_loop(stream)); -handle.await +handle.await? # }; ``` diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index 7c50be01..c04f0776 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -14,8 +14,9 @@ use async_std::task; async fn process(stream: TcpStream) -> io::Result<()> { println!("Accepted from: {}", stream.peer_addr()?); - let (reader, writer) = &mut (&stream, &stream); - io::copy(reader, writer).await?; + let mut reader = stream.clone(); + let mut writer = stream; + io::copy(&mut reader, &mut writer).await?; Ok(()) } diff --git a/examples/tcp-ipv4-and-6-echo.rs b/examples/tcp-ipv4-and-6-echo.rs index aef5e15e..a00e1f38 100644 --- a/examples/tcp-ipv4-and-6-echo.rs +++ b/examples/tcp-ipv4-and-6-echo.rs @@ -15,8 +15,9 @@ use async_std::task; async fn process(stream: TcpStream) -> io::Result<()> { println!("Accepted from: {}", stream.peer_addr()?); - let (reader, writer) = &mut (&stream, &stream); - io::copy(reader, writer).await?; + let mut reader = stream.clone(); + let mut writer = stream; + io::copy(&mut reader, &mut writer).await?; Ok(()) } diff --git a/rustfmt.toml b/rustfmt.toml index c6d404e2..1082fd88 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,2 +1 @@ version = "Two" -format_code_in_doc_comments = true diff --git a/src/fs/dir_entry.rs b/src/fs/dir_entry.rs index 527fab42..e1622eec 100644 --- a/src/fs/dir_entry.rs +++ b/src/fs/dir_entry.rs @@ -158,6 +158,12 @@ impl fmt::Debug for DirEntry { } } +impl Clone for DirEntry { + fn clone(&self) -> Self { + DirEntry(self.0.clone()) + } +} + cfg_unix! { use crate::os::unix::fs::DirEntryExt; diff --git a/src/future/future/join.rs b/src/future/future/join.rs index 0febcad0..4a508ce8 100644 --- a/src/future/future/join.rs +++ b/src/future/future/join.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use async_macros::MaybeDone; +use crate::future::MaybeDone; use pin_project_lite::pin_project; use crate::task::{Context, Poll}; diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index 0f9ef586..24f3fb59 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -7,7 +7,6 @@ cfg_unstable! { mod try_join; use std::time::Duration; - use delay::DelayFuture; use flatten::FlattenFuture; use crate::future::IntoFuture; @@ -17,9 +16,13 @@ cfg_unstable! { use try_join::TryJoin; } +cfg_unstable_default! { + use crate::future::timeout::TimeoutFuture; +} + extension_trait! { - use std::pin::Pin; - use std::ops::{Deref, DerefMut}; + use core::pin::Pin; + use core::ops::{Deref, DerefMut}; use crate::task::{Context, Poll}; @@ -133,7 +136,7 @@ extension_trait! { [`Future`]: ../future/trait.Future.html "#] - pub trait FutureExt: std::future::Future { + pub trait FutureExt: core::future::Future { /// Returns a Future that delays execution for a specified time. /// /// # Examples @@ -148,7 +151,7 @@ extension_trait! { /// dbg!(a.await); /// # }) /// ``` - #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn delay(self, dur: Duration) -> impl Future [DelayFuture] where @@ -355,6 +358,40 @@ extension_trait! { { TryJoin::new(self, other) } + + #[doc = r#" + Waits for both the future and a timeout, if the timeout completes before + the future, it returns an TimeoutError. + + # Example + ``` + # async_std::task::block_on(async { + # + use std::time::Duration; + + use async_std::prelude::*; + use async_std::future; + + let fut = future::ready(0); + let dur = Duration::from_millis(100); + let res = fut.timeout(dur).await; + assert!(res.is_ok()); + + let fut = future::pending::<()>(); + let dur = Duration::from_millis(100); + let res = fut.timeout(dur).await; + assert!(res.is_err()) + # + # }); + ``` + "#] + #[cfg(any(all(feature = "default", feature = "unstable"), feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn timeout(self, dur: Duration) -> impl Future [TimeoutFuture] + where Self: Sized + { + TimeoutFuture::new(self, dur) + } } impl Future for Box { diff --git a/src/future/future/race.rs b/src/future/future/race.rs index ed034f05..82165a0f 100644 --- a/src/future/future/race.rs +++ b/src/future/future/race.rs @@ -1,7 +1,7 @@ use std::future::Future; use std::pin::Pin; -use async_macros::MaybeDone; +use crate::future::MaybeDone; use pin_project_lite::pin_project; use crate::task::{Context, Poll}; diff --git a/src/future/future/try_join.rs b/src/future/future/try_join.rs index f1667240..5277ae31 100644 --- a/src/future/future/try_join.rs +++ b/src/future/future/try_join.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use async_macros::MaybeDone; +use crate::future::MaybeDone; use pin_project_lite::pin_project; use crate::task::{Context, Poll}; diff --git a/src/future/future/try_race.rs b/src/future/future/try_race.rs index d0ca4a90..45199570 100644 --- a/src/future/future/try_race.rs +++ b/src/future/future/try_race.rs @@ -1,6 +1,6 @@ use std::pin::Pin; -use async_macros::MaybeDone; +use crate::future::MaybeDone; use pin_project_lite::pin_project; use crate::task::{Context, Poll}; diff --git a/src/future/maybe_done.rs b/src/future/maybe_done.rs new file mode 100644 index 00000000..7a3a7fa3 --- /dev/null +++ b/src/future/maybe_done.rs @@ -0,0 +1,79 @@ +//! A type that wraps a future to keep track of its completion status. +//! +//! This implementation was taken from the original `macro_rules` `join/try_join` +//! macros in the `futures-preview` crate. + +use std::future::Future; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::ready; + +/// A future that may have completed. +#[derive(Debug)] +pub(crate) enum MaybeDone { + /// A not-yet-completed future + Future(Fut), + + /// The output of the completed future + Done(Fut::Output), + + /// The empty variant after the result of a [`MaybeDone`] has been + /// taken using the [`take`](MaybeDone::take) method. + Gone, +} + +impl MaybeDone { + /// Create a new instance of `MaybeDone`. + pub(crate) fn new(future: Fut) -> MaybeDone { + Self::Future(future) + } + + /// Returns an [`Option`] containing a reference to the output of the future. + /// The output of this method will be [`Some`] if and only if the inner + /// future has been completed and [`take`](MaybeDone::take) + /// has not yet been called. + #[inline] + pub(crate) fn output(self: Pin<&Self>) -> Option<&Fut::Output> { + let this = self.get_ref(); + match this { + MaybeDone::Done(res) => Some(res), + _ => None, + } + } + + /// Attempt to take the output of a `MaybeDone` without driving it + /// towards completion. + #[inline] + pub(crate) fn take(self: Pin<&mut Self>) -> Option { + unsafe { + let this = self.get_unchecked_mut(); + match this { + MaybeDone::Done(_) => {} + MaybeDone::Future(_) | MaybeDone::Gone => return None, + }; + if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) { + Some(output) + } else { + unreachable!() + } + } + } +} + +impl Future for MaybeDone { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let res = unsafe { + match Pin::as_mut(&mut self).get_unchecked_mut() { + MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)), + MaybeDone::Done(_) => return Poll::Ready(()), + MaybeDone::Gone => panic!("MaybeDone polled after value taken"), + } + }; + self.set(MaybeDone::Done(res)); + Poll::Ready(()) + } +} diff --git a/src/future/mod.rs b/src/future/mod.rs index 8b51a6a5..9b75533d 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -46,15 +46,20 @@ //! [`Future::race`]: trait.Future.html#method.race //! [`Future::try_race`]: trait.Future.html#method.try_race -pub use future::Future; -pub use pending::pending; -pub use poll_fn::poll_fn; -pub use ready::ready; +cfg_alloc! { + pub use future::Future; + pub(crate) mod future; +} + +cfg_std! { + pub use pending::pending; + pub use poll_fn::poll_fn; + pub use ready::ready; -pub(crate) mod future; -mod pending; -mod poll_fn; -mod ready; + mod pending; + mod poll_fn; + mod ready; +} cfg_default! { pub use timeout::{timeout, TimeoutError}; @@ -63,5 +68,7 @@ cfg_default! { cfg_unstable! { pub use into_future::IntoFuture; + pub(crate) use maybe_done::MaybeDone; mod into_future; + mod maybe_done; } diff --git a/src/future/timeout.rs b/src/future/timeout.rs index ff87ae4f..05aaa450 100644 --- a/src/future/timeout.rs +++ b/src/future/timeout.rs @@ -42,7 +42,7 @@ where pin_project! { /// A future that times out after a duration of time. - struct TimeoutFuture { + pub struct TimeoutFuture { #[pin] future: F, #[pin] @@ -50,6 +50,13 @@ pin_project! { } } +impl TimeoutFuture { + #[allow(dead_code)] + pub(super) fn new(future: F, dur: Duration) -> TimeoutFuture { + TimeoutFuture { future: future, delay: Delay::new(dur) } + } +} + impl Future for TimeoutFuture { type Output = Result; diff --git a/src/io/mod.rs b/src/io/mod.rs index ee1289d3..dd97567b 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -105,8 +105,8 @@ //! //! ```no_run //! use async_std::fs::File; -//! use async_std::io::BufWriter; //! use async_std::io::prelude::*; +//! use async_std::io::BufWriter; //! //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # @@ -116,8 +116,8 @@ //! //! // write a byte to the buffer //! writer.write(&[42]).await?; -//! //! } // the buffer is flushed once writer goes out of scope +//! // //! # //! # Ok(()) }) } //! ``` @@ -275,13 +275,13 @@ cfg_std! { #[doc(inline)] pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; - pub use buf_read::{BufRead, Lines}; + pub use buf_read::{BufRead, Lines, Split}; pub use buf_reader::BufReader; pub use buf_writer::{BufWriter, IntoInnerError}; pub use copy::copy; pub use cursor::Cursor; pub use empty::{empty, Empty}; - pub use read::Read; + pub use read::*; pub use repeat::{repeat, Repeat}; pub use seek::Seek; pub use sink::{sink, Sink}; @@ -321,7 +321,7 @@ cfg_default! { mod stdout; } -cfg_unstable! { +cfg_unstable_default! { pub use stderr::StderrLock; pub use stdin::StdinLock; pub use stdout::StdoutLock; diff --git a/src/io/read/bytes.rs b/src/io/read/bytes.rs index 42245243..ab925961 100644 --- a/src/io/read/bytes.rs +++ b/src/io/read/bytes.rs @@ -32,7 +32,7 @@ impl Stream for Bytes { } } -#[cfg(test)] +#[cfg(all(test, default))] mod tests { use crate::io; use crate::prelude::*; diff --git a/src/io/read/chain.rs b/src/io/read/chain.rs index 335cac25..4fcdb0ec 100644 --- a/src/io/read/chain.rs +++ b/src/io/read/chain.rs @@ -165,7 +165,7 @@ impl BufRead for Chain { } } -#[cfg(test)] +#[cfg(all(test, default))] mod tests { use crate::io; use crate::prelude::*; diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index 0d7f4dcc..8aade189 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -17,6 +17,10 @@ use std::mem; use crate::io::IoSliceMut; +pub use take::Take; +pub use bytes::Bytes; +pub use chain::Chain; + extension_trait! { use std::pin::Pin; use std::ops::{Deref, DerefMut}; @@ -301,11 +305,11 @@ extension_trait! { # Ok(()) }) } ``` "#] - fn take(self, limit: u64) -> take::Take + fn take(self, limit: u64) -> Take where Self: Sized, { - take::Take { inner: self, limit } + Take { inner: self, limit } } #[doc = r#" @@ -377,8 +381,8 @@ extension_trait! { # Ok(()) }) } ``` "#] - fn bytes(self) -> bytes::Bytes where Self: Sized { - bytes::Bytes { inner: self } + fn bytes(self) -> Bytes where Self: Sized { + Bytes { inner: self } } #[doc = r#" @@ -413,8 +417,8 @@ extension_trait! { # Ok(()) }) } ``` "#] - fn chain(self, next: R) -> chain::Chain where Self: Sized { - chain::Chain { first: self, second: next, done_first: false } + fn chain(self, next: R) -> Chain where Self: Sized { + Chain { first: self, second: next, done_first: false } } } diff --git a/src/io/utils.rs b/src/io/utils.rs index 1b730645..0ba3ecb2 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -1,5 +1,7 @@ use crate::utils::Context; +use std::{error::Error as StdError, fmt, io}; + /// Wrap `std::io::Error` with additional message /// /// Keeps the original error kind and stores the original I/O error as `source`. @@ -9,8 +11,6 @@ impl Context for Result { } } -use std::{error::Error as StdError, fmt, io}; - #[derive(Debug)] pub(crate) struct VerboseError { source: io::Error, @@ -36,10 +36,6 @@ impl fmt::Display for VerboseError { } impl StdError for VerboseError { - fn description(&self) -> &str { - self.source.description() - } - fn source(&self) -> Option<&(dyn StdError + 'static)> { Some(&self.source) } diff --git a/src/io/write/write_fmt.rs b/src/io/write/write_fmt.rs index ec7847f2..d20c41d8 100644 --- a/src/io/write/write_fmt.rs +++ b/src/io/write/write_fmt.rs @@ -6,6 +6,7 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] +#[must_use] pub struct WriteFmtFuture<'a, T: Unpin + ?Sized> { pub(crate) writer: &'a mut T, pub(crate) res: Option>>, diff --git a/src/lib.rs b/src/lib.rs index 40ba15ab..7e0e98d3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,7 +47,7 @@ //! encouraged to read it. The `async-std` source is generally high //! quality and a peek behind the curtains is often enlightening. //! -//! Modules in this crate are organized in the same way as in `async-std`, except blocking +//! Modules in this crate are organized in the same way as in `std`, except blocking //! functions have been replaced with async functions and threads have been replaced with //! lightweight tasks. //! @@ -152,6 +152,8 @@ //! Await two futures concurrently, and return a tuple of their output: //! //! ``` +//! use async_std::prelude::*; +//! //! #[async_std::main] //! async fn main() { //! let a = async { 1u8 }; @@ -167,7 +169,7 @@ //! //! #[async_std::main] //! async fn main() -> std::io::Result<()> { -//! let mut socket = UdpSocket::bind("127.0.0.1:8080")?; +//! let socket = UdpSocket::bind("127.0.0.1:8080").await?; //! println!("Listening on {}", socket.local_addr()?); //! //! let mut buf = vec![0u8; 1024]; @@ -218,7 +220,18 @@ //! default-features = false //! features = ["std"] //! ``` +//! +//! And to use async-std on `no_std` targets that only support `alloc` only +//! enable the `alloc` Cargo feature: +//! +//! ```toml +//! [dependencies.async-std] +//! version = "1.5.0" +//! default-features = false +//! features = ["alloc"] +//! ``` +#![cfg_attr(not(feature = "std"), no_std)] #![cfg_attr(feature = "docs", feature(doc_cfg))] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![allow(clippy::mutex_atomic, clippy::module_inception)] @@ -227,6 +240,8 @@ #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] #![recursion_limit = "2048"] +extern crate alloc; + #[macro_use] mod utils; @@ -238,14 +253,17 @@ pub use async_attributes::{main, test}; #[cfg(feature = "std")] mod macros; -cfg_std! { +cfg_alloc! { + pub mod task; pub mod future; + pub mod stream; +} + +cfg_std! { pub mod io; pub mod os; pub mod prelude; - pub mod stream; pub mod sync; - pub mod task; } cfg_default! { @@ -265,7 +283,9 @@ cfg_unstable! { mod option; mod string; mod collections; +} +cfg_unstable_default! { #[doc(inline)] pub use std::{write, writeln}; } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index b389518c..9e15d40f 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,6 +1,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use crate::future; use crate::io; @@ -75,9 +76,7 @@ impl TcpListener { /// [`local_addr`]: #method.local_addr pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { match mio::net::TcpListener::bind(&addr) { @@ -121,7 +120,7 @@ impl TcpListener { let mio_stream = mio::net::TcpStream::from_stream(io)?; let stream = TcpStream { - watcher: Watcher::new(mio_stream), + watcher: Arc::new(Watcher::new(mio_stream)), }; Ok((stream, addr)) } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 71245a31..1f50e8f1 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -1,13 +1,13 @@ use std::io::{IoSlice, IoSliceMut, Read as _, Write as _}; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use crate::future; use crate::io::{self, Read, Write}; use crate::rt::Watcher; use crate::net::ToSocketAddrs; -use crate::task::{spawn_blocking, Context, Poll}; -use crate::utils::Context as _; +use crate::task::{Context, Poll}; /// A TCP stream between a local and a remote socket. /// @@ -45,9 +45,9 @@ use crate::utils::Context as _; /// # /// # Ok(()) }) } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TcpStream { - pub(super) watcher: Watcher, + pub(super) watcher: Arc>, } impl TcpStream { @@ -72,25 +72,31 @@ impl TcpStream { /// ``` pub async fn connect(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - let res = spawn_blocking(move || { - let std_stream = std::net::TcpStream::connect(addr) - .context(|| format!("could not connect to {}", addr))?; - let mio_stream = mio::net::TcpStream::from_stream(std_stream) - .context(|| format!("could not open async connection to {}", addr))?; - Ok(TcpStream { - watcher: Watcher::new(mio_stream), - }) - }) - .await; + // mio's TcpStream::connect is non-blocking and may just be in progress + // when it returns with `Ok`. We therefore wait for write readiness to + // be sure the connection has either been established or there was an + // error which we check for afterwards. + let watcher = match mio::net::TcpStream::connect(&addr) { + Ok(s) => Watcher::new(s), + Err(e) => { + last_err = Some(e); + continue; + } + }; - match res { - Ok(stream) => return Ok(stream), - Err(err) => last_err = Some(err), + future::poll_fn(|cx| watcher.poll_write_ready(cx)).await; + + match watcher.get_ref().take_error() { + Ok(None) => { + return Ok(TcpStream { + watcher: Arc::new(watcher), + }); + } + Ok(Some(e)) => last_err = Some(e), + Err(e) => last_err = Some(e), } } @@ -356,6 +362,7 @@ impl Write for &TcpStream { } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + self.shutdown(std::net::Shutdown::Write)?; Poll::Ready(Ok(())) } } @@ -365,7 +372,7 @@ impl From for TcpStream { fn from(stream: std::net::TcpStream) -> TcpStream { let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); TcpStream { - watcher: Watcher::new(mio_stream), + watcher: Arc::new(Watcher::new(mio_stream)), } } } @@ -387,7 +394,10 @@ cfg_unix! { impl IntoRawFd for TcpStream { fn into_raw_fd(self) -> RawFd { - self.watcher.into_inner().into_raw_fd() + // TODO(stjepang): This does not mean `RawFd` is now the sole owner of the file + // descriptor because it's possible that there are other clones of this `TcpStream` + // using it at the same time. We should probably document that behavior. + self.as_raw_fd() } } } diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 961288a0..774478d3 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -244,9 +244,12 @@ impl UdpSocket { })) } - /// Sends data on the socket to the given address. + /// Sends data on the socket to the remote address to which it is connected. /// - /// On success, returns the number of bytes written. + /// The [`connect`] method will connect this socket to a remote address. + /// This method will fail if the socket is not connected. + /// + /// [`connect`]: #method.connect /// /// # Examples /// @@ -255,18 +258,11 @@ impl UdpSocket { /// # /// use async_std::net::UdpSocket; /// - /// const THE_MERCHANT_OF_VENICE: &[u8] = b" - /// If you prick us, do we not bleed? - /// If you tickle us, do we not laugh? - /// If you poison us, do we not die? - /// And if you wrong us, shall we not revenge? - /// "; - /// - /// let socket = UdpSocket::bind("127.0.0.1:0").await?; + /// let socket = UdpSocket::bind("127.0.0.1:34254").await?; + /// socket.connect("127.0.0.1:8080").await?; + /// let bytes = socket.send(b"Hi there!").await?; /// - /// let addr = "127.0.0.1:7878"; - /// let sent = socket.send_to(THE_MERCHANT_OF_VENICE, &addr).await?; - /// println!("Sent {} bytes to {}", sent, addr); + /// println!("Sent {} bytes", bytes); /// # /// # Ok(()) }) } /// ``` @@ -288,7 +284,7 @@ impl UdpSocket { /// Receives data from the socket. /// - /// On success, returns the number of bytes read and the origin. + /// On success, returns the number of bytes read. /// /// # Examples /// diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index 86791143..de929ca9 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{FromStream, IntoStream}; +use std::convert::identity; impl FromStream> for Option where @@ -17,24 +18,22 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs - let mut found_error = false; + let mut found_none = false; let out: V = stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { - found_error = true; - // Stop processing the stream on error - None - } + .take_while(|elem| { + elem.is_some() || { + found_none = true; + // Stop processing the stream on `None` + false } }) + .filter_map(identity) .collect() .await; - if found_error { None } else { Some(out) } + if found_none { None } else { Some(out) } }) } } diff --git a/src/option/product.rs b/src/option/product.rs index abaab73e..b446c1ff 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -1,7 +1,8 @@ use std::pin::Pin; use crate::prelude::*; -use crate::stream::{Stream, Product}; +use crate::stream::{Product, Stream}; +use std::convert::identity; impl Product> for Option where @@ -36,29 +37,27 @@ where ``` "#] fn product<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::product(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::product( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + // Stop processing the stream on `None` + false } - } - })).await; + }) + .filter_map(identity), + ) + .await; - if found_none { - None - } else { - Some(out) - } + if found_none { None } else { Some(out) } }) } } diff --git a/src/option/sum.rs b/src/option/sum.rs index d2e44830..de404f42 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -2,6 +2,7 @@ use std::pin::Pin; use crate::prelude::*; use crate::stream::{Stream, Sum}; +use std::convert::identity; impl Sum> for Option where @@ -31,29 +32,27 @@ where ``` "#] fn sum<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs let mut found_none = false; - let out = >::sum(stream - .scan((), |_, elem| { - match elem { - Some(elem) => Some(elem), - None => { + let out = >::sum( + stream + .take_while(|elem| { + elem.is_some() || { found_none = true; - // Stop processing the stream on error - None + // Stop processing the stream on `None` + false } - } - })).await; + }) + .filter_map(identity), + ) + .await; - if found_none { - None - } else { - Some(out) - } + if found_none { None } else { Some(out) } }) } } diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index a8490d69..8a8e0eaf 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -10,6 +10,23 @@ where /// Takes each element in the stream: if it is an `Err`, no further /// elements are taken, and the `Err` is returned. Should no `Err` /// occur, a container with the values of each `Result` is returned. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let v = stream::from_iter(vec![1, 2]); + /// let res: Result, &'static str> = v.map(|x: u32| + /// x.checked_add(1).ok_or("Overflow!") + /// ).collect().await; + /// assert_eq!(res, Ok(vec![2, 3])); + /// # + /// # }) } + /// ``` #[inline] fn from_stream<'a, S: IntoStream> + 'a>( stream: S, @@ -17,26 +34,34 @@ where let stream = stream.into_stream(); Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; let out: V = stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), - Err(err) => { - found_error = Some(err); - // Stop processing the stream on error - None - } + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None } }) .collect() .await; - match found_error { - Some(err) => Err(err), - None => Ok(out), + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/product.rs b/src/result/product.rs index ec9d94a8..45782ff7 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use crate::prelude::*; -use crate::stream::{Stream, Product}; +use crate::stream::{Product, Stream}; impl Product> for Result where @@ -36,26 +36,39 @@ where ``` "#] fn product<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::product(stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), + let out = >::product( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), Err(err) => { found_error = Some(err); - // Stop processing the stream on error None } - } - })).await; - match found_error { - Some(err) => Err(err), - None => Ok(out) + }), + ) + .await; + + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/result/sum.rs b/src/result/sum.rs index ccc4240e..b6d84a0c 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -36,26 +36,39 @@ where ``` "#] fn sum<'a, S>(stream: S) -> Pin> + 'a>> - where S: Stream> + 'a + where + S: Stream> + 'a, { Box::pin(async move { - // Using `scan` here because it is able to stop the stream early + // Using `take_while` here because it is able to stop the stream early // if a failure occurs + let mut is_error = false; let mut found_error = None; - let out = >::sum(stream - .scan((), |_, elem| { - match elem { - Ok(elem) => Some(elem), + let out = >::sum( + stream + .take_while(|elem| { + // Stop processing the stream on `Err` + !is_error + && (elem.is_ok() || { + is_error = true; + // Capture first `Err` + true + }) + }) + .filter_map(|elem| match elem { + Ok(value) => Some(value), Err(err) => { found_error = Some(err); - // Stop processing the stream on error None } - } - })).await; - match found_error { - Some(err) => Err(err), - None => Ok(out) + }), + ) + .await; + + if is_error { + Err(found_error.unwrap()) + } else { + Ok(out) } }) } diff --git a/src/rt/reactor.rs b/src/rt/reactor.rs index c0046c97..2a35b72c 100644 --- a/src/rt/reactor.rs +++ b/src/rt/reactor.rs @@ -16,10 +16,10 @@ struct Entry { token: mio::Token, /// Tasks that are blocked on reading from this I/O handle. - readers: Mutex>, + readers: Mutex, /// Tasks that are blocked on writing to this I/O handle. - writers: Mutex>, + writers: Mutex, } /// The state of a networking driver. @@ -40,6 +40,26 @@ pub struct Reactor { notify_token: mio::Token, } +/// The set of `Waker`s interested in read readiness. +#[derive(Debug)] +struct Readers { + /// Flag indicating read readiness. + /// (cf. `Watcher::poll_read_ready`) + ready: bool, + /// The `Waker`s blocked on reading. + wakers: Vec, +} + +/// The set of `Waker`s interested in write readiness. +#[derive(Debug)] +struct Writers { + /// Flag indicating write readiness. + /// (cf. `Watcher::poll_write_ready`) + ready: bool, + /// The `Waker`s blocked on writing. + wakers: Vec, +} + impl Reactor { /// Creates a new reactor for polling I/O events. pub fn new() -> io::Result { @@ -72,8 +92,14 @@ impl Reactor { // Allocate an entry and insert it into the slab. let entry = Arc::new(Entry { token, - readers: Mutex::new(Vec::new()), - writers: Mutex::new(Vec::new()), + readers: Mutex::new(Readers { + ready: false, + wakers: Vec::new(), + }), + writers: Mutex::new(Writers { + ready: false, + wakers: Vec::new(), + }), }); vacant.insert(entry.clone()); @@ -131,7 +157,9 @@ impl Reactor { // Wake up reader tasks blocked on this I/O handle. let reader_interests = mio::Ready::all() - mio::Ready::writable(); if !(readiness & reader_interests).is_empty() { - for w in entry.readers.lock().unwrap().drain(..) { + let mut readers = entry.readers.lock().unwrap(); + readers.ready = true; + for w in readers.wakers.drain(..) { w.wake(); progress = true; } @@ -140,7 +168,9 @@ impl Reactor { // Wake up writer tasks blocked on this I/O handle. let writer_interests = mio::Ready::all() - mio::Ready::readable(); if !(readiness & writer_interests).is_empty() { - for w in entry.writers.lock().unwrap().drain(..) { + let mut writers = entry.writers.lock().unwrap(); + writers.ready = true; + for w in writers.wakers.drain(..) { w.wake(); progress = true; } @@ -200,7 +230,7 @@ impl Watcher { } // Lock the waker list. - let mut list = self.entry.readers.lock().unwrap(); + let mut readers = self.entry.readers.lock().unwrap(); // Try running the operation again. match f(self.source.as_ref().unwrap()) { @@ -209,8 +239,9 @@ impl Watcher { } // Register the task if it isn't registered already. - if list.iter().all(|w| !w.will_wake(cx.waker())) { - list.push(cx.waker().clone()); + + if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + readers.wakers.push(cx.waker().clone()); } Poll::Pending @@ -235,7 +266,7 @@ impl Watcher { } // Lock the waker list. - let mut list = self.entry.writers.lock().unwrap(); + let mut writers = self.entry.writers.lock().unwrap(); // Try running the operation again. match f(self.source.as_ref().unwrap()) { @@ -244,10 +275,47 @@ impl Watcher { } // Register the task if it isn't registered already. - if list.iter().all(|w| !w.will_wake(cx.waker())) { - list.push(cx.waker().clone()); + if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + writers.wakers.push(cx.waker().clone()); + } + + Poll::Pending + } + + /// Polls the inner I/O source until a non-blocking read can be performed. + /// + /// If non-blocking reads are currently not possible, the `Waker` + /// will be saved and notified when it can read non-blocking + /// again. + #[allow(dead_code)] + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + // Lock the waker list. + let mut readers = self.entry.readers.lock().unwrap(); + if readers.ready { + return Poll::Ready(()); + } + // Register the task if it isn't registered already. + if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + readers.wakers.push(cx.waker().clone()); } + Poll::Pending + } + /// Polls the inner I/O source until a non-blocking write can be performed. + /// + /// If non-blocking writes are currently not possible, the `Waker` + /// will be saved and notified when it can write non-blocking + /// again. + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + // Lock the waker list. + let mut writers = self.entry.writers.lock().unwrap(); + if writers.ready { + return Poll::Ready(()); + } + // Register the task if it isn't registered already. + if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { + writers.wakers.push(cx.waker().clone()); + } Poll::Pending } diff --git a/src/stream/double_ended_stream.rs b/src/stream/double_ended_stream.rs deleted file mode 100644 index 129bb1cd..00000000 --- a/src/stream/double_ended_stream.rs +++ /dev/null @@ -1,24 +0,0 @@ -use crate::stream::Stream; - -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// A stream able to yield elements from both ends. -/// -/// Something that implements `DoubleEndedStream` has one extra capability -/// over something that implements [`Stream`]: the ability to also take -/// `Item`s from the back, as well as the front. -/// -/// [`Stream`]: trait.Stream.html -#[cfg(feature = "unstable")] -#[cfg_attr(feature = "docs", doc(cfg(unstable)))] -pub trait DoubleEndedStream: Stream { - /// Removes and returns an element from the end of the stream. - /// - /// Returns `None` when there are no more elements. - /// - /// The [trait-level] docs contain more details. - /// - /// [trait-level]: trait.DoubleEndedStream.html - fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; -} diff --git a/src/stream/double_ended_stream/mod.rs b/src/stream/double_ended_stream/mod.rs new file mode 100644 index 00000000..a177865b --- /dev/null +++ b/src/stream/double_ended_stream/mod.rs @@ -0,0 +1,246 @@ +use crate::stream::Stream; + +use std::pin::Pin; +use std::task::{Context, Poll}; + +mod next_back; +mod nth_back; +mod rfind; +mod rfold; +mod try_rfold; + +use next_back::NextBackFuture; +use nth_back::NthBackFuture; +use rfind::RFindFuture; +use rfold::RFoldFuture; +use try_rfold::TryRFoldFuture; + +/// A stream able to yield elements from both ends. +/// +/// Something that implements `DoubleEndedStream` has one extra capability +/// over something that implements [`Stream`]: the ability to also take +/// `Item`s from the back, as well as the front. +/// +/// [`Stream`]: trait.Stream.html +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub trait DoubleEndedStream: Stream { + #[doc = r#" + Attempts to receive the next item from the back of the stream. + + There are several possible return values: + + * `Poll::Pending` means this stream's next_back value is not ready yet. + * `Poll::Ready(None)` means this stream has been exhausted. + * `Poll::Ready(Some(item))` means `item` was received out of the stream. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::pin::Pin; + + use async_std::prelude::*; + use async_std::stream; + use async_std::task::{Context, Poll}; + + fn increment( + s: impl DoubleEndedStream + Unpin, + ) -> impl DoubleEndedStream + Unpin { + struct Increment(S); + + impl + Unpin> Stream for Increment { + type Item = S::Item; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + } + } + } + + impl + Unpin> DoubleEndedStream for Increment { + fn poll_next_back( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + match Pin::new(&mut self.0).poll_next_back(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(None) => Poll::Ready(None), + Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)), + } + } + } + + Increment(s) + } + + let mut s = increment(stream::once(7)); + + assert_eq!(s.next_back().await, Some(8)); + assert_eq!(s.next_back().await, None); + # + # }) } + ``` + "#] + fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + #[doc = r#" + Advances the stream and returns the next value. + + Returns [`None`] when iteration is finished. Individual stream implementations may + choose to resume iteration, and so calling `next()` again may or may not eventually + start returning more values. + + [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let mut s = stream::from_iter(vec![7u8]); + + assert_eq!(s.next_back().await, Some(7)); + assert_eq!(s.next_back().await, None); + # + # }) } + ``` + "#] + fn next_back(&mut self) -> NextBackFuture<'_, Self> + where + Self: Unpin, + { + NextBackFuture { stream: self } + } + + #[doc = r#" + Returns the nth element from the back of the stream. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.nth_back(1).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] + fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self> + where + Self: Unpin + Sized, + { + NthBackFuture::new(self, n) + } + + #[doc = r#" + Returns the the frist element from the right that matches the predicate. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.rfind(|v| v % 2 == 0).await; + assert_eq!(second, Some(4)); + # + # }) } + ``` + "#] + fn rfind

(&mut self, p: P) -> RFindFuture<'_, Self, P> + where + Self: Unpin + Sized, + P: FnMut(&Self::Item) -> bool, + { + RFindFuture::new(self, p) + } + + #[doc = r#" + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s = stream::from_iter(vec![1u8, 2, 3, 4, 5]); + + let second = s.rfold(0, |acc, v| v + acc).await; + + assert_eq!(second, 15); + # + # }) } + ``` + "#] + fn rfold(self, accum: B, f: F) -> RFoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> B, + { + RFoldFuture::new(self, accum, f) + } + + #[doc = r#" + A combinator that applies a function as long as it returns successfully, producing a single, final value. + Immediately returns the error when the function returns unsuccessfully. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s = stream::from_iter(vec![1u8, 2, 3, 4, 5]); + let sum = s.try_rfold(0, |acc, v| { + if (acc+v) % 2 == 1 { + Ok(v+3) + } else { + Err("fail") + } + }).await; + + assert_eq!(sum, Err("fail")); + # + # }) } + ``` + "#] + fn try_rfold(self, accum: B, f: F) -> TryRFoldFuture + where + Self: Sized, + F: FnMut(B, Self::Item) -> Result, + { + TryRFoldFuture::new(self, accum, f) + } +} diff --git a/src/stream/double_ended_stream/next_back.rs b/src/stream/double_ended_stream/next_back.rs new file mode 100644 index 00000000..9fb52b7b --- /dev/null +++ b/src/stream/double_ended_stream/next_back.rs @@ -0,0 +1,19 @@ +use core::pin::Pin; +use core::future::Future; + +use crate::stream::DoubleEndedStream; +use crate::task::{Context, Poll}; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct NextBackFuture<'a, T: Unpin + ?Sized> { + pub(crate) stream: &'a mut T, +} + +impl Future for NextBackFuture<'_, T> { + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut *self.stream).poll_next_back(cx) + } +} diff --git a/src/stream/double_ended_stream/nth_back.rs b/src/stream/double_ended_stream/nth_back.rs new file mode 100644 index 00000000..2701e9b6 --- /dev/null +++ b/src/stream/double_ended_stream/nth_back.rs @@ -0,0 +1,41 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use crate::stream::DoubleEndedStream; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct NthBackFuture<'a, S> { + stream: &'a mut S, + n: usize, +} + +impl<'a, S> NthBackFuture<'a, S> { + pub(crate) fn new(stream: &'a mut S, n: usize) -> Self { + NthBackFuture { stream, n } + } +} + +impl<'a, S> Future for NthBackFuture<'a, S> +where + S: DoubleEndedStream + Sized + Unpin, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); + match next { + Some(v) => match self.n { + 0 => Poll::Ready(Some(v)), + _ => { + self.n -= 1; + cx.waker().wake_by_ref(); + Poll::Pending + } + }, + None => Poll::Ready(None), + } + } +} + diff --git a/src/stream/double_ended_stream/rfind.rs b/src/stream/double_ended_stream/rfind.rs new file mode 100644 index 00000000..36665516 --- /dev/null +++ b/src/stream/double_ended_stream/rfind.rs @@ -0,0 +1,41 @@ +use core::task::{Context, Poll}; +use core::future::Future; +use core::pin::Pin; + +use crate::stream::DoubleEndedStream; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct RFindFuture<'a, S, P> { + stream: &'a mut S, + p: P, +} + +impl<'a, S, P> RFindFuture<'a, S, P> { + pub(super) fn new(stream: &'a mut S, p: P) -> Self { + RFindFuture { stream, p } + } +} + +impl Unpin for RFindFuture<'_, S, P> {} + +impl<'a, S, P> Future for RFindFuture<'a, S, P> +where + S: DoubleEndedStream + Unpin + Sized, + P: FnMut(&S::Item) -> bool, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx)); + + match item { + Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)), + Some(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + None => Poll::Ready(None), + } + } +} diff --git a/src/stream/double_ended_stream/rfold.rs b/src/stream/double_ended_stream/rfold.rs new file mode 100644 index 00000000..9bc18783 --- /dev/null +++ b/src/stream/double_ended_stream/rfold.rs @@ -0,0 +1,52 @@ +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct RFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl RFoldFuture { + pub(super) fn new(stream: S, init: B, f: F) -> Self { + RFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for RFoldFuture +where + S: DoubleEndedStream + Sized, + F: FnMut(B, S::Item) -> B, +{ + type Output = B; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + *this.acc = Some(new); + } + None => return Poll::Ready(this.acc.take().unwrap()), + } + } + } +} diff --git a/src/stream/double_ended_stream/try_rfold.rs b/src/stream/double_ended_stream/try_rfold.rs new file mode 100644 index 00000000..d67b6ecf --- /dev/null +++ b/src/stream/double_ended_stream/try_rfold.rs @@ -0,0 +1,56 @@ +use crate::future::Future; +use core::pin::Pin; +use crate::task::{Context, Poll}; + +use pin_project_lite::pin_project; + +use crate::stream::DoubleEndedStream; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct TryRFoldFuture { + #[pin] + stream: S, + f: F, + acc: Option, + } +} + +impl TryRFoldFuture { + pub(super) fn new(stream: S, init: T, f: F) -> Self { + TryRFoldFuture { + stream, + f, + acc: Some(init), + } + } +} + +impl Future for TryRFoldFuture +where + S: DoubleEndedStream + Unpin, + F: FnMut(T, S::Item) -> Result, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx)); + + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let new = (this.f)(old, v); + + match new { + Ok(o) => *this.acc = Some(o), + Err(e) => return Poll::Ready(Err(e)), + } + } + None => return Poll::Ready(Ok(this.acc.take().unwrap())), + } + } + } +} diff --git a/src/stream/empty.rs b/src/stream/empty.rs index 49090707..a11337af 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -1,5 +1,5 @@ -use std::marker::PhantomData; -use std::pin::Pin; +use core::marker::PhantomData; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/extend.rs b/src/stream/extend.rs index c48fe1ed..702cbcac 100644 --- a/src/stream/extend.rs +++ b/src/stream/extend.rs @@ -1,6 +1,6 @@ -use std::pin::Pin; +use core::pin::Pin; +use core::future::Future; -use crate::prelude::*; use crate::stream::IntoStream; /// Extends a collection with the contents of a stream. diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 8067176e..d3736454 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/from_iter.rs b/src/stream/from_iter.rs index d7a31d6c..ea2ed8ef 100644 --- a/src/stream/from_iter.rs +++ b/src/stream/from_iter.rs @@ -1,8 +1,10 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; use crate::stream::Stream; +#[cfg(feature = "unstable")] +use crate::stream::double_ended_stream::DoubleEndedStream; use crate::task::{Context, Poll}; pin_project! { @@ -51,3 +53,10 @@ impl Stream for FromIter { Poll::Ready(self.iter.next()) } } + +#[cfg(feature = "unstable")] +impl DoubleEndedStream for FromIter { + fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.iter.next_back()) + } +} diff --git a/src/stream/from_stream.rs b/src/stream/from_stream.rs index 67b9b3df..12d89e81 100644 --- a/src/stream/from_stream.rs +++ b/src/stream/from_stream.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use crate::stream::IntoStream; diff --git a/src/stream/interval.rs b/src/stream/interval.rs index b0df7141..7a0c1740 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -2,10 +2,10 @@ use std::pin::Pin; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use crate::future::Future; +use crate::stream::Stream; use futures_timer::Delay; -use crate::prelude::*; - /// Creates a new stream that yields at a set interval. /// /// The stream first yields after `dur`, and continues to yield every @@ -85,7 +85,7 @@ impl Stream for Interval { /// While technically for large duration it's impossible to represent any /// duration as nanoseconds, the largest duration we can represent is about /// 427_000 years. Large enough for any interval we would use or calculate in -/// tokio. +/// async-std. fn duration_to_nanos(dur: Duration) -> Option { dur.as_secs() .checked_mul(1_000_000_000) diff --git a/src/stream/mod.rs b/src/stream/mod.rs index d8b96ec2..0bfd4e86 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -325,6 +325,7 @@ cfg_unstable! { mod fused_stream; mod interval; mod into_stream; + mod pending; mod product; mod successors; mod sum; @@ -336,6 +337,7 @@ cfg_unstable! { pub use fused_stream::FusedStream; pub use interval::{interval, Interval}; pub use into_stream::IntoStream; + pub use pending::{pending, Pending}; pub use product::Product; pub use stream::Merge; pub use successors::{successors, Successors}; diff --git a/src/stream/once.rs b/src/stream/once.rs index e4ac682c..b86f181d 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -1,10 +1,13 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; use crate::stream::Stream; use crate::task::{Context, Poll}; +#[cfg(feature = "unstable")] +use crate::stream::DoubleEndedStream; + /// Creates a stream that yields a single item. /// /// # Examples @@ -46,3 +49,10 @@ impl Stream for Once { Poll::Ready(self.project().value.take()) } } + +#[cfg(feature = "unstable")] +impl DoubleEndedStream for Once { + fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(self.project().value.take()) + } +} diff --git a/src/stream/pending.rs b/src/stream/pending.rs new file mode 100644 index 00000000..edb6be4b --- /dev/null +++ b/src/stream/pending.rs @@ -0,0 +1,68 @@ +use core::marker::PhantomData; +use core::pin::Pin; +use core::task::{Context, Poll}; + +use crate::stream::{DoubleEndedStream, ExactSizeStream, FusedStream, Stream}; + +/// A stream that never returns any items. +/// +/// This stream is created by the [`pending`] function. See its +/// documentation for more. +/// +/// [`pending`]: fn.pending.html +#[derive(Debug)] +pub struct Pending { + _marker: PhantomData, +} + +/// Creates a stream that never returns any items. +/// +/// The returned stream will always return `Pending` when polled. +/// # Examples +/// +/// ``` +/// # async_std::task::block_on(async { +/// # +/// use std::time::Duration; +/// +/// use async_std::prelude::*; +/// use async_std::stream; +/// +/// let dur = Duration::from_millis(100); +/// let mut s = stream::pending::<()>().timeout(dur); +/// +/// let item = s.next().await; +/// +/// assert!(item.is_some()); +/// assert!(item.unwrap().is_err()); +/// +/// # +/// # }) +/// ``` +pub fn pending() -> Pending { + Pending { + _marker: PhantomData, + } +} + +impl Stream for Pending { + type Item = T; + + fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } +} + +impl DoubleEndedStream for Pending { + fn poll_next_back(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Pending + } +} + +impl FusedStream for Pending {} + +impl ExactSizeStream for Pending { + fn len(&self) -> usize { + 0 + } +} diff --git a/src/stream/product.rs b/src/stream/product.rs index 2f5bf4c3..15497e87 100644 --- a/src/stream/product.rs +++ b/src/stream/product.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; -use std::future::Future; +use core::pin::Pin; +use core::future::Future; use crate::stream::Stream; diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index f3dfdbd8..e73a2f82 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index e183a77c..39984a3a 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index d2ac5cac..06f4d7f8 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -1,6 +1,6 @@ -use std::marker::PhantomData; -use std::pin::Pin; -use std::future::Future; +use core::marker::PhantomData; +use core::pin::Pin; +use core::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index 34168e67..15154c50 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -1,6 +1,6 @@ -use std::marker::PhantomData; -use std::pin::Pin; -use std::future::Future; +use core::marker::PhantomData; +use core::pin::Pin; +use core::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index 909fc19b..c034a53e 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -1,9 +1,10 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; use super::fuse::Fuse; -use crate::prelude::*; +use crate::stream::stream::StreamExt; +use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { diff --git a/src/stream/stream/cloned.rs b/src/stream/stream/cloned.rs index 4c77c5c9..eac992dd 100644 --- a/src/stream/stream/cloned.rs +++ b/src/stream/stream/cloned.rs @@ -1,7 +1,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; use pin_project_lite::pin_project; -use std::pin::Pin; +use core::pin::Pin; pin_project! { /// A stream that clones the elements of an underlying stream. diff --git a/src/stream/stream/cmp.rs b/src/stream/stream/cmp.rs index 2be0c1a3..9d2b0ecc 100644 --- a/src/stream/stream/cmp.rs +++ b/src/stream/stream/cmp.rs @@ -1,11 +1,11 @@ -use std::cmp::Ordering; -use std::pin::Pin; -use std::future::Future; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::fuse::Fuse; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/copied.rs b/src/stream/stream/copied.rs index 651c31b6..19296f5b 100644 --- a/src/stream/stream/copied.rs +++ b/src/stream/stream/copied.rs @@ -1,7 +1,7 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; use pin_project_lite::pin_project; -use std::pin::Pin; +use core::pin::Pin; pin_project! { /// A stream that copies the elements of an underlying stream. diff --git a/src/stream/stream/count.rs b/src/stream/stream/count.rs index ebf2a2f1..63e04497 100644 --- a/src/stream/stream/count.rs +++ b/src/stream/stream/count.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; @@ -9,7 +9,7 @@ use crate::task::{Context, Poll}; pin_project! { #[doc(hidden)] #[allow(missing_debug_implementations)] - #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct CountFuture { #[pin] diff --git a/src/stream/stream/cycle.rs b/src/stream/stream/cycle.rs index 5f8eaa20..ef46d1a7 100644 --- a/src/stream/stream/cycle.rs +++ b/src/stream/stream/cycle.rs @@ -1,5 +1,5 @@ -use std::mem::ManuallyDrop; -use std::pin::Pin; +use core::mem::ManuallyDrop; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs index 57687929..ff4c9373 100644 --- a/src/stream/stream/delay.rs +++ b/src/stream/stream/delay.rs @@ -1,6 +1,6 @@ -use std::future::Future; -use std::pin::Pin; -use std::time::Duration; +use core::future::Future; +use core::pin::Pin; +use core::time::Duration; use pin_project_lite::pin_project; diff --git a/src/stream/stream/enumerate.rs b/src/stream/stream/enumerate.rs index c4a37d6e..093fefbd 100644 --- a/src/stream/stream/enumerate.rs +++ b/src/stream/stream/enumerate.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/eq.rs b/src/stream/stream/eq.rs index 58ccc90e..3d8307b0 100644 --- a/src/stream/stream/eq.rs +++ b/src/stream/stream/eq.rs @@ -1,10 +1,10 @@ -use std::pin::Pin; -use std::future::Future; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::fuse::Fuse; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index 00344b0e..2dc7dd48 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index 3cd1e47a..e43e8f09 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::pin::Pin; +use core::task::{Context, Poll}; use pin_project_lite::pin_project; diff --git a/src/stream/stream/find.rs b/src/stream/stream/find.rs index 4a0749b1..8652ac09 100644 --- a/src/stream/stream/find.rs +++ b/src/stream/stream/find.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/find_map.rs b/src/stream/stream/find_map.rs index c7949439..f7e3c1e0 100644 --- a/src/stream/stream/find_map.rs +++ b/src/stream/stream/find_map.rs @@ -1,6 +1,6 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; use crate::stream::Stream; diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index 6c828c92..e07893a9 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -1,9 +1,9 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; -use crate::prelude::*; use crate::stream::stream::map::Map; +use crate::stream::stream::StreamExt; use crate::stream::{IntoStream, Stream}; use crate::task::{Context, Poll}; @@ -41,7 +41,6 @@ where impl Stream for FlatMap where S: Stream, - S::Item: IntoStream, U: Stream, F: FnMut(S::Item) -> U, { diff --git a/src/stream/stream/flatten.rs b/src/stream/stream/flatten.rs index 1d6fcae6..f2e275c2 100644 --- a/src/stream/stream/flatten.rs +++ b/src/stream/stream/flatten.rs @@ -1,5 +1,5 @@ -use std::fmt; -use std::pin::Pin; +use core::fmt; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index a346eb67..3938a373 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index dce5cdae..dbada101 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; -use std::future::Future; +use core::pin::Pin; +use core::future::Future; use pin_project_lite::pin_project; diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index c7449c27..f3a05962 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/ge.rs b/src/stream/stream/ge.rs index 67b20bed..1e1b70df 100644 --- a/src/stream/stream/ge.rs +++ b/src/stream/stream/ge.rs @@ -1,11 +1,11 @@ -use std::cmp::Ordering; -use std::pin::Pin; -use std::future::Future; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::partial_cmp::PartialCmpFuture; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/gt.rs b/src/stream/stream/gt.rs index 1c121891..d58e7e9e 100644 --- a/src/stream/stream/gt.rs +++ b/src/stream/stream/gt.rs @@ -1,11 +1,11 @@ -use std::cmp::Ordering; -use std::pin::Pin; -use std::future::Future; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::partial_cmp::PartialCmpFuture; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs index bb39662b..481810d7 100644 --- a/src/stream/stream/inspect.rs +++ b/src/stream/stream/inspect.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/last.rs b/src/stream/stream/last.rs index d037efcb..ebf1a484 100644 --- a/src/stream/stream/last.rs +++ b/src/stream/stream/last.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/le.rs b/src/stream/stream/le.rs index 7b86161c..169f9ece 100644 --- a/src/stream/stream/le.rs +++ b/src/stream/stream/le.rs @@ -1,11 +1,11 @@ -use std::cmp::Ordering; -use std::pin::Pin; -use std::future::Future; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::partial_cmp::PartialCmpFuture; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/lt.rs b/src/stream/stream/lt.rs index 100a0034..1851b8e4 100644 --- a/src/stream/stream/lt.rs +++ b/src/stream/stream/lt.rs @@ -1,11 +1,11 @@ -use std::cmp::Ordering; -use std::pin::Pin; -use std::future::Future; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::partial_cmp::PartialCmpFuture; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 8e074a75..0eab3ce2 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/max.rs b/src/stream/stream/max.rs index d8ff119d..8a4d5944 100644 --- a/src/stream/stream/max.rs +++ b/src/stream/stream/max.rs @@ -1,7 +1,7 @@ -use std::cmp::{Ord, Ordering}; -use std::marker::PhantomData; -use std::pin::Pin; -use std::future::Future; +use core::cmp::{Ord, Ordering}; +use core::marker::PhantomData; +use core::pin::Pin; +use core::future::Future; use pin_project_lite::pin_project; diff --git a/src/stream/stream/max_by.rs b/src/stream/stream/max_by.rs index 36b876bb..8f986452 100644 --- a/src/stream/stream/max_by.rs +++ b/src/stream/stream/max_by.rs @@ -1,6 +1,6 @@ -use std::cmp::Ordering; -use std::pin::Pin; -use std::future::Future; +use core::cmp::Ordering; +use core::pin::Pin; +use core::future::Future; use pin_project_lite::pin_project; diff --git a/src/stream/stream/max_by_key.rs b/src/stream/stream/max_by_key.rs index e421f94a..8fa91ab9 100644 --- a/src/stream/stream/max_by_key.rs +++ b/src/stream/stream/max_by_key.rs @@ -1,6 +1,6 @@ -use std::cmp::Ordering; -use std::future::Future; -use std::pin::Pin; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index 84ac4322..23209729 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -1,10 +1,11 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::pin::Pin; +use core::task::{Context, Poll}; use pin_project_lite::pin_project; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Fuse; +use crate::stream::Stream; use crate::utils; pin_project! { diff --git a/src/stream/stream/min.rs b/src/stream/stream/min.rs index 4ce52be9..4fe2a677 100644 --- a/src/stream/stream/min.rs +++ b/src/stream/stream/min.rs @@ -1,7 +1,7 @@ -use std::cmp::{Ord, Ordering}; -use std::marker::PhantomData; -use std::pin::Pin; -use std::future::Future; +use core::cmp::{Ord, Ordering}; +use core::marker::PhantomData; +use core::pin::Pin; +use core::future::Future; use pin_project_lite::pin_project; diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index e35719e6..fe1d40ea 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -1,6 +1,6 @@ -use std::cmp::Ordering; -use std::pin::Pin; -use std::future::Future; +use core::cmp::Ordering; +use core::pin::Pin; +use core::future::Future; use pin_project_lite::pin_project; diff --git a/src/stream/stream/min_by_key.rs b/src/stream/stream/min_by_key.rs index 07c3642a..549b7983 100644 --- a/src/stream/stream/min_by_key.rs +++ b/src/stream/stream/min_by_key.rs @@ -1,6 +1,6 @@ -use std::cmp::Ordering; -use std::future::Future; -use std::pin::Pin; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 15724088..d0cc718e 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -110,12 +110,12 @@ pub use take::Take; pub use take_while::TakeWhile; pub use zip::Zip; -use std::cmp::Ordering; +use core::cmp::Ordering; cfg_unstable! { - use std::future::Future; - use std::pin::Pin; - use std::time::Duration; + use core::future::Future; + use core::pin::Pin; + use core::time::Duration; use crate::stream::into_stream::IntoStream; use crate::stream::{FromStream, Product, Sum}; @@ -350,12 +350,12 @@ extension_trait! { assert!(start.elapsed().as_millis() >= 15); s.next().await; - assert!(start.elapsed().as_millis() >= 35); + assert!(start.elapsed().as_millis() >= 25); # # }) } ``` "#] - #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn throttle(self, d: Duration) -> Throttle where @@ -587,13 +587,13 @@ extension_trait! { assert_eq!(s.next().await, Some(1)); // There will be no delay after the first time. - assert!(start.elapsed().as_millis() <= 210); + assert!(start.elapsed().as_millis() < 400); assert_eq!(s.next().await, Some(2)); - assert!(start.elapsed().as_millis() <= 210); + assert!(start.elapsed().as_millis() < 400); assert_eq!(s.next().await, None); - assert!(start.elapsed().as_millis() <= 210); + assert!(start.elapsed().as_millis() < 400); # # }) } ``` @@ -695,7 +695,7 @@ extension_trait! { # }) } ``` - An empty stream will return `None: + An empty stream will return `None`: ``` # fn main() { async_std::task::block_on(async { # @@ -791,18 +791,22 @@ extension_trait! { # async_std::task::block_on(async { use async_std::prelude::*; - use async_std::stream::IntoStream; use async_std::stream; - let inner1 = stream::from_iter(vec![1,2,3]); - let inner2 = stream::from_iter(vec![4,5,6]); - - let s = stream::from_iter(vec![inner1, inner2]); + let words = stream::from_iter(&["alpha", "beta", "gamma"]); - let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await; + let merged: String = words + .flat_map(|s| stream::from_iter(s.chars())) + .collect().await; + assert_eq!(merged, "alphabetagamma"); - assert_eq!(v, vec![1,2,3,4,5,6]); + let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]); + let d1: Vec<_> = d3 + .flat_map(|item| stream::from_iter(item)) + .flat_map(|item| stream::from_iter(item)) + .collect().await; + assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]); # }); ``` "#] @@ -1507,7 +1511,7 @@ extension_trait! { # }) } ``` "#] - #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn by_ref(&mut self) -> &mut Self { self @@ -1641,6 +1645,13 @@ extension_trait! { while let Some(v) = s.next().await { assert_eq!(v, Ok(1)); } + + // when timeout + let mut s = stream::pending::<()>().timeout(Duration::from_millis(10)); + match s.next().await { + Some(item) => assert!(item.is_err()), + None => panic!() + }; # # Ok(()) }) } ``` diff --git a/src/stream/stream/ne.rs b/src/stream/stream/ne.rs index ec11d1fd..c51ab31e 100644 --- a/src/stream/stream/ne.rs +++ b/src/stream/stream/ne.rs @@ -1,10 +1,10 @@ -use std::pin::Pin; -use std::future::Future; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::fuse::Fuse; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/next.rs b/src/stream/stream/next.rs index 23abb0b4..7bd20832 100644 --- a/src/stream/stream/next.rs +++ b/src/stream/stream/next.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; -use std::future::Future; +use core::pin::Pin; +use core::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/nth.rs b/src/stream/stream/nth.rs index 267bd40a..8cdabb6c 100644 --- a/src/stream/stream/nth.rs +++ b/src/stream/stream/nth.rs @@ -1,6 +1,6 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::future::Future; use crate::stream::Stream; diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs index 85587c99..928a03b0 100644 --- a/src/stream/stream/partial_cmp.rs +++ b/src/stream/stream/partial_cmp.rs @@ -1,11 +1,11 @@ -use std::cmp::Ordering; -use std::future::Future; -use std::pin::Pin; +use core::cmp::Ordering; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; use super::fuse::Fuse; -use crate::prelude::*; +use crate::stream::stream::StreamExt; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/partition.rs b/src/stream/stream/partition.rs index 7e2caad4..69268ecb 100644 --- a/src/stream/stream/partition.rs +++ b/src/stream/stream/partition.rs @@ -1,14 +1,14 @@ use pin_project_lite::pin_project; -use std::default::Default; -use std::future::Future; -use std::pin::Pin; +use core::default::Default; +use core::future::Future; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { #[derive(Debug)] - #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct PartitionFuture { #[pin] diff --git a/src/stream/stream/position.rs b/src/stream/stream/position.rs index df60eaae..2811b6d8 100644 --- a/src/stream/stream/position.rs +++ b/src/stream/stream/position.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index 385edf8e..d72b0dc2 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index bcff50d6..52b137d0 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -1,5 +1,5 @@ -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::pin::Pin; +use core::task::{Context, Poll}; use pin_project_lite::pin_project; diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 23347132..d139de4d 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/step_by.rs b/src/stream/stream/step_by.rs index 2149cdad..3dd3d625 100644 --- a/src/stream/stream/step_by.rs +++ b/src/stream/stream/step_by.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 8c852276..ffc3e993 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index 2ba8490e..60eb8c51 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs index f580360d..ce658c83 100644 --- a/src/stream/stream/timeout.rs +++ b/src/stream/stream/timeout.rs @@ -1,8 +1,8 @@ use std::error::Error; use std::fmt; +use std::future::Future; use std::pin::Pin; use std::time::Duration; -use std::future::Future; use futures_timer::Delay; use pin_project_lite::pin_project; diff --git a/src/stream/stream/try_fold.rs b/src/stream/stream/try_fold.rs index 3b92d95a..73312ff7 100644 --- a/src/stream/stream/try_fold.rs +++ b/src/stream/stream/try_fold.rs @@ -1,4 +1,4 @@ -use std::pin::Pin; +use core::pin::Pin; use crate::future::Future; use crate::stream::Stream; diff --git a/src/stream/stream/try_for_each.rs b/src/stream/stream/try_for_each.rs index 86f1674a..917bfd16 100644 --- a/src/stream/stream/try_for_each.rs +++ b/src/stream/stream/try_for_each.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index e0832ff7..7771509a 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use pin_project_lite::pin_project; @@ -8,7 +8,7 @@ use crate::task::{Context, Poll}; pin_project! { #[derive(Clone, Debug)] - #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub struct UnzipFuture { #[pin] diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 597691b4..83d613c8 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -1,5 +1,5 @@ -use std::fmt; -use std::pin::Pin; +use core::fmt; +use core::pin::Pin; use pin_project_lite::pin_project; diff --git a/src/stream/successors.rs b/src/stream/successors.rs index 4421564e..a9ce40ff 100644 --- a/src/stream/successors.rs +++ b/src/stream/successors.rs @@ -1,5 +1,5 @@ -use std::mem; -use std::pin::Pin; +use core::mem; +use core::pin::Pin; use crate::stream::Stream; use crate::task::{Context, Poll}; diff --git a/src/stream/sum.rs b/src/stream/sum.rs index 9607bafa..3b3144e5 100644 --- a/src/stream/sum.rs +++ b/src/stream/sum.rs @@ -1,5 +1,5 @@ -use std::future::Future; -use std::pin::Pin; +use core::future::Future; +use core::pin::Pin; use crate::stream::Stream; diff --git a/src/sync/channel.rs b/src/sync/channel.rs index 2647f650..8ab1cc12 100644 --- a/src/sync/channel.rs +++ b/src/sync/channel.rs @@ -1,5 +1,6 @@ use std::cell::UnsafeCell; -use std::fmt; +use std::error::Error; +use std::fmt::{self, Debug, Display}; use std::future::Future; use std::isize; use std::marker::PhantomData; @@ -31,6 +32,7 @@ use crate::sync::WakerSet; /// # Examples /// /// ``` +/// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; @@ -41,7 +43,7 @@ use crate::sync::WakerSet; /// let (s, r) = channel(1); /// /// // This call returns immediately because there is enough space in the channel. -/// s.send(1).await; +/// s.send(1usize).await; /// /// task::spawn(async move { /// // This call will have to wait because the channel is full. @@ -50,10 +52,11 @@ use crate::sync::WakerSet; /// }); /// /// task::sleep(Duration::from_secs(1)).await; -/// assert_eq!(r.recv().await, Some(1)); -/// assert_eq!(r.recv().await, Some(2)); +/// assert_eq!(r.recv().await?, 1); +/// assert_eq!(r.recv().await?, 2); +/// # Ok(()) /// # -/// # }) +/// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] @@ -112,6 +115,7 @@ impl Sender { /// # Examples /// /// ``` + /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; @@ -124,11 +128,12 @@ impl Sender { /// s.send(2).await; /// }); /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// assert_eq!(r.recv().await, None); + /// assert_eq!(r.recv().await?, 1); + /// assert_eq!(r.recv().await?, 2); + /// assert!(r.recv().await.is_err()); /// # - /// # }) + /// # Ok(()) + /// # }) } /// ``` pub async fn send(&self, msg: T) { struct SendFuture<'a, T> { @@ -192,6 +197,27 @@ impl Sender { .await } + /// Attempts to send a message into the channel. + /// + /// If the channel is full, this method will return an error. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// assert!(s.try_send(1).is_ok()); + /// assert!(s.try_send(2).is_err()); + /// # + /// # }) + /// ``` + pub fn try_send(&self, msg: T) -> Result<(), TrySendError> { + self.channel.try_send(msg) + } + /// Returns the channel capacity. /// /// # Examples @@ -313,6 +339,7 @@ impl fmt::Debug for Sender { /// # Examples /// /// ``` +/// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use std::time::Duration; @@ -323,15 +350,16 @@ impl fmt::Debug for Sender { /// let (s, r) = channel(100); /// /// task::spawn(async move { -/// s.send(1).await; +/// s.send(1usize).await; /// task::sleep(Duration::from_secs(1)).await; /// s.send(2).await; /// }); /// -/// assert_eq!(r.recv().await, Some(1)); // Received immediately. -/// assert_eq!(r.recv().await, Some(2)); // Received after 1 second. +/// assert_eq!(r.recv().await?, 1); // Received immediately. +/// assert_eq!(r.recv().await?, 2); // Received after 1 second. /// # -/// # }) +/// # Ok(()) +/// # }) } /// ``` #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] @@ -346,12 +374,14 @@ pub struct Receiver { impl Receiver { /// Receives a message from the channel. /// - /// If the channel is empty and still has senders, this method will wait until a message is - /// sent into the channel or until all senders get dropped. + /// If the channel is empty and still has senders, this method + /// will wait until a message is sent into it. Once all senders + /// have been dropped it will return `None`. /// /// # Examples /// /// ``` + /// # fn main() -> Result<(), async_std::sync::RecvError> { /// # async_std::task::block_on(async { /// # /// use async_std::sync::channel; @@ -360,24 +390,26 @@ impl Receiver { /// let (s, r) = channel(1); /// /// task::spawn(async move { - /// s.send(1).await; + /// s.send(1usize).await; /// s.send(2).await; + /// // Then we drop the sender /// }); /// - /// assert_eq!(r.recv().await, Some(1)); - /// assert_eq!(r.recv().await, Some(2)); - /// assert_eq!(r.recv().await, None); + /// assert_eq!(r.recv().await?, 1); + /// assert_eq!(r.recv().await?, 2); + /// assert!(r.recv().await.is_err()); /// # - /// # }) + /// # Ok(()) + /// # }) } /// ``` - pub async fn recv(&self) -> Option { + pub async fn recv(&self) -> Result { struct RecvFuture<'a, T> { channel: &'a Channel, opt_key: Option, } impl Future for RecvFuture<'_, T> { - type Output = Option; + type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { poll_recv( @@ -405,6 +437,30 @@ impl Receiver { .await } + /// Attempts to receive a message from the channel. + /// + /// If the channel is empty, this method will return an error. + /// + /// # Examples + /// + /// ``` + /// # async_std::task::block_on(async { + /// # + /// use async_std::sync::channel; + /// + /// let (s, r) = channel(1); + /// + /// s.send(1u8).await; + /// + /// assert!(r.try_recv().is_ok()); + /// assert!(r.try_recv().is_err()); + /// # + /// # }) + /// ``` + pub fn try_recv(&self) -> Result { + self.channel.try_recv() + } + /// Returns the channel capacity. /// /// # Examples @@ -519,12 +575,13 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = &mut *self; - poll_recv( + let res = futures_core::ready!(poll_recv( &this.channel, &this.channel.stream_wakers, &mut this.opt_key, cx, - ) + )); + Poll::Ready(res.ok()) } } @@ -543,7 +600,7 @@ fn poll_recv( wakers: &WakerSet, opt_key: &mut Option, cx: &mut Context<'_>, -) -> Poll> { +) -> Poll> { loop { // If the current task is in the set, remove it. if let Some(key) = opt_key.take() { @@ -552,8 +609,8 @@ fn poll_recv( // Try receiving a message. match channel.try_recv() { - Ok(msg) => return Poll::Ready(Some(msg)), - Err(TryRecvError::Disconnected) => return Poll::Ready(None), + Ok(msg) => return Poll::Ready(Ok(msg)), + Err(TryRecvError::Disconnected) => return Poll::Ready(Err(RecvError {})), Err(TryRecvError::Empty) => { // Insert this receive operation. *opt_key = Some(wakers.insert(cx)); @@ -932,8 +989,10 @@ impl Drop for Channel { } } -/// An error returned from the `try_send()` method. -enum TrySendError { +/// An error returned from the `try_send` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +pub enum TrySendError { /// The channel is full but not disconnected. Full(T), @@ -941,11 +1000,59 @@ enum TrySendError { Disconnected(T), } -/// An error returned from the `try_recv()` method. -enum TryRecvError { +impl Error for TrySendError {} + +impl Debug for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Full(_) => Debug::fmt("Full", f), + Self::Disconnected(_) => Debug::fmt("Disconnected", f), + } + } +} + +impl Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Full(_) => Display::fmt("The channel is full.", f), + Self::Disconnected(_) => Display::fmt("The channel is full and disconnected.", f), + } + } +} + +/// An error returned from the `try_recv` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub enum TryRecvError { /// The channel is empty but not disconnected. Empty, /// The channel is empty and disconnected. Disconnected, } + +impl Error for TryRecvError {} + +impl Display for TryRecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Empty => Display::fmt("The channel is empty.", f), + Self::Disconnected => Display::fmt("The channel is empty and disconnected.", f), + } + } +} + +/// An error returned from the `recv` method. +#[cfg(feature = "unstable")] +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[derive(Debug)] +pub struct RecvError; + +impl Error for RecvError {} + +impl Display for RecvError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt("The channel is empty.", f) + } +} diff --git a/src/sync/mod.rs b/src/sync/mod.rs index 088c520b..c2211656 100644 --- a/src/sync/mod.rs +++ b/src/sync/mod.rs @@ -184,7 +184,7 @@ mod rwlock; cfg_unstable! { pub use barrier::{Barrier, BarrierWaitResult}; - pub use channel::{channel, Sender, Receiver}; + pub use channel::{channel, Sender, Receiver, RecvError, TryRecvError, TrySendError}; mod barrier; mod channel; diff --git a/src/sync/mutex.rs b/src/sync/mutex.rs index 4d2cf251..c62b5616 100644 --- a/src/sync/mutex.rs +++ b/src/sync/mutex.rs @@ -39,14 +39,14 @@ use crate::task::{Context, Poll}; /// # /// # }) /// ``` -pub struct Mutex { +pub struct Mutex { locked: AtomicBool, wakers: WakerSet, value: UnsafeCell, } -unsafe impl Send for Mutex {} -unsafe impl Sync for Mutex {} +unsafe impl Send for Mutex {} +unsafe impl Sync for Mutex {} impl Mutex { /// Creates a new mutex. @@ -65,7 +65,9 @@ impl Mutex { value: UnsafeCell::new(t), } } +} +impl Mutex { /// Acquires the lock. /// /// Returns a guard that releases the lock when dropped. @@ -91,12 +93,12 @@ impl Mutex { /// # }) /// ``` pub async fn lock(&self) -> MutexGuard<'_, T> { - pub struct LockFuture<'a, T> { + pub struct LockFuture<'a, T: ?Sized> { mutex: &'a Mutex, opt_key: Option, } - impl<'a, T> Future for LockFuture<'a, T> { + impl<'a, T: ?Sized> Future for LockFuture<'a, T> { type Output = MutexGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -123,7 +125,7 @@ impl Mutex { } } - impl Drop for LockFuture<'_, T> { + impl Drop for LockFuture<'_, T> { fn drop(&mut self) { // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { @@ -189,7 +191,7 @@ impl Mutex { /// let mutex = Mutex::new(10); /// assert_eq!(mutex.into_inner(), 10); /// ``` - pub fn into_inner(self) -> T { + pub fn into_inner(self) -> T where T: Sized { self.value.into_inner() } @@ -216,7 +218,7 @@ impl Mutex { } } -impl fmt::Debug for Mutex { +impl fmt::Debug for Mutex { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { struct Locked; impl fmt::Debug for Locked { @@ -238,19 +240,19 @@ impl From for Mutex { } } -impl Default for Mutex { +impl Default for Mutex { fn default() -> Mutex { Mutex::new(Default::default()) } } /// A guard that releases the lock when dropped. -pub struct MutexGuard<'a, T>(&'a Mutex); +pub struct MutexGuard<'a, T: ?Sized>(&'a Mutex); -unsafe impl Send for MutexGuard<'_, T> {} -unsafe impl Sync for MutexGuard<'_, T> {} +unsafe impl Send for MutexGuard<'_, T> {} +unsafe impl Sync for MutexGuard<'_, T> {} -impl Drop for MutexGuard<'_, T> { +impl Drop for MutexGuard<'_, T> { fn drop(&mut self) { // Use `SeqCst` ordering to synchronize with `WakerSet::insert()` and `WakerSet::update()`. self.0.locked.store(false, Ordering::SeqCst); @@ -260,19 +262,19 @@ impl Drop for MutexGuard<'_, T> { } } -impl fmt::Debug for MutexGuard<'_, T> { +impl fmt::Debug for MutexGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&**self, f) } } -impl fmt::Display for MutexGuard<'_, T> { +impl fmt::Display for MutexGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { (**self).fmt(f) } } -impl Deref for MutexGuard<'_, T> { +impl Deref for MutexGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -280,7 +282,7 @@ impl Deref for MutexGuard<'_, T> { } } -impl DerefMut for MutexGuard<'_, T> { +impl DerefMut for MutexGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.0.value.get() } } diff --git a/src/sync/rwlock.rs b/src/sync/rwlock.rs index bc3f6405..08d8ed84 100644 --- a/src/sync/rwlock.rs +++ b/src/sync/rwlock.rs @@ -49,15 +49,15 @@ const READ_COUNT_MASK: usize = !(ONE_READ - 1); /// # /// # }) /// ``` -pub struct RwLock { +pub struct RwLock { state: AtomicUsize, read_wakers: WakerSet, write_wakers: WakerSet, value: UnsafeCell, } -unsafe impl Send for RwLock {} -unsafe impl Sync for RwLock {} +unsafe impl Send for RwLock {} +unsafe impl Sync for RwLock {} impl RwLock { /// Creates a new reader-writer lock. @@ -77,7 +77,9 @@ impl RwLock { value: UnsafeCell::new(t), } } +} +impl RwLock { /// Acquires a read lock. /// /// Returns a guard that releases the lock when dropped. @@ -99,12 +101,12 @@ impl RwLock { /// # }) /// ``` pub async fn read(&self) -> RwLockReadGuard<'_, T> { - pub struct ReadFuture<'a, T> { + pub struct ReadFuture<'a, T: ?Sized> { lock: &'a RwLock, opt_key: Option, } - impl<'a, T> Future for ReadFuture<'a, T> { + impl<'a, T: ?Sized> Future for ReadFuture<'a, T> { type Output = RwLockReadGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -131,7 +133,7 @@ impl RwLock { } } - impl Drop for ReadFuture<'_, T> { + impl Drop for ReadFuture<'_, T> { fn drop(&mut self) { // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { @@ -224,12 +226,12 @@ impl RwLock { /// # }) /// ``` pub async fn write(&self) -> RwLockWriteGuard<'_, T> { - pub struct WriteFuture<'a, T> { + pub struct WriteFuture<'a, T: ?Sized> { lock: &'a RwLock, opt_key: Option, } - impl<'a, T> Future for WriteFuture<'a, T> { + impl<'a, T: ?Sized> Future for WriteFuture<'a, T> { type Output = RwLockWriteGuard<'a, T>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -256,7 +258,7 @@ impl RwLock { } } - impl Drop for WriteFuture<'_, T> { + impl Drop for WriteFuture<'_, T> { fn drop(&mut self) { // If the current task is still in the set, that means it is being cancelled now. if let Some(key) = self.opt_key { @@ -316,7 +318,7 @@ impl RwLock { /// let lock = RwLock::new(10); /// assert_eq!(lock.into_inner(), 10); /// ``` - pub fn into_inner(self) -> T { + pub fn into_inner(self) -> T where T: Sized { self.value.into_inner() } @@ -343,7 +345,7 @@ impl RwLock { } } -impl fmt::Debug for RwLock { +impl fmt::Debug for RwLock { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { struct Locked; impl fmt::Debug for Locked { @@ -365,19 +367,19 @@ impl From for RwLock { } } -impl Default for RwLock { +impl Default for RwLock { fn default() -> RwLock { RwLock::new(Default::default()) } } /// A guard that releases the read lock when dropped. -pub struct RwLockReadGuard<'a, T>(&'a RwLock); +pub struct RwLockReadGuard<'a, T: ?Sized>(&'a RwLock); -unsafe impl Send for RwLockReadGuard<'_, T> {} -unsafe impl Sync for RwLockReadGuard<'_, T> {} +unsafe impl Send for RwLockReadGuard<'_, T> {} +unsafe impl Sync for RwLockReadGuard<'_, T> {} -impl Drop for RwLockReadGuard<'_, T> { +impl Drop for RwLockReadGuard<'_, T> { fn drop(&mut self) { let state = self.0.state.fetch_sub(ONE_READ, Ordering::SeqCst); @@ -388,19 +390,19 @@ impl Drop for RwLockReadGuard<'_, T> { } } -impl fmt::Debug for RwLockReadGuard<'_, T> { +impl fmt::Debug for RwLockReadGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&**self, f) } } -impl fmt::Display for RwLockReadGuard<'_, T> { +impl fmt::Display for RwLockReadGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { (**self).fmt(f) } } -impl Deref for RwLockReadGuard<'_, T> { +impl Deref for RwLockReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -409,12 +411,12 @@ impl Deref for RwLockReadGuard<'_, T> { } /// A guard that releases the write lock when dropped. -pub struct RwLockWriteGuard<'a, T>(&'a RwLock); +pub struct RwLockWriteGuard<'a, T: ?Sized>(&'a RwLock); -unsafe impl Send for RwLockWriteGuard<'_, T> {} -unsafe impl Sync for RwLockWriteGuard<'_, T> {} +unsafe impl Send for RwLockWriteGuard<'_, T> {} +unsafe impl Sync for RwLockWriteGuard<'_, T> {} -impl Drop for RwLockWriteGuard<'_, T> { +impl Drop for RwLockWriteGuard<'_, T> { fn drop(&mut self) { self.0.state.store(0, Ordering::SeqCst); @@ -427,19 +429,19 @@ impl Drop for RwLockWriteGuard<'_, T> { } } -impl fmt::Debug for RwLockWriteGuard<'_, T> { +impl fmt::Debug for RwLockWriteGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fmt::Debug::fmt(&**self, f) } } -impl fmt::Display for RwLockWriteGuard<'_, T> { +impl fmt::Display for RwLockWriteGuard<'_, T> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { (**self).fmt(f) } } -impl Deref for RwLockWriteGuard<'_, T> { +impl Deref for RwLockWriteGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -447,7 +449,7 @@ impl Deref for RwLockWriteGuard<'_, T> { } } -impl DerefMut for RwLockWriteGuard<'_, T> { +impl DerefMut for RwLockWriteGuard<'_, T> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.0.value.get() } } diff --git a/src/task/mod.rs b/src/task/mod.rs index 075e71dd..56224a36 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -37,7 +37,7 @@ //! outlive its parent (the task that spawned it), unless this parent is the root task. //! //! The root task can also wait on the completion of the child task; a call to [`spawn`] produces a -//! [`JoinHandle`], which provides implements `Future` and can be `await`ed: +//! [`JoinHandle`], which implements `Future` and can be `await`ed: //! //! ``` //! use async_std::task; @@ -58,7 +58,7 @@ //! ## Configuring tasks //! //! A new task can be configured before it is spawned via the [`Builder`] type, -//! which currently allows you to set the name and stack size for the child task: +//! which currently allows you to set the name for the child task: //! //! ``` //! # #![allow(unused_must_use)] @@ -110,7 +110,6 @@ //! [`join`]: struct.JoinHandle.html#method.join //! [`panic!`]: https://doc.rust-lang.org/std/macro.panic.html //! [`Builder`]: struct.Builder.html -//! [`Builder::stack_size`]: struct.Builder.html#method.stack_size //! [`Builder::name`]: struct.Builder.html#method.name //! [`task::current`]: fn.current.html //! [`Task`]: struct.Task.html @@ -118,13 +117,15 @@ //! [`task_local!`]: ../macro.task_local.html //! [`with`]: struct.LocalKey.html#method.with -cfg_std! { +cfg_alloc! { #[doc(inline)] - pub use std::task::{Context, Poll, Waker}; + pub use core::task::{Context, Poll, Waker}; + pub use ready::ready; - #[doc(inline)] - pub use async_macros::ready; + mod ready; +} +cfg_std! { pub use yield_now::yield_now; mod yield_now; } diff --git a/src/task/ready.rs b/src/task/ready.rs new file mode 100644 index 00000000..aca04aa7 --- /dev/null +++ b/src/task/ready.rs @@ -0,0 +1,4 @@ +/// Extracts the successful type of a `Poll`. +/// +/// This macro bakes in propagation of `Pending` signals by returning early. +pub use futures_core::ready; diff --git a/src/unit/from_stream.rs b/src/unit/from_stream.rs index da216e22..7b784383 100644 --- a/src/unit/from_stream.rs +++ b/src/unit/from_stream.rs @@ -8,6 +8,6 @@ impl FromStream<()> for () { fn from_stream<'a, S: IntoStream + 'a>( stream: S, ) -> Pin + 'a>> { - Box::pin(stream.into_stream().for_each(|_| ())) + Box::pin(stream.into_stream().for_each(drop)) } } diff --git a/src/utils.rs b/src/utils.rs index c8984ba8..271a8579 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,3 +1,5 @@ +use alloc::string::String; + /// Calls a function and aborts if it panics. /// /// This is useful in unsafe code where we can't recover from panics. @@ -19,7 +21,7 @@ pub fn abort_on_panic(f: impl FnOnce() -> T) -> T { } /// Generates a random number in `0..n`. -#[cfg(feature = "default")] +#[cfg(any(feature = "unstable", feature = "default"))] pub fn random(n: u32) -> u32 { use std::cell::Cell; use std::num::Wrapping; @@ -90,8 +92,21 @@ macro_rules! cfg_unstable { } } +/// Declares unstable and default items. +#[doc(hidden)] +macro_rules! cfg_unstable_default { + ($($item:item)*) => { + $( + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(unstable))] + $item + )* + } +} + /// Declares Unix-specific items. #[doc(hidden)] +#[allow(unused_macros)] macro_rules! cfg_unix { ($($item:item)*) => { $( @@ -104,6 +119,7 @@ macro_rules! cfg_unix { /// Declares Windows-specific items. #[doc(hidden)] +#[allow(unused_macros)] macro_rules! cfg_windows { ($($item:item)*) => { $( @@ -116,6 +132,7 @@ macro_rules! cfg_windows { /// Declares items when the "docs" feature is enabled. #[doc(hidden)] +#[allow(unused_macros)] macro_rules! cfg_docs { ($($item:item)*) => { $( @@ -127,6 +144,7 @@ macro_rules! cfg_docs { /// Declares items when the "docs" feature is disabled. #[doc(hidden)] +#[allow(unused_macros)] macro_rules! cfg_not_docs { ($($item:item)*) => { $( @@ -148,6 +166,18 @@ macro_rules! cfg_std { } } +/// Declares no-std items. +#[allow(unused_macros)] +#[doc(hidden)] +macro_rules! cfg_alloc { + ($($item:item)*) => { + $( + #[cfg(feature = "alloc")] + $item + )* + } +} + /// Declares default items. #[allow(unused_macros)] #[doc(hidden)] @@ -168,6 +198,7 @@ macro_rules! cfg_default { /// /// Inside invocations of this macro, we write a definitions that looks similar to the final /// rendered docs, and the macro then generates all the boilerplate for us. +#[allow(unused_macros)] #[doc(hidden)] macro_rules! extension_trait { ( @@ -192,14 +223,14 @@ macro_rules! extension_trait { #[allow(dead_code)] mod owned { #[doc(hidden)] - pub struct ImplFuture(std::marker::PhantomData); + pub struct ImplFuture(core::marker::PhantomData); } // A fake `impl Future` type that borrows its environment. #[allow(dead_code)] mod borrowed { #[doc(hidden)] - pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>); + pub struct ImplFuture<'a, T>(core::marker::PhantomData<&'a T>); } // Render a fake trait combining the bodies of the base trait and the extension trait. @@ -226,11 +257,6 @@ macro_rules! extension_trait { $(#[cfg(feature = "docs")] $imp)* }; - // Optimization: expand `$head` eagerly before starting a new method definition. - (@ext ($($head:tt)*) #[doc = $d:literal] $($tail:tt)*) => { - $($head)* extension_trait!(@ext (#[doc = $d]) $($tail)*); - }; - // Parse the return type in an extension method. (@doc ($($head:tt)*) -> impl Future $(+ $lt:lifetime)? [$f:ty] $($tail:tt)*) => { extension_trait!(@doc ($($head)* -> owned::ImplFuture<$out>) $($tail)*); diff --git a/tests/channel.rs b/tests/channel.rs index 34bd888f..f3029060 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -18,13 +18,13 @@ fn smoke() { let (s, r) = channel(1); s.send(7).await; - assert_eq!(r.recv().await, Some(7)); + assert_eq!(r.recv().await.unwrap(), 7); s.send(8).await; - assert_eq!(r.recv().await, Some(8)); + assert_eq!(r.recv().await.unwrap(), 8); drop(s); - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); task::block_on(async { @@ -74,7 +74,7 @@ fn len_empty_full() { assert_eq!(r.is_empty(), false); assert_eq!(r.is_full(), true); - r.recv().await; + let _ = r.recv().await; assert_eq!(s.len(), 1); assert_eq!(s.is_empty(), false); @@ -91,12 +91,12 @@ fn recv() { let (s, r) = channel(100); task::spawn(async move { - assert_eq!(r.recv().await, Some(7)); + assert_eq!(r.recv().await.unwrap(), 7); task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(8)); + assert_eq!(r.recv().await.unwrap(), 8); task::sleep(ms(1000)).await; - assert_eq!(r.recv().await, Some(9)); - assert_eq!(r.recv().await, None); + assert_eq!(r.recv().await.unwrap(), 9); + assert!(r.recv().await.is_err()); }); task::sleep(ms(1500)).await; @@ -122,9 +122,9 @@ fn send() { }); task::sleep(ms(1500)).await; - assert_eq!(r.recv().await, Some(7)); - assert_eq!(r.recv().await, Some(8)); - assert_eq!(r.recv().await, Some(9)); + assert_eq!(r.recv().await.unwrap(), 7); + assert_eq!(r.recv().await.unwrap(), 8); + assert_eq!(r.recv().await.unwrap(), 9); }) } @@ -139,10 +139,10 @@ fn recv_after_disconnect() { drop(s); - assert_eq!(r.recv().await, Some(1)); - assert_eq!(r.recv().await, Some(2)); - assert_eq!(r.recv().await, Some(3)); - assert_eq!(r.recv().await, None); + assert_eq!(r.recv().await.unwrap(), 1); + assert_eq!(r.recv().await.unwrap(), 2); + assert_eq!(r.recv().await.unwrap(), 3); + assert!(r.recv().await.is_err()); }) } @@ -164,7 +164,7 @@ fn len() { } for i in 0..50 { - r.recv().await; + let _ = r.recv().await; assert_eq!(r.len(), 50 - i - 1); } } @@ -188,7 +188,7 @@ fn len() { let r = r.clone(); async move { for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); + assert_eq!(r.recv().await.unwrap(), i); let len = r.len(); assert!(len <= CAP); } @@ -214,7 +214,7 @@ fn disconnect_wakes_receiver() { let (s, r) = channel::<()>(1); let child = task::spawn(async move { - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); task::sleep(ms(1000)).await; @@ -233,9 +233,9 @@ fn spsc() { let child = task::spawn(async move { for i in 0..COUNT { - assert_eq!(r.recv().await, Some(i)); + assert_eq!(r.recv().await.unwrap(), i); } - assert_eq!(r.recv().await, None); + assert!(r.recv().await.is_err()); }); for i in 0..COUNT { diff --git a/tests/tcp.rs b/tests/tcp.rs index 00fa3a04..d92cff0d 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -94,3 +94,25 @@ fn smoke_async_stream_to_std_listener() -> io::Result<()> { Ok(()) } + +#[test] +fn cloned_streams() -> io::Result<()> { + task::block_on(async { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let mut stream = TcpStream::connect(&addr).await?; + let mut cloned_stream = stream.clone(); + let mut incoming = listener.incoming(); + let mut write_stream = incoming.next().await.unwrap()?; + write_stream.write_all(b"Each your doing").await?; + + let mut buf = [0; 15]; + stream.read_exact(&mut buf[..8]).await?; + cloned_stream.read_exact(&mut buf[8..]).await?; + + assert_eq!(&buf[..15], b"Each your doing"); + + Ok(()) + }) +} diff --git a/tests/verbose_errors.rs b/tests/verbose_errors.rs index 15630928..17d42611 100644 --- a/tests/verbose_errors.rs +++ b/tests/verbose_errors.rs @@ -8,7 +8,7 @@ fn open_file() { match res { Ok(_) => panic!("Found file with random name: We live in a simulation"), Err(e) => assert_eq!( - "Could not open `/ashjudlkahasdasdsikdhajik/asdasdasdasdasdasd/fjuiklashdbflasas`", + "could not open `/ashjudlkahasdasdsikdhajik/asdasdasdasdasdasd/fjuiklashdbflasas`", &format!("{}", e) ), }