diff --git a/core/src/mercury/sender.rs b/core/src/mercury/sender.rs index 383d449d..268554d9 100644 --- a/core/src/mercury/sender.rs +++ b/core/src/mercury/sender.rs @@ -6,20 +6,21 @@ pub struct MercurySender { mercury: MercuryManager, uri: String, pending: VecDeque>, + buffered_future: Option>, } impl MercurySender { - // TODO: pub(super) when stable pub(crate) fn new(mercury: MercuryManager, uri: String) -> MercurySender { MercurySender { mercury, uri, pending: VecDeque::new(), + buffered_future: None, } } pub fn is_flushed(&self) -> bool { - self.pending.is_empty() + self.buffered_future.is_none() && self.pending.is_empty() } pub fn send(&mut self, item: Vec) { @@ -28,8 +29,13 @@ impl MercurySender { } pub async fn flush(&mut self) -> Result<(), MercuryError> { - for fut in self.pending.drain(..) { + if self.buffered_future.is_none() { + self.buffered_future = self.pending.pop_front(); + } + + while let Some(fut) = self.buffered_future.as_mut() { fut.await?; + self.buffered_future = self.pending.pop_front(); } Ok(()) } @@ -41,6 +47,7 @@ impl Clone for MercurySender { mercury: self.mercury.clone(), uri: self.uri.clone(), pending: VecDeque::new(), + buffered_future: None, } } }