Merge branch 'master' into add_stdin_lock

poc-serde-support
k-nasa 5 years ago
commit 35cb11e398

@ -7,12 +7,13 @@ on:
- staging - staging
- trying - trying
env:
RUSTFLAGS: -Dwarnings
jobs: jobs:
build_and_test: build_and_test:
name: Build and test name: Build and test
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
env:
RUSTFLAGS: -Dwarnings
strategy: strategy:
matrix: matrix:
os: [ubuntu-latest, windows-latest, macOS-latest] os: [ubuntu-latest, windows-latest, macOS-latest]
@ -48,8 +49,6 @@ jobs:
check_fmt_and_docs: check_fmt_and_docs:
name: Checking fmt and docs name: Checking fmt and docs
runs-on: ubuntu-latest runs-on: ubuntu-latest
env:
RUSTFLAGS: -Dwarnings
steps: steps:
- uses: actions/checkout@master - uses: actions/checkout@master
@ -81,20 +80,11 @@ jobs:
clippy_check: clippy_check:
name: Clippy check name: Clippy check
runs-on: ubuntu-latest runs-on: ubuntu-latest
# TODO: There is a lot of warnings
# env:
# RUSTFLAGS: -Dwarnings
steps: steps:
- uses: actions/checkout@v1 - uses: actions/checkout@v1
- id: component - name: Install rust
uses: actions-rs/components-nightly@v1 run: rustup update beta && rustup default beta
with: - name: Install clippy
component: clippy run: rustup component add clippy
- uses: actions-rs/toolchain@v1 - name: clippy
with: run: cargo clippy --all --features unstable
toolchain: ${{ steps.component.outputs.toolchain }}
override: true
- run: rustup component add clippy
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}

@ -9,7 +9,7 @@ authors = [
edition = "2018" edition = "2018"
license = "Apache-2.0/MIT" license = "Apache-2.0/MIT"
repository = "https://github.com/async-rs/async-std" repository = "https://github.com/async-rs/async-std"
homepage = "https://github.com/async-rs/async-std" homepage = "https://async.rs"
documentation = "https://docs.rs/async-std" documentation = "https://docs.rs/async-std"
description = "Async version of the Rust standard library" description = "Async version of the Rust standard library"
keywords = ["async", "await", "future", "std", "task"] keywords = ["async", "await", "future", "std", "task"]

@ -24,11 +24,7 @@ To sum up: Rust gives us the ability to safely abstract over important propertie
## An easy view of computation ## An easy view of computation
While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us: While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us: A sequence of composable operations which can branch based on a decision, run to succession and yield a result or yield an error
- computation is a sequence of composable operations
- they can branch based on a decision
- they either run to succession and yield a result, or they can yield an error
## Deferring computation ## Deferring computation
@ -136,11 +132,11 @@ When executing 2 or more of these functions at the same time, our runtime system
## Conclusion ## Conclusion
Working from values, we searched for something that expresses *working towards a value available sometime later*. From there, we talked about the concept of polling. Working from values, we searched for something that expresses *working towards a value available later*. From there, we talked about the concept of polling.
A `Future` is any data type that does not represent a value, but the ability to *produce a value at some point in the future*. Implementations of this are very varied and detailed depending on use-case, but the interface is simple. A `Future` is any data type that does not represent a value, but the ability to *produce a value at some point in the future*. Implementations of this are very varied and detailed depending on use-case, but the interface is simple.
Next, we will introduce you to `tasks`, which we need to actually *run* Futures. Next, we will introduce you to `tasks`, which we will use to actually *run* Futures.
[^1]: Two parties reading while it is guaranteed that no one is writing is always safe. [^1]: Two parties reading while it is guaranteed that no one is writing is always safe.

@ -80,7 +80,7 @@ Tasks in `async_std` are one of the core abstractions. Much like Rust's `thread`
## Blocking ## Blocking
`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and by itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this: `Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and of itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this:
```rust,edition2018 ```rust,edition2018
# extern crate async_std; # extern crate async_std;

@ -4,4 +4,4 @@
`async-std` provides an interface to all important primitives: filesystem operations, network operations and concurrency basics like timers. It also exposes a `task` in a model similar to the `thread` module found in the Rust standard lib. But it does not only include I/O primitives, but also `async/await` compatible versions of primitives like `Mutex`. `async-std` provides an interface to all important primitives: filesystem operations, network operations and concurrency basics like timers. It also exposes a `task` in a model similar to the `thread` module found in the Rust standard lib. But it does not only include I/O primitives, but also `async/await` compatible versions of primitives like `Mutex`.
[organization]: https://github.com/async-rs/async-std [organization]: https://github.com/async-rs

@ -31,7 +31,7 @@ In general, this crate will be conservative with respect to the minimum supporte
## Security fixes ## Security fixes
Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give at least _3 month_ of ahead notice. Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give a notice at least _3 months_ ahead.
## Credits ## Credits

@ -8,6 +8,6 @@ fn main() -> Result<()> {
match (args.nth(1).as_ref().map(String::as_str), args.next()) { match (args.nth(1).as_ref().map(String::as_str), args.next()) {
(Some("client"), None) => client::main(), (Some("client"), None) => client::main(),
(Some("server"), None) => server::main(), (Some("server"), None) => server::main(),
_ => Err("Usage: a-chat [client|server]")?, _ => Err("Usage: a-chat [client|server]".into()),
} }
} }

