Implement MercurySender not as sink

This commit is contained in:
johannesd3 2021-02-20 20:59:57 +01:00 committed by Johannes Dertmann
parent daf7ecd23a
commit 2c81aaaf4e
2 changed files with 27 additions and 67 deletions

View file

@ -9,9 +9,9 @@ use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State,
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::future::{self, FusedFuture}; use futures::future::{self, FusedFuture};
use futures::stream::FusedStream; use futures::stream::FusedStream;
use futures::{Future, FutureExt, Sink, SinkExt, StreamExt}; use futures::{Future, FutureExt, StreamExt};
use librespot_core::config::{ConnectConfig, VolumeCtrl}; use librespot_core::config::{ConnectConfig, VolumeCtrl};
use librespot_core::mercury::MercuryError; use librespot_core::mercury::{MercuryError, MercurySender};
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError}; use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError};
use librespot_core::util::url_encode; use librespot_core::util::url_encode;
@ -42,7 +42,6 @@ enum SpircPlayStatus {
type BoxedFuture<T> = Pin<Box<dyn FusedFuture<Output = T> + Send>>; type BoxedFuture<T> = Pin<Box<dyn FusedFuture<Output = T> + Send>>;
type BoxedStream<T> = Pin<Box<dyn FusedStream<Item = T> + Send>>; type BoxedStream<T> = Pin<Box<dyn FusedStream<Item = T> + Send>>;
type BoxedSink<T, E> = Pin<Box<dyn Sink<T, Error = E> + Send>>;
struct SpircTask { struct SpircTask {
player: Player, player: Player,
@ -58,9 +57,8 @@ struct SpircTask {
mixer_started: bool, mixer_started: bool,
play_status: SpircPlayStatus, play_status: SpircPlayStatus,
sender_flushed: bool,
subscription: BoxedStream<Frame>, subscription: BoxedStream<Frame>,
sender: BoxedSink<Frame, MercuryError>, sender: MercurySender,
commands: mpsc::UnboundedReceiver<SpircCommand>, commands: mpsc::UnboundedReceiver<SpircCommand>,
player_events: PlayerEventChannel, player_events: PlayerEventChannel,
@ -272,12 +270,7 @@ impl Spirc {
}), }),
); );
let sender = Box::pin( let sender = session.mercury().sender(uri);
session
.mercury()
.sender(uri)
.with(|frame: Frame| future::ready(Ok(frame.write_to_bytes().unwrap()))),
);
let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (cmd_tx, cmd_rx) = mpsc::unbounded();
@ -311,7 +304,6 @@ impl Spirc {
commands: cmd_rx, commands: cmd_rx,
player_events: player_events, player_events: player_events,
sender_flushed: true,
shutdown: false, shutdown: false,
session: session, session: session,
@ -372,13 +364,9 @@ impl SpircTask {
event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event { event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event {
self.handle_player_event(event) self.handle_player_event(event)
}, },
result = self.sender.flush(), if !self.sender_flushed => { result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() {
if result.is_err() { error!("Cannot flush spirc event sender.");
error!("Cannot flush spirc event sender."); break;
break;
}
self.sender_flushed = true;
}, },
context = &mut self.context_fut, if !self.context_fut.is_terminated() => { context = &mut self.context_fut, if !self.context_fut.is_terminated() => {
match context { match context {
@ -424,8 +412,8 @@ impl SpircTask {
} }
} }
if self.sender.close().await.is_err() { if self.sender.flush().await.is_err() {
warn!("Cannot close spirc event sender."); warn!("Cannot flush spirc event sender.");
} }
} }
@ -1320,14 +1308,7 @@ impl<'a> CommandSender<'a> {
if !self.frame.has_state() && self.spirc.device.get_is_active() { if !self.frame.has_state() && self.spirc.device.get_is_active() {
self.frame.set_state(self.spirc.state.clone()); self.frame.set_state(self.spirc.state.clone());
} }
let sender = &mut self.spirc.sender;
future::poll_fn(|cx| sender.as_mut().poll_ready(cx)) self.spirc.sender.send(self.frame.write_to_bytes().unwrap());
.now_or_never()
.unwrap()
.unwrap();
sender.as_mut().start_send(self.frame).unwrap();
self.spirc.sender_flushed = false;
} }
} }

View file

@ -1,5 +1,4 @@
use futures::Sink; use std::collections::VecDeque;
use std::{collections::VecDeque, pin::Pin, task::Context};
use super::*; use super::*;
@ -18,6 +17,22 @@ impl MercurySender {
pending: VecDeque::new(), pending: VecDeque::new(),
} }
} }
pub fn is_flushed(&self) -> bool {
self.pending.is_empty()
}
pub fn send(&mut self, item: Vec<u8>) {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
}
pub async fn flush(&mut self) -> Result<(), MercuryError> {
for fut in self.pending.drain(..) {
fut.await?;
}
Ok(())
}
} }
impl Clone for MercurySender { impl Clone for MercurySender {
@ -29,39 +44,3 @@ impl Clone for MercurySender {
} }
} }
} }
impl Sink<Vec<u8>> for MercurySender {
type Error = MercuryError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match self.pending.front_mut() {
Some(task) => {
match Pin::new(task).poll(cx) {
Poll::Ready(Err(x)) => return Poll::Ready(Err(x)),
Poll::Pending => return Poll::Pending,
_ => (),
};
}
None => {
return Poll::Ready(Ok(()));
}
}
self.pending.pop_front();
}
}
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
Ok(())
}
}