Merge pull request #699 from Johannesd3/ready-macro

Use `ready!` macro to reduce boilerplate
This commit is contained in:
Sasha Hilton 2021-05-01 01:25:14 +01:00 committed by GitHub
commit 9d5ac1b156
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 16 additions and 47 deletions

View file

@ -7,7 +7,7 @@ use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures_core::Stream; use futures_core::Stream;
use futures_util::lock::BiLock; use futures_util::lock::BiLock;
use futures_util::StreamExt; use futures_util::{ready, StreamExt};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
@ -107,10 +107,7 @@ impl ChannelManager {
impl Channel { impl Channel {
fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> { fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll_recv(cx) { let (cmd, packet) = ready!(self.receiver.poll_recv(cx)).ok_or(ChannelError)?;
Poll::Pending => return Poll::Pending,
Poll::Ready(o) => o.ok_or(ChannelError)?,
};
if cmd == 0xa { if cmd == 0xa {
let code = BigEndian::read_u16(&packet.as_ref()[..2]); let code = BigEndian::read_u16(&packet.as_ref()[..2]);
@ -140,11 +137,7 @@ impl Stream for Channel {
ChannelState::Closed => panic!("Polling already terminated channel"), ChannelState::Closed => panic!("Polling already terminated channel"),
ChannelState::Header(mut data) => { ChannelState::Header(mut data) => {
if data.is_empty() { if data.is_empty() {
data = match self.recv_packet(cx) { data = ready!(self.recv_packet(cx))?;
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
} }
let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
@ -163,11 +156,7 @@ impl Stream for Channel {
} }
ChannelState::Data => { ChannelState::Data => {
let data = match self.recv_packet(cx) { let data = ready!(self.recv_packet(cx))?;
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
if data.is_empty() { if data.is_empty() {
self.receiver.close(); self.receiver.close();
self.state = ChannelState::Closed; self.state = ChannelState::Closed;
@ -186,18 +175,10 @@ impl Stream for ChannelData {
type Item = Result<Bytes, ChannelError>; type Item = Result<Bytes, ChannelError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock(cx) { let mut channel = ready!(self.0.poll_lock(cx));
Poll::Ready(c) => c,
Poll::Pending => return Poll::Pending,
};
loop { loop {
let event = match channel.poll_next_unpin(cx) { match ready!(channel.poll_next_unpin(cx)?) {
Poll::Ready(x) => x.transpose()?,
Poll::Pending => return Poll::Pending,
};
match event {
Some(ChannelEvent::Header(..)) => (), Some(ChannelEvent::Header(..)) => (),
Some(ChannelEvent::Data(data)) => return Poll::Ready(Some(Ok(data))), Some(ChannelEvent::Data(data)) => return Poll::Ready(Some(Ok(data))),
None => return Poll::Ready(None), None => return Poll::Ready(None),
@ -210,19 +191,11 @@ impl Stream for ChannelHeaders {
type Item = Result<(u8, Vec<u8>), ChannelError>; type Item = Result<(u8, Vec<u8>), ChannelError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock(cx) { let mut channel = ready!(self.0.poll_lock(cx));
Poll::Ready(c) => c,
Poll::Pending => return Poll::Pending,
};
let event = match channel.poll_next_unpin(cx) { match ready!(channel.poll_next_unpin(cx)?) {
Poll::Ready(x) => x.transpose()?,
Poll::Pending => return Poll::Pending,
};
match event {
Some(ChannelEvent::Header(id, data)) => Poll::Ready(Some(Ok((id, data)))), Some(ChannelEvent::Header(id, data)) => Poll::Ready(Some(Ok((id, data)))),
Some(ChannelEvent::Data(..)) | None => Poll::Ready(None), _ => Poll::Ready(None),
} }
} }
} }

View file

@ -7,6 +7,7 @@ use std::task::Poll;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures_util::FutureExt;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::protocol; use crate::protocol;
@ -41,11 +42,7 @@ impl<T> Future for MercuryFuture<T> {
type Output = Result<T, MercuryError>; type Output = Result<T, MercuryError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.receiver).poll(cx) { self.receiver.poll_unpin(cx).map_err(|_| MercuryError)?
Poll::Ready(Ok(x)) => Poll::Ready(x),
Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)),
Poll::Pending => Poll::Pending,
}
} }
} }

View file

@ -10,7 +10,7 @@ use std::time::{SystemTime, UNIX_EPOCH};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures_core::TryStream; use futures_core::TryStream;
use futures_util::{future, StreamExt, TryStreamExt}; use futures_util::{future, ready, StreamExt, TryStreamExt};
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -287,18 +287,17 @@ where
}; };
loop { loop {
let (cmd, data) = match self.0.try_poll_next_unpin(cx) { let (cmd, data) = match ready!(self.0.try_poll_next_unpin(cx)) {
Poll::Ready(Some(Ok(t))) => t, Some(Ok(t)) => t,
Poll::Ready(None) => { None => {
warn!("Connection to server closed."); warn!("Connection to server closed.");
session.shutdown(); session.shutdown();
return Poll::Ready(Ok(())); return Poll::Ready(Ok(()));
} }
Poll::Ready(Some(Err(e))) => { Some(Err(e)) => {
session.shutdown(); session.shutdown();
return Poll::Ready(Err(e)); return Poll::Ready(Err(e));
} }
Poll::Pending => return Poll::Pending,
}; };
session.dispatch(cmd, data); session.dispatch(cmd, data);