Merge remote-tracking branch 'upstream/master' into 342-stream-throttle

new-scheduler
Wouter Geraedts 5 years ago
commit ced5281b73

@ -41,7 +41,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all --doc --features unstable
args: --all --features unstable
check_fmt_and_docs:
name: Checking fmt and docs

@ -21,15 +21,15 @@ features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]
[features]
docs = ["broadcaster"]
docs = ["unstable"]
unstable = ["broadcaster"]
[dependencies]
async-macros = "1.0.0"
async-task = "1.0.0"
cfg-if = "0.1.9"
crossbeam-channel = "0.3.9"
crossbeam-deque = "0.7.1"
crossbeam-utils = "0.6.6"
futures-core-preview = "=0.3.0-alpha.19"
futures-io-preview = "=0.3.0-alpha.19"
futures-timer = "1.0.2"

@ -1,7 +1,5 @@
use std::future::Future;
use cfg_if::cfg_if;
use crate::io;
use crate::path::Path;
use crate::task::blocking;
@ -113,22 +111,13 @@ impl DirBuilder {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::fs::DirBuilderExt;
} else if #[cfg(unix)] {
use std::os::unix::fs::DirBuilderExt;
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl DirBuilderExt for DirBuilder {
fn mode(&mut self, mode: u32) -> &mut Self {
self.mode = Some(mode);
self
}
}
}
}

@ -2,8 +2,6 @@ use std::ffi::OsString;
use std::fmt;
use std::sync::Arc;
use cfg_if::cfg_if;
use crate::fs::{FileType, Metadata};
use crate::io;
use crate::path::PathBuf;
@ -160,21 +158,12 @@ impl fmt::Debug for DirEntry {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::fs::DirEntryExt;
} else if #[cfg(unix)] {
use std::os::unix::fs::DirEntryExt;
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl DirEntryExt for DirEntry {
fn ino(&self) -> u64 {
self.0.ino()
}
}
}
}

@ -7,8 +7,6 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use cfg_if::cfg_if;
use crate::fs::{Metadata, Permissions};
use crate::future;
use crate::io::{self, Read, Seek, SeekFrom, Write};
@ -401,20 +399,9 @@ impl From<std::fs::File> for File {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
} else if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
} else if #[cfg(windows)] {
use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for File {
fn as_raw_fd(&self) -> RawFd {
self.file.as_raw_fd()
@ -436,12 +423,11 @@ cfg_if! {
.into_raw_fd()
}
}
}
}
#[cfg_attr(feature = "docs", doc(cfg(windows)))]
cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
impl AsRawHandle for File {
fn as_raw_handle(&self) -> RawHandle {
self.file.as_raw_handle()
@ -463,7 +449,6 @@ cfg_if! {
.into_raw_handle()
}
}
}
}
/// An async mutex with non-borrowing lock guards.

@ -1,7 +1,8 @@
use cfg_if::cfg_if;
cfg_not_docs! {
pub use std::fs::FileType;
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_docs! {
/// The type of a file or directory.
///
/// A file type is returned by [`Metadata::file_type`].
@ -80,7 +81,4 @@ cfg_if! {
unimplemented!()
}
}
} else {
pub use std::fs::FileType;
}
}

@ -1,5 +1,3 @@
use cfg_if::cfg_if;
use crate::io;
use crate::path::Path;
use crate::task::blocking;
@ -39,8 +37,11 @@ pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
blocking::spawn(move || std::fs::metadata(path)).await
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_not_docs! {
pub use std::fs::Metadata;
}
cfg_docs! {
use std::time::SystemTime;
use crate::fs::{FileType, Permissions};
@ -225,7 +226,4 @@ cfg_if! {
unimplemented!()
}
}
} else {
pub use std::fs::Metadata;
}
}

