undefined progress

master
eater 3 years ago
parent 2684e58bbf
commit 2b38afb346
Signed by: eater
GPG Key ID: AD2560A0F84F0759

1458
Cargo.lock generated

File diff suppressed because it is too large Load Diff

@ -6,19 +6,31 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
num-rational = "0.3.0"
byteorder = "1.3.4"
tide = "0.11.0"
num-rational = "0.4.0"
byteorder = "1.4.3"
tide = "0.16.0"
lazy_static = "1.4.0"
async-std = { version = "1.6.2", features = ["attributes"] }
uuid = { version = "0.4", features = ["serde", "v4"] }
http-types = "2.2.1"
uuid = { version = "0.8.2", features = ["serde", "v4"] }
http-types = "2.12.0"
futures = "0.3.17"
serde = { version = "1.0.130", features = ["derive"] }
futures-timer = { version = "3.0.2", default-features = false, features = [] }
async-std = { version = "1.10.0", features = ["attributes"] }
askama = "0.10.5"
# Fix for dependencies that do not matter
[dependencies.ffmpeg-sys-next]
version = "4.3.0"
version = "4.4.0"
features = ["avcodec", "avdevice", "avfilter", "avformat", "swresample", "swscale"]
# Fix for not depending on WASM shit
#[patch.crates-io]
#async-std = { path = "/home/eater/shit/async-std" }
#[dependencies.ffmpeg-next]
#version = "4.3.0"
#default-features = false

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

@ -0,0 +1,67 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DASH Test</title>
<link href="//vjs.zencdn.net/7.8.2/video-js.min.css" rel="stylesheet">
<script src="https://ajax.googleapis.com/ajax/libs/shaka-player/3.0.1/shaka-player.ui.js"></script>
<link rel="stylesheet" href="https://ajax.googleapis.com/ajax/libs/shaka-player/3.0.1/controls.css">
</head>
<body>
<video id="shaka" width="1280" height="720" controls></video>
<script type="application/javascript">
const manifestUri = 'manifest.mpd';
function initApp() {
// Install built-in polyfills to patch browser incompatibilities.
shaka.polyfill.installAll();
// Check to see if the browser supports the basic APIs Shaka needs.
if (shaka.Player.isBrowserSupported()) {
// Everything looks good!
initPlayer();
} else {
// This browser does not have the minimum set of APIs we need.
console.error('Browser not supported!');
}
}
async function initPlayer() {
// Create a Player instance.
const video = document.getElementById('shaka');
const player = new shaka.Player(video);
// Attach player to the window to make it easy to access in the JS console.
window.shakaPlayer = player;
// Listen for error events.
player.addEventListener('error', onErrorEvent);
// Try to load a manifest.
// This is an asynchronous process.
try {
await player.load(manifestUri);
// This runs if the asynchronous load is successful.
console.log('The video has now been loaded!');
} catch (e) {
// onError is executed if the asynchronous load fails.
onError(e);
}
}
function onErrorEvent(event) {
// Extract the shaka.util.Error object from the event.
onError(event.detail);
}
function onError(error) {
// Log the error.
console.error('Error code', error.code, 'object', error);
}
document.addEventListener('DOMContentLoaded', initApp);
</script>
</body>
</html>

