use crate::av::xcoder::XCoder; use crate::av::Rational; use crate::transcoder::{Transcoder, SECONDS_PER_SEGMENT}; use async_std::channel::{bounded, Receiver, RecvError, SendError, Sender, TryRecvError}; use async_std::sync::{Arc, RwLock, RwLockReadGuard}; use async_std::task; use futures::channel::oneshot::{ channel as one_shot, Receiver as OneShotReceiver, Sender as OneShotSender, }; use serde::Serialize; use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::option::Option::Some; use std::thread::Builder; use std::thread::JoinHandle; use uuid::Uuid; #[derive(Debug, Default)] struct OneShotSet(Vec>); impl OneShotSet { fn add(&mut self) -> OneShotReceiver<()> { let (s, r) = one_shot::<()>(); self.0.push(s); r } fn trigger(&mut self) { while let Some(sender) = self.0.pop() { sender.send(()).unwrap() } } fn new() -> OneShotSet { OneShotSet(vec![]) } } #[derive(Debug, Default)] pub struct TranscoderEvents { init_queue: OneShotSet, segment_queue: HashMap, } impl TranscoderEvents { pub fn new() -> TranscoderEvents { Default::default() } } #[derive(Debug)] pub struct TranscoderInstance { id: Uuid, channel: Channel, state: RwLock, command: TranscoderCommand, events: RwLock, status: RwLock, } impl TranscoderInstance { pub async fn state(&self, block: fn(RwLockReadGuard) -> T) -> T { block(self.state.read().await) } pub fn id(&self) -> Uuid { self.id } pub async fn seek(&self, segment: u32) { self.channel.send(Message::Seek(segment)).await.unwrap() } pub async fn wait_for_segment(&self, segment: u32) -> bool { { let state = self.state.read().await; if state.info.segments() < segment { return false; } if state.segments.contains(&segment) { return true; } if state.state == TranscoderState::Finished { return false; } } println!("Waiting in segment queue for segment {}", segment); let r = { self.events .write() .await .segment_queue .entry(segment) .or_insert_with(|| OneShotSet::new()) .add() }; r.await.is_ok() } pub async fn wait_for_init(&self) -> bool { if self.state.read().await.init_written { return true; } let r = { self.events.write().await.init_queue.add() }; r.await.is_ok() } pub async fn status(&self) -> TranscoderStatus { self.status.read().await.clone() } } #[derive(Serialize, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)] pub enum TranscoderState { None, Seeking, Transcoding, Failed, Stopped, Finished, } impl Default for TranscoderState { fn default() -> Self { TranscoderState::None } } #[derive(Serialize, Debug, Clone, Default)] pub struct VideoInput { pub codec: String, pub encoder: String, pub decoder: String, pub frame_rate_rat: Option<[u32; 2]>, pub frame_rate: Option, pub width: u32, pub height: u32, pub time_scale: i32, } #[derive(Serialize, Debug, Clone, Default)] pub struct AudioInput { pub codec: String, pub encoder: String, pub decoder: String, pub channels: i32, pub channel_layout: String, pub sample_rate: i32, pub time_scale: i32, } #[derive(Serialize, Debug, Clone, Default)] pub struct TranscoderStatus { pub state: TranscoderState, pub failure_reason: Option, pub segments: Vec, pub init_written: bool, pub duration_secs: f64, pub current_segment: u32, pub segment_count: u32, pub video: VideoInput, pub audio: Option, } #[derive(Debug, Default)] pub struct TranscoderInstanceState { thread: Option>, info: TranscoderInfo, segments: HashSet, init_written: bool, state: TranscoderState, failure_reason: Option, } impl TranscoderInstanceState { #[inline] pub fn info(&self) -> &TranscoderInfo { &self.info } #[inline] pub fn segments(&self) -> &HashSet { &self.segments } #[inline] pub fn init_written(&self) -> bool { self.init_written } } #[derive(Debug, Default)] pub struct TranscoderManager { instances: HashMap>, } #[derive(Debug, Clone)] pub struct Channel { receiver: Receiver, sender: Sender, } impl Channel { async fn send(&self, msg: T) -> Result<(), SendError> { self.sender.send(msg).await } fn send_blocking(&self, msg: T) -> Result<(), SendError> { task::block_on(self.sender.send(msg)) } async fn recv(&self) -> Result { self.receiver.recv().await } fn recv_blocking(&self) -> Result { task::block_on(self.receiver.recv()) } fn try_recv(&self) -> Result { self.receiver.try_recv() } } fn new_channel() -> (Channel, Channel) { let (sender_a, receiver_a) = bounded(25); let (sender_b, receiver_b) = bounded(25); ( Channel { sender: sender_a, receiver: receiver_b, }, Channel { sender: sender_b, receiver: receiver_a, }, ) } #[derive(Debug, Clone)] pub struct TranscoderCommand { input: String, output: String, audio: Option, video: Option, } #[derive(Debug, Clone, Default)] pub struct TranscoderInfo { pub duration_secs: f64, pub audio: Option, pub video: VideoInput, } impl TranscoderInfo { fn segments(&self) -> u32 { (self.duration_secs / SECONDS_PER_SEGMENT as f64).ceil() as u32 } } #[derive(Debug)] pub enum Message { Info(TranscoderInfo), SegmentReady(u32), Seek(u32), Seeking, Failure(String), Stop, Stopped, Finished(f64, u32), } fn transcoder_thread_main(command: TranscoderCommand, channel: Channel) { if let Err(err) = transcoder_thread(command, &channel) { channel.send_blocking(Message::Failure(err)).unwrap(); return; } } fn transcoder_thread(command: TranscoderCommand, channel: &Channel) -> Result<(), String> { let transcoder = Transcoder::create( &command.input, &command.output, command.video, command.audio, ); let mut transcoder = match transcoder { Ok(t) => t, Err(e) => { channel.send_blocking(Message::Failure(e)).unwrap(); return Ok(()); } }; let mut segments = HashSet::new(); let mut video_segments = HashSet::new(); let mut video_last_segment = None; let mut audio_segments = HashSet::new(); let mut audio_last_segment = None; transcoder.open()?; channel .send_blocking(Message::Info(TranscoderInfo { duration_secs: transcoder.duration_secs(), audio: transcoder.audio().map(|audio| AudioInput { codec: audio.input_stream().codec_name(), decoder: audio.decoder().name(), encoder: audio.encoder().name(), channels: audio.decoder().channels(), channel_layout: audio.decoder().channel_layout_name(), sample_rate: audio.decoder().sample_rate(), time_scale: audio.output_stream().time_base().den(), }), video: VideoInput { codec: transcoder.video_stream().codec_name(), encoder: transcoder.video_encoder().name(), decoder: transcoder.video_decoder().name(), frame_rate_rat: transcoder .avg_frame_rate() .map(|x| [x.den() as u32, x.num() as u32]), frame_rate: transcoder.avg_frame_rate().map(|x| x.as_f64()), width: transcoder.video_decoder().width() as u32, height: transcoder.video_decoder().height() as u32, time_scale: transcoder.video_stream_output().time_base().den(), }, })) .unwrap(); let mut completed = false; let has_audio = transcoder.has_audio(); while !completed { if transcoder.transcode()? { while let Ok(msg) = channel.try_recv() { match msg { Message::Seek(segment) => { channel.send_blocking(Message::Seeking).unwrap(); let stream = Some(transcoder.video_stream().index()); transcoder.seek(segment, stream)?; } Message::Stop => { transcoder.stop(); channel.send_blocking(Message::Stopped).unwrap(); return Ok(()); } _ => {} }; } if video_last_segment != transcoder.last_written_segment() { if let Some(segment) = transcoder.last_written_segment() { video_segments.insert(segment); if !has_audio || audio_segments.contains(&segment) { channel .send_blocking(Message::SegmentReady(segment)) .unwrap(); segments.insert(segment); } } video_last_segment = transcoder.last_written_segment(); } if audio_last_segment != transcoder.last_written_audio_segment() { if let Some(segment) = transcoder.last_written_audio_segment() { audio_segments.insert(segment); if video_segments.contains(&segment) { channel .send_blocking(Message::SegmentReady(segment)) .unwrap(); segments.insert(segment); } } audio_last_segment = transcoder.last_written_audio_segment(); } } else { println!("EOF, seeking to missed parts"); let mut si: u32 = 0; while si <= transcoder.latest_written_segment().unwrap() as u32 { if segments.contains(&si) { si += 1; continue; } println!("Seeking to segment {}", si); transcoder.seek(si, Some(transcoder.video_stream().index()))?; break; } if si > transcoder.latest_written_segment().unwrap() { completed = true; } } } transcoder.finish()?; channel .send_blocking(Message::SegmentReady(transcoder.segment())) .unwrap(); let duration = transcoder.latest_frame_secs(); let segment_count = transcoder.segment(); transcoder.stop(); channel .send_blocking(Message::Finished(duration, segment_count)) .unwrap(); Ok(()) } impl TranscoderManager { pub fn start( &mut self, input: String, audio: Option, video: Option, ) -> Result { let uuid = Uuid::new_v4(); let command = TranscoderCommand { input: input.clone(), output: format!("/tmp/transotf/{}/", uuid), audio, video, }; let (thread_channel, own_channel) = new_channel(); let thread = { let command = command.clone(); Builder::new() .name(format!("t#{}", uuid)) .spawn(|| transcoder_thread_main(command, thread_channel))? }; let instance = TranscoderInstance { id: uuid, command, state: RwLock::new({ let mut it = TranscoderInstanceState::default(); it.thread = Some(thread); it }), channel: own_channel, events: RwLock::new(TranscoderEvents::new()), status: Default::default(), }; let instance = Arc::new(instance); self.instances.insert(uuid, instance.clone()); task::spawn(async move { loop { let fut = instance.channel.recv(); if let Ok(x) = fut.await { println!("[{}] {:?}", uuid, &x); match x { Message::Finished(duration, segment_count) => { { let mut status = instance.status.write().await; status.state = TranscoderState::Finished; status.segment_count = segment_count; status.duration_secs = duration; } { let mut state = instance.state.write().await; if let Some(join_handle) = std::mem::replace(&mut state.thread, None) { join_handle.join().unwrap(); } } break; } Message::Stopped => { let mut status = instance.status.write().await; status.state = TranscoderState::Stopped } Message::Failure(error) => { let mut status = instance.status.write().await; status.state = TranscoderState::Failed; status.failure_reason = Some(error); } Message::Info(info) => { { let mut state = instance.state.write().await; state.info = info.clone() } { let mut status = instance.status.write().await; status.audio = info.audio; status.video = info.video; status.segment_count = (info.duration_secs / SECONDS_PER_SEGMENT as f64).ceil() as u32; status.duration_secs = info.duration_secs } } Message::Seeking => { let mut status = instance.status.write().await; status.state = TranscoderState::Seeking } Message::SegmentReady(segment) => { let mut state = instance.state.write().await; if segment == 0 { state.init_written = true; instance.events.write().await.init_queue.trigger() } let current_segment = min(segment + 1, state.info.segments() - 1); { let mut status = instance.status.write().await; status.init_written = true; status.current_segment = current_segment; status.state = TranscoderState::Transcoding; if state.segments.insert(segment) { status.segments.push(segment); } } instance .events .write() .await .segment_queue .get_mut(&segment) .map(|x| x.trigger()); let mut skip_ahead = 1 + segment; while state.info.segments() > skip_ahead && state.segments.contains(&skip_ahead) { skip_ahead += 1; } if skip_ahead > (segment + 5) { instance.seek(skip_ahead).await; } } _ => {} } } else { return; } } }); Ok(uuid) } pub fn get(&self, id: Uuid) -> Option> { self.instances.get(&id).cloned() } }