fix start state

fix/scheduler-park
dignifiedquire 5 years ago
parent bc8677ed09
commit 546ad3d287

@ -122,11 +122,12 @@ impl Runtime {
// Get a list of new machines to start, if any need to be started. // Get a list of new machines to start, if any need to be started.
let machines = self.make_machines(); let machines = self.make_machines();
for m in machines { for m in machines {
// println!("{} -- looking for thread", k);
idle = 0; idle = 0;
// println!("getting idle thread"); // println!("getting idle thread");
let mut sched = self.sched.lock().unwrap(); let sched = self.sched.lock().unwrap();
'inner: for thread in &sched.threads { 'inner: for (i, thread) in sched.threads.iter().enumerate() {
// grab the first parked thread // grab the first parked thread
if thread if thread
.parked .parked
@ -135,18 +136,22 @@ impl Runtime {
// transfer the machine // transfer the machine
thread thread
.machine_sender .machine_sender
.send(m) .send(m.clone())
.expect("failed to send machine to thread"); .expect("failed to send machine to thread");
// unpark the thread // unpark the thread
thread.unparker.unpark(); thread.unparker.unpark();
// println!("{} found thread to unpark {}", k, i);
break 'inner; break 'inner;
} }
} }
let len = sched.threads.len();
drop(sched);
// no idle thread available, check if we can spawn one // no idle thread available, check if we can spawn one
if sched.threads.len() < *MAXPROCS { if len < *MAXPROCS {
let i = len;
// println!("{} spawning thread {}", k, i);
// we can spawn one, lets do it // we can spawn one, lets do it
let parked = Arc::new(atomic::AtomicBool::new(true)); let parked = Arc::new(atomic::AtomicBool::new(false));
let parked2 = parked.clone(); let parked2 = parked.clone();
let (machine_sender, machine_recv) = crossbeam_channel::bounded(1); let (machine_sender, machine_recv) = crossbeam_channel::bounded(1);
let parker = Parker::new(); let parker = Parker::new();
@ -159,7 +164,9 @@ impl Runtime {
loop { loop {
while parked2.load(Ordering::Acquire) { while parked2.load(Ordering::Acquire) {
parker.park(); parker.park();
// TODO: shutdown if idle for too long
} }
// println!("{} thread unparked {}", k, i);
// when this thread is unparked, retrieve machine // when this thread is unparked, retrieve machine
let m: Arc<Machine> = let m: Arc<Machine> =
machine_recv.recv().expect("failed to receive machine"); machine_recv.recv().expect("failed to receive machine");
@ -169,18 +176,28 @@ impl Runtime {
m.run(self); m.run(self);
// when run ends, go into parked mode again // when run ends, go into parked mode again
parked2.store(false, Ordering::Relaxed); parked2.store(false, Ordering::Relaxed);
// println!("thread parked {}", i);
} }
}) })
}) })
.expect("cannot start a machine thread"); .expect("cannot start a machine thread");
let mut sched = self.sched.lock().unwrap();
// transfer the machine
machine_sender
.send(m)
.expect("failed to send machine to thread");
// println!("started thread {}", i);
sched.threads.push(ThreadState { sched.threads.push(ThreadState {
unparker, unparker,
parked, parked,
machine_sender, machine_sender,
}); });
drop(sched);
} }
drop(sched);
} }
// Sleep for a bit longer if the scheduler state hasn't changed in a while. // Sleep for a bit longer if the scheduler state hasn't changed in a while.
@ -209,7 +226,6 @@ impl Runtime {
let m = Arc::new(Machine::new(p)); let m = Arc::new(Machine::new(p));
to_start.push(m.clone()); to_start.push(m.clone());
sched.machines.push(m); sched.machines.push(m);
assert!(sched.machines.len() <= *MAXPROCS);
} }
} }
@ -417,11 +433,7 @@ impl Machine {
// println!("returning processor to pool"); // println!("returning processor to pool");
let mut sched = rt.sched.lock().unwrap(); let mut sched = rt.sched.lock().unwrap();
sched.processors.push(p); sched.processors.push(p);
assert!(sched.processors.len() <= *MAXPROCS);
// println!("machines {}", sched.machines.len());
sched.machines.retain(|elem| !ptr::eq(&**elem, self)); sched.machines.retain(|elem| !ptr::eq(&**elem, self));
// println!("machines retained {}", sched.machines.len());
assert!(sched.machines.len() <= *MAXPROCS);
} }
} }
} }

Loading…
Cancel
Save