From c3e38150e4aa3f885f60f9d2d2cdaebffdcc2d9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milo=C5=A1=20Vu=C4=8Denovi=C4=87?= Date: Mon, 7 Oct 2019 16:49:42 +0200 Subject: [PATCH] Fix uds listener hanging on accept (#272) * Fix uds listener hanging on accept UDS listener was hanging because the accept method would return `Poll::Pending` without registering the task to be awoken in the case when underlying unix listener returns a WouldBlock that gets converted to None. This is a hacky fix for this case. Should fix #248 * Test simulating uds ping-pong server/client This one should reproduce #248 bug to prevent further regressions. * Code review fixes --- src/os/unix/net/listener.rs | 13 +++++++-- tests/uds.rs | 57 ++++++++++++++++++++++++++++++++++++- 2 files changed, 66 insertions(+), 4 deletions(-) diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index ed4f1f4..eba2fad 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -93,11 +93,16 @@ impl UnixListener { /// ``` pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { future::poll_fn(|cx| { - let res = - futures_core::ready!(self.watcher.poll_read_with(cx, |inner| inner.accept_std())); + let res = futures_core::ready!(self.watcher.poll_read_with(cx, |inner| { + match inner.accept_std() { + // Converting to `WouldBlock` so that the watcher will + // add the waker of this task to a list of readers. + Ok(None) => Err(io::ErrorKind::WouldBlock.into()), + res => res, + } + })); match res? { - None => Poll::Pending, Some((io, addr)) => { let mio_stream = mio_uds::UnixStream::from_stream(io)?; let stream = UnixStream { @@ -105,6 +110,8 @@ impl UnixListener { }; Poll::Ready(Ok((stream, addr))) } + // This should never happen since `None` is converted to `WouldBlock` + None => unreachable!(), } }) .await diff --git a/tests/uds.rs b/tests/uds.rs index e64af3c..3ab4d6b 100644 --- a/tests/uds.rs +++ b/tests/uds.rs @@ -1,9 +1,14 @@ #![cfg(unix)] use async_std::io; -use async_std::os::unix::net::UnixDatagram; +use async_std::os::unix::net::{UnixDatagram, UnixListener, UnixStream}; +use async_std::prelude::*; use async_std::task; +use tempdir::TempDir; + +use std::time::Duration; + const JULIUS_CAESAR: &[u8] = b" Friends, Romans, countrymen - lend me your ears! I come not to praise Caesar, but to bury him. @@ -39,3 +44,53 @@ fn into_raw_fd() -> io::Result<()> { Ok(()) }) } + +const PING: &[u8] = b"ping"; +const PONG: &[u8] = b"pong"; +const TEST_TIMEOUT: Duration = Duration::from_secs(3); + +#[test] +fn socket_ping_pong() { + let tmp_dir = TempDir::new("socket_ping_pong").expect("Temp dir not created"); + let sock_path = tmp_dir.as_ref().join("sock"); + let iter_cnt = 16; + + let listener = + task::block_on(async { UnixListener::bind(&sock_path).await.expect("Socket bind") }); + + let server_handle = std::thread::spawn(move || { + task::block_on(async { ping_pong_server(listener, iter_cnt).await }).unwrap() + }); + + let client_handle = std::thread::spawn(move || { + task::block_on(async { ping_pong_client(&sock_path, iter_cnt).await }).unwrap() + }); + + client_handle.join().unwrap(); + server_handle.join().unwrap(); +} + +async fn ping_pong_server(listener: UnixListener, iterations: u32) -> std::io::Result<()> { + let mut incoming = listener.incoming(); + let mut buf = [0; 1024]; + for _ix in 0..iterations { + if let Some(s) = incoming.next().await { + let mut s = s?; + let n = s.read(&mut buf[..]).await?; + assert_eq!(&buf[..n], PING); + s.write_all(&PONG).await?; + } + } + Ok(()) +} + +async fn ping_pong_client(socket: &std::path::PathBuf, iterations: u32) -> std::io::Result<()> { + let mut buf = [0; 1024]; + for _ix in 0..iterations { + let mut socket = UnixStream::connect(&socket).await?; + socket.write_all(&PING).await?; + let n = async_std::io::timeout(TEST_TIMEOUT, socket.read(&mut buf[..])).await?; + assert_eq!(&buf[..n], PONG); + } + Ok(()) +}