You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
transotf/src/transcoder_manager.rs

257 lines
6.6 KiB
Rust

use crate::transcoder::Transcoder;
use async_std::sync::{Arc, Receiver, RecvError, RwLock, RwLockReadGuard, Sender, TryRecvError};
use async_std::task;
use std::collections::{HashMap, HashSet};
use std::option::Option::Some;
use std::thread::Builder;
use std::thread::JoinHandle;
use uuid::Uuid;
/// A single transcoder job together with the manager-side end of its message channel.
///
/// Instances are built by `TranscoderManager::start`, which fills in every field.
#[derive(Debug)]
pub struct TranscoderInstance {
/// Random (v4) UUID under which this instance is stored in the manager's map.
id: Uuid,
/// Manager-side end of the channel; the background task spawned in `start` receives the
/// transcoder thread's messages on it.
channel: Channel<Message>,
/// Progress information, written by that background task and readable through `state()`.
state: RwLock<TranscoderInstanceState>,
/// The command this instance was started with (a clone of it is handed to the thread).
command: TranscoderCommand,
}
impl TranscoderInstance {
    /// Runs `block` with a read guard on this instance's state and returns whatever it produces.
    ///
    /// The read lock is acquired asynchronously before `block` is called. `block` receives the
    /// guard by value, so the lock is released once `block` drops it (at the latest when `block`
    /// returns). Because `block` is a plain `fn` pointer it cannot capture its environment.
    pub async fn state<T>(&self, block: fn(RwLockReadGuard<TranscoderInstanceState>) -> T) -> T {
        let guard = self.state.read().await;
        block(guard)
    }
}
/// Progress information about one running (or finished) transcoder.
///
/// The default value describes a transcoder that has not reported anything yet: no thread
/// handle, a duration of `0.0`, no segments and no init segment.
#[derive(Debug, Default)]
pub struct TranscoderInstanceState {
    /// Join handle of the transcoder thread; taken out and joined once `Message::Finished`
    /// has been received.
    thread: Option<JoinHandle<()>>,
    /// Duration in seconds as reported by `Message::Info`.
    duration_secs: f64,
    /// Every segment number reported through `Message::SegmentReady` so far.
    segments: HashSet<u32>,
    /// Set to `true` once segment `0` has been reported as ready.
    init_written: bool,
}

impl TranscoderInstanceState {
    /// Whether segment `0` has been reported as ready.
    #[inline]
    pub fn init_written(&self) -> bool {
        self.init_written
    }

    /// The set of segment numbers that have been reported as ready.
    #[inline]
    pub fn segments(&self) -> &HashSet<u32> {
        &self.segments
    }

