From c4e181cfe1d9c86c25a439cde9433b36aa8fe463 Mon Sep 17 00:00:00 2001 From: Erick Tryzelaar Date: Wed, 31 Mar 2021 11:20:29 -0700 Subject: [PATCH] Change Incoming impls to only do one allocation This modifies net::tcp::Incoming and os::net::unix::Incoming to only do one allocation, rather than an allocation for each connection. --- src/net/tcp/listener.rs | 27 ++++++--------------------- src/os/unix/net/listener.rs | 24 +++++------------------- 2 files changed, 11 insertions(+), 40 deletions(-) diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index cfefc7d2..0825cd92 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,6 +1,6 @@ use std::fmt; -use std::future::Future; use std::net::SocketAddr; +use std::net::TcpStream as StdTcpStream; use std::pin::Pin; use async_io::Async; @@ -148,8 +148,7 @@ impl TcpListener { /// ``` pub fn incoming(&self) -> Incoming<'_> { Incoming { - listener: self, - accept: None, + incoming: Box::pin(self.watcher.incoming()), } } @@ -187,35 +186,21 @@ impl TcpListener { /// [`TcpListener`]: struct.TcpListener.html /// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html pub struct Incoming<'a> { - listener: &'a TcpListener, - accept: Option< - Pin> + Send + Sync + 'a>>, - >, + incoming: Pin>> + Send + Sync + 'a>>, } impl Stream for Incoming<'_> { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - if self.accept.is_none() { - self.accept = Some(Box::pin(self.listener.accept())); - } - - if let Some(f) = &mut self.accept { - let res = ready!(f.as_mut().poll(cx)); - self.accept = None; - return Poll::Ready(Some(res.map(|(stream, _)| stream))); - } - } + let res = ready!(Pin::new(&mut self.incoming).poll_next(cx)); + Poll::Ready(res.map(|res| res.map(|stream| TcpStream { watcher: Arc::new(stream) }))) } } impl fmt::Debug for Incoming<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Incoming") - .field("listener", self.listener) - .finish() + write!(f, "Incoming {{ ... }}") } } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index 3573d7d3..e86502b5 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -1,8 +1,8 @@ //! Unix-specific networking extensions. use std::fmt; -use std::future::Future; use std::os::unix::net::UnixListener as StdUnixListener; +use std::os::unix::net::UnixStream as StdUnixStream; use std::pin::Pin; use async_io::Async; @@ -129,8 +129,7 @@ impl UnixListener { /// ``` pub fn incoming(&self) -> Incoming<'_> { Incoming { - listener: self, - accept: None, + incoming: Box::pin(self.watcher.incoming()), } } @@ -178,34 +177,21 @@ impl fmt::Debug for UnixListener { /// [`incoming`]: struct.UnixListener.html#method.incoming /// [`UnixListener`]: struct.UnixListener.html pub struct Incoming<'a> { - listener: &'a UnixListener, - accept: Option< - Pin> + Send + Sync + 'a>>, - >, + incoming: Pin>> + Send + Sync + 'a>>, } impl Stream for Incoming<'_> { type Item = io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - if self.accept.is_none() { - self.accept = Some(Box::pin(self.listener.accept())); - } - - if let Some(f) = &mut self.accept { - let res = ready!(f.as_mut().poll(cx)); - self.accept = None; - return Poll::Ready(Some(res.map(|(stream, _)| stream))); - } - } + let res = ready!(Pin::new(&mut self.incoming).poll_next(cx)); + Poll::Ready(res.map(|res| res.map(|stream| UnixStream { watcher: Arc::new(stream) }))) } } impl fmt::Debug for Incoming<'_> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Incoming") - .field("listener", self.listener) .finish() } }