improve idle detection

fix/scheduler-park
dignifiedquire 5 years ago
parent 546ad3d287
commit 6306ad9df1

@ -35,6 +35,8 @@ struct Scheduler {
/// Set to `true` while a machine is polling the reactor. /// Set to `true` while a machine is polling the reactor.
polling: bool, polling: bool,
progress: bool,
/// Available threads. /// Available threads.
threads: Vec<ThreadState>, threads: Vec<ThreadState>,
@ -83,6 +85,7 @@ impl Runtime {
machines: Vec::with_capacity(*MAXPROCS), machines: Vec::with_capacity(*MAXPROCS),
threads, threads,
polling: false, polling: false,
progress: false,
}), }),
} }
} }
@ -127,7 +130,7 @@ impl Runtime {
// println!("getting idle thread"); // println!("getting idle thread");
let sched = self.sched.lock().unwrap(); let sched = self.sched.lock().unwrap();
'inner: for (i, thread) in sched.threads.iter().enumerate() { 'inner: for thread in sched.threads.iter() {
// grab the first parked thread // grab the first parked thread
if thread if thread
.parked .parked
@ -166,7 +169,7 @@ impl Runtime {
parker.park(); parker.park();
// TODO: shutdown if idle for too long // TODO: shutdown if idle for too long
} }
// println!("{} thread unparked {}", k, i); // println!("thread unparked {}", i);
// when this thread is unparked, retrieve machine // when this thread is unparked, retrieve machine
let m: Arc<Machine> = let m: Arc<Machine> =
machine_recv.recv().expect("failed to receive machine"); machine_recv.recv().expect("failed to receive machine");
@ -189,8 +192,6 @@ impl Runtime {
.send(m) .send(m)
.expect("failed to send machine to thread"); .expect("failed to send machine to thread");
// println!("started thread {}", i);
sched.threads.push(ThreadState { sched.threads.push(ThreadState {
unparker, unparker,
parked, parked,
@ -222,12 +223,15 @@ impl Runtime {
// If no machine has been polling the reactor in a while, that means the runtime is // If no machine has been polling the reactor in a while, that means the runtime is
// overloaded with work and we need to start another machine. // overloaded with work and we need to start another machine.
if !sched.polling { if !sched.polling {
if !sched.progress {
if let Some(p) = sched.processors.pop() { if let Some(p) = sched.processors.pop() {
let m = Arc::new(Machine::new(p)); let m = Arc::new(Machine::new(p));
to_start.push(m.clone()); to_start.push(m.clone());
sched.machines.push(m); sched.machines.push(m);
} }
} }
sched.progress = false;
}
to_start to_start
} }
@ -420,13 +424,16 @@ impl Machine {
sched.polling = false; sched.polling = false;
//println!("polling stop"); //println!("polling stop");
sched.machines.push(m); sched.machines.push(m);
sched.progress = true;
runs = 0; runs = 0;
fails = 0; fails = 0;
} }
// println!("thread break");
// When shutting down the thread, take the processor out if still available. // When shutting down the thread, take the processor out if still available.
let opt_p = self.processor.lock().take(); let opt_p = self.processor.lock().take();
// println!("processor {:?}", opt_p.is_some());
// Return the processor to the scheduler and remove the machine. // Return the processor to the scheduler and remove the machine.
if let Some(p) = opt_p { if let Some(p) = opt_p {
@ -435,6 +442,7 @@ impl Machine {
sched.processors.push(p); sched.processors.push(p);
sched.machines.retain(|elem| !ptr::eq(&**elem, self)); sched.machines.retain(|elem| !ptr::eq(&**elem, self));
} }
// println!("thread run stopped");
} }
} }

Loading…
Cancel
Save