From 77d36931129d16e0633bef29fad7c455d4c4325e Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 17:56:37 +0200 Subject: [PATCH] add tracing ability --- Cargo.toml | 6 ++- src/rt/runtime.rs | 117 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 109 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d49eb95..288af23 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,13 +52,15 @@ alloc = [ "futures-core/alloc", "pin-project-lite", ] +tracing = [] [dependencies] async-attributes = { version = "1.1.1", optional = true } async-task = { version = "1.3.1", optional = true } broadcaster = { version = "1.0.0", optional = true } crossbeam-channel = { version = "0.4.2", optional = true } -crossbeam-deque = { version = "0.7.3", optional = true } +crossbeam-deque = { git = "https://github.com/stjepang/crossbeam", branch = "deque-len", optional = true} +# crossbeam-deque = { version = "0.7.3", optional = true } crossbeam-queue = { version = "0.2.0", optional = true } crossbeam-utils = { version = "0.7.2", optional = true } futures-core = { version = "0.3.4", optional = true, default-features = false } @@ -74,6 +76,7 @@ once_cell = { version = "1.3.1", optional = true } pin-project-lite = { version = "0.1.4", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true } slab = { version = "0.4.2", optional = true } +log-update = "0.1.0" [dev-dependencies] femme = "1.3.0" @@ -89,3 +92,4 @@ required-features = ["unstable"] [[example]] name = "tcp-ipv4-and-6-echo" required-features = ["unstable"] + diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index dc9f056..fb8d358 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -31,6 +31,9 @@ thread_local! { /// Maximum number of OS threads = processors = machines static MAXPROCS: Lazy = Lazy::new(|| num_cpus::get().max(1)); +/// Minimum number of machines that are kept executing, to avoid starvation. 
+const MIN_MACHINES: usize = 2; + struct Scheduler { /// Set to `true` while a machine is polling the reactor. polling: bool, @@ -67,6 +70,9 @@ pub struct Runtime { /// The scheduler state. sched: Mutex, + + #[cfg(feature = "tracing")] + poll_count: atomic::AtomicUsize, } impl Runtime { @@ -80,6 +86,8 @@ impl Runtime { reactor: Reactor::new().unwrap(), injector: Injector::new(), stealers, + #[cfg(feature = "tracing")] + poll_count: atomic::AtomicUsize::new(0), sched: Mutex::new(Scheduler { processors, machines: Vec::with_capacity(*MAXPROCS), @@ -121,6 +129,73 @@ impl Runtime { let mut idle = 0; let mut delay = 0; + #[cfg(feature = "tracing")] + s.builder() + .name("async-std/trace".to_string()) + .spawn(|_| { + use log_update::LogUpdate; + use std::io::stdout; + let mut log_update = LogUpdate::new(stdout()).unwrap(); + + loop { + let (thread_list, machine_list, processor_list, polling) = { + let sched = self.sched.lock().unwrap(); + let thread_list = sched + .threads + .iter() + .map(|t| { + if t.parked.load(Ordering::Relaxed) { + "_" + } else { + "|" + } + }) + .fold(String::new(), |mut s, curr| { + s += " "; + s += curr; + s + }); + let machine_list = sched + .machines + .iter() + .map(|m| match &*m.processor.lock() { + Some(p) => { + let len = p.worker.len() + p.slot.is_some() as usize; + len.to_string() + } + None => "_".to_string(), + }) + .fold(String::new(), |mut s, curr| { + s += " "; + s += &curr; + s + }); + let processor_list = sched + .processors + .iter() + .map(|p| { + let len = p.worker.len() + p.slot.is_some() as usize; + len.to_string() + }) + .fold(String::new(), |mut s, curr| { + s += " "; + s += &curr; + s + }); + (thread_list, machine_list, processor_list, sched.polling) + }; + let glen = self.injector.len(); + let polls = self.poll_count.load(Ordering::Relaxed); + let msg = format!( + "GlobalQueue: {}\nPolls: {} - {}\nThreads:\n{}\nMachines:\n{}\nProcessors:\n{}\n", + glen, polls,polling, thread_list, machine_list, processor_list + ); + 
+ log_update.render(&msg).unwrap(); + thread::sleep(Duration::from_millis(10)); + } + }) + .expect("failed to start tracing"); + loop { // Get a list of new machines to start, if any need to be started. let machines = self.make_machines(); @@ -136,7 +211,7 @@ impl Runtime { .parked .compare_and_swap(true, false, Ordering::Acquire) { - println!("unpark thread {}", i); + // println!("unpark thread {}", i); // transfer the machine thread .machine_sender @@ -166,12 +241,12 @@ impl Runtime { .spawn(move |_| { abort_on_panic(|| { loop { - println!("checking park loop {}", i); + // println!("checking park loop {}", i); while parked2.load(Ordering::Acquire) { parker.park(); // TODO: shutdown if idle for too long } - println!("thread unparked {}", i); + // println!("thread unparked {}", i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); @@ -189,7 +264,7 @@ impl Runtime { *machine.borrow_mut() = None; }); parked2.store(true, Ordering::Relaxed); - println!("thread parked {}", i); + // println!("thread parked {}", i); } } }) @@ -233,13 +308,24 @@ impl Runtime { // If no machine has been polling the reactor in a while, that means the runtime is // overloaded with work and we need to start another machine. - if !sched.polling { - dbg!(sched.progress, sched.polling); + // + // Also ensure that there are at least 2 running machines to avoid starvation. 
+ if !sched.polling || sched.machines.len() < MIN_MACHINES { + #[cfg(feature = "tracing")] + self.poll_count.fetch_add(1, Ordering::Relaxed); // if !sched.progress { + if let Some(p) = sched.processors.pop() { - let m = Arc::new(Machine::new(p)); - to_start.push(m.clone()); - sched.machines.push(m); + if let Some(m) = sched.machines.iter().find(|m| m.processor.lock().is_none()) { + // find idle m + *m.processor.lock() = Some(p); + to_start.push(m.clone()); + } else { + // no idle m + let m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + sched.machines.push(m); + } } // } sched.progress = false; @@ -412,7 +498,12 @@ impl Machine { // If another thread is already blocked on the reactor, there is no point in keeping // the current thread around since there is too little work to do. if sched.polling { - break; + if sched.machines.len() > MIN_MACHINES { + break; + } else { + // thread::sleep(Duration::from_micros(10)); + continue; + } } // Take out the machine associated with the current thread. @@ -441,11 +532,11 @@ impl Machine { runs = 0; fails = 0; } - println!("thread break"); + // println!("thread break"); // When shutting down the thread, take the processor out if still available. let opt_p = self.processor.lock().take(); - println!("processor {:?}", opt_p.is_some()); + // println!("processor {:?}", opt_p.is_some()); // Return the processor to the scheduler and remove the machine. if let Some(p) = opt_p { @@ -454,7 +545,7 @@ impl Machine { sched.processors.push(p); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); } - println!("thread run stopped"); + // println!("thread run stopped"); } }