Clean up some code

Ensure the player events are emitted correctly.
Only call the external script on events we want to notify about.
Stop sink when loading to pause.
cargo fmt
This commit is contained in:
Konstantin Seiler 2020-02-03 18:58:44 +11:00
parent ead794f4fd
commit 18d1181bf5
4 changed files with 374 additions and 460 deletions

View file

@ -1197,54 +1197,6 @@ impl SpircTask {
} }
} }
// fn load_track(&mut self, start_playing: bool, position_ms: u32) {
// let context_uri = self.state.get_context_uri().to_owned();
// let mut index = self.state.get_playing_track_index();
// let start_index = index;
// let tracks_len = self.state.get_track().len() as u32;
// debug!(
// "Loading context: <{}> index: [{}] of {}",
// context_uri, index, tracks_len
// );
// // Cycle through all tracks, break if we don't find any playable tracks
// // TODO: This will panic if no playable tracks are found!
// // tracks in each frame either have a gid or uri (that may or may not be a valid track)
// // E.g - context based frames sometimes contain tracks with <spotify:meta:page:>
// let track = {
// let mut track_ref = self.state.get_track()[index as usize].clone();
// let mut track_id = self.get_spotify_id_for_track(&track_ref);
// while track_id.is_err() || track_id.unwrap().audio_type == SpotifyAudioType::NonPlayable
// {
// warn!(
// "Skipping track <{:?}> at position [{}] of {}",
// track_ref.get_uri(),
// index,
// tracks_len
// );
// index = if index + 1 < tracks_len { index + 1 } else { 0 };
// self.state.set_playing_track_index(index);
// if index == start_index {
// warn!("No playable track found in state: {:?}", self.state);
// break;
// }
// track_ref = self.state.get_track()[index as usize].clone();
// track_id = self.get_spotify_id_for_track(&track_ref);
// }
// track_id
// }
// .expect("Invalid SpotifyId");
//
// self.play_request_id = Some(self.player.load(track, start_playing, position_ms));
//
// self.update_state_position(position_ms);
// self.state.set_status(PlayStatus::kPlayStatusLoading);
// if start_playing {
// self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
// } else {
// self.play_status = SpircPlayStatus::LoadingPause { position_ms };
// }
// }
fn hello(&mut self) { fn hello(&mut self) {
CommandSender::new(self, MessageType::kMessageTypeHello).send(); CommandSender::new(self, MessageType::kMessageTypeHello).send();
} }

View file

