connect: resolve context rework

- resolved contexts independently, by that we don't block our main loop
- move resolve logic into own file
- polish handling for play and transfer
This commit is contained in:
Felix Prillwitz 2024-12-11 22:12:05 +01:00
parent 673ded3d57
commit d3ae5c0820
No known key found for this signature in database
GPG key ID: DE334B43606D1455
8 changed files with 474 additions and 303 deletions

View file

@ -1,54 +1,92 @@
use crate::state::ConnectState; use crate::{
use librespot_protocol::player::Context; core::{Error, Session},
protocol::{
autoplay_context_request::AutoplayContextRequest,
player::{Context, TransferState},
},
state::{context::UpdateContext, ConnectState},
};
use std::{ use std::{
collections::{HashMap, VecDeque},
fmt::{Display, Formatter}, fmt::{Display, Formatter},
hash::{Hash, Hasher}, hash::{Hash, Hasher},
time::Duration,
}; };
use thiserror::Error as ThisError;
use tokio::time::Instant;
#[derive(Debug, Clone)]
enum Resolve {
Uri(String),
Context(Context),
}
#[derive(Debug, Clone)]
pub(super) enum ContextAction {
Append,
Replace,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(super) struct ResolveContext { pub(super) struct ResolveContext {
context: Context, resolve: Resolve,
fallback: Option<String>, fallback: Option<String>,
autoplay: bool, update: UpdateContext,
action: ContextAction,
} }
impl ResolveContext { impl ResolveContext {
pub fn from_uri(uri: impl Into<String>, fallback: impl Into<String>, autoplay: bool) -> Self { fn append_context(uri: impl Into<String>) -> Self {
let fallback_uri = fallback.into();
Self { Self {
context: Context { resolve: Resolve::Uri(uri.into()),
uri: uri.into(), fallback: None,
..Default::default() update: UpdateContext::Default,
}, action: ContextAction::Append,
fallback: (!fallback_uri.is_empty()).then_some(fallback_uri),
autoplay,
} }
} }
pub fn from_context(context: Context, autoplay: bool) -> Self { pub fn from_uri(
uri: impl Into<String>,
fallback: impl Into<String>,
update: UpdateContext,
action: ContextAction,
) -> Self {
let fallback_uri = fallback.into();
Self { Self {
context, resolve: Resolve::Uri(uri.into()),
fallback: (!fallback_uri.is_empty()).then_some(fallback_uri),
update,
action,
}
}
pub fn from_context(context: Context, update: UpdateContext, action: ContextAction) -> Self {
Self {
resolve: Resolve::Context(context),
fallback: None, fallback: None,
autoplay, update,
action,
} }
} }
/// the uri which should be used to resolve the context, might not be the context uri /// the uri which should be used to resolve the context, might not be the context uri
pub fn resolve_uri(&self) -> Option<&String> { fn resolve_uri(&self) -> Option<&str> {
// it's important to call this always, or at least for every ResolveContext // it's important to call this always, or at least for every ResolveContext
// otherwise we might not even check if we need to fallback and just use the fallback uri // otherwise we might not even check if we need to fallback and just use the fallback uri
ConnectState::get_context_uri_from_context(&self.context) match self.resolve {
.and_then(|s| (!s.is_empty()).then_some(s)) Resolve::Uri(ref uri) => ConnectState::valid_resolve_uri(uri),
.or(self.fallback.as_ref()) Resolve::Context(ref ctx) => {
ConnectState::get_context_uri_from_context(ctx).or(self.fallback.as_deref())
}
}
} }
/// the actual context uri /// the actual context uri
pub fn context_uri(&self) -> &str { fn context_uri(&self) -> &str {
&self.context.uri match self.resolve {
Resolve::Uri(ref uri) => uri,
Resolve::Context(ref ctx) => &ctx.uri,
} }
pub fn autoplay(&self) -> bool {
self.autoplay
} }
} }
@ -56,10 +94,10 @@ impl Display for ResolveContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!( write!(
f, f,
"resolve_uri: <{:?}>, context_uri: <{}>, autoplay: <{}>", "resolve_uri: <{:?}>, context_uri: <{}>, update: <{:?}>",
self.resolve_uri(), self.resolve_uri(),
self.context.uri, self.context_uri(),
self.autoplay, self.update,
) )
} }
} }
@ -68,7 +106,7 @@ impl PartialEq for ResolveContext {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
let eq_context = self.context_uri() == other.context_uri(); let eq_context = self.context_uri() == other.context_uri();
let eq_resolve = self.resolve_uri() == other.resolve_uri(); let eq_resolve = self.resolve_uri() == other.resolve_uri();
let eq_autoplay = self.autoplay == other.autoplay; let eq_autoplay = self.update == other.update;
eq_context && eq_resolve && eq_autoplay eq_context && eq_resolve && eq_autoplay
} }
@ -80,12 +118,227 @@ impl Hash for ResolveContext {
fn hash<H: Hasher>(&self, state: &mut H) { fn hash<H: Hasher>(&self, state: &mut H) {
self.context_uri().hash(state); self.context_uri().hash(state);
self.resolve_uri().hash(state); self.resolve_uri().hash(state);
self.autoplay.hash(state); self.update.hash(state);
} }
} }
impl From<ResolveContext> for Context { #[derive(Debug, ThisError)]
fn from(value: ResolveContext) -> Self { enum ContextResolverError {
value.context #[error("no next context to resolve")]
NoNext,
#[error("tried appending context with {0} pages")]
UnexpectedPagesSize(usize),
#[error("tried resolving not allowed context: {0:?}")]
NotAllowedContext(String),
}
impl From<ContextResolverError> for Error {
fn from(value: ContextResolverError) -> Self {
Error::failed_precondition(value)
}
}
pub struct ContextResolver {
session: Session,
queue: VecDeque<ResolveContext>,
unavailable_contexts: HashMap<ResolveContext, Instant>,
}
// time after which an unavailable context is retried
const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600);
impl ContextResolver {
pub fn new(session: Session) -> Self {
Self {
session,
queue: VecDeque::new(),
unavailable_contexts: HashMap::new(),
}
}
pub fn add(&mut self, resolve: ResolveContext) {
let last_try = self
.unavailable_contexts
.get(&resolve)
.map(|i| i.duration_since(Instant::now()));
let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) {
let _ = self.unavailable_contexts.remove(&resolve);
debug!(
"context was requested {}s ago, trying again to resolve the requested context",
last_try.expect("checked by condition").as_secs()
);
None
} else {
last_try
};
if last_try.is_some() {
debug!("tried loading unavailable context: {resolve}");
return;
} else if self.queue.contains(&resolve) {
debug!("update for {resolve} is already added");
return;
}
self.queue.push_back(resolve)
}
pub fn add_list(&mut self, resolve: Vec<ResolveContext>) {
for resolve in resolve {
self.add(resolve)
}
}
pub fn remove_used_and_invalid(&mut self) {
if let Some((_, _, remove)) = self.find_next() {
for _ in 0..remove {
let _ = self.queue.pop_front();
}
}
self.queue.pop_front();
}
pub fn clear(&mut self) {
self.queue = VecDeque::new()
}
fn find_next(&self) -> Option<(&ResolveContext, &str, usize)> {
let mut idx = 0;
loop {
let next = self.queue.front()?;
match next.resolve_uri() {
None => {
warn!("skipped {idx} because of no valid resolve_uri: {next}");
idx += 1;
continue;
}
Some(uri) => break Some((next, uri, idx)),
}
}
}
pub fn has_next(&self) -> bool {
self.find_next().is_some()
}
pub async fn get_next_context(
&self,
recent_track_uri: impl Fn() -> Vec<String>,
) -> Result<Context, Error> {
let (next, resolve_uri, _) = self.find_next().ok_or(ContextResolverError::NoNext)?;
match next.update {
UpdateContext::Default => {
let mut ctx = self.session.spclient().get_context(resolve_uri).await;
if let Ok(ctx) = ctx.as_mut() {
ctx.uri = next.context_uri().to_string();
ctx.url = format!("context://{}", ctx.uri);
}
ctx
}
UpdateContext::Autoplay => {
if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:")
{
// autoplay is not supported for podcasts
Err(ContextResolverError::NotAllowedContext(
resolve_uri.to_string(),
))?
}
let request = AutoplayContextRequest {
context_uri: Some(resolve_uri.to_string()),
recent_track_uri: recent_track_uri(),
..Default::default()
};
self.session.spclient().get_autoplay_context(&request).await
}
}
}
pub fn mark_next_unavailable(&mut self) {
if let Some((next, _, _)) = self.find_next() {
self.unavailable_contexts
.insert(next.clone(), Instant::now());
}
}
pub fn handle_next_context(
&self,
state: &mut ConnectState,
mut context: Context,
) -> Result<Option<Vec<ResolveContext>>, Error> {
let (next, _, _) = self.find_next().ok_or(ContextResolverError::NoNext)?;
let remaining = match next.action {
ContextAction::Append if context.pages.len() == 1 => state
.fill_context_from_page(context.pages.remove(0))
.map(|_| None),
ContextAction::Replace => {
let remaining = state.update_context(context, next.update);
if let Resolve::Context(ref ctx) = next.resolve {
state.merge_context(Some(ctx.clone()));
}
remaining
}
ContextAction::Append => {
warn!("unexpected page size: {context:#?}");
Err(ContextResolverError::UnexpectedPagesSize(context.pages.len()).into())
}
}?;
Ok(remaining.map(|remaining| {
remaining
.into_iter()
.map(ResolveContext::append_context)
.collect::<Vec<_>>()
}))
}
pub fn try_finish(
&self,
state: &mut ConnectState,
transfer_state: &mut Option<TransferState>,
) -> bool {
let (next, _, _) = match self.find_next() {
None => return false,
Some(next) => next,
};
// when there is only one update type, we are the last of our kind, so we should update the state
if self
.queue
.iter()
.filter(|resolve| resolve.update == next.update)
.count()
!= 1
{
return false;
}
debug!("last item of type <{:?}> finishing state", next.update);
if let Some(transfer_state) = transfer_state.take() {
if let Err(why) = state.finish_transfer(transfer_state) {
error!("finishing setup of transfer failed: {why}")
}
}
let res = if state.shuffling_context() {
state.shuffle()
} else {
state.reset_playback_to_position(Some(state.player().index.track as usize))
};
if let Err(why) = res {
error!("setting up state failed after updating contexts: {why}")
}
state.update_restrictions();
state.update_queue_revision();
true
} }
} }

