use crate::av::avio::{AVIOWriter, AVIO};
use crate::av::decoder::Decoder;
use crate::av::dictionary::Dictionary;
use crate::av::encoder::Encoder;
use crate::av::format::Format;
use crate::av::mov_avc::{annex_b_to_avc, AvcDecoderConfigurationRecord, NalBuffer, NalUnitType};
use crate::av::packet::Packet;
use crate::av::resampler::Resampler;
use crate::av::scaler::Scaler;
use crate::av::stream::Stream;
use crate::av::xcoder::XCoder;
use crate::av::Rational;
use crate::utils::SortedFrameBuffer;
use ffmpeg_sys_next::AVMediaType::{AVMEDIA_TYPE_AUDIO, AVMEDIA_TYPE_VIDEO};
use ffmpeg_sys_next::AVPictureType::AV_PICTURE_TYPE_I;
use ffmpeg_sys_next::AVPixelFormat::AV_PIX_FMT_YUV420P;
use ffmpeg_sys_next::{avio_wb32, AVIOContext, AVPixelFormat, AV_TIME_BASE, SWS_BILINEAR};
use num_rational::Ratio;
use std::cmp::{max, min};
use std::fs::{DirBuilder, File};
use std::io::Write;
use std::ops::Mul;
use std::option::Option::Some;

pub struct AudioTranscoder {
    encoder: Encoder,
    decoder: Decoder,
    frame_buffer: SortedFrameBuffer,
    output_stream: Stream,
    input_stream: Stream,
    format: Format,
    output_path: String,
    resampler: Resampler,
    segment: Option<u32>,
    last_pts: i64,
    current_pts: i64,
    seconds_per_segment: u32,
    segment_target: Option<u32>,
    header_written: bool,
    last_written_segment: Option<u32>,
    latest_written_segment: Option<u32>,
}

pub const SECONDS_PER_SEGMENT: u32 = 5;

impl AudioTranscoder {
    fn process_packet(&mut self, packet: Packet) -> Result<(), String> {
        self.decoder
            .send_packet(&packet)
            .map_err(|err| format!("Failed sending audio packet: {}", err))?;
        while let Some(frame) = self.decoder.read_frame()? {
            // While seeking, skip frames that fall before the segment being searched for.
            if let Some(target) = self.segment_target {
                let segment = (self
                    .input_stream
                    .time_base()
                    .mul(frame.pts() as i32)
                    .to_integer()
                    / self.seconds_per_segment as i32) as u32;
                if (target as i64 - 1) > (segment as i64) {
                    continue;
                }
            }
            let resampled = self.resampler.convert(&frame)?;
            let resampled_pts = resampled.pts();
            self.frame_buffer.insert_frame(resampled);
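            // A rough sketch of the drain bookkeeping below, assuming the input audio time
            // base is 1/sample_rate: each frame drained out of the resampler is worth
            // nb_samples / sample_rate seconds, i.e. nb_samples ticks in that time base.
            // For example, at 48 kHz with 1024-sample frames, every drained frame is placed
            // a further 1024 PTS ticks after the frame that was just converted.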
            let mut offset: f64 = 0.0;
            while let Some(drain_resampled) = self.resampler.drain()? {
                offset += (drain_resampled.nb_samples() as f64
                    / self.encoder.sample_rate() as f64)
                    * self.input_stream.time_base().den() as f64;
                drain_resampled.set_pts(resampled_pts + offset as i64);
                self.frame_buffer.insert_frame(drain_resampled);
            }
            self.encode()?;
        }
        Ok(())
    }

    #[inline]
    pub fn input_stream(&self) -> &Stream {
        &self.input_stream
    }

    #[inline]
    pub fn output_stream(&self) -> &Stream {
        &self.output_stream
    }

    pub fn encoder(&self) -> &Encoder {
        &self.encoder
    }

    pub fn decoder(&self) -> &Decoder {
        &self.decoder
    }

    fn start_segment(&self, force: bool) {
        if self.segment == Some(0) && !force {
            return;
        }
        // Write a 24-byte 'styp' box (size, type, major brand 'msdh', minor version 0,
        // compatible brands 'msdh' and 'msix') at the start of every media fragment.
        unsafe {
            let pb = (*self.format.as_mut_ptr()).pb;
            avio_wb32(pb, 24);
            ffio_wfourcc(pb, b"styp");
            ffio_wfourcc(pb, b"msdh");
            avio_wb32(pb, 0); /* minor */
            ffio_wfourcc(pb, b"msdh");
            ffio_wfourcc(pb, b"msix");
        }
    }

    fn write_header(&mut self) -> Result<(), String> {
        File::create(format!("{}/audio-init.mp4", &self.output_path))
            .unwrap()
            .write_all(&mut self.format.avio_inner_mut().unwrap().buffer())
            .map_err(|_| "Failed to write audio init segment".to_string())?;
        self.start_segment(true);
        self.format.write_packet_null()?;
        Ok(())
    }

    fn write_segment(&mut self) -> Result<(), String> {
        if self.segment == Some(0) {
            self.write_header()?;
            self.header_written = true;
        }
        if !self.header_written {
            // Drop whatever init data is still buffered before starting a fresh fragment.
            self.format.avio_inner_mut().unwrap().buffer();
            self.start_segment(true);
            self.format.write_packet_null()?;
            self.header_written = true;
        }
        if let Some(avio) = self.format.avio.as_mut() {
            avio.flush()
        }
        let mut segment = self.format.avio_inner_mut().unwrap().buffer();
        if self.segment_target.unwrap_or(self.segment.unwrap()) == self.segment.unwrap() {
            self.segment_target = None;
            File::create(format!(
                "{}/audio-segment-{:0>5}.m4s",
                self.output_path,
                self.segment.unwrap()
            ))
            .unwrap()
            .write_all(&mut segment)
            .map_err(|_| format!("Failed to write audio segment {}", self.segment.unwrap()))?;
            println!("Wrote segment (audio) {}", self.segment.unwrap());
            self.last_written_segment = self.segment;
            self.latest_written_segment =
                max(self.latest_written_segment, Some(self.segment.unwrap()));
        } else {
            println!(
                "Won't write segment (audio) {} (searching for {})",
                self.segment.unwrap(),
                self.segment_target.unwrap()
            )
        }
        self.start_segment(false);
        Ok(())
    }

    fn encode(&mut self) -> Result<(), String> {
        while let Some(frame) = self.frame_buffer.pop_first() {
            if self.last_pts > frame.pts() {
                println!("WARN: out of order frame");
                self.last_pts = frame.pts();
            }
            if self.last_pts == 0 {
                self.last_pts = frame.pts();
            }
            self.current_pts = frame.pts()
                % (self.input_stream.time_base().den() as u32 * self.seconds_per_segment) as i64;
            self.segment = Some(
                (frame.pts() as f64 / self.input_stream.time_base().den() as f64).floor() as u32
                    / self.seconds_per_segment,
            );
            if self.current_pts == 0 && Some(0) < self.segment {
                self.segment = Some(self.segment.unwrap() - 1)
            }
            let (should_skip, peek) = if let Some(peek) = self.frame_buffer.first_key() {
                (
                    (self.current_pts + (peek - frame.pts()))
                        < (self.seconds_per_segment
                            * self.input_stream.time_base().den() as u32) as i64,
                    peek,
                )
            } else {
                (false, frame.pts())
            };
            let pts_passed = peek - frame.pts();
            if self.segment == Some(2) {
                println!(
                    ">>>>>> {} {} {} {:?} {}",
                    self.current_pts, self.last_pts, pts_passed, should_skip, peek
                );
            }
            if (!should_skip || self.current_pts == 0)
                && ((pts_passed + self.current_pts)
                    > (self.seconds_per_segment * self.input_stream.time_base().den() as u32)
                        as i64
                    || (self.current_pts == 0 && Some(0) < self.segment))
            {
                self.format.write_packet_null()?;
                self.write_segment()?;
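                // Worked example for the rollover that follows (assuming a 1/48000 time
                // base and 5-second segments, i.e. 240_000 ticks per segment): with
                // current_pts at 239_000 and the next frame 1_024 ticks away, the sum
                // crosses the segment boundary, the finished fragment is flushed above,
                // and the counter is then reduced modulo time_base.den() (one second of
                // ticks), mirroring the rollover in encode_video() on the video path.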
                self.current_pts = (pts_passed + self.current_pts)
                    % self.input_stream.time_base().den() as i64;
            } else {
                self.current_pts += pts_passed
            }
            self.last_pts = frame.pts();
            self.encoder.send_audio_frame(&frame)?;
            while let Some(new_packet) = self.encoder.read_packet()? {
                new_packet.set_pts(frame.pts());
                new_packet.set_dts(new_packet.pts());
                new_packet.set_duration(pts_passed);
                new_packet.rescale(
                    self.input_stream.time_base(),
                    self.output_stream.time_base(),
                );
                new_packet.set_stream(0);
                new_packet.data();
                self.format.write_packet(&new_packet)?;
            }
        }
        Ok(())
    }

    fn static_create_output(input: &Format<()>) -> Result<(Format, Stream), String> {
        let audio_avio = AVIO::writer(Buffer::default());
        let format = Format::output_avio(audio_avio, "mp4")?;
        let output_stream = format.new_stream("aac")?;
        format.set_flags(input.flags());
        Ok((format, output_stream))
    }

    fn create_output(&mut self, input: &Format<()>) -> Result<(), String> {
        let (format, stream) = Self::static_create_output(input)?;
        self.output_stream = stream;
        self.format = format;
        Ok(())
    }

    fn open_output(&self) -> Result<(), String> {
        Transcoder::static_open_output(&self.format, None)
    }
}

pub struct Transcoder {
    input: Format<()>,
    input_video: Stream,
    input_frame_rate: Ratio<i32>,
    video_output: Format,
    output_path: String,
    output_video: Stream,
    video_frame_buffer: SortedFrameBuffer,
    video_encoder: Encoder,
    video_decoder: Decoder,
    frame_scaler: Scaler,
    last_pts: i64,
    current_pts: i64,
    highest_pts: i64,
    video_segment: Option<u32>,
    extra_data: Option<Vec<u8>>,
    audio: Option<AudioTranscoder>,
    segment_target: Option<u32>,
    input_video_index: Option<usize>,
    input_audio_index: Option<usize>,
    seconds_per_segment: u32,
    header_written: bool,
    last_written_segment: Option<u32>,
    latest_written_segment: Option<u32>,
}

impl Transcoder {
    pub fn create(
        input_path: &str,
        output_path: &str,
        video_index: Option<usize>,
        audio_index: Option<usize>,
    ) -> Result<Transcoder, String> {
        let input = Format::open(input_path, None).expect("Failed to open media");
        let output_path = output_path.to_string();
        let input_video = input
            .stream(Some(AVMEDIA_TYPE_VIDEO), video_index, None)
            .map_err(|err| format!("Failed to find video stream: {}", err))?
            .ok_or("Failed to find video stream".to_string())?;
        let input_audio = input
            .stream(
                Some(AVMEDIA_TYPE_AUDIO),
                audio_index,
                Some(input_video.index()),
            )
            .map_err(|err| format!("Failed to find audio stream: {}", err))?;
        if audio_index.is_some() && input_audio.is_none() {
            return Err("Failed to find audio stream".to_string());
        }
        let input_frame_rate = input_video.avg_frame_rate(&input).unwrap().to_num();
        let (video_output, output_video) = Self::create_streams(&input, &input_video)?;
        let audio = if let Some(input_stream) = input_audio {
            let (format, output_stream) = AudioTranscoder::static_create_output(&input)?;
            let decoder = input_stream.decoder(None).ok_or(format!(
                "Couldn't find decoder for input audio with codec: {:?}",
                input_stream.codec()
            ))?;
            decoder.configure(&input_stream)?;
            decoder.open()?;
            decoder.set_time_base(input_stream.time_base());
            let encoder = output_stream
                .encoder(None)
                .ok_or("Couldn't find encoder for AAC".to_string())?;
            encoder.set_channels(decoder.channels());
            encoder.set_channel_layout(decoder.channel_layout());
            encoder.set_sample_format(
                encoder
                    .sample_formats()
                    .first()
                    .copied()
                    .ok_or("No sample formats found for AAC")?,
            );
            encoder.set_sample_rate(decoder.sample_rate());
            encoder.set_time_base(input_stream.time_base());
            encoder.set_time_base(Ratio::new(1, decoder.sample_rate()));
            output_stream.set_time_base(encoder.time_base());
            encoder.set_frame_size(decoder.frame_size() * 4);
            encoder.open()?;
            encoder.configure(&output_stream)?;
            let resampler = Resampler::from_coder(&decoder, &encoder);
            Transcoder::static_open_output(&format, None)?;
            Some(AudioTranscoder {
                frame_buffer: SortedFrameBuffer::new(),
                encoder,
                decoder,
                format,
                output_stream,
                input_stream,
                output_path: output_path.to_string(),
                current_pts: 0,
                last_pts: 0,
                segment: None,
                resampler,
                seconds_per_segment: SECONDS_PER_SEGMENT,
                segment_target: None,
                header_written: false,
                latest_written_segment: None,
                last_written_segment: None,
            })
        } else {
            None
        };
        let video_decoder = input_video.decoder_select(None, |d| {
            d.configure(&input_video)?;
            d.open()?;
            Ok(())
        })?;
        let video_encoder = output_video.encoder_select(None, |encoder| {
            encoder.set_pixel_format(AVPixelFormat::AV_PIX_FMT_YUV420P);
            encoder.set_color_range(video_decoder.color_range());
            encoder.set_color_primaries(video_decoder.color_primaries());
            encoder.set_color_trc(video_decoder.color_trc());
            encoder.set_height(video_decoder.height());
            encoder.set_width(video_decoder.width());
            encoder.set_time_base(input_frame_rate.invert());
            encoder.open()?;
            encoder.configure(&output_video)?;
            Ok(())
        })?;
        Self::static_open_output(&video_output, Some((&input_video, &video_decoder)))?;
        let frame_scaler = Scaler::from_coder(&video_decoder, &video_encoder, SWS_BILINEAR);
        let video_frame_buffer = SortedFrameBuffer::new();
        Ok(Transcoder {
            input,
            input_video,
            input_frame_rate,
            video_output,
            output_video,
            audio,
            segment_target: None,
            input_video_index: None,
            output_path,
            video_encoder,
            video_decoder,
            frame_scaler,
            video_frame_buffer,
            last_pts: 0,
            current_pts: 0,
            video_segment: None,
            extra_data: None,
            input_audio_index: None,
            seconds_per_segment: SECONDS_PER_SEGMENT,
            header_written: false,
            latest_written_segment: None,
            last_written_segment: None,
            highest_pts: 0,
        })
    }

    fn create_output(&mut self) -> Result<(), String> {
        let (video_output, output_video) = Self::create_streams(&self.input, &self.input_video)?;
        self.video_output = video_output;
        self.output_video = output_video;
        Ok(())
    }

    fn create_streams(
        input: &Format<()>,
        input_video: &Stream,
    ) -> Result<(Format, Stream), String> {
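        // Like the audio path, the MP4 muxer writes into an in-memory Buffer behind AVIO
        // rather than straight to disk; video_write_header()/video_write_segment() later
        // drain that buffer and turn it into the init segment and numbered .m4s fragments.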
        let video_avio = AVIO::writer(Buffer::default());
        let video_output = Format::output_avio(video_avio, "mp4")?;
        let output_video = video_output.new_stream("h264")?;
        output_video.params().copy_from(&input_video.params())?;
        output_video.set_sample_aspect_ratio(input_video.sample_aspect_ratio());
        video_output.set_flags(input.flags());
        Ok((video_output, output_video))
    }

    fn open_output(&self) -> Result<(), String> {
        Self::static_open_output(
            &self.video_output,
            Some((&self.output_video, &self.video_decoder)),
        )
    }

    fn static_open_output(
        output: &Format,
        video: Option<(&Stream, &Decoder)>,
    ) -> Result<(), String> {
        let mut options = Dictionary::new();
        options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
        if let Some((video, video_decoder)) = video {
            let param = video.params();
            param.set_height(video_decoder.height());
            param.set_width(video_decoder.width());
            param.set_pixel_format(AV_PIX_FMT_YUV420P);
        }
        output.init_output(options)
    }

    pub fn video_encoder(&self) -> &Encoder {
        &self.video_encoder
    }

    pub fn video_decoder(&self) -> &Decoder {
        &self.video_decoder
    }

    pub fn reset_output(&mut self) -> Result<(), String> {
        let time_base = self.output_video.time_base();
        self.create_output()?;
        self.output_video
            .set_time_base(self.video_encoder.time_base());
        self.video_encoder.configure(&self.output_video)?;
        self.open_output()?;
        self.video_encoder.flush();
        self.output_video.set_time_base(time_base);
        self.video_segment = None;
        self.header_written = false;
        let mut options = Dictionary::new();
        options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
        self.video_output.write_header(Some(options)).unwrap();
        if let Some(audio) = self.audio.as_mut() {
            let time_base = audio.output_stream.time_base();
            audio.create_output(&self.input)?;
            audio.output_stream.set_time_base(audio.encoder.time_base());
            audio.encoder.configure(&audio.output_stream)?;
            audio.open_output()?;
            audio.encoder.flush();
            audio.output_stream.set_time_base(time_base);
            audio.segment = None;
            audio.header_written = false;
            let mut options = Dictionary::new();
            options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
            audio.format.write_header(Some(options)).unwrap();
            audio.last_pts = 0;
        }
        Ok(())
    }

    pub fn seek(&mut self, segment: u32, stream_index: Option<usize>) -> Result<(), String> {
        self.input.seek(
            segment * self.seconds_per_segment,
            stream_index,
            segment < self.last_written_segment().unwrap_or(0),
        )?;
        self.reset_output()?;
        self.video_frame_buffer.clear();
        self.segment_target = Some(segment);
        self.video_segment = None;
        if let Some(audio) = self.audio.as_mut() {
            audio.frame_buffer.clear();
            audio.segment_target = Some(segment);
            audio.segment = None;
        }
        Ok(())
    }

    pub fn open(&mut self) -> Result<(), String> {
        DirBuilder::new()
            .recursive(true)
            .create(self.output_path.to_string())
            .map_err(|_| "Failed to create target directory".to_string())?;
        let mut options = Dictionary::new();
        options.set("movflags", "+dash+delay_moov+skip_trailer+frag_custom");
        self.video_output.write_header(Some(options)).unwrap();
        self.input_video_index = Some(self.input_video.index());
        self.input_audio_index = self.audio.as_ref().map(|ia| ia.input_stream.index());
        Ok(())
    }

    pub fn last_frame_secs(&self) -> f64 {
        self.last_pts as f64 / self.input_video.time_base().den() as f64
    }

    pub fn latest_frame_secs(&self) -> f64 {
        self.highest_pts as f64 / self.input_video.time_base().den() as f64
    }

    pub fn duration_secs(&self) -> f64 {
        self.input.duration() as f64 / AV_TIME_BASE as f64
    }

    pub fn has_audio(&self) -> bool {
        self.audio.is_some()
    }
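
    /// Pulls the next packet from the input and routes it to the matching decoder.
    /// Returns `Ok(true)` while more packets remain, and `Ok(false)` once the input is
    /// drained and the buffered frames have been flushed through the encoders.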
    pub fn transcode(&mut self) -> Result<bool, String> {
        if let Some(packet) = self.input.next_packet() {
            if self.input_video_index == Some(packet.stream()) {
                self.video_process_packet(packet)?;
            } else if self.input_audio_index == Some(packet.stream()) {
                if let Some(ref mut audio) = &mut self.audio {
                    audio.process_packet(packet)?;
                }
            }
            return Ok(true);
        }
        self.video_frame_buffer.close();
        self.encode_video()?;
        if let Some(audio) = self.audio.as_mut() {
            audio.frame_buffer.close();
            audio.encode()?;
        }
        Ok(false)
    }

    pub fn avg_frame_rate(&self) -> Option<Rational> {
        self.video_stream().avg_frame_rate(&self.input)
    }

    #[inline]
    pub fn video_stream(&self) -> &Stream {
        &self.input_video
    }

    #[inline]
    pub fn audio(&self) -> Option<&AudioTranscoder> {
        self.audio.as_ref()
    }

    #[inline]
    pub fn audio_stream(&self) -> Option<&Stream> {
        if let Some(ref audio) = self.audio {
            Some(audio.input_stream())
        } else {
            None
        }
    }

    pub fn seeking(&self) -> bool {
        self.segment_target.is_some()
            || self
                .audio
                .as_ref()
                .map_or(false, |audio| audio.segment_target.is_some())
    }

    pub fn last_written_audio_segment(&self) -> Option<u32> {
        self.audio.as_ref().and_then(|x| x.last_written_segment)
    }

    pub fn last_written_segment(&self) -> Option<u32> {
        self.last_written_segment
    }

    pub fn latest_written_segment(&self) -> Option<u32> {
        if let Some(audio) = self.audio.as_ref() {
            min(self.latest_written_segment, audio.latest_written_segment)
        } else {
            self.latest_written_segment
        }
    }

    pub fn segment(&self) -> u32 {
        min(
            self.video_segment.unwrap_or(0),
            self.audio
                .as_ref()
                .map_or(u32::MAX, |audio| audio.segment.unwrap_or(u32::MAX)),
        )
    }

    pub fn finish(&mut self) -> Result<(), String> {
        self.encode_video()?;
        self.video_output.write_trailer()?;
        self.video_write_segment()?;
        if let Some(audio) = self.audio.as_mut() {
            audio.encode()?;
            audio.format.write_trailer()?;
            audio.write_segment()?;
        }
        Ok(())
    }

    fn video_process_packet(&mut self, packet: Packet) -> Result<(), String> {
        self.video_decoder.send_packet(&packet)?;
        while let Some(frame) = self.video_decoder.read_frame()? {
            let segment: u32 = (self
                .input_video
                .time_base()
                .mul(packet.pts() as i32)
                .to_integer()
                / self.seconds_per_segment as i32) as u32;
            if let Some(target) = self.segment_target {
                if (target as i64 - 1) > (segment as i64) {
                    continue;
                }
            }
            let pts = frame.pts();
            let frame = self
                .frame_scaler
                .scale(frame)
                .expect("Failed to scale video frame");
            frame.set_pts(pts);
            self.video_frame_buffer.insert_frame(frame);
            self.encode_video()?;
        }
        Ok(())
    }

    fn video_write_header(&mut self) -> Result<(), String> {
        File::create(format!("{}/video-init.mp4", &self.output_path))
            .unwrap()
            .write_all(&mut self.video_output.avio_inner_mut().unwrap().buffer())
            .map_err(|_| "Failed to write video init segment".to_string())?;
        self.start_segment(true);
        self.video_output.write_packet_null()?;
        Ok(())
    }

    fn video_write_segment(&mut self) -> Result<(), String> {
        if self.video_segment == Some(0) {
            self.video_write_header()?;
            self.header_written = true;
        }
        if !self.header_written {
            self.video_output.avio_inner_mut().unwrap().buffer();
            self.start_segment(true);
            self.video_output.write_packet_null()?;
            self.header_written = true;
        }
        if let Some(avio) = self.video_output.avio.as_mut() {
            avio.flush()
        }
        let segment = self.video_output.avio_inner_mut().unwrap().buffer();
        if self.segment_target.unwrap_or(self.video_segment.unwrap()) == self.video_segment.unwrap()
        {
            self.segment_target = None;
            let segment_res = annex_b_to_avc(segment, 4);
            if let Err(_) = segment_res {
                println!("WARN: failed to convert Annex B stream to AVC");
            }
            let mut segment = segment_res?;
            File::create(format!(
                "{}/video-segment-{:0>5}.m4s",
                self.output_path,
                self.video_segment.unwrap()
            ))
            .unwrap()
            .write_all(&mut segment)
            .map_err(|_| {
                format!(
                    "Failed to write video segment {}",
                    self.video_segment.unwrap()
                )
            })?;
            self.last_written_segment = self.video_segment;
            self.latest_written_segment = max(
                self.latest_written_segment,
                Some(self.video_segment.unwrap()),
            );
            println!(
                "Wrote segment (video) {} ({} -> {} / coder: {} {})",
                self.video_segment.unwrap(),
                self.input_video.time_base(),
                self.output_video.time_base(),
                self.video_decoder.time_base(),
                self.video_encoder.time_base()
            );
        } else {
            println!(
                "Won't write segment (video) {} (searching for {})",
                self.video_segment.unwrap(),
                self.segment_target.unwrap()
            )
        }
        self.start_segment(false);
        self.video_encoder.flush();
        Ok(())
    }

    fn encode_video(&mut self) -> Result<(), String> {
        while let Some(frame) = self.video_frame_buffer.pop_first() {
            // if self.video_segment == None || self.last_pts > frame.pts() {
            self.last_pts = frame.pts()
                - (self
                    .avg_frame_rate()
                    .unwrap()
                    .invert()
                    .to_num()
                    .mul(self.input_video.time_base().den() as i32))
                .as_f64() as i64;
            self.current_pts = frame.pts()
                % (self.input_video.time_base().den() as u32 * self.seconds_per_segment) as i64;
            self.video_segment = Some(
                (frame.pts() as f64 / self.input_video.time_base().den() as f64).floor() as u32
                    / self.seconds_per_segment,
            );
            if self.current_pts == 0 && Some(0) < self.video_segment {
                self.video_segment = Some(self.video_segment.unwrap() - 1)
            }
            let pts_passed = frame.pts() - self.last_pts;
            if (pts_passed + self.current_pts)
                > (self.seconds_per_segment * self.input_video.time_base().den() as u32) as i64
                || (self.current_pts == 0 && Some(0) < self.video_segment)
            {
                self.video_output.write_packet_null()?;
                self.video_write_segment()?;
                self.current_pts = (pts_passed + self.current_pts)
                    % self.input_video.time_base().den() as i64;
                // Force a keyframe at the start of every new segment.
                frame.set_pict_type(AV_PICTURE_TYPE_I);
            } else {
                self.current_pts += pts_passed
            }
            self.last_pts = frame.pts();
            self.highest_pts = max(self.highest_pts, self.last_pts);
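            // What follows: the scaled frame goes to the H.264 encoder, and the first
            // encoded packet is scanned for SPS/PPS NAL units so an avcC
            // (AvcDecoderConfigurationRecord) extradata blob can be attached to the output
            // stream; the fragment bytes themselves are converted from Annex B to
            // length-prefixed AVC later, in video_write_segment().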
            self.video_encoder.send_video_frame(&frame)?;
            while let Some(new_packet) = self.video_encoder.read_packet()? {
                if self.extra_data.is_none() {
                    let items = NalBuffer::from_stream(new_packet.data())?;
                    let mut sps = None;
                    let mut pps = None;
                    for item in items {
                        if item.nal_unit.nal_unit_type == NalUnitType::PictureParameterSet {
                            pps = Some(item.to_vec());
                        } else if item.nal_unit.nal_unit_type == NalUnitType::SequenceParameterSet {
                            sps = Some(item.to_vec());
                        }
                    }
                    if let Some(sps) = sps {
                        if let Some(pps) = pps {
                            let config =
                                AvcDecoderConfigurationRecord::from_parameter_sets(sps, pps);
                            self.extra_data = Some(config.to_bytes()?);
                        }
                    }
                    if let Some(extra_data) = &self.extra_data {
                        self.output_video.params().set_extra_data(extra_data);
                        self.output_video
                            .params()
                            .set_sample_aspect_ratio(Ratio::new(1, 1));
                    }
                }
                new_packet.set_pts(frame.pts());
                new_packet.set_dts(new_packet.pts());
                new_packet.set_duration(pts_passed);
                new_packet.rescale(self.input_video.time_base(), self.output_video.time_base());
                new_packet.set_stream(0);
                self.video_output.write_packet(&new_packet)?;
            }
        }
        Ok(())
    }

    pub fn video_stream_output(&self) -> &Stream {
        &self.output_video
    }

    pub fn audio_stream_output(&self) -> Option<&Stream> {
        self.audio.as_ref().map(|audio| &audio.output_stream)
    }

    fn start_segment(&self, force: bool) {
        if self.video_segment == Some(0) && !force {
            return;
        }
        // Write a 24-byte 'styp' box at the start of every video fragment, matching the
        // audio path above.
        unsafe {
            let pb = (*self.video_output.as_mut_ptr()).pb;
            avio_wb32(pb, 24);
            ffio_wfourcc(pb, b"styp");
            ffio_wfourcc(pb, b"msdh");
            avio_wb32(pb, 0); /* minor */
            ffio_wfourcc(pb, b"msdh");
            ffio_wfourcc(pb, b"msix");
        }
    }

    pub fn stop(self) {}
}

#[derive(Default, Debug)]
struct Buffer {
    buffer: Vec<u8>,
}

impl Buffer {
    /// Takes the bytes buffered so far, leaving the buffer empty.
    fn buffer(&mut self) -> Vec<u8> {
        std::mem::replace(&mut self.buffer, vec![])
    }
}

impl AVIOWriter for Buffer {
    fn write(&mut self, mut buffer: Vec<u8>) {
        self.buffer.append(&mut buffer)
    }
}

/// Packs a four-byte tag big-endian into a u32 and writes it, like FFmpeg's internal
/// ffio_wfourcc (e.g. b"styp" becomes 0x7374_7970).
#[inline]
fn ffio_wfourcc(pb: *mut AVIOContext, tag: &[u8]) {
    let mut nr: u32 = 0;
    nr |= tag[3] as u32;
    nr |= (tag[2] as u32) << 8;
    nr |= (tag[1] as u32) << 16;
    nr |= (tag[0] as u32) << 24;
    unsafe { avio_wb32(pb, nr) }
}
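
// A minimal driver sketch (not part of the original module): the input path and output
// directory below are placeholders, and it relies only on the public API defined above.
// transcode() is assumed to return Ok(true) while packets remain, as implemented above.
#[allow(dead_code)]
fn example_transcode_to_segments() -> Result<(), String> {
    // Use the default video/audio stream selection and write segments into ./segments.
    let mut transcoder = Transcoder::create("input.mp4", "segments", None, None)?;
    transcoder.open()?;
    // Drive the demux/decode/encode pipeline until the input is exhausted.
    while transcoder.transcode()? {}
    // Flush the buffered frames and write the final fragments.
    transcoder.finish()
}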