diff --git a/playback/src/audio_backend/gstreamer.rs b/playback/src/audio_backend/gstreamer.rs index 93aae00e..be48a74e 100644 --- a/playback/src/audio_backend/gstreamer.rs +++ b/playback/src/audio_backend/gstreamer.rs @@ -8,7 +8,7 @@ use glib::MainLoop; use zerocopy::*; pub struct GstreamerSink { - tx: SyncSender>, + tx: SyncSender>, pipeline: gst::Pipeline } @@ -21,6 +21,7 @@ impl Open for GstreamerSink { Some(x) => format!("{}{}", pipeline_str_preamble, x), None => format!("{}{}", pipeline_str_preamble, pipeline_str_rest) }; + println!("Pipeline: {}", pipeline_str); gst::init().unwrap(); let pipelinee = gst::parse_launch(&*pipeline_str).expect("New Pipeline error"); @@ -29,47 +30,20 @@ impl Open for GstreamerSink { let mut mainloop = glib::MainLoop::new(None, false); let mut appsrce : gst::Element = pipeline.get_by_name("appsrc0").expect("Couldn't get appsrc from pipeline"); let mut appsrc : gst_app::AppSrc = appsrce.dynamic_cast::().expect("Couldnt cast AppSrc element at runtime!"); - //let mut appsrc = gst_app::AppSrc::new_from_element(appsrc_element.to_element()); let bufferpool = gst::BufferPool::new(); let appsrc_caps = appsrc.get_caps().expect("get appsrc caps failed"); let mut conf = bufferpool.get_config(); - conf.set_params(Some(&appsrc_caps), 2048 * 2, 0, 0); - if bufferpool.set_active(true).is_err(){ - panic!("Couldn't activate buffer pool"); - } + conf.set_params(Some(&appsrc_caps), 8192, 0, 0); + bufferpool.set_config(conf).expect("Couldn't configure the buffer pool"); + bufferpool.set_active(true).expect("Couldn't activate buffer pool"); - /* - thread::spawn(move || { - let bus_receiver = bus.receiver(); - for message in bus_receiver.iter() { - match message.parse() { - gst::message::StateChanged(x) => - println!("element `{}` state changed", message.src_name()), - gst::message::Error(x) => { - println!("error msg from element `{}`: {}, quitting", message.src_name(), error.message()); - break; - }, - gst::message::Eos(ref _msg) => { - println!("eos received; quitting"); - break; - }, - _ => - println!("Pipe message: {} from {} at {}", message.type_name(), message.src_name(), message.timestamp()) - } - } - });*/ - - let (tx, rx) = sync_channel::>(64); + let (tx, rx) = sync_channel::>(128); thread::spawn(move || { for data in rx { let mut buffer = bufferpool.acquire_buffer(None).expect("acquire buffer"); - - //assert!(data.len() <= buffer.len::()); let mutbuf = buffer.make_mut(); - mutbuf.set_size(data.len() * 2); - mutbuf.map_writable().unwrap().as_mut_slice().clone_from_slice(&data[..].as_bytes()); - - //buffer.set_live(true); + mutbuf.set_size(data.len()); + mutbuf.copy_from_slice(0, data.as_bytes()); let res = appsrc.push_buffer(buffer).expect("Failed to push buffer"); } }); @@ -110,20 +84,17 @@ impl Open for GstreamerSink { impl Sink for GstreamerSink { fn start(&mut self) -> io::Result<()> { - //self.pipeline.play(); self.pipeline.set_state(gst::State::Playing).expect("Unable to set the pipeline to the `Playing` state"); Ok(()) } fn stop(&mut self) -> io::Result<()> { - //self.pipeline.pause(); self.pipeline.set_state(gst::State::Paused).expect("Unable to set the pipeline to the `Paused` state"); Ok(()) } fn write(&mut self, data: &[i16]) -> io::Result<()> { - // Copy expensively to avoid thread synchronization - let data = data.to_vec(); - self.tx.send(data).expect("tx send failed in write function"); - + // Copy expensively (in to_vec()) to avoid thread synchronization + let deighta : &[u8] = data.as_bytes(); + self.tx.send(deighta.to_vec()).expect("tx send failed in write function"); Ok(()) } } \ No newline at end of file