From 2c81aaaf4e440a41ca5c2a37f0f760e70958951b Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sat, 20 Feb 2021 20:59:57 +0100 Subject: [PATCH] Implement MercurySender not as sink --- connect/src/spirc.rs | 39 +++++++-------------------- core/src/mercury/sender.rs | 55 ++++++++++++-------------------------- 2 files changed, 27 insertions(+), 67 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 5dc89599..2e3694e4 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -9,9 +9,9 @@ use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, use futures::channel::mpsc; use futures::future::{self, FusedFuture}; use futures::stream::FusedStream; -use futures::{Future, FutureExt, Sink, SinkExt, StreamExt}; +use futures::{Future, FutureExt, StreamExt}; use librespot_core::config::{ConnectConfig, VolumeCtrl}; -use librespot_core::mercury::MercuryError; +use librespot_core::mercury::{MercuryError, MercurySender}; use librespot_core::session::Session; use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError}; use librespot_core::util::url_encode; @@ -42,7 +42,6 @@ enum SpircPlayStatus { type BoxedFuture = Pin + Send>>; type BoxedStream = Pin + Send>>; -type BoxedSink = Pin + Send>>; struct SpircTask { player: Player, @@ -58,9 +57,8 @@ struct SpircTask { mixer_started: bool, play_status: SpircPlayStatus, - sender_flushed: bool, subscription: BoxedStream, - sender: BoxedSink, + sender: MercurySender, commands: mpsc::UnboundedReceiver, player_events: PlayerEventChannel, @@ -272,12 +270,7 @@ impl Spirc { }), ); - let sender = Box::pin( - session - .mercury() - .sender(uri) - .with(|frame: Frame| future::ready(Ok(frame.write_to_bytes().unwrap()))), - ); + let sender = session.mercury().sender(uri); let (cmd_tx, cmd_rx) = mpsc::unbounded(); @@ -311,7 +304,6 @@ impl Spirc { commands: cmd_rx, player_events: player_events, - sender_flushed: true, shutdown: false, session: session, @@ -372,13 +364,9 @@ impl SpircTask { event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event { self.handle_player_event(event) }, - result = self.sender.flush(), if !self.sender_flushed => { - if result.is_err() { - error!("Cannot flush spirc event sender."); - break; - } - - self.sender_flushed = true; + result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() { + error!("Cannot flush spirc event sender."); + break; }, context = &mut self.context_fut, if !self.context_fut.is_terminated() => { match context { @@ -424,8 +412,8 @@ impl SpircTask { } } - if self.sender.close().await.is_err() { - warn!("Cannot close spirc event sender."); + if self.sender.flush().await.is_err() { + warn!("Cannot flush spirc event sender."); } } @@ -1320,14 +1308,7 @@ impl<'a> CommandSender<'a> { if !self.frame.has_state() && self.spirc.device.get_is_active() { self.frame.set_state(self.spirc.state.clone()); } - let sender = &mut self.spirc.sender; - future::poll_fn(|cx| sender.as_mut().poll_ready(cx)) - .now_or_never() - .unwrap() - .unwrap(); - - sender.as_mut().start_send(self.frame).unwrap(); - self.spirc.sender_flushed = false; + self.spirc.sender.send(self.frame.write_to_bytes().unwrap()); } } diff --git a/core/src/mercury/sender.rs b/core/src/mercury/sender.rs index 860c2f33..e276bcf1 100644 --- a/core/src/mercury/sender.rs +++ b/core/src/mercury/sender.rs @@ -1,5 +1,4 @@ -use futures::Sink; -use std::{collections::VecDeque, pin::Pin, task::Context}; +use std::collections::VecDeque; use super::*; @@ -18,6 +17,22 @@ impl MercurySender { pending: VecDeque::new(), } } + + pub fn is_flushed(&self) -> bool { + self.pending.is_empty() + } + + pub fn send(&mut self, item: Vec) { + let task = self.mercury.send(self.uri.clone(), item); + self.pending.push_back(task); + } + + pub async fn flush(&mut self) -> Result<(), MercuryError> { + for fut in self.pending.drain(..) { + fut.await?; + } + Ok(()) + } } impl Clone for MercurySender { @@ -29,39 +44,3 @@ impl Clone for MercurySender { } } } - -impl Sink> for MercurySender { - type Error = MercuryError; - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_flush(cx) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - match self.pending.front_mut() { - Some(task) => { - match Pin::new(task).poll(cx) { - Poll::Ready(Err(x)) => return Poll::Ready(Err(x)), - Poll::Pending => return Poll::Pending, - _ => (), - }; - } - None => { - return Poll::Ready(Ok(())); - } - } - self.pending.pop_front(); - } - } - - fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - let task = self.mercury.send(self.uri.clone(), item); - self.pending.push_back(task); - Ok(()) - } -}