diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d47eabce..653834a7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -7,12 +7,13 @@ on: - staging - trying +env: + RUSTFLAGS: -Dwarnings + jobs: build_and_test: name: Build and test runs-on: ${{ matrix.os }} - env: - RUSTFLAGS: -Dwarnings strategy: matrix: os: [ubuntu-latest, windows-latest, macOS-latest] @@ -48,8 +49,6 @@ jobs: check_fmt_and_docs: name: Checking fmt and docs runs-on: ubuntu-latest - env: - RUSTFLAGS: -Dwarnings steps: - uses: actions/checkout@master @@ -81,20 +80,11 @@ jobs: clippy_check: name: Clippy check runs-on: ubuntu-latest - # TODO: There is a lot of warnings - # env: - # RUSTFLAGS: -Dwarnings steps: - uses: actions/checkout@v1 - - id: component - uses: actions-rs/components-nightly@v1 - with: - component: clippy - - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ steps.component.outputs.toolchain }} - override: true - - run: rustup component add clippy - - uses: actions-rs/clippy-check@v1 - with: - token: ${{ secrets.GITHUB_TOKEN }} + - name: Install rust + run: rustup update beta && rustup default beta + - name: Install clippy + run: rustup component add clippy + - name: clippy + run: cargo clippy --all --features unstable diff --git a/Cargo.toml b/Cargo.toml index 7aaba687..ad887303 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ authors = [ edition = "2018" license = "Apache-2.0/MIT" repository = "https://github.com/async-rs/async-std" -homepage = "https://github.com/async-rs/async-std" +homepage = "https://async.rs" documentation = "https://docs.rs/async-std" description = "Async version of the Rust standard library" keywords = ["async", "await", "future", "std", "task"] diff --git a/docs/src/concepts/futures.md b/docs/src/concepts/futures.md index 67db720c..7d9cc636 100644 --- a/docs/src/concepts/futures.md +++ b/docs/src/concepts/futures.md @@ -24,11 +24,7 @@ To sum up: Rust gives us the ability to safely abstract over important propertie ## An easy view of computation -While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us: - -- computation is a sequence of composable operations -- they can branch based on a decision -- they either run to succession and yield a result, or they can yield an error +While computation is a subject to write a whole [book](https://computationbook.com/) about, a very simplified view suffices for us: A sequence of composable operations which can branch based on a decision, run to succession and yield a result or yield an error ## Deferring computation @@ -136,11 +132,11 @@ When executing 2 or more of these functions at the same time, our runtime system ## Conclusion -Working from values, we searched for something that expresses *working towards a value available sometime later*. From there, we talked about the concept of polling. +Working from values, we searched for something that expresses *working towards a value available later*. From there, we talked about the concept of polling. A `Future` is any data type that does not represent a value, but the ability to *produce a value at some point in the future*. Implementations of this are very varied and detailed depending on use-case, but the interface is simple. -Next, we will introduce you to `tasks`, which we need to actually *run* Futures. +Next, we will introduce you to `tasks`, which we will use to actually *run* Futures. [^1]: Two parties reading while it is guaranteed that no one is writing is always safe. diff --git a/docs/src/concepts/tasks.md b/docs/src/concepts/tasks.md index d4037a3b..2142cac4 100644 --- a/docs/src/concepts/tasks.md +++ b/docs/src/concepts/tasks.md @@ -80,7 +80,7 @@ Tasks in `async_std` are one of the core abstractions. Much like Rust's `thread` ## Blocking -`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and by itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this: +`Task`s are assumed to run _concurrently_, potentially by sharing a thread of execution. This means that operations blocking an _operating system thread_, such as `std::thread::sleep` or io function from Rust's `std` library will _stop execution of all tasks sharing this thread_. Other libraries (such as database drivers) have similar behaviour. Note that _blocking the current thread_ is not in and of itself bad behaviour, just something that does not mix well with the concurrent execution model of `async-std`. Essentially, never do this: ```rust,edition2018 # extern crate async_std; diff --git a/docs/src/overview/async-std.md b/docs/src/overview/async-std.md index 2b59ffb0..0086599f 100644 --- a/docs/src/overview/async-std.md +++ b/docs/src/overview/async-std.md @@ -4,4 +4,4 @@ `async-std` provides an interface to all important primitives: filesystem operations, network operations and concurrency basics like timers. It also exposes a `task` in a model similar to the `thread` module found in the Rust standard lib. But it does not only include I/O primitives, but also `async/await` compatible versions of primitives like `Mutex`. -[organization]: https://github.com/async-rs/async-std +[organization]: https://github.com/async-rs diff --git a/docs/src/overview/stability-guarantees.md b/docs/src/overview/stability-guarantees.md index 84bb68d7..8c14e20f 100644 --- a/docs/src/overview/stability-guarantees.md +++ b/docs/src/overview/stability-guarantees.md @@ -31,7 +31,7 @@ In general, this crate will be conservative with respect to the minimum supporte ## Security fixes -Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give at least _3 month_ of ahead notice. +Security fixes will be applied to _all_ minor branches of this library in all _supported_ major revisions. This policy might change in the future, in which case we give a notice at least _3 months_ ahead. ## Credits diff --git a/examples/a-chat/main.rs b/examples/a-chat/main.rs index ced7cac2..89e5e2b6 100644 --- a/examples/a-chat/main.rs +++ b/examples/a-chat/main.rs @@ -8,6 +8,6 @@ fn main() -> Result<()> { match (args.nth(1).as_ref().map(String::as_str), args.next()) { (Some("client"), None) => client::main(), (Some("server"), None) => server::main(), - _ => Err("Usage: a-chat [client|server]")?, + _ => Err("Usage: a-chat [client|server]".into()), } } diff --git a/examples/a-chat/server.rs b/examples/a-chat/server.rs index 77ebfd1e..e049a490 100644 --- a/examples/a-chat/server.rs +++ b/examples/a-chat/server.rs @@ -45,7 +45,7 @@ async fn connection_loop(mut broker: Sender, stream: TcpStream) -> Result let mut lines = reader.lines(); let name = match lines.next().await { - None => Err("peer disconnected immediately")?, + None => return Err("peer disconnected immediately".into()), Some(line) => line?, }; let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::(); diff --git a/src/fs/file.rs b/src/fs/file.rs index 3129e96a..745a5848 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -66,6 +66,23 @@ pub struct File { } impl File { + /// Creates an async file handle. + pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File { + let file = Arc::new(file); + + File { + file: file.clone(), + lock: Lock::new(State { + file, + mode: Mode::Idle, + cache: Vec::new(), + is_flushed, + last_read_err: None, + last_write_err: None, + }), + } + } + /// Opens a file in read-only mode. /// /// See the [`OpenOptions::open`] function for more options. @@ -96,7 +113,7 @@ impl File { pub async fn open>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = blocking::spawn(move || std::fs::File::open(&path)).await?; - Ok(file.into()) + Ok(File::new(file, true)) } /// Opens a file in write-only mode. @@ -131,7 +148,7 @@ impl File { pub async fn create>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = blocking::spawn(move || std::fs::File::create(&path)).await?; - Ok(file.into()) + Ok(File::new(file, true)) } /// Synchronizes OS-internal buffered contents and metadata to disk. @@ -383,19 +400,7 @@ impl Seek for &File { impl From for File { fn from(file: std::fs::File) -> File { - let file = Arc::new(file); - - File { - file: file.clone(), - lock: Lock::new(State { - file, - mode: Mode::Idle, - cache: Vec::new(), - is_flushed: false, - last_read_err: None, - last_write_err: None, - }), - } + File::new(file, false) } } diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs index a2eb9e76..7f700734 100644 --- a/src/fs/open_options.rs +++ b/src/fs/open_options.rs @@ -284,7 +284,10 @@ impl OpenOptions { pub fn open>(&self, path: P) -> impl Future> { let path = path.as_ref().to_owned(); let options = self.0.clone(); - async move { blocking::spawn(move || options.open(path).map(|f| f.into())).await } + async move { + let file = blocking::spawn(move || options.open(path)).await?; + Ok(File::new(file, true)) + } } } diff --git a/src/io/empty.rs b/src/io/empty.rs index d8d768e0..90442675 100644 --- a/src/io/empty.rs +++ b/src/io/empty.rs @@ -28,9 +28,10 @@ pub fn empty() -> Empty { /// A reader that contains no data. /// -/// This reader is constructed by the [`sink`] function. +/// This reader is created by the [`empty`] function. See its +/// documentation for more. /// -/// [`sink`]: fn.sink.html +/// [`empty`]: fn.empty.html pub struct Empty { _private: (), } diff --git a/src/io/repeat.rs b/src/io/repeat.rs index a82e21be..56368179 100644 --- a/src/io/repeat.rs +++ b/src/io/repeat.rs @@ -29,7 +29,8 @@ pub fn repeat(byte: u8) -> Repeat { /// A reader which yields one byte over and over and over and over and over and... /// -/// This reader is constructed by the [`repeat`] function. +/// This reader is created by the [`repeat`] function. See its +/// documentation for more. /// /// [`repeat`]: fn.repeat.html pub struct Repeat { diff --git a/src/io/sink.rs b/src/io/sink.rs index faa763c6..86aeb0ae 100644 --- a/src/io/sink.rs +++ b/src/io/sink.rs @@ -25,7 +25,8 @@ pub fn sink() -> Sink { /// A writer that consumes and drops all data. /// -/// This writer is constructed by the [`sink`] function. +/// This writer is constructed by the [`sink`] function. See its documentation +/// for more. /// /// [`sink`]: fn.sink.html pub struct Sink { diff --git a/src/io/stderr.rs b/src/io/stderr.rs index 1ec28b29..0a8c4700 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll}; /// /// [`std::io::stderr`]: https://doc.rust-lang.org/std/io/fn.stderr.html /// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// /// # Examples /// /// ```no_run @@ -34,12 +40,16 @@ pub fn stderr() -> Stderr { /// A handle to the standard error of the current process. /// -/// Created by the [`stderr`] function. +/// This writer is created by the [`stderr`] function. See its documentation for +/// more. /// -/// This type is an async version of [`std::io::Stderr`]. +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. /// /// [`stderr`]: fn.stderr.html -/// [`std::io::Stderr`]: https://doc.rust-lang.org/std/io/struct.Stderr.html #[derive(Debug)] pub struct Stderr(Mutex); diff --git a/src/io/stdin.rs b/src/io/stdin.rs index dd3991fd..22b9cf34 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll}; /// /// [`std::io::stdin`]: https://doc.rust-lang.org/std/io/fn.stdin.html /// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// /// # Examples /// /// ```no_run @@ -35,12 +41,16 @@ pub fn stdin() -> Stdin { /// A handle to the standard input of the current process. /// -/// Created by the [`stdin`] function. +/// This reader is created by the [`stdin`] function. See its documentation for +/// more. /// -/// This type is an async version of [`std::io::Stdin`]. +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. /// /// [`stdin`]: fn.stdin.html -/// [`std::io::Stdin`]: https://doc.rust-lang.org/std/io/struct.Stdin.html #[derive(Debug)] pub struct Stdin(Mutex); diff --git a/src/io/stdout.rs b/src/io/stdout.rs index 7945bfdd..1e9340fc 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -11,6 +11,12 @@ use crate::task::{blocking, Context, JoinHandle, Poll}; /// /// [`std::io::stdout`]: https://doc.rust-lang.org/std/io/fn.stdout.html /// +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. +/// /// # Examples /// /// ```no_run @@ -34,12 +40,16 @@ pub fn stdout() -> Stdout { /// A handle to the standard output of the current process. /// -/// Created by the [`stdout`] function. +/// This writer is created by the [`stdout`] function. See its documentation +/// for more. /// -/// This type is an async version of [`std::io::Stdout`]. +/// ### Note: Windows Portability Consideration +/// +/// When operating in a console, the Windows implementation of this stream does not support +/// non-UTF-8 byte sequences. Attempting to write bytes that are not valid UTF-8 will return +/// an error. /// /// [`stdout`]: fn.stdout.html -/// [`std::io::Stdout`]: https://doc.rust-lang.org/std/io/struct.Stdout.html #[derive(Debug)] pub struct Stdout(Mutex); diff --git a/src/lib.rs b/src/lib.rs index 22481b53..b659c39e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,6 +43,7 @@ #![cfg_attr(feature = "docs", feature(doc_cfg))] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![allow(clippy::mutex_atomic, clippy::module_inception)] #![doc(test(attr(deny(rust_2018_idioms, warnings))))] #![doc(test(attr(allow(unused_extern_crates, unused_variables))))] #![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")] diff --git a/src/option/mod.rs b/src/option/mod.rs index afb29adc..76f096b3 100644 --- a/src/option/mod.rs +++ b/src/option/mod.rs @@ -7,3 +7,8 @@ mod from_stream; #[doc(inline)] pub use std::option::Option; + +cfg_unstable! { + mod product; + mod sum; +} diff --git a/src/option/product.rs b/src/option/product.rs new file mode 100644 index 00000000..8b66bc69 --- /dev/null +++ b/src/option/product.rs @@ -0,0 +1,66 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Product}; + +impl Product> for Option +where + T: Product, +{ + #[doc = r#" + Takes each element in the `Stream`: if it is a `None`, no further + elements are taken, and the `None` is returned. Should no `None` occur, + the product of all elements is returned. + + # Examples + + This multiplies every integer in a vector, rejecting the product if a negative element is + encountered: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect(); + let prod: Option = v.map(|x| + if x < 0 { + None + } else { + Some(x) + }).product().await; + assert_eq!(prod, Some(8)); + # + # }) } + ``` + "#] + fn product<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_none = false; + let out = >::product(stream + .scan((), |_, elem| { + match elem { + Some(elem) => Some(elem), + None => { + found_none = true; + // Stop processing the stream on error + None + } + } + })).await; + + if found_none { + None + } else { + Some(out) + } + }) + } +} diff --git a/src/option/sum.rs b/src/option/sum.rs new file mode 100644 index 00000000..25dc9209 --- /dev/null +++ b/src/option/sum.rs @@ -0,0 +1,63 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Sum}; + +impl Sum> for Option +where + T: Sum, +{ + #[doc = r#" + Takes each element in the `Iterator`: if it is a `None`, no further + elements are taken, and the `None` is returned. Should no `None` occur, + the sum of all elements is returned. + + # Examples + + This sums up the position of the character 'a' in a vector of strings, + if a word did not have the character 'a' the operation returns `None`: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let words: VecDeque<_> = vec!["have", "a", "great", "day"] + .into_iter() + .collect(); + let total: Option = words.map(|w| w.find('a')).sum().await; + assert_eq!(total, Some(5)); + # + # }) } + ``` + "#] + fn sum<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_none = false; + let out = >::sum(stream + .scan((), |_, elem| { + match elem { + Some(elem) => Some(elem), + None => { + found_none = true; + // Stop processing the stream on error + None + } + } + })).await; + + if found_none { + None + } else { + Some(out) + } + }) + } +} diff --git a/src/path/pathbuf.rs b/src/path/pathbuf.rs index 38018c9d..a90ebabb 100644 --- a/src/path/pathbuf.rs +++ b/src/path/pathbuf.rs @@ -1,6 +1,12 @@ use std::ffi::{OsStr, OsString}; +#[cfg(feature = "unstable")] +use std::pin::Pin; use crate::path::Path; +#[cfg(feature = "unstable")] +use crate::prelude::*; +#[cfg(feature = "unstable")] +use crate::stream::{Extend, FromStream, IntoStream}; /// This struct is an async version of [`std::path::PathBuf`]. /// @@ -233,3 +239,44 @@ impl AsRef for PathBuf { self.inner.as_ref() } } + +#[cfg(feature = "unstable")] +impl> Extend