@ -66,6 +66,10 @@ enum PlayerCommand {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum PlayerEvent { pub enum PlayerEvent {
Stopped {
play_request_id: u64,
track_id: SpotifyId,
},
Loading { Loading {
play_request_id: u64, play_request_id: u64,
track_id: SpotifyId, track_id: SpotifyId,
@ -76,31 +80,27 @@ pub enum PlayerEvent {
track_id: SpotifyId, track_id: SpotifyId,
position_ms: u32, position_ms: u32,
}, },
Changed {
old_track_id: SpotifyId,
new_track_id: SpotifyId,
},
Playing { Playing {
play_request_id: u64, play_request_id: u64,
track_id: SpotifyId, track_id: SpotifyId,
position_ms: u32, position_ms: u32,
duration_ms: u32, duration_ms: u32,
}, },
Changed {
old_track_id: SpotifyId,
new_track_id: SpotifyId,
},
TimeToPreloadNextTrack {
play_request_id: u64,
track_id: SpotifyId,
},
EndOfTrack {
play_request_id: u64,
track_id: SpotifyId,
},
Paused { Paused {
play_request_id: u64, play_request_id: u64,
track_id: SpotifyId, track_id: SpotifyId,
position_ms: u32, position_ms: u32,
duration_ms: u32, duration_ms: u32,
}, },
Stopped { TimeToPreloadNextTrack {
play_request_id: u64,
track_id: SpotifyId,
},
EndOfTrack {
play_request_id: u64, play_request_id: u64,
track_id: SpotifyId, track_id: SpotifyId,
}, },
@ -217,6 +217,8 @@ impl Player {
event_senders: [event_sender].to_vec(), event_senders: [event_sender].to_vec(),
}; };
// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
let _ = internal.wait(); let _ = internal.wait();
debug!("PlayerInternal thread finished."); debug!("PlayerInternal thread finished.");
}); });
@ -683,6 +685,9 @@ impl Future for PlayerInternal {
type Error = (); type Error = ();
fn poll(&mut self) -> Poll<(), ()> { fn poll(&mut self) -> Poll<(), ()> {
// While this is written as a future, it still contains blocking code.
// It must be run on its own thread.
loop { loop {
let mut all_futures_completed_or_not_ready = true; let mut all_futures_completed_or_not_ready = true;
@ -716,7 +721,6 @@ impl Future for PlayerInternal {
play_request_id, play_request_id,
loaded_track, loaded_track,
start_playback, start_playback,
false,
); );
if let PlayerState::Loading { .. } = self.state { if let PlayerState::Loading { .. } = self.state {
panic!("The state wasn't changed by start_playback()"); panic!("The state wasn't changed by start_playback()");
@ -752,12 +756,8 @@ impl Future for PlayerInternal {
if self.state.is_playing() { if self.state.is_playing() {
self.ensure_sink_running(); self.ensure_sink_running();
}
if self.sink_running { if let PlayerState::Playing {
let mut current_normalisation_factor: f32 = 1.0;
let packet = if let PlayerState::Playing {
track_id, track_id,
play_request_id, play_request_id,
ref mut decoder, ref mut decoder,
@ -768,7 +768,6 @@ impl Future for PlayerInternal {
.. ..
} = self.state } = self.state
{ {
current_normalisation_factor = normalisation_factor;
let packet = decoder.next_packet().expect("Vorbis error"); let packet = decoder.next_packet().expect("Vorbis error");
if let Some(ref packet) = packet { if let Some(ref packet) = packet {
@ -804,14 +803,10 @@ impl Future for PlayerInternal {
} }
} }
Some(packet) self.handle_packet(packet, normalisation_factor);
} else { } else {
None unreachable!();
}; };
if let Some(packet) = packet {
self.handle_packet(packet, current_normalisation_factor);
}
} }
if let PlayerState::Playing { if let PlayerState::Playing {
@ -833,9 +828,8 @@ impl Future for PlayerInternal {
.. ..
} = self.state } = self.state
{ {
let stream_position_millis = Self::position_pcm_to_ms(stream_position_pcm);
if (!*suggested_to_preload_next_track) if (!*suggested_to_preload_next_track)
&& ((duration_ms as i64 - stream_position_millis as i64) && ((duration_ms as i64 - Self::position_pcm_to_ms(stream_position_pcm) as i64)
< PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64) < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64)
&& stream_loader_controller.range_to_end_available() && stream_loader_controller.range_to_end_available()
{ {
@ -851,7 +845,7 @@ impl Future for PlayerInternal {
return Ok(Async::Ready(())); return Ok(Async::Ready(()));
} }
if (!self.sink_running) && all_futures_completed_or_not_ready { if (!self.state.is_playing()) && all_futures_completed_or_not_ready {
return Ok(Async::NotReady); return Ok(Async::NotReady);
} }
} }
@ -924,16 +918,18 @@ impl PlayerInternal {
track_id, track_id,
play_request_id, play_request_id,
stream_position_pcm, stream_position_pcm,
duration_ms,
.. ..
} = self.state } = self.state
{ {
self.state.paused_to_playing(); self.state.paused_to_playing();
let position_ms = Self::position_pcm_to_ms(stream_position_pcm); let position_ms = Self::position_pcm_to_ms(stream_position_pcm);
self.send_event(PlayerEvent::Started { self.send_event(PlayerEvent::Playing {
track_id, track_id,
play_request_id, play_request_id,
position_ms, position_ms,
duration_ms,
}); });
self.ensure_sink_running(); self.ensure_sink_running();
} else { } else {
@ -1011,44 +1007,9 @@ impl PlayerInternal {
play_request_id: u64, play_request_id: u64,
loaded_track: PlayerLoadedTrackData, loaded_track: PlayerLoadedTrackData,
start_playback: bool, start_playback: bool,
state_is_invalid_because_the_same_track_is_getting_repeated: bool,
) { ) {
let position_ms = Self::position_pcm_to_ms(loaded_track.stream_position_pcm); let position_ms = Self::position_pcm_to_ms(loaded_track.stream_position_pcm);
match self.state {
PlayerState::Playing {
track_id: old_track_id,
..
}
| PlayerState::Paused {
track_id: old_track_id,
..
}
| PlayerState::EndOfTrack {
track_id: old_track_id,
..
} => self.send_event(PlayerEvent::Changed {
old_track_id: old_track_id,
new_track_id: track_id,
}),
PlayerState::Stopped => self.send_event(PlayerEvent::Started {
track_id,
play_request_id,
position_ms,
}),
PlayerState::Loading { .. } => (),
PlayerState::Invalid { .. } => {
if state_is_invalid_because_the_same_track_is_getting_repeated {
self.send_event(PlayerEvent::Changed {
old_track_id: track_id,
new_track_id: track_id,
})
} else {
panic!("Player is in an invalid state.")
}
}
}
if start_playback { if start_playback {
self.ensure_sink_running(); self.ensure_sink_running();
@ -1074,6 +1035,8 @@ impl PlayerInternal {
suggested_to_preload_next_track: false, suggested_to_preload_next_track: false,
}; };
} else { } else {
self.ensure_sink_stopped();
self.state = PlayerState::Paused { self.state = PlayerState::Paused {
track_id: track_id, track_id: track_id,
play_request_id: play_request_id, play_request_id: play_request_id,
@ -1095,15 +1058,13 @@ impl PlayerInternal {
} }
} }
fn handle_command(&mut self, cmd: PlayerCommand) { fn handle_command_load(
debug!("command={:?}", cmd); &mut self,
match cmd { track_id: SpotifyId,
PlayerCommand::Load { play_request_id: u64,
track_id, play: bool,
play_request_id, position_ms: u32,
play, ) {
position_ms,
} => {
// emit the correct player event // emit the correct player event
match self.state { match self.state {
PlayerState::Playing { PlayerState::Playing {
@ -1133,7 +1094,8 @@ impl PlayerInternal {
PlayerState::Invalid { .. } => panic!("Player is in an invalid state."), PlayerState::Invalid { .. } => panic!("Player is in an invalid state."),
} }
let mut load_command_processed = false; // Now we check at different positions whether we already have a pre-loaded version
// of this track somewhere. If so, use it and return.
// Check if there's a matching loaded track in the EndOfTrack player state. // Check if there's a matching loaded track in the EndOfTrack player state.
// This is the case if we're repeating the same track again. // This is the case if we're repeating the same track again.
@ -1146,9 +1108,7 @@ impl PlayerInternal {
if previous_track_id == track_id { if previous_track_id == track_id {
let loaded_track = mem::replace(&mut *loaded_track, None); let loaded_track = mem::replace(&mut *loaded_track, None);
if let Some(mut loaded_track) = loaded_track { if let Some(mut loaded_track) = loaded_track {
if Self::position_ms_to_pcm(position_ms) if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm {
!= loaded_track.stream_position_pcm
{
loaded_track loaded_track
.stream_loader_controller .stream_loader_controller
.set_random_access_mode(); .set_random_access_mode();
@ -1157,17 +1117,10 @@ impl PlayerInternal {
// loaded already because we played // loaded already because we played
// to the end of it. // to the end of it.
loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_loader_controller.set_stream_mode();
loaded_track.stream_position_pcm = loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
Self::position_ms_to_pcm(position_ms);
} }
self.start_playback( self.start_playback(track_id, play_request_id, loaded_track, play);
track_id, return;
play_request_id,
loaded_track,
play,
false,
);
load_command_processed = true;
} }
} }
} }
@ -1189,6 +1142,7 @@ impl PlayerInternal {
} = self.state } = self.state
{ {
if current_track_id == track_id { if current_track_id == track_id {
// we can use the current decoder. Ensure it's at the correct position.
if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
stream_loader_controller.set_random_access_mode(); stream_loader_controller.set_random_access_mode();
let _ = decoder.seek(position_ms as i64); // This may be blocking. let _ = decoder.seek(position_ms as i64); // This may be blocking.
@ -1196,6 +1150,8 @@ impl PlayerInternal {
*stream_position_pcm = Self::position_ms_to_pcm(position_ms); *stream_position_pcm = Self::position_ms_to_pcm(position_ms);
} }
// Move the info from the current state into a PlayerLoadedTrackData so we can use
// the usual code path to start playback.
let old_state = mem::replace(&mut self.state, PlayerState::Invalid); let old_state = mem::replace(&mut self.state, PlayerState::Invalid);
if let PlayerState::Playing { if let PlayerState::Playing {
@ -1226,15 +1182,13 @@ impl PlayerInternal {
stream_position_pcm, stream_position_pcm,
}; };
self.start_playback( self.start_playback(track_id, play_request_id, loaded_track, play);
track_id,
play_request_id,
loaded_track,
play,
true,
);
load_command_processed = true; if let PlayerState::Invalid = self.state {
panic!("start_playback() hasn't set a valid player state.");
}
return;
} else { } else {
unreachable!(); unreachable!();
} }
@ -1242,7 +1196,6 @@ impl PlayerInternal {
} }
// Check if the requested track has been preloaded already. If so use the preloaded data. // Check if the requested track has been preloaded already. If so use the preloaded data.
if !load_command_processed {
if let PlayerPreload::Ready { if let PlayerPreload::Ready {
track_id: loaded_track_id, track_id: loaded_track_id,
.. ..
@ -1255,31 +1208,23 @@ impl PlayerInternal {
mut loaded_track, mut loaded_track,
} = preload } = preload
{ {
if Self::position_ms_to_pcm(position_ms) if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm {
!= loaded_track.stream_position_pcm
{
loaded_track loaded_track
.stream_loader_controller .stream_loader_controller
.set_random_access_mode(); .set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_loader_controller.set_stream_mode();
} }
self.start_playback( self.start_playback(track_id, play_request_id, loaded_track, play);
track_id, return;
play_request_id, } else {
loaded_track, unreachable!();
play,
false,
);
load_command_processed = true;
}
} }
} }
} }
// We need to load the track - either from scratch or by completing a preload. // We need to load the track - either from scratch or by completing a preload.
// In any case we go into a Loading state to load the track. // In any case we go into a Loading state to load the track.
if !load_command_processed {
self.ensure_sink_stopped(); self.ensure_sink_stopped();
self.send_event(PlayerEvent::Loading { self.send_event(PlayerEvent::Loading {
@ -1288,6 +1233,7 @@ impl PlayerInternal {
position_ms, position_ms,
}); });
// Try to extract a pending loader from the preloading mechanism
let loader = if let PlayerPreload::Loading { let loader = if let PlayerPreload::Loading {
track_id: loaded_track_id, track_id: loaded_track_id,
.. ..
@ -1310,10 +1256,12 @@ impl PlayerInternal {
self.preload = PlayerPreload::None; self.preload = PlayerPreload::None;
// If we don't have a loader yet, create one from scratch.
let loader = loader let loader = loader
.or_else(|| Some(self.load_track(track_id, position_ms))) .or_else(|| Some(self.load_track(track_id, position_ms)))
.unwrap(); .unwrap();
// Set ourselves to a loading state.
self.state = PlayerState::Loading { self.state = PlayerState::Loading {
track_id, track_id,
play_request_id, play_request_id,
@ -1321,12 +1269,12 @@ impl PlayerInternal {
loader, loader,
}; };
} }
}
PlayerCommand::Preload { track_id } => { fn handle_command_preload(&mut self, track_id: SpotifyId) {
debug!("Preloading track"); debug!("Preloading track");
let mut preload_track = true; let mut preload_track = true;
// check whether the track is already loaded somewhere or being loaded.
if let PlayerPreload::Loading { if let PlayerPreload::Loading {
track_id: currently_loading, track_id: currently_loading,
.. ..
@ -1337,10 +1285,10 @@ impl PlayerInternal {
} = self.preload } = self.preload
{ {
if currently_loading == track_id { if currently_loading == track_id {
// we're already loading the requested track. // we're already preloading the requested track.
preload_track = false; preload_track = false;
} else { } else {
// we're loading something else - cancel it. // we're preloading something else - cancel it.
self.preload = PlayerPreload::None; self.preload = PlayerPreload::None;
} }
} }
@ -1364,13 +1312,14 @@ impl PlayerInternal {
} }
} }
// schedule the preload if the current track if desired.
if preload_track { if preload_track {
let loader = self.load_track(track_id, 0); let loader = self.load_track(track_id, 0);
self.preload = PlayerPreload::Loading { track_id, loader } self.preload = PlayerPreload::Loading { track_id, loader }
} }
} }
PlayerCommand::Seek(position_ms) => { fn handle_command_seek(&mut self, position_ms: u32) {
if let Some(stream_loader_controller) = self.state.stream_loader_controller() { if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
stream_loader_controller.set_random_access_mode(); stream_loader_controller.set_random_access_mode();
} }
@ -1400,6 +1349,7 @@ impl PlayerInternal {
stream_loader_controller.set_stream_mode(); stream_loader_controller.set_stream_mode();
} }
// ensure we have a bit of a buffer of downloaded data
self.preload_data_before_playback(); self.preload_data_before_playback();
if let PlayerState::Playing { if let PlayerState::Playing {
@ -1415,7 +1365,7 @@ impl PlayerInternal {
self.send_event(PlayerEvent::Playing { self.send_event(PlayerEvent::Playing {
track_id, track_id,
play_request_id, play_request_id,
position_ms: position_ms, position_ms,
duration_ms, duration_ms,
}); });
} }
@ -1429,19 +1379,29 @@ impl PlayerInternal {
self.send_event(PlayerEvent::Paused { self.send_event(PlayerEvent::Paused {
track_id, track_id,
play_request_id, play_request_id,
position_ms: position_ms, position_ms,
duration_ms, duration_ms,
}); });
} }
} }
PlayerCommand::Play => { fn handle_command(&mut self, cmd: PlayerCommand) {
self.handle_play(); debug!("command={:?}", cmd);
} match cmd {
PlayerCommand::Load {
track_id,
play_request_id,
play,
position_ms,
} => self.handle_command_load(track_id, play_request_id, play, position_ms),
PlayerCommand::Pause => { PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id),
self.handle_pause();
} PlayerCommand::Seek(position_ms) => self.handle_command_seek(position_ms),
PlayerCommand::Play => self.handle_play(),
PlayerCommand::Pause => self.handle_pause(),
PlayerCommand::Stop => self.handle_player_stop(), PlayerCommand::Stop => self.handle_player_stop(),

View file

@ -539,7 +539,8 @@ impl Future for Main {
if let Some(ref mut player_event_channel) = self.player_event_channel { if let Some(ref mut player_event_channel) = self.player_event_channel {
if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() { if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() {
if let Some(ref program) = self.player_event_program { if let Some(ref program) = self.player_event_program {
let child = run_program_on_events(event, program) if let Some(child) = run_program_on_events(event, program) {
let child = child
.expect("program failed to start") .expect("program failed to start")
.map(|status| { .map(|status| {
if !status.success() { if !status.success() {
@ -552,6 +553,7 @@ impl Future for Main {
} }
} }
} }
}
if !progress { if !progress {
return Ok(Async::NotReady); return Ok(Async::NotReady);

View file

@ -14,7 +14,7 @@ fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result<Chi
.spawn_async() .spawn_async()
} }
pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result<Child> { pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option<io::Result<Child>> {
let mut env_vars = HashMap::new(); let mut env_vars = HashMap::new();
match event { match event {
PlayerEvent::Changed { PlayerEvent::Changed {
@ -33,7 +33,7 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result<Ch
env_vars.insert("PLAYER_EVENT", "stop".to_string()); env_vars.insert("PLAYER_EVENT", "stop".to_string());
env_vars.insert("TRACK_ID", track_id.to_base62()); env_vars.insert("TRACK_ID", track_id.to_base62());
} }
_ => (), _ => return None,
} }
run_program(onevent, env_vars) Some(run_program(onevent, env_vars))
} }