@ -1,7 +1,5 @@
use std::future::Future;
use cfg_if::cfg_if;
use crate::fs::File;
use crate::io;
use crate::path::Path;
@ -296,17 +294,9 @@ impl Default for OpenOptions {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::fs::OpenOptionsExt;
} else if #[cfg(unix)] {
use std::os::unix::fs::OpenOptionsExt;
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl OpenOptionsExt for OpenOptions {
fn mode(&mut self, mode: u32) -> &mut Self {
self.0.mode(mode);
@ -318,5 +308,4 @@ cfg_if! {
self
}
}
}
}

@ -1,7 +1,8 @@
use cfg_if::cfg_if;
cfg_not_docs! {
pub use std::fs::Permissions;
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_docs! {
/// A set of permissions on a file or directory.
///
/// This type is a re-export of [`std::fs::Permissions`].
@ -52,7 +53,4 @@ cfg_if! {
unimplemented!()
}
}
} else {
pub use std::fs::Permissions;
}
}

@ -1,15 +1,9 @@
use crate::utils::extension_trait;
cfg_if::cfg_if! {
if #[cfg(feature = "docs")] {
extension_trait! {
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use crate::task::{Context, Poll};
}
}
extension_trait! {
#[doc = r#"
A future represents an asynchronous computation.

@ -30,7 +30,7 @@ use crate::future::Future;
/// }
/// }
/// ```
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait IntoFuture {
/// The type of value produced on completion.

@ -44,12 +44,6 @@
#[doc(inline)]
pub use async_macros::{join, try_join};
#[doc(inline)]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub use async_macros::{select, try_select};
use cfg_if::cfg_if;
pub use future::Future;
pub use pending::pending;
pub use poll_fn::poll_fn;
@ -62,10 +56,10 @@ mod poll_fn;
mod ready;
mod timeout;
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
mod into_future;
cfg_unstable! {
#[doc(inline)]
pub use async_macros::{select, try_select};
pub use into_future::IntoFuture;
}
mod into_future;
}

@ -12,19 +12,12 @@ use read_until::ReadUntilFuture;
use std::mem;
use std::pin::Pin;
use cfg_if::cfg_if;
use crate::io;
use crate::task::{Context, Poll};
use crate::utils::extension_trait;
cfg_if! {
if #[cfg(feature = "docs")] {
extension_trait! {
use std::ops::{Deref, DerefMut};
}
}
extension_trait! {
#[doc = r#"
Allows reading from a buffered byte stream.

@ -13,23 +13,17 @@ use read_to_end::{read_to_end_internal, ReadToEndFuture};
use read_to_string::ReadToStringFuture;
use read_vectored::ReadVectoredFuture;
use cfg_if::cfg_if;
use std::mem;
use crate::io::IoSliceMut;
use crate::utils::extension_trait;
cfg_if! {
if #[cfg(feature = "docs")] {
extension_trait! {
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use crate::io;
use crate::task::{Context, Poll};
}
}
extension_trait! {
#[doc = r#"
Allows reading from a byte stream.

@ -1,19 +1,16 @@
use std::pin::Pin;
mod seek;
use cfg_if::cfg_if;
use seek::SeekFuture;
use crate::future::Future;
use crate::io::{self, SeekFrom};
use crate::task::{Context, Poll};
use crate::utils::extension_trait;
use crate::io::SeekFrom;
cfg_if! {
if #[cfg(feature = "docs")] {
extension_trait! {
use std::ops::{Deref, DerefMut};
}
}
use std::pin::Pin;
use crate::io;
use crate::task::{Context, Poll};
extension_trait! {
#[doc = r#"
Allows seeking through a byte stream.
@ -114,19 +111,3 @@ extension_trait! {
}
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct SeekFuture<'a, T: Unpin + ?Sized> {
seeker: &'a mut T,
pos: SeekFrom,
}
impl<T: SeekExt + Unpin + ?Sized> Future for SeekFuture<'_, T> {
type Output = io::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pos = self.pos;
Pin::new(&mut *self.seeker).poll_seek(cx, pos)
}
}

@ -0,0 +1,21 @@
use std::pin::Pin;
use crate::future::Future;
use crate::io::{self, Seek, SeekFrom};
use crate::task::{Context, Poll};
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct SeekFuture<'a, T: Unpin + ?Sized> {
pub(crate) seeker: &'a mut T,
pub(crate) pos: SeekFrom,
}
impl<T: Seek + Unpin + ?Sized> Future for SeekFuture<'_, T> {
type Output = io::Result<u64>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pos = self.pos;
Pin::new(&mut *self.seeker).poll_seek(cx, pos)
}
}

