mirror of
https://github.com/async-rs/async-std.git
synced 2025-04-29 11:46:52 +00:00
Merge pull request #819 from async-rs/fix-sockets
This commit is contained in:
commit
5f418f07ac
14 changed files with 63 additions and 18 deletions
|
@ -5,7 +5,7 @@ use std::time::Duration;
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
use crate::utils::Timer;
|
use crate::utils::{timer_after, Timer};
|
||||||
|
|
||||||
pin_project! {
|
pin_project! {
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
@ -20,7 +20,7 @@ pin_project! {
|
||||||
|
|
||||||
impl<F> DelayFuture<F> {
|
impl<F> DelayFuture<F> {
|
||||||
pub fn new(future: F, dur: Duration) -> DelayFuture<F> {
|
pub fn new(future: F, dur: Duration) -> DelayFuture<F> {
|
||||||
let delay = Timer::after(dur);
|
let delay = timer_after(dur);
|
||||||
|
|
||||||
DelayFuture { future, delay }
|
DelayFuture { future, delay }
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,7 @@ use std::time::Duration;
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
use crate::utils::Timer;
|
use crate::utils::{timer_after, Timer};
|
||||||
|
|
||||||
/// Awaits a future or times out after a duration of time.
|
/// Awaits a future or times out after a duration of time.
|
||||||
///
|
///
|
||||||
|
@ -51,7 +51,7 @@ impl<F> TimeoutFuture<F> {
|
||||||
pub(super) fn new(future: F, dur: Duration) -> TimeoutFuture<F> {
|
pub(super) fn new(future: F, dur: Duration) -> TimeoutFuture<F> {
|
||||||
TimeoutFuture {
|
TimeoutFuture {
|
||||||
future,
|
future,
|
||||||
delay: Timer::after(dur),
|
delay: timer_after(dur),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ use std::time::Duration;
|
||||||
use pin_project_lite::pin_project;
|
use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::io;
|
use crate::io;
|
||||||
use crate::utils::Timer;
|
use crate::utils::{timer_after, Timer};
|
||||||
|
|
||||||
/// Awaits an I/O future or times out after a duration of time.
|
/// Awaits an I/O future or times out after a duration of time.
|
||||||
///
|
///
|
||||||
|
@ -37,7 +37,7 @@ where
|
||||||
F: Future<Output = io::Result<T>>,
|
F: Future<Output = io::Result<T>>,
|
||||||
{
|
{
|
||||||
Timeout {
|
Timeout {
|
||||||
timeout: Timer::after(dur),
|
timeout: timer_after(dur),
|
||||||
future: f,
|
future: f,
|
||||||
}
|
}
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -75,6 +75,8 @@ impl TcpListener {
|
||||||
///
|
///
|
||||||
/// [`local_addr`]: #method.local_addr
|
/// [`local_addr`]: #method.local_addr
|
||||||
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
|
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpListener> {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let mut last_err = None;
|
let mut last_err = None;
|
||||||
let addrs = addrs.to_socket_addrs().await?;
|
let addrs = addrs.to_socket_addrs().await?;
|
||||||
|
|
||||||
|
@ -200,6 +202,8 @@ impl<'a> Stream for Incoming<'a> {
|
||||||
impl From<std::net::TcpListener> for TcpListener {
|
impl From<std::net::TcpListener> for TcpListener {
|
||||||
/// Converts a `std::net::TcpListener` into its asynchronous equivalent.
|
/// Converts a `std::net::TcpListener` into its asynchronous equivalent.
|
||||||
fn from(listener: std::net::TcpListener) -> TcpListener {
|
fn from(listener: std::net::TcpListener) -> TcpListener {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
TcpListener {
|
TcpListener {
|
||||||
watcher: Async::new(listener).expect("TcpListener is known to be good"),
|
watcher: Async::new(listener).expect("TcpListener is known to be good"),
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,6 +71,8 @@ impl TcpStream {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
|
pub async fn connect<A: ToSocketAddrs>(addrs: A) -> io::Result<TcpStream> {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let mut last_err = None;
|
let mut last_err = None;
|
||||||
let addrs = addrs.to_socket_addrs().await?;
|
let addrs = addrs.to_socket_addrs().await?;
|
||||||
|
|
||||||
|
@ -356,6 +358,8 @@ impl Write for &TcpStream {
|
||||||
impl From<std::net::TcpStream> for TcpStream {
|
impl From<std::net::TcpStream> for TcpStream {
|
||||||
/// Converts a `std::net::TcpStream` into its asynchronous equivalent.
|
/// Converts a `std::net::TcpStream` into its asynchronous equivalent.
|
||||||
fn from(stream: std::net::TcpStream) -> TcpStream {
|
fn from(stream: std::net::TcpStream) -> TcpStream {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
TcpStream {
|
TcpStream {
|
||||||
watcher: Arc::new(Async::new(stream).expect("TcpStream is known to be good")),
|
watcher: Arc::new(Async::new(stream).expect("TcpStream is known to be good")),
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,6 +68,8 @@ impl UdpSocket {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<UdpSocket> {
|
pub async fn bind<A: ToSocketAddrs>(addrs: A) -> io::Result<UdpSocket> {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let mut last_err = None;
|
let mut last_err = None;
|
||||||
let addrs = addrs.to_socket_addrs().await?;
|
let addrs = addrs.to_socket_addrs().await?;
|
||||||
|
|
||||||
|
@ -479,6 +481,8 @@ impl UdpSocket {
|
||||||
impl From<std::net::UdpSocket> for UdpSocket {
|
impl From<std::net::UdpSocket> for UdpSocket {
|
||||||
/// Converts a `std::net::UdpSocket` into its asynchronous equivalent.
|
/// Converts a `std::net::UdpSocket` into its asynchronous equivalent.
|
||||||
fn from(socket: std::net::UdpSocket) -> UdpSocket {
|
fn from(socket: std::net::UdpSocket) -> UdpSocket {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
UdpSocket {
|
UdpSocket {
|
||||||
watcher: Async::new(socket).expect("UdpSocket is known to be good"),
|
watcher: Async::new(socket).expect("UdpSocket is known to be good"),
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,6 +45,8 @@ pub struct UnixDatagram {
|
||||||
|
|
||||||
impl UnixDatagram {
|
impl UnixDatagram {
|
||||||
fn new(socket: StdUnixDatagram) -> UnixDatagram {
|
fn new(socket: StdUnixDatagram) -> UnixDatagram {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
UnixDatagram {
|
UnixDatagram {
|
||||||
watcher: Async::new(socket).expect("UnixDatagram is known to be good"),
|
watcher: Async::new(socket).expect("UnixDatagram is known to be good"),
|
||||||
}
|
}
|
||||||
|
@ -64,6 +66,8 @@ impl UnixDatagram {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
|
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixDatagram> {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let path = path.as_ref().to_owned();
|
let path = path.as_ref().to_owned();
|
||||||
let socket = Async::<StdUnixDatagram>::bind(path)?;
|
let socket = Async::<StdUnixDatagram>::bind(path)?;
|
||||||
Ok(UnixDatagram { watcher: socket })
|
Ok(UnixDatagram { watcher: socket })
|
||||||
|
@ -305,6 +309,8 @@ impl fmt::Debug for UnixDatagram {
|
||||||
impl From<StdUnixDatagram> for UnixDatagram {
|
impl From<StdUnixDatagram> for UnixDatagram {
|
||||||
/// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent.
|
/// Converts a `std::os::unix::net::UnixDatagram` into its asynchronous equivalent.
|
||||||
fn from(datagram: StdUnixDatagram) -> UnixDatagram {
|
fn from(datagram: StdUnixDatagram) -> UnixDatagram {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
UnixDatagram {
|
UnixDatagram {
|
||||||
watcher: Async::new(datagram).expect("UnixDatagram is known to be good"),
|
watcher: Async::new(datagram).expect("UnixDatagram is known to be good"),
|
||||||
}
|
}
|
||||||
|
@ -319,6 +325,8 @@ impl AsRawFd for UnixDatagram {
|
||||||
|
|
||||||
impl FromRawFd for UnixDatagram {
|
impl FromRawFd for UnixDatagram {
|
||||||
unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram {
|
unsafe fn from_raw_fd(fd: RawFd) -> UnixDatagram {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let raw = StdUnixDatagram::from_raw_fd(fd);
|
let raw = StdUnixDatagram::from_raw_fd(fd);
|
||||||
let datagram = Async::<StdUnixDatagram>::new(raw).expect("invalid file descriptor");
|
let datagram = Async::<StdUnixDatagram>::new(raw).expect("invalid file descriptor");
|
||||||
UnixDatagram { watcher: datagram }
|
UnixDatagram { watcher: datagram }
|
||||||
|
|
|
@ -68,6 +68,8 @@ impl UnixListener {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
|
pub async fn bind<P: AsRef<Path>>(path: P) -> io::Result<UnixListener> {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let path = path.as_ref().to_owned();
|
let path = path.as_ref().to_owned();
|
||||||
let listener = Async::<StdUnixListener>::bind(path)?;
|
let listener = Async::<StdUnixListener>::bind(path)?;
|
||||||
|
|
||||||
|
@ -93,7 +95,12 @@ impl UnixListener {
|
||||||
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
|
pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> {
|
||||||
let (stream, addr) = self.watcher.accept().await?;
|
let (stream, addr) = self.watcher.accept().await?;
|
||||||
|
|
||||||
Ok((UnixStream { watcher: Arc::new(stream) }, addr))
|
Ok((
|
||||||
|
UnixStream {
|
||||||
|
watcher: Arc::new(stream),
|
||||||
|
},
|
||||||
|
addr,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a stream of incoming connections.
|
/// Returns a stream of incoming connections.
|
||||||
|
@ -187,6 +194,8 @@ impl Stream for Incoming<'_> {
|
||||||
impl From<StdUnixListener> for UnixListener {
|
impl From<StdUnixListener> for UnixListener {
|
||||||
/// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent.
|
/// Converts a `std::os::unix::net::UnixListener` into its asynchronous equivalent.
|
||||||
fn from(listener: StdUnixListener) -> UnixListener {
|
fn from(listener: StdUnixListener) -> UnixListener {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
UnixListener {
|
UnixListener {
|
||||||
watcher: Async::new(listener).expect("UnixListener is known to be good"),
|
watcher: Async::new(listener).expect("UnixListener is known to be good"),
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,8 @@ impl UnixStream {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
|
pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let path = path.as_ref().to_owned();
|
let path = path.as_ref().to_owned();
|
||||||
let stream = Arc::new(Async::<StdUnixStream>::connect(path).await?);
|
let stream = Arc::new(Async::<StdUnixStream>::connect(path).await?);
|
||||||
|
|
||||||
|
@ -79,6 +81,8 @@ impl UnixStream {
|
||||||
/// # Ok(()) }) }
|
/// # Ok(()) }) }
|
||||||
/// ```
|
/// ```
|
||||||
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
|
pub fn pair() -> io::Result<(UnixStream, UnixStream)> {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let (a, b) = Async::<StdUnixStream>::pair()?;
|
let (a, b) = Async::<StdUnixStream>::pair()?;
|
||||||
let a = UnixStream {
|
let a = UnixStream {
|
||||||
watcher: Arc::new(a),
|
watcher: Arc::new(a),
|
||||||
|
@ -224,8 +228,12 @@ impl fmt::Debug for UnixStream {
|
||||||
impl From<StdUnixStream> for UnixStream {
|
impl From<StdUnixStream> for UnixStream {
|
||||||
/// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent.
|
/// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent.
|
||||||
fn from(stream: StdUnixStream) -> UnixStream {
|
fn from(stream: StdUnixStream) -> UnixStream {
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
let stream = Async::new(stream).expect("UnixStream is known to be good");
|
let stream = Async::new(stream).expect("UnixStream is known to be good");
|
||||||
UnixStream { watcher: Arc::new(stream) }
|
UnixStream {
|
||||||
|
watcher: Arc::new(stream),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::task::{Context, Poll};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::utils::Timer;
|
use crate::utils::{timer_after, Timer};
|
||||||
|
|
||||||
/// Creates a new stream that yields at a set interval.
|
/// Creates a new stream that yields at a set interval.
|
||||||
///
|
///
|
||||||
|
@ -45,7 +45,7 @@ use crate::utils::Timer;
|
||||||
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
|
||||||
pub fn interval(dur: Duration) -> Interval {
|
pub fn interval(dur: Duration) -> Interval {
|
||||||
Interval {
|
Interval {
|
||||||
delay: Timer::after(dur),
|
delay: timer_after(dur),
|
||||||
interval: dur,
|
interval: dur,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -72,7 +72,7 @@ impl Stream for Interval {
|
||||||
return Poll::Pending;
|
return Poll::Pending;
|
||||||
}
|
}
|
||||||
let interval = self.interval;
|
let interval = self.interval;
|
||||||
let _ = std::mem::replace(&mut self.delay, Timer::after(interval));
|
let _ = std::mem::replace(&mut self.delay, timer_after(interval));
|
||||||
Poll::Ready(Some(()))
|
Poll::Ready(Some(()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
use crate::utils::Timer;
|
use crate::utils::{timer_after, Timer};
|
||||||
|
|
||||||
pin_project! {
|
pin_project! {
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
@ -24,7 +24,7 @@ impl<S> Delay<S> {
|
||||||
pub(super) fn new(stream: S, dur: Duration) -> Self {
|
pub(super) fn new(stream: S, dur: Duration) -> Self {
|
||||||
Delay {
|
Delay {
|
||||||
stream,
|
stream,
|
||||||
delay: Timer::after(dur),
|
delay: timer_after(dur),
|
||||||
delay_done: false,
|
delay_done: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,7 @@ use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
use crate::utils::Timer;
|
use crate::utils::{timer_after, Timer};
|
||||||
|
|
||||||
pin_project! {
|
pin_project! {
|
||||||
/// A stream that only yields one element once every `duration`.
|
/// A stream that only yields one element once every `duration`.
|
||||||
|
@ -35,7 +35,7 @@ impl<S: Stream> Throttle<S> {
|
||||||
stream,
|
stream,
|
||||||
duration,
|
duration,
|
||||||
blocked: false,
|
blocked: false,
|
||||||
delay: Timer::after(Duration::default()),
|
delay: timer_after(Duration::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,7 +59,7 @@ impl<S: Stream> Stream for Throttle<S> {
|
||||||
Poll::Ready(None) => Poll::Ready(None),
|
Poll::Ready(None) => Poll::Ready(None),
|
||||||
Poll::Ready(Some(v)) => {
|
Poll::Ready(Some(v)) => {
|
||||||
*this.blocked = true;
|
*this.blocked = true;
|
||||||
let _ = std::mem::replace(&mut *this.delay, Timer::after(*this.duration));
|
let _ = std::mem::replace(&mut *this.delay, timer_after(*this.duration));
|
||||||
Poll::Ready(Some(v))
|
Poll::Ready(Some(v))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,7 +8,7 @@ use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use crate::stream::Stream;
|
use crate::stream::Stream;
|
||||||
use crate::task::{Context, Poll};
|
use crate::task::{Context, Poll};
|
||||||
use crate::utils::Timer;
|
use crate::utils::{timer_after, Timer};
|
||||||
|
|
||||||
pin_project! {
|
pin_project! {
|
||||||
/// A stream with timeout time set
|
/// A stream with timeout time set
|
||||||
|
@ -23,7 +23,7 @@ pin_project! {
|
||||||
|
|
||||||
impl<S: Stream> Timeout<S> {
|
impl<S: Stream> Timeout<S> {
|
||||||
pub(crate) fn new(stream: S, dur: Duration) -> Self {
|
pub(crate) fn new(stream: S, dur: Duration) -> Self {
|
||||||
let delay = Timer::after(dur);
|
let delay = timer_after(dur);
|
||||||
|
|
||||||
Self { stream, delay }
|
Self { stream, delay }
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,6 +64,14 @@ mod timer {
|
||||||
pub type Timer = smol::Timer;
|
pub type Timer = smol::Timer;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(any(feature = "unstable", feature = "default"))]
|
||||||
|
pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer {
|
||||||
|
#[cfg(not(target_os = "unknown"))]
|
||||||
|
once_cell::sync::Lazy::force(&crate::rt::RUNTIME);
|
||||||
|
|
||||||
|
Timer::after(dur)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(any(
|
#[cfg(any(
|
||||||
all(target_arch = "wasm32", feature = "default"),
|
all(target_arch = "wasm32", feature = "default"),
|
||||||
all(feature = "unstable", not(feature = "default"))
|
all(feature = "unstable", not(feature = "default"))
|
||||||
|
|
Loading…
Reference in a new issue