From eb1472c71337c58f8b28d99f2bfd714979e5a7ab Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Wed, 28 Sep 2022 21:25:56 +0200 Subject: [PATCH] Various loading improvements - Improve responsiveness by downloading the smallest possible chunk size when seeking or first loading. - Improve download time and decrease CPU usage by downloading the largest possible chunk size as throughput allows, still allowing for reasonable seek responsiveness (~1 second). - As a result, take refactoring opportunities: simplify prefetching logic, download threading, command sending, and some ergonomics. - Fix disappearing controls in the Spotify mobile UI while loading. - Fix handling of seek, pause, and play commands while loading. - Fix download rate calculation (don't use the Mercury rate). - Fix ping time calculation under lock contention. --- audio/src/fetch/mod.rs | 185 +++++++++++--------------- audio/src/fetch/receive.rs | 262 ++++++++++++++++++++----------------- audio/src/lib.rs | 5 +- connect/src/spirc.rs | 18 ++- core/src/channel.rs | 10 +- core/src/error.rs | 22 +++- playback/src/player.rs | 82 +++++++----- 7 files changed, 305 insertions(+), 279 deletions(-) diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index 30b8d859..e343ee1f 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -1,14 +1,14 @@ mod receive; use std::{ - cmp::{max, min}, + cmp::min, fs, io::{self, Read, Seek, SeekFrom}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, - time::{Duration, Instant}, + time::Duration, }; use futures_util::{future::IntoStream, StreamExt, TryFutureExt}; @@ -16,7 +16,7 @@ use hyper::{client::ResponseFuture, header::CONTENT_RANGE, Body, Response, Statu use parking_lot::{Condvar, Mutex}; use tempfile::NamedTempFile; use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Semaphore}; use librespot_core::{cdn_url::CdnUrl, Error, FileId, Session}; @@ -59,17 +59,11 @@ impl From for Error { /// This is the block size that is typically requested while doing a `seek()` on a file. /// The Symphonia decoder requires this to be a power of 2 and > 32 kB. /// Note: smaller requests can happen if part of the block is downloaded already. -pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 128; +pub const MINIMUM_DOWNLOAD_SIZE: usize = 64 * 1024; /// The minimum network throughput that we expect. Together with the minimum download size, /// this will determine the time we will wait for a response. -pub const MINIMUM_THROUGHPUT: usize = 8192; - -/// The amount of data that is requested when initially opening a file. -/// Note: if the file is opened to play from the beginning, the amount of data to -/// read ahead is requested in addition to this amount. If the file is opened to seek to -/// another position, then only this amount is requested on the first request. -pub const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 8; +pub const MINIMUM_THROUGHPUT: usize = 8 * 1024; /// The ping time that is used for calculations before a ping time was actually measured. pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500); @@ -83,45 +77,17 @@ pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500); /// of audio data may be larger or smaller. pub const READ_AHEAD_BEFORE_PLAYBACK: Duration = Duration::from_secs(1); -/// Same as `READ_AHEAD_BEFORE_PLAYBACK`, but the time is taken as a factor of the ping -/// time to the Spotify server. Both `READ_AHEAD_BEFORE_PLAYBACK` and -/// `READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS` are obeyed. -/// Note: the calculations are done using the nominal bitrate of the file. The actual amount -/// of audio data may be larger or smaller. -pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f32 = 2.0; - /// While playing back, this many seconds of data ahead of the current read position are /// requested. /// Note: the calculations are done using the nominal bitrate of the file. The actual amount /// of audio data may be larger or smaller. pub const READ_AHEAD_DURING_PLAYBACK: Duration = Duration::from_secs(5); -/// Same as `READ_AHEAD_DURING_PLAYBACK`, but the time is taken as a factor of the ping -/// time to the Spotify server. -/// Note: the calculations are done using the nominal bitrate of the file. The actual amount -/// of audio data may be larger or smaller. -pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f32 = 10.0; - /// If the amount of data that is pending (requested but not received) is less than a certain amount, /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more /// data is calculated as ` < PREFETCH_THRESHOLD_FACTOR * * ` pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0; -/// Similar to `PREFETCH_THRESHOLD_FACTOR`, but it also takes the current download rate into account. -/// The formula used is ` < FAST_PREFETCH_THRESHOLD_FACTOR * * ` -/// This mechanism allows for fast downloading of the remainder of the file. The number should be larger -/// than `1.0` so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster -/// the download rate ramps up. However, this comes at the cost that it might hurt ping time if a seek is -/// performed while downloading. Values smaller than `1.0` cause the download rate to collapse and effectively -/// only `PREFETCH_THRESHOLD_FACTOR` is in effect. Thus, set to `0.0` if bandwidth saturation is not wanted. -pub const FAST_PREFETCH_THRESHOLD_FACTOR: f32 = 1.5; - -/// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending -/// requests share bandwidth. Thus, having too many requests can lead to the one that is needed next -/// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new -/// pre-fetch request is only sent if less than `MAX_PREFETCH_REQUESTS` are pending. -pub const MAX_PREFETCH_REQUESTS: usize = 4; - /// The time we will wait to obtain status updates on downloading. pub const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs((MINIMUM_DOWNLOAD_SIZE / MINIMUM_THROUGHPUT) as u64); @@ -137,15 +103,12 @@ pub struct StreamingRequest { initial_response: Option>, offset: usize, length: usize, - request_time: Instant, } #[derive(Debug)] pub enum StreamLoaderCommand { - Fetch(Range), // signal the stream loader to fetch a range of the file - RandomAccessMode, // optimise download strategy for random access - StreamMode, // optimise download strategy for streaming - Close, // terminate and don't load any more data + Fetch(Range), // signal the stream loader to fetch a range of the file + Close, // terminate and don't load any more data } #[derive(Clone)] @@ -182,17 +145,15 @@ impl StreamLoaderController { pub fn range_to_end_available(&self) -> bool { match self.stream_shared { Some(ref shared) => { - let read_position = shared.read_position.load(Ordering::Acquire); + let read_position = shared.read_position(); self.range_available(Range::new(read_position, self.len() - read_position)) } None => true, } } - pub fn ping_time(&self) -> Duration { - Duration::from_millis(self.stream_shared.as_ref().map_or(0, |shared| { - shared.ping_time_ms.load(Ordering::Relaxed) as u64 - })) + pub fn ping_time(&self) -> Option { + self.stream_shared.as_ref().map(|shared| shared.ping_time()) } fn send_stream_loader_command(&self, command: StreamLoaderCommand) { @@ -252,31 +213,6 @@ impl StreamLoaderController { Ok(()) } - #[allow(dead_code)] - pub fn fetch_next(&self, length: usize) { - if let Some(ref shared) = self.stream_shared { - let range = Range { - start: shared.read_position.load(Ordering::Acquire), - length, - }; - self.fetch(range); - } - } - - #[allow(dead_code)] - pub fn fetch_next_blocking(&self, length: usize) -> AudioFileResult { - match self.stream_shared { - Some(ref shared) => { - let range = Range { - start: shared.read_position.load(Ordering::Acquire), - length, - }; - self.fetch_blocking(range) - } - None => Ok(()), - } - } - pub fn fetch_next_and_wait( &self, request_length: usize, @@ -284,7 +220,7 @@ impl StreamLoaderController { ) -> AudioFileResult { match self.stream_shared { Some(ref shared) => { - let start = shared.read_position.load(Ordering::Acquire); + let start = shared.read_position(); let request_range = Range { start, @@ -304,12 +240,16 @@ impl StreamLoaderController { pub fn set_random_access_mode(&self) { // optimise download strategy for random access - self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode); + if let Some(ref shared) = self.stream_shared { + shared.set_download_streaming(false) + } } pub fn set_stream_mode(&self) { // optimise download strategy for streaming - self.send_stream_loader_command(StreamLoaderCommand::StreamMode); + if let Some(ref shared) = self.stream_shared { + shared.set_download_streaming(true) + } } pub fn close(&self) { @@ -337,9 +277,51 @@ struct AudioFileShared { cond: Condvar, download_status: Mutex, download_streaming: AtomicBool, - number_of_open_requests: AtomicUsize, + download_slots: Semaphore, ping_time_ms: AtomicUsize, read_position: AtomicUsize, + throughput: AtomicUsize, +} + +impl AudioFileShared { + fn is_download_streaming(&self) -> bool { + self.download_streaming.load(Ordering::Acquire) + } + + fn set_download_streaming(&self, streaming: bool) { + self.download_streaming.store(streaming, Ordering::Release) + } + + fn ping_time(&self) -> Duration { + let ping_time_ms = self.ping_time_ms.load(Ordering::Acquire); + if ping_time_ms > 0 { + Duration::from_millis(ping_time_ms as u64) + } else { + INITIAL_PING_TIME_ESTIMATE + } + } + + fn set_ping_time(&self, duration: Duration) { + self.ping_time_ms + .store(duration.as_millis() as usize, Ordering::Release) + } + + fn throughput(&self) -> usize { + self.throughput.load(Ordering::Acquire) + } + + fn set_throughput(&self, throughput: usize) { + self.throughput.store(throughput, Ordering::Release) + } + + fn read_position(&self) -> usize { + self.read_position.load(Ordering::Acquire) + } + + fn set_read_position(&self, position: u64) { + self.read_position + .store(position as usize, Ordering::Release) + } } impl AudioFile { @@ -420,12 +402,11 @@ impl AudioFileStreaming { let mut streamer = session .spclient() - .stream_from_cdn(&cdn_url, 0, INITIAL_DOWNLOAD_SIZE)?; + .stream_from_cdn(&cdn_url, 0, MINIMUM_DOWNLOAD_SIZE)?; // Get the first chunk with the headers to get the file size. // The remainder of that chunk with possibly also a response body is then // further processed in `audio_file_fetch`. - let request_time = Instant::now(); let response = streamer.next().await.ok_or(AudioFileError::NoData)??; let code = response.status(); @@ -452,7 +433,6 @@ impl AudioFileStreaming { initial_response: Some(response), offset: 0, length: upper_bound + 1, - request_time, }; let shared = Arc::new(AudioFileShared { @@ -464,10 +444,11 @@ impl AudioFileStreaming { requested: RangeSet::new(), downloaded: RangeSet::new(), }), - download_streaming: AtomicBool::new(true), - number_of_open_requests: AtomicUsize::new(0), - ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize), + download_streaming: AtomicBool::new(false), + download_slots: Semaphore::new(1), + ping_time_ms: AtomicUsize::new(0), read_position: AtomicUsize::new(0), + throughput: AtomicUsize::new(0), }); let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?; @@ -509,20 +490,12 @@ impl Read for AudioFileStreaming { return Ok(0); } - let length_to_request = if self.shared.download_streaming.load(Ordering::Acquire) { - // Due to the read-ahead stuff, we potentially request more than the actual request demanded. - let ping_time_seconds = - Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64) - .as_secs_f32(); - + let length_to_request = if self.shared.is_download_streaming() { let length_to_request = length - + max( - (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32) - as usize, - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * ping_time_seconds - * self.shared.bytes_per_second as f32) as usize, - ); + + (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32) + as usize; + + // Due to the read-ahead stuff, we potentially request more than the actual request demanded. min(length_to_request, self.shared.file_size - offset) } else { length @@ -566,9 +539,7 @@ impl Read for AudioFileStreaming { let read_len = self.read_file.read(&mut output[..read_len])?; self.position += read_len as u64; - self.shared - .read_position - .store(self.position as usize, Ordering::Release); + self.shared.set_read_position(self.position); Ok(read_len) } @@ -601,23 +572,17 @@ impl Seek for AudioFileStreaming { // Ensure random access mode if we need to download this part. // Checking whether we are streaming now is a micro-optimization // to save an atomic load. - was_streaming = self.shared.download_streaming.load(Ordering::Acquire); + was_streaming = self.shared.is_download_streaming(); if was_streaming { - self.shared - .download_streaming - .store(false, Ordering::Release); + self.shared.set_download_streaming(false); } } self.position = self.read_file.seek(pos)?; - self.shared - .read_position - .store(self.position as usize, Ordering::Release); + self.shared.set_read_position(self.position); if !available && was_streaming { - self.shared - .download_streaming - .store(true, Ordering::Release); + self.shared.set_download_streaming(true); } Ok(self.position) diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 2c58fbf8..d090d547 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -1,7 +1,7 @@ use std::{ cmp::{max, min}, io::{Seek, SeekFrom, Write}, - sync::{atomic::Ordering, Arc}, + sync::Arc, time::{Duration, Instant}, }; @@ -17,8 +17,8 @@ use crate::range_set::{Range, RangeSet}; use super::{ AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest, - FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS, - MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, + MAXIMUM_ASSUMED_PING_TIME, MINIMUM_DOWNLOAD_SIZE, MINIMUM_THROUGHPUT, + PREFETCH_THRESHOLD_FACTOR, }; struct PartialFileData { @@ -27,10 +27,13 @@ struct PartialFileData { } enum ReceivedData { + Throughput(usize), ResponseTime(Duration), Data(PartialFileData), } +const ONE_SECOND: Duration = Duration::from_secs(1); + async fn receive_data( shared: Arc, file_data_tx: mpsc::UnboundedSender, @@ -39,15 +42,21 @@ async fn receive_data( let mut offset = request.offset; let mut actual_length = 0; - let old_number_of_request = shared - .number_of_open_requests - .fetch_add(1, Ordering::SeqCst); + let permit = shared.download_slots.acquire().await?; - let mut measure_ping_time = old_number_of_request == 0; + let request_time = Instant::now(); + let mut measure_ping_time = true; + let mut measure_throughput = true; let result: Result<_, Error> = loop { let response = match request.initial_response.take() { - Some(data) => data, + Some(data) => { + // the request was already made outside of this function + measure_ping_time = false; + measure_throughput = false; + + data + } None => match request.streamer.next().await { Some(Ok(response)) => response, Some(Err(e)) => break Err(e.into()), @@ -62,6 +71,15 @@ async fn receive_data( }, }; + if measure_ping_time { + let duration = Instant::now().duration_since(request_time); + // may be zero if we are handling an initial response + if duration.as_millis() > 0 { + file_data_tx.send(ReceivedData::ResponseTime(duration))?; + measure_ping_time = false; + } + } + let code = response.status(); if code != StatusCode::PARTIAL_CONTENT { if code == StatusCode::TOO_MANY_REQUESTS { @@ -90,24 +108,18 @@ async fn receive_data( actual_length += data_size; offset += data_size; - - if measure_ping_time { - let mut duration = Instant::now() - request.request_time; - if duration > MAXIMUM_ASSUMED_PING_TIME { - warn!( - "Ping time {} ms exceeds maximum {}, setting to maximum", - duration.as_millis(), - MAXIMUM_ASSUMED_PING_TIME.as_millis() - ); - duration = MAXIMUM_ASSUMED_PING_TIME; - } - file_data_tx.send(ReceivedData::ResponseTime(duration))?; - measure_ping_time = false; - } }; drop(request.streamer); + if measure_throughput { + let duration = Instant::now().duration_since(request_time).as_millis(); + if actual_length > 0 && duration > 0 { + let throughput = ONE_SECOND.as_millis() as usize * actual_length / duration as usize; + file_data_tx.send(ReceivedData::Throughput(throughput))?; + } + } + let bytes_remaining = request.length - actual_length; if bytes_remaining > 0 { { @@ -118,9 +130,7 @@ async fn receive_data( } } - shared - .number_of_open_requests - .fetch_sub(1, Ordering::SeqCst); + drop(permit); if let Err(e) = result { error!( @@ -151,8 +161,8 @@ enum ControlFlow { } impl AudioFileFetch { - fn is_download_streaming(&self) -> bool { - self.shared.download_streaming.load(Ordering::Acquire) + fn has_download_slots_available(&self) -> bool { + self.shared.download_slots.available_permits() > 0 } fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult { @@ -160,10 +170,17 @@ impl AudioFileFetch { length = MINIMUM_DOWNLOAD_SIZE; } + // If we are in streaming mode (so not seeking) then start downloading as large + // of chunks as possible for better throughput and improved CPU usage, while + // still being reasonably responsive (~1 second) in case we want to seek. + if self.shared.is_download_streaming() { + let throughput = self.shared.throughput(); + length = max(length, throughput); + } + if offset + length > self.shared.file_size { length = self.shared.file_size - offset; } - let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length)); @@ -191,7 +208,6 @@ impl AudioFileFetch { initial_response: None, offset: range.start, length: range.length, - request_time: Instant::now(), }; self.session.spawn(receive_data( @@ -204,51 +220,36 @@ impl AudioFileFetch { Ok(()) } - fn pre_fetch_more_data( - &mut self, - bytes: usize, - max_requests_to_send: usize, - ) -> AudioFileResult { - let mut bytes_to_go = bytes; - let mut requests_to_go = max_requests_to_send; + fn pre_fetch_more_data(&mut self, bytes: usize) -> AudioFileResult { + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0, self.shared.file_size)); + { + let download_status = self.shared.download_status.lock(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } - while bytes_to_go > 0 && requests_to_go > 0 { - // determine what is still missing - let mut missing_data = RangeSet::new(); - missing_data.add_range(&Range::new(0, self.shared.file_size)); - { - let download_status = self.shared.download_status.lock(); - missing_data.subtract_range_set(&download_status.downloaded); - missing_data.subtract_range_set(&download_status.requested); - } + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position(); + tail_end.add_range(&Range::new( + read_position, + self.shared.file_size - read_position, + )); + let tail_end = tail_end.intersection(&missing_data); - // download data from after the current read position first - let mut tail_end = RangeSet::new(); - let read_position = self.shared.read_position.load(Ordering::Acquire); - tail_end.add_range(&Range::new( - read_position, - self.shared.file_size - read_position, - )); - let tail_end = tail_end.intersection(&missing_data); - - if !tail_end.is_empty() { - let range = tail_end.get_range(0); - let offset = range.start; - let length = min(range.length, bytes_to_go); - self.download_range(offset, length)?; - requests_to_go -= 1; - bytes_to_go -= length; - } else if !missing_data.is_empty() { - // ok, the tail is downloaded, download something fom the beginning. - let range = missing_data.get_range(0); - let offset = range.start; - let length = min(range.length, bytes_to_go); - self.download_range(offset, length)?; - requests_to_go -= 1; - bytes_to_go -= length; - } else { - break; - } + if !tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, bytes); + self.download_range(offset, length)?; + } else if !missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, bytes); + self.download_range(offset, length)?; } Ok(()) @@ -256,8 +257,46 @@ impl AudioFileFetch { fn handle_file_data(&mut self, data: ReceivedData) -> Result { match data { - ReceivedData::ResponseTime(response_time) => { - let old_ping_time_ms = self.shared.ping_time_ms.load(Ordering::Relaxed); + ReceivedData::Throughput(mut throughput) => { + if throughput < MINIMUM_THROUGHPUT { + warn!( + "Throughput {} kbps lower than minimum {}, setting to minimum", + throughput / 1000, + MINIMUM_THROUGHPUT / 1000, + ); + throughput = MINIMUM_THROUGHPUT; + } + + let old_throughput = self.shared.throughput(); + let avg_throughput = if old_throughput > 0 { + (old_throughput + throughput) / 2 + } else { + throughput + }; + + // print when the new estimate deviates by more than 10% from the last + if f32::abs((avg_throughput as f32 - old_throughput as f32) / old_throughput as f32) + > 0.1 + { + trace!( + "Throughput now estimated as: {} kbps", + avg_throughput / 1000 + ); + } + + self.shared.set_throughput(avg_throughput); + } + ReceivedData::ResponseTime(mut response_time) => { + if response_time > MAXIMUM_ASSUMED_PING_TIME { + warn!( + "Time to first byte {} ms exceeds maximum {}, setting to maximum", + response_time.as_millis(), + MAXIMUM_ASSUMED_PING_TIME.as_millis() + ); + response_time = MAXIMUM_ASSUMED_PING_TIME; + } + + let old_ping_time_ms = self.shared.ping_time().as_millis(); // prune old response times. Keep at most two so we can push a third. while self.network_response_times.len() >= 3 { @@ -268,8 +307,8 @@ impl AudioFileFetch { self.network_response_times.push(response_time); // stats::median is experimental. So we calculate the median of up to three ourselves. - let ping_time_ms = { - let response_time = match self.network_response_times.len() { + let ping_time = { + match self.network_response_times.len() { 1 => self.network_response_times[0], 2 => (self.network_response_times[0] + self.network_response_times[1]) / 2, 3 => { @@ -278,22 +317,23 @@ impl AudioFileFetch { times[1] } _ => unreachable!(), - }; - response_time.as_millis() as usize + } }; // print when the new estimate deviates by more than 10% from the last if f32::abs( - (ping_time_ms as f32 - old_ping_time_ms as f32) / old_ping_time_ms as f32, + (ping_time.as_millis() as f32 - old_ping_time_ms as f32) + / old_ping_time_ms as f32, ) > 0.1 { - debug!("Ping time now estimated as: {} ms", ping_time_ms); + trace!( + "Time to first byte now estimated as: {} ms", + ping_time.as_millis() + ); } // store our new estimate for everyone to see - self.shared - .ping_time_ms - .store(ping_time_ms, Ordering::Relaxed); + self.shared.set_ping_time(ping_time); } ReceivedData::Data(data) => { match self.output.as_mut() { @@ -333,14 +373,6 @@ impl AudioFileFetch { StreamLoaderCommand::Fetch(request) => { self.download_range(request.start, request.length)? } - StreamLoaderCommand::RandomAccessMode => self - .shared - .download_streaming - .store(false, Ordering::Release), - StreamLoaderCommand::StreamMode => self - .shared - .download_streaming - .store(true, Ordering::Release), StreamLoaderCommand::Close => return Ok(ControlFlow::Break), } @@ -426,40 +458,28 @@ pub(super) async fn audio_file_fetch( else => (), } - if fetch.is_download_streaming() { - let number_of_open_requests = - fetch.shared.number_of_open_requests.load(Ordering::SeqCst); - if number_of_open_requests < MAX_PREFETCH_REQUESTS { - let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests; + if fetch.shared.is_download_streaming() && fetch.has_download_slots_available() { + let bytes_pending: usize = { + let download_status = fetch.shared.download_status.lock(); - let bytes_pending: usize = { - let download_status = fetch.shared.download_status.lock(); + download_status + .requested + .minus(&download_status.downloaded) + .len() + }; - download_status - .requested - .minus(&download_status.downloaded) - .len() - }; + let ping_time_seconds = fetch.shared.ping_time().as_secs_f32(); + let throughput = fetch.shared.throughput(); - let ping_time_seconds = - Duration::from_millis(fetch.shared.ping_time_ms.load(Ordering::Relaxed) as u64) - .as_secs_f32(); - let download_rate = fetch.session.channel().get_download_rate_estimate(); + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR + * ping_time_seconds + * fetch.shared.bytes_per_second as f32) as usize, + (ping_time_seconds * throughput as f32) as usize, + ); - let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR - * ping_time_seconds - * fetch.shared.bytes_per_second as f32) as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f32) - as usize, - ); - - if bytes_pending < desired_pending_bytes { - fetch.pre_fetch_more_data( - desired_pending_bytes - bytes_pending, - max_requests_to_send, - )?; - } + if bytes_pending < desired_pending_bytes { + fetch.pre_fetch_more_data(desired_pending_bytes - bytes_pending)?; } } } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 22bf2f0a..2a53c361 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -8,7 +8,4 @@ mod range_set; pub use decrypt::AudioDecrypt; pub use fetch::{AudioFile, AudioFileError, StreamLoaderController}; -pub use fetch::{ - MINIMUM_DOWNLOAD_SIZE, READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, - READ_AHEAD_DURING_PLAYBACK, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, -}; +pub use fetch::{MINIMUM_DOWNLOAD_SIZE, READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_DURING_PLAYBACK}; diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 616a44e5..a9144568 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1502,13 +1502,17 @@ impl SpircTask { } fn notify(&mut self, recipient: Option<&str>) -> Result<(), Error> { - let status_string = match self.state.get_status() { - PlayStatus::kPlayStatusLoading => "kPlayStatusLoading", - PlayStatus::kPlayStatusPause => "kPlayStatusPause", - PlayStatus::kPlayStatusStop => "kPlayStatusStop", - PlayStatus::kPlayStatusPlay => "kPlayStatusPlay", - }; - trace!("Sending status to server: [{}]", status_string); + let status = self.state.get_status(); + + // When in loading state, the Spotify UI is disabled for interaction. + // On desktop this isn't so bad but on mobile it means that the bottom + // control disappears entirely. This is very confusing, so don't notify + // in this case. + if status == PlayStatus::kPlayStatusLoading { + return Ok(()); + } + + trace!("Sending status to server: [{:?}]", status); let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify); if let Some(s) = recipient { cs = cs.recipient(s); diff --git a/core/src/channel.rs b/core/src/channel.rs index c601cd7a..86909978 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -3,7 +3,7 @@ use std::{ fmt, pin::Pin, task::{Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; use byteorder::{BigEndian, ByteOrder}; @@ -27,7 +27,7 @@ component! { } } -const ONE_SECOND_IN_MS: usize = 1000; +const ONE_SECOND: Duration = Duration::from_secs(1); #[derive(Debug, Error, Hash, PartialEq, Eq, Copy, Clone)] pub struct ChannelError; @@ -92,10 +92,8 @@ impl ChannelManager { self.lock(|inner| { let current_time = Instant::now(); if let Some(download_measurement_start) = inner.download_measurement_start { - if (current_time - download_measurement_start).as_millis() - > ONE_SECOND_IN_MS as u128 - { - inner.download_rate_estimate = ONE_SECOND_IN_MS + if (current_time - download_measurement_start) > ONE_SECOND { + inner.download_rate_estimate = ONE_SECOND.as_millis() as usize * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize; inner.download_measurement_start = Some(current_time); diff --git a/core/src/error.rs b/core/src/error.rs index 700aed84..87cf3c86 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -14,7 +14,9 @@ use http::{ }; use protobuf::ProtobufError; use thiserror::Error; -use tokio::sync::{mpsc::error::SendError, oneshot::error::RecvError}; +use tokio::sync::{ + mpsc::error::SendError, oneshot::error::RecvError, AcquireError, TryAcquireError, +}; use url::ParseError; #[cfg(feature = "with-dns-sd")] @@ -451,6 +453,24 @@ impl From> for Error { } } +impl From for Error { + fn from(err: AcquireError) -> Self { + Self { + kind: ErrorKind::ResourceExhausted, + error: ErrorMessage(err.to_string()).into(), + } + } +} + +impl From for Error { + fn from(err: TryAcquireError) -> Self { + Self { + kind: ErrorKind::ResourceExhausted, + error: ErrorMessage(err.to_string()).into(), + } + } +} + impl From for Error { fn from(err: ToStrError) -> Self { Self::new(ErrorKind::FailedPrecondition, err) diff --git a/playback/src/player.rs b/playback/src/player.rs index cd08197c..f0f0d492 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1,5 +1,4 @@ use std::{ - cmp::max, collections::HashMap, fmt, future::Future, @@ -28,8 +27,7 @@ use tokio::sync::{mpsc, oneshot}; use crate::{ audio::{ AudioDecrypt, AudioFile, StreamLoaderController, READ_AHEAD_BEFORE_PLAYBACK, - READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK, - READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, + READ_AHEAD_DURING_PLAYBACK, }, audio_backend::Sink, config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig}, @@ -1096,7 +1094,7 @@ impl PlayerTrackLoader { // If the position is invalid just start from // the beginning of the track. let position_ms = if position_ms > duration_ms { - warn!("Invalid start position of {}ms exceeds track's duration of {}ms, starting track from the beginning", position_ms, duration_ms); + warn!("Invalid start position of {} ms exceeds track's duration of {} ms, starting track from the beginning", position_ms, duration_ms); 0 } else { position_ms @@ -1475,22 +1473,28 @@ impl PlayerInternal { } fn handle_play(&mut self) { - if let PlayerState::Paused { - track_id, - play_request_id, - stream_position_ms, - .. - } = self.state - { - self.state.paused_to_playing(); - self.send_event(PlayerEvent::Playing { + match self.state { + PlayerState::Paused { track_id, play_request_id, - position_ms: stream_position_ms, - }); - self.ensure_sink_running(); - } else { - error!("Player::play called from invalid state: {:?}", self.state); + stream_position_ms, + .. + } => { + self.state.paused_to_playing(); + self.send_event(PlayerEvent::Playing { + track_id, + play_request_id, + position_ms: stream_position_ms, + }); + self.ensure_sink_running(); + } + PlayerState::Loading { + ref mut start_playback, + .. + } => { + *start_playback = true; + } + _ => error!("Player::play called from invalid state: {:?}", self.state), } } @@ -1512,6 +1516,12 @@ impl PlayerInternal { position_ms: stream_position_ms, }); } + PlayerState::Loading { + ref mut start_playback, + .. + } => { + *start_playback = false; + } _ => error!("Player::pause called from invalid state: {:?}", self.state), } } @@ -1980,6 +1990,25 @@ impl PlayerInternal { } fn handle_command_seek(&mut self, position_ms: u32) -> PlayerResult { + // When we are still loading, the user may immediately ask to + // seek to another position yet the decoder won't be ready for + // that. In this case just restart the loading process but + // with the requested position. + if let PlayerState::Loading { + track_id, + play_request_id, + start_playback, + .. + } = self.state + { + return self.handle_command_load( + track_id, + play_request_id, + start_playback, + position_ms, + ); + } + if let Some(decoder) = self.state.decoder() { match decoder.seek(position_ms) { Ok(new_position_ms) => { @@ -2178,21 +2207,14 @@ impl PlayerInternal { .. } = self.state { - let ping_time = stream_loader_controller.ping_time().as_secs_f32(); - // Request our read ahead range - let request_data_length = max( - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time * bytes_per_second as f32) - as usize, - (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize, - ); + let request_data_length = + (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize; // Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete. - let wait_for_data_length = max( - (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * ping_time * bytes_per_second as f32) - as usize, - (READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize, - ); + let wait_for_data_length = + (READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize; + stream_loader_controller .fetch_next_and_wait(request_data_length, wait_for_data_length) .map_err(Into::into)