diff --git a/docs/src/concepts/tasks.md b/docs/src/concepts/tasks.md index 2db8cb0c..c3dbbe20 100644 --- a/docs/src/concepts/tasks.md +++ b/docs/src/concepts/tasks.md @@ -2,7 +2,7 @@ Now that we know what Futures are, we want to run them! -In `async-std`, the [`tasks`][tasks] module is responsible for this. The simplest way is using the `block_on` function: +In `async-std`, the [`task`][tasks] module is responsible for this. The simplest way is using the `block_on` function: ```rust,edition2018 # extern crate async_std; diff --git a/src/io/write/write_fmt.rs b/src/io/write/write_fmt.rs index d20c41d8..318b1c37 100644 --- a/src/io/write/write_fmt.rs +++ b/src/io/write/write_fmt.rs @@ -11,7 +11,7 @@ pub struct WriteFmtFuture<'a, T: Unpin + ?Sized> { pub(crate) writer: &'a mut T, pub(crate) res: Option>>, pub(crate) buffer: Option>, - pub(crate) amt: u64, + pub(crate) amt: usize, } impl Future for WriteFmtFuture<'_, T> { @@ -37,15 +37,15 @@ impl Future for WriteFmtFuture<'_, T> { // Copy the data from the buffer into the writer until it's done. loop { - if *amt == buffer.len() as u64 { + if *amt == buffer.len() { futures_core::ready!(Pin::new(&mut **writer).poll_flush(cx))?; return Poll::Ready(Ok(())); } - let i = futures_core::ready!(Pin::new(&mut **writer).poll_write(cx, buffer))?; + let i = futures_core::ready!(Pin::new(&mut **writer).poll_write(cx, &buffer[*amt..]))?; if i == 0 { return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); } - *amt += i as u64; + *amt += i; } } } diff --git a/src/lib.rs b/src/lib.rs index 9a1c00c1..ebbb3965 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,7 +111,7 @@ //! [files]: fs/struct.File.html //! [TCP]: net/struct.TcpStream.html //! [UDP]: net/struct.UdpSocket.html -//! [`io`]: fs/struct.File.html +//! [`io`]: io/index.html //! [`sync`]: sync/index.html //! [`channel`]: channel/index.html //! diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 72c1bcae..cb7213ed 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,6 +1,6 @@ use std::fmt; -use std::future::Future; use std::net::SocketAddr; +use std::net::TcpStream as StdTcpStream; use std::pin::Pin; use async_io::Async; @@ -149,8 +149,7 @@ impl TcpListener { #[must_use] pub fn incoming(&self) -> Incoming<'_> { Incoming { - listener: self, - accept: None, + incoming: Box::pin(self.watcher.incoming()), } } @@ -188,35 +187,21 @@ impl TcpListener { /// [`TcpListener`]: struct.TcpListener.html /// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html pub struct Incoming<'a> { - listener: &'a TcpListener, - accept: Option< - Pin> + Send + Sync + 'a>>, - >, + incoming: Pin>> + Send + Sync + 'a>>, } impl Stream for Incoming<'_> { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - if self.accept.is_none() { - self.accept = Some(Box::pin(self.listener.accept())); - } - - if let Some(f) = &mut self.accept { - let res = ready!(f.as_mut().poll(cx)); - self.accept = None; - return Poll::Ready(Some(res.map(|(stream, _)| stream))); - } - } + let res = ready!(Pin::new(&mut self.incoming).poll_next(cx)); + Poll::Ready(res.map(|res| res.map(|stream| TcpStream { watcher: Arc::new(stream) }))) } } impl fmt::Debug for Incoming<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Incoming") - .field("listener", self.listener) - .finish() + write!(f, "Incoming {{ ... }}") } } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 3732a634..030ac769 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -1,8 +1,8 @@ //! Unix-specific networking extensions. use std::fmt; -use std::future::Future; use std::os::unix::net::UnixListener as StdUnixListener; +use std::os::unix::net::UnixStream as StdUnixStream; use std::pin::Pin; use async_io::Async; @@ -130,8 +130,7 @@ impl UnixListener { #[must_use] pub fn incoming(&self) -> Incoming<'_> { Incoming { - listener: self, - accept: None, + incoming: Box::pin(self.watcher.incoming()), } } @@ -179,34 +178,21 @@ impl fmt::Debug for UnixListener { /// [`incoming`]: struct.UnixListener.html#method.incoming /// [`UnixListener`]: struct.UnixListener.html pub struct Incoming<'a> { - listener: &'a UnixListener, - accept: Option< - Pin> + Send + Sync + 'a>>, - >, + incoming: Pin>> + Send + Sync + 'a>>, } impl Stream for Incoming<'_> { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - if self.accept.is_none() { - self.accept = Some(Box::pin(self.listener.accept())); - } - - if let Some(f) = &mut self.accept { - let res = ready!(f.as_mut().poll(cx)); - self.accept = None; - return Poll::Ready(Some(res.map(|(stream, _)| stream))); - } - } + let res = ready!(Pin::new(&mut self.incoming).poll_next(cx)); + Poll::Ready(res.map(|res| res.map(|stream| UnixStream { watcher: Arc::new(stream) }))) } } impl fmt::Debug for Incoming<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Incoming") - .field("listener", self.listener) .finish() } } diff --git a/src/task/current.rs b/src/task/current.rs index 8a56cc26..b22426b6 100644 --- a/src/task/current.rs +++ b/src/task/current.rs @@ -22,7 +22,28 @@ use crate::task::{Task, TaskLocalsWrapper}; /// # /// # }) /// ``` -#[must_use] pub fn current() -> Task { - TaskLocalsWrapper::get_current(|t| t.task().clone()) - .expect("`task::current()` called outside the context of a task") +#[must_use] +pub fn current() -> Task { + try_current().expect("`task::current()` called outside the context of a task") } + +/// Returns a handle to the current task if called within the context of a task created by [`block_on`], +/// [`spawn`], or [`Builder::spawn`], otherwise returns `None`. +/// +/// [`block_on`]: fn.block_on.html +/// [`spawn`]: fn.spawn.html +/// [`Builder::spawn`]: struct.Builder.html#method.spawn +/// +/// # Examples +/// +/// ``` +/// use async_std::task; +/// +/// match task::try_current() { +/// Some(t) => println!("The name of this task is {:?}", t.name()), +/// None => println!("Not inside a task!"), +/// } +/// ``` +pub fn try_current() -> Option { + TaskLocalsWrapper::get_current(|t| t.task().clone()) +} \ No newline at end of file diff --git a/src/task/mod.rs b/src/task/mod.rs index 440f6ddc..fe574ec6 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -133,7 +133,7 @@ cfg_std! { cfg_default! { pub use block_on::block_on; pub use builder::Builder; - pub use current::current; + pub use current::{current, try_current}; pub use task::Task; pub use task_id::TaskId; pub use join_handle::JoinHandle;