@ -1,8 +1,6 @@
use std::pin::Pin;
use std::sync::Mutex;
use cfg_if::cfg_if;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, JoinHandle, Poll};
@ -162,35 +160,22 @@ impl Write for Stderr {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::io::{AsRawFd, RawFd};
use crate::os::windows::io::{AsRawHandle, RawHandle};
} else if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, RawFd};
} else if #[cfg(windows)] {
use std::os::windows::io::{AsRawHandle, RawHandle};
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stderr {
fn as_raw_fd(&self) -> RawFd {
std::io::stderr().as_raw_fd()
}
}
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, RawHandle};
impl AsRawHandle for Stderr {
fn as_raw_handle(&self) -> RawHandle {
std::io::stderr().as_raw_handle()
}
}
}
}

@ -1,8 +1,6 @@
use std::pin::Pin;
use std::sync::Mutex;
use cfg_if::cfg_if;
use crate::future::{self, Future};
use crate::io::{self, Read};
use crate::task::{blocking, Context, JoinHandle, Poll};
@ -186,35 +184,22 @@ impl Read for Stdin {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::io::{AsRawFd, RawFd};
use crate::os::windows::io::{AsRawHandle, RawHandle};
} else if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, RawFd};
} else if #[cfg(windows)] {
use std::os::windows::io::{AsRawHandle, RawHandle};
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stdin {
fn as_raw_fd(&self) -> RawFd {
std::io::stdin().as_raw_fd()
}
}
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, RawHandle};
impl AsRawHandle for Stdin {
fn as_raw_handle(&self) -> RawHandle {
std::io::stdin().as_raw_handle()
}
}
}
}

@ -1,8 +1,6 @@
use std::pin::Pin;
use std::sync::Mutex;
use cfg_if::cfg_if;
use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, JoinHandle, Poll};
@ -162,35 +160,22 @@ impl Write for Stdout {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::io::{AsRawFd, RawFd};
use crate::os::windows::io::{AsRawHandle, RawHandle};
} else if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, RawFd};
} else if #[cfg(windows)] {
use std::os::windows::io::{AsRawHandle, RawHandle};
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for Stdout {
fn as_raw_fd(&self) -> RawFd {
std::io::stdout().as_raw_fd()
}
}
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
cfg_windows! {
use crate::os::windows::io::{AsRawHandle, RawHandle};
impl AsRawHandle for Stdout {
fn as_raw_handle(&self) -> RawHandle {
std::io::stdout().as_raw_handle()
}
}
}
}

