diff --git a/Cargo.lock b/Cargo.lock
index d4501fef..5aa66853 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1270,6 +1270,7 @@ dependencies = [
"pbkdf2",
"priority-queue",
"protobuf",
+ "quick-xml",
"rand",
"serde",
"serde_json",
@@ -1950,6 +1951,16 @@ dependencies = [
"protobuf-codegen",
]
+[[package]]
+name = "quick-xml"
+version = "0.22.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8533f14c8382aaad0d592c812ac3b826162128b65662331e1127b45c3d18536b"
+dependencies = [
+ "memchr",
+ "serde",
+]
+
[[package]]
name = "quote"
version = "1.0.10"
diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs
index e64e35a5..bc596bfc 100644
--- a/connect/src/spirc.rs
+++ b/connect/src/spirc.rs
@@ -6,14 +6,17 @@ use std::time::{SystemTime, UNIX_EPOCH};
use crate::context::StationContext;
use crate::core::config::ConnectConfig;
use crate::core::mercury::{MercuryError, MercurySender};
-use crate::core::session::Session;
+use crate::core::session::{Session, UserAttributes};
use crate::core::spotify_id::SpotifyId;
use crate::core::util::SeqGenerator;
use crate::core::version;
use crate::playback::mixer::Mixer;
use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
+
use crate::protocol;
+use crate::protocol::explicit_content_pubsub::UserAttributesUpdate;
use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
+use crate::protocol::user_attributes::UserAttributesMutation;
use futures_util::future::{self, FusedFuture};
use futures_util::stream::FusedStream;
@@ -58,6 +61,8 @@ struct SpircTask {
play_status: SpircPlayStatus,
subscription: BoxedStream,
+ user_attributes_update: BoxedStream,
+ user_attributes_mutation: BoxedStream,
sender: MercurySender,
commands: Option>,
player_events: Option,
@@ -248,6 +253,30 @@ impl Spirc {
}),
);
+ let user_attributes_update = Box::pin(
+ session
+ .mercury()
+ .listen_for("spotify:user:attributes:update")
+ .map(UnboundedReceiverStream::new)
+ .flatten_stream()
+ .map(|response| -> UserAttributesUpdate {
+ let data = response.payload.first().unwrap();
+ UserAttributesUpdate::parse_from_bytes(data).unwrap()
+ }),
+ );
+
+ let user_attributes_mutation = Box::pin(
+ session
+ .mercury()
+ .listen_for("spotify:user:attributes:mutated")
+ .map(UnboundedReceiverStream::new)
+ .flatten_stream()
+ .map(|response| -> UserAttributesMutation {
+ let data = response.payload.first().unwrap();
+ UserAttributesMutation::parse_from_bytes(data).unwrap()
+ }),
+ );
+
let sender = session.mercury().sender(uri);
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
@@ -276,6 +305,8 @@ impl Spirc {
play_status: SpircPlayStatus::Stopped,
subscription,
+ user_attributes_update,
+ user_attributes_mutation,
sender,
commands: Some(cmd_rx),
player_events: Some(player_events),
@@ -344,6 +375,20 @@ impl SpircTask {
break;
}
},
+ user_attributes_update = self.user_attributes_update.next() => match user_attributes_update {
+ Some(attributes) => self.handle_user_attributes_update(attributes),
+ None => {
+ error!("user attributes update selected, but none received");
+ break;
+ }
+ },
+ user_attributes_mutation = self.user_attributes_mutation.next() => match user_attributes_mutation {
+ Some(attributes) => self.handle_user_attributes_mutation(attributes),
+ None => {
+ error!("user attributes mutation selected, but none received");
+ break;
+ }
+ },
cmd = async { commands.unwrap().recv().await }, if commands.is_some() => if let Some(cmd) = cmd {
self.handle_command(cmd);
},
@@ -573,6 +618,41 @@ impl SpircTask {
}
}
+ fn handle_user_attributes_update(&mut self, update: UserAttributesUpdate) {
+ trace!("Received attributes update: {:?}", update);
+ let attributes: UserAttributes = update
+ .get_pairs()
+ .iter()
+ .map(|pair| (pair.get_key().to_owned(), pair.get_value().to_owned()))
+ .collect();
+ let _ = self.session.set_user_attributes(attributes);
+ }
+
+ fn handle_user_attributes_mutation(&mut self, mutation: UserAttributesMutation) {
+ for attribute in mutation.get_fields().iter() {
+ let key = attribute.get_name();
+ if let Some(old_value) = self.session.user_attribute(key) {
+ let new_value = match old_value.as_ref() {
+ "0" => "1",
+ "1" => "0",
+ _ => &old_value,
+ };
+ self.session.set_user_attribute(key, new_value);
+ trace!(
+ "Received attribute mutation, {} was {} is now {}",
+ key,
+ old_value,
+ new_value
+ );
+ } else {
+ trace!(
+ "Received attribute mutation for {} but key was not found!",
+ key
+ );
+ }
+ }
+ }
+
fn handle_frame(&mut self, frame: Frame) {
let state_string = match frame.get_state().get_status() {
PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
diff --git a/core/Cargo.toml b/core/Cargo.toml
index 4321c638..54fc1de7 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -36,6 +36,7 @@ once_cell = "1.5.2"
pbkdf2 = { version = "0.8", default-features = false, features = ["hmac"] }
priority-queue = "1.1"
protobuf = "2.14.0"
+quick-xml = { version = "0.22", features = [ "serialize" ] }
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs
index 6cf3519e..841bd3d1 100644
--- a/core/src/mercury/mod.rs
+++ b/core/src/mercury/mod.rs
@@ -144,6 +144,27 @@ impl MercuryManager {
}
}
+ pub fn listen_for>(
+ &self,
+ uri: T,
+ ) -> impl Future