Fix compile errors in backends

fe37186 added the restriction that `Sink`s must be `Send`. It turned
out later that this restrictions was unnecessary, and since some
`Sink`s aren't `Send` yet, this restriction is lifted again.

librespot-org/librespot#601 refactored the `RodioSink` in order to make
it `Send`. These changes are partly reverted in favour of the initial
simpler design.

Furthermore, there were some compile errors in the gstreamer backend
which are hereby fixed.
This commit is contained in:
johannesd3 2021-04-01 18:41:01 +02:00
parent 9a3a666856
commit 5435ab3270
5 changed files with 18 additions and 36 deletions

View file

@ -1,7 +1,9 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket; use crate::audio::AudioPacket;
use gst::prelude::*; use gst::prelude::*;
use gst::*; use gstreamer as gst;
use gstreamer_app as gst_app;
use zerocopy::*; use zerocopy::*;
use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::mpsc::{sync_channel, SyncSender};
@ -52,14 +54,13 @@ impl Open for GstreamerSink {
thread::spawn(move || { thread::spawn(move || {
for data in rx { for data in rx {
let buffer = bufferpool.acquire_buffer(None); let buffer = bufferpool.acquire_buffer(None);
if !buffer.is_err() { if let Ok(mut buffer) = buffer {
let mut okbuffer = buffer.unwrap(); let mutbuf = buffer.make_mut();
let mutbuf = okbuffer.make_mut();
mutbuf.set_size(data.len()); mutbuf.set_size(data.len());
mutbuf mutbuf
.copy_from_slice(0, data.as_bytes()) .copy_from_slice(0, data.as_bytes())
.expect("Failed to copy from slice"); .expect("Failed to copy from slice");
let _eat = appsrc.push_buffer(okbuffer); let _eat = appsrc.push_buffer(buffer);
} }
} }
}); });
@ -69,8 +70,8 @@ impl Open for GstreamerSink {
let watch_mainloop = thread_mainloop.clone(); let watch_mainloop = thread_mainloop.clone();
bus.add_watch(move |_, msg| { bus.add_watch(move |_, msg| {
match msg.view() { match msg.view() {
MessageView::Eos(..) => watch_mainloop.quit(), gst::MessageView::Eos(..) => watch_mainloop.quit(),
MessageView::Error(err) => { gst::MessageView::Error(err) => {
println!( println!(
"Error from {:?}: {} ({:?})", "Error from {:?}: {} ({:?})",
err.get_src().map(|s| s.get_path_string()), err.get_src().map(|s| s.get_path_string()),

View file

@ -11,9 +11,9 @@ pub trait Sink {
fn write(&mut self, packet: &AudioPacket) -> io::Result<()>; fn write(&mut self, packet: &AudioPacket) -> io::Result<()>;
} }
pub type SinkBuilder = fn(Option<String>) -> Box<dyn Sink + Send>; pub type SinkBuilder = fn(Option<String>) -> Box<dyn Sink>;
fn mk_sink<S: Sink + Open + Send + 'static>(device: Option<String>) -> Box<dyn Sink + Send> { fn mk_sink<S: Sink + Open + 'static>(device: Option<String>) -> Box<dyn Sink> {
Box::new(S::open(device)) Box::new(S::open(device))
} }

View file

@ -1,5 +1,4 @@
use std::process::exit; use std::process::exit;
use std::{convert::Infallible, sync::mpsc};
use std::{io, thread, time}; use std::{io, thread, time};
use cpal::traits::{DeviceTrait, HostTrait}; use cpal::traits::{DeviceTrait, HostTrait};
@ -15,12 +14,12 @@ use crate::audio::AudioPacket;
compile_error!("Rodio JACK backend is currently only supported on linux."); compile_error!("Rodio JACK backend is currently only supported on linux.");
#[cfg(feature = "rodio-backend")] #[cfg(feature = "rodio-backend")]
pub fn mk_rodio(device: Option<String>) -> Box<dyn Sink + Send> { pub fn mk_rodio(device: Option<String>) -> Box<dyn Sink> {
Box::new(open(cpal::default_host(), device)) Box::new(open(cpal::default_host(), device))
} }
#[cfg(feature = "rodiojack-backend")] #[cfg(feature = "rodiojack-backend")]
pub fn mk_rodiojack(device: Option<String>) -> Box<dyn Sink + Send> { pub fn mk_rodiojack(device: Option<String>) -> Box<dyn Sink> {
Box::new(open( Box::new(open(
cpal::host_from_id(cpal::HostId::Jack).unwrap(), cpal::host_from_id(cpal::HostId::Jack).unwrap(),
device, device,
@ -43,8 +42,7 @@ pub enum RodioError {
pub struct RodioSink { pub struct RodioSink {
rodio_sink: rodio::Sink, rodio_sink: rodio::Sink,
// will produce a TryRecvError on the receiver side when it is dropped. _stream: rodio::OutputStream,
_close_tx: mpsc::SyncSender<Infallible>,
} }
fn list_formats(device: &rodio::Device) { fn list_formats(device: &rodio::Device) {
@ -152,29 +150,12 @@ fn create_sink(
pub fn open(host: cpal::Host, device: Option<String>) -> RodioSink { pub fn open(host: cpal::Host, device: Option<String>) -> RodioSink {
debug!("Using rodio sink with cpal host: {}", host.id().name()); debug!("Using rodio sink with cpal host: {}", host.id().name());
let (sink_tx, sink_rx) = mpsc::sync_channel(1); let (sink, stream) = create_sink(&host, device).unwrap();
let (close_tx, close_rx) = mpsc::sync_channel(1);
std::thread::spawn(move || match create_sink(&host, device) {
Ok((sink, stream)) => {
sink_tx.send(Ok(sink)).unwrap();
close_rx.recv().unwrap_err(); // This will fail as soon as the sender is dropped
debug!("drop rodio::OutputStream");
drop(stream);
}
Err(e) => {
sink_tx.send(Err(e)).unwrap();
}
});
// Instead of the second `unwrap`, better error handling could be introduced
let sink = sink_rx.recv().unwrap().unwrap();
debug!("Rodio sink was created"); debug!("Rodio sink was created");
RodioSink { RodioSink {
rodio_sink: sink, rodio_sink: sink,
_close_tx: close_tx, _stream: stream,
} }
} }

View file

@ -50,7 +50,7 @@ struct PlayerInternal {
state: PlayerState, state: PlayerState,
preload: PlayerPreload, preload: PlayerPreload,
sink: Box<dyn Sink + Send>, sink: Box<dyn Sink>,
sink_status: SinkStatus, sink_status: SinkStatus,
sink_event_callback: Option<SinkEventCallback>, sink_event_callback: Option<SinkEventCallback>,
audio_filter: Option<Box<dyn AudioFilter + Send>>, audio_filter: Option<Box<dyn AudioFilter + Send>>,
@ -242,7 +242,7 @@ impl Player {
sink_builder: F, sink_builder: F,
) -> (Player, PlayerEventChannel) ) -> (Player, PlayerEventChannel)
where where
F: FnOnce() -> Box<dyn Sink + Send> + Send + 'static, F: FnOnce() -> Box<dyn Sink> + Send + 'static,
{ {
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (event_sender, event_receiver) = mpsc::unbounded_channel(); let (event_sender, event_receiver) = mpsc::unbounded_channel();

View file

@ -105,7 +105,7 @@ fn print_version() {
#[derive(Clone)] #[derive(Clone)]
struct Setup { struct Setup {
backend: fn(Option<String>) -> Box<dyn Sink + Send + 'static>, backend: fn(Option<String>) -> Box<dyn Sink + 'static>,
device: Option<String>, device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>, mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,