From d8969dab0cca17a75e04fc768ee4dde8194937bd Mon Sep 17 00:00:00 2001 From: Felix Prillwitz Date: Tue, 10 Dec 2024 18:51:52 +0100 Subject: [PATCH] connect: reduce/group state updates by delaying them slightly --- connect/src/spirc.rs | 48 +++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 240beda7..0f819c8f 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -111,6 +111,10 @@ struct SpircTask { /// when no other future resolves, otherwise resets the delay update_volume: bool, + /// when set to true, it will update the volume after [UPDATE_STATE_DELAY], + /// when no other future resolves, otherwise resets the delay + update_state: bool, + spirc_id: usize, } @@ -141,11 +145,13 @@ const CONTEXT_FETCH_THRESHOLD: usize = 2; const VOLUME_STEP_SIZE: u16 = 1024; // (u16::MAX + 1) / VOLUME_STEPS // delay to resolve a bundle of context updates, delaying the update prevents duplicate context updates of the same type -const RESOLVE_CONTEXT_DELAY: Duration = Duration::from_millis(500); +const RESOLVE_CONTEXT_DELAY: Duration = Duration::from_millis(600); // time after which an unavailable context is retried const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600); // delay to update volume after a certain amount of time, instead on each update request const VOLUME_UPDATE_DELAY: Duration = Duration::from_secs(2); +// to reduce updates to remote, we group some request by waiting for a set amount of time +const UPDATE_STATE_DELAY: Duration = Duration::from_millis(400); pub struct Spirc { commands: mpsc::UnboundedSender, @@ -250,6 +256,7 @@ impl Spirc { unavailable_contexts: HashMap::new(), transfer_state: None, update_volume: false, + update_state: false, spirc_id, }; @@ -418,11 +425,16 @@ impl SpircTask { error!("could not dispatch player event: {}", e); } }, - _ = async { sleep(RESOLVE_CONTEXT_DELAY).await }, if !self.resolve_context.is_empty() => { - if let Err(why) = self.handle_resolve_context().await { + _ = async { sleep(UPDATE_STATE_DELAY).await }, if self.update_state => { + self.update_state = false; + + if let Err(why) = self.notify().await { error!("ContextError: {why}") } }, + _ = async { sleep(RESOLVE_CONTEXT_DELAY).await }, if !self.resolve_context.is_empty() => { + self.handle_resolve_context().await + }, _ = async { sleep(VOLUME_UPDATE_DELAY).await }, if self.update_volume => { self.update_volume = false; @@ -455,7 +467,7 @@ impl SpircTask { self.session.dealer().close().await; } - async fn handle_resolve_context(&mut self) -> Result<(), Error> { + async fn handle_resolve_context(&mut self) { let mut last_resolve = None::; while let Some(resolve) = self.resolve_context.pop() { if matches!(last_resolve, Some(ref last_resolve) if last_resolve == &resolve) { @@ -468,7 +480,7 @@ impl SpircTask { Some(resolve) => resolve, None => { warn!("tried to resolve context without resolve_uri: {resolve}"); - return Ok(()); + return; } }; @@ -494,24 +506,30 @@ impl SpircTask { } if let Some(transfer_state) = self.transfer_state.take() { - self.connect_state.finish_transfer(transfer_state)? + if let Err(why) = self.connect_state.finish_transfer(transfer_state) { + error!("finishing setup of transfer failed: {why}") + } } if matches!(self.connect_state.active_context, ContextType::Default) { let ctx = self.connect_state.context.as_ref(); if matches!(ctx, Some(ctx) if ctx.tracks.is_empty()) { self.connect_state.clear_next_tracks(true); - self.handle_next(None)?; + // skip to the next queued track, otherwise it should stop + let _ = self.handle_next(None); } } - self.connect_state.fill_up_next_tracks()?; + if let Err(why) = self.connect_state.fill_up_next_tracks() { + error!("fill up of next tracks failed after updating contexts: {why}") + } + self.connect_state.update_restrictions(); self.connect_state.update_queue_revision(); self.preload_autoplay_when_required(); - self.notify().await + self.update_state = true; } async fn resolve_context( @@ -542,10 +560,6 @@ impl SpircTask { error!("resolving context should only update the tracks, but had no page, or track. {ctx:#?}"); }; - if let Err(why) = self.notify().await { - error!("failed to update connect state, after updating the context: {why}") - } - return Ok(()); } @@ -776,7 +790,8 @@ impl SpircTask { _ => return Ok(()), } - self.notify().await + self.update_state = true; + Ok(()) } async fn handle_connection_id_update(&mut self, connection_id: String) -> Result<(), Error> { @@ -909,7 +924,7 @@ impl SpircTask { // fixme: workaround fix, because of missing information why it behaves like it does // background: when another device sends a connect-state update, some player's position de-syncs // tried: providing session_id, playback_id, track-metadata "track_player" - self.notify().await?; + self.update_state = true; } } else if self.connect_state.is_active() { self.connect_state.became_inactive(&self.session).await?; @@ -1038,7 +1053,8 @@ impl SpircTask { Resume(_) => self.handle_play(), } - self.notify().await + self.update_state = true; + Ok(()) } fn handle_transfer(&mut self, mut transfer: TransferState) -> Result<(), Error> {