From 4ef55d4d7bf51cc5d0a97a33b1773cc398da5167 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Fri, 15 Nov 2019 09:01:41 +0900 Subject: [PATCH 01/18] Enable CI on master branch --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 99436b72..dac0ff44 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,7 @@ on: pull_request: push: branches: + - master - staging - trying From 31cf932d808bdfb3cbdf024968b333f23d510547 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 00:24:59 +0900 Subject: [PATCH 02/18] wip: Add stream unzip --- src/stream/stream/mod.rs | 13 ++++++++++ src/stream/stream/unzip.rs | 53 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 src/stream/stream/unzip.rs diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 89385837..900bde3a 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -130,6 +130,7 @@ cfg_unstable! { pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; + pub use unzip::UnzipFuture; mod count; mod merge; @@ -138,6 +139,7 @@ cfg_unstable! { mod partition; mod timeout; mod throttle; + mod unzip; } extension_trait! { @@ -1717,6 +1719,17 @@ extension_trait! { Zip::new(self, other) } + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn unzip(self) -> impl Future [UnzipFuture] + where + FromA: Default + Extend, + FromB: Default + Extend, + Self: Stream + Sized, + { + UnzipFuture::new(self) + } + #[doc = r#" Transforms a stream into a collection. diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs new file mode 100644 index 00000000..ef1ff3a0 --- /dev/null +++ b/src/stream/stream/unzip.rs @@ -0,0 +1,53 @@ +use std::future::Future; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub struct UnzipFuture { + #[pin] + stream: S, + res: (FromA, FromB), + } +} + +impl UnzipFuture +where + FromA: Default, + FromB: Default, +{ + pub(super) fn new(stream: S) -> Self { + UnzipFuture { + stream, + res: (FromA::default(), FromB::default()), + } + } +} + +impl Future for UnzipFuture +where + S: Stream, + FromA: Default + Extend + Copy, + FromB: Default + Extend + Copy, +{ + type Output = (FromA, FromB); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some((a, b)) => { + this.res.0.extend(Some(a)); + this.res.1.extend(Some(b)); + Poll::Pending + } + None => Poll::Ready(*this.res), + } + } +} From 603b3c508559129bf2f866291511e35db0a93c4d Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 01:16:35 +0900 Subject: [PATCH 03/18] add: Add stream unzip --- src/stream/stream/mod.rs | 2 +- src/stream/stream/unzip.rs | 30 ++++++++++++++++++------------ 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 900bde3a..04aa4d68 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -124,13 +124,13 @@ cfg_unstable! { use count::CountFuture; use partition::PartitionFuture; + use unzip::UnzipFuture; pub use merge::Merge; pub use flatten::Flatten; pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; - pub use unzip::UnzipFuture; mod count; mod merge; diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index ef1ff3a0..4f5dfa19 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -7,12 +7,13 @@ use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { + #[derive(Clone, Debug)] #[cfg(all(feature = "default", feature = "unstable"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - pub struct UnzipFuture { + pub struct UnzipFuture { #[pin] stream: S, - res: (FromA, FromB), + res: Option<(FromA, FromB)>, } } @@ -24,7 +25,7 @@ where pub(super) fn new(stream: S) -> Self { UnzipFuture { stream, - res: (FromA::default(), FromB::default()), + res: Some((FromA::default(), FromB::default())), } } } @@ -32,22 +33,27 @@ where impl Future for UnzipFuture where S: Stream, - FromA: Default + Extend + Copy, - FromB: Default + Extend + Copy, + FromA: Default + Extend, + FromB: Default + Extend, { type Output = (FromA, FromB); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); - let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); - match next { - Some((a, b)) => { - this.res.0.extend(Some(a)); - this.res.1.extend(Some(b)); - Poll::Pending + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some((a, b)) => { + let mut res = this.res.take().unwrap(); + res.0.extend(Some(a)); + res.1.extend(Some(b)); + + *this.res = Some(res); + } + None => return Poll::Ready(this.res.take().unwrap()), } - None => Poll::Ready(*this.res), } } } From 91ee4c7b9fddcf0c245d9527b75fec2642846702 Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 01:16:48 +0900 Subject: [PATCH 04/18] doc: Add stream unzip doc --- src/stream/stream/mod.rs | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 04aa4d68..e0e9b7e4 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1719,6 +1719,33 @@ extension_trait! { Zip::new(self, other) } + #[doc = r#" + Converts an stream of pairs into a pair of containers. + + unzip() consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements. + + This function is, in some sense, the opposite of [`zip`]. + + [`zip`]: trait.Stream.html#method.zip + + # Example + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s = stream::from_iter(vec![(1,2), (3,4)]); + + let (left, right): (Vec<_>, Vec<_>) = s.unzip().await; + + assert_eq!(left, [1, 3]); + assert_eq!(right, [2, 4]); + # + # }) } + ``` + "#] #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] fn unzip(self) -> impl Future [UnzipFuture] From a05b6a38104c26a1ffd2eb7ed6e4e32912c856fb Mon Sep 17 00:00:00 2001 From: k-nasa Date: Sat, 16 Nov 2019 01:36:53 +0900 Subject: [PATCH 05/18] fix: mutable ref --- src/stream/stream/unzip.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs index 4f5dfa19..e0832ff7 100644 --- a/src/stream/stream/unzip.rs +++ b/src/stream/stream/unzip.rs @@ -46,11 +46,9 @@ where match next { Some((a, b)) => { - let mut res = this.res.take().unwrap(); + let res = this.res.as_mut().unwrap(); res.0.extend(Some(a)); res.1.extend(Some(b)); - - *this.res = Some(res); } None => return Poll::Ready(this.res.take().unwrap()), } From d146d95a3934ab214c8ec22c2cb029c562b60ef0 Mon Sep 17 00:00:00 2001 From: nasa Date: Wed, 20 Nov 2019 21:38:42 +0900 Subject: [PATCH 06/18] Update src/stream/stream/mod.rs Co-Authored-By: Taiki Endo --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e0e9b7e4..51c41390 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1722,7 +1722,7 @@ extension_trait! { #[doc = r#" Converts an stream of pairs into a pair of containers. - unzip() consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements. + `unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements. This function is, in some sense, the opposite of [`zip`]. From e01f07d72a224ff0427dac70db544a12c2c0bf8c Mon Sep 17 00:00:00 2001 From: Pascal Hertleif Date: Thu, 21 Nov 2019 00:22:06 +0100 Subject: [PATCH 07/18] Add context to more errors cc #569 --- src/fs/canonicalize.rs | 8 ++++- src/fs/copy.rs | 7 ++++- src/fs/create_dir.rs | 7 ++++- src/fs/create_dir_all.rs | 7 ++++- src/fs/file.rs | 7 ++--- src/fs/hard_link.rs | 12 +++++++- src/fs/read.rs | 6 +++- src/fs/read_dir.rs | 12 +++++--- src/fs/read_link.rs | 8 ++++- src/fs/read_to_string.rs | 7 ++++- src/fs/remove_dir.rs | 7 ++++- src/fs/remove_dir_all.rs | 7 ++++- src/fs/remove_file.rs | 7 ++++- src/fs/rename.rs | 12 +++++++- src/fs/write.rs | 7 ++++- src/io/copy.rs | 7 +++-- src/io/stdin.rs | 4 ++- src/net/tcp/listener.rs | 9 ++++-- src/net/tcp/stream.rs | 13 ++++++-- src/net/udp/mod.rs | 64 +++++++++++++++++++++++++++++++++++----- tests/verbose_errors.rs | 2 +- 21 files changed, 181 insertions(+), 39 deletions(-) diff --git a/src/fs/canonicalize.rs b/src/fs/canonicalize.rs index 6eb6977d..38a5a6b9 100644 --- a/src/fs/canonicalize.rs +++ b/src/fs/canonicalize.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::{Path, PathBuf}; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Returns the canonical form of a path. /// @@ -32,5 +33,10 @@ use crate::task::spawn_blocking; /// ``` pub async fn canonicalize>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::canonicalize(&path).map(Into::into)).await + spawn_blocking(move || { + std::fs::canonicalize(&path) + .map(Into::into) + .context(|| format!("could not canonicalize `{}`", path.display())) + }) + .await } diff --git a/src/fs/copy.rs b/src/fs/copy.rs index 170b66ec..8fb447bb 100644 --- a/src/fs/copy.rs +++ b/src/fs/copy.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Copies the contents and permissions of a file to a new location. /// @@ -41,5 +42,9 @@ use crate::task::spawn_blocking; pub async fn copy, Q: AsRef>(from: P, to: Q) -> io::Result { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - spawn_blocking(move || std::fs::copy(&from, &to)).await + spawn_blocking(move || { + std::fs::copy(&from, &to) + .context(|| format!("could not copy `{}` to `{}`", from.display(), to.display())) + }) + .await } diff --git a/src/fs/create_dir.rs b/src/fs/create_dir.rs index 03c24918..37923c05 100644 --- a/src/fs/create_dir.rs +++ b/src/fs/create_dir.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Creates a new directory. /// @@ -34,5 +35,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn create_dir>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::create_dir(path)).await + spawn_blocking(move || { + std::fs::create_dir(&path) + .context(|| format!("could not create directory `{}`", path.display())) + }) + .await } diff --git a/src/fs/create_dir_all.rs b/src/fs/create_dir_all.rs index 15241943..753dfd49 100644 --- a/src/fs/create_dir_all.rs +++ b/src/fs/create_dir_all.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Creates a new directory and all of its parents if they are missing. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn create_dir_all>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::create_dir_all(path)).await + spawn_blocking(move || { + std::fs::create_dir_all(&path) + .context(|| format!("could not create directory path `{}`", path.display())) + }) + .await } diff --git a/src/fs/file.rs b/src/fs/file.rs index f8242811..24c72c6d 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -9,11 +9,11 @@ use std::sync::{Arc, Mutex}; use crate::fs::{Metadata, Permissions}; use crate::future; -use crate::utils::Context as _; use crate::io::{self, Read, Seek, SeekFrom, Write}; use crate::path::Path; use crate::prelude::*; use crate::task::{self, spawn_blocking, Context, Poll, Waker}; +use crate::utils::Context as _; /// An open file on the filesystem. /// @@ -114,8 +114,7 @@ impl File { pub async fn open>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = spawn_blocking(move || { - std::fs::File::open(&path) - .context(|| format!("Could not open {}", path.display())) + std::fs::File::open(&path).context(|| format!("Could not open `{}`", path.display())) }) .await?; Ok(File::new(file, true)) @@ -154,7 +153,7 @@ impl File { let path = path.as_ref().to_owned(); let file = spawn_blocking(move || { std::fs::File::create(&path) - .context(|| format!("Could not create {}", path.display())) + .context(|| format!("Could not create `{}`", path.display())) }) .await?; Ok(File::new(file, true)) diff --git a/src/fs/hard_link.rs b/src/fs/hard_link.rs index e6e56cd5..a6a40698 100644 --- a/src/fs/hard_link.rs +++ b/src/fs/hard_link.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Creates a hard link on the filesystem. /// @@ -32,5 +33,14 @@ use crate::task::spawn_blocking; pub async fn hard_link, Q: AsRef>(from: P, to: Q) -> io::Result<()> { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - spawn_blocking(move || std::fs::hard_link(&from, &to)).await + spawn_blocking(move || { + std::fs::hard_link(&from, &to).context(|| { + format!( + "could not create a hard link from `{}` to `{}`", + from.display(), + to.display() + ) + }) + }) + .await } diff --git a/src/fs/read.rs b/src/fs/read.rs index ab7d1756..3b568f7a 100644 --- a/src/fs/read.rs +++ b/src/fs/read.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Reads the entire contents of a file as raw bytes. /// @@ -36,5 +37,8 @@ use crate::task::spawn_blocking; /// ``` pub async fn read>(path: P) -> io::Result> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read(path)).await + spawn_blocking(move || { + std::fs::read(&path).context(|| format!("could not read file `{}`", path.display())) + }) + .await } diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index 5e51065b..d8261a94 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -1,11 +1,12 @@ -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; use crate::fs::DirEntry; use crate::io; use crate::path::Path; use crate::stream::Stream; use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; +use crate::utils::Context as _; /// Returns a stream of entries in a directory. /// @@ -45,9 +46,12 @@ use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; /// ``` pub async fn read_dir>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read_dir(path)) - .await - .map(ReadDir::new) + spawn_blocking(move || { + std::fs::read_dir(&path) + .context(|| format!("could not read directory `{}`", path.display())) + }) + .await + .map(ReadDir::new) } /// A stream of entries in a directory. diff --git a/src/fs/read_link.rs b/src/fs/read_link.rs index 7ec18a45..d8cabb72 100644 --- a/src/fs/read_link.rs +++ b/src/fs/read_link.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::{Path, PathBuf}; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Reads a symbolic link and returns the path it points to. /// @@ -28,5 +29,10 @@ use crate::task::spawn_blocking; /// ``` pub async fn read_link>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read_link(path).map(Into::into)).await + spawn_blocking(move || { + std::fs::read_link(&path) + .map(Into::into) + .context(|| format!("could not read link `{}`", path.display())) + }) + .await } diff --git a/src/fs/read_to_string.rs b/src/fs/read_to_string.rs index d06aa614..2378aaed 100644 --- a/src/fs/read_to_string.rs +++ b/src/fs/read_to_string.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Reads the entire contents of a file as a string. /// @@ -37,5 +38,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn read_to_string>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read_to_string(path)).await + spawn_blocking(move || { + std::fs::read_to_string(&path) + .context(|| format!("could not read file `{}`", path.display())) + }) + .await } diff --git a/src/fs/remove_dir.rs b/src/fs/remove_dir.rs index 1a62db2e..8fdba18c 100644 --- a/src/fs/remove_dir.rs +++ b/src/fs/remove_dir.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Removes an empty directory. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn remove_dir>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::remove_dir(path)).await + spawn_blocking(move || { + std::fs::remove_dir(&path) + .context(|| format!("could not remove directory `{}`", path.display())) + }) + .await } diff --git a/src/fs/remove_dir_all.rs b/src/fs/remove_dir_all.rs index 33667406..d4bad3a2 100644 --- a/src/fs/remove_dir_all.rs +++ b/src/fs/remove_dir_all.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Removes a directory and all of its contents. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn remove_dir_all>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::remove_dir_all(path)).await + spawn_blocking(move || { + std::fs::remove_dir_all(&path) + .context(|| format!("could not remove directory `{}`", path.display())) + }) + .await } diff --git a/src/fs/remove_file.rs b/src/fs/remove_file.rs index 9a74ec11..b881f8be 100644 --- a/src/fs/remove_file.rs +++ b/src/fs/remove_file.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Removes a file. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn remove_file>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::remove_file(path)).await + spawn_blocking(move || { + std::fs::remove_file(&path) + .context(|| format!("could not remove file `{}`", path.display())) + }) + .await } diff --git a/src/fs/rename.rs b/src/fs/rename.rs index ed7f39c9..25fc55fa 100644 --- a/src/fs/rename.rs +++ b/src/fs/rename.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Renames a file or directory to a new location. /// @@ -34,5 +35,14 @@ use crate::task::spawn_blocking; pub async fn rename, Q: AsRef>(from: P, to: Q) -> io::Result<()> { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - spawn_blocking(move || std::fs::rename(&from, &to)).await + spawn_blocking(move || { + std::fs::rename(&from, &to).context(|| { + format!( + "could not rename `{}` to `{}`", + from.display(), + to.display() + ) + }) + }) + .await } diff --git a/src/fs/write.rs b/src/fs/write.rs index 4e5d20bb..7c14098b 100644 --- a/src/fs/write.rs +++ b/src/fs/write.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Writes a slice of bytes as the new contents of a file. /// @@ -33,5 +34,9 @@ use crate::task::spawn_blocking; pub async fn write, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> { let path = path.as_ref().to_owned(); let contents = contents.as_ref().to_owned(); - spawn_blocking(move || std::fs::write(path, contents)).await + spawn_blocking(move || { + std::fs::write(&path, contents) + .context(|| format!("could not write to file `{}`", path.display())) + }) + .await } diff --git a/src/io/copy.rs b/src/io/copy.rs index 8ec3c1af..f05ed0e1 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -1,10 +1,11 @@ -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; use pin_project_lite::pin_project; use crate::io::{self, BufRead, BufReader, Read, Write}; use crate::task::{Context, Poll}; +use crate::utils::Context as _; /// Copies the entire contents of a reader into a writer. /// @@ -90,7 +91,7 @@ where writer, amt: 0, }; - future.await + future.await.context(|| String::from("io::copy failed")) } /// Copies the entire contents of a reader into a writer. @@ -177,5 +178,5 @@ where writer, amt: 0, }; - future.await + future.await.context(|| String::from("io::copy failed")) } diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 167ea2dd..618393af 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -1,10 +1,11 @@ +use std::future::Future; use std::pin::Pin; use std::sync::Mutex; -use std::future::Future; use crate::future; use crate::io::{self, Read}; use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; +use crate::utils::Context as _; cfg_unstable! { use once_cell::sync::Lazy; @@ -162,6 +163,7 @@ impl Stdin { } }) .await + .context(|| String::from("Could not read line on stdin")) } /// Locks this handle to the standard input stream, returning a readable guard. diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index f98bbdc7..9d944f4b 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,5 +1,5 @@ -use std::net::SocketAddr; use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use crate::future; @@ -8,6 +8,7 @@ use crate::net::driver::Watcher; use crate::net::{TcpStream, ToSocketAddrs}; use crate::stream::Stream; use crate::task::{Context, Poll}; +use crate::utils::Context as _; /// A TCP socket server, listening for connections. /// @@ -75,8 +76,12 @@ impl TcpListener { /// [`local_addr`]: #method.local_addr pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await + .context(|| String::from("could not resolve addresses"))?; - for addr in addrs.to_socket_addrs().await? { + for addr in addrs { match mio::net::TcpListener::bind(&addr) { Ok(mio_listener) => { return Ok(TcpListener { diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 1da9c7c2..9d3c2ecd 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -7,6 +7,7 @@ use crate::io::{self, Read, Write}; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; use crate::task::{spawn_blocking, Context, Poll}; +use crate::utils::Context as _; /// A TCP stream between a local and a remote socket. /// @@ -71,11 +72,17 @@ impl TcpStream { /// ``` pub async fn connect(addrs: A) -> io::Result { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await + .context(|| String::from("could not resolve addresses"))?; - for addr in addrs.to_socket_addrs().await? { + for addr in addrs { let res = spawn_blocking(move || { - let std_stream = std::net::TcpStream::connect(addr)?; - let mio_stream = mio::net::TcpStream::from_stream(std_stream)?; + let std_stream = std::net::TcpStream::connect(addr) + .context(|| format!("could not connect to {}", addr))?; + let mio_stream = mio::net::TcpStream::from_stream(std_stream) + .context(|| format!("could not open async connection to {}", addr))?; Ok(TcpStream { watcher: Watcher::new(mio_stream), }) diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 37c9d50c..e6064136 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -5,6 +5,7 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use crate::future; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; +use crate::utils::Context as _; /// A UDP socket. /// @@ -66,10 +67,14 @@ impl UdpSocket { /// # /// # Ok(()) }) } /// ``` - pub async fn bind(addr: A) -> io::Result { + pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await + .context(|| String::from("could not resolve addresses"))?; - for addr in addr.to_socket_addrs().await? { + for addr in addrs { match mio::net::UdpSocket::bind(&addr) { Ok(mio_socket) => { return Ok(UdpSocket { @@ -106,7 +111,10 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub fn local_addr(&self) -> io::Result { - self.watcher.get_ref().local_addr() + self.watcher + .get_ref() + .local_addr() + .context(|| String::from("could not get local address")) } /// Sends data on the socket to the given address. @@ -151,6 +159,7 @@ impl UdpSocket { .poll_write_with(cx, |inner| inner.send_to(buf, &addr)) }) .await + .context(|| format!("Could not send packet to {}", addr)) } /// Receives data from the socket. @@ -178,6 +187,17 @@ impl UdpSocket { .poll_read_with(cx, |inner| inner.recv_from(buf)) }) .await + .context(|| { + use std::fmt::Write; + + let mut error = String::from("Could not receive data on "); + if let Ok(addr) = self.local_addr() { + let _ = write!(&mut error, "{}", addr); + } else { + error.push_str("socket"); + } + error + }) } /// Connects the UDP socket to a remote address. @@ -195,7 +215,7 @@ impl UdpSocket { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use async_std::net::UdpSocket; + /// use async_std::net::UdpSocket; /// /// let socket = UdpSocket::bind("127.0.0.1:0").await?; /// socket.connect("127.0.0.1:8080").await?; @@ -204,8 +224,12 @@ impl UdpSocket { /// ``` pub async fn connect(&self, addrs: A) -> io::Result<()> { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await + .context(|| String::from("could not resolve addresses"))?; - for addr in addrs.to_socket_addrs().await? { + for addr in addrs { // TODO(stjepang): connect on the blocking pool match self.watcher.get_ref().connect(addr) { Ok(()) => return Ok(()), @@ -248,7 +272,19 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { - future::poll_fn(|cx| self.watcher.poll_write_with(cx, |inner| inner.send(buf))).await + future::poll_fn(|cx| self.watcher.poll_write_with(cx, |inner| inner.send(buf))) + .await + .context(|| { + use std::fmt::Write; + + let mut error = String::from("Could not send data on "); + if let Ok(addr) = self.local_addr() { + let _ = write!(&mut error, "{}", addr); + } else { + error.push_str("socket"); + } + error + }) } /// Receives data from the socket. @@ -271,7 +307,19 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { - future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.recv(buf))).await + future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.recv(buf))) + .await + .context(|| { + use std::fmt::Write; + + let mut error = String::from("Could not receive data on "); + if let Ok(addr) = self.local_addr() { + let _ = write!(&mut error, "{}", addr); + } else { + error.push_str("socket"); + } + error + }) } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -415,7 +463,7 @@ impl UdpSocket { /// use async_std::net::UdpSocket; /// /// let socket_addr = SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0); - /// let mdns_addr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x0123) ; + /// let mdns_addr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x0123); /// let socket = UdpSocket::bind(&socket_addr).await?; /// /// socket.join_multicast_v6(&mdns_addr, 0)?; diff --git a/tests/verbose_errors.rs b/tests/verbose_errors.rs index 54d04f8d..207d0b27 100644 --- a/tests/verbose_errors.rs +++ b/tests/verbose_errors.rs @@ -8,7 +8,7 @@ fn open_file() { match res { Ok(_) => panic!("Found file with random name: We live in a simulation"), Err(e) => assert_eq!( - "Could not open /ashjudlkahasdasdsikdhajik/asdasdasdasdasdasd/fjuiklashdbflasas", + "Could not open `/ashjudlkahasdasdsikdhajik/asdasdasdasdasdasd/fjuiklashdbflasas`", &format!("{}", e) ), } From aa7d1c27a42b262ef47a68e0fb61f5078fb18a33 Mon Sep 17 00:00:00 2001 From: Pascal Hertleif Date: Mon, 25 Nov 2019 21:18:40 +0100 Subject: [PATCH 08/18] Verbose errors: Apply suggestions Co-Authored-By: Yoshua Wuyts --- src/fs/file.rs | 4 ++-- src/io/stdin.rs | 2 +- src/net/udp/mod.rs | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/fs/file.rs b/src/fs/file.rs index 24c72c6d..c111e013 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -114,7 +114,7 @@ impl File { pub async fn open>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = spawn_blocking(move || { - std::fs::File::open(&path).context(|| format!("Could not open `{}`", path.display())) + std::fs::File::open(&path).context(|| format!("could not open `{}`", path.display())) }) .await?; Ok(File::new(file, true)) @@ -153,7 +153,7 @@ impl File { let path = path.as_ref().to_owned(); let file = spawn_blocking(move || { std::fs::File::create(&path) - .context(|| format!("Could not create `{}`", path.display())) + .context(|| format!("could not create `{}`", path.display())) }) .await?; Ok(File::new(file, true)) diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 618393af..369ccae4 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -163,7 +163,7 @@ impl Stdin { } }) .await - .context(|| String::from("Could not read line on stdin")) + .context(|| String::from("could not read line on stdin")) } /// Locks this handle to the standard input stream, returning a readable guard. diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index e6064136..a1ca03f3 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -159,7 +159,7 @@ impl UdpSocket { .poll_write_with(cx, |inner| inner.send_to(buf, &addr)) }) .await - .context(|| format!("Could not send packet to {}", addr)) + .context(|| format!("could not send packet to {}", addr)) } /// Receives data from the socket. @@ -190,7 +190,7 @@ impl UdpSocket { .context(|| { use std::fmt::Write; - let mut error = String::from("Could not receive data on "); + let mut error = String::from("could not receive data on "); if let Ok(addr) = self.local_addr() { let _ = write!(&mut error, "{}", addr); } else { @@ -277,7 +277,7 @@ impl UdpSocket { .context(|| { use std::fmt::Write; - let mut error = String::from("Could not send data on "); + let mut error = String::from("could not send data on "); if let Ok(addr) = self.local_addr() { let _ = write!(&mut error, "{}", addr); } else { @@ -312,7 +312,7 @@ impl UdpSocket { .context(|| { use std::fmt::Write; - let mut error = String::from("Could not receive data on "); + let mut error = String::from("could not receive data on "); if let Ok(addr) = self.local_addr() { let _ = write!(&mut error, "{}", addr); } else { From 56538ebd9117402d2c8e69a67a08b4ea8b5e660f Mon Sep 17 00:00:00 2001 From: Pascal Hertleif Date: Mon, 25 Nov 2019 21:41:54 +0100 Subject: [PATCH 09/18] Improve verbose errors for socket addresses Moves the point of adding error context to the net::addr module so that we have access to the raw address input and can include it in the error message. --- src/net/addr.rs | 32 +++++++++++++++++++++++++++----- src/net/tcp/listener.rs | 4 +--- src/net/tcp/stream.rs | 3 +-- src/net/udp/mod.rs | 3 +-- tests/verbose_errors.rs | 17 ++++++++++++++++- 5 files changed, 46 insertions(+), 13 deletions(-) diff --git a/src/net/addr.rs b/src/net/addr.rs index 2769dd5e..ea839500 100644 --- a/src/net/addr.rs +++ b/src/net/addr.rs @@ -1,11 +1,12 @@ +use std::future::Future; use std::mem; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::pin::Pin; -use std::future::Future; use crate::io; use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; +use crate::utils::Context as ErrorContext; cfg_not_docs! { macro_rules! ret { @@ -67,6 +68,18 @@ pub enum ToSocketAddrsFuture { Done, } +/// Wrap `std::io::Error` with additional message +/// +/// Keeps the original error kind and stores the original I/O error as `source`. +impl ErrorContext for ToSocketAddrsFuture { + fn context(self, message: impl Fn() -> String) -> Self { + match self { + ToSocketAddrsFuture::Ready(res) => ToSocketAddrsFuture::Ready(res.context(message)), + x => x, + } + } +} + impl> Future for ToSocketAddrsFuture { type Output = io::Result; @@ -110,7 +123,9 @@ impl ToSocketAddrs for SocketAddrV4 { impl Future, ToSocketAddrsFuture ) { - SocketAddr::V4(*self).to_socket_addrs() + SocketAddr::V4(*self) + .to_socket_addrs() + .context(|| format!("could not resolve address `{}`", self)) } } @@ -123,7 +138,9 @@ impl ToSocketAddrs for SocketAddrV6 { impl Future, ToSocketAddrsFuture ) { - SocketAddr::V6(*self).to_socket_addrs() + SocketAddr::V6(*self) + .to_socket_addrs() + .context(|| format!("could not resolve address `{}`", self)) } } @@ -195,7 +212,9 @@ impl ToSocketAddrs for (&str, u16) { let host = host.to_string(); let task = spawn_blocking(move || { - std::net::ToSocketAddrs::to_socket_addrs(&(host.as_str(), port)) + let addr = (host.as_str(), port); + std::net::ToSocketAddrs::to_socket_addrs(&addr) + .context(|| format!("could not resolve address `{:?}`", addr)) }); ToSocketAddrsFuture::Resolving(task) } @@ -215,7 +234,10 @@ impl ToSocketAddrs for str { } let addr = self.to_string(); - let task = spawn_blocking(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str())); + let task = spawn_blocking(move || { + std::net::ToSocketAddrs::to_socket_addrs(addr.as_str()) + .context(|| format!("could not resolve address `{:?}`", addr)) + }); ToSocketAddrsFuture::Resolving(task) } } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 9d944f4b..fe06a96d 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -8,7 +8,6 @@ use crate::net::driver::Watcher; use crate::net::{TcpStream, ToSocketAddrs}; use crate::stream::Stream; use crate::task::{Context, Poll}; -use crate::utils::Context as _; /// A TCP socket server, listening for connections. /// @@ -78,8 +77,7 @@ impl TcpListener { let mut last_err = None; let addrs = addrs .to_socket_addrs() - .await - .context(|| String::from("could not resolve addresses"))?; + .await?; for addr in addrs { match mio::net::TcpListener::bind(&addr) { diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 9d3c2ecd..41317833 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -74,8 +74,7 @@ impl TcpStream { let mut last_err = None; let addrs = addrs .to_socket_addrs() - .await - .context(|| String::from("could not resolve addresses"))?; + .await?; for addr in addrs { let res = spawn_blocking(move || { diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index a1ca03f3..7fef1ed5 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -71,8 +71,7 @@ impl UdpSocket { let mut last_err = None; let addrs = addrs .to_socket_addrs() - .await - .context(|| String::from("could not resolve addresses"))?; + .await?; for addr in addrs { match mio::net::UdpSocket::bind(&addr) { diff --git a/tests/verbose_errors.rs b/tests/verbose_errors.rs index 207d0b27..15630928 100644 --- a/tests/verbose_errors.rs +++ b/tests/verbose_errors.rs @@ -1,4 +1,4 @@ -use async_std::{fs, task}; +use async_std::{fs, io, net::ToSocketAddrs, task}; #[test] fn open_file() { @@ -14,3 +14,18 @@ fn open_file() { } }) } + +#[test] +fn resolve_address() { + task::block_on(async { + let non_existing_addr = "ashjudlkahasdasdsikdhajik.asdasdasdasdasdasd.fjuiklashdbflasas:80"; + let res: Result<_, io::Error> = non_existing_addr.to_socket_addrs().await; + match res { + Ok(_) => panic!("Found address with random name: We live in a simulation"), + Err(e) => assert_eq!( + "could not resolve address `\"ashjudlkahasdasdsikdhajik.asdasdasdasdasdasd.fjuiklashdbflasas:80\"`", + &format!("{}", e) + ), + } + }) +} From e66e2e2b8fc8332f84eabbab3256c7e1b1a6f69c Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Tue, 26 Nov 2019 11:39:57 +0100 Subject: [PATCH 10/18] link to our contribution guidelines Signed-off-by: Yoshua Wuyts --- .github/CONTRIBUTING.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .github/CONTRIBUTING.md diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md new file mode 100644 index 00000000..708d2021 --- /dev/null +++ b/.github/CONTRIBUTING.md @@ -0,0 +1,3 @@ +Our contribution policy can be found at [async.rs/contribute][policy]. + +[policy]: https://async.rs/contribute/ From 0f30ab8c0a67707e57ac238e7e6b42821b05ba8a Mon Sep 17 00:00:00 2001 From: boats Date: Tue, 26 Nov 2019 14:23:10 +0100 Subject: [PATCH 11/18] Fix the docs and Debug output of BufWriter. (#588) The BufWriter docs inaccurately stated that it flushes on drop, which it does not do. This PR changes the docs, as well as the example, to highlight that the user must explicitly flush a bufwriter. There were also two places where the BufWriter code referred to it as a BufReader: in the link to the std docs, and in the Debug output. Those have also been fixed. --- src/io/buf_writer.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index ce6a97b3..c527d027 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -22,14 +22,14 @@ pin_project! { /// times. It also provides no advantage when writing to a destination that is /// in memory, like a `Vec`. /// - /// When the `BufWriter` is dropped, the contents of its buffer will be written - /// out. However, any errors that happen in the process of flushing the buffer - /// when the writer is dropped will be ignored. Code that wishes to handle such - /// errors must manually call [`flush`] before the writer is dropped. + /// Unlike the `BufWriter` type in `std`, this type does not write out the + /// contents of its buffer when it is dropped. Therefore, it is absolutely + /// critical that users explicitly flush the buffer before dropping a + /// `BufWriter`. /// - /// This type is an async version of [`std::io::BufReader`]. + /// This type is an async version of [`std::io::BufWriter`]. /// - /// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html + /// [`std::io::BufWriter`]: https://doc.rust-lang.org/std/io/struct.BufWriter.html /// /// # Examples /// @@ -61,10 +61,13 @@ pin_project! { /// use async_std::prelude::*; /// /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// /// for i in 0..10 { /// let arr = [i+1]; /// stream.write(&arr).await?; /// }; + /// + /// stream.flush().await?; /// # /// # Ok(()) }) } /// ``` @@ -325,7 +328,7 @@ impl Write for BufWriter { impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufReader") + f.debug_struct("BufWriter") .field("writer", &self.inner) .field("buf", &self.buf) .finish() From 794e331761634ef6c89f8f57fd409fa415e0521c Mon Sep 17 00:00:00 2001 From: nasa Date: Wed, 27 Nov 2019 21:38:38 +0900 Subject: [PATCH 12/18] Refactor join type (#577) * refactor: update future join type * test: update future join test * update future::try_join --- src/future/future/join.rs | 6 +++--- src/future/future/mod.rs | 20 ++++++++++---------- src/future/future/try_join.rs | 12 ++++++------ 3 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/future/future/join.rs b/src/future/future/join.rs index 5cfbd99a..0febcad0 100644 --- a/src/future/future/join.rs +++ b/src/future/future/join.rs @@ -12,7 +12,7 @@ pin_project! { pub struct Join where L: Future, - R: Future + R: Future, { #[pin] left: MaybeDone, #[pin] right: MaybeDone, @@ -22,7 +22,7 @@ pin_project! { impl Join where L: Future, - R: Future, + R: Future, { pub(crate) fn new(left: L, right: R) -> Self { Self { @@ -35,7 +35,7 @@ where impl Future for Join where L: Future, - R: Future, + R: Future, { type Output = (L::Output, R::Output); diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index 5fdaf4b1..0f9ef586 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -289,10 +289,10 @@ extension_trait! { use async_std::future; let a = future::ready(1u8); - let b = future::ready(2u8); + let b = future::ready(2u16); let f = a.join(b); - assert_eq!(f.await, (1u8, 2u8)); + assert_eq!(f.await, (1u8, 2u16)); # }); ``` "#] @@ -304,7 +304,7 @@ extension_trait! { ) -> impl Future::Output, ::Output)> [Join] where Self: std::future::Future + Sized, - F: std::future::Future::Output>, + F: std::future::Future, { Join::new(self, other) } @@ -328,30 +328,30 @@ extension_trait! { use async_std::prelude::*; use async_std::future; - let a = future::ready(Err("Error")); + let a = future::ready(Err::("Error")); let b = future::ready(Ok(1u8)); let f = a.try_join(b); assert_eq!(f.await, Err("Error")); let a = future::ready(Ok::(1u8)); - let b = future::ready(Ok::(2u8)); + let b = future::ready(Ok::(2u16)); let f = a.try_join(b); - assert_eq!(f.await, Ok((1u8, 2u8))); + assert_eq!(f.await, Ok((1u8, 2u16))); # # Ok(()) }) } ``` "#] #[cfg(any(feature = "unstable", feature = "docs"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn try_join( + fn try_join( self, other: F - ) -> impl Future> [TryJoin] + ) -> impl Future> [TryJoin] where - Self: std::future::Future> + Sized, - F: std::future::Future::Output>, + Self: std::future::Future> + Sized, + F: std::future::Future>, { TryJoin::new(self, other) } diff --git a/src/future/future/try_join.rs b/src/future/future/try_join.rs index 58ae6d62..f1667240 100644 --- a/src/future/future/try_join.rs +++ b/src/future/future/try_join.rs @@ -12,7 +12,7 @@ pin_project! { pub struct TryJoin where L: Future, - R: Future + R: Future, { #[pin] left: MaybeDone, #[pin] right: MaybeDone, @@ -22,7 +22,7 @@ pin_project! { impl TryJoin where L: Future, - R: Future, + R: Future, { pub(crate) fn new(left: L, right: R) -> Self { Self { @@ -32,12 +32,12 @@ where } } -impl Future for TryJoin +impl Future for TryJoin where - L: Future>, - R: Future, + L: Future>, + R: Future>, { - type Output = Result<(T, T), E>; + type Output = Result<(A, B), E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); From 68005661d9b83ab59b9894b516f823d41a29e0dc Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 27 Nov 2019 15:47:27 +0100 Subject: [PATCH 13/18] fix Stream::throttle hot loop (#584) Signed-off-by: Yoshua Wuyts --- src/stream/stream/mod.rs | 23 +++++++---------------- src/stream/stream/throttle.rs | 5 +---- 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e68e6acd..f7a1ba2c 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -336,28 +336,19 @@ extension_trait! { let start = Instant::now(); // emit value every 5 milliseconds - let s = stream::interval(Duration::from_millis(5)) - .enumerate() - .take(3); + let s = stream::interval(Duration::from_millis(5)).take(2); // throttle for 10 milliseconds let mut s = s.throttle(Duration::from_millis(10)); - assert_eq!(s.next().await, Some((0, ()))); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 5); + s.next().await; + assert!(start.elapsed().as_millis() >= 5); - assert_eq!(s.next().await, Some((1, ()))); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 15); + s.next().await; + assert!(start.elapsed().as_millis() >= 15); - assert_eq!(s.next().await, Some((2, ()))); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 25); - - assert_eq!(s.next().await, None); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 35); + s.next().await; + assert!(start.elapsed().as_millis() >= 35); # # }) } ``` diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index b2480bbd..ce8c13b3 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -55,10 +55,7 @@ impl Stream for Throttle { } match this.stream.poll_next(cx) { - Poll::Pending => { - cx.waker().wake_by_ref(); // Continue driving even though emitting Pending - Poll::Pending - } + Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { *this.blocked = true; From dba416608a79b8aed65f364e2d11745c0651a80f Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Wed, 27 Nov 2019 16:05:15 +0100 Subject: [PATCH 14/18] 1.2.0 (#589) Signed-off-by: Yoshua Wuyts --- CHANGELOG.md | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7977be76..5c9283f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,46 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] +# [1.2.0] - 2019-11-28 + +[API Documentation](https://docs.rs/async-std/1.2.0/async-std) + +This patch includes some minor quality-of-life improvements, introduces a +new `Stream::unzip` API, and adds verbose errors to our networking types. + +This means if you can't connect to a socket, you'll never have to wonder again +*which* address it was you couldn't connect to, instead of having to go through +the motions to debug what the address was. + +## Example + +Unzip a stream of tuples into two collections: + +```rust +use async_std::prelude::*; +use async_std::stream; + +let s = stream::from_iter(vec![(1,2), (3,4)]); + +let (left, right): (Vec<_>, Vec<_>) = s.unzip().await; + +assert_eq!(left, [1, 3]); +assert_eq!(right, [2, 4]); +``` + +## Added + +- Added `Stream::unzip` as "unstable". +- Added verbose errors to the networking types. + +## Changed + +- Enabled CI on master branch. + +## Fixed + +- Fixed the docs and `Debug` output of `BufWriter`. + # [1.1.0] - 2019-11-21 [API Documentation](https://docs.rs/async-std/1.1.0/async-std) From 0165d7f6d11f03ff8d281492828e3f2146f1b8d3 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 27 Nov 2019 16:08:57 +0100 Subject: [PATCH 15/18] Add missing items to the changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c9283f0..533dcaf3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,10 +42,13 @@ assert_eq!(right, [2, 4]); ## Changed - Enabled CI on master branch. +- `Future::join` and `Future::try_join` can now join futures with different + output types. ## Fixed - Fixed the docs and `Debug` output of `BufWriter`. +- Fixed a bug in `Stream::throttle` that made it consume too much CPU. # [1.1.0] - 2019-11-21 From 4ed15d67c9c361c2fa576ac6cf89c98e5c3ed4b4 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 27 Nov 2019 16:12:31 +0100 Subject: [PATCH 16/18] Fix links in the changelog --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 533dcaf3..e637d4d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] -# [1.2.0] - 2019-11-28 +# [1.2.0] - 2019-11-27 [API Documentation](https://docs.rs/async-std/1.2.0/async-std) @@ -553,7 +553,8 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v1.1.0...HEAD +[Unreleased]: https://github.com/async-rs/async-std/compare/v1.2.0...HEAD +[1.2.0]: https://github.com/async-rs/async-std/compare/v1.1.0...v1.2.0 [1.1.0]: https://github.com/async-rs/async-std/compare/v1.0.1...v1.1.0 [1.0.1]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1 [1.0.0]: https://github.com/async-rs/async-std/compare/v0.99.12...v1.0.0 From 9627826756b658bbbcca3323d5b3865d2d040646 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 27 Nov 2019 16:14:43 +0100 Subject: [PATCH 17/18] Bump the version to 1.2.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 9df05660..7c4613b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "1.1.0" +version = "1.2.0" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", From bf9ee8881542cc4b8e06e07145f3fd5ae2807216 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Wed, 27 Nov 2019 16:29:29 +0100 Subject: [PATCH 18/18] Fix a typo --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e637d4d7..fe32794c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,7 +26,7 @@ Unzip a stream of tuples into two collections: use async_std::prelude::*; use async_std::stream; -let s = stream::from_iter(vec![(1,2), (3,4)]); +let s = stream::from_iter(vec![(1,2), (3,4)]); let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;