diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index 9227d24..efdc314 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -122,11 +122,12 @@ impl Runtime { // Get a list of new machines to start, if any need to be started. let machines = self.make_machines(); for m in machines { + // println!("{} -- looking for thread", k); idle = 0; // println!("getting idle thread"); - let mut sched = self.sched.lock().unwrap(); - 'inner: for thread in &sched.threads { + let sched = self.sched.lock().unwrap(); + 'inner: for (i, thread) in sched.threads.iter().enumerate() { // grab the first parked thread if thread .parked @@ -135,18 +136,22 @@ impl Runtime { // transfer the machine thread .machine_sender - .send(m) + .send(m.clone()) .expect("failed to send machine to thread"); // unpark the thread thread.unparker.unpark(); + // println!("{} found thread to unpark {}", k, i); break 'inner; } } - + let len = sched.threads.len(); + drop(sched); // no idle thread available, check if we can spawn one - if sched.threads.len() < *MAXPROCS { + if len < *MAXPROCS { + let i = len; + // println!("{} spawning thread {}", k, i); // we can spawn one, lets do it - let parked = Arc::new(atomic::AtomicBool::new(true)); + let parked = Arc::new(atomic::AtomicBool::new(false)); let parked2 = parked.clone(); let (machine_sender, machine_recv) = crossbeam_channel::bounded(1); let parker = Parker::new(); @@ -159,7 +164,9 @@ impl Runtime { loop { while parked2.load(Ordering::Acquire) { parker.park(); + // TODO: shutdown if idle for too long } + // println!("{} thread unparked {}", k, i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); @@ -169,18 +176,28 @@ impl Runtime { m.run(self); // when run ends, go into parked mode again parked2.store(false, Ordering::Relaxed); + // println!("thread parked {}", i); } }) }) .expect("cannot start a machine thread"); + let mut sched = self.sched.lock().unwrap(); + + // transfer the machine + machine_sender + .send(m) + .expect("failed to send machine to thread"); + + // println!("started thread {}", i); + sched.threads.push(ThreadState { unparker, parked, machine_sender, }); + drop(sched); } - drop(sched); } // Sleep for a bit longer if the scheduler state hasn't changed in a while. @@ -209,7 +226,6 @@ impl Runtime { let m = Arc::new(Machine::new(p)); to_start.push(m.clone()); sched.machines.push(m); - assert!(sched.machines.len() <= *MAXPROCS); } } @@ -417,11 +433,7 @@ impl Machine { // println!("returning processor to pool"); let mut sched = rt.sched.lock().unwrap(); sched.processors.push(p); - assert!(sched.processors.len() <= *MAXPROCS); - // println!("machines {}", sched.machines.len()); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); - // println!("machines retained {}", sched.machines.len()); - assert!(sched.machines.len() <= *MAXPROCS); } } }