@ -10,22 +10,14 @@ use write_all::WriteAllFuture;
use write_fmt::WriteFmtFuture;
use write_vectored::WriteVectoredFuture;
use cfg_if::cfg_if;
use crate::io::{self, IoSlice};
use crate::io::IoSlice;
use crate::utils::extension_trait;
use crate::io;
cfg_if! {
if #[cfg(feature = "docs")] {
extension_trait! {
use std::pin::Pin;
use std::ops::{Deref, DerefMut};
use crate::task::{Context, Poll};
}
}
extension_trait! {
#[doc = r#"
Allows writing to a byte stream.

@ -48,7 +48,8 @@
#![doc(html_logo_url = "https://async.rs/images/logo--hero.svg")]
#![recursion_limit = "1024"]
use cfg_if::cfg_if;
#[macro_use]
mod utils;
pub mod fs;
pub mod future;
@ -61,11 +62,8 @@ pub mod stream;
pub mod sync;
pub mod task;
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
cfg_unstable! {
pub mod pin;
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub mod process;
mod unit;
@ -74,13 +72,9 @@ cfg_if! {
mod option;
mod string;
mod collections;
}
#[doc(inline)]
pub use std::{write, writeln};
}
mod macros;
pub(crate) mod utils;
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[doc(inline)]
pub use std::{write, writeln};

@ -43,7 +43,7 @@
/// #
/// # })
/// ```
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! print {
@ -81,7 +81,7 @@ macro_rules! print {
/// #
/// # })
/// ```
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! println {
@ -119,7 +119,7 @@ macro_rules! println {
/// #
/// # })
/// ```
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! eprint {
@ -153,7 +153,7 @@ macro_rules! eprint {
/// #
/// # })
/// ```
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[macro_export]
macro_rules! eprintln {

@ -3,25 +3,23 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::pin::Pin;
use cfg_if::cfg_if;
use crate::future::Future;
use crate::io;
use crate::task::{blocking, Context, JoinHandle, Poll};
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_not_docs! {
macro_rules! ret {
(impl Future<Output = $out:ty>, $fut:ty) => ($fut);
}
}
cfg_docs! {
#[doc(hidden)]
pub struct ImplFuture<T>(std::marker::PhantomData<T>);
macro_rules! ret {
(impl Future<Output = $out:ty>, $fut:ty) => (ImplFuture<$out>);
}
} else {
macro_rules! ret {
(impl Future<Output = $out:ty>, $fut:ty) => ($fut);
}
}
}
/// Converts or resolves addresses to [`SocketAddr`] values.

@ -1,13 +1,10 @@
use std::net::SocketAddr;
use std::pin::Pin;
use cfg_if::cfg_if;
use super::TcpStream;
use crate::future::{self, Future};
use crate::io;
use crate::net::driver::Watcher;
use crate::net::ToSocketAddrs;
use crate::net::{TcpStream, ToSocketAddrs};
use crate::stream::Stream;
use crate::task::{Context, Poll};
@ -213,20 +210,9 @@ impl From<std::net::TcpListener> for TcpListener {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
// use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
} else if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
} else if #[cfg(windows)] {
// use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.watcher.get_ref().as_raw_fd()
@ -244,12 +230,11 @@ cfg_if! {
self.watcher.into_inner().into_raw_fd()
}
}
}
}
#[cfg_attr(feature = "docs", doc(cfg(windows)))]
cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
cfg_windows! {
// use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
//
// impl AsRawSocket for TcpListener {
// fn as_raw_socket(&self) -> RawSocket {
// self.raw_socket
@ -267,5 +252,4 @@ cfg_if! {
// self.raw_socket
// }
// }
}
}

@ -2,8 +2,6 @@ use std::io::{IoSlice, IoSliceMut, Read as _, Write as _};
use std::net::SocketAddr;
use std::pin::Pin;
use cfg_if::cfg_if;
use crate::future;
use crate::io::{self, Read, Write};
use crate::net::driver::Watcher;
@ -367,20 +365,9 @@ impl From<std::net::TcpStream> for TcpStream {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
// use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
} else if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
} else if #[cfg(windows)] {
// use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
self.watcher.get_ref().as_raw_fd()
@ -398,12 +385,11 @@ cfg_if! {
self.watcher.into_inner().into_raw_fd()
}
}
}
}
#[cfg_attr(feature = "docs", doc(cfg(windows)))]
cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
cfg_windows! {
// use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
//
// impl AsRawSocket for TcpStream {
// fn as_raw_socket(&self) -> RawSocket {
// self.raw_socket
@ -421,5 +407,4 @@ cfg_if! {
// self.raw_socket
// }
// }
}
}

