mirror of
				https://github.com/async-rs/async-std.git
				synced 2025-10-26 14:26:37 +00:00 
			
		
		
		
	Merge pull request #772 from jbr/unixstream-clone
This commit is contained in:
		
						commit
						2b6c7fedff
					
				
					 3 changed files with 42 additions and 11 deletions
				
			
		|  | @ -13,6 +13,7 @@ use crate::io; | |||
| use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | ||||
| use crate::path::Path; | ||||
| use crate::stream::Stream; | ||||
| use crate::sync::Arc; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| /// A Unix domain socket server, listening for connections.
 | ||||
|  | @ -92,7 +93,7 @@ impl UnixListener { | |||
|     pub async fn accept(&self) -> io::Result<(UnixStream, SocketAddr)> { | ||||
|         let (stream, addr) = self.watcher.accept().await?; | ||||
| 
 | ||||
|         Ok((UnixStream { watcher: stream }, addr)) | ||||
|         Ok((UnixStream { watcher: Arc::new(stream) }, addr)) | ||||
|     } | ||||
| 
 | ||||
|     /// Returns a stream of incoming connections.
 | ||||
|  |  | |||
|  | @ -11,6 +11,7 @@ use super::SocketAddr; | |||
| use crate::io::{self, Read, Write}; | ||||
| use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | ||||
| use crate::path::Path; | ||||
| use crate::sync::Arc; | ||||
| use crate::task::{Context, Poll}; | ||||
| 
 | ||||
| /// A Unix stream socket.
 | ||||
|  | @ -36,8 +37,9 @@ use crate::task::{Context, Poll}; | |||
| /// #
 | ||||
| /// # Ok(()) }) }
 | ||||
| /// ```
 | ||||
| #[derive(Clone)] | ||||
| pub struct UnixStream { | ||||
|     pub(super) watcher: Async<StdUnixStream>, | ||||
|     pub(super) watcher: Arc<Async<StdUnixStream>>, | ||||
| } | ||||
| 
 | ||||
| impl UnixStream { | ||||
|  | @ -56,7 +58,7 @@ impl UnixStream { | |||
|     /// ```
 | ||||
|     pub async fn connect<P: AsRef<Path>>(path: P) -> io::Result<UnixStream> { | ||||
|         let path = path.as_ref().to_owned(); | ||||
|         let stream = Async::<StdUnixStream>::connect(path).await?; | ||||
|         let stream = Arc::new(Async::<StdUnixStream>::connect(path).await?); | ||||
| 
 | ||||
|         Ok(UnixStream { watcher: stream }) | ||||
|     } | ||||
|  | @ -78,8 +80,12 @@ impl UnixStream { | |||
|     /// ```
 | ||||
|     pub fn pair() -> io::Result<(UnixStream, UnixStream)> { | ||||
|         let (a, b) = Async::<StdUnixStream>::pair()?; | ||||
|         let a = UnixStream { watcher: a }; | ||||
|         let b = UnixStream { watcher: b }; | ||||
|         let a = UnixStream { | ||||
|             watcher: Arc::new(a), | ||||
|         }; | ||||
|         let b = UnixStream { | ||||
|             watcher: Arc::new(b), | ||||
|         }; | ||||
|         Ok((a, b)) | ||||
|     } | ||||
| 
 | ||||
|  | @ -158,7 +164,7 @@ impl Read for &UnixStream { | |||
|         cx: &mut Context<'_>, | ||||
|         buf: &mut [u8], | ||||
|     ) -> Poll<io::Result<usize>> { | ||||
|         Pin::new(&mut &self.watcher).poll_read(cx, buf) | ||||
|         Pin::new(&mut &*self.watcher).poll_read(cx, buf) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | @ -186,15 +192,15 @@ impl Write for &UnixStream { | |||
|         cx: &mut Context<'_>, | ||||
|         buf: &[u8], | ||||
|     ) -> Poll<io::Result<usize>> { | ||||
|         Pin::new(&mut &self.watcher).poll_write(cx, buf) | ||||
|         Pin::new(&mut &*self.watcher).poll_write(cx, buf) | ||||
|     } | ||||
| 
 | ||||
|     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||||
|         Pin::new(&mut &self.watcher).poll_flush(cx) | ||||
|         Pin::new(&mut &*self.watcher).poll_flush(cx) | ||||
|     } | ||||
| 
 | ||||
|     fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> { | ||||
|         Pin::new(&mut &self.watcher).poll_close(cx) | ||||
|         Pin::new(&mut &*self.watcher).poll_close(cx) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | @ -219,7 +225,7 @@ impl From<StdUnixStream> for UnixStream { | |||
|     /// Converts a `std::os::unix::net::UnixStream` into its asynchronous equivalent.
 | ||||
|     fn from(stream: StdUnixStream) -> UnixStream { | ||||
|         let stream = Async::new(stream).expect("UnixStream is known to be good"); | ||||
|         UnixStream { watcher: stream } | ||||
|         UnixStream { watcher: Arc::new(stream) } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | @ -238,6 +244,6 @@ impl FromRawFd for UnixStream { | |||
| 
 | ||||
| impl IntoRawFd for UnixStream { | ||||
|     fn into_raw_fd(self) -> RawFd { | ||||
|         self.watcher.into_raw_fd() | ||||
|         self.as_raw_fd() | ||||
|     } | ||||
| } | ||||
|  |  | |||
							
								
								
									
										24
									
								
								tests/uds.rs
									
									
									
									
									
								
							
							
						
						
									
										24
									
								
								tests/uds.rs
									
									
									
									
									
								
							|  | @ -94,3 +94,27 @@ async fn ping_pong_client(socket: &std::path::PathBuf, iterations: u32) -> std:: | |||
|     } | ||||
|     Ok(()) | ||||
| } | ||||
| 
 | ||||
| #[test] | ||||
| fn uds_clone() -> io::Result<()> { | ||||
|     task::block_on(async { | ||||
|         let tmp_dir = TempDir::new("socket_ping_pong").expect("Temp dir not created"); | ||||
|         let sock_path = tmp_dir.as_ref().join("sock"); | ||||
|         let input = UnixListener::bind(&sock_path).await?; | ||||
| 
 | ||||
|         let mut writer = UnixStream::connect(&sock_path).await?; | ||||
|         let mut reader = input.incoming().next().await.unwrap()?; | ||||
| 
 | ||||
|         writer.write(b"original").await.unwrap(); | ||||
|         let mut original_buf = [0; 8]; | ||||
|         reader.read(&mut original_buf).await?; | ||||
|         assert_eq!(&original_buf, b"original"); | ||||
| 
 | ||||
|         writer.clone().write(b"clone").await.unwrap(); | ||||
|         let mut clone_buf = [0; 5]; | ||||
|         reader.clone().read(&mut clone_buf).await?; | ||||
|         assert_eq!(&clone_buf, b"clone"); | ||||
| 
 | ||||
|         Ok(()) | ||||
|     }) | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue