From 9a0d2390b7ed30b482e434883a8f3bf879bdb2b2 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 11 Dec 2021 00:03:35 +0100 Subject: [PATCH] Get user attributes and updates --- Cargo.lock | 11 ++++ connect/src/spirc.rs | 82 +++++++++++++++++++++++++++- core/Cargo.toml | 1 + core/src/mercury/mod.rs | 21 +++++++ core/src/session.rs | 77 +++++++++++++++++++++++++- protocol/build.rs | 1 + protocol/proto/user_attributes.proto | 29 ++++++++++ 7 files changed, 220 insertions(+), 2 deletions(-) create mode 100644 protocol/proto/user_attributes.proto diff --git a/Cargo.lock b/Cargo.lock index d4501fef..5aa66853 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1270,6 +1270,7 @@ dependencies = [ "pbkdf2", "priority-queue", "protobuf", + "quick-xml", "rand", "serde", "serde_json", @@ -1950,6 +1951,16 @@ dependencies = [ "protobuf-codegen", ] +[[package]] +name = "quick-xml" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.10" diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index e64e35a5..bc596bfc 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -6,14 +6,17 @@ use std::time::{SystemTime, UNIX_EPOCH}; use crate::context::StationContext; use crate::core::config::ConnectConfig; use crate::core::mercury::{MercuryError, MercurySender}; -use crate::core::session::Session; +use crate::core::session::{Session, UserAttributes}; use crate::core::spotify_id::SpotifyId; use crate::core::util::SeqGenerator; use crate::core::version; use crate::playback::mixer::Mixer; use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; + use crate::protocol; +use crate::protocol::explicit_content_pubsub::UserAttributesUpdate; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; +use crate::protocol::user_attributes::UserAttributesMutation; use futures_util::future::{self, FusedFuture}; use futures_util::stream::FusedStream; @@ -58,6 +61,8 @@ struct SpircTask { play_status: SpircPlayStatus, subscription: BoxedStream, + user_attributes_update: BoxedStream, + user_attributes_mutation: BoxedStream, sender: MercurySender, commands: Option>, player_events: Option, @@ -248,6 +253,30 @@ impl Spirc { }), ); + let user_attributes_update = Box::pin( + session + .mercury() + .listen_for("spotify:user:attributes:update") + .map(UnboundedReceiverStream::new) + .flatten_stream() + .map(|response| -> UserAttributesUpdate { + let data = response.payload.first().unwrap(); + UserAttributesUpdate::parse_from_bytes(data).unwrap() + }), + ); + + let user_attributes_mutation = Box::pin( + session + .mercury() + .listen_for("spotify:user:attributes:mutated") + .map(UnboundedReceiverStream::new) + .flatten_stream() + .map(|response| -> UserAttributesMutation { + let data = response.payload.first().unwrap(); + UserAttributesMutation::parse_from_bytes(data).unwrap() + }), + ); + let sender = session.mercury().sender(uri); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); @@ -276,6 +305,8 @@ impl Spirc { play_status: SpircPlayStatus::Stopped, subscription, + user_attributes_update, + user_attributes_mutation, sender, commands: Some(cmd_rx), player_events: Some(player_events), @@ -344,6 +375,20 @@ impl SpircTask { break; } }, + user_attributes_update = self.user_attributes_update.next() => match user_attributes_update { + Some(attributes) => self.handle_user_attributes_update(attributes), + None => { + error!("user attributes update selected, but none received"); + break; + } + }, + user_attributes_mutation = self.user_attributes_mutation.next() => match user_attributes_mutation { + Some(attributes) => self.handle_user_attributes_mutation(attributes), + None => { + error!("user attributes mutation selected, but none received"); + break; + } + }, cmd = async { commands.unwrap().recv().await }, if commands.is_some() => if let Some(cmd) = cmd { self.handle_command(cmd); }, @@ -573,6 +618,41 @@ impl SpircTask { } } + fn handle_user_attributes_update(&mut self, update: UserAttributesUpdate) { + trace!("Received attributes update: {:?}", update); + let attributes: UserAttributes = update + .get_pairs() + .iter() + .map(|pair| (pair.get_key().to_owned(), pair.get_value().to_owned())) + .collect(); + let _ = self.session.set_user_attributes(attributes); + } + + fn handle_user_attributes_mutation(&mut self, mutation: UserAttributesMutation) { + for attribute in mutation.get_fields().iter() { + let key = attribute.get_name(); + if let Some(old_value) = self.session.user_attribute(key) { + let new_value = match old_value.as_ref() { + "0" => "1", + "1" => "0", + _ => &old_value, + }; + self.session.set_user_attribute(key, new_value); + trace!( + "Received attribute mutation, {} was {} is now {}", + key, + old_value, + new_value + ); + } else { + trace!( + "Received attribute mutation for {} but key was not found!", + key + ); + } + } + } + fn handle_frame(&mut self, frame: Frame) { let state_string = match frame.get_state().get_status() { PlayStatus::kPlayStatusLoading => "kPlayStatusLoading", diff --git a/core/Cargo.toml b/core/Cargo.toml index 4321c638..54fc1de7 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -36,6 +36,7 @@ once_cell = "1.5.2" pbkdf2 = { version = "0.8", default-features = false, features = ["hmac"] } priority-queue = "1.1" protobuf = "2.14.0" +quick-xml = { version = "0.22", features = [ "serialize" ] } rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 6cf3519e..841bd3d1 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -144,6 +144,27 @@ impl MercuryManager { } } + pub fn listen_for>( + &self, + uri: T, + ) -> impl Future> + 'static { + let uri = uri.into(); + + let manager = self.clone(); + async move { + let (tx, rx) = mpsc::unbounded_channel(); + + manager.lock(move |inner| { + if !inner.invalid { + debug!("listening to uri={}", uri); + inner.subscriptions.push((uri, tx)); + } + }); + + rx + } + } + pub(crate) fn dispatch(&self, cmd: PacketType, mut data: Bytes) { let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; let seq = data.split_to(seq_len).as_ref().to_owned(); diff --git a/core/src/session.rs b/core/src/session.rs index f683960a..c1193dc3 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::future::Future; use std::io; use std::pin::Pin; @@ -13,6 +14,7 @@ use futures_core::TryStream; use futures_util::{future, ready, StreamExt, TryStreamExt}; use num_traits::FromPrimitive; use once_cell::sync::OnceCell; +use quick_xml::events::Event; use thiserror::Error; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -38,11 +40,14 @@ pub enum SessionError { IoError(#[from] io::Error), } +pub type UserAttributes = HashMap; + struct SessionData { country: String, time_delta: i64, canonical_username: String, invalid: bool, + user_attributes: UserAttributes, } struct SessionInternal { @@ -89,6 +94,7 @@ impl Session { canonical_username: String::new(), invalid: false, time_delta: 0, + user_attributes: HashMap::new(), }), http_client, tx_connection: sender_tx, @@ -224,11 +230,48 @@ impl Session { Some(MercuryReq) | Some(MercurySub) | Some(MercuryUnsub) | Some(MercuryEvent) => { self.mercury().dispatch(packet_type.unwrap(), data); } + Some(ProductInfo) => { + let data = std::str::from_utf8(&data).unwrap(); + let mut reader = quick_xml::Reader::from_str(data); + + let mut buf = Vec::new(); + let mut current_element = String::new(); + let mut user_attributes: UserAttributes = HashMap::new(); + + loop { + match reader.read_event(&mut buf) { + Ok(Event::Start(ref element)) => { + current_element = + std::str::from_utf8(element.name()).unwrap().to_owned() + } + Ok(Event::End(_)) => { + current_element = String::new(); + } + Ok(Event::Text(ref value)) => { + if !current_element.is_empty() { + let _ = user_attributes.insert( + current_element.clone(), + value.unescape_and_decode(&reader).unwrap(), + ); + } + } + Ok(Event::Eof) => break, + Ok(_) => (), + Err(e) => error!( + "Error parsing XML at position {}: {:?}", + reader.buffer_position(), + e + ), + } + } + + trace!("Received product info: {:?}", user_attributes); + self.0.data.write().unwrap().user_attributes = user_attributes; + } Some(PongAck) | Some(SecretBlock) | Some(LegacyWelcome) | Some(UnknownDataAllZeros) - | Some(ProductInfo) | Some(LicenseVersion) => {} _ => { if let Some(packet_type) = PacketType::from_u8(cmd) { @@ -264,6 +307,38 @@ impl Session { &self.config().device_id } + pub fn user_attribute(&self, key: &str) -> Option { + self.0 + .data + .read() + .unwrap() + .user_attributes + .get(key) + .map(|value| value.to_owned()) + } + + pub fn user_attributes(&self) -> UserAttributes { + self.0.data.read().unwrap().user_attributes.clone() + } + + pub fn set_user_attribute(&self, key: &str, value: &str) -> Option { + self.0 + .data + .write() + .unwrap() + .user_attributes + .insert(key.to_owned(), value.to_owned()) + } + + pub fn set_user_attributes(&self, attributes: UserAttributes) { + self.0 + .data + .write() + .unwrap() + .user_attributes + .extend(attributes) + } + fn weak(&self) -> SessionWeak { SessionWeak(Arc::downgrade(&self.0)) } diff --git a/protocol/build.rs b/protocol/build.rs index 2a763183..a4ca4c37 100644 --- a/protocol/build.rs +++ b/protocol/build.rs @@ -26,6 +26,7 @@ fn compile() { proto_dir.join("playlist_annotate3.proto"), proto_dir.join("playlist_permission.proto"), proto_dir.join("playlist4_external.proto"), + proto_dir.join("user_attributes.proto"), // TODO: remove these legacy protobufs when we are on the new API completely proto_dir.join("authentication.proto"), proto_dir.join("canvaz.proto"), diff --git a/protocol/proto/user_attributes.proto b/protocol/proto/user_attributes.proto new file mode 100644 index 00000000..96ecf010 --- /dev/null +++ b/protocol/proto/user_attributes.proto @@ -0,0 +1,29 @@ +// Custom protobuf crafted from spotify:user:attributes:mutated response: +// +// 1 { +// 1: "filter-explicit-content" +// } +// 2 { +// 1: 1639087299 +// 2: 418909000 +// } + +syntax = "proto3"; + +package spotify.user_attributes.proto; + +option optimize_for = CODE_SIZE; + +message UserAttributesMutation { + repeated MutatedField fields = 1; + MutationCommand cmd = 2; +} + +message MutatedField { + string name = 1; +} + +message MutationCommand { + int64 timestamp = 1; + int32 unknown = 2; +}