@ -45,7 +45,7 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result
let mut lines = reader.lines(); let mut lines = reader.lines();
let name = match lines.next().await { let name = match lines.next().await {
None => Err("peer disconnected immediately")?, None => return Err("peer disconnected immediately".into()),
Some(line) => line?, Some(line) => line?,
}; };
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();

@ -66,6 +66,23 @@ pub struct File {
} }
impl File { impl File {
/// Creates an async file handle.
pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File {
let file = Arc::new(file);
File {
file: file.clone(),
lock: Lock::new(State {
file,
mode: Mode::Idle,
cache: Vec::new(),
is_flushed,
last_read_err: None,
last_write_err: None,
}),
}
}
/// Opens a file in read-only mode. /// Opens a file in read-only mode.
/// ///
/// See the [`OpenOptions::open`] function for more options. /// See the [`OpenOptions::open`] function for more options.
@ -96,7 +113,7 @@ impl File {
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> { pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let file = blocking::spawn(move || std::fs::File::open(&path)).await?; let file = blocking::spawn(move || std::fs::File::open(&path)).await?;
Ok(file.into()) Ok(File::new(file, true))
} }
/// Opens a file in write-only mode. /// Opens a file in write-only mode.
@ -131,7 +148,7 @@ impl File {
pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> { pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let file = blocking::spawn(move || std::fs::File::create(&path)).await?; let file = blocking::spawn(move || std::fs::File::create(&path)).await?;
Ok(file.into()) Ok(File::new(file, true))
} }
/// Synchronizes OS-internal buffered contents and metadata to disk. /// Synchronizes OS-internal buffered contents and metadata to disk.
@ -383,19 +400,7 @@ impl Seek for &File {
impl From<std::fs::File> for File { impl From<std::fs::File> for File {
fn from(file: std::fs::File) -> File { fn from(file: std::fs::File) -> File {
let file = Arc::new(file); File::new(file, false)
File {
file: file.clone(),
lock: Lock::new(State {
file,
mode: Mode::Idle,
cache: Vec::new(),
is_flushed: false,
last_read_err: None,
last_write_err: None,
}),
}
} }
} }

@ -284,7 +284,10 @@ impl OpenOptions {
pub fn open<P: AsRef<Path>>(&self, path: P) -> impl Future<Output = io::Result<File>> { pub fn open<P: AsRef<Path>>(&self, path: P) -> impl Future<Output = io::Result<File>> {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
let options = self.0.clone(); let options = self.0.clone();
async move { blocking::spawn(move || options.open(path).map(|f| f.into())).await } async move {
let file = blocking::spawn(move || options.open(path)).await?;
Ok(File::new(file, true))
}
} }
} }

@ -1,3 +1,11 @@
cfg_unstable! {
mod delay;
use std::time::Duration;
use delay::DelayFuture;
}
extension_trait! { extension_trait! {
use std::pin::Pin; use std::pin::Pin;
use std::ops::{Deref, DerefMut}; use std::ops::{Deref, DerefMut};
@ -99,6 +107,28 @@ extension_trait! {
} }
pub trait FutureExt: std::future::Future { pub trait FutureExt: std::future::Future {
/// Returns a Future that delays execution for a specified time.
///
/// # Examples
///
/// ```
/// # async_std::task::block_on(async {
/// use async_std::prelude::*;
/// use async_std::future;
/// use std::time::Duration;
///
/// let a = future::ready(1).delay(Duration::from_millis(2000));
/// dbg!(a.await);
/// # })
/// ```
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
fn delay(self, dur: Duration) -> impl Future<Output = Self::Output> [DelayFuture<Self>]
where
Self: Future + Sized
{
DelayFuture::new(self, dur)
}
} }
impl<F: Future + Unpin + ?Sized> Future for Box<F> { impl<F: Future + Unpin + ?Sized> Future for Box<F> {

@ -0,0 +1,43 @@
use std::pin::Pin;
use std::time::Duration;
use futures_timer::Delay;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::task::{Context, Poll};
pin_project! {
#[doc(hidden)]
#[derive(Debug)]
pub struct DelayFuture<F> {
#[pin]
future: F,
#[pin]
delay: Delay,
}
}
impl<F> DelayFuture<F> {
pub fn new(future: F, dur: Duration) -> DelayFuture<F> {
let delay = Delay::new(dur);
DelayFuture { future, delay }
}
}
impl<F: Future> Future for DelayFuture<F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.delay.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(_) => match this.future.poll(cx) {
Poll::Ready(v) => Poll::Ready(v),
Poll::Pending => Poll::Pending,
},
}
}
}

@ -28,9 +28,10 @@ pub fn empty() -> Empty {
/// A reader that contains no data. /// A reader that contains no data.
/// ///
/// This reader is constructed by the [`sink`] function. /// This reader is created by the [`empty`] function. See its
/// documentation for more.
/// ///
/// [`sink`]: fn.sink.html /// [`empty`]: fn.empty.html
pub struct Empty { pub struct Empty {
_private: (), _private: (),
} }

@ -29,7 +29,8 @@ pub fn repeat(byte: u8) -> Repeat {
/// A reader which yields one byte over and over and over and over and over and... /// A reader which yields one byte over and over and over and over and over and...
/// ///
/// This reader is constructed by the [`repeat`] function. /// This reader is created by the [`repeat`] function. See its
/// documentation for more.
/// ///
/// [`repeat`]: fn.repeat.html /// [`repeat`]: fn.repeat.html
pub struct Repeat { pub struct Repeat {

@ -25,7 +25,8 @@ pub fn sink() -> Sink {
/// A writer that consumes and drops all data. /// A writer that consumes and drops all data.
/// ///
/// This writer is constructed by the [`sink`] function. /// This writer is constructed by the [`sink`] function. See its documentation
/// for more.
/// ///
/// [`sink`]: fn.sink.html /// [`sink`]: fn.sink.html
pub struct Sink { pub struct Sink {

@ -12,6 +12,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
/// ///
/// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html /// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html
/// ///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// # Examples /// # Examples
/// ///
/// ```no_run /// ```no_run
@ -35,12 +41,16 @@ pub fn stderr() -> Stderr {
/// A handle to the standard error of the current process. /// A handle to the standard error of the current process.
/// ///
/// Created by the [`stderr`] function. /// This writer is created by the [`stderr`] function. See its documentation for
/// more.
///
/// ### Note: Windows Portability Consideration
/// ///
/// This type is an async version of [`std::io::Stderr`]. /// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
/// ///
/// [`stderr`]: fn.stderr.html /// [`stderr`]: fn.stderr.html
/// [`std::io::Stderr`]: https://doc.rust-lang.org/std/io/struct.Stderr.html
#[derive(Debug)] #[derive(Debug)]
pub struct Stderr(Mutex<State>); pub struct Stderr(Mutex<State>);

@ -12,6 +12,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
/// ///
/// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html /// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html
/// ///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// # Examples /// # Examples
/// ///
/// ```no_run /// ```no_run
@ -36,12 +42,16 @@ pub fn stdin() -> Stdin {
/// A handle to the standard input of the current process. /// A handle to the standard input of the current process.
/// ///
/// Created by the [`stdin`] function. /// This reader is created by the [`stdin`] function. See its documentation for
/// more.
///
/// ### Note: Windows Portability Consideration
/// ///
/// This type is an async version of [`std::io::Stdin`]. /// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
/// ///
/// [`stdin`]: fn.stdin.html /// [`stdin`]: fn.stdin.html
/// [`std::io::Stdin`]: https://doc.rust-lang.org/std/io/struct.Stdin.html
#[derive(Debug)] #[derive(Debug)]
pub struct Stdin(Mutex<State>); pub struct Stdin(Mutex<State>);

@ -12,6 +12,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll};
/// ///
/// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html /// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html
/// ///
/// ### Note: Windows Portability Consideration
///
/// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
///
/// # Examples /// # Examples
/// ///
/// ```no_run /// ```no_run
@ -35,12 +41,16 @@ pub fn stdout() -> Stdout {
/// A handle to the standard output of the current process. /// A handle to the standard output of the current process.
/// ///
/// Created by the [`stdout`] function. /// This writer is created by the [`stdout`] function. See its documentation
/// for more.
///
/// ### Note: Windows Portability Consideration
/// ///
/// This type is an async version of [`std::io::Stdout`]. /// When operating in a console, the Windows implementation of this stream does not support
/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return
/// an error.
/// ///
/// [`stdout`]: fn.stdout.html /// [`stdout`]: fn.stdout.html
/// [`std::io::Stdout`]: https://doc.rust-lang.org/std/io/struct.Stdout.html
#[derive(Debug)] #[derive(Debug)]
pub struct Stdout(Mutex<State>); pub struct Stdout(Mutex<State>);

@ -43,10 +43,11 @@
#![cfg_attr(feature = "docs", feature(doc_cfg))] #![cfg_attr(feature = "docs", feature(doc_cfg))]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![allow(clippy::mutex_atomic, clippy::module_inception)]
#![doc(test(attr(deny(rust_2018_idioms, warnings))))] #![doc(test(attr(deny(rust_2018_idioms, warnings))))]
#![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))]
#![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")]
#![recursion_limit = "1024"] #![recursion_limit = "2048"]
#[macro_use] #[macro_use]
mod utils; mod utils;

@ -7,3 +7,8 @@ mod from_stream;
#[doc(inline)] #[doc(inline)]
pub use std::option::Option; pub use std::option::Option;
cfg_unstable! {
mod product;
mod sum;
}

@ -0,0 +1,66 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Product};
impl<T, U> Product<Option<U>> for Option<T>
where
T: Product<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is a `None`, no further
elements are taken, and the `None` is returned. Should no `None` occur,
the product of all elements is returned.
# Examples
This multiplies every integer in a vector, rejecting the product if a negative element is
encountered:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
let prod: Option<i32> = v.map(|x|
if x < 0 {
None
} else {
Some(x)
}).product().await;
assert_eq!(prod, Some(8));
#
# }) }
```
"#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_none = false;
let out = <T as Product<U>>::product(stream
.scan((), |_, elem| {
match elem {
Some(elem) => Some(elem),
None => {
found_none = true;
// Stop processing the stream on error
None
}
}
})).await;
if found_none {
None
} else {
Some(out)
}
})
}
}

@ -0,0 +1,63 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Sum};
impl<T, U> Sum<Option<U>> for Option<T>
where
T: Sum<U>,
{
#[doc = r#"
Takes each element in the `Iterator`: if it is a `None`, no further
elements are taken, and the `None` is returned. Should no `None` occur,
the sum of all elements is returned.
# Examples
This sums up the position of the character 'a' in a vector of strings,
if a word did not have the character 'a' the operation returns `None`:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let words: VecDeque<_> = vec!["have", "a", "great", "day"]
.into_iter()
.collect();
let total: Option<usize> = words.map(|w| w.find('a')).sum().await;
assert_eq!(total, Some(5));
#
# }) }
```
"#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Option<T>> + 'a>>
where S: Stream<Item = Option<U>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_none = false;
let out = <T as Sum<U>>::sum(stream
.scan((), |_, elem| {
match elem {
Some(elem) => Some(elem),
None => {
found_none = true;
// Stop processing the stream on error
None
}
}
})).await;
if found_none {
None
} else {
Some(out)
}
})
}
}

@ -1,6 +1,12 @@
use std::ffi::{OsStr, OsString}; use std::ffi::{OsStr, OsString};
#[cfg(feature = "unstable")]
use std::pin::Pin;
use crate::path::Path; use crate::path::Path;
#[cfg(feature = "unstable")]
use crate::prelude::*;
#[cfg(feature = "unstable")]
use crate::stream::{Extend, FromStream, IntoStream};
/// This struct is an async version of [`std::path::PathBuf`]. /// This struct is an async version of [`std::path::PathBuf`].
/// ///
@ -233,3 +239,44 @@ impl AsRef<std::path::Path> for PathBuf {
self.inner.as_ref() self.inner.as_ref()
} }
} }
#[cfg(feature = "unstable")]
impl<P: AsRef<Path>> Extend<P> for PathBuf {
fn stream_extend<'a, S: IntoStream<Item = P>>(
&'a mut self,
stream: S,
) -> Pin<Box<dyn Future<Output = ()> + 'a>>
where
P: 'a,
<S as IntoStream>::IntoStream: 'a,
{
let stream = stream.into_stream();
//TODO: This can be added back in once this issue is resolved:
// https://github.com/rust-lang/rust/issues/58234
//self.reserve(stream.size_hint().0);
Box::pin(stream.for_each(move |item| self.push(item.as_ref())))
}
}
#[cfg(feature = "unstable")]
impl<'b, P: AsRef<Path> + 'b> FromStream<P> for PathBuf {
#[inline]
fn from_stream<'a, S: IntoStream<Item = P>>(
stream: S,
) -> Pin<Box<dyn core::future::Future<Output = Self> + 'a>>
where
<S as IntoStream>::IntoStream: 'a,
{
let stream = stream.into_stream();
Box::pin(async move {
pin_utils::pin_mut!(stream);
let mut out = Self::new();
out.stream_extend(stream).await;
out
})
}
}

@ -7,3 +7,8 @@ mod from_stream;
#[doc(inline)] #[doc(inline)]
pub use std::result::Result; pub use std::result::Result;
cfg_unstable! {
mod product;
mod sum;
}

@ -0,0 +1,64 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Product};
impl<T, U, E> Product<Result<U, E>> for Result<T, E>
where
T: Product<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is an `Err`, no further
elements are taken, and the `Err` is returned. Should no `Err` occur,
the product of all elements is returned.
# Examples
This multiplies every integer in a vector, rejecting the product if a negative element is
encountered:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect();
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}).product().await;
assert_eq!(res, Ok(8));
#
# }) }
```
"#]
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out = <T as Product<U>>::product(stream
.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
}
}
})).await;
match found_error {
Some(err) => Err(err),
None => Ok(out)
}
})
}
}

@ -0,0 +1,64 @@
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::{Stream, Sum};
impl<T, U, E> Sum<Result<U, E>> for Result<T, E>
where
T: Sum<U>,
{
#[doc = r#"
Takes each element in the `Stream`: if it is an `Err`, no further
elements are taken, and the `Err` is returned. Should no `Err` occur,
the sum of all elements is returned.
# Examples
This sums up every integer in a vector, rejecting the sum if a negative
element is encountered:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let v: VecDeque<_> = vec![1, 2].into_iter().collect();
let res: Result<i32, &'static str> = v.map(|x|
if x < 0 {
Err("Negative element found")
} else {
Ok(x)
}).sum().await;
assert_eq!(res, Ok(3));
#
# }) }
```
"#]
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>
where S: Stream<Item = Result<U, E>> + 'a
{
Box::pin(async move {
pin_utils::pin_mut!(stream);
// Using `scan` here because it is able to stop the stream early
// if a failure occurs
let mut found_error = None;
let out = <T as Sum<U>>::sum(stream
.scan((), |_, elem| {
match elem {
Ok(elem) => Some(elem),
Err(err) => {
found_error = Some(err);
// Stop processing the stream on error
None
}
}
})).await;
match found_error {
Some(err) => Err(err),
None => Ok(out)
}
})
}
}

