Reduce dependency on futures crate (#140)

* Add future::poll_fn

* Replace all uses of poll_fn with the new one

* Remove some uses of futures

* Simplify ReadDir and DirEntry

* Remove some use of futures from File

* Use futures subcrates

* Fix imports in docs

* Remove futures-util dependency

* Remove futures-executor-preview

* Refactor

* Require more features in the futures-preview crate
staging
Stjepan Glavina 5 years ago committed by GitHub
parent 75a4ba80cc
commit bac74c2d7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -27,6 +27,9 @@ unstable = []
async-task = "1.0.0" async-task = "1.0.0"
cfg-if = "0.1.9" cfg-if = "0.1.9"
crossbeam-channel = "0.3.9" crossbeam-channel = "0.3.9"
futures-channel-preview = "0.3.0-alpha.18"
futures-core-preview = "0.3.0-alpha.18"
futures-io-preview = "0.3.0-alpha.18"
futures-timer = "0.3.0" futures-timer = "0.3.0"
lazy_static = "1.3.0" lazy_static = "1.3.0"
log = { version = "0.4.8", features = ["kv_unstable"] } log = { version = "0.4.8", features = ["kv_unstable"] }
@ -37,11 +40,11 @@ num_cpus = "1.10.0"
pin-utils = "0.1.0-alpha.4" pin-utils = "0.1.0-alpha.4"
slab = "0.4.2" slab = "0.4.2"
[dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["async-await", "nightly"]
[dev-dependencies] [dev-dependencies]
femme = "1.1.0" femme = "1.1.0"
tempdir = "0.3.7"
surf = "1.0.1" surf = "1.0.1"
tempdir = "0.3.7"
[dev-dependencies.futures-preview]
version = "0.3.0-alpha.18"
features = ["std", "nightly", "async-await"]

@ -20,7 +20,7 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined
# task, # task,
# }; # };
# use futures::channel::mpsc; # use futures::channel::mpsc;
# use futures::SinkExt; # use futures::sink::SinkExt;
# use std::sync::Arc; # use std::sync::Arc;
# #
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

@ -20,7 +20,7 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the
# prelude::Stream, # prelude::Stream,
# }; # };
use futures::channel::mpsc; // 1 use futures::channel::mpsc; // 1
use futures::SinkExt; use futures::sink::SinkExt;
use std::sync::Arc; use std::sync::Arc;
# type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; # type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

@ -1,15 +1,12 @@
use std::ffi::OsString; use std::ffi::OsString;
use std::fs; use std::fs;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin; use std::sync::Arc;
use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future::{self, FutureExt, TryFutureExt};
use crate::future::Future;
use crate::io; use crate::io;
use crate::task::{blocking, Poll}; use crate::task::blocking;
/// An entry inside a directory. /// An entry inside a directory.
/// ///
@ -21,26 +18,11 @@ use crate::task::{blocking, Poll};
/// [`std::fs::DirEntry`]: https://doc.rust-lang.org/std/fs/struct.DirEntry.html /// [`std::fs::DirEntry`]: https://doc.rust-lang.org/std/fs/struct.DirEntry.html
#[derive(Debug)] #[derive(Debug)]
pub struct DirEntry { pub struct DirEntry {
/// The state of the entry. /// The inner synchronous `DirEntry`.
state: Mutex<State>, inner: Arc<fs::DirEntry>,
/// The full path to the entry.
path: PathBuf,
#[cfg(unix)] #[cfg(unix)]
ino: u64, ino: u64,
/// The bare name of the entry without the leading path.
file_name: OsString,
}
/// The state of an asynchronous `DirEntry`.
///
/// The `DirEntry` can be either idle or busy performing an asynchronous operation.
#[derive(Debug)]
enum State {
Idle(Option<fs::DirEntry>),
Busy(blocking::JoinHandle<State>),
} }
impl DirEntry { impl DirEntry {
@ -48,17 +30,13 @@ impl DirEntry {
pub(crate) fn new(inner: fs::DirEntry) -> DirEntry { pub(crate) fn new(inner: fs::DirEntry) -> DirEntry {
#[cfg(unix)] #[cfg(unix)]
let dir_entry = DirEntry { let dir_entry = DirEntry {
path: inner.path(),
file_name: inner.file_name(),
ino: inner.ino(), ino: inner.ino(),
state: Mutex::new(State::Idle(Some(inner))), inner: Arc::new(inner),
}; };
#[cfg(windows)] #[cfg(windows)]
let dir_entry = DirEntry { let dir_entry = DirEntry {
path: inner.path(), inner: Arc::new(inner),
file_name: inner.file_name(),
state: Mutex::new(State::Idle(Some(inner))),
}; };
dir_entry dir_entry
@ -89,7 +67,7 @@ impl DirEntry {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn path(&self) -> PathBuf { pub fn path(&self) -> PathBuf {
self.path.clone() self.inner.path()
} }
/// Returns the metadata for this entry. /// Returns the metadata for this entry.
@ -114,35 +92,8 @@ impl DirEntry {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub async fn metadata(&self) -> io::Result<fs::Metadata> { pub async fn metadata(&self) -> io::Result<fs::Metadata> {
future::poll_fn(|cx| { let inner = self.inner.clone();
let state = &mut *self.state.lock().unwrap(); blocking::spawn(async move { inner.metadata() }).await
loop {
match state {
State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None),
Some(inner) => {
let (s, r) = futures::channel::oneshot::channel();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = inner.metadata();
let _ = s.send(res);
State::Idle(Some(inner))
}));
return Poll::Ready(Some(r));
}
},
// Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
}
}
})
.map(|opt| opt.ok_or_else(|| io_error("invalid state")))
.await?
.map_err(|_| io_error("blocking task failed"))
.await?
} }
/// Returns the file type for this entry. /// Returns the file type for this entry.
@ -167,35 +118,8 @@ impl DirEntry {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub async fn file_type(&self) -> io::Result<fs::FileType> { pub async fn file_type(&self) -> io::Result<fs::FileType> {
future::poll_fn(|cx| { let inner = self.inner.clone();
let state = &mut *self.state.lock().unwrap(); blocking::spawn(async move { inner.file_type() }).await
loop {
match state {
State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None),
Some(inner) => {
let (s, r) = futures::channel::oneshot::channel();
// Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move {
let res = inner.file_type();
let _ = s.send(res);
State::Idle(Some(inner))
}));
return Poll::Ready(Some(r));
}
},
// Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
}
}
})
.map(|opt| opt.ok_or_else(|| io_error("invalid state")))
.await?
.map_err(|_| io_error("blocking task failed"))
.await?
} }
/// Returns the bare name of this entry without the leading path. /// Returns the bare name of this entry without the leading path.
@ -218,15 +142,10 @@ impl DirEntry {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn file_name(&self) -> OsString { pub fn file_name(&self) -> OsString {
self.file_name.clone() self.inner.file_name()
} }
} }
/// Creates a custom `io::Error` with an arbitrary error type.
fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> io::Error {
io::Error::new(io::ErrorKind::Other, err)
}
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(feature = "docs")] {
use crate::os::unix::fs::DirEntryExt; use crate::os::unix::fs::DirEntryExt;

@ -7,10 +7,9 @@ use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future::{self, FutureExt, TryFutureExt}; use futures_io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer};
use futures::io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer};
use crate::future::Future; use crate::future::{self, Future};
use crate::io; use crate::io;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
@ -234,7 +233,7 @@ impl File {
State::Idle(opt) => match opt.take() { State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None), None => return Poll::Ready(None),
Some(inner) => { Some(inner) => {
let (s, r) = futures::channel::oneshot::channel(); let (s, r) = futures_channel::oneshot::channel();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
@ -247,14 +246,14 @@ impl File {
} }
}, },
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
}) })
.map(|opt| opt.ok_or_else(|| io_error("file closed"))) .await
.await? .ok_or_else(|| io_error("file closed"))?
.map_err(|_| io_error("blocking task failed")) .await
.await? .map_err(|_| io_error("blocking task failed"))?
} }
/// Similar to [`sync_all`], except that it may not synchronize file metadata. /// Similar to [`sync_all`], except that it may not synchronize file metadata.
@ -289,7 +288,7 @@ impl File {
State::Idle(opt) => match opt.take() { State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None), None => return Poll::Ready(None),
Some(inner) => { Some(inner) => {
let (s, r) = futures::channel::oneshot::channel(); let (s, r) = futures_channel::oneshot::channel();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
@ -302,14 +301,14 @@ impl File {
} }
}, },
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
}) })
.map(|opt| opt.ok_or_else(|| io_error("file closed"))) .await
.await? .ok_or_else(|| io_error("file closed"))?
.map_err(|_| io_error("blocking task failed")) .await
.await? .map_err(|_| io_error("blocking task failed"))?
} }
/// Truncates or extends the underlying file. /// Truncates or extends the underlying file.
@ -346,7 +345,7 @@ impl File {
State::Idle(opt) => match opt.take() { State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None), None => return Poll::Ready(None),
Some(inner) => { Some(inner) => {
let (s, r) = futures::channel::oneshot::channel(); let (s, r) = futures_channel::oneshot::channel();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
@ -359,14 +358,14 @@ impl File {
} }
}, },
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
}) })
.map(|opt| opt.ok_or_else(|| io_error("file closed"))) .await
.await? .ok_or_else(|| io_error("file closed"))?
.map_err(|_| io_error("blocking task failed")) .await
.await? .map_err(|_| io_error("blocking task failed"))?
} }
/// Queries metadata about the file. /// Queries metadata about the file.
@ -392,7 +391,7 @@ impl File {
State::Idle(opt) => match opt.take() { State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None), None => return Poll::Ready(None),
Some(inner) => { Some(inner) => {
let (s, r) = futures::channel::oneshot::channel(); let (s, r) = futures_channel::oneshot::channel();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { *state = State::Busy(blocking::spawn(async move {
@ -405,14 +404,14 @@ impl File {
} }
}, },
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
}) })
.map(|opt| opt.ok_or_else(|| io_error("file closed"))) .await
.await? .ok_or_else(|| io_error("file closed"))?
.map_err(|_| io_error("blocking task failed")) .await
.await? .map_err(|_| io_error("blocking task failed"))?
} }
/// Changes the permissions on the underlying file. /// Changes the permissions on the underlying file.
@ -448,7 +447,7 @@ impl File {
State::Idle(opt) => match opt.take() { State::Idle(opt) => match opt.take() {
None => return Poll::Ready(None), None => return Poll::Ready(None),
Some(inner) => { Some(inner) => {
let (s, r) = futures::channel::oneshot::channel(); let (s, r) = futures_channel::oneshot::channel();
let perm = perm.take().unwrap(); let perm = perm.take().unwrap();
// Start the operation asynchronously. // Start the operation asynchronously.
@ -462,14 +461,14 @@ impl File {
} }
}, },
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
}) })
.map(|opt| opt.ok_or_else(|| io_error("file closed"))) .await
.await? .ok_or_else(|| io_error("file closed"))?
.map_err(|_| io_error("blocking task failed")) .await
.await? .map_err(|_| io_error("blocking task failed"))?
} }
} }
@ -543,7 +542,7 @@ impl AsyncRead for &File {
})); }));
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }
@ -619,7 +618,7 @@ impl AsyncWrite for &File {
} }
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }
@ -652,7 +651,7 @@ impl AsyncWrite for &File {
} }
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }
@ -677,7 +676,7 @@ impl AsyncWrite for &File {
})); }));
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }
@ -723,7 +722,7 @@ impl AsyncSeek for &File {
} }
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }

