diff --git a/src/audio_file.rs b/src/audio_file.rs index f4cb4e13..8e738fa3 100644 --- a/src/audio_file.rs +++ b/src/audio_file.rs @@ -11,7 +11,7 @@ use stream::StreamEvent; use util::FileId; use session::Session; -const CHUNK_SIZE : usize = 0x40000; +const CHUNK_SIZE : usize = 0x10000; pub struct AudioFile<'s> { position: usize, diff --git a/src/main.rs b/src/main.rs index 48fcd45a..11d0da28 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ #![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future)] #![allow(deprecated)] -//#![allow(unused_imports,dead_code)] +#![allow(unused_imports,dead_code)] #![plugin(protobuf_macros)] #[macro_use] extern crate lazy_static; @@ -46,7 +46,7 @@ use metadata::{AlbumRef, ArtistRef, TrackRef}; use session::{Config, Session}; use util::SpotifyId; use util::version::version_string; -use player::Player; +use player::{Player, PlayerCommand}; use mercury::{MercuryRequest, MercuryMethod}; use librespot_protocol as protocol; use librespot_protocol::spirc::PlayStatus; @@ -76,22 +76,32 @@ fn main() { } }); + let player = Player::new(&session); + SpircManager { session: &session, - username: username.clone(), - name: name.clone(), - ident: session.config.device_id.clone(), - device_type: 5, + player: &player, + username: username.clone(), state_update_id: 0, seq_nr: 0, - volume: 0x8000, + name: name.clone(), + ident: session.config.device_id.clone(), + device_type: 5, can_play: true, + + repeat: false, + shuffle: false, + volume: 0x8000, + is_active: false, became_active_at: 0, - state: PlayerState::new() + last_command_ident: String::new(), + last_command_msgid: 0, + + track: None }.run(); poll_thread.join(); @@ -123,89 +133,10 @@ fn print_track(session: &Session, track_id: SpotifyId) { } } -struct PlayerState { - status: PlayStatus, - - context_uri: String, - index: u32, - queue: Vec, - - repeat: bool, - shuffle: bool, - - position_ms: u32, - position_measured_at: i64, - - last_command_ident: String, - last_command_msgid: u32, -} - -impl PlayerState { - fn new() -> PlayerState { - PlayerState { - status: PlayStatus::kPlayStatusPause, - - context_uri: String::new(), - index: 0, - queue: Vec::new(), - - repeat: false, - shuffle: false, - - position_ms: 0, - position_measured_at: 0, - - last_command_ident: String::new(), - last_command_msgid: 0 - } - } - - fn import(&mut self, state: &protocol::spirc::State) { - self.status = state.get_status(); - - self.context_uri = state.get_context_uri().to_string(); - self.index = state.get_playing_track_index(); - self.queue = state.get_track().iter().filter(|t| { - t.has_gid() - }).map(|t| { - SpotifyId::from_raw(t.get_gid()) - }).collect(); - - self.repeat = state.get_repeat(); - self.shuffle = state.get_shuffle(); - - self.position_ms = state.get_position_ms(); - self.position_measured_at = SpircManager::now(); - } - - fn export(&self) -> protocol::spirc::State { - protobuf_init!(protocol::spirc::State::new(), { - status: self.status, - - context_uri: self.context_uri.to_string(), - playing_track_index: self.index, - track: self.queue.iter().map(|t| { - protobuf_init!(protocol::spirc::TrackRef::new(), { - gid: t.to_raw().to_vec() - }) - }).collect(), - - shuffle: self.shuffle, - repeat: self.repeat, - - position_ms: self.position_ms, - position_measured_at: self.position_measured_at as u64, - - playing_from_fallback: true, - - last_command_ident: self.last_command_ident.clone(), - last_command_msgid: self.last_command_msgid - }) - } -} - struct SpircManager<'s> { + player: &'s Player<'s>, session: &'s Session, + username: String, state_update_id: i64, seq_nr: u32, @@ -213,90 +144,88 @@ struct SpircManager<'s> { name: String, ident: String, device_type: u8, - - volume: u16, can_play: bool, + + repeat: bool, + shuffle: bool, + volume: u16, + is_active: bool, became_active_at: i64, - state: PlayerState + last_command_ident: String, + last_command_msgid: u32, + + track: Option } impl <'s> SpircManager<'s> { fn run(&mut self) { - let rx = self.session - .mercury_sub(format!("hm://remote/user/{}/v23", self.username)) - .into_iter().map(|pkt| { - protobuf::parse_from_bytes::(pkt.payload.front().unwrap()).unwrap() - }); + let rx = self.session.mercury_sub(format!("hm://remote/user/{}/v23", self.username)); self.notify(None); - for frame in rx { - println!("{:?} {} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); - if frame.get_ident() != self.ident && - (frame.get_recipient().len() == 0 || - frame.get_recipient().contains(&self.ident)) { - self.handle(frame); + loop { + if let Ok(pkt) = rx.try_recv() { + let frame = protobuf::parse_from_bytes::( + pkt.payload.front().unwrap()).unwrap(); + println!("{:?} {} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); + if frame.get_ident() != self.ident && + (frame.get_recipient().len() == 0 || + frame.get_recipient().contains(&self.ident)) { + self.handle(frame); + } + } + + let h = self.player.state.0.lock().unwrap(); + if h.update_time > self.state_update_id { + self.state_update_id = util::now_ms(); + drop(h); + self.notify(None); } } } fn handle(&mut self, frame: protocol::spirc::Frame) { + if frame.get_recipient().len() > 0 { + self.last_command_ident = frame.get_ident().to_string(); + self.last_command_msgid = frame.get_seq_nr(); + } match frame.get_typ() { protocol::spirc::MessageType::kMessageTypeHello => { self.notify(Some(frame.get_ident())); } protocol::spirc::MessageType::kMessageTypeLoad => { - self.is_active = true; - self.became_active_at = SpircManager::now(); + if !self.is_active { + self.is_active = true; + self.became_active_at = util::now_ms(); + } - self.state.import(frame.get_state()); - - self.state.last_command_ident = frame.get_ident().to_string(); - self.state.last_command_msgid = frame.get_seq_nr(); - - self.state_update_id = SpircManager::now(); - self.notify(None); + let index = frame.get_state().get_playing_track_index() as usize; + let track = SpotifyId::from_raw(frame.get_state().get_track()[index].get_gid()); + self.track = Some(track); + self.player.command(PlayerCommand::Load(track, + frame.get_state().get_status() == PlayStatus::kPlayStatusPlay, + frame.get_state().get_position_ms())); } protocol::spirc::MessageType::kMessageTypePlay => { - self.state.status = PlayStatus::kPlayStatusPlay; - self.state.position_measured_at = SpircManager::now(); - - self.state.last_command_ident = frame.get_ident().to_string(); - self.state.last_command_msgid = frame.get_seq_nr(); - - self.state_update_id = SpircManager::now(); - self.notify(None); + self.player.command(PlayerCommand::Play); } protocol::spirc::MessageType::kMessageTypePause => { - self.state.status = PlayStatus::kPlayStatusPause; - self.state.position_measured_at = SpircManager::now(); - - self.state.last_command_ident = frame.get_ident().to_string(); - self.state.last_command_msgid = frame.get_seq_nr(); - - self.state_update_id = SpircManager::now(); - self.notify(None); + self.player.command(PlayerCommand::Pause); } protocol::spirc::MessageType::kMessageTypeSeek => { - self.state.position_ms = frame.get_position(); - self.state.position_measured_at = SpircManager::now(); - - self.state.last_command_ident = frame.get_ident().to_string(); - self.state.last_command_msgid = frame.get_seq_nr(); - - self.state_update_id = SpircManager::now(); - self.notify(None); + self.player.command(PlayerCommand::Seek(frame.get_position())); } protocol::spirc::MessageType::kMessageTypeNotify => { - if frame.get_device_state().get_is_active() { - //println!("{:?}", frame.get_state()); + if self.is_active && frame.get_device_state().get_is_active() { + self.is_active = false; + self.player.command(PlayerCommand::Stop); } } _ => () @@ -318,7 +247,7 @@ impl <'s> SpircManager<'s> { }); if self.is_active { - pkt.set_state(self.state.export()); + pkt.set_state(self.state()); } self.session.mercury(MercuryRequest{ @@ -329,6 +258,31 @@ impl <'s> SpircManager<'s> { }); } + fn state(&mut self) -> protocol::spirc::State { + let state = self.player.state.0.lock().unwrap(); + + protobuf_init!(protocol::spirc::State::new(), { + status: state.status, + position_ms: state.position_ms, + position_measured_at: state.position_measured_at as u64, + + playing_track_index: 0, + track => [ + @{ + gid: self.track.unwrap().to_raw().to_vec() + } + ], + + shuffle: self.shuffle, + repeat: self.repeat, + + playing_from_fallback: true, + + last_command_ident: self.last_command_ident.clone(), + last_command_msgid: self.last_command_msgid + }) + } + fn device_state(&mut self) -> protocol::spirc::DeviceState { protobuf_init!(protocol::spirc::DeviceState::new(), { sw_version: version_string(), @@ -388,10 +342,5 @@ impl <'s> SpircManager<'s> { ], }) } - - fn now() -> i64 { - let ts = time::now_utc().to_timespec(); - ts.sec * 1000 + ts.nsec as i64 / 1000000 - } } diff --git a/src/player.rs b/src/player.rs index 979ba51c..6a79b31e 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,27 +1,79 @@ use portaudio; use vorbis; +use std::sync::{mpsc, Mutex, Arc, Condvar}; +use std::thread; use metadata::TrackRef; use session::Session; use audio_file::AudioFile; use audio_decrypt::AudioDecrypt; -use util::Subfile; +use util::{self, SpotifyId, Subfile}; +use librespot_protocol::spirc::PlayStatus; -pub struct Player; +pub enum PlayerCommand { + Load(SpotifyId, bool, u32), + Play, + Pause, + Stop, + Seek(u32) +} -impl Player { - pub fn play(session: &Session, track: TrackRef) { - let file_id = *track.wait().unwrap().files.first().unwrap(); +pub struct PlayerState { + pub status: PlayStatus, + pub position_ms: u32, + pub position_measured_at: i64, + pub update_time: i64 +} - let key = session.audio_key(track.id(), file_id).into_inner(); +struct PlayerInternal<'s> { + state: Arc<(Mutex, Condvar)>, - let mut decoder = - vorbis::Decoder::new( - Subfile::new( - AudioDecrypt::new(key, - AudioFile::new(session, file_id)), 0xa7)).unwrap(); - //decoder.time_seek(60f64).unwrap(); + session: &'s Session, + commands: mpsc::Receiver, +} +pub struct Player<'s> { + pub state: Arc<(Mutex, Condvar)>, + + commands: mpsc::Sender, + + #[allow(dead_code)] + thread: thread::JoinGuard<'s, ()>, +} + +impl <'s> Player<'s> { + pub fn new(session: &Session) -> Player { + let (cmd_tx, cmd_rx) = mpsc::channel(); + + let state = Arc::new((Mutex::new(PlayerState { + status: PlayStatus::kPlayStatusStop, + position_ms: 0, + position_measured_at: 0, + update_time: util::now_ms(), + }), Condvar::new())); + + let internal = PlayerInternal { + session: session, + commands: cmd_rx, + state: state.clone() + }; + + Player { + commands: cmd_tx, + state: state, + thread: thread::scoped(move || { + internal.run() + }) + } + } + + pub fn command(&self, cmd: PlayerCommand) { + self.commands.send(cmd).unwrap(); + } +} + +impl <'s> PlayerInternal<'s> { + fn run(self) { portaudio::initialize().unwrap(); let stream = portaudio::stream::Stream::::open_default( @@ -31,20 +83,97 @@ impl Player { portaudio::stream::FRAMES_PER_BUFFER_UNSPECIFIED, None ).unwrap(); - stream.start().unwrap(); - for pkt in decoder.packets() { - match pkt { - Ok(packet) => { - match stream.write(&packet.data) { - Ok(_) => (), - Err(portaudio::PaError::OutputUnderflowed) - => eprintln!("Underflow"), - Err(e) => panic!("PA Error {}", e) + let mut decoder = None; + + loop { + match self.commands.try_recv() { + Ok(PlayerCommand::Load(id, play, position)) => { + println!("Load"); + let mut h = self.state.0.lock().unwrap(); + if h.status == PlayStatus::kPlayStatusPlay { + stream.stop().unwrap(); + } + h.status = PlayStatus::kPlayStatusLoading; + h.position_ms = position; + h.position_measured_at = util::now_ms(); + h.update_time = util::now_ms(); + drop(h); + + let track : TrackRef = self.session.metadata(id); + let file_id = *track.wait().unwrap().files.first().unwrap(); + let key = self.session.audio_key(track.id(), file_id).into_inner(); + decoder = Some( + vorbis::Decoder::new( + Subfile::new( + AudioDecrypt::new(key, + AudioFile::new(self.session, file_id)), 0xa7)).unwrap()); + decoder.as_mut().unwrap().time_seek(position as f64 / 1000f64).unwrap(); + + let mut h = self.state.0.lock().unwrap(); + h.status = if play { + stream.start().unwrap(); + PlayStatus::kPlayStatusPlay + } else { + PlayStatus::kPlayStatusPause }; + h.position_ms = position; + h.position_measured_at = util::now_ms(); + h.update_time = util::now_ms(); + println!("Load Done"); + } + Ok(PlayerCommand::Seek(ms)) => { + let mut h = self.state.0.lock().unwrap(); + decoder.as_mut().unwrap().time_seek(ms as f64 / 1000f64).unwrap(); + h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32; + h.position_measured_at = util::now_ms(); + h.update_time = util::now_ms(); }, - Err(vorbis::VorbisError::Hole) => (), - Err(e) => panic!("Vorbis error {:?}", e) + Ok(PlayerCommand::Play) => { + println!("Play"); + let mut h = self.state.0.lock().unwrap(); + h.status = PlayStatus::kPlayStatusPlay; + h.update_time = util::now_ms(); + + stream.start().unwrap(); + }, + Ok(PlayerCommand::Pause) => { + let mut h = self.state.0.lock().unwrap(); + h.status = PlayStatus::kPlayStatusPause; + h.update_time = util::now_ms(); + + stream.stop().unwrap(); + }, + Ok(PlayerCommand::Stop) => { + let mut h = self.state.0.lock().unwrap(); + if h.status == PlayStatus::kPlayStatusPlay { + stream.stop().unwrap(); + } + + h.status = PlayStatus::kPlayStatusPause; + h.update_time = util::now_ms(); + decoder = None; + }, + Err(..) => (), + } + + if self.state.0.lock().unwrap().status == PlayStatus::kPlayStatusPlay { + match decoder.as_mut().unwrap().packets().next().unwrap() { + Ok(packet) => { + match stream.write(&packet.data) { + Ok(_) => (), + Err(portaudio::PaError::OutputUnderflowed) + => eprintln!("Underflow"), + Err(e) => panic!("PA Error {}", e) + }; + }, + Err(vorbis::VorbisError::Hole) => (), + Err(e) => panic!("Vorbis error {:?}", e) + } + + let mut h = self.state.0.lock().unwrap(); + h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32; + h.position_measured_at = util::now_ms(); } } @@ -54,3 +183,4 @@ impl Player { } } + diff --git a/src/util/mod.rs b/src/util/mod.rs index 061b2dda..34bada1c 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,4 +1,5 @@ use rand::{Rng,Rand}; +use time; mod int128; mod spotify_id; @@ -66,3 +67,9 @@ impl IgnoreExt for Result { } } } + +pub fn now_ms() -> i64 { + let ts = time::now_utc().to_timespec(); + ts.sec * 1000 + ts.nsec as i64 / 1000000 +} +