@ -1,7 +1,5 @@
use std::io;
use std::net::SocketAddr;
use cfg_if::cfg_if;
use std::net::{Ipv4Addr, Ipv6Addr};
use crate::future;
@ -463,20 +461,9 @@ impl From<std::net::UdpSocket> for UdpSocket {
}
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_unix! {
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
// use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
} else if #[cfg(unix)] {
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
} else if #[cfg(windows)] {
// use std::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
}
}
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
cfg_if! {
if #[cfg(any(unix, feature = "docs"))] {
impl AsRawFd for UdpSocket {
fn as_raw_fd(&self) -> RawFd {
self.watcher.get_ref().as_raw_fd()
@ -494,13 +481,10 @@ cfg_if! {
self.watcher.into_inner().into_raw_fd()
}
}
}
}
#[cfg_attr(feature = "docs", doc(cfg(windows)))]
cfg_if! {
if #[cfg(any(windows, feature = "docs"))] {
// use std::os::windows::io::{AsRawSocket, FromRawSocket, IntoRawSocket, RawSocket};
cfg_windows! {
// use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
//
// impl AsRawSocket for UdpSocket {
// fn as_raw_socket(&self) -> RawSocket {
@ -519,5 +503,4 @@ cfg_if! {
// self.raw_socket
// }
// }
}
}

@ -1,9 +1,9 @@
//! OS-specific extensions.
#[cfg(any(unix, feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
pub mod unix;
cfg_unix! {
pub mod unix;
}
#[cfg(any(windows, feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(windows)))]
pub mod windows;
cfg_windows! {
pub mod windows;
}

@ -1,7 +1,5 @@
//! Unix-specific filesystem extensions.
use cfg_if::cfg_if;
use crate::io;
use crate::path::Path;
use crate::task::blocking;
@ -31,8 +29,11 @@ pub async fn symlink<P: AsRef<Path>, Q: AsRef<Path>>(src: P, dst: Q) -> io::Resu
blocking::spawn(move || std::os::unix::fs::symlink(&src, &dst)).await
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_not_docs! {
pub use std::os::unix::fs::{DirBuilderExt, DirEntryExt, OpenOptionsExt};
}
cfg_docs! {
/// Unix-specific extensions to `DirBuilder`.
pub trait DirBuilderExt {
/// Sets the mode to create new directories with. This option defaults to
@ -67,7 +68,4 @@ cfg_if! {
/// This options overwrites any previously set custom flags.
fn custom_flags(&mut self, flags: i32) -> &mut Self;
}
} else {
pub use std::os::unix::fs::{DirBuilderExt, OpenOptionsExt};
}
}

@ -1,9 +1,10 @@
//! Unix-specific I/O extensions.
use cfg_if::cfg_if;
cfg_not_docs! {
pub use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_docs! {
/// Raw file descriptors.
pub type RawFd = std::os::raw::c_int;
@ -50,7 +51,4 @@ cfg_if! {
/// and must close the descriptor once it's no longer needed.
fn into_raw_fd(self) -> RawFd;
}
} else {
pub use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
}
}

@ -1,7 +1,5 @@
//! Unix-specific networking extensions.
use cfg_if::cfg_if;
pub use datagram::UnixDatagram;
pub use listener::{Incoming, UnixListener};
pub use stream::UnixStream;
@ -10,8 +8,11 @@ mod datagram;
mod listener;
mod stream;
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_not_docs! {
pub use std::os::unix::net::SocketAddr;
}
cfg_docs! {
use std::fmt;
use crate::path::Path;
@ -93,7 +94,4 @@ cfg_if! {
unreachable!("this impl only appears in the rendered docs")
}
}
} else {
pub use std::os::unix::net::SocketAddr;
}
}

