more work

fix/scheduler-park
dignifiedquire 5 years ago
parent 6306ad9df1
commit 00b8366d55

@ -1,4 +1,5 @@
use std::cell::Cell; use std::cell::Cell;
use std::cell::RefCell;
use std::io; use std::io;
use std::iter; use std::iter;
use std::ptr; use std::ptr;
@ -13,7 +14,6 @@ use crossbeam_utils::{
thread::scope, thread::scope,
}; };
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use once_cell::unsync::OnceCell;
use crate::rt::Reactor; use crate::rt::Reactor;
use crate::sync::Spinlock; use crate::sync::Spinlock;
@ -22,7 +22,7 @@ use crate::utils::{abort_on_panic, random};
thread_local! { thread_local! {
/// A reference to the current machine, if the current thread runs tasks. /// A reference to the current machine, if the current thread runs tasks.
static MACHINE: OnceCell<Arc<Machine>> = OnceCell::new(); static MACHINE: RefCell<Option<Arc<Machine>>> = RefCell::new(None);
/// This flag is set to true whenever `task::yield_now()` is invoked. /// This flag is set to true whenever `task::yield_now()` is invoked.
static YIELD_NOW: Cell<bool> = Cell::new(false); static YIELD_NOW: Cell<bool> = Cell::new(false);
@ -105,7 +105,7 @@ impl Runtime {
MACHINE.with(|machine| { MACHINE.with(|machine| {
// If the current thread is a worker thread, schedule it onto the current machine. // If the current thread is a worker thread, schedule it onto the current machine.
// Otherwise, push it into the global task queue. // Otherwise, push it into the global task queue.
match machine.get() { match &*machine.borrow() {
None => { None => {
self.injector.push(task); self.injector.push(task);
self.notify(); self.notify();
@ -130,12 +130,13 @@ impl Runtime {
// println!("getting idle thread"); // println!("getting idle thread");
let sched = self.sched.lock().unwrap(); let sched = self.sched.lock().unwrap();
'inner: for thread in sched.threads.iter() { 'inner: for (i, thread) in sched.threads.iter().enumerate() {
// grab the first parked thread // grab the first parked thread
if thread if thread
.parked .parked
.compare_and_swap(true, false, Ordering::Acquire) .compare_and_swap(true, false, Ordering::Acquire)
{ {
println!("unpark thread {}", i);
// transfer the machine // transfer the machine
thread thread
.machine_sender .machine_sender
@ -165,21 +166,31 @@ impl Runtime {
.spawn(move |_| { .spawn(move |_| {
abort_on_panic(|| { abort_on_panic(|| {
loop { loop {
println!("checking park loop {}", i);
while parked2.load(Ordering::Acquire) { while parked2.load(Ordering::Acquire) {
parker.park(); parker.park();
// TODO: shutdown if idle for too long // TODO: shutdown if idle for too long
} }
// println!("thread unparked {}", i); println!("thread unparked {}", i);
// when this thread is unparked, retrieve machine // when this thread is unparked, retrieve machine
let m: Arc<Machine> = let m: Arc<Machine> =
machine_recv.recv().expect("failed to receive machine"); machine_recv.recv().expect("failed to receive machine");
// store it in the thread local // store it in the thread local
let _ = MACHINE.with(|machine| machine.set(m.clone())); MACHINE.with(|machine| {
*machine.borrow_mut() = Some(m.clone());
});
// run it // run it
m.run(self); m.run(self);
// when run ends, go into parked mode again // when run ends, go into parked mode again
parked2.store(false, Ordering::Relaxed); {
// println!("thread parked {}", i); MACHINE.with(|machine| {
*machine.borrow_mut() = None;
});
parked2.store(true, Ordering::Relaxed);
println!("thread parked {}", i);
}
} }
}) })
}) })
@ -223,13 +234,14 @@ impl Runtime {
// If no machine has been polling the reactor in a while, that means the runtime is // If no machine has been polling the reactor in a while, that means the runtime is
// overloaded with work and we need to start another machine. // overloaded with work and we need to start another machine.
if !sched.polling { if !sched.polling {
if !sched.progress { dbg!(sched.progress, sched.polling);
if let Some(p) = sched.processors.pop() { // if !sched.progress {
let m = Arc::new(Machine::new(p)); if let Some(p) = sched.processors.pop() {
to_start.push(m.clone()); let m = Arc::new(Machine::new(p));
sched.machines.push(m); to_start.push(m.clone());
} sched.machines.push(m);
} }
// }
sched.progress = false; sched.progress = false;
} }
@ -429,11 +441,11 @@ impl Machine {
runs = 0; runs = 0;
fails = 0; fails = 0;
} }
// println!("thread break"); println!("thread break");
// When shutting down the thread, take the processor out if still available. // When shutting down the thread, take the processor out if still available.
let opt_p = self.processor.lock().take(); let opt_p = self.processor.lock().take();
// println!("processor {:?}", opt_p.is_some()); println!("processor {:?}", opt_p.is_some());
// Return the processor to the scheduler and remove the machine. // Return the processor to the scheduler and remove the machine.
if let Some(p) = opt_p { if let Some(p) = opt_p {
@ -442,7 +454,7 @@ impl Machine {
sched.processors.push(p); sched.processors.push(p);
sched.machines.retain(|elem| !ptr::eq(&**elem, self)); sched.machines.retain(|elem| !ptr::eq(&**elem, self));
} }
// println!("thread run stopped"); println!("thread run stopped");
} }
} }

Loading…
Cancel
Save