Merge branch 'dev' into resampling

This commit is contained in:
Jason Gray 2023-09-04 10:23:41 -05:00 committed by GitHub
commit b867ab2eab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 112 additions and 56 deletions

View file

@ -36,6 +36,9 @@ https://github.com/librespot-org/librespot
- [all] `chrono` replaced with `time` (breaking) - [all] `chrono` replaced with `time` (breaking)
- [all] `time` updated (CVE-2020-26235) - [all] `time` updated (CVE-2020-26235)
- [all] Improve lock contention and performance (breaking) - [all] Improve lock contention and performance (breaking)
- [all] Use a single `player` instance. Eliminates occasional `player` and
`audio backend` restarts, which can cause issues with some playback
configurations.
- [audio] Files are now downloaded over the HTTPS CDN (breaking) - [audio] Files are now downloaded over the HTTPS CDN (breaking)
- [audio] Improve file opening and seeking performance (breaking) - [audio] Improve file opening and seeking performance (breaking)
- [core] MSRV is now 1.65 (breaking) - [core] MSRV is now 1.65 (breaking)

4
Cargo.lock generated
View file

@ -2395,9 +2395,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.100.1" version = "0.100.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6207cd5ed3d8dca7816f8f3725513a34609c0c765bf652b8c3cb4cfd87db46b" checksum = "e98ff011474fa39949b7e5c0428f9b4937eda7da7848bbb947786b7be0b27dab"
dependencies = [ dependencies = [
"ring", "ring",
"untrusted", "untrusted",

View file

@ -3,6 +3,7 @@ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
sync::Arc,
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
@ -77,8 +78,8 @@ enum SpircPlayStatus {
type BoxedStream<T> = Pin<Box<dyn FusedStream<Item = T> + Send>>; type BoxedStream<T> = Pin<Box<dyn FusedStream<Item = T> + Send>>;
struct SpircTask { struct SpircTask {
player: Player, player: Arc<Player>,
mixer: Box<dyn Mixer>, mixer: Arc<dyn Mixer>,
sequence: SeqGenerator<u32>, sequence: SeqGenerator<u32>,
@ -272,8 +273,8 @@ impl Spirc {
config: ConnectConfig, config: ConnectConfig,
session: Session, session: Session,
credentials: Credentials, credentials: Credentials,
player: Player, player: Arc<Player>,
mixer: Box<dyn Mixer>, mixer: Arc<dyn Mixer>,
) -> Result<(Spirc, impl Future<Output = ()>), Error> { ) -> Result<(Spirc, impl Future<Output = ()>), Error> {
let spirc_id = SPIRC_COUNTER.fetch_add(1, Ordering::AcqRel); let spirc_id = SPIRC_COUNTER.fetch_add(1, Ordering::AcqRel);
debug!("new Spirc[{}]", spirc_id); debug!("new Spirc[{}]", spirc_id);
@ -663,6 +664,11 @@ impl SpircTask {
} }
fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> {
// update play_request_id
if let PlayerEvent::PlayRequestIdChanged { play_request_id } = event {
self.play_request_id = Some(play_request_id);
return Ok(());
}
// we only process events if the play_request_id matches. If it doesn't, it is // we only process events if the play_request_id matches. If it doesn't, it is
// an event that belongs to a previous track and only arrives now due to a race // an event that belongs to a previous track and only arrives now due to a race
// condition. In this case we have updated the state already and don't want to // condition. In this case we have updated the state already and don't want to
@ -1462,7 +1468,7 @@ impl SpircTask {
Some((track, index)) => { Some((track, index)) => {
self.state.set_playing_track_index(index); self.state.set_playing_track_index(index);
self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); self.player.load(track, start_playing, position_ms);
self.update_state_position(position_ms); self.update_state_position(position_ms);
if start_playing { if start_playing {

View file

@ -27,7 +27,7 @@ impl From<AuthenticationError> for Error {
} }
/// The credentials are used to log into the Spotify API. /// The credentials are used to log into the Spotify API.
#[derive(Debug, Clone, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)]
pub struct Credentials { pub struct Credentials {
pub username: String, pub username: String,

View file

@ -176,9 +176,15 @@ impl Session {
self.set_username(&reusable_credentials.username); self.set_username(&reusable_credentials.username);
if let Some(cache) = self.cache() { if let Some(cache) = self.cache() {
if store_credentials { if store_credentials {
let cred_changed = cache
.credentials()
.map(|c| c != reusable_credentials)
.unwrap_or(true);
if cred_changed {
cache.save_credentials(&reusable_credentials); cache.save_credentials(&reusable_credentials);
} }
} }
}
let (tx_connection, rx_connection) = mpsc::unbounded_channel(); let (tx_connection, rx_connection) = mpsc::unbounded_channel();
self.0 self.0

View file

@ -17,6 +17,7 @@ use librespot_metadata::{Album, Metadata};
use librespot_playback::mixer::{softmixer::SoftMixer, Mixer, MixerConfig}; use librespot_playback::mixer::{softmixer::SoftMixer, Mixer, MixerConfig};
use librespot_protocol::spirc::TrackRef; use librespot_protocol::spirc::TrackRef;
use std::env; use std::env;
use std::sync::Arc;
use tokio::join; use tokio::join;
#[tokio::main] #[tokio::main]
@ -55,7 +56,7 @@ async fn main() {
session.clone(), session.clone(),
credentials, credentials,
player, player,
Box::new(SoftMixer::open(MixerConfig::default())), Arc::new(SoftMixer::open(MixerConfig::default())),
) )
.await .await
.unwrap(); .unwrap();

View file

@ -55,8 +55,8 @@ impl Open for JackSink {
let client_name = client_name.unwrap_or_else(|| "librespot".to_string()); let client_name = client_name.unwrap_or_else(|| "librespot".to_string());
let (client, _status) = let (client, _status) =
Client::new(&client_name[..], ClientOptions::NO_START_SERVER).unwrap(); Client::new(&client_name[..], ClientOptions::NO_START_SERVER).unwrap();
let ch_r = client.register_port("out_0", AudioOut::default()).unwrap(); let ch_r = client.register_port("out_0", AudioOut).unwrap();
let ch_l = client.register_port("out_1", AudioOut::default()).unwrap(); let ch_l = client.register_port("out_1", AudioOut).unwrap();
// buffer for samples from librespot (~10ms) // buffer for samples from librespot (~10ms)
let (tx, rx) = sync_channel::<f32>(NUM_CHANNELS as usize * 1024 * AudioFormat::F32.size()); let (tx, rx) = sync_channel::<f32>(NUM_CHANNELS as usize * 1024 * AudioFormat::F32.size());
let jack_data = JackData { let jack_data = JackData {

View file

@ -1,3 +1,5 @@
use std::sync::Arc;
use crate::config::VolumeCtrl; use crate::config::VolumeCtrl;
pub mod mappings; pub mod mappings;
@ -5,7 +7,7 @@ use self::mappings::MappedCtrl;
pub struct NoOpVolume; pub struct NoOpVolume;
pub trait Mixer: Send { pub trait Mixer: Send + Sync {
fn open(config: MixerConfig) -> Self fn open(config: MixerConfig) -> Self
where where
Self: Sized; Self: Sized;
@ -55,10 +57,10 @@ impl Default for MixerConfig {
} }
} }
pub type MixerFn = fn(MixerConfig) -> Box<dyn Mixer>; pub type MixerFn = fn(MixerConfig) -> Arc<dyn Mixer>;
fn mk_sink<M: Mixer + 'static>(config: MixerConfig) -> Box<dyn Mixer> { fn mk_sink<M: Mixer + 'static>(config: MixerConfig) -> Arc<dyn Mixer> {
Box::new(M::open(config)) Arc::new(M::open(config))
} }
pub const MIXERS: &[(&str, MixerFn)] = &[ pub const MIXERS: &[(&str, MixerFn)] = &[

View file

@ -51,7 +51,6 @@ pub type PlayerResult = Result<(), Error>;
pub struct Player { pub struct Player {
commands: Option<mpsc::UnboundedSender<PlayerCommand>>, commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
thread_handle: Option<thread::JoinHandle<()>>, thread_handle: Option<thread::JoinHandle<()>>,
play_request_id_generator: SeqGenerator<u64>,
} }
#[derive(PartialEq, Eq, Debug, Clone, Copy)] #[derive(PartialEq, Eq, Debug, Clone, Copy)]
@ -79,6 +78,7 @@ struct PlayerInternal {
auto_normalise_as_album: bool, auto_normalise_as_album: bool,
player_id: usize, player_id: usize,
play_request_id_generator: SeqGenerator<u64>,
} }
pub static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0); pub static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0);
@ -86,7 +86,6 @@ pub static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0);
enum PlayerCommand { enum PlayerCommand {
Load { Load {
track_id: SpotifyId, track_id: SpotifyId,
play_request_id: u64,
play: bool, play: bool,
position_ms: u32, position_ms: u32,
}, },
@ -97,6 +96,7 @@ enum PlayerCommand {
Pause, Pause,
Stop, Stop,
Seek(u32), Seek(u32),
SetSession(Session),
AddEventSender(mpsc::UnboundedSender<PlayerEvent>), AddEventSender(mpsc::UnboundedSender<PlayerEvent>),
SetSinkEventCallback(Option<SinkEventCallback>), SetSinkEventCallback(Option<SinkEventCallback>),
EmitVolumeChangedEvent(u16), EmitVolumeChangedEvent(u16),
@ -123,6 +123,10 @@ enum PlayerCommand {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum PlayerEvent { pub enum PlayerEvent {
// Play request id changed
PlayRequestIdChanged {
play_request_id: u64,
},
// Fired when the player is stopped (e.g. by issuing a "stop" command to the player). // Fired when the player is stopped (e.g. by issuing a "stop" command to the player).
Stopped { Stopped {
play_request_id: u64, play_request_id: u64,
@ -318,7 +322,7 @@ impl Player {
session: Session, session: Session,
volume_getter: Box<dyn VolumeGetter>, volume_getter: Box<dyn VolumeGetter>,
sink_builder: F, sink_builder: F,
) -> Self ) -> Arc<Self>
where where
F: FnOnce() -> Box<dyn Sink> + Send + 'static, F: FnOnce() -> Box<dyn Sink> + Send + 'static,
{ {
@ -349,6 +353,7 @@ impl Player {
auto_normalise_as_album: false, auto_normalise_as_album: false,
player_id, player_id,
play_request_id_generator: SeqGenerator::new(0),
}; };
// While PlayerInternal is written as a future, it still contains blocking code. // While PlayerInternal is written as a future, it still contains blocking code.
@ -382,11 +387,17 @@ impl Player {
} }
}; };
Self { Arc::new(Self {
commands: Some(cmd_tx), commands: Some(cmd_tx),
thread_handle: Some(handle), thread_handle: Some(handle),
play_request_id_generator: SeqGenerator::new(0), })
} }
pub fn is_invalid(&self) -> bool {
if let Some(handle) = self.thread_handle.as_ref() {
return handle.is_finished();
}
true
} }
fn command(&self, cmd: PlayerCommand) { fn command(&self, cmd: PlayerCommand) {
@ -397,16 +408,12 @@ impl Player {
} }
} }
pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 { pub fn load(&self, track_id: SpotifyId, start_playing: bool, position_ms: u32) {
let play_request_id = self.play_request_id_generator.get();
self.command(PlayerCommand::Load { self.command(PlayerCommand::Load {
track_id, track_id,
play_request_id,
play: start_playing, play: start_playing,
position_ms, position_ms,
}); });
play_request_id
} }
pub fn preload(&self, track_id: SpotifyId) { pub fn preload(&self, track_id: SpotifyId) {
@ -429,6 +436,10 @@ impl Player {
self.command(PlayerCommand::Seek(position_ms)); self.command(PlayerCommand::Seek(position_ms));
} }
pub fn set_session(&self, session: Session) {
self.command(PlayerCommand::SetSession(session));
}
pub fn get_player_event_channel(&self) -> PlayerEventChannel { pub fn get_player_event_channel(&self) -> PlayerEventChannel {
let (event_sender, event_receiver) = mpsc::unbounded_channel(); let (event_sender, event_receiver) = mpsc::unbounded_channel();
self.command(PlayerCommand::AddEventSender(event_sender)); self.command(PlayerCommand::AddEventSender(event_sender));
@ -1264,10 +1275,6 @@ impl Future for PlayerInternal {
} }
} }
if self.session.is_invalid() {
return Poll::Ready(());
}
if (!self.state.is_playing()) && all_futures_completed_or_not_ready { if (!self.state.is_playing()) && all_futures_completed_or_not_ready {
return Poll::Pending; return Poll::Pending;
} }
@ -1515,10 +1522,15 @@ impl PlayerInternal {
fn handle_command_load( fn handle_command_load(
&mut self, &mut self,
track_id: SpotifyId, track_id: SpotifyId,
play_request_id: u64, play_request_id_option: Option<u64>,
play: bool, play: bool,
position_ms: u32, position_ms: u32,
) -> PlayerResult { ) -> PlayerResult {
let play_request_id =
play_request_id_option.unwrap_or(self.play_request_id_generator.get());
self.send_event(PlayerEvent::PlayRequestIdChanged { play_request_id });
if !self.config.gapless { if !self.config.gapless {
self.ensure_sink_stopped(play); self.ensure_sink_stopped(play);
} }
@ -1771,7 +1783,7 @@ impl PlayerInternal {
{ {
return self.handle_command_load( return self.handle_command_load(
track_id, track_id,
play_request_id, Some(play_request_id),
start_playback, start_playback,
position_ms, position_ms,
); );
@ -1828,10 +1840,9 @@ impl PlayerInternal {
match cmd { match cmd {
PlayerCommand::Load { PlayerCommand::Load {
track_id, track_id,
play_request_id,
play, play,
position_ms, position_ms,
} => self.handle_command_load(track_id, play_request_id, play, position_ms)?, } => self.handle_command_load(track_id, None, play, position_ms)?,
PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id),
@ -1843,6 +1854,8 @@ impl PlayerInternal {
PlayerCommand::Stop => self.handle_player_stop(), PlayerCommand::Stop => self.handle_player_stop(),
PlayerCommand::SetSession(session) => self.session = session,
PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender), PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender),
PlayerCommand::SetSinkEventCallback(callback) => self.sink_event_callback = callback, PlayerCommand::SetSinkEventCallback(callback) => self.sink_event_callback = callback,
@ -2057,6 +2070,7 @@ impl fmt::Debug for PlayerCommand {
PlayerCommand::Pause => f.debug_tuple("Pause").finish(), PlayerCommand::Pause => f.debug_tuple("Pause").finish(),
PlayerCommand::Stop => f.debug_tuple("Stop").finish(), PlayerCommand::Stop => f.debug_tuple("Stop").finish(),
PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(), PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(),
PlayerCommand::SetSession(_) => f.debug_tuple("SetSession").finish(),
PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(), PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(),
PlayerCommand::SetSinkEventCallback(_) => { PlayerCommand::SetSinkEventCallback(_) => {
f.debug_tuple("SetSinkEventCallback").finish() f.debug_tuple("SetSinkEventCallback").finish()

View file

@ -1747,6 +1747,32 @@ async fn main() {
exit(1); exit(1);
} }
let mixer_config = setup.mixer_config.clone();
let mixer = (setup.mixer)(mixer_config);
let player_config = setup.player_config.clone();
let soft_volume = mixer.get_soft_volume();
let format = setup.format;
let backend = setup.backend;
let device = setup.device.clone();
let sample_rate = player_config.sample_rate.as_u32();
let player = Player::new(player_config, session.clone(), soft_volume, move || {
(backend)(device, format, sample_rate)
});
if let Some(player_event_program) = setup.player_event_program.clone() {
_event_handler = Some(EventHandler::new(
player.get_player_event_channel(),
&player_event_program,
));
if setup.emit_sink_events {
player.set_sink_event_callback(Some(Box::new(move |sink_status| {
run_program_on_sink_events(sink_status, &player_event_program)
})));
}
}
loop { loop {
tokio::select! { tokio::select! {
credentials = async { credentials = async {
@ -1769,6 +1795,9 @@ async fn main() {
// Continue shutdown in its own task // Continue shutdown in its own task
tokio::spawn(spirc_task); tokio::spawn(spirc_task);
} }
if !session.is_invalid() {
session.shutdown();
}
connecting = true; connecting = true;
}, },
@ -1781,33 +1810,17 @@ async fn main() {
_ = async {}, if connecting && last_credentials.is_some() => { _ = async {}, if connecting && last_credentials.is_some() => {
if session.is_invalid() { if session.is_invalid() {
session = Session::new(setup.session_config.clone(), setup.cache.clone()); session = Session::new(setup.session_config.clone(), setup.cache.clone());
player.set_session(session.clone());
} }
let mixer_config = setup.mixer_config.clone();
let mixer = (setup.mixer)(mixer_config);
let player_config = setup.player_config.clone();
let connect_config = setup.connect_config.clone(); let connect_config = setup.connect_config.clone();
let soft_volume = mixer.get_soft_volume();
let format = setup.format;
let backend = setup.backend;
let device = setup.device.clone();
let sample_rate = player_config.sample_rate.as_u32();
let player = Player::new(player_config, session.clone(), soft_volume, move || {
(backend)(device, format, sample_rate)
});
if let Some(player_event_program) = setup.player_event_program.clone() { let (spirc_, spirc_task_) = match Spirc::new(connect_config,
_event_handler = Some(EventHandler::new(player.get_player_event_channel(), &player_event_program)); session.clone(),
last_credentials.clone().unwrap_or_default(),
if setup.emit_sink_events { player.clone(),
player.set_sink_event_callback(Some(Box::new(move |sink_status| { mixer.clone()).await {
run_program_on_sink_events(sink_status, &player_event_program)
})));
}
};
let (spirc_, spirc_task_) = match Spirc::new(connect_config, session.clone(), last_credentials.clone().unwrap_or_default(), player, mixer).await {
Ok((spirc_, spirc_task_)) => (spirc_, spirc_task_), Ok((spirc_, spirc_task_)) => (spirc_, spirc_task_),
Err(e) => { Err(e) => {
error!("could not initialize spirc: {}", e); error!("could not initialize spirc: {}", e);
@ -1844,12 +1857,19 @@ async fn main() {
if last_credentials.is_some() && !reconnect_exceeds_rate_limit() { if last_credentials.is_some() && !reconnect_exceeds_rate_limit() {
auto_connect_times.push(Instant::now()); auto_connect_times.push(Instant::now());
if !session.is_invalid() {
session.shutdown();
}
connecting = true; connecting = true;
} else { } else {
error!("Spirc shut down too often. Not reconnecting automatically."); error!("Spirc shut down too often. Not reconnecting automatically.");
exit(1); exit(1);
} }
}, },
_ = async {}, if player.is_invalid() => {
error!("Player shut down unexpectedly");
exit(1);
},
_ = tokio::signal::ctrl_c() => { _ = tokio::signal::ctrl_c() => {
break; break;
}, },

View file

@ -42,6 +42,10 @@ impl EventHandler {
let mut env_vars = HashMap::new(); let mut env_vars = HashMap::new();
match event { match event {
PlayerEvent::PlayRequestIdChanged { play_request_id } => {
env_vars.insert("PLAYER_EVENT", "play_request_id_changed".to_string());
env_vars.insert("PLAY_REQUEST_ID", play_request_id.to_string());
}
PlayerEvent::TrackChanged { audio_item } => { PlayerEvent::TrackChanged { audio_item } => {
match audio_item.track_id.to_base62() { match audio_item.track_id.to_base62() {
Err(e) => { Err(e) => {