@ -1,7 +1,6 @@
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex;
use super::DirEntry; use super::DirEntry;
use crate::future::Future; use crate::future::Future;
@ -64,71 +63,45 @@ pub async fn read_dir<P: AsRef<Path>>(path: P) -> io::Result<ReadDir> {
/// [`DirEntry`]: struct.DirEntry.html /// [`DirEntry`]: struct.DirEntry.html
/// [`std::fs::ReadDir`]: https://doc.rust-lang.org/std/fs/struct.ReadDir.html /// [`std::fs::ReadDir`]: https://doc.rust-lang.org/std/fs/struct.ReadDir.html
#[derive(Debug)] #[derive(Debug)]
pub struct ReadDir(Mutex<State>); pub struct ReadDir(State);
/// The state of an asynchronous `ReadDir`. /// The state of an asynchronous `ReadDir`.
/// ///
/// The `ReadDir` can be either idle or busy performing an asynchronous operation. /// The `ReadDir` can be either idle or busy performing an asynchronous operation.
#[derive(Debug)] #[derive(Debug)]
enum State { enum State {
Idle(Option<Inner>), Idle(Option<fs::ReadDir>),
Busy(blocking::JoinHandle<State>), Busy(blocking::JoinHandle<(fs::ReadDir, Option<io::Result<fs::DirEntry>>)>),
}
/// Inner representation of an asynchronous `DirEntry`.
#[derive(Debug)]
struct Inner {
/// The blocking handle.
read_dir: fs::ReadDir,
/// The next item in the stream.
item: Option<io::Result<DirEntry>>,
} }
impl ReadDir { impl ReadDir {
/// Creates an asynchronous `ReadDir` from a synchronous handle. /// Creates an asynchronous `ReadDir` from a synchronous handle.
pub(crate) fn new(inner: fs::ReadDir) -> ReadDir { pub(crate) fn new(inner: fs::ReadDir) -> ReadDir {
ReadDir(Mutex::new(State::Idle(Some(Inner { ReadDir(State::Idle(Some(inner)))
read_dir: inner,
item: None,
}))))
} }
} }
impl futures::Stream for ReadDir { impl futures_core::stream::Stream for ReadDir {
type Item = io::Result<DirEntry>; type Item = io::Result<DirEntry>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let state = &mut *self.0.lock().unwrap();
loop { loop {
match state { match &mut self.0 {
State::Idle(opt) => { State::Idle(opt) => {
let inner = match opt.as_mut() {
None => return Poll::Ready(None),
Some(inner) => inner,
};
// Check if the operation has completed.
if let Some(res) = inner.item.take() {
return Poll::Ready(Some(res));
} else {
let mut inner = opt.take().unwrap(); let mut inner = opt.take().unwrap();
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(blocking::spawn(async move { self.0 = State::Busy(blocking::spawn(async move {
match inner.read_dir.next() { let next = inner.next();
None => State::Idle(None), (inner, next)
Some(res) => {
inner.item = Some(res.map(DirEntry::new));
State::Idle(Some(inner))
}
}
})); }));
} }
}
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => {
let (inner, opt) = futures_core::ready!(Pin::new(task).poll(cx));
self.0 = State::Idle(Some(inner));
return Poll::Ready(opt.map(|res| res.map(DirEntry::new)));
}
} }
} }
} }

@ -6,9 +6,11 @@ pub use std::future::Future;
use cfg_if::cfg_if; use cfg_if::cfg_if;
pub use pending::pending; pub use pending::pending;
pub use poll_fn::poll_fn;
pub use ready::ready; pub use ready::ready;
mod pending; mod pending;
mod poll_fn;
mod ready; mod ready;
cfg_if! { cfg_if! {

@ -1,16 +1,23 @@
use std::marker::PhantomData;
use std::pin::Pin;
use crate::future::Future;
use crate::task::{Context, Poll};
/// Never resolves to a value. /// Never resolves to a value.
/// ///
/// # Examples /// # Examples
///
/// ``` /// ```
/// # fn main() { async_std::task::block_on(async { /// # fn main() { async_std::task::block_on(async {
/// # /// #
/// use std::time::Duration; /// use std::time::Duration;
/// ///
/// use async_std::future::pending; /// use async_std::future;
/// use async_std::io; /// use async_std::io;
/// ///
/// let dur = Duration::from_secs(1); /// let dur = Duration::from_secs(1);
/// let fut = pending(); /// let fut = future::pending();
/// ///
/// let res: io::Result<()> = io::timeout(dur, fut).await; /// let res: io::Result<()> = io::timeout(dur, fut).await;
/// assert!(res.is_err()); /// assert!(res.is_err());
@ -18,5 +25,20 @@
/// # }) } /// # }) }
/// ``` /// ```
pub async fn pending<T>() -> T { pub async fn pending<T>() -> T {
futures::future::pending::<T>().await let fut = Pending {
_marker: PhantomData,
};
fut.await
}
struct Pending<T> {
_marker: PhantomData<T>,
}
impl<T> Future for Pending<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<T> {
Poll::Pending
}
} }

@ -0,0 +1,49 @@
use std::pin::Pin;
use crate::future::Future;
use crate::task::{Context, Poll};
/// Creates a new future wrapping around a function returning [`Poll`].
///
/// Polling the returned future delegates to the wrapped function.
///
/// # Examples
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::future;
/// use async_std::task::{Context, Poll};
///
/// fn poll_greeting(_: &mut Context<'_>) -> Poll<String> {
/// Poll::Ready("hello world".to_string())
/// }
///
/// assert_eq!(future::poll_fn(poll_greeting).await, "hello world");
/// #
/// # }) }
/// ```
pub async fn poll_fn<F, T>(f: F) -> T
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
let fut = PollFn { f };
fut.await
}
struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(&mut self.f)(cx)
}
}

