diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index e0b817ec..b3878a42 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -61,6 +61,7 @@ struct SpircTask { play_status: SpircPlayStatus, subscription: BoxedStream, + connection_id_update: BoxedStream, user_attributes_update: BoxedStream, user_attributes_mutation: BoxedStream, sender: MercurySender, @@ -254,6 +255,21 @@ impl Spirc { }), ); + let connection_id_update = Box::pin( + session + .mercury() + .listen_for("hm://pusher/v1/connections/") + .map(UnboundedReceiverStream::new) + .flatten_stream() + .map(|response| -> String { + response + .uri + .strip_prefix("hm://pusher/v1/connections/") + .unwrap_or("") + .to_owned() + }), + ); + let user_attributes_update = Box::pin( session .mercury() @@ -306,6 +322,7 @@ impl Spirc { play_status: SpircPlayStatus::Stopped, subscription, + connection_id_update, user_attributes_update, user_attributes_mutation, sender, @@ -390,6 +407,13 @@ impl SpircTask { break; } }, + connection_id_update = self.connection_id_update.next() => match connection_id_update { + Some(connection_id) => self.handle_connection_id_update(connection_id), + None => { + error!("connection ID update selected, but none received"); + break; + } + }, cmd = async { commands.unwrap().recv().await }, if commands.is_some() => if let Some(cmd) = cmd { self.handle_command(cmd); }, @@ -619,6 +643,11 @@ impl SpircTask { } } + fn handle_connection_id_update(&mut self, connection_id: String) { + trace!("Received connection ID update: {:?}", connection_id); + self.session.set_connection_id(connection_id); + } + fn handle_user_attributes_update(&mut self, update: UserAttributesUpdate) { trace!("Received attributes update: {:#?}", update); let attributes: UserAttributes = update diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 841bd3d1..ad2d5013 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -264,6 +264,7 @@ impl MercuryManager { if !found { debug!("unknown subscription uri={}", response.uri); + trace!("response pushed over Mercury: {:?}", response); } }) } else if let Some(cb) = pending.callback { diff --git a/core/src/session.rs b/core/src/session.rs index ed9609d7..426480f6 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -52,6 +52,7 @@ pub struct UserData { #[derive(Debug, Clone, Default)] struct SessionData { + connection_id: String, time_delta: i64, invalid: bool, user_data: UserData, @@ -319,6 +320,14 @@ impl Session { &self.config().device_id } + pub fn connection_id(&self) -> String { + self.0.data.read().unwrap().connection_id.clone() + } + + pub fn set_connection_id(&self, connection_id: String) { + self.0.data.write().unwrap().connection_id = connection_id; + } + pub fn username(&self) -> String { self.0 .data