Send player event messages over futures aware channel.

This commit is contained in:
Simon Persson 2018-02-20 23:09:48 +01:00
parent 2eb4aa61d3
commit 93af49aadf
4 changed files with 49 additions and 47 deletions

View file

@ -1,7 +1,4 @@
use std::str::FromStr; use std::str::FromStr;
use core::spotify_id::SpotifyId;
use std::sync::mpsc::Sender;
use player::PlayerEvent;
#[derive(Clone, Copy, Debug, Hash, PartialOrd, Ord, PartialEq, Eq)] #[derive(Clone, Copy, Debug, Hash, PartialOrd, Ord, PartialEq, Eq)]
pub enum Bitrate { pub enum Bitrate {
@ -31,14 +28,12 @@ impl Default for Bitrate {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct PlayerConfig { pub struct PlayerConfig {
pub bitrate: Bitrate, pub bitrate: Bitrate,
pub event_sender : Option<Sender<PlayerEvent>>,
} }
impl Default for PlayerConfig { impl Default for PlayerConfig {
fn default() -> PlayerConfig { fn default() -> PlayerConfig {
PlayerConfig { PlayerConfig {
bitrate: Bitrate::default(), bitrate: Bitrate::default(),
event_sender: None,
} }
} }
} }

View file

@ -1,5 +1,6 @@
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{future, Future}; use futures::{future, Future};
use futures::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded};
use std; use std;
use std::borrow::Cow; use std::borrow::Cow;
use std::io::{Read, Seek, SeekFrom, Result}; use std::io::{Read, Seek, SeekFrom, Result};
@ -32,6 +33,7 @@ struct PlayerInternal {
sink: Box<Sink>, sink: Box<Sink>,
sink_running: bool, sink_running: bool,
audio_filter: Option<Box<AudioFilter + Send>>, audio_filter: Option<Box<AudioFilter + Send>>,
event_sender: UnboundedSender<PlayerEvent>,
} }
enum PlayerCommand { enum PlayerCommand {
@ -58,14 +60,15 @@ pub enum PlayerEvent {
} }
} }
type PlayerEventChannel = UnboundedReceiver<PlayerEvent>;
impl Player { impl Player {
pub fn new<F>(config: PlayerConfig, session: Session, pub fn new<F>(config: PlayerConfig, session: Session,
audio_filter: Option<Box<AudioFilter + Send>>, audio_filter: Option<Box<AudioFilter + Send>>,
sink_builder: F) -> Player sink_builder: F) -> (Player, PlayerEventChannel)
where F: FnOnce() -> Box<Sink> + Send + 'static where F: FnOnce() -> Box<Sink> + Send + 'static
{ {
let (cmd_tx, cmd_rx) = std::sync::mpsc::channel(); let (cmd_tx, cmd_rx) = std::sync::mpsc::channel();
let (event_sender, event_receiver) = unbounded();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id()); debug!("new Player[{}]", session.session_id());
@ -79,15 +82,14 @@ impl Player {
sink: sink_builder(), sink: sink_builder(),
sink_running: false, sink_running: false,
audio_filter: audio_filter, audio_filter: audio_filter,
event_sender: event_sender,
}; };
internal.run(); internal.run();
}); });
Player { (Player { commands: Some(cmd_tx), thread_handle: Some(handle) },
commands: Some(cmd_tx), event_receiver)
thread_handle: Some(handle),
}
} }
fn command(&self, cmd: PlayerCommand) { fn command(&self, cmd: PlayerCommand) {
@ -414,13 +416,11 @@ impl PlayerInternal {
} }
fn send_event(&mut self, event: PlayerEvent) { fn send_event(&mut self, event: PlayerEvent) {
if let Some(ref s) = self.config.event_sender { match self.event_sender.unbounded_send(event.clone()) {
match s.send(event.clone()) {
Ok(_) => info!("Sent event {:?} to event listener.", event), Ok(_) => info!("Sent event {:?} to event listener.", event),
Err(err) => error!("Failed to send event {:?} to listener: {:?}", event, err) Err(err) => error!("Failed to send event {:?} to listener: {:?}", event, err)
} }
} }
}
fn find_available_alternative<'a>(&self, track: &'a Track) -> Option<Cow<'a, Track>> { fn find_available_alternative<'a>(&self, track: &'a Track) -> Option<Cow<'a, Track>> {
if track.available { if track.available {

View file

@ -9,6 +9,7 @@ extern crate tokio_signal;
use env_logger::LogBuilder; use env_logger::LogBuilder;
use futures::{Future, Async, Poll, Stream}; use futures::{Future, Async, Poll, Stream};
use futures::sync::mpsc::UnboundedReceiver;
use std::env; use std::env;
use std::io::{self, stderr, Write}; use std::io::{self, stderr, Write};
use std::path::PathBuf; use std::path::PathBuf;
@ -28,7 +29,7 @@ use librespot::playback::audio_backend::{self, Sink, BACKENDS};
use librespot::playback::config::{Bitrate, PlayerConfig}; use librespot::playback::config::{Bitrate, PlayerConfig};
use librespot::connect::discovery::{discovery, DiscoveryStream}; use librespot::connect::discovery::{discovery, DiscoveryStream};
use librespot::playback::mixer::{self, Mixer}; use librespot::playback::mixer::{self, Mixer};
use librespot::playback::player::Player; use librespot::playback::player::{Player, PlayerEvent};
use librespot::connect::spirc::{Spirc, SpircTask}; use librespot::connect::spirc::{Spirc, SpircTask};
mod player_event_handler; mod player_event_handler;
@ -86,6 +87,7 @@ struct Setup {
credentials: Option<Credentials>, credentials: Option<Credentials>,
enable_discovery: bool, enable_discovery: bool,
zeroconf_port: u16, zeroconf_port: u16,
player_event_program: Option<String>,
} }
fn setup(args: &[String]) -> Setup { fn setup(args: &[String]) -> Setup {
@ -185,10 +187,7 @@ fn setup(args: &[String]) -> Setup {
.map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate")) .map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate"))
.unwrap_or(Bitrate::default()); .unwrap_or(Bitrate::default());
PlayerConfig { PlayerConfig { bitrate: bitrate }
bitrate: bitrate,
event_sender: matches.opt_str("onevent").map(run_program_on_events)
}
}; };
let connect_config = { let connect_config = {
@ -216,6 +215,7 @@ fn setup(args: &[String]) -> Setup {
enable_discovery: enable_discovery, enable_discovery: enable_discovery,
zeroconf_port: zeroconf_port, zeroconf_port: zeroconf_port,
mixer: mixer, mixer: mixer,
player_event_program: matches.opt_str("onevent"),
} }
} }
@ -237,6 +237,9 @@ struct Main {
connect: Box<Future<Item=Session, Error=io::Error>>, connect: Box<Future<Item=Session, Error=io::Error>>,
shutdown: bool, shutdown: bool,
player_event_channel: Option<UnboundedReceiver<PlayerEvent>>,
player_event_program: Option<String>,
} }
impl Main { impl Main {
@ -257,6 +260,9 @@ impl Main {
spirc_task: None, spirc_task: None,
shutdown: false, shutdown: false,
signal: Box::new(tokio_signal::ctrl_c(&handle).flatten_stream()), signal: Box::new(tokio_signal::ctrl_c(&handle).flatten_stream()),
player_event_channel: None,
player_event_program: setup.player_event_program,
}; };
if setup.enable_discovery { if setup.enable_discovery {
@ -314,13 +320,14 @@ impl Future for Main {
let audio_filter = mixer.get_audio_filter(); let audio_filter = mixer.get_audio_filter();
let backend = self.backend; let backend = self.backend;
let player = Player::new(player_config, session.clone(), audio_filter, move || { let (player, event_channel) = Player::new(player_config, session.clone(), audio_filter, move || {
(backend)(device) (backend)(device)
}); });
let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer); let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer);
self.spirc = Some(spirc); self.spirc = Some(spirc);
self.spirc_task = Some(spirc_task); self.spirc_task = Some(spirc_task);
self.player_event_channel = Some(event_channel);
progress = true; progress = true;
} }
@ -348,6 +355,14 @@ impl Future for Main {
} }
} }
if let Some(ref mut player_event_channel) = self.player_event_channel {
if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() {
if let Some(ref program) = self.player_event_program {
run_program_on_events(event, program);
}
}
}
if !progress { if !progress {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }

View file

@ -1,6 +1,4 @@
use std::process::Command; use std::process::Command;
use std::sync::mpsc::{channel, Sender};
use std::thread;
use std::collections::HashMap; use std::collections::HashMap;
use librespot::playback::player::PlayerEvent; use librespot::playback::player::PlayerEvent;
@ -15,12 +13,9 @@ fn run_program(program: &str, env_vars: HashMap<&str, String>) {
info!("Exit status: {}", status); info!("Exit status: {}", status);
} }
pub fn run_program_on_events(onevent: String) -> Sender<PlayerEvent> { pub fn run_program_on_events(event: PlayerEvent, onevent: &str) {
let (sender, receiver) = channel();
thread::spawn(move || {
while let Ok(msg) = receiver.recv() {
let mut env_vars = HashMap::new(); let mut env_vars = HashMap::new();
match msg { match event {
PlayerEvent::Changed { old_track_id, new_track_id } => { PlayerEvent::Changed { old_track_id, new_track_id } => {
env_vars.insert("PLAYER_EVENT", "change".to_string()); env_vars.insert("PLAYER_EVENT", "change".to_string());
env_vars.insert("OLD_TRACK_ID", old_track_id.to_base16()); env_vars.insert("OLD_TRACK_ID", old_track_id.to_base16());
@ -35,8 +30,5 @@ pub fn run_program_on_events(onevent: String) -> Sender<PlayerEvent> {
env_vars.insert("TRACK_ID", track_id.to_base16()); env_vars.insert("TRACK_ID", track_id.to_base16());
} }
} }
run_program(&onevent, env_vars); run_program(onevent, env_vars);
}
});
sender
} }