Merge pull request #968 from erickt/opt

Change Incoming impls to only do one allocation
pull/974/head
Florian Gilcher 4 years ago committed by GitHub
commit 2d2722878c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,6 +1,6 @@
use std::fmt; use std::fmt;
use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::net::TcpStream as StdTcpStream;
use std::pin::Pin; use std::pin::Pin;
use async_io::Async; use async_io::Async;
@ -148,8 +148,7 @@ impl TcpListener {
/// ``` /// ```
pub fn incoming(&self) -> Incoming<'_> { pub fn incoming(&self) -> Incoming<'_> {
Incoming { Incoming {
listener: self, incoming: Box::pin(self.watcher.incoming()),
accept: None,
} }
} }
@ -187,35 +186,21 @@ impl TcpListener {
/// [`TcpListener`]: struct.TcpListener.html /// [`TcpListener`]: struct.TcpListener.html
/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html /// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html
pub struct Incoming<'a> { pub struct Incoming<'a> {
listener: &'a TcpListener, incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdTcpStream>>> + Send + Sync + 'a>>,
accept: Option<
Pin<Box<dyn Future<Output = io::Result<(TcpStream, SocketAddr)>> + Send + Sync + 'a>>,
>,
} }
impl Stream for Incoming<'_> { impl Stream for Incoming<'_> {
type Item = io::Result<TcpStream>; type Item = io::Result<TcpStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
if self.accept.is_none() { Poll::Ready(res.map(|res| res.map(|stream| TcpStream { watcher: Arc::new(stream) })))
self.accept = Some(Box::pin(self.listener.accept()));
}
if let Some(f) = &mut self.accept {
let res = ready!(f.as_mut().poll(cx));
self.accept = None;
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
}
}
} }
} }
impl fmt::Debug for Incoming<'_> { impl fmt::Debug for Incoming<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming") write!(f, "Incoming {{ ... }}")
.field("listener", self.listener)
.finish()
} }
} }

@ -1,8 +1,8 @@
//! Unix-specific networking extensions. //! Unix-specific networking extensions.
use std::fmt; use std::fmt;
use std::future::Future;
use std::os::unix::net::UnixListener as StdUnixListener; use std::os::unix::net::UnixListener as StdUnixListener;
use std::os::unix::net::UnixStream as StdUnixStream;
use std::pin::Pin; use std::pin::Pin;
use async_io::Async; use async_io::Async;
@ -129,8 +129,7 @@ impl UnixListener {
/// ``` /// ```
pub fn incoming(&self) -> Incoming<'_> { pub fn incoming(&self) -> Incoming<'_> {
Incoming { Incoming {
listener: self, incoming: Box::pin(self.watcher.incoming()),
accept: None,
} }
} }
@ -178,34 +177,21 @@ impl fmt::Debug for UnixListener {
/// [`incoming`]: struct.UnixListener.html#method.incoming /// [`incoming`]: struct.UnixListener.html#method.incoming
/// [`UnixListener`]: struct.UnixListener.html /// [`UnixListener`]: struct.UnixListener.html
pub struct Incoming<'a> { pub struct Incoming<'a> {
listener: &'a UnixListener, incoming: Pin<Box<dyn Stream<Item = io::Result<Async<StdUnixStream>>> + Send + Sync + 'a>>,
accept: Option<
Pin<Box<dyn Future<Output = io::Result<(UnixStream, SocketAddr)>> + Send + Sync + 'a>>,
>,
} }
impl Stream for Incoming<'_> { impl Stream for Incoming<'_> {
type Item = io::Result<UnixStream>; type Item = io::Result<UnixStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { let res = ready!(Pin::new(&mut self.incoming).poll_next(cx));
if self.accept.is_none() { Poll::Ready(res.map(|res| res.map(|stream| UnixStream { watcher: Arc::new(stream) })))
self.accept = Some(Box::pin(self.listener.accept()));
}
if let Some(f) = &mut self.accept {
let res = ready!(f.as_mut().poll(cx));
self.accept = None;
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
}
}
} }
} }
impl fmt::Debug for Incoming<'_> { impl fmt::Debug for Incoming<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming") f.debug_struct("Incoming")
.field("listener", self.listener)
.finish() .finish()
} }
} }

Loading…
Cancel
Save