diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index 5b39dc08..ad1b98e1 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -5,7 +5,7 @@ use std::{ fs, io::{self, Read, Seek, SeekFrom}, sync::{ - atomic::{self, AtomicUsize}, + atomic::{AtomicUsize, Ordering}, Arc, }, time::{Duration, Instant}, @@ -67,6 +67,9 @@ pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 128; /// another position, then only this amount is requested on the first request. pub const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 8; +/// The ping time that is used for calculations before a ping time was actually measured. +pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500); + /// If the measured ping time to the Spotify server is larger than this value, it is capped /// to avoid run-away block sizes and pre-fetching. pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500); @@ -174,7 +177,7 @@ impl StreamLoaderController { pub fn range_to_end_available(&self) -> bool { match self.stream_shared { Some(ref shared) => { - let read_position = shared.read_position.load(atomic::Ordering::Relaxed); + let read_position = shared.read_position.load(Ordering::Acquire); self.range_available(Range::new(read_position, self.len() - read_position)) } None => true, @@ -183,7 +186,7 @@ impl StreamLoaderController { pub fn ping_time(&self) -> Duration { Duration::from_millis(self.stream_shared.as_ref().map_or(0, |shared| { - shared.ping_time_ms.load(atomic::Ordering::Relaxed) as u64 + shared.ping_time_ms.load(Ordering::Relaxed) as u64 })) } @@ -244,21 +247,23 @@ impl StreamLoaderController { Ok(()) } + #[allow(dead_code)] pub fn fetch_next(&self, length: usize) { if let Some(ref shared) = self.stream_shared { let range = Range { - start: shared.read_position.load(atomic::Ordering::Relaxed), + start: shared.read_position.load(Ordering::Acquire), length, }; self.fetch(range); } } + #[allow(dead_code)] pub fn fetch_next_blocking(&self, length: usize) -> AudioFileResult { match self.stream_shared { Some(ref shared) => { let range = Range { - start: shared.read_position.load(atomic::Ordering::Relaxed), + start: shared.read_position.load(Ordering::Acquire), length, }; self.fetch_blocking(range) @@ -267,6 +272,31 @@ impl StreamLoaderController { } } + pub fn fetch_next_and_wait( + &self, + request_length: usize, + wait_length: usize, + ) -> AudioFileResult { + match self.stream_shared { + Some(ref shared) => { + let start = shared.read_position.load(Ordering::Acquire); + + let request_range = Range { + start, + length: request_length, + }; + self.fetch(request_range); + + let wait_range = Range { + start, + length: wait_length, + }; + self.fetch_blocking(wait_range) + } + None => Ok(()), + } + } + pub fn set_random_access_mode(&self) { // optimise download strategy for random access self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); @@ -428,7 +458,7 @@ impl AudioFileStreaming { }), download_strategy: Mutex::new(DownloadStrategy::Streaming()), number_of_open_requests: AtomicUsize::new(0), - ping_time_ms: AtomicUsize::new(0), + ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize), read_position: AtomicUsize::new(0), }); @@ -465,15 +495,17 @@ impl Read for AudioFileStreaming { } let length = min(output.len(), self.shared.file_size - offset); + if length == 0 { + return Ok(0); + } let length_to_request = match *(self.shared.download_strategy.lock()) { DownloadStrategy::RandomAccess() => length, DownloadStrategy::Streaming() => { // Due to the read-ahead stuff, we potentially request more than the actual request demanded. - let ping_time_seconds = Duration::from_millis( - self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as u64, - ) - .as_secs_f32(); + let ping_time_seconds = + Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64) + .as_secs_f32(); let length_to_request = length + max( @@ -501,10 +533,6 @@ impl Read for AudioFileStreaming { .map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?; } - if length == 0 { - return Ok(0); - } - while !download_status.downloaded.contains(offset) { if self .shared @@ -531,7 +559,7 @@ impl Read for AudioFileStreaming { self.position += read_len as u64; self.shared .read_position - .store(self.position as usize, atomic::Ordering::Relaxed); + .store(self.position as usize, Ordering::Release); Ok(read_len) } @@ -543,7 +571,7 @@ impl Seek for AudioFileStreaming { // Do not seek past EOF self.shared .read_position - .store(self.position as usize, atomic::Ordering::Relaxed); + .store(self.position as usize, Ordering::Release); Ok(self.position) } } diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index b3d97eb4..08013b5b 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -1,11 +1,10 @@ use std::{ cmp::{max, min}, io::{Seek, SeekFrom, Write}, - sync::{atomic, Arc}, + sync::{atomic::Ordering, Arc}, time::{Duration, Instant}, }; -use atomic::Ordering; use bytes::Bytes; use futures_util::StreamExt; use hyper::StatusCode; @@ -231,7 +230,7 @@ impl AudioFileFetch { // download data from after the current read position first let mut tail_end = RangeSet::new(); - let read_position = self.shared.read_position.load(Ordering::Relaxed); + let read_position = self.shared.read_position.load(Ordering::Acquire); tail_end.add_range(&Range::new( read_position, self.shared.file_size - read_position, diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index 69a8e15c..1e1c6de6 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,4 +1,7 @@ -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{ + hint, + sync::atomic::{AtomicBool, Ordering}, +}; use hyper::{Body, Method, Request}; use serde::Deserialize; @@ -37,7 +40,7 @@ impl Default for ApResolveData { component! { ApResolver : ApResolverInner { data: AccessPoints = AccessPoints::default(), - spinlock: AtomicUsize = AtomicUsize::new(0), + in_progress: AtomicBool = AtomicBool::new(false), } } @@ -107,16 +110,15 @@ impl ApResolver { }) } - pub async fn resolve(&self, endpoint: &str) -> SocketAddress { + pub async fn resolve(&self, endpoint: &str) -> Result { // Use a spinlock to make this function atomic. Otherwise, various race conditions may // occur, e.g. when the session is created, multiple components are launched almost in // parallel and they will all call this function, while resolving is still in progress. self.lock(|inner| { - while inner.spinlock.load(Ordering::SeqCst) != 0 { - #[allow(deprecated)] - std::sync::atomic::spin_loop_hint() + while inner.in_progress.load(Ordering::Acquire) { + hint::spin_loop(); } - inner.spinlock.store(1, Ordering::SeqCst); + inner.in_progress.store(true, Ordering::Release); }); if self.is_empty() { @@ -131,10 +133,15 @@ impl ApResolver { "accesspoint" => inner.data.accesspoint.remove(0), "dealer" => inner.data.dealer.remove(0), "spclient" => inner.data.spclient.remove(0), - _ => unimplemented!(), + _ => { + return Err(Error::unimplemented(format!( + "No implementation to resolve access point {}", + endpoint + ))) + } }; - inner.spinlock.store(0, Ordering::SeqCst); - access_point + inner.in_progress.store(false, Ordering::Release); + Ok(access_point) }) } } diff --git a/core/src/session.rs b/core/src/session.rs index aecdaada..2b431715 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -110,7 +110,7 @@ impl Session { ) -> Result { let http_client = HttpClient::new(config.proxy.as_ref()); let (sender_tx, sender_rx) = mpsc::unbounded_channel(); - let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed); + let session_id = SESSION_COUNTER.fetch_add(1, Ordering::AcqRel); debug!("new Session[{}]", session_id); @@ -130,7 +130,7 @@ impl Session { session_id, })); - let ap = session.apresolver().resolve("accesspoint").await; + let ap = session.apresolver().resolve("accesspoint").await?; info!("Connecting to AP \"{}:{}\"", ap.0, ap.1); let mut transport = connection::connect(&ap.0, ap.1, session.config().proxy.as_ref()).await?; diff --git a/core/src/spclient.rs b/core/src/spclient.rs index ffc2ebba..de57e97b 100644 --- a/core/src/spclient.rs +++ b/core/src/spclient.rs @@ -65,13 +65,13 @@ impl SpClient { self.lock(|inner| inner.accesspoint = None) } - pub async fn get_accesspoint(&self) -> SocketAddress { + pub async fn get_accesspoint(&self) -> Result { // Memoize the current access point. let ap = self.lock(|inner| inner.accesspoint.clone()); - match ap { + let tuple = match ap { Some(tuple) => tuple, None => { - let tuple = self.session().apresolver().resolve("spclient").await; + let tuple = self.session().apresolver().resolve("spclient").await?; self.lock(|inner| inner.accesspoint = Some(tuple.clone())); info!( "Resolved \"{}:{}\" as spclient access point", @@ -79,12 +79,13 @@ impl SpClient { ); tuple } - } + }; + Ok(tuple) } - pub async fn base_url(&self) -> String { - let ap = self.get_accesspoint().await; - format!("https://{}:{}", ap.0, ap.1) + pub async fn base_url(&self) -> Result { + let ap = self.get_accesspoint().await?; + Ok(format!("https://{}:{}", ap.0, ap.1)) } pub async fn request_with_protobuf( @@ -133,7 +134,7 @@ impl SpClient { // Reconnection logic: retrieve the endpoint every iteration, so we can try // another access point when we are experiencing network issues (see below). - let mut url = self.base_url().await; + let mut url = self.base_url().await?; url.push_str(endpoint); // Add metrics. There is also an optional `partner` key with a value like diff --git a/playback/src/player.rs b/playback/src/player.rs index d5d4b269..a382b6c6 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -2057,24 +2057,23 @@ impl PlayerInternal { .. } = self.state { + let ping_time = stream_loader_controller.ping_time().as_secs_f32(); + // Request our read ahead range let request_data_length = max( - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * stream_loader_controller.ping_time().as_secs_f32() - * bytes_per_second as f32) as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time * bytes_per_second as f32) + as usize, (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize, ); - stream_loader_controller.fetch_next(request_data_length); - // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. + // Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete. let wait_for_data_length = max( - (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS - * stream_loader_controller.ping_time().as_secs_f32() - * bytes_per_second as f32) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * ping_time * bytes_per_second as f32) + as usize, (READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize, ); stream_loader_controller - .fetch_next_blocking(wait_for_data_length) + .fetch_next_and_wait(request_data_length, wait_for_data_length) .map_err(Into::into) } else { Ok(())