use std::{ convert::TryFrom, future::Future, pin::Pin, sync::atomic::{AtomicUsize, Ordering}, time::{SystemTime, UNIX_EPOCH}, }; use futures_util::{stream::FusedStream, FutureExt, StreamExt}; use protobuf::{self, Message}; use rand::seq::SliceRandom; use thiserror::Error; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::{ config::ConnectConfig, context::{PageContext, StationContext}, core::{ authentication::Credentials, mercury::MercurySender, session::UserAttributes, util::SeqGenerator, version, Error, Session, SpotifyId, }, playback::{ mixer::Mixer, player::{Player, PlayerEvent, PlayerEventChannel}, }, protocol::{ self, explicit_content_pubsub::UserAttributesUpdate, spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}, user_attributes::UserAttributesMutation, }, }; #[derive(Debug, Error)] pub enum SpircError { #[error("response payload empty")] NoData, #[error("playback of local files is not supported")] UnsupportedLocalPlayBack, #[error("message addressed at another ident: {0}")] Ident(String), #[error("message pushed for another URI")] InvalidUri(String), } impl From for Error { fn from(err: SpircError) -> Self { use SpircError::*; match err { NoData | UnsupportedLocalPlayBack => Error::unavailable(err), Ident(_) | InvalidUri(_) => Error::aborted(err), } } } #[derive(Debug)] enum SpircPlayStatus { Stopped, LoadingPlay { position_ms: u32, }, LoadingPause { position_ms: u32, }, Playing { nominal_start_time: i64, preloading_of_next_track_triggered: bool, }, Paused { position_ms: u32, preloading_of_next_track_triggered: bool, }, } type BoxedStream = Pin + Send>>; struct SpircTask { player: Player, mixer: Box, sequence: SeqGenerator, ident: String, device: DeviceState, state: State, play_request_id: Option, play_status: SpircPlayStatus, remote_update: BoxedStream>, connection_id_update: BoxedStream>, user_attributes_update: BoxedStream>, user_attributes_mutation: BoxedStream>, sender: MercurySender, commands: 
Option>, player_events: Option, shutdown: bool, session: Session, resolve_context: Option, autoplay_context: bool, context: Option, spirc_id: usize, } static SPIRC_COUNTER: AtomicUsize = AtomicUsize::new(0); #[derive(Debug)] pub enum SpircCommand { Play, PlayPause, Pause, Prev, Next, VolumeUp, VolumeDown, Shutdown, Shuffle(bool), Repeat(bool), Disconnect, SetPosition(u32), SetVolume(u16), } const CONTEXT_TRACKS_COUNT: usize = 50; const CONTEXT_TRACKS_HISTORY: usize = 10; const CONTEXT_FETCH_THRESHOLD: u32 = 5; const VOLUME_STEPS: i64 = 64; const VOLUME_STEP_SIZE: u16 = 1024; // (u16::MAX + 1) / VOLUME_STEPS pub struct Spirc { commands: mpsc::UnboundedSender, } fn initial_state() -> State { let mut frame = protocol::spirc::State::new(); frame.set_repeat(false); frame.set_shuffle(false); frame.set_status(PlayStatus::kPlayStatusStop); frame.set_position_ms(0); frame.set_position_measured_at(0); frame } fn initial_device_state(config: ConnectConfig) -> DeviceState { { let mut msg = DeviceState::new(); msg.set_sw_version(version::SEMVER.to_string()); msg.set_is_active(false); msg.set_can_play(true); msg.set_volume(0); msg.set_name(config.name); { let repeated = msg.mut_capabilities(); { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kCanBePlayer); { let repeated = msg.mut_intValue(); repeated.push(1) }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kDeviceType); { let repeated = msg.mut_intValue(); repeated.push(config.device_type as i64) }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kGaiaEqConnectId); { let repeated = msg.mut_intValue(); repeated.push(1) }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kSupportsLogout); { let repeated = msg.mut_intValue(); repeated.push(0) }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kIsObservable); { let repeated = 
msg.mut_intValue(); repeated.push(1) }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kVolumeSteps); { let repeated = msg.mut_intValue(); if config.has_volume_ctrl { repeated.push(VOLUME_STEPS) } else { repeated.push(0) } }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kSupportsPlaylistV2); { let repeated = msg.mut_intValue(); repeated.push(1) }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kSupportedContexts); { let repeated = msg.mut_stringValue(); repeated.push(::std::convert::Into::into("album")); repeated.push(::std::convert::Into::into("playlist")); repeated.push(::std::convert::Into::into("search")); repeated.push(::std::convert::Into::into("inbox")); repeated.push(::std::convert::Into::into("toplist")); repeated.push(::std::convert::Into::into("starred")); repeated.push(::std::convert::Into::into("publishedstarred")); repeated.push(::std::convert::Into::into("track")) }; msg }; { let msg = repeated.push_default(); msg.set_typ(protocol::spirc::CapabilityType::kSupportedTypes); { let repeated = msg.mut_stringValue(); repeated.push(::std::convert::Into::into("audio/track")); repeated.push(::std::convert::Into::into("audio/episode")); repeated.push(::std::convert::Into::into("track")) }; msg }; }; msg } } fn url_encode(bytes: impl AsRef<[u8]>) -> String { form_urlencoded::byte_serialize(bytes.as_ref()).collect() } impl Spirc { pub async fn new( config: ConnectConfig, session: Session, credentials: Credentials, player: Player, mixer: Box, ) -> Result<(Spirc, impl Future), Error> { let spirc_id = SPIRC_COUNTER.fetch_add(1, Ordering::AcqRel); debug!("new Spirc[{}]", spirc_id); let ident = session.device_id().to_owned(); let remote_update = Box::pin( session .mercury() .listen_for("hm://remote/user/") .map(UnboundedReceiverStream::new) .flatten_stream() .map(|response| -> Result<(String, Frame), Error> { let uri_split: Vec<&str> = 
response.uri.split('/').collect(); let username = match uri_split.get(4) { Some(s) => s.to_string(), None => String::new(), }; let data = response.payload.first().ok_or(SpircError::NoData)?; Ok((username, Frame::parse_from_bytes(data)?)) }), ); let connection_id_update = Box::pin( session .mercury() .listen_for("hm://pusher/v1/connections/") .map(UnboundedReceiverStream::new) .flatten_stream() .map(|response| -> Result { let connection_id = response .uri .strip_prefix("hm://pusher/v1/connections/") .ok_or_else(|| SpircError::InvalidUri(response.uri.clone()))?; Ok(connection_id.to_owned()) }), ); let user_attributes_update = Box::pin( session .mercury() .listen_for("spotify:user:attributes:update") .map(UnboundedReceiverStream::new) .flatten_stream() .map(|response| -> Result { let data = response.payload.first().ok_or(SpircError::NoData)?; Ok(UserAttributesUpdate::parse_from_bytes(data)?) }), ); let user_attributes_mutation = Box::pin( session .mercury() .listen_for("spotify:user:attributes:mutated") .map(UnboundedReceiverStream::new) .flatten_stream() .map(|response| -> Result { let data = response.payload.first().ok_or(SpircError::NoData)?; Ok(UserAttributesMutation::parse_from_bytes(data)?) 
}), ); // Connect *after* all message listeners are registered session.connect(credentials, true).await?; let canonical_username = &session.username(); debug!("canonical_username: {}", canonical_username); let sender_uri = format!("hm://remote/user/{}/", url_encode(canonical_username)); let sender = session.mercury().sender(sender_uri); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let initial_volume = config.initial_volume; let device = initial_device_state(config); let player_events = player.get_player_event_channel(); let mut task = SpircTask { player, mixer, sequence: SeqGenerator::new(1), ident, device, state: initial_state(), play_request_id: None, play_status: SpircPlayStatus::Stopped, remote_update, connection_id_update, user_attributes_update, user_attributes_mutation, sender, commands: Some(cmd_rx), player_events: Some(player_events), shutdown: false, session, resolve_context: None, autoplay_context: false, context: None, spirc_id, }; if let Some(volume) = initial_volume { task.set_volume(volume); } else { let current_volume = task.mixer.volume(); task.set_volume(current_volume); } let spirc = Spirc { commands: cmd_tx }; task.hello()?; Ok((spirc, task.run())) } pub fn play(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Play)?) } pub fn play_pause(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::PlayPause)?) } pub fn pause(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Pause)?) } pub fn prev(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Prev)?) } pub fn next(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Next)?) } pub fn volume_up(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::VolumeUp)?) } pub fn volume_down(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::VolumeDown)?) } pub fn shutdown(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Shutdown)?) 
} pub fn shuffle(&self, shuffle: bool) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Shuffle(shuffle))?) } pub fn repeat(&self, repeat: bool) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Repeat(repeat))?) } pub fn set_volume(&self, volume: u16) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::SetVolume(volume))?) } pub fn set_position_ms(&self, position_ms: u32) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::SetPosition(position_ms))?) } pub fn disconnect(&self) -> Result<(), Error> { Ok(self.commands.send(SpircCommand::Disconnect)?) } } impl SpircTask { async fn run(mut self) { while !self.session.is_invalid() && !self.shutdown { let commands = self.commands.as_mut(); let player_events = self.player_events.as_mut(); tokio::select! { remote_update = self.remote_update.next() => match remote_update { Some(result) => match result { Ok((username, frame)) => { if username != self.session.username() { warn!("could not dispatch remote update: frame was intended for {}", username); } else if let Err(e) = self.handle_remote_update(frame) { error!("could not dispatch remote update: {}", e); } }, Err(e) => error!("could not parse remote update: {}", e), } None => { error!("remote update selected, but none received"); break; } }, user_attributes_update = self.user_attributes_update.next() => match user_attributes_update { Some(result) => match result { Ok(attributes) => self.handle_user_attributes_update(attributes), Err(e) => error!("could not parse user attributes update: {}", e), } None => { error!("user attributes update selected, but none received"); break; } }, user_attributes_mutation = self.user_attributes_mutation.next() => match user_attributes_mutation { Some(result) => match result { Ok(attributes) => self.handle_user_attributes_mutation(attributes), Err(e) => error!("could not parse user attributes mutation: {}", e), } None => { error!("user attributes mutation selected, but none received"); break; } }, 
connection_id_update = self.connection_id_update.next() => match connection_id_update { Some(result) => match result { Ok(connection_id) => self.handle_connection_id_update(connection_id), Err(e) => error!("could not parse connection ID update: {}", e), } None => { error!("connection ID update selected, but none received"); break; } }, cmd = async { commands?.recv().await }, if commands.is_some() => if let Some(cmd) = cmd { if let Err(e) = self.handle_command(cmd) { debug!("could not dispatch command: {}", e); } }, event = async { player_events?.recv().await }, if player_events.is_some() => if let Some(event) = event { if let Err(e) = self.handle_player_event(event) { error!("could not dispatch player event: {}", e); } }, result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() { error!("Cannot flush spirc event sender."); break; }, context_uri = async { self.resolve_context.take() }, if self.resolve_context.is_some() => { let context_uri = context_uri.unwrap(); let is_next_page = context_uri.starts_with("hm://"); let context = if is_next_page { self.session.spclient().get_next_page(&context_uri).await } else { let previous_tracks = self.state.get_track().iter().filter_map(|t| SpotifyId::try_from(t).ok()).collect(); self.session.spclient().get_apollo_station(&context_uri, CONTEXT_TRACKS_COUNT, previous_tracks, self.autoplay_context).await }; match context { Ok(value) => { let r_context = if is_next_page { match serde_json::from_slice::(&value) { Ok(page_context) => { // page contexts don't have the stations full metadata, so decorate it let mut station_context = self.context.clone().unwrap_or_default(); station_context.tracks = page_context.tracks; station_context.next_page_url = page_context.next_page_url; station_context.correlation_id = page_context.correlation_id; Ok(station_context) }, Err(e) => Err(e), } } else { serde_json::from_slice::(&value) }; self.context = match r_context { Ok(context) => { info!( "Resolved {:?} tracks from 
<{:?}>", context.tracks.len(), self.state.get_context_uri(), ); Some(context) } Err(e) => { error!("Unable to parse JSONContext {:?}", e); None } }; }, Err(err) => { error!("ContextError: {:?}", err) } } }, else => break } } if self.sender.flush().await.is_err() { warn!("Cannot flush spirc event sender when done."); } } fn now_ms(&mut self) -> i64 { let dur = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(dur) => dur, Err(err) => err.duration(), }; dur.as_millis() as i64 + 1000 * self.session.time_delta() } fn update_state_position(&mut self, position_ms: u32) { let now = self.now_ms(); self.state.set_position_measured_at(now as u64); self.state.set_position_ms(position_ms); } fn handle_command(&mut self, cmd: SpircCommand) -> Result<(), Error> { if matches!(cmd, SpircCommand::Shutdown) { trace!("Received SpircCommand::Shutdown"); CommandSender::new(self, MessageType::kMessageTypeGoodbye).send()?; self.handle_disconnect(); self.shutdown = true; if let Some(rx) = self.commands.as_mut() { rx.close() } Ok(()) } else if self.device.get_is_active() { trace!("Received SpircCommand::{:?}", cmd); match cmd { SpircCommand::Play => { self.handle_play(); self.notify(None) } SpircCommand::PlayPause => { self.handle_play_pause(); self.notify(None) } SpircCommand::Pause => { self.handle_pause(); self.notify(None) } SpircCommand::Prev => { self.handle_prev(); self.notify(None) } SpircCommand::Next => { self.handle_next(); self.notify(None) } SpircCommand::VolumeUp => { self.handle_volume_up(); self.notify(None) } SpircCommand::VolumeDown => { self.handle_volume_down(); self.notify(None) } SpircCommand::Disconnect => { self.handle_disconnect(); self.notify(None) } SpircCommand::Shuffle(shuffle) => { self.state.set_shuffle(shuffle); self.notify(None) } SpircCommand::Repeat(repeat) => { self.state.set_repeat(repeat); self.notify(None) } SpircCommand::SetPosition(position) => { self.handle_seek(position); self.notify(None) } SpircCommand::SetVolume(volume) => { 
self.set_volume(volume); self.notify(None) } _ => Ok(()), } } else { warn!("SpircCommand::{:?} will be ignored while Not Active", cmd); Ok(()) } } fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { // we only process events if the play_request_id matches. If it doesn't, it is // an event that belongs to a previous track and only arrives now due to a race // condition. In this case we have updated the state already and don't want to // mess with it. if let Some(play_request_id) = event.get_play_request_id() { if Some(play_request_id) == self.play_request_id { match event { PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(), PlayerEvent::Loading { .. } => { match self.play_status { SpircPlayStatus::LoadingPlay { position_ms } => { self.update_state_position(position_ms); self.state.set_status(PlayStatus::kPlayStatusPlay); trace!("==> kPlayStatusPlay"); } SpircPlayStatus::LoadingPause { position_ms } => { self.update_state_position(position_ms); self.state.set_status(PlayStatus::kPlayStatusPause); trace!("==> kPlayStatusPause"); } _ => { self.state.set_status(PlayStatus::kPlayStatusLoading); self.update_state_position(0); trace!("==> kPlayStatusLoading"); } } self.notify(None) } PlayerEvent::Playing { position_ms, .. } | PlayerEvent::PositionCorrection { position_ms, .. } | PlayerEvent::Seeked { position_ms, .. } => { trace!("==> kPlayStatusPlay"); let new_nominal_start_time = self.now_ms() - position_ms as i64; match self.play_status { SpircPlayStatus::Playing { ref mut nominal_start_time, .. } => { if (*nominal_start_time - new_nominal_start_time).abs() > 100 { *nominal_start_time = new_nominal_start_time; self.update_state_position(position_ms); self.notify(None) } else { Ok(()) } } SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. 
} => { self.state.set_status(PlayStatus::kPlayStatusPlay); self.update_state_position(position_ms); self.play_status = SpircPlayStatus::Playing { nominal_start_time: new_nominal_start_time, preloading_of_next_track_triggered: false, }; self.notify(None) } _ => Ok(()), } } PlayerEvent::Paused { position_ms: new_position_ms, .. } => { trace!("==> kPlayStatusPause"); match self.play_status { SpircPlayStatus::Paused { .. } | SpircPlayStatus::Playing { .. } => { self.state.set_status(PlayStatus::kPlayStatusPause); self.update_state_position(new_position_ms); self.play_status = SpircPlayStatus::Paused { position_ms: new_position_ms, preloading_of_next_track_triggered: false, }; self.notify(None) } SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. } => { self.state.set_status(PlayStatus::kPlayStatusPause); self.update_state_position(new_position_ms); self.play_status = SpircPlayStatus::Paused { position_ms: new_position_ms, preloading_of_next_track_triggered: false, }; self.notify(None) } _ => Ok(()), } } PlayerEvent::Stopped { .. } => { trace!("==> kPlayStatusStop"); match self.play_status { SpircPlayStatus::Stopped => Ok(()), _ => { self.state.set_status(PlayStatus::kPlayStatusStop); self.play_status = SpircPlayStatus::Stopped; self.notify(None) } } } PlayerEvent::TimeToPreloadNextTrack { .. } => { self.handle_preload_next_track(); Ok(()) } PlayerEvent::Unavailable { track_id, .. 
} => {
                        self.handle_unavailable(track_id);
                        Ok(())
                    }
                    _ => Ok(()),
                }
            } else {
                Ok(())
            }
        } else {
            Ok(())
        }
    }

    /// Logs the received connection ID and passes it to the session via
    /// `set_connection_id`.
    fn handle_connection_id_update(&mut self, connection_id: String) {
        trace!("Received connection ID update: {:?}", connection_id);
        self.session.set_connection_id(&connection_id);
    }

    /// Collects the key/value pairs carried by `update` into a `UserAttributes`
    /// and hands them to the session via `set_user_attributes`.
    fn handle_user_attributes_update(&mut self, update: UserAttributesUpdate) {
        trace!("Received attributes update: {:#?}", update);
        let attributes: UserAttributes = update
            .get_pairs()
            .iter()
            .map(|pair| (pair.get_key().to_owned(), pair.get_value().to_owned()))
            .collect();
        self.session.set_user_attributes(attributes)
    }

    /// Applies every field named in `mutation` to the session's user attributes.
    ///
    /// For each field:
    /// * an `"autoplay"` mutation is skipped when the session config carries an
    ///   autoplay override;
    /// * the currently stored value is looked up; `"0"` becomes `"1"`, `"1"`
    ///   becomes `"0"`, and any other value is kept as it is;
    /// * the resulting value is written back with `set_user_attribute`;
    /// * for `"filter-explicit-content"` and `"autoplay"` the matching player
    ///   event is emitted whenever the value actually changed, so both the
    ///   "switched on" and the "switched off" transitions are reported.
    ///
    /// Fields whose key is not present in the session's attributes are only
    /// logged.
    fn handle_user_attributes_mutation(&mut self, mutation: UserAttributesMutation) {
        for attribute in mutation.get_fields().iter() {
            let key = attribute.get_name();

            if key == "autoplay" && self.session.config().autoplay.is_some() {
                trace!("Autoplay override active. Ignoring mutation.");
                continue;
            }

            if let Some(old_value) = self.session.user_data().attributes.get(key) {
                // The mutation message only names the field; the new value is
                // derived by flipping the stored boolean-like string.
                let new_value = match old_value.as_ref() {
                    "0" => "1",
                    "1" => "0",
                    _ => old_value,
                };
                self.session.set_user_attribute(key, new_value);

                trace!(
                    "Received attribute mutation, {} was {} is now {}",
                    key,
                    old_value,
                    new_value
                );

                // Report the change in both directions. Guarding on
                // `new_value == "1"` would drop the "filter disabled" event and
                // make the `matches!` argument constant.
                if key == "filter-explicit-content" && old_value != new_value {
                    self.player
                        .emit_filter_explicit_content_changed_event(matches!(new_value, "1"));
                }

                if key == "autoplay" && old_value != new_value {
                    self.player
                        .emit_auto_play_changed_event(matches!(new_value, "1"));
                }
            } else {
                trace!(
                    "Received attribute mutation for {} but key was not found!",
                    key
                );
            }
        }
    }

    fn handle_remote_update(&mut self, update: Frame) -> Result<(), Error> {
        trace!("Received update frame: {:#?}", update);

        // First see if this update was intended for us.
let device_id = &self.ident; let ident = update.get_ident(); if ident == device_id || (!update.get_recipient().is_empty() && !update.get_recipient().contains(device_id)) { return Err(SpircError::Ident(ident.to_string()).into()); } let old_client_id = self.session.client_id(); for entry in update.get_device_state().get_metadata().iter() { match entry.get_field_type() { "client-id" => self.session.set_client_id(entry.get_metadata()), "brand_display_name" => self.session.set_client_brand_name(entry.get_metadata()), "model_display_name" => self.session.set_client_model_name(entry.get_metadata()), _ => (), } } self.session .set_client_name(update.get_device_state().get_name()); let new_client_id = self.session.client_id(); if self.device.get_is_active() && new_client_id != old_client_id { self.player.emit_session_client_changed_event( new_client_id, self.session.client_name(), self.session.client_brand_name(), self.session.client_model_name(), ); } match update.get_typ() { MessageType::kMessageTypeHello => self.notify(Some(ident)), MessageType::kMessageTypeLoad => { if !self.device.get_is_active() { let now = self.now_ms(); self.device.set_is_active(true); self.device.set_became_active_at(now); self.player.emit_session_connected_event( self.session.connection_id(), self.session.username(), ); self.player.emit_session_client_changed_event( self.session.client_id(), self.session.client_name(), self.session.client_brand_name(), self.session.client_model_name(), ); self.player .emit_volume_changed_event(self.device.get_volume() as u16); self.player .emit_auto_play_changed_event(self.session.autoplay()); self.player.emit_filter_explicit_content_changed_event( self.session.filter_explicit_content(), ); self.player .emit_shuffle_changed_event(self.state.get_shuffle()); self.player .emit_repeat_changed_event(self.state.get_repeat()); } let context_uri = update.get_state().get_context_uri().to_owned(); // completely ignore local playback. 
if context_uri.starts_with("spotify:local-files") { self.notify(None)?; return Err(SpircError::UnsupportedLocalPlayBack.into()); } self.update_tracks(&update); if !self.state.get_track().is_empty() { let start_playing = update.get_state().get_status() == PlayStatus::kPlayStatusPlay; self.load_track(start_playing, update.get_state().get_position_ms()); } else { info!("No more tracks left in queue"); self.handle_stop(); } self.notify(None) } MessageType::kMessageTypePlay => { self.handle_play(); self.notify(None) } MessageType::kMessageTypePlayPause => { self.handle_play_pause(); self.notify(None) } MessageType::kMessageTypePause => { self.handle_pause(); self.notify(None) } MessageType::kMessageTypeNext => { self.handle_next(); self.notify(None) } MessageType::kMessageTypePrev => { self.handle_prev(); self.notify(None) } MessageType::kMessageTypeVolumeUp => { self.handle_volume_up(); self.notify(None) } MessageType::kMessageTypeVolumeDown => { self.handle_volume_down(); self.notify(None) } MessageType::kMessageTypeRepeat => { let repeat = update.get_state().get_repeat(); self.state.set_repeat(repeat); self.player.emit_repeat_changed_event(repeat); self.notify(None) } MessageType::kMessageTypeShuffle => { let shuffle = update.get_state().get_shuffle(); self.state.set_shuffle(shuffle); if shuffle { let current_index = self.state.get_playing_track_index(); let tracks = self.state.mut_track(); if !tracks.is_empty() { tracks.swap(0, current_index as usize); if let Some((_, rest)) = tracks.split_first_mut() { let mut rng = rand::thread_rng(); rest.shuffle(&mut rng); } self.state.set_playing_track_index(0); } } self.player.emit_shuffle_changed_event(shuffle); self.notify(None) } MessageType::kMessageTypeSeek => { self.handle_seek(update.get_position()); self.notify(None) } MessageType::kMessageTypeReplace => { let context_uri = update.get_state().get_context_uri().to_owned(); // completely ignore local playback. 
if context_uri.starts_with("spotify:local-files") { self.notify(None)?; return Err(SpircError::UnsupportedLocalPlayBack.into()); } self.update_tracks(&update); if let SpircPlayStatus::Playing { preloading_of_next_track_triggered, .. } | SpircPlayStatus::Paused { preloading_of_next_track_triggered, .. } = self.play_status { if preloading_of_next_track_triggered { // Get the next track_id in the playlist if let Some(track_id) = self.preview_next_track() { self.player.preload(track_id); } } } self.notify(None) } MessageType::kMessageTypeVolume => { self.set_volume(update.get_volume() as u16); self.notify(None) } MessageType::kMessageTypeNotify => { if self.device.get_is_active() && update.get_device_state().get_is_active() && self.device.get_became_active_at() <= update.get_device_state().get_became_active_at() { self.handle_disconnect(); } self.notify(None) } _ => Ok(()), } } fn handle_disconnect(&mut self) { self.device.set_is_active(false); self.handle_stop(); self.player .emit_session_disconnected_event(self.session.connection_id(), self.session.username()); } fn handle_stop(&mut self) { self.player.stop(); } fn handle_play(&mut self) { match self.play_status { SpircPlayStatus::Paused { position_ms, preloading_of_next_track_triggered, } => { self.player.play(); self.state.set_status(PlayStatus::kPlayStatusPlay); self.update_state_position(position_ms); self.play_status = SpircPlayStatus::Playing { nominal_start_time: self.now_ms() as i64 - position_ms as i64, preloading_of_next_track_triggered, }; } SpircPlayStatus::LoadingPause { position_ms } => { self.player.play(); self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; } _ => return, } // Synchronize the volume from the mixer. This is useful on // systems that can switch sources from and back to librespot. let current_volume = self.mixer.volume(); self.set_volume(current_volume); } fn handle_play_pause(&mut self) { match self.play_status { SpircPlayStatus::Paused { .. 
} | SpircPlayStatus::LoadingPause { .. } => {
                // Currently (loading) paused: toggle to play.
                self.handle_play()
            }
            SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => {
                // Currently (loading) playing: toggle to pause.
                self.handle_pause()
            }
            _ => (),
        }
    }

    /// Pauses playback if the task is playing or loading-to-play.
    ///
    /// * `Playing`: pauses the player, sets the reported status to
    ///   `kPlayStatusPause`, derives the current position from
    ///   `now_ms() - nominal_start_time`, records it in the state and switches
    ///   `play_status` to `Paused`, carrying over the preload flag.
    /// * `LoadingPlay`: pauses the player and switches to `LoadingPause`,
    ///   keeping the pending position.
    /// * Any other status: nothing happens.
    fn handle_pause(&mut self) {
        match self.play_status {
            SpircPlayStatus::Playing {
                nominal_start_time,
                preloading_of_next_track_triggered,
            } => {
                self.player.pause();
                self.state.set_status(PlayStatus::kPlayStatusPause);
                // Elapsed time since the nominal start is the playback position.
                let position_ms = (self.now_ms() - nominal_start_time) as u32;
                self.update_state_position(position_ms);
                self.play_status = SpircPlayStatus::Paused {
                    position_ms,
                    preloading_of_next_track_triggered,
                };
            }
            SpircPlayStatus::LoadingPlay { position_ms } => {
                self.player.pause();
                self.play_status = SpircPlayStatus::LoadingPause { position_ms };
            }
            _ => (),
        }
    }

    /// Seeks to `position_ms`: records the position in the state, forwards the
    /// seek to the player, and updates the bookkeeping held in `play_status`
    /// (the stored position, or the nominal start time while playing).
    fn handle_seek(&mut self, position_ms: u32) {
        self.update_state_position(position_ms);
        self.player.seek(position_ms);
        let now = self.now_ms();
        match self.play_status {
            SpircPlayStatus::Stopped => (),
            SpircPlayStatus::LoadingPause {
                position_ms: ref mut position,
            }
            | SpircPlayStatus::LoadingPlay {
                position_ms: ref mut position,
            }
            | SpircPlayStatus::Paused {
                position_ms: ref mut position,
                ..
            } => *position = position_ms,
            SpircPlayStatus::Playing {
                ref mut nominal_start_time,
                ..
} => *nominal_start_time = now - position_ms as i64,
        };
    }

    /// Drops the currently playing track from the track list when it is a
    /// queued track, and returns the index of the track that should play next.
    ///
    /// If the current track was removed, the following track has moved into
    /// its slot, so the current index is returned; otherwise the index after
    /// the current one is returned.
    fn consume_queued_track(&mut self) -> usize {
        let playing = self.state.get_playing_track_index() as usize;
        let playing_is_queued = self
            .state
            .get_track()
            .get(playing)
            .map_or(false, |track| track.get_queued());

        if !playing_is_queued {
            return playing + 1;
        }

        self.state.mut_track().remove(playing);
        playing
    }

    /// Looks up the track id that would be played from the slot following the
    /// currently playing one, without changing the playing index.
    fn preview_next_track(&mut self) -> Option {
        let next_index = self.state.get_playing_track_index() + 1;
        let (track_id, _) = self.get_track_id_to_play_from_playlist(next_index)?;
        Some(track_id)
    }

    /// Requests the player to preload the next track.
    ///
    /// The preload flag is raised on the `Paused`/`Playing` status first. If
    /// no next track can be determined, playback is stopped instead.
    fn handle_preload_next_track(&mut self) {
        if let SpircPlayStatus::Paused {
            ref mut preloading_of_next_track_triggered,
            ..
        }
        | SpircPlayStatus::Playing {
            ref mut preloading_of_next_track_triggered,
            ..
        } = self.play_status
        {
            *preloading_of_next_track_triggered = true;
        }

        match self.preview_next_track() {
            Some(track_id) => {
                self.player.preload(track_id);
            }
            None => self.handle_stop(),
        }
    }

    /// Marks every occurrence of `track_id` in the track list as unavailable
    /// so it can be skipped later, then re-evaluates which track to preload.
    fn handle_unavailable(&mut self, track_id: SpotifyId) {
        let unavailables = self.get_track_index_for_spotify_id(&track_id, 0);

        for &index in unavailables.iter() {
            let gid = self.state.get_track()[index].get_gid().to_vec();

            // The replacement keeps only the gid; the context field is misused
            // as the "not playable" flag.
            let mut flagged = TrackRef::new();
            flagged.set_gid(gid);
            flagged.set_context(String::from("NonPlayable"));
            self.state.mut_track()[index] = flagged;

            debug!(
                "Marked <{:?}> at {:?} as NonPlayable",
                self.state.get_track()[index],
                index,
            );
        }

        self.handle_preload_next_track();
    }

    fn handle_next(&mut self) {
        let context_uri = self.state.get_context_uri().to_owned();
        let mut tracks_len = self.state.get_track().len() as u32;
        let mut new_index = self.consume_queued_track() as u32;
        let mut continue_playing = self.state.get_status() ==
PlayStatus::kPlayStatusPlay; let update_tracks = self.autoplay_context && tracks_len - new_index < CONTEXT_FETCH_THRESHOLD; debug!( "At track {:?} of {:?} <{:?}> update [{}]", new_index + 1, tracks_len, context_uri, update_tracks, ); // When in autoplay, keep topping up the playlist when it nears the end if update_tracks { self.update_tracks_from_context(); new_index = self.state.get_playing_track_index(); tracks_len = self.state.get_track().len() as u32; } // When not in autoplay, either start autoplay or loop back to the start if new_index >= tracks_len { if self.session.autoplay() { // Extend the playlist debug!("Starting autoplay for <{}>", context_uri); self.autoplay_context = true; self.update_tracks_from_context(); self.player.set_auto_normalise_as_album(false); } else { new_index = 0; continue_playing &= self.state.get_repeat(); debug!("Looping back to start, repeat is {}", continue_playing); } } if tracks_len > 0 { self.state.set_playing_track_index(new_index); self.load_track(continue_playing, 0); } else { info!("Not playing next track because there are no more tracks left in queue."); self.state.set_playing_track_index(0); self.handle_stop(); } } fn handle_prev(&mut self) { // Previous behaves differently based on the position // Under 3s it goes to the previous song (starts playing) // Over 3s it seeks to zero (retains previous play status) if self.position() < 3000 { // Queued tracks always follow the currently playing track. // They should not be considered when calculating the previous // track so extract them beforehand and reinsert them after it. 
let mut queue_tracks = Vec::new(); { let queue_index = self.consume_queued_track(); let tracks = self.state.mut_track(); while queue_index < tracks.len() && tracks[queue_index].get_queued() { queue_tracks.push(tracks.remove(queue_index)); } } let current_index = self.state.get_playing_track_index(); let new_index = if current_index > 0 { current_index - 1 } else if self.state.get_repeat() { self.state.get_track().len() as u32 - 1 } else { 0 }; // Reinsert queued tracks after the new playing track. let mut pos = (new_index + 1) as usize; for track in queue_tracks { self.state.mut_track().insert(pos, track); pos += 1; } self.state.set_playing_track_index(new_index); let start_playing = self.state.get_status() == PlayStatus::kPlayStatusPlay; self.load_track(start_playing, 0); } else { self.handle_seek(0); } } fn handle_volume_up(&mut self) { let volume = (self.device.get_volume() as u16).saturating_add(VOLUME_STEP_SIZE); self.set_volume(volume); } fn handle_volume_down(&mut self) { let volume = (self.device.get_volume() as u16).saturating_sub(VOLUME_STEP_SIZE); self.set_volume(volume); } fn handle_end_of_track(&mut self) -> Result<(), Error> { self.handle_next(); self.notify(None) } fn position(&mut self) -> u32 { match self.play_status { SpircPlayStatus::Stopped => 0, SpircPlayStatus::LoadingPlay { position_ms } | SpircPlayStatus::LoadingPause { position_ms } | SpircPlayStatus::Paused { position_ms, .. } => position_ms, SpircPlayStatus::Playing { nominal_start_time, .. 
} => (self.now_ms() - nominal_start_time) as u32, } } fn update_tracks_from_context(&mut self) { if let Some(ref context) = self.context { self.resolve_context = if !self.autoplay_context || context.next_page_url.contains("autoplay=true") { Some(context.next_page_url.to_owned()) } else { // this arm means: we need to resolve for autoplay, // and were previously resolving for the original context Some(context.uri.to_owned()) }; let new_tracks = &context.tracks; debug!("Adding {:?} tracks from context to frame", new_tracks.len()); let mut track_vec = self.state.take_track().into_vec(); if let Some(head) = track_vec.len().checked_sub(CONTEXT_TRACKS_HISTORY) { track_vec.drain(0..head); } track_vec.extend_from_slice(new_tracks); self.state .set_track(protobuf::RepeatedField::from_vec(track_vec)); // Update playing index if let Some(new_index) = self .state .get_playing_track_index() .checked_sub(CONTEXT_TRACKS_HISTORY as u32) { self.state.set_playing_track_index(new_index); } } else { warn!("No context to update from!"); } } fn update_tracks(&mut self, frame: &protocol::spirc::Frame) { trace!("State: {:#?}", frame.get_state()); let index = frame.get_state().get_playing_track_index(); let context_uri = frame.get_state().get_context_uri().to_owned(); let tracks = frame.get_state().get_track(); trace!("Frame has {:?} tracks", tracks.len()); // First the tracks from the requested context, without autoplay. // We will transition into autoplay after the latest track of this context. self.autoplay_context = false; self.resolve_context = Some(context_uri.clone()); self.player .set_auto_normalise_as_album(context_uri.starts_with("spotify:album:")); self.state.set_playing_track_index(index); self.state.set_track(tracks.iter().cloned().collect()); self.state.set_context_uri(context_uri); // has_shuffle/repeat seem to always be true in these replace msgs, // but to replicate the behaviour of the Android client we have to // ignore false values. 
let state = frame.get_state(); if state.get_repeat() { self.state.set_repeat(true); } if state.get_shuffle() { self.state.set_shuffle(true); } } // Helper to find corresponding index(s) for track_id fn get_track_index_for_spotify_id( &self, track_id: &SpotifyId, start_index: usize, ) -> Vec { let index: Vec = self.state.get_track()[start_index..] .iter() .enumerate() .filter(|&(_, track_ref)| track_ref.get_gid() == track_id.to_raw()) .map(|(idx, _)| start_index + idx) .collect(); index } // Broken out here so we can refactor this later when we move to SpotifyObjectID or similar fn track_ref_is_unavailable(&self, track_ref: &TrackRef) -> bool { track_ref.get_context() == "NonPlayable" } fn get_track_id_to_play_from_playlist(&self, index: u32) -> Option<(SpotifyId, u32)> { let tracks_len = self.state.get_track().len(); // Guard against tracks_len being zero to prevent // 'index out of bounds: the len is 0 but the index is 0' // https://github.com/librespot-org/librespot/issues/226#issuecomment-971642037 if tracks_len == 0 { warn!("No playable track found in state: {:?}", self.state); return None; } let mut new_playlist_index = index as usize; if new_playlist_index >= tracks_len { new_playlist_index = 0; } let start_index = new_playlist_index; // Cycle through all tracks, break if we don't find any playable tracks // tracks in each frame either have a gid or uri (that may or may not be a valid track) // E.g - context based frames sometimes contain tracks with let mut track_ref = self.state.get_track()[new_playlist_index].clone(); let mut track_id = SpotifyId::try_from(&track_ref); while self.track_ref_is_unavailable(&track_ref) || track_id.is_err() { warn!( "Skipping track <{:?}> at position [{}] of {}", track_ref, new_playlist_index, tracks_len ); new_playlist_index += 1; if new_playlist_index >= tracks_len { new_playlist_index = 0; } if new_playlist_index == start_index { warn!("No playable track found in state: {:?}", self.state); return None; } track_ref = 
self.state.get_track()[new_playlist_index].clone(); track_id = SpotifyId::try_from(&track_ref); } match track_id { Ok(track_id) => Some((track_id, new_playlist_index as u32)), Err(_) => None, } } fn load_track(&mut self, start_playing: bool, position_ms: u32) { let index = self.state.get_playing_track_index(); match self.get_track_id_to_play_from_playlist(index) { Some((track, index)) => { self.state.set_playing_track_index(index); self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); self.update_state_position(position_ms); if start_playing { self.state.set_status(PlayStatus::kPlayStatusPlay); self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; } else { self.state.set_status(PlayStatus::kPlayStatusPause); self.play_status = SpircPlayStatus::LoadingPause { position_ms }; } } None => { self.handle_stop(); } } } fn hello(&mut self) -> Result<(), Error> { CommandSender::new(self, MessageType::kMessageTypeHello).send() } fn notify(&mut self, recipient: Option<&str>) -> Result<(), Error> { let status = self.state.get_status(); // When in loading state, the Spotify UI is disabled for interaction. // On desktop this isn't so bad but on mobile it means that the bottom // control disappears entirely. This is very confusing, so don't notify // in this case. 
if status == PlayStatus::kPlayStatusLoading { return Ok(()); } trace!("Sending status to server: [{:?}]", status); let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify); if let Some(s) = recipient { cs = cs.recipient(s); } cs.send() } fn set_volume(&mut self, volume: u16) { let old_volume = self.device.get_volume(); let new_volume = volume as u32; if old_volume != new_volume { self.device.set_volume(new_volume); self.mixer.set_volume(volume); if let Some(cache) = self.session.cache() { cache.save_volume(volume) } if self.device.get_is_active() { self.player.emit_volume_changed_event(volume); } } } } impl Drop for SpircTask { fn drop(&mut self) { debug!("drop Spirc[{}]", self.spirc_id); } } struct CommandSender<'a> { spirc: &'a mut SpircTask, frame: protocol::spirc::Frame, } impl<'a> CommandSender<'a> { fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender<'_> { let mut frame = protocol::spirc::Frame::new(); frame.set_version(1); frame.set_protocol_version(::std::convert::Into::into("2.0.0")); frame.set_ident(spirc.ident.clone()); frame.set_seq_nr(spirc.sequence.get()); frame.set_typ(cmd); frame.set_device_state(spirc.device.clone()); frame.set_state_update_id(spirc.now_ms()); CommandSender { spirc, frame } } fn recipient(mut self, recipient: &'a str) -> CommandSender<'_> { self.frame.mut_recipient().push(recipient.to_owned()); self } #[allow(dead_code)] fn state(mut self, state: protocol::spirc::State) -> CommandSender<'a> { self.frame.set_state(state); self } fn send(mut self) -> Result<(), Error> { if !self.frame.has_state() && self.spirc.device.get_is_active() { self.frame.set_state(self.spirc.state.clone()); } self.spirc.sender.send(self.frame.write_to_bytes()?) } }