diff --git a/core/src/channel.rs b/core/src/channel.rs index 98ab62e6..daf02c7d 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -14,7 +14,7 @@ component! { download_rate_estimate: usize = 0, download_measurement_start: Option = None, download_measurement_bytes: usize = 0, - is_shutdown: bool = false, + invalid: bool = false, } } @@ -47,7 +47,7 @@ impl ChannelManager { let seq = self.lock(|inner| { let seq = inner.sequence.get(); - if !inner.is_shutdown { + if !inner.invalid { inner.channels.insert(seq, tx); } seq @@ -93,7 +93,7 @@ impl ChannelManager { pub(crate) fn shutdown(&self) { self.lock(|inner| { - inner.is_shutdown = true; + inner.invalid = true; // destroy the sending halves of the channels to signal everyone who is waiting for something. inner.channels.clear(); }); diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 2fb6b878..1eb0e7e2 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -20,7 +20,7 @@ component! { sequence: SeqGenerator = SeqGenerator::new(0), pending: HashMap, MercuryPending> = HashMap::new(), subscriptions: Vec<(String, mpsc::UnboundedSender)> = Vec::new(), - is_shutdown: bool = false, + invalid: bool = false, } } @@ -63,7 +63,7 @@ impl MercuryManager { let seq = self.next_seq(); self.lock(|inner| { - if !inner.is_shutdown { + if !inner.invalid { inner.pending.insert(seq.clone(), pending); } }); @@ -114,7 +114,7 @@ impl MercuryManager { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { - if !inner.is_shutdown { + if !inner.invalid { debug!("subscribed uri={} count={}", uri, response.payload.len()); if response.payload.len() > 0 { // Old subscription protocol, watch the provided list of URIs @@ -232,7 +232,7 @@ impl MercuryManager { pub(crate) fn shutdown(&self) { self.lock(|inner| { - inner.is_shutdown = true; + inner.invalid = true; // destroy the sending halves of the channels to signal everyone who is waiting for something. inner.pending.clear(); inner.subscriptions.clear();