diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index 3e810d0..a832b9f 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -11,6 +11,7 @@ use crossbeam_utils::thread::scope; use once_cell::unsync::OnceCell; use crate::rt::Reactor; +use crate::sync::Spinlock; use crate::task::Runnable; use crate::utils::{abort_on_panic, random}; @@ -58,7 +59,7 @@ impl Runtime { let stealers = machines .iter() - .map(|m| m.processor.worker.stealer()) + .map(|m| m.processor.lock().worker.stealer()) .collect(); Runtime { @@ -138,21 +139,20 @@ impl Runtime { /// A thread running a processor. struct Machine { /// Holds the processor until it gets stolen. - processor: Processor, + processor: Spinlock, } -unsafe impl Send for Machine {} -unsafe impl Sync for Machine {} - impl Machine { /// Creates a new machine running a processor. fn new(p: Processor) -> Machine { - Machine { processor: p } + Machine { + processor: Spinlock::new(p), + } } /// Schedules a task onto the machine. fn schedule(&self, rt: &Runtime, task: Runnable) { - self.processor.schedule(rt, task); + self.processor.lock().schedule(rt, task); } /// Finds the next runnable task. @@ -160,11 +160,11 @@ impl Machine { let mut retry = false; // First try finding a task in the local queue or in the global queue. - if let Some(task) = self.processor.pop_task() { + if let Some(task) = self.processor.lock().pop_task() { return Steal::Success(task); } - match self.processor.steal_from_global(rt) { + match self.processor.lock().steal_from_global(rt) { Steal::Empty => {} Steal::Retry => retry = true, Steal::Success(task) => return Steal::Success(task), @@ -176,12 +176,12 @@ impl Machine { // Try finding a task in the local queue, which might hold tasks woken by the reactor. If // the local queue is still empty, try stealing from other processors. if progress { - if let Some(task) = self.processor.pop_task() { + if let Some(task) = self.processor.lock().pop_task() { return Steal::Success(task); } } - match self.processor.steal_from_others(rt) { + match self.processor.lock().steal_from_others(rt) { Steal::Empty => {} Steal::Retry => retry = true, Steal::Success(task) => return Steal::Success(task), @@ -208,7 +208,7 @@ impl Machine { // Check if `task::yield_now()` was invoked and flush the slot if so. YIELD_NOW.with(|flag| { if flag.replace(false) { - self.processor.flush_slot(rt); + self.processor.lock().flush_slot(rt); } }); @@ -219,11 +219,12 @@ impl Machine { runs = 0; rt.quick_poll().unwrap(); - if let Steal::Success(task) = self.processor.steal_from_global(rt) { - self.processor.schedule(rt, task); + let p = self.processor.lock(); + if let Steal::Success(task) = p.steal_from_global(rt) { + p.schedule(rt, task); } - self.processor.flush_slot(rt); + p.flush_slot(rt); } // Try to find a runnable task.