Audio file seeking improvements

- Ensure there is enough disk space for the write file
- Switch streaming mode only if necessary
- Return `Err` on seeking errors, instead of exiting
- Use the actual position after seeking
This commit is contained in:
Roderick van Domburg 2022-01-06 21:55:08 +01:00
parent 6c25fb79dc
commit 8d74d48809
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
3 changed files with 86 additions and 100 deletions

View file

@ -325,12 +325,18 @@ struct AudioFileDownloadStatus {
downloaded: RangeSet, downloaded: RangeSet,
} }
#[derive(Copy, Clone, PartialEq, Eq)] #[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum DownloadStrategy { enum DownloadStrategy {
RandomAccess(), RandomAccess(),
Streaming(), Streaming(),
} }
impl Default for DownloadStrategy {
fn default() -> Self {
Self::Streaming()
}
}
struct AudioFileShared { struct AudioFileShared {
cdn_url: CdnUrl, cdn_url: CdnUrl,
file_size: usize, file_size: usize,
@ -456,13 +462,15 @@ impl AudioFileStreaming {
requested: RangeSet::new(), requested: RangeSet::new(),
downloaded: RangeSet::new(), downloaded: RangeSet::new(),
}), }),
download_strategy: Mutex::new(DownloadStrategy::Streaming()), download_strategy: Mutex::new(DownloadStrategy::default()),
number_of_open_requests: AtomicUsize::new(0), number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize), ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize),
read_position: AtomicUsize::new(0), read_position: AtomicUsize::new(0),
}); });
let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?; let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?;
write_file.as_file().set_len(file_size as u64)?;
let read_file = write_file.reopen()?; let read_file = write_file.reopen()?;
let (stream_loader_command_tx, stream_loader_command_rx) = let (stream_loader_command_tx, stream_loader_command_rx) =
@ -567,11 +575,44 @@ impl Read for AudioFileStreaming {
impl Seek for AudioFileStreaming { impl Seek for AudioFileStreaming {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
// If we are already at this position, we don't need to switch download strategy.
// These checks and locks are less expensive than interrupting streaming.
let current_position = self.position as i64;
let requested_pos = match pos {
SeekFrom::Start(pos) => pos as i64,
SeekFrom::End(pos) => self.shared.file_size as i64 - pos - 1,
SeekFrom::Current(pos) => current_position + pos,
};
if requested_pos == current_position {
return Ok(current_position as u64);
}
// Again if we have already downloaded this part.
let available = self
.shared
.download_status
.lock()
.downloaded
.contains(requested_pos as usize);
let mut old_strategy = DownloadStrategy::default();
if !available {
// Ensure random access mode if we need to download this part.
old_strategy = std::mem::replace(
&mut *(self.shared.download_strategy.lock()),
DownloadStrategy::RandomAccess(),
);
}
self.position = self.read_file.seek(pos)?; self.position = self.read_file.seek(pos)?;
// Do not seek past EOF
self.shared self.shared
.read_position .read_position
.store(self.position as usize, Ordering::Release); .store(self.position as usize, Ordering::Release);
if !available && old_strategy != DownloadStrategy::RandomAccess() {
*(self.shared.download_strategy.lock()) = old_strategy;
}
Ok(self.position) Ok(self.position)
} }
} }

View file

