diff --git a/src/player.rs b/src/player.rs index 132e7859..9c7cce61 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,77 +1,41 @@ +use futures::sync::oneshot; use futures::{future, Future}; -use futures::sync::mpsc; -use std; use std::borrow::Cow; use std::io::{Read, Seek}; -use std::sync::{Mutex, Arc, MutexGuard}; +use std::mem; use std::thread; -use vorbis; +use std; +use vorbis::{self, VorbisError}; -use audio_file::AudioFile; -use audio_decrypt::AudioDecrypt; use audio_backend::Sink; +use audio_decrypt::AudioDecrypt; +use audio_file::AudioFile; use metadata::{FileFormat, Track}; use session::{Bitrate, Session}; use util::{self, SpotifyId, Subfile}; -pub use spirc::PlayStatus; - -#[cfg(not(feature = "with-tremor"))] -fn vorbis_time_seek_ms(decoder: &mut vorbis::Decoder, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek { - decoder.time_seek(ms as f64 / 1000f64) -} - -#[cfg(not(feature = "with-tremor"))] -fn vorbis_time_tell_ms(decoder: &mut vorbis::Decoder) -> Result where R: Read + Seek { - decoder.time_tell().map(|t| (t * 1000f64) as i64) -} - -#[cfg(feature = "with-tremor")] -fn vorbis_time_seek_ms(decoder: &mut vorbis::Decoder, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek { - decoder.time_seek(ms) -} - -#[cfg(feature = "with-tremor")] -fn vorbis_time_tell_ms(decoder: &mut vorbis::Decoder) -> Result where R: Read + Seek { - decoder.time_tell() -} #[derive(Clone)] pub struct Player { - state: Arc>, - observers: Arc>>>, - commands: std::sync::mpsc::Sender, } -#[derive(Clone)] -pub struct PlayerState { - pub status: PlayStatus, - pub position_ms: u32, - pub position_measured_at: i64, - pub update_time: i64, - pub volume: u16, - pub track: Option, - - pub end_of_track: bool, -} - struct PlayerInternal { - state: Arc>, - observers: Arc>>>, - session: Session, commands: std::sync::mpsc::Receiver, + + state: PlayerState, + volume: u16, + sink: Box, } -#[derive(Debug)] +//#[derive(Debug)] enum PlayerCommand { - Load(SpotifyId, bool, u32), + Load(SpotifyId, bool, u32, oneshot::Sender<()>), Play, Pause, Volume(u16), Stop, Seek(u32), - SeekAt(u32, i64), } impl Player { @@ -79,31 +43,21 @@ impl Player { where F: FnOnce() -> Box + Send + 'static { let (cmd_tx, cmd_rx) = std::sync::mpsc::channel(); - let state = Arc::new(Mutex::new(PlayerState { - status: PlayStatus::kPlayStatusStop, - position_ms: 0, - position_measured_at: 0, - update_time: util::now_ms(), - volume: 0xFFFF, - track: None, - end_of_track: false, - })); + thread::spawn(move || { + let internal = PlayerInternal { + session: session, + commands: cmd_rx, - let observers = Arc::new(Mutex::new(Vec::new())); + state: PlayerState::Stopped, + volume: 0xFFFF, + sink: sink_builder(), + }; - let internal = PlayerInternal { - session: session, - commands: cmd_rx, - state: state.clone(), - observers: observers.clone(), - }; - - thread::spawn(move || internal.run(sink_builder())); + internal.run(); + }); Player { commands: cmd_tx, - state: state, - observers: observers, } } @@ -111,8 +65,13 @@ impl Player { self.commands.send(cmd).unwrap(); } - pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32) { - self.command(PlayerCommand::Load(track, start_playing, position_ms)); + pub fn load(&self, track: SpotifyId, start_playing: bool, position_ms: u32) + -> oneshot::Receiver<()> + { + let (tx, rx) = oneshot::channel(); + self.command(PlayerCommand::Load(track, start_playing, position_ms, tx)); + + rx } pub fn play(&self) { @@ -131,322 +90,310 @@ impl Player { self.command(PlayerCommand::Seek(position_ms)); } - pub fn seek_at(&self, position_ms: u32, measured_at: i64) { - self.command(PlayerCommand::SeekAt(position_ms, measured_at)); - } - - pub fn state(&self) -> PlayerState { - self.state.lock().unwrap().clone() - } - pub fn volume(&self, vol: u16) { self.command(PlayerCommand::Volume(vol)); } - - pub fn observe(&self) -> mpsc::UnboundedReceiver { - let (tx, rx) = mpsc::unbounded(); - self.observers.lock().unwrap().push(tx); - - rx - } } -fn apply_volume(volume: u16, data: &[i16]) -> Cow<[i16]> { - // Fast path when volume is 100% - if volume == 0xFFFF { - Cow::Borrowed(data) - } else { - Cow::Owned(data.iter() - .map(|&x| { - (x as i32 - * volume as i32 - / 0xFFFF) as i16 - }) - .collect()) - } +type Decoder = vorbis::Decoder>>; +enum PlayerState { + Stopped, + Paused { + decoder: Decoder, + end_of_track: oneshot::Sender<()>, + }, + Playing { + decoder: Decoder, + end_of_track: oneshot::Sender<()>, + }, + + Invalid, } -fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option> { - if track.available { - Some(Cow::Borrowed(track)) - } else { - let alternatives = track.alternatives - .iter() - .map(|alt_id| { - session.metadata().get::(*alt_id) - }); - let alternatives = future::join_all(alternatives).wait().unwrap(); - - alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned) - } -} - -fn load_track(session: &Session, track_id: SpotifyId) - -> Option>>> -{ - let track = session.metadata().get::(track_id).wait().unwrap(); - - info!("Loading track \"{}\"", track.name); - - let track = match find_available_alternative(session, &track) { - Some(track) => track, - None => { - warn!("Track \"{}\" is not available", track.name); - return None; +impl PlayerState { + fn is_playing(&self) -> bool { + use self::PlayerState::*; + match *self { + Stopped | Paused { .. } => false, + Playing { .. } => true, + Invalid => panic!("invalid state"), } - }; + } - let format = match session.config().bitrate { - Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96, - Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160, - Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320, - }; - - - - let file_id = match track.files.get(&format) { - Some(&file_id) => file_id, - None => { - warn!("Track \"{}\" is not available in format {:?}", track.name, format); - return None; + fn decoder(&mut self) -> Option<&mut Decoder> { + use self::PlayerState::*; + match *self { + Stopped => None, + Paused { ref mut decoder, .. } | + Playing { ref mut decoder, .. } => Some(decoder), + Invalid => panic!("invalid state"), } - }; + } - let key = session.audio_key().request(track.id, file_id).wait().unwrap(); + fn signal_end_of_track(self) { + use self::PlayerState::*; + match self { + Paused { end_of_track, .. } | + Playing { end_of_track, .. } => { + end_of_track.complete(()) + } - let (open, _) = session.audio_file().open(file_id); - let encrypted_file = open.wait().unwrap(); + Stopped => warn!("signal_end_of_track from stopped state"), + Invalid => panic!("invalid state"), + } + } - let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7); - let decoder = vorbis::Decoder::new(audio_file).unwrap(); + fn paused_to_playing(&mut self) { + use self::PlayerState::*; + match ::std::mem::replace(self, Invalid) { + Paused { decoder, end_of_track } => { + *self = Playing { + decoder: decoder, + end_of_track: end_of_track, + }; + } + _ => panic!("invalid state"), + } + } - Some(decoder) -} - -fn run_onstart(session: &Session) { - match session.config().onstart { - Some(ref program) => util::run_program(program), - None => {}, - }; -} - -fn run_onstop(session: &Session) { - match session.config().onstop { - Some(ref program) => util::run_program(program), - None => {}, - }; + fn playing_to_paused(&mut self) { + use self::PlayerState::*; + match ::std::mem::replace(self, Invalid) { + Playing { decoder, end_of_track } => { + *self = Paused { + decoder: decoder, + end_of_track: end_of_track, + }; + } + _ => panic!("invalid state"), + } + } } impl PlayerInternal { - fn run(self, mut sink: Box) { - let mut decoder = None; - + fn run(mut self) { loop { - let playing = self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay; - let cmd = if playing { + let cmd = if self.state.is_playing() { self.commands.try_recv().ok() } else { Some(self.commands.recv().unwrap()) }; - match cmd { - Some(PlayerCommand::Load(track_id, play, position)) => { - self.update(|state| { - if state.status == PlayStatus::kPlayStatusPlay { - sink.stop().unwrap(); - run_onstop(&self.session); - } - state.end_of_track = false; - state.status = PlayStatus::kPlayStatusPause; - state.position_ms = position; - state.position_measured_at = util::now_ms(); - state.track = Some(track_id); - true - }); - drop(decoder); + if let Some(cmd) = cmd { + self.handle_command(cmd); + } - decoder = match load_track(&self.session, track_id) { - Some(mut decoder) => { - match vorbis_time_seek_ms(&mut decoder, position as i64) { - Ok(_) => (), - Err(err) => error!("Vorbis error: {:?}", err), + let packet = if let PlayerState::Playing { ref mut decoder, .. } = self.state { + Some(decoder.packets().next()) + } else { None }; + + if let Some(packet) = packet { + self.handle_packet(packet); + } + } + } + + fn handle_packet(&mut self, packet: Option>) { + match packet { + Some(Ok(mut packet)) => { + if self.volume < 0xFFFF { + for x in packet.data.iter_mut() { + *x = (*x as i32 * self.volume as i32 / 0xFFFF) as i16; + } + } + + self.sink.write(&packet.data).unwrap(); + } + + Some(Err(vorbis::VorbisError::Hole)) => (), + Some(Err(e)) => panic!("Vorbis error {:?}", e), + None => { + self.sink.stop().unwrap(); + self.run_onstop(); + + let old_state = mem::replace(&mut self.state, PlayerState::Stopped); + old_state.signal_end_of_track(); + } + } + } + + fn handle_command(&mut self, cmd: PlayerCommand) { + //debug!("command={:?}", cmd); + match cmd { + PlayerCommand::Load(track_id, play, position, end_of_track) => { + if self.state.is_playing() { + self.sink.stop().unwrap(); + } + + match self.load_track(track_id, position as i64) { + Some(decoder) => { + if play { + if !self.state.is_playing() { + self.run_onstart(); + } + self.sink.start().unwrap(); + + self.state = PlayerState::Playing { + decoder: decoder, + end_of_track: end_of_track, + }; + } else { + if self.state.is_playing() { + self.run_onstop(); } - self.update(|state| { - state.status = if play { - run_onstart(&self.session); - sink.start().unwrap(); - PlayStatus::kPlayStatusPlay - } else { - PlayStatus::kPlayStatusPause - }; - state.position_ms = position; - state.position_measured_at = util::now_ms(); - - true - }); - - info!("Load Done"); - Some(decoder) - } - - None => { - self.update(|state| { - state.status = PlayStatus::kPlayStatusStop; - state.end_of_track = true; - true - }); - - None + self.state = PlayerState::Paused { + decoder: decoder, + end_of_track: end_of_track, + }; } } - - } - Some(PlayerCommand::Seek(position)) => { - match vorbis_time_seek_ms(decoder.as_mut().unwrap(), position as i64) { - Ok(_) => (), - Err(err) => error!("Vorbis error: {:?}", err), - } - self.update(|state| { - state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32; - state.position_measured_at = util::now_ms(); - - true - }); - } - Some(PlayerCommand::SeekAt(position, measured_at)) => { - let position = (util::now_ms() - measured_at + position as i64) as u32; - - match vorbis_time_seek_ms(decoder.as_mut().unwrap(), position as i64) { - Ok(_) => (), - Err(err) => error!("Vorbis error: {:?}", err), - } - self.update(|state| { - state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32; - state.position_measured_at = util::now_ms(); - - true - }); - } - Some(PlayerCommand::Play) => { - self.update(|state| { - state.status = PlayStatus::kPlayStatusPlay; - state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32; - state.position_measured_at = util::now_ms(); - true - }); - - run_onstart(&self.session); - sink.start().unwrap(); - } - Some(PlayerCommand::Pause) => { - self.update(|state| { - state.status = PlayStatus::kPlayStatusPause; - state.update_time = util::now_ms(); - state.position_ms = decoder.as_mut().map(|d| vorbis_time_tell_ms(d).unwrap()).unwrap_or(0) as u32; - state.position_measured_at = util::now_ms(); - true - }); - - sink.stop().unwrap(); - run_onstop(&self.session); - } - Some(PlayerCommand::Volume(vol)) => { - self.update(|state| { - state.volume = vol; - false - }); - } - Some(PlayerCommand::Stop) => { - self.update(|state| { - if state.status == PlayStatus::kPlayStatusPlay { - state.status = PlayStatus::kPlayStatusPause; - } - state.position_ms = 0; - state.position_measured_at = util::now_ms(); - true - }); - - sink.stop().unwrap(); - run_onstop(&self.session); - decoder = None; - } - None => (), - } - - if self.state.lock().unwrap().status == PlayStatus::kPlayStatusPlay { - let packet = decoder.as_mut().unwrap().packets().next(); - - match packet { - Some(Ok(packet)) => { - let buffer = apply_volume(self.state.lock().unwrap().volume, - &packet.data); - sink.write(&buffer).unwrap(); - - self.update(|state| { - state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32; - state.position_measured_at = util::now_ms(); - - false - }); - } - Some(Err(vorbis::VorbisError::Hole)) => (), - Some(Err(e)) => panic!("Vorbis error {:?}", e), None => { - self.update(|state| { - state.status = PlayStatus::kPlayStatusStop; - state.end_of_track = true; - true - }); - - sink.stop().unwrap(); - run_onstop(&self.session); - decoder = None; + if self.state.is_playing() { + self.run_onstop(); + } } } } - } - } - fn update(&self, f: F) - where F: FnOnce(&mut MutexGuard) -> bool - { - let mut guard = self.state.lock().unwrap(); - let update = f(&mut guard); + PlayerCommand::Seek(position) => { + if let Some(decoder) = self.state.decoder() { + match vorbis_time_seek_ms(decoder, position as i64) { + Ok(_) => (), + Err(err) => error!("Vorbis error: {:?}", err), + } + } else { + warn!("Player::seek called from invalid state"); + } + } - if update { - let observers = self.observers.lock().unwrap(); + PlayerCommand::Play => { + if let PlayerState::Paused { .. } = self.state { + self.state.paused_to_playing(); - guard.update_time = util::now_ms(); - let state = guard.clone(); - drop(guard); + self.run_onstart(); + self.sink.start().unwrap(); + } else { + warn!("Player::play called from invalid state"); + } + } - for observer in observers.iter() { - observer.send(state.clone()).unwrap(); + PlayerCommand::Pause => { + if let PlayerState::Playing { .. } = self.state { + self.state.playing_to_paused(); + + self.sink.stop().unwrap(); + self.run_onstop(); + } else { + warn!("Player::pause called from invalid state"); + } + } + + PlayerCommand::Stop => { + match self.state { + PlayerState::Playing { .. } => { + self.sink.stop().unwrap(); + self.run_onstop(); + self.state = PlayerState::Stopped; + } + PlayerState::Paused { .. } => { + self.state = PlayerState::Stopped; + }, + PlayerState::Stopped => { + warn!("Player::stop called from invalid state"); + } + PlayerState::Invalid => panic!("invalid state"), + } + } + + PlayerCommand::Volume(vol) => { + self.volume = vol; } } } -} -impl PlayerState { - pub fn status(&self) -> PlayStatus { - self.status + fn run_onstart(&self) { + match self.session.config().onstart { + Some(ref program) => util::run_program(program), + None => {}, + }; } - pub fn position(&self) -> (u32, i64) { - (self.position_ms, self.position_measured_at) + fn run_onstop(&self) { + match self.session.config().onstop { + Some(ref program) => util::run_program(program), + None => {}, + }; } - pub fn volume(&self) -> u16 { - self.volume + fn find_available_alternative<'a>(&self, track: &'a Track) -> Option> { + if track.available { + Some(Cow::Borrowed(track)) + } else { + let alternatives = track.alternatives + .iter() + .map(|alt_id| { + self.session.metadata().get::(*alt_id) + }); + let alternatives = future::join_all(alternatives).wait().unwrap(); + + alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned) + } } - pub fn update_time(&self) -> i64 { - self.update_time - } + fn load_track(&self, track_id: SpotifyId, position: i64) -> Option { + let track = self.session.metadata().get::(track_id).wait().unwrap(); - pub fn end_of_track(&self) -> bool { - self.end_of_track + info!("Loading track \"{}\"", track.name); + + let track = match self.find_available_alternative(&track) { + Some(track) => track, + None => { + warn!("Track \"{}\" is not available", track.name); + return None; + } + }; + + let format = match self.session.config().bitrate { + Bitrate::Bitrate96 => FileFormat::OGG_VORBIS_96, + Bitrate::Bitrate160 => FileFormat::OGG_VORBIS_160, + Bitrate::Bitrate320 => FileFormat::OGG_VORBIS_320, + }; + + let file_id = match track.files.get(&format) { + Some(&file_id) => file_id, + None => { + warn!("Track \"{}\" is not available in format {:?}", track.name, format); + return None; + } + }; + + let key = self.session.audio_key().request(track.id, file_id).wait().unwrap(); + + let (open, _) = self.session.audio_file().open(file_id); + let encrypted_file = open.wait().unwrap(); + + let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7); + let mut decoder = vorbis::Decoder::new(audio_file).unwrap(); + + match vorbis_time_seek_ms(&mut decoder, position) { + Ok(_) => (), + Err(err) => error!("Vorbis error: {:?}", err), + } + + info!("Track \"{}\" loaded", track.name); + + Some(decoder) } } + +#[cfg(not(feature = "with-tremor"))] +fn vorbis_time_seek_ms(decoder: &mut vorbis::Decoder, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek { + decoder.time_seek(ms as f64 / 1000f64) +} + +#[cfg(feature = "with-tremor")] +fn vorbis_time_seek_ms(decoder: &mut vorbis::Decoder, ms: i64) -> Result<(), vorbis::VorbisError> where R: Read + Seek { + decoder.time_seek(ms) +} diff --git a/src/spirc.rs b/src/spirc.rs index 33b5d72f..218c7aad 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,19 +1,18 @@ -use protobuf::{self, Message, RepeatedField}; -use std::borrow::Cow; -use futures::{Future, Stream, Sink, Async, Poll}; -use futures::stream::BoxStream; +use futures::future; use futures::sink::BoxSink; -use futures::sync::mpsc; +use futures::stream::BoxStream; +use futures::sync::{oneshot, mpsc}; +use futures::{Future, Stream, Sink, Async, Poll, BoxFuture}; +use protobuf::{self, Message}; use mercury::MercuryError; -use player::{Player, PlayerState}; +use player::Player; use session::Session; use util::{now_ms, SpotifyId, SeqGenerator}; use version; use protocol; -pub use protocol::spirc::PlayStatus; -use protocol::spirc::{MessageType, Frame, DeviceState}; +use protocol::spirc::{PlayStatus, State, MessageType, Frame, DeviceState}; pub struct SpircTask { player: Player, @@ -22,21 +21,12 @@ pub struct SpircTask { ident: String, device: DeviceState, - - repeat: bool, - shuffle: bool, - - last_command_ident: String, - last_command_msgid: u32, - - tracks: Vec, - index: u32, + state: State, subscription: BoxStream, sender: BoxSink, - - updates: mpsc::UnboundedReceiver, commands: mpsc::UnboundedReceiver, + end_of_track: BoxFuture<(), oneshot::Canceled>, shutdown: bool, } @@ -49,6 +39,17 @@ pub struct Spirc { commands: mpsc::UnboundedSender, } +fn initial_state() -> State { + protobuf_init!(protocol::spirc::State::new(), { + repeat: false, + shuffle: false, + + status: PlayStatus::kPlayStatusStop, + position_ms: 0, + position_measured_at: 0, + }) +} + fn initial_device_state(name: String, volume: u16) -> DeviceState { protobuf_init!(DeviceState::new(), { sw_version: version::version_string(), @@ -56,12 +57,10 @@ fn initial_device_state(name: String, volume: u16) -> DeviceState { can_play: true, volume: volume as u32, name: name, - error_code: 0, - became_active_at: 0, capabilities => [ @{ typ: protocol::spirc::CapabilityType::kCanBePlayer, - intValue => [0] + intValue => [1] }, @{ typ: protocol::spirc::CapabilityType::kDeviceType, @@ -127,8 +126,6 @@ impl Spirc { Ok(frame.write_to_bytes().unwrap()) })); - let updates = player.observe(); - let (cmd_tx, cmd_rx) = mpsc::unbounded(); let volume = 0xFFFF; @@ -141,21 +138,14 @@ impl Spirc { sequence: SeqGenerator::new(1), ident: ident, + device: device, - - repeat: false, - shuffle: false, - - last_command_ident: String::new(), - last_command_msgid: 0, - - tracks: Vec::new(), - index: 0, + state: initial_state(), subscription: subscription, sender: sender, - updates: updates, commands: cmd_rx, + end_of_track: future::empty().boxed(), shutdown: false, }; @@ -164,7 +154,7 @@ impl Spirc { commands: cmd_tx, }; - task.notify(true, None); + task.hello(); (spirc, task) } @@ -192,15 +182,6 @@ impl Future for SpircTask { Async::NotReady => (), } - match self.updates.poll().unwrap() { - Async::Ready(Some(state)) => { - progress = true; - self.handle_update(state); - } - Async::Ready(None) => panic!("player terminated"), - Async::NotReady => (), - } - match self.commands.poll().unwrap() { Async::Ready(Some(command)) => { progress = true; @@ -209,6 +190,17 @@ impl Future for SpircTask { Async::Ready(None) => (), Async::NotReady => (), } + + match self.end_of_track.poll() { + Ok(Async::Ready(())) => { + progress = true; + self.handle_end_of_track(); + } + Ok(Async::NotReady) => (), + Err(oneshot::Canceled) => { + self.end_of_track = future::empty().boxed() + } + } } let poll_sender = self.sender.poll_complete().unwrap(); @@ -219,7 +211,6 @@ impl Future for SpircTask { } if !progress { - return Ok(Async::NotReady); } } @@ -227,24 +218,12 @@ impl Future for SpircTask { } impl SpircTask { - fn handle_update(&mut self, player_state: PlayerState) { - let end_of_track = player_state.end_of_track(); - if end_of_track { - self.index = (self.index + 1) % self.tracks.len() as u32; - let track = self.tracks[self.index as usize]; - self.player.load(track, true, 0); - } else { - self.notify_with_player_state(false, None, &player_state); - } - } - fn handle_command(&mut self, cmd: SpircCommand) { match cmd { SpircCommand::Shutdown => { CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); self.shutdown = true; self.commands.close(); - self.updates.close(); } } } @@ -262,142 +241,188 @@ impl SpircTask { return; } - if frame.get_recipient().len() > 0 { - self.last_command_ident = frame.get_ident().to_owned(); - self.last_command_msgid = frame.get_seq_nr(); - } - match frame.get_typ() { MessageType::kMessageTypeHello => { - self.notify(false, Some(frame.get_ident())); + self.notify(Some(frame.get_ident())); } + MessageType::kMessageTypeLoad => { if !self.device.get_is_active() { self.device.set_is_active(true); self.device.set_became_active_at(now_ms()); } - self.reload_tracks(&frame); - if self.tracks.len() > 0 { + self.update_tracks(&frame); + + if self.state.get_track().len() > 0 { + self.state.set_position_ms(frame.get_state().get_position_ms()); + self.state.set_position_measured_at(now_ms() as u64); + let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; - let track = self.tracks[self.index as usize]; - let position = frame.get_state().get_position_ms(); - self.player.load(track, play, position); + self.load_track(play); } else { - self.notify(false, Some(frame.get_ident())); + self.state.set_status(PlayStatus::kPlayStatusStop); } + + self.notify(None); } + MessageType::kMessageTypePlay => { - self.player.play(); + if self.state.get_status() == PlayStatus::kPlayStatusPause { + self.player.play(); + self.state.set_status(PlayStatus::kPlayStatusPlay); + self.state.set_position_measured_at(now_ms() as u64); + } + + self.notify(None); } + MessageType::kMessageTypePause => { - self.player.pause(); + if self.state.get_status() == PlayStatus::kPlayStatusPlay { + self.player.pause(); + self.state.set_status(PlayStatus::kPlayStatusPause); + + let now = now_ms() as u64; + let position = self.state.get_position_ms(); + + let diff = now - self.state.get_position_measured_at(); + + self.state.set_position_ms(position + diff as u32); + self.state.set_position_measured_at(now); + } + + self.notify(None); } + MessageType::kMessageTypeNext => { - self.index = (self.index + 1) % self.tracks.len() as u32; - let track = self.tracks[self.index as usize]; - self.player.load(track, true, 0); + let current_index = self.state.get_playing_track_index(); + let new_index = (current_index + 1) % (self.state.get_track().len() as u32); + + self.state.set_playing_track_index(new_index); + self.state.set_position_ms(0); + self.state.set_position_measured_at(now_ms() as u64); + + self.load_track(true); + self.notify(None); } + MessageType::kMessageTypePrev => { - self.index = (self.index - 1) % self.tracks.len() as u32; - let track = self.tracks[self.index as usize]; - self.player.load(track, true, 0); + // Previous behaves differently based on the position + // Under 3s it goes to the previous song + // Over 3s it seeks to zero + if self.position() < 3000 { + let current_index = self.state.get_playing_track_index(); + let new_index = (current_index - 1) % (self.state.get_track().len() as u32); + + self.state.set_playing_track_index(new_index); + self.state.set_position_ms(0); + self.state.set_position_measured_at(now_ms() as u64); + + self.load_track(true); + } else { + self.state.set_position_ms(0); + self.state.set_position_measured_at(now_ms() as u64); + self.player.seek(0); + } + self.notify(None); } + MessageType::kMessageTypeSeek => { - self.player.seek(frame.get_position()); + let position = frame.get_position(); + + self.state.set_position_ms(position); + self.state.set_position_measured_at(now_ms() as u64); + self.player.seek(position); + self.notify(None); } + MessageType::kMessageTypeReplace => { - self.reload_tracks(&frame); + self.update_tracks(&frame); + self.notify(None); } + + MessageType::kMessageTypeVolume => { + let volume = frame.get_volume(); + self.device.set_volume(volume); + self.player.volume(volume as u16); + self.notify(None); + } + MessageType::kMessageTypeNotify => { - if self.device.get_is_active() && frame.get_device_state().get_is_active() { + if self.device.get_is_active() && + frame.get_device_state().get_is_active() + { self.device.set_is_active(false); + self.state.set_status(PlayStatus::kPlayStatusStop); self.player.stop(); } } - MessageType::kMessageTypeVolume => { - let volume = frame.get_volume(); - self.player.volume(volume as u16); - self.device.set_volume(volume); - self.notify(false, None); - } - MessageType::kMessageTypeGoodbye => (), + _ => (), } } - fn reload_tracks(&mut self, ref frame: &protocol::spirc::Frame) { - self.index = frame.get_state().get_playing_track_index(); - self.tracks = frame.get_state() - .get_track() - .iter() - .filter(|track| track.has_gid()) - .map(|track| SpotifyId::from_raw(track.get_gid())) - .collect(); + fn handle_end_of_track(&mut self) { + let current_index = self.state.get_playing_track_index(); + let new_index = (current_index + 1) % (self.state.get_track().len() as u32); + + self.state.set_playing_track_index(new_index); + self.state.set_position_ms(0); + self.state.set_position_measured_at(now_ms() as u64); + + self.load_track(true); + self.notify(None); } - fn notify(&mut self, hello: bool, recipient: Option<&str>) { - let mut cs = CommandSender::new(self, - if hello { - MessageType::kMessageTypeHello - } else { - MessageType::kMessageTypeNotify - }); + fn position(&mut self) -> u32 { + let diff = now_ms() as u64 - self.state.get_position_measured_at(); + self.state.get_position_ms() + diff as u32 + } + + fn update_tracks(&mut self, ref frame: &protocol::spirc::Frame) { + let index = frame.get_state().get_playing_track_index(); + let tracks = frame.get_state().get_track(); + + self.state.set_playing_track_index(index); + self.state.set_track(tracks.into_iter().map(Clone::clone).collect()); + } + + fn load_track(&mut self, play: bool) { + let index = self.state.get_playing_track_index(); + let track = { + let gid = self.state.get_track()[index as usize].get_gid(); + SpotifyId::from_raw(gid) + }; + + let position = self.state.get_position_ms(); + + let end_of_track = self.player.load(track, play, position); + + self.state.set_status(PlayStatus::kPlayStatusPlay); + self.end_of_track = end_of_track.boxed(); + } + + fn hello(&mut self) { + CommandSender::new(self, MessageType::kMessageTypeHello).send(); + } + + fn notify(&mut self, recipient: Option<&str>) { + let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify); if let Some(s) = recipient { cs = cs.recipient(&s); } cs.send(); } - fn notify_with_player_state(&mut self, - hello: bool, - recipient: Option<&str>, - player_state: &PlayerState) { - let mut cs = CommandSender::new(self, - if hello { - MessageType::kMessageTypeHello - } else { - MessageType::kMessageTypeNotify - }) - .player_state(player_state); - if let Some(s) = recipient { - cs = cs.recipient(&s); - } - cs.send(); - } - - fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State { - let (position_ms, position_measured_at) = player_state.position(); - - protobuf_init!(protocol::spirc::State::new(), { - status: player_state.status(), - position_ms: position_ms, - position_measured_at: position_measured_at as u64, - - playing_track_index: self.index, - track: self.tracks.iter().map(|track| { - protobuf_init!(protocol::spirc::TrackRef::new(), { - gid: track.to_raw().to_vec() - }) - }).collect(), - - shuffle: self.shuffle, - repeat: self.repeat, - - playing_from_fallback: true, - - last_command_ident: self.last_command_ident.clone(), - last_command_msgid: self.last_command_msgid - }) + fn spirc_state(&self) -> protocol::spirc::State { + self.state.clone() } } struct CommandSender<'a> { spirc: &'a mut SpircTask, cmd: MessageType, - recipient: Option<&'a str>, - player_state: Option<&'a PlayerState>, - state: Option, + recipient: Option, } impl<'a> CommandSender<'a> { @@ -406,64 +431,34 @@ impl<'a> CommandSender<'a> { spirc: spirc, cmd: cmd, recipient: None, - player_state: None, - state: None, } } - fn recipient(mut self, r: &'a str) -> CommandSender { - self.recipient = Some(r); - self - } - - fn player_state(mut self, s: &'a PlayerState) -> CommandSender { - self.player_state = Some(s); - self - } - - #[allow(dead_code)] - fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> { - self.state = Some(s); + fn recipient(mut self, r: &str) -> CommandSender<'a> { + self.recipient = Some(r.to_owned()); self } fn send(self) { - let state = self.player_state.map_or_else(|| { - Cow::Owned(self.spirc.player.state()) - }, |s| { - Cow::Borrowed(s) - }); - let mut frame = protobuf_init!(Frame::new(), { version: 1, ident: self.spirc.ident.clone(), protocol_version: "2.0.0", seq_nr: self.spirc.sequence.get(), typ: self.cmd, - recipient: RepeatedField::from_vec( - self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) - ), device_state: self.spirc.device.clone(), - state_update_id: state.update_time() + state_update_id: now_ms(), }); + if let Some(recipient) = self.recipient { + frame.mut_recipient().push(recipient.to_owned()); + } + if self.spirc.device.get_is_active() { - frame.set_state(self.spirc.spirc_state(&state)); + frame.set_state(self.spirc.spirc_state()); } let ready = self.spirc.sender.start_send(frame).unwrap().is_ready(); assert!(ready); } } - -#[allow(dead_code)] -fn track_ids_to_state>(track_ids: I) -> protocol::spirc::State { - let tracks: Vec = - track_ids.map(|i| { - protobuf_init!(protocol::spirc::TrackRef::new(), { gid: i.to_raw().to_vec()}) - }) - .collect(); - protobuf_init!(protocol::spirc::State::new(), { - track: RepeatedField::from_vec(tracks) - }) -}