diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md new file mode 100644 index 0000000..708d202 --- /dev/null +++ b/.github/CONTRIBUTING.md @@ -0,0 +1,3 @@ +Our contribution policy can be found at [async.rs/contribute][policy]. + +[policy]: https://async.rs/contribute/ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1d14e21..27f0318 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,7 @@ on: pull_request: push: branches: + - master - staging - trying diff --git a/CHANGELOG.md b/CHANGELOG.md index 7977be7..fe32794 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,49 @@ and this project adheres to [Semantic Versioning](https://book.async.rs/overview ## [Unreleased] +# [1.2.0] - 2019-11-27 + +[API Documentation](https://docs.rs/async-std/1.2.0/async-std) + +This patch includes some minor quality-of-life improvements, introduces a +new `Stream::unzip` API, and adds verbose errors to our networking types. + +This means if you can't connect to a socket, you'll never have to wonder again +*which* address it was you couldn't connect to, instead of having to go through +the motions to debug what the address was. + +## Example + +Unzip a stream of tuples into two collections: + +```rust +use async_std::prelude::*; +use async_std::stream; + +let s = stream::from_iter(vec![(1,2), (3,4)]); + +let (left, right): (Vec<_>, Vec<_>) = s.unzip().await; + +assert_eq!(left, [1, 3]); +assert_eq!(right, [2, 4]); +``` + +## Added + +- Added `Stream::unzip` as "unstable". +- Added verbose errors to the networking types. + +## Changed + +- Enabled CI on master branch. +- `Future::join` and `Future::try_join` can now join futures with different + output types. + +## Fixed + +- Fixed the docs and `Debug` output of `BufWriter`. +- Fixed a bug in `Stream::throttle` that made it consume too much CPU. + # [1.1.0] - 2019-11-21 [API Documentation](https://docs.rs/async-std/1.1.0/async-std) @@ -510,7 +553,8 @@ task::blocking(async { - Initial beta release -[Unreleased]: https://github.com/async-rs/async-std/compare/v1.1.0...HEAD +[Unreleased]: https://github.com/async-rs/async-std/compare/v1.2.0...HEAD +[1.2.0]: https://github.com/async-rs/async-std/compare/v1.1.0...v1.2.0 [1.1.0]: https://github.com/async-rs/async-std/compare/v1.0.1...v1.1.0 [1.0.1]: https://github.com/async-rs/async-std/compare/v1.0.0...v1.0.1 [1.0.0]: https://github.com/async-rs/async-std/compare/v0.99.12...v1.0.0 diff --git a/Cargo.toml b/Cargo.toml index 9df0566..7c4613b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "async-std" -version = "1.1.0" +version = "1.2.0" authors = [ "Stjepan Glavina ", "Yoshua Wuyts ", diff --git a/src/fs/canonicalize.rs b/src/fs/canonicalize.rs index 6eb6977..38a5a6b 100644 --- a/src/fs/canonicalize.rs +++ b/src/fs/canonicalize.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::{Path, PathBuf}; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Returns the canonical form of a path. /// @@ -32,5 +33,10 @@ use crate::task::spawn_blocking; /// ``` pub async fn canonicalize>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::canonicalize(&path).map(Into::into)).await + spawn_blocking(move || { + std::fs::canonicalize(&path) + .map(Into::into) + .context(|| format!("could not canonicalize `{}`", path.display())) + }) + .await } diff --git a/src/fs/copy.rs b/src/fs/copy.rs index 170b66e..8fb447b 100644 --- a/src/fs/copy.rs +++ b/src/fs/copy.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Copies the contents and permissions of a file to a new location. /// @@ -41,5 +42,9 @@ use crate::task::spawn_blocking; pub async fn copy, Q: AsRef>(from: P, to: Q) -> io::Result { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - spawn_blocking(move || std::fs::copy(&from, &to)).await + spawn_blocking(move || { + std::fs::copy(&from, &to) + .context(|| format!("could not copy `{}` to `{}`", from.display(), to.display())) + }) + .await } diff --git a/src/fs/create_dir.rs b/src/fs/create_dir.rs index 03c2491..37923c0 100644 --- a/src/fs/create_dir.rs +++ b/src/fs/create_dir.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Creates a new directory. /// @@ -34,5 +35,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn create_dir>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::create_dir(path)).await + spawn_blocking(move || { + std::fs::create_dir(&path) + .context(|| format!("could not create directory `{}`", path.display())) + }) + .await } diff --git a/src/fs/create_dir_all.rs b/src/fs/create_dir_all.rs index 1524194..753dfd4 100644 --- a/src/fs/create_dir_all.rs +++ b/src/fs/create_dir_all.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Creates a new directory and all of its parents if they are missing. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn create_dir_all>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::create_dir_all(path)).await + spawn_blocking(move || { + std::fs::create_dir_all(&path) + .context(|| format!("could not create directory path `{}`", path.display())) + }) + .await } diff --git a/src/fs/file.rs b/src/fs/file.rs index 94e2989..7fe99ee 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -9,11 +9,11 @@ use std::sync::{Arc, Mutex}; use crate::fs::{Metadata, Permissions}; use crate::future; -use crate::utils::Context as _; use crate::io::{self, Read, Seek, SeekFrom, Write}; use crate::path::Path; use crate::prelude::*; use crate::task::{self, spawn_blocking, Context, Poll, Waker}; +use crate::utils::Context as _; /// An open file on the filesystem. /// @@ -114,8 +114,7 @@ impl File { pub async fn open>(path: P) -> io::Result { let path = path.as_ref().to_owned(); let file = spawn_blocking(move || { - std::fs::File::open(&path) - .context(|| format!("Could not open {}", path.display())) + std::fs::File::open(&path).context(|| format!("could not open `{}`", path.display())) }) .await?; Ok(File::new(file, true)) @@ -154,7 +153,7 @@ impl File { let path = path.as_ref().to_owned(); let file = spawn_blocking(move || { std::fs::File::create(&path) - .context(|| format!("Could not create {}", path.display())) + .context(|| format!("could not create `{}`", path.display())) }) .await?; Ok(File::new(file, true)) diff --git a/src/fs/hard_link.rs b/src/fs/hard_link.rs index e6e56cd..a6a4069 100644 --- a/src/fs/hard_link.rs +++ b/src/fs/hard_link.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Creates a hard link on the filesystem. /// @@ -32,5 +33,14 @@ use crate::task::spawn_blocking; pub async fn hard_link, Q: AsRef>(from: P, to: Q) -> io::Result<()> { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - spawn_blocking(move || std::fs::hard_link(&from, &to)).await + spawn_blocking(move || { + std::fs::hard_link(&from, &to).context(|| { + format!( + "could not create a hard link from `{}` to `{}`", + from.display(), + to.display() + ) + }) + }) + .await } diff --git a/src/fs/read.rs b/src/fs/read.rs index ab7d175..3b568f7 100644 --- a/src/fs/read.rs +++ b/src/fs/read.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Reads the entire contents of a file as raw bytes. /// @@ -36,5 +37,8 @@ use crate::task::spawn_blocking; /// ``` pub async fn read>(path: P) -> io::Result> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read(path)).await + spawn_blocking(move || { + std::fs::read(&path).context(|| format!("could not read file `{}`", path.display())) + }) + .await } diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index 5e51065..d8261a9 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -1,11 +1,12 @@ -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; use crate::fs::DirEntry; use crate::io; use crate::path::Path; use crate::stream::Stream; use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; +use crate::utils::Context as _; /// Returns a stream of entries in a directory. /// @@ -45,9 +46,12 @@ use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; /// ``` pub async fn read_dir>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read_dir(path)) - .await - .map(ReadDir::new) + spawn_blocking(move || { + std::fs::read_dir(&path) + .context(|| format!("could not read directory `{}`", path.display())) + }) + .await + .map(ReadDir::new) } /// A stream of entries in a directory. diff --git a/src/fs/read_link.rs b/src/fs/read_link.rs index 7ec18a4..d8cabb7 100644 --- a/src/fs/read_link.rs +++ b/src/fs/read_link.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::{Path, PathBuf}; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Reads a symbolic link and returns the path it points to. /// @@ -28,5 +29,10 @@ use crate::task::spawn_blocking; /// ``` pub async fn read_link>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read_link(path).map(Into::into)).await + spawn_blocking(move || { + std::fs::read_link(&path) + .map(Into::into) + .context(|| format!("could not read link `{}`", path.display())) + }) + .await } diff --git a/src/fs/read_to_string.rs b/src/fs/read_to_string.rs index d06aa61..2378aae 100644 --- a/src/fs/read_to_string.rs +++ b/src/fs/read_to_string.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Reads the entire contents of a file as a string. /// @@ -37,5 +38,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn read_to_string>(path: P) -> io::Result { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::read_to_string(path)).await + spawn_blocking(move || { + std::fs::read_to_string(&path) + .context(|| format!("could not read file `{}`", path.display())) + }) + .await } diff --git a/src/fs/remove_dir.rs b/src/fs/remove_dir.rs index 1a62db2..8fdba18 100644 --- a/src/fs/remove_dir.rs +++ b/src/fs/remove_dir.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Removes an empty directory. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn remove_dir>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::remove_dir(path)).await + spawn_blocking(move || { + std::fs::remove_dir(&path) + .context(|| format!("could not remove directory `{}`", path.display())) + }) + .await } diff --git a/src/fs/remove_dir_all.rs b/src/fs/remove_dir_all.rs index 3366740..d4bad3a 100644 --- a/src/fs/remove_dir_all.rs +++ b/src/fs/remove_dir_all.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Removes a directory and all of its contents. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn remove_dir_all>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::remove_dir_all(path)).await + spawn_blocking(move || { + std::fs::remove_dir_all(&path) + .context(|| format!("could not remove directory `{}`", path.display())) + }) + .await } diff --git a/src/fs/remove_file.rs b/src/fs/remove_file.rs index 9a74ec1..b881f8b 100644 --- a/src/fs/remove_file.rs +++ b/src/fs/remove_file.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Removes a file. /// @@ -29,5 +30,9 @@ use crate::task::spawn_blocking; /// ``` pub async fn remove_file>(path: P) -> io::Result<()> { let path = path.as_ref().to_owned(); - spawn_blocking(move || std::fs::remove_file(path)).await + spawn_blocking(move || { + std::fs::remove_file(&path) + .context(|| format!("could not remove file `{}`", path.display())) + }) + .await } diff --git a/src/fs/rename.rs b/src/fs/rename.rs index ed7f39c..25fc55f 100644 --- a/src/fs/rename.rs +++ b/src/fs/rename.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Renames a file or directory to a new location. /// @@ -34,5 +35,14 @@ use crate::task::spawn_blocking; pub async fn rename, Q: AsRef>(from: P, to: Q) -> io::Result<()> { let from = from.as_ref().to_owned(); let to = to.as_ref().to_owned(); - spawn_blocking(move || std::fs::rename(&from, &to)).await + spawn_blocking(move || { + std::fs::rename(&from, &to).context(|| { + format!( + "could not rename `{}` to `{}`", + from.display(), + to.display() + ) + }) + }) + .await } diff --git a/src/fs/write.rs b/src/fs/write.rs index 4e5d20b..7c14098 100644 --- a/src/fs/write.rs +++ b/src/fs/write.rs @@ -1,6 +1,7 @@ use crate::io; use crate::path::Path; use crate::task::spawn_blocking; +use crate::utils::Context as _; /// Writes a slice of bytes as the new contents of a file. /// @@ -33,5 +34,9 @@ use crate::task::spawn_blocking; pub async fn write, C: AsRef<[u8]>>(path: P, contents: C) -> io::Result<()> { let path = path.as_ref().to_owned(); let contents = contents.as_ref().to_owned(); - spawn_blocking(move || std::fs::write(path, contents)).await + spawn_blocking(move || { + std::fs::write(&path, contents) + .context(|| format!("could not write to file `{}`", path.display())) + }) + .await } diff --git a/src/future/future/join.rs b/src/future/future/join.rs index 5cfbd99..0febcad 100644 --- a/src/future/future/join.rs +++ b/src/future/future/join.rs @@ -12,7 +12,7 @@ pin_project! { pub struct Join where L: Future, - R: Future + R: Future, { #[pin] left: MaybeDone, #[pin] right: MaybeDone, @@ -22,7 +22,7 @@ pin_project! { impl Join where L: Future, - R: Future, + R: Future, { pub(crate) fn new(left: L, right: R) -> Self { Self { @@ -35,7 +35,7 @@ where impl Future for Join where L: Future, - R: Future, + R: Future, { type Output = (L::Output, R::Output); diff --git a/src/future/future/mod.rs b/src/future/future/mod.rs index fc800cb..3be9343 100644 --- a/src/future/future/mod.rs +++ b/src/future/future/mod.rs @@ -290,10 +290,10 @@ extension_trait! { use async_std::future; let a = future::ready(1u8); - let b = future::ready(2u8); + let b = future::ready(2u16); let f = a.join(b); - assert_eq!(f.await, (1u8, 2u8)); + assert_eq!(f.await, (1u8, 2u16)); # }); ``` "#] @@ -305,7 +305,7 @@ extension_trait! { ) -> impl Future::Output, ::Output)> [Join] where Self: std::future::Future + Sized, - F: std::future::Future::Output>, + F: std::future::Future, { Join::new(self, other) } @@ -329,30 +329,30 @@ extension_trait! { use async_std::prelude::*; use async_std::future; - let a = future::ready(Err("Error")); + let a = future::ready(Err::("Error")); let b = future::ready(Ok(1u8)); let f = a.try_join(b); assert_eq!(f.await, Err("Error")); let a = future::ready(Ok::(1u8)); - let b = future::ready(Ok::(2u8)); + let b = future::ready(Ok::(2u16)); let f = a.try_join(b); - assert_eq!(f.await, Ok((1u8, 2u8))); + assert_eq!(f.await, Ok((1u8, 2u16))); # # Ok(()) }) } ``` "#] #[cfg(any(feature = "unstable", feature = "docs"))] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn try_join( + fn try_join( self, other: F - ) -> impl Future> [TryJoin] + ) -> impl Future> [TryJoin] where - Self: std::future::Future> + Sized, - F: std::future::Future::Output>, + Self: std::future::Future> + Sized, + F: std::future::Future>, { TryJoin::new(self, other) } diff --git a/src/future/future/try_join.rs b/src/future/future/try_join.rs index 58ae6d6..f166724 100644 --- a/src/future/future/try_join.rs +++ b/src/future/future/try_join.rs @@ -12,7 +12,7 @@ pin_project! { pub struct TryJoin where L: Future, - R: Future + R: Future, { #[pin] left: MaybeDone, #[pin] right: MaybeDone, @@ -22,7 +22,7 @@ pin_project! { impl TryJoin where L: Future, - R: Future, + R: Future, { pub(crate) fn new(left: L, right: R) -> Self { Self { @@ -32,12 +32,12 @@ where } } -impl Future for TryJoin +impl Future for TryJoin where - L: Future>, - R: Future, + L: Future>, + R: Future>, { - type Output = Result<(T, T), E>; + type Output = Result<(A, B), E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.project(); diff --git a/src/io/buf_writer.rs b/src/io/buf_writer.rs index ce6a97b..c527d02 100644 --- a/src/io/buf_writer.rs +++ b/src/io/buf_writer.rs @@ -22,14 +22,14 @@ pin_project! { /// times. It also provides no advantage when writing to a destination that is /// in memory, like a `Vec`. /// - /// When the `BufWriter` is dropped, the contents of its buffer will be written - /// out. However, any errors that happen in the process of flushing the buffer - /// when the writer is dropped will be ignored. Code that wishes to handle such - /// errors must manually call [`flush`] before the writer is dropped. + /// Unlike the `BufWriter` type in `std`, this type does not write out the + /// contents of its buffer when it is dropped. Therefore, it is absolutely + /// critical that users explicitly flush the buffer before dropping a + /// `BufWriter`. /// - /// This type is an async version of [`std::io::BufReader`]. + /// This type is an async version of [`std::io::BufWriter`]. /// - /// [`std::io::BufReader`]: https://doc.rust-lang.org/std/io/struct.BufReader.html + /// [`std::io::BufWriter`]: https://doc.rust-lang.org/std/io/struct.BufWriter.html /// /// # Examples /// @@ -61,10 +61,13 @@ pin_project! { /// use async_std::prelude::*; /// /// let mut stream = BufWriter::new(TcpStream::connect("127.0.0.1:34254").await?); + /// /// for i in 0..10 { /// let arr = [i+1]; /// stream.write(&arr).await?; /// }; + /// + /// stream.flush().await?; /// # /// # Ok(()) }) } /// ``` @@ -325,7 +328,7 @@ impl Write for BufWriter { impl fmt::Debug for BufWriter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BufReader") + f.debug_struct("BufWriter") .field("writer", &self.inner) .field("buf", &self.buf) .finish() diff --git a/src/io/copy.rs b/src/io/copy.rs index 8ec3c1a..f05ed0e 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -1,10 +1,11 @@ -use std::pin::Pin; use std::future::Future; +use std::pin::Pin; use pin_project_lite::pin_project; use crate::io::{self, BufRead, BufReader, Read, Write}; use crate::task::{Context, Poll}; +use crate::utils::Context as _; /// Copies the entire contents of a reader into a writer. /// @@ -90,7 +91,7 @@ where writer, amt: 0, }; - future.await + future.await.context(|| String::from("io::copy failed")) } /// Copies the entire contents of a reader into a writer. @@ -177,5 +178,5 @@ where writer, amt: 0, }; - future.await + future.await.context(|| String::from("io::copy failed")) } diff --git a/src/io/stdin.rs b/src/io/stdin.rs index 167ea2d..369ccae 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -1,10 +1,11 @@ +use std::future::Future; use std::pin::Pin; use std::sync::Mutex; -use std::future::Future; use crate::future; use crate::io::{self, Read}; use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; +use crate::utils::Context as _; cfg_unstable! { use once_cell::sync::Lazy; @@ -162,6 +163,7 @@ impl Stdin { } }) .await + .context(|| String::from("could not read line on stdin")) } /// Locks this handle to the standard input stream, returning a readable guard. diff --git a/src/net/addr.rs b/src/net/addr.rs index 2769dd5..ea83950 100644 --- a/src/net/addr.rs +++ b/src/net/addr.rs @@ -1,11 +1,12 @@ +use std::future::Future; use std::mem; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::pin::Pin; -use std::future::Future; use crate::io; use crate::task::{spawn_blocking, Context, JoinHandle, Poll}; +use crate::utils::Context as ErrorContext; cfg_not_docs! { macro_rules! ret { @@ -67,6 +68,18 @@ pub enum ToSocketAddrsFuture { Done, } +/// Wrap `std::io::Error` with additional message +/// +/// Keeps the original error kind and stores the original I/O error as `source`. +impl ErrorContext for ToSocketAddrsFuture { + fn context(self, message: impl Fn() -> String) -> Self { + match self { + ToSocketAddrsFuture::Ready(res) => ToSocketAddrsFuture::Ready(res.context(message)), + x => x, + } + } +} + impl> Future for ToSocketAddrsFuture { type Output = io::Result; @@ -110,7 +123,9 @@ impl ToSocketAddrs for SocketAddrV4 { impl Future, ToSocketAddrsFuture ) { - SocketAddr::V4(*self).to_socket_addrs() + SocketAddr::V4(*self) + .to_socket_addrs() + .context(|| format!("could not resolve address `{}`", self)) } } @@ -123,7 +138,9 @@ impl ToSocketAddrs for SocketAddrV6 { impl Future, ToSocketAddrsFuture ) { - SocketAddr::V6(*self).to_socket_addrs() + SocketAddr::V6(*self) + .to_socket_addrs() + .context(|| format!("could not resolve address `{}`", self)) } } @@ -195,7 +212,9 @@ impl ToSocketAddrs for (&str, u16) { let host = host.to_string(); let task = spawn_blocking(move || { - std::net::ToSocketAddrs::to_socket_addrs(&(host.as_str(), port)) + let addr = (host.as_str(), port); + std::net::ToSocketAddrs::to_socket_addrs(&addr) + .context(|| format!("could not resolve address `{:?}`", addr)) }); ToSocketAddrsFuture::Resolving(task) } @@ -215,7 +234,10 @@ impl ToSocketAddrs for str { } let addr = self.to_string(); - let task = spawn_blocking(move || std::net::ToSocketAddrs::to_socket_addrs(addr.as_str())); + let task = spawn_blocking(move || { + std::net::ToSocketAddrs::to_socket_addrs(addr.as_str()) + .context(|| format!("could not resolve address `{:?}`", addr)) + }); ToSocketAddrsFuture::Resolving(task) } } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index f98bbdc..fe06a96 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,5 +1,5 @@ -use std::net::SocketAddr; use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use crate::future; @@ -75,8 +75,11 @@ impl TcpListener { /// [`local_addr`]: #method.local_addr pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await?; - for addr in addrs.to_socket_addrs().await? { + for addr in addrs { match mio::net::TcpListener::bind(&addr) { Ok(mio_listener) => { return Ok(TcpListener { diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 1da9c7c..4131783 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -7,6 +7,7 @@ use crate::io::{self, Read, Write}; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; use crate::task::{spawn_blocking, Context, Poll}; +use crate::utils::Context as _; /// A TCP stream between a local and a remote socket. /// @@ -71,11 +72,16 @@ impl TcpStream { /// ``` pub async fn connect(addrs: A) -> io::Result { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await?; - for addr in addrs.to_socket_addrs().await? { + for addr in addrs { let res = spawn_blocking(move || { - let std_stream = std::net::TcpStream::connect(addr)?; - let mio_stream = mio::net::TcpStream::from_stream(std_stream)?; + let std_stream = std::net::TcpStream::connect(addr) + .context(|| format!("could not connect to {}", addr))?; + let mio_stream = mio::net::TcpStream::from_stream(std_stream) + .context(|| format!("could not open async connection to {}", addr))?; Ok(TcpStream { watcher: Watcher::new(mio_stream), }) diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 37c9d50..7fef1ed 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -5,6 +5,7 @@ use std::net::{Ipv4Addr, Ipv6Addr}; use crate::future; use crate::net::driver::Watcher; use crate::net::ToSocketAddrs; +use crate::utils::Context as _; /// A UDP socket. /// @@ -66,10 +67,13 @@ impl UdpSocket { /// # /// # Ok(()) }) } /// ``` - pub async fn bind(addr: A) -> io::Result { + pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await?; - for addr in addr.to_socket_addrs().await? { + for addr in addrs { match mio::net::UdpSocket::bind(&addr) { Ok(mio_socket) => { return Ok(UdpSocket { @@ -106,7 +110,10 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub fn local_addr(&self) -> io::Result { - self.watcher.get_ref().local_addr() + self.watcher + .get_ref() + .local_addr() + .context(|| String::from("could not get local address")) } /// Sends data on the socket to the given address. @@ -151,6 +158,7 @@ impl UdpSocket { .poll_write_with(cx, |inner| inner.send_to(buf, &addr)) }) .await + .context(|| format!("could not send packet to {}", addr)) } /// Receives data from the socket. @@ -178,6 +186,17 @@ impl UdpSocket { .poll_read_with(cx, |inner| inner.recv_from(buf)) }) .await + .context(|| { + use std::fmt::Write; + + let mut error = String::from("could not receive data on "); + if let Ok(addr) = self.local_addr() { + let _ = write!(&mut error, "{}", addr); + } else { + error.push_str("socket"); + } + error + }) } /// Connects the UDP socket to a remote address. @@ -195,7 +214,7 @@ impl UdpSocket { /// ```no_run /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async { /// # - /// use async_std::net::UdpSocket; + /// use async_std::net::UdpSocket; /// /// let socket = UdpSocket::bind("127.0.0.1:0").await?; /// socket.connect("127.0.0.1:8080").await?; @@ -204,8 +223,12 @@ impl UdpSocket { /// ``` pub async fn connect(&self, addrs: A) -> io::Result<()> { let mut last_err = None; + let addrs = addrs + .to_socket_addrs() + .await + .context(|| String::from("could not resolve addresses"))?; - for addr in addrs.to_socket_addrs().await? { + for addr in addrs { // TODO(stjepang): connect on the blocking pool match self.watcher.get_ref().connect(addr) { Ok(()) => return Ok(()), @@ -248,7 +271,19 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { - future::poll_fn(|cx| self.watcher.poll_write_with(cx, |inner| inner.send(buf))).await + future::poll_fn(|cx| self.watcher.poll_write_with(cx, |inner| inner.send(buf))) + .await + .context(|| { + use std::fmt::Write; + + let mut error = String::from("could not send data on "); + if let Ok(addr) = self.local_addr() { + let _ = write!(&mut error, "{}", addr); + } else { + error.push_str("socket"); + } + error + }) } /// Receives data from the socket. @@ -271,7 +306,19 @@ impl UdpSocket { /// # Ok(()) }) } /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { - future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.recv(buf))).await + future::poll_fn(|cx| self.watcher.poll_read_with(cx, |inner| inner.recv(buf))) + .await + .context(|| { + use std::fmt::Write; + + let mut error = String::from("could not receive data on "); + if let Ok(addr) = self.local_addr() { + let _ = write!(&mut error, "{}", addr); + } else { + error.push_str("socket"); + } + error + }) } /// Gets the value of the `SO_BROADCAST` option for this socket. @@ -415,7 +462,7 @@ impl UdpSocket { /// use async_std::net::UdpSocket; /// /// let socket_addr = SocketAddr::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 0).into(), 0); - /// let mdns_addr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x0123) ; + /// let mdns_addr = Ipv6Addr::new(0xFF02, 0, 0, 0, 0, 0, 0, 0x0123); /// let socket = UdpSocket::bind(&socket_addr).await?; /// /// socket.join_multicast_v6(&mdns_addr, 0)?; diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d140228..f7a1ba2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -123,6 +123,7 @@ cfg_unstable! { use count::CountFuture; use partition::PartitionFuture; + use unzip::UnzipFuture; pub use merge::Merge; pub use flatten::Flatten; @@ -137,6 +138,7 @@ cfg_unstable! { mod partition; mod timeout; mod throttle; + mod unzip; } extension_trait! { @@ -334,28 +336,19 @@ extension_trait! { let start = Instant::now(); // emit value every 5 milliseconds - let s = stream::interval(Duration::from_millis(5)) - .enumerate() - .take(3); + let s = stream::interval(Duration::from_millis(5)).take(2); // throttle for 10 milliseconds let mut s = s.throttle(Duration::from_millis(10)); - assert_eq!(s.next().await, Some((0, ()))); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 5); + s.next().await; + assert!(start.elapsed().as_millis() >= 5); - assert_eq!(s.next().await, Some((1, ()))); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 15); + s.next().await; + assert!(start.elapsed().as_millis() >= 15); - assert_eq!(s.next().await, Some((2, ()))); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 25); - - assert_eq!(s.next().await, None); - let duration_ms = start.elapsed().as_millis(); - assert!(duration_ms >= 35); + s.next().await; + assert!(start.elapsed().as_millis() >= 35); # # }) } ``` @@ -1748,6 +1741,44 @@ extension_trait! { Zip::new(self, other) } + #[doc = r#" + Converts an stream of pairs into a pair of containers. + + `unzip()` consumes an entire stream of pairs, producing two collections: one from the left elements of the pairs, and one from the right elements. + + This function is, in some sense, the opposite of [`zip`]. + + [`zip`]: trait.Stream.html#method.zip + + # Example + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + + let s = stream::from_iter(vec![(1,2), (3,4)]); + + let (left, right): (Vec<_>, Vec<_>) = s.unzip().await; + + assert_eq!(left, [1, 3]); + assert_eq!(right, [2, 4]); + # + # }) } + ``` + "#] + #[cfg(feature = "unstable")] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn unzip(self) -> impl Future [UnzipFuture] + where + FromA: Default + Extend, + FromB: Default + Extend, + Self: Stream + Sized, + { + UnzipFuture::new(self) + } + #[doc = r#" Transforms a stream into a collection. diff --git a/src/stream/stream/throttle.rs b/src/stream/stream/throttle.rs index b2480bb..ce8c13b 100644 --- a/src/stream/stream/throttle.rs +++ b/src/stream/stream/throttle.rs @@ -55,10 +55,7 @@ impl Stream for Throttle { } match this.stream.poll_next(cx) { - Poll::Pending => { - cx.waker().wake_by_ref(); // Continue driving even though emitting Pending - Poll::Pending - } + Poll::Pending => Poll::Pending, Poll::Ready(None) => Poll::Ready(None), Poll::Ready(Some(v)) => { *this.blocked = true; diff --git a/src/stream/stream/unzip.rs b/src/stream/stream/unzip.rs new file mode 100644 index 0000000..e0832ff --- /dev/null +++ b/src/stream/stream/unzip.rs @@ -0,0 +1,57 @@ +use std::future::Future; +use std::pin::Pin; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[derive(Clone, Debug)] + #[cfg(all(feature = "default", feature = "unstable"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + pub struct UnzipFuture { + #[pin] + stream: S, + res: Option<(FromA, FromB)>, + } +} + +impl UnzipFuture +where + FromA: Default, + FromB: Default, +{ + pub(super) fn new(stream: S) -> Self { + UnzipFuture { + stream, + res: Some((FromA::default(), FromB::default())), + } + } +} + +impl Future for UnzipFuture +where + S: Stream, + FromA: Default + Extend, + FromB: Default + Extend, +{ + type Output = (FromA, FromB); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + loop { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + + match next { + Some((a, b)) => { + let res = this.res.as_mut().unwrap(); + res.0.extend(Some(a)); + res.1.extend(Some(b)); + } + None => return Poll::Ready(this.res.take().unwrap()), + } + } + } +} diff --git a/tests/verbose_errors.rs b/tests/verbose_errors.rs index 54d04f8..1563092 100644 --- a/tests/verbose_errors.rs +++ b/tests/verbose_errors.rs @@ -1,4 +1,4 @@ -use async_std::{fs, task}; +use async_std::{fs, io, net::ToSocketAddrs, task}; #[test] fn open_file() { @@ -8,7 +8,22 @@ fn open_file() { match res { Ok(_) => panic!("Found file with random name: We live in a simulation"), Err(e) => assert_eq!( - "Could not open /ashjudlkahasdasdsikdhajik/asdasdasdasdasdasd/fjuiklashdbflasas", + "Could not open `/ashjudlkahasdasdsikdhajik/asdasdasdasdasdasd/fjuiklashdbflasas`", + &format!("{}", e) + ), + } + }) +} + +#[test] +fn resolve_address() { + task::block_on(async { + let non_existing_addr = "ashjudlkahasdasdsikdhajik.asdasdasdasdasdasd.fjuiklashdbflasas:80"; + let res: Result<_, io::Error> = non_existing_addr.to_socket_addrs().await; + match res { + Ok(_) => panic!("Found address with random name: We live in a simulation"), + Err(e) => assert_eq!( + "could not resolve address `\"ashjudlkahasdasdsikdhajik.asdasdasdasdasdasd.fjuiklashdbflasas:80\"`", &format!("{}", e) ), }