diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index d4f3347..dc9f056 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -1,4 +1,5 @@ use std::cell::Cell; +use std::cell::RefCell; use std::io; use std::iter; use std::ptr; @@ -13,7 +14,6 @@ use crossbeam_utils::{ thread::scope, }; use once_cell::sync::Lazy; -use once_cell::unsync::OnceCell; use crate::rt::Reactor; use crate::sync::Spinlock; @@ -22,7 +22,7 @@ use crate::utils::{abort_on_panic, random}; thread_local! { /// A reference to the current machine, if the current thread runs tasks. - static MACHINE: OnceCell> = OnceCell::new(); + static MACHINE: RefCell>> = RefCell::new(None); /// This flag is set to true whenever `task::yield_now()` is invoked. static YIELD_NOW: Cell = Cell::new(false); @@ -105,7 +105,7 @@ impl Runtime { MACHINE.with(|machine| { // If the current thread is a worker thread, schedule it onto the current machine. // Otherwise, push it into the global task queue. - match machine.get() { + match &*machine.borrow() { None => { self.injector.push(task); self.notify(); @@ -130,12 +130,13 @@ impl Runtime { // println!("getting idle thread"); let sched = self.sched.lock().unwrap(); - 'inner: for thread in sched.threads.iter() { + 'inner: for (i, thread) in sched.threads.iter().enumerate() { // grab the first parked thread if thread .parked .compare_and_swap(true, false, Ordering::Acquire) { + println!("unpark thread {}", i); // transfer the machine thread .machine_sender @@ -165,21 +166,31 @@ impl Runtime { .spawn(move |_| { abort_on_panic(|| { loop { + println!("checking park loop {}", i); while parked2.load(Ordering::Acquire) { parker.park(); // TODO: shutdown if idle for too long } - // println!("thread unparked {}", i); + println!("thread unparked {}", i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); + // store it in the thread local - let _ = MACHINE.with(|machine| machine.set(m.clone())); + MACHINE.with(|machine| { + *machine.borrow_mut() = Some(m.clone()); + }); // run it m.run(self); + // when run ends, go into parked mode again - parked2.store(false, Ordering::Relaxed); - // println!("thread parked {}", i); + { + MACHINE.with(|machine| { + *machine.borrow_mut() = None; + }); + parked2.store(true, Ordering::Relaxed); + println!("thread parked {}", i); + } } }) }) @@ -223,13 +234,14 @@ impl Runtime { // If no machine has been polling the reactor in a while, that means the runtime is // overloaded with work and we need to start another machine. if !sched.polling { - if !sched.progress { - if let Some(p) = sched.processors.pop() { - let m = Arc::new(Machine::new(p)); - to_start.push(m.clone()); - sched.machines.push(m); - } + dbg!(sched.progress, sched.polling); + // if !sched.progress { + if let Some(p) = sched.processors.pop() { + let m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + sched.machines.push(m); } + // } sched.progress = false; } @@ -429,11 +441,11 @@ impl Machine { runs = 0; fails = 0; } - // println!("thread break"); + println!("thread break"); // When shutting down the thread, take the processor out if still available. let opt_p = self.processor.lock().take(); - // println!("processor {:?}", opt_p.is_some()); + println!("processor {:?}", opt_p.is_some()); // Return the processor to the scheduler and remove the machine. if let Some(p) = opt_p { @@ -442,7 +454,7 @@ impl Machine { sched.processors.push(p); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); } - // println!("thread run stopped"); + println!("thread run stopped"); } }