|
|
|
@ -8,7 +8,11 @@ use std::thread;
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
|
|
|
|
|
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
|
|
|
|
|
use crossbeam_utils::thread::scope;
|
|
|
|
|
use crossbeam_utils::{
|
|
|
|
|
sync::{Parker, Unparker},
|
|
|
|
|
thread::scope,
|
|
|
|
|
};
|
|
|
|
|
use once_cell::sync::Lazy;
|
|
|
|
|
use once_cell::unsync::OnceCell;
|
|
|
|
|
|
|
|
|
|
use crate::rt::Reactor;
|
|
|
|
@ -24,10 +28,16 @@ thread_local! {
|
|
|
|
|
static YIELD_NOW: Cell<bool> = Cell::new(false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Maximum number of OS threads = processors = machines
|
|
|
|
|
static MAXPROCS: Lazy<usize> = Lazy::new(|| num_cpus::get().max(1));
|
|
|
|
|
|
|
|
|
|
struct Scheduler {
|
|
|
|
|
/// Set to `true` while a machine is polling the reactor.
|
|
|
|
|
polling: bool,
|
|
|
|
|
|
|
|
|
|
/// Available threads.
|
|
|
|
|
threads: Vec<ThreadState>,
|
|
|
|
|
|
|
|
|
|
/// Idle processors.
|
|
|
|
|
processors: Vec<Processor>,
|
|
|
|
|
|
|
|
|
@ -35,6 +45,13 @@ struct Scheduler {
|
|
|
|
|
machines: Vec<Arc<Machine>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
struct ThreadState {
|
|
|
|
|
unparker: Unparker,
|
|
|
|
|
parked: Arc<atomic::AtomicBool>,
|
|
|
|
|
/// Used to transfer the machine into the thread.
|
|
|
|
|
machine_sender: crossbeam_channel::Sender<Arc<Machine>>,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// An async runtime.
|
|
|
|
|
pub struct Runtime {
|
|
|
|
|
/// The reactor.
|
|
|
|
@ -53,9 +70,9 @@ pub struct Runtime {
|
|
|
|
|
impl Runtime {
|
|
|
|
|
/// Creates a new runtime.
|
|
|
|
|
pub fn new() -> Runtime {
|
|
|
|
|
let cpus = num_cpus::get().max(1);
|
|
|
|
|
let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();
|
|
|
|
|
let processors: Vec<_> = (0..*MAXPROCS).map(|_| Processor::new()).collect();
|
|
|
|
|
let stealers = processors.iter().map(|p| p.worker.stealer()).collect();
|
|
|
|
|
let threads = Vec::with_capacity(*MAXPROCS);
|
|
|
|
|
|
|
|
|
|
Runtime {
|
|
|
|
|
reactor: Reactor::new().unwrap(),
|
|
|
|
@ -63,7 +80,8 @@ impl Runtime {
|
|
|
|
|
stealers,
|
|
|
|
|
sched: Mutex::new(Scheduler {
|
|
|
|
|
processors,
|
|
|
|
|
machines: Vec::new(),
|
|
|
|
|
machines: Vec::with_capacity(*MAXPROCS),
|
|
|
|
|
threads,
|
|
|
|
|
polling: false,
|
|
|
|
|
}),
|
|
|
|
|
}
|
|
|
|
@ -102,18 +120,67 @@ impl Runtime {
|
|
|
|
|
|
|
|
|
|
loop {
|
|
|
|
|
// Get a list of new machines to start, if any need to be started.
|
|
|
|
|
for m in self.make_machines() {
|
|
|
|
|
let machines = self.make_machines();
|
|
|
|
|
for m in machines {
|
|
|
|
|
idle = 0;
|
|
|
|
|
|
|
|
|
|
// println!("getting idle thread");
|
|
|
|
|
let mut sched = self.sched.lock().unwrap();
|
|
|
|
|
'inner: for thread in &sched.threads {
|
|
|
|
|
// grab the first parked thread
|
|
|
|
|
if thread
|
|
|
|
|
.parked
|
|
|
|
|
.compare_and_swap(true, false, Ordering::Acquire)
|
|
|
|
|
{
|
|
|
|
|
// transfer the machine
|
|
|
|
|
thread
|
|
|
|
|
.machine_sender
|
|
|
|
|
.send(m)
|
|
|
|
|
.expect("failed to send machine to thread");
|
|
|
|
|
// unpark the thread
|
|
|
|
|
thread.unparker.unpark();
|
|
|
|
|
break 'inner;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// no idle thread available, check if we can spawn one
|
|
|
|
|
if sched.threads.len() < *MAXPROCS {
|
|
|
|
|
// we can spawn one, lets do it
|
|
|
|
|
let parked = Arc::new(atomic::AtomicBool::new(true));
|
|
|
|
|
let parked2 = parked.clone();
|
|
|
|
|
let (machine_sender, machine_recv) = crossbeam_channel::bounded(1);
|
|
|
|
|
let parker = Parker::new();
|
|
|
|
|
let unparker = parker.unparker().clone();
|
|
|
|
|
|
|
|
|
|
s.builder()
|
|
|
|
|
.name("async-std/machine".to_string())
|
|
|
|
|
.spawn(move |_| {
|
|
|
|
|
abort_on_panic(|| {
|
|
|
|
|
loop {
|
|
|
|
|
while parked2.load(Ordering::Acquire) {
|
|
|
|
|
parker.park();
|
|
|
|
|
}
|
|
|
|
|
// when this thread is unparked, retrieve machine
|
|
|
|
|
let m: Arc<Machine> =
|
|
|
|
|
machine_recv.recv().expect("failed to receive machine");
|
|
|
|
|
// store it in the thread local
|
|
|
|
|
let _ = MACHINE.with(|machine| machine.set(m.clone()));
|
|
|
|
|
// run it
|
|
|
|
|
m.run(self);
|
|
|
|
|
// when run ends, go into parked mode again
|
|
|
|
|
parked2.store(false, Ordering::Relaxed);
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
})
|
|
|
|
|
.expect("cannot start a machine thread");
|
|
|
|
|
|
|
|
|
|
sched.threads.push(ThreadState {
|
|
|
|
|
unparker,
|
|
|
|
|
parked,
|
|
|
|
|
machine_sender,
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
drop(sched);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Sleep for a bit longer if the scheduler state hasn't changed in a while.
|
|
|
|
@ -142,6 +209,7 @@ impl Runtime {
|
|
|
|
|
let m = Arc::new(Machine::new(p));
|
|
|
|
|
to_start.push(m.clone());
|
|
|
|
|
sched.machines.push(m);
|
|
|
|
|
assert!(sched.machines.len() <= *MAXPROCS);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -326,6 +394,7 @@ impl Machine {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// Unlock the schedule poll the reactor until new I/O events arrive.
|
|
|
|
|
// println!("polling start");
|
|
|
|
|
sched.polling = true;
|
|
|
|
|
drop(sched);
|
|
|
|
|
rt.reactor.poll(None).unwrap();
|
|
|
|
@ -333,6 +402,7 @@ impl Machine {
|
|
|
|
|
// Lock the scheduler again and re-register the machine.
|
|
|
|
|
sched = rt.sched.lock().unwrap();
|
|
|
|
|
sched.polling = false;
|
|
|
|
|
//println!("polling stop");
|
|
|
|
|
sched.machines.push(m);
|
|
|
|
|
|
|
|
|
|
runs = 0;
|
|
|
|
@ -344,9 +414,14 @@ impl Machine {
|
|
|
|
|
|
|
|
|
|
// Return the processor to the scheduler and remove the machine.
|
|
|
|
|
if let Some(p) = opt_p {
|
|
|
|
|
// println!("returning processor to pool");
|
|
|
|
|
let mut sched = rt.sched.lock().unwrap();
|
|
|
|
|
sched.processors.push(p);
|
|
|
|
|
assert!(sched.processors.len() <= *MAXPROCS);
|
|
|
|
|
// println!("machines {}", sched.machines.len());
|
|
|
|
|
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
|
|
|
|
|
// println!("machines retained {}", sched.machines.len());
|
|
|
|
|
assert!(sched.machines.len() <= *MAXPROCS);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|