Fix GStreamer lagging audio on next track

Also: remove unnecessary thread and channel
This commit is contained in:
Roderick van Domburg 2022-01-09 20:38:54 +01:00
parent d2c377d14b
commit e69d5a8e91
No known key found for this signature in database
GPG key ID: FE2585E713F9F30A

View file

@ -1,13 +1,13 @@
use std::{ use std::{ops::Drop, thread};
ops::Drop,
sync::mpsc::{sync_channel, SyncSender},
thread,
};
use gstreamer as gst; use gstreamer as gst;
use gstreamer_app as gst_app; use gstreamer_app as gst_app;
use gst::{prelude::*, State}; use gst::{
event::{FlushStart, FlushStop},
prelude::*,
State,
};
use zerocopy::AsBytes; use zerocopy::AsBytes;
use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult}; use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult};
@ -18,7 +18,8 @@ use crate::{
#[allow(dead_code)] #[allow(dead_code)]
pub struct GstreamerSink { pub struct GstreamerSink {
tx: SyncSender<Vec<u8>>, appsrc: gst_app::AppSrc,
bufferpool: gst::BufferPool,
pipeline: gst::Pipeline, pipeline: gst::Pipeline,
format: AudioFormat, format: AudioFormat,
} }
@ -35,7 +36,7 @@ impl Open for GstreamerSink {
_ => format!("{:?}", format), _ => format!("{:?}", format),
}; };
let sample_size = format.size(); let sample_size = format.size();
let gst_bytes = 2048 * sample_size; let gst_bytes = NUM_CHANNELS as usize * 1024 * sample_size;
#[cfg(target_endian = "little")] #[cfg(target_endian = "little")]
const ENDIANNESS: &str = "LE"; const ENDIANNESS: &str = "LE";
@ -67,38 +68,25 @@ impl Open for GstreamerSink {
let appsrc: gst_app::AppSrc = appsrce let appsrc: gst_app::AppSrc = appsrce
.dynamic_cast::<gst_app::AppSrc>() .dynamic_cast::<gst_app::AppSrc>()
.expect("couldn't cast AppSrc element at runtime!"); .expect("couldn't cast AppSrc element at runtime!");
let bufferpool = gst::BufferPool::new();
let appsrc_caps = appsrc.caps().expect("couldn't get appsrc caps"); let appsrc_caps = appsrc.caps().expect("couldn't get appsrc caps");
let bufferpool = gst::BufferPool::new();
let mut conf = bufferpool.config(); let mut conf = bufferpool.config();
conf.set_params(Some(&appsrc_caps), 4096 * sample_size as u32, 0, 0); conf.set_params(Some(&appsrc_caps), gst_bytes as u32, 0, 0);
bufferpool bufferpool
.set_config(conf) .set_config(conf)
.expect("couldn't configure the buffer pool"); .expect("couldn't configure the buffer pool");
bufferpool
.set_active(true)
.expect("couldn't activate buffer pool");
let (tx, rx) = sync_channel::<Vec<u8>>(64 * sample_size);
thread::spawn(move || {
for data in rx {
let buffer = bufferpool.acquire_buffer(None);
if let Ok(mut buffer) = buffer {
let mutbuf = buffer.make_mut();
mutbuf.set_size(data.len());
mutbuf
.copy_from_slice(0, data.as_bytes())
.expect("Failed to copy from slice");
let _eat = appsrc.push_buffer(buffer);
}
}
});
thread::spawn(move || { thread::spawn(move || {
let thread_mainloop = mainloop; let thread_mainloop = mainloop;
let watch_mainloop = thread_mainloop.clone(); let watch_mainloop = thread_mainloop.clone();
bus.add_watch(move |_, msg| { bus.add_watch(move |_, msg| {
match msg.view() { match msg.view() {
gst::MessageView::Eos(..) => watch_mainloop.quit(), gst::MessageView::Eos(_) => {
println!("gst signaled end of stream");
watch_mainloop.quit();
}
gst::MessageView::Error(err) => { gst::MessageView::Error(err) => {
println!( println!(
"Error from {:?}: {} ({:?})", "Error from {:?}: {} ({:?})",
@ -122,7 +110,8 @@ impl Open for GstreamerSink {
.expect("unable to set the pipeline to the `Ready` state"); .expect("unable to set the pipeline to the `Ready` state");
Self { Self {
tx, appsrc,
bufferpool,
pipeline, pipeline,
format, format,
} }
@ -131,6 +120,10 @@ impl Open for GstreamerSink {
impl Sink for GstreamerSink { impl Sink for GstreamerSink {
fn start(&mut self) -> SinkResult<()> { fn start(&mut self) -> SinkResult<()> {
self.appsrc.send_event(FlushStop::new(true));
self.bufferpool
.set_active(true)
.expect("couldn't activate buffer pool");
self.pipeline self.pipeline
.set_state(State::Playing) .set_state(State::Playing)
.map_err(|e| SinkError::StateChange(e.to_string()))?; .map_err(|e| SinkError::StateChange(e.to_string()))?;
@ -138,9 +131,13 @@ impl Sink for GstreamerSink {
} }
fn stop(&mut self) -> SinkResult<()> { fn stop(&mut self) -> SinkResult<()> {
self.appsrc.send_event(FlushStart::new());
self.pipeline self.pipeline
.set_state(State::Paused) .set_state(State::Paused)
.map_err(|e| SinkError::StateChange(e.to_string()))?; .map_err(|e| SinkError::StateChange(e.to_string()))?;
self.bufferpool
.set_active(false)
.expect("couldn't deactivate buffer pool");
Ok(()) Ok(())
} }
@ -149,16 +146,30 @@ impl Sink for GstreamerSink {
impl Drop for GstreamerSink { impl Drop for GstreamerSink {
fn drop(&mut self) { fn drop(&mut self) {
// Follow the state transitions documented at:
// https://gstreamer.freedesktop.org/documentation/additional/design/states.html?gi-language=c
let _ = self.pipeline.set_state(State::Ready);
let _ = self.pipeline.set_state(State::Null); let _ = self.pipeline.set_state(State::Null);
} }
} }
impl SinkAsBytes for GstreamerSink { impl SinkAsBytes for GstreamerSink {
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
// Copy expensively (in to_vec()) to avoid thread synchronization let mut buffer = self
self.tx .bufferpool
.send(data.to_vec()) .acquire_buffer(None)
.expect("tx send failed in write function"); .map_err(|e| SinkError::OnWrite(e.to_string()))?;
let mutbuf = buffer.make_mut();
mutbuf.set_size(data.len());
mutbuf
.copy_from_slice(0, data.as_bytes())
.expect("Failed to copy from slice");
self.appsrc
.push_buffer(buffer)
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
Ok(()) Ok(())
} }
} }