From 374f0c9eb88637cbc8b70bced57df607c2857ea3 Mon Sep 17 00:00:00 2001 From: Roman Proskuryakov Date: Wed, 28 Aug 2019 23:09:15 +0300 Subject: [PATCH] Refactor TcpStream::connect into resolving loop and TcpStream::connect_to (#119) --- src/net/tcp/stream.rs | 107 ++++++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 6687a1b..d823317 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -78,61 +78,10 @@ impl TcpStream { /// # Ok(()) }) } /// ``` pub async fn connect(addrs: A) -> io::Result { - enum State { - Waiting(TcpStream), - Error(io::Error), - Done, - } - let mut last_err = None; for addr in addrs.to_socket_addrs()? { - let mut state = { - match mio::net::TcpStream::connect(&addr) { - Ok(mio_stream) => { - #[cfg(unix)] - let stream = TcpStream { - raw_fd: mio_stream.as_raw_fd(), - io_handle: IoHandle::new(mio_stream), - }; - - #[cfg(windows)] - let stream = TcpStream { - // raw_socket: mio_stream.as_raw_socket(), - io_handle: IoHandle::new(mio_stream), - }; - - State::Waiting(stream) - } - Err(err) => State::Error(err), - } - }; - - let res = future::poll_fn(|cx| { - match mem::replace(&mut state, State::Done) { - State::Waiting(stream) => { - // Once we've connected, wait for the stream to be writable as that's when - // the actual connection has been initiated. Once we're writable we check - // for `take_socket_error` to see if the connect actually hit an error or - // not. - // - // If all that succeeded then we ship everything on up. - if let Poll::Pending = stream.io_handle.poll_writable(cx)? { - state = State::Waiting(stream); - return Poll::Pending; - } - - if let Some(err) = stream.io_handle.get_ref().take_error()? { - return Poll::Ready(Err(err)); - } - - Poll::Ready(Ok(stream)) - } - State::Error(err) => Poll::Ready(Err(err)), - State::Done => panic!("`TcpStream::connect()` future polled after completion"), - } - }) - .await; + let res = Self::connect_to(addr).await; match res { Ok(stream) => return Ok(stream), @@ -148,6 +97,60 @@ impl TcpStream { })) } + /// Creates a new TCP stream connected to the specified address. + async fn connect_to(addr: SocketAddr) -> io::Result { + let stream = mio::net::TcpStream::connect(&addr).map(|mio_stream| { + #[cfg(unix)] + let stream = TcpStream { + raw_fd: mio_stream.as_raw_fd(), + io_handle: IoHandle::new(mio_stream), + }; + + #[cfg(windows)] + let stream = TcpStream { + // raw_socket: mio_stream.as_raw_socket(), + io_handle: IoHandle::new(mio_stream), + }; + + stream + }); + + enum State { + Waiting(TcpStream), + Error(io::Error), + Done, + } + let mut state = match stream { + Ok(stream) => State::Waiting(stream), + Err(err) => State::Error(err), + }; + future::poll_fn(|cx| { + match mem::replace(&mut state, State::Done) { + State::Waiting(stream) => { + // Once we've connected, wait for the stream to be writable as that's when + // the actual connection has been initiated. Once we're writable we check + // for `take_socket_error` to see if the connect actually hit an error or + // not. + // + // If all that succeeded then we ship everything on up. + if let Poll::Pending = stream.io_handle.poll_writable(cx)? { + state = State::Waiting(stream); + return Poll::Pending; + } + + if let Some(err) = stream.io_handle.get_ref().take_error()? { + return Poll::Ready(Err(err)); + } + + Poll::Ready(Ok(stream)) + } + State::Error(err) => Poll::Ready(Err(err)), + State::Done => panic!("`TcpStream::connect_to()` future polled after completion"), + } + }) + .await + } + /// Returns the local address that this stream is connected to. /// /// ## Examples