Refactor player and spirc

This commit is contained in:
Paul Lietar 2017-01-29 14:11:20 +00:00
parent f3ba3f4bd4
commit 8aeb266a2d
2 changed files with 464 additions and 522 deletions

View file

@ -1,77 +1,41 @@
use futures::sync::oneshot;
use futures::{future, Future}; use futures::{future, Future};
use futures::sync::mpsc;
use std;
use std::borrow::Cow; use std::borrow::Cow;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use std::sync::{Mutex, Arc, MutexGuard}; use std::mem;
use std::thread; use std::thread;
use vorbis; use std;
use vorbis::{self, VorbisError};
use audio_file::AudioFile;
use audio_decrypt::AudioDecrypt;
use audio_backend::Sink; use audio_backend::Sink;
use audio_decrypt::AudioDecrypt;
use audio_file::AudioFile;
use metadata::{FileFormat, Track}; use metadata::{FileFormat, Track};
use session::{Bitrate, Session}; use session::{Bitrate, Session};
use util::{self, SpotifyId, Subfile}; use util::{self, SpotifyId, Subfile};
pub use spirc::PlayStatus;
#[cfg(not(feature = "with-tremor"))]
fn vorbis_time_seek_ms<R>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek {
decoder.time_seek(ms as f64 / 1000f64)
}
#[cfg(not(feature = "with-tremor"))]
fn vorbis_time_tell_ms<R>(decoder: &mut vorbis::Decoder<R>) -> Result<i64, vorbis::VorbisError> where R: Read + Seek {
decoder.time_tell().map(|t| (t * 1000f64) as i64)
}
#[cfg(feature = "with-tremor")]
fn vorbis_time_seek_ms<R>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek {
decoder.time_seek(ms)
}
#[cfg(feature = "with-tremor")]
fn vorbis_time_tell_ms<R>(decoder: &mut vorbis::Decoder<R>) -> Result<i64, vorbis::VorbisError> where R: Read + Seek {
decoder.time_tell()
}
#[derive(Clone)] #[derive(Clone)]
pub struct Player { pub struct Player {
state: Arc<Mutex<PlayerState>>,
observers: Arc<Mutex<Vec<mpsc::UnboundedSender<PlayerState>>>>,
commands: std::sync::mpsc::Sender<PlayerCommand>, commands: std::sync::mpsc::Sender<PlayerCommand>,
} }
#[derive(Clone)]
pub struct PlayerState {
pub status: PlayStatus,
pub position_ms: u32,
pub position_measured_at: i64,
pub update_time: i64,
pub volume: u16,
pub track: Option<SpotifyId>,
pub end_of_track: bool,
}
struct PlayerInternal { struct PlayerInternal {
state: Arc<Mutex<PlayerState>>,
observers: Arc<Mutex<Vec<mpsc::UnboundedSender<PlayerState>>>>,
session: Session, session: Session,
commands: std::sync::mpsc::Receiver<PlayerCommand>, commands: std::sync::mpsc::Receiver<PlayerCommand>,
state: PlayerState,
volume: u16,
sink: Box<Sink>,
} }
#[derive(Debug)] //#[derive(Debug)]
enum PlayerCommand { enum PlayerCommand {
Load(SpotifyId, bool, u32), Load(SpotifyId, bool, u32, oneshot::Sender<()>),
Play, Play,
Pause, Pause,
Volume(u16), Volume(u16),
Stop, Stop,
Seek(u32), Seek(u32),
SeekAt(u32, i64),
} }
impl Player { impl Player {
@ -79,31 +43,21 @@ impl Player {
where F: FnOnce() -> Box<Sink> + Send + 'static { where F: FnOnce() -> Box<Sink> + Send + 'static {
let (cmd_tx, cmd_rx) = std::sync::mpsc::channel(); let (cmd_tx, cmd_rx) = std::sync::mpsc::channel();
let state = Arc::new(Mutex::new(PlayerState { thread::spawn(move || {
status: PlayStatus::kPlayStatusStop, let internal = PlayerInternal {
position_ms: 0, session: session,
position_measured_at: 0, commands: cmd_rx,
update_time: util::now_ms(),
volume: 0xFFFF,
track: None,
end_of_track: false,
}));
let observers = Arc::new(Mutex::new(Vec::new())); state: PlayerState::Stopped,
volume: 0xFFFF,
sink: sink_builder(),
};
let internal = PlayerInternal { internal.run();
session: session, });
commands: cmd_rx,
state: state.clone(),
observers: observers.clone(),
};
thread::spawn(move || internal.run(sink_builder()));
Player { Player {
commands: cmd_tx, commands: cmd_tx,
state: state,
observers: observers,
} }
} }
@ -111,8 +65,13 @@ impl Player {
self.commands.send(cmd).unwrap(); self.commands.send(cmd).unwrap();
} }
pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32) { pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32)
self.command(PlayerCommand::Load(track, start_playing, position_ms)); -> oneshot::Receiver<()>
{
let (tx, rx) = oneshot::channel();
self.command(PlayerCommand::Load(track, start_playing, position_ms, tx));
rx
} }
pub fn play(&self) { pub fn play(&self) {
@ -131,322 +90,310 @@ impl Player {
self.command(PlayerCommand::Seek(position_ms)); self.command(PlayerCommand::Seek(position_ms));
} }
pub fn seek_at(&self, position_ms: u32, measured_at: i64) {
self.command(PlayerCommand::SeekAt(position_ms, measured_at));
}
pub fn state(&self) -> PlayerState {
self.state.lock().unwrap().clone()
}
pub fn volume(&self, vol: u16) { pub fn volume(&self, vol: u16) {
self.command(PlayerCommand::Volume(vol)); self.command(PlayerCommand::Volume(vol));
} }
pub fn observe(&self) -> mpsc::UnboundedReceiver<PlayerState> {
let (tx, rx) = mpsc::unbounded();
self.observers.lock().unwrap().push(tx);
rx
}
} }
fn apply_volume(volume: u16, data: &[i16]) -> Cow<[i16]> { type Decoder = vorbis::Decoder<Subfile<AudioDecrypt<AudioFile>>>;
// Fast path when volume is 100% enum PlayerState {
if volume == 0xFFFF { Stopped,
Cow::Borrowed(data) Paused {
} else { decoder: Decoder,
Cow::Owned(data.iter() end_of_track: oneshot::Sender<()>,
.map(|&x| { },
(x as i32 Playing {
* volume as i32 decoder: Decoder,
/ 0xFFFF) as i16 end_of_track: oneshot::Sender<()>,
}) },
.collect())
} Invalid,
} }
fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option<Cow<'a, Track>> { impl PlayerState {
if track.available { fn is_playing(&self) -> bool {
Some(Cow::Borrowed(track)) use self::PlayerState::*;
} else { match *self {
let alternatives = track.alternatives Stopped | Paused { .. } => false,
.iter() Playing { .. } => true,
.map(|alt_id| { Invalid => panic!("invalid state"),
session.metadata().get::<Track>(*alt_id)
});
let alternatives = future::join_all(alternatives).wait().unwrap();
alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned)
}
}
fn load_track(session: &Session, track_id: SpotifyId)
-> Option<vorbis::Decoder<Subfile<AudioDecrypt<AudioFile>>>>
{
let track = session.metadata().get::<Track>(track_id).wait().unwrap();
info!("Loading track \"{}\"", track.name);
let track = match find_available_alternative(session, &track) {
Some(track) => track,
None => {
warn!("Track \"{}\" is not available", track.name);
return None;
} }
}; }
let format = match session.config().bitrate { fn decoder(&mut self) -> Option<&mut Decoder> {
Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96, use self::PlayerState::*;
Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160, match *self {
Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320, Stopped => None,
}; Paused { ref mut decoder, .. } |
Playing { ref mut decoder, .. } => Some(decoder),
Invalid => panic!("invalid state"),
let file_id = match track.files.get(&format) {
Some(&file_id) => file_id,
None => {
warn!("Track \"{}\" is not available in format {:?}", track.name, format);
return None;
} }
}; }
let key = session.audio_key().request(track.id, file_id).wait().unwrap(); fn signal_end_of_track(self) {
use self::PlayerState::*;
match self {
Paused { end_of_track, .. } |
Playing { end_of_track, .. } => {
end_of_track.complete(())
}
let (open, _) = session.audio_file().open(file_id); Stopped => warn!("signal_end_of_track from stopped state"),
let encrypted_file = open.wait().unwrap(); Invalid => panic!("invalid state"),
}
}
let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7); fn paused_to_playing(&mut self) {
let decoder = vorbis::Decoder::new(audio_file).unwrap(); use self::PlayerState::*;
match ::std::mem::replace(self, Invalid) {
Paused { decoder, end_of_track } => {
*self = Playing {
decoder: decoder,
end_of_track: end_of_track,
};
}
_ => panic!("invalid state"),
}
}
Some(decoder) fn playing_to_paused(&mut self) {
} use self::PlayerState::*;
match ::std::mem::replace(self, Invalid) {
fn run_onstart(session: &Session) { Playing { decoder, end_of_track } => {
match session.config().onstart { *self = Paused {
Some(ref program) => util::run_program(program), decoder: decoder,
None => {}, end_of_track: end_of_track,
}; };
} }
_ => panic!("invalid state"),
fn run_onstop(session: &Session) { }
match session.config().onstop { }
Some(ref program) => util::run_program(program),
None => {},
};
} }
impl PlayerInternal { impl PlayerInternal {
fn run(self, mut sink: Box<Sink>) { fn run(mut self) {
let mut decoder = None;
loop { loop {
let playing = self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay; let cmd = if self.state.is_playing() {
let cmd = if playing {
self.commands.try_recv().ok() self.commands.try_recv().ok()
} else { } else {
Some(self.commands.recv().unwrap()) Some(self.commands.recv().unwrap())
}; };
match cmd { if let Some(cmd) = cmd {
Some(PlayerCommand::Load(track_id, play, position)) => { self.handle_command(cmd);
self.update(|state| { }
if state.status == PlayStatus::kPlayStatusPlay {
sink.stop().unwrap();
run_onstop(&self.session);
}
state.end_of_track = false;
state.status = PlayStatus::kPlayStatusPause;
state.position_ms = position;
state.position_measured_at = util::now_ms();
state.track = Some(track_id);
true
});
drop(decoder);
decoder = match load_track(&self.session, track_id) { let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state {
Some(mut decoder) => { Some(decoder.packets().next())
match vorbis_time_seek_ms(&mut decoder, position as i64) { } else { None };
Ok(_) => (),
Err(err) => error!("Vorbis error: {:?}", err), if let Some(packet) = packet {
self.handle_packet(packet);
}
}
}
fn handle_packet(&mut self, packet: Option<Result<vorbis::Packet, VorbisError>>) {
match packet {
Some(Ok(mut packet)) => {
if self.volume < 0xFFFF {
for x in packet.data.iter_mut() {
*x = (*x as i32 * self.volume as i32 / 0xFFFF) as i16;
}
}
self.sink.write(&packet.data).unwrap();
}
Some(Err(vorbis::VorbisError::Hole)) => (),
Some(Err(e)) => panic!("Vorbis error {:?}", e),
None => {
self.sink.stop().unwrap();
self.run_onstop();
let old_state = mem::replace(&mut self.state, PlayerState::Stopped);
old_state.signal_end_of_track();
}
}
}
fn handle_command(&mut self, cmd: PlayerCommand) {
//debug!("command={:?}", cmd);
match cmd {
PlayerCommand::Load(track_id, play, position, end_of_track) => {
if self.state.is_playing() {
self.sink.stop().unwrap();
}
match self.load_track(track_id, position as i64) {
Some(decoder) => {
if play {
if !self.state.is_playing() {
self.run_onstart();
}
self.sink.start().unwrap();
self.state = PlayerState::Playing {
decoder: decoder,
end_of_track: end_of_track,
};
} else {
if self.state.is_playing() {
self.run_onstop();
} }
self.update(|state| { self.state = PlayerState::Paused {
state.status = if play { decoder: decoder,
run_onstart(&self.session); end_of_track: end_of_track,
sink.start().unwrap(); };
PlayStatus::kPlayStatusPlay
} else {
PlayStatus::kPlayStatusPause
};
state.position_ms = position;
state.position_measured_at = util::now_ms();
true
});
info!("Load Done");
Some(decoder)
}
None => {
self.update(|state| {
state.status = PlayStatus::kPlayStatusStop;
state.end_of_track = true;
true
});
None
} }
} }
}
Some(PlayerCommand::Seek(position)) => {
match vorbis_time_seek_ms(decoder.as_mut().unwrap(), position as i64) {
Ok(_) => (),
Err(err) => error!("Vorbis error: {:?}", err),
}
self.update(|state| {
state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
state.position_measured_at = util::now_ms();
true
});
}
Some(PlayerCommand::SeekAt(position, measured_at)) => {
let position = (util::now_ms() - measured_at + position as i64) as u32;
match vorbis_time_seek_ms(decoder.as_mut().unwrap(), position as i64) {
Ok(_) => (),
Err(err) => error!("Vorbis error: {:?}", err),
}
self.update(|state| {
state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
state.position_measured_at = util::now_ms();
true
});
}
Some(PlayerCommand::Play) => {
self.update(|state| {
state.status = PlayStatus::kPlayStatusPlay;
state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
state.position_measured_at = util::now_ms();
true
});
run_onstart(&self.session);
sink.start().unwrap();
}
Some(PlayerCommand::Pause) => {
self.update(|state| {
state.status = PlayStatus::kPlayStatusPause;
state.update_time = util::now_ms();
state.position_ms = decoder.as_mut().map(|d| vorbis_time_tell_ms(d).unwrap()).unwrap_or(0) as u32;
state.position_measured_at = util::now_ms();
true
});
sink.stop().unwrap();
run_onstop(&self.session);
}
Some(PlayerCommand::Volume(vol)) => {
self.update(|state| {
state.volume = vol;
false
});
}
Some(PlayerCommand::Stop) => {
self.update(|state| {
if state.status == PlayStatus::kPlayStatusPlay {
state.status = PlayStatus::kPlayStatusPause;
}
state.position_ms = 0;
state.position_measured_at = util::now_ms();
true
});
sink.stop().unwrap();
run_onstop(&self.session);
decoder = None;
}
None => (),
}
if self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay {
let packet = decoder.as_mut().unwrap().packets().next();
match packet {
Some(Ok(packet)) => {
let buffer = apply_volume(self.state.lock().unwrap().volume,
&packet.data);
sink.write(&buffer).unwrap();
self.update(|state| {
state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32;
state.position_measured_at = util::now_ms();
false
});
}
Some(Err(vorbis::VorbisError::Hole)) => (),
Some(Err(e)) => panic!("Vorbis error {:?}", e),
None => { None => {
self.update(|state| { if self.state.is_playing() {
state.status = PlayStatus::kPlayStatusStop; self.run_onstop();
state.end_of_track = true; }
true
});
sink.stop().unwrap();
run_onstop(&self.session);
decoder = None;
} }
} }
} }
}
}
fn update<F>(&self, f: F) PlayerCommand::Seek(position) => {
where F: FnOnce(&mut MutexGuard<PlayerState>) -> bool if let Some(decoder) = self.state.decoder() {
{ match vorbis_time_seek_ms(decoder, position as i64) {
let mut guard = self.state.lock().unwrap(); Ok(_) => (),
let update = f(&mut guard); Err(err) => error!("Vorbis error: {:?}", err),
}
} else {
warn!("Player::seek called from invalid state");
}
}
if update { PlayerCommand::Play => {
let observers = self.observers.lock().unwrap(); if let PlayerState::Paused { .. } = self.state {
self.state.paused_to_playing();
guard.update_time = util::now_ms(); self.run_onstart();
let state = guard.clone(); self.sink.start().unwrap();
drop(guard); } else {
warn!("Player::play called from invalid state");
}
}
for observer in observers.iter() { PlayerCommand::Pause => {
observer.send(state.clone()).unwrap(); if let PlayerState::Playing { .. } = self.state {
self.state.playing_to_paused();
self.sink.stop().unwrap();
self.run_onstop();
} else {
warn!("Player::pause called from invalid state");
}
}
PlayerCommand::Stop => {
match self.state {
PlayerState::Playing { .. } => {
self.sink.stop().unwrap();
self.run_onstop();
self.state = PlayerState::Stopped;
}
PlayerState::Paused { .. } => {
self.state = PlayerState::Stopped;
},
PlayerState::Stopped => {
warn!("Player::stop called from invalid state");
}
PlayerState::Invalid => panic!("invalid state"),
}
}
PlayerCommand::Volume(vol) => {
self.volume = vol;
} }
} }
} }
}
impl PlayerState { fn run_onstart(&self) {
pub fn status(&self) -> PlayStatus { match self.session.config().onstart {
self.status Some(ref program) => util::run_program(program),
None => {},
};
} }
pub fn position(&self) -> (u32, i64) { fn run_onstop(&self) {
(self.position_ms, self.position_measured_at) match self.session.config().onstop {
Some(ref program) => util::run_program(program),
None => {},
};
} }
pub fn volume(&self) -> u16 { fn find_available_alternative<'a>(&self, track: &'a Track) -> Option<Cow<'a, Track>> {
self.volume if track.available {
Some(Cow::Borrowed(track))
} else {
let alternatives = track.alternatives
.iter()
.map(|alt_id| {
self.session.metadata().get::<Track>(*alt_id)
});
let alternatives = future::join_all(alternatives).wait().unwrap();
alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned)
}
} }
pub fn update_time(&self) -> i64 { fn load_track(&self, track_id: SpotifyId, position: i64) -> Option<Decoder> {
self.update_time let track = self.session.metadata().get::<Track>(track_id).wait().unwrap();
}
pub fn end_of_track(&self) -> bool { info!("Loading track \"{}\"", track.name);
self.end_of_track
let track = match self.find_available_alternative(&track) {
Some(track) => track,
None => {
warn!("Track \"{}\" is not available", track.name);
return None;
}
};
let format = match self.session.config().bitrate {
Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96,
Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160,
Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320,
};
let file_id = match track.files.get(&format) {
Some(&file_id) => file_id,
None => {
warn!("Track \"{}\" is not available in format {:?}", track.name, format);
return None;
}
};
let key = self.session.audio_key().request(track.id, file_id).wait().unwrap();
let (open, _) = self.session.audio_file().open(file_id);
let encrypted_file = open.wait().unwrap();
let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7);
let mut decoder = vorbis::Decoder::new(audio_file).unwrap();
match vorbis_time_seek_ms(&mut decoder, position) {
Ok(_) => (),
Err(err) => error!("Vorbis error: {:?}", err),
}
info!("Track \"{}\" loaded", track.name);
Some(decoder)
} }
} }
#[cfg(not(feature = "with-tremor"))]
fn vorbis_time_seek_ms<R>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek {
decoder.time_seek(ms as f64 / 1000f64)
}
#[cfg(feature = "with-tremor")]
fn vorbis_time_seek_ms<R>(decoder: &mut vorbis::Decoder<R>, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek {
decoder.time_seek(ms)
}

View file

@ -1,19 +1,18 @@
use protobuf::{self, Message, RepeatedField}; use futures::future;
use std::borrow::Cow;
use futures::{Future, Stream, Sink, Async, Poll};
use futures::stream::BoxStream;
use futures::sink::BoxSink; use futures::sink::BoxSink;
use futures::sync::mpsc; use futures::stream::BoxStream;
use futures::sync::{oneshot, mpsc};
use futures::{Future, Stream, Sink, Async, Poll, BoxFuture};
use protobuf::{self, Message};
use mercury::MercuryError; use mercury::MercuryError;
use player::{Player, PlayerState}; use player::Player;
use session::Session; use session::Session;
use util::{now_ms, SpotifyId, SeqGenerator}; use util::{now_ms, SpotifyId, SeqGenerator};
use version; use version;
use protocol; use protocol;
pub use protocol::spirc::PlayStatus; use protocol::spirc::{PlayStatus, State, MessageType, Frame, DeviceState};
use protocol::spirc::{MessageType, Frame, DeviceState};
pub struct SpircTask { pub struct SpircTask {
player: Player, player: Player,
@ -22,21 +21,12 @@ pub struct SpircTask {
ident: String, ident: String,
device: DeviceState, device: DeviceState,
state: State,
repeat: bool,
shuffle: bool,
last_command_ident: String,
last_command_msgid: u32,
tracks: Vec<SpotifyId>,
index: u32,
subscription: BoxStream<Frame, MercuryError>, subscription: BoxStream<Frame, MercuryError>,
sender: BoxSink<Frame, MercuryError>, sender: BoxSink<Frame, MercuryError>,
updates: mpsc::UnboundedReceiver<PlayerState>,
commands: mpsc::UnboundedReceiver<SpircCommand>, commands: mpsc::UnboundedReceiver<SpircCommand>,
end_of_track: BoxFuture<(), oneshot::Canceled>,
shutdown: bool, shutdown: bool,
} }
@ -49,6 +39,17 @@ pub struct Spirc {
commands: mpsc::UnboundedSender<SpircCommand>, commands: mpsc::UnboundedSender<SpircCommand>,
} }
fn initial_state() -> State {
protobuf_init!(protocol::spirc::State::new(), {
repeat: false,
shuffle: false,
status: PlayStatus::kPlayStatusStop,
position_ms: 0,
position_measured_at: 0,
})
}
fn initial_device_state(name: String, volume: u16) -> DeviceState { fn initial_device_state(name: String, volume: u16) -> DeviceState {
protobuf_init!(DeviceState::new(), { protobuf_init!(DeviceState::new(), {
sw_version: version::version_string(), sw_version: version::version_string(),
@ -56,12 +57,10 @@ fn initial_device_state(name: String, volume: u16) -> DeviceState {
can_play: true, can_play: true,
volume: volume as u32, volume: volume as u32,
name: name, name: name,
error_code: 0,
became_active_at: 0,
capabilities => [ capabilities => [
@{ @{
typ: protocol::spirc::CapabilityType::kCanBePlayer, typ: protocol::spirc::CapabilityType::kCanBePlayer,
intValue => [0] intValue => [1]
}, },
@{ @{
typ: protocol::spirc::CapabilityType::kDeviceType, typ: protocol::spirc::CapabilityType::kDeviceType,
@ -127,8 +126,6 @@ impl Spirc {
Ok(frame.write_to_bytes().unwrap()) Ok(frame.write_to_bytes().unwrap())
})); }));
let updates = player.observe();
let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (cmd_tx, cmd_rx) = mpsc::unbounded();
let volume = 0xFFFF; let volume = 0xFFFF;
@ -141,21 +138,14 @@ impl Spirc {
sequence: SeqGenerator::new(1), sequence: SeqGenerator::new(1),
ident: ident, ident: ident,
device: device, device: device,
state: initial_state(),
repeat: false,
shuffle: false,
last_command_ident: String::new(),
last_command_msgid: 0,
tracks: Vec::new(),
index: 0,
subscription: subscription, subscription: subscription,
sender: sender, sender: sender,
updates: updates,
commands: cmd_rx, commands: cmd_rx,
end_of_track: future::empty().boxed(),
shutdown: false, shutdown: false,
}; };
@ -164,7 +154,7 @@ impl Spirc {
commands: cmd_tx, commands: cmd_tx,
}; };
task.notify(true, None); task.hello();
(spirc, task) (spirc, task)
} }
@ -192,15 +182,6 @@ impl Future for SpircTask {
Async::NotReady => (), Async::NotReady => (),
} }
match self.updates.poll().unwrap() {
Async::Ready(Some(state)) => {
progress = true;
self.handle_update(state);
}
Async::Ready(None) => panic!("player terminated"),
Async::NotReady => (),
}
match self.commands.poll().unwrap() { match self.commands.poll().unwrap() {
Async::Ready(Some(command)) => { Async::Ready(Some(command)) => {
progress = true; progress = true;
@ -209,6 +190,17 @@ impl Future for SpircTask {
Async::Ready(None) => (), Async::Ready(None) => (),
Async::NotReady => (), Async::NotReady => (),
} }
match self.end_of_track.poll() {
Ok(Async::Ready(())) => {
progress = true;
self.handle_end_of_track();
}
Ok(Async::NotReady) => (),
Err(oneshot::Canceled) => {
self.end_of_track = future::empty().boxed()
}
}
} }
let poll_sender = self.sender.poll_complete().unwrap(); let poll_sender = self.sender.poll_complete().unwrap();
@ -219,7 +211,6 @@ impl Future for SpircTask {
} }
if !progress { if !progress {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
} }
@ -227,24 +218,12 @@ impl Future for SpircTask {
} }
impl SpircTask { impl SpircTask {
fn handle_update(&mut self, player_state: PlayerState) {
let end_of_track = player_state.end_of_track();
if end_of_track {
self.index = (self.index + 1) % self.tracks.len() as u32;
let track = self.tracks[self.index as usize];
self.player.load(track, true, 0);
} else {
self.notify_with_player_state(false, None, &player_state);
}
}
fn handle_command(&mut self, cmd: SpircCommand) { fn handle_command(&mut self, cmd: SpircCommand) {
match cmd { match cmd {
SpircCommand::Shutdown => { SpircCommand::Shutdown => {
CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
self.shutdown = true; self.shutdown = true;
self.commands.close(); self.commands.close();
self.updates.close();
} }
} }
} }
@ -262,142 +241,188 @@ impl SpircTask {
return; return;
} }
if frame.get_recipient().len() > 0 {
self.last_command_ident = frame.get_ident().to_owned();
self.last_command_msgid = frame.get_seq_nr();
}
match frame.get_typ() { match frame.get_typ() {
MessageType::kMessageTypeHello => { MessageType::kMessageTypeHello => {
self.notify(false, Some(frame.get_ident())); self.notify(Some(frame.get_ident()));
} }
MessageType::kMessageTypeLoad => { MessageType::kMessageTypeLoad => {
if !self.device.get_is_active() { if !self.device.get_is_active() {
self.device.set_is_active(true); self.device.set_is_active(true);
self.device.set_became_active_at(now_ms()); self.device.set_became_active_at(now_ms());
} }
self.reload_tracks(&frame); self.update_tracks(&frame);
if self.tracks.len() > 0 {
if self.state.get_track().len() > 0 {
self.state.set_position_ms(frame.get_state().get_position_ms());
self.state.set_position_measured_at(now_ms() as u64);
let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
let track = self.tracks[self.index as usize]; self.load_track(play);
let position = frame.get_state().get_position_ms();
self.player.load(track, play, position);
} else { } else {
self.notify(false, Some(frame.get_ident())); self.state.set_status(PlayStatus::kPlayStatusStop);
} }
self.notify(None);
} }
MessageType::kMessageTypePlay => { MessageType::kMessageTypePlay => {
self.player.play(); if self.state.get_status() == PlayStatus::kPlayStatusPause {
self.player.play();
self.state.set_status(PlayStatus::kPlayStatusPlay);
self.state.set_position_measured_at(now_ms() as u64);
}
self.notify(None);
} }
MessageType::kMessageTypePause => { MessageType::kMessageTypePause => {
self.player.pause(); if self.state.get_status() == PlayStatus::kPlayStatusPlay {
self.player.pause();
self.state.set_status(PlayStatus::kPlayStatusPause);
let now = now_ms() as u64;
let position = self.state.get_position_ms();
let diff = now - self.state.get_position_measured_at();
self.state.set_position_ms(position + diff as u32);
self.state.set_position_measured_at(now);
}
self.notify(None);
} }
MessageType::kMessageTypeNext => { MessageType::kMessageTypeNext => {
self.index = (self.index + 1) % self.tracks.len() as u32; let current_index = self.state.get_playing_track_index();
let track = self.tracks[self.index as usize]; let new_index = (current_index + 1) % (self.state.get_track().len() as u32);
self.player.load(track, true, 0);
self.state.set_playing_track_index(new_index);
self.state.set_position_ms(0);
self.state.set_position_measured_at(now_ms() as u64);
self.load_track(true);
self.notify(None);
} }
MessageType::kMessageTypePrev => { MessageType::kMessageTypePrev => {
self.index = (self.index - 1) % self.tracks.len() as u32; // Previous behaves differently based on the position
let track = self.tracks[self.index as usize]; // Under 3s it goes to the previous song
self.player.load(track, true, 0); // Over 3s it seeks to zero
if self.position() < 3000 {
let current_index = self.state.get_playing_track_index();
let new_index = (current_index - 1) % (self.state.get_track().len() as u32);
self.state.set_playing_track_index(new_index);
self.state.set_position_ms(0);
self.state.set_position_measured_at(now_ms() as u64);
self.load_track(true);
} else {
self.state.set_position_ms(0);
self.state.set_position_measured_at(now_ms() as u64);
self.player.seek(0);
}
self.notify(None);
} }
MessageType::kMessageTypeSeek => { MessageType::kMessageTypeSeek => {
self.player.seek(frame.get_position()); let position = frame.get_position();
self.state.set_position_ms(position);
self.state.set_position_measured_at(now_ms() as u64);
self.player.seek(position);
self.notify(None);
} }
MessageType::kMessageTypeReplace => { MessageType::kMessageTypeReplace => {
self.reload_tracks(&frame); self.update_tracks(&frame);
self.notify(None);
} }
MessageType::kMessageTypeVolume => {
let volume = frame.get_volume();
self.device.set_volume(volume);
self.player.volume(volume as u16);
self.notify(None);
}
MessageType::kMessageTypeNotify => { MessageType::kMessageTypeNotify => {
if self.device.get_is_active() && frame.get_device_state().get_is_active() { if self.device.get_is_active() &&
frame.get_device_state().get_is_active()
{
self.device.set_is_active(false); self.device.set_is_active(false);
self.state.set_status(PlayStatus::kPlayStatusStop);
self.player.stop(); self.player.stop();
} }
} }
MessageType::kMessageTypeVolume => {
let volume = frame.get_volume();
self.player.volume(volume as u16);
self.device.set_volume(volume);
self.notify(false, None);
}
MessageType::kMessageTypeGoodbye => (),
_ => (), _ => (),
} }
} }
fn reload_tracks(&mut self, ref frame: &protocol::spirc::Frame) { fn handle_end_of_track(&mut self) {
self.index = frame.get_state().get_playing_track_index(); let current_index = self.state.get_playing_track_index();
self.tracks = frame.get_state() let new_index = (current_index + 1) % (self.state.get_track().len() as u32);
.get_track()
.iter() self.state.set_playing_track_index(new_index);
.filter(|track| track.has_gid()) self.state.set_position_ms(0);
.map(|track| SpotifyId::from_raw(track.get_gid())) self.state.set_position_measured_at(now_ms() as u64);
.collect();
self.load_track(true);
self.notify(None);
} }
fn notify(&mut self, hello: bool, recipient: Option<&str>) { fn position(&mut self) -> u32 {
let mut cs = CommandSender::new(self, let diff = now_ms() as u64 - self.state.get_position_measured_at();
if hello { self.state.get_position_ms() + diff as u32
MessageType::kMessageTypeHello }
} else {
MessageType::kMessageTypeNotify fn update_tracks(&mut self, ref frame: &protocol::spirc::Frame) {
}); let index = frame.get_state().get_playing_track_index();
let tracks = frame.get_state().get_track();
self.state.set_playing_track_index(index);
self.state.set_track(tracks.into_iter().map(Clone::clone).collect());
}
fn load_track(&mut self, play: bool) {
let index = self.state.get_playing_track_index();
let track = {
let gid = self.state.get_track()[index as usize].get_gid();
SpotifyId::from_raw(gid)
};
let position = self.state.get_position_ms();
let end_of_track = self.player.load(track, play, position);
self.state.set_status(PlayStatus::kPlayStatusPlay);
self.end_of_track = end_of_track.boxed();
}
fn hello(&mut self) {
CommandSender::new(self, MessageType::kMessageTypeHello).send();
}
fn notify(&mut self, recipient: Option<&str>) {
let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
if let Some(s) = recipient { if let Some(s) = recipient {
cs = cs.recipient(&s); cs = cs.recipient(&s);
} }
cs.send(); cs.send();
} }
fn notify_with_player_state(&mut self, fn spirc_state(&self) -> protocol::spirc::State {
hello: bool, self.state.clone()
recipient: Option<&str>,
player_state: &PlayerState) {
let mut cs = CommandSender::new(self,
if hello {
MessageType::kMessageTypeHello
} else {
MessageType::kMessageTypeNotify
})
.player_state(player_state);
if let Some(s) = recipient {
cs = cs.recipient(&s);
}
cs.send();
}
fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State {
let (position_ms, position_measured_at) = player_state.position();
protobuf_init!(protocol::spirc::State::new(), {
status: player_state.status(),
position_ms: position_ms,
position_measured_at: position_measured_at as u64,
playing_track_index: self.index,
track: self.tracks.iter().map(|track| {
protobuf_init!(protocol::spirc::TrackRef::new(), {
gid: track.to_raw().to_vec()
})
}).collect(),
shuffle: self.shuffle,
repeat: self.repeat,
playing_from_fallback: true,
last_command_ident: self.last_command_ident.clone(),
last_command_msgid: self.last_command_msgid
})
} }
} }
struct CommandSender<'a> { struct CommandSender<'a> {
spirc: &'a mut SpircTask, spirc: &'a mut SpircTask,
cmd: MessageType, cmd: MessageType,
recipient: Option<&'a str>, recipient: Option<String>,
player_state: Option<&'a PlayerState>,
state: Option<protocol::spirc::State>,
} }
impl<'a> CommandSender<'a> { impl<'a> CommandSender<'a> {
@ -406,64 +431,34 @@ impl<'a> CommandSender<'a> {
spirc: spirc, spirc: spirc,
cmd: cmd, cmd: cmd,
recipient: None, recipient: None,
player_state: None,
state: None,
} }
} }
fn recipient(mut self, r: &'a str) -> CommandSender { fn recipient(mut self, r: &str) -> CommandSender<'a> {
self.recipient = Some(r); self.recipient = Some(r.to_owned());
self
}
fn player_state(mut self, s: &'a PlayerState) -> CommandSender {
self.player_state = Some(s);
self
}
#[allow(dead_code)]
fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> {
self.state = Some(s);
self self
} }
fn send(self) { fn send(self) {
let state = self.player_state.map_or_else(|| {
Cow::Owned(self.spirc.player.state())
}, |s| {
Cow::Borrowed(s)
});
let mut frame = protobuf_init!(Frame::new(), { let mut frame = protobuf_init!(Frame::new(), {
version: 1, version: 1,
ident: self.spirc.ident.clone(), ident: self.spirc.ident.clone(),
protocol_version: "2.0.0", protocol_version: "2.0.0",
seq_nr: self.spirc.sequence.get(), seq_nr: self.spirc.sequence.get(),
typ: self.cmd, typ: self.cmd,
recipient: RepeatedField::from_vec(
self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![])
),
device_state: self.spirc.device.clone(), device_state: self.spirc.device.clone(),
state_update_id: state.update_time() state_update_id: now_ms(),
}); });
if let Some(recipient) = self.recipient {
frame.mut_recipient().push(recipient.to_owned());
}
if self.spirc.device.get_is_active() { if self.spirc.device.get_is_active() {
frame.set_state(self.spirc.spirc_state(&state)); frame.set_state(self.spirc.spirc_state());
} }
let ready = self.spirc.sender.start_send(frame).unwrap().is_ready(); let ready = self.spirc.sender.start_send(frame).unwrap().is_ready();
assert!(ready); assert!(ready);
} }
} }
#[allow(dead_code)]
fn track_ids_to_state<I: Iterator<Item = SpotifyId>>(track_ids: I) -> protocol::spirc::State {
let tracks: Vec<protocol::spirc::TrackRef> =
track_ids.map(|i| {
protobuf_init!(protocol::spirc::TrackRef::new(), { gid: i.to_raw().to_vec()})
})
.collect();
protobuf_init!(protocol::spirc::State::new(), {
track: RepeatedField::from_vec(tracks)
})
}