From d380f1f04049719e6d973baed0fb57dbacad790c Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 7 Jan 2022 11:13:23 +0100 Subject: [PATCH 1/2] Simplify `AudioPacketPosition` --- playback/src/decoder/mod.rs | 11 +---------- playback/src/decoder/passthrough_decoder.rs | 6 ++---- playback/src/decoder/symphonia_decoder.rs | 10 ++++------ playback/src/player.rs | 6 ++---- 4 files changed, 9 insertions(+), 24 deletions(-) diff --git a/playback/src/decoder/mod.rs b/playback/src/decoder/mod.rs index 05279c1b..2526da34 100644 --- a/playback/src/decoder/mod.rs +++ b/playback/src/decoder/mod.rs @@ -56,19 +56,10 @@ impl AudioPacket { } } -#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)] -pub enum AudioPositionKind { - // the position is at the expected packet - Current, - // the decoder skipped some corrupted or invalid data, - // and the position is now later than expected - SkippedTo, -} - #[derive(Debug, Clone)] pub struct AudioPacketPosition { pub position_ms: u32, - pub kind: AudioPositionKind, + pub skipped: bool, } impl Deref for AudioPacketPosition { diff --git a/playback/src/decoder/passthrough_decoder.rs b/playback/src/decoder/passthrough_decoder.rs index ec3a7753..b04b8e0d 100644 --- a/playback/src/decoder/passthrough_decoder.rs +++ b/playback/src/decoder/passthrough_decoder.rs @@ -7,9 +7,7 @@ use std::{ // TODO: move this to the Symphonia Ogg demuxer use ogg::{OggReadError, Packet, PacketReader, PacketWriteEndInfo, PacketWriter}; -use super::{ - AudioDecoder, AudioPacket, AudioPacketPosition, AudioPositionKind, DecoderError, DecoderResult, -}; +use super::{AudioDecoder, AudioPacket, AudioPacketPosition, DecoderError, DecoderResult}; use crate::{ metadata::audio::{AudioFileFormat, AudioFiles}, @@ -212,7 +210,7 @@ impl AudioDecoder for PassthroughDecoder { let position_ms = Self::position_pcm_to_ms(pckgp_page); let packet_position = AudioPacketPosition { position_ms, - kind: AudioPositionKind::Current, + skipped: false, }; let ogg_data = AudioPacket::Raw(std::mem::take(data)); diff --git a/playback/src/decoder/symphonia_decoder.rs b/playback/src/decoder/symphonia_decoder.rs index 049e4998..27cb9e83 100644 --- a/playback/src/decoder/symphonia_decoder.rs +++ b/playback/src/decoder/symphonia_decoder.rs @@ -16,9 +16,7 @@ use symphonia::{ }, }; -use super::{ - AudioDecoder, AudioPacket, AudioPacketPosition, AudioPositionKind, DecoderError, DecoderResult, -}; +use super::{AudioDecoder, AudioPacket, AudioPacketPosition, DecoderError, DecoderResult}; use crate::{ metadata::audio::{AudioFileFormat, AudioFiles}, @@ -173,7 +171,7 @@ impl AudioDecoder for SymphoniaDecoder { } fn next_packet(&mut self) -> DecoderResult> { - let mut position_kind = AudioPositionKind::Current; + let mut skipped = false; loop { let packet = match self.format.next_packet() { @@ -193,7 +191,7 @@ impl AudioDecoder for SymphoniaDecoder { let position_ms = self.ts_to_ms(packet.pts()); let packet_position = AudioPacketPosition { position_ms, - kind: position_kind, + skipped, }; match self.decoder.decode(&packet) { @@ -215,7 +213,7 @@ impl AudioDecoder for SymphoniaDecoder { // The packet failed to decode due to corrupted or invalid data, get a new // packet and try again. warn!("Skipping malformed audio packet at {} ms", position_ms); - position_kind = AudioPositionKind::SkippedTo; + skipped = true; continue; } Err(err) => return Err(err.into()), diff --git a/playback/src/player.rs b/playback/src/player.rs index f8319798..cfa4414e 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -30,8 +30,7 @@ use crate::{ convert::Converter, core::{util::SeqGenerator, Error, Session, SpotifyId}, decoder::{ - AudioDecoder, AudioPacket, AudioPacketPosition, AudioPositionKind, PassthroughDecoder, - SymphoniaDecoder, + AudioDecoder, AudioPacket, AudioPacketPosition, PassthroughDecoder, SymphoniaDecoder, }, metadata::audio::{AudioFileFormat, AudioFiles, AudioItem}, mixer::AudioFilter, @@ -1116,8 +1115,7 @@ impl Future for PlayerInternal { // Only notify if we're skipped some packets *or* we are behind. // If we're ahead it's probably due to a buffer of the backend // and we're actually in time. - let notify_about_position = packet_position.kind - != AudioPositionKind::Current + let notify_about_position = packet_position.skipped || match *reported_nominal_start_time { None => true, Some(reported_nominal_start_time) => { From 62ccdbc580e5e42f3abe7b2677c9d967f6c1d31f Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Fri, 7 Jan 2022 11:38:24 +0100 Subject: [PATCH 2/2] Improve performance of getting/setting download mode --- audio/src/fetch/mod.rs | 87 ++++++++++++++++++-------------------- audio/src/fetch/receive.rs | 30 +++++++------ 2 files changed, 56 insertions(+), 61 deletions(-) diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index 0bc1f74c..4a7742ec 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -5,7 +5,7 @@ use std::{ fs, io::{self, Read, Seek, SeekFrom}, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, time::{Duration, Instant}, @@ -137,10 +137,10 @@ pub struct StreamingRequest { #[derive(Debug)] pub enum StreamLoaderCommand { - Fetch(Range), // signal the stream loader to fetch a range of the file - RandomAccessMode(), // optimise download strategy for random access - StreamMode(), // optimise download strategy for streaming - Close(), // terminate and don't load any more data + Fetch(Range), // signal the stream loader to fetch a range of the file + RandomAccessMode, // optimise download strategy for random access + StreamMode, // optimise download strategy for streaming + Close, // terminate and don't load any more data } #[derive(Clone)] @@ -299,17 +299,17 @@ impl StreamLoaderController { pub fn set_random_access_mode(&self) { // optimise download strategy for random access - self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); + self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode); } pub fn set_stream_mode(&self) { // optimise download strategy for streaming - self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); + self.send_stream_loader_command(StreamLoaderCommand::StreamMode); } pub fn close(&self) { // terminate stream loading and don't load any more data for this file. - self.send_stream_loader_command(StreamLoaderCommand::Close()); + self.send_stream_loader_command(StreamLoaderCommand::Close); } } @@ -325,25 +325,13 @@ struct AudioFileDownloadStatus { downloaded: RangeSet, } -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -enum DownloadStrategy { - RandomAccess(), - Streaming(), -} - -impl Default for DownloadStrategy { - fn default() -> Self { - Self::Streaming() - } -} - struct AudioFileShared { cdn_url: CdnUrl, file_size: usize, bytes_per_second: usize, cond: Condvar, download_status: Mutex, - download_strategy: Mutex, + download_streaming: AtomicBool, number_of_open_requests: AtomicUsize, ping_time_ms: AtomicUsize, read_position: AtomicUsize, @@ -462,7 +450,7 @@ impl AudioFileStreaming { requested: RangeSet::new(), downloaded: RangeSet::new(), }), - download_strategy: Mutex::new(DownloadStrategy::default()), + download_streaming: AtomicBool::new(true), number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize), read_position: AtomicUsize::new(0), @@ -507,24 +495,23 @@ impl Read for AudioFileStreaming { return Ok(0); } - let length_to_request = match *(self.shared.download_strategy.lock()) { - DownloadStrategy::RandomAccess() => length, - DownloadStrategy::Streaming() => { - // Due to the read-ahead stuff, we potentially request more than the actual request demanded. - let ping_time_seconds = - Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64) - .as_secs_f32(); + let length_to_request = if self.shared.download_streaming.load(Ordering::Acquire) { + // Due to the read-ahead stuff, we potentially request more than the actual request demanded. + let ping_time_seconds = + Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64) + .as_secs_f32(); - let length_to_request = length - + max( - (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() - * self.shared.bytes_per_second as f32) as usize, - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * ping_time_seconds - * self.shared.bytes_per_second as f32) as usize, - ); - min(length_to_request, self.shared.file_size - offset) - } + let length_to_request = length + + max( + (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32) + as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * ping_time_seconds + * self.shared.bytes_per_second as f32) as usize, + ); + min(length_to_request, self.shared.file_size - offset) + } else { + length }; let mut ranges_to_request = RangeSet::new(); @@ -575,7 +562,7 @@ impl Read for AudioFileStreaming { impl Seek for AudioFileStreaming { fn seek(&mut self, pos: SeekFrom) -> io::Result { - // If we are already at this position, we don't need to switch download strategy. + // If we are already at this position, we don't need to switch download mode. // These checks and locks are less expensive than interrupting streaming. let current_position = self.position as i64; let requested_pos = match pos { @@ -595,13 +582,17 @@ impl Seek for AudioFileStreaming { .downloaded .contains(requested_pos as usize); - let mut old_strategy = DownloadStrategy::default(); + let mut was_streaming = false; if !available { // Ensure random access mode if we need to download this part. - old_strategy = std::mem::replace( - &mut *(self.shared.download_strategy.lock()), - DownloadStrategy::RandomAccess(), - ); + // Checking whether we are streaming now is a micro-optimization + // to save an atomic load. + was_streaming = self.shared.download_streaming.load(Ordering::Acquire); + if was_streaming { + self.shared + .download_streaming + .store(false, Ordering::Release); + } } self.position = self.read_file.seek(pos)?; @@ -609,8 +600,10 @@ impl Seek for AudioFileStreaming { .read_position .store(self.position as usize, Ordering::Release); - if !available && old_strategy != DownloadStrategy::RandomAccess() { - *(self.shared.download_strategy.lock()) = old_strategy; + if !available && was_streaming { + self.shared + .download_streaming + .store(true, Ordering::Release); } Ok(self.position) diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 08013b5b..274f0c89 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -16,9 +16,9 @@ use librespot_core::{session::Session, Error}; use crate::range_set::{Range, RangeSet}; use super::{ - AudioFileError, AudioFileResult, AudioFileShared, DownloadStrategy, StreamLoaderCommand, - StreamingRequest, FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, - MAX_PREFETCH_REQUESTS, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, + AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest, + FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS, + MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, }; struct PartialFileData { @@ -157,8 +157,8 @@ enum ControlFlow { } impl AudioFileFetch { - fn get_download_strategy(&mut self) -> DownloadStrategy { - *(self.shared.download_strategy.lock()) + fn is_download_streaming(&mut self) -> bool { + self.shared.download_streaming.load(Ordering::Acquire) } fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult { @@ -337,15 +337,17 @@ impl AudioFileFetch { ) -> Result { match cmd { StreamLoaderCommand::Fetch(request) => { - self.download_range(request.start, request.length)?; + self.download_range(request.start, request.length)? } - StreamLoaderCommand::RandomAccessMode() => { - *(self.shared.download_strategy.lock()) = DownloadStrategy::RandomAccess(); - } - StreamLoaderCommand::StreamMode() => { - *(self.shared.download_strategy.lock()) = DownloadStrategy::Streaming(); - } - StreamLoaderCommand::Close() => return Ok(ControlFlow::Break), + StreamLoaderCommand::RandomAccessMode => self + .shared + .download_streaming + .store(false, Ordering::Release), + StreamLoaderCommand::StreamMode => self + .shared + .download_streaming + .store(true, Ordering::Release), + StreamLoaderCommand::Close => return Ok(ControlFlow::Break), } Ok(ControlFlow::Continue) @@ -430,7 +432,7 @@ pub(super) async fn audio_file_fetch( else => (), } - if fetch.get_download_strategy() == DownloadStrategy::Streaming() { + if fetch.is_download_streaming() { let number_of_open_requests = fetch.shared.number_of_open_requests.load(Ordering::SeqCst); if number_of_open_requests < MAX_PREFETCH_REQUESTS {