@ -39,15 +39,25 @@ impl<T> Format<T> {
}
impl<T> Format<T> {
pub fn seek(&self, seconds: u32, stream: Option<&Stream>) -> Result<(), String> {
pub fn seek(
&self,
seconds: u32,
stream_index: Option<i32>,
backwards: bool,
) -> Result<(), String> {
let seconds: i32 = seconds as i32;
let time_base = stream.map(|s| s.time_base().den()).unwrap_or(AV_TIME_BASE);
let time_base = stream_index
.map(|si| self.stream(None, Some(si), None))
.unwrap_or(Ok(None))?
.map(|s| s.time_base().den())
.unwrap_or(AV_TIME_BASE);
verify_response("Failed to seek", unsafe {
av_seek_frame(
self.ctx,
stream.map(|s| s.index()).unwrap_or(-1),
stream_index.unwrap_or(-1),
(seconds * time_base) as i64,
AVSEEK_FLAG_BACKWARD,
if backwards { AVSEEK_FLAG_BACKWARD } else { 0 },
)
})?;
@ -56,7 +66,7 @@ impl<T> Format<T> {
pub fn stream(
&self,
stream_type: AVMediaType,
stream_type: Option<AVMediaType>,
index: Option<i32>,
related: Option<i32>,
) -> Result<Option<Stream>, String> {
@ -66,16 +76,20 @@ impl<T> Format<T> {
let index = if let Some(index) = index {
index
} else {
} else if let Some(stream_type) = stream_type {
verify_response(
"Failed finding best stream",
av_find_best_stream(ctx, stream_type, -1, related.unwrap_or(-1), null_mut(), 0),
)?
} else {
0
};
for _ in 0..(*ctx).nb_streams {
let curr_stream = *stream;
if (*(*curr_stream).codecpar).codec_type == stream_type {
if stream_type.is_none()
|| Some((*(*curr_stream).codecpar).codec_type) == stream_type
{
if (*curr_stream).index == index {
return Ok(Some(Stream(curr_stream)));
}
@ -131,7 +145,8 @@ impl<T> Format<T> {
pub fn write_packet_null(&self) -> Result<(), String> {
verify_response("failed to write to output", unsafe {
av_write_frame(self.as_mut_ptr(), null_mut())
})?;
})
.unwrap();
Ok(())
}
@ -147,7 +162,8 @@ impl<T> Format<T> {
pub fn write_packet(&self, packet: &Packet) -> Result<(), String> {
verify_response("failed to write to output", unsafe {
av_write_frame(self.as_mut_ptr(), packet.as_mut_ptr())
})?;
})
.unwrap();
Ok(())
}

@ -1,10 +1,9 @@
use crate::av_err2str;
use ffmpeg_sys_next::AVCodecID::AV_CODEC_ID_HEVC;
use ffmpeg_sys_next::{
av_codec_is_decoder, av_codec_is_encoder, av_codec_next, av_inv_q, av_log_set_level,
av_register_all, avcodec_alloc_context3, avcodec_find_decoder_by_name,
avcodec_find_encoder_by_name, AVCodec, AVCodecContext, AVCodecID, AVRational,
AV_CODEC_CAP_HARDWARE,
av_codec_is_decoder, av_codec_is_encoder, av_codec_next, av_inv_q, av_register_all,
avcodec_alloc_context3, avcodec_find_decoder_by_name, avcodec_find_encoder_by_name, AVCodec,
AVCodecContext, AVCodecID, AVRational, AV_CODEC_CAP_HARDWARE,
};
use num_rational::Ratio;
use std::any::type_name;
@ -214,7 +213,7 @@ fn remove_if_exists(list: &mut Vec<FfmpegCodec>, needle: &str, to_remove: Vec<&s
pub fn init() {
unsafe {
av_register_all();
av_log_set_level(-8);
// av_log_set_level(-8);
}
}
@ -271,6 +270,16 @@ pub trait Rational {
}
fn simplify(&self) -> Self;
#[inline]
fn as_f64(&self) -> f64 {
self.num() as f64 / self.den() as f64
}
#[inline]
fn as_f32(&self) -> f32 {
self.num() as f32 / self.den() as f32
}
}
impl Rational for Ratio<i32> {

@ -7,8 +7,9 @@ use crate::av::encoder_selector::EncoderSelector;
use crate::av::format::Format;
use crate::av::xcoder::XCoder;
use crate::av::{get_best_decoder, get_best_encoder, Rational};
use ffmpeg_sys_next::{av_guess_frame_rate, AVCodecID, AVRational, AVStream};
use ffmpeg_sys_next::{av_guess_frame_rate, avcodec_get_name, AVCodecID, AVRational, AVStream};
use num_rational::Ratio;
use std::ffi::CStr;
use std::ptr::null_mut;
pub struct Stream(pub *mut AVStream);
@ -63,6 +64,15 @@ impl Stream {
unsafe { (*(*self.0).codec).codec_id }
}
pub fn codec_name(&self) -> String {
unsafe {
CStr::from_ptr(avcodec_get_name(self.codec()))
.to_str()
.unwrap()
.to_string()
}
}
pub fn params(&self) -> CodecParameters {
unsafe { CodecParameters((*self.0).codecpar) }
}
@ -82,11 +92,21 @@ impl Stream {
unsafe { (*self.as_ptr()).sample_aspect_ratio }
}
#[inline]
pub fn duration(&self) -> i64 {
unsafe { (*self.as_ptr()).duration }
}
pub fn frame_count(&self) -> i64 {
unsafe { (*self.as_ptr()).nb_frames }
}
#[inline]
pub fn set_sample_aspect_ratio(&self, sample_aspect_ratio: impl Rational) {
unsafe { (*self.as_mut_ptr()).sample_aspect_ratio = sample_aspect_ratio.to_av() }
}
#[inline]
pub fn avg_frame_rate<T>(&self, fmt: &Format<T>) -> Option<impl Rational> {
Some(unsafe { av_guess_frame_rate(fmt.as_mut_ptr(), self.as_mut_ptr(), null_mut()) })
}

@ -4,7 +4,8 @@ use crate::av::{verify_response, Rational};
use crate::av_err2str;
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_NONE;
use ffmpeg_sys_next::{
av_malloc, avcodec_flush_buffers, avcodec_open2, AVCodecContext, AVCodecID, AVColorPrimaries,
av_free, av_get_channel_layout_string, av_get_pix_fmt_name, av_get_sample_fmt_name, av_malloc,
avcodec_flush_buffers, avcodec_open2, AVCodecContext, AVCodecID, AVColorPrimaries,
AVColorRange, AVColorTransferCharacteristic, AVPixelFormat, AVRational, AVSampleFormat,
AVERROR, AVERROR_EOF, EAGAIN,
};
@ -72,6 +73,16 @@ pub trait XCoder: Sized {
unsafe { (*self.as_ptr()).pix_fmt }
}
#[inline]
fn pixel_format_name(&self) -> String {
unsafe {
CStr::from_ptr(av_get_pix_fmt_name(self.pixel_format()))
.to_str()
.unwrap()
.to_string()
}
}
#[inline]
fn set_pixel_format(&self, pixel_format: AVPixelFormat) {
unsafe { (*self.as_mut_ptr()).pix_fmt = pixel_format }
@ -164,11 +175,32 @@ pub trait XCoder: Sized {
unsafe { (*self.as_mut_ptr()).channel_layout = channel_layout }
}
fn channel_layout_name(&self) -> String {
unsafe {
let str = av_malloc(1024).cast();
av_get_channel_layout_string(str, 1024, self.channels(), self.channel_layout());
let string = CStr::from_ptr(str).to_str().unwrap().to_string();
av_free(str.cast());
string
}
}
#[inline]
fn sample_format(&self) -> AVSampleFormat {
unsafe { (*self.as_ptr()).sample_fmt }
}
#[inline]
fn sample_format_name(&self) -> String {
unsafe {
CStr::from_ptr(av_get_sample_fmt_name(self.sample_format()))
.to_str()
.unwrap()
.to_string()
}
}
#[inline]
fn set_sample_format(&self, sample_format: AVSampleFormat) {
unsafe { (*self.as_mut_ptr()).sample_fmt = sample_format }

@ -2,17 +2,20 @@
use crate::av::init;
use crate::transcoder_manager::{TranscoderInstance, TranscoderManager};
use askama::Template;
use async_std::fs::File;
use async_std::sync::{Arc, RwLock};
use ffmpeg_sys_next::{av_make_error_string, av_malloc, av_version_info};
use http_types::Error as HttpError;
use lazy_static::lazy_static;
use serde::Serialize;
use std::collections::HashMap;
use std::ffi::{CStr, CString};
use std::ops::Deref;
use std::option::Option::Some;
use std::os::raw::{c_char, c_int};
use std::str::FromStr;
use tide::{Request, StatusCode};
use tide::{Body, Request, Response, StatusCode};
use uuid::Uuid;
pub mod av;
@ -20,8 +23,6 @@ pub mod transcoder;
pub mod transcoder_manager;
pub mod utils;
// const INPUT_PATH: &str = "/tank/ephemeral/anime/series/Witch Hunter Robin - 2ndfire/[2ndfire]Witch_Hunter_Robin_-_01[91DCE49A].mkv";
const INPUT_PATH: &str = "/tank/ephemeral/anime/series/Brave Witches - Eila/Extras/[Eila] Brave Witches - NCOP [BD 1080p][DB44ED0B].mkv";
const OUTPUT_PATH: &str = "/tmp/transotf";
fn av_err2str(error_code: c_int) -> String {
@ -41,14 +42,27 @@ lazy_static! {
unsafe { CStr::from_ptr(av_version_info()).to_str().unwrap_or("n/a") },
)
};
static ref FAVICON: Vec<u8> = include_bytes!("../resources/favicon.ico").to_vec();
}
#[derive(Default)]
#[derive(Default, Clone)]
struct State {
manager: RwLock<TranscoderManager>,
manager: Arc<RwLock<TranscoderManager>>,
}
async fn create_transcode(req: Request<State>) -> Result<String, HttpError> {
#[derive(Serialize)]
struct CreateDTO {
id: String,
manifest: String,
_player: String,
}
fn build_url(req: &Request<State>, url: String) -> String {
let host = req.header("host").and_then(|x| x.get(0)).unwrap().clone();
format!("http://{}/{}", host, url.trim_start_matches("/"))
}
async fn create_transcode(req: Request<State>) -> Result<Response, HttpError> {
let params = req.query::<HashMap<String, String>>()?;
let target = params.get(&"target".to_string());
let target = if let Some(target) = target {
@ -82,18 +96,22 @@ async fn create_transcode(req: Request<State>) -> Result<String, HttpError> {
)
})?;
Ok(format!("{}", id))
}
let id = id.to_string();
let dto = CreateDTO {
manifest: build_url(&req, format!("/session/{}/manifest.mpd", id)),
_player: build_url(&req, format!("/session/{}/player", id)),
id,
};
async fn get_init(req: Request<State>) -> Result<String, HttpError> {
let _manager = req.state().manager.read().await;
// manager.get()
let mut resp = Response::new(200);
resp.set_body(Body::from_json(&dto)?);
resp.insert_header("Content-Type", "application/json");
Err(HttpError::from_str(StatusCode::NotFound, ":3"))
Ok(resp)
}
async fn get_instance(req: &Request<State>) -> Result<Arc<TranscoderInstance>, HttpError> {
let id = req.param::<String>("id")?;
let id = req.param("id")?;
let id = Uuid::from_str(&id)?;
let manager = req.state().manager.read().await;
if let Some(instance) = manager.get(id) {
@ -101,45 +119,159 @@ async fn get_instance(req: &Request<State>) -> Result<Arc<TranscoderInstance>, H
} else {
Err(HttpError::from_str(
StatusCode::NotFound,
format!("Can't find transcoder with id {}", id),
format!("Can't find session with id {}", id),
))
}
}
async fn get_manifest(req: Request<State>) -> Result<String, HttpError> {
#[derive(Template)]
#[template(path = "manifest.xml")]
struct MPDManifest {
id: Uuid,
has_audio: bool,
duration: f64,
is_vlc: bool,
audio_time_base_den: i32,
video_time_base_den: i32,
}
mod filters {
pub fn iso_duration(secs: &f64) -> ::askama::Result<String> {
let minutes = (secs / 60.0).floor();
let secs = ((secs % 60.0) * 100.0).floor() / 100.0;
let hours = (minutes / 60.0).floor();
let minutes = minutes % 60.0;
return Ok(format!("PT{}H{}M{}S", hours, minutes, secs));
}
}
async fn get_manifest(req: Request<State>) -> Result<Response, HttpError> {
let transcoder_instance = get_instance(&req).await?;
let (duration, segments, init_written) = transcoder_instance
.state(|item| {
(
item.duration(),
item.segments().clone(),
item.init_written(),
)
.clone()
})
.await;
let id = transcoder_instance.id();
let status = transcoder_instance.status().await;
let mut resp = Response::new(200);
resp.insert_header("Content-Type", "application/dash+xml");
let mpd = MPDManifest {
id,
has_audio: status.audio.is_some(),
duration: status.duration_secs,
is_vlc: req
.header("User-Agent")
.map(|x| x.last().to_string())
.map_or(false, |x| x.starts_with("VLC")),
audio_time_base_den: status.audio.map_or(0, |audio| audio.time_scale),
video_time_base_den: status.video.time_scale,
};
resp.set_body(mpd.render().unwrap());
Ok(resp)
}
async fn get_init(req: Request<State>) -> Result<Response, HttpError> {
let transcoder_instance = get_instance(&req).await?;
let type_ = req.param("type").unwrap();
let id = transcoder_instance.id();
if (type_ != "audio" && type_ != "video") || !transcoder_instance.wait_for_init().await {
return Ok(Response::new(StatusCode::NotFound));
}
let mut ok = Response::new(StatusCode::Ok);
ok.insert_header("Content-Type", "video/mp4");
ok.set_body(Body::from_file(format!("{}/{}/{}-init.mp4", OUTPUT_PATH, id, type_)).await?);
Ok(ok)
}
async fn get_segment(req: Request<State>) -> Result<Response, HttpError> {
let transcoder_instance = get_instance(&req).await?;
let type_ = req.param("type").unwrap();
let segment = req.param("nr").unwrap();
let id = transcoder_instance.id();
if !segment.ends_with(".m4s")
|| (type_ != "audio" && type_ != "video")
|| !transcoder_instance.wait_for_init().await
{
return Ok(Response::new(StatusCode::NotFound));
}
let segment = &segment[0..segment.len() - 4];
let segment = if let Ok(segment) = u32::from_str(segment) {
segment
} else {
return Ok(Response::new(StatusCode::NotFound));
};
let status = transcoder_instance.status().await;
if (segment as i64 - 5) > status.current_segment as i64 || segment < status.current_segment {
if !status.segments.contains(&segment) {
transcoder_instance.seek(segment).await;
}
}
if !transcoder_instance.wait_for_segment(segment).await {
return Ok(Response::new(StatusCode::NotFound));
}
let mut ok = Response::new(StatusCode::Ok);
ok.insert_header("Content-Type", "video/mp4");
ok.set_body(
Body::from_file(format!(
"{}/{}/{}-segment-{:0>5}.m4s",
OUTPUT_PATH, id, type_, segment
))
.await?,
);
Ok(ok)
}
async fn get_player(req: Request<State>) -> Result<Response, HttpError> {
get_instance(&req).await?;
let mut resp = Response::new(StatusCode::Ok);
resp.insert_header("Content-Type", "text/html");
resp.set_body(include_str!("../resources/player.html"));
Ok(format!("{} {:?} {}", duration, segments, init_written))
Ok(resp)
}
// async fn get_status(req: Request<State>) -> Result<String, HttpError> {
// let id = req.param::<String>("id")?;
// let id = Uuid::from_str(&id)?;
// let manager = req.state().manager.read().await;
// }
async fn get_favicon(_: Request<State>) -> Result<Response, HttpError> {
let mut resp = Response::new(StatusCode::Ok);
resp.insert_header("Content-Type", "image/x-icon");
resp.set_body(Vec::deref(&FAVICON));
Ok(resp)
}
async fn get_status(req: Request<State>) -> Result<Response, HttpError> {
let instance = get_instance(&req).await?;
let mut resp = Response::new(StatusCode::Ok);
resp.insert_header("Content-Type", "application/json");
resp.set_body(Body::from_json(&instance.status().await)?);
Ok(resp)
}
#[async_std::main]
async fn main() -> std::io::Result<()> {
println!("{}", HOME_PAGE.clone());
init();
let mut app = tide::with_state(State::default());
app.at("/").get(|_| async { Ok(HOME_PAGE.clone()) });
app.at("/favicon.ico").get(get_favicon);
app.at("/transcode").get(create_transcode);
// app.at("/session/:id").get(get_status);
app.at("/session/:id/status.json").get(get_status);
app.at("/session/:id/player").get(get_player);
app.at("/session/:id/manifest").get(get_manifest);
// app.at("/session/:id/{type}/init.mp4");
// app.at("/session/:id/{type}/{nr}.m4s");
app.listen("0:8000").await
app.at("/session/:id/manifest.xml").get(get_manifest);
app.at("/session/:id/manifest.mpd").get(get_manifest);
app.at("/session/:id/:type/init.mp4").get(get_init);
app.at("/session/:id/:type/:nr").get(get_segment);
let listen_fut = app.listen("0:8000");
println!("Listening on 0:8000");
listen_fut.await
}

@ -16,7 +16,7 @@ use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_I;
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_sys_next::{avio_wb32, AVIOContext, AVPixelFormat, AV_TIME_BASE, SWS_BILINEAR};
use num_rational::Ratio;
use std::cmp::min;
use std::cmp::{max, min};
use std::fs::{DirBuilder, File};
use std::io::Write;
use std::ops::Mul;
@ -31,13 +31,17 @@ pub struct AudioTranscoder {
format: Format<Buffer>,
output_path: String,
resampler: Resampler,
segment: u32,
segment: Option<u32>,
last_pts: i64,
current_pts: i64,
seconds_per_segment: u32,
segment_target: Option<u32>,
header_written: bool,
last_written_segment: Option<u32>,
latest_written_segment: Option<u32>,
}
const SECONDS_PER_SEGMENT: u32 = 5;
pub const SECONDS_PER_SEGMENT: u32 = 5;
impl AudioTranscoder {
fn process_packet(&mut self, packet: Packet) -> Result<(), String> {
@ -45,10 +49,22 @@ impl AudioTranscoder {
.send_packet(&packet)
.map_err(|err| format!("Failed sending audio packet: {}", err))?;
while let Some(frame) = self.decoder.read_frame()? {
if let Some(target) = self.segment_target {
let segment = (self
.input_stream
.time_base()
.mul(frame.pts() as i32)
.to_integer()
/ self.seconds_per_segment as i32) as u32;
if (target as i64 - 1) > (segment as i64) {
continue;
}
}
let resampled = self.resampler.convert(&frame)?;
let resampled_pts = resampled.pts();
self.frame_buffer.insert_frame(resampled);
let mut offset: f64 = 0f64;
let mut offset: f64 = 0.0;
while let Some(drain_resampled) = self.resampler.drain()? {
offset += (drain_resampled.nb_samples() as f64 / self.encoder.sample_rate() as f64)
* self.input_stream.time_base().den() as f64;
@ -63,12 +79,25 @@ impl AudioTranscoder {
}
#[inline]
fn input_stream(&self) -> &Stream {
pub fn input_stream(&self) -> &Stream {
&self.input_stream
}
#[inline]
pub fn output_stream(&self) -> &Stream {
&self.output_stream
}
pub fn encoder(&self) -> &Encoder {
&self.encoder
}
pub fn decoder(&self) -> &Decoder {
&self.decoder
}
fn start_segment(&self, force: bool) {
if self.segment == 0 && !force {
if self.segment == Some(0) && !force {
return;
}
@ -97,8 +126,16 @@ impl AudioTranscoder {
}
fn write_segment(&mut self) -> Result<(), String> {
if self.segment == 0 {
if self.segment == Some(0) {
self.write_header()?;
self.header_written = true;
}
if !self.header_written {
self.format.avio_inner_mut().unwrap().buffer();
self.start_segment(true);
self.format.write_packet_null()?;
self.header_written = true;
}
if let Some(avio) = self.format.avio.as_mut() {
@ -107,32 +144,82 @@ impl AudioTranscoder {
let mut segment = self.format.avio_inner_mut().unwrap().buffer();
File::create(format!(
"{}/audio-segment-{:0>5}.m4s",
self.output_path, self.segment
))
.unwrap()
.write_all(&mut segment)
.map_err(|_| format!("Failed to write audio segment {}", self.segment))?;
if self.segment_target.unwrap_or(self.segment.unwrap()) == self.segment.unwrap() {
self.segment_target = None;
File::create(format!(
"{}/audio-segment-{:0>5}.m4s",
self.output_path,
self.segment.unwrap()
))
.unwrap()
.write_all(&mut segment)
.map_err(|_| format!("Failed to write audio segment {}", self.segment.unwrap()))?;
println!("Wrote segment (audio) {}", self.segment.unwrap());
self.last_written_segment = self.segment;
self.latest_written_segment =
max(self.latest_written_segment, Some(self.segment.unwrap()));
} else {
println!(
"Won't write segment (audio) {} (searching for {})",
self.segment.unwrap(),
self.segment_target.unwrap()
)
}
self.start_segment(false);
Ok(())
}
fn encode(&mut self) -> Result<(), String> {
while let Some(frame) = self.frame_buffer.pop_first() {
let pts_passed = frame.pts() - self.last_pts;
if pts_passed <= 0 && self.last_pts != 0 {
println!("WARN: new frame out of order");
if self.last_pts > frame.pts() {
println!("WARN: out of order frame");
self.last_pts = frame.pts();
}
if (pts_passed + self.current_pts)
> (self.seconds_per_segment as i32 * self.input_stream.time_base().den()) as i64
if self.last_pts == 0 {
self.last_pts = frame.pts();
}
self.current_pts = frame.pts()
% (self.input_stream.time_base().den() as u32 * self.seconds_per_segment) as i64;
self.segment = Some(
(frame.pts() as f64 / self.input_stream.time_base().den() as f64).floor() as u32
/ self.seconds_per_segment,
);
if self.current_pts == 0 && Some(0) < self.segment {
self.segment = Some(self.segment.unwrap() - 1)
}
let (should_skip, peek) = if let Some(peek) = self.frame_buffer.first_key() {
(
(self.current_pts + (peek - frame.pts()))
< (self.seconds_per_segment * self.input_stream.time_base().den() as u32)
as i64,
peek,
)
} else {
(false, frame.pts())
};
let pts_passed = peek - frame.pts();
if self.segment == Some(2) {
println!(
">>>>>> {} {} {} {:?} {}",
self.current_pts, self.last_pts, pts_passed, should_skip, peek
);
}
if (!should_skip || self.current_pts == 0)
&& ((pts_passed + self.current_pts)
> (self.seconds_per_segment * self.input_stream.time_base().den() as u32)
as i64
|| (self.current_pts == 0 && Some(0) < self.segment))
{
self.format.write_packet_null()?;
self.write_segment()?;
self.segment += 1;
self.current_pts =
(pts_passed + self.current_pts) % self.input_stream.time_base().den() as i64;
} else {
@ -157,6 +244,27 @@ impl AudioTranscoder {
Ok(())
}
fn static_create_output(input: &Format<()>) -> Result<(Format<Buffer>, Stream), String> {
let audio_avio = AVIO::writer(Buffer::default());
let format = Format::output_avio(audio_avio, "mp4")?;
let output_stream = format.new_stream("aac")?;
format.set_flags(input.flags());
Ok((format, output_stream))
}
fn create_output(&mut self, input: &Format<()>) -> Result<(), String> {
let (format, stream) = Self::static_create_output(input)?;
self.output_stream = stream;
self.format = format;
Ok(())
}
fn open_output(&self) -> Result<(), String> {
Transcoder::static_open_output(&self.format, None)
}
}
pub struct Transcoder {
@ -172,13 +280,17 @@ pub struct Transcoder {
frame_scaler: Scaler,
last_pts: i64,
current_pts: i64,
video_segment: u32,
highest_pts: i64,
video_segment: Option<u32>,
extra_data: Option<Vec<u8>>,
audio: Option<AudioTranscoder>,
segment_target: Option<u32>,
input_video_index: Option<i32>,
input_audio_index: Option<i32>,
seconds_per_segment: u32,
header_written: bool,
last_written_segment: Option<u32>,
latest_written_segment: Option<u32>,
}
impl Transcoder {
@ -192,12 +304,16 @@ impl Transcoder {
let output_path = output_path.to_string();
let input_video = input
.stream(AVMEDIA_TYPE_VIDEO, video_index, None)
.stream(Some(AVMEDIA_TYPE_VIDEO), video_index, None)
.map_err(|err| format!("Failed to find video stream: {}", err))?
.ok_or("Failed to find video stream".to_string())?;
let input_audio = input
.stream(AVMEDIA_TYPE_AUDIO, audio_index, Some(input_video.index()))
.stream(
Some(AVMEDIA_TYPE_AUDIO),
audio_index,
Some(input_video.index()),
)
.map_err(|err| format!("Failed to find audio stream: {}", err))?;
if audio_index.is_some() && input_audio.is_none() {
return Err("Failed to find audio stream".to_string());
@ -205,23 +321,9 @@ impl Transcoder {
let input_frame_rate = input_video.avg_frame_rate(&input).unwrap().to_num();
let mut options = Dictionary::new();
options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
let video_avio = AVIO::writer(Buffer::default());
let video_output = Format::output_avio(video_avio, "mp4")?;
let output_video = video_output.new_stream("h264")?;
output_video.params().copy_from(&input_video.params())?;
output_video.set_sample_aspect_ratio(input_video.sample_aspect_ratio());
video_output.set_flags(input.flags());
let (video_output, output_video) = Self::create_streams(&input, &input_video)?;
let audio = if let Some(input_stream) = input_audio {
let audio_avio = AVIO::writer(Buffer::default());
let format = Format::output_avio(audio_avio, "mp4")?;
let output_stream = format.new_stream("aac")?;
format.set_flags(input.flags());
let (format, output_stream) = AudioTranscoder::static_create_output(&input)?;
let decoder = input_stream.decoder(None).ok_or(format!(
"Couldn't find encoder for input audio with codec: {:?}",
input_stream.codec()
@ -251,9 +353,7 @@ impl Transcoder {
encoder.configure(&output_stream)?;
let resampler = Resampler::from_coder(&decoder, &encoder);
let mut audio_options = Dictionary::new();
audio_options.copy_from(&mut options)?;
format.init_output(audio_options)?;
Transcoder::static_open_output(&format, None)?;
Some(AudioTranscoder {
frame_buffer: SortedFrameBuffer::new(),
@ -265,9 +365,13 @@ impl Transcoder {
output_path: output_path.to_string(),
current_pts: 0,
last_pts: 0,
segment: 0,
segment: None,
resampler,
seconds_per_segment: SECONDS_PER_SEGMENT,
segment_target: None,
header_written: false,
latest_written_segment: None,
last_written_segment: None,
})
} else {
None
@ -288,18 +392,13 @@ impl Transcoder {
encoder.set_height(video_decoder.height());
encoder.set_width(video_decoder.width());
encoder.set_time_base(input_frame_rate.invert());
output_video.set_time_base(encoder.time_base());
encoder.open()?;
encoder.configure(&output_video)?;
Ok(())
})?;
let param = output_video.params();
param.set_height(video_decoder.height());
param.set_width(video_decoder.width());
param.set_pixel_format(AV_PIX_FMT_YUV420P);
video_output.init_output(options)?;
Self::static_open_output(&video_output, Some((&input_video, &video_decoder)))?;
let frame_scaler = Scaler::from_coder(&video_decoder, &video_encoder, SWS_BILINEAR);
let video_frame_buffer = SortedFrameBuffer::new();
@ -320,15 +419,121 @@ impl Transcoder {
video_frame_buffer,
last_pts: 0,
current_pts: 0,
video_segment: 0,
video_segment: None,
extra_data: None,
input_audio_index: None,
seconds_per_segment: SECONDS_PER_SEGMENT,
header_written: false,
latest_written_segment: None,
last_written_segment: None,
highest_pts: 0,
})
}
pub fn seek(&self, segment: u32, stream: Option<&Stream>) -> Result<(), String> {
self.input.seek(segment * self.seconds_per_segment, stream)
fn create_output(&mut self) -> Result<(), String> {
let (video_output, output_video) = Self::create_streams(&self.input, &self.input_video)?;
self.video_output = video_output;
self.output_video = output_video;
Ok(())
}
fn create_streams(
input: &Format<()>,
input_video: &Stream,
) -> Result<(Format<Buffer>, Stream), String> {
let video_avio = AVIO::writer(Buffer::default());
let video_output = Format::output_avio(video_avio, "mp4")?;
let output_video = video_output.new_stream("h264")?;
output_video.params().copy_from(&input_video.params())?;
output_video.set_sample_aspect_ratio(input_video.sample_aspect_ratio());
video_output.set_flags(input.flags());
Ok((video_output, output_video))
}
fn open_output(&self) -> Result<(), String> {
Self::static_open_output(
&self.video_output,
Some((&self.output_video, &self.video_decoder)),
)
}
fn static_open_output(
output: &Format<Buffer>,
video: Option<(&Stream, &Decoder)>,
) -> Result<(), String> {
let mut options = Dictionary::new();
options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
if let Some((video, video_decoder)) = video {
let param = video.params();
param.set_height(video_decoder.height());
param.set_width(video_decoder.width());
param.set_pixel_format(AV_PIX_FMT_YUV420P);
}
output.init_output(options)
}
pub fn video_encoder(&self) -> &Encoder {
&self.video_encoder
}
pub fn video_decoder(&self) -> &Decoder {
&self.video_decoder
}
pub fn reset_output(&mut self) -> Result<(), String> {
let time_base = self.output_video.time_base();
self.create_output()?;
self.output_video
.set_time_base(self.video_encoder.time_base());
self.video_encoder.configure(&self.output_video)?;
self.open_output()?;
self.video_encoder.flush();
self.output_video.set_time_base(time_base);
self.video_segment = None;
self.header_written = false;
let mut options = Dictionary::new();
options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
self.video_output.write_header(Some(options)).unwrap();
if let Some(audio) = self.audio.as_mut() {
let time_base = audio.output_stream.time_base();
audio.create_output(&self.input)?;
audio.output_stream.set_time_base(audio.encoder.time_base());
audio.encoder.configure(&audio.output_stream)?;
audio.open_output()?;
audio.encoder.flush();
audio.output_stream.set_time_base(time_base);
audio.segment = None;
audio.header_written = false;
let mut options = Dictionary::new();
options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
audio.format.write_header(Some(options)).unwrap();
audio.last_pts = 0;
}
Ok(())
}
pub fn seek(&mut self, segment: u32, stream_index: Option<i32>) -> Result<(), String> {
self.input.seek(
segment * self.seconds_per_segment,
stream_index,
segment < self.last_written_segment().unwrap_or(0),
)?;
self.reset_output()?;
self.video_frame_buffer.clear();
self.segment_target = Some(segment);
self.video_segment = None;
if let Some(audio) = self.audio.as_mut() {
audio.frame_buffer.clear();
audio.segment_target = Some(segment);
audio.segment = None;
}
Ok(())
}
pub fn open(&mut self) -> Result<(), String> {
@ -336,37 +541,56 @@ impl Transcoder {
.recursive(true)
.create(self.output_path.to_string())
.map_err(|_| "Failed to create target directory".to_string())?;
self.video_output.write_header(None).unwrap();
let mut options = Dictionary::new();
options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
self.video_output.write_header(Some(options)).unwrap();
self.input_video_index = Some(self.input_video.index());
self.input_audio_index = self.audio.as_ref().map(|ia| ia.input_stream.index());
Ok(())
}
pub fn duration(&self) -> i64 {
self.input.duration()
pub fn last_frame_secs(&self) -> f64 {
self.last_pts as f64 / self.input_video.time_base().den() as f64
}
pub fn latest_frame_secs(&self) -> f64 {
self.highest_pts as f64 / self.input_video.time_base().den() as f64
}
pub fn duration_secs(&self) -> f64 {
self.input.duration() as f64 / AV_TIME_BASE as f64
}
pub fn has_audio(&self) -> bool {
self.audio.is_some()
}
pub fn transcode(&mut self) -> Result<bool, String> {
Ok(if let Some(packet) = self.input.next_packet() {
if let Some(packet) = self.input.next_packet() {
if self.input_video_index == Some(packet.stream()) {
self.video_process_packet(packet)?;
} else if self.input_audio_index == Some(packet.stream()) {
// Safe assumption
if let Some(ref mut audio) = &mut self.audio {
audio.process_packet(packet)?;
}
}
true
} else {
false
})
return Ok(true);
}
self.video_frame_buffer.close();
self.encode_video()?;
if let Some(audio) = self.audio.as_mut() {
audio.frame_buffer.close();
audio.encode()?;
}
Ok(false)
}
pub fn avg_frame_rate(&self) -> Option<impl Rational> {
self.video_stream().avg_frame_rate(&self.input)
}
#[inline]
@ -374,6 +598,11 @@ impl Transcoder {
&self.input_video
}
#[inline]
pub fn audio(&self) -> Option<&AudioTranscoder> {
self.audio.as_ref()
}
#[inline]
pub fn audio_stream(&self) -> Option<&Stream> {
if let Some(ref audio) = self.audio {
@ -383,10 +612,36 @@ impl Transcoder {
}
}
pub fn seeking(&self) -> bool {
self.segment_target.is_some()
|| self
.audio
.as_ref()
.map_or(false, |audio| audio.segment_target.is_some())
}
pub fn last_written_audio_segment(&self) -> Option<u32> {
self.audio.as_ref().and_then(|x| x.last_written_segment)
}
pub fn last_written_segment(&self) -> Option<u32> {
self.last_written_segment
}
pub fn latest_written_segment(&self) -> Option<u32> {
if let Some(audio) = self.audio.as_ref() {
min(self.latest_written_segment, audio.latest_written_segment)
} else {
self.latest_written_segment
}
}
pub fn segment(&self) -> u32 {
min(
self.video_segment,
self.audio.as_ref().map_or(u32::MAX, |audio| audio.segment),
self.video_segment.unwrap_or(0),
self.audio
.as_ref()
.map_or(u32::MAX, |audio| audio.segment.unwrap_or(u32::MAX)),
)
}
@ -395,6 +650,12 @@ impl Transcoder {
self.video_output.write_trailer()?;
self.video_write_segment()?;
if let Some(audio) = self.audio.as_mut() {
audio.encode()?;
audio.format.write_trailer()?;
audio.write_segment()?;
}
Ok(())
}
@ -404,12 +665,11 @@ impl Transcoder {
let segment: u32 = (self
.input_video
.time_base()
.to_num()
.mul(packet.pts() as i32)
.to_integer()
/ self.seconds_per_segment as i32) as u32;
if let Some(target) = self.segment_target {
if target - 1 < segment {
if (target as i64 - 1) > (segment as i64) {
continue;
}
}
@ -441,8 +701,16 @@ impl Transcoder {
}
fn video_write_segment(&mut self) -> Result<(), String> {
if self.video_segment == 0 {
if self.video_segment == Some(0) {
self.video_write_header()?;
self.header_written = true;
}
if !self.header_written {
self.video_output.avio_inner_mut().unwrap().buffer();
self.start_segment(true);
self.video_output.write_packet_null()?;
self.header_written = true;
}
if let Some(avio) = self.video_output.avio.as_mut() {
@ -450,15 +718,48 @@ impl Transcoder {
}
let segment = self.video_output.avio_inner_mut().unwrap().buffer();
let mut segment = annex_b_to_avc(segment, 4)?;
File::create(format!(
"{}/video-segment-{:0>5}.m4s",
self.output_path, self.video_segment
))
.unwrap()
.write_all(&mut segment)
.map_err(|_| format!("Failed to write video segment {}", self.video_segment))?;
if self.segment_target.unwrap_or(self.video_segment.unwrap()) == self.video_segment.unwrap()
{
self.segment_target = None;
let segment_res = annex_b_to_avc(segment, 4);
if let Err(_) = segment_res {
println!("oh");
}
let mut segment = segment_res?;
File::create(format!(
"{}/video-segment-{:0>5}.m4s",
self.output_path,
self.video_segment.unwrap()
))
.unwrap()
.write_all(&mut segment)
.map_err(|_| {
format!(
"Failed to write video segment {}",
self.video_segment.unwrap()
)
})?;
self.last_written_segment = self.video_segment;
self.latest_written_segment = max(
self.latest_written_segment,
Some(self.video_segment.unwrap()),
);
println!(
"Wrote segment (video) {} ({} -> {} / coder: {} {})",
self.video_segment.unwrap(),
self.input_video.time_base(),
self.output_video.time_base(),
self.video_decoder.time_base(),
self.video_encoder.time_base()
);
} else {
println!(
"Won't write segment (video) {} (searching for {})",
self.video_segment.unwrap(),
self.segment_target.unwrap()
)
}
self.start_segment(false);
self.video_encoder.flush();
Ok(())
@ -466,21 +767,42 @@ impl Transcoder {
fn encode_video(&mut self) -> Result<(), String> {
while let Some(frame) = self.video_frame_buffer.pop_first() {
// if self.video_segment == None || self.last_pts > frame.pts() {
self.last_pts = frame.pts()
- (self
.avg_frame_rate()
.unwrap()
.invert()
.to_num()
.mul(self.input_video.time_base().den() as i32))
.as_f64() as i64;
self.current_pts = frame.pts()
% (self.input_video.time_base().den() as u32 * self.seconds_per_segment) as i64;
self.video_segment = Some(
(frame.pts() as f64 / self.input_video.time_base().den() as f64).floor() as u32
/ self.seconds_per_segment,
);
if self.current_pts == 0 && Some(0) < self.video_segment {
self.video_segment = Some(self.video_segment.unwrap() - 1)
}
let pts_passed = frame.pts() - self.last_pts;
if (pts_passed + self.current_pts)
> (self.seconds_per_segment * self.input_frame_rate.den() as u32) as i64
> (self.seconds_per_segment * self.input_video.time_base().den() as u32) as i64
|| (self.current_pts == 0 && Some(0) < self.video_segment)
{
self.video_output.write_packet_null()?;
self.video_write_segment()?;
self.video_segment += 1;
self.current_pts =
(pts_passed + self.current_pts) % self.input_frame_rate.den() as i64;
(pts_passed + self.current_pts) % self.input_video.time_base().den() as i64;
frame.set_pict_type(AV_PICTURE_TYPE_I);
} else {
self.current_pts += pts_passed
}
self.last_pts = frame.pts();
self.highest_pts = max(self.highest_pts, self.last_pts);
self.video_encoder.send_video_frame(&frame)?;
while let Some(new_packet) = self.video_encoder.read_packet()? {
@ -525,8 +847,16 @@ impl Transcoder {
Ok(())
}
pub fn video_stream_output(&self) -> &Stream {
&self.output_video
}
pub fn audio_stream_output(&self) -> Option<&Stream> {
self.audio.as_ref().map(|audio| &audio.output_stream)
}
fn start_segment(&self, force: bool) {
if self.video_segment == 0 && !force {
if self.video_segment == Some(0) && !force {
return;
}

@ -1,38 +1,188 @@
use crate::transcoder::Transcoder;
use async_std::sync::{Arc, Receiver, RecvError, RwLock, RwLockReadGuard, Sender, TryRecvError};
use crate::av::xcoder::XCoder;
use crate::av::Rational;
use crate::transcoder::{Transcoder, SECONDS_PER_SEGMENT};
use async_std::channel::{bounded, Receiver, RecvError, SendError, Sender, TryRecvError};
use async_std::sync::{Arc, RwLock, RwLockReadGuard};
use async_std::task;
use futures::channel::oneshot::{
channel as one_shot, Receiver as OneShotReceiver, Sender as OneShotSender,
};
use serde::Serialize;
use std::cmp::min;
use std::collections::{HashMap, HashSet};
use std::option::Option::Some;
use std::thread::Builder;
use std::thread::JoinHandle;
use uuid::Uuid;
#[derive(Debug, Default)]
struct OneShotSet(Vec<OneShotSender<()>>);
impl OneShotSet {
fn add(&mut self) -> OneShotReceiver<()> {
let (s, r) = one_shot::<()>();
self.0.push(s);
r
}
fn trigger(&mut self) {
while let Some(sender) = self.0.pop() {
sender.send(()).unwrap()
}
}
fn new() -> OneShotSet {
OneShotSet(vec![])
}
}
#[derive(Debug, Default)]
pub struct TranscoderEvents {
init_queue: OneShotSet,
segment_queue: HashMap<u32, OneShotSet>,
}
impl TranscoderEvents {
pub fn new() -> TranscoderEvents {
Default::default()
}
}
#[derive(Debug)]
pub struct TranscoderInstance {
id: Uuid,
channel: Channel<Message>,
state: RwLock<TranscoderInstanceState>,
command: TranscoderCommand,
events: RwLock<TranscoderEvents>,
status: RwLock<TranscoderStatus>,
}
impl TranscoderInstance {
pub async fn state<T>(&self, block: fn(RwLockReadGuard<TranscoderInstanceState>) -> T) -> T {
block(self.state.read().await)
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn seek(&self, segment: u32) {
self.channel.send(Message::Seek(segment)).await.unwrap()
}
pub async fn wait_for_segment(&self, segment: u32) -> bool {
{
let state = self.state.read().await;
if state.info.segments() < segment {
return false;
}
if state.segments.contains(&segment) {
return true;
}
if state.state == TranscoderState::Finished {
return false;
}
}
println!("Waiting in segment queue for segment {}", segment);
let r = {
self.events
.write()
.await
.segment_queue
.entry(segment)
.or_insert_with(|| OneShotSet::new())
.add()
};
r.await.is_ok()
}
pub async fn wait_for_init(&self) -> bool {
if self.state.read().await.init_written {
return true;
}
let r = { self.events.write().await.init_queue.add() };
r.await.is_ok()
}
pub async fn status(&self) -> TranscoderStatus {
self.status.read().await.clone()
}
}
#[derive(Serialize, Debug, Clone, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub enum TranscoderState {
None,
Seeking,
Transcoding,
Failed,
Stopped,
Finished,
}
impl Default for TranscoderState {
fn default() -> Self {
TranscoderState::None
}
}
#[derive(Serialize, Debug, Clone, Default)]
pub struct VideoInput {
pub codec: String,
pub encoder: String,
pub decoder: String,
pub frame_rate_rat: Option<[u32; 2]>,
pub frame_rate: Option<f64>,
pub width: u32,
pub height: u32,
pub time_scale: i32,
}
#[derive(Serialize, Debug, Clone, Default)]
pub struct AudioInput {
pub codec: String,
pub encoder: String,
pub decoder: String,
pub channels: i32,
pub channel_layout: String,
pub sample_rate: i32,
pub time_scale: i32,
}
#[derive(Serialize, Debug, Clone, Default)]
pub struct TranscoderStatus {
pub state: TranscoderState,
pub failure_reason: Option<String>,
pub segments: Vec<u32>,
pub init_written: bool,
pub duration_secs: f64,
pub current_segment: u32,
pub segment_count: u32,
pub video: VideoInput,
pub audio: Option<AudioInput>,
}
#[derive(Debug, Default)]
pub struct TranscoderInstanceState {
thread: Option<JoinHandle<()>>,
duration_secs: f64,
info: TranscoderInfo,
segments: HashSet<u32>,
init_written: bool,
state: TranscoderState,
failure_reason: Option<String>,
}
impl TranscoderInstanceState {
#[inline]
pub fn duration(&self) -> f64 {
self.duration_secs
pub fn info(&self) -> &TranscoderInfo {
&self.info
}
#[inline]
@ -58,11 +208,11 @@ pub struct Channel<T> {
}
impl<T> Channel<T> {
async fn send(&self, msg: T) {
async fn send(&self, msg: T) -> Result<(), SendError<T>> {
self.sender.send(msg).await
}
fn send_blocking(&self, msg: T) {
fn send_blocking(&self, msg: T) -> Result<(), SendError<T>> {
task::block_on(self.sender.send(msg))
}
@ -80,8 +230,8 @@ impl<T> Channel<T> {
}
fn new_channel() -> (Channel<Message>, Channel<Message>) {
let (sender_a, receiver_a) = async_std::sync::channel(25);
let (sender_b, receiver_b) = async_std::sync::channel(25);
let (sender_a, receiver_a) = bounded(25);
let (sender_b, receiver_b) = bounded(25);
(
Channel {
@ -103,19 +253,34 @@ pub struct TranscoderCommand {
video: Option<i32>,
}
#[derive(Debug, Clone, Default)]
pub struct TranscoderInfo {
pub duration_secs: f64,
pub audio: Option<AudioInput>,
pub video: VideoInput,
}
impl TranscoderInfo {
fn segments(&self) -> u32 {
(self.duration_secs / SECONDS_PER_SEGMENT as f64).ceil() as u32
}
}
#[derive(Debug)]
pub enum Message {
Info { duration_secs: f64 },
Info(TranscoderInfo),
SegmentReady(u32),
Seek(u32),
Seeking,
Failure(String),
Stop,
Finished,
Stopped,
Finished(f64, u32),
}
fn transcoder_thread_main(command: TranscoderCommand, channel: Channel<Message>) {
if let Err(err) = transcoder_thread(command, &channel) {
channel.send_blocking(Message::Failure(err));
channel.send_blocking(Message::Failure(err)).unwrap();
return;
}
}
@ -131,43 +296,124 @@ fn transcoder_thread(command: TranscoderCommand, channel: &Channel<Message>) ->
let mut transcoder = match transcoder {
Ok(t) => t,
Err(e) => {
channel.send_blocking(Message::Failure(e));
channel.send_blocking(Message::Failure(e)).unwrap();
return Ok(());
}
};
let mut segments = HashSet::new();
let mut video_segments = HashSet::new();
let mut video_last_segment = None;
let mut audio_segments = HashSet::new();
let mut audio_last_segment = None;
transcoder.open()?;
channel.send_blocking(Message::Info {
duration_secs: transcoder.duration_secs(),
});
channel
.send_blocking(Message::Info(TranscoderInfo {
duration_secs: transcoder.duration_secs(),
audio: transcoder.audio().map(|audio| AudioInput {
codec: audio.input_stream().codec_name(),
decoder: audio.decoder().name(),
encoder: audio.encoder().name(),
channels: audio.decoder().channels(),
channel_layout: audio.decoder().channel_layout_name(),
sample_rate: audio.decoder().sample_rate(),
time_scale: audio.output_stream().time_base().den(),
}),
video: VideoInput {
codec: transcoder.video_stream().codec_name(),
encoder: transcoder.video_encoder().name(),
decoder: transcoder.video_decoder().name(),
frame_rate_rat: transcoder
.avg_frame_rate()
.map(|x| [x.den() as u32, x.num() as u32]),
frame_rate: transcoder.avg_frame_rate().map(|x| x.as_f64()),
width: transcoder.video_decoder().width() as u32,
height: transcoder.video_decoder().height() as u32,
time_scale: transcoder.video_stream_output().time_base().den(),
},
}))
.unwrap();
let mut completed = false;
let has_audio = transcoder.has_audio();
while !completed {
if transcoder.transcode()? {
while let Ok(msg) = channel.try_recv() {
match msg {
Message::Seek(segment) => {
channel.send_blocking(Message::Seeking).unwrap();
let stream = Some(transcoder.video_stream().index());
transcoder.seek(segment, stream)?;
}
Message::Stop => {
transcoder.stop();
channel.send_blocking(Message::Stopped).unwrap();
return Ok(());
}
_ => {}
};
}
let mut last_segment = transcoder.segment();
while transcoder.transcode()? {
while let Ok(msg) = channel.try_recv() {
match msg {
Message::Seek(segment) => {
transcoder.seek(segment, Some(transcoder.video_stream()))?;
if video_last_segment != transcoder.last_written_segment() {
if let Some(segment) = transcoder.last_written_segment() {
video_segments.insert(segment);
if !has_audio || audio_segments.contains(&segment) {
channel
.send_blocking(Message::SegmentReady(segment))
.unwrap();
segments.insert(segment);
}
}
Message::Stop => {
transcoder.stop();
return Ok(());
video_last_segment = transcoder.last_written_segment();
}
if audio_last_segment != transcoder.last_written_audio_segment() {
if let Some(segment) = transcoder.last_written_audio_segment() {
audio_segments.insert(segment);
if video_segments.contains(&segment) {
channel
.send_blocking(Message::SegmentReady(segment))
.unwrap();
segments.insert(segment);
}
}
_ => {}
};
}
audio_last_segment = transcoder.last_written_audio_segment();
}
} else {
println!("EOF, seeking to missed parts");
let mut si: u32 = 0;
while si <= transcoder.latest_written_segment().unwrap() as u32 {
if segments.contains(&si) {
si += 1;
continue;
}
if last_segment < transcoder.segment() {
channel.send_blocking(Message::SegmentReady(last_segment));
last_segment = transcoder.segment();
println!("Seeking to segment {}", si);
transcoder.seek(si, Some(transcoder.video_stream().index()))?;
break;
}
if si > transcoder.latest_written_segment().unwrap() {
completed = true;
}
}
}
transcoder.finish()?;
channel.send_blocking(Message::SegmentReady(transcoder.segment()));
channel
.send_blocking(Message::SegmentReady(transcoder.segment()))
.unwrap();
let duration = transcoder.latest_frame_secs();
let segment_count = transcoder.segment();
transcoder.stop();
channel.send_blocking(Message::Finished);
channel
.send_blocking(Message::Finished(duration, segment_count))
.unwrap();
Ok(())
}
@ -197,13 +443,14 @@ impl TranscoderManager {
let instance = TranscoderInstance {
id: uuid,
command,
state: RwLock::new(TranscoderInstanceState {
thread: Some(thread),
init_written: false,
segments: HashSet::new(),
duration_secs: 0.0,
state: RwLock::new({
let mut it = TranscoderInstanceState::default();
it.thread = Some(thread);
it
}),
channel: own_channel,
events: RwLock::new(TranscoderEvents::new()),
status: Default::default(),
};
let instance = Arc::new(instance);
@ -213,34 +460,102 @@ impl TranscoderManager {
loop {
let fut = instance.channel.recv();
if let Ok(x) = fut.await {
println!("[{}] {:?}", uuid, &x);
match x {
Message::Finished => {
let mut state = instance.state.write().await;
if let Some(join_handle) = std::mem::replace(&mut state.thread, None) {
join_handle.join().unwrap();
Message::Finished(duration, segment_count) => {
{
let mut status = instance.status.write().await;
status.state = TranscoderState::Finished;
status.segment_count = segment_count;
status.duration_secs = duration;
}
{
let mut state = instance.state.write().await;
if let Some(join_handle) =
std::mem::replace(&mut state.thread, None)
{
join_handle.join().unwrap();
}
}
break;
}
Message::Info { duration_secs } => {
let mut state = instance.state.write().await;
state.duration_secs = duration_secs
Message::Stopped => {
let mut status = instance.status.write().await;
status.state = TranscoderState::Stopped
}
Message::Failure(error) => {
let mut status = instance.status.write().await;
status.state = TranscoderState::Failed;
status.failure_reason = Some(error);
}
Message::Info(info) => {
{
let mut state = instance.state.write().await;
state.info = info.clone()
}
{
let mut status = instance.status.write().await;
status.audio = info.audio;
status.video = info.video;
status.segment_count =
(info.duration_secs / SECONDS_PER_SEGMENT as f64).ceil() as u32;
status.duration_secs = info.duration_secs
}
}
Message::Seeking => {
let mut status = instance.status.write().await;
status.state = TranscoderState::Seeking
}
Message::SegmentReady(segment) => {
let mut state = instance.state.write().await;
if segment == 0 {
state.init_written = true
state.init_written = true;
instance.events.write().await.init_queue.trigger()
}
state.segments.insert(segment);
let current_segment = min(segment + 1, state.info.segments() - 1);
{
let mut status = instance.status.write().await;
status.init_written = true;
status.current_segment = current_segment;
status.state = TranscoderState::Transcoding;
if state.segments.insert(segment) {
status.segments.push(segment);
}
}
instance
.events
.write()
.await
.segment_queue
.get_mut(&segment)
.map(|x| x.trigger());
let mut skip_ahead = 1 + segment;
while state.info.segments() > skip_ahead
&& state.segments.contains(&skip_ahead)
{
skip_ahead += 1;
}
if skip_ahead > (segment + 5) {
instance.seek(skip_ahead).await;
}
}
_ => {}
}
println!("> {:?}", x);
} else {
return;
}

@ -87,6 +87,12 @@ impl<K: Copy + Debug + Eq + Hash + Ord, V: Debug> SortedBuffer<K, V> {
self.open = false;
}
pub fn clear(&mut self) {
self.sorted_keys.clear();
self.items.clear();
self.lowest_value = None;
}
pub fn first_key(&mut self) -> Option<K> {
self.sorted_keys.first().copied()
}

@ -0,0 +1,34 @@
<?xml version="1.0"?>
<MPD xmlns="urn:mpeg:dash:schema:mpd:2011"
minimumUpdatePeriod="PT4S"
minBufferTime="1.5"
{% if is_vlc %}
type="static"
{% else %}
type="dynamic"
{% endif %}
profiles="urn:mpeg:dash:profile:isoff-live:2011"
mediaPresentationDuration="{{ duration|iso_duration }}">
<Period start="PT0.0S">
<BaseURL>/session/{{ id }}/</BaseURL>
<AdaptationSet contentType="video">
<BaseURL>video/</BaseURL>
<Representation mimeType="video/mp4" codecs="avc1.640028" id="video">
<SegmentTemplate media="$Number$.m4s"
initialization="init.mp4" duration="{{ video_time_base_den * 5 }}" timescale="{{ video_time_base_den }}"
startNumber="0"/>
</Representation>
</AdaptationSet>
{% if has_audio %}
<AdaptationSet contentType="audio">
<BaseURL>audio/</BaseURL>
<Representation id="audio" mimeType="audio/mp4" codecs="mp4a.40.2">
<SegmentTemplate media="$Number$.m4s"
initialization="init.mp4" duration="{{ audio_time_base_den * 5 }}" timescale="{{ audio_time_base_den }}"
startNumber="0"/>
</Representation>
</AdaptationSet>
{% endif %}
</Period>
</MPD>

@ -1,66 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DASH Test</title>
<link href="//vjs.zencdn.net/7.8.2/video-js.min.css" rel="stylesheet">
<script src="https://ajax.googleapis.com/ajax/libs/shaka-player/3.0.1/shaka-player.ui.js"></script>
<link rel="stylesheet" href="https://ajax.googleapis.com/ajax/libs/shaka-player/3.0.1/controls.css">
</head>
<body>
<video id="shaka" width="1280" height="720" controls></video>
<script type="application/javascript">
const manifestUri = 'test.mpd';
function initApp() {
// Install built-in polyfills to patch browser incompatibilities.
shaka.polyfill.installAll();
// Check to see if the browser supports the basic APIs Shaka needs.
if (shaka.Player.isBrowserSupported()) {
// Everything looks good!
initPlayer();
} else {
// This browser does not have the minimum set of APIs we need.
console.error('Browser not supported!');
}
}
async function initPlayer() {
// Create a Player instance.
const video = document.getElementById('shaka');
const player = new shaka.Player(video);
// Attach player to the window to make it easy to access in the JS console.
window.shakaPlayer = player;
// Listen for error events.
player.addEventListener('error', onErrorEvent);
// Try to load a manifest.
// This is an asynchronous process.
try {
await player.load(manifestUri);
// This runs if the asynchronous load is successful.
console.log('The video has now been loaded!');
} catch (e) {
// onError is executed if the asynchronous load fails.
onError(e);
}
}
function onErrorEvent(event) {
// Extract the shaka.util.Error object from the event.
onError(event.detail);
}
function onError(error) {
// Log the error.
console.error('Error code', error.code, 'object', error);
}
document.addEventListener('DOMContentLoaded', initApp);
</script>
</body>
</html>
Loading…
Cancel
Save