Fix bug in MercurySender

This commit is contained in:
Johannesd3 2021-05-23 21:09:07 +02:00
parent 68818758a2
commit 28dd842e05

View file

@ -6,20 +6,21 @@ pub struct MercurySender {
mercury: MercuryManager, mercury: MercuryManager,
uri: String, uri: String,
pending: VecDeque<MercuryFuture<MercuryResponse>>, pending: VecDeque<MercuryFuture<MercuryResponse>>,
buffered_future: Option<MercuryFuture<MercuryResponse>>,
} }
impl MercurySender { impl MercurySender {
// TODO: pub(super) when stable
pub(crate) fn new(mercury: MercuryManager, uri: String) -> MercurySender { pub(crate) fn new(mercury: MercuryManager, uri: String) -> MercurySender {
MercurySender { MercurySender {
mercury, mercury,
uri, uri,
pending: VecDeque::new(), pending: VecDeque::new(),
buffered_future: None,
} }
} }
pub fn is_flushed(&self) -> bool { pub fn is_flushed(&self) -> bool {
self.pending.is_empty() self.buffered_future.is_none() && self.pending.is_empty()
} }
pub fn send(&mut self, item: Vec<u8>) { pub fn send(&mut self, item: Vec<u8>) {
@ -28,8 +29,13 @@ impl MercurySender {
} }
pub async fn flush(&mut self) -> Result<(), MercuryError> { pub async fn flush(&mut self) -> Result<(), MercuryError> {
for fut in self.pending.drain(..) { if self.buffered_future.is_none() {
self.buffered_future = self.pending.pop_front();
}
while let Some(fut) = self.buffered_future.as_mut() {
fut.await?; fut.await?;
self.buffered_future = self.pending.pop_front();
} }
Ok(()) Ok(())
} }
@ -41,6 +47,7 @@ impl Clone for MercurySender {
mercury: self.mercury.clone(), mercury: self.mercury.clone(),
uri: self.uri.clone(), uri: self.uri.clone(),
pending: VecDeque::new(), pending: VecDeque::new(),
buffered_future: None,
} }
} }
} }