diff --git a/src/mercury/mod.rs b/src/mercury/mod.rs index 36517405..2bf52a11 100644 --- a/src/mercury/mod.rs +++ b/src/mercury/mod.rs @@ -19,7 +19,7 @@ component! { MercuryManager : MercuryManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), pending: HashMap, MercuryPending> = HashMap::new(), - subscriptions: HashMap> = HashMap::new(), + subscriptions: Vec<(String, mpsc::UnboundedSender)> = Vec::new(), } } @@ -114,14 +114,21 @@ impl MercuryManager { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { - for sub in response.payload { - let mut sub : protocol::pubsub::Subscription - = protobuf::parse_from_bytes(&sub).unwrap(); - let sub_uri = sub.take_uri(); + debug!("subscribed uri={} count={}", uri, response.payload.len()); + if response.payload.len() > 0 { + // Old subscription protocol, watch the provided list of URIs + for sub in response.payload { + let mut sub : protocol::pubsub::Subscription + = protobuf::parse_from_bytes(&sub).unwrap(); + let sub_uri = sub.take_uri(); - debug!("subscribed {} ({})", uri, sub_uri); + debug!("subscribed sub_uri={}", sub_uri); - inner.subscriptions.insert(sub_uri, tx.clone()); + inner.subscriptions.push((sub_uri, tx.clone())); + } + } else { + // New subscription protocol, watch the requested URI + inner.subscriptions.push((uri, tx)); } }); @@ -197,12 +204,22 @@ impl MercuryManager { } else { if cmd == 0xb5 { self.lock(|inner| { - use std::collections::hash_map::Entry; - if let Entry::Occupied(entry) = inner.subscriptions.entry(response.uri.clone()) { - // TODO: send unsub message - if entry.get().send(response).is_err() { - entry.remove(); + let mut found = false; + inner.subscriptions.retain(|&(ref prefix, ref sub)| { + if response.uri.starts_with(prefix) { + found = true; + + // if send fails, remove from list of subs + // TODO: send unsub message + sub.send(response.clone()).is_ok() + } else { + // URI doesn't match + true } + }); + + if !found { + debug!("unknown subscription uri={}", response.uri); } }) } else if let Some(cb) = pending.callback { diff --git a/src/mercury/types.rs b/src/mercury/types.rs index 22683301..9952b533 100644 --- a/src/mercury/types.rs +++ b/src/mercury/types.rs @@ -20,7 +20,7 @@ pub struct MercuryRequest { pub payload: Vec>, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MercuryResponse { pub uri: String, pub status_code: i32,