Improve lock ordering and contention

This commit is contained in:
Roderick van Domburg 2022-01-05 20:44:08 +01:00
parent 5c2b5a21c1
commit 1a7c440bd7
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
6 changed files with 82 additions and 48 deletions

View file

@ -5,7 +5,7 @@ use std::{
fs,
io::{self, Read, Seek, SeekFrom},
sync::{
atomic::{self, AtomicUsize},
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
@ -67,6 +67,9 @@ pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 128;
/// another position, then only this amount is requested on the first request.
pub const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 8;
/// The ping time that is used for calculations before a ping time was actually measured.
pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500);
/// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching.
pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500);
@ -174,7 +177,7 @@ impl StreamLoaderController {
pub fn range_to_end_available(&self) -> bool {
match self.stream_shared {
Some(ref shared) => {
let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
let read_position = shared.read_position.load(Ordering::Acquire);
self.range_available(Range::new(read_position, self.len() - read_position))
}
None => true,
@ -183,7 +186,7 @@ impl StreamLoaderController {
pub fn ping_time(&self) -> Duration {
Duration::from_millis(self.stream_shared.as_ref().map_or(0, |shared| {
shared.ping_time_ms.load(atomic::Ordering::Relaxed) as u64
shared.ping_time_ms.load(Ordering::Relaxed) as u64
}))
}
@ -244,21 +247,23 @@ impl StreamLoaderController {
Ok(())
}
#[allow(dead_code)]
pub fn fetch_next(&self, length: usize) {
if let Some(ref shared) = self.stream_shared {
let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed),
start: shared.read_position.load(Ordering::Acquire),
length,
};
self.fetch(range);
}
}
#[allow(dead_code)]
pub fn fetch_next_blocking(&self, length: usize) -> AudioFileResult {
match self.stream_shared {
Some(ref shared) => {
let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed),
start: shared.read_position.load(Ordering::Acquire),
length,
};
self.fetch_blocking(range)
@ -267,6 +272,31 @@ impl StreamLoaderController {
}
}
pub fn fetch_next_and_wait(
&self,
request_length: usize,
wait_length: usize,
) -> AudioFileResult {
match self.stream_shared {
Some(ref shared) => {
let start = shared.read_position.load(Ordering::Acquire);
let request_range = Range {
start,
length: request_length,
};
self.fetch(request_range);
let wait_range = Range {
start,
length: wait_length,
};
self.fetch_blocking(wait_range)
}
None => Ok(()),
}
}
pub fn set_random_access_mode(&self) {
// optimise download strategy for random access
self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
@ -428,7 +458,7 @@ impl AudioFileStreaming {
}),
download_strategy: Mutex::new(DownloadStrategy::Streaming()),
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize),
read_position: AtomicUsize::new(0),
});
@ -465,15 +495,17 @@ impl Read for AudioFileStreaming {
}
let length = min(output.len(), self.shared.file_size - offset);
if length == 0 {
return Ok(0);
}
let length_to_request = match *(self.shared.download_strategy.lock()) {
DownloadStrategy::RandomAccess() => length,
DownloadStrategy::Streaming() => {
// Due to the read-ahead stuff, we potentially request more than the actual request demanded.
let ping_time_seconds = Duration::from_millis(
self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as u64,
)
.as_secs_f32();
let ping_time_seconds =
Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64)
.as_secs_f32();
let length_to_request = length
+ max(
@ -501,10 +533,6 @@ impl Read for AudioFileStreaming {
.map_err(|err| io::Error::new(io::ErrorKind::BrokenPipe, err))?;
}
if length == 0 {
return Ok(0);
}
while !download_status.downloaded.contains(offset) {
if self
.shared
@ -531,7 +559,7 @@ impl Read for AudioFileStreaming {
self.position += read_len as u64;
self.shared
.read_position
.store(self.position as usize, atomic::Ordering::Relaxed);
.store(self.position as usize, Ordering::Release);
Ok(read_len)
}
@ -543,7 +571,7 @@ impl Seek for AudioFileStreaming {
// Do not seek past EOF
self.shared
.read_position
.store(self.position as usize, atomic::Ordering::Relaxed);
.store(self.position as usize, Ordering::Release);
Ok(self.position)
}
}

View file

@ -1,11 +1,10 @@
use std::{
cmp::{max, min},
io::{Seek, SeekFrom, Write},
sync::{atomic, Arc},
sync::{atomic::Ordering, Arc},
time::{Duration, Instant},
};
use atomic::Ordering;
use bytes::Bytes;
use futures_util::StreamExt;
use hyper::StatusCode;
@ -231,7 +230,7 @@ impl AudioFileFetch {
// download data from after the current read position first
let mut tail_end = RangeSet::new();
let read_position = self.shared.read_position.load(Ordering::Relaxed);
let read_position = self.shared.read_position.load(Ordering::Acquire);
tail_end.add_range(&Range::new(
read_position,
self.shared.file_size - read_position,

View file

@ -1,4 +1,7 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{
hint,
sync::atomic::{AtomicBool, Ordering},
};
use hyper::{Body, Method, Request};
use serde::Deserialize;
@ -37,7 +40,7 @@ impl Default for ApResolveData {
component! {
ApResolver : ApResolverInner {
data: AccessPoints = AccessPoints::default(),
spinlock: AtomicUsize = AtomicUsize::new(0),
in_progress: AtomicBool = AtomicBool::new(false),
}
}
@ -107,16 +110,15 @@ impl ApResolver {
})
}
pub async fn resolve(&self, endpoint: &str) -> SocketAddress {
pub async fn resolve(&self, endpoint: &str) -> Result<SocketAddress, Error> {
// Use a spinlock to make this function atomic. Otherwise, various race conditions may
// occur, e.g. when the session is created, multiple components are launched almost in
// parallel and they will all call this function, while resolving is still in progress.
self.lock(|inner| {
while inner.spinlock.load(Ordering::SeqCst) != 0 {
#[allow(deprecated)]
std::sync::atomic::spin_loop_hint()
while inner.in_progress.load(Ordering::Acquire) {
hint::spin_loop();
}
inner.spinlock.store(1, Ordering::SeqCst);
inner.in_progress.store(true, Ordering::Release);
});
if self.is_empty() {
@ -131,10 +133,15 @@ impl ApResolver {
"accesspoint" => inner.data.accesspoint.remove(0),
"dealer" => inner.data.dealer.remove(0),
"spclient" => inner.data.spclient.remove(0),
_ => unimplemented!(),
_ => {
return Err(Error::unimplemented(format!(
"No implementation to resolve access point {}",
endpoint
)))
}
};
inner.spinlock.store(0, Ordering::SeqCst);
access_point
inner.in_progress.store(false, Ordering::Release);
Ok(access_point)
})
}
}

View file

@ -110,7 +110,7 @@ impl Session {
) -> Result<Session, Error> {
let http_client = HttpClient::new(config.proxy.as_ref());
let (sender_tx, sender_rx) = mpsc::unbounded_channel();
let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
let session_id = SESSION_COUNTER.fetch_add(1, Ordering::AcqRel);
debug!("new Session[{}]", session_id);
@ -130,7 +130,7 @@ impl Session {
session_id,
}));
let ap = session.apresolver().resolve("accesspoint").await;
let ap = session.apresolver().resolve("accesspoint").await?;
info!("Connecting to AP \"{}:{}\"", ap.0, ap.1);
let mut transport =
connection::connect(&ap.0, ap.1, session.config().proxy.as_ref()).await?;

View file

@ -65,13 +65,13 @@ impl SpClient {
self.lock(|inner| inner.accesspoint = None)
}
pub async fn get_accesspoint(&self) -> SocketAddress {
pub async fn get_accesspoint(&self) -> Result<SocketAddress, Error> {
// Memoize the current access point.
let ap = self.lock(|inner| inner.accesspoint.clone());
match ap {
let tuple = match ap {
Some(tuple) => tuple,
None => {
let tuple = self.session().apresolver().resolve("spclient").await;
let tuple = self.session().apresolver().resolve("spclient").await?;
self.lock(|inner| inner.accesspoint = Some(tuple.clone()));
info!(
"Resolved \"{}:{}\" as spclient access point",
@ -79,12 +79,13 @@ impl SpClient {
);
tuple
}
}
};
Ok(tuple)
}
pub async fn base_url(&self) -> String {
let ap = self.get_accesspoint().await;
format!("https://{}:{}", ap.0, ap.1)
pub async fn base_url(&self) -> Result<String, Error> {
let ap = self.get_accesspoint().await?;
Ok(format!("https://{}:{}", ap.0, ap.1))
}
pub async fn request_with_protobuf(
@ -133,7 +134,7 @@ impl SpClient {
// Reconnection logic: retrieve the endpoint every iteration, so we can try
// another access point when we are experiencing network issues (see below).
let mut url = self.base_url().await;
let mut url = self.base_url().await?;
url.push_str(endpoint);
// Add metrics. There is also an optional `partner` key with a value like

View file

@ -2057,24 +2057,23 @@ impl PlayerInternal {
..
} = self.state
{
let ping_time = stream_loader_controller.ping_time().as_secs_f32();
// Request our read ahead range
let request_data_length = max(
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* stream_loader_controller.ping_time().as_secs_f32()
* bytes_per_second as f32) as usize,
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time * bytes_per_second as f32)
as usize,
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize,
);
stream_loader_controller.fetch_next(request_data_length);
// Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete.
// Request the part we want to wait for blocking. This effectively means we wait for the previous request to partially complete.
let wait_for_data_length = max(
(READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS
* stream_loader_controller.ping_time().as_secs_f32()
* bytes_per_second as f32) as usize,
(READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * ping_time * bytes_per_second as f32)
as usize,
(READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize,
);
stream_loader_controller
.fetch_next_blocking(wait_for_data_length)
.fetch_next_and_wait(request_data_length, wait_for_data_length)
.map_err(Into::into)
} else {
Ok(())