diff --git a/Cargo.toml b/Cargo.toml index 230de640..24badb1e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,9 @@ unstable = [] async-task = "1.0.0" cfg-if = "0.1.9" crossbeam-channel = "0.3.9" +futures-channel-preview = "0.3.0-alpha.18" +futures-core-preview = "0.3.0-alpha.18" +futures-io-preview = "0.3.0-alpha.18" futures-timer = "0.3.0" lazy_static = "1.3.0" log = { version = "0.4.8", features = ["kv_unstable"] } @@ -37,11 +40,11 @@ num_cpus = "1.10.0" pin-utils = "0.1.0-alpha.4" slab = "0.4.2" -[dependencies.futures-preview] -version = "0.3.0-alpha.18" -features = ["async-await", "nightly"] - [dev-dependencies] femme = "1.1.0" -tempdir = "0.3.7" surf = "1.0.1" +tempdir = "0.3.7" + +[dev-dependencies.futures-preview] +version = "0.3.0-alpha.18" +features = ["std", "nightly", "async-await"] diff --git a/docs/src/tutorial/connecting_readers_and_writers.md b/docs/src/tutorial/connecting_readers_and_writers.md index 31631f88..f3ccf2d9 100644 --- a/docs/src/tutorial/connecting_readers_and_writers.md +++ b/docs/src/tutorial/connecting_readers_and_writers.md @@ -20,7 +20,7 @@ The order of events "Bob sends message to Alice" and "Alice joins" is determined # task, # }; # use futures::channel::mpsc; -# use futures::SinkExt; +# use futures::sink::SinkExt; # use std::sync::Arc; # # type Result = std::result::Result>; diff --git a/docs/src/tutorial/sending_messages.md b/docs/src/tutorial/sending_messages.md index 465d6740..80784c2a 100644 --- a/docs/src/tutorial/sending_messages.md +++ b/docs/src/tutorial/sending_messages.md @@ -20,7 +20,7 @@ if Alice and Charley send two messages to Bob at the same time, Bob will see the # prelude::Stream, # }; use futures::channel::mpsc; // 1 -use futures::SinkExt; +use futures::sink::SinkExt; use std::sync::Arc; # type Result = std::result::Result>; diff --git a/src/fs/dir_entry.rs b/src/fs/dir_entry.rs index 4e07a3f2..c7928ebd 100644 --- a/src/fs/dir_entry.rs +++ b/src/fs/dir_entry.rs @@ -1,15 +1,12 @@ use std::ffi::OsString; use std::fs; use std::path::PathBuf; -use std::pin::Pin; -use std::sync::Mutex; +use std::sync::Arc; use cfg_if::cfg_if; -use futures::future::{self, FutureExt, TryFutureExt}; -use crate::future::Future; use crate::io; -use crate::task::{blocking, Poll}; +use crate::task::blocking; /// An entry inside a directory. /// @@ -21,26 +18,11 @@ use crate::task::{blocking, Poll}; /// [`std::fs::DirEntry`]: https://doc.rust-lang.org/std/fs/struct.DirEntry.html #[derive(Debug)] pub struct DirEntry { - /// The state of the entry. - state: Mutex, - - /// The full path to the entry. - path: PathBuf, + /// The inner synchronous `DirEntry`. + inner: Arc, #[cfg(unix)] ino: u64, - - /// The bare name of the entry without the leading path. - file_name: OsString, -} - -/// The state of an asynchronous `DirEntry`. -/// -/// The `DirEntry` can be either idle or busy performing an asynchronous operation. -#[derive(Debug)] -enum State { - Idle(Option), - Busy(blocking::JoinHandle), } impl DirEntry { @@ -48,17 +30,13 @@ impl DirEntry { pub(crate) fn new(inner: fs::DirEntry) -> DirEntry { #[cfg(unix)] let dir_entry = DirEntry { - path: inner.path(), - file_name: inner.file_name(), ino: inner.ino(), - state: Mutex::new(State::Idle(Some(inner))), + inner: Arc::new(inner), }; #[cfg(windows)] let dir_entry = DirEntry { - path: inner.path(), - file_name: inner.file_name(), - state: Mutex::new(State::Idle(Some(inner))), + inner: Arc::new(inner), }; dir_entry @@ -89,7 +67,7 @@ impl DirEntry { /// # Ok(()) }) } /// ``` pub fn path(&self) -> PathBuf { - self.path.clone() + self.inner.path() } /// Returns the metadata for this entry. @@ -114,35 +92,8 @@ impl DirEntry { /// # Ok(()) }) } /// ``` pub async fn metadata(&self) -> io::Result { - future::poll_fn(|cx| { - let state = &mut *self.state.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => match opt.take() { - None => return Poll::Ready(None), - Some(inner) => { - let (s, r) = futures::channel::oneshot::channel(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.metadata(); - let _ = s.send(res); - State::Idle(Some(inner)) - })); - - return Poll::Ready(Some(r)); - } - }, - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), - } - } - }) - .map(|opt| opt.ok_or_else(|| io_error("invalid state"))) - .await? - .map_err(|_| io_error("blocking task failed")) - .await? + let inner = self.inner.clone(); + blocking::spawn(async move { inner.metadata() }).await } /// Returns the file type for this entry. @@ -167,35 +118,8 @@ impl DirEntry { /// # Ok(()) }) } /// ``` pub async fn file_type(&self) -> io::Result { - future::poll_fn(|cx| { - let state = &mut *self.state.lock().unwrap(); - - loop { - match state { - State::Idle(opt) => match opt.take() { - None => return Poll::Ready(None), - Some(inner) => { - let (s, r) = futures::channel::oneshot::channel(); - - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - let res = inner.file_type(); - let _ = s.send(res); - State::Idle(Some(inner)) - })); - - return Poll::Ready(Some(r)); - } - }, - // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), - } - } - }) - .map(|opt| opt.ok_or_else(|| io_error("invalid state"))) - .await? - .map_err(|_| io_error("blocking task failed")) - .await? + let inner = self.inner.clone(); + blocking::spawn(async move { inner.file_type() }).await } /// Returns the bare name of this entry without the leading path. @@ -218,15 +142,10 @@ impl DirEntry { /// # Ok(()) }) } /// ``` pub fn file_name(&self) -> OsString { - self.file_name.clone() + self.inner.file_name() } } -/// Creates a custom `io::Error` with an arbitrary error type. -fn io_error(err: impl Into>) -> io::Error { - io::Error::new(io::ErrorKind::Other, err) -} - cfg_if! { if #[cfg(feature = "docs")] { use crate::os::unix::fs::DirEntryExt; diff --git a/src/fs/file.rs b/src/fs/file.rs index b297181d..09e2ad64 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -7,10 +7,9 @@ use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; -use futures::future::{self, FutureExt, TryFutureExt}; -use futures::io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer}; +use futures_io::{AsyncRead, AsyncSeek, AsyncWrite, Initializer}; -use crate::future::Future; +use crate::future::{self, Future}; use crate::io; use crate::task::{blocking, Context, Poll}; @@ -234,7 +233,7 @@ impl File { State::Idle(opt) => match opt.take() { None => return Poll::Ready(None), Some(inner) => { - let (s, r) = futures::channel::oneshot::channel(); + let (s, r) = futures_channel::oneshot::channel(); // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { @@ -247,14 +246,14 @@ impl File { } }, // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } }) - .map(|opt| opt.ok_or_else(|| io_error("file closed"))) - .await? - .map_err(|_| io_error("blocking task failed")) - .await? + .await + .ok_or_else(|| io_error("file closed"))? + .await + .map_err(|_| io_error("blocking task failed"))? } /// Similar to [`sync_all`], except that it may not synchronize file metadata. @@ -289,7 +288,7 @@ impl File { State::Idle(opt) => match opt.take() { None => return Poll::Ready(None), Some(inner) => { - let (s, r) = futures::channel::oneshot::channel(); + let (s, r) = futures_channel::oneshot::channel(); // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { @@ -302,14 +301,14 @@ impl File { } }, // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } }) - .map(|opt| opt.ok_or_else(|| io_error("file closed"))) - .await? - .map_err(|_| io_error("blocking task failed")) - .await? + .await + .ok_or_else(|| io_error("file closed"))? + .await + .map_err(|_| io_error("blocking task failed"))? } /// Truncates or extends the underlying file. @@ -346,7 +345,7 @@ impl File { State::Idle(opt) => match opt.take() { None => return Poll::Ready(None), Some(inner) => { - let (s, r) = futures::channel::oneshot::channel(); + let (s, r) = futures_channel::oneshot::channel(); // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { @@ -359,14 +358,14 @@ impl File { } }, // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } }) - .map(|opt| opt.ok_or_else(|| io_error("file closed"))) - .await? - .map_err(|_| io_error("blocking task failed")) - .await? + .await + .ok_or_else(|| io_error("file closed"))? + .await + .map_err(|_| io_error("blocking task failed"))? } /// Queries metadata about the file. @@ -392,7 +391,7 @@ impl File { State::Idle(opt) => match opt.take() { None => return Poll::Ready(None), Some(inner) => { - let (s, r) = futures::channel::oneshot::channel(); + let (s, r) = futures_channel::oneshot::channel(); // Start the operation asynchronously. *state = State::Busy(blocking::spawn(async move { @@ -405,14 +404,14 @@ impl File { } }, // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } }) - .map(|opt| opt.ok_or_else(|| io_error("file closed"))) - .await? - .map_err(|_| io_error("blocking task failed")) - .await? + .await + .ok_or_else(|| io_error("file closed"))? + .await + .map_err(|_| io_error("blocking task failed"))? } /// Changes the permissions on the underlying file. @@ -448,7 +447,7 @@ impl File { State::Idle(opt) => match opt.take() { None => return Poll::Ready(None), Some(inner) => { - let (s, r) = futures::channel::oneshot::channel(); + let (s, r) = futures_channel::oneshot::channel(); let perm = perm.take().unwrap(); // Start the operation asynchronously. @@ -462,14 +461,14 @@ impl File { } }, // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } }) - .map(|opt| opt.ok_or_else(|| io_error("file closed"))) - .await? - .map_err(|_| io_error("blocking task failed")) - .await? + .await + .ok_or_else(|| io_error("file closed"))? + .await + .map_err(|_| io_error("blocking task failed"))? } } @@ -543,7 +542,7 @@ impl AsyncRead for &File { })); } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } @@ -619,7 +618,7 @@ impl AsyncWrite for &File { } } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } @@ -652,7 +651,7 @@ impl AsyncWrite for &File { } } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } @@ -677,7 +676,7 @@ impl AsyncWrite for &File { })); } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } @@ -723,7 +722,7 @@ impl AsyncSeek for &File { } } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } diff --git a/src/fs/read_dir.rs b/src/fs/read_dir.rs index dffab3a4..b3771610 100644 --- a/src/fs/read_dir.rs +++ b/src/fs/read_dir.rs @@ -1,7 +1,6 @@ use std::fs; use std::path::Path; use std::pin::Pin; -use std::sync::Mutex; use super::DirEntry; use crate::future::Future; @@ -64,71 +63,45 @@ pub async fn read_dir>(path: P) -> io::Result { /// [`DirEntry`]: struct.DirEntry.html /// [`std::fs::ReadDir`]: https://doc.rust-lang.org/std/fs/struct.ReadDir.html #[derive(Debug)] -pub struct ReadDir(Mutex); +pub struct ReadDir(State); /// The state of an asynchronous `ReadDir`. /// /// The `ReadDir` can be either idle or busy performing an asynchronous operation. #[derive(Debug)] enum State { - Idle(Option), - Busy(blocking::JoinHandle), -} - -/// Inner representation of an asynchronous `DirEntry`. -#[derive(Debug)] -struct Inner { - /// The blocking handle. - read_dir: fs::ReadDir, - - /// The next item in the stream. - item: Option>, + Idle(Option), + Busy(blocking::JoinHandle<(fs::ReadDir, Option>)>), } impl ReadDir { /// Creates an asynchronous `ReadDir` from a synchronous handle. pub(crate) fn new(inner: fs::ReadDir) -> ReadDir { - ReadDir(Mutex::new(State::Idle(Some(Inner { - read_dir: inner, - item: None, - })))) + ReadDir(State::Idle(Some(inner))) } } -impl futures::Stream for ReadDir { +impl futures_core::stream::Stream for ReadDir { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let state = &mut *self.0.lock().unwrap(); - loop { - match state { + match &mut self.0 { State::Idle(opt) => { - let inner = match opt.as_mut() { - None => return Poll::Ready(None), - Some(inner) => inner, - }; - - // Check if the operation has completed. - if let Some(res) = inner.item.take() { - return Poll::Ready(Some(res)); - } else { - let mut inner = opt.take().unwrap(); + let mut inner = opt.take().unwrap(); - // Start the operation asynchronously. - *state = State::Busy(blocking::spawn(async move { - match inner.read_dir.next() { - None => State::Idle(None), - Some(res) => { - inner.item = Some(res.map(DirEntry::new)); - State::Idle(Some(inner)) - } - } - })); - } + // Start the operation asynchronously. + self.0 = State::Busy(blocking::spawn(async move { + let next = inner.next(); + (inner, next) + })); } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + let (inner, opt) = futures_core::ready!(Pin::new(task).poll(cx)); + self.0 = State::Idle(Some(inner)); + return Poll::Ready(opt.map(|res| res.map(DirEntry::new))); + } } } } diff --git a/src/future/mod.rs b/src/future/mod.rs index 5d510a44..7d88b903 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -6,9 +6,11 @@ pub use std::future::Future; use cfg_if::cfg_if; pub use pending::pending; +pub use poll_fn::poll_fn; pub use ready::ready; mod pending; +mod poll_fn; mod ready; cfg_if! { diff --git a/src/future/pending.rs b/src/future/pending.rs index 41284f54..aaee7065 100644 --- a/src/future/pending.rs +++ b/src/future/pending.rs @@ -1,16 +1,23 @@ +use std::marker::PhantomData; +use std::pin::Pin; + +use crate::future::Future; +use crate::task::{Context, Poll}; + /// Never resolves to a value. /// /// # Examples +/// /// ``` /// # fn main() { async_std::task::block_on(async { /// # /// use std::time::Duration; /// -/// use async_std::future::pending; +/// use async_std::future; /// use async_std::io; /// /// let dur = Duration::from_secs(1); -/// let fut = pending(); +/// let fut = future::pending(); /// /// let res: io::Result<()> = io::timeout(dur, fut).await; /// assert!(res.is_err()); @@ -18,5 +25,20 @@ /// # }) } /// ``` pub async fn pending() -> T { - futures::future::pending::().await + let fut = Pending { + _marker: PhantomData, + }; + fut.await +} + +struct Pending { + _marker: PhantomData, +} + +impl Future for Pending { + type Output = T; + + fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll { + Poll::Pending + } } diff --git a/src/future/poll_fn.rs b/src/future/poll_fn.rs new file mode 100644 index 00000000..116e71c6 --- /dev/null +++ b/src/future/poll_fn.rs @@ -0,0 +1,49 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::task::{Context, Poll}; + +/// Creates a new future wrapping around a function returning [`Poll`]. +/// +/// Polling the returned future delegates to the wrapped function. +/// +/// # Examples +/// +/// ``` +/// # fn main() { async_std::task::block_on(async { +/// # +/// use async_std::future; +/// use async_std::task::{Context, Poll}; +/// +/// fn poll_greeting(_: &mut Context<'_>) -> Poll { +/// Poll::Ready("hello world".to_string()) +/// } +/// +/// assert_eq!(future::poll_fn(poll_greeting).await, "hello world"); +/// # +/// # }) } +/// ``` +pub async fn poll_fn(f: F) -> T +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + let fut = PollFn { f }; + fut.await +} + +struct PollFn { + f: F, +} + +impl Unpin for PollFn {} + +impl Future for PollFn +where + F: FnMut(&mut Context<'_>) -> Poll, +{ + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + (&mut self.f)(cx) + } +} diff --git a/src/io/buf_read.rs b/src/io/buf_read.rs index 6b7b9dbb..b05bf89e 100644 --- a/src/io/buf_read.rs +++ b/src/io/buf_read.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::str; use cfg_if::cfg_if; -use futures::io::AsyncBufRead; +use futures_io::AsyncBufRead; use crate::future::Future; use crate::io; @@ -212,7 +212,7 @@ impl Future for ReadLineFuture<'_, T> { } = &mut *self; let reader = Pin::new(reader); - let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); if str::from_utf8(&bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new( @@ -247,7 +247,7 @@ pub struct Lines { read: usize, } -impl futures::Stream for Lines { +impl futures_core::stream::Stream for Lines { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -258,7 +258,7 @@ impl futures::Stream for Lines { read, } = unsafe { self.get_unchecked_mut() }; let reader = unsafe { Pin::new_unchecked(reader) }; - let n = futures::ready!(read_line_internal(reader, cx, buf, bytes, read))?; + let n = futures_core::ready!(read_line_internal(reader, cx, buf, bytes, read))?; if n == 0 && buf.is_empty() { return Poll::Ready(None); } @@ -279,7 +279,7 @@ pub fn read_line_internal( bytes: &mut Vec, read: &mut usize, ) -> Poll> { - let ret = futures::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); + let ret = futures_core::ready!(read_until_internal(reader, cx, b'\n', bytes, read)); if str::from_utf8(&bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new( @@ -305,7 +305,7 @@ pub fn read_until_internal( ) -> Poll> { loop { let (done, used) = { - let available = futures::ready!(reader.as_mut().poll_fill_buf(cx))?; + let available = futures_core::ready!(reader.as_mut().poll_fill_buf(cx))?; if let Some(i) = memchr::memchr(byte, available) { buf.extend_from_slice(&available[..=i]); (true, i + 1) diff --git a/src/io/buf_reader.rs b/src/io/buf_reader.rs index 2ad10cc6..f38307a9 100644 --- a/src/io/buf_reader.rs +++ b/src/io/buf_reader.rs @@ -2,7 +2,7 @@ use std::io::{IoSliceMut, Read as _}; use std::pin::Pin; use std::{cmp, fmt}; -use futures::io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; +use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, Initializer}; use crate::io::{self, SeekFrom}; use crate::task::{Context, Poll}; @@ -51,7 +51,7 @@ pub struct BufReader { cap: usize, } -impl BufReader { +impl BufReader { /// Creates a buffered reader with default buffer capacity. /// /// The default capacity is currently 8 KB, but may change in the future. @@ -87,17 +87,11 @@ impl BufReader { /// # Ok(()) }) } /// ``` pub fn with_capacity(capacity: usize, inner: R) -> BufReader { - unsafe { - let mut buffer = Vec::with_capacity(capacity); - buffer.set_len(capacity); - inner.initializer().initialize(&mut buffer); - - BufReader { - inner, - buf: buffer.into_boxed_slice(), - pos: 0, - cap: 0, - } + BufReader { + inner, + buf: vec![0; capacity].into_boxed_slice(), + pos: 0, + cap: 0, } } } @@ -209,11 +203,11 @@ impl AsyncRead for BufReader { // (larger than our internal buffer), bypass our internal buffer // entirely. if self.pos == self.cap && buf.len() >= self.buf.len() { - let res = futures::ready!(self.as_mut().inner().poll_read(cx, buf)); + let res = futures_core::ready!(self.as_mut().inner().poll_read(cx, buf)); self.discard_buffer(); return Poll::Ready(res); } - let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?; + let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?; let nread = rem.read(buf)?; self.consume(nread); Poll::Ready(Ok(nread)) @@ -226,11 +220,11 @@ impl AsyncRead for BufReader { ) -> Poll> { let total_len = bufs.iter().map(|b| b.len()).sum::(); if self.pos == self.cap && total_len >= self.buf.len() { - let res = futures::ready!(self.as_mut().inner().poll_read_vectored(cx, bufs)); + let res = futures_core::ready!(self.as_mut().inner().poll_read_vectored(cx, bufs)); self.discard_buffer(); return Poll::Ready(res); } - let mut rem = futures::ready!(self.as_mut().poll_fill_buf(cx))?; + let mut rem = futures_core::ready!(self.as_mut().poll_fill_buf(cx))?; let nread = rem.read_vectored(bufs)?; self.consume(nread); Poll::Ready(Ok(nread)) @@ -261,7 +255,7 @@ impl AsyncBufRead for BufReader { // to tell the compiler that the pos..cap slice is always valid. if *pos >= *cap { debug_assert!(*pos == *cap); - *cap = futures::ready!(inner.as_mut().poll_read(cx, buf))?; + *cap = futures_core::ready!(inner.as_mut().poll_read(cx, buf))?; *pos = 0; } Poll::Ready(Ok(&buf[*pos..*cap])) @@ -272,7 +266,7 @@ impl AsyncBufRead for BufReader { } } -impl fmt::Debug for BufReader { +impl fmt::Debug for BufReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufReader") .field("reader", &self.inner) @@ -316,25 +310,26 @@ impl AsyncSeek for BufReader { // support seeking by i64::min_value() so we need to handle underflow when subtracting // remainder. if let Some(offset) = n.checked_sub(remainder) { - result = futures::ready!( + result = futures_core::ready!( self.as_mut() .inner() .poll_seek(cx, SeekFrom::Current(offset)) )?; } else { // seek backwards by our remainder, and then by the offset - futures::ready!( + futures_core::ready!( self.as_mut() .inner() .poll_seek(cx, SeekFrom::Current(-remainder)) )?; self.as_mut().discard_buffer(); - result = - futures::ready!(self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)))?; + result = futures_core::ready!( + self.as_mut().inner().poll_seek(cx, SeekFrom::Current(n)) + )?; } } else { // Seeking with Start/End doesn't care about our buffer length. - result = futures::ready!(self.as_mut().inner().poll_seek(cx, pos))?; + result = futures_core::ready!(self.as_mut().inner().poll_seek(cx, pos))?; } self.discard_buffer(); Poll::Ready(Ok(result)) diff --git a/src/io/copy.rs b/src/io/copy.rs index 961c8264..ccc6bc82 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -1,6 +1,10 @@ -use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite}; +use std::pin::Pin; -use crate::io; +use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; + +use crate::future::Future; +use crate::io::{self, BufReader}; +use crate::task::{Context, Poll}; /// Copies the entire contents of a reader into a writer. /// @@ -44,6 +48,55 @@ where R: AsyncRead + Unpin + ?Sized, W: AsyncWrite + Unpin + ?Sized, { - let bytes_read = reader.copy_into(writer).await?; - Ok(bytes_read) + pub struct CopyFuture<'a, R, W: ?Sized> { + reader: R, + writer: &'a mut W, + amt: u64, + } + + impl CopyFuture<'_, R, W> { + fn project(self: Pin<&mut Self>) -> (Pin<&mut R>, Pin<&mut W>, &mut u64) { + unsafe { + let this = self.get_unchecked_mut(); + ( + Pin::new_unchecked(&mut this.reader), + Pin::new(&mut *this.writer), + &mut this.amt, + ) + } + } + } + + impl Future for CopyFuture<'_, R, W> + where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, + { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let (mut reader, mut writer, amt) = self.project(); + loop { + let buffer = futures_core::ready!(reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + futures_core::ready!(writer.as_mut().poll_flush(cx))?; + return Poll::Ready(Ok(*amt)); + } + + let i = futures_core::ready!(writer.as_mut().poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *amt += i as u64; + reader.as_mut().consume(i); + } + } + } + + let future = CopyFuture { + reader: BufReader::new(reader), + writer, + amt: 0, + }; + future.await } diff --git a/src/io/empty.rs b/src/io/empty.rs index a832677d..35ed732b 100644 --- a/src/io/empty.rs +++ b/src/io/empty.rs @@ -1,7 +1,7 @@ use std::fmt; use std::pin::Pin; -use futures::io::{AsyncBufRead, AsyncRead, Initializer}; +use futures_io::{AsyncBufRead, AsyncRead, Initializer}; use crate::io; use crate::task::{Context, Poll}; diff --git a/src/io/read.rs b/src/io/read.rs index cf3732f7..4b6bb1fb 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use std::str; use cfg_if::cfg_if; -use futures::io::AsyncRead; +use futures_io::AsyncRead; use crate::future::Future; use crate::io; @@ -290,7 +290,7 @@ impl Future for ReadToStringFuture<'_, T> { } = &mut *self; let reader = Pin::new(reader); - let ret = futures::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); + let ret = futures_core::ready!(read_to_end_internal(reader, cx, bytes, *start_len)); if str::from_utf8(&bytes).is_err() { Poll::Ready(ret.and_then(|_| { Err(io::Error::new( @@ -321,7 +321,7 @@ impl Future for ReadExactFuture<'_, T> { let Self { reader, buf } = &mut *self; while !buf.is_empty() { - let n = futures::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; + let n = futures_core::ready!(Pin::new(&mut *reader).poll_read(cx, buf))?; let (_, rest) = mem::replace(buf, &mut []).split_at_mut(n); *buf = rest; @@ -377,7 +377,7 @@ pub fn read_to_end_internal( } } - match futures::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { + match futures_core::ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) { Ok(0) => { ret = Poll::Ready(Ok(g.len - start_len)); break; diff --git a/src/io/seek.rs b/src/io/seek.rs index 9250faaf..61a5d9c5 100644 --- a/src/io/seek.rs +++ b/src/io/seek.rs @@ -1,7 +1,7 @@ use std::pin::Pin; use cfg_if::cfg_if; -use futures::io::AsyncSeek; +use futures_io::AsyncSeek; use crate::future::Future; use crate::io::{self, SeekFrom}; diff --git a/src/io/sink.rs b/src/io/sink.rs index ec38431c..fba56334 100644 --- a/src/io/sink.rs +++ b/src/io/sink.rs @@ -1,7 +1,7 @@ use std::fmt; use std::pin::Pin; -use futures::io::AsyncWrite; +use futures_io::AsyncWrite; use crate::io; use crate::task::{Context, Poll}; diff --git a/src/io/stderr.rs b/src/io/stderr.rs index 73b64ce4..bb2318fa 100644 --- a/src/io/stderr.rs +++ b/src/io/stderr.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; -use futures::io::AsyncWrite; +use futures_io::AsyncWrite; use crate::future::Future; use crate::task::{blocking, Context, Poll}; @@ -125,7 +125,7 @@ impl AsyncWrite for Stderr { } } // Poll the asynchronous operation the stderr is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } @@ -153,7 +153,7 @@ impl AsyncWrite for Stderr { } } // Poll the asynchronous operation the stderr is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } diff --git a/src/io/stdin.rs b/src/io/stdin.rs index bd4c1118..9fe432d0 100644 --- a/src/io/stdin.rs +++ b/src/io/stdin.rs @@ -3,10 +3,9 @@ use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; -use futures::future; -use futures::io::{AsyncRead, Initializer}; +use futures_io::{AsyncRead, Initializer}; -use crate::future::Future; +use crate::future::{self, Future}; use crate::task::{blocking, Context, Poll}; /// Constructs a new handle to the standard input of the current process. @@ -130,7 +129,7 @@ impl Stdin { } } // Poll the asynchronous operation the stdin is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } }) @@ -182,7 +181,7 @@ impl AsyncRead for Stdin { } } // Poll the asynchronous operation the stdin is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } diff --git a/src/io/stdout.rs b/src/io/stdout.rs index 5045d112..f62f3df3 100644 --- a/src/io/stdout.rs +++ b/src/io/stdout.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::sync::Mutex; use cfg_if::cfg_if; -use futures::io::AsyncWrite; +use futures_io::AsyncWrite; use crate::future::Future; use crate::task::{blocking, Context, Poll}; @@ -125,7 +125,7 @@ impl AsyncWrite for Stdout { } } // Poll the asynchronous operation the stdout is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } @@ -153,7 +153,7 @@ impl AsyncWrite for Stdout { } } // Poll the asynchronous operation the stdout is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => *state = futures_core::ready!(Pin::new(task).poll(cx)), } } } diff --git a/src/io/write.rs b/src/io/write.rs index 0fba81bc..d332eec3 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -3,7 +3,7 @@ use std::mem; use std::pin::Pin; use cfg_if::cfg_if; -use futures::io::AsyncWrite; +use futures_io::AsyncWrite; use crate::future::Future; use crate::io; @@ -201,7 +201,7 @@ impl Future for WriteAllFuture<'_, T> { let Self { writer, buf } = &mut *self; while !buf.is_empty() { - let n = futures::ready!(Pin::new(&mut **writer).poll_write(cx, buf))?; + let n = futures_core::ready!(Pin::new(&mut **writer).poll_write(cx, buf))?; let (_, rest) = mem::replace(buf, &[]).split_at(n); *buf = rest; diff --git a/src/net/addr.rs b/src/net/addr.rs index 39dba52d..71f43a5c 100644 --- a/src/net/addr.rs +++ b/src/net/addr.rs @@ -1,15 +1,14 @@ +use std::marker::PhantomData; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::pin::Pin; use cfg_if::cfg_if; -use futures::future::{ready, Ready}; use crate::future::Future; use crate::io; use crate::task::blocking; use crate::task::{Context, Poll}; -use std::marker::PhantomData; cfg_if! { if #[cfg(feature = "docs")] { @@ -47,19 +46,22 @@ pub trait ToSocketAddrs { #[doc(hidden)] #[allow(missing_debug_implementations)] -pub enum ToSocketAddrsFuture<'a, I: Iterator> { +pub enum ToSocketAddrsFuture<'a, I> { Phantom(PhantomData<&'a ()>), Join(blocking::JoinHandle>), - Ready(Ready>), + Ready(Option>), } impl> Future for ToSocketAddrsFuture<'_, I> { type Output = io::Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.get_mut() { + match unsafe { self.get_unchecked_mut() } { ToSocketAddrsFuture::Join(f) => Pin::new(&mut *f).poll(cx), - ToSocketAddrsFuture::Ready(f) => Pin::new(&mut *f).poll(cx), + ToSocketAddrsFuture::Ready(res) => { + let res = res.take().expect("polled a completed future"); + Poll::Ready(res) + } _ => unreachable!(), } } @@ -69,7 +71,7 @@ impl ToSocketAddrs for SocketAddr { type Iter = std::option::IntoIter; fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { - ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) + ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self))) } } @@ -77,7 +79,7 @@ impl ToSocketAddrs for SocketAddrV4 { type Iter = std::option::IntoIter; fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { - ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) + ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self))) } } @@ -85,7 +87,7 @@ impl ToSocketAddrs for SocketAddrV6 { type Iter = std::option::IntoIter; fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { - ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) + ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self))) } } @@ -93,7 +95,7 @@ impl ToSocketAddrs for (IpAddr, u16) { type Iter = std::option::IntoIter; fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { - ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) + ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self))) } } @@ -101,7 +103,7 @@ impl ToSocketAddrs for (Ipv4Addr, u16) { type Iter = std::option::IntoIter; fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { - ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) + ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self))) } } @@ -109,7 +111,7 @@ impl ToSocketAddrs for (Ipv6Addr, u16) { type Iter = std::option::IntoIter; fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { - ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) + ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self))) } } @@ -141,7 +143,7 @@ impl<'a> ToSocketAddrs for &'a [SocketAddr] { type Iter = std::iter::Cloned>; fn to_socket_addrs(&self) -> ret!('_, ToSocketAddrsFuture, Self::Iter) { - ToSocketAddrsFuture::Ready(ready(std::net::ToSocketAddrs::to_socket_addrs(self))) + ToSocketAddrsFuture::Ready(Some(std::net::ToSocketAddrs::to_socket_addrs(self))) } } diff --git a/src/net/driver/mod.rs b/src/net/driver/mod.rs index 4002254d..04ab4bb6 100644 --- a/src/net/driver/mod.rs +++ b/src/net/driver/mod.rs @@ -4,7 +4,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; -use futures::io::{AsyncRead, AsyncWrite}; +use futures_io::{AsyncRead, AsyncWrite}; use lazy_static::lazy_static; use mio::{self, Evented}; use slab::Slab; @@ -303,7 +303,7 @@ impl AsyncRead for IoHandle { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - futures::ready!(Pin::new(&mut *self).poll_readable(cx)?); + futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?); match self.source.read(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -324,7 +324,7 @@ where cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - futures::ready!(Pin::new(&mut *self).poll_readable(cx)?); + futures_core::ready!(Pin::new(&mut *self).poll_readable(cx)?); match (&self.source).read(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -342,7 +342,7 @@ impl AsyncWrite for IoHandle { cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - futures::ready!(self.poll_writable(cx)?); + futures_core::ready!(self.poll_writable(cx)?); match self.source.write(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -354,7 +354,7 @@ impl AsyncWrite for IoHandle { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - futures::ready!(self.poll_writable(cx)?); + futures_core::ready!(self.poll_writable(cx)?); match self.source.flush() { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -379,7 +379,7 @@ where cx: &mut Context<'_>, buf: &[u8], ) -> Poll> { - futures::ready!(self.poll_writable(cx)?); + futures_core::ready!(self.poll_writable(cx)?); match (&self.source).write(buf) { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { @@ -391,7 +391,7 @@ where } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - futures::ready!(self.poll_writable(cx)?); + futures_core::ready!(self.poll_writable(cx)?); match (&self.source).flush() { Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => { diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index ac18387a..67a48650 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -2,10 +2,9 @@ use std::net::SocketAddr; use std::pin::Pin; use cfg_if::cfg_if; -use futures::future; use super::TcpStream; -use crate::future::Future; +use crate::future::{self, Future}; use crate::io; use crate::net::driver::IoHandle; use crate::net::ToSocketAddrs; @@ -129,7 +128,7 @@ impl TcpListener { /// ``` pub async fn accept(&self) -> io::Result<(TcpStream, SocketAddr)> { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); + futures_core::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().accept_std() { Ok((io, addr)) => { @@ -225,14 +224,14 @@ impl TcpListener { #[derive(Debug)] pub struct Incoming<'a>(&'a TcpListener); -impl<'a> futures::Stream for Incoming<'a> { +impl<'a> futures_core::stream::Stream for Incoming<'a> { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let future = self.0.accept(); pin_utils::pin_mut!(future); - let (socket, _) = futures::ready!(future.poll(cx))?; + let (socket, _) = futures_core::ready!(future.poll(cx))?; Poll::Ready(Some(Ok(socket))) } } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 5ea181f2..b63b7f2e 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -4,9 +4,9 @@ use std::net::SocketAddr; use std::pin::Pin; use cfg_if::cfg_if; -use futures::future; -use futures::io::{AsyncRead, AsyncWrite}; +use futures_io::{AsyncRead, AsyncWrite}; +use crate::future; use crate::io; use crate::net::driver::IoHandle; use crate::net::ToSocketAddrs; @@ -259,7 +259,7 @@ impl TcpStream { /// ``` pub async fn peek(&self, buf: &mut [u8]) -> io::Result { let res = future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); + futures_core::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().peek(buf) { Ok(len) => Poll::Ready(Ok(len)), diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index 19119a56..23024274 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -2,9 +2,9 @@ use std::io; use std::net::SocketAddr; use cfg_if::cfg_if; -use futures::future; use std::net::{Ipv4Addr, Ipv6Addr}; +use crate::future; use crate::net::driver::IoHandle; use crate::net::ToSocketAddrs; use crate::task::Poll; @@ -165,7 +165,7 @@ impl UdpSocket { }; future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); + futures_core::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send_to(buf, &addr) { Ok(n) => Poll::Ready(Ok(n)), @@ -200,7 +200,7 @@ impl UdpSocket { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); + futures_core::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().recv_from(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -282,7 +282,7 @@ impl UdpSocket { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); + futures_core::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -317,7 +317,7 @@ impl UdpSocket { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); + futures_core::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().recv(buf) { Ok(n) => Poll::Ready(Ok(n)), diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 62debc4c..fb5b5f16 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -4,10 +4,10 @@ use std::fmt; use std::net::Shutdown; use std::path::Path; -use futures::future; use mio_uds; use super::SocketAddr; +use crate::future; use crate::io; use crate::net::driver::IoHandle; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -203,7 +203,7 @@ impl UnixDatagram { /// ``` pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); + futures_core::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().recv_from(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -236,7 +236,7 @@ impl UnixDatagram { /// ``` pub async fn recv(&self, buf: &mut [u8]) -> io::Result { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); + futures_core::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().recv(buf) { Ok(n) => Poll::Ready(Ok(n)), @@ -268,7 +268,7 @@ impl UnixDatagram { /// ``` pub async fn send_to>(&self, buf: &[u8], path: P) -> io::Result { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); + futures_core::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send_to(buf, path.as_ref()) { Ok(n) => Poll::Ready(Ok(n)), @@ -301,7 +301,7 @@ impl UnixDatagram { /// ``` pub async fn send(&self, buf: &[u8]) -> io::Result { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_writable(cx)?); + futures_core::ready!(self.io_handle.poll_writable(cx)?); match self.io_handle.get_ref().send(buf) { Ok(n) => Poll::Ready(Ok(n)), diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 74b0a28d..61df2517 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -4,12 +4,11 @@ use std::fmt; use std::path::Path; use std::pin::Pin; -use futures::future; use mio_uds; use super::SocketAddr; use super::UnixStream; -use crate::future::Future; +use crate::future::{self, Future}; use crate::io; use crate::net::driver::IoHandle; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -97,7 +96,7 @@ impl UnixListener { /// ``` pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { future::poll_fn(|cx| { - futures::ready!(self.io_handle.poll_readable(cx)?); + futures_core::ready!(self.io_handle.poll_readable(cx)?); match self.io_handle.get_ref().accept_std() { Ok(Some((io, addr))) => { @@ -198,14 +197,14 @@ impl fmt::Debug for UnixListener { #[derive(Debug)] pub struct Incoming<'a>(&'a UnixListener); -impl futures::Stream for Incoming<'_> { +impl futures_core::stream::Stream for Incoming<'_> { type Item = io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let future = self.0.accept(); - futures::pin_mut!(future); + pin_utils::pin_mut!(future); - let (socket, _) = futures::ready!(future.poll(cx))?; + let (socket, _) = futures_core::ready!(future.poll(cx))?; Poll::Ready(Some(Ok(socket))) } } diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 74afdeeb..389720e4 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -6,11 +6,11 @@ use std::net::Shutdown; use std::path::Path; use std::pin::Pin; -use futures::future; -use futures::io::{AsyncRead, AsyncWrite}; +use futures_io::{AsyncRead, AsyncWrite}; use mio_uds; use super::SocketAddr; +use crate::future; use crate::io; use crate::net::driver::IoHandle; use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; @@ -81,7 +81,7 @@ impl UnixStream { future::poll_fn(|cx| { match &mut state { State::Waiting(stream) => { - futures::ready!(stream.io_handle.poll_writable(cx)?); + futures_core::ready!(stream.io_handle.poll_writable(cx)?); if let Some(err) = stream.io_handle.get_ref().take_error()? { return Poll::Ready(Err(err)); diff --git a/src/stream/empty.rs b/src/stream/empty.rs index 4445e435..f4cf5525 100644 --- a/src/stream/empty.rs +++ b/src/stream/empty.rs @@ -35,7 +35,7 @@ pub struct Empty { _marker: PhantomData, } -impl futures::Stream for Empty { +impl futures_core::stream::Stream for Empty { type Item = T; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/src/stream/once.rs b/src/stream/once.rs index 160d6cf4..09811d8f 100644 --- a/src/stream/once.rs +++ b/src/stream/once.rs @@ -33,7 +33,7 @@ pub struct Once { value: Option, } -impl futures::Stream for Once { +impl futures_core::stream::Stream for Once { type Item = T; fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/src/stream/repeat.rs b/src/stream/repeat.rs index d42b7271..c2ee97ae 100644 --- a/src/stream/repeat.rs +++ b/src/stream/repeat.rs @@ -36,7 +36,7 @@ pub struct Repeat { item: T, } -impl futures::Stream for Repeat { +impl futures_core::stream::Stream for Repeat { type Item = T; fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/src/stream/stream.rs b/src/stream/stream.rs index 7a127b09..698eb320 100644 --- a/src/stream/stream.rs +++ b/src/stream/stream.rs @@ -233,8 +233,8 @@ pub trait Stream { } } -impl Stream for T { - type Item = ::Item; +impl Stream for T { + type Item = ::Item; fn next(&mut self) -> ret!('_, NextFuture, Option) where @@ -250,7 +250,7 @@ pub struct NextFuture<'a, T: Unpin + ?Sized> { stream: &'a mut T, } -impl Future for NextFuture<'_, T> { +impl Future for NextFuture<'_, T> { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -267,19 +267,19 @@ pub struct Take { impl Unpin for Take {} -impl Take { +impl Take { pin_utils::unsafe_pinned!(stream: S); pin_utils::unsafe_unpinned!(remaining: usize); } -impl futures::Stream for Take { +impl futures_core::stream::Stream for Take { type Item = S::Item; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if self.remaining == 0 { Poll::Ready(None) } else { - let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); match next { Some(_) => *self.as_mut().remaining() -= 1, None => *self.as_mut().remaining() = 0, @@ -311,14 +311,14 @@ where impl Future for AllFuture<'_, S, F, S::Item> where - S: futures::Stream + Unpin + Sized, + S: futures_core::stream::Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures::Stream; - let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + use futures_core::stream::Stream; + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); match next { Some(v) => { let result = (self.as_mut().f())(v); @@ -358,14 +358,14 @@ where impl Future for AnyFuture<'_, S, F, S::Item> where - S: futures::Stream + Unpin + Sized, + S: futures_core::stream::Stream + Unpin + Sized, F: FnMut(S::Item) -> bool, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - use futures::Stream; - let next = futures::ready!(self.as_mut().stream().poll_next(cx)); + use futures_core::stream::Stream; + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); match next { Some(v) => { let result = (self.as_mut().f())(v); diff --git a/src/task/block_on.rs b/src/task/block_on.rs new file mode 100644 index 00000000..92c4b517 --- /dev/null +++ b/src/task/block_on.rs @@ -0,0 +1,135 @@ +use std::cell::UnsafeCell; +use std::mem::{self, ManuallyDrop}; +use std::panic::{self, AssertUnwindSafe, UnwindSafe}; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{RawWaker, RawWakerVTable}; +use std::thread::{self, Thread}; + +use super::pool; +use super::Builder; +use crate::future::Future; +use crate::task::{Context, Poll, Waker}; + +/// Spawns a task and blocks the current thread on its result. +/// +/// Calling this function is similar to [spawning] a thread and immediately [joining] it, except an +/// asynchronous task will be spawned. +/// +/// [spawning]: https://doc.rust-lang.org/std/thread/fn.spawn.html +/// [joining]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join +/// +/// # Examples +/// +/// ```no_run +/// use async_std::task; +/// +/// fn main() { +/// task::block_on(async { +/// println!("Hello, world!"); +/// }) +/// } +/// ``` +pub fn block_on(future: F) -> T +where + F: Future + Send, + T: Send, +{ + unsafe { + // A place on the stack where the result will be stored. + let out = &mut UnsafeCell::new(None); + + // Wrap the future into one that stores the result into `out`. + let future = { + let out = out.get(); + + async move { + let future = CatchUnwindFuture { + future: AssertUnwindSafe(future), + }; + *out = Some(future.await); + } + }; + + // Pin the future onto the stack. + pin_utils::pin_mut!(future); + + // Transmute the future into one that is static and sendable. + let future = mem::transmute::< + Pin<&mut dyn Future>, + Pin<&'static mut (dyn Future + Send)>, + >(future); + + // Spawn the future and wait for it to complete. + block(pool::spawn_with_builder(Builder::new(), future, "block_on")); + + // Take out the result. + match (*out.get()).take().unwrap() { + Ok(v) => v, + Err(err) => panic::resume_unwind(err), + } + } +} + +struct CatchUnwindFuture { + future: F, +} + +impl CatchUnwindFuture { + pin_utils::unsafe_pinned!(future: F); +} + +impl Future for CatchUnwindFuture { + type Output = thread::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + panic::catch_unwind(AssertUnwindSafe(|| self.future().poll(cx)))?.map(Ok) + } +} + +fn block(f: F) -> F::Output { + thread_local! { + static ARC_THREAD: Arc = Arc::new(thread::current()); + } + + pin_utils::pin_mut!(f); + + ARC_THREAD.with(|arc_thread: &Arc| { + let ptr = (&**arc_thread as *const Thread) as *const (); + let vt = vtable(); + + let waker = unsafe { ManuallyDrop::new(Waker::from_raw(RawWaker::new(ptr, vt))) }; + let cx = &mut Context::from_waker(&waker); + + loop { + if let Poll::Ready(t) = f.as_mut().poll(cx) { + return t; + } + thread::park(); + } + }) +} + +fn vtable() -> &'static RawWakerVTable { + unsafe fn clone_raw(ptr: *const ()) -> RawWaker { + let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread)); + mem::forget(arc.clone()); + RawWaker::new(ptr, vtable()) + } + + unsafe fn wake_raw(ptr: *const ()) { + let arc = Arc::from_raw(ptr as *const Thread); + arc.unpark(); + } + + unsafe fn wake_by_ref_raw(ptr: *const ()) { + let arc = ManuallyDrop::new(Arc::from_raw(ptr as *const Thread)); + arc.unpark(); + } + + unsafe fn drop_raw(ptr: *const ()) { + drop(Arc::from_raw(ptr as *const Thread)) + } + + &RawWakerVTable::new(clone_raw, wake_raw, wake_by_ref_raw, drop_raw) +} diff --git a/src/task/mod.rs b/src/task/mod.rs index 42b7e088..eef72846 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -24,11 +24,13 @@ #[doc(inline)] pub use std::task::{Context, Poll, Waker}; +pub use block_on::block_on; pub use local::{AccessError, LocalKey}; -pub use pool::{block_on, current, spawn, Builder}; +pub use pool::{current, spawn, Builder}; pub use sleep::sleep; pub use task::{JoinHandle, Task, TaskId}; +mod block_on; mod local; mod pool; mod sleep; diff --git a/src/task/pool.rs b/src/task/pool.rs index ab09ffbb..3640909f 100644 --- a/src/task/pool.rs +++ b/src/task/pool.rs @@ -1,13 +1,10 @@ -use std::cell::{Cell, UnsafeCell}; +use std::cell::Cell; use std::fmt::Arguments; use std::mem; -use std::panic::{self, AssertUnwindSafe}; -use std::pin::Pin; use std::ptr; use std::thread; use crossbeam_channel::{unbounded, Sender}; -use futures::future::FutureExt; use lazy_static::lazy_static; use super::task; @@ -70,63 +67,6 @@ where spawn_with_builder(Builder::new(), future, "spawn") } -/// Spawns a task and blocks the current thread on its result. -/// -/// Calling this function is similar to [spawning] a thread and immediately [joining] it, except an -/// asynchronous task will be spawned. -/// -/// [spawning]: https://doc.rust-lang.org/std/thread/fn.spawn.html -/// [joining]: https://doc.rust-lang.org/std/thread/struct.JoinHandle.html#method.join -/// -/// # Examples -/// -/// ```no_run -/// use async_std::task; -/// -/// fn main() { -/// task::block_on(async { -/// println!("Hello, world!"); -/// }) -/// } -/// ``` -pub fn block_on(future: F) -> T -where - F: Future + Send, - T: Send, -{ - unsafe { - // A place on the stack where the result will be stored. - let out = &mut UnsafeCell::new(None); - - // Wrap the future into one that stores the result into `out`. - let future = { - let out = out.get(); - async move { - let v = AssertUnwindSafe(future).catch_unwind().await; - *out = Some(v); - } - }; - - // Pin the future onto the stack. - futures::pin_mut!(future); - - // Transmute the future into one that is static and sendable. - let future = mem::transmute::< - Pin<&mut dyn Future>, - Pin<&'static mut (dyn Future + Send)>, - >(future); - - // Spawn the future and wait for it to complete. - futures::executor::block_on(spawn_with_builder(Builder::new(), future, "block_on")); - - // Take out the result. - match (*out.get()).take().unwrap() { - Ok(v) => v, - Err(err) => panic::resume_unwind(err), - } - } -} - /// Task builder that configures the settings of a new task. #[derive(Debug)] pub struct Builder {