@ -6,6 +6,11 @@ use crate::task::{Context, Poll};
/// Creates a stream that doesn't yield any items. /// Creates a stream that doesn't yield any items.
/// ///
/// This `struct` is created by the [`empty`] function. See its
/// documentation for more.
///
/// [`empty`]: fn.empty.html
///
/// # Examples /// # Examples
/// ///
/// ``` /// ```

@ -75,6 +75,7 @@ pub use crate::stream::Stream;
/// assert_eq!(5, counter.len()); /// assert_eq!(5, counter.len());
/// # }); /// # });
/// ``` /// ```
#[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait ExactSizeStream: Stream { pub trait ExactSizeStream: Stream {

@ -10,7 +10,8 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that yields elements by calling a closure. /// A stream that yields elements by calling a closure.
/// ///
/// This stream is constructed by [`from_fn`] function. /// This stream is created by the [`from_fn`] function. See its
/// documentation for more.
/// ///
/// [`from_fn`]: fn.from_fn.html /// [`from_fn`]: fn.from_fn.html
#[derive(Debug)] #[derive(Debug)]

@ -52,6 +52,10 @@ pub fn interval(dur: Duration) -> Interval {
/// A stream representing notifications at fixed interval /// A stream representing notifications at fixed interval
/// ///
/// This stream is created by the [`interval`] function. See its
/// documentation for more.
///
/// [`interval`]: fn.interval.html
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)] #[derive(Debug)]
@ -111,6 +115,7 @@ fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::next_interval; use super::next_interval;
use std::cmp::Ordering;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
struct Timeline(Instant); struct Timeline(Instant);
@ -134,12 +139,10 @@ mod test {
// The math around Instant/Duration isn't 100% precise due to rounding // The math around Instant/Duration isn't 100% precise due to rounding
// errors, see #249 for more info // errors, see #249 for more info
fn almost_eq(a: Instant, b: Instant) -> bool { fn almost_eq(a: Instant, b: Instant) -> bool {
if a == b { match a.cmp(&b) {
true Ordering::Equal => true,
} else if a > b { Ordering::Greater => a - b < Duration::from_millis(1),
a - b < Duration::from_millis(1) Ordering::Less => b - a < Duration::from_millis(1),
} else {
b - a < Duration::from_millis(1)
} }
} }

@ -29,7 +29,8 @@ pub fn once<T>(t: T) -> Once<T> {
pin_project! { pin_project! {
/// A stream that yields a single item. /// A stream that yields a single item.
/// ///
/// This stream is constructed by the [`once`] function. /// This stream is created by the [`once`] function. See its
/// documentation for more.
/// ///
/// [`once`]: fn.once.html /// [`once`]: fn.once.html
#[derive(Debug)] #[derive(Debug)]

@ -1,7 +1,9 @@
use std::pin::Pin;
use crate::future::Future; use crate::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
/// Trait to represent types that can be created by productming up a stream. /// Trait to represent types that can be created by multiplying the elements of a stream.
/// ///
/// This trait is used to implement the [`product`] method on streams. Types which /// This trait is used to implement the [`product`] method on streams. Types which
/// implement the trait can be generated by the [`product`] method. Like /// implement the trait can be generated by the [`product`] method. Like
@ -16,8 +18,62 @@ use crate::stream::Stream;
pub trait Product<A = Self>: Sized { pub trait Product<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by /// Method which takes a stream and generates `Self` from the elements by
/// multiplying the items. /// multiplying the items.
fn product<S, F>(stream: S) -> F fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
where where
S: Stream<Item = A>, S: Stream<Item = A> + 'a;
F: Future<Output = Self>; }
use core::ops::Mul;
use core::num::Wrapping;
use crate::stream::stream::StreamExt;
macro_rules! integer_product {
(@impls $one: expr, $($a:ty)*) => ($(
impl Product for $a {
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where
S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where
S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold($one, Mul::mul).await } )
}
}
)*);
($($a:ty)*) => (
integer_product!(@impls 1, $($a)*);
integer_product!(@impls Wrapping(1), $(Wrapping<$a>)*);
);
} }
macro_rules! float_product {
($($a:ty)*) => ($(
impl Product for $a {
fn product<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
}
}
impl<'a> Product<&'a $a> for $a {
fn product<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'b>>
where S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } )
}
}
)*);
($($a:ty)*) => (
float_product!($($a)*);
float_product!($(Wrapping<$a>)*);
);
}
integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
float_product!{ f32 f64 }