@ -3,7 +3,7 @@ use std::pin::Pin;
use std::str; use std::str;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncBufRead; use futures_io::AsyncBufRead;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
@ -212,7 +212,7 @@ impl<T: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, T> {
} = &mut *self; } = &mut *self;
let reader = Pin::new(reader); let reader = Pin::new(reader);
let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() { if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| { Poll::Ready(ret.and_then(|_| {
Err(io::Error::new( Err(io::Error::new(
@ -247,7 +247,7 @@ pub struct Lines<R> {
read: usize, read: usize,
} }
impl<R: AsyncBufRead> futures::Stream for Lines<R> { impl<R: AsyncBufRead> futures_core::stream::Stream for Lines<R> {
type Item = io::Result<String>; type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
@ -258,7 +258,7 @@ impl<R: AsyncBufRead> futures::Stream for Lines<R> {
read, read,
} = unsafe { self.get_unchecked_mut() }; } = unsafe { self.get_unchecked_mut() };
let reader = unsafe { Pin::new_unchecked(reader) }; let reader = unsafe { Pin::new_unchecked(reader) };
let n = futures::ready!(read_line_internal(reader, cx, buf, bytes, read))?; let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?;
if n == 0 && buf.is_empty() { if n == 0 && buf.is_empty() {
return Poll::Ready(None); return Poll::Ready(None);
} }
@ -279,7 +279,7 @@ pub fn read_line_internal<R: AsyncBufRead + ?Sized>(
bytes: &mut Vec<u8>, bytes: &mut Vec<u8>,
read: &mut usize, read: &mut usize,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read));
if str::from_utf8(&bytes).is_err() { if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| { Poll::Ready(ret.and_then(|_| {
Err(io::Error::new( Err(io::Error::new(
@ -305,7 +305,7 @@ pub fn read_until_internal<R: AsyncBufRead + ?Sized>(
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
loop { loop {
let (done, used) = { let (done, used) = {
let available = futures::ready!(reader.as_mut().poll_fill_buf(cx))?; let available = futures_core::ready!(reader.as_mut().poll_fill_buf(cx))?;
if let Some(i) = memchr::memchr(byte, available) { if let Some(i) = memchr::memchr(byte, available) {
buf.extend_from_slice(&available[..=i]); buf.extend_from_slice(&available[..=i]);
(true, i + 1) (true, i + 1)

@ -2,7 +2,7 @@ use std::io::{IoSliceMut, Read as _};
use std::pin::Pin; use std::pin::Pin;
use std::{cmp, fmt}; use std::{cmp, fmt};
use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer};
use crate::io::{self, SeekFrom}; use crate::io::{self, SeekFrom};
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
@ -51,7 +51,7 @@ pub struct BufReader<R> {
cap: usize, cap: usize,
} }
impl<R: AsyncRead> BufReader<R> { impl<R: io::Read> BufReader<R> {
/// Creates a buffered reader with default buffer capacity. /// Creates a buffered reader with default buffer capacity.
/// ///
/// The default capacity is currently 8 KB, but may change in the future. /// The default capacity is currently 8 KB, but may change in the future.
@ -87,19 +87,13 @@ impl<R: AsyncRead> BufReader<R> {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> { pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
unsafe {
let mut buffer = Vec::with_capacity(capacity);
buffer.set_len(capacity);
inner.initializer().initialize(&mut buffer);
BufReader { BufReader {
inner, inner,
buf: buffer.into_boxed_slice(), buf: vec![0; capacity].into_boxed_slice(),
pos: 0, pos: 0,
cap: 0, cap: 0,
} }
} }
}
} }
impl<R> BufReader<R> { impl<R> BufReader<R> {
@ -209,11 +203,11 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
// (larger than our internal buffer), bypass our internal buffer // (larger than our internal buffer), bypass our internal buffer
// entirely. // entirely.
if self.pos == self.cap && buf.len() >= self.buf.len() { if self.pos == self.cap && buf.len() >= self.buf.len() {
let res = futures::ready!(self.as_mut().inner().poll_read(cx, buf)); let res = futures_core::ready!(self.as_mut().inner().poll_read(cx, buf));
self.discard_buffer(); self.discard_buffer();
return Poll::Ready(res); return Poll::Ready(res);
} }
let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?; let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = rem.read(buf)?; let nread = rem.read(buf)?;
self.consume(nread); self.consume(nread);
Poll::Ready(Ok(nread)) Poll::Ready(Ok(nread))
@ -226,11 +220,11 @@ impl<R: AsyncRead> AsyncRead for BufReader<R> {
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
let total_len = bufs.iter().map(|b| b.len()).sum::<usize>(); let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
if self.pos == self.cap && total_len >= self.buf.len() { if self.pos == self.cap && total_len >= self.buf.len() {
let res = futures::ready!(self.as_mut().inner().poll_read_vectored(cx, bufs)); let res = futures_core::ready!(self.as_mut().inner().poll_read_vectored(cx, bufs));
self.discard_buffer(); self.discard_buffer();
return Poll::Ready(res); return Poll::Ready(res);
} }
let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?; let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?;
let nread = rem.read_vectored(bufs)?; let nread = rem.read_vectored(bufs)?;
self.consume(nread); self.consume(nread);
Poll::Ready(Ok(nread)) Poll::Ready(Ok(nread))
@ -261,7 +255,7 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
// to tell the compiler that the pos..cap slice is always valid. // to tell the compiler that the pos..cap slice is always valid.
if *pos >= *cap { if *pos >= *cap {
debug_assert!(*pos == *cap); debug_assert!(*pos == *cap);
*cap = futures::ready!(inner.as_mut().poll_read(cx, buf))?; *cap = futures_core::ready!(inner.as_mut().poll_read(cx, buf))?;
*pos = 0; *pos = 0;
} }
Poll::Ready(Ok(&buf[*pos..*cap])) Poll::Ready(Ok(&buf[*pos..*cap]))
@ -272,7 +266,7 @@ impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
} }
} }
impl<R: AsyncRead + fmt::Debug> fmt::Debug for BufReader<R> { impl<R: io::Read + fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader") f.debug_struct("BufReader")
.field("reader", &self.inner) .field("reader", &self.inner)
@ -316,25 +310,26 @@ impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
// support seeking by i64::min_value() so we need to handle underflow when subtracting // support seeking by i64::min_value() so we need to handle underflow when subtracting
// remainder. // remainder.
if let Some(offset) = n.checked_sub(remainder) { if let Some(offset) = n.checked_sub(remainder) {
result = futures::ready!( result = futures_core::ready!(
self.as_mut() self.as_mut()
.inner() .inner()
.poll_seek(cx, SeekFrom::Current(offset)) .poll_seek(cx, SeekFrom::Current(offset))
)?; )?;
} else { } else {
// seek backwards by our remainder, and then by the offset // seek backwards by our remainder, and then by the offset
futures::ready!( futures_core::ready!(
self.as_mut() self.as_mut()
.inner() .inner()
.poll_seek(cx, SeekFrom::Current(-remainder)) .poll_seek(cx, SeekFrom::Current(-remainder))
)?; )?;
self.as_mut().discard_buffer(); self.as_mut().discard_buffer();
result = result = futures_core::ready!(
futures::ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?; self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n))
)?;
} }
} else { } else {
// Seeking with Start/End doesn't care about our buffer length. // Seeking with Start/End doesn't care about our buffer length.
result = futures::ready!(self.as_mut().inner().poll_seek(cx, pos))?; result = futures_core::ready!(self.as_mut().inner().poll_seek(cx, pos))?;
} }
self.discard_buffer(); self.discard_buffer();
Poll::Ready(Ok(result)) Poll::Ready(Ok(result))

@ -1,6 +1,10 @@
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use std::pin::Pin;
use crate::io; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
use crate::future::Future;
use crate::io::{self, BufReader};
use crate::task::{Context, Poll};
/// Copies the entire contents of a reader into a writer. /// Copies the entire contents of a reader into a writer.
/// ///
@ -44,6 +48,55 @@ where
R: AsyncRead + Unpin + ?Sized, R: AsyncRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized, W: AsyncWrite + Unpin + ?Sized,
{ {
let bytes_read = reader.copy_into(writer).await?; pub struct CopyFuture<'a, R, W: ?Sized> {
Ok(bytes_read) reader: R,
writer: &'a mut W,
amt: u64,
}
impl<R, W: Unpin + ?Sized> CopyFuture<'_, R, W> {
fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, Pin<&mut W>, &mut u64) {
unsafe {
let this = self.get_unchecked_mut();
(
Pin::new_unchecked(&mut this.reader),
Pin::new(&mut *this.writer),
&mut this.amt,
)
}
}
}
impl<R, W> Future for CopyFuture<'_, R, W>
where
R: AsyncBufRead,
W: AsyncWrite + Unpin + ?Sized,
{
type Output = io::Result<u64>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut reader, mut writer, amt) = self.project();
loop {
let buffer = futures_core::ready!(reader.as_mut().poll_fill_buf(cx))?;
if buffer.is_empty() {
futures_core::ready!(writer.as_mut().poll_flush(cx))?;
return Poll::Ready(Ok(*amt));
}
let i = futures_core::ready!(writer.as_mut().poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
}
*amt += i as u64;
reader.as_mut().consume(i);
}
}
}
let future = CopyFuture {
reader: BufReader::new(reader),
writer,
amt: 0,
};
future.await
} }

@ -1,7 +1,7 @@
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use futures::io::{AsyncBufRead, AsyncRead, Initializer}; use futures_io::{AsyncBufRead, AsyncRead, Initializer};
use crate::io; use crate::io;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -4,7 +4,7 @@ use std::pin::Pin;
use std::str; use std::str;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncRead; use futures_io::AsyncRead;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
@ -290,7 +290,7 @@ impl<T: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, T> {
} = &mut *self; } = &mut *self;
let reader = Pin::new(reader); let reader = Pin::new(reader);
let ret = futures::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len));
if str::from_utf8(&bytes).is_err() { if str::from_utf8(&bytes).is_err() {
Poll::Ready(ret.and_then(|_| { Poll::Ready(ret.and_then(|_| {
Err(io::Error::new( Err(io::Error::new(
@ -321,7 +321,7 @@ impl<T: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, T> {
let Self { reader, buf } = &mut *self; let Self { reader, buf } = &mut *self;
while !buf.is_empty() { while !buf.is_empty() {
let n = futures::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n); let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n);
*buf = rest; *buf = rest;
@ -377,7 +377,7 @@ pub fn read_to_end_internal<R: AsyncRead + ?Sized>(
} }
} }
match futures::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
Ok(0) => { Ok(0) => {
ret = Poll::Ready(Ok(g.len - start_len)); ret = Poll::Ready(Ok(g.len - start_len));
break; break;

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncSeek; use futures_io::AsyncSeek;
use crate::future::Future; use crate::future::Future;
use crate::io::{self, SeekFrom}; use crate::io::{self, SeekFrom};

@ -1,7 +1,7 @@
use std::fmt; use std::fmt;
use std::pin::Pin; use std::pin::Pin;
use futures::io::AsyncWrite; use futures_io::AsyncWrite;
use crate::io; use crate::io;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};

@ -3,7 +3,7 @@ use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncWrite; use futures_io::AsyncWrite;
use crate::future::Future; use crate::future::Future;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
@ -125,7 +125,7 @@ impl AsyncWrite for Stderr {
} }
} }
// Poll the asynchronous operation the stderr is currently blocked on. // Poll the asynchronous operation the stderr is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }
@ -153,7 +153,7 @@ impl AsyncWrite for Stderr {
} }
} }
// Poll the asynchronous operation the stderr is currently blocked on. // Poll the asynchronous operation the stderr is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }

@ -3,10 +3,9 @@ use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future; use futures_io::{AsyncRead, Initializer};
use futures::io::{AsyncRead, Initializer};
use crate::future::Future; use crate::future::{self, Future};
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
/// Constructs a new handle to the standard input of the current process. /// Constructs a new handle to the standard input of the current process.
@ -130,7 +129,7 @@ impl Stdin {
} }
} }
// Poll the asynchronous operation the stdin is currently blocked on. // Poll the asynchronous operation the stdin is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
}) })
@ -182,7 +181,7 @@ impl AsyncRead for Stdin {
} }
} }
// Poll the asynchronous operation the stdin is currently blocked on. // Poll the asynchronous operation the stdin is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }

@ -3,7 +3,7 @@ use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncWrite; use futures_io::AsyncWrite;
use crate::future::Future; use crate::future::Future;
use crate::task::{blocking, Context, Poll}; use crate::task::{blocking, Context, Poll};
@ -125,7 +125,7 @@ impl AsyncWrite for Stdout {
} }
} }
// Poll the asynchronous operation the stdout is currently blocked on. // Poll the asynchronous operation the stdout is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }
@ -153,7 +153,7 @@ impl AsyncWrite for Stdout {
} }
} }
// Poll the asynchronous operation the stdout is currently blocked on. // Poll the asynchronous operation the stdout is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)),
} }
} }
} }

