Merge branch 'master' into new-scheduler

pull/733/head
k-nasa 5 years ago
commit 84e5c5f351

@ -40,6 +40,7 @@ jobs:
with: with:
command: check command: check
args: --features unstable --all --bins --examples --tests args: --features unstable --all --bins --examples --tests
- name: check bench - name: check bench
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
if: matrix.rust == 'nightly' if: matrix.rust == 'nightly'
@ -63,7 +64,31 @@ jobs:
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
with: with:
command: test command: test
args: --all --features unstable attributes args: --all --features "unstable attributes"
- name: documentation test
uses: actions-rs/cargo@v1
with:
command: test
args: --doc --features "unstable attributes"
build__with_no_std:
name: Build with no-std
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: setup
run: |
rustup default nightly
rustup target add thumbv7m-none-eabi
- name: check no_std
uses: actions-rs/cargo@v1
with:
command: check
args: --no-default-features --features alloc --target thumbv7m-none-eabi -Z avoid-dev-deps
check_fmt_and_docs: check_fmt_and_docs:
name: Checking fmt and docs name: Checking fmt and docs

@ -7,6 +7,130 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview
## [Unreleased] ## [Unreleased]
# [1.5.0] - 2020-02-03
[API Documentation](https://docs.rs/async-std/1.5.0/async-std)
This patch includes various quality of life improvements to async-std.
Including improved performance, stability, and the addition of various
`Clone` impls that replace the use of `Arc` in many cases.
## Added
- Added links to various ecosystem projects from the README ([#660](https://github.com/async-rs/async-std/pull/660))
- Added an example on `FromStream` for `Result<T, E>` ([#643](https://github.com/async-rs/async-std/pull/643))
- Added `stream::pending` as "unstable" ([#615](https://github.com/async-rs/async-std/pull/615))
- Added an example of `stream::timeout` to document the error flow ([#675](https://github.com/async-rs/async-std/pull/675))
- Implement `Clone` for `DirEntry` ([#682](https://github.com/async-rs/async-std/pull/682))
- Implement `Clone` for `TcpStream` ([#689](https://github.com/async-rs/async-std/pull/689))
## Changed
- Removed internal comment on `stream::Interval` ([#645](https://github.com/async-rs/async-std/pull/645))
- The "unstable" feature can now be used without requiring the "default" feature ([#647](https://github.com/async-rs/async-std/pull/647))
- Removed unnecessary trait bound on `stream::FlatMap` ([#651](https://github.com/async-rs/async-std/pull/651))
- Updated the "broadcaster" dependency used by "unstable" to `1.0.0` ([#681](https://github.com/async-rs/async-std/pull/681))
- Updated `async-task` to 1.2.1 ([#676](https://github.com/async-rs/async-std/pull/676))
- `task::block_on` now parks after a single poll, improving performance in many cases ([#684](https://github.com/async-rs/async-std/pull/684))
- Improved reading flow of the "client" part of the async-std tutorial ([#550](https://github.com/async-rs/async-std/pull/550))
- Use `take_while` instead of `scan` in `impl` of `Product`, `Sum` and `FromStream` ([#667](https://github.com/async-rs/async-std/pull/667))
- `TcpStream::connect` no longer uses a thread from the threadpool, improving performance ([#687](https://github.com/async-rs/async-std/pull/687))
## Fixed
- Fixed crate documentation typo ([#655](https://github.com/async-rs/async-std/pull/655))
- Fixed documentation for `UdpSocket::recv` ([#648](https://github.com/async-rs/async-std/pull/648))
- Fixed documentation for `UdpSocket::send` ([#671](https://github.com/async-rs/async-std/pull/671))
- Fixed typo in stream documentation ([#650](https://github.com/async-rs/async-std/pull/650))
- Fixed typo on `sync::JoinHandle` documentation ([#659](https://github.com/async-rs/async-std/pull/659))
- Removed use of `std::error::Error::description` which failed CI ([#661](https://github.com/async-rs/async-std/pull/662))
- Removed the use of rustfmt's unstable `format_code_in_doc_comments` option which failed CI ([#685](https://github.com/async-rs/async-std/pull/685))
- Fixed a code typo in the `task::sleep` example ([#688](https://github.com/async-rs/async-std/pull/688))
# [1.4.0] - 2019-12-20
[API Documentation](https://docs.rs/async-std/1.4.0/async-std)
This patch adds `Future::timeout`, providing a method counterpart to the
`future::timeout` free function. And includes several bug fixes around missing
APIs. Notably we're not shipping our new executor yet, first announced [on our
blog](https://async.rs/blog/stop-worrying-about-blocking-the-new-async-std-runtime/).
## Examples
```rust
use async_std::prelude::*;
use async_std::future;
use std::time::Duration;
let fut = future::pending::<()>(); // This future will never resolve.
let res = fut.timeout(Duration::from_millis(100)).await;
assert!(res.is_err()); // The future timed out, returning an err.
```
## Added
- Added `Future::timeout` as "unstable" [(#600)](https://github.com/async-rs/async-std/pull/600)
## Fixes
- Fixed a doc test and enabled it on CI [(#597)](https://github.com/async-rs/async-std/pull/597)
- Fixed a rendering issue with the `stream` submodule documentation [(#621)](https://github.com/async-rs/async-std/pull/621)
- `Write::write_fmt`'s future is now correctly marked as `#[must_use]` [(#628)](https://github.com/async-rs/async-std/pull/628)
- Fixed the missing `io::Bytes` export [(#633)](https://github.com/async-rs/async-std/pull/633)
- Fixed the missing `io::Chain` export [(#633)](https://github.com/async-rs/async-std/pull/633)
- Fixed the missing `io::Take` export [(#633)](https://github.com/async-rs/async-std/pull/633)
# [1.3.0] - 2019-12-12
[API Documentation](https://docs.rs/async-std/1.3.0/async-std)
This patch introduces `Stream::delay`, more methods on `DoubleEndedStream`,
and improves compile times. `Stream::delay` is a new API that's similar to
[`task::sleep`](https://docs.rs/async-std/1.2.0/async_std/task/fn.sleep.html),
but can be passed as part of as stream, rather than as a separate block. This is
useful for examples, or when manually debugging race conditions.
## Examples
```rust
let start = Instant::now();
let mut s = stream::from_iter(vec![0u8, 1]).delay(Duration::from_millis(200));
// The first time will take more than 200ms due to delay.
s.next().await;
assert!(start.elapsed().as_millis() >= 200);
// There will be no delay after the first time.
s.next().await;
assert!(start.elapsed().as_millis() <= 210);
```
## Added
- Added `Stream::delay` as "unstable" [(#309)](https://github.com/async-rs/async-std/pull/309)
- Added `DoubleEndedStream::next_back` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562)
- Added `DoubleEndedStream::nth_back` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562)
- Added `DoubleEndedStream::rfind` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562)
- Added `DoubleEndedStream::rfold` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562)
- Added `DoubleEndedStream::try_rfold` as "unstable" [(#562)](https://github.com/async-rs/async-std/pull/562)
- `stream::Once` now implements `DoubleEndedStream` [(#562)](https://github.com/async-rs/async-std/pull/562)
- `stream::FromIter` now implements `DoubleEndedStream` [(#562)](https://github.com/async-rs/async-std/pull/562)
## Changed
- Removed our dependency on `async-macros`, speeding up compilation [(#610)](https://github.com/async-rs/async-std/pull/610)
## Fixes
- Fixed a link in the task docs [(#598)](https://github.com/async-rs/async-std/pull/598)
- Fixed the `UdpSocket::recv` example [(#603)](https://github.com/async-rs/async-std/pull/603)
- Fixed a link to `task::block_on` [(#608)](https://github.com/async-rs/async-std/pull/608)
- Fixed an incorrect API mention in `task::Builder` [(#612)](https://github.com/async-rs/async-std/pull/612)
- Fixed leftover mentions of `futures-preview` [(#595)](https://github.com/async-rs/async-std/pull/595)
- Fixed a typo in the tutorial [(#614)](https://github.com/async-rs/async-std/pull/614)
- `<TcpStream as Write>::poll_close` now closes the write half of the stream [(#618)](https://github.com/async-rs/async-std/pull/618)
# [1.2.0] - 2019-11-27 # [1.2.0] - 2019-11-27
[API Documentation](https://docs.rs/async-std/1.2.0/async-std) [API Documentation](https://docs.rs/async-std/1.2.0/async-std)
@ -553,7 +677,10 @@ task::blocking(async {
- Initial beta release - Initial beta release
[Unreleased]: https://github.com/async-rs/async-std/compare/v1.2.0...HEAD [Unreleased]: https://github.com/async-rs/async-std/compare/v1.5.0...HEAD
[1.5.0]: https://github.com/async-rs/async-std/compare/v1.4.0...v1.5.0
[1.4.0]: https://github.com/async-rs/async-std/compare/v1.3.0...v1.4.0
[1.3.0]: https://github.com/async-rs/async-std/compare/v1.2.0...v1.3.0
[1.2.0]: https://github.com/async-rs/async-std/compare/v1.1.0...v1.2.0 [1.2.0]: https://github.com/async-rs/async-std/compare/v1.1.0...v1.2.0
[1.1.0]: https://github.com/async-rs/async-std/compare/v1.0.1...v1.1.0 [1.1.0]: https://github.com/async-rs/async-std/compare/v1.0.1...v1.1.0
[1.0.1]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1 [1.0.1]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1

@ -1,6 +1,6 @@
[package] [package]
name = "async-std" name = "async-std"
version = "1.2.0" version = "1.5.0"
authors = [ authors = [
"Stjepan Glavina <stjepang@gmail.com>", "Stjepan Glavina <stjepang@gmail.com>",
"Yoshua Wuyts <yoshuawuyts@gmail.com>", "Yoshua Wuyts <yoshuawuyts@gmail.com>",
@ -35,47 +35,49 @@ default = [
"num_cpus", "num_cpus",
"pin-project-lite", "pin-project-lite",
] ]
docs = ["attributes", "unstable"] docs = ["attributes", "unstable", "default"]
unstable = ["default", "broadcaster"] unstable = ["std", "broadcaster", "futures-timer"]
attributes = ["async-attributes"] attributes = ["async-attributes"]
std = [ std = [
"async-macros", "alloc",
"crossbeam-utils", "crossbeam-utils",
"futures-core", "futures-core/std",
"futures-io", "futures-io",
"memchr", "memchr",
"once_cell", "once_cell",
"pin-project-lite",
"pin-utils", "pin-utils",
"slab", "slab",
] ]
alloc = [
"futures-core/alloc",
"pin-project-lite",
]
[dependencies] [dependencies]
async-attributes = { version = "1.1.1", optional = true } async-attributes = { version = "1.1.1", optional = true }
async-macros = { version = "2.0.0", optional = true } async-task = { version = "1.2.1", optional = true }
async-task = { version = "1.0.0", optional = true } broadcaster = { version = "1.0.0", optional = true }
broadcaster = { version = "0.2.6", optional = true, default-features = false, features = ["default-channels"] }
crossbeam-channel = { version = "0.4.0", optional = true } crossbeam-channel = { version = "0.4.0", optional = true }
crossbeam-deque = { version = "0.7.2", optional = true } crossbeam-deque = { version = "0.7.2", optional = true }
crossbeam-queue = { version = "0.2.0", optional = true } crossbeam-queue = { version = "0.2.0", optional = true }
crossbeam-utils = { version = "0.7.0", optional = true } crossbeam-utils = { version = "0.7.0", optional = true }
futures-core = { version = "0.3.1", optional = true } futures-core = { version = "0.3.1", optional = true, default-features = false }
futures-io = { version = "0.3.1", optional = true } futures-io = { version = "0.3.1", optional = true }
futures-timer = { version = "2.0.2", optional = true } futures-timer = { version = "2.0.2", optional = true }
kv-log-macro = { version = "1.0.4", optional = true } kv-log-macro = { version = "1.0.4", optional = true }
log = { version = "0.4.8", features = ["kv_unstable"], optional = true } log = { version = "0.4.8", features = ["kv_unstable"], optional = true }
memchr = { version = "2.2.1", optional = true } memchr = { version = "2.3.0", optional = true }
mio = { version = "0.6.19", optional = true } mio = { version = "0.6.19", optional = true }
mio-uds = { version = "0.6.7", optional = true } mio-uds = { version = "0.6.7", optional = true }
num_cpus = { version = "1.11.1", optional = true } num_cpus = { version = "1.11.1", optional = true }
once_cell = { version = "1.2.0", optional = true } once_cell = { version = "1.2.0", optional = true }
pin-project-lite = { version = "0.1.1", optional = true } pin-project-lite = { version = "0.1.2", optional = true }
pin-utils = { version = "0.1.0-alpha.4", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true }
slab = { version = "0.4.2", optional = true } slab = { version = "0.4.2", optional = true }
[dev-dependencies] [dev-dependencies]
femme = "1.3.0" femme = "1.3.0"
rand = "0.7.2" rand = "0.7.3"
surf = "1.0.3" surf = "1.0.3"
tempdir = "0.3.7" tempdir = "0.3.7"
futures = "0.3.1" futures = "0.3.1"

@ -74,18 +74,15 @@ syntax.
## Examples ## Examples
All examples require the [`"attributes"` feature] to be enabled. This feature
is not enabled by default because it significantly impacts compile times. See
[`task::block_on`] for an alternative way to start executing tasks.
```rust ```rust
use async_std::task;
async fn say_hello() { async fn say_hello() {
println!("Hello, world!"); println!("Hello, world!");
} }
#[async_std::main] fn main() {
async fn main() { task::block_on(say_hello())
say_hello().await;
} }
``` ```
@ -123,6 +120,22 @@ documentation] on how to enable them.
[cargo-add]: https://github.com/killercup/cargo-edit [cargo-add]: https://github.com/killercup/cargo-edit
[features documentation]: https://docs.rs/async-std/#features [features documentation]: https://docs.rs/async-std/#features
## Ecosystem
* [async-tls](https://crates.io/crates/async-tls) — Async TLS/SSL streams using **Rustls**.
* [async-native-tls](https://crates.io/crates/async-native-tls) — **Native TLS** for Async. Native TLS for futures and async-std.
* [async-tungstenite](https://crates.io/crates/async-tungstenite) — Asynchronous **WebSockets** for async-std, tokio, gio and any std Futures runtime.
* [Tide](https://crates.io/crates/tide) — Serve the web. A modular **web framework** built around async/await.
* [SQLx](https://crates.io/crates/sqlx) — The Rust **SQL** Toolkit. SQLx is a 100% safe Rust library for Postgres and MySQL with compile-time checked queries.
* [Surf](https://crates.io/crates/surf) — Surf the web. Surf is a friendly **HTTP client** built for casual Rustaceans and veterans alike.
* [Xactor](https://crates.io/crates/xactor) — Xactor is a rust actors framework based on async-std.
## License ## License
<sup> <sup>

@ -19,8 +19,9 @@
- [Clean Shutdown](./tutorial/clean_shutdown.md) - [Clean Shutdown](./tutorial/clean_shutdown.md)
- [Handling Disconnection](./tutorial/handling_disconnection.md) - [Handling Disconnection](./tutorial/handling_disconnection.md)
- [Implementing a Client](./tutorial/implementing_a_client.md) - [Implementing a Client](./tutorial/implementing_a_client.md)
- [TODO: Async Patterns](./patterns.md) - [Async Patterns](./patterns.md)
- [TODO: Collected Small Patterns](./patterns/small-patterns.md) - [TODO: Collected Small Patterns](./patterns/small-patterns.md)
- [Production-Ready Accept Loop](./patterns/accept-loop.md)
- [Security practices](./security/index.md) - [Security practices](./security/index.md)
- [Security Disclosures and Policy](./security/policy.md) - [Security Disclosures and Policy](./security/policy.md)
- [Glossary](./glossary.md) - [Glossary](./glossary.md)

@ -0,0 +1,266 @@
# Production-Ready Accept Loop
A production-ready accept loop needs the following things:
1. Handling errors
2. Limiting the number of simultanteous connections to avoid deny-of-service
(DoS) attacks
## Handling errors
There are two kinds of errors in an accept loop:
1. Per-connection errors. The system uses them to notify that there was a
connection in the queue and it's dropped by the peer. Subsequent connections
can be already queued so next connection must be accepted immediately.
2. Resource shortages. When these are encountered it doesn't make sense to
accept the next socket immediately. But the listener stays active, so you server
should try to accept socket later.
Here is the example of a per-connection error (printed in normal and debug mode):
```
Error: Connection reset by peer (os error 104)
Error: Os { code: 104, kind: ConnectionReset, message: "Connection reset by peer" }
```
And the following is the most common example of a resource shortage error:
```
Error: Too many open files (os error 24)
Error: Os { code: 24, kind: Other, message: "Too many open files" }
```
### Testing Application
To test your application for these errors try the following (this works
on unixes only).
Lower limits and start the application:
```
$ ulimit -n 100
$ cargo run --example your_app
Compiling your_app v0.1.0 (/work)
Finished dev [unoptimized + debuginfo] target(s) in 5.47s
Running `target/debug/examples/your_app`
Server is listening on: http://127.0.0.1:1234
```
Then in another console run the [`wrk`] benchmark tool:
```
$ wrk -c 1000 http://127.0.0.1:1234
Running 10s test @ http://localhost:8080/
2 threads and 1000 connections
$ telnet localhost 1234
Trying ::1...
Connected to localhost.
```
Important is to check the following things:
1. The application doesn't crash on error (but may log errors, see below)
2. It's possible to connect to the application again once load is stopped
(few seconds after `wrk`). This is what `telnet` does in example above,
make sure it prints `Connected to <hostname>`.
3. The `Too many open files` error is logged in the appropriate log. This
requires to set "maximum number of simultaneous connections" parameter (see
below) of your application to a value greater then `100` for this example.
4. Check CPU usage of the app while doing a test. It should not occupy 100%
of a single CPU core (it's unlikely that you can exhaust CPU by 1000
connections in Rust, so this means error handling is not right).
#### Testing non-HTTP applications
If it's possible, use the appropriate benchmark tool and set the appropriate
number of connections. For example `redis-benchmark` has a `-c` parameter for
that, if you implement redis protocol.
Alternatively, can still use `wrk`, just make sure that connection is not
immediately closed. If it is, put a temporary timeout before handing
the connection to the protocol handler, like this:
```rust,edition2018
# extern crate async_std;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
#async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
# let listener = TcpListener::bind(addr).await?;
# let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
task::spawn(async {
task::sleep(Duration::from_secs(10)).await; // 1
connection_loop(stream).await;
});
}
# Ok(())
# }
```
1. Make sure the sleep coroutine is inside the spawned task, not in the loop.
[`wrk`]: https://github.com/wg/wrk
### Handling Errors Manually
Here is how basic accept loop could look like:
```rust,edition2018
# extern crate async_std;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener.incoming();
while let Some(result) = incoming.next().await {
let stream = match stream {
Err(ref e) if is_connection_error(e) => continue, // 1
Err(e) => {
eprintln!("Error: {}. Pausing for 500ms."); // 3
task::sleep(Duration::from_millis(500)).await; // 2
continue;
}
Ok(s) => s,
};
// body
}
Ok(())
}
```
1. Ignore per-connection errors.
2. Sleep and continue on resource shortage.
3. It's important to log the message, because these errors commonly mean the
misconfiguration of the system and are helpful for operations people running
the application.
Be sure to [test your application](#testing-application).
### External Crates
The crate [`async-listen`] has a helper to achieve this task:
```rust,edition2018
# extern crate async_std;
# extern crate async_listen;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
use async_listen::{ListenExt, error_hint};
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener
.incoming()
.log_warnings(log_accept_error) // 1
.handle_errors(Duration::from_millis(500));
while let Some(socket) = incoming.next().await { // 2
// body
}
Ok(())
}
fn log_accept_error(e: &io::Error) {
eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e)) // 3
}
```
1. Logs resource shortages (`async-listen` calls them warnings). If you use
`log` crate or any other in your app this should go to the log.
2. Stream yields sockets without `Result` wrapper after `handle_errors` because
all errors are already handled.
3. Together with the error we print a hint, which explains some errors for end
users. For example, it recommends increasing open file limit and gives
a link.
[`async-listen`]: https://crates.io/crates/async-listen/
Be sure to [test your application](#testing-application).
## Connections Limit
Even if you've applied everything described in
[Handling Errors](#handling-errors) section, there is still a problem.
Let's imagine you have a server that needs to open a file to process
client request. At some point, you might encounter the following situation:
1. There are as many client connection as max file descriptors allowed for
the application.
2. Listener gets `Too many open files` error so it sleeps.
3. Some client sends a request via the previously open connection.
4. Opening a file to serve request fails, because of the same
`Too many open files` error, until some other client drops a connection.
There are many more possible situations, this is just a small illustation that
limiting number of connections is very useful. Generally, it's one of the ways
to control resources used by a server and avoiding some kinds of deny of
service (DoS) attacks.
### `async-listen` crate
Limiting maximum number of simultaneous connections with [`async-listen`]
looks like the following:
```rust,edition2018
# extern crate async_std;
# extern crate async_listen;
# use std::time::Duration;
# use async_std::{
# net::{TcpListener, TcpStream, ToSocketAddrs},
# prelude::*,
# };
#
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
#
use async_listen::{ListenExt, Token, error_hint};
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;
let mut incoming = listener
.incoming()
.log_warnings(log_accept_error)
.handle_errors(Duration::from_millis(500)) // 1
.backpressure(100);
while let Some((token, socket)) = incoming.next().await { // 2
task::spawn(async move {
connection_loop(&token, stream).await; // 3
});
}
Ok(())
}
async fn connection_loop(_token: &Token, stream: TcpStream) { // 4
// ...
}
# fn log_accept_error(e: &io::Error) {
# eprintln!("Error: {}. Listener paused for 0.5s. {}", e, error_hint(e));
# }
```
1. We need to handle errors first, because [`backpressure`] helper expects
stream of `TcpStream` rather than `Result`.
2. The token yielded by a new stream is what is counted by backpressure helper.
I.e. if you drop a token, new connection can be established.
3. We give the connection loop a reference to token to bind token's lifetime to
the lifetime of the connection.
4. The token itsellf in the function can be ignored, hence `_token`
[`backpressure`]: https://docs.rs/async-listen/0.1.2/async_listen/trait.ListenExt.html#method.backpressure
Be sure to [test this behavior](#testing-application).

@ -157,7 +157,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> {
spawn_and_log_error(connection_loop(broker_sender.clone(), stream)); spawn_and_log_error(connection_loop(broker_sender.clone(), stream));
} }
drop(broker_sender); drop(broker_sender);
broker_handle.await?; broker_handle.await;
Ok(()) Ok(())
} }

@ -1,18 +1,16 @@
## Implementing a client ## Implementing a client
Let's now implement the client for the chat. Since the protocol is line-based, implementing a client for the chat is straightforward:
Because the protocol is line-based, the implementation is pretty straightforward:
* Lines read from stdin should be sent over the socket. * Lines read from stdin should be sent over the socket.
* Lines read from the socket should be echoed to stdout. * Lines read from the socket should be echoed to stdout.
Unlike the server, the client needs only limited concurrency, as it interacts with only a single user. Although async does not significantly affect client performance (as unlike the server, the client interacts solely with one user and only needs limited concurrency), async is still useful for managing concurrency!
For this reason, async doesn't bring a lot of performance benefits in this case.
The client has to read from stdin and the socket *simultaneously*.
Programming this with threads is cumbersome, especially when implementing a clean shutdown.
With async, the `select!` macro is all that is needed.
However, async is still useful for managing concurrency!
Specifically, the client should *simultaneously* read from stdin and from the socket.
Programming this with threads is cumbersome, especially when implementing clean shutdown.
With async, we can just use the `select!` macro.
```rust,edition2018 ```rust,edition2018
# extern crate async_std; # extern crate async_std;

@ -111,7 +111,7 @@ We can "fix" it by waiting for the task to be joined, like this:
# #
# async move |stream| { # async move |stream| {
let handle = task::spawn(connection_loop(stream)); let handle = task::spawn(connection_loop(stream));
handle.await handle.await?
# }; # };
``` ```

@ -14,8 +14,9 @@ use async_std::task;
async fn process(stream: TcpStream) -> io::Result<()> { async fn process(stream: TcpStream) -> io::Result<()> {
println!("Accepted from: {}", stream.peer_addr()?); println!("Accepted from: {}", stream.peer_addr()?);
let (reader, writer) = &mut (&stream, &stream); let mut reader = stream.clone();
io::copy(reader, writer).await?; let mut writer = stream;
io::copy(&mut reader, &mut writer).await?;
Ok(()) Ok(())
} }

@ -15,8 +15,9 @@ use async_std::task;
async fn process(stream: TcpStream) -> io::Result<()> { async fn process(stream: TcpStream) -> io::Result<()> {
println!("Accepted from: {}", stream.peer_addr()?); println!("Accepted from: {}", stream.peer_addr()?);
let (reader, writer) = &mut (&stream, &stream); let mut reader = stream.clone();
io::copy(reader, writer).await?; let mut writer = stream;
io::copy(&mut reader, &mut writer).await?;
Ok(()) Ok(())
} }

@ -1,2 +1 @@
version = "Two" version = "Two"
format_code_in_doc_comments = true

@ -158,6 +158,12 @@ impl fmt::Debug for DirEntry {
} }
} }
impl Clone for DirEntry {
fn clone(&self) -> Self {
DirEntry(self.0.clone())
}
}
cfg_unix! { cfg_unix! {
use crate::os::unix::fs::DirEntryExt; use crate::os::unix::fs::DirEntryExt;

@ -1,6 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use async_macros::MaybeDone; use crate::future::MaybeDone;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -7,7 +7,6 @@ cfg_unstable! {
mod try_join; mod try_join;
use std::time::Duration; use std::time::Duration;
use delay::DelayFuture; use delay::DelayFuture;
use flatten::FlattenFuture; use flatten::FlattenFuture;
use crate::future::IntoFuture; use crate::future::IntoFuture;
@ -17,9 +16,13 @@ cfg_unstable! {
use try_join::TryJoin; use try_join::TryJoin;
} }
cfg_unstable_default! {
use crate::future::timeout::TimeoutFuture;
}
extension_trait! { extension_trait! {
use std::pin::Pin; use core::pin::Pin;
use std::ops::{Deref, DerefMut}; use core::ops::{Deref, DerefMut};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
@ -133,7 +136,7 @@ extension_trait! {
[`Future`]: ../future/trait.Future.html [`Future`]: ../future/trait.Future.html
"#] "#]
pub trait FutureExt: std::future::Future { pub trait FutureExt: core::future::Future {
/// Returns a Future that delays execution for a specified time. /// Returns a Future that delays execution for a specified time.
/// ///
/// # Examples /// # Examples
@ -148,7 +151,7 @@ extension_trait! {
/// dbg!(a.await); /// dbg!(a.await);
/// # }) /// # })
/// ``` /// ```
#[cfg(all(feature = "default", feature = "unstable"))] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn delay(self, dur: Duration) -> impl Future<Output = Self::Output> [DelayFuture<Self>] fn delay(self, dur: Duration) -> impl Future<Output = Self::Output> [DelayFuture<Self>]
where where
@ -355,6 +358,40 @@ extension_trait! {
{ {
TryJoin::new(self, other) TryJoin::new(self, other)
} }
#[doc = r#"
Waits for both the future and a timeout, if the timeout completes before
the future, it returns an TimeoutError.
# Example
```
# async_std::task::block_on(async {
#
use std::time::Duration;
use async_std::prelude::*;
use async_std::future;
let fut = future::ready(0);
let dur = Duration::from_millis(100);
let res = fut.timeout(dur).await;
assert!(res.is_ok());
let fut = future::pending::<()>();
let dur = Duration::from_millis(100);
let res = fut.timeout(dur).await;
assert!(res.is_err())
#
# });
```
"#]
#[cfg(any(all(feature = "default", feature = "unstable"), feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn timeout(self, dur: Duration) -> impl Future<Output = Self::Output> [TimeoutFuture<Self>]
where Self: Sized
{
TimeoutFuture::new(self, dur)
}
} }
impl<F: Future + Unpin + ?Sized> Future for Box<F> { impl<F: Future + Unpin + ?Sized> Future for Box<F> {

@ -1,7 +1,7 @@
use std::future::Future; use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use async_macros::MaybeDone; use crate::future::MaybeDone;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use async_macros::MaybeDone; use crate::future::MaybeDone;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::pin::Pin; use std::pin::Pin;
use async_macros::MaybeDone; use crate::future::MaybeDone;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -0,0 +1,79 @@
//! A type that wraps a future to keep track of its completion status.
//!
//! This implementation was taken from the original `macro_rules` `join/try_join`
//! macros in the `futures-preview` crate.
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures_core::ready;
/// A future that may have completed.
#[derive(Debug)]
pub(crate) enum MaybeDone<Fut: Future> {
/// A not-yet-completed future
Future(Fut),
/// The output of the completed future
Done(Fut::Output),
/// The empty variant after the result of a [`MaybeDone`] has been
/// taken using the [`take`](MaybeDone::take) method.
Gone,
}
impl<Fut: Future> MaybeDone<Fut> {
/// Create a new instance of `MaybeDone`.
pub(crate) fn new(future: Fut) -> MaybeDone<Fut> {
Self::Future(future)
}
/// Returns an [`Option`] containing a reference to the output of the future.
/// The output of this method will be [`Some`] if and only if the inner
/// future has been completed and [`take`](MaybeDone::take)
/// has not yet been called.
#[inline]
pub(crate) fn output(self: Pin<&Self>) -> Option<&Fut::Output> {
let this = self.get_ref();
match this {
MaybeDone::Done(res) => Some(res),
_ => None,
}
}
/// Attempt to take the output of a `MaybeDone` without driving it
/// towards completion.
#[inline]
pub(crate) fn take(self: Pin<&mut Self>) -> Option<Fut::Output> {
unsafe {
let this = self.get_unchecked_mut();
match this {
MaybeDone::Done(_) => {}
MaybeDone::Future(_) | MaybeDone::Gone => return None,
};
if let MaybeDone::Done(output) = mem::replace(this, MaybeDone::Gone) {
Some(output)
} else {
unreachable!()
}
}
}
}
impl<Fut: Future> Future for MaybeDone<Fut> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let res = unsafe {
match Pin::as_mut(&mut self).get_unchecked_mut() {
MaybeDone::Future(a) => ready!(Pin::new_unchecked(a).poll(cx)),
MaybeDone::Done(_) => return Poll::Ready(()),
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
};
self.set(MaybeDone::Done(res));
Poll::Ready(())
}
}

@ -46,15 +46,20 @@
//! [`Future::race`]: trait.Future.html#method.race //! [`Future::race`]: trait.Future.html#method.race
//! [`Future::try_race`]: trait.Future.html#method.try_race //! [`Future::try_race`]: trait.Future.html#method.try_race
cfg_alloc! {
pub use future::Future; pub use future::Future;
pub(crate) mod future;
}
cfg_std! {
pub use pending::pending; pub use pending::pending;
pub use poll_fn::poll_fn; pub use poll_fn::poll_fn;
pub use ready::ready; pub use ready::ready;
pub(crate) mod future;
mod pending; mod pending;
mod poll_fn; mod poll_fn;
mod ready; mod ready;
}
cfg_default! { cfg_default! {
pub use timeout::{timeout, TimeoutError}; pub use timeout::{timeout, TimeoutError};
@ -63,5 +68,7 @@ cfg_default! {
cfg_unstable! { cfg_unstable! {
pub use into_future::IntoFuture; pub use into_future::IntoFuture;
pub(crate) use maybe_done::MaybeDone;
mod into_future; mod into_future;
mod maybe_done;
} }

@ -42,7 +42,7 @@ where
pin_project! { pin_project! {
/// A future that times out after a duration of time. /// A future that times out after a duration of time.
struct TimeoutFuture<F> { pub struct TimeoutFuture<F> {
#[pin] #[pin]
future: F, future: F,
#[pin] #[pin]
@ -50,6 +50,13 @@ pin_project! {
} }
} }
impl<F> TimeoutFuture<F> {
#[allow(dead_code)]
pub(super) fn new(future: F, dur: Duration) -> TimeoutFuture<F> {
TimeoutFuture { future: future, delay: Delay::new(dur) }
}
}
impl<F: Future> Future for TimeoutFuture<F> { impl<F: Future> Future for TimeoutFuture<F> {
type Output = Result<F::Output, TimeoutError>; type Output = Result<F::Output, TimeoutError>;

@ -105,8 +105,8 @@
//! //!
//! ```no_run //! ```no_run
//! use async_std::fs::File; //! use async_std::fs::File;
//! use async_std::io::BufWriter;
//! use async_std::io::prelude::*; //! use async_std::io::prelude::*;
//! use async_std::io::BufWriter;
//! //!
//! # fn main() -> std::io::Result<()> { async_std::task::block_on(async { //! # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
//! # //! #
@ -116,8 +116,8 @@
//! //!
//! // write a byte to the buffer //! // write a byte to the buffer
//! writer.write(&[42]).await?; //! writer.write(&[42]).await?;
//!
//! } // the buffer is flushed once writer goes out of scope //! } // the buffer is flushed once writer goes out of scope
//! //
//! # //! #
//! # Ok(()) }) } //! # Ok(()) }) }
//! ``` //! ```
@ -275,13 +275,13 @@ cfg_std! {
#[doc(inline)] #[doc(inline)]
pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom}; pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
pub use buf_read::{BufRead, Lines}; pub use buf_read::{BufRead, Lines, Split};
pub use buf_reader::BufReader; pub use buf_reader::BufReader;
pub use buf_writer::{BufWriter, IntoInnerError}; pub use buf_writer::{BufWriter, IntoInnerError};
pub use copy::copy; pub use copy::copy;
pub use cursor::Cursor; pub use cursor::Cursor;
pub use empty::{empty, Empty}; pub use empty::{empty, Empty};
pub use read::Read; pub use read::*;
pub use repeat::{repeat, Repeat}; pub use repeat::{repeat, Repeat};
pub use seek::Seek; pub use seek::Seek;
pub use sink::{sink, Sink}; pub use sink::{sink, Sink};
@ -321,7 +321,7 @@ cfg_default! {
mod stdout; mod stdout;
} }
cfg_unstable! { cfg_unstable_default! {
pub use stderr::StderrLock; pub use stderr::StderrLock;
pub use stdin::StdinLock; pub use stdin::StdinLock;
pub use stdout::StdoutLock; pub use stdout::StdoutLock;

@ -32,7 +32,7 @@ impl<T: Read + Unpin> Stream for Bytes<T> {
} }
} }
#[cfg(test)] #[cfg(all(test, default))]
mod tests { mod tests {
use crate::io; use crate::io;
use crate::prelude::*; use crate::prelude::*;

@ -165,7 +165,7 @@ impl<T: BufRead, U: BufRead> BufRead for Chain<T, U> {
} }
} }
#[cfg(test)] #[cfg(all(test, default))]
mod tests { mod tests {
use crate::io; use crate::io;
use crate::prelude::*; use crate::prelude::*;

@ -17,6 +17,10 @@ use std::mem;
use crate::io::IoSliceMut; use crate::io::IoSliceMut;
pub use take::Take;
pub use bytes::Bytes;
pub use chain::Chain;
extension_trait! { extension_trait! {
use std::pin::Pin; use std::pin::Pin;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
@ -301,11 +305,11 @@ extension_trait! {
# Ok(()) }) } # Ok(()) }) }
``` ```
"#] "#]
fn take(self, limit: u64) -> take::Take<Self> fn take(self, limit: u64) -> Take<Self>
where where
Self: Sized, Self: Sized,
{ {
take::Take { inner: self, limit } Take { inner: self, limit }
} }
#[doc = r#" #[doc = r#"
@ -377,8 +381,8 @@ extension_trait! {
# Ok(()) }) } # Ok(()) }) }
``` ```
"#] "#]
fn bytes(self) -> bytes::Bytes<Self> where Self: Sized { fn bytes(self) -> Bytes<Self> where Self: Sized {
bytes::Bytes { inner: self } Bytes { inner: self }
} }
#[doc = r#" #[doc = r#"
@ -413,8 +417,8 @@ extension_trait! {
# Ok(()) }) } # Ok(()) }) }
``` ```
"#] "#]
fn chain<R: Read>(self, next: R) -> chain::Chain<Self, R> where Self: Sized { fn chain<R: Read>(self, next: R) -> Chain<Self, R> where Self: Sized {
chain::Chain { first: self, second: next, done_first: false } Chain { first: self, second: next, done_first: false }
} }
} }

@ -1,5 +1,7 @@
use crate::utils::Context; use crate::utils::Context;
use std::{error::Error as StdError, fmt, io};
/// Wrap `std::io::Error` with additional message /// Wrap `std::io::Error` with additional message
/// ///
/// Keeps the original error kind and stores the original I/O error as `source`. /// Keeps the original error kind and stores the original I/O error as `source`.
@ -9,8 +11,6 @@ impl<T> Context for Result<T, std::io::Error> {
} }
} }
use std::{error::Error as StdError, fmt, io};
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct VerboseError { pub(crate) struct VerboseError {
source: io::Error, source: io::Error,
@ -36,10 +36,6 @@ impl fmt::Display for VerboseError {
} }
impl StdError for VerboseError { impl StdError for VerboseError {
fn description(&self) -> &str {
self.source.description()
}
fn source(&self) -> Option<&(dyn StdError + 'static)> { fn source(&self) -> Option<&(dyn StdError + 'static)> {
Some(&self.source) Some(&self.source)
} }

@ -6,6 +6,7 @@ use crate::task::{Context, Poll};
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
#[must_use]
pub struct WriteFmtFuture<'a, T: Unpin + ?Sized> { pub struct WriteFmtFuture<'a, T: Unpin + ?Sized> {
pub(crate) writer: &'a mut T, pub(crate) writer: &'a mut T,
pub(crate) res: Option<io::Result<Vec<u8>>>, pub(crate) res: Option<io::Result<Vec<u8>>>,

@ -47,7 +47,7 @@
//! encouraged to read it. The `async-std` source is generally high //! encouraged to read it. The `async-std` source is generally high
//! quality and a peek behind the curtains is often enlightening. //! quality and a peek behind the curtains is often enlightening.
//! //!
//! Modules in this crate are organized in the same way as in `async-std`, except blocking //! Modules in this crate are organized in the same way as in `std`, except blocking
//! functions have been replaced with async functions and threads have been replaced with //! functions have been replaced with async functions and threads have been replaced with
//! lightweight tasks. //! lightweight tasks.
//! //!
@ -152,6 +152,8 @@
//! Await two futures concurrently, and return a tuple of their output: //! Await two futures concurrently, and return a tuple of their output:
//! //!
//! ``` //! ```
//! use async_std::prelude::*;
//!
//! #[async_std::main] //! #[async_std::main]
//! async fn main() { //! async fn main() {
//! let a = async { 1u8 }; //! let a = async { 1u8 };
@ -167,7 +169,7 @@
//! //!
//! #[async_std::main] //! #[async_std::main]
//! async fn main() -> std::io::Result<()> { //! async fn main() -> std::io::Result<()> {
//! let mut socket = UdpSocket::bind("127.0.0.1:8080")?; //! let socket = UdpSocket::bind("127.0.0.1:8080").await?;
//! println!("Listening on {}", socket.local_addr()?); //! println!("Listening on {}", socket.local_addr()?);
//! //!
//! let mut buf = vec![0u8; 1024]; //! let mut buf = vec![0u8; 1024];
@ -218,7 +220,18 @@
//! default-features = false //! default-features = false
//! features = ["std"] //! features = ["std"]
//! ``` //! ```
//!
//! And to use async-std on `no_std` targets that only support `alloc` only
//! enable the `alloc` Cargo feature:
//!
//! ```toml
//! [dependencies.async-std]
//! version = "1.5.0"
//! default-features = false
//! features = ["alloc"]
//! ```
#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(feature = "docs", feature(doc_cfg))] #![cfg_attr(feature = "docs", feature(doc_cfg))]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![allow(clippy::mutex_atomic, clippy::module_inception)] #![allow(clippy::mutex_atomic, clippy::module_inception)]
@ -227,6 +240,8 @@
#![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")]
#![recursion_limit = "2048"] #![recursion_limit = "2048"]
extern crate alloc;
#[macro_use] #[macro_use]
mod utils; mod utils;
@ -238,14 +253,17 @@ pub use async_attributes::{main, test};
#[cfg(feature = "std")] #[cfg(feature = "std")]
mod macros; mod macros;
cfg_std! { cfg_alloc! {
pub mod task;
pub mod future; pub mod future;
pub mod stream;
}
cfg_std! {
pub mod io; pub mod io;
pub mod os; pub mod os;
pub mod prelude; pub mod prelude;
pub mod stream;
pub mod sync; pub mod sync;
pub mod task;
} }
cfg_default! { cfg_default! {
@ -265,7 +283,9 @@ cfg_unstable! {
mod option; mod option;
mod string; mod string;
mod collections; mod collections;
}
cfg_unstable_default! {
#[doc(inline)] #[doc(inline)]
pub use std::{write, writeln}; pub use std::{write, writeln};
} }

@ -1,6 +1,7 @@
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use crate::future; use crate::future;
use crate::io; use crate::io;
@ -75,9 +76,7 @@ impl TcpListener {
/// [`local_addr`]: #method.local_addr /// [`local_addr`]: #method.local_addr
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> { pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
let mut last_err = None; let mut last_err = None;
let addrs = addrs let addrs = addrs.to_socket_addrs().await?;
.to_socket_addrs()
.await?;
for addr in addrs { for addr in addrs {
match mio::net::TcpListener::bind(&addr) { match mio::net::TcpListener::bind(&addr) {
@ -121,7 +120,7 @@ impl TcpListener {
let mio_stream = mio::net::TcpStream::from_stream(io)?; let mio_stream = mio::net::TcpStream::from_stream(io)?;
let stream = TcpStream { let stream = TcpStream {
watcher: Watcher::new(mio_stream), watcher: Arc::new(Watcher::new(mio_stream)),
}; };
Ok((stream, addr)) Ok((stream, addr))
} }

@ -1,13 +1,13 @@
use std::io::{IoSlice, IoSliceMut, Read as _, Write as _}; use std::io::{IoSlice, IoSliceMut, Read as _, Write as _};
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use crate::future; use crate::future;
use crate::io::{self, Read, Write}; use crate::io::{self, Read, Write};
use crate::rt::Watcher; use crate::rt::Watcher;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use crate::task::{spawn_blocking, Context, Poll}; use crate::task::{Context, Poll};
use crate::utils::Context as _;
/// A TCP stream between a local and a remote socket. /// A TCP stream between a local and a remote socket.
/// ///
@ -45,9 +45,9 @@ use crate::utils::Context as _;
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct TcpStream { pub struct TcpStream {
pub(super) watcher: Watcher<mio::net::TcpStream>, pub(super) watcher: Arc<Watcher<mio::net::TcpStream>>,
} }
impl TcpStream { impl TcpStream {
@ -72,25 +72,31 @@ impl TcpStream {
/// ``` /// ```
pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> { pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
let mut last_err = None; let mut last_err = None;
let addrs = addrs let addrs = addrs.to_socket_addrs().await?;
.to_socket_addrs()
.await?;
for addr in addrs { for addr in addrs {
let res = spawn_blocking(move || { // mio's TcpStream::connect is non-blocking and may just be in progress
let std_stream = std::net::TcpStream::connect(addr) // when it returns with `Ok`. We therefore wait for write readiness to
.context(|| format!("could not connect to {}", addr))?; // be sure the connection has either been established or there was an
let mio_stream = mio::net::TcpStream::from_stream(std_stream) // error which we check for afterwards.
.context(|| format!("could not open async connection to {}", addr))?; let watcher = match mio::net::TcpStream::connect(&addr) {
Ok(TcpStream { Ok(s) => Watcher::new(s),
watcher: Watcher::new(mio_stream), Err(e) => {
}) last_err = Some(e);
}) continue;
.await; }
};
match res { future::poll_fn(|cx| watcher.poll_write_ready(cx)).await;
Ok(stream) => return Ok(stream),
Err(err) => last_err = Some(err), match watcher.get_ref().take_error() {
Ok(None) => {
return Ok(TcpStream {
watcher: Arc::new(watcher),
});
}
Ok(Some(e)) => last_err = Some(e),
Err(e) => last_err = Some(e),
} }
} }
@ -356,6 +362,7 @@ impl Write for &TcpStream {
} }
fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
self.shutdown(std::net::Shutdown::Write)?;
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} }
} }
@ -365,7 +372,7 @@ impl From<std::net::TcpStream> for TcpStream {
fn from(stream: std::net::TcpStream) -> TcpStream { fn from(stream: std::net::TcpStream) -> TcpStream {
let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap();
TcpStream { TcpStream {
watcher: Watcher::new(mio_stream), watcher: Arc::new(Watcher::new(mio_stream)),
} }
} }
} }
@ -387,7 +394,10 @@ cfg_unix! {
impl IntoRawFd for TcpStream { impl IntoRawFd for TcpStream {
fn into_raw_fd(self) -> RawFd { fn into_raw_fd(self) -> RawFd {
self.watcher.into_inner().into_raw_fd() // TODO(stjepang): This does not mean `RawFd` is now the sole owner of the file
// descriptor because it's possible that there are other clones of this `TcpStream`
// using it at the same time. We should probably document that behavior.
self.as_raw_fd()
} }
} }
} }

@ -244,9 +244,12 @@ impl UdpSocket {
})) }))
} }
/// Sends data on the socket to the given address. /// Sends data on the socket to the remote address to which it is connected.
/// ///
/// On success, returns the number of bytes written. /// The [`connect`] method will connect this socket to a remote address.
/// This method will fail if the socket is not connected.
///
/// [`connect`]: #method.connect
/// ///
/// # Examples /// # Examples
/// ///
@ -255,18 +258,11 @@ impl UdpSocket {
/// # /// #
/// use async_std::net::UdpSocket; /// use async_std::net::UdpSocket;
/// ///
/// const THE_MERCHANT_OF_VENICE: &[u8] = b" /// let socket = UdpSocket::bind("127.0.0.1:34254").await?;
/// If you prick us, do we not bleed? /// socket.connect("127.0.0.1:8080").await?;
/// If you tickle us, do we not laugh? /// let bytes = socket.send(b"Hi there!").await?;
/// If you poison us, do we not die?
/// And if you wrong us, shall we not revenge?
/// ";
///
/// let socket = UdpSocket::bind("127.0.0.1:0").await?;
/// ///
/// let addr = "127.0.0.1:7878"; /// println!("Sent {} bytes", bytes);
/// let sent = socket.send_to(THE_MERCHANT_OF_VENICE, &addr).await?;
/// println!("Sent {} bytes to {}", sent, addr);
/// # /// #
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
@ -288,7 +284,7 @@ impl UdpSocket {
/// Receives data from the socket. /// Receives data from the socket.
/// ///
/// On success, returns the number of bytes read and the origin. /// On success, returns the number of bytes read.
/// ///
/// # Examples /// # Examples
/// ///

@ -2,6 +2,7 @@ use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{FromStream, IntoStream}; use crate::stream::{FromStream, IntoStream};
use std::convert::identity;
impl<T, V> FromStream<Option<T>> for Option<V> impl<T, V> FromStream<Option<T>> for Option<V>
where where
@ -17,24 +18,22 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_error = false; let mut found_none = false;
let out: V = stream let out: V = stream
.scan((), |_, elem| { .take_while(|elem| {
match elem { elem.is_some() || {
Some(elem) => Some(elem), found_none = true;
None => { // Stop processing the stream on `None`
found_error = true; false
// Stop processing the stream on error
None
}
} }
}) })
.filter_map(identity)
.collect() .collect()
.await; .await;
if found_error { None } else { Some(out) } if found_none { None } else { Some(out) }
}) })
} }
} }

@ -1,7 +1,8 @@
use std::pin::Pin; use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Product}; use crate::stream::{Product, Stream};
use std::convert::identity;
impl<T, U> Product<Option<U>> for Option<T> impl<T, U> Product<Option<U>> for Option<T>
where where
@ -36,29 +37,27 @@ where
``` ```
"#] "#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>> fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a where
S: Stream<Item = Option<U>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;
let out = <T as Product<U>>::product(stream let out = <T as Product<U>>::product(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Some(elem) => Some(elem), elem.is_some() || {
None => {
found_none = true; found_none = true;
// Stop processing the stream on error // Stop processing the stream on `None`
None false
} }
} })
})).await; .filter_map(identity),
)
.await;
if found_none { if found_none { None } else { Some(out) }
None
} else {
Some(out)
}
}) })
} }
} }

@ -2,6 +2,7 @@ use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Sum}; use crate::stream::{Stream, Sum};
use std::convert::identity;
impl<T, U> Sum<Option<U>> for Option<T> impl<T, U> Sum<Option<U>> for Option<T>
where where
@ -31,29 +32,27 @@ where
``` ```
"#] "#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>> fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a where
S: Stream<Item = Option<U>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut found_none = false; let mut found_none = false;
let out = <T as Sum<U>>::sum(stream let out = <T as Sum<U>>::sum(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Some(elem) => Some(elem), elem.is_some() || {
None => {
found_none = true; found_none = true;
// Stop processing the stream on error // Stop processing the stream on `None`
None false
}
} }
})).await; })
.filter_map(identity),
)
.await;
if found_none { if found_none { None } else { Some(out) }
None
} else {
Some(out)
}
}) })
} }
} }

@ -10,6 +10,23 @@ where
/// Takes each element in the stream: if it is an `Err`, no further /// Takes each element in the stream: if it is an `Err`, no further
/// elements are taken, and the `Err` is returned. Should no `Err` /// elements are taken, and the `Err` is returned. Should no `Err`
/// occur, a container with the values of each `Result` is returned. /// occur, a container with the values of each `Result` is returned.
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let v = stream::from_iter(vec![1, 2]);
/// let res: Result<Vec<u32>, &'static str> = v.map(|x: u32|
/// x.checked_add(1).ok_or("Overflow!")
/// ).collect().await;
/// assert_eq!(res, Ok(vec![2, 3]));
/// #
/// # }) }
/// ```
#[inline] #[inline]
fn from_stream<'a, S: IntoStream<Item = Result<T, E>> + 'a>( fn from_stream<'a, S: IntoStream<Item = Result<T, E>> + 'a>(
stream: S, stream: S,
@ -17,26 +34,34 @@ where
let stream = stream.into_stream(); let stream = stream.into_stream();
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut is_error = false;
let mut found_error = None; let mut found_error = None;
let out: V = stream let out: V = stream
.scan((), |_, elem| { .take_while(|elem| {
match elem { // Stop processing the stream on `Err`
Ok(elem) => Some(elem), !is_error
&& (elem.is_ok() || {
is_error = true;
// Capture first `Err`
true
})
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => { Err(err) => {
found_error = Some(err); found_error = Some(err);
// Stop processing the stream on error
None None
} }
}
}) })
.collect() .collect()
.await; .await;
match found_error { if is_error {
Some(err) => Err(err), Err(found_error.unwrap())
None => Ok(out), } else {
Ok(out)
} }
}) })
} }

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use crate::prelude::*; use crate::prelude::*;
use crate::stream::{Stream, Product}; use crate::stream::{Product, Stream};
impl<T, U, E> Product<Result<U, E>> for Result<T, E> impl<T, U, E> Product<Result<U, E>> for Result<T, E>
where where
@ -36,26 +36,39 @@ where
``` ```
"#] "#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>> fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a where
S: Stream<Item = Result<U, E>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut is_error = false;
let mut found_error = None; let mut found_error = None;
let out = <T as Product<U>>::product(stream let out = <T as Product<U>>::product(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Ok(elem) => Some(elem), // Stop processing the stream on `Err`
!is_error
&& (elem.is_ok() || {
is_error = true;
// Capture first `Err`
true
})
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => { Err(err) => {
found_error = Some(err); found_error = Some(err);
// Stop processing the stream on error
None None
} }
} }),
})).await; )
match found_error { .await;
Some(err) => Err(err),
None => Ok(out) if is_error {
Err(found_error.unwrap())
} else {
Ok(out)
} }
}) })
} }

@ -36,26 +36,39 @@ where
``` ```
"#] "#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>> fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a where
S: Stream<Item = Result<U, E>> + 'a,
{ {
Box::pin(async move { Box::pin(async move {
// Using `scan` here because it is able to stop the stream early // Using `take_while` here because it is able to stop the stream early
// if a failure occurs // if a failure occurs
let mut is_error = false;
let mut found_error = None; let mut found_error = None;
let out = <T as Sum<U>>::sum(stream let out = <T as Sum<U>>::sum(
.scan((), |_, elem| { stream
match elem { .take_while(|elem| {
Ok(elem) => Some(elem), // Stop processing the stream on `Err`
!is_error
&& (elem.is_ok() || {
is_error = true;
// Capture first `Err`
true
})
})
.filter_map(|elem| match elem {
Ok(value) => Some(value),
Err(err) => { Err(err) => {
found_error = Some(err); found_error = Some(err);
// Stop processing the stream on error
None None
} }
} }),
})).await; )
match found_error { .await;
Some(err) => Err(err),
None => Ok(out) if is_error {
Err(found_error.unwrap())
} else {
Ok(out)
} }
}) })
} }

@ -16,10 +16,10 @@ struct Entry {
token: mio::Token, token: mio::Token,
/// Tasks that are blocked on reading from this I/O handle. /// Tasks that are blocked on reading from this I/O handle.
readers: Mutex<Vec<Waker>>, readers: Mutex<Readers>,
/// Tasks that are blocked on writing to this I/O handle. /// Tasks that are blocked on writing to this I/O handle.
writers: Mutex<Vec<Waker>>, writers: Mutex<Writers>,
} }
/// The state of a networking driver. /// The state of a networking driver.
@ -40,6 +40,26 @@ pub struct Reactor {
notify_token: mio::Token, notify_token: mio::Token,
} }
/// The set of `Waker`s interested in read readiness.
#[derive(Debug)]
struct Readers {
/// Flag indicating read readiness.
/// (cf. `Watcher::poll_read_ready`)
ready: bool,
/// The `Waker`s blocked on reading.
wakers: Vec<Waker>,
}
/// The set of `Waker`s interested in write readiness.
#[derive(Debug)]
struct Writers {
/// Flag indicating write readiness.
/// (cf. `Watcher::poll_write_ready`)
ready: bool,
/// The `Waker`s blocked on writing.
wakers: Vec<Waker>,
}
impl Reactor { impl Reactor {
/// Creates a new reactor for polling I/O events. /// Creates a new reactor for polling I/O events.
pub fn new() -> io::Result<Reactor> { pub fn new() -> io::Result<Reactor> {
@ -72,8 +92,14 @@ impl Reactor {
// Allocate an entry and insert it into the slab. // Allocate an entry and insert it into the slab.
let entry = Arc::new(Entry { let entry = Arc::new(Entry {
token, token,
readers: Mutex::new(Vec::new()), readers: Mutex::new(Readers {
writers: Mutex::new(Vec::new()), ready: false,
wakers: Vec::new(),
}),
writers: Mutex::new(Writers {
ready: false,
wakers: Vec::new(),
}),
}); });
vacant.insert(entry.clone()); vacant.insert(entry.clone());
@ -131,7 +157,9 @@ impl Reactor {
// Wake up reader tasks blocked on this I/O handle. // Wake up reader tasks blocked on this I/O handle.
let reader_interests = mio::Ready::all() - mio::Ready::writable(); let reader_interests = mio::Ready::all() - mio::Ready::writable();
if !(readiness & reader_interests).is_empty() { if !(readiness & reader_interests).is_empty() {
for w in entry.readers.lock().unwrap().drain(..) { let mut readers = entry.readers.lock().unwrap();
readers.ready = true;
for w in readers.wakers.drain(..) {
w.wake(); w.wake();
progress = true; progress = true;
} }
@ -140,7 +168,9 @@ impl Reactor {
// Wake up writer tasks blocked on this I/O handle. // Wake up writer tasks blocked on this I/O handle.
let writer_interests = mio::Ready::all() - mio::Ready::readable(); let writer_interests = mio::Ready::all() - mio::Ready::readable();
if !(readiness & writer_interests).is_empty() { if !(readiness & writer_interests).is_empty() {
for w in entry.writers.lock().unwrap().drain(..) { let mut writers = entry.writers.lock().unwrap();
writers.ready = true;
for w in writers.wakers.drain(..) {
w.wake(); w.wake();
progress = true; progress = true;
} }
@ -200,7 +230,7 @@ impl<T: Evented> Watcher<T> {
} }
// Lock the waker list. // Lock the waker list.
let mut list = self.entry.readers.lock().unwrap(); let mut readers = self.entry.readers.lock().unwrap();
// Try running the operation again. // Try running the operation again.
match f(self.source.as_ref().unwrap()) { match f(self.source.as_ref().unwrap()) {
@ -209,8 +239,9 @@ impl<T: Evented> Watcher<T> {
} }
// Register the task if it isn't registered already. // Register the task if it isn't registered already.
if list.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone()); if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
readers.wakers.push(cx.waker().clone());
} }
Poll::Pending Poll::Pending
@ -235,7 +266,7 @@ impl<T: Evented> Watcher<T> {
} }
// Lock the waker list. // Lock the waker list.
let mut list = self.entry.writers.lock().unwrap(); let mut writers = self.entry.writers.lock().unwrap();
// Try running the operation again. // Try running the operation again.
match f(self.source.as_ref().unwrap()) { match f(self.source.as_ref().unwrap()) {
@ -244,10 +275,47 @@ impl<T: Evented> Watcher<T> {
} }
// Register the task if it isn't registered already. // Register the task if it isn't registered already.
if list.iter().all(|w| !w.will_wake(cx.waker())) { if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
list.push(cx.waker().clone()); writers.wakers.push(cx.waker().clone());
}
Poll::Pending
}
/// Polls the inner I/O source until a non-blocking read can be performed.
///
/// If non-blocking reads are currently not possible, the `Waker`
/// will be saved and notified when it can read non-blocking
/// again.
#[allow(dead_code)]
pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
// Lock the waker list.
let mut readers = self.entry.readers.lock().unwrap();
if readers.ready {
return Poll::Ready(());
}
// Register the task if it isn't registered already.
if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
readers.wakers.push(cx.waker().clone());
}
Poll::Pending
} }
/// Polls the inner I/O source until a non-blocking write can be performed.
///
/// If non-blocking writes are currently not possible, the `Waker`
/// will be saved and notified when it can write non-blocking
/// again.
pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
// Lock the waker list.
let mut writers = self.entry.writers.lock().unwrap();
if writers.ready {
return Poll::Ready(());
}
// Register the task if it isn't registered already.
if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
writers.wakers.push(cx.waker().clone());
}
Poll::Pending Poll::Pending
} }

@ -1,24 +0,0 @@
use crate::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
/// A stream able to yield elements from both ends.
///
/// Something that implements `DoubleEndedStream` has one extra capability
/// over something that implements [`Stream`]: the ability to also take
/// `Item`s from the back, as well as the front.
///
/// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait DoubleEndedStream: Stream {
/// Removes and returns an element from the end of the stream.
///
/// Returns `None` when there are no more elements.
///
/// The [trait-level] docs contain more details.
///
/// [trait-level]: trait.DoubleEndedStream.html
fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

@ -0,0 +1,246 @@
use crate::stream::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
mod next_back;
mod nth_back;
mod rfind;
mod rfold;
mod try_rfold;
use next_back::NextBackFuture;
use nth_back::NthBackFuture;
use rfind::RFindFuture;
use rfold::RFoldFuture;
use try_rfold::TryRFoldFuture;
/// A stream able to yield elements from both ends.
///
/// Something that implements `DoubleEndedStream` has one extra capability
/// over something that implements [`Stream`]: the ability to also take
/// `Item`s from the back, as well as the front.
///
/// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait DoubleEndedStream: Stream {
#[doc = r#"
Attempts to receive the next item from the back of the stream.
There are several possible return values:
* `Poll::Pending` means this stream's next_back value is not ready yet.
* `Poll::Ready(None)` means this stream has been exhausted.
* `Poll::Ready(Some(item))` means `item` was received out of the stream.
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use std::pin::Pin;
use async_std::prelude::*;
use async_std::stream;
use async_std::task::{Context, Poll};
fn increment(
s: impl DoubleEndedStream<Item = i32> + Unpin,
) -> impl DoubleEndedStream<Item = i32> + Unpin {
struct Increment<S>(S);
impl<S: DoubleEndedStream<Item = i32> + Unpin> Stream for Increment<S> {
type Item = S::Item;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
}
}
}
impl<S: DoubleEndedStream<Item = i32> + Unpin> DoubleEndedStream for Increment<S> {
fn poll_next_back(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next_back(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(item)) => Poll::Ready(Some(item + 1)),
}
}
}
Increment(s)
}
let mut s = increment(stream::once(7));
assert_eq!(s.next_back().await, Some(8));
assert_eq!(s.next_back().await, None);
#
# }) }
```
"#]
fn poll_next_back(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
#[doc = r#"
Advances the stream and returns the next value.
Returns [`None`] when iteration is finished. Individual stream implementations may
choose to resume iteration, and so calling `next()` again may or may not eventually
start returning more values.
[`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
# Examples
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let mut s = stream::from_iter(vec![7u8]);
assert_eq!(s.next_back().await, Some(7));
assert_eq!(s.next_back().await, None);
#
# }) }
```
"#]
fn next_back(&mut self) -> NextBackFuture<'_, Self>
where
Self: Unpin,
{
NextBackFuture { stream: self }
}
#[doc = r#"
Returns the nth element from the back of the stream.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let second = s.nth_back(1).await;
assert_eq!(second, Some(4));
#
# }) }
```
"#]
fn nth_back(&mut self, n: usize) -> NthBackFuture<'_, Self>
where
Self: Unpin + Sized,
{
NthBackFuture::new(self, n)
}
#[doc = r#"
Returns the the frist element from the right that matches the predicate.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let mut s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let second = s.rfind(|v| v % 2 == 0).await;
assert_eq!(second, Some(4));
#
# }) }
```
"#]
fn rfind<P>(&mut self, p: P) -> RFindFuture<'_, Self, P>
where
Self: Unpin + Sized,
P: FnMut(&Self::Item) -> bool,
{
RFindFuture::new(self, p)
}
#[doc = r#"
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let second = s.rfold(0, |acc, v| v + acc).await;
assert_eq!(second, 15);
#
# }) }
```
"#]
fn rfold<B, F>(self, accum: B, f: F) -> RFoldFuture<Self, F, B>
where
Self: Sized,
F: FnMut(B, Self::Item) -> B,
{
RFoldFuture::new(self, accum, f)
}
#[doc = r#"
A combinator that applies a function as long as it returns successfully, producing a single, final value.
Immediately returns the error when the function returns unsuccessfully.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;
let s = stream::from_iter(vec![1u8, 2, 3, 4, 5]);
let sum = s.try_rfold(0, |acc, v| {
if (acc+v) % 2 == 1 {
Ok(v+3)
} else {
Err("fail")
}
}).await;
assert_eq!(sum, Err("fail"));
#
# }) }
```
"#]
fn try_rfold<B, F, E>(self, accum: B, f: F) -> TryRFoldFuture<Self, F, B>
where
Self: Sized,
F: FnMut(B, Self::Item) -> Result<B, E>,
{
TryRFoldFuture::new(self, accum, f)
}
}

@ -0,0 +1,19 @@
use core::pin::Pin;
use core::future::Future;
use crate::stream::DoubleEndedStream;
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NextBackFuture<'a, T: Unpin + ?Sized> {
pub(crate) stream: &'a mut T,
}
impl<T: DoubleEndedStream + Unpin + ?Sized> Future for NextBackFuture<'_, T> {
type Output = Option<T::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next_back(cx)
}
}

@ -0,0 +1,41 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use crate::stream::DoubleEndedStream;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NthBackFuture<'a, S> {
stream: &'a mut S,
n: usize,
}
impl<'a, S> NthBackFuture<'a, S> {
pub(crate) fn new(stream: &'a mut S, n: usize) -> Self {
NthBackFuture { stream, n }
}
}
impl<'a, S> Future for NthBackFuture<'a, S>
where
S: DoubleEndedStream + Sized + Unpin,
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx));
match next {
Some(v) => match self.n {
0 => Poll::Ready(Some(v)),
_ => {
self.n -= 1;
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}

@ -0,0 +1,41 @@
use core::task::{Context, Poll};
use core::future::Future;
use core::pin::Pin;
use crate::stream::DoubleEndedStream;
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct RFindFuture<'a, S, P> {
stream: &'a mut S,
p: P,
}
impl<'a, S, P> RFindFuture<'a, S, P> {
pub(super) fn new(stream: &'a mut S, p: P) -> Self {
RFindFuture { stream, p }
}
}
impl<S: Unpin, P> Unpin for RFindFuture<'_, S, P> {}
impl<'a, S, P> Future for RFindFuture<'a, S, P>
where
S: DoubleEndedStream + Unpin + Sized,
P: FnMut(&S::Item) -> bool,
{
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next_back(cx));
match item {
Some(v) if (&mut self.p)(&v) => Poll::Ready(Some(v)),
Some(_) => {
cx.waker().wake_by_ref();
Poll::Pending
}
None => Poll::Ready(None),
}
}
}

@ -0,0 +1,52 @@
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;
use crate::stream::DoubleEndedStream;
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct RFoldFuture<S, F, B> {
#[pin]
stream: S,
f: F,
acc: Option<B>,
}
}
impl<S, F, B> RFoldFuture<S, F, B> {
pub(super) fn new(stream: S, init: B, f: F) -> Self {
RFoldFuture {
stream,
f,
acc: Some(init),
}
}
}
impl<S, F, B> Future for RFoldFuture<S, F, B>
where
S: DoubleEndedStream + Sized,
F: FnMut(B, S::Item) -> B,
{
type Output = B;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx));
match next {
Some(v) => {
let old = this.acc.take().unwrap();
let new = (this.f)(old, v);
*this.acc = Some(new);
}
None => return Poll::Ready(this.acc.take().unwrap()),
}
}
}
}

@ -0,0 +1,56 @@
use crate::future::Future;
use core::pin::Pin;
use crate::task::{Context, Poll};
use pin_project_lite::pin_project;
use crate::stream::DoubleEndedStream;
pin_project! {
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct TryRFoldFuture<S, F, T> {
#[pin]
stream: S,
f: F,
acc: Option<T>,
}
}
impl<S, F, T> TryRFoldFuture<S, F, T> {
pub(super) fn new(stream: S, init: T, f: F) -> Self {
TryRFoldFuture {
stream,
f,
acc: Some(init),
}
}
}
impl<S, F, T, E> Future for TryRFoldFuture<S, F, T>
where
S: DoubleEndedStream + Unpin,
F: FnMut(T, S::Item) -> Result<T, E>,
{
type Output = Result<T, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
let next = futures_core::ready!(this.stream.as_mut().poll_next_back(cx));
match next {
Some(v) => {
let old = this.acc.take().unwrap();
let new = (this.f)(old, v);
match new {
Ok(o) => *this.acc = Some(o),
Err(e) => return Poll::Ready(Err(e)),
}
}
None => return Poll::Ready(Ok(this.acc.take().unwrap())),
}
}
}
}

@ -1,5 +1,5 @@
use std::marker::PhantomData; use core::marker::PhantomData;
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::pin::Pin; use core::pin::Pin;
use core::future::Future;
use crate::prelude::*;
use crate::stream::IntoStream; use crate::stream::IntoStream;
/// Extends a collection with the contents of a stream. /// Extends a collection with the contents of a stream.

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,8 +1,10 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
#[cfg(feature = "unstable")]
use crate::stream::double_ended_stream::DoubleEndedStream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
@ -51,3 +53,10 @@ impl<I: Iterator> Stream for FromIter<I> {
Poll::Ready(self.iter.next()) Poll::Ready(self.iter.next())
} }
} }
#[cfg(feature = "unstable")]
impl<T: DoubleEndedIterator> DoubleEndedStream for FromIter<T> {
fn poll_next_back(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T::Item>> {
Poll::Ready(self.iter.next_back())
}
}

@ -1,5 +1,5 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use crate::stream::IntoStream; use crate::stream::IntoStream;

@ -2,10 +2,10 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crate::future::Future;
use crate::stream::Stream;
use futures_timer::Delay; use futures_timer::Delay;
use crate::prelude::*;
/// Creates a new stream that yields at a set interval. /// Creates a new stream that yields at a set interval.
/// ///
/// The stream first yields after `dur`, and continues to yield every /// The stream first yields after `dur`, and continues to yield every
@ -85,7 +85,7 @@ impl Stream for Interval {
/// While technically for large duration it's impossible to represent any /// While technically for large duration it's impossible to represent any
/// duration as nanoseconds, the largest duration we can represent is about /// duration as nanoseconds, the largest duration we can represent is about
/// 427_000 years. Large enough for any interval we would use or calculate in /// 427_000 years. Large enough for any interval we would use or calculate in
/// tokio. /// async-std.
fn duration_to_nanos(dur: Duration) -> Option<u64> { fn duration_to_nanos(dur: Duration) -> Option<u64> {
dur.as_secs() dur.as_secs()
.checked_mul(1_000_000_000) .checked_mul(1_000_000_000)

@ -325,6 +325,7 @@ cfg_unstable! {
mod fused_stream; mod fused_stream;
mod interval; mod interval;
mod into_stream; mod into_stream;
mod pending;
mod product; mod product;
mod successors; mod successors;
mod sum; mod sum;
@ -336,6 +337,7 @@ cfg_unstable! {
pub use fused_stream::FusedStream; pub use fused_stream::FusedStream;
pub use interval::{interval, Interval}; pub use interval::{interval, Interval};
pub use into_stream::IntoStream; pub use into_stream::IntoStream;
pub use pending::{pending, Pending};
pub use product::Product; pub use product::Product;
pub use stream::Merge; pub use stream::Merge;
pub use successors::{successors, Successors}; pub use successors::{successors, Successors};

@ -1,10 +1,13 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
#[cfg(feature = "unstable")]
use crate::stream::DoubleEndedStream;
/// Creates a stream that yields a single item. /// Creates a stream that yields a single item.
/// ///
/// # Examples /// # Examples
@ -46,3 +49,10 @@ impl<T> Stream for Once<T> {
Poll::Ready(self.project().value.take()) Poll::Ready(self.project().value.take())
} }
} }
#[cfg(feature = "unstable")]
impl <T> DoubleEndedStream for Once<T> {
fn poll_next_back(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.project().value.take())
}
}

@ -0,0 +1,68 @@
use core::marker::PhantomData;
use core::pin::Pin;
use core::task::{Context, Poll};
use crate::stream::{DoubleEndedStream, ExactSizeStream, FusedStream, Stream};
/// A stream that never returns any items.
///
/// This stream is created by the [`pending`] function. See its
/// documentation for more.
///
/// [`pending`]: fn.pending.html
#[derive(Debug)]
pub struct Pending<T> {
_marker: PhantomData<T>,
}
/// Creates a stream that never returns any items.
///
/// The returned stream will always return `Pending` when polled.
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// #
/// use std::time::Duration;
///
/// use async_std::prelude::*;
/// use async_std::stream;
///
/// let dur = Duration::from_millis(100);
/// let mut s = stream::pending::<()>().timeout(dur);
///
/// let item = s.next().await;
///
/// assert!(item.is_some());
/// assert!(item.unwrap().is_err());
///
/// #
/// # })
/// ```
pub fn pending<T>() -> Pending<T> {
Pending {
_marker: PhantomData,
}
}
impl<T> Stream for Pending<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Pending
}
}
impl<T> DoubleEndedStream for Pending<T> {
fn poll_next_back(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Pending
}
}
impl<T> FusedStream for Pending<T> {}
impl<T> ExactSizeStream for Pending<T> {
fn len(&self) -> usize {
0
}
}

@ -1,5 +1,5 @@
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use crate::stream::Stream; use crate::stream::Stream;

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::marker::PhantomData; use core::marker::PhantomData;
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::marker::PhantomData; use core::marker::PhantomData;
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,9 +1,10 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::fuse::Fuse; use super::fuse::Fuse;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {

@ -1,7 +1,7 @@
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use std::pin::Pin; use core::pin::Pin;
pin_project! { pin_project! {
/// A stream that clones the elements of an underlying stream. /// A stream that clones the elements of an underlying stream.

@ -1,11 +1,11 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::pin::Pin; use core::future::Future;
use std::future::Future; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::fuse::Fuse; use super::fuse::Fuse;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,7 +1,7 @@
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use std::pin::Pin; use core::pin::Pin;
pin_project! { pin_project! {
/// A stream that copies the elements of an underlying stream. /// A stream that copies the elements of an underlying stream.

@ -1,5 +1,5 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
@ -9,7 +9,7 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
#[cfg(all(feature = "default", feature = "unstable"))] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub struct CountFuture<S> { pub struct CountFuture<S> {
#[pin] #[pin]

@ -1,5 +1,5 @@
use std::mem::ManuallyDrop; use core::mem::ManuallyDrop;
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use std::time::Duration; use core::time::Duration;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,10 +1,10 @@
use std::pin::Pin; use core::future::Future;
use std::future::Future; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::fuse::Fuse; use super::fuse::Fuse;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,5 +1,5 @@
use std::pin::Pin; use core::pin::Pin;
use std::task::{Context, Poll}; use core::task::{Context, Poll};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,5 +1,5 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use std::task::{Context, Poll}; use core::task::{Context, Poll};
use crate::stream::Stream; use crate::stream::Stream;

@ -1,9 +1,9 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::prelude::*;
use crate::stream::stream::map::Map; use crate::stream::stream::map::Map;
use crate::stream::stream::StreamExt;
use crate::stream::{IntoStream, Stream}; use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
@ -41,7 +41,6 @@ where
impl<S, U, F> Stream for FlatMap<S, U, F> impl<S, U, F> Stream for FlatMap<S, U, F>
where where
S: Stream, S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream, U: Stream,
F: FnMut(S::Item) -> U, F: FnMut(S::Item) -> U,
{ {

@ -1,5 +1,5 @@
use std::fmt; use core::fmt;
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,5 +1,5 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,5 +1,5 @@
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,11 +1,11 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::pin::Pin; use core::future::Future;
use std::future::Future; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture; use super::partial_cmp::PartialCmpFuture;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,11 +1,11 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::pin::Pin; use core::future::Future;
use std::future::Future; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture; use super::partial_cmp::PartialCmpFuture;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,5 +1,5 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,11 +1,11 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::pin::Pin; use core::future::Future;
use std::future::Future; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture; use super::partial_cmp::PartialCmpFuture;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,11 +1,11 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::pin::Pin; use core::future::Future;
use std::future::Future; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::partial_cmp::PartialCmpFuture; use super::partial_cmp::PartialCmpFuture;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,7 +1,7 @@
use std::cmp::{Ord, Ordering}; use core::cmp::{Ord, Ordering};
use std::marker::PhantomData; use core::marker::PhantomData;
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,6 +1,6 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,6 +1,6 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,10 +1,11 @@
use std::pin::Pin; use core::pin::Pin;
use std::task::{Context, Poll}; use core::task::{Context, Poll};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Fuse; use crate::stream::Fuse;
use crate::stream::Stream;
use crate::utils; use crate::utils;
pin_project! { pin_project! {

@ -1,7 +1,7 @@
use std::cmp::{Ord, Ordering}; use core::cmp::{Ord, Ordering};
use std::marker::PhantomData; use core::marker::PhantomData;
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,6 +1,6 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,6 +1,6 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -110,12 +110,12 @@ pub use take::Take;
pub use take_while::TakeWhile; pub use take_while::TakeWhile;
pub use zip::Zip; pub use zip::Zip;
use std::cmp::Ordering; use core::cmp::Ordering;
cfg_unstable! { cfg_unstable! {
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use std::time::Duration; use core::time::Duration;
use crate::stream::into_stream::IntoStream; use crate::stream::into_stream::IntoStream;
use crate::stream::{FromStream, Product, Sum}; use crate::stream::{FromStream, Product, Sum};
@ -350,12 +350,12 @@ extension_trait! {
assert!(start.elapsed().as_millis() >= 15); assert!(start.elapsed().as_millis() >= 15);
s.next().await; s.next().await;
assert!(start.elapsed().as_millis() >= 35); assert!(start.elapsed().as_millis() >= 25);
# #
# }) } # }) }
``` ```
"#] "#]
#[cfg(all(feature = "default", feature = "unstable"))] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn throttle(self, d: Duration) -> Throttle<Self> fn throttle(self, d: Duration) -> Throttle<Self>
where where
@ -587,13 +587,13 @@ extension_trait! {
assert_eq!(s.next().await, Some(1)); assert_eq!(s.next().await, Some(1));
// There will be no delay after the first time. // There will be no delay after the first time.
assert!(start.elapsed().as_millis() <= 210); assert!(start.elapsed().as_millis() < 400);
assert_eq!(s.next().await, Some(2)); assert_eq!(s.next().await, Some(2));
assert!(start.elapsed().as_millis() <= 210); assert!(start.elapsed().as_millis() < 400);
assert_eq!(s.next().await, None); assert_eq!(s.next().await, None);
assert!(start.elapsed().as_millis() <= 210); assert!(start.elapsed().as_millis() < 400);
# #
# }) } # }) }
``` ```
@ -695,7 +695,7 @@ extension_trait! {
# }) } # }) }
``` ```
An empty stream will return `None: An empty stream will return `None`:
``` ```
# fn main() { async_std::task::block_on(async { # fn main() { async_std::task::block_on(async {
# #
@ -791,18 +791,22 @@ extension_trait! {
# async_std::task::block_on(async { # async_std::task::block_on(async {
use async_std::prelude::*; use async_std::prelude::*;
use async_std::stream::IntoStream;
use async_std::stream; use async_std::stream;
let inner1 = stream::from_iter(vec![1,2,3]); let words = stream::from_iter(&["alpha", "beta", "gamma"]);
let inner2 = stream::from_iter(vec![4,5,6]);
let s = stream::from_iter(vec![inner1, inner2]); let merged: String = words
.flat_map(|s| stream::from_iter(s.chars()))
.collect().await;
assert_eq!(merged, "alphabetagamma");
let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await; let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]);
let d1: Vec<_> = d3
assert_eq!(v, vec![1,2,3,4,5,6]); .flat_map(|item| stream::from_iter(item))
.flat_map(|item| stream::from_iter(item))
.collect().await;
assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]);
# }); # });
``` ```
"#] "#]
@ -1507,7 +1511,7 @@ extension_trait! {
# }) } # }) }
``` ```
"#] "#]
#[cfg(all(feature = "default", feature = "unstable"))] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn by_ref(&mut self) -> &mut Self { fn by_ref(&mut self) -> &mut Self {
self self
@ -1641,6 +1645,13 @@ extension_trait! {
while let Some(v) = s.next().await { while let Some(v) = s.next().await {
assert_eq!(v, Ok(1)); assert_eq!(v, Ok(1));
} }
// when timeout
let mut s = stream::pending::<()>().timeout(Duration::from_millis(10));
match s.next().await {
Some(item) => assert!(item.is_err()),
None => panic!()
};
# #
# Ok(()) }) } # Ok(()) }) }
``` ```

@ -1,10 +1,10 @@
use std::pin::Pin; use core::future::Future;
use std::future::Future; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::fuse::Fuse; use super::fuse::Fuse;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,5 +1,5 @@
use std::pin::Pin; use core::pin::Pin;
use std::future::Future; use core::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,6 +1,6 @@
use std::pin::Pin; use core::pin::Pin;
use std::task::{Context, Poll}; use core::task::{Context, Poll};
use std::future::Future; use core::future::Future;
use crate::stream::Stream; use crate::stream::Stream;

@ -1,11 +1,11 @@
use std::cmp::Ordering; use core::cmp::Ordering;
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use super::fuse::Fuse; use super::fuse::Fuse;
use crate::prelude::*; use crate::stream::stream::StreamExt;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,14 +1,14 @@
use pin_project_lite::pin_project; use pin_project_lite::pin_project;
use std::default::Default; use core::default::Default;
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
pin_project! { pin_project! {
#[derive(Debug)] #[derive(Debug)]
#[cfg(all(feature = "default", feature = "unstable"))] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub struct PartitionFuture<S, F, B> { pub struct PartitionFuture<S, F, B> {
#[pin] #[pin]

@ -1,5 +1,5 @@
use std::future::Future; use core::future::Future;
use std::pin::Pin; use core::pin::Pin;
use crate::stream::Stream; use crate::stream::Stream;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -1,4 +1,4 @@
use std::pin::Pin; use core::pin::Pin;
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

@ -1,5 +1,5 @@
use std::pin::Pin; use core::pin::Pin;
use std::task::{Context, Poll}; use core::task::{Context, Poll};
use pin_project_lite::pin_project; use pin_project_lite::pin_project;

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save