@ -29,7 +29,8 @@ where
/// A stream that yields the same item repeatedly. /// A stream that yields the same item repeatedly.
/// ///
/// This stream is constructed by the [`repeat`] function. /// This stream is created by the [`repeat`] function. See its
/// documentation for more.
/// ///
/// [`repeat`]: fn.repeat.html /// [`repeat`]: fn.repeat.html
#[derive(Debug)] #[derive(Debug)]

@ -10,7 +10,8 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that repeats elements of type `T` endlessly by applying a provided closure. /// A stream that repeats elements of type `T` endlessly by applying a provided closure.
/// ///
/// This stream is constructed by the [`repeat_with`] function. /// This stream is created by the [`repeat_with`] function. See its
/// documentation for more.
/// ///
/// [`repeat_with`]: fn.repeat_with.html /// [`repeat_with`]: fn.repeat_with.html
#[derive(Debug)] #[derive(Debug)]

@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// Chains two streams one after another. /// Chains two streams one after another.
///
/// This `struct` is created by the [`chain`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`chain`]: trait.Stream.html#method.chain
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct Chain<S, U> { pub struct Chain<S, U> {
#[pin] #[pin]

@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream to filter elements of another stream with a predicate. /// A stream to filter elements of another stream with a predicate.
///
/// This `struct` is created by the [`filter`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`filter`]: trait.Stream.html#method.filter
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct Filter<S, P, T> { pub struct Filter<S, P, T> {
#[pin] #[pin]

@ -0,0 +1,62 @@
use pin_project_lite::pin_project;
use std::pin::Pin;
use crate::prelude::*;
use crate::stream::stream::map::Map;
use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};
pin_project! {
/// This `struct` is created by the [`flat_map`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flat_map`]: trait.Stream.html#method.flat_map
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct FlatMap<S, U, T, F> {
#[pin]
stream: Map<S, F, T, U>,
#[pin]
inner_stream: Option<U>,
}
}
impl<S, U, F> FlatMap<S, U, S::Item, F>
where
S: Stream,
U: IntoStream,
F: FnMut(S::Item) -> U,
{
pub(super) fn new(stream: S, f: F) -> FlatMap<S, U, S::Item, F> {
FlatMap {
stream: stream.map(f),
inner_stream: None,
}
}
}
impl<S, U, F> Stream for FlatMap<S, U, S::Item, F>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream,
F: FnMut(S::Item) -> U,
{
type Item = U::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
}
}
match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
}

@ -0,0 +1,58 @@
use pin_project_lite::pin_project;
use std::pin::Pin;
use crate::stream::{IntoStream, Stream};
use crate::task::{Context, Poll};
pin_project! {
/// This `struct` is created by the [`flatten`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`flatten`]: trait.Stream.html#method.flatten
/// [`Stream`]: trait.Stream.html
#[allow(missing_debug_implementations)]
pub struct Flatten<S, U> {
#[pin]
stream: S,
#[pin]
inner_stream: Option<U>,
}
}
impl<S> Flatten<S, S::Item>
where
S: Stream,
S::Item: IntoStream,
{
pub(super) fn new(stream: S) -> Flatten<S, S::Item> {
Flatten {
stream,
inner_stream: None,
}
}
}
impl<S, U> Stream for Flatten<S, <S::Item as IntoStream>::IntoStream>
where
S: Stream,
S::Item: IntoStream<IntoStream = U, Item = U::Item>,
U: Stream,
{
type Item = U::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
if let item @ Some(_) = futures_core::ready!(inner.poll_next(cx)) {
return Poll::Ready(item);
}
}
match futures_core::ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(inner) => this.inner_stream.set(Some(inner.into_stream())),
}
}
}
}

