use anyhow::Context; use polling::{Event, Poller}; use signal_hook::consts::{SIGHUP, SIGINT, SIGTERM}; use signal_hook::iterator::{Handle, Signals, SignalsInfo}; use signal_hook::low_level::signal_name; use std::collections::HashMap; use std::ffi::CStr; use std::fs; use std::fs::{read_dir, read_to_string, DirEntry}; use std::io::{Read, Write}; use std::mem::size_of; use std::os::unix::io::AsRawFd; use std::os::unix::net::{SocketAddr, UnixListener, UnixStream}; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::time::Duration; use std::{io, mem}; use vore_core::consts::{VORE_CONFIG, VORE_DIRECTORY, VORE_SOCKET}; use vore_core::rpc::{AllRequests, AllResponses, Command, CommandCenter, Response}; use vore_core::{rpc, VirtualMachineInfo}; use vore_core::{GlobalConfig, InstanceConfig, VirtualMachine}; #[derive(Debug)] struct RpcConnection { stream: UnixStream, address: SocketAddr, buffer: Vec, uid: u32, user: Option, pid: i32, } impl Write for RpcConnection { fn write(&mut self, buf: &[u8]) -> io::Result { self.stream.write(buf) } fn flush(&mut self) -> io::Result<()> { self.stream.flush() } } impl Read for RpcConnection { fn read(&mut self, buf: &mut [u8]) -> io::Result { self.stream.read(buf) } } #[allow(clippy::char_lit_as_u8)] const NEWLINE: u8 = '\n' as u8; impl RpcConnection { pub fn handle_input( &mut self, own_id: usize, ) -> Result<(bool, Vec<(usize, Command)>), anyhow::Error> { let mut still_open = true; loop { let mut buffer = vec![0u8; 4096]; match self.stream.read(&mut buffer) { Ok(amount) if amount == 0 => { still_open = false; break; } Ok(amount) => self.buffer.extend_from_slice(&buffer[..amount]), Err(err) if err.kind() == io::ErrorKind::WouldBlock => break, Err(err) => return Err(err.into()), }; } let mut buffer = mem::take(&mut self.buffer); if still_open { self.buffer = buffer.split_off( buffer .iter() .enumerate() .rev() .find(|(_, x)| **x == NEWLINE) .map(|(idx, _)| idx + 1) .unwrap_or(buffer.len()), ); } let mut commands = vec![]; for part in buffer.split(|x| *x == NEWLINE) { if part.is_empty() { continue; } let lossy = String::from_utf8_lossy(part); match CommandCenter::read_command(&lossy) { Ok(cmd) => { log::debug!("Got command: {:?}", cmd); commands.push((own_id, cmd)); } Err(err) => { log::info!("RPC Connection produced error: {}", err) } } } Ok((still_open, commands)) } } #[derive(Clone, Eq, PartialEq, Debug)] enum EventTarget { RpcListener, Machine(String), RpcConnection(usize), None, } #[derive(Debug)] pub struct Daemon { event_key_storage: Vec, global_config: GlobalConfig, machines: HashMap, connections: Vec>, rpc_listener: UnixListener, socket_path: PathBuf, poller: Poller, signals: SignalsInfo, signals_handle: Handle, queue: Vec, command_queue: Vec<(usize, Command)>, } impl Daemon { pub fn new() -> Result { log::debug!("Loading global config ({})", VORE_CONFIG); let toml = std::fs::read_to_string(VORE_CONFIG)?; let mut global_config = GlobalConfig::load(&toml)?; log::debug!("Creating vore daemon"); let signals = Signals::new(&[SIGINT, SIGHUP])?; let handle = signals.handle(); log::debug!("Bound signal handlers"); let poller = Poller::new().context("Failed to make poller")?; let socket_path = PathBuf::from_str(VORE_SOCKET)?; let rpc_listener = UnixListener::bind(&socket_path).context("Failed to bind vore socket")?; global_config.vore.chown(socket_path.to_str().unwrap())?; rpc_listener.set_nonblocking(true)?; log::debug!("Bound to {}", VORE_SOCKET); let mut daemon = Daemon { event_key_storage: vec![], global_config, machines: Default::default(), connections: vec![], rpc_listener, poller, signals, signals_handle: handle, queue: vec![], command_queue: vec![], socket_path, }; daemon.init()?; Ok(daemon) } pub fn init(&mut self) -> Result<(), anyhow::Error> { let new_key = self.add_target(EventTarget::RpcListener); self.poller .add(&self.rpc_listener, Event::readable(new_key))?; Ok(()) } pub fn load_definitions(&mut self) -> Result<(), anyhow::Error> { let vm_dir = PathBuf::from(format!("{}/definitions", VORE_DIRECTORY)); if !vm_dir.is_dir() { return Ok(()); } let dir_iter = read_dir(&vm_dir).with_context(|| format!("Failed to list {:?} for vm's", &vm_dir))?; let mut process = |entry: Result| -> anyhow::Result<()> { let entry = entry?; let file_name = entry.path(); let path = file_name.to_str().context("Entry has invalid UTF-8 path")?; if !path.ends_with(".toml") { return Ok(()); } let toml = read_to_string(path) .with_context(|| format!("Failed to read VM definition {}", path))?; self.load_virtual_machine(&toml, None, false)?; Ok(()) }; for entry in dir_iter { if let Err(err) = process(entry) { log::error!("Failed parsing entry in {:?}: {:?}", vm_dir, err); } } Ok(()) } pub fn reserve_vfio_devices(&mut self) { for machine in self.machines.values() { for vfio_device in machine.vfio_devices() { if !vfio_device.reserve { continue; } if let Err(err) = VirtualMachine::prepare_vfio_device(true, true, &vfio_device) { log::error!( "Failed to reserve PCI device {} for {}: {:?}", vfio_device.address, machine.name(), err ); } else { log::info!( "Reserved PCI device {} for {}", vfio_device.address, machine.name() ); } } } } pub fn auto_start_machines(&mut self) { for machine in self.machines.values_mut() { if !machine.should_auto_start() { continue; } if let Err(err) = machine.start() { log::error!("Failed to auto-start {}: {:?}", machine.name(), err); } else { log::info!("Autostarted {}", machine.name()); } } } pub fn run(&mut self) -> Result<(), anyhow::Error> { self.load_definitions()?; self.reserve_vfio_devices(); self.auto_start_machines(); loop { let res = self .wait() .context("Got error while waiting for new notifications"); match res { // Interrupted is uh "always" when we get a signal Err(err) if err .downcast_ref::() .map(|x| x.kind() == io::ErrorKind::Interrupted) .unwrap_or(false) => { if !self.handle_exit_code()? { break; } } err => err?, } if !self.handle_event_queue()? { break; } self.handle_command_queue()?; } // TODO: clean up log::info!("vore daemon has ended"); std::fs::remove_file(&self.socket_path).context("Failed cleaning up socket")?; Ok(()) } pub fn handle_command_queue(&mut self) -> Result<(), anyhow::Error> { while let Some((id, command)) = self.command_queue.pop() { let resp = self.handle_command(&command); if let Err(err) = &resp { log::warn!("Command {:?} failed with error: {:?}", command, err) } if let Some(conn) = self.connections[id].as_mut() { conn.write_all(CommandCenter::write_answer(&command, resp)?.as_bytes())?; } } Ok(()) } pub fn load_virtual_machine( &mut self, toml: &str, working_directory: Option, save: bool, ) -> anyhow::Result { let config = InstanceConfig::from_toml(&toml)?; if save { let save_file = format!("{}/definitions/{}.toml", VORE_DIRECTORY, config.name); let file_path = Path::new(&save_file); if let Some(parent_dir) = file_path.parent() { if !parent_dir.is_dir() { fs::create_dir_all(parent_dir)?; } } fs::write(&save_file, toml).with_context(|| { format!( "Failed to save vm definition for {} to {}", config.name, save_file ) })?; } let working_dir = working_directory .unwrap_or_else(|| format!("{}/instance/{}", VORE_DIRECTORY, config.name)); let vm = VirtualMachine::new(config, &self.global_config, working_dir); let info = vm.info(); self.mount_machine(vm); Ok(info) } pub fn handle_command(&mut self, command: &Command) -> Result { let resp = match &command.data { AllRequests::Info(_) => rpc::InfoResponse { name: "vore".to_string(), version: format!( "{}.{}.{}{}", env!("CARGO_PKG_VERSION_MAJOR"), env!("CARGO_PKG_VERSION_MINOR"), env!("CARGO_PKG_VERSION_PATCH"), option_env!("CARGO_PKG_VERSION_PRE").unwrap_or("") ), } .into_enum(), AllRequests::List(_) => rpc::ListResponse { items: self.machines.values().map(|x| x.info()).collect(), } .into_enum(), AllRequests::Load(val) => rpc::LoadResponse { info: self.load_virtual_machine( &val.toml, val.working_directory.as_ref().cloned(), val.save, )?, } .into_enum(), AllRequests::Prepare(val) => { if let Some(machine) = self.machines.get_mut(&val.name) { machine.prepare(true, false)?; } else { anyhow::bail!("No machine with the name {} exists", val.name); } rpc::PrepareResponse {}.into_enum() } AllRequests::Start(val) => { let cloned = if let Some(machine) = self.machines.get_mut(&val.name) { machine.start()?; machine.control_stream().cloned() } else { anyhow::bail!("No machine with the name {} exists", val.name); }; if let Some(cloned) = cloned { let new_id = self.add_target(EventTarget::Machine(val.name.clone())); self.poller.add(&cloned, Event::readable(new_id))?; } rpc::StartResponse {}.into_enum() } AllRequests::Stop(val) => { if let Some(machine) = self.machines.get_mut(&val.name) { machine.stop()?; } else { anyhow::bail!("No machine with the name {} exists", val.name); } rpc::StartResponse {}.into_enum() } AllRequests::Unload(_) => { anyhow::bail!("Unimplemented"); } AllRequests::Kill(val) => { if let Some(machine) = self.machines.get_mut(&val.name) { machine.quit()?; } else { anyhow::bail!("No machine with the name {} exists", val.name); } rpc::StartResponse {}.into_enum() } }; Ok(resp) } pub fn handle_exit_code(&mut self) -> Result { for signal in self.signals.pending() { log::info!( "Received signal {} ({})", signal_name(signal).unwrap_or(""), signal ); match signal { SIGINT | SIGTERM => return Ok(false), _ => {} } } Ok(true) } pub fn handle_event_queue(&mut self) -> Result { let queue = mem::take(&mut self.queue); for event in queue { let target = self.event_key_storage.get(event.key).cloned(); if let Some(item) = target { log::debug!("Handling {:?} from target {:?}", event, item); match item { EventTarget::RpcListener => { self.poller .modify(&self.rpc_listener, Event::readable(event.key))?; self.accept_rpc_connections()?; } EventTarget::Machine(name) if self.machines.contains_key(&name) => { if let Some(machine) = self.machines.get_mut(&name) { machine.boop()?; } if let Some(control_socket) = self.machines.get(&name).and_then(|x| x.control_stream()) { self.poller .modify(control_socket, Event::readable(event.key))?; } } EventTarget::RpcConnection(rpc_connection_id) if self .connections .get(rpc_connection_id) .map(Option::is_some) .unwrap_or(false) => { let (still_open, mut commands) = if let Some(rpc_connection) = &mut self.connections[rpc_connection_id] { let input_res = rpc_connection.handle_input(rpc_connection_id)?; if input_res.0 { self.poller .modify(&rpc_connection.stream, Event::readable(event.key))?; } input_res } else { (false, vec![]) }; if !still_open { log::info!("RPC connection {} closed", rpc_connection_id); self.connections[rpc_connection_id] = None; } self.command_queue.append(&mut commands) } _ => continue, } } } Ok(true) } fn accept_rpc_connections(&mut self) -> Result<(), anyhow::Error> { loop { let (stream, address) = match self.rpc_listener.accept() { Ok(value) => value, Err(err) if err.kind() == io::ErrorKind::WouldBlock => return Ok(()), Err(err) => return Err(err.into()), }; stream.set_nonblocking(true)?; let mut user: Option = None; let ucred = unsafe { let mut ucred: libc::ucred = mem::zeroed(); let mut length = size_of::() as u32; libc::getsockopt( stream.as_raw_fd(), libc::SOL_SOCKET, libc::SO_PEERCRED, (&mut ucred) as *mut _ as _, &mut length, ); let passwd = libc::getpwuid(ucred.uid); if !passwd.is_null() { user = CStr::from_ptr((*passwd).pw_name) .to_str() .ok() .map(|x| x.to_string()) } ucred }; let conn = RpcConnection { stream, address, buffer: vec![], uid: ucred.uid, user, pid: ucred.pid, }; log::info!( "Got new RPC connection from {} (pid: {}, socket: {:?})", conn.user.as_ref().map_or_else( || format!("uid:{}", conn.uid), |x| format!("{} ({})", x, conn.uid), ), conn.pid, conn.address, ); let id = self.add_rpc_connection(conn); let event_target = self.add_target(EventTarget::RpcConnection(id)); self.poller.add( &self.connections[id].as_ref().unwrap().stream, Event::readable(event_target), )?; } } pub fn wait(&mut self) -> Result<(), anyhow::Error> { self.poller .wait(&mut self.queue, Some(Duration::from_secs(5)))?; Ok(()) } fn add_target(&mut self, event_target: EventTarget) -> usize { let id = self .event_key_storage .iter() .enumerate() .find(|(_, target)| target.eq(&&EventTarget::None)); if let Some((id, _)) = id { self.event_key_storage[id] = event_target; return id; } let new_id = self.event_key_storage.len(); self.event_key_storage.push(event_target); new_id } fn add_rpc_connection(&mut self, rpc_connection: RpcConnection) -> usize { let id = self .connections .iter() .enumerate() .find(|(_, target)| target.is_none()); if let Some((id, _)) = id { self.connections[id] = Some(rpc_connection); return id; } let new_id = self.connections.len(); self.connections.push(Some(rpc_connection)); new_id } fn mount_machine(&mut self, vm: VirtualMachine) { log::info!("Loaded {}", vm.name()); let name = vm.name().to_string(); self.machines.insert(name, vm); } }