Merge pull request #449 from kaymes/blocking_sink_events

Add blocking SinkActive|SinkInactive events
This commit is contained in:
Sasha Hilton 2020-07-24 03:07:38 +01:00 committed by GitHub
commit 6eabf4a75c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 88 additions and 16 deletions

View file

@ -33,6 +33,15 @@ pub struct Player {
play_request_id_generator: SeqGenerator<u64>, play_request_id_generator: SeqGenerator<u64>,
} }
#[derive(PartialEq, Debug, Clone, Copy)]
pub enum SinkStatus {
Running,
Closed,
TemporarilyClosed,
}
pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;
struct PlayerInternal { struct PlayerInternal {
session: Session, session: Session,
config: PlayerConfig, config: PlayerConfig,
@ -41,7 +50,8 @@ struct PlayerInternal {
state: PlayerState, state: PlayerState,
preload: PlayerPreload, preload: PlayerPreload,
sink: Box<dyn Sink>, sink: Box<dyn Sink>,
sink_running: bool, sink_status: SinkStatus,
sink_event_callback: Option<SinkEventCallback>,
audio_filter: Option<Box<dyn AudioFilter + Send>>, audio_filter: Option<Box<dyn AudioFilter + Send>>,
event_senders: Vec<futures::sync::mpsc::UnboundedSender<PlayerEvent>>, event_senders: Vec<futures::sync::mpsc::UnboundedSender<PlayerEvent>>,
} }
@ -61,6 +71,7 @@ enum PlayerCommand {
Stop, Stop,
Seek(u32), Seek(u32),
AddEventSender(futures::sync::mpsc::UnboundedSender<PlayerEvent>), AddEventSender(futures::sync::mpsc::UnboundedSender<PlayerEvent>),
SetSinkEventCallback(Option<SinkEventCallback>),
EmitVolumeSetEvent(u16), EmitVolumeSetEvent(u16),
} }
@ -240,7 +251,8 @@ impl Player {
state: PlayerState::Stopped, state: PlayerState::Stopped,
preload: PlayerPreload::None, preload: PlayerPreload::None,
sink: sink_builder(), sink: sink_builder(),
sink_running: false, sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter, audio_filter: audio_filter,
event_senders: [event_sender].to_vec(), event_senders: [event_sender].to_vec(),
}; };
@ -316,6 +328,10 @@ impl Player {
Box::new(result) Box::new(result)
} }
pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) {
self.command(PlayerCommand::SetSinkEventCallback(callback));
}
pub fn emit_volume_set_event(&self, volume: u16) { pub fn emit_volume_set_event(&self, volume: u16) {
self.command(PlayerCommand::EmitVolumeSetEvent(volume)); self.command(PlayerCommand::EmitVolumeSetEvent(volume));
} }
@ -917,20 +933,41 @@ impl PlayerInternal {
} }
fn ensure_sink_running(&mut self) { fn ensure_sink_running(&mut self) {
if !self.sink_running { if self.sink_status != SinkStatus::Running {
trace!("== Starting sink =="); trace!("== Starting sink ==");
if let Some(callback) = &mut self.sink_event_callback {
callback(SinkStatus::Running);
}
match self.sink.start() { match self.sink.start() {
Ok(()) => self.sink_running = true, Ok(()) => self.sink_status = SinkStatus::Running,
Err(err) => error!("Could not start audio: {}", err), Err(err) => error!("Could not start audio: {}", err),
} }
} }
} }
fn ensure_sink_stopped(&mut self) { fn ensure_sink_stopped(&mut self, temporarily: bool) {
if self.sink_running { match self.sink_status {
trace!("== Stopping sink =="); SinkStatus::Running => {
self.sink.stop().unwrap(); trace!("== Stopping sink ==");
self.sink_running = false; self.sink.stop().unwrap();
self.sink_status = if temporarily {
SinkStatus::TemporarilyClosed
} else {
SinkStatus::Closed
};
if let Some(callback) = &mut self.sink_event_callback {
callback(self.sink_status);
}
}
SinkStatus::TemporarilyClosed => {
if !temporarily {
self.sink_status = SinkStatus::Closed;
if let Some(callback) = &mut self.sink_event_callback {
callback(SinkStatus::Closed);
}
}
}
SinkStatus::Closed => (),
} }
} }
@ -956,7 +993,7 @@ impl PlayerInternal {
play_request_id, play_request_id,
.. ..
} => { } => {
self.ensure_sink_stopped(); self.ensure_sink_stopped(false);
self.send_event(PlayerEvent::Stopped { self.send_event(PlayerEvent::Stopped {
track_id, track_id,
play_request_id, play_request_id,
@ -1003,7 +1040,7 @@ impl PlayerInternal {
{ {
self.state.playing_to_paused(); self.state.playing_to_paused();
self.ensure_sink_stopped(); self.ensure_sink_stopped(false);
let position_ms = Self::position_pcm_to_ms(stream_position_pcm); let position_ms = Self::position_pcm_to_ms(stream_position_pcm);
self.send_event(PlayerEvent::Paused { self.send_event(PlayerEvent::Paused {
track_id, track_id,
@ -1032,7 +1069,7 @@ impl PlayerInternal {
if let Err(err) = self.sink.write(&packet.data()) { if let Err(err) = self.sink.write(&packet.data()) {
error!("Could not write audio: {}", err); error!("Could not write audio: {}", err);
self.ensure_sink_stopped(); self.ensure_sink_stopped(false);
} }
} }
} }
@ -1090,7 +1127,7 @@ impl PlayerInternal {
suggested_to_preload_next_track: false, suggested_to_preload_next_track: false,
}; };
} else { } else {
self.ensure_sink_stopped(); self.ensure_sink_stopped(false);
self.state = PlayerState::Paused { self.state = PlayerState::Paused {
track_id: track_id, track_id: track_id,
@ -1121,7 +1158,7 @@ impl PlayerInternal {
position_ms: u32, position_ms: u32,
) { ) {
if !self.config.gapless { if !self.config.gapless {
self.ensure_sink_stopped(); self.ensure_sink_stopped(play);
} }
// emit the correct player event // emit the correct player event
match self.state { match self.state {
@ -1289,7 +1326,7 @@ impl PlayerInternal {
// We need to load the track - either from scratch or by completing a preload. // We need to load the track - either from scratch or by completing a preload.
// In any case we go into a Loading state to load the track. // In any case we go into a Loading state to load the track.
self.ensure_sink_stopped(); self.ensure_sink_stopped(play);
self.send_event(PlayerEvent::Loading { self.send_event(PlayerEvent::Loading {
track_id, track_id,
@ -1470,6 +1507,8 @@ impl PlayerInternal {
PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender), PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender),
PlayerCommand::SetSinkEventCallback(callback) => self.sink_event_callback = callback,
PlayerCommand::EmitVolumeSetEvent(volume) => { PlayerCommand::EmitVolumeSetEvent(volume) => {
self.send_event(PlayerEvent::VolumeSet { volume }) self.send_event(PlayerEvent::VolumeSet { volume })
} }
@ -1574,6 +1613,9 @@ impl ::std::fmt::Debug for PlayerCommand {
PlayerCommand::Stop => f.debug_tuple("Stop").finish(), PlayerCommand::Stop => f.debug_tuple("Stop").finish(),
PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(), PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(),
PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(), PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(),
PlayerCommand::SetSinkEventCallback(_) => {
f.debug_tuple("SetSinkEventCallback").finish()
}
PlayerCommand::EmitVolumeSetEvent(volume) => { PlayerCommand::EmitVolumeSetEvent(volume) => {
f.debug_tuple("VolumeSet").field(&volume).finish() f.debug_tuple("VolumeSet").field(&volume).finish()
} }

View file

@ -27,7 +27,7 @@ use librespot::playback::mixer::{self, Mixer, MixerConfig};
use librespot::playback::player::{Player, PlayerEvent}; use librespot::playback::player::{Player, PlayerEvent};
mod player_event_handler; mod player_event_handler;
use crate::player_event_handler::run_program_on_events; use crate::player_event_handler::{emit_sink_event, run_program_on_events};
fn device_id(name: &str) -> String { fn device_id(name: &str) -> String {
hex::encode(Sha1::digest(name.as_bytes())) hex::encode(Sha1::digest(name.as_bytes()))
@ -87,6 +87,7 @@ struct Setup {
enable_discovery: bool, enable_discovery: bool,
zeroconf_port: u16, zeroconf_port: u16,
player_event_program: Option<String>, player_event_program: Option<String>,
emit_sink_events: bool,
} }
fn setup(args: &[String]) -> Setup { fn setup(args: &[String]) -> Setup {
@ -111,6 +112,7 @@ fn setup(args: &[String]) -> Setup {
"Run PROGRAM when playback is about to begin.", "Run PROGRAM when playback is about to begin.",
"PROGRAM", "PROGRAM",
) )
.optflag("", "emit-sink-events", "Run program set by --onevent before sink is opened and after it is closed.")
.optflag("v", "verbose", "Enable verbose output") .optflag("v", "verbose", "Enable verbose output")
.optopt("u", "username", "Username to sign in with", "USERNAME") .optopt("u", "username", "Username to sign in with", "USERNAME")
.optopt("p", "password", "Password", "PASSWORD") .optopt("p", "password", "Password", "PASSWORD")
@ -359,6 +361,7 @@ fn setup(args: &[String]) -> Setup {
mixer: mixer, mixer: mixer,
mixer_config: mixer_config, mixer_config: mixer_config,
player_event_program: matches.opt_str("onevent"), player_event_program: matches.opt_str("onevent"),
emit_sink_events: matches.opt_present("emit-sink-events"),
} }
} }
@ -386,6 +389,7 @@ struct Main {
player_event_channel: Option<UnboundedReceiver<PlayerEvent>>, player_event_channel: Option<UnboundedReceiver<PlayerEvent>>,
player_event_program: Option<String>, player_event_program: Option<String>,
emit_sink_events: bool,
} }
impl Main { impl Main {
@ -412,6 +416,7 @@ impl Main {
player_event_channel: None, player_event_channel: None,
player_event_program: setup.player_event_program, player_event_program: setup.player_event_program,
emit_sink_events: setup.emit_sink_events,
}; };
if setup.enable_discovery { if setup.enable_discovery {
@ -481,6 +486,15 @@ impl Future for Main {
(backend)(device) (backend)(device)
}); });
if self.emit_sink_events {
if let Some(player_event_program) = &self.player_event_program {
let player_event_program = player_event_program.clone();
player.set_sink_event_callback(Some(Box::new(move |sink_status| {
emit_sink_event(sink_status, &player_event_program)
})));
}
}
let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer); let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer);
self.spirc = Some(spirc); self.spirc = Some(spirc);
self.spirc_task = Some(spirc_task); self.spirc_task = Some(spirc_task);

View file

@ -5,6 +5,9 @@ use std::io;
use std::process::Command; use std::process::Command;
use tokio_process::{Child, CommandExt}; use tokio_process::{Child, CommandExt};
use futures::Future;
use librespot::playback::player::SinkStatus;
fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result<Child> { fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result<Child> {
let mut v: Vec<&str> = program.split_whitespace().collect(); let mut v: Vec<&str> = program.split_whitespace().collect();
info!("Running {:?} with environment variables {:?}", v, env_vars); info!("Running {:?} with environment variables {:?}", v, env_vars);
@ -63,3 +66,16 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option<io::Re
} }
Some(run_program(onevent, env_vars)) Some(run_program(onevent, env_vars))
} }
pub fn emit_sink_event(sink_status: SinkStatus, onevent: &str) {
let mut env_vars = HashMap::new();
env_vars.insert("PLAYER_EVENT", "sink".to_string());
let sink_status = match sink_status {
SinkStatus::Running => "running",
SinkStatus::TemporarilyClosed => "temporarily_closed",
SinkStatus::Closed => "closed",
};
env_vars.insert("SINK_STATUS", sink_status.to_string());
let _ = run_program(onevent, env_vars).and_then(|child| child.wait());
}