mirror of
				https://github.com/async-rs/async-std.git
				synced 2025-10-28 23:36:37 +00:00 
			
		
		
		
	
						commit
						fc4e472599
					
				
					 23 changed files with 571 additions and 391 deletions
				
			
		|  | @ -26,6 +26,7 @@ default = [ | ||||||
|   "async-task", |   "async-task", | ||||||
|   "crossbeam-channel", |   "crossbeam-channel", | ||||||
|   "crossbeam-deque", |   "crossbeam-deque", | ||||||
|  |   "crossbeam-queue", | ||||||
|   "futures-timer", |   "futures-timer", | ||||||
|   "kv-log-macro", |   "kv-log-macro", | ||||||
|   "log", |   "log", | ||||||
|  | @ -58,6 +59,7 @@ async-task = { version = "1.3.1", optional = true } | ||||||
| broadcaster = { version = "1.0.0", optional = true } | broadcaster = { version = "1.0.0", optional = true } | ||||||
| crossbeam-channel = { version = "0.4.2", optional = true } | crossbeam-channel = { version = "0.4.2", optional = true } | ||||||
| crossbeam-deque = { version = "0.7.3", optional = true } | crossbeam-deque = { version = "0.7.3", optional = true } | ||||||
|  | crossbeam-queue = { version = "0.2.0", optional = true } | ||||||
| crossbeam-utils = { version = "0.7.2", optional = true } | crossbeam-utils = { version = "0.7.2", optional = true } | ||||||
| futures-core = { version = "0.3.4", optional = true, default-features = false } | futures-core = { version = "0.3.4", optional = true, default-features = false } | ||||||
| futures-io = { version = "0.3.4", optional = true } | futures-io = { version = "0.3.4", optional = true } | ||||||
|  |  | ||||||
|  | @ -270,6 +270,7 @@ cfg_default! { | ||||||
|     pub mod fs; |     pub mod fs; | ||||||
|     pub mod path; |     pub mod path; | ||||||
|     pub mod net; |     pub mod net; | ||||||
|  |     pub(crate) mod rt; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| cfg_unstable! { | cfg_unstable! { | ||||||
|  |  | ||||||
|  | @ -66,6 +66,5 @@ pub use tcp::{Incoming, TcpListener, TcpStream}; | ||||||
| pub use udp::UdpSocket; | pub use udp::UdpSocket; | ||||||
| 
 | 
 | ||||||
| mod addr; | mod addr; | ||||||
| pub(crate) mod driver; |  | ||||||
| mod tcp; | mod tcp; | ||||||
| mod udp; | mod udp; | ||||||
|  |  | ||||||
|  | @ -5,7 +5,7 @@ use std::sync::Arc; | ||||||
| 
 | 
 | ||||||
| use crate::future; | use crate::future; | ||||||
| use crate::io; | use crate::io; | ||||||
| use crate::net::driver::Watcher; | use crate::rt::Watcher; | ||||||
| use crate::net::{TcpStream, ToSocketAddrs}; | use crate::net::{TcpStream, ToSocketAddrs}; | ||||||
| use crate::stream::Stream; | use crate::stream::Stream; | ||||||
| use crate::task::{Context, Poll}; | use crate::task::{Context, Poll}; | ||||||
|  |  | ||||||
|  | @ -5,7 +5,7 @@ use std::sync::Arc; | ||||||
| 
 | 
 | ||||||
| use crate::future; | use crate::future; | ||||||
| use crate::io::{self, Read, Write}; | use crate::io::{self, Read, Write}; | ||||||
| use crate::net::driver::Watcher; | use crate::rt::Watcher; | ||||||
| use crate::net::ToSocketAddrs; | use crate::net::ToSocketAddrs; | ||||||
| use crate::task::{Context, Poll}; | use crate::task::{Context, Poll}; | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -3,8 +3,8 @@ use std::net::SocketAddr; | ||||||
| use std::net::{Ipv4Addr, Ipv6Addr}; | use std::net::{Ipv4Addr, Ipv6Addr}; | ||||||
| 
 | 
 | ||||||
| use crate::future; | use crate::future; | ||||||
| use crate::net::driver::Watcher; |  | ||||||
| use crate::net::ToSocketAddrs; | use crate::net::ToSocketAddrs; | ||||||
|  | use crate::rt::Watcher; | ||||||
| use crate::utils::Context as _; | use crate::utils::Context as _; | ||||||
| 
 | 
 | ||||||
| /// A UDP socket.
 | /// A UDP socket.
 | ||||||
|  |  | ||||||
|  | @ -8,7 +8,7 @@ use mio_uds; | ||||||
| use super::SocketAddr; | use super::SocketAddr; | ||||||
| use crate::future; | use crate::future; | ||||||
| use crate::io; | use crate::io; | ||||||
| use crate::net::driver::Watcher; | use crate::rt::Watcher; | ||||||
| use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | ||||||
| use crate::path::Path; | use crate::path::Path; | ||||||
| use crate::task::spawn_blocking; | use crate::task::spawn_blocking; | ||||||
|  |  | ||||||
|  | @ -10,7 +10,7 @@ use super::SocketAddr; | ||||||
| use super::UnixStream; | use super::UnixStream; | ||||||
| use crate::future; | use crate::future; | ||||||
| use crate::io; | use crate::io; | ||||||
| use crate::net::driver::Watcher; | use crate::rt::Watcher; | ||||||
| use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | ||||||
| use crate::path::Path; | use crate::path::Path; | ||||||
| use crate::stream::Stream; | use crate::stream::Stream; | ||||||
|  |  | ||||||
|  | @ -9,7 +9,7 @@ use mio_uds; | ||||||
| 
 | 
 | ||||||
| use super::SocketAddr; | use super::SocketAddr; | ||||||
| use crate::io::{self, Read, Write}; | use crate::io::{self, Read, Write}; | ||||||
| use crate::net::driver::Watcher; | use crate::rt::Watcher; | ||||||
| use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; | ||||||
| use crate::path::Path; | use crate::path::Path; | ||||||
| use crate::task::{spawn_blocking, Context, Poll}; | use crate::task::{spawn_blocking, Context, Poll}; | ||||||
|  |  | ||||||
							
								
								
									
										23
									
								
								src/rt/mod.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										23
									
								
								src/rt/mod.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,23 @@ | ||||||
|  | //! The runtime.
 | ||||||
|  | 
 | ||||||
|  | use std::thread; | ||||||
|  | 
 | ||||||
|  | use once_cell::sync::Lazy; | ||||||
|  | 
 | ||||||
|  | use crate::utils::abort_on_panic; | ||||||
|  | 
 | ||||||
|  | pub use reactor::{Reactor, Watcher}; | ||||||
|  | pub use runtime::Runtime; | ||||||
|  | 
 | ||||||
|  | mod reactor; | ||||||
|  | mod runtime; | ||||||
|  | 
 | ||||||
|  | /// The global runtime.
 | ||||||
|  | pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| { | ||||||
|  |     thread::Builder::new() | ||||||
|  |         .name("async-std/runtime".to_string()) | ||||||
|  |         .spawn(|| abort_on_panic(|| RUNTIME.run())) | ||||||
|  |         .expect("cannot start a runtime thread"); | ||||||
|  | 
 | ||||||
|  |     Runtime::new() | ||||||
|  | }); | ||||||
|  | @ -1,13 +1,13 @@ | ||||||
| use std::fmt; | use std::fmt; | ||||||
| use std::sync::{Arc, Mutex}; | use std::sync::{Arc, Mutex}; | ||||||
|  | use std::time::Duration; | ||||||
| 
 | 
 | ||||||
| use mio::{self, Evented}; | use mio::{self, Evented}; | ||||||
| use once_cell::sync::Lazy; |  | ||||||
| use slab::Slab; | use slab::Slab; | ||||||
| 
 | 
 | ||||||
| use crate::io; | use crate::io; | ||||||
|  | use crate::rt::RUNTIME; | ||||||
| use crate::task::{Context, Poll, Waker}; | use crate::task::{Context, Poll, Waker}; | ||||||
| use crate::utils::abort_on_panic; |  | ||||||
| 
 | 
 | ||||||
| /// Data associated with a registered I/O handle.
 | /// Data associated with a registered I/O handle.
 | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
|  | @ -18,35 +18,18 @@ struct Entry { | ||||||
|     /// Tasks that are blocked on reading from this I/O handle.
 |     /// Tasks that are blocked on reading from this I/O handle.
 | ||||||
|     readers: Mutex<Readers>, |     readers: Mutex<Readers>, | ||||||
| 
 | 
 | ||||||
|     /// Thasks that are blocked on writing to this I/O handle.
 |     /// Tasks that are blocked on writing to this I/O handle.
 | ||||||
|     writers: Mutex<Writers>, |     writers: Mutex<Writers>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// The set of `Waker`s interested in read readiness.
 |  | ||||||
| #[derive(Debug)] |  | ||||||
| struct Readers { |  | ||||||
|     /// Flag indicating read readiness.
 |  | ||||||
|     /// (cf. `Watcher::poll_read_ready`)
 |  | ||||||
|     ready: bool, |  | ||||||
|     /// The `Waker`s blocked on reading.
 |  | ||||||
|     wakers: Vec<Waker> |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// The set of `Waker`s interested in write readiness.
 |  | ||||||
| #[derive(Debug)] |  | ||||||
| struct Writers { |  | ||||||
|     /// Flag indicating write readiness.
 |  | ||||||
|     /// (cf. `Watcher::poll_write_ready`)
 |  | ||||||
|     ready: bool, |  | ||||||
|     /// The `Waker`s blocked on writing.
 |  | ||||||
|     wakers: Vec<Waker> |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// The state of a networking driver.
 | /// The state of a networking driver.
 | ||||||
| struct Reactor { | pub struct Reactor { | ||||||
|     /// A mio instance that polls for new events.
 |     /// A mio instance that polls for new events.
 | ||||||
|     poller: mio::Poll, |     poller: mio::Poll, | ||||||
| 
 | 
 | ||||||
|  |     /// A list into which mio stores events.
 | ||||||
|  |     events: Mutex<mio::Events>, | ||||||
|  | 
 | ||||||
|     /// A collection of registered I/O handles.
 |     /// A collection of registered I/O handles.
 | ||||||
|     entries: Mutex<Slab<Arc<Entry>>>, |     entries: Mutex<Slab<Arc<Entry>>>, | ||||||
| 
 | 
 | ||||||
|  | @ -57,14 +40,35 @@ struct Reactor { | ||||||
|     notify_token: mio::Token, |     notify_token: mio::Token, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /// The set of `Waker`s interested in read readiness.
 | ||||||
|  | #[derive(Debug)] | ||||||
|  | struct Readers { | ||||||
|  |     /// Flag indicating read readiness.
 | ||||||
|  |     /// (cf. `Watcher::poll_read_ready`)
 | ||||||
|  |     ready: bool, | ||||||
|  |     /// The `Waker`s blocked on reading.
 | ||||||
|  |     wakers: Vec<Waker>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// The set of `Waker`s interested in write readiness.
 | ||||||
|  | #[derive(Debug)] | ||||||
|  | struct Writers { | ||||||
|  |     /// Flag indicating write readiness.
 | ||||||
|  |     /// (cf. `Watcher::poll_write_ready`)
 | ||||||
|  |     ready: bool, | ||||||
|  |     /// The `Waker`s blocked on writing.
 | ||||||
|  |     wakers: Vec<Waker>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
| impl Reactor { | impl Reactor { | ||||||
|     /// Creates a new reactor for polling I/O events.
 |     /// Creates a new reactor for polling I/O events.
 | ||||||
|     fn new() -> io::Result<Reactor> { |     pub fn new() -> io::Result<Reactor> { | ||||||
|         let poller = mio::Poll::new()?; |         let poller = mio::Poll::new()?; | ||||||
|         let notify_reg = mio::Registration::new2(); |         let notify_reg = mio::Registration::new2(); | ||||||
| 
 | 
 | ||||||
|         let mut reactor = Reactor { |         let mut reactor = Reactor { | ||||||
|             poller, |             poller, | ||||||
|  |             events: Mutex::new(mio::Events::with_capacity(1000)), | ||||||
|             entries: Mutex::new(Slab::new()), |             entries: Mutex::new(Slab::new()), | ||||||
|             notify_reg, |             notify_reg, | ||||||
|             notify_token: mio::Token(0), |             notify_token: mio::Token(0), | ||||||
|  | @ -88,8 +92,14 @@ impl Reactor { | ||||||
|         // Allocate an entry and insert it into the slab.
 |         // Allocate an entry and insert it into the slab.
 | ||||||
|         let entry = Arc::new(Entry { |         let entry = Arc::new(Entry { | ||||||
|             token, |             token, | ||||||
|             readers: Mutex::new(Readers { ready: false, wakers: Vec::new() }), |             readers: Mutex::new(Readers { | ||||||
|             writers: Mutex::new(Writers { ready: false, wakers: Vec::new() }), |                 ready: false, | ||||||
|  |                 wakers: Vec::new(), | ||||||
|  |             }), | ||||||
|  |             writers: Mutex::new(Writers { | ||||||
|  |                 ready: false, | ||||||
|  |                 wakers: Vec::new(), | ||||||
|  |             }), | ||||||
|         }); |         }); | ||||||
|         vacant.insert(entry.clone()); |         vacant.insert(entry.clone()); | ||||||
| 
 | 
 | ||||||
|  | @ -112,50 +122,32 @@ impl Reactor { | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     // fn notify(&self) {
 |     /// Notifies the reactor so that polling stops blocking.
 | ||||||
|     //     self.notify_reg
 |     pub fn notify(&self) -> io::Result<()> { | ||||||
|     //         .1
 |         self.notify_reg.1.set_readiness(mio::Ready::readable()) | ||||||
|     //         .set_readiness(mio::Ready::readable())
 |  | ||||||
|     //         .unwrap();
 |  | ||||||
|     // }
 |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| /// The state of the global networking driver.
 |  | ||||||
| static REACTOR: Lazy<Reactor> = Lazy::new(|| { |  | ||||||
|     // Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
 |  | ||||||
|     // handles.
 |  | ||||||
|     std::thread::Builder::new() |  | ||||||
|         .name("async-std/net".to_string()) |  | ||||||
|         .spawn(move || { |  | ||||||
|             // If the driver thread panics, there's not much we can do. It is not a
 |  | ||||||
|             // recoverable error and there is no place to propagate it into so we just abort.
 |  | ||||||
|             abort_on_panic(|| { |  | ||||||
|                 main_loop().expect("async networking thread has panicked"); |  | ||||||
|             }) |  | ||||||
|         }) |  | ||||||
|         .expect("cannot start a thread driving blocking tasks"); |  | ||||||
| 
 |  | ||||||
|     Reactor::new().expect("cannot initialize reactor") |  | ||||||
| }); |  | ||||||
| 
 |  | ||||||
|     /// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
 |     /// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
 | ||||||
| fn main_loop() -> io::Result<()> { |     ///
 | ||||||
|     let reactor = &REACTOR; |     /// Returns `Ok(true)` if at least one new task was woken.
 | ||||||
|     let mut events = mio::Events::with_capacity(1000); |     pub fn poll(&self, timeout: Option<Duration>) -> io::Result<bool> { | ||||||
|  |         let mut events = self.events.lock().unwrap(); | ||||||
| 
 | 
 | ||||||
|     loop { |  | ||||||
|         // Block on the poller until at least one new event comes in.
 |         // Block on the poller until at least one new event comes in.
 | ||||||
|         reactor.poller.poll(&mut events, None)?; |         self.poller.poll(&mut events, timeout)?; | ||||||
| 
 | 
 | ||||||
|         // Lock the entire entry table while we're processing new events.
 |         // Lock the entire entry table while we're processing new events.
 | ||||||
|         let entries = reactor.entries.lock().unwrap(); |         let entries = self.entries.lock().unwrap(); | ||||||
|  | 
 | ||||||
|  |         // The number of woken tasks.
 | ||||||
|  |         let mut progress = false; | ||||||
| 
 | 
 | ||||||
|         for event in events.iter() { |         for event in events.iter() { | ||||||
|             let token = event.token(); |             let token = event.token(); | ||||||
| 
 | 
 | ||||||
|             if token == reactor.notify_token { |             if token == self.notify_token { | ||||||
|                 // If this is the notification token, we just need the notification state.
 |                 // If this is the notification token, we just need the notification state.
 | ||||||
|                 reactor.notify_reg.1.set_readiness(mio::Ready::empty())?; |                 self.notify_reg.1.set_readiness(mio::Ready::empty())?; | ||||||
|             } else { |             } else { | ||||||
|                 // Otherwise, look for the entry associated with this token.
 |                 // Otherwise, look for the entry associated with this token.
 | ||||||
|                 if let Some(entry) = entries.get(token.0) { |                 if let Some(entry) = entries.get(token.0) { | ||||||
|  | @ -163,25 +155,31 @@ fn main_loop() -> io::Result<()> { | ||||||
|                     let readiness = event.readiness(); |                     let readiness = event.readiness(); | ||||||
| 
 | 
 | ||||||
|                     // Wake up reader tasks blocked on this I/O handle.
 |                     // Wake up reader tasks blocked on this I/O handle.
 | ||||||
|                     if !(readiness & reader_interests()).is_empty() { |                     let reader_interests = mio::Ready::all() - mio::Ready::writable(); | ||||||
|  |                     if !(readiness & reader_interests).is_empty() { | ||||||
|                         let mut readers = entry.readers.lock().unwrap(); |                         let mut readers = entry.readers.lock().unwrap(); | ||||||
|                         readers.ready = true; |                         readers.ready = true; | ||||||
|                         for w in readers.wakers.drain(..) { |                         for w in readers.wakers.drain(..) { | ||||||
|                             w.wake(); |                             w.wake(); | ||||||
|  |                             progress = true; | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
| 
 | 
 | ||||||
|                     // Wake up writer tasks blocked on this I/O handle.
 |                     // Wake up writer tasks blocked on this I/O handle.
 | ||||||
|                     if !(readiness & writer_interests()).is_empty() { |                     let writer_interests = mio::Ready::all() - mio::Ready::readable(); | ||||||
|  |                     if !(readiness & writer_interests).is_empty() { | ||||||
|                         let mut writers = entry.writers.lock().unwrap(); |                         let mut writers = entry.writers.lock().unwrap(); | ||||||
|                         writers.ready = true; |                         writers.ready = true; | ||||||
|                         for w in writers.wakers.drain(..) { |                         for w in writers.wakers.drain(..) { | ||||||
|                             w.wake(); |                             w.wake(); | ||||||
|  |                             progress = true; | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
|             } |             } | ||||||
|         } |         } | ||||||
|  | 
 | ||||||
|  |         Ok(progress) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -204,7 +202,8 @@ impl<T: Evented> Watcher<T> { | ||||||
|     /// lifetime of the returned I/O handle.
 |     /// lifetime of the returned I/O handle.
 | ||||||
|     pub fn new(source: T) -> Watcher<T> { |     pub fn new(source: T) -> Watcher<T> { | ||||||
|         Watcher { |         Watcher { | ||||||
|             entry: REACTOR |             entry: RUNTIME | ||||||
|  |                 .reactor() | ||||||
|                 .register(&source) |                 .register(&source) | ||||||
|                 .expect("cannot register an I/O event source"), |                 .expect("cannot register an I/O event source"), | ||||||
|             source: Some(source), |             source: Some(source), | ||||||
|  | @ -240,12 +239,11 @@ impl<T: Evented> Watcher<T> { | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         // Register the task if it isn't registered already.
 |         // Register the task if it isn't registered already.
 | ||||||
|  | 
 | ||||||
|         if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { |         if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { | ||||||
|             readers.wakers.push(cx.waker().clone()); |             readers.wakers.push(cx.waker().clone()); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         readers.ready = false; |  | ||||||
| 
 |  | ||||||
|         Poll::Pending |         Poll::Pending | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -281,8 +279,6 @@ impl<T: Evented> Watcher<T> { | ||||||
|             writers.wakers.push(cx.waker().clone()); |             writers.wakers.push(cx.waker().clone()); | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         writers.ready = false; |  | ||||||
| 
 |  | ||||||
|         Poll::Pending |         Poll::Pending | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  | @ -296,7 +292,7 @@ impl<T: Evented> Watcher<T> { | ||||||
|         // Lock the waker list.
 |         // Lock the waker list.
 | ||||||
|         let mut readers = self.entry.readers.lock().unwrap(); |         let mut readers = self.entry.readers.lock().unwrap(); | ||||||
|         if readers.ready { |         if readers.ready { | ||||||
|             return Poll::Ready(()) |             return Poll::Ready(()); | ||||||
|         } |         } | ||||||
|         // Register the task if it isn't registered already.
 |         // Register the task if it isn't registered already.
 | ||||||
|         if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { |         if readers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { | ||||||
|  | @ -314,7 +310,7 @@ impl<T: Evented> Watcher<T> { | ||||||
|         // Lock the waker list.
 |         // Lock the waker list.
 | ||||||
|         let mut writers = self.entry.writers.lock().unwrap(); |         let mut writers = self.entry.writers.lock().unwrap(); | ||||||
|         if writers.ready { |         if writers.ready { | ||||||
|             return Poll::Ready(()) |             return Poll::Ready(()); | ||||||
|         } |         } | ||||||
|         // Register the task if it isn't registered already.
 |         // Register the task if it isn't registered already.
 | ||||||
|         if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { |         if writers.wakers.iter().all(|w| !w.will_wake(cx.waker())) { | ||||||
|  | @ -329,7 +325,8 @@ impl<T: Evented> Watcher<T> { | ||||||
|     #[allow(dead_code)] |     #[allow(dead_code)] | ||||||
|     pub fn into_inner(mut self) -> T { |     pub fn into_inner(mut self) -> T { | ||||||
|         let source = self.source.take().unwrap(); |         let source = self.source.take().unwrap(); | ||||||
|         REACTOR |         RUNTIME | ||||||
|  |             .reactor() | ||||||
|             .deregister(&source, &self.entry) |             .deregister(&source, &self.entry) | ||||||
|             .expect("cannot deregister I/O event source"); |             .expect("cannot deregister I/O event source"); | ||||||
|         source |         source | ||||||
|  | @ -339,7 +336,8 @@ impl<T: Evented> Watcher<T> { | ||||||
| impl<T: Evented> Drop for Watcher<T> { | impl<T: Evented> Drop for Watcher<T> { | ||||||
|     fn drop(&mut self) { |     fn drop(&mut self) { | ||||||
|         if let Some(ref source) = self.source { |         if let Some(ref source) = self.source { | ||||||
|             REACTOR |             RUNTIME | ||||||
|  |                 .reactor() | ||||||
|                 .deregister(source, &self.entry) |                 .deregister(source, &self.entry) | ||||||
|                 .expect("cannot deregister I/O event source"); |                 .expect("cannot deregister I/O event source"); | ||||||
|         } |         } | ||||||
|  | @ -354,27 +352,3 @@ impl<T: Evented + fmt::Debug> fmt::Debug for Watcher<T> { | ||||||
|             .finish() |             .finish() | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 |  | ||||||
| /// Returns a mask containing flags that interest tasks reading from I/O handles.
 |  | ||||||
| #[inline] |  | ||||||
| fn reader_interests() -> mio::Ready { |  | ||||||
|     mio::Ready::all() - mio::Ready::writable() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// Returns a mask containing flags that interest tasks writing into I/O handles.
 |  | ||||||
| #[inline] |  | ||||||
| fn writer_interests() -> mio::Ready { |  | ||||||
|     mio::Ready::writable() | hup() |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// Returns a flag containing the hangup status.
 |  | ||||||
| #[inline] |  | ||||||
| fn hup() -> mio::Ready { |  | ||||||
|     #[cfg(unix)] |  | ||||||
|     let ready = mio::unix::UnixReady::hup().into(); |  | ||||||
| 
 |  | ||||||
|     #[cfg(not(unix))] |  | ||||||
|     let ready = mio::Ready::empty(); |  | ||||||
| 
 |  | ||||||
|     ready |  | ||||||
| } |  | ||||||
							
								
								
									
										343
									
								
								src/rt/runtime.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										343
									
								
								src/rt/runtime.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,343 @@ | ||||||
|  | use std::cell::Cell; | ||||||
|  | use std::io; | ||||||
|  | use std::iter; | ||||||
|  | use std::sync::atomic::{self, Ordering}; | ||||||
|  | use std::sync::{Arc, Mutex}; | ||||||
|  | use std::thread; | ||||||
|  | use std::time::Duration; | ||||||
|  | 
 | ||||||
|  | use crossbeam_deque::{Injector, Steal, Stealer, Worker}; | ||||||
|  | use crossbeam_utils::thread::scope; | ||||||
|  | use once_cell::unsync::OnceCell; | ||||||
|  | 
 | ||||||
|  | use crate::rt::Reactor; | ||||||
|  | use crate::sync::Spinlock; | ||||||
|  | use crate::task::Runnable; | ||||||
|  | use crate::utils::{abort_on_panic, random}; | ||||||
|  | 
 | ||||||
|  | thread_local! { | ||||||
|  |     /// A reference to the current machine, if the current thread runs tasks.
 | ||||||
|  |     static MACHINE: OnceCell<Arc<Machine>> = OnceCell::new(); | ||||||
|  | 
 | ||||||
|  |     /// This flag is set to true whenever `task::yield_now()` is invoked.
 | ||||||
|  |     static YIELD_NOW: Cell<bool> = Cell::new(false); | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | struct Scheduler { | ||||||
|  |     /// Set to `true` while a machine is polling the reactor.
 | ||||||
|  |     polling: bool, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// An async runtime.
 | ||||||
|  | pub struct Runtime { | ||||||
|  |     /// The reactor.
 | ||||||
|  |     reactor: Reactor, | ||||||
|  | 
 | ||||||
|  |     /// The global queue of tasks.
 | ||||||
|  |     injector: Injector<Runnable>, | ||||||
|  | 
 | ||||||
|  |     /// Handles to local queues for stealing work.
 | ||||||
|  |     stealers: Vec<Stealer<Runnable>>, | ||||||
|  | 
 | ||||||
|  |     /// Machines to start
 | ||||||
|  |     machines: Vec<Arc<Machine>>, | ||||||
|  | 
 | ||||||
|  |     /// The scheduler state.
 | ||||||
|  |     sched: Mutex<Scheduler>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Runtime { | ||||||
|  |     /// Creates a new runtime.
 | ||||||
|  |     pub fn new() -> Runtime { | ||||||
|  |         let cpus = num_cpus::get().max(1); | ||||||
|  |         let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); | ||||||
|  | 
 | ||||||
|  |         let machines: Vec<_> = processors | ||||||
|  |             .into_iter() | ||||||
|  |             .map(|p| Arc::new(Machine::new(p))) | ||||||
|  |             .collect(); | ||||||
|  | 
 | ||||||
|  |         let stealers = machines | ||||||
|  |             .iter() | ||||||
|  |             .map(|m| m.processor.lock().worker.stealer()) | ||||||
|  |             .collect(); | ||||||
|  | 
 | ||||||
|  |         Runtime { | ||||||
|  |             reactor: Reactor::new().unwrap(), | ||||||
|  |             injector: Injector::new(), | ||||||
|  |             stealers, | ||||||
|  |             machines, | ||||||
|  |             sched: Mutex::new(Scheduler { polling: false }), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Returns a reference to the reactor.
 | ||||||
|  |     pub fn reactor(&self) -> &Reactor { | ||||||
|  |         &self.reactor | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Flushes the task slot so that tasks get run more fairly.
 | ||||||
|  |     pub fn yield_now(&self) { | ||||||
|  |         YIELD_NOW.with(|flag| flag.set(true)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Schedules a task.
 | ||||||
|  |     pub fn schedule(&self, task: Runnable) { | ||||||
|  |         MACHINE.with(|machine| { | ||||||
|  |             // If the current thread is a worker thread, schedule it onto the current machine.
 | ||||||
|  |             // Otherwise, push it into the global task queue.
 | ||||||
|  |             match machine.get() { | ||||||
|  |                 None => { | ||||||
|  |                     self.injector.push(task); | ||||||
|  |                     self.notify(); | ||||||
|  |                 } | ||||||
|  |                 Some(m) => m.schedule(&self, task), | ||||||
|  |             } | ||||||
|  |         }); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Runs the runtime on the current thread.
 | ||||||
|  |     pub fn run(&self) { | ||||||
|  |         scope(|s| { | ||||||
|  |             for m in &self.machines { | ||||||
|  |                 s.builder() | ||||||
|  |                     .name("async-std/machine".to_string()) | ||||||
|  |                     .spawn(move |_| { | ||||||
|  |                         abort_on_panic(|| { | ||||||
|  |                             let _ = MACHINE.with(|machine| machine.set(m.clone())); | ||||||
|  |                             m.run(self); | ||||||
|  |                         }) | ||||||
|  |                     }) | ||||||
|  |                     .expect("cannot start a machine thread"); | ||||||
|  |             } | ||||||
|  |         }) | ||||||
|  |         .unwrap(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Unparks a thread polling the reactor.
 | ||||||
|  |     fn notify(&self) { | ||||||
|  |         atomic::fence(Ordering::SeqCst); | ||||||
|  |         self.reactor.notify().unwrap(); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Attempts to poll the reactor without blocking on it.
 | ||||||
|  |     ///
 | ||||||
|  |     /// Returns `Ok(true)` if at least one new task was woken.
 | ||||||
|  |     ///
 | ||||||
|  |     /// This function might not poll the reactor at all so do not rely on it doing anything. Only
 | ||||||
|  |     /// use for optimization.
 | ||||||
|  |     fn quick_poll(&self) -> io::Result<bool> { | ||||||
|  |         if let Ok(sched) = self.sched.try_lock() { | ||||||
|  |             if !sched.polling { | ||||||
|  |                 return self.reactor.poll(Some(Duration::from_secs(0))); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |         Ok(false) | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// A thread running a processor.
 | ||||||
|  | struct Machine { | ||||||
|  |     /// Holds the processor until it gets stolen.
 | ||||||
|  |     processor: Spinlock<Processor>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Machine { | ||||||
|  |     /// Creates a new machine running a processor.
 | ||||||
|  |     fn new(p: Processor) -> Machine { | ||||||
|  |         Machine { | ||||||
|  |             processor: Spinlock::new(p), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Schedules a task onto the machine.
 | ||||||
|  |     fn schedule(&self, rt: &Runtime, task: Runnable) { | ||||||
|  |         self.processor.lock().schedule(rt, task); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Finds the next runnable task.
 | ||||||
|  |     fn find_task(&self, rt: &Runtime) -> Steal<Runnable> { | ||||||
|  |         let mut retry = false; | ||||||
|  | 
 | ||||||
|  |         // First try finding a task in the local queue or in the global queue.
 | ||||||
|  |         if let Some(task) = self.processor.lock().pop_task() { | ||||||
|  |             return Steal::Success(task); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         match self.processor.lock().steal_from_global(rt) { | ||||||
|  |             Steal::Empty => {} | ||||||
|  |             Steal::Retry => retry = true, | ||||||
|  |             Steal::Success(task) => return Steal::Success(task), | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         // Try polling the reactor, but don't block on it.
 | ||||||
|  |         let progress = rt.quick_poll().unwrap(); | ||||||
|  | 
 | ||||||
|  |         // Try finding a task in the local queue, which might hold tasks woken by the reactor. If
 | ||||||
|  |         // the local queue is still empty, try stealing from other processors.
 | ||||||
|  |         if progress { | ||||||
|  |             if let Some(task) = self.processor.lock().pop_task() { | ||||||
|  |                 return Steal::Success(task); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         match self.processor.lock().steal_from_others(rt) { | ||||||
|  |             Steal::Empty => {} | ||||||
|  |             Steal::Retry => retry = true, | ||||||
|  |             Steal::Success(task) => return Steal::Success(task), | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         if retry { Steal::Retry } else { Steal::Empty } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Runs the machine on the current thread.
 | ||||||
|  |     fn run(&self, rt: &Runtime) { | ||||||
|  |         /// Number of yields when no runnable task is found.
 | ||||||
|  |         const YIELDS: u32 = 3; | ||||||
|  |         /// Number of short sleeps when no runnable task in found.
 | ||||||
|  |         const SLEEPS: u32 = 10; | ||||||
|  |         /// Number of runs in a row before the global queue is inspected.
 | ||||||
|  |         const RUNS: u32 = 64; | ||||||
|  | 
 | ||||||
|  |         // The number of times the thread found work in a row.
 | ||||||
|  |         let mut runs = 0; | ||||||
|  |         // The number of times the thread didn't find work in a row.
 | ||||||
|  |         let mut fails = 0; | ||||||
|  | 
 | ||||||
|  |         loop { | ||||||
|  |             // Check if `task::yield_now()` was invoked and flush the slot if so.
 | ||||||
|  |             YIELD_NOW.with(|flag| { | ||||||
|  |                 if flag.replace(false) { | ||||||
|  |                     self.processor.lock().flush_slot(rt); | ||||||
|  |                 } | ||||||
|  |             }); | ||||||
|  | 
 | ||||||
|  |             // After a number of runs in a row, do some work to ensure no task is left behind
 | ||||||
|  |             // indefinitely. Poll the reactor, steal tasks from the global queue, and flush the
 | ||||||
|  |             // task slot.
 | ||||||
|  |             if runs >= RUNS { | ||||||
|  |                 runs = 0; | ||||||
|  |                 rt.quick_poll().unwrap(); | ||||||
|  | 
 | ||||||
|  |                 let mut p = self.processor.lock(); | ||||||
|  |                 if let Steal::Success(task) = p.steal_from_global(rt) { | ||||||
|  |                     p.schedule(rt, task); | ||||||
|  |                 } | ||||||
|  | 
 | ||||||
|  |                 p.flush_slot(rt); | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             // Try to find a runnable task.
 | ||||||
|  |             if let Steal::Success(task) = self.find_task(rt) { | ||||||
|  |                 task.run(); | ||||||
|  |                 runs += 1; | ||||||
|  |                 fails = 0; | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             fails += 1; | ||||||
|  | 
 | ||||||
|  |             // Yield the current thread a few times.
 | ||||||
|  |             if fails <= YIELDS { | ||||||
|  |                 thread::yield_now(); | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             // Put the current thread to sleep a few times.
 | ||||||
|  |             if fails <= YIELDS + SLEEPS { | ||||||
|  |                 thread::sleep(Duration::from_micros(10)); | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             // One final check for available tasks while the scheduler is locked.
 | ||||||
|  |             if let Some(task) = iter::repeat_with(|| self.find_task(rt)) | ||||||
|  |                 .find(|s| !s.is_retry()) | ||||||
|  |                 .and_then(|s| s.success()) | ||||||
|  |             { | ||||||
|  |                 self.schedule(rt, task); | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             let mut sched = rt.sched.lock().unwrap(); | ||||||
|  | 
 | ||||||
|  |             if sched.polling { | ||||||
|  |                 thread::sleep(Duration::from_micros(10)); | ||||||
|  |                 continue; | ||||||
|  |             } | ||||||
|  | 
 | ||||||
|  |             sched.polling = true; | ||||||
|  |             drop(sched); | ||||||
|  | 
 | ||||||
|  |             rt.reactor.poll(None).unwrap(); | ||||||
|  | 
 | ||||||
|  |             let mut sched = rt.sched.lock().unwrap(); | ||||||
|  |             sched.polling = false; | ||||||
|  | 
 | ||||||
|  |             runs = 0; | ||||||
|  |             fails = 0; | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | struct Processor { | ||||||
|  |     /// The local task queue.
 | ||||||
|  |     worker: Worker<Runnable>, | ||||||
|  | 
 | ||||||
|  |     /// Contains the next task to run as an optimization that skips the queue.
 | ||||||
|  |     slot: Option<Runnable>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl Processor { | ||||||
|  |     /// Creates a new processor.
 | ||||||
|  |     fn new() -> Processor { | ||||||
|  |         Processor { | ||||||
|  |             worker: Worker::new_fifo(), | ||||||
|  |             slot: None, | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Schedules a task to run on this processor.
 | ||||||
|  |     fn schedule(&mut self, rt: &Runtime, task: Runnable) { | ||||||
|  |         match self.slot.replace(task) { | ||||||
|  |             None => {} | ||||||
|  |             Some(task) => { | ||||||
|  |                 self.worker.push(task); | ||||||
|  |                 rt.notify(); | ||||||
|  |             } | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Flushes a task from the slot into the local queue.
 | ||||||
|  |     fn flush_slot(&mut self, rt: &Runtime) { | ||||||
|  |         if let Some(task) = self.slot.take() { | ||||||
|  |             self.worker.push(task); | ||||||
|  |             rt.notify(); | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Pops a task from this processor.
 | ||||||
|  |     fn pop_task(&mut self) -> Option<Runnable> { | ||||||
|  |         self.slot.take().or_else(|| self.worker.pop()) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Steals a task from the global queue.
 | ||||||
|  |     fn steal_from_global(&self, rt: &Runtime) -> Steal<Runnable> { | ||||||
|  |         rt.injector.steal_batch_and_pop(&self.worker) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Steals a task from other processors.
 | ||||||
|  |     fn steal_from_others(&self, rt: &Runtime) -> Steal<Runnable> { | ||||||
|  |         // Pick a random starting point in the list of queues.
 | ||||||
|  |         let len = rt.stealers.len(); | ||||||
|  |         let start = random(len as u32) as usize; | ||||||
|  | 
 | ||||||
|  |         // Create an iterator over stealers that starts from the chosen point.
 | ||||||
|  |         let (l, r) = rt.stealers.split_at(start); | ||||||
|  |         let stealers = r.iter().chain(l.iter()); | ||||||
|  | 
 | ||||||
|  |         // Try stealing a batch of tasks from each queue.
 | ||||||
|  |         stealers | ||||||
|  |             .map(|s| s.steal_batch_and_pop(&self.worker)) | ||||||
|  |             .collect() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | @ -192,3 +192,8 @@ cfg_unstable! { | ||||||
| 
 | 
 | ||||||
| pub(crate) mod waker_set; | pub(crate) mod waker_set; | ||||||
| pub(crate) use waker_set::WakerSet; | pub(crate) use waker_set::WakerSet; | ||||||
|  | 
 | ||||||
|  | cfg_default! { | ||||||
|  |     pub(crate) mod spin_lock; | ||||||
|  |     pub(crate) use spin_lock::Spinlock; | ||||||
|  | } | ||||||
|  |  | ||||||
							
								
								
									
										89
									
								
								src/sync/spin_lock.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										89
									
								
								src/sync/spin_lock.rs
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,89 @@ | ||||||
|  | use std::cell::UnsafeCell; | ||||||
|  | use std::ops::{Deref, DerefMut}; | ||||||
|  | use std::sync::atomic::{AtomicBool, Ordering}; | ||||||
|  | 
 | ||||||
|  | use crossbeam_utils::Backoff; | ||||||
|  | 
 | ||||||
|  | /// A simple spinlock.
 | ||||||
|  | #[derive(Debug)] | ||||||
|  | pub struct Spinlock<T> { | ||||||
|  |     locked: AtomicBool, | ||||||
|  |     value: UnsafeCell<T>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | unsafe impl<T: Send> Send for Spinlock<T> {} | ||||||
|  | unsafe impl<T: Send> Sync for Spinlock<T> {} | ||||||
|  | 
 | ||||||
|  | impl<T> Spinlock<T> { | ||||||
|  |     /// Returns a new spinlock initialized with `value`.
 | ||||||
|  |     pub const fn new(value: T) -> Spinlock<T> { | ||||||
|  |         Spinlock { | ||||||
|  |             locked: AtomicBool::new(false), | ||||||
|  |             value: UnsafeCell::new(value), | ||||||
|  |         } | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     /// Locks the spinlock.
 | ||||||
|  |     pub fn lock(&self) -> SpinlockGuard<'_, T> { | ||||||
|  |         let backoff = Backoff::new(); | ||||||
|  |         while self.locked.compare_and_swap(false, true, Ordering::Acquire) { | ||||||
|  |             backoff.snooze(); | ||||||
|  |         } | ||||||
|  |         SpinlockGuard { parent: self } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// A guard holding a spinlock locked.
 | ||||||
|  | #[derive(Debug)] | ||||||
|  | pub struct SpinlockGuard<'a, T> { | ||||||
|  |     parent: &'a Spinlock<T>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | unsafe impl<T: Send> Send for SpinlockGuard<'_, T> {} | ||||||
|  | unsafe impl<T: Sync> Sync for SpinlockGuard<'_, T> {} | ||||||
|  | 
 | ||||||
|  | impl<'a, T> Drop for SpinlockGuard<'a, T> { | ||||||
|  |     fn drop(&mut self) { | ||||||
|  |         self.parent.locked.store(false, Ordering::Release); | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<'a, T> Deref for SpinlockGuard<'a, T> { | ||||||
|  |     type Target = T; | ||||||
|  | 
 | ||||||
|  |     fn deref(&self) -> &T { | ||||||
|  |         unsafe { &*self.parent.value.get() } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<'a, T> DerefMut for SpinlockGuard<'a, T> { | ||||||
|  |     fn deref_mut(&mut self) -> &mut T { | ||||||
|  |         unsafe { &mut *self.parent.value.get() } | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[test] | ||||||
|  | fn spinlock() { | ||||||
|  |     use std::sync::Arc; | ||||||
|  | 
 | ||||||
|  |     use crate::sync::{Spinlock}; | ||||||
|  |     use crate::task; | ||||||
|  | 
 | ||||||
|  |     task::block_on(async { | ||||||
|  | 
 | ||||||
|  |         let m = Arc::new(Spinlock::new(0)); | ||||||
|  |         let mut tasks = vec![]; | ||||||
|  | 
 | ||||||
|  |         for _ in 0..10 { | ||||||
|  |             let m = m.clone(); | ||||||
|  |             tasks.push(task::spawn(async move { | ||||||
|  |                 *m.lock() += 1; | ||||||
|  |             })); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         for t in tasks { | ||||||
|  |             t.await; | ||||||
|  |         } | ||||||
|  |         assert_eq!(*m.lock(), 10); | ||||||
|  |     }) | ||||||
|  | } | ||||||
|  | @ -6,7 +6,6 @@ use std::task::{RawWaker, RawWakerVTable}; | ||||||
| 
 | 
 | ||||||
| use crossbeam_utils::sync::Parker; | use crossbeam_utils::sync::Parker; | ||||||
| use kv_log_macro::trace; | use kv_log_macro::trace; | ||||||
| use log::log_enabled; |  | ||||||
| 
 | 
 | ||||||
| use crate::task::{Context, Poll, Task, Waker}; | use crate::task::{Context, Poll, Task, Waker}; | ||||||
| 
 | 
 | ||||||
|  | @ -41,12 +40,10 @@ where | ||||||
|     let task = Task::new(None); |     let task = Task::new(None); | ||||||
| 
 | 
 | ||||||
|     // Log this `block_on` operation.
 |     // Log this `block_on` operation.
 | ||||||
|     if log_enabled!(log::Level::Trace) { |  | ||||||
|     trace!("block_on", { |     trace!("block_on", { | ||||||
|         task_id: task.id().0, |         task_id: task.id().0, | ||||||
|         parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), |         parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), | ||||||
|     }); |     }); | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     let future = async move { |     let future = async move { | ||||||
|         // Drop task-locals on exit.
 |         // Drop task-locals on exit.
 | ||||||
|  | @ -56,13 +53,9 @@ where | ||||||
| 
 | 
 | ||||||
|         // Log completion on exit.
 |         // Log completion on exit.
 | ||||||
|         defer! { |         defer! { | ||||||
|             if log_enabled!(log::Level::Trace) { |  | ||||||
|                 Task::get_current(|t| { |  | ||||||
|             trace!("completed", { |             trace!("completed", { | ||||||
|                         task_id: t.id().0, |                 task_id: Task::get_current(|t| t.id().0), | ||||||
|             }); |             }); | ||||||
|                 }); |  | ||||||
|             } |  | ||||||
|         } |         } | ||||||
| 
 | 
 | ||||||
|         future.await |         future.await | ||||||
|  |  | ||||||
|  | @ -1,9 +1,9 @@ | ||||||
| use kv_log_macro::trace; |  | ||||||
| use log::log_enabled; |  | ||||||
| use std::future::Future; | use std::future::Future; | ||||||
| 
 | 
 | ||||||
|  | use kv_log_macro::trace; | ||||||
|  | 
 | ||||||
| use crate::io; | use crate::io; | ||||||
| use crate::task::executor; | use crate::rt::RUNTIME; | ||||||
| use crate::task::{JoinHandle, Task}; | use crate::task::{JoinHandle, Task}; | ||||||
| use crate::utils::abort_on_panic; | use crate::utils::abort_on_panic; | ||||||
| 
 | 
 | ||||||
|  | @ -37,12 +37,10 @@ impl Builder { | ||||||
|         let task = Task::new(self.name); |         let task = Task::new(self.name); | ||||||
| 
 | 
 | ||||||
|         // Log this `spawn` operation.
 |         // Log this `spawn` operation.
 | ||||||
|         if log_enabled!(log::Level::Trace) { |  | ||||||
|         trace!("spawn", { |         trace!("spawn", { | ||||||
|             task_id: task.id().0, |             task_id: task.id().0, | ||||||
|             parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), |             parent_task_id: Task::get_current(|t| t.id().0).unwrap_or(0), | ||||||
|         }); |         }); | ||||||
|         } |  | ||||||
| 
 | 
 | ||||||
|         let future = async move { |         let future = async move { | ||||||
|             // Drop task-locals on exit.
 |             // Drop task-locals on exit.
 | ||||||
|  | @ -52,19 +50,15 @@ impl Builder { | ||||||
| 
 | 
 | ||||||
|             // Log completion on exit.
 |             // Log completion on exit.
 | ||||||
|             defer! { |             defer! { | ||||||
|                 if log_enabled!(log::Level::Trace) { |  | ||||||
|                     Task::get_current(|t| { |  | ||||||
|                 trace!("completed", { |                 trace!("completed", { | ||||||
|                             task_id: t.id().0, |                     task_id: Task::get_current(|t| t.id().0), | ||||||
|                 }); |                 }); | ||||||
|                     }); |  | ||||||
|                 } |  | ||||||
|             } |             } | ||||||
| 
 | 
 | ||||||
|             future.await |             future.await | ||||||
|         }; |         }; | ||||||
| 
 | 
 | ||||||
|         let schedule = move |t| executor::schedule(Runnable(t)); |         let schedule = move |t| RUNTIME.schedule(Runnable(t)); | ||||||
|         let (task, handle) = async_task::spawn(future, schedule, task); |         let (task, handle) = async_task::spawn(future, schedule, task); | ||||||
|         task.schedule(); |         task.schedule(); | ||||||
|         Ok(JoinHandle::new(handle)) |         Ok(JoinHandle::new(handle)) | ||||||
|  | @ -72,7 +66,7 @@ impl Builder { | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| /// A runnable task.
 | /// A runnable task.
 | ||||||
| pub(crate) struct Runnable(async_task::Task<Task>); | pub struct Runnable(async_task::Task<Task>); | ||||||
| 
 | 
 | ||||||
| impl Runnable { | impl Runnable { | ||||||
|     /// Runs the task by polling its future once.
 |     /// Runs the task by polling its future once.
 | ||||||
|  |  | ||||||
|  | @ -1,13 +0,0 @@ | ||||||
| //! Task executor.
 |  | ||||||
| //!
 |  | ||||||
| //! API bindings between `crate::task` and this module are very simple:
 |  | ||||||
| //!
 |  | ||||||
| //! * The only export is the `schedule` function.
 |  | ||||||
| //! * The only import is the `crate::task::Runnable` type.
 |  | ||||||
| 
 |  | ||||||
| pub(crate) use pool::schedule; |  | ||||||
| 
 |  | ||||||
| use sleepers::Sleepers; |  | ||||||
| 
 |  | ||||||
| mod pool; |  | ||||||
| mod sleepers; |  | ||||||
|  | @ -1,179 +0,0 @@ | ||||||
| use std::cell::Cell; |  | ||||||
| use std::iter; |  | ||||||
| use std::thread; |  | ||||||
| use std::time::Duration; |  | ||||||
| 
 |  | ||||||
| use crossbeam_deque::{Injector, Stealer, Worker}; |  | ||||||
| use once_cell::sync::Lazy; |  | ||||||
| use once_cell::unsync::OnceCell; |  | ||||||
| 
 |  | ||||||
| use crate::task::executor::Sleepers; |  | ||||||
| use crate::task::Runnable; |  | ||||||
| use crate::utils::{abort_on_panic, random}; |  | ||||||
| 
 |  | ||||||
| /// The state of an executor.
 |  | ||||||
| struct Pool { |  | ||||||
|     /// The global queue of tasks.
 |  | ||||||
|     injector: Injector<Runnable>, |  | ||||||
| 
 |  | ||||||
|     /// Handles to local queues for stealing work from worker threads.
 |  | ||||||
|     stealers: Vec<Stealer<Runnable>>, |  | ||||||
| 
 |  | ||||||
|     /// Used for putting idle workers to sleep and notifying them when new tasks come in.
 |  | ||||||
|     sleepers: Sleepers, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// Global executor that runs spawned tasks.
 |  | ||||||
| static POOL: Lazy<Pool> = Lazy::new(|| { |  | ||||||
|     let num_threads = num_cpus::get().max(1); |  | ||||||
|     let mut stealers = Vec::new(); |  | ||||||
| 
 |  | ||||||
|     // Spawn worker threads.
 |  | ||||||
|     for _ in 0..num_threads { |  | ||||||
|         let worker = Worker::new_fifo(); |  | ||||||
|         stealers.push(worker.stealer()); |  | ||||||
| 
 |  | ||||||
|         let proc = Processor { |  | ||||||
|             worker, |  | ||||||
|             slot: Cell::new(None), |  | ||||||
|             slot_runs: Cell::new(0), |  | ||||||
|         }; |  | ||||||
| 
 |  | ||||||
|         thread::Builder::new() |  | ||||||
|             .name("async-std/executor".to_string()) |  | ||||||
|             .spawn(|| { |  | ||||||
|                 let _ = PROCESSOR.with(|p| p.set(proc)); |  | ||||||
|                 abort_on_panic(main_loop); |  | ||||||
|             }) |  | ||||||
|             .expect("cannot start a thread driving tasks"); |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     Pool { |  | ||||||
|         injector: Injector::new(), |  | ||||||
|         stealers, |  | ||||||
|         sleepers: Sleepers::new(), |  | ||||||
|     } |  | ||||||
| }); |  | ||||||
| 
 |  | ||||||
| /// The state of a worker thread.
 |  | ||||||
| struct Processor { |  | ||||||
|     /// The local task queue.
 |  | ||||||
|     worker: Worker<Runnable>, |  | ||||||
| 
 |  | ||||||
|     /// Contains the next task to run as an optimization that skips queues.
 |  | ||||||
|     slot: Cell<Option<Runnable>>, |  | ||||||
| 
 |  | ||||||
|     /// How many times in a row tasks have been taked from the slot rather than the queue.
 |  | ||||||
|     slot_runs: Cell<u32>, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| thread_local! { |  | ||||||
|     /// Worker thread state.
 |  | ||||||
|     static PROCESSOR: OnceCell<Processor> = OnceCell::new(); |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// Schedules a new runnable task for execution.
 |  | ||||||
| pub(crate) fn schedule(task: Runnable) { |  | ||||||
|     PROCESSOR.with(|proc| { |  | ||||||
|         // If the current thread is a worker thread, store it into its task slot or push it into
 |  | ||||||
|         // its local task queue. Otherwise, push it into the global task queue.
 |  | ||||||
|         match proc.get() { |  | ||||||
|             Some(proc) => { |  | ||||||
|                 // Replace the task in the slot.
 |  | ||||||
|                 if let Some(task) = proc.slot.replace(Some(task)) { |  | ||||||
|                     // If the slot already contained a task, push it into the local task queue.
 |  | ||||||
|                     proc.worker.push(task); |  | ||||||
|                     POOL.sleepers.notify_one(); |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|             None => { |  | ||||||
|                 POOL.injector.push(task); |  | ||||||
|                 POOL.sleepers.notify_one(); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     }) |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// Main loop running a worker thread.
 |  | ||||||
| fn main_loop() { |  | ||||||
|     /// Number of yields when no runnable task is found.
 |  | ||||||
|     const YIELDS: u32 = 3; |  | ||||||
|     /// Number of short sleeps when no runnable task in found.
 |  | ||||||
|     const SLEEPS: u32 = 1; |  | ||||||
| 
 |  | ||||||
|     // The number of times the thread didn't find work in a row.
 |  | ||||||
|     let mut fails = 0; |  | ||||||
| 
 |  | ||||||
|     loop { |  | ||||||
|         // Try to find a runnable task.
 |  | ||||||
|         match find_runnable() { |  | ||||||
|             Some(task) => { |  | ||||||
|                 fails = 0; |  | ||||||
| 
 |  | ||||||
|                 // Run the found task.
 |  | ||||||
|                 task.run(); |  | ||||||
|             } |  | ||||||
|             None => { |  | ||||||
|                 fails += 1; |  | ||||||
| 
 |  | ||||||
|                 // Yield the current thread or put it to sleep.
 |  | ||||||
|                 if fails <= YIELDS { |  | ||||||
|                     thread::yield_now(); |  | ||||||
|                 } else if fails <= YIELDS + SLEEPS { |  | ||||||
|                     thread::sleep(Duration::from_micros(10)); |  | ||||||
|                 } else { |  | ||||||
|                     POOL.sleepers.wait(); |  | ||||||
|                     fails = 0; |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| /// Find the next runnable task.
 |  | ||||||
| fn find_runnable() -> Option<Runnable> { |  | ||||||
|     /// Maximum number of times the slot can be used in a row.
 |  | ||||||
|     const SLOT_LIMIT: u32 = 16; |  | ||||||
| 
 |  | ||||||
|     PROCESSOR.with(|proc| { |  | ||||||
|         let proc = proc.get().unwrap(); |  | ||||||
| 
 |  | ||||||
|         // Try taking a task from the slot.
 |  | ||||||
|         let runs = proc.slot_runs.get(); |  | ||||||
|         if runs < SLOT_LIMIT { |  | ||||||
|             if let Some(task) = proc.slot.take() { |  | ||||||
|                 proc.slot_runs.set(runs + 1); |  | ||||||
|                 return Some(task); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|         proc.slot_runs.set(0); |  | ||||||
| 
 |  | ||||||
|         // Pop a task from the local queue, if not empty.
 |  | ||||||
|         proc.worker.pop().or_else(|| { |  | ||||||
|             // Otherwise, we need to look for a task elsewhere.
 |  | ||||||
|             iter::repeat_with(|| { |  | ||||||
|                 // Try stealing a batch of tasks from the global queue.
 |  | ||||||
|                 POOL.injector |  | ||||||
|                     .steal_batch_and_pop(&proc.worker) |  | ||||||
|                     // Or try stealing a batch of tasks from one of the other threads.
 |  | ||||||
|                     .or_else(|| { |  | ||||||
|                         // First, pick a random starting point in the list of local queues.
 |  | ||||||
|                         let len = POOL.stealers.len(); |  | ||||||
|                         let start = random(len as u32) as usize; |  | ||||||
| 
 |  | ||||||
|                         // Try stealing a batch of tasks from each local queue starting from the
 |  | ||||||
|                         // chosen point.
 |  | ||||||
|                         let (l, r) = POOL.stealers.split_at(start); |  | ||||||
|                         let stealers = r.iter().chain(l.iter()); |  | ||||||
|                         stealers |  | ||||||
|                             .map(|s| s.steal_batch_and_pop(&proc.worker)) |  | ||||||
|                             .collect() |  | ||||||
|                     }) |  | ||||||
|             }) |  | ||||||
|             // Loop while no task was stolen and any steal operation needs to be retried.
 |  | ||||||
|             .find(|s| !s.is_retry()) |  | ||||||
|             // Extract the stolen task, if there is one.
 |  | ||||||
|             .and_then(|s| s.success()) |  | ||||||
|         }) |  | ||||||
|     }) |  | ||||||
| } |  | ||||||
|  | @ -1,52 +0,0 @@ | ||||||
| use std::sync::atomic::{AtomicBool, Ordering}; |  | ||||||
| use std::sync::{Condvar, Mutex}; |  | ||||||
| 
 |  | ||||||
| /// The place where worker threads go to sleep.
 |  | ||||||
| ///
 |  | ||||||
| /// Similar to how thread parking works, if a notification comes up while no threads are sleeping,
 |  | ||||||
| /// the next thread that attempts to go to sleep will pick up the notification immediately.
 |  | ||||||
| pub struct Sleepers { |  | ||||||
|     /// How many threads are currently a sleep.
 |  | ||||||
|     sleep: Mutex<usize>, |  | ||||||
| 
 |  | ||||||
|     /// A condvar for notifying sleeping threads.
 |  | ||||||
|     wake: Condvar, |  | ||||||
| 
 |  | ||||||
|     /// Set to `true` if a notification came up while nobody was sleeping.
 |  | ||||||
|     notified: AtomicBool, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| impl Sleepers { |  | ||||||
|     /// Creates a new `Sleepers`.
 |  | ||||||
|     pub fn new() -> Sleepers { |  | ||||||
|         Sleepers { |  | ||||||
|             sleep: Mutex::new(0), |  | ||||||
|             wake: Condvar::new(), |  | ||||||
|             notified: AtomicBool::new(false), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /// Puts the current thread to sleep.
 |  | ||||||
|     pub fn wait(&self) { |  | ||||||
|         let mut sleep = self.sleep.lock().unwrap(); |  | ||||||
| 
 |  | ||||||
|         if !self.notified.swap(false, Ordering::SeqCst) { |  | ||||||
|             *sleep += 1; |  | ||||||
|             let _ = self.wake.wait(sleep).unwrap(); |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /// Notifies one thread.
 |  | ||||||
|     pub fn notify_one(&self) { |  | ||||||
|         if !self.notified.load(Ordering::SeqCst) { |  | ||||||
|             let mut sleep = self.sleep.lock().unwrap(); |  | ||||||
| 
 |  | ||||||
|             if *sleep > 0 { |  | ||||||
|                 *sleep -= 1; |  | ||||||
|                 self.wake.notify_one(); |  | ||||||
|             } else { |  | ||||||
|                 self.notified.store(true, Ordering::SeqCst); |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  | @ -14,9 +14,6 @@ use crate::task::{Context, Poll, Task}; | ||||||
| #[derive(Debug)] | #[derive(Debug)] | ||||||
| pub struct JoinHandle<T>(async_task::JoinHandle<T, Task>); | pub struct JoinHandle<T>(async_task::JoinHandle<T, Task>); | ||||||
| 
 | 
 | ||||||
| unsafe impl<T> Send for JoinHandle<T> {} |  | ||||||
| unsafe impl<T> Sync for JoinHandle<T> {} |  | ||||||
| 
 |  | ||||||
| impl<T> JoinHandle<T> { | impl<T> JoinHandle<T> { | ||||||
|     /// Creates a new `JoinHandle`.
 |     /// Creates a new `JoinHandle`.
 | ||||||
|     pub(crate) fn new(inner: async_task::JoinHandle<T, Task>) -> JoinHandle<T> { |     pub(crate) fn new(inner: async_task::JoinHandle<T, Task>) -> JoinHandle<T> { | ||||||
|  |  | ||||||
|  | @ -141,13 +141,12 @@ cfg_default! { | ||||||
|     pub use spawn::spawn; |     pub use spawn::spawn; | ||||||
|     pub use task_local::{AccessError, LocalKey}; |     pub use task_local::{AccessError, LocalKey}; | ||||||
| 
 | 
 | ||||||
|     use builder::Runnable; |     pub(crate) use builder::Runnable; | ||||||
|     use task_local::LocalsMap; |     pub(crate) use task_local::LocalsMap; | ||||||
| 
 | 
 | ||||||
|     mod block_on; |     mod block_on; | ||||||
|     mod builder; |     mod builder; | ||||||
|     mod current; |     mod current; | ||||||
|     mod executor; |  | ||||||
|     mod join_handle; |     mod join_handle; | ||||||
|     mod sleep; |     mod sleep; | ||||||
|     mod spawn; |     mod spawn; | ||||||
|  |  | ||||||
|  | @ -31,7 +31,8 @@ use crate::utils::abort_on_panic; | ||||||
| ///
 | ///
 | ||||||
| /// task::spawn_blocking(|| {
 | /// task::spawn_blocking(|| {
 | ||||||
| ///     println!("long-running task here");
 | ///     println!("long-running task here");
 | ||||||
| /// }).await;
 | /// })
 | ||||||
|  | /// .await;
 | ||||||
| /// #
 | /// #
 | ||||||
| /// # })
 | /// # })
 | ||||||
| /// ```
 | /// ```
 | ||||||
|  | @ -50,14 +51,14 @@ where | ||||||
| 
 | 
 | ||||||
| type Runnable = async_task::Task<Task>; | type Runnable = async_task::Task<Task>; | ||||||
| 
 | 
 | ||||||
| /// The number of sleeping worker threads.
 |  | ||||||
| static SLEEPING: AtomicUsize = AtomicUsize::new(0); |  | ||||||
| 
 |  | ||||||
| struct Pool { | struct Pool { | ||||||
|     sender: Sender<Runnable>, |     sender: Sender<Runnable>, | ||||||
|     receiver: Receiver<Runnable>, |     receiver: Receiver<Runnable>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | /// The number of sleeping worker threads.
 | ||||||
|  | static SLEEPING: AtomicUsize = AtomicUsize::new(0); | ||||||
|  | 
 | ||||||
| static POOL: Lazy<Pool> = Lazy::new(|| { | static POOL: Lazy<Pool> = Lazy::new(|| { | ||||||
|     // Start a single worker thread waiting for the first task.
 |     // Start a single worker thread waiting for the first task.
 | ||||||
|     start_thread(); |     start_thread(); | ||||||
|  |  | ||||||
|  | @ -1,5 +1,5 @@ | ||||||
| use std::pin::Pin; |  | ||||||
| use std::future::Future; | use std::future::Future; | ||||||
|  | use std::pin::Pin; | ||||||
| 
 | 
 | ||||||
| use crate::task::{Context, Poll}; | use crate::task::{Context, Poll}; | ||||||
| 
 | 
 | ||||||
|  | @ -43,6 +43,10 @@ impl Future for YieldNow { | ||||||
|         if !self.0 { |         if !self.0 { | ||||||
|             self.0 = true; |             self.0 = true; | ||||||
|             cx.waker().wake_by_ref(); |             cx.waker().wake_by_ref(); | ||||||
|  | 
 | ||||||
|  |             #[cfg(feature = "default")] | ||||||
|  |             crate::rt::RUNTIME.yield_now(); | ||||||
|  | 
 | ||||||
|             Poll::Pending |             Poll::Pending | ||||||
|         } else { |         } else { | ||||||
|             Poll::Ready(()) |             Poll::Ready(()) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue