You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
transotf/src/transcoder_manager.rs

572 lines
17 KiB
Rust

use crate::av::xcoder::XCoder;
use crate::av::Rational;
use crate::transcoder::{Transcoder, SECONDS_PER_SEGMENT};
use async_std::channel::{bounded, Receiver, RecvError, SendError, Sender, TryRecvError};
use async_std::sync::{Arc, RwLock, RwLockReadGuard};
use async_std::task;
use futures::channel::oneshot::{
channel as one_shot, Receiver as OneShotReceiver, Sender as OneShotSender,
};
use serde::Serialize;
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::option::Option::Some;
use std::thread::Builder;
use std::thread::JoinHandle;
use uuid::Uuid;
/// A collection of one-shot senders that are all fired together.
///
/// Each call to [`OneShotSet::add`] registers one waiter and hands back the
/// receiving half; [`OneShotSet::trigger`] wakes every registered waiter and
/// leaves the set empty.
#[derive(Debug, Default)]
struct OneShotSet(Vec<OneShotSender<()>>);

impl OneShotSet {
    /// Registers a new waiter and returns the receiver it should await.
    fn add(&mut self) -> OneShotReceiver<()> {
        let (sender, receiver) = one_shot::<()>();
        self.0.push(sender);
        receiver
    }

    /// Notifies every registered waiter and empties the set.
    ///
    /// Waiters are notified most-recently-registered first.
    fn trigger(&mut self) {
        while let Some(sender) = self.0.pop() {
            // `send` only fails when the receiving half was already dropped,
            // i.e. the waiter gave up. That is not an error for the notifier,
            // and panicking here would take down the task that drives the
            // whole transcoder instance.
            let _ = sender.send(());
        }
    }

    /// Creates an empty set.
    fn new() -> OneShotSet {
        OneShotSet(Vec::new())
    }
}
#[derive(Debug, Default)]
pub struct TranscoderEvents {
init_queue: OneShotSet,
segment_queue: HashMap<u32, OneShotSet>,
}
impl TranscoderEvents {
pub fn new() -> TranscoderEvents {
Default::default()
}
}
#[derive(Debug)]
pub struct TranscoderInstance {
id: Uuid,
channel: Channel<Message>,
state: RwLock<TranscoderInstanceState>,
command: TranscoderCommand,
events: RwLock<TranscoderEvents>,
status: RwLock<TranscoderStatus>,
}
impl TranscoderInstance {
pub async fn state<T>(&self, block: fn(RwLockReadGuard<TranscoderInstanceState>) -> T) -> T {
block(self.state.read().await)
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn seek(&self, segment: u32) {
self.channel.send(Message::Seek(segment)).await.unwrap()
}
pub async fn wait_for_segment(&self, segment: u32) -> bool {
{
let state = self.state.read().await;
if state.info.segments() < segment {
return false;
}
if state.segments.contains(&segment) {
return true;
}
if state.state == TranscoderState::Finished {
return false;
}
}
println!("Waiting in segment queue for segment {}", segment);
let r = {
self.events
.write()
.await
.segment_queue
.entry(segment)
.or_insert_with(|| OneShotSet::new())
.add()
};
r.await.is_ok()
}
pub async fn wait_for_init(&self) -> bool {
if self.state.read().await.init_written {
return true;
}
let r = { self.events.write().await.init_queue.add() };
r.await.is_ok()
}
pub async fn status(&self) -> TranscoderStatus {
self.status.read().await.clone()
}
}
/// Lifecycle state of a transcoder as tracked by the manager.
#[derive(Serialize, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum TranscoderState {
    /// No state has been reported yet; this is the default.
    None,
    /// The transcoder thread acknowledged a seek request.
    Seeking,
    /// At least one segment has been reported ready.
    Transcoding,
    /// The transcoder thread reported a failure.
    Failed,
    /// The transcoder thread reported that it stopped on request.
    Stopped,
    /// The transcoder thread reported completion.
    Finished,
}