@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A `Stream` that is permanently closed once a single call to `poll` results in /// A `Stream` that is permanently closed once a single call to `poll` results in
/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. /// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`.
///
/// This `struct` is created by the [`fuse`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`fuse`]: trait.Stream.html#method.fuse
/// [`Stream`]: trait.Stream.html
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Fuse<S> { pub struct Fuse<S> {
#[pin] #[pin]

@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that does something with each element of another stream. /// A stream that does something with each element of another stream.
///
/// This `struct` is created by the [`inspect`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`inspect`]: trait.Stream.html#method.inspect
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct Inspect<S, F, T> { pub struct Inspect<S, F, T> {
#[pin] #[pin]

@ -7,9 +7,11 @@ use pin_project_lite::pin_project;
pin_project! { pin_project! {
/// A stream that merges two other streams into a single stream. /// A stream that merges two other streams into a single stream.
/// ///
/// This stream is returned by [`Stream::merge`]. /// This `struct` is created by the [`merge`] method on [`Stream`]. See its
/// documentation for more.
/// ///
/// [`Stream::merge`]: trait.Stream.html#method.merge /// [`merge`]: trait.Stream.html#method.merge
/// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")] #[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)] #[derive(Debug)]

@ -93,13 +93,21 @@ use std::marker::PhantomData;
cfg_unstable! { cfg_unstable! {
use std::pin::Pin; use std::pin::Pin;
use std::time::Duration;
use crate::future::Future; use crate::future::Future;
use crate::stream::FromStream; use crate::stream::into_stream::IntoStream;
use crate::stream::{FromStream, Product, Sum};
pub use merge::Merge; pub use merge::Merge;
pub use flatten::Flatten;
pub use flat_map::FlatMap;
pub use timeout::{TimeoutError, Timeout};
mod merge; mod merge;
mod flatten;
mod flat_map;
mod timeout;
} }
extension_trait! { extension_trait! {
@ -559,6 +567,76 @@ extension_trait! {
Filter::new(self, predicate) Filter::new(self, predicate)
} }
#[doc= r#"
Creates an stream that works like map, but flattens nested structure.
# Examples
Basic usage:
```
# async_std::task::block_on(async {
use std::collections::VecDeque;
use async_std::prelude::*;
use async_std::stream::IntoStream;
let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
let v :Vec<_> = s.flat_map(|s| s.into_stream()).collect().await;
assert_eq!(v, vec![1,2,3,4,5,6]);
# });
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, Self::Item, F>
where
Self: Sized,
U: IntoStream,
F: FnMut(Self::Item) -> U,
{
FlatMap::new(self, f)
}
#[doc = r#"
Creates an stream that flattens nested structure.
# Examples
Basic usage:
```
# async_std::task::block_on(async {
use std::collections::VecDeque;
use async_std::prelude::*;
let inner1: VecDeque<u8> = vec![1,2,3].into_iter().collect();
let inner2: VecDeque<u8> = vec![4,5,6].into_iter().collect();
let s: VecDeque<_> = vec![inner1, inner2].into_iter().collect();
let v: Vec<_> = s.flatten().collect().await;
assert_eq!(v, vec![1,2,3,4,5,6]);
# });
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn flatten(self) -> Flatten<Self, Self::Item>
where
Self: Sized,
Self::Item: IntoStream,
{
Flatten::new(self)
}
#[doc = r#" #[doc = r#"
Both filters and maps a stream. Both filters and maps a stream.
@ -1084,6 +1162,40 @@ extension_trait! {
Skip::new(self, n) Skip::new(self, n)
} }
#[doc=r#"
Await a stream or times out after a duration of time.
If you want to await an I/O future consider using
[`io::timeout`](../io/fn.timeout.html) instead.
# Examples
```
# fn main() -> std::io::Result<()> { async_std::task::block_on(async {
#
use std::time::Duration;
use async_std::stream;
use async_std::prelude::*;
let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1));
while let Some(v) = s.next().await {
assert_eq!(v, Ok(1));
}
#
# Ok(()) }) }
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn timeout(self, dur: Duration) -> Timeout<Self>
where
Self: Stream + Sized,
{
Timeout::new(self, dur)
}
#[doc = r#" #[doc = r#"
A combinator that applies a function as long as it returns successfully, producing a single, final value. A combinator that applies a function as long as it returns successfully, producing a single, final value.
Immediately returns the error when the function returns unsuccessfully. Immediately returns the error when the function returns unsuccessfully.
@ -1536,6 +1648,95 @@ extension_trait! {
{ {
LtFuture::new(self, other) LtFuture::new(self, other)
} }
#[doc = r#"
Sums the elements of an iterator.
Takes each element, adds them together, and returns the result.
An empty iterator returns the zero value of the type.
# Panics
When calling `sum()` and a primitive integer type is being returned, this
method will panic if the computation overflows and debug assertions are
enabled.
# Examples
Basic usage:
```
# fn main() { async_std::task::block_on(async {
#
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect();
let sum: u8 = s.sum().await;
assert_eq!(sum, 10);
#
# }) }
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn sum<'a, S>(
self,
) -> impl Future<Output = S> + 'a [Pin<Box<dyn Future<Output = S> + 'a>>]
where
Self: Sized + Stream<Item = S> + 'a,
S: Sum,
{
Sum::sum(self)
}
#[doc = r#"
Iterates over the entire iterator, multiplying all the elements
An empty iterator returns the one value of the type.
# Panics
When calling `product()` and a primitive integer type is being returned,
method will panic if the computation overflows and debug assertions are
enabled.
# Examples
This example calculates the factorial of n (i.e. the product of the numbers from 1 to
n, inclusive):
```
# fn main() { async_std::task::block_on(async {
#
async fn factorial(n: u32) -> u32 {
use std::collections::VecDeque;
use async_std::prelude::*;
let s: VecDeque<_> = (1..=n).collect();
s.product().await
}
assert_eq!(factorial(0).await, 1);
assert_eq!(factorial(1).await, 1);
assert_eq!(factorial(5).await, 120);
#
# }) }
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn product<'a, P>(
self,
) -> impl Future<Output = P> + 'a [Pin<Box<dyn Future<Output = P> + 'a>>]
where
Self: Sized + Stream<Item = P> + 'a,
P: Product,
{
Product::product(self)
}
} }
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> { impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

@ -7,6 +7,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream to maintain state while polling another stream. /// A stream to maintain state while polling another stream.
///
/// This `struct` is created by the [`scan`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`scan`]: trait.Stream.html#method.scan
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct Scan<S, St, F> { pub struct Scan<S, St, F> {
#[pin] #[pin]

@ -7,6 +7,12 @@ use crate::stream::Stream;
pin_project! { pin_project! {
/// A stream to skip first n elements of another stream. /// A stream to skip first n elements of another stream.
///
/// This `struct` is created by the [`skip`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`skip`]: trait.Stream.html#method.skip
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct Skip<S> { pub struct Skip<S> {
#[pin] #[pin]

@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream to skip elements of another stream based on a predicate. /// A stream to skip elements of another stream based on a predicate.
///
/// This `struct` is created by the [`skip_while`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`skip_while`]: trait.Stream.html#method.skip_while
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct SkipWhile<S, P, T> { pub struct SkipWhile<S, P, T> {
#[pin] #[pin]

@ -7,6 +7,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that steps a given amount of elements of another stream. /// A stream that steps a given amount of elements of another stream.
///
/// This `struct` is created by the [`step_by`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`step_by`]: trait.Stream.html#method.step_by
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct StepBy<S> { pub struct StepBy<S> {
#[pin] #[pin]

@ -7,6 +7,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that yields the first `n` items of another stream. /// A stream that yields the first `n` items of another stream.
///
/// This `struct` is created by the [`take`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`take`]: trait.Stream.html#method.take
/// [`Stream`]: trait.Stream.html
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Take<S> { pub struct Take<S> {
#[pin] #[pin]

@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// A stream that yields elements based on a predicate. /// A stream that yields elements based on a predicate.
///
/// This `struct` is created by the [`take_while`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`take_while`]: trait.Stream.html#method.take_while
/// [`Stream`]: trait.Stream.html
#[derive(Debug)] #[derive(Debug)]
pub struct TakeWhile<S, P, T> { pub struct TakeWhile<S, P, T> {
#[pin] #[pin]

@ -0,0 +1,63 @@
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::time::Duration;
use futures_timer::Delay;
use pin_project_lite::pin_project;
use crate::future::Future;
use crate::stream::Stream;
use crate::task::{Context, Poll};
pin_project! {
/// A stream with timeout time set
#[derive(Debug)]
pub struct Timeout<S: Stream> {
#[pin]
stream: S,
#[pin]
delay: Delay,
}
}
impl<S: Stream> Timeout<S> {
pub fn new(stream: S, dur: Duration) -> Timeout<S> {
let delay = Delay::new(dur);
Timeout { stream, delay }
}
}
impl<S: Stream> Stream for Timeout<S> {
type Item = Result<S::Item, TimeoutError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.stream.poll_next(cx) {
Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => match this.delay.poll(cx) {
Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))),
Poll::Pending => Poll::Pending,
},
}
}
}
/// An error returned when a stream times out.
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct TimeoutError {
_private: (),
}
impl Error for TimeoutError {}
impl fmt::Display for TimeoutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"stream has timed out".fmt(f)
}
}

@ -8,6 +8,12 @@ use crate::task::{Context, Poll};
pin_project! { pin_project! {
/// An iterator that iterates two other iterators simultaneously. /// An iterator that iterates two other iterators simultaneously.
///
/// This `struct` is created by the [`zip`] method on [`Stream`]. See its
/// documentation for more.
///
/// [`zip`]: trait.Stream.html#method.zip
/// [`Stream`]: trait.Stream.html
pub struct Zip<A: Stream, B> { pub struct Zip<A: Stream, B> {
item_slot: Option<A::Item>, item_slot: Option<A::Item>,
#[pin] #[pin]

@ -1,3 +1,5 @@
use std::pin::Pin;
use crate::future::Future; use crate::future::Future;
use crate::stream::Stream; use crate::stream::Stream;
@ -16,8 +18,62 @@ use crate::stream::Stream;
pub trait Sum<A = Self>: Sized { pub trait Sum<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by /// Method which takes a stream and generates `Self` from the elements by
/// "summing up" the items. /// "summing up" the items.
fn sum<S, F>(stream: S) -> F fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
where where
S: Stream<Item = A>, S: Stream<Item = A> + 'a;
F: Future<Output = Self>; }
use core::ops::Add;
use core::num::Wrapping;
use crate::stream::stream::StreamExt;
macro_rules! integer_sum {
(@impls $zero: expr, $($a:ty)*) => ($(
impl Sum for $a {
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self>+ 'a>>
where
S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold($zero, Add::add).await } )
}
}
impl<'a> Sum<&'a $a> for $a {
fn sum<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where
S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold($zero, Add::add).await } )
}
}
)*);
($($a:ty)*) => (
integer_sum!(@impls 0, $($a)*);
integer_sum!(@impls Wrapping(0), $(Wrapping<$a>)*);
);
} }
macro_rules! float_sum {
($($a:ty)*) => ($(
impl Sum for $a {
fn sum<'a, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'a>>
where S: Stream<Item = $a> + 'a,
{
Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } )
}
}
impl<'a> Sum<&'a $a> for $a {
fn sum<'b, S>(stream: S) -> Pin<Box<dyn Future<Output = Self> + 'b>>
where S: Stream<Item = &'a $a> + 'b,
{
Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } )
}
}
)*);
($($a:ty)*) => (
float_sum!(@impls 0.0, $($a)*);
float_sum!(@impls Wrapping(0.0), $(Wrapping<$a>)*);
);
}
integer_sum!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize }
float_sum!{ f32 f64 }

@ -154,6 +154,7 @@ where
fn vtable() -> &'static RawWakerVTable { fn vtable() -> &'static RawWakerVTable {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker { unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
#![allow(clippy::redundant_clone)]
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
mem::forget(arc.clone()); mem::forget(arc.clone());
RawWaker::new(ptr, vtable()) RawWaker::new(ptr, vtable())

@ -167,6 +167,7 @@ impl Tag {
} }
pub fn task(&self) -> &Task { pub fn task(&self) -> &Task {
#[allow(clippy::transmute_ptr_to_ptr)]
unsafe { unsafe {
let raw = self.raw_metadata.load(Ordering::Acquire); let raw = self.raw_metadata.load(Ordering::Acquire);
@ -189,6 +190,7 @@ impl Tag {
} }
} }
#[allow(clippy::transmute_ptr_to_ptr)]
mem::transmute::<&AtomicUsize, &Option<Task>>(&self.raw_metadata) mem::transmute::<&AtomicUsize, &Option<Task>>(&self.raw_metadata)
.as_ref() .as_ref()
.unwrap() .unwrap()

@ -4,6 +4,7 @@ use async_std::task;
#[test] #[test]
fn test_buffered_writer() { fn test_buffered_writer() {
#![allow(clippy::cognitive_complexity)]
task::block_on(async { task::block_on(async {
let inner = Vec::new(); let inner = Vec::new();
let mut writer = BufWriter::with_capacity(2, inner); let mut writer = BufWriter::with_capacity(2, inner);

@ -37,6 +37,7 @@ fn capacity() {
#[test] #[test]
fn len_empty_full() { fn len_empty_full() {
#![allow(clippy::cognitive_complexity)]
task::block_on(async { task::block_on(async {
let (s, r) = channel(2); let (s, r) = channel(2);

@ -13,7 +13,7 @@ use futures::channel::mpsc;
/// Generates a random number in `0..n`. /// Generates a random number in `0..n`.
pub fn random(n: u32) -> u32 { pub fn random(n: u32) -> u32 {
thread_local! { thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1406868647)); static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647));
} }
RNG.with(|rng| { RNG.with(|rng| {

Loading…
Cancel
Save