Reorganize the spirc/player interaction.

Updates are now sent through a callback mechanism rather than a channel.
This commit is contained in:
Paul Lietar 2016-01-20 15:47:05 +00:00
parent 463ce97661
commit 8fed885595
3 changed files with 136 additions and 86 deletions

View file

@ -1,6 +1,6 @@
#![crate_name = "librespot"] #![crate_name = "librespot"]
#![feature(plugin,zero_one,iter_arith,mpsc_select)] #![feature(plugin,zero_one,iter_arith)]
#![plugin(protobuf_macros)] #![plugin(protobuf_macros)]
#![plugin(json_macros)] #![plugin(json_macros)]

View file

@ -1,6 +1,6 @@
use eventual::{self, Async}; use eventual::{self, Async};
use portaudio; use portaudio;
use std::sync::{mpsc, Mutex, Arc, Condvar, MutexGuard}; use std::sync::{mpsc, Mutex, Arc, MutexGuard};
use std::thread; use std::thread;
use vorbis; use vorbis;
@ -10,8 +10,11 @@ use audio_decrypt::AudioDecrypt;
use util::{self, SpotifyId, Subfile}; use util::{self, SpotifyId, Subfile};
use spirc::PlayStatus; use spirc::PlayStatus;
pub type PlayerObserver = Box<Fn(&PlayerState) + Send>;
pub struct Player { pub struct Player {
state: Arc<(Mutex<PlayerState>, Condvar)>, state: Arc<Mutex<PlayerState>>,
observers: Arc<Mutex<Vec<PlayerObserver>>>,
commands: mpsc::Sender<PlayerCommand>, commands: mpsc::Sender<PlayerCommand>,
} }
@ -27,7 +30,9 @@ pub struct PlayerState {
} }
struct PlayerInternal { struct PlayerInternal {
state: Arc<(Mutex<PlayerState>, Condvar)>, state: Arc<Mutex<PlayerState>>,
observers: Arc<Mutex<Vec<PlayerObserver>>>,
session: Session, session: Session,
commands: mpsc::Receiver<PlayerCommand>, commands: mpsc::Receiver<PlayerCommand>,
} }
@ -45,20 +50,22 @@ impl Player {
pub fn new(session: Session) -> Player { pub fn new(session: Session) -> Player {
let (cmd_tx, cmd_rx) = mpsc::channel(); let (cmd_tx, cmd_rx) = mpsc::channel();
let state = Arc::new((Mutex::new(PlayerState { let state = Arc::new(Mutex::new(PlayerState {
status: PlayStatus::kPlayStatusStop, status: PlayStatus::kPlayStatusStop,
position_ms: 0, position_ms: 0,
position_measured_at: 0, position_measured_at: 0,
update_time: util::now_ms(), update_time: util::now_ms(),
volume: 0x8000, volume: 0x8000,
end_of_track: false, end_of_track: false,
}), }));
Condvar::new()));
let observers = Arc::new(Mutex::new(Vec::new()));
let internal = PlayerInternal { let internal = PlayerInternal {
session: session, session: session,
commands: cmd_rx, commands: cmd_rx,
state: state.clone(), state: state.clone(),
observers: observers.clone(),
}; };
thread::spawn(move || internal.run()); thread::spawn(move || internal.run());
@ -66,6 +73,7 @@ impl Player {
Player { Player {
commands: cmd_tx, commands: cmd_tx,
state: state, state: state,
observers: observers,
} }
} }
@ -94,31 +102,15 @@ impl Player {
} }
pub fn state(&self) -> MutexGuard<PlayerState> { pub fn state(&self) -> MutexGuard<PlayerState> {
self.state.0.lock().unwrap() self.state.lock().unwrap()
} }
pub fn volume(&self, vol: u16) { pub fn volume(&self, vol: u16) {
self.command(PlayerCommand::Volume(vol)); self.command(PlayerCommand::Volume(vol));
} }
pub fn updates(&self) -> mpsc::Receiver<i64> { pub fn add_observer(&self, observer: PlayerObserver) {
let state = self.state.clone(); self.observers.lock().unwrap().push(observer);
let (update_tx, update_rx) = mpsc::channel();
thread::spawn(move || {
let mut guard = state.0.lock().unwrap();
let mut last_update;
loop {
last_update = guard.update_time;
update_tx.send(guard.update_time).unwrap();
while last_update >= guard.update_time {
guard = state.1.wait(guard).unwrap();
}
}
});
update_rx
} }
} }
@ -135,7 +127,7 @@ impl PlayerInternal {
let mut decoder = None; let mut decoder = None;
loop { loop {
let playing = self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay; let playing = self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay;
let cmd = if playing { let cmd = if playing {
self.commands.try_recv().ok() self.commands.try_recv().ok()
} else { } else {
@ -252,14 +244,14 @@ impl PlayerInternal {
None => (), None => (),
} }
if self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay { if self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay {
match decoder.as_mut().unwrap().packets().next() { match decoder.as_mut().unwrap().packets().next() {
Some(Ok(packet)) => { Some(Ok(packet)) => {
let buffer = packet.data let buffer = packet.data
.iter() .iter()
.map(|&x| { .map(|&x| {
(x as i32 (x as i32
* self.state.0.lock().unwrap().volume as i32 * self.state.lock().unwrap().volume as i32
/ 0xFFFF) as i16 / 0xFFFF) as i16
}) })
.collect::<Vec<i16>>(); .collect::<Vec<i16>>();
@ -307,11 +299,15 @@ impl PlayerInternal {
fn update<F>(&self, f: F) fn update<F>(&self, f: F)
where F: FnOnce(&mut MutexGuard<PlayerState>) -> bool where F: FnOnce(&mut MutexGuard<PlayerState>) -> bool
{ {
let mut guard = self.state.0.lock().unwrap(); let mut guard = self.state.lock().unwrap();
let update = f(&mut guard); let update = f(&mut guard);
let observers = self.observers.lock().unwrap();
if update { if update {
guard.update_time = util::now_ms(); guard.update_time = util::now_ms();
self.state.1.notify_all(); for observer in observers.iter() {
observer(&guard);
}
} }
} }
} }

View file

@ -6,16 +6,19 @@ use session::Session;
use util::SpotifyId; use util::SpotifyId;
use util::version::version_string; use util::version::version_string;
use mercury::{MercuryRequest, MercuryMethod}; use mercury::{MercuryRequest, MercuryMethod};
use player::Player; use player::{Player, PlayerState};
use std::sync::{Mutex, Arc};
use librespot_protocol as protocol; use librespot_protocol as protocol;
pub use librespot_protocol::spirc::PlayStatus; pub use librespot_protocol::spirc::PlayStatus;
pub struct SpircManager { pub struct SpircManager(Arc<Mutex<SpircInternal>>);
struct SpircInternal {
player: Player, player: Player,
session: Session, session: Session,
state_update_id: i64,
seq_nr: u32, seq_nr: u32,
name: String, name: String,
@ -41,11 +44,10 @@ impl SpircManager {
let ident = session.0.data.read().unwrap().device_id.clone(); let ident = session.0.data.read().unwrap().device_id.clone();
let name = session.0.config.device_name.clone(); let name = session.0.config.device_name.clone();
SpircManager { SpircManager(Arc::new(Mutex::new(SpircInternal {
player: player, player: player,
session: session, session: session,
state_update_id: 0,
seq_nr: 0, seq_nr: 0,
name: name, name: name,
@ -64,56 +66,64 @@ impl SpircManager {
tracks: Vec::new(), tracks: Vec::new(),
index: 0, index: 0,
} })))
} }
pub fn run(&mut self) { pub fn run(&mut self) {
let rx = self.session.mercury_sub(format!("hm://remote/user/{}/", let rx = {
self.session let mut internal = self.0.lock().unwrap();
.0
.data
.read()
.unwrap()
.canonical_username
.clone()));
let updates = self.player.updates();
self.notify(true, None); let rx = internal.session.mercury_sub(internal.uri());
loop { internal.notify(true, None);
select! {
pkt = rx.recv() => {
let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(
pkt.unwrap().payload.first().unwrap()).unwrap();
println!("{:?} {} {} {} {}", // Use a weak pointer to avoid creating an Rc cycle between the player and the
frame.get_typ(), // SpircManager
frame.get_device_state().get_name(), let _self = Arc::downgrade(&self.0);
frame.get_ident(), internal.player.add_observer(Box::new(move |state| {
frame.get_seq_nr(), if let Some(_self) = _self.upgrade() {
frame.get_state_update_id()); let mut internal = _self.lock().unwrap();
if frame.get_ident() != self.ident && internal.on_update(state);
(frame.get_recipient().len() == 0 ||
frame.get_recipient().contains(&self.ident)) {
self.handle(frame);
}
},
update_time = updates.recv() => {
let end_of_track = self.player.state().end_of_track();
if end_of_track {
self.index = (self.index + 1) % self.tracks.len() as u32;
let track = self.tracks[self.index as usize];
self.player.load(track, true, 0);
} else {
self.state_update_id = update_time.unwrap();
self.notify(false, None);
}
} }
} }));
rx
};
for pkt in rx {
let data = pkt.payload.first().unwrap();
let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
println!("{:?} {} {} {} {}",
frame.get_typ(),
frame.get_device_state().get_name(),
frame.get_ident(),
frame.get_seq_nr(),
frame.get_state_update_id());
self.0.lock().unwrap().handle(frame);
}
}
}
impl SpircInternal {
fn on_update(&mut self, player_state: &PlayerState) {
let end_of_track = player_state.end_of_track();
if end_of_track {
self.index = (self.index + 1) % self.tracks.len() as u32;
let track = self.tracks[self.index as usize];
self.player.load(track, true, 0);
} else {
self.notify_with_player_state(false, None, player_state);
} }
} }
fn handle(&mut self, frame: protocol::spirc::Frame) { fn handle(&mut self, frame: protocol::spirc::Frame) {
if frame.get_ident() == self.ident ||
(frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) {
return;
}
if frame.get_recipient().len() > 0 { if frame.get_recipient().len() > 0 {
self.last_command_ident = frame.get_ident().to_owned(); self.last_command_ident = frame.get_ident().to_owned();
self.last_command_msgid = frame.get_seq_nr(); self.last_command_msgid = frame.get_seq_nr();
@ -179,7 +189,12 @@ impl SpircManager {
.map(|track| SpotifyId::from_raw(track.get_gid())) .map(|track| SpotifyId::from_raw(track.get_gid()))
.collect(); .collect();
} }
// FIXME: this entire function is duplicated in notify_with_player_state, but the borrow
// checker makes it hard to refactor
fn notify(&mut self, hello: bool, recipient: Option<&str>) { fn notify(&mut self, hello: bool, recipient: Option<&str>) {
let player_state = self.player.state();
let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
version: 1, version: 1,
ident: self.ident.clone(), ident: self.ident.clone(),
@ -191,22 +206,21 @@ impl SpircManager {
protocol::spirc::MessageType::kMessageTypeNotify protocol::spirc::MessageType::kMessageTypeNotify
}, },
device_state: self.device_state(), device_state: self.device_state(&player_state),
recipient: protobuf::RepeatedField::from_vec( recipient: protobuf::RepeatedField::from_vec(
recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
), ),
state_update_id: self.state_update_id as i64 state_update_id: player_state.update_time() as i64
}); });
if self.is_active { if self.is_active {
pkt.set_state(self.spirc_state()); pkt.set_state(self.spirc_state(&player_state));
} }
self.session self.session
.mercury(MercuryRequest { .mercury(MercuryRequest {
method: MercuryMethod::SEND, method: MercuryMethod::SEND,
uri: format!("hm://remote/user/{}", uri: self.uri(),
self.session.0.data.read().unwrap().canonical_username.clone()),
content_type: None, content_type: None,
payload: vec![pkt.write_to_bytes().unwrap()], payload: vec![pkt.write_to_bytes().unwrap()],
}) })
@ -214,12 +228,47 @@ impl SpircManager {
.unwrap(); .unwrap();
} }
fn spirc_state(&self) -> protocol::spirc::State { fn notify_with_player_state(&mut self,
let state = self.player.state(); hello: bool,
let (position_ms, position_measured_at) = state.position(); recipient: Option<&str>,
player_state: &PlayerState) {
let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), {
version: 1,
ident: self.ident.clone(),
protocol_version: "2.0.0".to_owned(),
seq_nr: { self.seq_nr += 1; self.seq_nr },
typ: if hello {
protocol::spirc::MessageType::kMessageTypeHello
} else {
protocol::spirc::MessageType::kMessageTypeNotify
},
device_state: self.device_state(&player_state),
recipient: protobuf::RepeatedField::from_vec(
recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
),
state_update_id: player_state.update_time() as i64
});
if self.is_active {
pkt.set_state(self.spirc_state(&player_state));
}
self.session
.mercury(MercuryRequest {
method: MercuryMethod::SEND,
uri: self.uri(),
content_type: None,
payload: vec![pkt.write_to_bytes().unwrap()],
})
.fire();
}
fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State {
let (position_ms, position_measured_at) = player_state.position();
protobuf_init!(protocol::spirc::State::new(), { protobuf_init!(protocol::spirc::State::new(), {
status: state.status(), status: player_state.status(),
position_ms: position_ms, position_ms: position_ms,
position_measured_at: position_measured_at as u64, position_measured_at: position_measured_at as u64,
@ -240,12 +289,12 @@ impl SpircManager {
}) })
} }
fn device_state(&self) -> protocol::spirc::DeviceState { fn device_state(&self, player_state: &PlayerState) -> protocol::spirc::DeviceState {
protobuf_init!(protocol::spirc::DeviceState::new(), { protobuf_init!(protocol::spirc::DeviceState::new(), {
sw_version: version_string(), sw_version: version_string(),
is_active: self.is_active, is_active: self.is_active,
can_play: self.can_play, can_play: self.can_play,
volume: self.player.state().volume() as u32, volume: player_state.volume() as u32,
name: self.name.clone(), name: self.name.clone(),
error_code: 0, error_code: 0,
became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 }, became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 },
@ -299,4 +348,9 @@ impl SpircManager {
], ],
}) })
} }
fn uri(&self) -> String {
format!("hm://remote/user/{}",
self.session.0.data.read().unwrap().canonical_username.clone())
}
} }