mirror of
https://github.com/async-rs/async-std.git
synced 2025-01-16 02:39:55 +00:00
Merge branch 'master' into add_stream_flatten
This commit is contained in:
commit
040227f38a
52 changed files with 811 additions and 82 deletions
28
.github/workflows/ci.yml
vendored
28
.github/workflows/ci.yml
vendored
|
@ -7,12 +7,13 @@ on:
|
|||
- staging
|
||||
- trying
|
||||
|
||||
env:
|
||||
RUSTFLAGS: -Dwarnings
|
||||
|
||||
jobs:
|
||||
build_and_test:
|
||||
name: Build and test
|
||||
runs-on: ${{ matrix.os }}
|
||||
env:
|
||||
RUSTFLAGS: -Dwarnings
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ubuntu-latest, windows-latest, macOS-latest]
|
||||
|
@ -48,8 +49,6 @@ jobs:
|
|||
check_fmt_and_docs:
|
||||
name: Checking fmt and docs
|
||||
runs-on: ubuntu-latest
|
||||
env:
|
||||
RUSTFLAGS: -Dwarnings
|
||||
steps:
|
||||
- uses: actions/checkout@master
|
||||
|
||||
|
@ -81,20 +80,11 @@ jobs:
|
|||
clippy_check:
|
||||
name: Clippy check
|
||||
runs-on: ubuntu-latest
|
||||
# TODO: There is a lot of warnings
|
||||
# env:
|
||||
# RUSTFLAGS: -Dwarnings
|
||||
steps:
|
||||
- uses: actions/checkout@v1
|
||||
- id: component
|
||||
uses: actions-rs/components-nightly@v1
|
||||
with:
|
||||
component: clippy
|
||||
- uses: actions-rs/toolchain@v1
|
||||
with:
|
||||
toolchain: ${{ steps.component.outputs.toolchain }}
|
||||
override: true
|
||||
- run: rustup component add clippy
|
||||
- uses: actions-rs/clippy-check@v1
|
||||
with:
|
||||
token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- name: Install rust
|
||||
run: rustup update beta && rustup default beta
|
||||
- name: Install clippy
|
||||
run: rustup component add clippy
|
||||
- name: clippy
|
||||
run: cargo clippy --all --features unstable
|
||||
|
|
|
@ -9,7 +9,7 @@ authors = [
|
|||
edition = "2018"
|
||||
license = "Apache-2.0/MIT"
|
||||
repository = "https://github.com/async-rs/async-std"
|
||||
homepage = "https://github.com/async-rs/async-std"
|
||||
homepage = "https://async.rs"
|
||||
documentation = "https://docs.rs/async-std"
|
||||
description = "Async version of the Rust standard library"
|
||||
keywords = ["async", "await", "future", "std", "task"]
|
||||
|
|
|
@ -24,11 +24,7 @@ To sum up: Rust gives us the ability to safely abstract over important propertie
|
|||
|
||||
## An easy view of computation
|
||||
|
||||
While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us:
|
||||
|
||||
- computation is a sequence of composable operations
|
||||
- they can branch based on a decision
|
||||
- they either run to succession and yield a result, or they can yield an error
|
||||
While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us: A sequence of composable operations which can branch based on a decision, run to succession and yield a result or yield an error
|
||||
|
||||
## Deferring computation
|
||||
|
||||
|
@ -136,11 +132,11 @@ When executing 2 or more of these functions at the same time, our runtime system
|
|||
|
||||
## Conclusion
|
||||
|
||||
Working from values, we searched for something that expresses *working towards a value available sometime later*. From there, we talked about the concept of polling.
|
||||
Working from values, we searched for something that expresses *working towards a value available later*. From there, we talked about the concept of polling.
|
||||
|
||||
A `Future` is any data type that does not represent a value, but the ability to *produce a value at some point in the future*. Implementations of this are very varied and detailed depending on use-case, but the interface is simple.
|
||||
|
||||
Next, we will introduce you to `tasks`, which we need to actually *run* Futures.
|
||||
Next, we will introduce you to `tasks`, which we will use to actually *run* Futures.
|
||||
|
||||
[^1]: Two parties reading while it is guaranteed that no one is writing is always safe.
|
||||
|
||||
|
|
|
@ -80,7 +80,7 @@ Tasks in `async_std` are one of the core abstractions. Much like Rust's `thread`
|
|||
|
||||
## Blocking
|
||||
|
||||
`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and by itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this:
|
||||
`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and of itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this:
|
||||
|
||||
```rust,edition2018
|
||||
# extern crate async_std;
|
||||
|
|
|
@ -4,4 +4,4 @@
|
|||
|
||||
`async-std` provides an interface to all important primitives: filesystem operations, network operations and concurrency basics like timers. It also exposes a `task` in a model similar to the `thread` module found in the Rust standard lib. But it does not only include I/O primitives, but also `async/await` compatible versions of primitives like `Mutex`.
|
||||
|
||||
[organization]: https://github.com/async-rs/async-std
|
||||
[organization]: https://github.com/async-rs
|
||||
|
|
|
@ -31,7 +31,7 @@ In general, this crate will be conservative with respect to the minimum supporte
|
|||
|
||||
## Security fixes
|
||||
|
||||
Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give at least _3 month_ of ahead notice.
|
||||
Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give a notice at least _3 months_ ahead.
|
||||
|
||||
## Credits
|
||||
|
||||
|
|
|
@ -8,6 +8,6 @@ fn main() -> Result<()> {
|
|||
match (args.nth(1).as_ref().map(String::as_str), args.next()) {
|
||||
(Some("client"), None) => client::main(),
|
||||
(Some("server"), None) => server::main(),
|
||||
_ => Err("Usage: a-chat [client|server]")?,
|
||||
_ => Err("Usage: a-chat [client|server]".into()),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
|
|||
let mut lines = reader.lines();
|
||||
|
||||
let name = match lines.next().await {
|
||||
None => Err("peer disconnected immediately")?,
|
||||
None => return Err("peer disconnected immediately".into()),
|
||||
Some(line) => line?,
|
||||
};
|
||||
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
|
||||
|
|
|
@ -66,6 +66,23 @@ pub struct File {
|
|||
}
|
||||
|
||||
impl File {
|
||||
/// Creates an async file handle.
|
||||
pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File {
|
||||
let file = Arc::new(file);
|
||||
|
||||
File {
|
||||
file: file.clone(),
|
||||
lock: Lock::new(State {
|
||||
file,
|
||||
mode: Mode::Idle,
|
||||
cache: Vec::new(),
|
||||
is_flushed,
|
||||
last_read_err: None,
|
||||
last_write_err: None,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Opens a file in read-only mode.
|
||||
///
|
||||
/// See the [`OpenOptions::open`] function for more options.
|
||||
|
@ -96,7 +113,7 @@ impl File {
|
|||
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
|
||||
let path = path.as_ref().to_owned();
|
||||
let file = blocking::spawn(move || std::fs::File::open(&path)).await?;
|
||||
Ok(file.into())
|
||||
Ok(File::new(file, true))
|
||||
}
|
||||
|
||||
/// Opens a file in write-only mode.
|
||||
|
@ -131,7 +148,7 @@ impl File {
|
|||
pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
|
||||
let path = path.as_ref().to_owned();
|
||||
let file = blocking::spawn(move || std::fs::File::create(&path)).await?;
|
||||
Ok(file.into())
|
||||
Ok(File::new(file, true))
|
||||
}
|
||||
|
||||
/// Synchronizes OS-internal buffered contents and metadata to disk.
|
||||
|
@ -383,19 +400,7 @@ impl Seek for &File {
|
|||
|
||||
impl From<std::fs::File> for File {
|
||||
fn from(file: std::fs::File) -> File {
|
||||
let file = Arc::new(file);
|
||||
|
||||
File {
|
||||
file: file.clone(),
|
||||
lock: Lock::new(State {
|
||||
file,
|
||||
mode: Mode::Idle,
|
||||
cache: Vec::new(),
|
||||
is_flushed: false,
|
||||
last_read_err: None,
|
||||
last_write_err: None,
|
||||
}),
|
||||
}
|
||||
File::new(file, false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -284,7 +284,10 @@ impl OpenOptions {
|
|||
pub fn open<P: AsRef<Path>>(&self, path: P) -> impl Future<Output = io::Result<File>> {
|
||||
let path = path.as_ref().to_owned();
|
||||
let options = self.0.clone();
|
||||
async move { blocking::spawn(move || options.open(path).map(|f| f.into())).await }
|
||||
async move {
|
||||
let file = blocking::spawn(move || options.open(path)).await?;
|
||||
Ok(File::new(file, true))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,9 +28,10 @@ pub fn empty() -> Empty {
|
|||
|
||||
/// A reader that contains no data.
|
||||
///
|
||||
/// This reader is constructed by the [`sink`] function.
|
||||
/// This reader is created by the [`empty`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`sink`]: fn.sink.html
|
||||
/// [`empty`]: fn.empty.html
|
||||
pub struct Empty {
|
||||
_private: (),
|
||||
}
|
||||
|
|
|
@ -29,7 +29,8 @@ pub fn repeat(byte: u8) -> Repeat {
|
|||
|
||||
/// A reader which yields one byte over and over and over and over and over and...
|
||||
///
|
||||
/// This reader is constructed by the [`repeat`] function.
|
||||
/// This reader is created by the [`repeat`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`repeat`]: fn.repeat.html
|
||||
pub struct Repeat {
|
||||
|
|
|
@ -25,7 +25,8 @@ pub fn sink() -> Sink {
|
|||
|
||||
/// A writer that consumes and drops all data.
|
||||
///
|
||||
/// This writer is constructed by the [`sink`] function.
|
||||
/// This writer is constructed by the [`sink`] function. See its documentation
|
||||
/// for more.
|
||||
///
|
||||
/// [`sink`]: fn.sink.html
|
||||
pub struct Sink {
|
||||
|
|
|
@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
|
|||
///
|
||||
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
|
||||
///
|
||||
/// ### Note: Windows Portability Consideration
|
||||
///
|
||||
/// When operating in a console, the Windows implementation of this stream does not support
|
||||
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
|
||||
/// an error.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
|
@ -34,12 +40,16 @@ pub fn stderr() -> Stderr {
|
|||
|
||||
/// A handle to the standard error of the current process.
|
||||
///
|
||||
/// Created by the [`stderr`] function.
|
||||
/// This writer is created by the [`stderr`] function. See its documentation for
|
||||
/// more.
|
||||
///
|
||||
/// This type is an async version of [`std::io::Stderr`].
|
||||
/// ### Note: Windows Portability Consideration
|
||||
///
|
||||
/// When operating in a console, the Windows implementation of this stream does not support
|
||||
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
|
||||
/// an error.
|
||||
///
|
||||
/// [`stderr`]: fn.stderr.html
|
||||
/// [`std::io::Stderr`]: https://doc.rust-lang.org/std/io/struct.Stderr.html
|
||||
#[derive(Debug)]
|
||||
pub struct Stderr(Mutex<State>);
|
||||
|
||||
|
|
|
@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
|
|||
///
|
||||
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
|
||||
///
|
||||
/// ### Note: Windows Portability Consideration
|
||||
///
|
||||
/// When operating in a console, the Windows implementation of this stream does not support
|
||||
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
|
||||
/// an error.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
|
@ -35,12 +41,16 @@ pub fn stdin() -> Stdin {
|
|||
|
||||
/// A handle to the standard input of the current process.
|
||||
///
|
||||
/// Created by the [`stdin`] function.
|
||||
/// This reader is created by the [`stdin`] function. See its documentation for
|
||||
/// more.
|
||||
///
|
||||
/// This type is an async version of [`std::io::Stdin`].
|
||||
/// ### Note: Windows Portability Consideration
|
||||
///
|
||||
/// When operating in a console, the Windows implementation of this stream does not support
|
||||
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
|
||||
/// an error.
|
||||
///
|
||||
/// [`stdin`]: fn.stdin.html
|
||||
/// [`std::io::Stdin`]: https://doc.rust-lang.org/std/io/struct.Stdin.html
|
||||
#[derive(Debug)]
|
||||
pub struct Stdin(Mutex<State>);
|
||||
|
||||
|
|
|
@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
|
|||
///
|
||||
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
|
||||
///
|
||||
/// ### Note: Windows Portability Consideration
|
||||
///
|
||||
/// When operating in a console, the Windows implementation of this stream does not support
|
||||
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
|
||||
/// an error.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```no_run
|
||||
|
@ -34,12 +40,16 @@ pub fn stdout() -> Stdout {
|
|||
|
||||
/// A handle to the standard output of the current process.
|
||||
///
|
||||
/// Created by the [`stdout`] function.
|
||||
/// This writer is created by the [`stdout`] function. See its documentation
|
||||
/// for more.
|
||||
///
|
||||
/// This type is an async version of [`std::io::Stdout`].
|
||||
/// ### Note: Windows Portability Consideration
|
||||
///
|
||||
/// When operating in a console, the Windows implementation of this stream does not support
|
||||
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
|
||||
/// an error.
|
||||
///
|
||||
/// [`stdout`]: fn.stdout.html
|
||||
/// [`std::io::Stdout`]: https://doc.rust-lang.org/std/io/struct.Stdout.html
|
||||
#[derive(Debug)]
|
||||
pub struct Stdout(Mutex<State>);
|
||||
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
|
||||
#![cfg_attr(feature = "docs", feature(doc_cfg))]
|
||||
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
||||
#![allow(clippy::mutex_atomic, clippy::module_inception)]
|
||||
#![doc(test(attr(deny(rust_2018_idioms, warnings))))]
|
||||
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
|
||||
#![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")]
|
||||
|
|
|
@ -7,3 +7,8 @@ mod from_stream;
|
|||
|
||||
#[doc(inline)]
|
||||
pub use std::option::Option;
|
||||
|
||||
cfg_unstable! {
|
||||
mod product;
|
||||
mod sum;
|
||||
}
|
||||
|
|
66
src/option/product.rs
Normal file
66
src/option/product.rs
Normal file
|
@ -0,0 +1,66 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use crate::prelude::*;
|
||||
use crate::stream::{Stream, Product};
|
||||
|
||||
impl<T, U> Product<Option<U>> for Option<T>
|
||||
where
|
||||
T: Product<U>,
|
||||
{
|
||||
#[doc = r#"
|
||||
Takes each element in the `Stream`: if it is a `None`, no further
|
||||
elements are taken, and the `None` is returned. Should no `None` occur,
|
||||
the product of all elements is returned.
|
||||
|
||||
# Examples
|
||||
|
||||
This multiplies every integer in a vector, rejecting the product if a negative element is
|
||||
encountered:
|
||||
|
||||
```
|
||||
# fn main() { async_std::task::block_on(async {
|
||||
#
|
||||
use std::collections::VecDeque;
|
||||
use async_std::prelude::*;
|
||||
|
||||
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
|
||||
let prod: Option<i32> = v.map(|x|
|
||||
if x < 0 {
|
||||
None
|
||||
} else {
|
||||
Some(x)
|
||||
}).product().await;
|
||||
assert_eq!(prod, Some(8));
|
||||
#
|
||||
# }) }
|
||||
```
|
||||
"#]
|
||||
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
|
||||
where S: Stream<Item = Option<U>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_none = false;
|
||||
let out = <T as Product<U>>::product(stream
|
||||
.scan((), |_, elem| {
|
||||
match elem {
|
||||
Some(elem) => Some(elem),
|
||||
None => {
|
||||
found_none = true;
|
||||
// Stop processing the stream on error
|
||||
None
|
||||
}
|
||||
}
|
||||
})).await;
|
||||
|
||||
if found_none {
|
||||
None
|
||||
} else {
|
||||
Some(out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
63
src/option/sum.rs
Normal file
63
src/option/sum.rs
Normal file
|
@ -0,0 +1,63 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use crate::prelude::*;
|
||||
use crate::stream::{Stream, Sum};
|
||||
|
||||
impl<T, U> Sum<Option<U>> for Option<T>
|
||||
where
|
||||
T: Sum<U>,
|
||||
{
|
||||
#[doc = r#"
|
||||
Takes each element in the `Iterator`: if it is a `None`, no further
|
||||
elements are taken, and the `None` is returned. Should no `None` occur,
|
||||
the sum of all elements is returned.
|
||||
|
||||
# Examples
|
||||
|
||||
This sums up the position of the character 'a' in a vector of strings,
|
||||
if a word did not have the character 'a' the operation returns `None`:
|
||||
|
||||
```
|
||||
# fn main() { async_std::task::block_on(async {
|
||||
#
|
||||
use std::collections::VecDeque;
|
||||
use async_std::prelude::*;
|
||||
|
||||
let words: VecDeque<_> = vec!["have", "a", "great", "day"]
|
||||
.into_iter()
|
||||
.collect();
|
||||
let total: Option<usize> = words.map(|w| w.find('a')).sum().await;
|
||||
assert_eq!(total, Some(5));
|
||||
#
|
||||
# }) }
|
||||
```
|
||||
"#]
|
||||
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
|
||||
where S: Stream<Item = Option<U>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_none = false;
|
||||
let out = <T as Sum<U>>::sum(stream
|
||||
.scan((), |_, elem| {
|
||||
match elem {
|
||||
Some(elem) => Some(elem),
|
||||
None => {
|
||||
found_none = true;
|
||||
// Stop processing the stream on error
|
||||
None
|
||||
}
|
||||
}
|
||||
})).await;
|
||||
|
||||
if found_none {
|
||||
None
|
||||
} else {
|
||||
Some(out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,6 +1,12 @@
|
|||
use std::ffi::{OsStr, OsString};
|
||||
#[cfg(feature = "unstable")]
|
||||
use std::pin::Pin;
|
||||
|
||||
use crate::path::Path;
|
||||
#[cfg(feature = "unstable")]
|
||||
use crate::prelude::*;
|
||||
#[cfg(feature = "unstable")]
|
||||
use crate::stream::{Extend, FromStream, IntoStream};
|
||||
|
||||
/// This struct is an async version of [`std::path::PathBuf`].
|
||||
///
|
||||
|
@ -233,3 +239,44 @@ impl AsRef<std::path::Path> for PathBuf {
|
|||
self.inner.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "unstable")]
|
||||
impl<P: AsRef<Path>> Extend<P> for PathBuf {
|
||||
fn stream_extend<'a, S: IntoStream<Item = P>>(
|
||||
&'a mut self,
|
||||
stream: S,
|
||||
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
|
||||
where
|
||||
P: 'a,
|
||||
<S as IntoStream>::IntoStream: 'a,
|
||||
{
|
||||
let stream = stream.into_stream();
|
||||
|
||||
//TODO: This can be added back in once this issue is resolved:
|
||||
// https://github.com/rust-lang/rust/issues/58234
|
||||
//self.reserve(stream.size_hint().0);
|
||||
|
||||
Box::pin(stream.for_each(move |item| self.push(item.as_ref())))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "unstable")]
|
||||
impl<'b, P: AsRef<Path> + 'b> FromStream<P> for PathBuf {
|
||||
#[inline]
|
||||
fn from_stream<'a, S: IntoStream<Item = P>>(
|
||||
stream: S,
|
||||
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
|
||||
where
|
||||
<S as IntoStream>::IntoStream: 'a,
|
||||
{
|
||||
let stream = stream.into_stream();
|
||||
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
let mut out = Self::new();
|
||||
out.stream_extend(stream).await;
|
||||
out
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,3 +7,8 @@ mod from_stream;
|
|||
|
||||
#[doc(inline)]
|
||||
pub use std::result::Result;
|
||||
|
||||
cfg_unstable! {
|
||||
mod product;
|
||||
mod sum;
|
||||
}
|
||||
|
|
64
src/result/product.rs
Normal file
64
src/result/product.rs
Normal file
|
@ -0,0 +1,64 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use crate::prelude::*;
|
||||
use crate::stream::{Stream, Product};
|
||||
|
||||
impl<T, U, E> Product<Result<U, E>> for Result<T, E>
|
||||
where
|
||||
T: Product<U>,
|
||||
{
|
||||
#[doc = r#"
|
||||
Takes each element in the `Stream`: if it is an `Err`, no further
|
||||
elements are taken, and the `Err` is returned. Should no `Err` occur,
|
||||
the product of all elements is returned.
|
||||
|
||||
# Examples
|
||||
|
||||
This multiplies every integer in a vector, rejecting the product if a negative element is
|
||||
encountered:
|
||||
|
||||
```
|
||||
# fn main() { async_std::task::block_on(async {
|
||||
#
|
||||
use std::collections::VecDeque;
|
||||
use async_std::prelude::*;
|
||||
|
||||
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
|
||||
let res: Result<i32, &'static str> = v.map(|x|
|
||||
if x < 0 {
|
||||
Err("Negative element found")
|
||||
} else {
|
||||
Ok(x)
|
||||
}).product().await;
|
||||
assert_eq!(res, Ok(8));
|
||||
#
|
||||
# }) }
|
||||
```
|
||||
"#]
|
||||
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
|
||||
where S: Stream<Item = Result<U, E>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_error = None;
|
||||
let out = <T as Product<U>>::product(stream
|
||||
.scan((), |_, elem| {
|
||||
match elem {
|
||||
Ok(elem) => Some(elem),
|
||||
Err(err) => {
|
||||
found_error = Some(err);
|
||||
// Stop processing the stream on error
|
||||
None
|
||||
}
|
||||
}
|
||||
})).await;
|
||||
match found_error {
|
||||
Some(err) => Err(err),
|
||||
None => Ok(out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
64
src/result/sum.rs
Normal file
64
src/result/sum.rs
Normal file
|
@ -0,0 +1,64 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use crate::prelude::*;
|
||||
use crate::stream::{Stream, Sum};
|
||||
|
||||
impl<T, U, E> Sum<Result<U, E>> for Result<T, E>
|
||||
where
|
||||
T: Sum<U>,
|
||||
{
|
||||
#[doc = r#"
|
||||
Takes each element in the `Stream`: if it is an `Err`, no further
|
||||
elements are taken, and the `Err` is returned. Should no `Err` occur,
|
||||
the sum of all elements is returned.
|
||||
|
||||
# Examples
|
||||
|
||||
This sums up every integer in a vector, rejecting the sum if a negative
|
||||
element is encountered:
|
||||
|
||||
```
|
||||
# fn main() { async_std::task::block_on(async {
|
||||
#
|
||||
use std::collections::VecDeque;
|
||||
use async_std::prelude::*;
|
||||
|
||||
let v: VecDeque<_> = vec![1, 2].into_iter().collect();
|
||||
let res: Result<i32, &'static str> = v.map(|x|
|
||||
if x < 0 {
|
||||
Err("Negative element found")
|
||||
} else {
|
||||
Ok(x)
|
||||
}).sum().await;
|
||||
assert_eq!(res, Ok(3));
|
||||
#
|
||||
# }) }
|
||||
```
|
||||
"#]
|
||||
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
|
||||
where S: Stream<Item = Result<U, E>> + 'a
|
||||
{
|
||||
Box::pin(async move {
|
||||
pin_utils::pin_mut!(stream);
|
||||
|
||||
// Using `scan` here because it is able to stop the stream early
|
||||
// if a failure occurs
|
||||
let mut found_error = None;
|
||||
let out = <T as Sum<U>>::sum(stream
|
||||
.scan((), |_, elem| {
|
||||
match elem {
|
||||
Ok(elem) => Some(elem),
|
||||
Err(err) => {
|
||||
found_error = Some(err);
|
||||
// Stop processing the stream on error
|
||||
None
|
||||
}
|
||||
}
|
||||
})).await;
|
||||
match found_error {
|
||||
Some(err) => Err(err),
|
||||
None => Ok(out)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -6,6 +6,11 @@ use crate::task::{Context, Poll};
|
|||
|
||||
/// Creates a stream that doesn't yield any items.
|
||||
///
|
||||
/// This `struct` is created by the [`empty`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`empty`]: fn.empty.html
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
|
|
|
@ -75,6 +75,7 @@ pub use crate::stream::Stream;
|
|||
/// assert_eq!(5, counter.len());
|
||||
/// # });
|
||||
/// ```
|
||||
#[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
pub trait ExactSizeStream: Stream {
|
||||
|
|
|
@ -10,7 +10,8 @@ use crate::task::{Context, Poll};
|
|||
pin_project! {
|
||||
/// A stream that yields elements by calling a closure.
|
||||
///
|
||||
/// This stream is constructed by [`from_fn`] function.
|
||||
/// This stream is created by the [`from_fn`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`from_fn`]: fn.from_fn.html
|
||||
#[derive(Debug)]
|
||||
|
|
|
@ -52,6 +52,10 @@ pub fn interval(dur: Duration) -> Interval {
|
|||
|
||||
/// A stream representing notifications at fixed interval
|
||||
///
|
||||
/// This stream is created by the [`interval`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`interval`]: fn.interval.html
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[derive(Debug)]
|
||||
|
@ -111,6 +115,7 @@ fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
|
|||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::next_interval;
|
||||
use std::cmp::Ordering;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
struct Timeline(Instant);
|
||||
|
@ -134,12 +139,10 @@ mod test {
|
|||
// The math around Instant/Duration isn't 100% precise due to rounding
|
||||
// errors, see #249 for more info
|
||||
fn almost_eq(a: Instant, b: Instant) -> bool {
|
||||
if a == b {
|
||||
true
|
||||
} else if a > b {
|
||||
a - b < Duration::from_millis(1)
|
||||
} else {
|
||||
b - a < Duration::from_millis(1)
|
||||
match a.cmp(&b) {
|
||||
Ordering::Equal => true,
|
||||
Ordering::Greater => a - b < Duration::from_millis(1),
|
||||
Ordering::Less => b - a < Duration::from_millis(1),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,8 @@ pub fn once<T>(t: T) -> Once<T> {
|
|||
pin_project! {
|
||||
/// A stream that yields a single item.
|
||||
///
|
||||
/// This stream is constructed by the [`once`] function.
|
||||
/// This stream is created by the [`once`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`once`]: fn.once.html
|
||||
#[derive(Debug)]
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
|
||||
/// Trait to represent types that can be created by productming up a stream.
|
||||
/// Trait to represent types that can be created by multiplying the elements of a stream.
|
||||
///
|
||||
/// This trait is used to implement the [`product`] method on streams. Types which
|
||||
/// implement the trait can be generated by the [`product`] method. Like
|
||||
|
@ -16,8 +18,62 @@ use crate::stream::Stream;
|
|||
pub trait Product<A = Self>: Sized {
|
||||
/// Method which takes a stream and generates `Self` from the elements by
|
||||
/// multiplying the items.
|
||||
fn product<S, F>(stream: S) -> F
|
||||
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
|
||||
where
|
||||
S: Stream<Item = A>,
|
||||
F: Future<Output = Self>;
|
||||
S: Stream<Item = A> + 'a;
|
||||
}
|
||||
|
||||
use core::ops::Mul;
|
||||
use core::num::Wrapping;
|
||||
use crate::stream::stream::StreamExt;
|
||||
|
||||
macro_rules! integer_product {
|
||||
(@impls $one: expr, $($a:ty)*) => ($(
|
||||
impl Product for $a {
|
||||
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
|
||||
where
|
||||
S: Stream<Item = $a> + 'a,
|
||||
{
|
||||
Box::pin(async move { stream.fold($one, Mul::mul).await } )
|
||||
}
|
||||
}
|
||||
impl<'a> Product<&'a $a> for $a {
|
||||
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
|
||||
where
|
||||
S: Stream<Item = &'a $a> + 'b,
|
||||
{
|
||||
Box::pin(async move { stream.fold($one, Mul::mul).await } )
|
||||
}
|
||||
}
|
||||
)*);
|
||||
($($a:ty)*) => (
|
||||
integer_product!(@impls 1, $($a)*);
|
||||
integer_product!(@impls Wrapping(1), $(Wrapping<$a>)*);
|
||||
);
|
||||
}
|
||||
|
||||
macro_rules! float_product {
|
||||
($($a:ty)*) => ($(
|
||||
impl Product for $a {
|
||||
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
|
||||
where S: Stream<Item = $a> + 'a,
|
||||
{
|
||||
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
|
||||
}
|
||||
}
|
||||
impl<'a> Product<&'a $a> for $a {
|
||||
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'b>>
|
||||
where S: Stream<Item = &'a $a> + 'b,
|
||||
{
|
||||
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
|
||||
}
|
||||
}
|
||||
)*);
|
||||
($($a:ty)*) => (
|
||||
float_product!($($a)*);
|
||||
float_product!($(Wrapping<$a>)*);
|
||||
);
|
||||
}
|
||||
|
||||
integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
|
||||
float_product!{ f32 f64 }
|
||||
|
|
|
@ -29,7 +29,8 @@ where
|
|||
|
||||
/// A stream that yields the same item repeatedly.
|
||||
///
|
||||
/// This stream is constructed by the [`repeat`] function.
|
||||
/// This stream is created by the [`repeat`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`repeat`]: fn.repeat.html
|
||||
#[derive(Debug)]
|
||||
|
|
|
@ -10,7 +10,8 @@ use crate::task::{Context, Poll};
|
|||
pin_project! {
|
||||
/// A stream that repeats elements of type `T` endlessly by applying a provided closure.
|
||||
///
|
||||
/// This stream is constructed by the [`repeat_with`] function.
|
||||
/// This stream is created by the [`repeat_with`] function. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`repeat_with`]: fn.repeat_with.html
|
||||
#[derive(Debug)]
|
||||
|
|
|
@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// Chains two streams one after another.
|
||||
///
|
||||
/// This `struct` is created by the [`chain`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`chain`]: trait.Stream.html#method.chain
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct Chain<S, U> {
|
||||
#[pin]
|
||||
|
|
|
@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// A stream to filter elements of another stream with a predicate.
|
||||
///
|
||||
/// This `struct` is created by the [`filter`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`filter`]: trait.Stream.html#method.filter
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct Filter<S, P, T> {
|
||||
#[pin]
|
||||
|
|
|
@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
|
|||
pin_project! {
|
||||
/// A `Stream` that is permanently closed once a single call to `poll` results in
|
||||
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
|
||||
///
|
||||
/// This `struct` is created by the [`fuse`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`fuse`]: trait.Stream.html#method.fuse
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Fuse<S> {
|
||||
#[pin]
|
||||
|
|
|
@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// A stream that does something with each element of another stream.
|
||||
///
|
||||
/// This `struct` is created by the [`inspect`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`inspect`]: trait.Stream.html#method.inspect
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct Inspect<S, F, T> {
|
||||
#[pin]
|
||||
|
|
|
@ -7,9 +7,11 @@ use pin_project_lite::pin_project;
|
|||
pin_project! {
|
||||
/// A stream that merges two other streams into a single stream.
|
||||
///
|
||||
/// This stream is returned by [`Stream::merge`].
|
||||
/// This `struct` is created by the [`merge`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`Stream::merge`]: trait.Stream.html#method.merge
|
||||
/// [`merge`]: trait.Stream.html#method.merge
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[derive(Debug)]
|
||||
|
|
|
@ -93,18 +93,21 @@ use std::marker::PhantomData;
|
|||
|
||||
cfg_unstable! {
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::FromStream;
|
||||
use crate::stream::into_stream::IntoStream;
|
||||
use crate::stream::{FromStream, Product, Sum};
|
||||
|
||||
pub use merge::Merge;
|
||||
pub use flatten::Flatten;
|
||||
pub use flat_map::FlatMap;
|
||||
pub use timeout::{TimeoutError, Timeout};
|
||||
|
||||
mod merge;
|
||||
mod flatten;
|
||||
mod flat_map;
|
||||
mod timeout;
|
||||
}
|
||||
|
||||
extension_trait! {
|
||||
|
@ -1159,6 +1162,40 @@ extension_trait! {
|
|||
Skip::new(self, n)
|
||||
}
|
||||
|
||||
#[doc=r#"
|
||||
Await a stream or times out after a duration of time.
|
||||
|
||||
If you want to await an I/O future consider using
|
||||
[`io::timeout`](../io/fn.timeout.html) instead.
|
||||
|
||||
# Examples
|
||||
|
||||
```
|
||||
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
|
||||
#
|
||||
use std::time::Duration;
|
||||
|
||||
use async_std::stream;
|
||||
use async_std::prelude::*;
|
||||
|
||||
let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));
|
||||
|
||||
while let Some(v) = s.next().await {
|
||||
assert_eq!(v, Ok(1));
|
||||
}
|
||||
#
|
||||
# Ok(()) }) }
|
||||
```
|
||||
"#]
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn timeout(self, dur: Duration) -> Timeout<Self>
|
||||
where
|
||||
Self: Stream + Sized,
|
||||
{
|
||||
Timeout::new(self, dur)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
A combinator that applies a function as long as it returns successfully, producing a single, final value.
|
||||
Immediately returns the error when the function returns unsuccessfully.
|
||||
|
@ -1611,6 +1648,95 @@ extension_trait! {
|
|||
{
|
||||
LtFuture::new(self, other)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Sums the elements of an iterator.
|
||||
|
||||
Takes each element, adds them together, and returns the result.
|
||||
|
||||
An empty iterator returns the zero value of the type.
|
||||
|
||||
# Panics
|
||||
|
||||
When calling `sum()` and a primitive integer type is being returned, this
|
||||
method will panic if the computation overflows and debug assertions are
|
||||
enabled.
|
||||
|
||||
# Examples
|
||||
|
||||
Basic usage:
|
||||
|
||||
```
|
||||
# fn main() { async_std::task::block_on(async {
|
||||
#
|
||||
use std::collections::VecDeque;
|
||||
use async_std::prelude::*;
|
||||
|
||||
let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect();
|
||||
let sum: u8 = s.sum().await;
|
||||
|
||||
assert_eq!(sum, 10);
|
||||
#
|
||||
# }) }
|
||||
```
|
||||
"#]
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn sum<'a, S>(
|
||||
self,
|
||||
) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
|
||||
where
|
||||
Self: Sized + Stream<Item = S> + 'a,
|
||||
S: Sum,
|
||||
{
|
||||
Sum::sum(self)
|
||||
}
|
||||
|
||||
#[doc = r#"
|
||||
Iterates over the entire iterator, multiplying all the elements
|
||||
|
||||
An empty iterator returns the one value of the type.
|
||||
|
||||
# Panics
|
||||
|
||||
When calling `product()` and a primitive integer type is being returned,
|
||||
method will panic if the computation overflows and debug assertions are
|
||||
enabled.
|
||||
|
||||
# Examples
|
||||
|
||||
This example calculates the factorial of n (i.e. the product of the numbers from 1 to
|
||||
n, inclusive):
|
||||
|
||||
```
|
||||
# fn main() { async_std::task::block_on(async {
|
||||
#
|
||||
async fn factorial(n: u32) -> u32 {
|
||||
use std::collections::VecDeque;
|
||||
use async_std::prelude::*;
|
||||
|
||||
let s: VecDeque<_> = (1..=n).collect();
|
||||
s.product().await
|
||||
}
|
||||
|
||||
assert_eq!(factorial(0).await, 1);
|
||||
assert_eq!(factorial(1).await, 1);
|
||||
assert_eq!(factorial(5).await, 120);
|
||||
#
|
||||
# }) }
|
||||
```
|
||||
"#]
|
||||
#[cfg(feature = "unstable")]
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
fn product<'a, P>(
|
||||
self,
|
||||
) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>]
|
||||
where
|
||||
Self: Sized + Stream<Item = P> + 'a,
|
||||
P: Product,
|
||||
{
|
||||
Product::product(self)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {
|
||||
|
|
|
@ -7,6 +7,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// A stream to maintain state while polling another stream.
|
||||
///
|
||||
/// This `struct` is created by the [`scan`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`scan`]: trait.Stream.html#method.scan
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct Scan<S, St, F> {
|
||||
#[pin]
|
||||
|
|
|
@ -7,6 +7,12 @@ use crate::stream::Stream;
|
|||
|
||||
pin_project! {
|
||||
/// A stream to skip first n elements of another stream.
|
||||
///
|
||||
/// This `struct` is created by the [`skip`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`skip`]: trait.Stream.html#method.skip
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct Skip<S> {
|
||||
#[pin]
|
||||
|
|
|
@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// A stream to skip elements of another stream based on a predicate.
|
||||
///
|
||||
/// This `struct` is created by the [`skip_while`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`skip_while`]: trait.Stream.html#method.skip_while
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct SkipWhile<S, P, T> {
|
||||
#[pin]
|
||||
|
|
|
@ -7,6 +7,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// A stream that steps a given amount of elements of another stream.
|
||||
///
|
||||
/// This `struct` is created by the [`step_by`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`step_by`]: trait.Stream.html#method.step_by
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct StepBy<S> {
|
||||
#[pin]
|
||||
|
|
|
@ -7,6 +7,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// A stream that yields the first `n` items of another stream.
|
||||
///
|
||||
/// This `struct` is created by the [`take`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`take`]: trait.Stream.html#method.take
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct Take<S> {
|
||||
#[pin]
|
||||
|
|
|
@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// A stream that yields elements based on a predicate.
|
||||
///
|
||||
/// This `struct` is created by the [`take_while`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`take_while`]: trait.Stream.html#method.take_while
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
#[derive(Debug)]
|
||||
pub struct TakeWhile<S, P, T> {
|
||||
#[pin]
|
||||
|
|
63
src/stream/stream/timeout.rs
Normal file
63
src/stream/stream/timeout.rs
Normal file
|
@ -0,0 +1,63 @@
|
|||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_timer::Delay;
|
||||
use pin_project_lite::pin_project;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
use crate::task::{Context, Poll};
|
||||
|
||||
pin_project! {
|
||||
/// A stream with timeout time set
|
||||
#[derive(Debug)]
|
||||
pub struct Timeout<S: Stream> {
|
||||
#[pin]
|
||||
stream: S,
|
||||
#[pin]
|
||||
delay: Delay,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Stream> Timeout<S> {
|
||||
pub fn new(stream: S, dur: Duration) -> Timeout<S> {
|
||||
let delay = Delay::new(dur);
|
||||
|
||||
Timeout { stream, delay }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: Stream> Stream for Timeout<S> {
|
||||
type Item = Result<S::Item, TimeoutError>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
|
||||
match this.stream.poll_next(cx) {
|
||||
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => match this.delay.poll(cx) {
|
||||
Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))),
|
||||
Poll::Pending => Poll::Pending,
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An error returned when a stream times out.
|
||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||
#[cfg(any(feature = "unstable", feature = "docs"))]
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
pub struct TimeoutError {
|
||||
_private: (),
|
||||
}
|
||||
|
||||
impl Error for TimeoutError {}
|
||||
|
||||
impl fmt::Display for TimeoutError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
"stream has timed out".fmt(f)
|
||||
}
|
||||
}
|
|
@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
|
|||
|
||||
pin_project! {
|
||||
/// An iterator that iterates two other iterators simultaneously.
|
||||
///
|
||||
/// This `struct` is created by the [`zip`] method on [`Stream`]. See its
|
||||
/// documentation for more.
|
||||
///
|
||||
/// [`zip`]: trait.Stream.html#method.zip
|
||||
/// [`Stream`]: trait.Stream.html
|
||||
pub struct Zip<A: Stream, B> {
|
||||
item_slot: Option<A::Item>,
|
||||
#[pin]
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use crate::future::Future;
|
||||
use crate::stream::Stream;
|
||||
|
||||
|
@ -16,8 +18,62 @@ use crate::stream::Stream;
|
|||
pub trait Sum<A = Self>: Sized {
|
||||
/// Method which takes a stream and generates `Self` from the elements by
|
||||
/// "summing up" the items.
|
||||
fn sum<S, F>(stream: S) -> F
|
||||
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
|
||||
where
|
||||
S: Stream<Item = A>,
|
||||
F: Future<Output = Self>;
|
||||
S: Stream<Item = A> + 'a;
|
||||
}
|
||||
|
||||
use core::ops::Add;
|
||||
use core::num::Wrapping;
|
||||
use crate::stream::stream::StreamExt;
|
||||
|
||||
macro_rules! integer_sum {
|
||||
(@impls $zero: expr, $($a:ty)*) => ($(
|
||||
impl Sum for $a {
|
||||
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
|
||||
where
|
||||
S: Stream<Item = $a> + 'a,
|
||||
{
|
||||
Box::pin(async move { stream.fold($zero, Add::add).await } )
|
||||
}
|
||||
}
|
||||
impl<'a> Sum<&'a $a> for $a {
|
||||
fn sum<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
|
||||
where
|
||||
S: Stream<Item = &'a $a> + 'b,
|
||||
{
|
||||
Box::pin(async move { stream.fold($zero, Add::add).await } )
|
||||
}
|
||||
}
|
||||
)*);
|
||||
($($a:ty)*) => (
|
||||
integer_sum!(@impls 0, $($a)*);
|
||||
integer_sum!(@impls Wrapping(0), $(Wrapping<$a>)*);
|
||||
);
|
||||
}
|
||||
|
||||
macro_rules! float_sum {
|
||||
($($a:ty)*) => ($(
|
||||
impl Sum for $a {
|
||||
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
|
||||
where S: Stream<Item = $a> + 'a,
|
||||
{
|
||||
Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } )
|
||||
}
|
||||
}
|
||||
impl<'a> Sum<&'a $a> for $a {
|
||||
fn sum<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
|
||||
where S: Stream<Item = &'a $a> + 'b,
|
||||
{
|
||||
Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } )
|
||||
}
|
||||
}
|
||||
)*);
|
||||
($($a:ty)*) => (
|
||||
float_sum!(@impls 0.0, $($a)*);
|
||||
float_sum!(@impls Wrapping(0.0), $(Wrapping<$a>)*);
|
||||
);
|
||||
}
|
||||
|
||||
integer_sum!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
|
||||
float_sum!{ f32 f64 }
|
||||
|
|
|
@ -154,6 +154,7 @@ where
|
|||
|
||||
fn vtable() -> &'static RawWakerVTable {
|
||||
unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
|
||||
#![allow(clippy::redundant_clone)]
|
||||
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
|
||||
mem::forget(arc.clone());
|
||||
RawWaker::new(ptr, vtable())
|
||||
|
|
|
@ -167,6 +167,7 @@ impl Tag {
|
|||
}
|
||||
|
||||
pub fn task(&self) -> &Task {
|
||||
#[allow(clippy::transmute_ptr_to_ptr)]
|
||||
unsafe {
|
||||
let raw = self.raw_metadata.load(Ordering::Acquire);
|
||||
|
||||
|
@ -189,6 +190,7 @@ impl Tag {
|
|||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::transmute_ptr_to_ptr)]
|
||||
mem::transmute::<&AtomicUsize, &Option<Task>>(&self.raw_metadata)
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
|
|
|
@ -4,6 +4,7 @@ use async_std::task;
|
|||
|
||||
#[test]
|
||||
fn test_buffered_writer() {
|
||||
#![allow(clippy::cognitive_complexity)]
|
||||
task::block_on(async {
|
||||
let inner = Vec::new();
|
||||
let mut writer = BufWriter::with_capacity(2, inner);
|
||||
|
|
|
@ -37,6 +37,7 @@ fn capacity() {
|
|||
|
||||
#[test]
|
||||
fn len_empty_full() {
|
||||
#![allow(clippy::cognitive_complexity)]
|
||||
task::block_on(async {
|
||||
let (s, r) = channel(2);
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ use futures::channel::mpsc;
|
|||
/// Generates a random number in `0..n`.
|
||||
pub fn random(n: u32) -> u32 {
|
||||
thread_local! {
|
||||
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1406868647));
|
||||
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647));
|
||||
}
|
||||
|
||||
RNG.with(|rng| {
|
||||
|
|
Loading…
Reference in a new issue