@ -3,7 +3,7 @@ use std::mem;
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::io::AsyncWrite; use futures_io::AsyncWrite;
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
@ -201,7 +201,7 @@ impl<T: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, T> {
let Self { writer, buf } = &mut *self; let Self { writer, buf } = &mut *self;
while !buf.is_empty() { while !buf.is_empty() {
let n = futures::ready!(Pin::new(&mut **writer).poll_write(cx, buf))?; let n = futures_core::ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
let (_, rest) = mem::replace(buf, &[]).split_at(n); let (_, rest) = mem::replace(buf, &[]).split_at(n);
*buf = rest; *buf = rest;

@ -1,15 +1,14 @@
use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future::{ready, Ready};
use crate::future::Future; use crate::future::Future;
use crate::io; use crate::io;
use crate::task::blocking; use crate::task::blocking;
use crate::task::{Context, Poll}; use crate::task::{Context, Poll};
use std::marker::PhantomData;
cfg_if! { cfg_if! {
if #[cfg(feature = "docs")] { if #[cfg(feature = "docs")] {
@ -47,19 +46,22 @@ pub trait ToSocketAddrs {
#[doc(hidden)] #[doc(hidden)]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub enum ToSocketAddrsFuture<'a, I: Iterator<Item = SocketAddr>> { pub enum ToSocketAddrsFuture<'a, I> {
Phantom(PhantomData<&'a ()>), Phantom(PhantomData<&'a ()>),
Join(blocking::JoinHandle<io::Result<I>>), Join(blocking::JoinHandle<io::Result<I>>),
Ready(Ready<io::Result<I>>), Ready(Option<io::Result<I>>),
} }
impl<I: Iterator<Item = SocketAddr>> Future for ToSocketAddrsFuture<'_, I> { impl<I: Iterator<Item = SocketAddr>> Future for ToSocketAddrsFuture<'_, I> {
type Output = io::Result<I>; type Output = io::Result<I>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.get_mut() { match unsafe { self.get_unchecked_mut() } {
ToSocketAddrsFuture::Join(f) => Pin::new(&mut *f).poll(cx), ToSocketAddrsFuture::Join(f) => Pin::new(&mut *f).poll(cx),
ToSocketAddrsFuture::Ready(f) => Pin::new(&mut *f).poll(cx), ToSocketAddrsFuture::Ready(res) => {
let res = res.take().expect("polled a completed future");
Poll::Ready(res)
}
_ => unreachable!(), _ => unreachable!(),
} }
} }
@ -69,7 +71,7 @@ impl ToSocketAddrs for SocketAddr {
type Iter = std::option::IntoIter<SocketAddr>; type Iter = std::option::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) {
ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self)))
} }
} }
@ -77,7 +79,7 @@ impl ToSocketAddrs for SocketAddrV4 {
type Iter = std::option::IntoIter<SocketAddr>; type Iter = std::option::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) {
ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self)))
} }
} }
@ -85,7 +87,7 @@ impl ToSocketAddrs for SocketAddrV6 {
type Iter = std::option::IntoIter<SocketAddr>; type Iter = std::option::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) {
ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self)))
} }
} }
@ -93,7 +95,7 @@ impl ToSocketAddrs for (IpAddr, u16) {
type Iter = std::option::IntoIter<SocketAddr>; type Iter = std::option::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) {
ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self)))
} }
} }
@ -101,7 +103,7 @@ impl ToSocketAddrs for (Ipv4Addr, u16) {
type Iter = std::option::IntoIter<SocketAddr>; type Iter = std::option::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) {
ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self)))
} }
} }
@ -109,7 +111,7 @@ impl ToSocketAddrs for (Ipv6Addr, u16) {
type Iter = std::option::IntoIter<SocketAddr>; type Iter = std::option::IntoIter<SocketAddr>;
fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) {
ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self)))
} }
} }
@ -141,7 +143,7 @@ impl<'a> ToSocketAddrs for &'a [SocketAddr] {
type Iter = std::iter::Cloned<std::slice::Iter<'a, SocketAddr>>; type Iter = std::iter::Cloned<std::slice::Iter<'a, SocketAddr>>;
fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) {
ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self)))
} }
} }