for PathBuf { + fn stream_extend<'a, S: IntoStream>( + &'a mut self, + stream: S, + ) -> Pin + 'a>> + where + P: 'a, + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + //TODO: This can be added back in once this issue is resolved: + // https://github.com/rust-lang/rust/issues/58234 + //self.reserve(stream.size_hint().0); + + Box::pin(stream.for_each(move |item| self.push(item.as_ref()))) + } +} + +#[cfg(feature = "unstable")] +impl<'b, P: AsRef + 'b> FromStream

for PathBuf { + #[inline] + fn from_stream<'a, S: IntoStream>( + stream: S, + ) -> Pin + 'a>> + where + ::IntoStream: 'a, + { + let stream = stream.into_stream(); + + Box::pin(async move { + pin_utils::pin_mut!(stream); + + let mut out = Self::new(); + out.stream_extend(stream).await; + out + }) + } +} diff --git a/src/result/mod.rs b/src/result/mod.rs index 908f9c4d..cae0ebd9 100644 --- a/src/result/mod.rs +++ b/src/result/mod.rs @@ -7,3 +7,8 @@ mod from_stream; #[doc(inline)] pub use std::result::Result; + +cfg_unstable! { + mod product; + mod sum; +} diff --git a/src/result/product.rs b/src/result/product.rs new file mode 100644 index 00000000..17afa94b --- /dev/null +++ b/src/result/product.rs @@ -0,0 +1,64 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Product}; + +impl Product> for Result +where + T: Product, +{ + #[doc = r#" + Takes each element in the `Stream`: if it is an `Err`, no further + elements are taken, and the `Err` is returned. Should no `Err` occur, + the product of all elements is returned. + + # Examples + + This multiplies every integer in a vector, rejecting the product if a negative element is + encountered: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let v: VecDeque<_> = vec![1, 2, 4].into_iter().collect(); + let res: Result = v.map(|x| + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + }).product().await; + assert_eq!(res, Ok(8)); + # + # }) } + ``` + "#] + fn product<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = None; + let out = >::product(stream + .scan((), |_, elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None + } + } + })).await; + match found_error { + Some(err) => Err(err), + None => Ok(out) + } + }) + } +} diff --git a/src/result/sum.rs b/src/result/sum.rs new file mode 100644 index 00000000..caca4f65 --- /dev/null +++ b/src/result/sum.rs @@ -0,0 +1,64 @@ +use std::pin::Pin; + +use crate::prelude::*; +use crate::stream::{Stream, Sum}; + +impl Sum> for Result +where + T: Sum, +{ + #[doc = r#" + Takes each element in the `Stream`: if it is an `Err`, no further + elements are taken, and the `Err` is returned. Should no `Err` occur, + the sum of all elements is returned. + + # Examples + + This sums up every integer in a vector, rejecting the sum if a negative + element is encountered: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let v: VecDeque<_> = vec![1, 2].into_iter().collect(); + let res: Result = v.map(|x| + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + }).sum().await; + assert_eq!(res, Ok(3)); + # + # }) } + ``` + "#] + fn sum<'a, S>(stream: S) -> Pin> + 'a>> + where S: Stream> + 'a + { + Box::pin(async move { + pin_utils::pin_mut!(stream); + + // Using `scan` here because it is able to stop the stream early + // if a failure occurs + let mut found_error = None; + let out = >::sum(stream + .scan((), |_, elem| { + match elem { + Ok(elem) => Some(elem), + Err(err) => { + found_error = Some(err); + // Stop processing the stream on error + None + } + } + })).await; + match found_error { + Some(err) => Err(err), + None => Ok(out) + } + }) + } +} diff --git a/src/stream/empty.rs b/src/stream/empty.rs index ceb91fea..49090707 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -6,6 +6,11 @@ use crate::task::{Context, Poll}; /// Creates a stream that doesn't yield any items. /// +/// This `struct` is created by the [`empty`] function. See its +/// documentation for more. +/// +/// [`empty`]: fn.empty.html +/// /// # Examples /// /// ``` diff --git a/src/stream/exact_size_stream.rs b/src/stream/exact_size_stream.rs index 32a1eb31..8b6ba97d 100644 --- a/src/stream/exact_size_stream.rs +++ b/src/stream/exact_size_stream.rs @@ -75,6 +75,7 @@ pub use crate::stream::Stream; /// assert_eq!(5, counter.len()); /// # }); /// ``` +#[allow(clippy::len_without_is_empty)] // ExactSizeIterator::is_empty is unstable #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] pub trait ExactSizeStream: Stream { diff --git a/src/stream/from_fn.rs b/src/stream/from_fn.rs index 7fee8926..5260d878 100644 --- a/src/stream/from_fn.rs +++ b/src/stream/from_fn.rs @@ -10,7 +10,8 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that yields elements by calling a closure. /// - /// This stream is constructed by [`from_fn`] function. + /// This stream is created by the [`from_fn`] function. See its + /// documentation for more. /// /// [`from_fn`]: fn.from_fn.html #[derive(Debug)] diff --git a/src/stream/interval.rs b/src/stream/interval.rs index 2f7fe9e3..efec4362 100644 --- a/src/stream/interval.rs +++ b/src/stream/interval.rs @@ -52,6 +52,10 @@ pub fn interval(dur: Duration) -> Interval { /// A stream representing notifications at fixed interval /// +/// This stream is created by the [`interval`] function. See its +/// documentation for more. +/// +/// [`interval`]: fn.interval.html #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] @@ -111,6 +115,7 @@ fn next_interval(prev: Instant, now: Instant, interval: Duration) -> Instant { #[cfg(test)] mod test { use super::next_interval; + use std::cmp::Ordering; use std::time::{Duration, Instant}; struct Timeline(Instant); @@ -134,12 +139,10 @@ mod test { // The math around Instant/Duration isn't 100% precise due to rounding // errors, see #249 for more info fn almost_eq(a: Instant, b: Instant) -> bool { - if a == b { - true - } else if a > b { - a - b < Duration::from_millis(1) - } else { - b - a < Duration::from_millis(1) + match a.cmp(&b) { + Ordering::Equal => true, + Ordering::Greater => a - b < Duration::from_millis(1), + Ordering::Less => b - a < Duration::from_millis(1), } } diff --git a/src/stream/once.rs b/src/stream/once.rs index ae90d639..d993c160 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -29,7 +29,8 @@ pub fn once(t: T) -> Once { pin_project! { /// A stream that yields a single item. /// - /// This stream is constructed by the [`once`] function. + /// This stream is created by the [`once`] function. See its + /// documentation for more. /// /// [`once`]: fn.once.html #[derive(Debug)] diff --git a/src/stream/product.rs b/src/stream/product.rs index 5799990d..71b14c70 100644 --- a/src/stream/product.rs +++ b/src/stream/product.rs @@ -1,7 +1,9 @@ +use std::pin::Pin; + use crate::future::Future; use crate::stream::Stream; -/// Trait to represent types that can be created by productming up a stream. +/// Trait to represent types that can be created by multiplying the elements of a stream. /// /// This trait is used to implement the [`product`] method on streams. Types which /// implement the trait can be generated by the [`product`] method. Like @@ -16,8 +18,62 @@ use crate::stream::Stream; pub trait Product: Sized { /// Method which takes a stream and generates `Self` from the elements by /// multiplying the items. - fn product(stream: S) -> F + fn product<'a, S>(stream: S) -> Pin + 'a>> where - S: Stream, - F: Future; + S: Stream + 'a; } + +use core::ops::Mul; +use core::num::Wrapping; +use crate::stream::stream::StreamExt; + +macro_rules! integer_product { + (@impls $one: expr, $($a:ty)*) => ($( + impl Product for $a { + fn product<'a, S>(stream: S) -> Pin+ 'a>> + where + S: Stream + 'a, + { + Box::pin(async move { stream.fold($one, Mul::mul).await } ) + } + } + impl<'a> Product<&'a $a> for $a { + fn product<'b, S>(stream: S) -> Pin + 'b>> + where + S: Stream + 'b, + { + Box::pin(async move { stream.fold($one, Mul::mul).await } ) + } + } + )*); + ($($a:ty)*) => ( + integer_product!(@impls 1, $($a)*); + integer_product!(@impls Wrapping(1), $(Wrapping<$a>)*); + ); +} + +macro_rules! float_product { + ($($a:ty)*) => ($( + impl Product for $a { + fn product<'a, S>(stream: S) -> Pin+ 'a>> + where S: Stream + 'a, + { + Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + } + } + impl<'a> Product<&'a $a> for $a { + fn product<'b, S>(stream: S) -> Pin+ 'b>> + where S: Stream + 'b, + { + Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + } + } + )*); + ($($a:ty)*) => ( + float_product!($($a)*); + float_product!($(Wrapping<$a>)*); + ); +} + +integer_product!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize } +float_product!{ f32 f64 } diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index 75fd6973..aaaff0c6 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -29,7 +29,8 @@ where /// A stream that yields the same item repeatedly. /// -/// This stream is constructed by the [`repeat`] function. +/// This stream is created by the [`repeat`] function. See its +/// documentation for more. /// /// [`repeat`]: fn.repeat.html #[derive(Debug)] diff --git a/src/stream/repeat_with.rs b/src/stream/repeat_with.rs index 15d4aa12..de53bc9d 100644 --- a/src/stream/repeat_with.rs +++ b/src/stream/repeat_with.rs @@ -10,7 +10,8 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that repeats elements of type `T` endlessly by applying a provided closure. /// - /// This stream is constructed by the [`repeat_with`] function. + /// This stream is created by the [`repeat_with`] function. See its + /// documentation for more. /// /// [`repeat_with`]: fn.repeat_with.html #[derive(Debug)] diff --git a/src/stream/stream/chain.rs b/src/stream/stream/chain.rs index df316150..5e0eeb48 100644 --- a/src/stream/stream/chain.rs +++ b/src/stream/stream/chain.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// Chains two streams one after another. + /// + /// This `struct` is created by the [`chain`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`chain`]: trait.Stream.html#method.chain + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Chain { #[pin] diff --git a/src/stream/stream/filter.rs b/src/stream/stream/filter.rs index eb4153f7..a2562e77 100644 --- a/src/stream/stream/filter.rs +++ b/src/stream/stream/filter.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream to filter elements of another stream with a predicate. + /// + /// This `struct` is created by the [`filter`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`filter`]: trait.Stream.html#method.filter + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Filter { #[pin] diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs index 11629700..39af9cb0 100644 --- a/src/stream/stream/fuse.rs +++ b/src/stream/stream/fuse.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A `Stream` that is permanently closed once a single call to `poll` results in /// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. + /// + /// This `struct` is created by the [`fuse`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`fuse`]: trait.Stream.html#method.fuse + /// [`Stream`]: trait.Stream.html #[derive(Clone, Debug)] pub struct Fuse { #[pin] diff --git a/src/stream/stream/inspect.rs b/src/stream/stream/inspect.rs index 5de60fb3..ba60b0ce 100644 --- a/src/stream/stream/inspect.rs +++ b/src/stream/stream/inspect.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that does something with each element of another stream. + /// + /// This `struct` is created by the [`inspect`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`inspect`]: trait.Stream.html#method.inspect + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Inspect { #[pin] diff --git a/src/stream/stream/merge.rs b/src/stream/stream/merge.rs index 3ccc223d..d926ec4f 100644 --- a/src/stream/stream/merge.rs +++ b/src/stream/stream/merge.rs @@ -7,9 +7,11 @@ use pin_project_lite::pin_project; pin_project! { /// A stream that merges two other streams into a single stream. /// - /// This stream is returned by [`Stream::merge`]. + /// This `struct` is created by the [`merge`] method on [`Stream`]. See its + /// documentation for more. /// - /// [`Stream::merge`]: trait.Stream.html#method.merge + /// [`merge`]: trait.Stream.html#method.merge + /// [`Stream`]: trait.Stream.html #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] #[derive(Debug)] diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 29e68fc4..e1a51bfb 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -93,18 +93,21 @@ use std::marker::PhantomData; cfg_unstable! { use std::pin::Pin; + use std::time::Duration; use crate::future::Future; - use crate::stream::FromStream; use crate::stream::into_stream::IntoStream; + use crate::stream::{FromStream, Product, Sum}; pub use merge::Merge; pub use flatten::Flatten; pub use flat_map::FlatMap; + pub use timeout::{TimeoutError, Timeout}; mod merge; mod flatten; mod flat_map; + mod timeout; } extension_trait! { @@ -1159,6 +1162,40 @@ extension_trait! { Skip::new(self, n) } + #[doc=r#" + Await a stream or times out after a duration of time. + + If you want to await an I/O future consider using + [`io::timeout`](../io/fn.timeout.html) instead. + + # Examples + + ``` + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # + use std::time::Duration; + + use async_std::stream; + use async_std::prelude::*; + + let mut s = stream::repeat(1).take(3).timeout(Duration::from_secs(1)); + + while let Some(v) = s.next().await { + assert_eq!(v, Ok(1)); + } + # + # Ok(()) }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn timeout(self, dur: Duration) -> Timeout + where + Self: Stream + Sized, + { + Timeout::new(self, dur) + } + #[doc = r#" A combinator that applies a function as long as it returns successfully, producing a single, final value. Immediately returns the error when the function returns unsuccessfully. @@ -1611,6 +1648,95 @@ extension_trait! { { LtFuture::new(self, other) } + + #[doc = r#" + Sums the elements of an iterator. + + Takes each element, adds them together, and returns the result. + + An empty iterator returns the zero value of the type. + + # Panics + + When calling `sum()` and a primitive integer type is being returned, this + method will panic if the computation overflows and debug assertions are + enabled. + + # Examples + + Basic usage: + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + use async_std::prelude::*; + + let s: VecDeque<_> = vec![0u8, 1, 2, 3, 4].into_iter().collect(); + let sum: u8 = s.sum().await; + + assert_eq!(sum, 10); + # + # }) } + ``` + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn sum<'a, S>( + self, + ) -> impl Future + 'a [Pin + 'a>>] + where + Self: Sized + Stream + 'a, + S: Sum, + { + Sum::sum(self) + } + + #[doc = r#" + Iterates over the entire iterator, multiplying all the elements + + An empty iterator returns the one value of the type. + + # Panics + + When calling `product()` and a primitive integer type is being returned, + method will panic if the computation overflows and debug assertions are + enabled. + + # Examples + + This example calculates the factorial of n (i.e. the product of the numbers from 1 to + n, inclusive): + + ``` + # fn main() { async_std::task::block_on(async { + # + async fn factorial(n: u32) -> u32 { + use std::collections::VecDeque; + use async_std::prelude::*; + + let s: VecDeque<_> = (1..=n).collect(); + s.product().await + } + + assert_eq!(factorial(0).await, 1); + assert_eq!(factorial(1).await, 1); + assert_eq!(factorial(5).await, 120); + # + # }) } + ``` + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn product<'a, P>( + self, + ) -> impl Future + 'a [Pin + 'a>>] + where + Self: Sized + Stream + 'a, + P: Product, + { + Product::product(self) + } } impl Stream for Box { diff --git a/src/stream/stream/scan.rs b/src/stream/stream/scan.rs index c4771d85..385edf8e 100644 --- a/src/stream/stream/scan.rs +++ b/src/stream/stream/scan.rs @@ -7,6 +7,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream to maintain state while polling another stream. + /// + /// This `struct` is created by the [`scan`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`scan`]: trait.Stream.html#method.scan + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Scan { #[pin] diff --git a/src/stream/stream/skip.rs b/src/stream/stream/skip.rs index 6562b99f..cc2ba905 100644 --- a/src/stream/stream/skip.rs +++ b/src/stream/stream/skip.rs @@ -7,6 +7,12 @@ use crate::stream::Stream; pin_project! { /// A stream to skip first n elements of another stream. + /// + /// This `struct` is created by the [`skip`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`skip`]: trait.Stream.html#method.skip + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct Skip { #[pin] diff --git a/src/stream/stream/skip_while.rs b/src/stream/stream/skip_while.rs index 0499df23..6435d81c 100644 --- a/src/stream/stream/skip_while.rs +++ b/src/stream/stream/skip_while.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream to skip elements of another stream based on a predicate. + /// + /// This `struct` is created by the [`skip_while`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`skip_while`]: trait.Stream.html#method.skip_while + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct SkipWhile { #[pin] diff --git a/src/stream/stream/step_by.rs b/src/stream/stream/step_by.rs index ab9e45b6..13020982 100644 --- a/src/stream/stream/step_by.rs +++ b/src/stream/stream/step_by.rs @@ -7,6 +7,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that steps a given amount of elements of another stream. + /// + /// This `struct` is created by the [`step_by`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`step_by`]: trait.Stream.html#method.step_by + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct StepBy { #[pin] diff --git a/src/stream/stream/take.rs b/src/stream/stream/take.rs index 835bc447..e680b42b 100644 --- a/src/stream/stream/take.rs +++ b/src/stream/stream/take.rs @@ -7,6 +7,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that yields the first `n` items of another stream. + /// + /// This `struct` is created by the [`take`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`take`]: trait.Stream.html#method.take + /// [`Stream`]: trait.Stream.html #[derive(Clone, Debug)] pub struct Take { #[pin] diff --git a/src/stream/stream/take_while.rs b/src/stream/stream/take_while.rs index bf89458b..35978b47 100644 --- a/src/stream/stream/take_while.rs +++ b/src/stream/stream/take_while.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// A stream that yields elements based on a predicate. + /// + /// This `struct` is created by the [`take_while`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`take_while`]: trait.Stream.html#method.take_while + /// [`Stream`]: trait.Stream.html #[derive(Debug)] pub struct TakeWhile { #[pin] diff --git a/src/stream/stream/timeout.rs b/src/stream/stream/timeout.rs new file mode 100644 index 00000000..3c14811f --- /dev/null +++ b/src/stream/stream/timeout.rs @@ -0,0 +1,63 @@ +use std::error::Error; +use std::fmt; +use std::pin::Pin; +use std::time::Duration; + +use futures_timer::Delay; +use pin_project_lite::pin_project; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + /// A stream with timeout time set + #[derive(Debug)] + pub struct Timeout { + #[pin] + stream: S, + #[pin] + delay: Delay, + } +} + +impl Timeout { + pub fn new(stream: S, dur: Duration) -> Timeout { + let delay = Delay::new(dur); + + Timeout { stream, delay } + } +} + +impl Stream for Timeout { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + match this.stream.poll_next(cx) { + Poll::Ready(Some(v)) => Poll::Ready(Some(Ok(v))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => match this.delay.poll(cx) { + Poll::Ready(_) => Poll::Ready(Some(Err(TimeoutError { _private: () }))), + Poll::Pending => Poll::Pending, + }, + } + } +} + +/// An error returned when a stream times out. +#[cfg_attr(feature = "docs", doc(cfg(unstable)))] +#[cfg(any(feature = "unstable", feature = "docs"))] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub struct TimeoutError { + _private: (), +} + +impl Error for TimeoutError {} + +impl fmt::Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + "stream has timed out".fmt(f) + } +} diff --git a/src/stream/stream/zip.rs b/src/stream/stream/zip.rs index 9b7c299b..27681f37 100644 --- a/src/stream/stream/zip.rs +++ b/src/stream/stream/zip.rs @@ -8,6 +8,12 @@ use crate::task::{Context, Poll}; pin_project! { /// An iterator that iterates two other iterators simultaneously. + /// + /// This `struct` is created by the [`zip`] method on [`Stream`]. See its + /// documentation for more. + /// + /// [`zip`]: trait.Stream.html#method.zip + /// [`Stream`]: trait.Stream.html pub struct Zip { item_slot: Option, #[pin] diff --git a/src/stream/sum.rs b/src/stream/sum.rs index a87ade1a..dadbc347 100644 --- a/src/stream/sum.rs +++ b/src/stream/sum.rs @@ -1,3 +1,5 @@ +use std::pin::Pin; + use crate::future::Future; use crate::stream::Stream; @@ -16,8 +18,62 @@ use crate::stream::Stream; pub trait Sum: Sized { /// Method which takes a stream and generates `Self` from the elements by /// "summing up" the items. - fn sum(stream: S) -> F + fn sum<'a, S>(stream: S) -> Pin + 'a>> where - S: Stream, - F: Future; + S: Stream + 'a; } + +use core::ops::Add; +use core::num::Wrapping; +use crate::stream::stream::StreamExt; + +macro_rules! integer_sum { + (@impls $zero: expr, $($a:ty)*) => ($( + impl Sum for $a { + fn sum<'a, S>(stream: S) -> Pin+ 'a>> + where + S: Stream + 'a, + { + Box::pin(async move { stream.fold($zero, Add::add).await } ) + } + } + impl<'a> Sum<&'a $a> for $a { + fn sum<'b, S>(stream: S) -> Pin + 'b>> + where + S: Stream + 'b, + { + Box::pin(async move { stream.fold($zero, Add::add).await } ) + } + } + )*); + ($($a:ty)*) => ( + integer_sum!(@impls 0, $($a)*); + integer_sum!(@impls Wrapping(0), $(Wrapping<$a>)*); + ); +} + +macro_rules! float_sum { + ($($a:ty)*) => ($( + impl Sum for $a { + fn sum<'a, S>(stream: S) -> Pin + 'a>> + where S: Stream + 'a, + { + Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + } + } + impl<'a> Sum<&'a $a> for $a { + fn sum<'b, S>(stream: S) -> Pin + 'b>> + where S: Stream + 'b, + { + Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + } + } + )*); + ($($a:ty)*) => ( + float_sum!(@impls 0.0, $($a)*); + float_sum!(@impls Wrapping(0.0), $(Wrapping<$a>)*); + ); +} + +integer_sum!{ i8 i16 i32 i64 i128 isize u8 u16 u32 u64 u128 usize } +float_sum!{ f32 f64 } diff --git a/src/task/block_on.rs b/src/task/block_on.rs index b0adc387..c10303d7 100644 --- a/src/task/block_on.rs +++ b/src/task/block_on.rs @@ -154,6 +154,7 @@ where fn vtable() -> &'static RawWakerVTable { unsafe fn clone_raw(ptr: *const ()) -> RawWaker { + #![allow(clippy::redundant_clone)] let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker)); mem::forget(arc.clone()); RawWaker::new(ptr, vtable()) diff --git a/src/task/task.rs b/src/task/task.rs index ca3cac14..3d8e1080 100644 --- a/src/task/task.rs +++ b/src/task/task.rs @@ -167,6 +167,7 @@ impl Tag { } pub fn task(&self) -> &Task { + #[allow(clippy::transmute_ptr_to_ptr)] unsafe { let raw = self.raw_metadata.load(Ordering::Acquire); @@ -189,6 +190,7 @@ impl Tag { } } + #[allow(clippy::transmute_ptr_to_ptr)] mem::transmute::<&AtomicUsize, &Option>(&self.raw_metadata) .as_ref() .unwrap() diff --git a/tests/buf_writer.rs b/tests/buf_writer.rs index cb2368aa..5df90e08 100644 --- a/tests/buf_writer.rs +++ b/tests/buf_writer.rs @@ -4,6 +4,7 @@ use async_std::task; #[test] fn test_buffered_writer() { + #![allow(clippy::cognitive_complexity)] task::block_on(async { let inner = Vec::new(); let mut writer = BufWriter::with_capacity(2, inner); diff --git a/tests/channel.rs b/tests/channel.rs index 91622b0d..0c40f5a7 100644 --- a/tests/channel.rs +++ b/tests/channel.rs @@ -37,6 +37,7 @@ fn capacity() { #[test] fn len_empty_full() { + #![allow(clippy::cognitive_complexity)] task::block_on(async { let (s, r) = channel(2); diff --git a/tests/rwlock.rs b/tests/rwlock.rs index ff25e862..370dcb9f 100644 --- a/tests/rwlock.rs +++ b/tests/rwlock.rs @@ -13,7 +13,7 @@ use futures::channel::mpsc; /// Generates a random number in `0..n`. pub fn random(n: u32) -> u32 { thread_local! { - static RNG: Cell> = Cell::new(Wrapping(1406868647)); + static RNG: Cell> = Cell::new(Wrapping(1_406_868_647)); } RNG.with(|rng| {