impl Default for TranscoderState {
    /// A fresh transcoder starts out in [`TranscoderState::None`].
    fn default() -> Self {
        Self::None
    }
}
/// Serializable description of the video stream handled by a transcoder.
///
/// Populated by the transcoder thread after the input has been opened.
#[derive(Serialize, Debug, Clone, Default)]
pub struct VideoInput {
/// Codec name of the input video stream.
pub codec: String,
/// Name of the video encoder in use.
pub encoder: String,
/// Name of the video decoder in use.
pub decoder: String,
/// Average frame rate as a two-element rational, `None` when unknown.
/// NOTE(review): the transcoder thread stores this as `[den, num]`; confirm
/// that this order is what consumers expect.
pub frame_rate_rat: Option<[u32; 2]>,
/// Average frame rate as a floating point value, `None` when unknown.
pub frame_rate: Option<f64>,
/// Width reported by the video decoder.
pub width: u32,
/// Height reported by the video decoder.
pub height: u32,
/// Denominator of the output video stream's time base.
pub time_scale: i32,
}
/// Serializable description of the audio stream handled by a transcoder.
///
/// Populated by the transcoder thread after the input has been opened; only
/// present when the transcoder has an audio stream.
#[derive(Serialize, Debug, Clone, Default)]
pub struct AudioInput {
/// Codec name of the input audio stream.
pub codec: String,
/// Name of the audio encoder in use.
pub encoder: String,
/// Name of the audio decoder in use.
pub decoder: String,
/// Channel count reported by the audio decoder.
pub channels: i32,
/// Channel layout name reported by the audio decoder.
pub channel_layout: String,
/// Sample rate reported by the audio decoder.
pub sample_rate: i32,
/// Denominator of the output audio stream's time base.
pub time_scale: i32,
}
/// Serializable snapshot of a transcoder's progress.
///
/// Maintained by the manager's message task and handed out as a clone by
/// `TranscoderInstance::status`.
#[derive(Serialize, Debug, Clone, Default)]
pub struct TranscoderStatus {
/// Most recently reported lifecycle state.
pub state: TranscoderState,
/// Error text from the transcoder thread, set when a failure was reported.
pub failure_reason: Option<String>,
/// Segment numbers reported ready, in the order they were first reported.
pub segments: Vec<u32>,
/// Set to `true` as soon as any segment has been reported ready.
pub init_written: bool,
/// Duration in seconds: taken from the stream info first, then replaced by
/// the value reported on completion.
pub duration_secs: f64,
/// The segment following the most recently reported one, capped at the last
/// segment index.
pub current_segment: u32,
/// Number of segments: estimated from the duration first, then replaced by
/// the count reported on completion.
pub segment_count: u32,
/// Video stream description.
pub video: VideoInput,
/// Audio stream description, `None` when there is no audio.
pub audio: Option<AudioInput>,
}
/// Internal bookkeeping for one transcoder instance.
#[derive(Debug, Default)]
pub struct TranscoderInstanceState {
    /// Join handle of the transcoder thread while it has not been joined yet.
    thread: Option<JoinHandle<()>>,
    /// Stream information reported by the transcoder thread.
    info: TranscoderInfo,
    /// Segment numbers that have been reported ready.
    segments: HashSet<u32>,
    /// Whether segment 0 has been reported ready.
    init_written: bool,
    /// Lifecycle state as seen by waiters.
    state: TranscoderState,
    /// Error text associated with a failure, if any.
    failure_reason: Option<String>,
}

impl TranscoderInstanceState {
    /// Stream information reported by the transcoder thread.
    #[inline]
    pub fn info(&self) -> &TranscoderInfo {
        &self.info
    }

    /// Segment numbers that have been reported ready so far.
    #[inline]
    pub fn segments(&self) -> &HashSet<u32> {
        &self.segments
    }

    /// Whether segment 0 has been reported ready.
    #[inline]
    pub fn init_written(&self) -> bool {
        self.init_written
    }
}
/// Registry of all transcoder instances started through this manager.
#[derive(Debug, Default)]
pub struct TranscoderManager {
/// Instances keyed by the identifier returned from `start`.
instances: HashMap<Uuid, Arc<TranscoderInstance>>,
}
/// One end of a two-way message link: a receiver for incoming messages paired
/// with a sender for outgoing ones.
#[derive(Debug, Clone)]
pub struct Channel<T> {
    /// Incoming half.
    receiver: Receiver<T>,
    /// Outgoing half.
    sender: Sender<T>,
}