@ -4,7 +4,7 @@ use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use futures::io::{AsyncRead, AsyncWrite}; use futures_io::{AsyncRead, AsyncWrite};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use mio::{self, Evented}; use mio::{self, Evented};
use slab::Slab; use slab::Slab;
@ -303,7 +303,7 @@ impl<T: Evented + std::io::Read + Unpin> AsyncRead for IoHandle<T> {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures::ready!(Pin::new(&mut *self).poll_readable(cx)?); futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?);
match self.source.read(buf) { match self.source.read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -324,7 +324,7 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures::ready!(Pin::new(&mut *self).poll_readable(cx)?); futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?);
match (&self.source).read(buf) { match (&self.source).read(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -342,7 +342,7 @@ impl<T: Evented + std::io::Write + Unpin> AsyncWrite for IoHandle<T> {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match self.source.write(buf) { match self.source.write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -354,7 +354,7 @@ impl<T: Evented + std::io::Write + Unpin> AsyncWrite for IoHandle<T> {
} }
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
futures::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match self.source.flush() { match self.source.flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -379,7 +379,7 @@ where
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &[u8], buf: &[u8],
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
futures::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match (&self.source).write(buf) { match (&self.source).write(buf) {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
@ -391,7 +391,7 @@ where
} }
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
futures::ready!(self.poll_writable(cx)?); futures_core::ready!(self.poll_writable(cx)?);
match (&self.source).flush() { match (&self.source).flush() {
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {

@ -2,10 +2,9 @@ use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future;
use super::TcpStream; use super::TcpStream;
use crate::future::Future; use crate::future::{self, Future};
use crate::io; use crate::io;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
@ -129,7 +128,7 @@ impl TcpListener {
/// ``` /// ```
pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_readable(cx)?); futures_core::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().accept_std() { match self.io_handle.get_ref().accept_std() {
Ok((io, addr)) => { Ok((io, addr)) => {
@ -225,14 +224,14 @@ impl TcpListener {
#[derive(Debug)] #[derive(Debug)]
pub struct Incoming<'a>(&'a TcpListener); pub struct Incoming<'a>(&'a TcpListener);
impl<'a> futures::Stream for Incoming<'a> { impl<'a> futures_core::stream::Stream for Incoming<'a> {
type Item = io::Result<TcpStream>; type Item = io::Result<TcpStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let future = self.0.accept(); let future = self.0.accept();
pin_utils::pin_mut!(future); pin_utils::pin_mut!(future);
let (socket, _) = futures::ready!(future.poll(cx))?; let (socket, _) = futures_core::ready!(future.poll(cx))?;
Poll::Ready(Some(Ok(socket))) Poll::Ready(Some(Ok(socket)))
} }
} }

@ -4,9 +4,9 @@ use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future; use futures_io::{AsyncRead, AsyncWrite};
use futures::io::{AsyncRead, AsyncWrite};
use crate::future;
use crate::io; use crate::io;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
@ -259,7 +259,7 @@ impl TcpStream {
/// ``` /// ```
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> { pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
let res = future::poll_fn(|cx| { let res = future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_readable(cx)?); futures_core::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().peek(buf) { match self.io_handle.get_ref().peek(buf) {
Ok(len) => Poll::Ready(Ok(len)), Ok(len) => Poll::Ready(Ok(len)),

@ -2,9 +2,9 @@ use std::io;
use std::net::SocketAddr; use std::net::SocketAddr;
use cfg_if::cfg_if; use cfg_if::cfg_if;
use futures::future;
use std::net::{Ipv4Addr, Ipv6Addr}; use std::net::{Ipv4Addr, Ipv6Addr};
use crate::future;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::net::ToSocketAddrs; use crate::net::ToSocketAddrs;
use crate::task::Poll; use crate::task::Poll;
@ -165,7 +165,7 @@ impl UdpSocket {
}; };
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_writable(cx)?); futures_core::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send_to(buf, &addr) { match self.io_handle.get_ref().send_to(buf, &addr) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -200,7 +200,7 @@ impl UdpSocket {
/// ``` /// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_readable(cx)?); futures_core::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().recv_from(buf) { match self.io_handle.get_ref().recv_from(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -282,7 +282,7 @@ impl UdpSocket {
/// ``` /// ```
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_writable(cx)?); futures_core::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send(buf) { match self.io_handle.get_ref().send(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -317,7 +317,7 @@ impl UdpSocket {
/// ``` /// ```
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_readable(cx)?); futures_core::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().recv(buf) { match self.io_handle.get_ref().recv(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),

@ -4,10 +4,10 @@ use std::fmt;
use std::net::Shutdown; use std::net::Shutdown;
use std::path::Path; use std::path::Path;
use futures::future;
use mio_uds; use mio_uds;
use super::SocketAddr; use super::SocketAddr;
use crate::future;
use crate::io; use crate::io;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
@ -203,7 +203,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_readable(cx)?); futures_core::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().recv_from(buf) { match self.io_handle.get_ref().recv_from(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -236,7 +236,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> { pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_writable(cx)?); futures_core::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().recv(buf) { match self.io_handle.get_ref().recv(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -268,7 +268,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> { pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_writable(cx)?); futures_core::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send_to(buf, path.as_ref()) { match self.io_handle.get_ref().send_to(buf, path.as_ref()) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),
@ -301,7 +301,7 @@ impl UnixDatagram {
/// ``` /// ```
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> { pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_writable(cx)?); futures_core::ready!(self.io_handle.poll_writable(cx)?);
match self.io_handle.get_ref().send(buf) { match self.io_handle.get_ref().send(buf) {
Ok(n) => Poll::Ready(Ok(n)), Ok(n) => Poll::Ready(Ok(n)),

@ -4,12 +4,11 @@ use std::fmt;
use std::path::Path; use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use futures::future;
use mio_uds; use mio_uds;
use super::SocketAddr; use super::SocketAddr;
use super::UnixStream; use super::UnixStream;
use crate::future::Future; use crate::future::{self, Future};
use crate::io; use crate::io;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
@ -97,7 +96,7 @@ impl UnixListener {
/// ``` /// ```
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
future::poll_fn(|cx| { future::poll_fn(|cx| {
futures::ready!(self.io_handle.poll_readable(cx)?); futures_core::ready!(self.io_handle.poll_readable(cx)?);
match self.io_handle.get_ref().accept_std() { match self.io_handle.get_ref().accept_std() {
Ok(Some((io, addr))) => { Ok(Some((io, addr))) => {
@ -198,14 +197,14 @@ impl fmt::Debug for UnixListener {
#[derive(Debug)] #[derive(Debug)]
pub struct Incoming<'a>(&'a UnixListener); pub struct Incoming<'a>(&'a UnixListener);
impl futures::Stream for Incoming<'_> { impl futures_core::stream::Stream for Incoming<'_> {
type Item = io::Result<UnixStream>; type Item = io::Result<UnixStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let future = self.0.accept(); let future = self.0.accept();
futures::pin_mut!(future); pin_utils::pin_mut!(future);
let (socket, _) = futures::ready!(future.poll(cx))?; let (socket, _) = futures_core::ready!(future.poll(cx))?;
Poll::Ready(Some(Ok(socket))) Poll::Ready(Some(Ok(socket)))
} }
} }

@ -6,11 +6,11 @@ use std::net::Shutdown;
use std::path::Path; use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use futures::future; use futures_io::{AsyncRead, AsyncWrite};
use futures::io::{AsyncRead, AsyncWrite};
use mio_uds; use mio_uds;
use super::SocketAddr; use super::SocketAddr;
use crate::future;
use crate::io; use crate::io;
use crate::net::driver::IoHandle; use crate::net::driver::IoHandle;
use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
@ -81,7 +81,7 @@ impl UnixStream {
future::poll_fn(|cx| { future::poll_fn(|cx| {
match &mut state { match &mut state {
State::Waiting(stream) => { State::Waiting(stream) => {
futures::ready!(stream.io_handle.poll_writable(cx)?); futures_core::ready!(stream.io_handle.poll_writable(cx)?);
if let Some(err) = stream.io_handle.get_ref().take_error()? { if let Some(err) = stream.io_handle.get_ref().take_error()? {
return Poll::Ready(Err(err)); return Poll::Ready(Err(err));

@ -35,7 +35,7 @@ pub struct Empty<T> {
_marker: PhantomData<T>, _marker: PhantomData<T>,
} }
impl<T> futures::Stream for Empty<T> { impl<T> futures_core::stream::Stream for Empty<T> {
type Item = T; type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {

@ -33,7 +33,7 @@ pub struct Once<T> {
value: Option<T>, value: Option<T>,
} }
impl<T: Unpin> futures::Stream for Once<T> { impl<T: Unpin> futures_core::stream::Stream for Once<T> {
type Item = T; type Item = T;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> { fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {

@ -36,7 +36,7 @@ pub struct Repeat<T> {
item: T, item: T,
} }
impl<T: Clone> futures::Stream for Repeat<T> { impl<T: Clone> futures_core::stream::Stream for Repeat<T> {
type Item = T; type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {

@ -233,8 +233,8 @@ pub trait Stream {
} }
} }
impl<T: futures::Stream + Unpin + ?Sized> Stream for T { impl<T: futures_core::stream::Stream + Unpin + ?Sized> Stream for T {
type Item = <Self as futures::Stream>::Item; type Item = <Self as futures_core::stream::Stream>::Item;
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>) fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
where where
@ -250,7 +250,7 @@ pub struct NextFuture<'a, T: Unpin + ?Sized> {
stream: &'a mut T, stream: &'a mut T,
} }
impl<T: futures::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> { impl<T: futures_core::stream::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
type Output = Option<T::Item>; type Output = Option<T::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -267,19 +267,19 @@ pub struct Take<S> {
impl<S: Unpin> Unpin for Take<S> {} impl<S: Unpin> Unpin for Take<S> {}
impl<S: futures::Stream> Take<S> { impl<S: futures_core::stream::Stream> Take<S> {
pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(remaining: usize); pin_utils::unsafe_unpinned!(remaining: usize);
} }
impl<S: futures::Stream> futures::Stream for Take<S> { impl<S: futures_core::stream::Stream> futures_core::stream::Stream for Take<S> {
type Item = S::Item; type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
if self.remaining == 0 { if self.remaining == 0 {
Poll::Ready(None) Poll::Ready(None)
} else { } else {
let next = futures::ready!(self.as_mut().stream().poll_next(cx)); let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next { match next {
Some(_) => *self.as_mut().remaining() -= 1, Some(_) => *self.as_mut().remaining() -= 1,
None => *self.as_mut().remaining() = 0, None => *self.as_mut().remaining() = 0,
@ -311,14 +311,14 @@ where
impl<S, F> Future for AllFuture<'_, S, F, S::Item> impl<S, F> Future for AllFuture<'_, S, F, S::Item>
where where
S: futures::Stream + Unpin + Sized, S: futures_core::stream::Stream + Unpin + Sized,
F: FnMut(S::Item) -> bool, F: FnMut(S::Item) -> bool,
{ {
type Output = bool; type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures::Stream; use futures_core::stream::Stream;
let next = futures::ready!(self.as_mut().stream().poll_next(cx)); let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next { match next {
Some(v) => { Some(v) => {
let result = (self.as_mut().f())(v); let result = (self.as_mut().f())(v);
@ -358,14 +358,14 @@ where
impl<S, F> Future for AnyFuture<'_, S, F, S::Item> impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
where where
S: futures::Stream + Unpin + Sized, S: futures_core::stream::Stream + Unpin + Sized,
F: FnMut(S::Item) -> bool, F: FnMut(S::Item) -> bool,
{ {
type Output = bool; type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures::Stream; use futures_core::stream::Stream;
let next = futures::ready!(self.as_mut().stream().poll_next(cx)); let next = futures_core::ready!(self.as_mut().stream().poll_next(cx));
match next { match next {
Some(v) => { Some(v) => {
let result = (self.as_mut().f())(v); let result = (self.as_mut().f())(v);

@ -0,0 +1,135 @@
use std::cell::UnsafeCell;
use std::mem::{self, ManuallyDrop};
use std::panic::{self, AssertUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{RawWaker, RawWakerVTable};
use std::thread::{self, Thread};
use super::pool;
use super::Builder;
use crate::future::Future;
use crate::task::{Context, Poll, Waker};
/// Spawns a task and blocks the current thread on its result.
///
/// Calling this function is similar to [spawning] a thread and immediately [joining] it, except an
/// asynchronous task will be spawned.
///
/// [spawning]: https://doc.rust-lang.org/std/thread/fn.spawn.html
/// [joining]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join
///
/// # Examples
///
/// ```no_run
/// use async_std::task;
///
/// fn main() {
/// task::block_on(async {
/// println!("Hello, world!");
/// })
/// }
/// ```
pub fn block_on<F, T>(future: F) -> T
where
F: Future<Output = T> + Send,
T: Send,
{
unsafe {
// A place on the stack where the result will be stored.
let out = &mut UnsafeCell::new(None);
// Wrap the future into one that stores the result into `out`.
let future = {
let out = out.get();
async move {
let future = CatchUnwindFuture {
future: AssertUnwindSafe(future),
};
*out = Some(future.await);
}
};
// Pin the future onto the stack.
pin_utils::pin_mut!(future);
// Transmute the future into one that is static and sendable.
let future = mem::transmute::<
Pin<&mut dyn Future<Output = ()>>,
Pin<&'static mut (dyn Future<Output = ()> + Send)>,
>(future);
// Spawn the future and wait for it to complete.
block(pool::spawn_with_builder(Builder::new(), future, "block_on"));
// Take out the result.
match (*out.get()).take().unwrap() {
Ok(v) => v,
Err(err) => panic::resume_unwind(err),
}
}
}
struct CatchUnwindFuture<F> {
future: F,
}
impl<F> CatchUnwindFuture<F> {
pin_utils::unsafe_pinned!(future: F);
}
impl<F: Future + UnwindSafe> Future for CatchUnwindFuture<F> {
type Output = thread::Result<F::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
panic::catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok)
}
}
fn block<F: Future>(f: F) -> F::Output {
thread_local! {
static ARC_THREAD: Arc<Thread> = Arc::new(thread::current());
}
pin_utils::pin_mut!(f);
ARC_THREAD.with(|arc_thread: &Arc<Thread>| {
let ptr = (&**arc_thread as *const Thread) as *const ();
let vt = vtable();
let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) };
let cx = &mut Context::from_waker(&waker);
loop {
if let Poll::Ready(t) = f.as_mut().poll(cx) {
return t;
}
thread::park();
}
})
}
fn vtable() -> &'static RawWakerVTable {
unsafe fn clone_raw(ptr: *const ()) -> RawWaker {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread));
mem::forget(arc.clone());
RawWaker::new(ptr, vtable())
}
unsafe fn wake_raw(ptr: *const ()) {
let arc = Arc::from_raw(ptr as *const Thread);
arc.unpark();
}
unsafe fn wake_by_ref_raw(ptr: *const ()) {
let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread));
arc.unpark();
}
unsafe fn drop_raw(ptr: *const ()) {
drop(Arc::from_raw(ptr as *const Thread))
}
&RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw)
}

@ -24,11 +24,13 @@
#[doc(inline)] #[doc(inline)]
pub use std::task::{Context, Poll, Waker}; pub use std::task::{Context, Poll, Waker};
pub use block_on::block_on;
pub use local::{AccessError, LocalKey}; pub use local::{AccessError, LocalKey};
pub use pool::{block_on, current, spawn, Builder}; pub use pool::{current, spawn, Builder};
pub use sleep::sleep; pub use sleep::sleep;
pub use task::{JoinHandle, Task, TaskId}; pub use task::{JoinHandle, Task, TaskId};
mod block_on;
mod local; mod local;
mod pool; mod pool;
mod sleep; mod sleep;

@ -1,13 +1,10 @@
use std::cell::{Cell, UnsafeCell}; use std::cell::Cell;
use std::fmt::Arguments; use std::fmt::Arguments;
use std::mem; use std::mem;
use std::panic::{self, AssertUnwindSafe};
use std::pin::Pin;
use std::ptr; use std::ptr;
use std::thread; use std::thread;
use crossbeam_channel::{unbounded, Sender}; use crossbeam_channel::{unbounded, Sender};
use futures::future::FutureExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use super::task; use super::task;
@ -70,63 +67,6 @@ where
spawn_with_builder(Builder::new(), future, "spawn") spawn_with_builder(Builder::new(), future, "spawn")
} }
/// Spawns a task and blocks the current thread on its result.
///
/// Calling this function is similar to [spawning] a thread and immediately [joining] it, except an
/// asynchronous task will be spawned.
///
/// [spawning]: https://doc.rust-lang.org/std/thread/fn.spawn.html
/// [joining]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join
///
/// # Examples
///
/// ```no_run
/// use async_std::task;
///
/// fn main() {
/// task::block_on(async {
/// println!("Hello, world!");
/// })
/// }
/// ```
pub fn block_on<F, T>(future: F) -> T
where
F: Future<Output = T> + Send,
T: Send,
{
unsafe {
// A place on the stack where the result will be stored.
let out = &mut UnsafeCell::new(None);
// Wrap the future into one that stores the result into `out`.
let future = {
let out = out.get();
async move {
let v = AssertUnwindSafe(future).catch_unwind().await;
*out = Some(v);
}
};
// Pin the future onto the stack.
futures::pin_mut!(future);
// Transmute the future into one that is static and sendable.
let future = mem::transmute::<
Pin<&mut dyn Future<Output = ()>>,
Pin<&'static mut (dyn Future<Output = ()> + Send)>,
>(future);
// Spawn the future and wait for it to complete.
futures::executor::block_on(spawn_with_builder(Builder::new(), future, "block_on"));
// Take out the result.
match (*out.get()).take().unwrap() {
Ok(v) => v,
Err(err) => panic::resume_unwind(err),
}
}
}
/// Task builder that configures the settings of a new task. /// Task builder that configures the settings of a new task.
#[derive(Debug)] #[derive(Debug)]
pub struct Builder { pub struct Builder {

Loading…
Cancel
Save