From e175a88f5bc27589c93e97ba33278a9838236920 Mon Sep 17 00:00:00 2001 From: Domenico Cerasuolo Date: Sun, 17 Dec 2023 18:06:15 +0100 Subject: [PATCH] Make audio fetch parameters tunable This change collects all those audio fetch parameters that were defined as static constants into a dedicated struct, AudioFetchParams. This struct can be read and set statically, allowing a user of the library to modify those parameters without the need to recompile. --- audio/src/fetch/mod.rs | 108 +++++++++++++++------- audio/src/fetch/receive.rs | 29 +++--- audio/src/lib.rs | 3 +- playback/src/decoder/symphonia_decoder.rs | 2 +- playback/src/player.rs | 10 +- 5 files changed, 96 insertions(+), 56 deletions(-) diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index e343ee1f..f6071936 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -6,7 +6,7 @@ use std::{ io::{self, Read, Seek, SeekFrom}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, + Arc, OnceLock, }, time::Duration, }; @@ -55,42 +55,75 @@ impl From for Error { } } -/// The minimum size of a block that is requested from the Spotify servers in one request. -/// This is the block size that is typically requested while doing a `seek()` on a file. -/// The Symphonia decoder requires this to be a power of 2 and > 32 kB. -/// Note: smaller requests can happen if part of the block is downloaded already. -pub const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024; +#[derive(Clone)] +pub struct AudioFetchParams { + /// The minimum size of a block that is requested from the Spotify servers in one request. + /// This is the block size that is typically requested while doing a `seek()` on a file. + /// The Symphonia decoder requires this to be a power of 2 and > 32 kB. + /// Note: smaller requests can happen if part of the block is downloaded already. + pub minimum_download_size: usize, -/// The minimum network throughput that we expect. Together with the minimum download size, -/// this will determine the time we will wait for a response. -pub const MINIMUM_THROUGHPUT: usize = 8 * 1024; + /// The minimum network throughput that we expect. Together with the minimum download size, + /// this will determine the time we will wait for a response. + pub minimum_throughput: usize, -/// The ping time that is used for calculations before a ping time was actually measured. -pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500); + /// The ping time that is used for calculations before a ping time was actually measured. + pub initial_ping_time_estimate: Duration, -/// If the measured ping time to the Spotify server is larger than this value, it is capped -/// to avoid run-away block sizes and pre-fetching. -pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500); + /// If the measured ping time to the Spotify server is larger than this value, it is capped + /// to avoid run-away block sizes and pre-fetching. + pub maximum_assumed_ping_time: Duration, -/// Before playback starts, this many seconds of data must be present. -/// Note: the calculations are done using the nominal bitrate of the file. The actual amount -/// of audio data may be larger or smaller. -pub const READ_AHEAD_BEFORE_PLAYBACK: Duration = Duration::from_secs(1); + /// Before playback starts, this many seconds of data must be present. + /// Note: the calculations are done using the nominal bitrate of the file. The actual amount + /// of audio data may be larger or smaller. + pub read_ahead_before_playback: Duration, -/// While playing back, this many seconds of data ahead of the current read position are -/// requested. -/// Note: the calculations are done using the nominal bitrate of the file. The actual amount -/// of audio data may be larger or smaller. -pub const READ_AHEAD_DURING_PLAYBACK: Duration = Duration::from_secs(5); + /// While playing back, this many seconds of data ahead of the current read position are + /// requested. + /// Note: the calculations are done using the nominal bitrate of the file. The actual amount + /// of audio data may be larger or smaller. + pub read_ahead_during_playback: Duration, -/// If the amount of data that is pending (requested but not received) is less than a certain amount, -/// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more -/// data is calculated as ` < PREFETCH_THRESHOLD_FACTOR * * ` -pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0; + /// If the amount of data that is pending (requested but not received) is less than a certain amount, + /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more + /// data is calculated as ` < PREFETCH_THRESHOLD_FACTOR * * ` + pub prefetch_threshold_factor: f32, -/// The time we will wait to obtain status updates on downloading. -pub const DOWNLOAD_TIMEOUT: Duration = - Duration::from_secs((MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64); + /// The time we will wait to obtain status updates on downloading. + pub download_timeout: Duration, +} + +impl Default for AudioFetchParams { + fn default() -> Self { + let minimum_download_size = 64 * 1024; + let minimum_throughput = 8 * 1024; + Self { + minimum_download_size, + minimum_throughput, + initial_ping_time_estimate: Duration::from_millis(500), + maximum_assumed_ping_time: Duration::from_millis(1500), + read_ahead_before_playback: Duration::from_secs(1), + read_ahead_during_playback: Duration::from_secs(5), + prefetch_threshold_factor: 4.0, + download_timeout: Duration::from_secs( + (minimum_download_size / minimum_throughput) as u64, + ), + } + } +} + +static AUDIO_FETCH_PARAMS: OnceLock = OnceLock::new(); + +impl AudioFetchParams { + pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> { + AUDIO_FETCH_PARAMS.set(params) + } + + pub fn get() -> &'static AudioFetchParams { + AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default) + } +} pub enum AudioFile { Cached(fs::File), @@ -183,6 +216,7 @@ impl StreamLoaderController { if let Some(ref shared) = self.stream_shared { let mut download_status = shared.download_status.lock(); + let download_timeout = AudioFetchParams::get().download_timeout; while range.length > download_status @@ -191,7 +225,7 @@ impl StreamLoaderController { { if shared .cond - .wait_for(&mut download_status, DOWNLOAD_TIMEOUT) + .wait_for(&mut download_status, download_timeout) .timed_out() { return Err(AudioFileError::WaitTimeout.into()); @@ -297,7 +331,7 @@ impl AudioFileShared { if ping_time_ms > 0 { Duration::from_millis(ping_time_ms as u64) } else { - INITIAL_PING_TIME_ESTIMATE + AudioFetchParams::get().initial_ping_time_estimate } } @@ -395,6 +429,8 @@ impl AudioFileStreaming { trace!("Streaming from {}", url); } + let minimum_download_size = AudioFetchParams::get().minimum_download_size; + // When the audio file is really small, this `download_size` may turn out to be // larger than the audio file we're going to stream later on. This is OK; requesting // `Content-Range` > `Content-Length` will return the complete file with status code @@ -402,7 +438,7 @@ impl AudioFileStreaming { let mut streamer = session .spclient() - .stream_from_cdn(&cdn_url, 0, MINIMUM_DOWNLOAD_SIZE)?; + .stream_from_cdn(&cdn_url, 0, minimum_download_size)?; // Get the first chunk with the headers to get the file size. // The remainder of that chunk with possibly also a response body is then @@ -490,9 +526,10 @@ impl Read for AudioFileStreaming { return Ok(0); } + let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback; let length_to_request = if self.shared.is_download_streaming() { let length_to_request = length - + (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32) + + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32) as usize; // Due to the read-ahead stuff, we potentially request more than the actual request demanded. @@ -515,11 +552,12 @@ impl Read for AudioFileStreaming { .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?; } + let download_timeout = AudioFetchParams::get().download_timeout; while !download_status.downloaded.contains(offset) { if self .shared .cond - .wait_for(&mut download_status, DOWNLOAD_TIMEOUT) + .wait_for(&mut download_status, download_timeout) .timed_out() { return Err(io::Error::new( diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 39ad84c6..0b001113 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -16,9 +16,8 @@ use librespot_core::{http_client::HttpClient, session::Session, Error}; use crate::range_set::{Range, RangeSet}; use super::{ - AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest, - MAXIMUM_ASSUMED_PING_TIME, MINIMUM_DOWNLOAD_SIZE, MINIMUM_THROUGHPUT, - PREFETCH_THRESHOLD_FACTOR, + AudioFetchParams, AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, + StreamingRequest, }; struct PartialFileData { @@ -151,6 +150,8 @@ struct AudioFileFetch { file_data_tx: mpsc::UnboundedSender, complete_tx: Option>, network_response_times: Vec, + + params: AudioFetchParams, } // Might be replaced by enum from std once stable @@ -166,8 +167,8 @@ impl AudioFileFetch { } fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult { - if length < MINIMUM_DOWNLOAD_SIZE { - length = MINIMUM_DOWNLOAD_SIZE; + if length < self.params.minimum_download_size { + length = self.params.minimum_download_size; } // If we are in streaming mode (so not seeking) then start downloading as large @@ -258,13 +259,13 @@ impl AudioFileFetch { fn handle_file_data(&mut self, data: ReceivedData) -> Result { match data { ReceivedData::Throughput(mut throughput) => { - if throughput < MINIMUM_THROUGHPUT { + if throughput < self.params.minimum_throughput { warn!( "Throughput {} kbps lower than minimum {}, setting to minimum", throughput / 1000, - MINIMUM_THROUGHPUT / 1000, + self.params.minimum_throughput / 1000, ); - throughput = MINIMUM_THROUGHPUT; + throughput = self.params.minimum_throughput; } let old_throughput = self.shared.throughput(); @@ -287,13 +288,13 @@ impl AudioFileFetch { self.shared.set_throughput(avg_throughput); } ReceivedData::ResponseTime(mut response_time) => { - if response_time > MAXIMUM_ASSUMED_PING_TIME { + if response_time > self.params.maximum_assumed_ping_time { warn!( "Time to first byte {} ms exceeds maximum {}, setting to maximum", response_time.as_millis(), - MAXIMUM_ASSUMED_PING_TIME.as_millis() + self.params.maximum_assumed_ping_time.as_millis() ); - response_time = MAXIMUM_ASSUMED_PING_TIME; + response_time = self.params.maximum_assumed_ping_time; } let old_ping_time_ms = self.shared.ping_time().as_millis(); @@ -423,6 +424,8 @@ pub(super) async fn audio_file_fetch( initial_request, )); + let params = AudioFetchParams::get(); + let mut fetch = AudioFileFetch { session: session.clone(), shared, @@ -431,6 +434,8 @@ pub(super) async fn audio_file_fetch( file_data_tx, complete_tx: Some(complete_tx), network_response_times: Vec::with_capacity(3), + + params: params.clone(), }; loop { @@ -472,7 +477,7 @@ pub(super) async fn audio_file_fetch( let throughput = fetch.shared.throughput(); let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR + (params.prefetch_threshold_factor * ping_time_seconds * fetch.shared.bytes_per_second as f32) as usize, (ping_time_seconds * throughput as f32) as usize, diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 2a53c361..99c41df2 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -7,5 +7,4 @@ mod fetch; mod range_set; pub use decrypt::AudioDecrypt; -pub use fetch::{AudioFile, AudioFileError, StreamLoaderController}; -pub use fetch::{MINIMUM_DOWNLOAD_SIZE, READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_DURING_PLAYBACK}; +pub use fetch::{AudioFetchParams, AudioFile, AudioFileError, StreamLoaderController}; diff --git a/playback/src/decoder/symphonia_decoder.rs b/playback/src/decoder/symphonia_decoder.rs index 2bf2517d..b6a8023e 100644 --- a/playback/src/decoder/symphonia_decoder.rs +++ b/playback/src/decoder/symphonia_decoder.rs @@ -36,7 +36,7 @@ impl SymphoniaDecoder { R: MediaSource + 'static, { let mss_opts = MediaSourceStreamOptions { - buffer_len: librespot_audio::MINIMUM_DOWNLOAD_SIZE, + buffer_len: librespot_audio::AudioFetchParams::get().minimum_download_size, }; let mss = MediaSourceStream::new(Box::new(input), mss_opts); diff --git a/playback/src/player.rs b/playback/src/player.rs index 457f1896..b55e1230 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -24,10 +24,7 @@ use symphonia::core::io::MediaSource; use tokio::sync::{mpsc, oneshot}; use crate::{ - audio::{ - AudioDecrypt, AudioFile, StreamLoaderController, READ_AHEAD_BEFORE_PLAYBACK, - READ_AHEAD_DURING_PLAYBACK, - }, + audio::{AudioDecrypt, AudioFetchParams, AudioFile, StreamLoaderController}, audio_backend::Sink, config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig}, convert::Converter, @@ -2223,13 +2220,14 @@ impl PlayerInternal { .. } = self.state { + let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback; // Request our read ahead range let request_data_length = - (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize; + (read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize; // Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete. let wait_for_data_length = - (READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize; + (read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize; stream_loader_controller .fetch_next_and_wait(request_data_length, wait_for_data_length)