From 5784b4652cc082536c2357e2f9da8096b3b488fe Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sat, 1 Feb 2020 08:41:11 +1100 Subject: [PATCH 01/12] Prepare for gapless play. - change communication between player and spirc to use player events channel. - enhance player events channel - have spirc send loading messages to Spotify - enable preloading of tracks in the player --- audio/src/fetch.rs | 9 + connect/src/spirc.rs | 315 ++++++-- examples/play.rs | 49 +- metadata/src/lib.rs | 3 + playback/src/player.rs | 1392 +++++++++++++++++++++++++++-------- src/player_event_handler.rs | 5 +- 6 files changed, 1375 insertions(+), 398 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 1ab6ce78..c47cb4d3 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -144,6 +144,15 @@ impl StreamLoaderController { } } + pub fn range_to_end_available(&self) -> bool { + if let Some(ref shared) = self.stream_shared { + let read_position = shared.read_position.load(atomic::Ordering::Relaxed); + self.range_available(Range::new(read_position, self.len() - read_position)) + } else { + true + } + } + pub fn ping_time_ms(&self) -> usize { if let Some(ref shared) = self.stream_shared { return shared.ping_time_ms.load(atomic::Ordering::Relaxed); diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index aed73aac..758cc5fa 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -2,7 +2,7 @@ use std; use std::time::{SystemTime, UNIX_EPOCH}; use futures::future; -use futures::sync::{mpsc, oneshot}; +use futures::sync::mpsc; use futures::{Async, Future, Poll, Sink, Stream}; use protobuf::{self, Message}; use rand; @@ -11,7 +11,7 @@ use serde_json; use crate::context::StationContext; use crate::playback::mixer::Mixer; -use crate::playback::player::Player; +use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; use crate::protocol; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; use librespot_core::config::ConnectConfig; @@ -22,6 +22,14 @@ use librespot_core::util::SeqGenerator; use librespot_core::version; use librespot_core::volume::Volume; +enum SpircPlayStatus { + Stopped, + LoadingPlay { position_ms: u32 }, + LoadingPause { position_ms: u32 }, + Playing { nominal_start_time: i64 }, + Paused { position_ms: u32 }, +} + pub struct SpircTask { player: Player, mixer: Box, @@ -32,11 +40,14 @@ pub struct SpircTask { ident: String, device: DeviceState, state: State, + play_request_id: Option, + mixer_started: bool, + play_status: SpircPlayStatus, subscription: Box>, sender: Box>, commands: mpsc::UnboundedReceiver, - end_of_track: Box>, + player_events: PlayerEventChannel, shutdown: bool, session: Session, @@ -255,6 +266,8 @@ impl Spirc { }; let device = initial_device_state(config); + let player_events = player.get_player_event_channel(); + let mut task = SpircTask { player: player, mixer: mixer, @@ -266,11 +279,14 @@ impl Spirc { device: device, state: initial_state(), + play_request_id: None, + mixer_started: false, + play_status: SpircPlayStatus::Stopped, subscription: subscription, sender: sender, commands: cmd_rx, - end_of_track: Box::new(future::empty()), + player_events: player_events, shutdown: false, session: session.clone(), @@ -350,13 +366,14 @@ impl Future for SpircTask { Async::NotReady => (), } - match self.end_of_track.poll() { - Ok(Async::Ready(())) => { - progress = true; - self.handle_end_of_track(); - } + match self.player_events.poll() { Ok(Async::NotReady) => (), - Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()), + Ok(Async::Ready(None)) => (), + Err(_) => (), + Ok(Async::Ready(Some(event))) => { + progress = true; + self.handle_player_event(event); + } } // TODO: Refactor match self.context_fut.poll() { @@ -431,6 +448,26 @@ impl SpircTask { + (dur.subsec_nanos() / 1000_000) as i64) } + fn ensure_mixer_started(&mut self) { + if !self.mixer_started { + self.mixer.start(); + self.mixer_started = true; + } + } + + fn ensure_mixer_stopped(&mut self) { + if self.mixer_started { + self.mixer.stop(); + self.mixer_started = false; + } + } + + fn update_state_position(&mut self, position_ms: u32) { + let now = self.now_ms(); + self.state.set_position_measured_at(now as u64); + self.state.set_position_ms(position_ms); + } + fn handle_command(&mut self, cmd: SpircCommand) { let active = self.device.get_is_active(); match cmd { @@ -498,14 +535,112 @@ impl SpircTask { } } + fn handle_player_event(&mut self, event: PlayerEvent) { + // we only process events if the play_request_id matches. If it doesn't, it is + // an event that belongs to a previous track and only arrives now due to a race + // condition. In this case we have updated the state already and don't want to + // mess with it. + if let Some(play_request_id) = event.get_play_request_id() { + if Some(play_request_id) == self.play_request_id { + match event { + PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(), + PlayerEvent::Loading { .. } => (), + PlayerEvent::Playing { position_ms, .. } => { + let new_nominal_start_time = + self.now_ms() - position_ms as i64; + match self.play_status { + SpircPlayStatus::Playing { nominal_start_time } => { + if (nominal_start_time - new_nominal_start_time) + .abs() + > 100 + { + self.update_state_position(position_ms); + self.notify(None); + self.play_status = SpircPlayStatus::Playing { + nominal_start_time: new_nominal_start_time, + }; + } + } + SpircPlayStatus::LoadingPlay { .. } + | SpircPlayStatus::LoadingPause { .. } => { + self.state.set_status(PlayStatus::kPlayStatusPlay); + self.update_state_position(position_ms); + self.notify(None); + self.play_status = SpircPlayStatus::Playing { + nominal_start_time: new_nominal_start_time, + }; + } + _ => (), + }; + trace!("==> kPlayStatusPlay"); + } + PlayerEvent::Paused { + position_ms: new_position_ms, + .. + } => { + match self.play_status { + SpircPlayStatus::Paused { + ref mut position_ms, + } => { + if *position_ms != new_position_ms { + *position_ms = new_position_ms; + self.update_state_position(new_position_ms); + self.notify(None); + } + } + SpircPlayStatus::LoadingPlay { .. } + | SpircPlayStatus::LoadingPause { .. } => { + self.state.set_status(PlayStatus::kPlayStatusPause); + self.update_state_position(new_position_ms); + self.notify(None); + self.play_status = SpircPlayStatus::Paused { + position_ms: new_position_ms, + }; + } + _ => (), + } + trace!("==> kPlayStatusPause"); + } + PlayerEvent::Stopped { .. } => match self.play_status { + SpircPlayStatus::Stopped => (), + _ => { + warn!("The player has stopped unexpentedly."); + self.state.set_status(PlayStatus::kPlayStatusStop); + self.ensure_mixer_stopped(); + self.notify(None); + self.play_status = SpircPlayStatus::Stopped; + } + }, + PlayerEvent::TimeToPreloadNextTrack {..} => match self.play_status { + SpircPlayStatus::Paused {..}|SpircPlayStatus::Playing {..}| SpircPlayStatus::LoadingPause{..}|SpircPlayStatus::LoadingPlay {..} => { + if let Some(track_id) = self.preview_next_track() { + self.player.preload(track_id); + } + } + SpircPlayStatus::Stopped => (), + } + _ => (), + } + } + } + } + fn handle_frame(&mut self, frame: Frame) { + let state_string = match frame.get_state().get_status() { + PlayStatus::kPlayStatusLoading => "kPlayStatusLoading", + PlayStatus::kPlayStatusPause => "kPlayStatusPause", + PlayStatus::kPlayStatusStop => "kPlayStatusStop", + PlayStatus::kPlayStatusPlay => "kPlayStatusPlay", + }; + debug!( - "{:?} {:?} {} {} {}", + "{:?} {:?} {} {} {} {}", frame.get_typ(), frame.get_device_state().get_name(), frame.get_ident(), frame.get_seq_nr(), - frame.get_state_update_id() + frame.get_state_update_id(), + state_string, ); if frame.get_ident() == self.ident @@ -529,18 +664,15 @@ impl SpircTask { self.update_tracks(&frame); if self.state.get_track().len() > 0 { - let now = self.now_ms(); - self.state - .set_position_ms(frame.get_state().get_position_ms()); - self.state.set_position_measured_at(now as u64); - - let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; - self.load_track(play); + let start_playing = + frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; + self.load_track(start_playing, frame.get_state().get_position_ms()); } else { info!("No more tracks left in queue"); self.state.set_status(PlayStatus::kPlayStatusStop); self.player.stop(); self.mixer.stop(); + self.play_status = SpircPlayStatus::Stopped; } self.notify(None); @@ -607,12 +739,7 @@ impl SpircTask { } MessageType::kMessageTypeSeek => { - let position = frame.get_position(); - - let now = self.now_ms(); - self.state.set_position_ms(position); - self.state.set_position_measured_at(now as u64); - self.player.seek(position); + self.handle_seek(frame.get_position()); self.notify(None); } @@ -631,7 +758,8 @@ impl SpircTask { self.device.set_is_active(false); self.state.set_status(PlayStatus::kPlayStatusStop); self.player.stop(); - self.mixer.stop(); + self.ensure_mixer_stopped(); + self.play_status = SpircPlayStatus::Stopped; } } @@ -640,39 +768,75 @@ impl SpircTask { } fn handle_play(&mut self) { - if self.state.get_status() == PlayStatus::kPlayStatusPause { - self.mixer.start(); - self.player.play(); - self.state.set_status(PlayStatus::kPlayStatusPlay); - let now = self.now_ms(); - self.state.set_position_measured_at(now as u64); + match self.play_status { + SpircPlayStatus::Paused { position_ms } => { + self.ensure_mixer_started(); + self.player.play(); + self.state.set_status(PlayStatus::kPlayStatusPlay); + self.update_state_position(position_ms); + self.play_status = SpircPlayStatus::Playing { + nominal_start_time: self.now_ms() as i64 - position_ms as i64, + }; + } + SpircPlayStatus::LoadingPause { position_ms } => { + self.ensure_mixer_started(); + self.player.play(); + self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; + } + _ => (), } } fn handle_play_pause(&mut self) { - match self.state.get_status() { - PlayStatus::kPlayStatusPlay => self.handle_pause(), - PlayStatus::kPlayStatusPause => self.handle_play(), + match self.play_status { + SpircPlayStatus::Paused { .. } | SpircPlayStatus::LoadingPause { .. } => { + self.handle_play() + } + SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => { + self.handle_play() + } _ => (), } } fn handle_pause(&mut self) { - if self.state.get_status() == PlayStatus::kPlayStatusPlay { - self.player.pause(); - self.mixer.stop(); - self.state.set_status(PlayStatus::kPlayStatusPause); - - let now = self.now_ms() as u64; - let position = self.state.get_position_ms(); - - let diff = now - self.state.get_position_measured_at(); - - self.state.set_position_ms(position + diff as u32); - self.state.set_position_measured_at(now); + match self.play_status { + SpircPlayStatus::Playing { nominal_start_time } => { + self.player.pause(); + self.state.set_status(PlayStatus::kPlayStatusPause); + let position_ms = (self.now_ms() - nominal_start_time) as u32; + self.update_state_position(position_ms); + self.play_status = SpircPlayStatus::Paused { position_ms }; + } + SpircPlayStatus::LoadingPlay { position_ms } => { + self.player.pause(); + self.play_status = SpircPlayStatus::LoadingPause { position_ms }; + } + _ => (), } } + fn handle_seek(&mut self, position_ms: u32) { + self.update_state_position(position_ms); + self.player.seek(position_ms); + let now = self.now_ms(); + match self.play_status { + SpircPlayStatus::Stopped => (), + SpircPlayStatus::LoadingPause { + position_ms: ref mut position, + } + | SpircPlayStatus::LoadingPlay { + position_ms: ref mut position, + } + | SpircPlayStatus::Paused { + position_ms: ref mut position, + } => *position = position_ms, + SpircPlayStatus::Playing { + ref mut nominal_start_time, + } => *nominal_start_time = now - position_ms as i64, + }; + } + fn consume_queued_track(&mut self) -> usize { // Removes current track if it is queued // Returns the index of the next track @@ -687,6 +851,10 @@ impl SpircTask { } } + fn preview_next_track(&mut self) -> Option { + None + } + fn handle_next(&mut self) { let mut new_index = self.consume_queued_track() as u32; let mut continue_playing = true; @@ -720,17 +888,14 @@ impl SpircTask { if tracks_len > 0 { self.state.set_playing_track_index(new_index); - self.state.set_position_ms(0); - let now = self.now_ms(); - self.state.set_position_measured_at(now as u64); - - self.load_track(continue_playing); + self.load_track(continue_playing, 0); } else { info!("Not playing next track because there are no more tracks left in queue."); self.state.set_playing_track_index(0); self.state.set_status(PlayStatus::kPlayStatusStop); self.player.stop(); - self.mixer.stop(); + self.ensure_mixer_stopped(); + self.play_status = SpircPlayStatus::Stopped; } } @@ -765,17 +930,11 @@ impl SpircTask { pos += 1; } - let now = self.now_ms(); self.state.set_playing_track_index(new_index); - self.state.set_position_ms(0); - self.state.set_position_measured_at(now as u64); - self.load_track(true); + self.load_track(true, 0); } else { - let now = self.now_ms(); - self.state.set_position_ms(0); - self.state.set_position_measured_at(now as u64); - self.player.seek(0); + self.handle_seek(0); } } @@ -801,8 +960,15 @@ impl SpircTask { } fn position(&mut self) -> u32 { - let diff = self.now_ms() as u64 - self.state.get_position_measured_at(); - self.state.get_position_ms() + diff as u32 + match self.play_status { + SpircPlayStatus::Stopped => 0, + SpircPlayStatus::LoadingPlay { position_ms } + | SpircPlayStatus::LoadingPause { position_ms } + | SpircPlayStatus::Paused { position_ms } => position_ms, + SpircPlayStatus::Playing { nominal_start_time } => { + (self.now_ms() - nominal_start_time) as u32 + } + } } fn resolve_station( @@ -912,7 +1078,7 @@ impl SpircTask { }) } - fn load_track(&mut self, play: bool) { + fn load_track(&mut self, start_playing: bool, position_ms: u32) { let context_uri = self.state.get_context_uri().to_owned(); let mut index = self.state.get_playing_track_index(); let start_index = index; @@ -949,16 +1115,15 @@ impl SpircTask { } .expect("Invalid SpotifyId"); - let position = self.state.get_position_ms(); - let end_of_track = self.player.load(track, play, position); + self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); - if play { - self.state.set_status(PlayStatus::kPlayStatusPlay); + self.update_state_position(position_ms); + self.state.set_status(PlayStatus::kPlayStatusLoading); + if start_playing { + self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; } else { - self.state.set_status(PlayStatus::kPlayStatusPause); + self.play_status = SpircPlayStatus::LoadingPause { position_ms }; } - - self.end_of_track = Box::new(end_of_track); } fn hello(&mut self) { @@ -966,6 +1131,13 @@ impl SpircTask { } fn notify(&mut self, recipient: Option<&str>) { + let status_string = match self.state.get_status() { + PlayStatus::kPlayStatusLoading => "kPlayStatusLoading", + PlayStatus::kPlayStatusPause => "kPlayStatusPause", + PlayStatus::kPlayStatusStop => "kPlayStatusStop", + PlayStatus::kPlayStatusPlay => "kPlayStatusPlay", + }; + trace!("Sending status to server: [{}]", status_string); let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify); if let Some(s) = recipient { cs = cs.recipient(&s); @@ -980,6 +1152,7 @@ impl SpircTask { if let Some(cache) = self.session.cache() { cache.save_volume(Volume { volume }) } + self.player.emit_volume_set_event(volume); } } diff --git a/examples/play.rs b/examples/play.rs index 6888ebb4..acefe570 100644 --- a/examples/play.rs +++ b/examples/play.rs @@ -7,8 +7,53 @@ use librespot::core::session::Session; use librespot::core::spotify_id::SpotifyId; use librespot::playback::config::PlayerConfig; +use futures::stream::Stream; +use futures::{Async, Future, Poll}; use librespot::playback::audio_backend; -use librespot::playback::player::Player; +use librespot::playback::player::{Player, PlayerEvent, PlayerEventChannel}; + +pub struct SingleTrackPlayer { + play_request_id: u64, + event_channel: PlayerEventChannel, +} + +impl SingleTrackPlayer { + pub fn new(ref mut player: Player, track_id: SpotifyId) -> SingleTrackPlayer { + let event_channel = player.get_player_event_channel(); + let play_request_id = player.load(track_id, true, 0); + SingleTrackPlayer { + play_request_id, + event_channel, + } + } +} + +impl Future for SingleTrackPlayer { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + match self.event_channel.poll().unwrap() { + Async::NotReady => return Ok(Async::NotReady), + Async::Ready(None) => return Ok(Async::Ready(())), + Async::Ready(Some(event)) => match event { + PlayerEvent::EndOfTrack { + play_request_id, .. + } + | PlayerEvent::Stopped { + play_request_id, .. + } => { + if play_request_id == self.play_request_id { + return Ok(Async::Ready(())); + } + } + _ => (), + }, + } + } + } +} fn main() { let mut core = Core::new().unwrap(); @@ -39,7 +84,7 @@ fn main() { }); println!("Playing..."); - core.run(player.load(track, true, 0)).unwrap(); + core.run(SingleTrackPlayer::new(player, track)).unwrap(); println!("Done"); } diff --git a/metadata/src/lib.rs b/metadata/src/lib.rs index d4bd797c..8bd422ce 100644 --- a/metadata/src/lib.rs +++ b/metadata/src/lib.rs @@ -63,6 +63,7 @@ pub struct AudioItem { pub uri: String, pub files: LinearMap, pub name: String, + pub duration: i32, pub available: bool, pub alternatives: Option>, } @@ -100,6 +101,7 @@ impl AudioFiles for Track { uri: format!("spotify:track:{}", id.to_base62()), files: item.files, name: item.name, + duration: item.duration, available: item.available, alternatives: Some(item.alternatives), }) @@ -118,6 +120,7 @@ impl AudioFiles for Episode { uri: format!("spotify:episode:{}", id.to_base62()), files: item.files, name: item.name, + duration: item.duration, available: item.available, alternatives: None, }) diff --git a/playback/src/player.rs b/playback/src/player.rs index 38ee00c1..0d2c1efb 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1,20 +1,20 @@ use byteorder::{LittleEndian, ReadBytesExt}; use futures; -use futures::sync::oneshot; -use futures::{future, Future}; +use futures::{future, Async, Future, Poll, Stream}; use std; use std::borrow::Cow; use std::cmp::max; use std::io::{Read, Result, Seek, SeekFrom}; use std::mem; -use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::thread; -use std::time::Duration; +use std::time::{Duration, Instant}; use crate::config::{Bitrate, PlayerConfig}; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; +use librespot_core::util::SeqGenerator; + use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use crate::audio::{VorbisDecoder, VorbisPacket}; use crate::audio::{ @@ -25,48 +25,121 @@ use crate::audio_backend::Sink; use crate::metadata::{AudioItem, FileFormat}; use crate::mixer::AudioFilter; +const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; + pub struct Player { - commands: Option>, + commands: Option>, thread_handle: Option>, + play_request_id_generator: SeqGenerator, } struct PlayerInternal { session: Session, config: PlayerConfig, - commands: std::sync::mpsc::Receiver, + commands: futures::sync::mpsc::UnboundedReceiver, state: PlayerState, + preload: PlayerPreload, sink: Box, sink_running: bool, audio_filter: Option>, - event_sender: futures::sync::mpsc::UnboundedSender, + event_senders: Vec>, } enum PlayerCommand { - Load(SpotifyId, bool, u32, oneshot::Sender<()>), + Load { + track_id: SpotifyId, + play_request_id: u64, + play: bool, + position_ms: u32, + }, + Preload { + track_id: SpotifyId, + }, Play, Pause, Stop, Seek(u32), + AddEventSender(futures::sync::mpsc::UnboundedSender), + EmitVolumeSetEvent(u16), } #[derive(Debug, Clone)] pub enum PlayerEvent { - Started { + Loading { + play_request_id: u64, track_id: SpotifyId, + position_ms: u32, + }, + Started { + play_request_id: u64, + track_id: SpotifyId, + position_ms: u32, + }, + Playing { + play_request_id: u64, + track_id: SpotifyId, + position_ms: u32, + duration_ms: u32, }, - Changed { old_track_id: SpotifyId, new_track_id: SpotifyId, }, - - Stopped { + TimeToPreloadNextTrack { + play_request_id: u64, track_id: SpotifyId, }, + EndOfTrack { + play_request_id: u64, + track_id: SpotifyId, + }, + Paused { + play_request_id: u64, + track_id: SpotifyId, + position_ms: u32, + duration_ms: u32, + }, + Stopped { + play_request_id: u64, + track_id: SpotifyId, + }, + VolumeSet { + volume: u16, + }, } -type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver; +impl PlayerEvent { + pub fn get_play_request_id(&self) -> Option { + use PlayerEvent::*; + match self { + Loading { + play_request_id, .. + } + | Started { + play_request_id, .. + } + | Playing { + play_request_id, .. + } + | TimeToPreloadNextTrack { + play_request_id, .. + } + | EndOfTrack { + play_request_id, .. + } + | Paused { + play_request_id, .. + } + | Stopped { + play_request_id, .. + } => Some(*play_request_id), + Changed { .. } | VolumeSet { .. } => None, + } + } +} + +pub type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver; #[derive(Clone, Copy, Debug)] struct NormalisationData { @@ -125,7 +198,7 @@ impl Player { where F: FnOnce() -> Box + Send + 'static, { - let (cmd_tx, cmd_rx) = std::sync::mpsc::channel(); + let (cmd_tx, cmd_rx) = futures::sync::mpsc::unbounded(); let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); let handle = thread::spawn(move || { @@ -137,38 +210,47 @@ impl Player { commands: cmd_rx, state: PlayerState::Stopped, + preload: PlayerPreload::None, sink: sink_builder(), sink_running: false, audio_filter: audio_filter, - event_sender: event_sender, + event_senders: [event_sender].to_vec(), }; - internal.run(); + let _ = internal.wait(); + debug!("PlayerInternal thread finished."); }); ( Player { commands: Some(cmd_tx), thread_handle: Some(handle), + play_request_id_generator: SeqGenerator::new(0), }, event_receiver, ) } fn command(&self, cmd: PlayerCommand) { - self.commands.as_ref().unwrap().send(cmd).unwrap(); + self.commands.as_ref().unwrap().unbounded_send(cmd).unwrap(); } - pub fn load( - &self, - track: SpotifyId, - start_playing: bool, - position_ms: u32, - ) -> oneshot::Receiver<()> { - let (tx, rx) = oneshot::channel(); - self.command(PlayerCommand::Load(track, start_playing, position_ms, tx)); + pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 { + let play_request_id = self.play_request_id_generator.get(); + self.command(PlayerCommand::Load { + track_id, + play_request_id, + play: start_playing, + position_ms, + }); - rx + play_request_id + } + + pub fn preload(&mut self, track_id: SpotifyId) { + self.command(PlayerCommand::Preload { + track_id, + }); } pub fn play(&self) { @@ -186,6 +268,16 @@ impl Player { pub fn seek(&self, position_ms: u32) { self.command(PlayerCommand::Seek(position_ms)); } + + pub fn get_player_event_channel(&self) -> PlayerEventChannel { + let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); + self.command(PlayerCommand::AddEventSender(event_sender)); + event_receiver + } + + pub fn emit_volume_set_event(&self, volume: u16) { + self.command(PlayerCommand::EmitVolumeSetEvent(volume)); + } } impl Drop for Player { @@ -201,27 +293,63 @@ impl Drop for Player { } } +struct PlayerLoadedTrackData { + decoder: Decoder, + normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, + duration_ms: u32, + stream_position: u64, +} + +enum PlayerPreload { + None, + Loading { + track_id: SpotifyId, + loader: Box>, + }, + Ready { + track_id: SpotifyId, + loaded_track: PlayerLoadedTrackData, + }, +} + type Decoder = VorbisDecoder>>; + enum PlayerState { Stopped, + Loading { + track_id: SpotifyId, + play_request_id: u64, + start_playback: bool, + loader: Box>, + }, Paused { track_id: SpotifyId, + play_request_id: u64, decoder: Decoder, - end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, bytes_per_second: usize, + duration_ms: u32, + stream_position: u64, + suggested_to_preload_next_track: bool, }, Playing { track_id: SpotifyId, + play_request_id: u64, decoder: Decoder, - end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, bytes_per_second: usize, + duration_ms: u32, + stream_position: u64, + reported_nominal_start_time: Option, + suggested_to_preload_next_track: bool, }, EndOfTrack { track_id: SpotifyId, + play_request_id: u64, }, Invalid, } @@ -230,16 +358,24 @@ impl PlayerState { fn is_playing(&self) -> bool { use self::PlayerState::*; match *self { - Stopped | EndOfTrack { .. } | Paused { .. } => false, + Stopped | EndOfTrack { .. } | Paused { .. } | Loading { .. } => false, Playing { .. } => true, Invalid => panic!("invalid state"), } } + fn is_stopped(&self) -> bool { + use self::PlayerState::*; + match *self { + Stopped => true, + _ => false, + } + } + fn decoder(&mut self) -> Option<&mut Decoder> { use self::PlayerState::*; match *self { - Stopped | EndOfTrack { .. } => None, + Stopped | EndOfTrack { .. } | Loading { .. } => None, Paused { ref mut decoder, .. } @@ -253,7 +389,7 @@ impl PlayerState { fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> { use self::PlayerState::*; match *self { - Stopped | EndOfTrack { .. } => None, + Stopped | EndOfTrack { .. } | Loading { .. } => None, Paused { ref mut stream_loader_controller, .. @@ -271,11 +407,13 @@ impl PlayerState { match mem::replace(self, Invalid) { Playing { track_id, - end_of_track, + play_request_id, .. } => { - let _ = end_of_track.send(()); - *self = EndOfTrack { track_id }; + *self = EndOfTrack { + track_id, + play_request_id, + }; } _ => panic!("Called playing_to_end_of_track in non-playing state."), } @@ -286,19 +424,26 @@ impl PlayerState { match ::std::mem::replace(self, Invalid) { Paused { track_id, + play_request_id, decoder, - end_of_track, normalisation_factor, stream_loader_controller, + duration_ms, bytes_per_second, + stream_position, + suggested_to_preload_next_track, } => { *self = Playing { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, + track_id, + play_request_id, + decoder, + normalisation_factor, + stream_loader_controller, + duration_ms, + bytes_per_second, + stream_position, + reported_nominal_start_time: None, + suggested_to_preload_next_track, }; } _ => panic!("invalid state"), @@ -310,19 +455,26 @@ impl PlayerState { match ::std::mem::replace(self, Invalid) { Playing { track_id, + play_request_id, decoder, - end_of_track, normalisation_factor, stream_loader_controller, + duration_ms, bytes_per_second, + stream_position, + reported_nominal_start_time: _, + suggested_to_preload_next_track, } => { *self = Paused { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, + track_id, + play_request_id, + decoder, + normalisation_factor, + stream_loader_controller, + duration_ms, + bytes_per_second, + stream_position, + suggested_to_preload_next_track, }; } _ => panic!("invalid state"), @@ -330,269 +482,12 @@ impl PlayerState { } } -impl PlayerInternal { - fn run(mut self) { - loop { - let cmd = if self.state.is_playing() { - if self.sink_running { - match self.commands.try_recv() { - Ok(cmd) => Some(cmd), - Err(TryRecvError::Empty) => None, - Err(TryRecvError::Disconnected) => return, - } - } else { - match self.commands.recv_timeout(Duration::from_secs(5)) { - Ok(cmd) => Some(cmd), - Err(RecvTimeoutError::Timeout) => None, - Err(RecvTimeoutError::Disconnected) => return, - } - } - } else { - match self.commands.recv() { - Ok(cmd) => Some(cmd), - Err(RecvError) => return, - } - }; - - if let Some(cmd) = cmd { - self.handle_command(cmd); - } - - if self.state.is_playing() && !self.sink_running { - self.start_sink(); - } - - if self.sink_running { - let mut current_normalisation_factor: f32 = 1.0; - - let packet = if let PlayerState::Playing { - ref mut decoder, - normalisation_factor, - .. - } = self.state - { - current_normalisation_factor = normalisation_factor; - Some(decoder.next_packet().expect("Vorbis error")) - } else { - None - }; - - if let Some(packet) = packet { - self.handle_packet(packet, current_normalisation_factor); - } - } - - if self.session.is_invalid() { - return; - } - } - } - - fn start_sink(&mut self) { - match self.sink.start() { - Ok(()) => self.sink_running = true, - Err(err) => error!("Could not start audio: {}", err), - } - } - - fn stop_sink_if_running(&mut self) { - if self.sink_running { - self.stop_sink(); - } - } - - fn stop_sink(&mut self) { - self.sink.stop().unwrap(); - self.sink_running = false; - } - - fn handle_packet(&mut self, packet: Option, normalisation_factor: f32) { - match packet { - Some(mut packet) => { - if packet.data().len() > 0 { - if let Some(ref editor) = self.audio_filter { - editor.modify_stream(&mut packet.data_mut()) - }; - - if self.config.normalisation && normalisation_factor != 1.0 { - for x in packet.data_mut().iter_mut() { - *x = (*x as f32 * normalisation_factor) as i16; - } - } - - if let Err(err) = self.sink.write(&packet.data()) { - error!("Could not write audio: {}", err); - self.stop_sink(); - } - } - } - - None => { - self.stop_sink(); - self.state.playing_to_end_of_track(); - } - } - } - - fn handle_command(&mut self, cmd: PlayerCommand) { - debug!("command={:?}", cmd); - match cmd { - PlayerCommand::Load(track_id, play, position, end_of_track) => { - if self.state.is_playing() { - self.stop_sink_if_running(); - } - - match self.load_track(track_id, position as i64) { - Some(( - decoder, - normalisation_factor, - stream_loader_controller, - bytes_per_second, - )) => { - if play { - match self.state { - PlayerState::Playing { - track_id: old_track_id, - .. - } - | PlayerState::EndOfTrack { - track_id: old_track_id, - .. - } => self.send_event(PlayerEvent::Changed { - old_track_id: old_track_id, - new_track_id: track_id, - }), - _ => self.send_event(PlayerEvent::Started { track_id }), - } - - self.start_sink(); - - self.state = PlayerState::Playing { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, - }; - } else { - self.state = PlayerState::Paused { - track_id: track_id, - decoder: decoder, - end_of_track: end_of_track, - normalisation_factor: normalisation_factor, - stream_loader_controller: stream_loader_controller, - bytes_per_second: bytes_per_second, - }; - match self.state { - PlayerState::Playing { - track_id: old_track_id, - .. - } - | PlayerState::EndOfTrack { - track_id: old_track_id, - .. - } => self.send_event(PlayerEvent::Changed { - old_track_id: old_track_id, - new_track_id: track_id, - }), - _ => (), - } - self.send_event(PlayerEvent::Stopped { track_id }); - } - } - - None => { - let _ = end_of_track.send(()); - } - } - } - - PlayerCommand::Seek(position) => { - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_random_access_mode(); - } - if let Some(decoder) = self.state.decoder() { - match decoder.seek(position as i64) { - Ok(_) => (), - Err(err) => error!("Vorbis error: {:?}", err), - } - } else { - warn!("Player::seek called from invalid state"); - } - - // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_stream_mode(); - } - if let PlayerState::Playing { - bytes_per_second, .. - } = self.state - { - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - // Request our read ahead range - let request_data_length = max( - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * (0.001 * stream_loader_controller.ping_time_ms() as f64) - * bytes_per_second as f64) as usize, - (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, - ); - stream_loader_controller.fetch_next(request_data_length); - - // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. - let wait_for_data_length = max( - (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS - * (0.001 * stream_loader_controller.ping_time_ms() as f64) - * bytes_per_second as f64) as usize, - (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, - ); - stream_loader_controller.fetch_next_blocking(wait_for_data_length); - } - } - } - - PlayerCommand::Play => { - if let PlayerState::Paused { track_id, .. } = self.state { - self.state.paused_to_playing(); - - self.send_event(PlayerEvent::Started { track_id }); - self.start_sink(); - } else { - warn!("Player::play called from invalid state"); - } - } - - PlayerCommand::Pause => { - if let PlayerState::Playing { track_id, .. } = self.state { - self.state.playing_to_paused(); - - self.stop_sink_if_running(); - self.send_event(PlayerEvent::Stopped { track_id }); - } else { - warn!("Player::pause called from invalid state"); - } - } - - PlayerCommand::Stop => match self.state { - PlayerState::Playing { track_id, .. } - | PlayerState::Paused { track_id, .. } - | PlayerState::EndOfTrack { track_id } => { - self.stop_sink_if_running(); - self.send_event(PlayerEvent::Stopped { track_id }); - self.state = PlayerState::Stopped; - } - PlayerState::Stopped => { - warn!("Player::stop called from invalid state"); - } - PlayerState::Invalid => panic!("invalid state"), - }, - } - } - - fn send_event(&mut self, event: PlayerEvent) { - let _ = self.event_sender.unbounded_send(event.clone()); - } +struct PlayerTrackLoader { + session: Session, + config: PlayerConfig, +} +impl PlayerTrackLoader { fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option> { if audio.available { Some(Cow::Borrowed(audio)) @@ -631,11 +526,7 @@ impl PlayerInternal { } } - fn load_track( - &self, - spotify_id: SpotifyId, - position: i64, - ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { + fn load_track(&self, spotify_id: SpotifyId, position: u64) -> Option { let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { Ok(audio) => audio, Err(_) => { @@ -653,6 +544,10 @@ impl PlayerInternal { return None; } }; + + assert!(audio.duration >= 0); + let duration_ms = audio.duration as u32; + // (Most) podcasts seem to support only 96 bit Vorbis, so fall back to it let formats = match self.config.bitrate { Bitrate::Bitrate96 => [ @@ -738,41 +633,892 @@ impl PlayerInternal { let mut decoder = VorbisDecoder::new(audio_file).unwrap(); if position != 0 { - match decoder.seek(position) { + match decoder.seek(position as i64) { Ok(_) => (), Err(err) => error!("Vorbis error: {:?}", err), } stream_loader_controller.set_stream_mode(); } - info!("<{}> loaded", audio.name); - Some(( + let stream_position = position * 441 / 10; + info!("<{}> ({} ms) loaded", audio.name, audio.duration); + Some(PlayerLoadedTrackData { decoder, normalisation_factor, stream_loader_controller, bytes_per_second, - )) + duration_ms, + stream_position, + }) + } +} + +impl Future for PlayerInternal { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + let mut last_printed_stream_position_for_debug = 0; + loop { + let mut all_futures_completed_or_not_ready = true; + + // process commands that were sent to us + let cmd = match self.commands.poll() { + Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // client has disconnected - shut down. + Ok(Async::Ready(Some(cmd))) => { + all_futures_completed_or_not_ready = false; + Some(cmd) + } + Ok(Async::NotReady) => None, + Err(_) => None, + }; + + if let Some(cmd) = cmd { + self.handle_command(cmd); + } + + // Handle loading of a new track to play + if let PlayerState::Loading { + ref mut loader, + track_id, + start_playback, + play_request_id, + } = self.state + { + match loader.poll() { + Ok(Async::Ready(loaded_track)) => { + self.start_playback( + track_id, + play_request_id, + loaded_track, + start_playback, + ); + if let PlayerState::Loading { .. } = self.state { + panic!("The state wasn't changed by start_playback()"); + } + } + Ok(Async::NotReady) => (), + Err(_) => { + self.handle_player_stop(); + assert!(self.state.is_stopped()); + } + } + } + + // handle pending preload requests. + if let PlayerPreload::Loading { + ref mut loader, + track_id, + } = self.preload + { + match loader.poll() { + Ok(Async::Ready(loaded_track)) => { + self.preload = PlayerPreload::Ready { + track_id, + loaded_track, + }; + } + Ok(Async::NotReady) => (), + Err(_) => { + self.preload = PlayerPreload::None; + } + } + } + + if self.state.is_playing() && !self.sink_running { + self.start_sink(); + } + + if self.sink_running { + let mut current_normalisation_factor: f32 = 1.0; + + let packet = if let PlayerState::Playing { + track_id, + play_request_id, + ref mut decoder, + normalisation_factor, + ref mut stream_position, + ref mut reported_nominal_start_time, + duration_ms, + .. + } = self.state + { + current_normalisation_factor = normalisation_factor; + let packet = decoder.next_packet().expect("Vorbis error"); + + if let Some(ref packet) = packet { + *stream_position = *stream_position + (packet.data().len() / 2) as u64; + let stream_position_seconds = *stream_position / 44100; + if stream_position_seconds != last_printed_stream_position_for_debug { + trace!( + "Stream position: {} ({} seconds)", + *stream_position, + stream_position_seconds + ); + last_printed_stream_position_for_debug = stream_position_seconds; + } + let stream_position_millis = *stream_position * 10 / 441; + + let notify_about_position = match *reported_nominal_start_time { + None => true, + Some(reported_nominal_start_time) => { + // only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we;re actually in time. + let lag = (Instant::now() - reported_nominal_start_time).as_millis() + as i64 + - stream_position_millis as i64; + if lag > 1000 { + true + } else { + false + } + } + }; + if notify_about_position { + *reported_nominal_start_time = Some( + Instant::now() + - Duration::from_millis(stream_position_millis as u64), + ); + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms: stream_position_millis as u32, + duration_ms, + }); + } + } + + Some(packet) + } else { + None + }; + + if let Some(packet) = packet { + self.handle_packet(packet, current_normalisation_factor); + } + } + + if let PlayerState::Playing { + track_id, + play_request_id, + duration_ms, + stream_position, + ref mut stream_loader_controller, + ref mut suggested_to_preload_next_track, + .. + } + | PlayerState::Paused { + track_id, + play_request_id, + duration_ms, + stream_position, + ref mut stream_loader_controller, + ref mut suggested_to_preload_next_track, + .. + } = self.state + { + let stream_position_millis = stream_position * 10 / 441; + if (!*suggested_to_preload_next_track) + && ((duration_ms as i64 - stream_position_millis as i64) + < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64) + && stream_loader_controller.range_to_end_available() + { + *suggested_to_preload_next_track = true; + self.send_event(PlayerEvent::TimeToPreloadNextTrack { + track_id, + play_request_id, + }); + } + } + + if self.session.is_invalid() { + return Ok(Async::Ready(())); + } + + if (!self.sink_running) && all_futures_completed_or_not_ready { + return Ok(Async::NotReady); + } + } + } +} + +impl PlayerInternal { + fn start_sink(&mut self) { + match self.sink.start() { + Ok(()) => self.sink_running = true, + Err(err) => error!("Could not start audio: {}", err), + } + } + + fn stop_sink_if_running(&mut self) { + if self.sink_running { + self.stop_sink(); + } + } + + fn stop_sink(&mut self) { + self.sink.stop().unwrap(); + self.sink_running = false; + } + + fn handle_player_stop(&mut self) { + match self.state { + PlayerState::Playing { + track_id, + play_request_id, + .. + } + | PlayerState::Paused { + track_id, + play_request_id, + .. + } + | PlayerState::EndOfTrack { + track_id, + play_request_id, + } + | PlayerState::Loading { + track_id, + play_request_id, + .. + } => { + self.stop_sink_if_running(); + self.send_event(PlayerEvent::Stopped { + track_id, + play_request_id, + }); + self.state = PlayerState::Stopped; + } + PlayerState::Stopped => (), + PlayerState::Invalid => panic!("invalid state"), + } + } + + fn handle_packet(&mut self, packet: Option, normalisation_factor: f32) { + match packet { + Some(mut packet) => { + if packet.data().len() > 0 { + if let Some(ref editor) = self.audio_filter { + editor.modify_stream(&mut packet.data_mut()) + }; + + if self.config.normalisation && normalisation_factor != 1.0 { + for x in packet.data_mut().iter_mut() { + *x = (*x as f32 * normalisation_factor) as i16; + } + } + + if let Err(err) = self.sink.write(&packet.data()) { + error!("Could not write audio: {}", err); + self.stop_sink(); + } + } + } + + None => { + self.stop_sink(); + self.state.playing_to_end_of_track(); + if let PlayerState::EndOfTrack { + track_id, + play_request_id, + } = self.state + { + self.send_event(PlayerEvent::EndOfTrack { + track_id, + play_request_id, + }) + } else { + unreachable!(); + } + } + } + } + + fn start_playback( + &mut self, + track_id: SpotifyId, + play_request_id: u64, + loaded_track: PlayerLoadedTrackData, + start_playback: bool, + ) { + let position_ms = (loaded_track.stream_position * 10 / 441) as u32; + + match self.state { + PlayerState::Playing { + track_id: old_track_id, + .. + } + | PlayerState::Paused { + track_id: old_track_id, + .. + } + | PlayerState::EndOfTrack { + track_id: old_track_id, + .. + } => self.send_event(PlayerEvent::Changed { + old_track_id: old_track_id, + new_track_id: track_id, + }), + PlayerState::Stopped => self.send_event(PlayerEvent::Started { + track_id, + play_request_id, + position_ms, + }), + PlayerState::Loading { .. } => (), + PlayerState::Invalid { .. } => panic!("Player is in an invalid state."), + } + + if start_playback { + self.start_sink(); + + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms, + duration_ms: loaded_track.duration_ms, + }); + + self.state = PlayerState::Playing { + track_id: track_id, + play_request_id: play_request_id, + decoder: loaded_track.decoder, + normalisation_factor: loaded_track.normalisation_factor, + stream_loader_controller: loaded_track.stream_loader_controller, + duration_ms: loaded_track.duration_ms, + bytes_per_second: loaded_track.bytes_per_second, + stream_position: loaded_track.stream_position, + reported_nominal_start_time: Some( + Instant::now() - Duration::from_millis(position_ms as u64), + ), + suggested_to_preload_next_track: false, + }; + } else { + self.state = PlayerState::Paused { + track_id: track_id, + play_request_id: play_request_id, + decoder: loaded_track.decoder, + normalisation_factor: loaded_track.normalisation_factor, + stream_loader_controller: loaded_track.stream_loader_controller, + duration_ms: loaded_track.duration_ms, + bytes_per_second: loaded_track.bytes_per_second, + stream_position: loaded_track.stream_position, + suggested_to_preload_next_track: false, + }; + + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms, + duration_ms: loaded_track.duration_ms, + }); + } + } + + fn handle_command(&mut self, cmd: PlayerCommand) { + debug!("command={:?}", cmd); + match cmd { + PlayerCommand::Load { + track_id, + play_request_id, + play, + position_ms, + } => { + if self.state.is_playing() { + self.stop_sink_if_running(); + } + + match self.state { + PlayerState::Playing { + track_id: old_track_id, + .. + } + | PlayerState::Paused { + track_id: old_track_id, + .. + } + | PlayerState::EndOfTrack { + track_id: old_track_id, + .. + } + | PlayerState::Loading { + track_id: old_track_id, + .. + } => self.send_event(PlayerEvent::Changed { + old_track_id: old_track_id, + new_track_id: track_id, + }), + PlayerState::Stopped => self.send_event(PlayerEvent::Started { + track_id, + play_request_id, + position_ms, + }), + PlayerState::Invalid { .. } => panic!("Player is in an invalid state."), + } + + let mut load_command_processed = false; + if let PlayerPreload::Ready { + track_id: loaded_track_id, + .. + } = self.preload + { + if (track_id == loaded_track_id) && (position_ms == 0) { + let mut preload = PlayerPreload::None; + std::mem::swap(&mut preload, &mut self.preload); + if let PlayerPreload::Ready { + track_id, + loaded_track, + } = preload + { + self.start_playback(track_id, play_request_id, loaded_track, play); + load_command_processed = true; + } + } + } + + if !load_command_processed { + self.send_event(PlayerEvent::Loading { + track_id, + play_request_id, + position_ms, + }); + + let loader = if let PlayerPreload::Loading { + track_id: loaded_track_id, + .. + } = self.preload + { + if (track_id == loaded_track_id) && (position_ms == 0) { + let mut preload = PlayerPreload::None; + std::mem::swap(&mut preload, &mut self.preload); + if let PlayerPreload::Loading { loader, .. } = preload { + Some(loader) + } else { + None + } + } else { + None + } + } else { + None + }; + + self.preload = PlayerPreload::None; + + let loader = loader + .or_else(|| Some(self.load_track_threaded(track_id, position_ms as u64))); + let loader = loader.unwrap(); + + self.state = PlayerState::Loading { + track_id, + play_request_id, + start_playback: play, + loader, + }; + } + } + + PlayerCommand::Preload { track_id } => { + if let PlayerPreload::Loading { + track_id: currently_loading, + .. + } + | PlayerPreload::Ready { + track_id: currently_loading, + .. + } = self.preload + { + if currently_loading != track_id { + self.preload = PlayerPreload::None; + } + } + if let PlayerPreload::None = self.preload { + let loader = self.load_track_threaded(track_id, 0); + self.preload = PlayerPreload::Loading { track_id, loader } + } + } + + PlayerCommand::Seek(position) => { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_random_access_mode(); + } + if let Some(decoder) = self.state.decoder() { + match decoder.seek(position as i64) { + Ok(_) => { + if let PlayerState::Playing { + ref mut stream_position, + .. + } + | PlayerState::Paused { + ref mut stream_position, + .. + } = self.state + { + *stream_position = position as u64 * 441 / 10; + } + } + Err(err) => error!("Vorbis error: {:?}", err), + } + } else { + warn!("Player::seek called from invalid state"); + } + + // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_stream_mode(); + } + + self.preload_data_before_playback(); + + if let PlayerState::Playing { + track_id, + play_request_id, + ref mut reported_nominal_start_time, + duration_ms, + .. + } = self.state + { + *reported_nominal_start_time = + Some(Instant::now() - Duration::from_millis(position as u64)); + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms: position, + duration_ms, + }); + } + if let PlayerState::Paused { + track_id, + play_request_id, + duration_ms, + .. + } = self.state + { + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms: position, + duration_ms, + }); + } + } + + PlayerCommand::Play => { + if let PlayerState::Paused { + track_id, + play_request_id, + stream_position, + .. + } = self.state + { + self.state.paused_to_playing(); + + let position_ms = (stream_position * 10 / 441) as u32; + self.send_event(PlayerEvent::Started { + track_id, + play_request_id, + position_ms, + }); + self.start_sink(); + } else { + warn!("Player::play called from invalid state"); + } + } + + PlayerCommand::Pause => { + if let PlayerState::Playing { + track_id, + play_request_id, + stream_position, + duration_ms, + .. + } = self.state + { + self.state.playing_to_paused(); + + self.stop_sink_if_running(); + let position_ms = (stream_position * 10 / 441) as u32; + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + } else { + warn!("Player::pause called from invalid state"); + } + } + + PlayerCommand::Stop => self.handle_player_stop(), + + PlayerCommand::AddEventSender(sender) => self.event_senders.push(sender), + + PlayerCommand::EmitVolumeSetEvent(volume) => { + self.send_event(PlayerEvent::VolumeSet { volume }) + } + } + } + + fn send_event(&mut self, event: PlayerEvent) { + let mut index = 0; + while index < self.event_senders.len() { + match self.event_senders[index].unbounded_send(event.clone()) { + Ok(_) => index += 1, + Err(_) => { + self.event_senders.remove(index); + } + } + } + } + + // fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option> { + // if audio.available { + // Some(Cow::Borrowed(audio)) + // } else { + // if let Some(alternatives) = &audio.alternatives { + // let alternatives = alternatives + // .iter() + // .map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id)); + // let alternatives = future::join_all(alternatives).wait().unwrap(); + // alternatives + // .into_iter() + // .find(|alt| alt.available) + // .map(Cow::Owned) + // } else { + // None + // } + // } + // } + + // fn stream_data_rate(&self, format: FileFormat) -> usize { + // match format { + // FileFormat::OGG_VORBIS_96 => 12 * 1024, + // FileFormat::OGG_VORBIS_160 => 20 * 1024, + // FileFormat::OGG_VORBIS_320 => 40 * 1024, + // FileFormat::MP3_256 => 32 * 1024, + // FileFormat::MP3_320 => 40 * 1024, + // FileFormat::MP3_160 => 20 * 1024, + // FileFormat::MP3_96 => 12 * 1024, + // FileFormat::MP3_160_ENC => 20 * 1024, + // FileFormat::MP4_128_DUAL => 16 * 1024, + // FileFormat::OTHER3 => 40 * 1024, // better some high guess than nothing + // FileFormat::AAC_160 => 20 * 1024, + // FileFormat::AAC_320 => 40 * 1024, + // FileFormat::MP4_128 => 16 * 1024, + // FileFormat::OTHER5 => 40 * 1024, // better some high guess than nothing + // } + // } + + fn load_track_threaded( + &self, + spotify_id: SpotifyId, + position: u64, + ) -> Box> { + // This method creates a future that returns the loaded stream and associated info. + // Ideally all work should be done using asynchronous code. However, seek() on the + // audio stream is implemented in a blocking fashion. Thus, we can't turn it into future + // easily. Instead we spawn a thread to do the work and return a one-shot channel as the + // future to work with. + + let loader = PlayerTrackLoader { + session: self.session.clone(), + config: self.config.clone(), + }; + + let (result_tx, result_rx) = futures::sync::oneshot::channel(); + + std::thread::spawn(move || { + loader + .load_track(spotify_id, position) + .and_then(move |data| { + let _ = result_tx.send(data); + Some(()) + }); + }); + + Box::new(result_rx.map_err(|_| ())) + } + + // fn load_track( + // &self, + // spotify_id: SpotifyId, + // position: u64, + // ) -> Option<(Decoder, f32, StreamLoaderController, usize, u64)> { + // let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { + // Ok(audio) => audio, + // Err(_) => { + // error!("Unable to load audio item."); + // return None; + // } + // }; + // + // info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); + // + // let audio = match self.find_available_alternative(&audio) { + // Some(audio) => audio, + // None => { + // warn!("<{}> is not available", audio.uri); + // return None; + // } + // }; + // // (Most) podcasts seem to support only 96 bit Vorbis, so fall back to it + // let formats = match self.config.bitrate { + // Bitrate::Bitrate96 => [ + // FileFormat::OGG_VORBIS_96, + // FileFormat::OGG_VORBIS_160, + // FileFormat::OGG_VORBIS_320, + // ], + // Bitrate::Bitrate160 => [ + // FileFormat::OGG_VORBIS_160, + // FileFormat::OGG_VORBIS_96, + // FileFormat::OGG_VORBIS_320, + // ], + // Bitrate::Bitrate320 => [ + // FileFormat::OGG_VORBIS_320, + // FileFormat::OGG_VORBIS_160, + // FileFormat::OGG_VORBIS_96, + // ], + // }; + // let format = formats + // .iter() + // .find(|format| audio.files.contains_key(format)) + // .unwrap(); + // + // let file_id = match audio.files.get(&format) { + // Some(&file_id) => file_id, + // None => { + // warn!("<{}> in not available in format {:?}", audio.name, format); + // return None; + // } + // }; + // + // let bytes_per_second = self.stream_data_rate(*format); + // let play_from_beginning = position == 0; + // + // let key = self.session.audio_key().request(spotify_id, file_id); + // let encrypted_file = AudioFile::open( + // &self.session, + // file_id, + // bytes_per_second, + // play_from_beginning, + // ); + // + // let encrypted_file = match encrypted_file.wait() { + // Ok(encrypted_file) => encrypted_file, + // Err(_) => { + // error!("Unable to load encrypted file."); + // return None; + // } + // }; + // + // let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); + // + // if play_from_beginning { + // // No need to seek -> we stream from the beginning + // stream_loader_controller.set_stream_mode(); + // } else { + // // we need to seek -> we set stream mode after the initial seek. + // stream_loader_controller.set_random_access_mode(); + // } + // + // let key = match key.wait() { + // Ok(key) => key, + // Err(_) => { + // error!("Unable to load decryption key"); + // return None; + // } + // }; + // + // let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); + // + // let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file) { + // Ok(normalisation_data) => { + // NormalisationData::get_factor(&self.config, normalisation_data) + // } + // Err(_) => { + // warn!("Unable to extract normalisation data, using default value."); + // 1.0 as f32 + // } + // }; + // + // let audio_file = Subfile::new(decrypted_file, 0xa7); + // + // let mut decoder = VorbisDecoder::new(audio_file).unwrap(); + // + // if position != 0 { + // match decoder.seek(position as i64) { + // Ok(_) => (), + // Err(err) => error!("Vorbis error: {:?}", err), + // } + // stream_loader_controller.set_stream_mode(); + // } + // let stream_position = position * 441 / 10; + // info!("<{}> loaded", audio.name); + // Some(( + // decoder, + // normalisation_factor, + // stream_loader_controller, + // bytes_per_second, + // stream_position, + // )) + // } + + fn preload_data_before_playback(&mut self) { + if let PlayerState::Playing { + bytes_per_second, + ref mut stream_loader_controller, + .. + } = self.state + { + // Request our read ahead range + let request_data_length = max( + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + ); + stream_loader_controller.fetch_next(request_data_length); + + // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. + let wait_for_data_length = max( + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + ); + stream_loader_controller.fetch_next_blocking(wait_for_data_length); + } } } impl Drop for PlayerInternal { fn drop(&mut self) { - debug!("drop Player[{}]", self.session.session_id()); + debug!("drop PlayerInternal[{}]", self.session.session_id()); } } impl ::std::fmt::Debug for PlayerCommand { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { match *self { - PlayerCommand::Load(track, play, position, _) => f + PlayerCommand::Load { + track_id, + play, + position_ms, + .. + } => f .debug_tuple("Load") - .field(&track) + .field(&track_id) .field(&play) - .field(&position) + .field(&position_ms) .finish(), + PlayerCommand::Preload { track_id } => { + f.debug_tuple("Preload").field(&track_id).finish() + } PlayerCommand::Play => f.debug_tuple("Play").finish(), PlayerCommand::Pause => f.debug_tuple("Pause").finish(), PlayerCommand::Stop => f.debug_tuple("Stop").finish(), PlayerCommand::Seek(position) => f.debug_tuple("Seek").field(&position).finish(), + PlayerCommand::AddEventSender(_) => f.debug_tuple("AddEventSender").finish(), + PlayerCommand::EmitVolumeSetEvent(volume) => { + f.debug_tuple("VolumeSet").field(&volume).finish() + } } } } diff --git a/src/player_event_handler.rs b/src/player_event_handler.rs index 03bae147..6df73d15 100644 --- a/src/player_event_handler.rs +++ b/src/player_event_handler.rs @@ -25,14 +25,15 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result { + PlayerEvent::Started { track_id, .. } => { env_vars.insert("PLAYER_EVENT", "start".to_string()); env_vars.insert("TRACK_ID", track_id.to_base62()); } - PlayerEvent::Stopped { track_id } => { + PlayerEvent::Stopped { track_id, .. } => { env_vars.insert("PLAYER_EVENT", "stop".to_string()); env_vars.insert("TRACK_ID", track_id.to_base62()); } + _ => (), } run_program(onevent, env_vars) } From 9eef690a22d9943321aa06e19968336d243b4241 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sun, 2 Feb 2020 11:07:05 +1100 Subject: [PATCH 02/12] Some clean up --- connect/src/spirc.rs | 17 ++- examples/play.rs | 53 +------- playback/src/player.rs | 279 ++++++++++------------------------------- 3 files changed, 81 insertions(+), 268 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 758cc5fa..89036025 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -546,14 +546,10 @@ impl SpircTask { PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(), PlayerEvent::Loading { .. } => (), PlayerEvent::Playing { position_ms, .. } => { - let new_nominal_start_time = - self.now_ms() - position_ms as i64; + let new_nominal_start_time = self.now_ms() - position_ms as i64; match self.play_status { SpircPlayStatus::Playing { nominal_start_time } => { - if (nominal_start_time - new_nominal_start_time) - .abs() - > 100 - { + if (nominal_start_time - new_nominal_start_time).abs() > 100 { self.update_state_position(position_ms); self.notify(None); self.play_status = SpircPlayStatus::Playing { @@ -611,14 +607,17 @@ impl SpircTask { self.play_status = SpircPlayStatus::Stopped; } }, - PlayerEvent::TimeToPreloadNextTrack {..} => match self.play_status { - SpircPlayStatus::Paused {..}|SpircPlayStatus::Playing {..}| SpircPlayStatus::LoadingPause{..}|SpircPlayStatus::LoadingPlay {..} => { + PlayerEvent::TimeToPreloadNextTrack { .. } => match self.play_status { + SpircPlayStatus::Paused { .. } + | SpircPlayStatus::Playing { .. } + | SpircPlayStatus::LoadingPause { .. } + | SpircPlayStatus::LoadingPlay { .. } => { if let Some(track_id) = self.preview_next_track() { self.player.preload(track_id); } } SpircPlayStatus::Stopped => (), - } + }, _ => (), } } diff --git a/examples/play.rs b/examples/play.rs index acefe570..4ba4c5b5 100644 --- a/examples/play.rs +++ b/examples/play.rs @@ -7,53 +7,8 @@ use librespot::core::session::Session; use librespot::core::spotify_id::SpotifyId; use librespot::playback::config::PlayerConfig; -use futures::stream::Stream; -use futures::{Async, Future, Poll}; use librespot::playback::audio_backend; -use librespot::playback::player::{Player, PlayerEvent, PlayerEventChannel}; - -pub struct SingleTrackPlayer { - play_request_id: u64, - event_channel: PlayerEventChannel, -} - -impl SingleTrackPlayer { - pub fn new(ref mut player: Player, track_id: SpotifyId) -> SingleTrackPlayer { - let event_channel = player.get_player_event_channel(); - let play_request_id = player.load(track_id, true, 0); - SingleTrackPlayer { - play_request_id, - event_channel, - } - } -} - -impl Future for SingleTrackPlayer { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - loop { - match self.event_channel.poll().unwrap() { - Async::NotReady => return Ok(Async::NotReady), - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some(event)) => match event { - PlayerEvent::EndOfTrack { - play_request_id, .. - } - | PlayerEvent::Stopped { - play_request_id, .. - } => { - if play_request_id == self.play_request_id { - return Ok(Async::Ready(())); - } - } - _ => (), - }, - } - } - } -} +use librespot::playback::player::Player; fn main() { let mut core = Core::new().unwrap(); @@ -79,12 +34,14 @@ fn main() { .run(Session::connect(session_config, credentials, None, handle)) .unwrap(); - let (player, _) = Player::new(player_config, session.clone(), None, move || { + let (mut player, _) = Player::new(player_config, session.clone(), None, move || { (backend)(None) }); + player.load(track, true, 0); + println!("Playing..."); - core.run(SingleTrackPlayer::new(player, track)).unwrap(); + core.run(player.get_end_of_track_future()).unwrap(); println!("Done"); } diff --git a/playback/src/player.rs b/playback/src/player.rs index 0d2c1efb..d7df74ff 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -247,10 +247,8 @@ impl Player { play_request_id } - pub fn preload(&mut self, track_id: SpotifyId) { - self.command(PlayerCommand::Preload { - track_id, - }); + pub fn preload(&self, track_id: SpotifyId) { + self.command(PlayerCommand::Preload { track_id }); } pub fn play(&self) { @@ -275,6 +273,19 @@ impl Player { event_receiver } + pub fn get_end_of_track_future(&self) -> Box> { + let result = self + .get_player_event_channel() + .filter(|event| match event { + PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } => true, + _ => false, + }) + .into_future() + .map_err(|_| ()) + .map(|_| ()); + Box::new(result) + } + pub fn emit_volume_set_event(&self, volume: u16) { self.command(PlayerCommand::EmitVolumeSetEvent(volume)); } @@ -299,7 +310,7 @@ struct PlayerLoadedTrackData { stream_loader_controller: StreamLoaderController, bytes_per_second: usize, duration_ms: u32, - stream_position: u64, + stream_position_pcm: u64, } enum PlayerPreload { @@ -332,7 +343,7 @@ enum PlayerState { stream_loader_controller: StreamLoaderController, bytes_per_second: usize, duration_ms: u32, - stream_position: u64, + stream_position_pcm: u64, suggested_to_preload_next_track: bool, }, Playing { @@ -343,7 +354,7 @@ enum PlayerState { stream_loader_controller: StreamLoaderController, bytes_per_second: usize, duration_ms: u32, - stream_position: u64, + stream_position_pcm: u64, reported_nominal_start_time: Option, suggested_to_preload_next_track: bool, }, @@ -430,7 +441,7 @@ impl PlayerState { stream_loader_controller, duration_ms, bytes_per_second, - stream_position, + stream_position_pcm, suggested_to_preload_next_track, } => { *self = Playing { @@ -441,7 +452,7 @@ impl PlayerState { stream_loader_controller, duration_ms, bytes_per_second, - stream_position, + stream_position_pcm, reported_nominal_start_time: None, suggested_to_preload_next_track, }; @@ -461,7 +472,7 @@ impl PlayerState { stream_loader_controller, duration_ms, bytes_per_second, - stream_position, + stream_position_pcm, reported_nominal_start_time: _, suggested_to_preload_next_track, } => { @@ -473,7 +484,7 @@ impl PlayerState { stream_loader_controller, duration_ms, bytes_per_second, - stream_position, + stream_position_pcm, suggested_to_preload_next_track, }; } @@ -526,7 +537,7 @@ impl PlayerTrackLoader { } } - fn load_track(&self, spotify_id: SpotifyId, position: u64) -> Option { + fn load_track(&self, spotify_id: SpotifyId, position_ms: u32) -> Option { let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { Ok(audio) => audio, Err(_) => { @@ -580,7 +591,7 @@ impl PlayerTrackLoader { }; let bytes_per_second = self.stream_data_rate(*format); - let play_from_beginning = position == 0; + let play_from_beginning = position_ms == 0; let key = self.session.audio_key().request(spotify_id, file_id); let encrypted_file = AudioFile::open( @@ -632,14 +643,14 @@ impl PlayerTrackLoader { let mut decoder = VorbisDecoder::new(audio_file).unwrap(); - if position != 0 { - match decoder.seek(position as i64) { + if position_ms != 0 { + match decoder.seek(position_ms as i64) { Ok(_) => (), Err(err) => error!("Vorbis error: {:?}", err), } stream_loader_controller.set_stream_mode(); } - let stream_position = position * 441 / 10; + let stream_position_pcm = PlayerInternal::position_ms_to_pcm(position_ms); info!("<{}> ({} ms) loaded", audio.name, audio.duration); Some(PlayerLoadedTrackData { decoder, @@ -647,7 +658,7 @@ impl PlayerTrackLoader { stream_loader_controller, bytes_per_second, duration_ms, - stream_position, + stream_position_pcm, }) } } @@ -736,7 +747,7 @@ impl Future for PlayerInternal { play_request_id, ref mut decoder, normalisation_factor, - ref mut stream_position, + ref mut stream_position_pcm, ref mut reported_nominal_start_time, duration_ms, .. @@ -746,17 +757,18 @@ impl Future for PlayerInternal { let packet = decoder.next_packet().expect("Vorbis error"); if let Some(ref packet) = packet { - *stream_position = *stream_position + (packet.data().len() / 2) as u64; - let stream_position_seconds = *stream_position / 44100; - if stream_position_seconds != last_printed_stream_position_for_debug { + *stream_position_pcm = + *stream_position_pcm + (packet.data().len() / 2) as u64; + let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm); + + if stream_position_millis / 1000 != last_printed_stream_position_for_debug { trace!( "Stream position: {} ({} seconds)", - *stream_position, - stream_position_seconds + *stream_position_pcm, + stream_position_millis / 1000 ); - last_printed_stream_position_for_debug = stream_position_seconds; + last_printed_stream_position_for_debug = stream_position_millis / 1000; } - let stream_position_millis = *stream_position * 10 / 441; let notify_about_position = match *reported_nominal_start_time { None => true, @@ -800,7 +812,7 @@ impl Future for PlayerInternal { track_id, play_request_id, duration_ms, - stream_position, + stream_position_pcm, ref mut stream_loader_controller, ref mut suggested_to_preload_next_track, .. @@ -809,13 +821,13 @@ impl Future for PlayerInternal { track_id, play_request_id, duration_ms, - stream_position, + stream_position_pcm, ref mut stream_loader_controller, ref mut suggested_to_preload_next_track, .. } = self.state { - let stream_position_millis = stream_position * 10 / 441; + let stream_position_millis = Self::position_pcm_to_ms(stream_position_pcm); if (!*suggested_to_preload_next_track) && ((duration_ms as i64 - stream_position_millis as i64) < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64) @@ -841,6 +853,14 @@ impl Future for PlayerInternal { } impl PlayerInternal { + fn position_pcm_to_ms(position_pcm: u64) -> u32 { + (position_pcm * 10 / 441) as u32 + } + + fn position_ms_to_pcm(position_ms: u32) -> u64 { + position_ms as u64 * 441 / 10 + } + fn start_sink(&mut self) { match self.sink.start() { Ok(()) => self.sink_running = true, @@ -939,7 +959,7 @@ impl PlayerInternal { loaded_track: PlayerLoadedTrackData, start_playback: bool, ) { - let position_ms = (loaded_track.stream_position * 10 / 441) as u32; + let position_ms = Self::position_pcm_to_ms(loaded_track.stream_position_pcm); match self.state { PlayerState::Playing { @@ -984,7 +1004,7 @@ impl PlayerInternal { stream_loader_controller: loaded_track.stream_loader_controller, duration_ms: loaded_track.duration_ms, bytes_per_second: loaded_track.bytes_per_second, - stream_position: loaded_track.stream_position, + stream_position_pcm: loaded_track.stream_position_pcm, reported_nominal_start_time: Some( Instant::now() - Duration::from_millis(position_ms as u64), ), @@ -999,7 +1019,7 @@ impl PlayerInternal { stream_loader_controller: loaded_track.stream_loader_controller, duration_ms: loaded_track.duration_ms, bytes_per_second: loaded_track.bytes_per_second, - stream_position: loaded_track.stream_position, + stream_position_pcm: loaded_track.stream_position_pcm, suggested_to_preload_next_track: false, }; @@ -1102,8 +1122,7 @@ impl PlayerInternal { self.preload = PlayerPreload::None; - let loader = loader - .or_else(|| Some(self.load_track_threaded(track_id, position_ms as u64))); + let loader = loader.or_else(|| Some(self.load_track(track_id, position_ms))); let loader = loader.unwrap(); self.state = PlayerState::Loading { @@ -1130,28 +1149,28 @@ impl PlayerInternal { } } if let PlayerPreload::None = self.preload { - let loader = self.load_track_threaded(track_id, 0); + let loader = self.load_track(track_id, 0); self.preload = PlayerPreload::Loading { track_id, loader } } } - PlayerCommand::Seek(position) => { + PlayerCommand::Seek(position_ms) => { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_random_access_mode(); } if let Some(decoder) = self.state.decoder() { - match decoder.seek(position as i64) { + match decoder.seek(position_ms as i64) { Ok(_) => { if let PlayerState::Playing { - ref mut stream_position, + ref mut stream_position_pcm, .. } | PlayerState::Paused { - ref mut stream_position, + ref mut stream_position_pcm, .. } = self.state { - *stream_position = position as u64 * 441 / 10; + *stream_position_pcm = Self::position_ms_to_pcm(position_ms); } } Err(err) => error!("Vorbis error: {:?}", err), @@ -1176,11 +1195,11 @@ impl PlayerInternal { } = self.state { *reported_nominal_start_time = - Some(Instant::now() - Duration::from_millis(position as u64)); + Some(Instant::now() - Duration::from_millis(position_ms as u64)); self.send_event(PlayerEvent::Playing { track_id, play_request_id, - position_ms: position, + position_ms: position_ms, duration_ms, }); } @@ -1194,7 +1213,7 @@ impl PlayerInternal { self.send_event(PlayerEvent::Paused { track_id, play_request_id, - position_ms: position, + position_ms: position_ms, duration_ms, }); } @@ -1204,13 +1223,13 @@ impl PlayerInternal { if let PlayerState::Paused { track_id, play_request_id, - stream_position, + stream_position_pcm, .. } = self.state { self.state.paused_to_playing(); - let position_ms = (stream_position * 10 / 441) as u32; + let position_ms = Self::position_pcm_to_ms(stream_position_pcm); self.send_event(PlayerEvent::Started { track_id, play_request_id, @@ -1226,7 +1245,7 @@ impl PlayerInternal { if let PlayerState::Playing { track_id, play_request_id, - stream_position, + stream_position_pcm, duration_ms, .. } = self.state @@ -1234,7 +1253,7 @@ impl PlayerInternal { self.state.playing_to_paused(); self.stop_sink_if_running(); - let position_ms = (stream_position * 10 / 441) as u32; + let position_ms = Self::position_pcm_to_ms(stream_position_pcm); self.send_event(PlayerEvent::Paused { track_id, play_request_id, @@ -1268,48 +1287,10 @@ impl PlayerInternal { } } - // fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option> { - // if audio.available { - // Some(Cow::Borrowed(audio)) - // } else { - // if let Some(alternatives) = &audio.alternatives { - // let alternatives = alternatives - // .iter() - // .map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id)); - // let alternatives = future::join_all(alternatives).wait().unwrap(); - // alternatives - // .into_iter() - // .find(|alt| alt.available) - // .map(Cow::Owned) - // } else { - // None - // } - // } - // } - - // fn stream_data_rate(&self, format: FileFormat) -> usize { - // match format { - // FileFormat::OGG_VORBIS_96 => 12 * 1024, - // FileFormat::OGG_VORBIS_160 => 20 * 1024, - // FileFormat::OGG_VORBIS_320 => 40 * 1024, - // FileFormat::MP3_256 => 32 * 1024, - // FileFormat::MP3_320 => 40 * 1024, - // FileFormat::MP3_160 => 20 * 1024, - // FileFormat::MP3_96 => 12 * 1024, - // FileFormat::MP3_160_ENC => 20 * 1024, - // FileFormat::MP4_128_DUAL => 16 * 1024, - // FileFormat::OTHER3 => 40 * 1024, // better some high guess than nothing - // FileFormat::AAC_160 => 20 * 1024, - // FileFormat::AAC_320 => 40 * 1024, - // FileFormat::MP4_128 => 16 * 1024, - // FileFormat::OTHER5 => 40 * 1024, // better some high guess than nothing - // } - // } - - fn load_track_threaded( + fn load_track( &self, spotify_id: SpotifyId, - position: u64, + position_ms: u32, ) -> Box> { // This method creates a future that returns the loaded stream and associated info. // Ideally all work should be done using asynchronous code. However, seek() on the @@ -1326,7 +1307,7 @@ impl PlayerInternal { std::thread::spawn(move || { loader - .load_track(spotify_id, position) + .load_track(spotify_id, position_ms) .and_then(move |data| { let _ = result_tx.send(data); Some(()) @@ -1336,130 +1317,6 @@ impl PlayerInternal { Box::new(result_rx.map_err(|_| ())) } - // fn load_track( - // &self, - // spotify_id: SpotifyId, - // position: u64, - // ) -> Option<(Decoder, f32, StreamLoaderController, usize, u64)> { - // let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { - // Ok(audio) => audio, - // Err(_) => { - // error!("Unable to load audio item."); - // return None; - // } - // }; - // - // info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); - // - // let audio = match self.find_available_alternative(&audio) { - // Some(audio) => audio, - // None => { - // warn!("<{}> is not available", audio.uri); - // return None; - // } - // }; - // // (Most) podcasts seem to support only 96 bit Vorbis, so fall back to it - // let formats = match self.config.bitrate { - // Bitrate::Bitrate96 => [ - // FileFormat::OGG_VORBIS_96, - // FileFormat::OGG_VORBIS_160, - // FileFormat::OGG_VORBIS_320, - // ], - // Bitrate::Bitrate160 => [ - // FileFormat::OGG_VORBIS_160, - // FileFormat::OGG_VORBIS_96, - // FileFormat::OGG_VORBIS_320, - // ], - // Bitrate::Bitrate320 => [ - // FileFormat::OGG_VORBIS_320, - // FileFormat::OGG_VORBIS_160, - // FileFormat::OGG_VORBIS_96, - // ], - // }; - // let format = formats - // .iter() - // .find(|format| audio.files.contains_key(format)) - // .unwrap(); - // - // let file_id = match audio.files.get(&format) { - // Some(&file_id) => file_id, - // None => { - // warn!("<{}> in not available in format {:?}", audio.name, format); - // return None; - // } - // }; - // - // let bytes_per_second = self.stream_data_rate(*format); - // let play_from_beginning = position == 0; - // - // let key = self.session.audio_key().request(spotify_id, file_id); - // let encrypted_file = AudioFile::open( - // &self.session, - // file_id, - // bytes_per_second, - // play_from_beginning, - // ); - // - // let encrypted_file = match encrypted_file.wait() { - // Ok(encrypted_file) => encrypted_file, - // Err(_) => { - // error!("Unable to load encrypted file."); - // return None; - // } - // }; - // - // let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); - // - // if play_from_beginning { - // // No need to seek -> we stream from the beginning - // stream_loader_controller.set_stream_mode(); - // } else { - // // we need to seek -> we set stream mode after the initial seek. - // stream_loader_controller.set_random_access_mode(); - // } - // - // let key = match key.wait() { - // Ok(key) => key, - // Err(_) => { - // error!("Unable to load decryption key"); - // return None; - // } - // }; - // - // let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); - // - // let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file) { - // Ok(normalisation_data) => { - // NormalisationData::get_factor(&self.config, normalisation_data) - // } - // Err(_) => { - // warn!("Unable to extract normalisation data, using default value."); - // 1.0 as f32 - // } - // }; - // - // let audio_file = Subfile::new(decrypted_file, 0xa7); - // - // let mut decoder = VorbisDecoder::new(audio_file).unwrap(); - // - // if position != 0 { - // match decoder.seek(position as i64) { - // Ok(_) => (), - // Err(err) => error!("Vorbis error: {:?}", err), - // } - // stream_loader_controller.set_stream_mode(); - // } - // let stream_position = position * 441 / 10; - // info!("<{}> loaded", audio.name); - // Some(( - // decoder, - // normalisation_factor, - // stream_loader_controller, - // bytes_per_second, - // stream_position, - // )) - // } - fn preload_data_before_playback(&mut self) { if let PlayerState::Playing { bytes_per_second, From 87563412013304ea88f4b356f148d78a1f86ffae Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Sun, 2 Feb 2020 11:12:17 +1100 Subject: [PATCH 03/12] Remove debug message --- playback/src/player.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/playback/src/player.rs b/playback/src/player.rs index d7df74ff..471dc2ae 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -668,7 +668,6 @@ impl Future for PlayerInternal { type Error = (); fn poll(&mut self) -> Poll<(), ()> { - let mut last_printed_stream_position_for_debug = 0; loop { let mut all_futures_completed_or_not_ready = true; @@ -761,15 +760,6 @@ impl Future for PlayerInternal { *stream_position_pcm + (packet.data().len() / 2) as u64; let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm); - if stream_position_millis / 1000 != last_printed_stream_position_for_debug { - trace!( - "Stream position: {} ({} seconds)", - *stream_position_pcm, - stream_position_millis / 1000 - ); - last_printed_stream_position_for_debug = stream_position_millis / 1000; - } - let notify_about_position = match *reported_nominal_start_time { None => true, Some(reported_nominal_start_time) => { From 349e182d419bc3a7337be9c485b6f15fe42c25ac Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 3 Feb 2020 01:37:17 +1100 Subject: [PATCH 04/12] Smarter handling of preloading and loading of tracks that are already loaded. --- playback/src/player.rs | 243 +++++++++++++++++++++++++++++++---------- 1 file changed, 188 insertions(+), 55 deletions(-) diff --git a/playback/src/player.rs b/playback/src/player.rs index 471dc2ae..d63faa35 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -361,6 +361,7 @@ enum PlayerState { EndOfTrack { track_id: SpotifyId, play_request_id: u64, + loaded_track: Option, }, Invalid, } @@ -419,11 +420,25 @@ impl PlayerState { Playing { track_id, play_request_id, + decoder, + duration_ms, + bytes_per_second, + normalisation_factor, + stream_loader_controller, + stream_position_pcm, .. } => { *self = EndOfTrack { track_id, play_request_id, + loaded_track: Some(PlayerLoadedTrackData { + decoder, + duration_ms, + bytes_per_second, + normalisation_factor, + stream_loader_controller, + stream_position_pcm, + }), }; } _ => panic!("Called playing_to_end_of_track in non-playing state."), @@ -884,6 +899,7 @@ impl PlayerInternal { | PlayerState::EndOfTrack { track_id, play_request_id, + .. } | PlayerState::Loading { track_id, @@ -902,6 +918,52 @@ impl PlayerInternal { } } + fn handle_play(&mut self) { + if let PlayerState::Paused { + track_id, + play_request_id, + stream_position_pcm, + .. + } = self.state + { + self.state.paused_to_playing(); + + let position_ms = Self::position_pcm_to_ms(stream_position_pcm); + self.send_event(PlayerEvent::Started { + track_id, + play_request_id, + position_ms, + }); + self.start_sink(); + } else { + warn!("Player::play called from invalid state"); + } + } + + fn handle_pause(&mut self) { + if let PlayerState::Playing { + track_id, + play_request_id, + stream_position_pcm, + duration_ms, + .. + } = self.state + { + self.state.playing_to_paused(); + + self.stop_sink_if_running(); + let position_ms = Self::position_pcm_to_ms(stream_position_pcm); + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + } else { + warn!("Player::pause called from invalid state"); + } + } + fn handle_packet(&mut self, packet: Option, normalisation_factor: f32) { match packet { Some(mut packet) => { @@ -929,6 +991,7 @@ impl PlayerInternal { if let PlayerState::EndOfTrack { track_id, play_request_id, + .. } = self.state { self.send_event(PlayerEvent::EndOfTrack { @@ -1035,6 +1098,7 @@ impl PlayerInternal { self.stop_sink_if_running(); } + // emit the correct player event match self.state { PlayerState::Playing { track_id: old_track_id, @@ -1064,25 +1128,105 @@ impl PlayerInternal { } let mut load_command_processed = false; - if let PlayerPreload::Ready { - track_id: loaded_track_id, + + // Check if there's a matching loaded track in the EndOfTrack player state. + // This is the case if we're repeating the same track again. + if let PlayerState::EndOfTrack { + track_id: previous_track_id, + ref mut loaded_track, .. - } = self.preload + } = self.state { - if (track_id == loaded_track_id) && (position_ms == 0) { - let mut preload = PlayerPreload::None; - std::mem::swap(&mut preload, &mut self.preload); - if let PlayerPreload::Ready { - track_id, - loaded_track, - } = preload - { + if previous_track_id == track_id { + let loaded_track = mem::replace(&mut *loaded_track, None); + if let Some(mut loaded_track) = loaded_track { + if Self::position_ms_to_pcm(position_ms) + != loaded_track.stream_position_pcm + { + loaded_track + .stream_loader_controller + .set_random_access_mode(); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking. + // But most likely the track is fully + // loaded already because we played + // to the end of it. + loaded_track.stream_loader_controller.set_stream_mode(); + loaded_track.stream_position_pcm = + Self::position_ms_to_pcm(position_ms); + } self.start_playback(track_id, play_request_id, loaded_track, play); load_command_processed = true; } } } + // Check if we are already playing the track. If so, just do a seek and update our info. + if let PlayerState::Playing { + track_id: current_track_id, + play_request_id: ref mut current_play_request_id, + ref mut stream_position_pcm, + ref mut decoder, + ref mut stream_loader_controller, + .. + } + | PlayerState::Paused { + track_id: current_track_id, + play_request_id: ref mut current_play_request_id, + ref mut stream_position_pcm, + ref mut decoder, + ref mut stream_loader_controller, + .. + } = self.state + { + if current_track_id == track_id { + if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { + stream_loader_controller.set_random_access_mode(); + let _ = decoder.seek(position_ms as i64); // This may be blocking. + stream_loader_controller.set_stream_mode(); + *stream_position_pcm = Self::position_ms_to_pcm(position_ms); + } + *current_play_request_id = play_request_id; + if play { + self.handle_play(); + } else { + self.handle_pause(); + } + load_command_processed = true; + } + } + + // Check if the requested track has been preloaded already. If so use the preloaded data. + if !load_command_processed { + if let PlayerPreload::Ready { + track_id: loaded_track_id, + .. + } = self.preload + { + if track_id == loaded_track_id { + let preload = std::mem::replace(&mut self.preload, PlayerPreload::None); + if let PlayerPreload::Ready { + track_id, + mut loaded_track, + } = preload + { + if Self::position_ms_to_pcm(position_ms) + != loaded_track.stream_position_pcm + { + loaded_track + .stream_loader_controller + .set_random_access_mode(); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking + loaded_track.stream_loader_controller.set_stream_mode(); + } + self.start_playback(track_id, play_request_id, loaded_track, play); + load_command_processed = true; + } + } + } + } + + // We need to load the track - either from scratch or by completing a preload. + // In any case we go into a Loading state to load the track. if !load_command_processed { self.send_event(PlayerEvent::Loading { track_id, @@ -1112,8 +1256,9 @@ impl PlayerInternal { self.preload = PlayerPreload::None; - let loader = loader.or_else(|| Some(self.load_track(track_id, position_ms))); - let loader = loader.unwrap(); + let loader = loader + .or_else(|| Some(self.load_track(track_id, position_ms))) + .unwrap(); self.state = PlayerState::Loading { track_id, @@ -1125,6 +1270,8 @@ impl PlayerInternal { } PlayerCommand::Preload { track_id } => { + let mut preload_track = true; + if let PlayerPreload::Loading { track_id: currently_loading, .. @@ -1134,11 +1281,35 @@ impl PlayerInternal { .. } = self.preload { - if currently_loading != track_id { + if currently_loading == track_id { + // we're already loading the requested track. + preload_track = false; + } else { + // we're loading something else - cancel it. self.preload = PlayerPreload::None; } } - if let PlayerPreload::None = self.preload { + + if let PlayerState::Playing { + track_id: current_track_id, + .. + } + | PlayerState::Paused { + track_id: current_track_id, + .. + } + | PlayerState::EndOfTrack { + track_id: current_track_id, + .. + } = self.state + { + if current_track_id == track_id { + // we already have the requested track loaded. + preload_track = false; + } + } + + if preload_track { let loader = self.load_track(track_id, 0); self.preload = PlayerPreload::Loading { track_id, loader } } @@ -1210,49 +1381,11 @@ impl PlayerInternal { } PlayerCommand::Play => { - if let PlayerState::Paused { - track_id, - play_request_id, - stream_position_pcm, - .. - } = self.state - { - self.state.paused_to_playing(); - - let position_ms = Self::position_pcm_to_ms(stream_position_pcm); - self.send_event(PlayerEvent::Started { - track_id, - play_request_id, - position_ms, - }); - self.start_sink(); - } else { - warn!("Player::play called from invalid state"); - } + self.handle_play(); } PlayerCommand::Pause => { - if let PlayerState::Playing { - track_id, - play_request_id, - stream_position_pcm, - duration_ms, - .. - } = self.state - { - self.state.playing_to_paused(); - - self.stop_sink_if_running(); - let position_ms = Self::position_pcm_to_ms(stream_position_pcm); - self.send_event(PlayerEvent::Paused { - track_id, - play_request_id, - position_ms, - duration_ms, - }); - } else { - warn!("Player::pause called from invalid state"); - } + self.handle_pause(); } PlayerCommand::Stop => self.handle_player_stop(), From 6fed8d0413ddb82fde1f6f61c83a201d83f454bb Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 3 Feb 2020 09:15:15 +1100 Subject: [PATCH 05/12] Make preloading work. --- connect/src/spirc.rs | 240 ++++++++++++++++++++++++++++++----------- playback/src/player.rs | 1 + 2 files changed, 181 insertions(+), 60 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 89036025..71a44541 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -24,10 +24,20 @@ use librespot_core::volume::Volume; enum SpircPlayStatus { Stopped, - LoadingPlay { position_ms: u32 }, - LoadingPause { position_ms: u32 }, - Playing { nominal_start_time: i64 }, - Paused { position_ms: u32 }, + LoadingPlay { + position_ms: u32, + }, + LoadingPause { + position_ms: u32, + }, + Playing { + nominal_start_time: i64, + preloading_of_next_track_triggered: bool, + }, + Paused { + position_ms: u32, + preloading_of_next_track_triggered: bool, + }, } pub struct SpircTask { @@ -548,13 +558,14 @@ impl SpircTask { PlayerEvent::Playing { position_ms, .. } => { let new_nominal_start_time = self.now_ms() - position_ms as i64; match self.play_status { - SpircPlayStatus::Playing { nominal_start_time } => { - if (nominal_start_time - new_nominal_start_time).abs() > 100 { + SpircPlayStatus::Playing { + ref mut nominal_start_time, + .. + } => { + if (*nominal_start_time - new_nominal_start_time).abs() > 100 { + *nominal_start_time = new_nominal_start_time; self.update_state_position(position_ms); self.notify(None); - self.play_status = SpircPlayStatus::Playing { - nominal_start_time: new_nominal_start_time, - }; } } SpircPlayStatus::LoadingPlay { .. } @@ -564,6 +575,7 @@ impl SpircTask { self.notify(None); self.play_status = SpircPlayStatus::Playing { nominal_start_time: new_nominal_start_time, + preloading_of_next_track_triggered: false, }; } _ => (), @@ -577,6 +589,7 @@ impl SpircTask { match self.play_status { SpircPlayStatus::Paused { ref mut position_ms, + .. } => { if *position_ms != new_position_ms { *position_ms = new_position_ms; @@ -591,6 +604,7 @@ impl SpircTask { self.notify(None); self.play_status = SpircPlayStatus::Paused { position_ms: new_position_ms, + preloading_of_next_track_triggered: false, }; } _ => (), @@ -608,15 +622,22 @@ impl SpircTask { } }, PlayerEvent::TimeToPreloadNextTrack { .. } => match self.play_status { - SpircPlayStatus::Paused { .. } - | SpircPlayStatus::Playing { .. } - | SpircPlayStatus::LoadingPause { .. } - | SpircPlayStatus::LoadingPlay { .. } => { + SpircPlayStatus::Paused { + ref mut preloading_of_next_track_triggered, + .. + } + | SpircPlayStatus::Playing { + ref mut preloading_of_next_track_triggered, + .. + } => { + *preloading_of_next_track_triggered = true; if let Some(track_id) = self.preview_next_track() { self.player.preload(track_id); } } - SpircPlayStatus::Stopped => (), + SpircPlayStatus::LoadingPause { .. } + | SpircPlayStatus::LoadingPlay { .. } + | SpircPlayStatus::Stopped => (), }, _ => (), } @@ -745,6 +766,22 @@ impl SpircTask { MessageType::kMessageTypeReplace => { self.update_tracks(&frame); self.notify(None); + + if let SpircPlayStatus::Playing { + preloading_of_next_track_triggered, + .. + } + | SpircPlayStatus::Paused { + preloading_of_next_track_triggered, + .. + } = self.play_status + { + if preloading_of_next_track_triggered { + if let Some(track_id) = self.preview_next_track() { + self.player.preload(track_id); + } + } + } } MessageType::kMessageTypeVolume => { @@ -768,13 +805,17 @@ impl SpircTask { fn handle_play(&mut self) { match self.play_status { - SpircPlayStatus::Paused { position_ms } => { + SpircPlayStatus::Paused { + position_ms, + preloading_of_next_track_triggered, + } => { self.ensure_mixer_started(); self.player.play(); self.state.set_status(PlayStatus::kPlayStatusPlay); self.update_state_position(position_ms); self.play_status = SpircPlayStatus::Playing { nominal_start_time: self.now_ms() as i64 - position_ms as i64, + preloading_of_next_track_triggered, }; } SpircPlayStatus::LoadingPause { position_ms } => { @@ -800,12 +841,18 @@ impl SpircTask { fn handle_pause(&mut self) { match self.play_status { - SpircPlayStatus::Playing { nominal_start_time } => { + SpircPlayStatus::Playing { + nominal_start_time, + preloading_of_next_track_triggered, + } => { self.player.pause(); self.state.set_status(PlayStatus::kPlayStatusPause); let position_ms = (self.now_ms() - nominal_start_time) as u32; self.update_state_position(position_ms); - self.play_status = SpircPlayStatus::Paused { position_ms }; + self.play_status = SpircPlayStatus::Paused { + position_ms, + preloading_of_next_track_triggered, + }; } SpircPlayStatus::LoadingPlay { position_ms } => { self.player.pause(); @@ -829,9 +876,11 @@ impl SpircTask { } | SpircPlayStatus::Paused { position_ms: ref mut position, + .. } => *position = position_ms, SpircPlayStatus::Playing { ref mut nominal_start_time, + .. } => *nominal_start_time = now - position_ms as i64, }; } @@ -851,7 +900,8 @@ impl SpircTask { } fn preview_next_track(&mut self) -> Option { - None + self.get_track_id_to_play_from_playlist(self.state.get_playing_track_index() + 1) + .and_then(|(track_id, _)| Some(track_id)) } fn handle_next(&mut self) { @@ -963,10 +1013,10 @@ impl SpircTask { SpircPlayStatus::Stopped => 0, SpircPlayStatus::LoadingPlay { position_ms } | SpircPlayStatus::LoadingPause { position_ms } - | SpircPlayStatus::Paused { position_ms } => position_ms, - SpircPlayStatus::Playing { nominal_start_time } => { - (self.now_ms() - nominal_start_time) as u32 - } + | SpircPlayStatus::Paused { position_ms, .. } => position_ms, + SpircPlayStatus::Playing { + nominal_start_time, .. + } => (self.now_ms() - nominal_start_time) as u32, } } @@ -1077,54 +1127,124 @@ impl SpircTask { }) } - fn load_track(&mut self, start_playing: bool, position_ms: u32) { - let context_uri = self.state.get_context_uri().to_owned(); - let mut index = self.state.get_playing_track_index(); - let start_index = index; + fn get_track_id_to_play_from_playlist(&self, index: u32) -> Option<(SpotifyId, u32)> { let tracks_len = self.state.get_track().len() as u32; - debug!( - "Loading context: <{}> index: [{}] of {}", - context_uri, index, tracks_len - ); + + let mut new_playlist_index = index; + + if new_playlist_index >= tracks_len { + new_playlist_index = 0; + } + + let start_index = new_playlist_index; + // Cycle through all tracks, break if we don't find any playable tracks - // TODO: This will panic if no playable tracks are found! // tracks in each frame either have a gid or uri (that may or may not be a valid track) // E.g - context based frames sometimes contain tracks with - let track = { - let mut track_ref = self.state.get_track()[index as usize].clone(); - let mut track_id = self.get_spotify_id_for_track(&track_ref); - while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable - { - warn!( - "Skipping track <{:?}> at position [{}] of {}", - track_ref.get_uri(), - index, - tracks_len - ); - index = if index + 1 < tracks_len { index + 1 } else { 0 }; - self.state.set_playing_track_index(index); - if index == start_index { - warn!("No playable track found in state: {:?}", self.state); - break; - } - track_ref = self.state.get_track()[index as usize].clone(); - track_id = self.get_spotify_id_for_track(&track_ref); + + let mut track_ref = self.state.get_track()[index as usize].clone(); + let mut track_id = self.get_spotify_id_for_track(&track_ref); + while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable { + warn!( + "Skipping track <{:?}> at position [{}] of {}", + track_ref.get_uri(), + new_playlist_index, + tracks_len + ); + + new_playlist_index += 1; + if new_playlist_index >= tracks_len { + new_playlist_index = 0; } - track_id + + if new_playlist_index == start_index { + warn!("No playable track found in state: {:?}", self.state); + return None; + } + track_ref = self.state.get_track()[index as usize].clone(); + track_id = self.get_spotify_id_for_track(&track_ref); } - .expect("Invalid SpotifyId"); - self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); - - self.update_state_position(position_ms); - self.state.set_status(PlayStatus::kPlayStatusLoading); - if start_playing { - self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; - } else { - self.play_status = SpircPlayStatus::LoadingPause { position_ms }; + match track_id { + Ok(track_id) => Some((track_id, new_playlist_index)), + Err(_) => None, } } + fn load_track(&mut self, start_playing: bool, position_ms: u32) { + let index = self.state.get_playing_track_index(); + + match self.get_track_id_to_play_from_playlist(index) { + Some((track, index)) => { + self.state.set_playing_track_index(index); + + self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); + + self.update_state_position(position_ms); + self.state.set_status(PlayStatus::kPlayStatusLoading); + if start_playing { + self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; + } else { + self.play_status = SpircPlayStatus::LoadingPause { position_ms }; + } + } + None => { + self.state.set_status(PlayStatus::kPlayStatusStop); + self.player.stop(); + self.ensure_mixer_stopped(); + self.play_status = SpircPlayStatus::Stopped; + } + } + } + + // fn load_track(&mut self, start_playing: bool, position_ms: u32) { + // let context_uri = self.state.get_context_uri().to_owned(); + // let mut index = self.state.get_playing_track_index(); + // let start_index = index; + // let tracks_len = self.state.get_track().len() as u32; + // debug!( + // "Loading context: <{}> index: [{}] of {}", + // context_uri, index, tracks_len + // ); + // // Cycle through all tracks, break if we don't find any playable tracks + // // TODO: This will panic if no playable tracks are found! + // // tracks in each frame either have a gid or uri (that may or may not be a valid track) + // // E.g - context based frames sometimes contain tracks with + // let track = { + // let mut track_ref = self.state.get_track()[index as usize].clone(); + // let mut track_id = self.get_spotify_id_for_track(&track_ref); + // while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable + // { + // warn!( + // "Skipping track <{:?}> at position [{}] of {}", + // track_ref.get_uri(), + // index, + // tracks_len + // ); + // index = if index + 1 < tracks_len { index + 1 } else { 0 }; + // self.state.set_playing_track_index(index); + // if index == start_index { + // warn!("No playable track found in state: {:?}", self.state); + // break; + // } + // track_ref = self.state.get_track()[index as usize].clone(); + // track_id = self.get_spotify_id_for_track(&track_ref); + // } + // track_id + // } + // .expect("Invalid SpotifyId"); + // + // self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); + // + // self.update_state_position(position_ms); + // self.state.set_status(PlayStatus::kPlayStatusLoading); + // if start_playing { + // self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; + // } else { + // self.play_status = SpircPlayStatus::LoadingPause { position_ms }; + // } + // } + fn hello(&mut self) { CommandSender::new(self, MessageType::kMessageTypeHello).send(); } diff --git a/playback/src/player.rs b/playback/src/player.rs index d63faa35..c1faa7cb 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1270,6 +1270,7 @@ impl PlayerInternal { } PlayerCommand::Preload { track_id } => { + debug!("Preloading track"); let mut preload_track = true; if let PlayerPreload::Loading { From 981b76bace9f2df6db391c677bce34cadad70653 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 3 Feb 2020 10:11:27 +1100 Subject: [PATCH 06/12] Keep the sink open Fix typo --- connect/src/spirc.rs | 2 +- playback/src/player.rs | 43 +++++++++++++++++++++--------------------- 2 files changed, 22 insertions(+), 23 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 71a44541..b39ec3aa 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1142,7 +1142,7 @@ impl SpircTask { // tracks in each frame either have a gid or uri (that may or may not be a valid track) // E.g - context based frames sometimes contain tracks with - let mut track_ref = self.state.get_track()[index as usize].clone(); + let mut track_ref = self.state.get_track()[new_playlist_index as usize].clone(); let mut track_id = self.get_spotify_id_for_track(&track_ref); while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable { warn!( diff --git a/playback/src/player.rs b/playback/src/player.rs index c1faa7cb..a00b8961 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -749,8 +749,8 @@ impl Future for PlayerInternal { } } - if self.state.is_playing() && !self.sink_running { - self.start_sink(); + if self.state.is_playing() { + self.ensure_sink_running(); } if self.sink_running { @@ -866,23 +866,24 @@ impl PlayerInternal { position_ms as u64 * 441 / 10 } - fn start_sink(&mut self) { - match self.sink.start() { - Ok(()) => self.sink_running = true, - Err(err) => error!("Could not start audio: {}", err), + fn ensure_sink_running(&mut self) { + if !self.sink_running { + trace!("== Starting sink =="); + match self.sink.start() { + Ok(()) => self.sink_running = true, + Err(err) => error!("Could not start audio: {}", err), + } } } - fn stop_sink_if_running(&mut self) { + fn ensure_sink_stopped(&mut self) { if self.sink_running { - self.stop_sink(); + trace!("== Stopping sink =="); + self.sink.stop().unwrap(); + self.sink_running = false; } } - fn stop_sink(&mut self) { - self.sink.stop().unwrap(); - self.sink_running = false; - } fn handle_player_stop(&mut self) { match self.state { @@ -906,7 +907,7 @@ impl PlayerInternal { play_request_id, .. } => { - self.stop_sink_if_running(); + self.ensure_sink_stopped(); self.send_event(PlayerEvent::Stopped { track_id, play_request_id, @@ -934,7 +935,7 @@ impl PlayerInternal { play_request_id, position_ms, }); - self.start_sink(); + self.ensure_sink_running(); } else { warn!("Player::play called from invalid state"); } @@ -951,7 +952,7 @@ impl PlayerInternal { { self.state.playing_to_paused(); - self.stop_sink_if_running(); + self.ensure_sink_stopped(); let position_ms = Self::position_pcm_to_ms(stream_position_pcm); self.send_event(PlayerEvent::Paused { track_id, @@ -980,13 +981,12 @@ impl PlayerInternal { if let Err(err) = self.sink.write(&packet.data()) { error!("Could not write audio: {}", err); - self.stop_sink(); + self.ensure_sink_stopped(); } } } None => { - self.stop_sink(); self.state.playing_to_end_of_track(); if let PlayerState::EndOfTrack { track_id, @@ -1040,7 +1040,7 @@ impl PlayerInternal { } if start_playback { - self.start_sink(); + self.ensure_sink_running(); self.send_event(PlayerEvent::Playing { track_id, @@ -1094,10 +1094,6 @@ impl PlayerInternal { play, position_ms, } => { - if self.state.is_playing() { - self.stop_sink_if_running(); - } - // emit the correct player event match self.state { PlayerState::Playing { @@ -1228,6 +1224,9 @@ impl PlayerInternal { // We need to load the track - either from scratch or by completing a preload. // In any case we go into a Loading state to load the track. if !load_command_processed { + + self.ensure_sink_stopped(); + self.send_event(PlayerEvent::Loading { track_id, play_request_id, From 499824a6ba1bbc6963d711606e96ed53909d42aa Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 3 Feb 2020 11:57:09 +1100 Subject: [PATCH 07/12] rust fmt --- playback/src/player.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/playback/src/player.rs b/playback/src/player.rs index a00b8961..2e61fae4 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -884,7 +884,6 @@ impl PlayerInternal { } } - fn handle_player_stop(&mut self) { match self.state { PlayerState::Playing { @@ -1224,7 +1223,6 @@ impl PlayerInternal { // We need to load the track - either from scratch or by completing a preload. // In any case we go into a Loading state to load the track. if !load_command_processed { - self.ensure_sink_stopped(); self.send_event(PlayerEvent::Loading { From ead794f4fd4f06d87b2c2037aa1f8a31f67aa4db Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 3 Feb 2020 14:31:15 +1100 Subject: [PATCH 08/12] Correct notifications when loading the same track again. --- playback/src/player.rs | 79 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 11 deletions(-) diff --git a/playback/src/player.rs b/playback/src/player.rs index 2e61fae4..f6c60020 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -716,6 +716,7 @@ impl Future for PlayerInternal { play_request_id, loaded_track, start_playback, + false, ); if let PlayerState::Loading { .. } = self.state { panic!("The state wasn't changed by start_playback()"); @@ -1010,6 +1011,7 @@ impl PlayerInternal { play_request_id: u64, loaded_track: PlayerLoadedTrackData, start_playback: bool, + state_is_invalid_because_the_same_track_is_getting_repeated: bool, ) { let position_ms = Self::position_pcm_to_ms(loaded_track.stream_position_pcm); @@ -1035,7 +1037,16 @@ impl PlayerInternal { position_ms, }), PlayerState::Loading { .. } => (), - PlayerState::Invalid { .. } => panic!("Player is in an invalid state."), + PlayerState::Invalid { .. } => { + if state_is_invalid_because_the_same_track_is_getting_repeated { + self.send_event(PlayerEvent::Changed { + old_track_id: track_id, + new_track_id: track_id, + }) + } else { + panic!("Player is in an invalid state.") + } + } } if start_playback { @@ -1149,7 +1160,13 @@ impl PlayerInternal { loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); } - self.start_playback(track_id, play_request_id, loaded_track, play); + self.start_playback( + track_id, + play_request_id, + loaded_track, + play, + false, + ); load_command_processed = true; } } @@ -1158,7 +1175,6 @@ impl PlayerInternal { // Check if we are already playing the track. If so, just do a seek and update our info. if let PlayerState::Playing { track_id: current_track_id, - play_request_id: ref mut current_play_request_id, ref mut stream_position_pcm, ref mut decoder, ref mut stream_loader_controller, @@ -1166,7 +1182,6 @@ impl PlayerInternal { } | PlayerState::Paused { track_id: current_track_id, - play_request_id: ref mut current_play_request_id, ref mut stream_position_pcm, ref mut decoder, ref mut stream_loader_controller, @@ -1180,13 +1195,49 @@ impl PlayerInternal { stream_loader_controller.set_stream_mode(); *stream_position_pcm = Self::position_ms_to_pcm(position_ms); } - *current_play_request_id = play_request_id; - if play { - self.handle_play(); - } else { - self.handle_pause(); + + let old_state = mem::replace(&mut self.state, PlayerState::Invalid); + + if let PlayerState::Playing { + stream_position_pcm, + decoder, + stream_loader_controller, + bytes_per_second, + duration_ms, + normalisation_factor, + .. + } + | PlayerState::Paused { + stream_position_pcm, + decoder, + stream_loader_controller, + bytes_per_second, + duration_ms, + normalisation_factor, + .. + } = old_state + { + let loaded_track = PlayerLoadedTrackData { + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + duration_ms, + stream_position_pcm, + }; + + self.start_playback( + track_id, + play_request_id, + loaded_track, + play, + true, + ); + + load_command_processed = true; + } else { + unreachable!(); } - load_command_processed = true; } } @@ -1213,7 +1264,13 @@ impl PlayerInternal { let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking loaded_track.stream_loader_controller.set_stream_mode(); } - self.start_playback(track_id, play_request_id, loaded_track, play); + self.start_playback( + track_id, + play_request_id, + loaded_track, + play, + false, + ); load_command_processed = true; } } From 18d1181bf511e7fd7bd3ab1f04613cc4c4b28d2d Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Mon, 3 Feb 2020 18:58:44 +1100 Subject: [PATCH 09/12] Clean up some code Ensure the player events are emitted correctly. Only call the external script on events we want to notify about. Stop sink when loading to pause. cargo fmt --- connect/src/spirc.rs | 48 --- playback/src/player.rs | 760 +++++++++++++++++------------------- src/main.rs | 20 +- src/player_event_handler.rs | 6 +- 4 files changed, 374 insertions(+), 460 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index b39ec3aa..d697ee4a 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1197,54 +1197,6 @@ impl SpircTask { } } - // fn load_track(&mut self, start_playing: bool, position_ms: u32) { - // let context_uri = self.state.get_context_uri().to_owned(); - // let mut index = self.state.get_playing_track_index(); - // let start_index = index; - // let tracks_len = self.state.get_track().len() as u32; - // debug!( - // "Loading context: <{}> index: [{}] of {}", - // context_uri, index, tracks_len - // ); - // // Cycle through all tracks, break if we don't find any playable tracks - // // TODO: This will panic if no playable tracks are found! - // // tracks in each frame either have a gid or uri (that may or may not be a valid track) - // // E.g - context based frames sometimes contain tracks with - // let track = { - // let mut track_ref = self.state.get_track()[index as usize].clone(); - // let mut track_id = self.get_spotify_id_for_track(&track_ref); - // while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable - // { - // warn!( - // "Skipping track <{:?}> at position [{}] of {}", - // track_ref.get_uri(), - // index, - // tracks_len - // ); - // index = if index + 1 < tracks_len { index + 1 } else { 0 }; - // self.state.set_playing_track_index(index); - // if index == start_index { - // warn!("No playable track found in state: {:?}", self.state); - // break; - // } - // track_ref = self.state.get_track()[index as usize].clone(); - // track_id = self.get_spotify_id_for_track(&track_ref); - // } - // track_id - // } - // .expect("Invalid SpotifyId"); - // - // self.play_request_id = Some(self.player.load(track, start_playing, position_ms)); - // - // self.update_state_position(position_ms); - // self.state.set_status(PlayStatus::kPlayStatusLoading); - // if start_playing { - // self.play_status = SpircPlayStatus::LoadingPlay { position_ms }; - // } else { - // self.play_status = SpircPlayStatus::LoadingPause { position_ms }; - // } - // } - fn hello(&mut self) { CommandSender::new(self, MessageType::kMessageTypeHello).send(); } diff --git a/playback/src/player.rs b/playback/src/player.rs index f6c60020..b30289b6 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -66,6 +66,10 @@ enum PlayerCommand { #[derive(Debug, Clone)] pub enum PlayerEvent { + Stopped { + play_request_id: u64, + track_id: SpotifyId, + }, Loading { play_request_id: u64, track_id: SpotifyId, @@ -76,31 +80,27 @@ pub enum PlayerEvent { track_id: SpotifyId, position_ms: u32, }, + Changed { + old_track_id: SpotifyId, + new_track_id: SpotifyId, + }, Playing { play_request_id: u64, track_id: SpotifyId, position_ms: u32, duration_ms: u32, }, - Changed { - old_track_id: SpotifyId, - new_track_id: SpotifyId, - }, - TimeToPreloadNextTrack { - play_request_id: u64, - track_id: SpotifyId, - }, - EndOfTrack { - play_request_id: u64, - track_id: SpotifyId, - }, Paused { play_request_id: u64, track_id: SpotifyId, position_ms: u32, duration_ms: u32, }, - Stopped { + TimeToPreloadNextTrack { + play_request_id: u64, + track_id: SpotifyId, + }, + EndOfTrack { play_request_id: u64, track_id: SpotifyId, }, @@ -217,6 +217,8 @@ impl Player { event_senders: [event_sender].to_vec(), }; + // While PlayerInternal is written as a future, it still contains blocking code. + // It must be run by using wait() in a dedicated thread. let _ = internal.wait(); debug!("PlayerInternal thread finished."); }); @@ -683,6 +685,9 @@ impl Future for PlayerInternal { type Error = (); fn poll(&mut self) -> Poll<(), ()> { + // While this is written as a future, it still contains blocking code. + // It must be run on its own thread. + loop { let mut all_futures_completed_or_not_ready = true; @@ -716,7 +721,6 @@ impl Future for PlayerInternal { play_request_id, loaded_track, start_playback, - false, ); if let PlayerState::Loading { .. } = self.state { panic!("The state wasn't changed by start_playback()"); @@ -752,12 +756,8 @@ impl Future for PlayerInternal { if self.state.is_playing() { self.ensure_sink_running(); - } - if self.sink_running { - let mut current_normalisation_factor: f32 = 1.0; - - let packet = if let PlayerState::Playing { + if let PlayerState::Playing { track_id, play_request_id, ref mut decoder, @@ -768,7 +768,6 @@ impl Future for PlayerInternal { .. } = self.state { - current_normalisation_factor = normalisation_factor; let packet = decoder.next_packet().expect("Vorbis error"); if let Some(ref packet) = packet { @@ -804,14 +803,10 @@ impl Future for PlayerInternal { } } - Some(packet) + self.handle_packet(packet, normalisation_factor); } else { - None + unreachable!(); }; - - if let Some(packet) = packet { - self.handle_packet(packet, current_normalisation_factor); - } } if let PlayerState::Playing { @@ -833,9 +828,8 @@ impl Future for PlayerInternal { .. } = self.state { - let stream_position_millis = Self::position_pcm_to_ms(stream_position_pcm); if (!*suggested_to_preload_next_track) - && ((duration_ms as i64 - stream_position_millis as i64) + && ((duration_ms as i64 - Self::position_pcm_to_ms(stream_position_pcm) as i64) < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64) && stream_loader_controller.range_to_end_available() { @@ -851,7 +845,7 @@ impl Future for PlayerInternal { return Ok(Async::Ready(())); } - if (!self.sink_running) && all_futures_completed_or_not_ready { + if (!self.state.is_playing()) && all_futures_completed_or_not_ready { return Ok(Async::NotReady); } } @@ -924,16 +918,18 @@ impl PlayerInternal { track_id, play_request_id, stream_position_pcm, + duration_ms, .. } = self.state { self.state.paused_to_playing(); let position_ms = Self::position_pcm_to_ms(stream_position_pcm); - self.send_event(PlayerEvent::Started { + self.send_event(PlayerEvent::Playing { track_id, play_request_id, position_ms, + duration_ms, }); self.ensure_sink_running(); } else { @@ -1011,44 +1007,9 @@ impl PlayerInternal { play_request_id: u64, loaded_track: PlayerLoadedTrackData, start_playback: bool, - state_is_invalid_because_the_same_track_is_getting_repeated: bool, ) { let position_ms = Self::position_pcm_to_ms(loaded_track.stream_position_pcm); - match self.state { - PlayerState::Playing { - track_id: old_track_id, - .. - } - | PlayerState::Paused { - track_id: old_track_id, - .. - } - | PlayerState::EndOfTrack { - track_id: old_track_id, - .. - } => self.send_event(PlayerEvent::Changed { - old_track_id: old_track_id, - new_track_id: track_id, - }), - PlayerState::Stopped => self.send_event(PlayerEvent::Started { - track_id, - play_request_id, - position_ms, - }), - PlayerState::Loading { .. } => (), - PlayerState::Invalid { .. } => { - if state_is_invalid_because_the_same_track_is_getting_repeated { - self.send_event(PlayerEvent::Changed { - old_track_id: track_id, - new_track_id: track_id, - }) - } else { - panic!("Player is in an invalid state.") - } - } - } - if start_playback { self.ensure_sink_running(); @@ -1074,6 +1035,8 @@ impl PlayerInternal { suggested_to_preload_next_track: false, }; } else { + self.ensure_sink_stopped(); + self.state = PlayerState::Paused { track_id: track_id, play_request_id: play_request_id, @@ -1095,6 +1058,333 @@ impl PlayerInternal { } } + fn handle_command_load( + &mut self, + track_id: SpotifyId, + play_request_id: u64, + play: bool, + position_ms: u32, + ) { + // emit the correct player event + match self.state { + PlayerState::Playing { + track_id: old_track_id, + .. + } + | PlayerState::Paused { + track_id: old_track_id, + .. + } + | PlayerState::EndOfTrack { + track_id: old_track_id, + .. + } + | PlayerState::Loading { + track_id: old_track_id, + .. + } => self.send_event(PlayerEvent::Changed { + old_track_id: old_track_id, + new_track_id: track_id, + }), + PlayerState::Stopped => self.send_event(PlayerEvent::Started { + track_id, + play_request_id, + position_ms, + }), + PlayerState::Invalid { .. } => panic!("Player is in an invalid state."), + } + + // Now we check at different positions whether we already have a pre-loaded version + // of this track somewhere. If so, use it and return. + + // Check if there's a matching loaded track in the EndOfTrack player state. + // This is the case if we're repeating the same track again. + if let PlayerState::EndOfTrack { + track_id: previous_track_id, + ref mut loaded_track, + .. + } = self.state + { + if previous_track_id == track_id { + let loaded_track = mem::replace(&mut *loaded_track, None); + if let Some(mut loaded_track) = loaded_track { + if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm { + loaded_track + .stream_loader_controller + .set_random_access_mode(); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking. + // But most likely the track is fully + // loaded already because we played + // to the end of it. + loaded_track.stream_loader_controller.set_stream_mode(); + loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); + } + self.start_playback(track_id, play_request_id, loaded_track, play); + return; + } + } + } + + // Check if we are already playing the track. If so, just do a seek and update our info. + if let PlayerState::Playing { + track_id: current_track_id, + ref mut stream_position_pcm, + ref mut decoder, + ref mut stream_loader_controller, + .. + } + | PlayerState::Paused { + track_id: current_track_id, + ref mut stream_position_pcm, + ref mut decoder, + ref mut stream_loader_controller, + .. + } = self.state + { + if current_track_id == track_id { + // we can use the current decoder. Ensure it's at the correct position. + if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { + stream_loader_controller.set_random_access_mode(); + let _ = decoder.seek(position_ms as i64); // This may be blocking. + stream_loader_controller.set_stream_mode(); + *stream_position_pcm = Self::position_ms_to_pcm(position_ms); + } + + // Move the info from the current state into a PlayerLoadedTrackData so we can use + // the usual code path to start playback. + let old_state = mem::replace(&mut self.state, PlayerState::Invalid); + + if let PlayerState::Playing { + stream_position_pcm, + decoder, + stream_loader_controller, + bytes_per_second, + duration_ms, + normalisation_factor, + .. + } + | PlayerState::Paused { + stream_position_pcm, + decoder, + stream_loader_controller, + bytes_per_second, + duration_ms, + normalisation_factor, + .. + } = old_state + { + let loaded_track = PlayerLoadedTrackData { + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + duration_ms, + stream_position_pcm, + }; + + self.start_playback(track_id, play_request_id, loaded_track, play); + + if let PlayerState::Invalid = self.state { + panic!("start_playback() hasn't set a valid player state."); + } + + return; + } else { + unreachable!(); + } + } + } + + // Check if the requested track has been preloaded already. If so use the preloaded data. + if let PlayerPreload::Ready { + track_id: loaded_track_id, + .. + } = self.preload + { + if track_id == loaded_track_id { + let preload = std::mem::replace(&mut self.preload, PlayerPreload::None); + if let PlayerPreload::Ready { + track_id, + mut loaded_track, + } = preload + { + if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm { + loaded_track + .stream_loader_controller + .set_random_access_mode(); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking + loaded_track.stream_loader_controller.set_stream_mode(); + } + self.start_playback(track_id, play_request_id, loaded_track, play); + return; + } else { + unreachable!(); + } + } + } + + // We need to load the track - either from scratch or by completing a preload. + // In any case we go into a Loading state to load the track. + self.ensure_sink_stopped(); + + self.send_event(PlayerEvent::Loading { + track_id, + play_request_id, + position_ms, + }); + + // Try to extract a pending loader from the preloading mechanism + let loader = if let PlayerPreload::Loading { + track_id: loaded_track_id, + .. + } = self.preload + { + if (track_id == loaded_track_id) && (position_ms == 0) { + let mut preload = PlayerPreload::None; + std::mem::swap(&mut preload, &mut self.preload); + if let PlayerPreload::Loading { loader, .. } = preload { + Some(loader) + } else { + None + } + } else { + None + } + } else { + None + }; + + self.preload = PlayerPreload::None; + + // If we don't have a loader yet, create one from scratch. + let loader = loader + .or_else(|| Some(self.load_track(track_id, position_ms))) + .unwrap(); + + // Set ourselves to a loading state. + self.state = PlayerState::Loading { + track_id, + play_request_id, + start_playback: play, + loader, + }; + } + + fn handle_command_preload(&mut self, track_id: SpotifyId) { + debug!("Preloading track"); + let mut preload_track = true; + + // check whether the track is already loaded somewhere or being loaded. + if let PlayerPreload::Loading { + track_id: currently_loading, + .. + } + | PlayerPreload::Ready { + track_id: currently_loading, + .. + } = self.preload + { + if currently_loading == track_id { + // we're already preloading the requested track. + preload_track = false; + } else { + // we're preloading something else - cancel it. + self.preload = PlayerPreload::None; + } + } + + if let PlayerState::Playing { + track_id: current_track_id, + .. + } + | PlayerState::Paused { + track_id: current_track_id, + .. + } + | PlayerState::EndOfTrack { + track_id: current_track_id, + .. + } = self.state + { + if current_track_id == track_id { + // we already have the requested track loaded. + preload_track = false; + } + } + + // schedule the preload if the current track if desired. + if preload_track { + let loader = self.load_track(track_id, 0); + self.preload = PlayerPreload::Loading { track_id, loader } + } + } + + fn handle_command_seek(&mut self, position_ms: u32) { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_random_access_mode(); + } + if let Some(decoder) = self.state.decoder() { + match decoder.seek(position_ms as i64) { + Ok(_) => { + if let PlayerState::Playing { + ref mut stream_position_pcm, + .. + } + | PlayerState::Paused { + ref mut stream_position_pcm, + .. + } = self.state + { + *stream_position_pcm = Self::position_ms_to_pcm(position_ms); + } + } + Err(err) => error!("Vorbis error: {:?}", err), + } + } else { + warn!("Player::seek called from invalid state"); + } + + // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_stream_mode(); + } + + // ensure we have a bit of a buffer of downloaded data + self.preload_data_before_playback(); + + if let PlayerState::Playing { + track_id, + play_request_id, + ref mut reported_nominal_start_time, + duration_ms, + .. + } = self.state + { + *reported_nominal_start_time = + Some(Instant::now() - Duration::from_millis(position_ms as u64)); + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + } + if let PlayerState::Paused { + track_id, + play_request_id, + duration_ms, + .. + } = self.state + { + self.send_event(PlayerEvent::Paused { + track_id, + play_request_id, + position_ms, + duration_ms, + }); + } + } + fn handle_command(&mut self, cmd: PlayerCommand) { debug!("command={:?}", cmd); match cmd { @@ -1103,345 +1393,15 @@ impl PlayerInternal { play_request_id, play, position_ms, - } => { - // emit the correct player event - match self.state { - PlayerState::Playing { - track_id: old_track_id, - .. - } - | PlayerState::Paused { - track_id: old_track_id, - .. - } - | PlayerState::EndOfTrack { - track_id: old_track_id, - .. - } - | PlayerState::Loading { - track_id: old_track_id, - .. - } => self.send_event(PlayerEvent::Changed { - old_track_id: old_track_id, - new_track_id: track_id, - }), - PlayerState::Stopped => self.send_event(PlayerEvent::Started { - track_id, - play_request_id, - position_ms, - }), - PlayerState::Invalid { .. } => panic!("Player is in an invalid state."), - } + } => self.handle_command_load(track_id, play_request_id, play, position_ms), - let mut load_command_processed = false; + PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), - // Check if there's a matching loaded track in the EndOfTrack player state. - // This is the case if we're repeating the same track again. - if let PlayerState::EndOfTrack { - track_id: previous_track_id, - ref mut loaded_track, - .. - } = self.state - { - if previous_track_id == track_id { - let loaded_track = mem::replace(&mut *loaded_track, None); - if let Some(mut loaded_track) = loaded_track { - if Self::position_ms_to_pcm(position_ms) - != loaded_track.stream_position_pcm - { - loaded_track - .stream_loader_controller - .set_random_access_mode(); - let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking. - // But most likely the track is fully - // loaded already because we played - // to the end of it. - loaded_track.stream_loader_controller.set_stream_mode(); - loaded_track.stream_position_pcm = - Self::position_ms_to_pcm(position_ms); - } - self.start_playback( - track_id, - play_request_id, - loaded_track, - play, - false, - ); - load_command_processed = true; - } - } - } + PlayerCommand::Seek(position_ms) => self.handle_command_seek(position_ms), - // Check if we are already playing the track. If so, just do a seek and update our info. - if let PlayerState::Playing { - track_id: current_track_id, - ref mut stream_position_pcm, - ref mut decoder, - ref mut stream_loader_controller, - .. - } - | PlayerState::Paused { - track_id: current_track_id, - ref mut stream_position_pcm, - ref mut decoder, - ref mut stream_loader_controller, - .. - } = self.state - { - if current_track_id == track_id { - if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { - stream_loader_controller.set_random_access_mode(); - let _ = decoder.seek(position_ms as i64); // This may be blocking. - stream_loader_controller.set_stream_mode(); - *stream_position_pcm = Self::position_ms_to_pcm(position_ms); - } + PlayerCommand::Play => self.handle_play(), - let old_state = mem::replace(&mut self.state, PlayerState::Invalid); - - if let PlayerState::Playing { - stream_position_pcm, - decoder, - stream_loader_controller, - bytes_per_second, - duration_ms, - normalisation_factor, - .. - } - | PlayerState::Paused { - stream_position_pcm, - decoder, - stream_loader_controller, - bytes_per_second, - duration_ms, - normalisation_factor, - .. - } = old_state - { - let loaded_track = PlayerLoadedTrackData { - decoder, - normalisation_factor, - stream_loader_controller, - bytes_per_second, - duration_ms, - stream_position_pcm, - }; - - self.start_playback( - track_id, - play_request_id, - loaded_track, - play, - true, - ); - - load_command_processed = true; - } else { - unreachable!(); - } - } - } - - // Check if the requested track has been preloaded already. If so use the preloaded data. - if !load_command_processed { - if let PlayerPreload::Ready { - track_id: loaded_track_id, - .. - } = self.preload - { - if track_id == loaded_track_id { - let preload = std::mem::replace(&mut self.preload, PlayerPreload::None); - if let PlayerPreload::Ready { - track_id, - mut loaded_track, - } = preload - { - if Self::position_ms_to_pcm(position_ms) - != loaded_track.stream_position_pcm - { - loaded_track - .stream_loader_controller - .set_random_access_mode(); - let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking - loaded_track.stream_loader_controller.set_stream_mode(); - } - self.start_playback( - track_id, - play_request_id, - loaded_track, - play, - false, - ); - load_command_processed = true; - } - } - } - } - - // We need to load the track - either from scratch or by completing a preload. - // In any case we go into a Loading state to load the track. - if !load_command_processed { - self.ensure_sink_stopped(); - - self.send_event(PlayerEvent::Loading { - track_id, - play_request_id, - position_ms, - }); - - let loader = if let PlayerPreload::Loading { - track_id: loaded_track_id, - .. - } = self.preload - { - if (track_id == loaded_track_id) && (position_ms == 0) { - let mut preload = PlayerPreload::None; - std::mem::swap(&mut preload, &mut self.preload); - if let PlayerPreload::Loading { loader, .. } = preload { - Some(loader) - } else { - None - } - } else { - None - } - } else { - None - }; - - self.preload = PlayerPreload::None; - - let loader = loader - .or_else(|| Some(self.load_track(track_id, position_ms))) - .unwrap(); - - self.state = PlayerState::Loading { - track_id, - play_request_id, - start_playback: play, - loader, - }; - } - } - - PlayerCommand::Preload { track_id } => { - debug!("Preloading track"); - let mut preload_track = true; - - if let PlayerPreload::Loading { - track_id: currently_loading, - .. - } - | PlayerPreload::Ready { - track_id: currently_loading, - .. - } = self.preload - { - if currently_loading == track_id { - // we're already loading the requested track. - preload_track = false; - } else { - // we're loading something else - cancel it. - self.preload = PlayerPreload::None; - } - } - - if let PlayerState::Playing { - track_id: current_track_id, - .. - } - | PlayerState::Paused { - track_id: current_track_id, - .. - } - | PlayerState::EndOfTrack { - track_id: current_track_id, - .. - } = self.state - { - if current_track_id == track_id { - // we already have the requested track loaded. - preload_track = false; - } - } - - if preload_track { - let loader = self.load_track(track_id, 0); - self.preload = PlayerPreload::Loading { track_id, loader } - } - } - - PlayerCommand::Seek(position_ms) => { - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_random_access_mode(); - } - if let Some(decoder) = self.state.decoder() { - match decoder.seek(position_ms as i64) { - Ok(_) => { - if let PlayerState::Playing { - ref mut stream_position_pcm, - .. - } - | PlayerState::Paused { - ref mut stream_position_pcm, - .. - } = self.state - { - *stream_position_pcm = Self::position_ms_to_pcm(position_ms); - } - } - Err(err) => error!("Vorbis error: {:?}", err), - } - } else { - warn!("Player::seek called from invalid state"); - } - - // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_stream_mode(); - } - - self.preload_data_before_playback(); - - if let PlayerState::Playing { - track_id, - play_request_id, - ref mut reported_nominal_start_time, - duration_ms, - .. - } = self.state - { - *reported_nominal_start_time = - Some(Instant::now() - Duration::from_millis(position_ms as u64)); - self.send_event(PlayerEvent::Playing { - track_id, - play_request_id, - position_ms: position_ms, - duration_ms, - }); - } - if let PlayerState::Paused { - track_id, - play_request_id, - duration_ms, - .. - } = self.state - { - self.send_event(PlayerEvent::Paused { - track_id, - play_request_id, - position_ms: position_ms, - duration_ms, - }); - } - } - - PlayerCommand::Play => { - self.handle_play(); - } - - PlayerCommand::Pause => { - self.handle_pause(); - } + PlayerCommand::Pause => self.handle_pause(), PlayerCommand::Stop => self.handle_player_stop(), diff --git a/src/main.rs b/src/main.rs index 8ee3b0c4..70a2dff8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -539,16 +539,18 @@ impl Future for Main { if let Some(ref mut player_event_channel) = self.player_event_channel { if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() { if let Some(ref program) = self.player_event_program { - let child = run_program_on_events(event, program) - .expect("program failed to start") - .map(|status| { - if !status.success() { - error!("child exited with status {:?}", status.code()); - } - }) - .map_err(|e| error!("failed to wait on child process: {}", e)); + if let Some(child) = run_program_on_events(event, program) { + let child = child + .expect("program failed to start") + .map(|status| { + if !status.success() { + error!("child exited with status {:?}", status.code()); + } + }) + .map_err(|e| error!("failed to wait on child process: {}", e)); - self.handle.spawn(child); + self.handle.spawn(child); + } } } } diff --git a/src/player_event_handler.rs b/src/player_event_handler.rs index 6df73d15..2fa34d2b 100644 --- a/src/player_event_handler.rs +++ b/src/player_event_handler.rs @@ -14,7 +14,7 @@ fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result io::Result { +pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option> { let mut env_vars = HashMap::new(); match event { PlayerEvent::Changed { @@ -33,7 +33,7 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result (), + _ => return None, } - run_program(onevent, env_vars) + Some(run_program(onevent, env_vars)) } From 3f111a97785c1c1d5a77d833a98e5fb085a85399 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Fri, 7 Feb 2020 21:11:49 +1100 Subject: [PATCH 10/12] Suppress sending loading state to Spotify unless we actually need to load a track. --- connect/src/spirc.rs | 61 +++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index d697ee4a..cb199d1a 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -484,7 +484,7 @@ impl SpircTask { SpircCommand::Play => { if active { self.handle_play(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePlay).send(); } @@ -492,7 +492,7 @@ impl SpircTask { SpircCommand::PlayPause => { if active { self.handle_play_pause(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePlayPause).send(); } @@ -500,7 +500,7 @@ impl SpircTask { SpircCommand::Pause => { if active { self.handle_pause(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePause).send(); } @@ -508,7 +508,7 @@ impl SpircTask { SpircCommand::Prev => { if active { self.handle_prev(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypePrev).send(); } @@ -516,7 +516,7 @@ impl SpircTask { SpircCommand::Next => { if active { self.handle_next(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypeNext).send(); } @@ -524,7 +524,7 @@ impl SpircTask { SpircCommand::VolumeUp => { if active { self.handle_volume_up(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypeVolumeUp).send(); } @@ -532,7 +532,7 @@ impl SpircTask { SpircCommand::VolumeDown => { if active { self.handle_volume_down(); - self.notify(None); + self.notify(None, true); } else { CommandSender::new(self, MessageType::kMessageTypeVolumeDown).send(); } @@ -554,7 +554,7 @@ impl SpircTask { if Some(play_request_id) == self.play_request_id { match event { PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(), - PlayerEvent::Loading { .. } => (), + PlayerEvent::Loading { .. } => self.notify(None, false), PlayerEvent::Playing { position_ms, .. } => { let new_nominal_start_time = self.now_ms() - position_ms as i64; match self.play_status { @@ -565,14 +565,14 @@ impl SpircTask { if (*nominal_start_time - new_nominal_start_time).abs() > 100 { *nominal_start_time = new_nominal_start_time; self.update_state_position(position_ms); - self.notify(None); + self.notify(None, true); } } SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. } => { self.state.set_status(PlayStatus::kPlayStatusPlay); self.update_state_position(position_ms); - self.notify(None); + self.notify(None, true); self.play_status = SpircPlayStatus::Playing { nominal_start_time: new_nominal_start_time, preloading_of_next_track_triggered: false, @@ -594,14 +594,14 @@ impl SpircTask { if *position_ms != new_position_ms { *position_ms = new_position_ms; self.update_state_position(new_position_ms); - self.notify(None); + self.notify(None, true); } } SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. } => { self.state.set_status(PlayStatus::kPlayStatusPause); self.update_state_position(new_position_ms); - self.notify(None); + self.notify(None, true); self.play_status = SpircPlayStatus::Paused { position_ms: new_position_ms, preloading_of_next_track_triggered: false, @@ -617,7 +617,7 @@ impl SpircTask { warn!("The player has stopped unexpentedly."); self.state.set_status(PlayStatus::kPlayStatusStop); self.ensure_mixer_stopped(); - self.notify(None); + self.notify(None, true); self.play_status = SpircPlayStatus::Stopped; } }, @@ -671,7 +671,7 @@ impl SpircTask { match frame.get_typ() { MessageType::kMessageTypeHello => { - self.notify(Some(frame.get_ident())); + self.notify(Some(frame.get_ident()), true); } MessageType::kMessageTypeLoad => { @@ -695,47 +695,47 @@ impl SpircTask { self.play_status = SpircPlayStatus::Stopped; } - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePlay => { self.handle_play(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePlayPause => { self.handle_play_pause(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePause => { self.handle_pause(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeNext => { self.handle_next(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypePrev => { self.handle_prev(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeVolumeUp => { self.handle_volume_up(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeVolumeDown => { self.handle_volume_down(); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeRepeat => { self.state.set_repeat(frame.get_state().get_repeat()); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeShuffle => { @@ -755,17 +755,17 @@ impl SpircTask { let context = self.state.get_context_uri(); debug!("{:?}", context); } - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeSeek => { self.handle_seek(frame.get_position()); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeReplace => { self.update_tracks(&frame); - self.notify(None); + self.notify(None, true); if let SpircPlayStatus::Playing { preloading_of_next_track_triggered, @@ -786,7 +786,7 @@ impl SpircTask { MessageType::kMessageTypeVolume => { self.set_volume(frame.get_volume() as u16); - self.notify(None); + self.notify(None, true); } MessageType::kMessageTypeNotify => { @@ -1005,7 +1005,7 @@ impl SpircTask { fn handle_end_of_track(&mut self) { self.handle_next(); - self.notify(None); + self.notify(None, true); } fn position(&mut self) -> u32 { @@ -1201,7 +1201,10 @@ impl SpircTask { CommandSender::new(self, MessageType::kMessageTypeHello).send(); } - fn notify(&mut self, recipient: Option<&str>) { + fn notify(&mut self, recipient: Option<&str>, suppress_loading_status: bool) { + if suppress_loading_status && (self.state.get_status() == PlayStatus::kPlayStatusLoading) { + return; + }; let status_string = match self.state.get_status() { PlayStatus::kPlayStatusLoading => "kPlayStatusLoading", PlayStatus::kPlayStatusPause => "kPlayStatusPause", From 873f86bb9605a3155e17b929e728137480af111d Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Fri, 7 Feb 2020 23:52:20 +1100 Subject: [PATCH 11/12] Cancel preload requests to free bandwidth when repeating the same track. --- playback/src/player.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/playback/src/player.rs b/playback/src/player.rs index b30289b6..ef7484c7 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1119,6 +1119,7 @@ impl PlayerInternal { loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); } + self.preload = PlayerPreload::None; self.start_playback(track_id, play_request_id, loaded_track, play); return; } @@ -1182,6 +1183,7 @@ impl PlayerInternal { stream_position_pcm, }; + self.preload = PlayerPreload::None; self.start_playback(track_id, play_request_id, loaded_track, play); if let PlayerState::Invalid = self.state { From f3b13beb1731bc379327094e7dc6375ef213d7c2 Mon Sep 17 00:00:00 2001 From: Konstantin Seiler Date: Thu, 27 Feb 2020 12:25:25 +1100 Subject: [PATCH 12/12] Fix typo. --- connect/src/spirc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index cb199d1a..57c5eb19 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -614,7 +614,7 @@ impl SpircTask { PlayerEvent::Stopped { .. } => match self.play_status { SpircPlayStatus::Stopped => (), _ => { - warn!("The player has stopped unexpentedly."); + warn!("The player has stopped unexpectedly."); self.state.set_status(PlayStatus::kPlayStatusStop); self.ensure_mixer_stopped(); self.notify(None, true);