|
|
|
use crate::av::xcoder::XCoder;
|
|
|
|
use crate::av::Rational;
|
|
|
|
use crate::transcoder::{Transcoder, SECONDS_PER_SEGMENT};
|
|
|
|
use async_std::channel::{bounded, Receiver, RecvError, SendError, Sender, TryRecvError};
|
|
|
|
use async_std::sync::{Arc, RwLock, RwLockReadGuard};
|
|
|
|
use async_std::task;
|
|
|
|
use futures::channel::oneshot::{
|
|
|
|
channel as one_shot, Receiver as OneShotReceiver, Sender as OneShotSender,
|
|
|
|
};
|
|
|
|
use serde::Serialize;
|
|
|
|
use std::cmp::min;
|
|
|
|
use std::collections::{HashMap, HashSet};
|
|
|
|
use std::option::Option::Some;
|
|
|
|
use std::thread::Builder;
|
|
|
|
use std::thread::JoinHandle;
|
|
|
|
use uuid::Uuid;
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
struct OneShotSet(Vec<OneShotSender<()>>);
|
|
|
|
|
|
|
|
impl OneShotSet {
|
|
|
|
fn add(&mut self) -> OneShotReceiver<()> {
|
|
|
|
let (s, r) = one_shot::<()>();
|
|
|
|
self.0.push(s);
|
|
|
|
|
|
|
|
r
|
|
|
|
}
|
|
|
|
|
|
|
|
fn trigger(&mut self) {
|
|
|
|
while let Some(sender) = self.0.pop() {
|
|
|
|
sender.send(()).unwrap()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn new() -> OneShotSet {
|
|
|
|
OneShotSet(vec![])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
pub struct TranscoderEvents {
|
|
|
|
init_queue: OneShotSet,
|
|
|
|
segment_queue: HashMap<u32, OneShotSet>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TranscoderEvents {
|
|
|
|
pub fn new() -> TranscoderEvents {
|
|
|
|
Default::default()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub struct TranscoderInstance {
|
|
|
|
id: Uuid,
|
|
|
|
channel: Channel<Message>,
|
|
|
|
state: RwLock<TranscoderInstanceState>,
|
|
|
|
command: TranscoderCommand,
|
|
|
|
events: RwLock<TranscoderEvents>,
|
|
|
|
status: RwLock<TranscoderStatus>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TranscoderInstance {
|
|
|
|
pub async fn state<T>(&self, block: fn(RwLockReadGuard<TranscoderInstanceState>) -> T) -> T {
|
|
|
|
block(self.state.read().await)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn id(&self) -> Uuid {
|
|
|
|
self.id
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn seek(&self, segment: u32) {
|
|
|
|
self.channel.send(Message::Seek(segment)).await.unwrap()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn wait_for_segment(&self, segment: u32) -> bool {
|
|
|
|
{
|
|
|
|
let state = self.state.read().await;
|
|
|
|
if state.info.segments() < segment {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
|
|
|
|
if state.segments.contains(&segment) {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
if state.state == TranscoderState::Finished {
|
|
|
|
return false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
println!("Waiting in segment queue for segment {}", segment);
|
|
|
|
let r = {
|
|
|
|
self.events
|
|
|
|
.write()
|
|
|
|
.await
|
|
|
|
.segment_queue
|
|
|
|
.entry(segment)
|
|
|
|
.or_insert_with(|| OneShotSet::new())
|
|
|
|
.add()
|
|
|
|
};
|
|
|
|
|
|
|
|
r.await.is_ok()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn wait_for_init(&self) -> bool {
|
|
|
|
if self.state.read().await.init_written {
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
let r = { self.events.write().await.init_queue.add() };
|
|
|
|
|
|
|
|
r.await.is_ok()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn status(&self) -> TranscoderStatus {
|
|
|
|
self.status.read().await.clone()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Serialize, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
|
|
|
|
pub enum TranscoderState {
|
|
|
|
None,
|
|
|
|
Seeking,
|
|
|
|
Transcoding,
|
|
|
|
Failed,
|
|
|
|
Stopped,
|
|
|
|
Finished,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for TranscoderState {
|
|
|
|
fn default() -> Self {
|
|
|
|
TranscoderState::None
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Serialize, Debug, Clone, Default)]
|
|
|
|
pub struct VideoInput {
|
|
|
|
pub codec: String,
|
|
|
|
pub encoder: String,
|
|
|
|
pub decoder: String,
|
|
|
|
pub frame_rate_rat: Option<[u32; 2]>,
|
|
|
|
pub frame_rate: Option<f64>,
|
|
|
|
pub width: u32,
|
|
|
|
pub height: u32,
|
|
|
|
pub time_scale: i32,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Serialize, Debug, Clone, Default)]
|
|
|
|
pub struct AudioInput {
|
|
|
|
pub codec: String,
|
|
|
|
pub encoder: String,
|
|
|
|
pub decoder: String,
|
|
|
|
pub channels: i32,
|
|
|
|
pub channel_layout: String,
|
|
|
|
pub sample_rate: i32,
|
|
|
|
pub time_scale: i32,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Serialize, Debug, Clone, Default)]
|
|
|
|
pub struct TranscoderStatus {
|
|
|
|
pub state: TranscoderState,
|
|
|
|
pub failure_reason: Option<String>,
|
|
|
|
pub segments: Vec<u32>,
|
|
|
|
pub init_written: bool,
|
|
|
|
pub duration_secs: f64,
|
|
|
|
pub current_segment: u32,
|
|
|
|
pub segment_count: u32,
|
|
|
|
pub video: VideoInput,
|
|
|
|
pub audio: Option<AudioInput>,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
pub struct TranscoderInstanceState {
|
|
|
|
thread: Option<JoinHandle<()>>,
|
|
|
|
info: TranscoderInfo,
|
|
|
|
segments: HashSet<u32>,
|
|
|
|
init_written: bool,
|
|
|
|
state: TranscoderState,
|
|
|
|
failure_reason: Option<String>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TranscoderInstanceState {
|
|
|
|
#[inline]
|
|
|
|
pub fn info(&self) -> &TranscoderInfo {
|
|
|
|
&self.info
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
pub fn segments(&self) -> &HashSet<u32> {
|
|
|
|
&self.segments
|
|
|
|
}
|
|
|
|
|
|
|
|
#[inline]
|
|
|
|
pub fn init_written(&self) -> bool {
|
|
|
|
self.init_written
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Default)]
|
|
|
|
pub struct TranscoderManager {
|
|
|
|
instances: HashMap<Uuid, Arc<TranscoderInstance>>,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct Channel<T> {
|
|
|
|
receiver: Receiver<T>,
|
|
|
|
sender: Sender<T>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<T> Channel<T> {
|
|
|
|
async fn send(&self, msg: T) -> Result<(), SendError<T>> {
|
|
|
|
self.sender.send(msg).await
|
|
|
|
}
|
|
|
|
|
|
|
|
fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
|
|
|
|
task::block_on(self.sender.send(msg))
|
|
|
|
}
|
|
|
|
|
|
|
|
async fn recv(&self) -> Result<T, RecvError> {
|
|
|
|
self.receiver.recv().await
|
|
|
|
}
|
|
|
|
|
|
|
|
fn recv_blocking(&self) -> Result<T, RecvError> {
|
|
|
|
task::block_on(self.receiver.recv())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn try_recv(&self) -> Result<T, TryRecvError> {
|
|
|
|
self.receiver.try_recv()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn new_channel() -> (Channel<Message>, Channel<Message>) {
|
|
|
|
let (sender_a, receiver_a) = bounded(25);
|
|
|
|
let (sender_b, receiver_b) = bounded(25);
|
|
|
|
|
|
|
|
(
|
|
|
|
Channel {
|
|
|
|
sender: sender_a,
|
|
|
|
receiver: receiver_b,
|
|
|
|
},
|
|
|
|
Channel {
|
|
|
|
sender: sender_b,
|
|
|
|
receiver: receiver_a,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone)]
|
|
|
|
pub struct TranscoderCommand {
|
|
|
|
input: String,
|
|
|
|
output: String,
|
|
|
|
audio: Option<i32>,
|
|
|
|
video: Option<i32>,
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Default)]
|
|
|
|
pub struct TranscoderInfo {
|
|
|
|
pub duration_secs: f64,
|
|
|
|
pub audio: Option<AudioInput>,
|
|
|
|
pub video: VideoInput,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TranscoderInfo {
|
|
|
|
fn segments(&self) -> u32 {
|
|
|
|
(self.duration_secs / SECONDS_PER_SEGMENT as f64).ceil() as u32
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum Message {
|
|
|
|
Info(TranscoderInfo),
|
|
|
|
SegmentReady(u32),
|
|
|
|
Seek(u32),
|
|
|
|
Seeking,
|
|
|
|
Failure(String),
|
|
|
|
Stop,
|
|
|
|
Stopped,
|
|
|
|
Finished(f64, u32),
|
|
|
|
}
|
|
|
|
|
|
|
|
fn transcoder_thread_main(command: TranscoderCommand, channel: Channel<Message>) {
|
|
|
|
if let Err(err) = transcoder_thread(command, &channel) {
|
|
|
|
channel.send_blocking(Message::Failure(err)).unwrap();
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn transcoder_thread(command: TranscoderCommand, channel: &Channel<Message>) -> Result<(), String> {
|
|
|
|
let transcoder = Transcoder::create(
|
|
|
|
&command.input,
|
|
|
|
&command.output,
|
|
|
|
command.video,
|
|
|
|
command.audio,
|
|
|
|
);
|
|
|
|
|
|
|
|
let mut transcoder = match transcoder {
|
|
|
|
Ok(t) => t,
|
|
|
|
Err(e) => {
|
|
|
|
channel.send_blocking(Message::Failure(e)).unwrap();
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
let mut segments = HashSet::new();
|
|
|
|
let mut video_segments = HashSet::new();
|
|
|
|
let mut video_last_segment = None;
|
|
|
|
let mut audio_segments = HashSet::new();
|
|
|
|
let mut audio_last_segment = None;
|
|
|
|
transcoder.open()?;
|
|
|
|
|
|
|
|
channel
|
|
|
|
.send_blocking(Message::Info(TranscoderInfo {
|
|
|
|
duration_secs: transcoder.duration_secs(),
|
|
|
|
audio: transcoder.audio().map(|audio| AudioInput {
|
|
|
|
codec: audio.input_stream().codec_name(),
|
|
|
|
decoder: audio.decoder().name(),
|
|
|
|
encoder: audio.encoder().name(),
|
|
|
|
channels: audio.decoder().channels(),
|
|
|
|
channel_layout: audio.decoder().channel_layout_name(),
|
|
|
|
sample_rate: audio.decoder().sample_rate(),
|
|
|
|
time_scale: audio.output_stream().time_base().den(),
|
|
|
|
}),
|
|
|
|
video: VideoInput {
|
|
|
|
codec: transcoder.video_stream().codec_name(),
|
|
|
|
encoder: transcoder.video_encoder().name(),
|
|
|
|
decoder: transcoder.video_decoder().name(),
|
|
|
|
frame_rate_rat: transcoder
|
|
|
|
.avg_frame_rate()
|
|
|
|
.map(|x| [x.den() as u32, x.num() as u32]),
|
|
|
|
frame_rate: transcoder.avg_frame_rate().map(|x| x.as_f64()),
|
|
|
|
width: transcoder.video_decoder().width() as u32,
|
|
|
|
height: transcoder.video_decoder().height() as u32,
|
|
|
|
time_scale: transcoder.video_stream_output().time_base().den(),
|
|
|
|
},
|
|
|
|
}))
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
let mut completed = false;
|
|
|
|
let has_audio = transcoder.has_audio();
|
|
|
|
|
|
|
|
while !completed {
|
|
|
|
if transcoder.transcode()? {
|
|
|
|
while let Ok(msg) = channel.try_recv() {
|
|
|
|
match msg {
|
|
|
|
Message::Seek(segment) => {
|
|
|
|
channel.send_blocking(Message::Seeking).unwrap();
|
|
|
|
let stream = Some(transcoder.video_stream().index());
|
|
|
|
transcoder.seek(segment, stream)?;
|
|
|
|
}
|
|
|
|
Message::Stop => {
|
|
|
|
transcoder.stop();
|
|
|
|
channel.send_blocking(Message::Stopped).unwrap();
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => {}
|
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
if video_last_segment != transcoder.last_written_segment() {
|
|
|
|
if let Some(segment) = transcoder.last_written_segment() {
|
|
|
|
video_segments.insert(segment);
|
|
|
|
if !has_audio || audio_segments.contains(&segment) {
|
|
|
|
channel
|
|
|
|
.send_blocking(Message::SegmentReady(segment))
|
|
|
|
.unwrap();
|
|
|
|
segments.insert(segment);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
video_last_segment = transcoder.last_written_segment();
|
|
|
|
}
|
|
|
|
|
|
|
|
if audio_last_segment != transcoder.last_written_audio_segment() {
|
|
|
|
if let Some(segment) = transcoder.last_written_audio_segment() {
|
|
|
|
audio_segments.insert(segment);
|
|
|
|
if video_segments.contains(&segment) {
|
|
|
|
channel
|
|
|
|
.send_blocking(Message::SegmentReady(segment))
|
|
|
|
.unwrap();
|
|
|
|
segments.insert(segment);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
audio_last_segment = transcoder.last_written_audio_segment();
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
println!("EOF, seeking to missed parts");
|
|
|
|
let mut si: u32 = 0;
|
|
|
|
while si <= transcoder.latest_written_segment().unwrap() as u32 {
|
|
|
|
if segments.contains(&si) {
|
|
|
|
si += 1;
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
|
|
|
|
println!("Seeking to segment {}", si);
|
|
|
|
transcoder.seek(si, Some(transcoder.video_stream().index()))?;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
if si > transcoder.latest_written_segment().unwrap() {
|
|
|
|
completed = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
transcoder.finish()?;
|
|
|
|
channel
|
|
|
|
.send_blocking(Message::SegmentReady(transcoder.segment()))
|
|
|
|
.unwrap();
|
|
|
|
let duration = transcoder.latest_frame_secs();
|
|
|
|
let segment_count = transcoder.segment();
|
|
|
|
transcoder.stop();
|
|
|
|
channel
|
|
|
|
.send_blocking(Message::Finished(duration, segment_count))
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
impl TranscoderManager {
|
|
|
|
pub fn start(
|
|
|
|
&mut self,
|
|
|
|
input: String,
|
|
|
|
audio: Option<i32>,
|
|
|
|
video: Option<i32>,
|
|
|
|
) -> Result<Uuid, std::io::Error> {
|
|
|
|
let uuid = Uuid::new_v4();
|
|
|
|
let command = TranscoderCommand {
|
|
|
|
input: input.clone(),
|
|
|
|
output: format!("/tmp/transotf/{}/", uuid),
|
|
|
|
audio,
|
|
|
|
video,
|
|
|
|
};
|
|
|
|
let (thread_channel, own_channel) = new_channel();
|
|
|
|
let thread = {
|
|
|
|
let command = command.clone();
|
|
|
|
Builder::new()
|
|
|
|
.name(format!("t#{}", uuid))
|
|
|
|
.spawn(|| transcoder_thread_main(command, thread_channel))?
|
|
|
|
};
|
|
|
|
|
|
|
|
let instance = TranscoderInstance {
|
|
|
|
id: uuid,
|
|
|
|
command,
|
|
|
|
state: RwLock::new({
|
|
|
|
let mut it = TranscoderInstanceState::default();
|
|
|
|
it.thread = Some(thread);
|
|
|
|
it
|
|
|
|
}),
|
|
|
|
channel: own_channel,
|
|
|
|
events: RwLock::new(TranscoderEvents::new()),
|
|
|
|
status: Default::default(),
|
|
|
|
};
|
|
|
|
|
|
|
|
let instance = Arc::new(instance);
|
|
|
|
self.instances.insert(uuid, instance.clone());
|
|
|
|
|
|
|
|
task::spawn(async move {
|
|
|
|
loop {
|
|
|
|
let fut = instance.channel.recv();
|
|
|
|
if let Ok(x) = fut.await {
|
|
|
|
println!("[{}] {:?}", uuid, &x);
|
|
|
|
match x {
|
|
|
|
Message::Finished(duration, segment_count) => {
|
|
|
|
{
|
|
|
|
let mut status = instance.status.write().await;
|
|
|
|
status.state = TranscoderState::Finished;
|
|
|
|
status.segment_count = segment_count;
|
|
|
|
status.duration_secs = duration;
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut state = instance.state.write().await;
|
|
|
|
if let Some(join_handle) =
|
|
|
|
std::mem::replace(&mut state.thread, None)
|
|
|
|
{
|
|
|
|
join_handle.join().unwrap();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
|
|
|
|
Message::Stopped => {
|
|
|
|
let mut status = instance.status.write().await;
|
|
|
|
status.state = TranscoderState::Stopped
|
|
|
|
}
|
|
|
|
|
|
|
|
Message::Failure(error) => {
|
|
|
|
let mut status = instance.status.write().await;
|
|
|
|
status.state = TranscoderState::Failed;
|
|
|
|
status.failure_reason = Some(error);
|
|
|
|
}
|
|
|
|
|
|
|
|
Message::Info(info) => {
|
|
|
|
{
|
|
|
|
let mut state = instance.state.write().await;
|
|
|
|
state.info = info.clone()
|
|
|
|
}
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut status = instance.status.write().await;
|
|
|
|
status.audio = info.audio;
|
|
|
|
status.video = info.video;
|
|
|
|
status.segment_count =
|
|
|
|
(info.duration_secs / SECONDS_PER_SEGMENT as f64).ceil() as u32;
|
|
|
|
status.duration_secs = info.duration_secs
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
Message::Seeking => {
|
|
|
|
let mut status = instance.status.write().await;
|
|
|
|
status.state = TranscoderState::Seeking
|
|
|
|
}
|
|
|
|
|
|
|
|
Message::SegmentReady(segment) => {
|
|
|
|
let mut state = instance.state.write().await;
|
|
|
|
if segment == 0 {
|
|
|
|
state.init_written = true;
|
|
|
|
instance.events.write().await.init_queue.trigger()
|
|
|
|
}
|
|
|
|
|
|
|
|
let current_segment = min(segment + 1, state.info.segments() - 1);
|
|
|
|
|
|
|
|
{
|
|
|
|
let mut status = instance.status.write().await;
|
|
|
|
status.init_written = true;
|
|
|
|
status.current_segment = current_segment;
|
|
|
|
status.state = TranscoderState::Transcoding;
|
|
|
|
|
|
|
|
if state.segments.insert(segment) {
|
|
|
|
status.segments.push(segment);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
instance
|
|
|
|
.events
|
|
|
|
.write()
|
|
|
|
.await
|
|
|
|
.segment_queue
|
|
|
|
.get_mut(&segment)
|
|
|
|
.map(|x| x.trigger());
|
|
|
|
|
|
|
|
let mut skip_ahead = 1 + segment;
|
|
|
|
while state.info.segments() > skip_ahead
|
|
|
|
&& state.segments.contains(&skip_ahead)
|
|
|
|
{
|
|
|
|
skip_ahead += 1;
|
|
|
|
}
|
|
|
|
|
|
|
|
if skip_ahead > (segment + 5) {
|
|
|
|
instance.seek(skip_ahead).await;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
_ => {}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
|
|
|
|
Ok(uuid)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn get(&self, id: Uuid) -> Option<Arc<TranscoderInstance>> {
|
|
|
|
self.instances.get(&id).cloned()
|
|
|
|
}
|
|
|
|
}
|