From 8d74d48809f5440fae3aaaf13a29ac8b0321c5a9 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Thu, 6 Jan 2022 21:55:08 +0100 Subject: [PATCH] Audio file seeking improvements - Ensure there is enough disk space for the write file - Switch streaming mode only if necessary - Return `Err` on seeking errors, instead of exiting - Use the actual position after seeking --- audio/src/fetch/mod.rs | 47 ++++++++++++- playback/src/decoder/mod.rs | 6 ++ playback/src/player.rs | 133 ++++++++++-------------------------- 3 files changed, 86 insertions(+), 100 deletions(-) diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index ad1b98e1..0bc1f74c 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -325,12 +325,18 @@ struct AudioFileDownloadStatus { downloaded: RangeSet, } -#[derive(Copy, Clone, PartialEq, Eq)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] enum DownloadStrategy { RandomAccess(), Streaming(), } +impl Default for DownloadStrategy { + fn default() -> Self { + Self::Streaming() + } +} + struct AudioFileShared { cdn_url: CdnUrl, file_size: usize, @@ -456,13 +462,15 @@ impl AudioFileStreaming { requested: RangeSet::new(), downloaded: RangeSet::new(), }), - download_strategy: Mutex::new(DownloadStrategy::Streaming()), + download_strategy: Mutex::new(DownloadStrategy::default()), number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize), read_position: AtomicUsize::new(0), }); let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?; + write_file.as_file().set_len(file_size as u64)?; + let read_file = write_file.reopen()?; let (stream_loader_command_tx, stream_loader_command_rx) = @@ -567,11 +575,44 @@ impl Read for AudioFileStreaming { impl Seek for AudioFileStreaming { fn seek(&mut self, pos: SeekFrom) -> io::Result { + // If we are already at this position, we don't need to switch download strategy. + // These checks and locks are less expensive than interrupting streaming. + let current_position = self.position as i64; + let requested_pos = match pos { + SeekFrom::Start(pos) => pos as i64, + SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1, + SeekFrom::Current(pos) => current_position + pos, + }; + if requested_pos == current_position { + return Ok(current_position as u64); + } + + // Again if we have already downloaded this part. + let available = self + .shared + .download_status + .lock() + .downloaded + .contains(requested_pos as usize); + + let mut old_strategy = DownloadStrategy::default(); + if !available { + // Ensure random access mode if we need to download this part. + old_strategy = std::mem::replace( + &mut *(self.shared.download_strategy.lock()), + DownloadStrategy::RandomAccess(), + ); + } + self.position = self.read_file.seek(pos)?; - // Do not seek past EOF self.shared .read_position .store(self.position as usize, Ordering::Release); + + if !available && old_strategy != DownloadStrategy::RandomAccess() { + *(self.shared.download_strategy.lock()) = old_strategy; + } + Ok(self.position) } } diff --git a/playback/src/decoder/mod.rs b/playback/src/decoder/mod.rs index 8d4cc1c8..e74f92b7 100644 --- a/playback/src/decoder/mod.rs +++ b/playback/src/decoder/mod.rs @@ -59,6 +59,12 @@ pub trait AudioDecoder { fn next_packet(&mut self) -> DecoderResult>; } +impl From for librespot_core::error::Error { + fn from(err: DecoderError) -> Self { + librespot_core::error::Error::aborted(err) + } +} + impl From for DecoderError { fn from(err: symphonia::core::errors::Error) -> Self { Self::SymphoniaDecoder(err.to_string()) diff --git a/playback/src/player.rs b/playback/src/player.rs index a382b6c6..56c57334 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -591,25 +591,6 @@ impl PlayerState { } } - fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> { - use self::PlayerState::*; - match *self { - Stopped | EndOfTrack { .. } | Loading { .. } => None, - Paused { - ref mut stream_loader_controller, - .. - } - | Playing { - ref mut stream_loader_controller, - .. - } => Some(stream_loader_controller), - Invalid => { - error!("PlayerState::stream_loader_controller in invalid state"); - exit(1); - } - } - } - fn playing_to_end_of_track(&mut self) { use self::PlayerState::*; let new_state = mem::replace(self, Invalid); @@ -891,9 +872,7 @@ impl PlayerTrackLoader { let is_cached = encrypted_file.is_cached(); - // Setting up demuxing and decoding will trigger a seek() so always start in random access mode. let stream_loader_controller = encrypted_file.get_stream_loader_controller().ok()?; - stream_loader_controller.set_random_access_mode(); // Not all audio files are encrypted. If we can't get a key, try loading the track // without decryption. If the file was encrypted after all, the decoder will fail @@ -917,11 +896,17 @@ impl PlayerTrackLoader { (0, None) }; - let audio_file = Subfile::new( + let audio_file = match Subfile::new( decrypted_file, offset, stream_loader_controller.len() as u64, - ); + ) { + Ok(audio_file) => audio_file, + Err(e) => { + error!("PlayerTrackLoader::load_track error opening subfile: {}", e); + return None; + } + }; let result = if self.config.passthrough { PassthroughDecoder::new(audio_file, format).map(|x| Box::new(x) as Decoder) @@ -976,18 +961,17 @@ impl PlayerTrackLoader { // matter for playback (but won't hurt either), but may be useful for the // passthrough decoder. let stream_position_ms = match decoder.seek(position_ms) { - Ok(_) => position_ms, + Ok(new_position_ms) => new_position_ms, Err(e) => { - warn!( - "PlayerTrackLoader::load_track error seeking to {}: {}", + error!( + "PlayerTrackLoader::load_track error seeking to starting position {}: {}", position_ms, e ); - 0 + return None; } }; - // Transition from random access mode to streaming mode now that - // we are ready to play from the requested position. + // Ensure streaming mode now that we are ready to play from the requested position. stream_loader_controller.set_stream_mode(); info!("<{}> ({} ms) loaded", audio.name, audio.duration); @@ -1585,7 +1569,7 @@ impl PlayerInternal { play_request_id: u64, play: bool, position_ms: u32, - ) { + ) -> PlayerResult { if !self.config.gapless { self.ensure_sink_stopped(play); } @@ -1616,11 +1600,10 @@ impl PlayerInternal { position_ms, }), PlayerState::Invalid { .. } => { - error!( + return Err(Error::internal(format!( "Player::handle_command_load called from invalid state: {:?}", self.state - ); - exit(1); + ))); } } @@ -1638,29 +1621,20 @@ impl PlayerInternal { let mut loaded_track = match mem::replace(&mut self.state, PlayerState::Invalid) { PlayerState::EndOfTrack { loaded_track, .. } => loaded_track, _ => { - error!("PlayerInternal handle_command_load: Invalid PlayerState"); - exit(1); + return Err(Error::internal(format!("PlayerInternal::handle_command_load repeating the same track: invalid state: {:?}", self.state))); } }; if position_ms != loaded_track.stream_position_ms { - loaded_track - .stream_loader_controller - .set_random_access_mode(); // This may be blocking. - match loaded_track.decoder.seek(position_ms) { - Ok(_) => loaded_track.stream_position_ms = position_ms, - Err(e) => error!("PlayerInternal handle_command_load: {}", e), - } - loaded_track.stream_loader_controller.set_stream_mode(); + loaded_track.stream_position_ms = loaded_track.decoder.seek(position_ms)?; } self.preload = PlayerPreload::None; self.start_playback(track_id, play_request_id, loaded_track, play); if let PlayerState::Invalid = self.state { - error!("start_playback() hasn't set a valid player state."); - exit(1); + return Err(Error::internal(format!("PlayerInternal::handle_command_load repeating the same track: start_playback() did not transition to valid player state: {:?}", self.state))); } - return; + return Ok(()); } } @@ -1669,29 +1643,20 @@ impl PlayerInternal { track_id: current_track_id, ref mut stream_position_ms, ref mut decoder, - ref mut stream_loader_controller, .. } | PlayerState::Paused { track_id: current_track_id, ref mut stream_position_ms, ref mut decoder, - ref mut stream_loader_controller, .. } = self.state { if current_track_id == track_id { // we can use the current decoder. Ensure it's at the correct position. if position_ms != *stream_position_ms { - stream_loader_controller.set_random_access_mode(); // This may be blocking. - match decoder.seek(position_ms) { - Ok(_) => *stream_position_ms = position_ms, - Err(e) => { - error!("PlayerInternal::handle_command_load error seeking: {}", e) - } - } - stream_loader_controller.set_stream_mode(); + *stream_position_ms = decoder.seek(position_ms)?; } // Move the info from the current state into a PlayerLoadedTrackData so we can use @@ -1733,14 +1698,12 @@ impl PlayerInternal { self.start_playback(track_id, play_request_id, loaded_track, play); if let PlayerState::Invalid = self.state { - error!("start_playback() hasn't set a valid player state."); - exit(1); + return Err(Error::internal(format!("PlayerInternal::handle_command_load already playing this track: start_playback() did not transition to valid player state: {:?}", self.state))); } - return; + return Ok(()); } else { - error!("PlayerInternal handle_command_load: Invalid PlayerState"); - exit(1); + return Err(Error::internal(format!("PlayerInternal::handle_command_load already playing this track: invalid state: {:?}", self.state))); } } } @@ -1759,21 +1722,13 @@ impl PlayerInternal { } = preload { if position_ms != loaded_track.stream_position_ms { - loaded_track - .stream_loader_controller - .set_random_access_mode(); // This may be blocking - match loaded_track.decoder.seek(position_ms) { - Ok(_) => loaded_track.stream_position_ms = position_ms, - Err(e) => error!("PlayerInternal handle_command_load: {}", e), - } - loaded_track.stream_loader_controller.set_stream_mode(); + loaded_track.stream_position_ms = loaded_track.decoder.seek(position_ms)?; } self.start_playback(track_id, play_request_id, *loaded_track, play); - return; + return Ok(()); } else { - error!("PlayerInternal handle_command_load: Invalid PlayerState"); - exit(1); + return Err(Error::internal(format!("PlayerInternal::handle_command_loading preloaded track: invalid state: {:?}", self.state))); } } } @@ -1821,6 +1776,8 @@ impl PlayerInternal { start_playback: play, loader, }; + + Ok(()) } fn handle_command_preload(&mut self, track_id: SpotifyId) { @@ -1875,12 +1832,9 @@ impl PlayerInternal { } fn handle_command_seek(&mut self, position_ms: u32) -> PlayerResult { - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_random_access_mode(); - } if let Some(decoder) = self.state.decoder() { match decoder.seek(position_ms) { - Ok(_) => { + Ok(new_position_ms) => { if let PlayerState::Playing { ref mut stream_position_ms, .. @@ -1890,7 +1844,7 @@ impl PlayerInternal { .. } = self.state { - *stream_position_ms = position_ms; + *stream_position_ms = new_position_ms; } } Err(e) => error!("PlayerInternal::handle_command_seek error: {}", e), @@ -1899,11 +1853,6 @@ impl PlayerInternal { error!("Player::seek called from invalid state: {:?}", self.state); } - // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. - if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - stream_loader_controller.set_stream_mode(); - } - // ensure we have a bit of a buffer of downloaded data self.preload_data_before_playback()?; @@ -1950,7 +1899,7 @@ impl PlayerInternal { play_request_id, play, position_ms, - } => self.handle_command_load(track_id, play_request_id, play, position_ms), + } => self.handle_command_load(track_id, play_request_id, play, position_ms)?, PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), @@ -2191,25 +2140,15 @@ struct Subfile { } impl Subfile { - pub fn new(mut stream: T, offset: u64, length: u64) -> Subfile { + pub fn new(mut stream: T, offset: u64, length: u64) -> Result, io::Error> { let target = SeekFrom::Start(offset); - match stream.seek(target) { - Ok(pos) => { - if pos != offset { - error!( - "Subfile::new seeking to {:?} but position is now {:?}", - target, pos - ); - } - } - Err(e) => error!("Subfile new Error: {}", e), - } + stream.seek(target)?; - Subfile { + Ok(Subfile { stream, offset, length, - } + }) } }