From 3fe384958833e983ac58da25094ffdc4e0b8d27c Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 23 Jan 2020 01:14:43 +1100 Subject: [PATCH] Enable Mercury to be shut down and all pending requests being cancelled. --- core/src/mercury/mod.rs | 42 ++++++++++++++++++++++++++++------------- core/src/session.rs | 1 + 2 files changed, 30 insertions(+), 13 deletions(-) diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 0b69d8ee..2fb6b878 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -20,6 +20,7 @@ component! { sequence: SeqGenerator = SeqGenerator::new(0), pending: HashMap, MercuryPending> = HashMap::new(), subscriptions: Vec<(String, mpsc::UnboundedSender)> = Vec::new(), + is_shutdown: bool = false, } } @@ -61,7 +62,11 @@ impl MercuryManager { }; let seq = self.next_seq(); - self.lock(|inner| inner.pending.insert(seq.clone(), pending)); + self.lock(|inner| { + if !inner.is_shutdown { + inner.pending.insert(seq.clone(), pending); + } + }); let cmd = req.method.command(); let data = req.encode(&seq); @@ -109,21 +114,23 @@ impl MercuryManager { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { - debug!("subscribed uri={} count={}", uri, response.payload.len()); - if response.payload.len() > 0 { - // Old subscription protocol, watch the provided list of URIs - for sub in response.payload { - let mut sub: protocol::pubsub::Subscription = - protobuf::parse_from_bytes(&sub).unwrap(); - let sub_uri = sub.take_uri(); + if !inner.is_shutdown { + debug!("subscribed uri={} count={}", uri, response.payload.len()); + if response.payload.len() > 0 { + // Old subscription protocol, watch the provided list of URIs + for sub in response.payload { + let mut sub: protocol::pubsub::Subscription = + protobuf::parse_from_bytes(&sub).unwrap(); + let sub_uri = sub.take_uri(); - debug!("subscribed sub_uri={}", sub_uri); + debug!("subscribed sub_uri={}", sub_uri); - inner.subscriptions.push((sub_uri, tx.clone())); + inner.subscriptions.push((sub_uri, tx.clone())); + } + } else { + // New subscription protocol, watch the requested URI + inner.subscriptions.push((uri, tx)); } - } else { - // New subscription protocol, watch the requested URI - inner.subscriptions.push((uri, tx)); } }); @@ -222,4 +229,13 @@ impl MercuryManager { } } } + + pub(crate) fn shutdown(&self) { + self.lock(|inner| { + inner.is_shutdown = true; + // destroy the sending halves of the channels to signal everyone who is waiting for something. + inner.pending.clear(); + inner.subscriptions.clear(); + }); + } } diff --git a/core/src/session.rs b/core/src/session.rs index 7695eb55..e661e4ff 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -237,6 +237,7 @@ impl Session { pub fn shutdown(&self) { debug!("Invalidating session[{}]", self.0.session_id); self.0.data.write().unwrap().invalid = true; + self.mercury().shutdown(); } pub fn is_invalid(&self) -> bool {