Use non-blocking connect for TcpStream. (#687)

* Use non-blocking connect for TcpStream.

Instead of spawning a background thread, which is unaware of any timeouts
and keeps running until the TCP stack decides that the remote is not
reachable, we use mio's non-blocking connect.
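
With this in place, a timeout wrapped around `connect` actually takes
effect instead of leaving an orphaned thread behind. A minimal usage
sketch against async-std's public API; the TEST-NET address below is
made up for the example and will simply time out:

    use std::time::Duration;
    use async_std::io;
    use async_std::net::TcpStream;

    fn main() -> io::Result<()> {
        async_std::task::block_on(async {
            // The three-second limit now cancels the in-flight connect.
            let stream = io::timeout(Duration::from_secs(3), async {
                TcpStream::connect("203.0.113.1:80").await
            })
            .await?;
            println!("connected to {}", stream.peer_addr()?);
            Ok(())
        })
    }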

mio's `TcpStream::connect` returns immediately, but the actual connection
is usually still in progress; we have to wait until the socket is
writable before we can consider the connection established.
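
For illustration, here is a minimal sketch of that handshake done
directly against mio 0.6 (the version this change builds on), outside
the reactor; the address, token, and timeout are made up for the example:

    use std::io;
    use std::time::Duration;
    use mio::net::TcpStream;
    use mio::{Events, Poll, PollOpt, Ready, Token};

    fn main() -> io::Result<()> {
        let addr = "127.0.0.1:8080".parse().unwrap();

        // Returns immediately; the TCP handshake is usually still in flight.
        let stream = TcpStream::connect(&addr)?;

        // Wait for the socket to become writable, i.e. for the connect
        // attempt to finish one way or the other.
        let poll = Poll::new()?;
        poll.register(&stream, Token(0), Ready::writable(), PollOpt::edge())?;
        let mut events = Events::with_capacity(8);
        poll.poll(&mut events, Some(Duration::from_secs(5)))?;
        if events.is_empty() {
            return Err(io::Error::new(io::ErrorKind::TimedOut, "connect timed out"));
        }

        // Writability alone does not prove success; the pending socket
        // error tells us whether the handshake actually completed.
        match stream.take_error()? {
            None => println!("connected to {}", addr),
            Some(e) => println!("connect failed: {}", e),
        }
        Ok(())
    }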

* Add Watcher::{poll_read_ready, poll_write_ready}.

Following a suggestion by @stjepang, we offer methods to check a
`Watcher` for read/write readiness instead of the previous approach of
accepting a set of `Waker`s when registering an event source. The
changes relative to master are smaller, and both methods look more
useful in other contexts. The code is also more robust with respect to
wakeups of the `Waker` from clones outside the `Reactor`.
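
As a standalone illustration of this readiness-flag design, here is a
sketch with a shared `ready` flag plus a waker list, polled through
`poll_fn`; the names `Readiness`, `Handle`, `poll_ready`, and
`set_ready` are invented for the example and are not crate API:

    use std::future::poll_fn;
    use std::sync::{Arc, Mutex};
    use std::task::{Context, Poll, Waker};

    #[derive(Default)]
    struct Readiness {
        ready: bool,
        wakers: Vec<Waker>,
    }

    #[derive(Clone, Default)]
    struct Handle(Arc<Mutex<Readiness>>);

    impl Handle {
        // Mirrors `Watcher::poll_{read,write}_ready`: return `Ready` if the
        // flag is set, otherwise park the task's waker and stay `Pending`.
        fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
            let mut inner = self.0.lock().unwrap();
            if inner.ready {
                return Poll::Ready(());
            }
            if inner.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
                inner.wakers.push(cx.waker().clone());
            }
            Poll::Pending
        }

        // Mirrors what the reactor's main loop does when mio reports an
        // event: set the flag and wake every parked task, including wakers
        // cloned outside the reactor.
        fn set_ready(&self) {
            let mut inner = self.0.lock().unwrap();
            inner.ready = true;
            for w in inner.wakers.drain(..) {
                w.wake();
            }
        }
    }

    async fn wait_until_ready(h: &Handle) {
        poll_fn(|cx| h.poll_ready(cx)).await
    }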

I am not sure whether we need to add protection against spurious
wakeups from mio. Currently we treat the `Poll::Ready(())` of
`Watcher::poll_write_ready` as proof that the non-blocking connect has
finished, but if the event from mio was spurious, the connect might
still be ongoing.
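
If such protection turns out to be necessary, one conservative pattern,
sketched here directly against mio 0.6 with an invented
`connect_checked` helper (not part of this change), is to keep waiting
until the socket either reports a pending error or `peer_addr()`
succeeds, since `getpeername` fails with `NotConnected` while the
handshake is still in progress:

    use std::io;
    use std::net::SocketAddr;
    use mio::net::TcpStream;
    use mio::{Events, Poll, PollOpt, Ready, Token};

    fn connect_checked(addr: &SocketAddr) -> io::Result<TcpStream> {
        let stream = TcpStream::connect(addr)?;
        let poll = Poll::new()?;
        // Level-triggered, so readiness we cannot act on is re-delivered.
        poll.register(&stream, Token(0), Ready::writable(), PollOpt::level())?;
        let mut events = Events::with_capacity(8);
        loop {
            poll.poll(&mut events, None)?;
            // A failed connect surfaces as a pending socket error.
            if let Some(e) = stream.take_error()? {
                return Err(e);
            }
            // `peer_addr()` only succeeds once the handshake has completed;
            // a spurious wakeup just sends us around the loop again.
            match stream.peer_addr() {
                Ok(_) => return Ok(stream),
                Err(ref e) if e.kind() == io::ErrorKind::NotConnected => continue,
                Err(e) => return Err(e),
            }
        }
    }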
Authored by Toralf Wittner 5 years ago; committed by Stjepan Glavina
parent 57f9fb7e93
commit 57974ae0b7

@@ -16,10 +16,30 @@ struct Entry {
     token: mio::Token,

     /// Tasks that are blocked on reading from this I/O handle.
-    readers: Mutex<Vec<Waker>>,
+    readers: Mutex<Readers>,

     /// Tasks that are blocked on writing to this I/O handle.
-    writers: Mutex<Vec<Waker>>,
+    writers: Mutex<Writers>,
+}
+
+/// The set of `Waker`s interested in read readiness.
+#[derive(Debug)]
+struct Readers {
+    /// Flag indicating read readiness.
+    /// (cf. `Watcher::poll_read_ready`)
+    ready: bool,
+    /// The `Waker`s blocked on reading.
+    wakers: Vec<Waker>
+}
+
+/// The set of `Waker`s interested in write readiness.
+#[derive(Debug)]
+struct Writers {
+    /// Flag indicating write readiness.
+    /// (cf. `Watcher::poll_write_ready`)
+    ready: bool,
+    /// The `Waker`s blocked on writing.
+    wakers: Vec<Waker>
 }

 /// The state of a networking driver.
@@ -68,8 +88,8 @@ impl Reactor {
         // Allocate an entry and insert it into the slab.
         let entry = Arc::new(Entry {
             token,
-            readers: Mutex::new(Vec::new()),
-            writers: Mutex::new(Vec::new()),
+            readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }),
+            writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }),
         });

         vacant.insert(entry.clone());
@@ -144,14 +164,18 @@ fn main_loop() -> io::Result<()> {
             // Wake up reader tasks blocked on this I/O handle.
             if !(readiness & reader_interests()).is_empty() {
-                for w in entry.readers.lock().unwrap().drain(..) {
+                let mut readers = entry.readers.lock().unwrap();
+                readers.ready = true;
+                for w in readers.wakers.drain(..) {
                     w.wake();
                 }
             }

             // Wake up writer tasks blocked on this I/O handle.
             if !(readiness & writer_interests()).is_empty() {
-                for w in entry.writers.lock().unwrap().drain(..) {
+                let mut writers = entry.writers.lock().unwrap();
+                writers.ready = true;
+                for w in writers.wakers.drain(..) {
                     w.wake();
                 }
             }
@@ -207,7 +231,7 @@ impl<T: Evented> Watcher<T> {
         }

         // Lock the waker list.
-        let mut list = self.entry.readers.lock().unwrap();
+        let mut readers = self.entry.readers.lock().unwrap();

         // Try running the operation again.
         match f(self.source.as_ref().unwrap()) {
@@ -216,10 +240,12 @@ impl<T: Evented> Watcher<T> {
         }

         // Register the task if it isn't registered already.
-        if list.iter().all(|w| !w.will_wake(cx.waker())) {
-            list.push(cx.waker().clone());
+        if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
+            readers.wakers.push(cx.waker().clone());
         }

+        readers.ready = false;
+
         Poll::Pending
     }
@@ -242,7 +268,7 @@ impl<T: Evented> Watcher<T> {
         }

         // Lock the waker list.
-        let mut list = self.entry.writers.lock().unwrap();
+        let mut writers = self.entry.writers.lock().unwrap();

         // Try running the operation again.
         match f(self.source.as_ref().unwrap()) {
@@ -251,10 +277,49 @@ impl<T: Evented> Watcher<T> {
         }

         // Register the task if it isn't registered already.
-        if list.iter().all(|w| !w.will_wake(cx.waker())) {
-            list.push(cx.waker().clone());
+        if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
+            writers.wakers.push(cx.waker().clone());
         }

+        writers.ready = false;
+
+        Poll::Pending
+    }
+
+    /// Polls the inner I/O source until a non-blocking read can be performed.
+    ///
+    /// If non-blocking reads are currently not possible, the `Waker`
+    /// will be saved and notified when it can read non-blocking
+    /// again.
+    #[allow(dead_code)]
+    pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
+        // Lock the waker list.
+        let mut readers = self.entry.readers.lock().unwrap();
+        if readers.ready {
+            return Poll::Ready(())
+        }
+        // Register the task if it isn't registered already.
+        if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
+            readers.wakers.push(cx.waker().clone());
+        }
+        Poll::Pending
+    }
+
+    /// Polls the inner I/O source until a non-blocking write can be performed.
+    ///
+    /// If non-blocking writes are currently not possible, the `Waker`
+    /// will be saved and notified when it can write non-blocking
+    /// again.
+    pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
+        // Lock the waker list.
+        let mut writers = self.entry.writers.lock().unwrap();
+        if writers.ready {
+            return Poll::Ready(())
+        }
+        // Register the task if it isn't registered already.
+        if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) {
+            writers.wakers.push(cx.waker().clone());
+        }
+
         Poll::Pending
     }

@@ -6,8 +6,7 @@ use crate::future;
 use crate::io::{self, Read, Write};
 use crate::net::driver::Watcher;
 use crate::net::ToSocketAddrs;
-use crate::task::{spawn_blocking, Context, Poll};
-use crate::utils::Context as _;
+use crate::task::{Context, Poll};

 /// A TCP stream between a local and a remote socket.
 ///
@@ -77,20 +76,24 @@ impl TcpStream {
             .await?;

         for addr in addrs {
-            let res = spawn_blocking(move || {
-                let std_stream = std::net::TcpStream::connect(addr)
-                    .context(|| format!("could not connect to {}", addr))?;
-                let mio_stream = mio::net::TcpStream::from_stream(std_stream)
-                    .context(|| format!("could not open async connection to {}", addr))?;
-                Ok(TcpStream {
-                    watcher: Watcher::new(mio_stream),
-                })
-            })
-            .await;
-
-            match res {
-                Ok(stream) => return Ok(stream),
-                Err(err) => last_err = Some(err),
+            // mio's TcpStream::connect is non-blocking and may just be in progress
+            // when it returns with `Ok`. We therefore wait for write readiness to
+            // be sure the connection has either been established or there was an
+            // error which we check for afterwards.
+            let watcher = match mio::net::TcpStream::connect(&addr) {
+                Ok(s) => Watcher::new(s),
+                Err(e) => {
+                    last_err = Some(e);
+                    continue
+                }
+            };
+
+            future::poll_fn(|cx| watcher.poll_write_ready(cx)).await;
+
+            match watcher.get_ref().take_error() {
+                Ok(None) => return Ok(TcpStream { watcher }),
+                Ok(Some(e)) => last_err = Some(e),
+                Err(e) => last_err = Some(e)
             }
         }
