Use once_cell instead of lazy_static (#416)

`once_cell` provides a neat way of initializing lazy singletons without
macro. This PR use `sync::Lazy` to streamline same pattern proposed in
related rust RFC.

Resolve #406
pull/420/head
Wu Yu Wei 5 years ago committed by Stjepan Glavina
parent da795dec7b
commit ff6a44fcd5

@ -33,12 +33,12 @@ crossbeam-utils = "0.6.6"
futures-core-preview = "=0.3.0-alpha.19"
futures-io-preview = "=0.3.0-alpha.19"
futures-timer = "1.0.2"
lazy_static = "1.4.0"
log = { version = "0.4.8", features = ["kv_unstable"] }
memchr = "2.2.1"
mio = "0.6.19"
mio-uds = "0.6.7"
num_cpus = "1.10.1"
once_cell = "1.2.0"
pin-utils = "0.1.0-alpha.4"
slab = "0.4.2"
kv-log-macro = "1.0.4"

@ -1,8 +1,8 @@
use std::fmt;
use std::sync::{Arc, Mutex};
use lazy_static::lazy_static;
use mio::{self, Evented};
use once_cell::sync::Lazy;
use slab::Slab;
use crate::io;
@ -100,25 +100,23 @@ impl Reactor {
// }
}
lazy_static! {
/// The state of the global networking driver.
static ref REACTOR: Reactor = {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles.
std::thread::Builder::new()
.name("async-net-driver".to_string())
.spawn(move || {
// If the driver thread panics, there's not much we can do. It is not a
// recoverable error and there is no place to propagate it into so we just abort.
abort_on_panic(|| {
main_loop().expect("async networking thread has panicked");
})
/// The state of the global networking driver.
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
// Spawn a thread that waits on the poller for new events and wakes up tasks blocked on I/O
// handles.
std::thread::Builder::new()
.name("async-net-driver".to_string())
.spawn(move || {
// If the driver thread panics, there's not much we can do. It is not a
// recoverable error and there is no place to propagate it into so we just abort.
abort_on_panic(|| {
main_loop().expect("async networking thread has panicked");
})
.expect("cannot start a thread driving blocking tasks");
})
.expect("cannot start a thread driving blocking tasks");
Reactor::new().expect("cannot initialize reactor")
};
}
Reactor::new().expect("cannot initialize reactor")
});
/// Waits on the poller for new events and wakes up tasks blocked on I/O handles.
fn main_loop() -> io::Result<()> {

@ -5,7 +5,7 @@ use std::thread;
use std::time::Duration;
use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use crate::task::task::{JoinHandle, Tag};
use crate::utils::abort_on_panic;
@ -19,30 +19,30 @@ struct Pool {
receiver: Receiver<async_task::Task<Tag>>,
}
lazy_static! {
static ref POOL: Pool = {
for _ in 0..2 {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| abort_on_panic(|| {
static POOL: Lazy<Pool> = Lazy::new(|| {
for _ in 0..2 {
thread::Builder::new()
.name("async-blocking-driver".to_string())
.spawn(|| {
abort_on_panic(|| {
for task in &POOL.receiver {
task.run();
}
}))
.expect("cannot start a thread driving blocking tasks");
}
// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
// the number of buffers that work must flow through
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
};
}
})
})
.expect("cannot start a thread driving blocking tasks");
}
// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
// the number of buffers that work must flow through
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = bounded(0);
Pool { sender, receiver }
});
// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't

@ -3,7 +3,7 @@ use std::thread;
use crossbeam_deque::{Injector, Stealer, Worker};
use kv_log_macro::trace;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use super::sleepers::Sleepers;
use super::task;
@ -111,28 +111,26 @@ impl Pool {
#[inline]
pub(crate) fn get() -> &'static Pool {
lazy_static! {
static ref POOL: Pool = {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
static POOL: Lazy<Pool> = Lazy::new(|| {
let num_threads = num_cpus::get().max(1);
let mut stealers = Vec::new();
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
// Spawn worker threads.
for _ in 0..num_threads {
let worker = Worker::new_fifo();
stealers.push(worker.stealer());
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
thread::Builder::new()
.name("async-task-driver".to_string())
.spawn(|| abort_on_panic(|| worker::main_loop(worker)))
.expect("cannot start a thread driving tasks");
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
};
}
Pool {
injector: Injector::new(),
stealers,
sleepers: Sleepers::new(),
}
});
&*POOL
}

@ -5,7 +5,7 @@ use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use super::worker;
use crate::utils::abort_on_panic;
@ -174,9 +174,7 @@ impl<T: Send + 'static> LocalKey<T> {
fn key(&self) -> usize {
#[cold]
fn init(key: &AtomicUsize) -> usize {
lazy_static! {
static ref COUNTER: Mutex<usize> = Mutex::new(1);
}
static COUNTER: Lazy<Mutex<usize>> = Lazy::new(|| Mutex::new(1));
let mut counter = COUNTER.lock().unwrap();
let prev = key.compare_and_swap(0, *counter, Ordering::AcqRel);

Loading…
Cancel
Save