From 1d875836a2302681a395ee44512a518f0222da4a Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Tue, 28 Jan 2020 18:14:16 +0100 Subject: [PATCH] Implement Clone for TcpStream (#689) * Implement Clone for TcpStream * Update examples * Remove accidentally added examples --- examples/tcp-echo.rs | 5 +++-- examples/tcp-ipv4-and-6-echo.rs | 5 +++-- src/net/tcp/listener.rs | 7 +++---- src/net/tcp/stream.rs | 26 ++++++++++++++++---------- tests/tcp.rs | 22 ++++++++++++++++++++++ 5 files changed, 47 insertions(+), 18 deletions(-) diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index 7c50be0..c04f077 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -14,8 +14,9 @@ use async_std::task; async fn process(stream: TcpStream) -> io::Result<()> { println!("Accepted from: {}", stream.peer_addr()?); - let (reader, writer) = &mut (&stream, &stream); - io::copy(reader, writer).await?; + let mut reader = stream.clone(); + let mut writer = stream; + io::copy(&mut reader, &mut writer).await?; Ok(()) } diff --git a/examples/tcp-ipv4-and-6-echo.rs b/examples/tcp-ipv4-and-6-echo.rs index aef5e15..a00e1f3 100644 --- a/examples/tcp-ipv4-and-6-echo.rs +++ b/examples/tcp-ipv4-and-6-echo.rs @@ -15,8 +15,9 @@ use async_std::task; async fn process(stream: TcpStream) -> io::Result<()> { println!("Accepted from: {}", stream.peer_addr()?); - let (reader, writer) = &mut (&stream, &stream); - io::copy(reader, writer).await?; + let mut reader = stream.clone(); + let mut writer = stream; + io::copy(&mut reader, &mut writer).await?; Ok(()) } diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index fe06a96..1d7e91a 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -1,6 +1,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use crate::future; use crate::io; @@ -75,9 +76,7 @@ impl TcpListener { /// [`local_addr`]: #method.local_addr pub async fn bind(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { match mio::net::TcpListener::bind(&addr) { @@ -121,7 +120,7 @@ impl TcpListener { let mio_stream = mio::net::TcpStream::from_stream(io)?; let stream = TcpStream { - watcher: Watcher::new(mio_stream), + watcher: Arc::new(Watcher::new(mio_stream)), }; Ok((stream, addr)) } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 537bd4c..c9cdf5e 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -1,6 +1,7 @@ use std::io::{IoSlice, IoSliceMut, Read as _, Write as _}; use std::net::SocketAddr; use std::pin::Pin; +use std::sync::Arc; use crate::future; use crate::io::{self, Read, Write}; @@ -44,9 +45,9 @@ use crate::task::{Context, Poll}; /// # /// # Ok(()) }) } /// ``` -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TcpStream { - pub(super) watcher: Watcher, + pub(super) watcher: Arc>, } impl TcpStream { @@ -71,9 +72,7 @@ impl TcpStream { /// ``` pub async fn connect(addrs: A) -> io::Result { let mut last_err = None; - let addrs = addrs - .to_socket_addrs() - .await?; + let addrs = addrs.to_socket_addrs().await?; for addr in addrs { // mio's TcpStream::connect is non-blocking and may just be in progress @@ -84,16 +83,20 @@ impl TcpStream { Ok(s) => Watcher::new(s), Err(e) => { last_err = Some(e); - continue + continue; } }; future::poll_fn(|cx| watcher.poll_write_ready(cx)).await; match watcher.get_ref().take_error() { - Ok(None) => return Ok(TcpStream { watcher }), + Ok(None) => { + return Ok(TcpStream { + watcher: Arc::new(watcher), + }); + } Ok(Some(e)) => last_err = Some(e), - Err(e) => last_err = Some(e) + Err(e) => last_err = Some(e), } } @@ -369,7 +372,7 @@ impl From for TcpStream { fn from(stream: std::net::TcpStream) -> TcpStream { let mio_stream = mio::net::TcpStream::from_stream(stream).unwrap(); TcpStream { - watcher: Watcher::new(mio_stream), + watcher: Arc::new(Watcher::new(mio_stream)), } } } @@ -391,7 +394,10 @@ cfg_unix! { impl IntoRawFd for TcpStream { fn into_raw_fd(self) -> RawFd { - self.watcher.into_inner().into_raw_fd() + // TODO(stjepang): This does not mean `RawFd` is now the sole owner of the file + // descriptor because it's possible that there are other clones of this `TcpStream` + // using it at the same time. We should probably document that behavior. + self.as_raw_fd() } } } diff --git a/tests/tcp.rs b/tests/tcp.rs index 00fa3a0..d92cff0 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -94,3 +94,25 @@ fn smoke_async_stream_to_std_listener() -> io::Result<()> { Ok(()) } + +#[test] +fn cloned_streams() -> io::Result<()> { + task::block_on(async { + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let mut stream = TcpStream::connect(&addr).await?; + let mut cloned_stream = stream.clone(); + let mut incoming = listener.incoming(); + let mut write_stream = incoming.next().await.unwrap()?; + write_stream.write_all(b"Each your doing").await?; + + let mut buf = [0; 15]; + stream.read_exact(&mut buf[..8]).await?; + cloned_stream.read_exact(&mut buf[8..]).await?; + + assert_eq!(&buf[..15], b"Each your doing"); + + Ok(()) + }) +}