Get user attributes and updates

This commit is contained in:
Roderick van Domburg 2021-12-11 00:03:35 +01:00
parent 40163754bb
commit 9a0d2390b7
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
7 changed files with 220 additions and 2 deletions

11
Cargo.lock generated
View file

@ -1270,6 +1270,7 @@ dependencies = [
"pbkdf2",
"priority-queue",
"protobuf",
"quick-xml",
"rand",
"serde",
"serde_json",
@ -1950,6 +1951,16 @@ dependencies = [
"protobuf-codegen",
]
[[package]]
name = "quick-xml"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quote"
version = "1.0.10"

View file

@ -6,14 +6,17 @@ use std::time::{SystemTime, UNIX_EPOCH};
use crate::context::StationContext;
use crate::core::config::ConnectConfig;
use crate::core::mercury::{MercuryError, MercurySender};
use crate::core::session::Session;
use crate::core::session::{Session, UserAttributes};
use crate::core::spotify_id::SpotifyId;
use crate::core::util::SeqGenerator;
use crate::core::version;
use crate::playback::mixer::Mixer;
use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
use crate::protocol;
use crate::protocol::explicit_content_pubsub::UserAttributesUpdate;
use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
use crate::protocol::user_attributes::UserAttributesMutation;
use futures_util::future::{self, FusedFuture};
use futures_util::stream::FusedStream;
@ -58,6 +61,8 @@ struct SpircTask {
play_status: SpircPlayStatus,
subscription: BoxedStream<Frame>,
user_attributes_update: BoxedStream<UserAttributesUpdate>,
user_attributes_mutation: BoxedStream<UserAttributesMutation>,
sender: MercurySender,
commands: Option<mpsc::UnboundedReceiver<SpircCommand>>,
player_events: Option<PlayerEventChannel>,
@ -248,6 +253,30 @@ impl Spirc {
}),
);
let user_attributes_update = Box::pin(
session
.mercury()
.listen_for("spotify:user:attributes:update")
.map(UnboundedReceiverStream::new)
.flatten_stream()
.map(|response| -> UserAttributesUpdate {
let data = response.payload.first().unwrap();
UserAttributesUpdate::parse_from_bytes(data).unwrap()
}),
);
let user_attributes_mutation = Box::pin(
session
.mercury()
.listen_for("spotify:user:attributes:mutated")
.map(UnboundedReceiverStream::new)
.flatten_stream()
.map(|response| -> UserAttributesMutation {
let data = response.payload.first().unwrap();
UserAttributesMutation::parse_from_bytes(data).unwrap()
}),
);
let sender = session.mercury().sender(uri);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
@ -276,6 +305,8 @@ impl Spirc {
play_status: SpircPlayStatus::Stopped,
subscription,
user_attributes_update,
user_attributes_mutation,
sender,
commands: Some(cmd_rx),
player_events: Some(player_events),
@ -344,6 +375,20 @@ impl SpircTask {
break;
}
},
user_attributes_update = self.user_attributes_update.next() => match user_attributes_update {
Some(attributes) => self.handle_user_attributes_update(attributes),
None => {
error!("user attributes update selected, but none received");
break;
}
},
user_attributes_mutation = self.user_attributes_mutation.next() => match user_attributes_mutation {
Some(attributes) => self.handle_user_attributes_mutation(attributes),
None => {
error!("user attributes mutation selected, but none received");
break;
}
},
cmd = async { commands.unwrap().recv().await }, if commands.is_some() => if let Some(cmd) = cmd {
self.handle_command(cmd);
},
@ -573,6 +618,41 @@ impl SpircTask {
}
}
fn handle_user_attributes_update(&mut self, update: UserAttributesUpdate) {
trace!("Received attributes update: {:?}", update);
let attributes: UserAttributes = update
.get_pairs()
.iter()
.map(|pair| (pair.get_key().to_owned(), pair.get_value().to_owned()))
.collect();
let _ = self.session.set_user_attributes(attributes);
}
fn handle_user_attributes_mutation(&mut self, mutation: UserAttributesMutation) {
for attribute in mutation.get_fields().iter() {
let key = attribute.get_name();
if let Some(old_value) = self.session.user_attribute(key) {
let new_value = match old_value.as_ref() {
"0" => "1",
"1" => "0",
_ => &old_value,
};
self.session.set_user_attribute(key, new_value);
trace!(
"Received attribute mutation, {} was {} is now {}",
key,
old_value,
new_value
);
} else {
trace!(
"Received attribute mutation for {} but key was not found!",
key
);
}
}
}
fn handle_frame(&mut self, frame: Frame) {
let state_string = match frame.get_state().get_status() {
PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",

View file

@ -36,6 +36,7 @@ once_cell = "1.5.2"
pbkdf2 = { version = "0.8", default-features = false, features = ["hmac"] }
priority-queue = "1.1"
protobuf = "2.14.0"
quick-xml = { version = "0.22", features = [ "serialize" ] }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

View file

@ -144,6 +144,27 @@ impl MercuryManager {
}
}
pub fn listen_for<T: Into<String>>(
&self,
uri: T,
) -> impl Future<Output = mpsc::UnboundedReceiver<MercuryResponse>> + 'static {
let uri = uri.into();
let manager = self.clone();
async move {
let (tx, rx) = mpsc::unbounded_channel();
manager.lock(move |inner| {
if !inner.invalid {
debug!("listening to uri={}", uri);
inner.subscriptions.push((uri, tx));
}
});
rx
}
}
pub(crate) fn dispatch(&self, cmd: PacketType, mut data: Bytes) {
let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
let seq = data.split_to(seq_len).as_ref().to_owned();

View file

@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::future::Future;
use std::io;
use std::pin::Pin;
@ -13,6 +14,7 @@ use futures_core::TryStream;
use futures_util::{future, ready, StreamExt, TryStreamExt};
use num_traits::FromPrimitive;
use once_cell::sync::OnceCell;
use quick_xml::events::Event;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -38,11 +40,14 @@ pub enum SessionError {
IoError(#[from] io::Error),
}
pub type UserAttributes = HashMap<String, String>;
struct SessionData {
country: String,
time_delta: i64,
canonical_username: String,
invalid: bool,
user_attributes: UserAttributes,
}
struct SessionInternal {
@ -89,6 +94,7 @@ impl Session {
canonical_username: String::new(),
invalid: false,
time_delta: 0,
user_attributes: HashMap::new(),
}),
http_client,
tx_connection: sender_tx,
@ -224,11 +230,48 @@ impl Session {
Some(MercuryReq) | Some(MercurySub) | Some(MercuryUnsub) | Some(MercuryEvent) => {
self.mercury().dispatch(packet_type.unwrap(), data);
}
Some(ProductInfo) => {
let data = std::str::from_utf8(&data).unwrap();
let mut reader = quick_xml::Reader::from_str(data);
let mut buf = Vec::new();
let mut current_element = String::new();
let mut user_attributes: UserAttributes = HashMap::new();
loop {
match reader.read_event(&mut buf) {
Ok(Event::Start(ref element)) => {
current_element =
std::str::from_utf8(element.name()).unwrap().to_owned()
}
Ok(Event::End(_)) => {
current_element = String::new();
}
Ok(Event::Text(ref value)) => {
if !current_element.is_empty() {
let _ = user_attributes.insert(
current_element.clone(),
value.unescape_and_decode(&reader).unwrap(),
);
}
}
Ok(Event::Eof) => break,
Ok(_) => (),
Err(e) => error!(
"Error parsing XML at position {}: {:?}",
reader.buffer_position(),
e
),
}
}
trace!("Received product info: {:?}", user_attributes);
self.0.data.write().unwrap().user_attributes = user_attributes;
}
Some(PongAck)
| Some(SecretBlock)
| Some(LegacyWelcome)
| Some(UnknownDataAllZeros)
| Some(ProductInfo)
| Some(LicenseVersion) => {}
_ => {
if let Some(packet_type) = PacketType::from_u8(cmd) {
@ -264,6 +307,38 @@ impl Session {
&self.config().device_id
}
pub fn user_attribute(&self, key: &str) -> Option<String> {
self.0
.data
.read()
.unwrap()
.user_attributes
.get(key)
.map(|value| value.to_owned())
}
pub fn user_attributes(&self) -> UserAttributes {
self.0.data.read().unwrap().user_attributes.clone()
}
pub fn set_user_attribute(&self, key: &str, value: &str) -> Option<String> {
self.0
.data
.write()
.unwrap()
.user_attributes
.insert(key.to_owned(), value.to_owned())
}
pub fn set_user_attributes(&self, attributes: UserAttributes) {
self.0
.data
.write()
.unwrap()
.user_attributes
.extend(attributes)
}
fn weak(&self) -> SessionWeak {
SessionWeak(Arc::downgrade(&self.0))
}

View file

@ -26,6 +26,7 @@ fn compile() {
proto_dir.join("playlist_annotate3.proto"),
proto_dir.join("playlist_permission.proto"),
proto_dir.join("playlist4_external.proto"),
proto_dir.join("user_attributes.proto"),
// TODO: remove these legacy protobufs when we are on the new API completely
proto_dir.join("authentication.proto"),
proto_dir.join("canvaz.proto"),

View file

@ -0,0 +1,29 @@
// Custom protobuf crafted from spotify:user:attributes:mutated response:
//
// 1 {
// 1: "filter-explicit-content"
// }
// 2 {
// 1: 1639087299
// 2: 418909000
// }
syntax = "proto3";
package spotify.user_attributes.proto;
option optimize_for = CODE_SIZE;
message UserAttributesMutation {
repeated MutatedField fields = 1;
MutationCommand cmd = 2;
}
message MutatedField {
string name = 1;
}
message MutationCommand {
int64 timestamp = 1;
int32 unknown = 2;
}