impl<T> Channel<T> {
    /// Sends `msg` on the outgoing half.
    async fn send(&self, msg: T) -> Result<(), SendError<T>> {
        self.sender.send(msg).await
    }

    /// Like [`Channel::send`], but blocks the calling thread until done.
    fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
        task::block_on(self.send(msg))
    }

    /// Receives the next message from the incoming half.
    async fn recv(&self) -> Result<T, RecvError> {
        self.receiver.recv().await
    }

    /// Like [`Channel::recv`], but blocks the calling thread until done.
    fn recv_blocking(&self) -> Result<T, RecvError> {
        task::block_on(self.recv())
    }

    /// Returns a pending message if there is one, without waiting.
    fn try_recv(&self) -> Result<T, TryRecvError> {
        self.receiver.try_recv()
    }
}
/// Builds two connected [`Channel`] ends.
///
/// Whatever is sent on the first end is received on the second and vice
/// versa; each direction is a bounded queue of 25 messages.
fn new_channel() -> (Channel<Message>, Channel<Message>) {
    let (to_second, from_first) = bounded(25);
    let (to_first, from_second) = bounded(25);
    let first = Channel {
        sender: to_second,
        receiver: from_second,
    };
    let second = Channel {
        sender: to_first,
        receiver: from_first,
    };
    (first, second)
}
/// Parameters a transcoder thread is started with.
#[derive(Debug, Clone)]
pub struct TranscoderCommand {
/// Input location, forwarded to `Transcoder::create`.
input: String,
/// Output location, forwarded to `Transcoder::create`.
output: String,
/// Optional audio selector forwarded to `Transcoder::create`
/// (presumably a stream index — TODO confirm).
audio: Option<i32>,
/// Optional video selector forwarded to `Transcoder::create`
/// (presumably a stream index — TODO confirm).
video: Option<i32>,
}
/// Stream information reported by the transcoder thread after opening the input.
#[derive(Debug, Clone, Default)]
pub struct TranscoderInfo {
    /// Duration of the input in seconds.
    pub duration_secs: f64,
    /// Audio stream description, `None` when there is no audio.
    pub audio: Option<AudioInput>,
    /// Video stream description.
    pub video: VideoInput,
}