@ -59,6 +59,12 @@ pub trait AudioDecoder {
fn next_packet(&mut self) -> DecoderResult<Option<(u32, AudioPacket)>>; fn next_packet(&mut self) -> DecoderResult<Option<(u32, AudioPacket)>>;
} }
impl From<DecoderError> for librespot_core::error::Error {
fn from(err: DecoderError) -> Self {
librespot_core::error::Error::aborted(err)
}
}
impl From<symphonia::core::errors::Error> for DecoderError { impl From<symphonia::core::errors::Error> for DecoderError {
fn from(err: symphonia::core::errors::Error) -> Self { fn from(err: symphonia::core::errors::Error) -> Self {
Self::SymphoniaDecoder(err.to_string()) Self::SymphoniaDecoder(err.to_string())

View file

@ -591,25 +591,6 @@ impl PlayerState {
} }
} }
fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> {
use self::PlayerState::*;
match *self {
Stopped | EndOfTrack { .. } | Loading { .. } => None,
Paused {
ref mut stream_loader_controller,
..
}
| Playing {
ref mut stream_loader_controller,
..
} => Some(stream_loader_controller),
Invalid => {
error!("PlayerState::stream_loader_controller in invalid state");
exit(1);
}
}
}
fn playing_to_end_of_track(&mut self) { fn playing_to_end_of_track(&mut self) {
use self::PlayerState::*; use self::PlayerState::*;
let new_state = mem::replace(self, Invalid); let new_state = mem::replace(self, Invalid);
@ -891,9 +872,7 @@ impl PlayerTrackLoader {
let is_cached = encrypted_file.is_cached(); let is_cached = encrypted_file.is_cached();
// Setting up demuxing and decoding will trigger a seek() so always start in random access mode.
let stream_loader_controller = encrypted_file.get_stream_loader_controller().ok()?; let stream_loader_controller = encrypted_file.get_stream_loader_controller().ok()?;
stream_loader_controller.set_random_access_mode();
// Not all audio files are encrypted. If we can't get a key, try loading the track // Not all audio files are encrypted. If we can't get a key, try loading the track
// without decryption. If the file was encrypted after all, the decoder will fail // without decryption. If the file was encrypted after all, the decoder will fail
@ -917,11 +896,17 @@ impl PlayerTrackLoader {
(0, None) (0, None)
}; };
let audio_file = Subfile::new( let audio_file = match Subfile::new(
decrypted_file, decrypted_file,
offset, offset,
stream_loader_controller.len() as u64, stream_loader_controller.len() as u64,
); ) {
Ok(audio_file) => audio_file,
Err(e) => {
error!("PlayerTrackLoader::load_track error opening subfile: {}", e);
return None;
}
};
let result = if self.config.passthrough { let result = if self.config.passthrough {
PassthroughDecoder::new(audio_file, format).map(|x| Box::new(x) as Decoder) PassthroughDecoder::new(audio_file, format).map(|x| Box::new(x) as Decoder)
@ -976,18 +961,17 @@ impl PlayerTrackLoader {
// matter for playback (but won't hurt either), but may be useful for the // matter for playback (but won't hurt either), but may be useful for the
// passthrough decoder. // passthrough decoder.
let stream_position_ms = match decoder.seek(position_ms) { let stream_position_ms = match decoder.seek(position_ms) {
Ok(_) => position_ms, Ok(new_position_ms) => new_position_ms,
Err(e) => { Err(e) => {
warn!( error!(
"PlayerTrackLoader::load_track error seeking to {}: {}", "PlayerTrackLoader::load_track error seeking to starting position {}: {}",
position_ms, e position_ms, e
); );
0 return None;
} }
}; };
// Transition from random access mode to streaming mode now that // Ensure streaming mode now that we are ready to play from the requested position.
// we are ready to play from the requested position.
stream_loader_controller.set_stream_mode(); stream_loader_controller.set_stream_mode();
info!("<{}> ({} ms) loaded", audio.name, audio.duration); info!("<{}> ({} ms) loaded", audio.name, audio.duration);
@ -1585,7 +1569,7 @@ impl PlayerInternal {
play_request_id: u64, play_request_id: u64,
play: bool, play: bool,
position_ms: u32, position_ms: u32,
) { ) -> PlayerResult {
if !self.config.gapless { if !self.config.gapless {
self.ensure_sink_stopped(play); self.ensure_sink_stopped(play);
} }
@ -1616,11 +1600,10 @@ impl PlayerInternal {
position_ms, position_ms,
}), }),
PlayerState::Invalid { .. } => { PlayerState::Invalid { .. } => {
error!( return Err(Error::internal(format!(
"Player::handle_command_load called from invalid state: {:?}", "Player::handle_command_load called from invalid state: {:?}",
self.state self.state
); )));
exit(1);
} }
} }
@ -1638,29 +1621,20 @@ impl PlayerInternal {
let mut loaded_track = match mem::replace(&mut self.state, PlayerState::Invalid) { let mut loaded_track = match mem::replace(&mut self.state, PlayerState::Invalid) {
PlayerState::EndOfTrack { loaded_track, .. } => loaded_track, PlayerState::EndOfTrack { loaded_track, .. } => loaded_track,
_ => { _ => {
error!("PlayerInternal handle_command_load: Invalid PlayerState"); return Err(Error::internal(format!("PlayerInternal::handle_command_load repeating the same track: invalid state: {:?}", self.state)));
exit(1);
} }
}; };
if position_ms != loaded_track.stream_position_ms { if position_ms != loaded_track.stream_position_ms {
loaded_track
.stream_loader_controller
.set_random_access_mode();
// This may be blocking. // This may be blocking.
match loaded_track.decoder.seek(position_ms) { loaded_track.stream_position_ms = loaded_track.decoder.seek(position_ms)?;
Ok(_) => loaded_track.stream_position_ms = position_ms,
Err(e) => error!("PlayerInternal handle_command_load: {}", e),
}
loaded_track.stream_loader_controller.set_stream_mode();
} }
self.preload = PlayerPreload::None; self.preload = PlayerPreload::None;
self.start_playback(track_id, play_request_id, loaded_track, play); self.start_playback(track_id, play_request_id, loaded_track, play);
if let PlayerState::Invalid = self.state { if let PlayerState::Invalid = self.state {
error!("start_playback() hasn't set a valid player state."); return Err(Error::internal(format!("PlayerInternal::handle_command_load repeating the same track: start_playback() did not transition to valid player state: {:?}", self.state)));
exit(1);
} }
return; return Ok(());
} }
} }
@ -1669,29 +1643,20 @@ impl PlayerInternal {
track_id: current_track_id, track_id: current_track_id,
ref mut stream_position_ms, ref mut stream_position_ms,
ref mut decoder, ref mut decoder,
ref mut stream_loader_controller,
.. ..
} }
| PlayerState::Paused { | PlayerState::Paused {
track_id: current_track_id, track_id: current_track_id,
ref mut stream_position_ms, ref mut stream_position_ms,
ref mut decoder, ref mut decoder,
ref mut stream_loader_controller,
.. ..
} = self.state } = self.state
{ {
if current_track_id == track_id { if current_track_id == track_id {
// we can use the current decoder. Ensure it's at the correct position. // we can use the current decoder. Ensure it's at the correct position.
if position_ms != *stream_position_ms { if position_ms != *stream_position_ms {
stream_loader_controller.set_random_access_mode();
// This may be blocking. // This may be blocking.
match decoder.seek(position_ms) { *stream_position_ms = decoder.seek(position_ms)?;
Ok(_) => *stream_position_ms = position_ms,
Err(e) => {
error!("PlayerInternal::handle_command_load error seeking: {}", e)
}
}
stream_loader_controller.set_stream_mode();
} }
// Move the info from the current state into a PlayerLoadedTrackData so we can use // Move the info from the current state into a PlayerLoadedTrackData so we can use
@ -1733,14 +1698,12 @@ impl PlayerInternal {
self.start_playback(track_id, play_request_id, loaded_track, play); self.start_playback(track_id, play_request_id, loaded_track, play);
if let PlayerState::Invalid = self.state { if let PlayerState::Invalid = self.state {
error!("start_playback() hasn't set a valid player state."); return Err(Error::internal(format!("PlayerInternal::handle_command_load already playing this track: start_playback() did not transition to valid player state: {:?}", self.state)));
exit(1);
} }
return; return Ok(());
} else { } else {
error!("PlayerInternal handle_command_load: Invalid PlayerState"); return Err(Error::internal(format!("PlayerInternal::handle_command_load already playing this track: invalid state: {:?}", self.state)));
exit(1);
} }
} }
} }
@ -1759,21 +1722,13 @@ impl PlayerInternal {
} = preload } = preload
{ {
if position_ms != loaded_track.stream_position_ms { if position_ms != loaded_track.stream_position_ms {
loaded_track
.stream_loader_controller
.set_random_access_mode();
// This may be blocking // This may be blocking
match loaded_track.decoder.seek(position_ms) { loaded_track.stream_position_ms = loaded_track.decoder.seek(position_ms)?;
Ok(_) => loaded_track.stream_position_ms = position_ms,
Err(e) => error!("PlayerInternal handle_command_load: {}", e),
}
loaded_track.stream_loader_controller.set_stream_mode();
} }
self.start_playback(track_id, play_request_id, *loaded_track, play); self.start_playback(track_id, play_request_id, *loaded_track, play);
return; return Ok(());
} else { } else {
error!("PlayerInternal handle_command_load: Invalid PlayerState"); return Err(Error::internal(format!("PlayerInternal::handle_command_loading preloaded track: invalid state: {:?}", self.state)));
exit(1);
} }
} }
} }
@ -1821,6 +1776,8 @@ impl PlayerInternal {
start_playback: play, start_playback: play,
loader, loader,
}; };
Ok(())
} }
fn handle_command_preload(&mut self, track_id: SpotifyId) { fn handle_command_preload(&mut self, track_id: SpotifyId) {
@ -1875,12 +1832,9 @@ impl PlayerInternal {
} }
fn handle_command_seek(&mut self, position_ms: u32) -> PlayerResult { fn handle_command_seek(&mut self, position_ms: u32) -> PlayerResult {
if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
stream_loader_controller.set_random_access_mode();
}
if let Some(decoder) = self.state.decoder() { if let Some(decoder) = self.state.decoder() {
match decoder.seek(position_ms) { match decoder.seek(position_ms) {
Ok(_) => { Ok(new_position_ms) => {
if let PlayerState::Playing { if let PlayerState::Playing {
ref mut stream_position_ms, ref mut stream_position_ms,
.. ..
@ -1890,7 +1844,7 @@ impl PlayerInternal {
.. ..
} = self.state } = self.state
{ {
*stream_position_ms = position_ms; *stream_position_ms = new_position_ms;
} }
} }
Err(e) => error!("PlayerInternal::handle_command_seek error: {}", e), Err(e) => error!("PlayerInternal::handle_command_seek error: {}", e),
@ -1899,11 +1853,6 @@ impl PlayerInternal {
error!("Player::seek called from invalid state: {:?}", self.state); error!("Player::seek called from invalid state: {:?}", self.state);
} }
// If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun.
if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
stream_loader_controller.set_stream_mode();
}
// ensure we have a bit of a buffer of downloaded data // ensure we have a bit of a buffer of downloaded data
self.preload_data_before_playback()?; self.preload_data_before_playback()?;
@ -1950,7 +1899,7 @@ impl PlayerInternal {
play_request_id, play_request_id,
play, play,
position_ms, position_ms,
} => self.handle_command_load(track_id, play_request_id, play, position_ms), } => self.handle_command_load(track_id, play_request_id, play, position_ms)?,
PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id),
@ -2191,25 +2140,15 @@ struct Subfile<T: Read + Seek> {
} }
impl<T: Read + Seek> Subfile<T> { impl<T: Read + Seek> Subfile<T> {
pub fn new(mut stream: T, offset: u64, length: u64) -> Subfile<T> { pub fn new(mut stream: T, offset: u64, length: u64) -> Result<Subfile<T>, io::Error> {
let target = SeekFrom::Start(offset); let target = SeekFrom::Start(offset);
match stream.seek(target) { stream.seek(target)?;
Ok(pos) => {
if pos != offset {
error!(
"Subfile::new seeking to {:?} but position is now {:?}",
target, pos
);
}
}
Err(e) => error!("Subfile new Error: {}", e),
}
Subfile { Ok(Subfile {
stream, stream,
offset, offset,
length, length,
} })
} }
} }