@ -1,9 +1,12 @@
//! Windows-specific I/O extensions.
use cfg_if::cfg_if;
cfg_not_docs! {
pub use std::os::windows::io::{
AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle, RawSocket,
};
}
cfg_if! {
if #[cfg(feature = "docs")] {
cfg_docs! {
/// Raw HANDLEs.
pub type RawHandle = *mut std::os::raw::c_void;
@ -42,9 +45,4 @@ cfg_if! {
/// it once it's no longer needed.
fn into_raw_handle(self) -> RawHandle;
}
} else {
pub use std::os::windows::io::{
AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle, RawSocket,
};
}
}

@ -11,8 +11,6 @@
//! use async_std::prelude::*;
//! ```
use cfg_if::cfg_if;
#[doc(no_inline)]
pub use crate::future::Future;
#[doc(no_inline)]
@ -41,12 +39,9 @@ pub use crate::io::write::WriteExt as _;
#[doc(hidden)]
pub use crate::stream::stream::StreamExt as _;
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
cfg_unstable! {
#[doc(no_inline)]
pub use crate::stream::DoubleEndedStream;
#[doc(no_inline)]
pub use crate::stream::ExactSizeStream;
}
}

@ -10,8 +10,8 @@ use std::task::{Context, Poll};
/// `Item`s from the back, as well as the front.
///
/// [`Stream`]: trait.Stream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub trait DoubleEndedStream: Stream {
/// Removes and returns an element from the end of the stream.
///

