From 35ec580eac78ab43684f7d2c24de71153db582ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20B=C3=A4chler?= Date: Sun, 17 Dec 2017 14:29:58 +0100 Subject: [PATCH 1/4] Disable the "variable does not need to be mutable" compiler warning in generated code --- core/src/lib.in.rs | 1 + src/lib.in.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/core/src/lib.in.rs b/core/src/lib.in.rs index b3b606b4..c534cec4 100644 --- a/core/src/lib.in.rs +++ b/core/src/lib.in.rs @@ -1 +1,2 @@ +#[allow(unused_mut)] pub mod connection; diff --git a/src/lib.in.rs b/src/lib.in.rs index be92c5d8..9dc5e82c 100644 --- a/src/lib.in.rs +++ b/src/lib.in.rs @@ -1 +1,2 @@ +#[allow(unused_mut)] pub mod spirc; From f5d8019c18ef565b237bbefea9c2dbdde23cd374 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20B=C3=A4chler?= Date: Mon, 27 Nov 2017 20:01:30 +0100 Subject: [PATCH 2/4] Add proper error handling to the pulseaudio backend and ensure that no invalid pointers are passed to pulseaudio --- Cargo.lock | 15 +++--- Cargo.toml | 3 +- src/audio_backend/pulseaudio.rs | 92 +++++++++++++++++++++++---------- src/lib.rs | 3 ++ 4 files changed, 79 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d8128db1..eaddea19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,10 +1,3 @@ -[root] -name = "librespot-protocol" -version = "0.1.0" -dependencies = [ - "protobuf 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "aho-corasick" version = "0.6.3" @@ -271,6 +264,7 @@ dependencies = [ "futures 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)", "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)", "libpulse-sys 0.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "librespot-audio 0.1.0", "librespot-core 0.1.0", @@ -353,6 +347,13 @@ dependencies = [ "protobuf 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "librespot-protocol" +version = "0.1.0" +dependencies = [ + "protobuf 1.4.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "linear-map" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index f4e63498..82bef079 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,6 +52,7 @@ url = "1.3" alsa = { git = "https://github.com/plietar/rust-alsa", optional = true } portaudio-rs = { version = "0.3.0", optional = true } libpulse-sys = { version = "0.0.0", optional = true } +libc = { version = "0.2", optional = true } [build-dependencies] rand = "0.3.13" @@ -61,7 +62,7 @@ protobuf_macros = { git = "https://github.com/plietar/rust-protobuf-macros", fea [features] alsa-backend = ["alsa"] portaudio-backend = ["portaudio-rs"] -pulseaudio-backend = ["libpulse-sys"] +pulseaudio-backend = ["libpulse-sys", "libc"] with-tremor = ["librespot-audio/with-tremor"] with-lewton = ["librespot-audio/with-lewton"] diff --git a/src/audio_backend/pulseaudio.rs b/src/audio_backend/pulseaudio.rs index 5e3b8c18..e9f0039b 100644 --- a/src/audio_backend/pulseaudio.rs +++ b/src/audio_backend/pulseaudio.rs @@ -2,8 +2,10 @@ use super::{Open, Sink}; use std::io; use libpulse_sys::*; use std::ptr::{null, null_mut}; -use std::mem::{transmute}; use std::ffi::CString; +use std::ffi::CStr; +use std::mem; +use libc; pub struct PulseAudioSink { s : *mut pa_simple, @@ -12,6 +14,39 @@ pub struct PulseAudioSink { desc : CString } +fn call_pulseaudio(f: F, fail_check: FailCheck, kind: io::ErrorKind) -> io::Result where + T: Copy, + F: Fn(*mut libc::c_int) -> T, + FailCheck: Fn(T) -> bool, +{ + let mut error: libc::c_int = 0; + let ret = f(&mut error); + if fail_check(ret) { + let err_cstr = unsafe { CStr::from_ptr(pa_strerror(error)) }; + let errstr = err_cstr.to_string_lossy().into_owned(); + Err(io::Error::new(kind, errstr)) + } else { + Ok(ret) + } +} + +impl PulseAudioSink { + fn free_connection(&mut self) { + if self.s != null_mut() { + unsafe { + pa_simple_free(self.s); + } + self.s = null_mut(); + } + } +} + +impl Drop for PulseAudioSink { + fn drop(&mut self) { + self.free_connection(); + } +} + impl Open for PulseAudioSink { fn open(device: Option) -> PulseAudioSink { debug!("Using PulseAudio sink"); @@ -27,7 +62,7 @@ impl Open for PulseAudioSink { }; let name = CString::new("librespot").unwrap(); - let description = CString::new("A spoty client library").unwrap(); + let description = CString::new("Spotify endpoint").unwrap(); PulseAudioSink { s: null_mut(), @@ -41,38 +76,43 @@ impl Open for PulseAudioSink { impl Sink for PulseAudioSink { fn start(&mut self) -> io::Result<()> { if self.s == null_mut() { - self.s = unsafe { - pa_simple_new(null(), // Use the default server. - self.name.as_ptr(), // Our application's name. - PA_STREAM_PLAYBACK, - null(), // Use the default device. - self.desc.as_ptr(), // desc of our stream. - &self.ss, // Our sample format. - null(), // Use default channel map - null(), // Use default buffering attributes. - null_mut(), // Ignore error code. - ) - }; - assert!(self.s != null_mut()); + self.s = call_pulseaudio( + |err| unsafe { + pa_simple_new(null(), // Use the default server. + self.name.as_ptr(), // Our application's name. + PA_STREAM_PLAYBACK, + null(), // Use the default device. + self.desc.as_ptr(), // desc of our stream. + &self.ss, // Our sample format. + null(), // Use default channel map + null(), // Use default buffering attributes. + err) + }, + |ptr| ptr == null_mut(), + io::ErrorKind::ConnectionRefused)?; } Ok(()) } fn stop(&mut self) -> io::Result<()> { - unsafe { - pa_simple_free(self.s); - } - self.s = null_mut(); + self.free_connection(); Ok(()) } fn write(&mut self, data: &[i16]) -> io::Result<()> { - unsafe { - let ptr = transmute(data.as_ptr()); - let bytes = data.len() as usize * 2; - pa_simple_write(self.s, ptr, bytes, null_mut()); - }; - - Ok(()) + if self.s == null_mut() { + Err(io::Error::new(io::ErrorKind::NotConnected, "Not connected to pulseaudio")) + } + else { + let ptr = data.as_ptr() as *const libc::c_void; + let len = data.len() as usize * mem::size_of::(); + call_pulseaudio( + |err| unsafe { + pa_simple_write(self.s, ptr, len, err) + }, + |ret| ret < 0, + io::ErrorKind::BrokenPipe)?; + Ok(()) + } } } diff --git a/src/lib.rs b/src/lib.rs index b9c920ec..5eaab011 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,6 +34,9 @@ extern crate portaudio_rs; #[cfg(feature = "libpulse-sys")] extern crate libpulse_sys; +#[cfg(feature = "libc")] +extern crate libc; + pub mod audio_backend; pub mod discovery; pub mod keymaster; From f250179fed78ecc5f26e55085f8f59043e6c3da5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20B=C3=A4chler?= Date: Tue, 28 Nov 2017 00:35:04 +0100 Subject: [PATCH 3/4] Join the player thread when the player is dropped --- src/player.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/player.rs b/src/player.rs index 29380e33..c07082f6 100644 --- a/src/player.rs +++ b/src/player.rs @@ -16,9 +16,9 @@ use audio::{VorbisDecoder, VorbisPacket}; use metadata::{FileFormat, Track, Metadata}; use mixer::AudioFilter; -#[derive(Clone)] pub struct Player { - commands: std::sync::mpsc::Sender, + commands: Option>, + thread_handle: Option>, } struct PlayerInternal { @@ -47,7 +47,7 @@ impl Player { { let (cmd_tx, cmd_rx) = std::sync::mpsc::channel(); - thread::spawn(move || { + let handle = thread::spawn(move || { debug!("new Player[{}]", session.session_id()); let internal = PlayerInternal { @@ -64,12 +64,13 @@ impl Player { }); Player { - commands: cmd_tx, + commands: Some(cmd_tx), + thread_handle: Some(handle), } } fn command(&self, cmd: PlayerCommand) { - self.commands.send(cmd).unwrap(); + self.commands.as_ref().unwrap().send(cmd).unwrap(); } pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32) @@ -98,6 +99,19 @@ impl Player { } } +impl Drop for Player { + fn drop(&mut self) { + debug!("Shutting down player thread ..."); + self.commands = None; + if let Some(handle) = self.thread_handle.take() { + match handle.join() { + Ok(_) => (), + Err(_) => error!("Player thread panicked!") + } + } + } +} + type Decoder = VorbisDecoder>>; enum PlayerState { Stopped, From 4cda8affcd188815ef552889676eca0b6bf1724f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Thomas=20B=C3=A4chler?= Date: Wed, 29 Nov 2017 00:18:12 +0100 Subject: [PATCH 4/4] Handle audio sink errors in the player Failing to open or write to the audio sink is not necessarily a fatal and permanent error. When the audio sink fails, the player now tries to restart the sink periodically. --- src/player.rs | 77 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 60 insertions(+), 17 deletions(-) diff --git a/src/player.rs b/src/player.rs index c07082f6..94df2e1c 100644 --- a/src/player.rs +++ b/src/player.rs @@ -2,8 +2,9 @@ use futures::sync::oneshot; use futures::{future, Future}; use std::borrow::Cow; use std::mem; -use std::sync::mpsc::{RecvError, TryRecvError}; +use std::sync::mpsc::{RecvError, TryRecvError, RecvTimeoutError}; use std::thread; +use std::time::Duration; use std; use core::config::{Bitrate, PlayerConfig}; @@ -28,6 +29,7 @@ struct PlayerInternal { state: PlayerState, sink: Box, + sink_running: bool, audio_filter: Option>, } @@ -57,6 +59,7 @@ impl Player { state: PlayerState::Stopped, sink: sink_builder(), + sink_running: false, audio_filter: audio_filter, }; @@ -191,10 +194,21 @@ impl PlayerInternal { fn run(mut self) { loop { let cmd = if self.state.is_playing() { - match self.commands.try_recv() { - Ok(cmd) => Some(cmd), - Err(TryRecvError::Empty) => None, - Err(TryRecvError::Disconnected) => return, + if self.sink_running + { + match self.commands.try_recv() { + Ok(cmd) => Some(cmd), + Err(TryRecvError::Empty) => None, + Err(TryRecvError::Disconnected) => return, + } + } + else + { + match self.commands.recv_timeout(Duration::from_secs(5)) { + Ok(cmd) => Some(cmd), + Err(RecvTimeoutError::Timeout) => None, + Err(RecvTimeoutError::Disconnected) => return, + } } } else { match self.commands.recv() { @@ -207,16 +221,42 @@ impl PlayerInternal { self.handle_command(cmd); } - let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state { - Some(decoder.next_packet().expect("Vorbis error")) - } else { None }; + if self.state.is_playing() && ! self.sink_running { + self.start_sink(); + } - if let Some(packet) = packet { - self.handle_packet(packet); + if self.sink_running { + let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state { + Some(decoder.next_packet().expect("Vorbis error")) + } else { + None + }; + + if let Some(packet) = packet { + self.handle_packet(packet); + } } } } + fn start_sink(&mut self) { + match self.sink.start() { + Ok(()) => self.sink_running = true, + Err(err) => error!("Could not start audio: {}", err), + } + } + + fn stop_sink_if_running(&mut self) { + if self.sink_running { + self.stop_sink(); + } + } + + fn stop_sink(&mut self) { + self.sink.stop().unwrap(); + self.sink_running = false; + } + fn handle_packet(&mut self, packet: Option) { match packet { Some(mut packet) => { @@ -224,11 +264,14 @@ impl PlayerInternal { editor.modify_stream(&mut packet.data_mut()) }; - self.sink.write(&packet.data()).unwrap(); + if let Err(err) = self.sink.write(&packet.data()) { + error!("Could not write audio: {}", err); + self.stop_sink(); + } } None => { - self.sink.stop().unwrap(); + self.stop_sink(); self.run_onstop(); let old_state = mem::replace(&mut self.state, PlayerState::Stopped); @@ -242,7 +285,7 @@ impl PlayerInternal { match cmd { PlayerCommand::Load(track_id, play, position, end_of_track) => { if self.state.is_playing() { - self.sink.stop().unwrap(); + self.stop_sink_if_running(); } match self.load_track(track_id, position as i64) { @@ -251,7 +294,7 @@ impl PlayerInternal { if !self.state.is_playing() { self.run_onstart(); } - self.sink.start().unwrap(); + self.start_sink(); self.state = PlayerState::Playing { decoder: decoder, @@ -294,7 +337,7 @@ impl PlayerInternal { self.state.paused_to_playing(); self.run_onstart(); - self.sink.start().unwrap(); + self.start_sink(); } else { warn!("Player::play called from invalid state"); } @@ -304,7 +347,7 @@ impl PlayerInternal { if let PlayerState::Playing { .. } = self.state { self.state.playing_to_paused(); - self.sink.stop().unwrap(); + self.stop_sink_if_running(); self.run_onstop(); } else { warn!("Player::pause called from invalid state"); @@ -314,7 +357,7 @@ impl PlayerInternal { PlayerCommand::Stop => { match self.state { PlayerState::Playing { .. } => { - self.sink.stop().unwrap(); + self.stop_sink_if_running(); self.run_onstop(); self.state = PlayerState::Stopped; }