diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index 3a08bb6..8f84f54 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -2,7 +2,7 @@ use std::cell::Cell; use std::io; use std::iter; use std::sync::atomic::{self, Ordering}; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; @@ -22,6 +22,11 @@ thread_local! { static YIELD_NOW: Cell = Cell::new(false); } +struct Scheduler { + /// Set to `true` while a machine is polling the reactor. + polling: bool, +} + /// An async runtime. pub struct Runtime { /// The reactor. @@ -33,7 +38,11 @@ pub struct Runtime { /// Handles to local queues for stealing work. stealers: Vec>, + /// Machines to start machines: Vec>, + + /// The scheduler state. + sched: Mutex, } impl Runtime { @@ -57,6 +66,7 @@ impl Runtime { injector: Injector::new(), stealers, machines, + sched: Mutex::new(Scheduler { polling: false }), } } @@ -116,7 +126,25 @@ impl Runtime { /// This function might not poll the reactor at all so do not rely on it doing anything. Only /// use for optimization. fn quick_poll(&self) -> io::Result { - return self.reactor.poll(Some(Duration::from_secs(0))); + if let Ok(sched) = self.sched.try_lock() { + if !sched.polling { + return self.reactor.poll(Some(Duration::from_secs(0))); + } + } + Ok(false) + } + + fn poll(&self) -> io::Result { + let mut sched = self.sched.lock().unwrap(); + sched.polling = true; + drop(sched); + + let result = self.reactor.poll(None); + + let mut sched = self.sched.lock().unwrap(); + sched.polling = false; + + result } } @@ -242,7 +270,7 @@ impl Machine { continue; } - rt.reactor.poll(None).unwrap(); + rt.poll().unwrap(); runs = 0; fails = 0;