@ -76,8 +76,8 @@ pub use crate::stream::Stream;
/// # });
/// # }
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub trait ExactSizeStream: Stream {
/// Returns the exact number of times the stream will iterate.
///

@ -27,6 +27,7 @@ use crate::stream::IntoStream;
/// #
/// # }) }
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait Extend<A> {
/// Extends a collection with the contents of a stream.

@ -106,8 +106,8 @@ use std::pin::Pin;
///```
///
/// [`IntoStream`]: trait.IntoStream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub trait FromStream<T> {
/// Creates a value from a stream.
///

@ -14,7 +14,7 @@ use crate::stream::Stream;
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
/// [`Stream::fuse`]: trait.Stream.html#method.fuse
/// [`Fuse`]: struct.Fuse.html
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub trait FusedStream: Stream {}

@ -43,9 +43,8 @@ use futures_timer::Delay;
/// #
/// # Ok(()) }) }
/// ```
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[doc(inline)]
pub fn interval(dur: Duration) -> Interval {
Interval {
delay: Delay::new(dur),
@ -55,9 +54,9 @@ pub fn interval(dur: Duration) -> Interval {
/// A stream representing notifications at fixed interval
///
#[derive(Debug)]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[doc(inline)]
#[derive(Debug)]
pub struct Interval {
delay: Delay,
interval: Duration,

@ -13,8 +13,8 @@ use crate::stream::Stream;
/// See also: [`FromStream`].
///
/// [`FromStream`]: trait.FromStream.html
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub trait IntoStream {
/// The type of the elements being iterated over.
type Item;

@ -21,8 +21,6 @@
//! # })
//! ```
use cfg_if::cfg_if;
pub use empty::{empty, Empty};
pub use from_fn::{from_fn, FromFn};
pub use once::{once, Once};
@ -40,8 +38,7 @@ mod once;
mod repeat;
mod repeat_with;
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
cfg_unstable! {
mod double_ended_stream;
mod exact_size_stream;
mod extend;
@ -60,8 +57,6 @@ cfg_if! {
pub use interval::{interval, Interval};
pub use into_stream::IntoStream;
pub use product::Product;
pub use sum::Sum;
pub use stream::Merge;
}
pub use sum::Sum;
}

@ -11,8 +11,8 @@ use crate::stream::Stream;
/// [`product`]: trait.Product.html#tymethod.product
/// [`FromStream`]: trait.FromStream.html
/// [`Stream::product`]: trait.Stream.html#method.product
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub trait Product<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by
/// multiplying the items.

@ -8,7 +8,7 @@ use futures_core::Stream;
/// This stream is returned by [`Stream::merge`].
///
/// [`Stream::merge`]: trait.Stream.html#method.merge
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct Merge<L, R> {

@ -94,32 +94,22 @@ use std::cmp::Ordering;
use std::marker::PhantomData;
use std::time::Duration;
use cfg_if::cfg_if;
use crate::utils::extension_trait;
cfg_if! {
if #[cfg(feature = "docs")] {
use std::ops::{Deref, DerefMut};
use crate::task::{Context, Poll};
}
}
cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
mod merge;
cfg_unstable! {
use std::pin::Pin;
use crate::future::Future;
use crate::stream::FromStream;
pub use merge::Merge;
}
mod merge;
}
extension_trait! {
use std::ops::{Deref, DerefMut};
use crate::task::{Context, Poll};
#[doc = r#"
An asynchronous stream of values.
@ -1286,7 +1276,7 @@ extension_trait! {
[`stream`]: trait.Stream.html#tymethod.next
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
fn collect<'a, B>(
@ -1325,7 +1315,7 @@ extension_trait! {
# });
```
"#]
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn merge<U>(self, other: U) -> Merge<Self, U>
where

@ -11,8 +11,8 @@ use crate::stream::Stream;
/// [`sum`]: trait.Sum.html#tymethod.sum
/// [`FromStream`]: trait.FromStream.html
/// [`Stream::sum`]: trait.Stream.html#method.sum
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[cfg(any(feature = "unstable", feature = "docs"))]
pub trait Sum<A = Self>: Sized {
/// Method which takes a stream and generates `Self` from the elements by
/// "summing up" the items.

@ -32,6 +32,7 @@ use crate::sync::Mutex;
/// # });
/// # }
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug)]
pub struct Barrier {
@ -61,6 +62,7 @@ struct BarrierState {
/// let barrier = Barrier::new(1);
/// let barrier_wait_result = barrier.wait();
/// ```
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[derive(Debug, Clone)]
pub struct BarrierWaitResult(bool);

@ -32,14 +32,13 @@
#[doc(inline)]
pub use std::sync::{Arc, Weak};
#[cfg(any(feature = "unstable", feature = "docs"))]
pub use barrier::{Barrier, BarrierWaitResult};
pub use mutex::{Mutex, MutexGuard};
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
mod barrier;
mod mutex;
mod rwlock;
cfg_unstable! {
pub use barrier::{Barrier, BarrierWaitResult};
mod barrier;
}

@ -1,10 +1,12 @@
use std::cell::UnsafeCell;
use std::cell::{Cell, UnsafeCell};
use std::mem::{self, ManuallyDrop};
use std::panic::{self, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable};
use std::thread::{self, Thread};
use std::thread;
use crossbeam_utils::sync::Parker;
use super::task;
use super::task_local;
@ -119,13 +121,21 @@ where
F: Future<Output = T>,
{
thread_local! {
static ARC_THREAD: Arc<Thread> = Arc::new(thread::current());
// May hold a pre-allocated parker that can be reused for efficiency.
//
// Note that each invocation of `block` needs its own parker. In particular, if `block`
// recursively calls itself, we must make sure that each recursive call uses a distinct
// parker instance.
static CACHE: Cell<Option<Arc<Parker>>> = Cell::new(None);
}
pin_utils::pin_mut!(f);
ARC_THREAD.with(|arc_thread: &Arc<Thread>| {
let ptr = (&**arc_thread as *const Thread) as *const ();
CACHE.with(|cache| {
// Reuse a cached parker or create a new one for this invocation of `block`.
let arc_parker: Arc<Parker> = cache.take().unwrap_or_else(|| Arc::new(Parker::new()));
let ptr = (&*arc_parker as *const Parker) as *const ();
let vt = vtable();
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) };
@ -133,32 +143,34 @@ where
loop {
if let Poll::Ready(t) = f.as_mut().poll(cx) {
// Save the parker for the next invocation of `block`.
cache.set(Some(arc_parker));
return t;
}
thread::park();
arc_parker.park();
}
})
}
fn vtable() -> &'static RawWakerVTable {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread));
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
mem::forget(arc.clone());
RawWaker::new(ptr, vtable())
}
unsafe fn wake_raw(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Thread);
arc.unpark();
let arc = Arc::from_raw(ptr as *const Parker);
arc.unparker().unpark();
}
unsafe fn wake_by_ref_raw(ptr: *const ()) {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread));
arc.unpark();
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Parker));
arc.unparker().unpark();
}
unsafe fn drop_raw(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const Thread))
drop(Arc::from_raw(ptr as *const Parker))
}
&RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)

