Add support for new mercury subscription protocol.

Fixes #185
This commit is contained in:
Paul Lietar 2017-05-15 23:47:34 +01:00
parent 7c237c77df
commit 6f24e3b731
2 changed files with 30 additions and 13 deletions

View file

@ -19,7 +19,7 @@ component! {
MercuryManager : MercuryManagerInner { MercuryManager : MercuryManagerInner {
sequence: SeqGenerator<u64> = SeqGenerator::new(0), sequence: SeqGenerator<u64> = SeqGenerator::new(0),
pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(), pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(),
subscriptions: HashMap<String, mpsc::UnboundedSender<MercuryResponse>> = HashMap::new(), subscriptions: Vec<(String, mpsc::UnboundedSender<MercuryResponse>)> = Vec::new(),
} }
} }
@ -114,14 +114,21 @@ impl MercuryManager {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| { manager.lock(move |inner| {
for sub in response.payload { debug!("subscribed uri={} count={}", uri, response.payload.len());
let mut sub : protocol::pubsub::Subscription if response.payload.len() > 0 {
= protobuf::parse_from_bytes(&sub).unwrap(); // Old subscription protocol, watch the provided list of URIs
let sub_uri = sub.take_uri(); for sub in response.payload {
let mut sub : protocol::pubsub::Subscription
= protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed {} ({})", uri, sub_uri); debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.insert(sub_uri, tx.clone()); inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
} }
}); });
@ -197,12 +204,22 @@ impl MercuryManager {
} else { } else {
if cmd == 0xb5 { if cmd == 0xb5 {
self.lock(|inner| { self.lock(|inner| {
use std::collections::hash_map::Entry; let mut found = false;
if let Entry::Occupied(entry) = inner.subscriptions.entry(response.uri.clone()) { inner.subscriptions.retain(|&(ref prefix, ref sub)| {
// TODO: send unsub message if response.uri.starts_with(prefix) {
if entry.get().send(response).is_err() { found = true;
entry.remove();
// if send fails, remove from list of subs
// TODO: send unsub message
sub.send(response.clone()).is_ok()
} else {
// URI doesn't match
true
} }
});
if !found {
debug!("unknown subscription uri={}", response.uri);
} }
}) })
} else if let Some(cb) = pending.callback { } else if let Some(cb) = pending.callback {

View file

@ -20,7 +20,7 @@ pub struct MercuryRequest {
pub payload: Vec<Vec<u8>>, pub payload: Vec<Vec<u8>>,
} }
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct MercuryResponse { pub struct MercuryResponse {
pub uri: String, pub uri: String,
pub status_code: i32, pub status_code: i32,