Merge pull request #889 from stjepang/fix-incoming

Store a future inside Incoming
pull/890/head
Friedel Ziegelmayer 4 years ago committed by GitHub
commit c4eb910c21
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,4 @@
use std::fmt;
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
@ -8,7 +9,7 @@ use crate::io;
use crate::net::{TcpStream, ToSocketAddrs}; use crate::net::{TcpStream, ToSocketAddrs};
use crate::stream::Stream; use crate::stream::Stream;
use crate::sync::Arc; use crate::sync::Arc;
use crate::task::{Context, Poll}; use crate::task::{ready, Context, Poll};
/// A TCP socket server, listening for connections. /// A TCP socket server, listening for connections.
/// ///
@ -146,7 +147,10 @@ impl TcpListener {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn incoming(&self) -> Incoming<'_> { pub fn incoming(&self) -> Incoming<'_> {
Incoming(self) Incoming {
listener: self,
accept: None,
}
} }
/// Returns the local address that this listener is bound to. /// Returns the local address that this listener is bound to.
@ -182,18 +186,36 @@ impl TcpListener {
/// [`incoming`]: struct.TcpListener.html#method.incoming /// [`incoming`]: struct.TcpListener.html#method.incoming
/// [`TcpListener`]: struct.TcpListener.html /// [`TcpListener`]: struct.TcpListener.html
/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html /// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html
#[derive(Debug)] pub struct Incoming<'a> {
pub struct Incoming<'a>(&'a TcpListener); listener: &'a TcpListener,
accept: Option<
Pin<Box<dyn Future<Output = io::Result<(TcpStream, SocketAddr)>> + Send + Sync + 'a>>,
>,
}
impl<'a> Stream for Incoming<'a> { impl Stream for Incoming<'_> {
type Item = io::Result<TcpStream>; type Item = io::Result<TcpStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let future = self.0.accept(); loop {
pin_utils::pin_mut!(future); if self.accept.is_none() {
self.accept = Some(Box::pin(self.listener.accept()));
}
if let Some(f) = &mut self.accept {
let res = ready!(f.as_mut().poll(cx));
self.accept = None;
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
}
}
}
}
let (socket, _) = futures_core::ready!(future.poll(cx))?; impl fmt::Debug for Incoming<'_> {
Poll::Ready(Some(Ok(socket))) fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming")
.field("listener", self.listener)
.finish()
} }
} }

@ -14,7 +14,7 @@ use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use crate::path::Path; use crate::path::Path;
use crate::stream::Stream; use crate::stream::Stream;
use crate::sync::Arc; use crate::sync::Arc;
use crate::task::{Context, Poll}; use crate::task::{ready, Context, Poll};
/// A Unix domain socket server, listening for connections. /// A Unix domain socket server, listening for connections.
/// ///
@ -128,7 +128,10 @@ impl UnixListener {
/// # Ok(()) }) } /// # Ok(()) }) }
/// ``` /// ```
pub fn incoming(&self) -> Incoming<'_> { pub fn incoming(&self) -> Incoming<'_> {
Incoming(self) Incoming {
listener: self,
accept: None,
}
} }
/// Returns the local socket address of this listener. /// Returns the local socket address of this listener.
@ -174,18 +177,36 @@ impl fmt::Debug for UnixListener {
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None /// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
/// [`incoming`]: struct.UnixListener.html#method.incoming /// [`incoming`]: struct.UnixListener.html#method.incoming
/// [`UnixListener`]: struct.UnixListener.html /// [`UnixListener`]: struct.UnixListener.html
#[derive(Debug)] pub struct Incoming<'a> {
pub struct Incoming<'a>(&'a UnixListener); listener: &'a UnixListener,
accept: Option<
Pin<Box<dyn Future<Output = io::Result<(UnixStream, SocketAddr)>> + Send + Sync + 'a>>,
>,
}
impl Stream for Incoming<'_> { impl Stream for Incoming<'_> {
type Item = io::Result<UnixStream>; type Item = io::Result<UnixStream>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let future = self.0.accept(); loop {
pin_utils::pin_mut!(future); if self.accept.is_none() {
self.accept = Some(Box::pin(self.listener.accept()));
}
let (socket, _) = futures_core::ready!(future.poll(cx))?; if let Some(f) = &mut self.accept {
Poll::Ready(Some(Ok(socket))) let res = ready!(f.as_mut().poll(cx));
self.accept = None;
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
}
}
}
}
impl fmt::Debug for Incoming<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Incoming")
.field("listener", self.listener)
.finish()
} }
} }

Loading…
Cancel
Save