Merge pull request #1234 from lelloman/lelloman/tunable-audio-fetch-params

Make audio fetch parameters tunable
This commit is contained in:
Roderick van Domburg 2023-12-24 13:11:38 +01:00 committed by GitHub
commit ccd1a72789
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 96 additions and 56 deletions

View file

@ -6,7 +6,7 @@ use std::{
io::{self, Read, Seek, SeekFrom}, io::{self, Read, Seek, SeekFrom},
sync::{ sync::{
atomic::{AtomicBool, AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc, OnceLock,
}, },
time::Duration, time::Duration,
}; };
@ -55,42 +55,75 @@ impl From<AudioFileError> for Error {
} }
} }
#[derive(Clone)]
pub struct AudioFetchParams {
/// The minimum size of a block that is requested from the Spotify servers in one request. /// The minimum size of a block that is requested from the Spotify servers in one request.
/// This is the block size that is typically requested while doing a `seek()` on a file. /// This is the block size that is typically requested while doing a `seek()` on a file.
/// The Symphonia decoder requires this to be a power of 2 and > 32 kB. /// The Symphonia decoder requires this to be a power of 2 and > 32 kB.
/// Note: smaller requests can happen if part of the block is downloaded already. /// Note: smaller requests can happen if part of the block is downloaded already.
pub const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024; pub minimum_download_size: usize,
/// The minimum network throughput that we expect. Together with the minimum download size, /// The minimum network throughput that we expect. Together with the minimum download size,
/// this will determine the time we will wait for a response. /// this will determine the time we will wait for a response.
pub const MINIMUM_THROUGHPUT: usize = 8 * 1024; pub minimum_throughput: usize,
/// The ping time that is used for calculations before a ping time was actually measured. /// The ping time that is used for calculations before a ping time was actually measured.
pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500); pub initial_ping_time_estimate: Duration,
/// If the measured ping time to the Spotify server is larger than this value, it is capped /// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching. /// to avoid run-away block sizes and pre-fetching.
pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500); pub maximum_assumed_ping_time: Duration,
/// Before playback starts, this many seconds of data must be present. /// Before playback starts, this many seconds of data must be present.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller. /// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK: Duration = Duration::from_secs(1); pub read_ahead_before_playback: Duration,
/// While playing back, this many seconds of data ahead of the current read position are /// While playing back, this many seconds of data ahead of the current read position are
/// requested. /// requested.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller. /// of audio data may be larger or smaller.
pub const READ_AHEAD_DURING_PLAYBACK: Duration = Duration::from_secs(5); pub read_ahead_during_playback: Duration,
/// If the amount of data that is pending (requested but not received) is less than a certain amount, /// If the amount of data that is pending (requested but not received) is less than a certain amount,
/// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
/// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>` /// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0; pub prefetch_threshold_factor: f32,
/// The time we will wait to obtain status updates on downloading. /// The time we will wait to obtain status updates on downloading.
pub const DOWNLOAD_TIMEOUT: Duration = pub download_timeout: Duration,
Duration::from_secs((MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64); }
impl Default for AudioFetchParams {
fn default() -> Self {
let minimum_download_size = 64 * 1024;
let minimum_throughput = 8 * 1024;
Self {
minimum_download_size,
minimum_throughput,
initial_ping_time_estimate: Duration::from_millis(500),
maximum_assumed_ping_time: Duration::from_millis(1500),
read_ahead_before_playback: Duration::from_secs(1),
read_ahead_during_playback: Duration::from_secs(5),
prefetch_threshold_factor: 4.0,
download_timeout: Duration::from_secs(
(minimum_download_size / minimum_throughput) as u64,
),
}
}
}
static AUDIO_FETCH_PARAMS: OnceLock<AudioFetchParams> = OnceLock::new();
impl AudioFetchParams {
pub fn set(params: AudioFetchParams) -> Result<(), AudioFetchParams> {
AUDIO_FETCH_PARAMS.set(params)
}
pub fn get() -> &'static AudioFetchParams {
AUDIO_FETCH_PARAMS.get_or_init(AudioFetchParams::default)
}
}
pub enum AudioFile { pub enum AudioFile {
Cached(fs::File), Cached(fs::File),
@ -183,6 +216,7 @@ impl StreamLoaderController {
if let Some(ref shared) = self.stream_shared { if let Some(ref shared) = self.stream_shared {
let mut download_status = shared.download_status.lock(); let mut download_status = shared.download_status.lock();
let download_timeout = AudioFetchParams::get().download_timeout;
while range.length while range.length
> download_status > download_status
@ -191,7 +225,7 @@ impl StreamLoaderController {
{ {
if shared if shared
.cond .cond
.wait_for(&mut download_status, DOWNLOAD_TIMEOUT) .wait_for(&mut download_status, download_timeout)
.timed_out() .timed_out()
{ {
return Err(AudioFileError::WaitTimeout.into()); return Err(AudioFileError::WaitTimeout.into());
@ -297,7 +331,7 @@ impl AudioFileShared {
if ping_time_ms > 0 { if ping_time_ms > 0 {
Duration::from_millis(ping_time_ms as u64) Duration::from_millis(ping_time_ms as u64)
} else { } else {
INITIAL_PING_TIME_ESTIMATE AudioFetchParams::get().initial_ping_time_estimate
} }
} }
@ -395,6 +429,8 @@ impl AudioFileStreaming {
trace!("Streaming from {}", url); trace!("Streaming from {}", url);
} }
let minimum_download_size = AudioFetchParams::get().minimum_download_size;
// When the audio file is really small, this `download_size` may turn out to be // When the audio file is really small, this `download_size` may turn out to be
// larger than the audio file we're going to stream later on. This is OK; requesting // larger than the audio file we're going to stream later on. This is OK; requesting
// `Content-Range` > `Content-Length` will return the complete file with status code // `Content-Range` > `Content-Length` will return the complete file with status code
@ -402,7 +438,7 @@ impl AudioFileStreaming {
let mut streamer = let mut streamer =
session session
.spclient() .spclient()
.stream_from_cdn(&cdn_url, 0, MINIMUM_DOWNLOAD_SIZE)?; .stream_from_cdn(&cdn_url, 0, minimum_download_size)?;
// Get the first chunk with the headers to get the file size. // Get the first chunk with the headers to get the file size.
// The remainder of that chunk with possibly also a response body is then // The remainder of that chunk with possibly also a response body is then
@ -490,9 +526,10 @@ impl Read for AudioFileStreaming {
return Ok(0); return Ok(0);
} }
let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
let length_to_request = if self.shared.is_download_streaming() { let length_to_request = if self.shared.is_download_streaming() {
let length_to_request = length let length_to_request = length
+ (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32) + (read_ahead_during_playback.as_secs_f32() * self.shared.bytes_per_second as f32)
as usize; as usize;
// Due to the read-ahead stuff, we potentially request more than the actual request demanded. // Due to the read-ahead stuff, we potentially request more than the actual request demanded.
@ -515,11 +552,12 @@ impl Read for AudioFileStreaming {
.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?; .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
} }
let download_timeout = AudioFetchParams::get().download_timeout;
while !download_status.downloaded.contains(offset) { while !download_status.downloaded.contains(offset) {
if self if self
.shared .shared
.cond .cond
.wait_for(&mut download_status, DOWNLOAD_TIMEOUT) .wait_for(&mut download_status, download_timeout)
.timed_out() .timed_out()
{ {
return Err(io::Error::new( return Err(io::Error::new(

View file

@ -16,9 +16,8 @@ use librespot_core::{http_client::HttpClient, session::Session, Error};
use crate::range_set::{Range, RangeSet}; use crate::range_set::{Range, RangeSet};
use super::{ use super::{
AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest, AudioFetchParams, AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand,
MAXIMUM_ASSUMED_PING_TIME, MINIMUM_DOWNLOAD_SIZE, MINIMUM_THROUGHPUT, StreamingRequest,
PREFETCH_THRESHOLD_FACTOR,
}; };
struct PartialFileData { struct PartialFileData {
@ -151,6 +150,8 @@ struct AudioFileFetch {
file_data_tx: mpsc::UnboundedSender<ReceivedData>, file_data_tx: mpsc::UnboundedSender<ReceivedData>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>, complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times: Vec<Duration>, network_response_times: Vec<Duration>,
params: AudioFetchParams,
} }
// Might be replaced by enum from std once stable // Might be replaced by enum from std once stable
@ -166,8 +167,8 @@ impl AudioFileFetch {
} }
fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult { fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult {
if length < MINIMUM_DOWNLOAD_SIZE { if length < self.params.minimum_download_size {
length = MINIMUM_DOWNLOAD_SIZE; length = self.params.minimum_download_size;
} }
// If we are in streaming mode (so not seeking) then start downloading as large // If we are in streaming mode (so not seeking) then start downloading as large
@ -258,13 +259,13 @@ impl AudioFileFetch {
fn handle_file_data(&mut self, data: ReceivedData) -> Result<ControlFlow, Error> { fn handle_file_data(&mut self, data: ReceivedData) -> Result<ControlFlow, Error> {
match data { match data {
ReceivedData::Throughput(mut throughput) => { ReceivedData::Throughput(mut throughput) => {
if throughput < MINIMUM_THROUGHPUT { if throughput < self.params.minimum_throughput {
warn!( warn!(
"Throughput {} kbps lower than minimum {}, setting to minimum", "Throughput {} kbps lower than minimum {}, setting to minimum",
throughput / 1000, throughput / 1000,
MINIMUM_THROUGHPUT / 1000, self.params.minimum_throughput / 1000,
); );
throughput = MINIMUM_THROUGHPUT; throughput = self.params.minimum_throughput;
} }
let old_throughput = self.shared.throughput(); let old_throughput = self.shared.throughput();
@ -287,13 +288,13 @@ impl AudioFileFetch {
self.shared.set_throughput(avg_throughput); self.shared.set_throughput(avg_throughput);
} }
ReceivedData::ResponseTime(mut response_time) => { ReceivedData::ResponseTime(mut response_time) => {
if response_time > MAXIMUM_ASSUMED_PING_TIME { if response_time > self.params.maximum_assumed_ping_time {
warn!( warn!(
"Time to first byte {} ms exceeds maximum {}, setting to maximum", "Time to first byte {} ms exceeds maximum {}, setting to maximum",
response_time.as_millis(), response_time.as_millis(),
MAXIMUM_ASSUMED_PING_TIME.as_millis() self.params.maximum_assumed_ping_time.as_millis()
); );
response_time = MAXIMUM_ASSUMED_PING_TIME; response_time = self.params.maximum_assumed_ping_time;
} }
let old_ping_time_ms = self.shared.ping_time().as_millis(); let old_ping_time_ms = self.shared.ping_time().as_millis();
@ -423,6 +424,8 @@ pub(super) async fn audio_file_fetch(
initial_request, initial_request,
)); ));
let params = AudioFetchParams::get();
let mut fetch = AudioFileFetch { let mut fetch = AudioFileFetch {
session: session.clone(), session: session.clone(),
shared, shared,
@ -431,6 +434,8 @@ pub(super) async fn audio_file_fetch(
file_data_tx, file_data_tx,
complete_tx: Some(complete_tx), complete_tx: Some(complete_tx),
network_response_times: Vec::with_capacity(3), network_response_times: Vec::with_capacity(3),
params: params.clone(),
}; };
loop { loop {
@ -472,7 +477,7 @@ pub(super) async fn audio_file_fetch(
let throughput = fetch.shared.throughput(); let throughput = fetch.shared.throughput();
let desired_pending_bytes = max( let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR (params.prefetch_threshold_factor
* ping_time_seconds * ping_time_seconds
* fetch.shared.bytes_per_second as f32) as usize, * fetch.shared.bytes_per_second as f32) as usize,
(ping_time_seconds * throughput as f32) as usize, (ping_time_seconds * throughput as f32) as usize,

View file

@ -7,5 +7,4 @@ mod fetch;
mod range_set; mod range_set;
pub use decrypt::AudioDecrypt; pub use decrypt::AudioDecrypt;
pub use fetch::{AudioFile, AudioFileError, StreamLoaderController}; pub use fetch::{AudioFetchParams, AudioFile, AudioFileError, StreamLoaderController};
pub use fetch::{MINIMUM_DOWNLOAD_SIZE, READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_DURING_PLAYBACK};

View file

@ -36,7 +36,7 @@ impl SymphoniaDecoder {
R: MediaSource + 'static, R: MediaSource + 'static,
{ {
let mss_opts = MediaSourceStreamOptions { let mss_opts = MediaSourceStreamOptions {
buffer_len: librespot_audio::MINIMUM_DOWNLOAD_SIZE, buffer_len: librespot_audio::AudioFetchParams::get().minimum_download_size,
}; };
let mss = MediaSourceStream::new(Box::new(input), mss_opts); let mss = MediaSourceStream::new(Box::new(input), mss_opts);

View file

@ -24,10 +24,7 @@ use symphonia::core::io::MediaSource;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::{ use crate::{
audio::{ audio::{AudioDecrypt, AudioFetchParams, AudioFile, StreamLoaderController},
AudioDecrypt, AudioFile, StreamLoaderController, READ_AHEAD_BEFORE_PLAYBACK,
READ_AHEAD_DURING_PLAYBACK,
},
audio_backend::Sink, audio_backend::Sink,
config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig}, config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig},
convert::Converter, convert::Converter,
@ -2223,13 +2220,14 @@ impl PlayerInternal {
.. ..
} = self.state } = self.state
{ {
let read_ahead_during_playback = AudioFetchParams::get().read_ahead_during_playback;
// Request our read ahead range // Request our read ahead range
let request_data_length = let request_data_length =
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize; (read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;
// Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete. // Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete.
let wait_for_data_length = let wait_for_data_length =
(READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize; (read_ahead_during_playback.as_secs_f32() * bytes_per_second as f32) as usize;
stream_loader_controller stream_loader_controller
.fetch_next_and_wait(request_data_length, wait_for_data_length) .fetch_next_and_wait(request_data_length, wait_for_data_length)