diff --git a/.travis.yml b/.travis.yml index 31b7bbe5..e268d62b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,18 +5,15 @@ rust: - beta - nightly -cache: cargo -# Reduce cache bloat +# Need to cache the whole `.cargo` directory to keep .crates.toml for +# cargo-update to work +cache: + directories: + - /home/travis/.cargo + +# But don't cache the cargo registry before_cache: - - rm -rfv "$TRAVIS_HOME/.cargo/registry/src" - - rm -rfv target/debug/incremental/{librespot,build_script_build}-* - - rm -rfv target/debug/.fingerprint/librespot-* - - rm -rfv target/debug/build/librespot-* - - rm -rfv target/debug/deps/liblibrespot-* - - rm -rfv target/debug/deps/librespot-* - - rm -rfv target/debug/{librespot,liblibrespot}.d - - rm -rfv target/debug/incremental/{build_script_build,librespot,librespot_core,librespot_connect,librespot_audio,librespot_metadata,librespot_playback,librespot_player,librespot_protocol}-* - - cargo clean -p librespot -p librespot-core -p librespot-connect -p librespot-audio -p librespot-metadata -p librespot-playback + - rm -rf /home/travis/.cargo/registry addons: apt: diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 1ab6ce78..c47cb4d3 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -144,6 +144,15 @@ impl StreamLoaderController { } } + pub fn range_to_end_available(&self) -> bool { + if let Some(ref shared) = self.stream_shared { + let read_position = shared.read_position.load(atomic::Ordering::Relaxed); + self.range_available(Range::new(read_position, self.len() - read_position)) + } else { + true + } + } + pub fn ping_time_ms(&self) -> usize { if let Some(ref shared) = self.stream_shared { return shared.ping_time_ms.load(atomic::Ordering::Relaxed); diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 5147201d..453327c5 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -2,7 +2,7 @@ use std; use std::time::{SystemTime, UNIX_EPOCH}; use futures::future; -use futures::sync::{mpsc, oneshot}; +use futures::sync::mpsc; use futures::{Async, Future, Poll, Sink, Stream}; use protobuf::{self, Message}; use rand; @@ -11,7 +11,7 @@ use serde_json; use crate::context::StationContext; use crate::playback::mixer::Mixer; -use crate::playback::player::Player; +use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; use crate::protocol; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; use librespot_core::config::ConnectConfig; @@ -22,6 +22,24 @@ use librespot_core::util::SeqGenerator; use librespot_core::version; use librespot_core::volume::Volume; +enum SpircPlayStatus { + Stopped, + LoadingPlay { + position_ms: u32, + }, + LoadingPause { + position_ms: u32, + }, + Playing { + nominal_start_time: i64, + preloading_of_next_track_triggered: bool, + }, + Paused { + position_ms: u32, + preloading_of_next_track_triggered: bool, + }, +} + pub struct SpircTask { player: Player, mixer: Box, @@ -32,11 +50,14 @@ pub struct SpircTask { ident: String, device: DeviceState, state: State, + play_request_id: Option, + mixer_started: bool, + play_status: SpircPlayStatus, subscription: Box>, sender: Box>, commands: mpsc::UnboundedReceiver, - end_of_track: Box>, + player_events: PlayerEventChannel, shutdown: bool, session: Session, @@ -255,6 +276,8 @@ impl Spirc { }; let device = initial_device_state(config); + let player_events = player.get_player_event_channel(); + let mut task = SpircTask { player: player, mixer: mixer, @@ -266,11 +289,14 @@ impl Spirc { device: device, state: initial_state(), + play_request_id: None, + mixer_started: false, + play_status: SpircPlayStatus::Stopped, subscription: subscription, sender: sender, commands: cmd_rx, - end_of_track: Box::new(future::empty()), + player_events: player_events, shutdown: false, session: session.clone(), @@ -350,13 +376,14 @@ impl Future for SpircTask { Async::NotReady => (), } - match self.end_of_track.poll() { - Ok(Async::Ready(())) => { - progress = true; - self.handle_end_of_track(); - } + match self.player_events.poll() { Ok(Async::NotReady) => (), - Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()), + Ok(Async::Ready(None)) => (), + Err(_) => (), + Ok(Async::Ready(Some(event))) => { + progress = true; + self.handle_player_event(event); + } } // TODO: Refactor match self.context_fut.poll() { @@ -431,13 +458,33 @@ impl SpircTask { + (dur.subsec_nanos() / 1000_000) as i64) } + fn ensure_mixer_started(&mut self) { + if !self.mixer_started { + self.mixer.start(); + self.mixer_started = true; + } + } + + fn ensure_mixer_stopped(&mut self) { + if self.mixer_started { + self.mixer.stop(); + self.mixer_started = false; + } + } + + fn update_state_position(&mut self, position_ms: u32) { + let now = self.now_ms(); + self.state.set_position_measured_at(now as u64); + self.state.set_position_ms(position_ms); + } + fn handle_command(&mut self, cmd: SpircCommand) { let active = self.device.get_is_active(); match cmd { SpircCommand::Play => { if active { self.handle_play(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePlay).send(); } @@ -445,7 +492,7 @@ impl SpircTask { SpircCommand::PlayPause => { if active { self.handle_play_pause(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePlayPause).send(); } @@ -453,7 +500,7 @@ impl SpircTask { SpircCommand::Pause => { if active { self.handle_pause(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePause).send(); } @@ -461,7 +508,7 @@ impl SpircTask { SpircCommand::Prev => { if active { self.handle_prev(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePrev).send(); } @@ -469,7 +516,7 @@ impl SpircTask { SpircCommand::Next => { if active { self.handle_next(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypeNext).send(); } @@ -477,7 +524,7 @@ impl SpircTask { SpircCommand::VolumeUp => { if active { self.handle_volume_up(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send(); } @@ -485,7 +532,7 @@ impl SpircTask { SpircCommand::VolumeDown => { if active { self.handle_volume_down(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send(); } @@ -498,14 +545,122 @@ impl SpircTask { } } + fn handle_player_event(&mut self, event: PlayerEvent) { + // we only process events if the play_request_id matches. If it doesn't, it is + // an event that belongs to a previous track and only arrives now due to a race + // condition. In this case we have updated the state already and don't want to + // mess with it. + if let Some(play_request_id) = event.get_play_request_id() { + if Some(play_request_id) == self.play_request_id { + match event { + PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(), + PlayerEvent::Loading { .. } => self.notify(None, false), + PlayerEvent::Playing { position_ms, .. } => { + let new_nominal_start_time = self.now_ms() - position_ms as i64; + match self.play_status { + SpircPlayStatus::Playing { + ref mut nominal_start_time, + .. + } => { + if (*nominal_start_time - new_nominal_start_time).abs() > 100 { + *nominal_start_time = new_nominal_start_time; + self.update_state_position(position_ms); + self.notify(None, true); + } + } + SpircPlayStatus::LoadingPlay { .. } + | SpircPlayStatus::LoadingPause { .. } => { + self.state.set_status(PlayStatus::kPlayStatusPlay); + self.update_state_position(position_ms); + self.notify(None, true); + self.play_status = SpircPlayStatus::Playing { + nominal_start_time: new_nominal_start_time, + preloading_of_next_track_triggered: false, + }; + } + _ => (), + }; + trace!("==> kPlayStatusPlay"); + } + PlayerEvent::Paused { + position_ms: new_position_ms, + .. + } => { + match self.play_status { + SpircPlayStatus::Paused { + ref mut position_ms, + .. + } => { + if *position_ms != new_position_ms { + *position_ms = new_position_ms; + self.update_state_position(new_position_ms); + self.notify(None, true); + } + } + SpircPlayStatus::LoadingPlay { .. } + | SpircPlayStatus::LoadingPause { .. } => { + self.state.set_status(PlayStatus::kPlayStatusPause); + self.update_state_position(new_position_ms); + self.notify(None, true); + self.play_status = SpircPlayStatus::Paused { + position_ms: new_position_ms, + preloading_of_next_track_triggered: false, + }; + } + _ => (), + } + trace!("==> kPlayStatusPause"); + } + PlayerEvent::Stopped { .. } => match self.play_status { + SpircPlayStatus::Stopped => (), + _ => { + warn!("The player has stopped unexpectedly."); + self.state.set_status(PlayStatus::kPlayStatusStop); + self.ensure_mixer_stopped(); + self.notify(None, true); + self.play_status = SpircPlayStatus::Stopped; + } + }, + PlayerEvent::TimeToPreloadNextTrack { .. } => match self.play_status { + SpircPlayStatus::Paused { + ref mut preloading_of_next_track_triggered, + .. + } + | SpircPlayStatus::Playing { + ref mut preloading_of_next_track_triggered, + .. + } => { + *preloading_of_next_track_triggered = true; + if let Some(track_id) = self.preview_next_track() { + self.player.preload(track_id); + } + } + SpircPlayStatus::LoadingPause { .. } + | SpircPlayStatus::LoadingPlay { .. } + | SpircPlayStatus::Stopped => (), + }, + _ => (), + } + } + } + } + fn handle_frame(&mut self, frame: Frame) { + let state_string = match frame.get_state().get_status() { + PlayStatus::kPlayStatusLoading => "kPlayStatusLoading", + PlayStatus::kPlayStatusPause => "kPlayStatusPause", + PlayStatus::kPlayStatusStop => "kPlayStatusStop", + PlayStatus::kPlayStatusPlay => "kPlayStatusPlay", + }; + debug!( - "{:?} {:?} {} {} {}", + "{:?} {:?} {} {} {} {}", frame.get_typ(), frame.get_device_state().get_name(), frame.get_ident(), frame.get_seq_nr(), - frame.get_state_update_id() + frame.get_state_update_id(), + state_string, ); if frame.get_ident() == self.ident @@ -516,7 +671,7 @@ impl SpircTask { match frame.get_typ() { MessageType::kMessageTypeHello => { - self.notify(Some(frame.get_ident())); + self.notify(Some(frame.get_ident()), true); } MessageType::kMessageTypeLoad => { @@ -529,61 +684,58 @@ impl SpircTask { self.update_tracks(&frame); if self.state.get_track().len() > 0 { - let now = self.now_ms(); - self.state - .set_position_ms(frame.get_state().get_position_ms()); - self.state.set_position_measured_at(now as u64); - - let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; - self.load_track(play); + let start_playing = + frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; + self.load_track(start_playing, frame.get_state().get_position_ms()); } else { info!("No more tracks left in queue"); self.state.set_status(PlayStatus::kPlayStatusStop); self.player.stop(); self.mixer.stop(); + self.play_status = SpircPlayStatus::Stopped; } - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePlay => { self.handle_play(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePlayPause => { self.handle_play_pause(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePause => { self.handle_pause(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeNext => { self.handle_next(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePrev => { self.handle_prev(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeVolumeUp => { self.handle_volume_up(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeVolumeDown => { self.handle_volume_down(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeRepeat => { self.state.set_repeat(frame.get_state().get_repeat()); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeShuffle => { @@ -603,27 +755,38 @@ impl SpircTask { let context = self.state.get_context_uri(); debug!("{:?}", context); } - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeSeek => { - let position = frame.get_position(); - - let now = self.now_ms(); - self.state.set_position_ms(position); - self.state.set_position_measured_at(now as u64); - self.player.seek(position); - self.notify(None); + self.handle_seek(frame.get_position()); + self.notify(None, true); } MessageType::kMessageTypeReplace => { self.update_tracks(&frame); - self.notify(None); + self.notify(None, true); + + if let SpircPlayStatus::Playing { + preloading_of_next_track_triggered, + .. + } + | SpircPlayStatus::Paused { + preloading_of_next_track_triggered, + .. + } = self.play_status + { + if preloading_of_next_track_triggered { + if let Some(track_id) = self.preview_next_track() { + self.player.preload(track_id); + } + } + } } MessageType::kMessageTypeVolume => { self.set_volume(frame.get_volume() as u16); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeNotify => { @@ -631,7 +794,8 @@ impl SpircTask { self.device.set_is_active(false); self.state.set_status(PlayStatus::kPlayStatusStop); self.player.stop(); - self.mixer.stop(); + self.ensure_mixer_stopped(); + self.play_status = SpircPlayStatus::Stopped; } } @@ -640,39 +804,87 @@ impl SpircTask { } fn handle_play(&mut self) { - if self.state.get_status() == PlayStatus::kPlayStatusPause { - self.mixer.start(); - self.player.play(); - self.state.set_status(PlayStatus::kPlayStatusPlay); - let now = self.now_ms(); - self.state.set_position_measured_at(now as u64); + match self.play_status { + SpircPlayStatus::Paused { + position_ms, + preloading_of_next_track_triggered, + } => { + self.ensure_mixer_started(); + self.player.play(); + self.state.set_status(PlayStatus::kPlayStatusPlay); + self.update_state_position(position_ms); + self.play_status = SpircPlayStatus::Playing { + nominal_start_time: self.now_ms() as i64 - position_ms as i64, + preloading_of_next_track_triggered, + }; + } + SpircPlayStatus::LoadingPause { position_ms } => { + self.ensure_mixer_started(); + self.player.play(); + self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; + } + _ => (), } } fn handle_play_pause(&mut self) { - match self.state.get_status() { - PlayStatus::kPlayStatusPlay => self.handle_pause(), - PlayStatus::kPlayStatusPause => self.handle_play(), + match self.play_status { + SpircPlayStatus::Paused { .. } | SpircPlayStatus::LoadingPause { .. } => { + self.handle_play() + } + SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => { + self.handle_play() + } _ => (), } } fn handle_pause(&mut self) { - if self.state.get_status() == PlayStatus::kPlayStatusPlay { - self.player.pause(); - self.mixer.stop(); - self.state.set_status(PlayStatus::kPlayStatusPause); - - let now = self.now_ms() as u64; - let position = self.state.get_position_ms(); - - let diff = now - self.state.get_position_measured_at(); - - self.state.set_position_ms(position + diff as u32); - self.state.set_position_measured_at(now); + match self.play_status { + SpircPlayStatus::Playing { + nominal_start_time, + preloading_of_next_track_triggered, + } => { + self.player.pause(); + self.state.set_status(PlayStatus::kPlayStatusPause); + let position_ms = (self.now_ms() - nominal_start_time) as u32; + self.update_state_position(position_ms); + self.play_status = SpircPlayStatus::Paused { + position_ms, + preloading_of_next_track_triggered, + }; + } + SpircPlayStatus::LoadingPlay { position_ms } => { + self.player.pause(); + self.play_status = SpircPlayStatus::LoadingPause { position_ms }; + } + _ => (), } } + fn handle_seek(&mut self, position_ms: u32) { + self.update_state_position(position_ms); + self.player.seek(position_ms); + let now = self.now_ms(); + match self.play_status { + SpircPlayStatus::Stopped => (), + SpircPlayStatus::LoadingPause { + position_ms: ref mut position, + } + | SpircPlayStatus::LoadingPlay { + position_ms: ref mut position, + } + | SpircPlayStatus::Paused { + position_ms: ref mut position, + .. + } => *position = position_ms, + SpircPlayStatus::Playing { + ref mut nominal_start_time, + .. + } => *nominal_start_time = now - position_ms as i64, + }; + } + fn consume_queued_track(&mut self) -> usize { // Removes current track if it is queued // Returns the index of the next track @@ -687,6 +899,11 @@ impl SpircTask { } } + fn preview_next_track(&mut self) -> Option { + self.get_track_id_to_play_from_playlist(self.state.get_playing_track_index() + 1) + .and_then(|(track_id, _)| Some(track_id)) + } + fn handle_next(&mut self) { let mut new_index = self.consume_queued_track() as u32; let mut continue_playing = true; @@ -720,17 +937,14 @@ impl SpircTask { if tracks_len > 0 { self.state.set_playing_track_index(new_index); - self.state.set_position_ms(0); - let now = self.now_ms(); - self.state.set_position_measured_at(now as u64); - - self.load_track(continue_playing); + self.load_track(continue_playing, 0); } else { info!("Not playing next track because there are no more tracks left in queue."); self.state.set_playing_track_index(0); self.state.set_status(PlayStatus::kPlayStatusStop); self.player.stop(); - self.mixer.stop(); + self.ensure_mixer_stopped(); + self.play_status = SpircPlayStatus::Stopped; } } @@ -765,17 +979,11 @@ impl SpircTask { pos += 1; } - let now = self.now_ms(); self.state.set_playing_track_index(new_index); - self.state.set_position_ms(0); - self.state.set_position_measured_at(now as u64); - self.load_track(true); + self.load_track(true, 0); } else { - let now = self.now_ms(); - self.state.set_position_ms(0); - self.state.set_position_measured_at(now as u64); - self.player.seek(0); + self.handle_seek(0); } } @@ -797,12 +1005,19 @@ impl SpircTask { fn handle_end_of_track(&mut self) { self.handle_next(); - self.notify(None); + self.notify(None, true); } fn position(&mut self) -> u32 { - let diff = self.now_ms() as u64 - self.state.get_position_measured_at(); - self.state.get_position_ms() + diff as u32 + match self.play_status { + SpircPlayStatus::Stopped => 0, + SpircPlayStatus::LoadingPlay { position_ms } + | SpircPlayStatus::LoadingPause { position_ms } + | SpircPlayStatus::Paused { position_ms, .. } => position_ms, + SpircPlayStatus::Playing { + nominal_start_time, .. + } => (self.now_ms() - nominal_start_time) as u32, + } } fn resolve_station( @@ -920,60 +1135,91 @@ impl SpircTask { }) } - fn load_track(&mut self, play: bool) { - let context_uri = self.state.get_context_uri().to_owned(); - let mut index = self.state.get_playing_track_index(); - let start_index = index; + fn get_track_id_to_play_from_playlist(&self, index: u32) -> Option<(SpotifyId, u32)> { let tracks_len = self.state.get_track().len() as u32; - debug!( - "Loading context: <{}> index: [{}] of {}", - context_uri, index, tracks_len - ); + + let mut new_playlist_index = index; + + if new_playlist_index >= tracks_len { + new_playlist_index = 0; + } + + let start_index = new_playlist_index; + // Cycle through all tracks, break if we don't find any playable tracks - // TODO: This will panic if no playable tracks are found! // tracks in each frame either have a gid or uri (that may or may not be a valid track) // E.g - context based frames sometimes contain tracks with - let track = { - let mut track_ref = self.state.get_track()[index as usize].clone(); - let mut track_id = self.get_spotify_id_for_track(&track_ref); - while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable - { - warn!( - "Skipping track <{:?}> at position [{}] of {}", - track_ref.get_uri(), - index, - tracks_len - ); - index = if index + 1 < tracks_len { index + 1 } else { 0 }; - self.state.set_playing_track_index(index); - if index == start_index { - warn!("No playable track found in state: {:?}", self.state); - break; - } - track_ref = self.state.get_track()[index as usize].clone(); - track_id = self.get_spotify_id_for_track(&track_ref); + + let mut track_ref = self.state.get_track()[new_playlist_index as usize].clone(); + let mut track_id = self.get_spotify_id_for_track(&track_ref); + while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable { + warn!( + "Skipping track <{:?}> at position [{}] of {}", + track_ref.get_uri(), + new_playlist_index, + tracks_len + ); + + new_playlist_index += 1; + if new_playlist_index >= tracks_len { + new_playlist_index = 0; } - track_id - } - .expect("Invalid SpotifyId"); - let position = self.state.get_position_ms(); - let end_of_track = self.player.load(track, play, position); - - if play { - self.state.set_status(PlayStatus::kPlayStatusPlay); - } else { - self.state.set_status(PlayStatus::kPlayStatusPause); + if new_playlist_index == start_index { + warn!("No playable track found in state: {:?}", self.state); + return None; + } + track_ref = self.state.get_track()[index as usize].clone(); + track_id = self.get_spotify_id_for_track(&track_ref); } - self.end_of_track = Box::new(end_of_track); + match track_id { + Ok(track_id) => Some((track_id, new_playlist_index)), + Err(_) => None, + } + } + + fn load_track(&mut self, start_playing: bool, position_ms: u32) { + let index = self.state.get_playing_track_index(); + + match self.get_track_id_to_play_from_playlist(index) { + Some((track, index)) => { + self.state.set_playing_track_index(index); + + self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); + + self.update_state_position(position_ms); + self.state.set_status(PlayStatus::kPlayStatusLoading); + if start_playing { + self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; + } else { + self.play_status = SpircPlayStatus::LoadingPause { position_ms }; + } + } + None => { + self.state.set_status(PlayStatus::kPlayStatusStop); + self.player.stop(); + self.ensure_mixer_stopped(); + self.play_status = SpircPlayStatus::Stopped; + } + } } fn hello(&mut self) { CommandSender::new(self, MessageType::kMessageTypeHello).send(); } - fn notify(&mut self, recipient: Option<&str>) { + fn notify(&mut self, recipient: Option<&str>, suppress_loading_status: bool) { + if suppress_loading_status && (self.state.get_status() == PlayStatus::kPlayStatusLoading) { + return; + }; + let status_string = match self.state.get_status() { + PlayStatus::kPlayStatusLoading => "kPlayStatusLoading", + PlayStatus::kPlayStatusPause => "kPlayStatusPause", + PlayStatus::kPlayStatusStop => "kPlayStatusStop", + PlayStatus::kPlayStatusPlay => "kPlayStatusPlay", + }; + trace!("Sending status to server: [{}]", status_string); let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify); if let Some(s) = recipient { cs = cs.recipient(&s); @@ -988,6 +1234,7 @@ impl SpircTask { if let Some(cache) = self.session.cache() { cache.save_volume(Volume { volume }) } + self.player.emit_volume_set_event(volume); } } diff --git a/core/src/util/mod.rs b/core/src/util/mod.rs index 5c1e50f5..800e04e1 100644 --- a/core/src/util/mod.rs +++ b/core/src/util/mod.rs @@ -38,7 +38,7 @@ pub trait Seq { macro_rules! impl_seq { ($($ty:ty)*) => { $( impl Seq for $ty { - fn next(&self) -> Self { *self + 1 } + fn next(&self) -> Self { (*self).wrapping_add(1) } } )* } } diff --git a/examples/play.rs b/examples/play.rs index 6888ebb4..4ba4c5b5 100644 --- a/examples/play.rs +++ b/examples/play.rs @@ -34,12 +34,14 @@ fn main() { .run(Session::connect(session_config, credentials, None, handle)) .unwrap(); - let (player, _) = Player::new(player_config, session.clone(), None, move || { + let (mut player, _) = Player::new(player_config, session.clone(), None, move || { (backend)(None) }); + player.load(track, true, 0); + println!("Playing..."); - core.run(player.load(track, true, 0)).unwrap(); + core.run(player.get_end_of_track_future()).unwrap(); println!("Done"); } diff --git a/metadata/src/lib.rs b/metadata/src/lib.rs index d4bd797c..8bd422ce 100644 --- a/metadata/src/lib.rs +++ b/metadata/src/lib.rs @@ -63,6 +63,7 @@ pub struct AudioItem { pub uri: String, pub files: LinearMap, pub name: String, + pub duration: i32, pub available: bool, pub alternatives: Option>, } @@ -100,6 +101,7 @@ impl AudioFiles for Track { uri: format!("spotify:track:{}", id.to_base62()), files: item.files, name: item.name, + duration: item.duration, available: item.available, alternatives: Some(item.alternatives), }) @@ -118,6 +120,7 @@ impl AudioFiles for Episode { uri: format!("spotify:episode:{}", id.to_base62()), files: item.files, name: item.name, + duration: item.duration, available: item.available, alternatives: None, }) diff --git a/playback/src/config.rs b/playback/src/config.rs index 0f711100..9d65042c 100644 --- a/playback/src/config.rs +++ b/playback/src/config.rs @@ -30,6 +30,7 @@ pub struct PlayerConfig { pub bitrate: Bitrate, pub normalisation: bool, pub normalisation_pregain: f32, + pub gapless: bool, } impl Default for PlayerConfig { @@ -38,6 +39,7 @@ impl Default for PlayerConfig { bitrate: Bitrate::default(), normalisation: false, normalisation_pregain: 0.0, + gapless: true, } } } diff --git a/playback/src/player.rs b/playback/src/player.rs index 38ee00c1..a26d3583 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1,20 +1,20 @@ use byteorder::{LittleEndian, ReadBytesExt}; use futures; -use futures::sync::oneshot; -use futures::{future, Future}; +use futures::{future, Async, Future, Poll, Stream}; use std; use std::borrow::Cow; use std::cmp::max; use std::io::{Read, Result, Seek, SeekFrom}; use std::mem; -use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::config::{Bitrate, PlayerConfig}; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; +use librespot_core::util::SeqGenerator; + use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use crate::audio::{VorbisDecoder, VorbisPacket}; use crate::audio::{ @@ -25,48 +25,121 @@ use crate::audio_backend::Sink; use crate::metadata::{AudioItem, FileFormat}; use crate::mixer::AudioFilter; +const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; + pub struct Player { - commands: Option>, + commands: Option>, thread_handle: Option>, + play_request_id_generator: SeqGenerator, } struct PlayerInternal { session: Session, config: PlayerConfig, - commands: std::sync::mpsc::Receiver, + commands: futures::sync::mpsc::UnboundedReceiver, state: PlayerState, + preload: PlayerPreload, sink: Box, sink_running: bool, audio_filter: Option>, - event_sender: futures::sync::mpsc::UnboundedSender, + event_senders: Vec>, } enum PlayerCommand { - Load(SpotifyId, bool, u32, oneshot::Sender<()>), + Load { + track_id: SpotifyId, + play_request_id: u64, + play: bool, + position_ms: u32, + }, + Preload { + track_id: SpotifyId, + }, Play, Pause, Stop, Seek(u32), + AddEventSender(futures::sync::mpsc::UnboundedSender), + EmitVolumeSetEvent(u16), } #[derive(Debug, Clone)] pub enum PlayerEvent { - Started { + Stopped { + play_request_id: u64, track_id: SpotifyId, }, - + Loading { + play_request_id: u64, + track_id: SpotifyId, + position_ms: u32, + }, + Started { + play_request_id: u64, + track_id: SpotifyId, + position_ms: u32, + }, Changed { old_track_id: SpotifyId, new_track_id: SpotifyId, }, - - Stopped { + Playing { + play_request_id: u64, track_id: SpotifyId, + position_ms: u32, + duration_ms: u32, + }, + Paused { + play_request_id: u64, + track_id: SpotifyId, + position_ms: u32, + duration_ms: u32, + }, + TimeToPreloadNextTrack { + play_request_id: u64, + track_id: SpotifyId, + }, + EndOfTrack { + play_request_id: u64, + track_id: SpotifyId, + }, + VolumeSet { + volume: u16, }, } -type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver; +impl PlayerEvent { + pub fn get_play_request_id(&self) -> Option { + use PlayerEvent::*; + match self { + Loading { + play_request_id, .. + } + | Started { + play_request_id, .. + } + | Playing { + play_request_id, .. + } + | TimeToPreloadNextTrack { + play_request_id, .. + } + | EndOfTrack { + play_request_id, .. + } + | Paused { + play_request_id, .. + } + | Stopped { + play_request_id, .. + } => Some(*play_request_id), + Changed { .. } | VolumeSet { .. } => None, + } + } +} + +pub type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver; #[derive(Clone, Copy, Debug)] struct NormalisationData { @@ -125,7 +198,7 @@ impl Player { where F: FnOnce() -> Box + Send + 'static, { - let (cmd_tx, cmd_rx) = std::sync::mpsc::channel(); + let (cmd_tx, cmd_rx) = futures::sync::mpsc::unbounded(); let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); let handle = thread::spawn(move || { @@ -137,38 +210,47 @@ impl Player { commands: cmd_rx, state: PlayerState::Stopped, + preload: PlayerPreload::None, sink: sink_builder(), sink_running: false, audio_filter: audio_filter, - event_sender: event_sender, + event_senders: [event_sender].to_vec(), }; - internal.run(); + // While PlayerInternal is written as a future, it still contains blocking code. + // It must be run by using wait() in a dedicated thread. + let _ = internal.wait(); + debug!("PlayerInternal thread finished."); }); ( Player { commands: Some(cmd_tx), thread_handle: Some(handle), + play_request_id_generator: SeqGenerator::new(0), }, event_receiver, ) } fn command(&self, cmd: PlayerCommand) { - self.commands.as_ref().unwrap().send(cmd).unwrap(); + self.commands.as_ref().unwrap().unbounded_send(cmd).unwrap(); } - pub fn load( - &self, - track: SpotifyId, - start_playing: bool, - position_ms: u32, - ) -> oneshot::Receiver<()> { - let (tx, rx) = oneshot::channel(); - self.command(PlayerCommand::Load(track, start_playing, position_ms, tx)); + pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 { + let play_request_id = self.play_request_id_generator.get(); + self.command(PlayerCommand::Load { + track_id, + play_request_id, + play: start_playing, + position_ms, + }); - rx + play_request_id + } + + pub fn preload(&self, track_id: SpotifyId) { + self.command(PlayerCommand::Preload { track_id }); } pub fn play(&self) { @@ -186,6 +268,29 @@ impl Player { pub fn seek(&self, position_ms: u32) { self.command(PlayerCommand::Seek(position_ms)); } + + pub fn get_player_event_channel(&self) -> PlayerEventChannel { + let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); + self.command(PlayerCommand::AddEventSender(event_sender)); + event_receiver + } + + pub fn get_end_of_track_future(&self) -> Box> { + let result = self + .get_player_event_channel() + .filter(|event| match event { + PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } => true, + _ => false, + }) + .into_future() + .map_err(|_| ()) + .map(|_| ()); + Box::new(result) + } + + pub fn emit_volume_set_event(&self, volume: u16) { + self.command(PlayerCommand::EmitVolumeSetEvent(volume)); + } } impl Drop for Player { @@ -201,27 +306,64 @@ impl Drop for Player { } } +struct PlayerLoadedTrackData { + decoder: Decoder, + normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, + duration_ms: u32, + stream_position_pcm: u64, +} + +enum PlayerPreload { + None, + Loading { + track_id: SpotifyId, + loader: Box>, + }, + Ready { + track_id: SpotifyId, + loaded_track: PlayerLoadedTrackData, + }, +} + type Decoder = VorbisDecoder>>; + enum PlayerState { Stopped, + Loading { + track_id: SpotifyId, + play_request_id: u64, + start_playback: bool, + loader: Box>, + }, Paused { track_id: SpotifyId, + play_request_id: u64, decoder: Decoder, - end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, bytes_per_second: usize, + duration_ms: u32, + stream_position_pcm: u64, + suggested_to_preload_next_track: bool, }, Playing { track_id: SpotifyId, + play_request_id: u64, decoder: Decoder, - end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, bytes_per_second: usize, + duration_ms: u32, + stream_position_pcm: u64, + reported_nominal_start_time: Option, + suggested_to_preload_next_track: bool, }, EndOfTrack { track_id: SpotifyId, + play_request_id: u64, + loaded_track: Option, }, Invalid, } @@ -230,16 +372,24 @@ impl PlayerState { fn is_playing(&self) -> bool { use self::PlayerState::*; match *self { - Stopped | EndOfTrack { .. } | Paused { .. } => false, + Stopped | EndOfTrack { .. } | Paused { .. } | Loading { .. } => false, Playing { .. } => true, Invalid => panic!("invalid state"), } } + fn is_stopped(&self) -> bool { + use self::PlayerState::*; + match *self { + Stopped => true, + _ => false, + } + } + fn decoder(&mut self) -> Option<&mut Decoder> { use self::PlayerState::*; match *self { - Stopped | EndOfTrack { .. } => None, + Stopped | EndOfTrack { .. } | Loading { .. } => None, Paused { ref mut decoder, .. } @@ -253,7 +403,7 @@ impl PlayerState { fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> { use self::PlayerState::*; match *self { - Stopped | EndOfTrack { .. } => None, + Stopped | EndOfTrack { .. } | Loading { .. } => None, Paused { ref mut stream_loader_controller, .. @@ -271,11 +421,27 @@ impl PlayerState { match mem::replace(self, Invalid) { Playing { track_id, - end_of_track, + play_request_id, + decoder, + duration_ms, + bytes_per_second, + normalisation_factor, + stream_loader_controller, + stream_position_pcm, .. } => { - let _ = end_of_track.send(()); - *self = EndOfTrack { track_id }; + *self = EndOfTrack { + track_id, + play_request_id, + loaded_track: Some(PlayerLoadedTrackData { + decoder, + duration_ms, + bytes_per_second, + normalisation_factor, + stream_loader_controller, + stream_position_pcm, + }), + }; } _ => panic!("Called playing_to_end_of_track in non-playing state."), } @@ -286,19 +452,26 @@ impl PlayerState { match ::std::mem::replace(self, Invalid) { Paused { track_id, + play_request_id, decoder, - end_of_track, normalisation_factor, stream_loader_controller, + duration_ms, bytes_per_second, + stream_position_pcm, + suggested_to_preload_next_track, } => { *self = Playing { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, + track_id, + play_request_id, + decoder, + normalisation_factor, + stream_loader_controller, + duration_ms, + bytes_per_second, + stream_position_pcm, + reported_nominal_start_time: None, + suggested_to_preload_next_track, }; } _ => panic!("invalid state"), @@ -310,19 +483,26 @@ impl PlayerState { match ::std::mem::replace(self, Invalid) { Playing { track_id, + play_request_id, decoder, - end_of_track, normalisation_factor, stream_loader_controller, + duration_ms, bytes_per_second, + stream_position_pcm, + reported_nominal_start_time: _, + suggested_to_preload_next_track, } => { *self = Paused { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, + track_id, + play_request_id, + decoder, + normalisation_factor, + stream_loader_controller, + duration_ms, + bytes_per_second, + stream_position_pcm, + suggested_to_preload_next_track, }; } _ => panic!("invalid state"), @@ -330,269 +510,12 @@ impl PlayerState { } } -impl PlayerInternal { - fn run(mut self) { - loop { - let cmd = if self.state.is_playing() { - if self.sink_running { - match self.commands.try_recv() { - Ok(cmd) => Some(cmd), - Err(TryRecvError::Empty) => None, - Err(TryRecvError::Disconnected) => return, - } - } else { - match self.commands.recv_timeout(Duration::from_secs(5)) { - Ok(cmd) => Some(cmd), - Err(RecvTimeoutError::Timeout) => None, - Err(RecvTimeoutError::Disconnected) => return, - } - } - } else { - match self.commands.recv() { - Ok(cmd) => Some(cmd), - Err(RecvError) => return, - } - }; - - if let Some(cmd) = cmd { - self.handle_command(cmd); - } - - if self.state.is_playing() && !self.sink_running { - self.start_sink(); - } - - if self.sink_running { - let mut current_normalisation_factor: f32 = 1.0; - - let packet = if let PlayerState::Playing { - ref mut decoder, - normalisation_factor, - .. - } = self.state - { - current_normalisation_factor = normalisation_factor; - Some(decoder.next_packet().expect("Vorbis error")) - } else { - None - }; - - if let Some(packet) = packet { - self.handle_packet(packet, current_normalisation_factor); - } - } - - if self.session.is_invalid() { - return; - } - } - } - - fn start_sink(&mut self) { - match self.sink.start() { - Ok(()) => self.sink_running = true, - Err(err) => error!("Could not start audio: {}", err), - } - } - - fn stop_sink_if_running(&mut self) { - if self.sink_running { - self.stop_sink(); - } - } - - fn stop_sink(&mut self) { - self.sink.stop().unwrap(); - self.sink_running = false; - } - - fn handle_packet(&mut self, packet: Option, normalisation_factor: f32) { - match packet { - Some(mut packet) => { - if packet.data().len() > 0 { - if let Some(ref editor) = self.audio_filter { - editor.modify_stream(&mut packet.data_mut()) - }; - - if self.config.normalisation && normalisation_factor != 1.0 { - for x in packet.data_mut().iter_mut() { - *x = (*x as f32 * normalisation_factor) as i16; - } - } - - if let Err(err) = self.sink.write(&packet.data()) { - error!("Could not write audio: {}", err); - self.stop_sink(); - } - } - } - - None => { - self.stop_sink(); - self.state.playing_to_end_of_track(); - } - } - } - - fn handle_command(&mut self, cmd: PlayerCommand) { - debug!("command={:?}", cmd); - match cmd { - PlayerCommand::Load(track_id, play, position, end_of_track) => { - if self.state.is_playing() { - self.stop_sink_if_running(); - } - - match self.load_track(track_id, position as i64) { - Some(( - decoder, - normalisation_factor, - stream_loader_controller, - bytes_per_second, - )) => { - if play { - match self.state { - PlayerState::Playing { - track_id: old_track_id, - .. - } - | PlayerState::EndOfTrack { - track_id: old_track_id, - .. - } => self.send_event(PlayerEvent::Changed { - old_track_id: old_track_id, - new_track_id: track_id, - }), - _ => self.send_event(PlayerEvent::Started { track_id }), - } - - self.start_sink(); - - self.state = PlayerState::Playing { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, - }; - } else { - self.state = PlayerState::Paused { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, - }; - match self.state { - PlayerState::Playing { - track_id: old_track_id, - .. - } - | PlayerState::EndOfTrack { - track_id: old_track_id, - .. - } => self.send_event(PlayerEvent::Changed { - old_track_id: old_track_id, - new_track_id: track_id, - }), - _ => (), - } - self.send_event(PlayerEvent::Stopped { track_id }); - } - } - - None => { - let _ = end_of_track.send(()); - } - } - } - - PlayerCommand::Seek(position) => { - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_random_access_mode(); - } - if let Some(decoder) = self.state.decoder() { - match decoder.seek(position as i64) { - Ok(_) => (), - Err(err) => error!("Vorbis error: {:?}", err), - } - } else { - warn!("Player::seek called from invalid state"); - } - - // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_stream_mode(); - } - if let PlayerState::Playing { - bytes_per_second, .. - } = self.state - { - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - // Request our read ahead range - let request_data_length = max( - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * (0.001 * stream_loader_controller.ping_time_ms() as f64) - * bytes_per_second as f64) as usize, - (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, - ); - stream_loader_controller.fetch_next(request_data_length); - - // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. - let wait_for_data_length = max( - (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS - * (0.001 * stream_loader_controller.ping_time_ms() as f64) - * bytes_per_second as f64) as usize, - (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, - ); - stream_loader_controller.fetch_next_blocking(wait_for_data_length); - } - } - } - - PlayerCommand::Play => { - if let PlayerState::Paused { track_id, .. } = self.state { - self.state.paused_to_playing(); - - self.send_event(PlayerEvent::Started { track_id }); - self.start_sink(); - } else { - warn!("Player::play called from invalid state"); - } - } - - PlayerCommand::Pause => { - if let PlayerState::Playing { track_id, .. } = self.state { - self.state.playing_to_paused(); - - self.stop_sink_if_running(); - self.send_event(PlayerEvent::Stopped { track_id }); - } else { - warn!("Player::pause called from invalid state"); - } - } - - PlayerCommand::Stop => match self.state { - PlayerState::Playing { track_id, .. } - | PlayerState::Paused { track_id, .. } - | PlayerState::EndOfTrack { track_id } => { - self.stop_sink_if_running(); - self.send_event(PlayerEvent::Stopped { track_id }); - self.state = PlayerState::Stopped; - } - PlayerState::Stopped => { - warn!("Player::stop called from invalid state"); - } - PlayerState::Invalid => panic!("invalid state"), - }, - } - } - - fn send_event(&mut self, event: PlayerEvent) { - let _ = self.event_sender.unbounded_send(event.clone()); - } +struct PlayerTrackLoader { + session: Session, + config: PlayerConfig, +} +impl PlayerTrackLoader { fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option> { if audio.available { Some(Cow::Borrowed(audio)) @@ -631,11 +554,7 @@ impl PlayerInternal { } } - fn load_track( - &self, - spotify_id: SpotifyId, - position: i64, - ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { + fn load_track(&self, spotify_id: SpotifyId, position_ms: u32) -> Option { let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { Ok(audio) => audio, Err(_) => { @@ -653,6 +572,10 @@ impl PlayerInternal { return None; } }; + + assert!(audio.duration >= 0); + let duration_ms = audio.duration as u32; + // (Most) podcasts seem to support only 96 bit Vorbis, so fall back to it let formats = match self.config.bitrate { Bitrate::Bitrate96 => [ @@ -685,7 +608,7 @@ impl PlayerInternal { }; let bytes_per_second = self.stream_data_rate(*format); - let play_from_beginning = position == 0; + let play_from_beginning = position_ms == 0; let key = self.session.audio_key().request(spotify_id, file_id); let encrypted_file = AudioFile::open( @@ -737,42 +660,865 @@ impl PlayerInternal { let mut decoder = VorbisDecoder::new(audio_file).unwrap(); - if position != 0 { - match decoder.seek(position) { + if position_ms != 0 { + match decoder.seek(position_ms as i64) { Ok(_) => (), Err(err) => error!("Vorbis error: {:?}", err), } stream_loader_controller.set_stream_mode(); } - info!("<{}> loaded", audio.name); - Some(( + let stream_position_pcm = PlayerInternal::position_ms_to_pcm(position_ms); + info!("<{}> ({} ms) loaded", audio.name, audio.duration); + Some(PlayerLoadedTrackData { decoder, normalisation_factor, stream_loader_controller, bytes_per_second, - )) + duration_ms, + stream_position_pcm, + }) + } +} + +impl Future for PlayerInternal { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + // While this is written as a future, it still contains blocking code. + // It must be run on its own thread. + + loop { + let mut all_futures_completed_or_not_ready = true; + + // process commands that were sent to us + let cmd = match self.commands.poll() { + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // client has disconnected - shut down. + Ok(Async::Ready(Some(cmd))) => { + all_futures_completed_or_not_ready = false; + Some(cmd) + } + Ok(Async::NotReady) => None, + Err(_) => None, + }; + + if let Some(cmd) = cmd { + self.handle_command(cmd); + } + + // Handle loading of a new track to play + if let PlayerState::Loading { + ref mut loader, + track_id, + start_playback, + play_request_id, + } = self.state + { + match loader.poll() { + Ok(Async::Ready(loaded_track)) => { + self.start_playback( + track_id, + play_request_id, + loaded_track, + start_playback, + ); + if let PlayerState::Loading { .. } = self.state { + panic!("The state wasn't changed by start_playback()"); + } + } + Ok(Async::NotReady) => (), + Err(_) => { + self.handle_player_stop(); + assert!(self.state.is_stopped()); + } + } + } + + // handle pending preload requests. + if let PlayerPreload::Loading { + ref mut loader, + track_id, + } = self.preload + { + match loader.poll() { + Ok(Async::Ready(loaded_track)) => { + self.preload = PlayerPreload::Ready { + track_id, + loaded_track, + }; + } + Ok(Async::NotReady) => (), + Err(_) => { + self.preload = PlayerPreload::None; + } + } + } + + if self.state.is_playing() { + self.ensure_sink_running(); + + if let PlayerState::Playing { + track_id, + play_request_id, + ref mut decoder, + normalisation_factor, + ref mut stream_position_pcm, + ref mut reported_nominal_start_time, + duration_ms, + .. + } = self.state + { + let packet = decoder.next_packet().expect("Vorbis error"); + + if let Some(ref packet) = packet { + *stream_position_pcm = + *stream_position_pcm + (packet.data().len() / 2) as u64; + let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm); + + let notify_about_position = match *reported_nominal_start_time { + None => true, + Some(reported_nominal_start_time) => { + // only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we;re actually in time. + let lag = (Instant::now() - reported_nominal_start_time).as_millis() + as i64 + - stream_position_millis as i64; + if lag > 1000 { + true + } else { + false + } + } + }; + if notify_about_position { + *reported_nominal_start_time = Some( + Instant::now() + - Duration::from_millis(stream_position_millis as u64), + ); + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms: stream_position_millis as u32, + duration_ms, + }); + } + } + + self.handle_packet(packet, normalisation_factor); + } else { + unreachable!(); + }; + } + + if let PlayerState::Playing { + track_id, + play_request_id, + duration_ms, + stream_position_pcm, + ref mut stream_loader_controller, + ref mut suggested_to_preload_next_track, + .. + } + | PlayerState::Paused { + track_id, + play_request_id, + duration_ms, + stream_position_pcm, + ref mut stream_loader_controller, + ref mut suggested_to_preload_next_track, + .. + } = self.state + { + if (!*suggested_to_preload_next_track) + && ((duration_ms as i64 - Self::position_pcm_to_ms(stream_position_pcm) as i64) + < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64) + && stream_loader_controller.range_to_end_available() + { + *suggested_to_preload_next_track = true; + self.send_event(PlayerEvent::TimeToPreloadNextTrack { + track_id, + play_request_id, + }); + } + } + + if self.session.is_invalid() { + return Ok(Async::Ready(())); + } + + if (!self.state.is_playing()) && all_futures_completed_or_not_ready { + return Ok(Async::NotReady); + } + } + } +} + +impl PlayerInternal { + fn position_pcm_to_ms(position_pcm: u64) -> u32 { + (position_pcm * 10 / 441) as u32 + } + + fn position_ms_to_pcm(position_ms: u32) -> u64 { + position_ms as u64 * 441 / 10 + } + + fn ensure_sink_running(&mut self) { + if !self.sink_running { + trace!("== Starting sink =="); + match self.sink.start() { + Ok(()) => self.sink_running = true, + Err(err) => error!("Could not start audio: {}", err), + } + } + } + + fn ensure_sink_stopped(&mut self) { + if self.sink_running { + trace!("== Stopping sink =="); + self.sink.stop().unwrap(); + self.sink_running = false; + } + } + + fn handle_player_stop(&mut self) { + match self.state { + PlayerState::Playing { + track_id, + play_request_id, + .. + } + | PlayerState::Paused { + track_id, + play_request_id, + .. + } + | PlayerState::EndOfTrack { + track_id, + play_request_id, + .. + } + | PlayerState::Loading { + track_id, + play_request_id, + .. + } => { + self.ensure_sink_stopped(); + self.send_event(PlayerEvent::Stopped { + track_id, + play_request_id, + }); + self.state = PlayerState::Stopped; + } + PlayerState::Stopped => (), + PlayerState::Invalid => panic!("invalid state"), + } + } + + fn handle_play(&mut self) { + if let PlayerState::Paused { + track_id, + play_request_id, + stream_position_pcm, + duration_ms, + .. + } = self.state + { + self.state.paused_to_playing(); + + let position_ms = Self::position_pcm_to_ms(stream_position_pcm); + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + self.ensure_sink_running(); + } else { + warn!("Player::play called from invalid state"); + } + } + + fn handle_pause(&mut self) { + if let PlayerState::Playing { + track_id, + play_request_id, + stream_position_pcm, + duration_ms, + .. + } = self.state + { + self.state.playing_to_paused(); + + self.ensure_sink_stopped(); + let position_ms = Self::position_pcm_to_ms(stream_position_pcm); + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + } else { + warn!("Player::pause called from invalid state"); + } + } + + fn handle_packet(&mut self, packet: Option, normalisation_factor: f32) { + match packet { + Some(mut packet) => { + if packet.data().len() > 0 { + if let Some(ref editor) = self.audio_filter { + editor.modify_stream(&mut packet.data_mut()) + }; + + if self.config.normalisation && normalisation_factor != 1.0 { + for x in packet.data_mut().iter_mut() { + *x = (*x as f32 * normalisation_factor) as i16; + } + } + + if let Err(err) = self.sink.write(&packet.data()) { + error!("Could not write audio: {}", err); + self.ensure_sink_stopped(); + } + } + } + + None => { + self.state.playing_to_end_of_track(); + if let PlayerState::EndOfTrack { + track_id, + play_request_id, + .. + } = self.state + { + self.send_event(PlayerEvent::EndOfTrack { + track_id, + play_request_id, + }) + } else { + unreachable!(); + } + } + } + } + + fn start_playback( + &mut self, + track_id: SpotifyId, + play_request_id: u64, + loaded_track: PlayerLoadedTrackData, + start_playback: bool, + ) { + let position_ms = Self::position_pcm_to_ms(loaded_track.stream_position_pcm); + + if start_playback { + self.ensure_sink_running(); + + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms, + duration_ms: loaded_track.duration_ms, + }); + + self.state = PlayerState::Playing { + track_id: track_id, + play_request_id: play_request_id, + decoder: loaded_track.decoder, + normalisation_factor: loaded_track.normalisation_factor, + stream_loader_controller: loaded_track.stream_loader_controller, + duration_ms: loaded_track.duration_ms, + bytes_per_second: loaded_track.bytes_per_second, + stream_position_pcm: loaded_track.stream_position_pcm, + reported_nominal_start_time: Some( + Instant::now() - Duration::from_millis(position_ms as u64), + ), + suggested_to_preload_next_track: false, + }; + } else { + self.ensure_sink_stopped(); + + self.state = PlayerState::Paused { + track_id: track_id, + play_request_id: play_request_id, + decoder: loaded_track.decoder, + normalisation_factor: loaded_track.normalisation_factor, + stream_loader_controller: loaded_track.stream_loader_controller, + duration_ms: loaded_track.duration_ms, + bytes_per_second: loaded_track.bytes_per_second, + stream_position_pcm: loaded_track.stream_position_pcm, + suggested_to_preload_next_track: false, + }; + + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms, + duration_ms: loaded_track.duration_ms, + }); + } + } + + fn handle_command_load( + &mut self, + track_id: SpotifyId, + play_request_id: u64, + play: bool, + position_ms: u32, + ) { + if !self.config.gapless { + self.ensure_sink_stopped(); + } + // emit the correct player event + match self.state { + PlayerState::Playing { + track_id: old_track_id, + .. + } + | PlayerState::Paused { + track_id: old_track_id, + .. + } + | PlayerState::EndOfTrack { + track_id: old_track_id, + .. + } + | PlayerState::Loading { + track_id: old_track_id, + .. + } => self.send_event(PlayerEvent::Changed { + old_track_id: old_track_id, + new_track_id: track_id, + }), + PlayerState::Stopped => self.send_event(PlayerEvent::Started { + track_id, + play_request_id, + position_ms, + }), + PlayerState::Invalid { .. } => panic!("Player is in an invalid state."), + } + + // Now we check at different positions whether we already have a pre-loaded version + // of this track somewhere. If so, use it and return. + + // Check if there's a matching loaded track in the EndOfTrack player state. + // This is the case if we're repeating the same track again. + if let PlayerState::EndOfTrack { + track_id: previous_track_id, + ref mut loaded_track, + .. + } = self.state + { + if previous_track_id == track_id { + let loaded_track = mem::replace(&mut *loaded_track, None); + if let Some(mut loaded_track) = loaded_track { + if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm { + loaded_track + .stream_loader_controller + .set_random_access_mode(); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking. + // But most likely the track is fully + // loaded already because we played + // to the end of it. + loaded_track.stream_loader_controller.set_stream_mode(); + loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); + } + self.preload = PlayerPreload::None; + self.start_playback(track_id, play_request_id, loaded_track, play); + return; + } + } + } + + // Check if we are already playing the track. If so, just do a seek and update our info. + if let PlayerState::Playing { + track_id: current_track_id, + ref mut stream_position_pcm, + ref mut decoder, + ref mut stream_loader_controller, + .. + } + | PlayerState::Paused { + track_id: current_track_id, + ref mut stream_position_pcm, + ref mut decoder, + ref mut stream_loader_controller, + .. + } = self.state + { + if current_track_id == track_id { + // we can use the current decoder. Ensure it's at the correct position. + if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { + stream_loader_controller.set_random_access_mode(); + let _ = decoder.seek(position_ms as i64); // This may be blocking. + stream_loader_controller.set_stream_mode(); + *stream_position_pcm = Self::position_ms_to_pcm(position_ms); + } + + // Move the info from the current state into a PlayerLoadedTrackData so we can use + // the usual code path to start playback. + let old_state = mem::replace(&mut self.state, PlayerState::Invalid); + + if let PlayerState::Playing { + stream_position_pcm, + decoder, + stream_loader_controller, + bytes_per_second, + duration_ms, + normalisation_factor, + .. + } + | PlayerState::Paused { + stream_position_pcm, + decoder, + stream_loader_controller, + bytes_per_second, + duration_ms, + normalisation_factor, + .. + } = old_state + { + let loaded_track = PlayerLoadedTrackData { + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + duration_ms, + stream_position_pcm, + }; + + self.preload = PlayerPreload::None; + self.start_playback(track_id, play_request_id, loaded_track, play); + + if let PlayerState::Invalid = self.state { + panic!("start_playback() hasn't set a valid player state."); + } + + return; + } else { + unreachable!(); + } + } + } + + // Check if the requested track has been preloaded already. If so use the preloaded data. + if let PlayerPreload::Ready { + track_id: loaded_track_id, + .. + } = self.preload + { + if track_id == loaded_track_id { + let preload = std::mem::replace(&mut self.preload, PlayerPreload::None); + if let PlayerPreload::Ready { + track_id, + mut loaded_track, + } = preload + { + if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm { + loaded_track + .stream_loader_controller + .set_random_access_mode(); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking + loaded_track.stream_loader_controller.set_stream_mode(); + } + self.start_playback(track_id, play_request_id, loaded_track, play); + return; + } else { + unreachable!(); + } + } + } + + // We need to load the track - either from scratch or by completing a preload. + // In any case we go into a Loading state to load the track. + self.ensure_sink_stopped(); + + self.send_event(PlayerEvent::Loading { + track_id, + play_request_id, + position_ms, + }); + + // Try to extract a pending loader from the preloading mechanism + let loader = if let PlayerPreload::Loading { + track_id: loaded_track_id, + .. + } = self.preload + { + if (track_id == loaded_track_id) && (position_ms == 0) { + let mut preload = PlayerPreload::None; + std::mem::swap(&mut preload, &mut self.preload); + if let PlayerPreload::Loading { loader, .. } = preload { + Some(loader) + } else { + None + } + } else { + None + } + } else { + None + }; + + self.preload = PlayerPreload::None; + + // If we don't have a loader yet, create one from scratch. + let loader = loader + .or_else(|| Some(self.load_track(track_id, position_ms))) + .unwrap(); + + // Set ourselves to a loading state. + self.state = PlayerState::Loading { + track_id, + play_request_id, + start_playback: play, + loader, + }; + } + + fn handle_command_preload(&mut self, track_id: SpotifyId) { + debug!("Preloading track"); + let mut preload_track = true; + + // check whether the track is already loaded somewhere or being loaded. + if let PlayerPreload::Loading { + track_id: currently_loading, + .. + } + | PlayerPreload::Ready { + track_id: currently_loading, + .. + } = self.preload + { + if currently_loading == track_id { + // we're already preloading the requested track. + preload_track = false; + } else { + // we're preloading something else - cancel it. + self.preload = PlayerPreload::None; + } + } + + if let PlayerState::Playing { + track_id: current_track_id, + .. + } + | PlayerState::Paused { + track_id: current_track_id, + .. + } + | PlayerState::EndOfTrack { + track_id: current_track_id, + .. + } = self.state + { + if current_track_id == track_id { + // we already have the requested track loaded. + preload_track = false; + } + } + + // schedule the preload if the current track if desired. + if preload_track { + let loader = self.load_track(track_id, 0); + self.preload = PlayerPreload::Loading { track_id, loader } + } + } + + fn handle_command_seek(&mut self, position_ms: u32) { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_random_access_mode(); + } + if let Some(decoder) = self.state.decoder() { + match decoder.seek(position_ms as i64) { + Ok(_) => { + if let PlayerState::Playing { + ref mut stream_position_pcm, + .. + } + | PlayerState::Paused { + ref mut stream_position_pcm, + .. + } = self.state + { + *stream_position_pcm = Self::position_ms_to_pcm(position_ms); + } + } + Err(err) => error!("Vorbis error: {:?}", err), + } + } else { + warn!("Player::seek called from invalid state"); + } + + // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_stream_mode(); + } + + // ensure we have a bit of a buffer of downloaded data + self.preload_data_before_playback(); + + if let PlayerState::Playing { + track_id, + play_request_id, + ref mut reported_nominal_start_time, + duration_ms, + .. + } = self.state + { + *reported_nominal_start_time = + Some(Instant::now() - Duration::from_millis(position_ms as u64)); + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + } + if let PlayerState::Paused { + track_id, + play_request_id, + duration_ms, + .. + } = self.state + { + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + } + } + + fn handle_command(&mut self, cmd: PlayerCommand) { + debug!("command={:?}", cmd); + match cmd { + PlayerCommand::Load { + track_id, + play_request_id, + play, + position_ms, + } => self.handle_command_load(track_id, play_request_id, play, position_ms), + + PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), + + PlayerCommand::Seek(position_ms) => self.handle_command_seek(position_ms), + + PlayerCommand::Play => self.handle_play(), + + PlayerCommand::Pause => self.handle_pause(), + + PlayerCommand::Stop => self.handle_player_stop(), + + PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender), + + PlayerCommand::EmitVolumeSetEvent(volume) => { + self.send_event(PlayerEvent::VolumeSet { volume }) + } + } + } + + fn send_event(&mut self, event: PlayerEvent) { + let mut index = 0; + while index < self.event_senders.len() { + match self.event_senders[index].unbounded_send(event.clone()) { + Ok(_) => index += 1, + Err(_) => { + self.event_senders.remove(index); + } + } + } + } + + fn load_track( + &self, + spotify_id: SpotifyId, + position_ms: u32, + ) -> Box> { + // This method creates a future that returns the loaded stream and associated info. + // Ideally all work should be done using asynchronous code. However, seek() on the + // audio stream is implemented in a blocking fashion. Thus, we can't turn it into future + // easily. Instead we spawn a thread to do the work and return a one-shot channel as the + // future to work with. + + let loader = PlayerTrackLoader { + session: self.session.clone(), + config: self.config.clone(), + }; + + let (result_tx, result_rx) = futures::sync::oneshot::channel(); + + std::thread::spawn(move || { + loader + .load_track(spotify_id, position_ms) + .and_then(move |data| { + let _ = result_tx.send(data); + Some(()) + }); + }); + + Box::new(result_rx.map_err(|_| ())) + } + + fn preload_data_before_playback(&mut self) { + if let PlayerState::Playing { + bytes_per_second, + ref mut stream_loader_controller, + .. + } = self.state + { + // Request our read ahead range + let request_data_length = max( + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + ); + stream_loader_controller.fetch_next(request_data_length); + + // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. + let wait_for_data_length = max( + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + ); + stream_loader_controller.fetch_next_blocking(wait_for_data_length); + } } } impl Drop for PlayerInternal { fn drop(&mut self) { - debug!("drop Player[{}]", self.session.session_id()); + debug!("drop PlayerInternal[{}]", self.session.session_id()); } } impl ::std::fmt::Debug for PlayerCommand { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { match *self { - PlayerCommand::Load(track, play, position, _) => f + PlayerCommand::Load { + track_id, + play, + position_ms, + .. + } => f .debug_tuple("Load") - .field(&track) + .field(&track_id) .field(&play) - .field(&position) + .field(&position_ms) .finish(), + PlayerCommand::Preload { track_id } => { + f.debug_tuple("Preload").field(&track_id).finish() + } PlayerCommand::Play => f.debug_tuple("Play").finish(), PlayerCommand::Pause => f.debug_tuple("Pause").finish(), PlayerCommand::Stop => f.debug_tuple("Stop").finish(), PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(), + PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(), + PlayerCommand::EmitVolumeSetEvent(volume) => { + f.debug_tuple("VolumeSet").field(&volume).finish() + } } } } diff --git a/src/main.rs b/src/main.rs index 8ee3b0c4..ef8dbd7b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -180,6 +180,11 @@ fn setup(args: &[String]) -> Setup { "", "autoplay", "autoplay similar songs when your music ends.", + ) + .optflag( + "", + "disable-gapless", + "disable gapless playback.", ); let matches = match opts.parse(&args[1..]) { @@ -287,7 +292,7 @@ fn setup(args: &[String]) -> Setup { |s| { match Url::parse(&s) { Ok(url) => { - if url.host().is_none() || url.port().is_none() { + if url.host().is_none() || url.port_or_known_default().is_none() { panic!("Invalid proxy url, only urls on the format \"http://host:port\" are allowed"); } @@ -312,9 +317,9 @@ fn setup(args: &[String]) -> Setup { .as_ref() .map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate")) .unwrap_or(Bitrate::default()); - PlayerConfig { bitrate: bitrate, + gapless: !matches.opt_present("disable-gapless"), normalisation: matches.opt_present("enable-volume-normalisation"), normalisation_pregain: matches .opt_str("normalisation-pregain") @@ -539,16 +544,18 @@ impl Future for Main { if let Some(ref mut player_event_channel) = self.player_event_channel { if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() { if let Some(ref program) = self.player_event_program { - let child = run_program_on_events(event, program) - .expect("program failed to start") - .map(|status| { - if !status.success() { - error!("child exited with status {:?}", status.code()); - } - }) - .map_err(|e| error!("failed to wait on child process: {}", e)); + if let Some(child) = run_program_on_events(event, program) { + let child = child + .expect("program failed to start") + .map(|status| { + if !status.success() { + error!("child exited with status {:?}", status.code()); + } + }) + .map_err(|e| error!("failed to wait on child process: {}", e)); - self.handle.spawn(child); + self.handle.spawn(child); + } } } } diff --git a/src/player_event_handler.rs b/src/player_event_handler.rs index 03bae147..2fa34d2b 100644 --- a/src/player_event_handler.rs +++ b/src/player_event_handler.rs @@ -14,7 +14,7 @@ fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result io::Result { +pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option> { let mut env_vars = HashMap::new(); match event { PlayerEvent::Changed { @@ -25,14 +25,15 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result { + PlayerEvent::Started { track_id, .. } => { env_vars.insert("PLAYER_EVENT", "start".to_string()); env_vars.insert("TRACK_ID", track_id.to_base62()); } - PlayerEvent::Stopped { track_id } => { + PlayerEvent::Stopped { track_id, .. } => { env_vars.insert("PLAYER_EVENT", "stop".to_string()); env_vars.insert("TRACK_ID", track_id.to_base62()); } + _ => return None, } - run_program(onevent, env_vars) + Some(run_program(onevent, env_vars)) }