@ -143,11 +143,9 @@ mod worker;
pub(crate) mod blocking;
cfg_if::cfg_if! {
if #[cfg(any(feature = "unstable", feature = "docs"))] {
cfg_unstable! {
mod yield_now;
pub use yield_now::yield_now;
}
}
/// Spawns a blocking task.
@ -178,7 +176,7 @@ cfg_if::cfg_if! {
/// ```
// Once this function stabilizes we should merge `blocking::spawn` into this so
// all code in our crate uses `task::blocking` too.
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn spawn_blocking<F, R>(f: F) -> task::JoinHandle<R>

@ -26,7 +26,7 @@ use std::pin::Pin;
/// #
/// # }) }
/// ```
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub async fn yield_now() {

@ -20,6 +20,64 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
t
}
/// Declares unstable items.
#[doc(hidden)]
macro_rules! cfg_unstable {
($($item:item)*) => {
$(
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
$item
)*
}
}
/// Declares Unix-specific items.
#[doc(hidden)]
macro_rules! cfg_unix {
($($item:item)*) => {
$(
#[cfg(any(unix, feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unix)))]
$item
)*
}
}
/// Declares Windows-specific items.
#[doc(hidden)]
macro_rules! cfg_windows {
($($item:item)*) => {
$(
#[cfg(any(windows, feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(windows)))]
$item
)*
}
}
/// Declares items when the "docs" feature is enabled.
#[doc(hidden)]
macro_rules! cfg_docs {
($($item:item)*) => {
$(
#[cfg(feature = "docs")]
$item
)*
}
}
/// Declares items when the "docs" feature is disabled.
#[doc(hidden)]
macro_rules! cfg_not_docs {
($($item:item)*) => {
$(
#[cfg(not(feature = "docs"))]
$item
)*
}
}
/// Defines an extension trait for a base trait.
///
/// In generated docs, the base trait will contain methods from the extension trait. In actual
@ -29,7 +87,6 @@ pub fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
/// Inside invocations of this macro, we write a definitions that looks similar to the final
/// rendered docs, and the macro then generates all the boilerplate for us.
#[doc(hidden)]
#[macro_export]
macro_rules! extension_trait {
(
// Interesting patterns:
@ -113,6 +170,12 @@ macro_rules! extension_trait {
// Handle the end of the token list.
(@doc ($($head:tt)*)) => { $($head)* };
(@ext ($($head:tt)*)) => { $($head)* };
}
pub use crate::extension_trait;
// Parse imports at the beginning of the macro.
($import:item $($tail:tt)*) => {
#[cfg(feature = "docs")]
$import
extension_trait!($($tail)*);
};
}

@ -48,13 +48,13 @@ fn test_buffered_writer() {
}
#[test]
fn test_buffered_writer_inner_into_inner_does_not_flush() {
fn test_buffered_writer_inner_into_inner_flushes() {
task::block_on(async {
let mut w = BufWriter::with_capacity(3, Vec::new());
w.write(&[0, 1]).await.unwrap();
assert_eq!(*w.get_ref(), []);
let w = w.into_inner().await.unwrap();
assert_eq!(w, []);
assert_eq!(w, [0, 1]);
})
}

Loading…
Cancel
Save