impl TranscoderInfo {
    /// Number of segments needed to cover the whole duration, rounded up.
    fn segments(&self) -> u32 {
        let segment_length = SECONDS_PER_SEGMENT as f64;
        let segment_count = (self.duration_secs / segment_length).ceil();
        segment_count as u32
    }
}
/// Messages exchanged between the manager and a transcoder thread.
#[derive(Debug)]
pub enum Message {
/// Thread -> manager: stream information, sent once the input is open.
Info(TranscoderInfo),
/// Thread -> manager: the given segment is ready.
SegmentReady(u32),
/// Manager -> thread: seek to the given segment.
Seek(u32),
/// Thread -> manager: a seek request is being carried out.
Seeking,
/// Thread -> manager: transcoding failed with the given error text.
Failure(String),
/// Request for the thread to stop transcoding.
Stop,
/// Thread -> manager: the thread stopped in response to `Stop`.
Stopped,
/// Thread -> manager: transcoding completed; carries the final duration in
/// seconds and the final segment count.
Finished(f64, u32),
}
/// Entry point of a transcoder thread.
///
/// Runs [`transcoder_thread`] and reports an error result to the manager as a
/// [`Message::Failure`].
///
/// # Panics
///
/// Panics if the failure message cannot be sent.
fn transcoder_thread_main(command: TranscoderCommand, channel: Channel<Message>) {
    match transcoder_thread(command, &channel) {
        Ok(()) => {}
        Err(reason) => channel.send_blocking(Message::Failure(reason)).unwrap(),
    }
}
/// Body of a transcoder thread: opens the input, reports stream information,
/// transcodes until every segment has been written and reports progress on
/// `channel`.
///
/// While transcoding, pending [`Message::Seek`] and [`Message::Stop`] requests
/// are served between transcode steps. A segment is announced with
/// [`Message::SegmentReady`] once its video part and, if the input has audio,
/// its audio part have both been written. When the input is exhausted the
/// transcoder seeks back to the first segment that was never announced and
/// continues from there until none is missing.
///
/// # Errors
///
/// Returns the error text of any failing transcoder operation, or an error if
/// the end of the input is reached before a single segment was written. The
/// caller forwards the error as a [`Message::Failure`].
///
/// # Panics
///
/// Panics if a message cannot be sent on `channel`.
fn transcoder_thread(command: TranscoderCommand, channel: &Channel<Message>) -> Result<(), String> {
    // A creation failure is propagated like every other error; the caller
    // turns it into the same `Failure` message that used to be sent from here.
    let mut transcoder = Transcoder::create(
        &command.input,
        &command.output,
        command.video,
        command.audio,
    )?;

    // Segments announced to the manager, and the per-stream sets they are derived from.
    let mut segments = HashSet::new();
    let mut video_segments = HashSet::new();
    let mut video_last_segment = None;
    let mut audio_segments = HashSet::new();
    let mut audio_last_segment = None;

    transcoder.open()?;
    channel
        .send_blocking(Message::Info(TranscoderInfo {
            duration_secs: transcoder.duration_secs(),
            audio: transcoder.audio().map(|audio| AudioInput {
                codec: audio.input_stream().codec_name(),
                decoder: audio.decoder().name(),
                encoder: audio.encoder().name(),
                channels: audio.decoder().channels(),
                channel_layout: audio.decoder().channel_layout_name(),
                sample_rate: audio.decoder().sample_rate(),
                time_scale: audio.output_stream().time_base().den(),
            }),
            video: VideoInput {
                codec: transcoder.video_stream().codec_name(),
                encoder: transcoder.video_encoder().name(),
                decoder: transcoder.video_decoder().name(),
                // NOTE(review): stored as [den, num]; confirm this order is intended.
                frame_rate_rat: transcoder
                    .avg_frame_rate()
                    .map(|x| [x.den() as u32, x.num() as u32]),
                frame_rate: transcoder.avg_frame_rate().map(|x| x.as_f64()),
                width: transcoder.video_decoder().width() as u32,
                height: transcoder.video_decoder().height() as u32,
                time_scale: transcoder.video_stream_output().time_base().den(),
            },
        }))
        .unwrap();

    let has_audio = transcoder.has_audio();
    loop {
        if transcoder.transcode()? {
            // Serve every request that arrived since the previous step.
            while let Ok(msg) = channel.try_recv() {
                match msg {
                    Message::Seek(segment) => {
                        channel.send_blocking(Message::Seeking).unwrap();
                        let stream = Some(transcoder.video_stream().index());
                        transcoder.seek(segment, stream)?;
                    }
                    Message::Stop => {
                        transcoder.stop();
                        channel.send_blocking(Message::Stopped).unwrap();
                        return Ok(());
                    }
                    _ => {}
                };
            }

            // A newly written video segment is ready once its audio counterpart
            // exists (or right away when there is no audio).
            let video_now = transcoder.last_written_segment();
            if video_last_segment != video_now {
                if let Some(segment) = video_now {
                    video_segments.insert(segment);
                    if !has_audio || audio_segments.contains(&segment) {
                        channel
                            .send_blocking(Message::SegmentReady(segment))
                            .unwrap();
                        segments.insert(segment);
                    }
                }
                video_last_segment = video_now;
            }

            // A newly written audio segment is ready once its video counterpart exists.
            let audio_now = transcoder.last_written_audio_segment();
            if audio_last_segment != audio_now {
                if let Some(segment) = audio_now {
                    audio_segments.insert(segment);
                    if video_segments.contains(&segment) {
                        channel
                            .send_blocking(Message::SegmentReady(segment))
                            .unwrap();
                        segments.insert(segment);
                    }
                }
                audio_last_segment = audio_now;
            }
        } else {
            println!("EOF, seeking to missed parts");
            // Without a single written segment there is nothing to go back to;
            // report that as a failure instead of panicking the thread, which
            // would leave the manager without any terminal message.
            let latest = transcoder.latest_written_segment().ok_or_else(|| {
                String::from("reached the end of the input before any segment was written")
            })?;
            match (0..=latest).find(|si| !segments.contains(si)) {
                Some(si) => {
                    println!("Seeking to segment {}", si);
                    let stream = Some(transcoder.video_stream().index());
                    transcoder.seek(si, stream)?;
                }
                // Every segment up to the latest one has been announced.
                None => break,
            }
        }
    }

    transcoder.finish()?;
    channel
        .send_blocking(Message::SegmentReady(transcoder.segment()))
        .unwrap();
    let duration = transcoder.latest_frame_secs();
    let segment_count = transcoder.segment();
    transcoder.stop();
    channel
        .send_blocking(Message::Finished(duration, segment_count))
        .unwrap();
    Ok(())
}
impl TranscoderManager {
pub fn start(
&mut self,
input: String,
audio: Option<i32>,
video: Option<i32>,
) -> Result<Uuid, std::io::Error> {
let uuid = Uuid::new_v4();
let command = TranscoderCommand {
input: input.clone(),
output: format!("/tmp/transotf/{}/", uuid),
audio,
video,
};
let (thread_channel, own_channel) = new_channel();
let thread = {
let command = command.clone();
Builder::new()
.name(format!("t#{}", uuid))
.spawn(|| transcoder_thread_main(command, thread_channel))?
};
let instance = TranscoderInstance {
id: uuid,
command,
state: RwLock::new({
let mut it = TranscoderInstanceState::default();
it.thread = Some(thread);
it
}),
channel: own_channel,
events: RwLock::new(TranscoderEvents::new()),
status: Default::default(),
};
let instance = Arc::new(instance);
self.instances.insert(uuid, instance.clone());
task::spawn(async move {
loop {
let fut = instance.channel.recv();
if let Ok(x) = fut.await {
println!("[{}] {:?}", uuid, &x);
match x {
Message::Finished(duration, segment_count) => {
{
let mut status = instance.status.write().await;
status.state = TranscoderState::Finished;
status.segment_count = segment_count;
status.duration_secs = duration;
}
{
let mut state = instance.state.write().await;
if let Some(join_handle) =
std::mem::replace(&mut state.thread, None)
{
join_handle.join().unwrap();
}
}
break;
}
Message::Stopped => {
let mut status = instance.status.write().await;
status.state = TranscoderState::Stopped
}
Message::Failure(error) => {
let mut status = instance.status.write().await;
status.state = TranscoderState::Failed;
status.failure_reason = Some(error);
}
Message::Info(info) => {
{
let mut state = instance.state.write().await;
state.info = info.clone()
}
{
let mut status = instance.status.write().await;
status.audio = info.audio;
status.video = info.video;
status.segment_count =
(info.duration_secs / SECONDS_PER_SEGMENT as f64).ceil() as u32;
status.duration_secs = info.duration_secs
}
}
Message::Seeking => {
let mut status = instance.status.write().await;
status.state = TranscoderState::Seeking
}
Message::SegmentReady(segment) => {
let mut state = instance.state.write().await;
if segment == 0 {
state.init_written = true;
instance.events.write().await.init_queue.trigger()
}
let current_segment = min(segment + 1, state.info.segments() - 1);
{
let mut status = instance.status.write().await;
status.init_written = true;
status.current_segment = current_segment;
status.state = TranscoderState::Transcoding;
if state.segments.insert(segment) {
status.segments.push(segment);
}
}
instance
.events
.write()
.await
.segment_queue
.get_mut(&segment)
.map(|x| x.trigger());
let mut skip_ahead = 1 + segment;
while state.info.segments() > skip_ahead
&& state.segments.contains(&skip_ahead)
{
skip_ahead += 1;
}
if skip_ahead > (segment + 5) {
instance.seek(skip_ahead).await;
}
}
_ => {}
}
} else {
return;
}
}
});
Ok(uuid)
}
pub fn get(&self, id: Uuid) -> Option<Arc<TranscoderInstance>> {
self.instances.get(&id).cloned()
}
}