View file

@ -1,15 +1,6 @@
pub use crate::model::{PlayingTrack, SpircLoadCommand}; pub use crate::model::{PlayingTrack, SpircLoadCommand};
use crate::state::{context::ResetContext, metadata::Metadata};
use crate::{
context_resolver::ResolveContext,
model::SpircPlayStatus,
state::{
context::{ContextType, UpdateContext},
provider::IsProvider,
{ConnectState, ConnectStateConfig},
},
};
use crate::{ use crate::{
context_resolver::{ContextAction, ContextResolver, ResolveContext},
core::{ core::{
authentication::Credentials, authentication::Credentials,
dealer::{ dealer::{
@ -19,12 +10,12 @@ use crate::{
session::UserAttributes, session::UserAttributes,
Error, Session, SpotifyId, Error, Session, SpotifyId,
}, },
model::SpircPlayStatus,
playback::{ playback::{
mixer::Mixer, mixer::Mixer,
player::{Player, PlayerEvent, PlayerEventChannel}, player::{Player, PlayerEvent, PlayerEventChannel},
}, },
protocol::{ protocol::{
autoplay_context_request::AutoplayContextRequest,
connect::{Cluster, ClusterUpdate, LogoutCommand, SetVolumeCommand}, connect::{Cluster, ClusterUpdate, LogoutCommand, SetVolumeCommand},
explicit_content_pubsub::UserAttributesUpdate, explicit_content_pubsub::UserAttributesUpdate,
player::{Context, TransferState}, player::{Context, TransferState},
@ -32,11 +23,17 @@ use crate::{
social_connect_v2::{session::_host_active_device_id, SessionUpdate}, social_connect_v2::{session::_host_active_device_id, SessionUpdate},
user_attributes::UserAttributesMutation, user_attributes::UserAttributesMutation,
}, },
state::{
context::{
ResetContext, {ContextType, UpdateContext},
},
metadata::Metadata,
provider::IsProvider,
{ConnectState, ConnectStateConfig},
},
}; };
use futures_util::StreamExt; use futures_util::StreamExt;
use protobuf::MessageField; use protobuf::MessageField;
use std::collections::HashMap;
use std::time::Instant;
use std::{ use std::{
future::Future, future::Future,
sync::atomic::{AtomicUsize, Ordering}, sync::atomic::{AtomicUsize, Ordering},
@ -94,17 +91,11 @@ struct SpircTask {
commands: Option<mpsc::UnboundedReceiver<SpircCommand>>, commands: Option<mpsc::UnboundedReceiver<SpircCommand>>,
player_events: Option<PlayerEventChannel>, player_events: Option<PlayerEventChannel>,
context_resolver: ContextResolver,
shutdown: bool, shutdown: bool,
session: Session, session: Session,
/// the list of contexts to resolve
resolve_context: Vec<ResolveContext>,
/// contexts may not be resolvable at the moment so we should ignore any further request
///
/// an unavailable context is retried after [RETRY_UNAVAILABLE]
unavailable_contexts: HashMap<ResolveContext, Instant>,
/// is set when transferring, and used after resolving the contexts to finish the transfer /// is set when transferring, and used after resolving the contexts to finish the transfer
pub transfer_state: Option<TransferState>, pub transfer_state: Option<TransferState>,
@ -145,14 +136,10 @@ const CONTEXT_FETCH_THRESHOLD: usize = 2;
const VOLUME_STEP_SIZE: u16 = 1024; // (u16::MAX + 1) / VOLUME_STEPS const VOLUME_STEP_SIZE: u16 = 1024; // (u16::MAX + 1) / VOLUME_STEPS
// delay to resolve a bundle of context updates, delaying the update prevents duplicate context updates of the same type
const RESOLVE_CONTEXT_DELAY: Duration = Duration::from_millis(600);
// time after which an unavailable context is retried
const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600);
// delay to update volume after a certain amount of time, instead on each update request // delay to update volume after a certain amount of time, instead on each update request
const VOLUME_UPDATE_DELAY: Duration = Duration::from_secs(2); const VOLUME_UPDATE_DELAY: Duration = Duration::from_secs(2);
// to reduce updates to remote, we group some request by waiting for a set amount of time // to reduce updates to remote, we group some request by waiting for a set amount of time
const UPDATE_STATE_DELAY: Duration = Duration::from_millis(400); const UPDATE_STATE_DELAY: Duration = Duration::from_millis(300);
pub struct Spirc { pub struct Spirc {
commands: mpsc::UnboundedSender<SpircCommand>, commands: mpsc::UnboundedSender<SpircCommand>,
@ -250,11 +237,11 @@ impl Spirc {
commands: Some(cmd_rx), commands: Some(cmd_rx),
player_events: Some(player_events), player_events: Some(player_events),
context_resolver: ContextResolver::new(session.clone()),
shutdown: false, shutdown: false,
session, session,
resolve_context: Vec::new(),
unavailable_contexts: HashMap::new(),
transfer_state: None, transfer_state: None,
update_volume: false, update_volume: false,
update_state: false, update_state: false,
@ -360,6 +347,10 @@ impl SpircTask {
let commands = self.commands.as_mut(); let commands = self.commands.as_mut();
let player_events = self.player_events.as_mut(); let player_events = self.player_events.as_mut();
// when state and volume update have a higher priority than context resolving
// because of that the context resolving has to wait, so that the other tasks can finish
let allow_context_resolving = !self.update_state && !self.update_volume;
tokio::select! { tokio::select! {
// startup of the dealer requires a connection_id, which is retrieved at the very beginning // startup of the dealer requires a connection_id, which is retrieved at the very beginning
connection_id_update = self.connection_id_update.next() => unwrap! { connection_id_update = self.connection_id_update.next() => unwrap! {
@ -422,7 +413,7 @@ impl SpircTask {
} }
}, },
event = async { player_events?.recv().await }, if player_events.is_some() => if let Some(event) = event { event = async { player_events?.recv().await }, if player_events.is_some() => if let Some(event) = event {
if let Err(e) = self.handle_player_event(event).await { if let Err(e) = self.handle_player_event(event) {
error!("could not dispatch player event: {}", e); error!("could not dispatch player event: {}", e);
} }
}, },
@ -430,12 +421,9 @@ impl SpircTask {
self.update_state = false; self.update_state = false;
if let Err(why) = self.notify().await { if let Err(why) = self.notify().await {
error!("ContextError: {why}") error!("state update: {why}")
} }
}, },
_ = async { sleep(RESOLVE_CONTEXT_DELAY).await }, if !self.resolve_context.is_empty() => {
self.handle_resolve_context().await
},
_ = async { sleep(VOLUME_UPDATE_DELAY).await }, if self.update_volume => { _ = async { sleep(VOLUME_UPDATE_DELAY).await }, if self.update_volume => {
self.update_volume = false; self.update_volume = false;
@ -451,6 +439,21 @@ impl SpircTask {
error!("error updating connect state for volume update: {why}") error!("error updating connect state for volume update: {why}")
} }
}, },
// context resolver handling, the idea/reason behind it the following:
//
// when we request a context that has multiple pages (for example an artist)
// resolving all pages at once can take around ~1-30sec, when we resolve
// everything at once that would block our main loop for that time
//
// to circumvent this behavior, we request each context separately here and
// finish after we received our last item of a type
next_context = async {
self.context_resolver.get_next_context(|| {
self.connect_state.prev_autoplay_track_uris()
}).await
}, if allow_context_resolving && self.context_resolver.has_next() => {
self.handle_context(next_context)
},
else => break else => break
} }
} }
@ -468,175 +471,43 @@ impl SpircTask {
self.session.dealer().close().await; self.session.dealer().close().await;
} }
async fn handle_resolve_context(&mut self) { fn handle_context(&mut self, next_context: Result<Context, Error>) {
let mut last_resolve = None::<ResolveContext>; let next_context = match next_context {
while let Some(resolve) = self.resolve_context.pop() { Err(why) => {
if matches!(last_resolve, Some(ref last_resolve) if last_resolve == &resolve) { self.context_resolver.mark_next_unavailable();
debug!("did already update the context for {resolve}"); self.context_resolver.remove_used_and_invalid();
continue; error!("{why}");
} else {
last_resolve = Some(resolve.clone());
let resolve_uri = match resolve.resolve_uri() {
Some(resolve) => resolve,
None => {
warn!("tried to resolve context without resolve_uri: {resolve}");
return; return;
} }
Ok(ctx) => ctx,
}; };
debug!("resolving: {resolve}"); match self
// the autoplay endpoint can return a 404, when it tries to retrieve an .context_resolver
// autoplay context for an empty playlist as it seems .handle_next_context(&mut self.connect_state, next_context)
if let Err(why) = self
.resolve_context(resolve_uri, resolve.context_uri(), resolve.autoplay())
.await
{ {
error!("failed resolving context <{resolve}>: {why}"); Ok(remaining) => {
self.unavailable_contexts.insert(resolve, Instant::now()); if let Some(remaining) = remaining {
continue; self.context_resolver.add_list(remaining)
} }
}
self.connect_state.merge_context(Some(resolve.into())); Err(why) => {
error!("{why}")
} }
} }
if let Some(transfer_state) = self.transfer_state.take() { if self
if let Err(why) = self.connect_state.finish_transfer(transfer_state) { .context_resolver
error!("finishing setup of transfer failed: {why}") .try_finish(&mut self.connect_state, &mut self.transfer_state)
} {
} self.add_autoplay_resolving_when_required();
if matches!(self.connect_state.active_context, ContextType::Default) {
let ctx = self.connect_state.context.as_ref();
if matches!(ctx, Some(ctx) if ctx.tracks.is_empty()) {
self.connect_state.clear_next_tracks(true);
// skip to the next queued track, otherwise it should stop
let _ = self.handle_next(None);
}
}
if let Err(why) = self.connect_state.fill_up_next_tracks() {
error!("fill up of next tracks failed after updating contexts: {why}")
}
self.connect_state.update_restrictions();
self.connect_state.update_queue_revision();
self.preload_autoplay_when_required();
self.update_state = true; self.update_state = true;
} }
async fn resolve_context( self.context_resolver.remove_used_and_invalid();
&mut self,
resolve_uri: &str,
context_uri: &str,
autoplay: bool,
) -> Result<(), Error> {
if !autoplay {
let mut ctx = self.session.spclient().get_context(resolve_uri).await?;
ctx.uri = context_uri.to_string();
ctx.url = format!("context://{context_uri}");
if let Some(remaining) = self
.connect_state
.update_context(ctx, UpdateContext::Default)?
{
self.try_resolve_remaining(remaining).await;
} }
return Ok(()); // todo: is the time_delta still necessary?
}
// refuse resolve of not supported autoplay context
if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:") {
// autoplay is not supported for podcasts
Err(SpircError::NotAllowedContext(resolve_uri.to_string()))?
}
// resolve autoplay
let previous_tracks = self.connect_state.prev_autoplay_track_uris();
debug!(
"requesting autoplay context <{resolve_uri}> with {} previous tracks",
previous_tracks.len()
);
let ctx_request = AutoplayContextRequest {
context_uri: Some(resolve_uri.to_string()),
recent_track_uri: previous_tracks,
..Default::default()
};
let context = self
.session
.spclient()
.get_autoplay_context(&ctx_request)
.await?;
if let Some(remaining) = self
.connect_state
.update_context(context, UpdateContext::Autoplay)?
{
self.try_resolve_remaining(remaining).await;
}
Ok(())
}
async fn try_resolve_remaining(&mut self, remaining: Vec<String>) {
for resolve_uri in remaining {
let mut ctx = match self.session.spclient().get_context(&resolve_uri).await {
Ok(ctx) => ctx,
Err(why) => {
warn!("failed to retrieve context for remaining <{resolve_uri}>: {why}");
continue;
}
};
if ctx.pages.len() > 1 {
warn!("context contained more page then expected: {ctx:#?}");
continue;
}
debug!("appending context from single page, adding: <{}>", ctx.uri);
if let Err(why) = self
.connect_state
.fill_context_from_page(ctx.pages.remove(0))
{
warn!("failed appending context <{resolve_uri}>: {why}");
}
}
}
fn add_resolve_context(&mut self, resolve: ResolveContext) {
let last_try = self
.unavailable_contexts
.get(&resolve)
.map(|i| i.duration_since(Instant::now()));
let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) {
let _ = self.unavailable_contexts.remove(&resolve);
debug!(
"context was requested {}s ago, trying again to resolve the requested context",
last_try.expect("checked by condition").as_secs()
);
None
} else {
last_try
};
if last_try.is_none() {
debug!("add resolve request: {resolve}");
self.resolve_context.push(resolve);
} else {
debug!("tried loading unavailable context: {resolve}")
}
}
// todo: time_delta still necessary?
fn now_ms(&self) -> i64 { fn now_ms(&self) -> i64 {
let dur = SystemTime::now() let dur = SystemTime::now()
.duration_since(UNIX_EPOCH) .duration_since(UNIX_EPOCH)
@ -687,7 +558,7 @@ impl SpircTask {
self.notify().await self.notify().await
} }
async fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> {
if let PlayerEvent::TrackChanged { audio_item } = event { if let PlayerEvent::TrackChanged { audio_item } = event {
self.connect_state.update_duration(audio_item.duration_ms); self.connect_state.update_duration(audio_item.duration_ms);
return Ok(()); return Ok(());
@ -910,7 +781,7 @@ impl SpircTask {
self.player self.player
.emit_auto_play_changed_event(matches!(new_value, "1")); .emit_auto_play_changed_event(matches!(new_value, "1"));
self.preload_autoplay_when_required() self.add_autoplay_resolving_when_required()
} }
} else { } else {
trace!( trace!(
@ -993,9 +864,10 @@ impl SpircTask {
update_context.context.uri, self.connect_state.context_uri() update_context.context.uri, self.connect_state.context_uri()
) )
} else { } else {
self.add_resolve_context(ResolveContext::from_context( self.context_resolver.add(ResolveContext::from_context(
update_context.context, update_context.context,
false, super::state::context::UpdateContext::Default,
ContextAction::Replace,
)) ))
} }
return Ok(()); return Ok(());
@ -1100,7 +972,12 @@ impl SpircTask {
let fallback = self.connect_state.current_track(|t| &t.uri).clone(); let fallback = self.connect_state.current_track(|t| &t.uri).clone();
self.add_resolve_context(ResolveContext::from_uri(ctx_uri.clone(), &fallback, false)); self.context_resolver.add(ResolveContext::from_uri(
ctx_uri.clone(),
&fallback,
UpdateContext::Default,
ContextAction::Replace,
));
let timestamp = self.now_ms(); let timestamp = self.now_ms();
let state = &mut self.connect_state; let state = &mut self.connect_state;
@ -1123,7 +1000,12 @@ impl SpircTask {
if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay { if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay {
debug!("currently in autoplay context, async resolving autoplay for {ctx_uri}"); debug!("currently in autoplay context, async resolving autoplay for {ctx_uri}");
self.add_resolve_context(ResolveContext::from_uri(ctx_uri, fallback, true)) self.context_resolver.add(ResolveContext::from_uri(
ctx_uri,
fallback,
UpdateContext::Autoplay,
ContextAction::Replace,
))
} }
self.transfer_state = Some(transfer); self.transfer_state = Some(transfer);
@ -1132,6 +1014,7 @@ impl SpircTask {
} }
async fn handle_disconnect(&mut self) -> Result<(), Error> { async fn handle_disconnect(&mut self) -> Result<(), Error> {
self.context_resolver.clear();
self.handle_stop(); self.handle_stop();
self.play_status = SpircPlayStatus::Stopped {}; self.play_status = SpircPlayStatus::Stopped {};
@ -1191,6 +1074,8 @@ impl SpircTask {
cmd: SpircLoadCommand, cmd: SpircLoadCommand,
context: Option<Context>, context: Option<Context>,
) -> Result<(), Error> { ) -> Result<(), Error> {
self.context_resolver.clear();
self.connect_state self.connect_state
.reset_context(ResetContext::WhenDifferent(&cmd.context_uri)); .reset_context(ResetContext::WhenDifferent(&cmd.context_uri));
@ -1206,15 +1091,20 @@ impl SpircTask {
} }
} else { } else {
&cmd.context_uri &cmd.context_uri
} };
.clone();
if current_context_uri == &cmd.context_uri && fallback == cmd.context_uri { if current_context_uri == &cmd.context_uri && fallback == cmd.context_uri {
debug!("context <{current_context_uri}> didn't change, no resolving required") debug!("context <{current_context_uri}> didn't change, no resolving required")
} else { } else {
debug!("resolving context for load command"); debug!("resolving context for load command");
self.resolve_context(&fallback, &cmd.context_uri, false) self.context_resolver.add(ResolveContext::from_uri(
.await?; &cmd.context_uri,
fallback,
UpdateContext::Default,
ContextAction::Replace,
));
let context = self.context_resolver.get_next_context(Vec::new).await;
self.handle_context(context);
} }
// for play commands with skip by uid, the context of the command contains // for play commands with skip by uid, the context of the command contains
@ -1228,11 +1118,11 @@ impl SpircTask {
let index = match cmd.playing_track { let index = match cmd.playing_track {
PlayingTrack::Index(i) => i as usize, PlayingTrack::Index(i) => i as usize,
PlayingTrack::Uri(uri) => { PlayingTrack::Uri(uri) => {
let ctx = self.connect_state.context.as_ref(); let ctx = self.connect_state.get_context(ContextType::Default)?;
ConnectState::find_index_in_context(ctx, |t| t.uri == uri)? ConnectState::find_index_in_context(ctx, |t| t.uri == uri)?
} }
PlayingTrack::Uid(uid) => { PlayingTrack::Uid(uid) => {
let ctx = self.connect_state.context.as_ref(); let ctx = self.connect_state.get_context(ContextType::Default)?;
ConnectState::find_index_in_context(ctx, |t| t.uid == uid)? ConnectState::find_index_in_context(ctx, |t| t.uid == uid)?
} }
}; };
@ -1244,18 +1134,21 @@ impl SpircTask {
self.connect_state.set_shuffle(cmd.shuffle); self.connect_state.set_shuffle(cmd.shuffle);
self.connect_state.set_repeat_context(cmd.repeat); self.connect_state.set_repeat_context(cmd.repeat);
self.connect_state.set_repeat_track(cmd.repeat_track);
if cmd.shuffle { if cmd.shuffle {
self.connect_state.set_current_track(index)?; self.connect_state.set_current_track_random()?;
self.connect_state.shuffle()?;
if self.context_resolver.has_next() {
self.connect_state.update_queue_revision()
} else {
self.connect_state.shuffle()?;
}
} else { } else {
// manually overwrite a possible current queued track
self.connect_state.set_current_track(index)?; self.connect_state.set_current_track(index)?;
self.connect_state.reset_playback_to_position(Some(index))?; self.connect_state.reset_playback_to_position(Some(index))?;
} }
self.connect_state.set_repeat_track(cmd.repeat_track);
if self.connect_state.current_track(MessageField::is_some) { if self.connect_state.current_track(MessageField::is_some) {
self.load_track(cmd.start_playing, cmd.seek_to)?; self.load_track(cmd.start_playing, cmd.seek_to)?;
} else { } else {
@ -1263,8 +1156,6 @@ impl SpircTask {
self.handle_stop() self.handle_stop()
} }
self.preload_autoplay_when_required();
Ok(()) Ok(())
} }
@ -1383,7 +1274,7 @@ impl SpircTask {
Ok(()) Ok(())
} }
fn preload_autoplay_when_required(&mut self) { fn add_autoplay_resolving_when_required(&mut self) {
let require_load_new = !self let require_load_new = !self
.connect_state .connect_state
.has_next_tracks(Some(CONTEXT_FETCH_THRESHOLD)) .has_next_tracks(Some(CONTEXT_FETCH_THRESHOLD))
@ -1395,9 +1286,25 @@ impl SpircTask {
let current_context = self.connect_state.context_uri(); let current_context = self.connect_state.context_uri();
let fallback = self.connect_state.current_track(|t| &t.uri); let fallback = self.connect_state.current_track(|t| &t.uri);
let resolve = ResolveContext::from_uri(current_context, fallback, true);
self.add_resolve_context(resolve); let has_tracks = self
.connect_state
.get_context(ContextType::Autoplay)
.map(|c| !c.tracks.is_empty())
.unwrap_or_default();
let resolve = ResolveContext::from_uri(
current_context,
fallback,
UpdateContext::Autoplay,
if has_tracks {
ContextAction::Append
} else {
ContextAction::Replace
},
);
self.context_resolver.add(resolve);
} }
fn handle_next(&mut self, track_uri: Option<String>) -> Result<(), Error> { fn handle_next(&mut self, track_uri: Option<String>) -> Result<(), Error> {
@ -1420,7 +1327,7 @@ impl SpircTask {
}; };
}; };
self.preload_autoplay_when_required(); self.add_autoplay_resolving_when_required();
if has_next_track { if has_next_track {
self.load_track(continue_playing, 0) self.load_track(continue_playing, 0)
@ -1478,10 +1385,11 @@ impl SpircTask {
} }
debug!("playlist modification for current context: {uri}"); debug!("playlist modification for current context: {uri}");
self.add_resolve_context(ResolveContext::from_uri( self.context_resolver.add(ResolveContext::from_uri(
uri, uri,
self.connect_state.current_track(|t| &t.uri), self.connect_state.current_track(|t| &t.uri),
false, UpdateContext::Default,
ContextAction::Replace,
)); ));
Ok(()) Ok(())

View file

@ -100,17 +100,17 @@ pub struct ConnectState {
unavailable_uri: Vec<String>, unavailable_uri: Vec<String>,
pub active_since: Option<SystemTime>, active_since: Option<SystemTime>,
queue_count: u64, queue_count: u64,
// separation is necessary because we could have already loaded // separation is necessary because we could have already loaded
// the autoplay context but are still playing from the default context // the autoplay context but are still playing from the default context
/// to update the active context use [switch_active_context](ConnectState::set_active_context) /// to update the active context use [switch_active_context](ConnectState::set_active_context)
pub active_context: ContextType, active_context: ContextType,
pub fill_up_context: ContextType, fill_up_context: ContextType,
/// the context from which we play, is used to top up prev and next tracks /// the context from which we play, is used to top up prev and next tracks
pub context: Option<StateContext>, context: Option<StateContext>,
/// a context to keep track of our shuffled context, /// a context to keep track of our shuffled context,
/// should be only available when `player.option.shuffling_context` is true /// should be only available when `player.option.shuffling_context` is true
@ -359,7 +359,7 @@ impl ConnectState {
self.clear_prev_track(); self.clear_prev_track();
if new_index > 0 { if new_index > 0 {
let context = self.get_context(&self.active_context)?; let context = self.get_context(self.active_context)?;
let before_new_track = context.tracks.len() - new_index; let before_new_track = context.tracks.len() - new_index;
self.player_mut().prev_tracks = context self.player_mut().prev_tracks = context

View file

@ -1,7 +1,9 @@
use crate::state::{metadata::Metadata, provider::Provider, ConnectState, StateError}; use crate::{
use librespot_core::{Error, SpotifyId}; core::{Error, SpotifyId},
use librespot_protocol::player::{ protocol::player::{
Context, ContextIndex, ContextPage, ContextTrack, ProvidedTrack, Restrictions, Context, ContextIndex, ContextPage, ContextTrack, ProvidedTrack, Restrictions,
},
state::{metadata::Metadata, provider::Provider, ConnectState, StateError},
}; };
use protobuf::MessageField; use protobuf::MessageField;
use std::collections::HashMap; use std::collections::HashMap;
@ -27,7 +29,7 @@ pub enum ContextType {
Autoplay, Autoplay,
} }
#[derive(Debug)] #[derive(Debug, PartialEq, Hash, Copy, Clone)]
pub enum UpdateContext { pub enum UpdateContext {
Default, Default,
Autoplay, Autoplay,
@ -62,26 +64,22 @@ fn page_url_to_uri(page_url: &str) -> String {
impl ConnectState { impl ConnectState {
pub fn find_index_in_context<F: Fn(&ProvidedTrack) -> bool>( pub fn find_index_in_context<F: Fn(&ProvidedTrack) -> bool>(
context: Option<&StateContext>, ctx: &StateContext,
f: F, f: F,
) -> Result<usize, StateError> { ) -> Result<usize, StateError> {
let ctx = context
.as_ref()
.ok_or(StateError::NoContext(ContextType::Default))?;
ctx.tracks ctx.tracks
.iter() .iter()
.position(f) .position(f)
.ok_or(StateError::CanNotFindTrackInContext(None, ctx.tracks.len())) .ok_or(StateError::CanNotFindTrackInContext(None, ctx.tracks.len()))
} }
pub(super) fn get_context(&self, ty: &ContextType) -> Result<&StateContext, StateError> { pub fn get_context(&self, ty: ContextType) -> Result<&StateContext, StateError> {
match ty { match ty {
ContextType::Default => self.context.as_ref(), ContextType::Default => self.context.as_ref(),
ContextType::Shuffle => self.shuffle_context.as_ref(), ContextType::Shuffle => self.shuffle_context.as_ref(),
ContextType::Autoplay => self.autoplay_context.as_ref(), ContextType::Autoplay => self.autoplay_context.as_ref(),
} }
.ok_or(StateError::NoContext(*ty)) .ok_or(StateError::NoContext(ty))
} }
pub fn context_uri(&self) -> &String { pub fn context_uri(&self) -> &String {
@ -117,21 +115,24 @@ impl ConnectState {
self.update_restrictions() self.update_restrictions()
} }
pub fn get_context_uri_from_context(context: &Context) -> Option<&String> { pub fn valid_resolve_uri(uri: &str) -> Option<&str> {
if !context.uri.starts_with(SEARCH_IDENTIFIER) { (!uri.starts_with(SEARCH_IDENTIFIER)).then_some(uri)
return Some(&context.uri);
} }
context pub fn get_context_uri_from_context(context: &Context) -> Option<&str> {
match Self::valid_resolve_uri(&context.uri) {
Some(uri) => Some(uri),
None => context
.pages .pages
.first() .first()
.and_then(|p| p.tracks.first().map(|t| &t.uri)) .and_then(|p| p.tracks.first().map(|t| t.uri.as_ref())),
}
} }
pub fn set_active_context(&mut self, new_context: ContextType) { pub fn set_active_context(&mut self, new_context: ContextType) {
self.active_context = new_context; self.active_context = new_context;
let ctx = match self.get_context(&new_context) { let ctx = match self.get_context(new_context) {
Err(why) => { Err(why) => {
debug!("couldn't load context info because: {why}"); debug!("couldn't load context info because: {why}");
return; return;
@ -213,7 +214,7 @@ impl ConnectState {
if !self.context_uri().contains(SEARCH_IDENTIFIER) if !self.context_uri().contains(SEARCH_IDENTIFIER)
&& self.context_uri() == &context.uri && self.context_uri() == &context.uri
{ {
match Self::find_index_in_context(Some(&new_context), |t| { match Self::find_index_in_context(&new_context, |t| {
self.current_track(|t| &t.uri) == &t.uri self.current_track(|t| &t.uri) == &t.uri
}) { }) {
Ok(new_pos) => { Ok(new_pos) => {
@ -326,12 +327,11 @@ impl ConnectState {
} }
if let Ok(position) = if let Ok(position) =
Self::find_index_in_context(Some(current_context), |t| t.uri == new_track.uri) Self::find_index_in_context(current_context, |t| t.uri == new_track.uri)
{ {
let context_track = current_context.tracks.get_mut(position)?; let context_track = current_context.tracks.get_mut(position)?;
for (key, value) in new_track.metadata { for (key, value) in new_track.metadata {
warn!("merging metadata {key} {value}");
context_track.metadata.insert(key, value); context_track.metadata.insert(key, value);
} }

View file

@ -1,5 +1,10 @@
use crate::state::{context::ResetContext, ConnectState}; use crate::{
use librespot_core::{dealer::protocol::SetQueueCommand, Error}; core::{dealer::protocol::SetQueueCommand, Error},
state::{
context::{ContextType, ResetContext},
ConnectState,
},
};
use protobuf::MessageField; use protobuf::MessageField;
impl ConnectState { impl ConnectState {
@ -16,7 +21,7 @@ impl ConnectState {
return Ok(()); return Ok(());
} }
let ctx = self.context.as_ref(); let ctx = self.get_context(ContextType::Default)?;
let current_index = let current_index =
ConnectState::find_index_in_context(ctx, |c| self.current_track(|t| c.uri == t.uri))?; ConnectState::find_index_in_context(ctx, |c| self.current_track(|t| c.uri == t.uri))?;
@ -52,7 +57,7 @@ impl ConnectState {
self.set_shuffle(false); self.set_shuffle(false);
self.reset_context(ResetContext::DefaultIndex); self.reset_context(ResetContext::DefaultIndex);
let ctx = self.context.as_ref(); let ctx = self.get_context(ContextType::Default)?;
let current_track = ConnectState::find_index_in_context(ctx, |t| { let current_track = ConnectState::find_index_in_context(ctx, |t| {
self.current_track(|t| &t.uri) == &t.uri self.current_track(|t| &t.uri) == &t.uri
})?; })?;

View file

@ -51,12 +51,8 @@ impl ConnectState {
let current_uri = self.current_track(|t| &t.uri); let current_uri = self.current_track(|t| &t.uri);
let ctx = self let ctx = self.get_context(ContextType::Default)?;
.context let current_track = Self::find_index_in_context(ctx, |t| &t.uri == current_uri)?;
.as_ref()
.ok_or(StateError::NoContext(ContextType::Default))?;
let current_track = Self::find_index_in_context(Some(ctx), |t| &t.uri == current_uri)?;
let mut shuffle_context = ctx.clone(); let mut shuffle_context = ctx.clone();
// we don't need to include the current track, because it is already being played // we don't need to include the current track, because it is already being played

View file

@ -1,12 +1,15 @@
use crate::state::{ use crate::{
core::{Error, SpotifyId},
protocol::player::ProvidedTrack,
state::{
context::ContextType, context::ContextType,
metadata::Metadata, metadata::Metadata,
provider::{IsProvider, Provider}, provider::{IsProvider, Provider},
ConnectState, StateError, SPOTIFY_MAX_NEXT_TRACKS_SIZE, SPOTIFY_MAX_PREV_TRACKS_SIZE, ConnectState, StateError, SPOTIFY_MAX_NEXT_TRACKS_SIZE, SPOTIFY_MAX_PREV_TRACKS_SIZE,
},
}; };
use librespot_core::{Error, SpotifyId};
use librespot_protocol::player::ProvidedTrack;
use protobuf::MessageField; use protobuf::MessageField;
use rand::Rng;
// identifier used as part of the uid // identifier used as part of the uid
pub const IDENTIFIER_DELIMITER: &str = "delimiter"; pub const IDENTIFIER_DELIMITER: &str = "delimiter";
@ -64,8 +67,14 @@ impl<'ct> ConnectState {
&self.player().next_tracks &self.player().next_tracks
} }
pub fn set_current_track_random(&mut self) -> Result<(), Error> {
let max_tracks = self.get_context(self.active_context)?.tracks.len();
let rng_track = rand::thread_rng().gen_range(0..max_tracks);
self.set_current_track(rng_track)
}
pub fn set_current_track(&mut self, index: usize) -> Result<(), Error> { pub fn set_current_track(&mut self, index: usize) -> Result<(), Error> {
let context = self.get_context(&self.active_context)?; let context = self.get_context(self.active_context)?;
let new_track = context let new_track = context
.tracks .tracks
@ -77,8 +86,8 @@ impl<'ct> ConnectState {
debug!( debug!(
"set track to: {} at {} of {} tracks", "set track to: {} at {} of {} tracks",
index,
new_track.uri, new_track.uri,
index,
context.tracks.len() context.tracks.len()
); );
@ -132,7 +141,7 @@ impl<'ct> ConnectState {
self.set_active_context(ContextType::Autoplay); self.set_active_context(ContextType::Autoplay);
None None
} else { } else {
let ctx = self.context.as_ref(); let ctx = self.get_context(ContextType::Default)?;
let new_index = Self::find_index_in_context(ctx, |c| c.uri == new_track.uri); let new_index = Self::find_index_in_context(ctx, |c| c.uri == new_track.uri);
match new_index { match new_index {
Ok(new_index) => Some(new_index as u32), Ok(new_index) => Some(new_index as u32),
@ -272,12 +281,12 @@ impl<'ct> ConnectState {
} }
pub fn fill_up_next_tracks(&mut self) -> Result<(), StateError> { pub fn fill_up_next_tracks(&mut self) -> Result<(), StateError> {
let ctx = self.get_context(&self.fill_up_context)?; let ctx = self.get_context(self.fill_up_context)?;
let mut new_index = ctx.index.track as usize; let mut new_index = ctx.index.track as usize;
let mut iteration = ctx.index.page; let mut iteration = ctx.index.page;
while self.next_tracks().len() < SPOTIFY_MAX_NEXT_TRACKS_SIZE { while self.next_tracks().len() < SPOTIFY_MAX_NEXT_TRACKS_SIZE {
let ctx = self.get_context(&self.fill_up_context)?; let ctx = self.get_context(self.fill_up_context)?;
let track = match ctx.tracks.get(new_index) { let track = match ctx.tracks.get(new_index) {
None if self.repeat_context() => { None if self.repeat_context() => {
let delimiter = Self::new_delimiter(iteration.into()); let delimiter = Self::new_delimiter(iteration.into());
@ -292,14 +301,14 @@ impl<'ct> ConnectState {
// transition to autoplay as fill up context // transition to autoplay as fill up context
self.fill_up_context = ContextType::Autoplay; self.fill_up_context = ContextType::Autoplay;
new_index = self.get_context(&ContextType::Autoplay)?.index.track as usize; new_index = self.get_context(ContextType::Autoplay)?.index.track as usize;
// add delimiter to only display the current context // add delimiter to only display the current context
Self::new_delimiter(iteration.into()) Self::new_delimiter(iteration.into())
} }
None if self.autoplay_context.is_some() => { None if self.autoplay_context.is_some() => {
match self match self
.get_context(&ContextType::Autoplay)? .get_context(ContextType::Autoplay)?
.tracks .tracks
.get(new_index) .get(new_index)
{ {

View file

@ -86,7 +86,7 @@ impl ConnectState {
self.set_active_context(context_ty); self.set_active_context(context_ty);
self.fill_up_context = context_ty; self.fill_up_context = context_ty;
let ctx = self.get_context(&self.active_context).ok(); let ctx = self.get_context(self.active_context)?;
let current_index = if track.is_queue() { let current_index = if track.is_queue() {
Self::find_index_in_context(ctx, |c| c.uid == transfer.current_session.current_uid) Self::find_index_in_context(ctx, |c| c.uid == transfer.current_session.current_uid)
@ -99,7 +99,7 @@ impl ConnectState {
"active track is <{}> with index {current_index:?} in {:?} context, has {} tracks", "active track is <{}> with index {current_index:?} in {:?} context, has {} tracks",
track.uri, track.uri,
self.active_context, self.active_context,
ctx.map(|c| c.tracks.len()).unwrap_or_default() ctx.tracks.len()
); );
if self.player().track.is_none() { if self.player().track.is_none() {