diff --git a/connect/src/context_resolver.rs b/connect/src/context_resolver.rs index bcc4a8df..2746d5ab 100644 --- a/connect/src/context_resolver.rs +++ b/connect/src/context_resolver.rs @@ -1,54 +1,92 @@ -use crate::state::ConnectState; -use librespot_protocol::player::Context; +use crate::{ + core::{Error, Session}, + protocol::{ + autoplay_context_request::AutoplayContextRequest, + player::{Context, TransferState}, + }, + state::{context::UpdateContext, ConnectState}, +}; use std::{ + collections::{HashMap, VecDeque}, fmt::{Display, Formatter}, hash::{Hash, Hasher}, + time::Duration, }; +use thiserror::Error as ThisError; +use tokio::time::Instant; + +#[derive(Debug, Clone)] +enum Resolve { + Uri(String), + Context(Context), +} + +#[derive(Debug, Clone)] +pub(super) enum ContextAction { + Append, + Replace, +} #[derive(Debug, Clone)] pub(super) struct ResolveContext { - context: Context, + resolve: Resolve, fallback: Option, - autoplay: bool, + update: UpdateContext, + action: ContextAction, } impl ResolveContext { - pub fn from_uri(uri: impl Into, fallback: impl Into, autoplay: bool) -> Self { - let fallback_uri = fallback.into(); + fn append_context(uri: impl Into) -> Self { Self { - context: Context { - uri: uri.into(), - ..Default::default() - }, - fallback: (!fallback_uri.is_empty()).then_some(fallback_uri), - autoplay, + resolve: Resolve::Uri(uri.into()), + fallback: None, + update: UpdateContext::Default, + action: ContextAction::Append, } } - pub fn from_context(context: Context, autoplay: bool) -> Self { + pub fn from_uri( + uri: impl Into, + fallback: impl Into, + update: UpdateContext, + action: ContextAction, + ) -> Self { + let fallback_uri = fallback.into(); Self { - context, + resolve: Resolve::Uri(uri.into()), + fallback: (!fallback_uri.is_empty()).then_some(fallback_uri), + update, + action, + } + } + + pub fn from_context(context: Context, update: UpdateContext, action: ContextAction) -> Self { + Self { + resolve: Resolve::Context(context), fallback: None, - autoplay, + update, + action, } } /// the uri which should be used to resolve the context, might not be the context uri - pub fn resolve_uri(&self) -> Option<&String> { + fn resolve_uri(&self) -> Option<&str> { // it's important to call this always, or at least for every ResolveContext // otherwise we might not even check if we need to fallback and just use the fallback uri - ConnectState::get_context_uri_from_context(&self.context) - .and_then(|s| (!s.is_empty()).then_some(s)) - .or(self.fallback.as_ref()) + match self.resolve { + Resolve::Uri(ref uri) => ConnectState::valid_resolve_uri(uri), + Resolve::Context(ref ctx) => { + ConnectState::get_context_uri_from_context(ctx).or(self.fallback.as_deref()) + } + } } /// the actual context uri - pub fn context_uri(&self) -> &str { - &self.context.uri - } - - pub fn autoplay(&self) -> bool { - self.autoplay + fn context_uri(&self) -> &str { + match self.resolve { + Resolve::Uri(ref uri) => uri, + Resolve::Context(ref ctx) => &ctx.uri, + } } } @@ -56,10 +94,10 @@ impl Display for ResolveContext { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "resolve_uri: <{:?}>, context_uri: <{}>, autoplay: <{}>", + "resolve_uri: <{:?}>, context_uri: <{}>, update: <{:?}>", self.resolve_uri(), - self.context.uri, - self.autoplay, + self.context_uri(), + self.update, ) } } @@ -68,7 +106,7 @@ impl PartialEq for ResolveContext { fn eq(&self, other: &Self) -> bool { let eq_context = self.context_uri() == other.context_uri(); let eq_resolve = self.resolve_uri() == other.resolve_uri(); - let eq_autoplay = self.autoplay == other.autoplay; + let eq_autoplay = self.update == other.update; eq_context && eq_resolve && eq_autoplay } @@ -80,12 +118,227 @@ impl Hash for ResolveContext { fn hash(&self, state: &mut H) { self.context_uri().hash(state); self.resolve_uri().hash(state); - self.autoplay.hash(state); + self.update.hash(state); } } -impl From for Context { - fn from(value: ResolveContext) -> Self { - value.context +#[derive(Debug, ThisError)] +enum ContextResolverError { + #[error("no next context to resolve")] + NoNext, + #[error("tried appending context with {0} pages")] + UnexpectedPagesSize(usize), + #[error("tried resolving not allowed context: {0:?}")] + NotAllowedContext(String), +} + +impl From for Error { + fn from(value: ContextResolverError) -> Self { + Error::failed_precondition(value) + } +} + +pub struct ContextResolver { + session: Session, + queue: VecDeque, + unavailable_contexts: HashMap, +} + +// time after which an unavailable context is retried +const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600); + +impl ContextResolver { + pub fn new(session: Session) -> Self { + Self { + session, + queue: VecDeque::new(), + unavailable_contexts: HashMap::new(), + } + } + + pub fn add(&mut self, resolve: ResolveContext) { + let last_try = self + .unavailable_contexts + .get(&resolve) + .map(|i| i.duration_since(Instant::now())); + + let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) { + let _ = self.unavailable_contexts.remove(&resolve); + debug!( + "context was requested {}s ago, trying again to resolve the requested context", + last_try.expect("checked by condition").as_secs() + ); + None + } else { + last_try + }; + + if last_try.is_some() { + debug!("tried loading unavailable context: {resolve}"); + return; + } else if self.queue.contains(&resolve) { + debug!("update for {resolve} is already added"); + return; + } + + self.queue.push_back(resolve) + } + + pub fn add_list(&mut self, resolve: Vec) { + for resolve in resolve { + self.add(resolve) + } + } + + pub fn remove_used_and_invalid(&mut self) { + if let Some((_, _, remove)) = self.find_next() { + for _ in 0..remove { + let _ = self.queue.pop_front(); + } + } + self.queue.pop_front(); + } + + pub fn clear(&mut self) { + self.queue = VecDeque::new() + } + + fn find_next(&self) -> Option<(&ResolveContext, &str, usize)> { + let mut idx = 0; + loop { + let next = self.queue.front()?; + match next.resolve_uri() { + None => { + warn!("skipped {idx} because of no valid resolve_uri: {next}"); + idx += 1; + continue; + } + Some(uri) => break Some((next, uri, idx)), + } + } + } + + pub fn has_next(&self) -> bool { + self.find_next().is_some() + } + + pub async fn get_next_context( + &self, + recent_track_uri: impl Fn() -> Vec, + ) -> Result { + let (next, resolve_uri, _) = self.find_next().ok_or(ContextResolverError::NoNext)?; + + match next.update { + UpdateContext::Default => { + let mut ctx = self.session.spclient().get_context(resolve_uri).await; + if let Ok(ctx) = ctx.as_mut() { + ctx.uri = next.context_uri().to_string(); + ctx.url = format!("context://{}", ctx.uri); + } + + ctx + } + UpdateContext::Autoplay => { + if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:") + { + // autoplay is not supported for podcasts + Err(ContextResolverError::NotAllowedContext( + resolve_uri.to_string(), + ))? + } + + let request = AutoplayContextRequest { + context_uri: Some(resolve_uri.to_string()), + recent_track_uri: recent_track_uri(), + ..Default::default() + }; + self.session.spclient().get_autoplay_context(&request).await + } + } + } + + pub fn mark_next_unavailable(&mut self) { + if let Some((next, _, _)) = self.find_next() { + self.unavailable_contexts + .insert(next.clone(), Instant::now()); + } + } + + pub fn handle_next_context( + &self, + state: &mut ConnectState, + mut context: Context, + ) -> Result>, Error> { + let (next, _, _) = self.find_next().ok_or(ContextResolverError::NoNext)?; + + let remaining = match next.action { + ContextAction::Append if context.pages.len() == 1 => state + .fill_context_from_page(context.pages.remove(0)) + .map(|_| None), + ContextAction::Replace => { + let remaining = state.update_context(context, next.update); + if let Resolve::Context(ref ctx) = next.resolve { + state.merge_context(Some(ctx.clone())); + } + + remaining + } + ContextAction::Append => { + warn!("unexpected page size: {context:#?}"); + Err(ContextResolverError::UnexpectedPagesSize(context.pages.len()).into()) + } + }?; + + Ok(remaining.map(|remaining| { + remaining + .into_iter() + .map(ResolveContext::append_context) + .collect::>() + })) + } + + pub fn try_finish( + &self, + state: &mut ConnectState, + transfer_state: &mut Option, + ) -> bool { + let (next, _, _) = match self.find_next() { + None => return false, + Some(next) => next, + }; + + // when there is only one update type, we are the last of our kind, so we should update the state + if self + .queue + .iter() + .filter(|resolve| resolve.update == next.update) + .count() + != 1 + { + return false; + } + + debug!("last item of type <{:?}> finishing state", next.update); + + if let Some(transfer_state) = transfer_state.take() { + if let Err(why) = state.finish_transfer(transfer_state) { + error!("finishing setup of transfer failed: {why}") + } + } + + let res = if state.shuffling_context() { + state.shuffle() + } else { + state.reset_playback_to_position(Some(state.player().index.track as usize)) + }; + + if let Err(why) = res { + error!("setting up state failed after updating contexts: {why}") + } + + state.update_restrictions(); + state.update_queue_revision(); + + true } } diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index a6bc482d..958ea74f 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1,15 +1,6 @@ pub use crate::model::{PlayingTrack, SpircLoadCommand}; -use crate::state::{context::ResetContext, metadata::Metadata}; -use crate::{ - context_resolver::ResolveContext, - model::SpircPlayStatus, - state::{ - context::{ContextType, UpdateContext}, - provider::IsProvider, - {ConnectState, ConnectStateConfig}, - }, -}; use crate::{ + context_resolver::{ContextAction, ContextResolver, ResolveContext}, core::{ authentication::Credentials, dealer::{ @@ -19,12 +10,12 @@ use crate::{ session::UserAttributes, Error, Session, SpotifyId, }, + model::SpircPlayStatus, playback::{ mixer::Mixer, player::{Player, PlayerEvent, PlayerEventChannel}, }, protocol::{ - autoplay_context_request::AutoplayContextRequest, connect::{Cluster, ClusterUpdate, LogoutCommand, SetVolumeCommand}, explicit_content_pubsub::UserAttributesUpdate, player::{Context, TransferState}, @@ -32,11 +23,17 @@ use crate::{ social_connect_v2::{session::_host_active_device_id, SessionUpdate}, user_attributes::UserAttributesMutation, }, + state::{ + context::{ + ResetContext, {ContextType, UpdateContext}, + }, + metadata::Metadata, + provider::IsProvider, + {ConnectState, ConnectStateConfig}, + }, }; use futures_util::StreamExt; use protobuf::MessageField; -use std::collections::HashMap; -use std::time::Instant; use std::{ future::Future, sync::atomic::{AtomicUsize, Ordering}, @@ -94,17 +91,11 @@ struct SpircTask { commands: Option>, player_events: Option, + context_resolver: ContextResolver, + shutdown: bool, session: Session, - /// the list of contexts to resolve - resolve_context: Vec, - - /// contexts may not be resolvable at the moment so we should ignore any further request - /// - /// an unavailable context is retried after [RETRY_UNAVAILABLE] - unavailable_contexts: HashMap, - /// is set when transferring, and used after resolving the contexts to finish the transfer pub transfer_state: Option, @@ -145,14 +136,10 @@ const CONTEXT_FETCH_THRESHOLD: usize = 2; const VOLUME_STEP_SIZE: u16 = 1024; // (u16::MAX + 1) / VOLUME_STEPS -// delay to resolve a bundle of context updates, delaying the update prevents duplicate context updates of the same type -const RESOLVE_CONTEXT_DELAY: Duration = Duration::from_millis(600); -// time after which an unavailable context is retried -const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600); // delay to update volume after a certain amount of time, instead on each update request const VOLUME_UPDATE_DELAY: Duration = Duration::from_secs(2); // to reduce updates to remote, we group some request by waiting for a set amount of time -const UPDATE_STATE_DELAY: Duration = Duration::from_millis(400); +const UPDATE_STATE_DELAY: Duration = Duration::from_millis(300); pub struct Spirc { commands: mpsc::UnboundedSender, @@ -250,11 +237,11 @@ impl Spirc { commands: Some(cmd_rx), player_events: Some(player_events), + context_resolver: ContextResolver::new(session.clone()), + shutdown: false, session, - resolve_context: Vec::new(), - unavailable_contexts: HashMap::new(), transfer_state: None, update_volume: false, update_state: false, @@ -360,6 +347,10 @@ impl SpircTask { let commands = self.commands.as_mut(); let player_events = self.player_events.as_mut(); + // when state and volume update have a higher priority than context resolving + // because of that the context resolving has to wait, so that the other tasks can finish + let allow_context_resolving = !self.update_state && !self.update_volume; + tokio::select! { // startup of the dealer requires a connection_id, which is retrieved at the very beginning connection_id_update = self.connection_id_update.next() => unwrap! { @@ -422,7 +413,7 @@ impl SpircTask { } }, event = async { player_events?.recv().await }, if player_events.is_some() => if let Some(event) = event { - if let Err(e) = self.handle_player_event(event).await { + if let Err(e) = self.handle_player_event(event) { error!("could not dispatch player event: {}", e); } }, @@ -430,12 +421,9 @@ impl SpircTask { self.update_state = false; if let Err(why) = self.notify().await { - error!("ContextError: {why}") + error!("state update: {why}") } }, - _ = async { sleep(RESOLVE_CONTEXT_DELAY).await }, if !self.resolve_context.is_empty() => { - self.handle_resolve_context().await - }, _ = async { sleep(VOLUME_UPDATE_DELAY).await }, if self.update_volume => { self.update_volume = false; @@ -451,6 +439,21 @@ impl SpircTask { error!("error updating connect state for volume update: {why}") } }, + // context resolver handling, the idea/reason behind it the following: + // + // when we request a context that has multiple pages (for example an artist) + // resolving all pages at once can take around ~1-30sec, when we resolve + // everything at once that would block our main loop for that time + // + // to circumvent this behavior, we request each context separately here and + // finish after we received our last item of a type + next_context = async { + self.context_resolver.get_next_context(|| { + self.connect_state.prev_autoplay_track_uris() + }).await + }, if allow_context_resolving && self.context_resolver.has_next() => { + self.handle_context(next_context) + }, else => break } } @@ -468,175 +471,43 @@ impl SpircTask { self.session.dealer().close().await; } - async fn handle_resolve_context(&mut self) { - let mut last_resolve = None::; - while let Some(resolve) = self.resolve_context.pop() { - if matches!(last_resolve, Some(ref last_resolve) if last_resolve == &resolve) { - debug!("did already update the context for {resolve}"); - continue; - } else { - last_resolve = Some(resolve.clone()); - - let resolve_uri = match resolve.resolve_uri() { - Some(resolve) => resolve, - None => { - warn!("tried to resolve context without resolve_uri: {resolve}"); - return; - } - }; - - debug!("resolving: {resolve}"); - // the autoplay endpoint can return a 404, when it tries to retrieve an - // autoplay context for an empty playlist as it seems - if let Err(why) = self - .resolve_context(resolve_uri, resolve.context_uri(), resolve.autoplay()) - .await - { - error!("failed resolving context <{resolve}>: {why}"); - self.unavailable_contexts.insert(resolve, Instant::now()); - continue; - } - - self.connect_state.merge_context(Some(resolve.into())); + fn handle_context(&mut self, next_context: Result) { + let next_context = match next_context { + Err(why) => { + self.context_resolver.mark_next_unavailable(); + self.context_resolver.remove_used_and_invalid(); + error!("{why}"); + return; } - } - - if let Some(transfer_state) = self.transfer_state.take() { - if let Err(why) = self.connect_state.finish_transfer(transfer_state) { - error!("finishing setup of transfer failed: {why}") - } - } - - if matches!(self.connect_state.active_context, ContextType::Default) { - let ctx = self.connect_state.context.as_ref(); - if matches!(ctx, Some(ctx) if ctx.tracks.is_empty()) { - self.connect_state.clear_next_tracks(true); - // skip to the next queued track, otherwise it should stop - let _ = self.handle_next(None); - } - } - - if let Err(why) = self.connect_state.fill_up_next_tracks() { - error!("fill up of next tracks failed after updating contexts: {why}") - } - - self.connect_state.update_restrictions(); - self.connect_state.update_queue_revision(); - - self.preload_autoplay_when_required(); - - self.update_state = true; - } - - async fn resolve_context( - &mut self, - resolve_uri: &str, - context_uri: &str, - autoplay: bool, - ) -> Result<(), Error> { - if !autoplay { - let mut ctx = self.session.spclient().get_context(resolve_uri).await?; - ctx.uri = context_uri.to_string(); - ctx.url = format!("context://{context_uri}"); - - if let Some(remaining) = self - .connect_state - .update_context(ctx, UpdateContext::Default)? - { - self.try_resolve_remaining(remaining).await; - } - - return Ok(()); - } - - // refuse resolve of not supported autoplay context - if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:") { - // autoplay is not supported for podcasts - Err(SpircError::NotAllowedContext(resolve_uri.to_string()))? - } - - // resolve autoplay - let previous_tracks = self.connect_state.prev_autoplay_track_uris(); - - debug!( - "requesting autoplay context <{resolve_uri}> with {} previous tracks", - previous_tracks.len() - ); - - let ctx_request = AutoplayContextRequest { - context_uri: Some(resolve_uri.to_string()), - recent_track_uri: previous_tracks, - ..Default::default() + Ok(ctx) => ctx, }; - let context = self - .session - .spclient() - .get_autoplay_context(&ctx_request) - .await?; - - if let Some(remaining) = self - .connect_state - .update_context(context, UpdateContext::Autoplay)? + match self + .context_resolver + .handle_next_context(&mut self.connect_state, next_context) { - self.try_resolve_remaining(remaining).await; - } - - Ok(()) - } - - async fn try_resolve_remaining(&mut self, remaining: Vec) { - for resolve_uri in remaining { - let mut ctx = match self.session.spclient().get_context(&resolve_uri).await { - Ok(ctx) => ctx, - Err(why) => { - warn!("failed to retrieve context for remaining <{resolve_uri}>: {why}"); - continue; + Ok(remaining) => { + if let Some(remaining) = remaining { + self.context_resolver.add_list(remaining) } - }; - - if ctx.pages.len() > 1 { - warn!("context contained more page then expected: {ctx:#?}"); - continue; } - - debug!("appending context from single page, adding: <{}>", ctx.uri); - - if let Err(why) = self - .connect_state - .fill_context_from_page(ctx.pages.remove(0)) - { - warn!("failed appending context <{resolve_uri}>: {why}"); + Err(why) => { + error!("{why}") } } - } - fn add_resolve_context(&mut self, resolve: ResolveContext) { - let last_try = self - .unavailable_contexts - .get(&resolve) - .map(|i| i.duration_since(Instant::now())); - - let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) { - let _ = self.unavailable_contexts.remove(&resolve); - debug!( - "context was requested {}s ago, trying again to resolve the requested context", - last_try.expect("checked by condition").as_secs() - ); - None - } else { - last_try - }; - - if last_try.is_none() { - debug!("add resolve request: {resolve}"); - self.resolve_context.push(resolve); - } else { - debug!("tried loading unavailable context: {resolve}") + if self + .context_resolver + .try_finish(&mut self.connect_state, &mut self.transfer_state) + { + self.add_autoplay_resolving_when_required(); + self.update_state = true; } + + self.context_resolver.remove_used_and_invalid(); } - // todo: time_delta still necessary? + // todo: is the time_delta still necessary? fn now_ms(&self) -> i64 { let dur = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -687,7 +558,7 @@ impl SpircTask { self.notify().await } - async fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { + fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { if let PlayerEvent::TrackChanged { audio_item } = event { self.connect_state.update_duration(audio_item.duration_ms); return Ok(()); @@ -910,7 +781,7 @@ impl SpircTask { self.player .emit_auto_play_changed_event(matches!(new_value, "1")); - self.preload_autoplay_when_required() + self.add_autoplay_resolving_when_required() } } else { trace!( @@ -993,9 +864,10 @@ impl SpircTask { update_context.context.uri, self.connect_state.context_uri() ) } else { - self.add_resolve_context(ResolveContext::from_context( + self.context_resolver.add(ResolveContext::from_context( update_context.context, - false, + super::state::context::UpdateContext::Default, + ContextAction::Replace, )) } return Ok(()); @@ -1100,7 +972,12 @@ impl SpircTask { let fallback = self.connect_state.current_track(|t| &t.uri).clone(); - self.add_resolve_context(ResolveContext::from_uri(ctx_uri.clone(), &fallback, false)); + self.context_resolver.add(ResolveContext::from_uri( + ctx_uri.clone(), + &fallback, + UpdateContext::Default, + ContextAction::Replace, + )); let timestamp = self.now_ms(); let state = &mut self.connect_state; @@ -1123,7 +1000,12 @@ impl SpircTask { if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay { debug!("currently in autoplay context, async resolving autoplay for {ctx_uri}"); - self.add_resolve_context(ResolveContext::from_uri(ctx_uri, fallback, true)) + self.context_resolver.add(ResolveContext::from_uri( + ctx_uri, + fallback, + UpdateContext::Autoplay, + ContextAction::Replace, + )) } self.transfer_state = Some(transfer); @@ -1132,6 +1014,7 @@ impl SpircTask { } async fn handle_disconnect(&mut self) -> Result<(), Error> { + self.context_resolver.clear(); self.handle_stop(); self.play_status = SpircPlayStatus::Stopped {}; @@ -1191,6 +1074,8 @@ impl SpircTask { cmd: SpircLoadCommand, context: Option, ) -> Result<(), Error> { + self.context_resolver.clear(); + self.connect_state .reset_context(ResetContext::WhenDifferent(&cmd.context_uri)); @@ -1206,15 +1091,20 @@ impl SpircTask { } } else { &cmd.context_uri - } - .clone(); + }; if current_context_uri == &cmd.context_uri && fallback == cmd.context_uri { debug!("context <{current_context_uri}> didn't change, no resolving required") } else { debug!("resolving context for load command"); - self.resolve_context(&fallback, &cmd.context_uri, false) - .await?; + self.context_resolver.add(ResolveContext::from_uri( + &cmd.context_uri, + fallback, + UpdateContext::Default, + ContextAction::Replace, + )); + let context = self.context_resolver.get_next_context(Vec::new).await; + self.handle_context(context); } // for play commands with skip by uid, the context of the command contains @@ -1228,11 +1118,11 @@ impl SpircTask { let index = match cmd.playing_track { PlayingTrack::Index(i) => i as usize, PlayingTrack::Uri(uri) => { - let ctx = self.connect_state.context.as_ref(); + let ctx = self.connect_state.get_context(ContextType::Default)?; ConnectState::find_index_in_context(ctx, |t| t.uri == uri)? } PlayingTrack::Uid(uid) => { - let ctx = self.connect_state.context.as_ref(); + let ctx = self.connect_state.get_context(ContextType::Default)?; ConnectState::find_index_in_context(ctx, |t| t.uid == uid)? } }; @@ -1244,18 +1134,21 @@ impl SpircTask { self.connect_state.set_shuffle(cmd.shuffle); self.connect_state.set_repeat_context(cmd.repeat); + self.connect_state.set_repeat_track(cmd.repeat_track); if cmd.shuffle { - self.connect_state.set_current_track(index)?; - self.connect_state.shuffle()?; + self.connect_state.set_current_track_random()?; + + if self.context_resolver.has_next() { + self.connect_state.update_queue_revision() + } else { + self.connect_state.shuffle()?; + } } else { - // manually overwrite a possible current queued track self.connect_state.set_current_track(index)?; self.connect_state.reset_playback_to_position(Some(index))?; } - self.connect_state.set_repeat_track(cmd.repeat_track); - if self.connect_state.current_track(MessageField::is_some) { self.load_track(cmd.start_playing, cmd.seek_to)?; } else { @@ -1263,8 +1156,6 @@ impl SpircTask { self.handle_stop() } - self.preload_autoplay_when_required(); - Ok(()) } @@ -1383,7 +1274,7 @@ impl SpircTask { Ok(()) } - fn preload_autoplay_when_required(&mut self) { + fn add_autoplay_resolving_when_required(&mut self) { let require_load_new = !self .connect_state .has_next_tracks(Some(CONTEXT_FETCH_THRESHOLD)) @@ -1395,9 +1286,25 @@ impl SpircTask { let current_context = self.connect_state.context_uri(); let fallback = self.connect_state.current_track(|t| &t.uri); - let resolve = ResolveContext::from_uri(current_context, fallback, true); - self.add_resolve_context(resolve); + let has_tracks = self + .connect_state + .get_context(ContextType::Autoplay) + .map(|c| !c.tracks.is_empty()) + .unwrap_or_default(); + + let resolve = ResolveContext::from_uri( + current_context, + fallback, + UpdateContext::Autoplay, + if has_tracks { + ContextAction::Append + } else { + ContextAction::Replace + }, + ); + + self.context_resolver.add(resolve); } fn handle_next(&mut self, track_uri: Option) -> Result<(), Error> { @@ -1420,7 +1327,7 @@ impl SpircTask { }; }; - self.preload_autoplay_when_required(); + self.add_autoplay_resolving_when_required(); if has_next_track { self.load_track(continue_playing, 0) @@ -1478,10 +1385,11 @@ impl SpircTask { } debug!("playlist modification for current context: {uri}"); - self.add_resolve_context(ResolveContext::from_uri( + self.context_resolver.add(ResolveContext::from_uri( uri, self.connect_state.current_track(|t| &t.uri), - false, + UpdateContext::Default, + ContextAction::Replace, )); Ok(()) diff --git a/connect/src/state.rs b/connect/src/state.rs index b9cf37d6..6366388f 100644 --- a/connect/src/state.rs +++ b/connect/src/state.rs @@ -100,17 +100,17 @@ pub struct ConnectState { unavailable_uri: Vec, - pub active_since: Option, + active_since: Option, queue_count: u64, // separation is necessary because we could have already loaded // the autoplay context but are still playing from the default context /// to update the active context use [switch_active_context](ConnectState::set_active_context) - pub active_context: ContextType, - pub fill_up_context: ContextType, + active_context: ContextType, + fill_up_context: ContextType, /// the context from which we play, is used to top up prev and next tracks - pub context: Option, + context: Option, /// a context to keep track of our shuffled context, /// should be only available when `player.option.shuffling_context` is true @@ -359,7 +359,7 @@ impl ConnectState { self.clear_prev_track(); if new_index > 0 { - let context = self.get_context(&self.active_context)?; + let context = self.get_context(self.active_context)?; let before_new_track = context.tracks.len() - new_index; self.player_mut().prev_tracks = context diff --git a/connect/src/state/context.rs b/connect/src/state/context.rs index 8aad6463..a9881ec4 100644 --- a/connect/src/state/context.rs +++ b/connect/src/state/context.rs @@ -1,7 +1,9 @@ -use crate::state::{metadata::Metadata, provider::Provider, ConnectState, StateError}; -use librespot_core::{Error, SpotifyId}; -use librespot_protocol::player::{ - Context, ContextIndex, ContextPage, ContextTrack, ProvidedTrack, Restrictions, +use crate::{ + core::{Error, SpotifyId}, + protocol::player::{ + Context, ContextIndex, ContextPage, ContextTrack, ProvidedTrack, Restrictions, + }, + state::{metadata::Metadata, provider::Provider, ConnectState, StateError}, }; use protobuf::MessageField; use std::collections::HashMap; @@ -27,7 +29,7 @@ pub enum ContextType { Autoplay, } -#[derive(Debug)] +#[derive(Debug, PartialEq, Hash, Copy, Clone)] pub enum UpdateContext { Default, Autoplay, @@ -62,26 +64,22 @@ fn page_url_to_uri(page_url: &str) -> String { impl ConnectState { pub fn find_index_in_context bool>( - context: Option<&StateContext>, + ctx: &StateContext, f: F, ) -> Result { - let ctx = context - .as_ref() - .ok_or(StateError::NoContext(ContextType::Default))?; - ctx.tracks .iter() .position(f) .ok_or(StateError::CanNotFindTrackInContext(None, ctx.tracks.len())) } - pub(super) fn get_context(&self, ty: &ContextType) -> Result<&StateContext, StateError> { + pub fn get_context(&self, ty: ContextType) -> Result<&StateContext, StateError> { match ty { ContextType::Default => self.context.as_ref(), ContextType::Shuffle => self.shuffle_context.as_ref(), ContextType::Autoplay => self.autoplay_context.as_ref(), } - .ok_or(StateError::NoContext(*ty)) + .ok_or(StateError::NoContext(ty)) } pub fn context_uri(&self) -> &String { @@ -117,21 +115,24 @@ impl ConnectState { self.update_restrictions() } - pub fn get_context_uri_from_context(context: &Context) -> Option<&String> { - if !context.uri.starts_with(SEARCH_IDENTIFIER) { - return Some(&context.uri); - } + pub fn valid_resolve_uri(uri: &str) -> Option<&str> { + (!uri.starts_with(SEARCH_IDENTIFIER)).then_some(uri) + } - context - .pages - .first() - .and_then(|p| p.tracks.first().map(|t| &t.uri)) + pub fn get_context_uri_from_context(context: &Context) -> Option<&str> { + match Self::valid_resolve_uri(&context.uri) { + Some(uri) => Some(uri), + None => context + .pages + .first() + .and_then(|p| p.tracks.first().map(|t| t.uri.as_ref())), + } } pub fn set_active_context(&mut self, new_context: ContextType) { self.active_context = new_context; - let ctx = match self.get_context(&new_context) { + let ctx = match self.get_context(new_context) { Err(why) => { debug!("couldn't load context info because: {why}"); return; @@ -213,7 +214,7 @@ impl ConnectState { if !self.context_uri().contains(SEARCH_IDENTIFIER) && self.context_uri() == &context.uri { - match Self::find_index_in_context(Some(&new_context), |t| { + match Self::find_index_in_context(&new_context, |t| { self.current_track(|t| &t.uri) == &t.uri }) { Ok(new_pos) => { @@ -326,12 +327,11 @@ impl ConnectState { } if let Ok(position) = - Self::find_index_in_context(Some(current_context), |t| t.uri == new_track.uri) + Self::find_index_in_context(current_context, |t| t.uri == new_track.uri) { let context_track = current_context.tracks.get_mut(position)?; for (key, value) in new_track.metadata { - warn!("merging metadata {key} {value}"); context_track.metadata.insert(key, value); } diff --git a/connect/src/state/handle.rs b/connect/src/state/handle.rs index a69e1ebe..1c1a4b32 100644 --- a/connect/src/state/handle.rs +++ b/connect/src/state/handle.rs @@ -1,5 +1,10 @@ -use crate::state::{context::ResetContext, ConnectState}; -use librespot_core::{dealer::protocol::SetQueueCommand, Error}; +use crate::{ + core::{dealer::protocol::SetQueueCommand, Error}, + state::{ + context::{ContextType, ResetContext}, + ConnectState, + }, +}; use protobuf::MessageField; impl ConnectState { @@ -16,7 +21,7 @@ impl ConnectState { return Ok(()); } - let ctx = self.context.as_ref(); + let ctx = self.get_context(ContextType::Default)?; let current_index = ConnectState::find_index_in_context(ctx, |c| self.current_track(|t| c.uri == t.uri))?; @@ -52,7 +57,7 @@ impl ConnectState { self.set_shuffle(false); self.reset_context(ResetContext::DefaultIndex); - let ctx = self.context.as_ref(); + let ctx = self.get_context(ContextType::Default)?; let current_track = ConnectState::find_index_in_context(ctx, |t| { self.current_track(|t| &t.uri) == &t.uri })?; diff --git a/connect/src/state/options.rs b/connect/src/state/options.rs index b6bc331c..12040d3d 100644 --- a/connect/src/state/options.rs +++ b/connect/src/state/options.rs @@ -51,12 +51,8 @@ impl ConnectState { let current_uri = self.current_track(|t| &t.uri); - let ctx = self - .context - .as_ref() - .ok_or(StateError::NoContext(ContextType::Default))?; - - let current_track = Self::find_index_in_context(Some(ctx), |t| &t.uri == current_uri)?; + let ctx = self.get_context(ContextType::Default)?; + let current_track = Self::find_index_in_context(ctx, |t| &t.uri == current_uri)?; let mut shuffle_context = ctx.clone(); // we don't need to include the current track, because it is already being played diff --git a/connect/src/state/tracks.rs b/connect/src/state/tracks.rs index 2dc1b9af..700bfec5 100644 --- a/connect/src/state/tracks.rs +++ b/connect/src/state/tracks.rs @@ -1,12 +1,15 @@ -use crate::state::{ - context::ContextType, - metadata::Metadata, - provider::{IsProvider, Provider}, - ConnectState, StateError, SPOTIFY_MAX_NEXT_TRACKS_SIZE, SPOTIFY_MAX_PREV_TRACKS_SIZE, +use crate::{ + core::{Error, SpotifyId}, + protocol::player::ProvidedTrack, + state::{ + context::ContextType, + metadata::Metadata, + provider::{IsProvider, Provider}, + ConnectState, StateError, SPOTIFY_MAX_NEXT_TRACKS_SIZE, SPOTIFY_MAX_PREV_TRACKS_SIZE, + }, }; -use librespot_core::{Error, SpotifyId}; -use librespot_protocol::player::ProvidedTrack; use protobuf::MessageField; +use rand::Rng; // identifier used as part of the uid pub const IDENTIFIER_DELIMITER: &str = "delimiter"; @@ -64,8 +67,14 @@ impl<'ct> ConnectState { &self.player().next_tracks } + pub fn set_current_track_random(&mut self) -> Result<(), Error> { + let max_tracks = self.get_context(self.active_context)?.tracks.len(); + let rng_track = rand::thread_rng().gen_range(0..max_tracks); + self.set_current_track(rng_track) + } + pub fn set_current_track(&mut self, index: usize) -> Result<(), Error> { - let context = self.get_context(&self.active_context)?; + let context = self.get_context(self.active_context)?; let new_track = context .tracks @@ -77,8 +86,8 @@ impl<'ct> ConnectState { debug!( "set track to: {} at {} of {} tracks", - index, new_track.uri, + index, context.tracks.len() ); @@ -132,7 +141,7 @@ impl<'ct> ConnectState { self.set_active_context(ContextType::Autoplay); None } else { - let ctx = self.context.as_ref(); + let ctx = self.get_context(ContextType::Default)?; let new_index = Self::find_index_in_context(ctx, |c| c.uri == new_track.uri); match new_index { Ok(new_index) => Some(new_index as u32), @@ -272,12 +281,12 @@ impl<'ct> ConnectState { } pub fn fill_up_next_tracks(&mut self) -> Result<(), StateError> { - let ctx = self.get_context(&self.fill_up_context)?; + let ctx = self.get_context(self.fill_up_context)?; let mut new_index = ctx.index.track as usize; let mut iteration = ctx.index.page; while self.next_tracks().len() < SPOTIFY_MAX_NEXT_TRACKS_SIZE { - let ctx = self.get_context(&self.fill_up_context)?; + let ctx = self.get_context(self.fill_up_context)?; let track = match ctx.tracks.get(new_index) { None if self.repeat_context() => { let delimiter = Self::new_delimiter(iteration.into()); @@ -292,14 +301,14 @@ impl<'ct> ConnectState { // transition to autoplay as fill up context self.fill_up_context = ContextType::Autoplay; - new_index = self.get_context(&ContextType::Autoplay)?.index.track as usize; + new_index = self.get_context(ContextType::Autoplay)?.index.track as usize; // add delimiter to only display the current context Self::new_delimiter(iteration.into()) } None if self.autoplay_context.is_some() => { match self - .get_context(&ContextType::Autoplay)? + .get_context(ContextType::Autoplay)? .tracks .get(new_index) { diff --git a/connect/src/state/transfer.rs b/connect/src/state/transfer.rs index feafb698..384087c7 100644 --- a/connect/src/state/transfer.rs +++ b/connect/src/state/transfer.rs @@ -86,7 +86,7 @@ impl ConnectState { self.set_active_context(context_ty); self.fill_up_context = context_ty; - let ctx = self.get_context(&self.active_context).ok(); + let ctx = self.get_context(self.active_context)?; let current_index = if track.is_queue() { Self::find_index_in_context(ctx, |c| c.uid == transfer.current_session.current_uid) @@ -99,7 +99,7 @@ impl ConnectState { "active track is <{}> with index {current_index:?} in {:?} context, has {} tracks", track.uri, self.active_context, - ctx.map(|c| c.tracks.len()).unwrap_or_default() + ctx.tracks.len() ); if self.player().track.is_none() {