diff --git a/core/src/channel.rs b/core/src/channel.rs index 387b3966..4a78a4aa 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -7,7 +7,7 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; use futures_core::Stream; use futures_util::lock::BiLock; -use futures_util::StreamExt; +use futures_util::{ready, StreamExt}; use tokio::sync::mpsc; use crate::util::SeqGenerator; @@ -107,10 +107,7 @@ impl ChannelManager { impl Channel { fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll> { - let (cmd, packet) = match self.receiver.poll_recv(cx) { - Poll::Pending => return Poll::Pending, - Poll::Ready(o) => o.ok_or(ChannelError)?, - }; + let (cmd, packet) = ready!(self.receiver.poll_recv(cx)).ok_or(ChannelError)?; if cmd == 0xa { let code = BigEndian::read_u16(&packet.as_ref()[..2]); @@ -140,11 +137,7 @@ impl Stream for Channel { ChannelState::Closed => panic!("Polling already terminated channel"), ChannelState::Header(mut data) => { if data.is_empty() { - data = match self.recv_packet(cx) { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))), - Poll::Pending => return Poll::Pending, - }; + data = ready!(self.recv_packet(cx))?; } let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; @@ -163,11 +156,7 @@ impl Stream for Channel { } ChannelState::Data => { - let data = match self.recv_packet(cx) { - Poll::Ready(Ok(x)) => x, - Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))), - Poll::Pending => return Poll::Pending, - }; + let data = ready!(self.recv_packet(cx))?; if data.is_empty() { self.receiver.close(); self.state = ChannelState::Closed; @@ -186,18 +175,10 @@ impl Stream for ChannelData { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut channel = match self.0.poll_lock(cx) { - Poll::Ready(c) => c, - Poll::Pending => return Poll::Pending, - }; + let mut channel = ready!(self.0.poll_lock(cx)); loop { - let event = match channel.poll_next_unpin(cx) { - Poll::Ready(x) => x.transpose()?, - Poll::Pending => return Poll::Pending, - }; - - match event { + match ready!(channel.poll_next_unpin(cx)?) { Some(ChannelEvent::Header(..)) => (), Some(ChannelEvent::Data(data)) => return Poll::Ready(Some(Ok(data))), None => return Poll::Ready(None), @@ -210,19 +191,11 @@ impl Stream for ChannelHeaders { type Item = Result<(u8, Vec), ChannelError>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut channel = match self.0.poll_lock(cx) { - Poll::Ready(c) => c, - Poll::Pending => return Poll::Pending, - }; + let mut channel = ready!(self.0.poll_lock(cx)); - let event = match channel.poll_next_unpin(cx) { - Poll::Ready(x) => x.transpose()?, - Poll::Pending => return Poll::Pending, - }; - - match event { + match ready!(channel.poll_next_unpin(cx)?) { Some(ChannelEvent::Header(id, data)) => Poll::Ready(Some(Ok((id, data)))), - Some(ChannelEvent::Data(..)) | None => Poll::Ready(None), + _ => Poll::Ready(None), } } } diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index ef04e985..3ea15448 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -7,6 +7,7 @@ use std::task::Poll; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; +use futures_util::FutureExt; use tokio::sync::{mpsc, oneshot}; use crate::protocol; @@ -41,11 +42,7 @@ impl Future for MercuryFuture { type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.receiver).poll(cx) { - Poll::Ready(Ok(x)) => Poll::Ready(x), - Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)), - Poll::Pending => Poll::Pending, - } + self.receiver.poll_unpin(cx).map_err(|_| MercuryError)? } } diff --git a/core/src/session.rs b/core/src/session.rs index 388ef391..6c4abc54 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -10,7 +10,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; use futures_core::TryStream; -use futures_util::{future, StreamExt, TryStreamExt}; +use futures_util::{future, ready, StreamExt, TryStreamExt}; use once_cell::sync::OnceCell; use thiserror::Error; use tokio::sync::mpsc; @@ -287,18 +287,17 @@ where }; loop { - let (cmd, data) = match self.0.try_poll_next_unpin(cx) { - Poll::Ready(Some(Ok(t))) => t, - Poll::Ready(None) => { + let (cmd, data) = match ready!(self.0.try_poll_next_unpin(cx)) { + Some(Ok(t)) => t, + None => { warn!("Connection to server closed."); session.shutdown(); return Poll::Ready(Ok(())); } - Poll::Ready(Some(Err(e))) => { + Some(Err(e)) => { session.shutdown(); return Poll::Ready(Err(e)); } - Poll::Pending => return Poll::Pending, }; session.dispatch(cmd, data);