From 124aa76c4eb005483dfe904cd37635e8082b76f1 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 21:45:20 +0200 Subject: [PATCH] improve rescheduling --- src/rt/runtime.rs | 51 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index fb8d3587..5f5a86f0 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -50,6 +50,16 @@ struct Scheduler { machines: Vec>, } +impl Scheduler { + /// Get the next machine that has no work yet, if there is any. + fn next_idle_machine(&self) -> Option> { + self.machines + .iter() + .find(|m| !m.has_work()) + .map(|m| m.clone()) + } +} + struct ThreadState { unparker: Unparker, parked: Arc, @@ -258,8 +268,23 @@ impl Runtime { // run it m.run(self); - // when run ends, go into parked mode again + // when run ends { + // see if there are any available processors + let mut sched = self.sched.lock().unwrap(); + if let Some(p) = sched.processors.pop() { + // get a machine + if let Some(m) = sched.next_idle_machine(){ + *m.processor.lock() = Some(p); + MACHINE.with(|machine| { + machine.borrow_mut().replace(m); + }); + continue; + } + } + drop(sched); + + // go into parked mode, no work MACHINE.with(|machine| { *machine.borrow_mut() = None; }); @@ -314,9 +339,8 @@ impl Runtime { #[cfg(feature = "tracing")] self.poll_count.fetch_add(1, Ordering::Relaxed); // if !sched.progress { - if let Some(p) = sched.processors.pop() { - if let Some(m) = sched.machines.iter().find(|m| m.processor.lock().is_none()) { + if let Some(m) = sched.next_idle_machine() { // find idle m *m.processor.lock() = Some(p); to_start.push(m.clone()); @@ -370,6 +394,15 @@ impl Machine { } } + fn has_work(&self) -> bool { + if let Some(p) = &*self.processor.lock() { + // TODO: is this the right check? + p.has_work() + } else { + false + } + } + /// Schedules a task onto the machine. fn schedule(&self, rt: &Runtime, task: Runnable) { match self.processor.lock().as_mut() { @@ -498,12 +531,7 @@ impl Machine { // If another thread is already blocked on the reactor, there is no point in keeping // the current thread around since there is too little work to do. if sched.polling { - if sched.machines.len() > MIN_MACHINES { - break; - } else { - // thread::sleep(Duration::from_micros(10)); - continue; - } + break; } // Take out the machine associated with the current thread. @@ -566,6 +594,11 @@ impl Processor { } } + /// Is there any available work for this processor? + fn has_work(&self) -> bool { + self.slot.is_some() || !self.worker.is_empty() + } + /// Schedules a task to run on this processor. fn schedule(&mut self, rt: &Runtime, task: Runnable) { match self.slot.replace(task) {