Merge pull request #664 from philippe44/passthrough-update

separated stream for each seek
This commit is contained in:
Sasha Hilton 2021-04-09 01:40:12 +01:00 committed by GitHub
commit 6df977907e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -5,75 +5,32 @@ use std::fmt;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
fn write_headers<T: Read + Seek>( fn get_header<T>(code: u8, rdr: &mut PacketReader<T>) -> Result<Box<[u8]>, PassthroughError>
rdr: &mut PacketReader<T>,
wtr: &mut PacketWriter<Vec<u8>>,
) -> Result<u32, PassthroughError> {
let mut stream_serial: u32 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u32;
// search for ident, comment, setup
get_header(1, rdr, wtr, &mut stream_serial, PacketWriteEndInfo::EndPage)?;
get_header(
3,
rdr,
wtr,
&mut stream_serial,
PacketWriteEndInfo::NormalPacket,
)?;
get_header(5, rdr, wtr, &mut stream_serial, PacketWriteEndInfo::EndPage)?;
// remove un-needed packets
rdr.delete_unread_packets();
return Ok(stream_serial);
}
fn get_header<T>(
code: u8,
rdr: &mut PacketReader<T>,
wtr: &mut PacketWriter<Vec<u8>>,
stream_serial: &mut u32,
info: PacketWriteEndInfo,
) -> Result<u32, PassthroughError>
where where
T: Read + Seek, T: Read + Seek,
{ {
let pck: Packet = rdr.read_packet_expected()?; let pck: Packet = rdr.read_packet_expected()?;
// set a unique serial number
if pck.stream_serial() != 0 {
*stream_serial = pck.stream_serial();
}
let pkt_type = pck.data[0]; let pkt_type = pck.data[0];
debug!("Vorbis header type{}", &pkt_type); debug!("Vorbis header type{}", &pkt_type);
// all headers are mandatory
if pkt_type != code { if pkt_type != code {
return Err(PassthroughError(OggReadError::InvalidData)); return Err(PassthroughError(OggReadError::InvalidData));
} }
// headers keep original granule number return Ok(pck.data.into_boxed_slice());
let absgp_page = pck.absgp_page();
wtr.write_packet(
pck.data.into_boxed_slice(),
*stream_serial,
info,
absgp_page,
)
.unwrap();
return Ok(*stream_serial);
} }
pub struct PassthroughDecoder<R: Read + Seek> { pub struct PassthroughDecoder<R: Read + Seek> {
rdr: PacketReader<R>, rdr: PacketReader<R>,
wtr: PacketWriter<Vec<u8>>, wtr: PacketWriter<Vec<u8>>,
lastgp_page: Option<u64>, eos: bool,
absgp_page: u64, bos: bool,
ofsgp_page: u64,
stream_serial: u32, stream_serial: u32,
ident: Box<[u8]>,
comment: Box<[u8]>,
setup: Box<[u8]>,
} }
pub struct PassthroughError(ogg::OggReadError); pub struct PassthroughError(ogg::OggReadError);
@ -82,17 +39,31 @@ impl<R: Read + Seek> PassthroughDecoder<R> {
/// Constructs a new Decoder from a given implementation of `Read + Seek`. /// Constructs a new Decoder from a given implementation of `Read + Seek`.
pub fn new(rdr: R) -> Result<Self, PassthroughError> { pub fn new(rdr: R) -> Result<Self, PassthroughError> {
let mut rdr = PacketReader::new(rdr); let mut rdr = PacketReader::new(rdr);
let mut wtr = PacketWriter::new(Vec::new()); let stream_serial = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u32;
let stream_serial = write_headers(&mut rdr, &mut wtr)?;
info!("Starting passthrough track with serial {}", stream_serial); info!("Starting passthrough track with serial {}", stream_serial);
// search for ident, comment, setup
let ident = get_header(1, &mut rdr)?;
let comment = get_header(3, &mut rdr)?;
let setup = get_header(5, &mut rdr)?;
// remove un-needed packets
rdr.delete_unread_packets();
return Ok(PassthroughDecoder { return Ok(PassthroughDecoder {
rdr, rdr,
wtr, wtr: PacketWriter::new(Vec::new()),
lastgp_page: Some(0), ofsgp_page: 0,
absgp_page: 0,
stream_serial, stream_serial,
ident,
comment,
setup,
eos: false,
bos: false,
}); });
} }
} }
@ -100,52 +71,94 @@ impl<R: Read + Seek> PassthroughDecoder<R> {
impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> { impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
fn seek(&mut self, ms: i64) -> Result<(), AudioError> { fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
info!("Seeking to {}", ms); info!("Seeking to {}", ms);
self.lastgp_page = match ms {
0 => Some(0), // add an eos to previous stream if missing
_ => None, if self.bos && !self.eos {
}; match self.rdr.read_packet() {
Ok(Some(pck)) => {
let absgp_page = pck.absgp_page() - self.ofsgp_page;
self.wtr
.write_packet(
pck.data.into_boxed_slice(),
self.stream_serial,
PacketWriteEndInfo::EndStream,
absgp_page,
)
.unwrap();
}
_ => warn! {"Cannot write EoS after seeking"},
};
}
self.eos = false;
self.bos = false;
self.ofsgp_page = 0;
self.stream_serial += 1;
// hard-coded to 44.1 kHz // hard-coded to 44.1 kHz
match self.rdr.seek_absgp(None, (ms * 44100 / 1000) as u64) { match self.rdr.seek_absgp(None, (ms * 44100 / 1000) as u64) {
Ok(_) => return Ok(()), Ok(_) => {
// need to set some offset for next_page()
let pck = self.rdr.read_packet().unwrap().unwrap();
self.ofsgp_page = pck.absgp_page();
debug!("Seek to offset page {}", self.ofsgp_page);
return Ok(());
}
Err(err) => return Err(AudioError::PassthroughError(err.into())), Err(err) => return Err(AudioError::PassthroughError(err.into())),
} }
} }
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> { fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
let mut skip = self.lastgp_page.is_none(); // write headers if we are (re)starting
if self.bos == false {
self.wtr
.write_packet(
self.ident.clone(),
self.stream_serial,
PacketWriteEndInfo::EndPage,
0,
)
.unwrap();
self.wtr
.write_packet(
self.comment.clone(),
self.stream_serial,
PacketWriteEndInfo::NormalPacket,
0,
)
.unwrap();
self.wtr
.write_packet(
self.setup.clone(),
self.stream_serial,
PacketWriteEndInfo::EndPage,
0,
)
.unwrap();
self.bos = true;
debug!("Wrote Ogg headers");
}
loop { loop {
let pck = match self.rdr.read_packet() { let pck = match self.rdr.read_packet() {
Ok(Some(pck)) => pck, Ok(Some(pck)) => pck,
Ok(None) | Err(OggReadError::NoCapturePatternFound) => { Ok(None) | Err(OggReadError::NoCapturePatternFound) => {
info!("end of streaming"); info!("end of streaming");
return Ok(None); return Ok(None);
} }
Err(err) => return Err(AudioError::PassthroughError(err.into())), Err(err) => return Err(AudioError::PassthroughError(err.into())),
}; };
let pckgp_page = pck.absgp_page(); let pckgp_page = pck.absgp_page();
let lastgp_page = self.lastgp_page.get_or_insert(pckgp_page);
// consume packets till next page to get a granule reference // skip till we have audio and a calculable granule position
if skip { if pckgp_page == 0 || pckgp_page == self.ofsgp_page {
if *lastgp_page == pckgp_page { continue;
debug!("skipping packet");
continue;
}
skip = false;
info!("skipped at {}", pckgp_page);
} }
// now we can calculate absolute granule
self.absgp_page += pckgp_page - *lastgp_page;
self.lastgp_page = Some(pckgp_page);
// set packet type // set packet type
let inf = if pck.last_in_stream() { let inf = if pck.last_in_stream() {
self.lastgp_page = Some(0); self.eos = true;
PacketWriteEndInfo::EndStream PacketWriteEndInfo::EndStream
} else if pck.last_in_page() { } else if pck.last_in_page() {
PacketWriteEndInfo::EndPage PacketWriteEndInfo::EndPage
@ -158,7 +171,7 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
pck.data.into_boxed_slice(), pck.data.into_boxed_slice(),
self.stream_serial, self.stream_serial,
inf, inf,
self.absgp_page, pckgp_page - self.ofsgp_page,
) )
.unwrap(); .unwrap();