    /// The duration in seconds; `0.0` until the transcoder has reported it.
    #[inline]
    pub fn duration(&self) -> f64 {
        self.duration_secs
    }
}
/// Registry of started transcoder instances, keyed by instance id.
#[derive(Debug, Default)]
pub struct TranscoderManager {
/// Every instance registered by `start`, looked up by `get`.
/// NOTE(review): nothing in the visible code ever removes an entry — confirm whether
/// finished instances are meant to stay here.
instances: HashMap<Uuid, Arc<TranscoderInstance>>,
}
/// One end of a two-way message link.
///
/// `new_channel` builds these in pairs: each end's `sender` feeds the other end's `receiver`.
#[derive(Debug, Clone)]
pub struct Channel<T> {
/// Receives what the opposite end sends.
receiver: Receiver<T>,
/// Sends to the opposite end.
sender: Sender<T>,
}
impl<T> Channel<T> {
async fn send(&self, msg: T) {
self.sender.send(msg).await
}
fn send_blocking(&self, msg: T) {
task::block_on(self.sender.send(msg))
}
async fn recv(&self) -> Result<T, RecvError> {
self.receiver.recv().await
}
fn recv_blocking(&self) -> Result<T, RecvError> {
task::block_on(self.receiver.recv())
}
fn try_recv(&self) -> Result<T, TryRecvError> {
self.receiver.try_recv()
}
}
/// Creates the two connected ends of a two-way message link.
///
/// Two bounded `async_std` channels are created, one per direction, and cross-wired: whatever
/// one end sends arrives at the other end's receiver, and vice versa.
fn new_channel() -> (Channel<Message>, Channel<Message>) {
    // Capacity of each direction's bounded channel.
    const CAPACITY: usize = 25;

    let (first_tx, first_rx) = async_std::sync::channel(CAPACITY);
    let (second_tx, second_rx) = async_std::sync::channel(CAPACITY);

    let first_end = Channel {
        sender: first_tx,
        receiver: second_rx,
    };
    let second_end = Channel {
        sender: second_tx,
        receiver: first_rx,
    };
    (first_end, second_end)
}
/// Parameters handed to the transcoder thread; forwarded to `Transcoder::create`.
#[derive(Debug, Clone)]
pub struct TranscoderCommand {
/// Input passed as the first argument to `Transcoder::create`.
input: String,
/// Output passed as the second argument; `TranscoderManager::start` sets it to
/// `/tmp/transotf/<uuid>/`.
output: String,
/// Forwarded to `Transcoder::create` — presumably the audio stream to use; confirm there.
audio: Option<i32>,
/// Forwarded to `Transcoder::create` — presumably the video stream to use; confirm there.
video: Option<i32>,
}
/// Messages exchanged between the manager and a transcoder thread.
///
/// In the visible code the thread sends `Info`, `SegmentReady`, `Failure` and `Finished`,
/// and reacts to `Seek` and `Stop`; any other message it receives is ignored.
#[derive(Debug)]
pub enum Message {
/// Sent by the thread once the transcoder has been opened; carries its duration in seconds.
Info { duration_secs: f64 },
/// Sent by the thread when its segment counter has advanced, and once more after `finish()`.
SegmentReady(u32),
/// Asks the thread to seek the transcoder to the given segment.
Seek(u32),
/// Sent by the thread when it stops because of an error; carries the error text.
Failure(String),
/// Asks the thread to stop the transcoder and exit (no `Finished` is sent in that case).
Stop,
/// Sent by the thread as its last message after a complete run.
Finished,
}
/// Entry point of a transcoder thread.
///
/// Runs `transcoder_thread` and, if it returns an error, reports that error to the other end of
/// `channel` as `Message::Failure`. A successful run sends nothing extra from here.
fn transcoder_thread_main(command: TranscoderCommand, channel: Channel<Message>) {
    match transcoder_thread(command, &channel) {
        Ok(()) => {}
        Err(reason) => channel.send_blocking(Message::Failure(reason)),
    }
}
/// Runs one transcoding session to completion on the calling thread.
///
/// Creates and opens a `Transcoder` from `command`, reports its duration through `channel`
/// (`Message::Info`), then calls `transcode()` repeatedly until it returns `false`. After every
/// step the channel is drained without blocking so that `Seek` and `Stop` requests are handled,
/// and a `Message::SegmentReady` is sent whenever the transcoder's segment counter has grown.
/// A complete run ends with `finish()`, a final `SegmentReady`, `stop()` and `Message::Finished`.
/// A `Stop` request ends the run early with `Ok(())` and without `Finished`.
///
/// # Errors
///
/// Returns the error of the first failing transcoder call (`create`, `open`, `transcode`,
/// `seek` or `finish`) as a `String`. This function does not send `Message::Failure` itself;
/// turning the error into a message is the caller's job.
fn transcoder_thread(command: TranscoderCommand, channel: &Channel<Message>) -> Result<(), String> {
    // Creation failures use the same `?` path as every other transcoder error, so there is a
    // single place (the caller) that reports failures over the channel.
    let mut transcoder = Transcoder::create(
        &command.input,
        &command.output,
        command.video,
        command.audio,
    )?;
    transcoder.open()?;
    channel.send_blocking(Message::Info {
        duration_secs: transcoder.duration_secs(),
    });

    let mut last_segment = transcoder.segment();
    while transcoder.transcode()? {
        // Handle everything that is already queued; never wait for new requests here.
        while let Ok(msg) = channel.try_recv() {
            match msg {
                Message::Seek(segment) => {
                    transcoder.seek(segment, Some(transcoder.video_stream()))?;
                }
                Message::Stop => {
                    transcoder.stop();
                    return Ok(());
                }
                // These are only ever sent *by* this thread; ignore them if they arrive.
                Message::Info { .. }
                | Message::SegmentReady(_)
                | Message::Failure(_)
                | Message::Finished => {}
            }
        }

        // NOTE(review): `last_segment` only moves when the counter grows. If `seek` can move
        // the counter backwards, nothing is reported until it passes the old value again, and
        // a forward seek reports the interrupted segment as ready — confirm this is intended.
        if last_segment < transcoder.segment() {
            channel.send_blocking(Message::SegmentReady(last_segment));
            last_segment = transcoder.segment();
        }
    }

    transcoder.finish()?;
    channel.send_blocking(Message::SegmentReady(transcoder.segment()));
    transcoder.stop();
    channel.send_blocking(Message::Finished);
    Ok(())
}
impl TranscoderManager {
    /// Starts a new transcoder for `input` and registers it under a fresh random id.
    ///
    /// The transcoder runs on a dedicated OS thread named `t#<uuid>` and is given
    /// `/tmp/transotf/<uuid>/` as its output. A background task is spawned that receives the
    /// thread's messages and mirrors them into the instance state (duration, ready segments,
    /// init flag); every handled message is also printed to stdout.
    ///
    /// `audio` and `video` are stored in the command and forwarded to the transcoder unchanged.
    ///
    /// # Errors
    ///
    /// Returns the `std::io::Error` from `Builder::spawn` if the thread cannot be created; in
    /// that case nothing is registered.
    pub fn start(
        &mut self,
        input: String,
        audio: Option<i32>,
        video: Option<i32>,
    ) -> Result<Uuid, std::io::Error> {
        let uuid = Uuid::new_v4();
        let command = TranscoderCommand {
            // `input` is owned and not used again, so it is moved instead of cloned.
            input,
            output: format!("/tmp/transotf/{}/", uuid),
            audio,
            video,
        };
        let (thread_channel, own_channel) = new_channel();
        let thread = {
            // The thread gets its own copy; the original stays with the instance.
            let command = command.clone();
            Builder::new()
                .name(format!("t#{}", uuid))
                .spawn(move || transcoder_thread_main(command, thread_channel))?
        };
        let instance = Arc::new(TranscoderInstance {
            id: uuid,
            command,
            state: RwLock::new(TranscoderInstanceState {
                thread: Some(thread),
                init_written: false,
                segments: HashSet::new(),
                duration_secs: 0.0,
            }),
            channel: own_channel,
        });
        self.instances.insert(uuid, Arc::clone(&instance));

        task::spawn(async move {
            // Ends when the thread reports `Finished` or when receiving fails.
            while let Ok(msg) = instance.channel.recv().await {
                match msg {
                    Message::Finished => {
                        // Take the handle in its own statement: the write guard is a temporary
                        // and is released before the blocking `join`, so readers of the state
                        // are not held up while the thread winds down.
                        let handle = instance.state.write().await.thread.take();
                        if let Some(handle) = handle {
                            handle
                                .join()
                                .expect("transcoder thread panicked after reporting Finished");
                        }
                        break;
                    }
                    Message::Info { duration_secs } => {
                        instance.state.write().await.duration_secs = duration_secs;
                    }
                    Message::SegmentReady(segment) => {
                        let mut state = instance.state.write().await;
                        // Segment 0 doubles as the marker that the init segment exists.
                        if segment == 0 {
                            state.init_written = true;
                        }
                        state.segments.insert(segment);
                    }
                    // Not mirrored into the state; they are still printed below.
                    Message::Seek(_) | Message::Failure(_) | Message::Stop => {}
                }
                println!("> {:?}", msg);
            }
        });
        Ok(uuid)
    }

    /// Returns a shared handle to the instance registered under `id`, or `None` if there is none.
    pub fn get(&self, id: Uuid) -> Option<Arc<TranscoderInstance>> {
        self.instances.get(&id).cloned()
    }
}