Improve performance of getting/setting download mode

This commit is contained in:
Roderick van Domburg 2022-01-07 11:38:24 +01:00
parent d380f1f040
commit 62ccdbc580
No known key found for this signature in database
GPG key ID: FE2585E713F9F30A
2 changed files with 56 additions and 61 deletions

View file

@ -5,7 +5,7 @@ use std::{
fs,
io::{self, Read, Seek, SeekFrom},
sync::{
atomic::{AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
time::{Duration, Instant},
@ -137,10 +137,10 @@ pub struct StreamingRequest {
#[derive(Debug)]
pub enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode(), // optimise download strategy for random access
StreamMode(), // optimise download strategy for streaming
Close(), // terminate and don't load any more data
Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode, // optimise download strategy for random access
StreamMode, // optimise download strategy for streaming
Close, // terminate and don't load any more data
}
#[derive(Clone)]
@ -299,17 +299,17 @@ impl StreamLoaderController {
pub fn set_random_access_mode(&self) {
// optimise download strategy for random access
self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode);
}
pub fn set_stream_mode(&self) {
// optimise download strategy for streaming
self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
self.send_stream_loader_command(StreamLoaderCommand::StreamMode);
}
pub fn close(&self) {
// terminate stream loading and don't load any more data for this file.
self.send_stream_loader_command(StreamLoaderCommand::Close());
self.send_stream_loader_command(StreamLoaderCommand::Close);
}
}
@ -325,25 +325,13 @@ struct AudioFileDownloadStatus {
downloaded: RangeSet,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum DownloadStrategy {
RandomAccess(),
Streaming(),
}
impl Default for DownloadStrategy {
fn default() -> Self {
Self::Streaming()
}
}
struct AudioFileShared {
cdn_url: CdnUrl,
file_size: usize,
bytes_per_second: usize,
cond: Condvar,
download_status: Mutex<AudioFileDownloadStatus>,
download_strategy: Mutex<DownloadStrategy>,
download_streaming: AtomicBool,
number_of_open_requests: AtomicUsize,
ping_time_ms: AtomicUsize,
read_position: AtomicUsize,
@ -462,7 +450,7 @@ impl AudioFileStreaming {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::default()),
download_streaming: AtomicBool::new(true),
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize),
read_position: AtomicUsize::new(0),
@ -507,24 +495,23 @@ impl Read for AudioFileStreaming {
return Ok(0);
}
let length_to_request = match *(self.shared.download_strategy.lock()) {
DownloadStrategy::RandomAccess() => length,
DownloadStrategy::Streaming() => {
// Due to the read-ahead stuff, we potentially request more than the actual request demanded.
let ping_time_seconds =
Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64)
.as_secs_f32();
let length_to_request = if self.shared.download_streaming.load(Ordering::Acquire) {
// Due to the read-ahead stuff, we potentially request more than the actual request demanded.
let ping_time_seconds =
Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64)
.as_secs_f32();
let length_to_request = length
+ max(
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32()
* self.shared.bytes_per_second as f32) as usize,
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* ping_time_seconds
* self.shared.bytes_per_second as f32) as usize,
);
min(length_to_request, self.shared.file_size - offset)
}
let length_to_request = length
+ max(
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32)
as usize,
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* ping_time_seconds
* self.shared.bytes_per_second as f32) as usize,
);
min(length_to_request, self.shared.file_size - offset)
} else {
length
};
let mut ranges_to_request = RangeSet::new();
@ -575,7 +562,7 @@ impl Read for AudioFileStreaming {
impl Seek for AudioFileStreaming {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
// If we are already at this position, we don't need to switch download strategy.
// If we are already at this position, we don't need to switch download mode.
// These checks and locks are less expensive than interrupting streaming.
let current_position = self.position as i64;
let requested_pos = match pos {
@ -595,13 +582,17 @@ impl Seek for AudioFileStreaming {
.downloaded
.contains(requested_pos as usize);
let mut old_strategy = DownloadStrategy::default();
let mut was_streaming = false;
if !available {
// Ensure random access mode if we need to download this part.
old_strategy = std::mem::replace(
&mut *(self.shared.download_strategy.lock()),
DownloadStrategy::RandomAccess(),
);
// Checking whether we are streaming now is a micro-optimization
// to save an atomic load.
was_streaming = self.shared.download_streaming.load(Ordering::Acquire);
if was_streaming {
self.shared
.download_streaming
.store(false, Ordering::Release);
}
}
self.position = self.read_file.seek(pos)?;
@ -609,8 +600,10 @@ impl Seek for AudioFileStreaming {
.read_position
.store(self.position as usize, Ordering::Release);
if !available && old_strategy != DownloadStrategy::RandomAccess() {
*(self.shared.download_strategy.lock()) = old_strategy;
if !available && was_streaming {
self.shared
.download_streaming
.store(true, Ordering::Release);
}
Ok(self.position)

View file

@ -16,9 +16,9 @@ use librespot_core::{session::Session, Error};
use crate::range_set::{Range, RangeSet};
use super::{
AudioFileError, AudioFileResult, AudioFileShared, DownloadStrategy, StreamLoaderCommand,
StreamingRequest, FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME,
MAX_PREFETCH_REQUESTS, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest,
FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS,
MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
};
struct PartialFileData {
@ -157,8 +157,8 @@ enum ControlFlow {
}
impl AudioFileFetch {
fn get_download_strategy(&mut self) -> DownloadStrategy {
*(self.shared.download_strategy.lock())
fn is_download_streaming(&mut self) -> bool {
self.shared.download_streaming.load(Ordering::Acquire)
}
fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult {
@ -337,15 +337,17 @@ impl AudioFileFetch {
) -> Result<ControlFlow, Error> {
match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length)?;
self.download_range(request.start, request.length)?
}
StreamLoaderCommand::RandomAccessMode() => {
*(self.shared.download_strategy.lock()) = DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(self.shared.download_strategy.lock()) = DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return Ok(ControlFlow::Break),
StreamLoaderCommand::RandomAccessMode => self
.shared
.download_streaming
.store(false, Ordering::Release),
StreamLoaderCommand::StreamMode => self
.shared
.download_streaming
.store(true, Ordering::Release),
StreamLoaderCommand::Close => return Ok(ControlFlow::Break),
}
Ok(ControlFlow::Continue)
@ -430,7 +432,7 @@ pub(super) async fn audio_file_fetch(
else => (),
}
if fetch.get_download_strategy() == DownloadStrategy::Streaming() {
if fetch.is_download_streaming() {
let number_of_open_requests =
fetch.shared.number_of_open_requests.load(Ordering::SeqCst);
if number_of_open_requests < MAX_PREFETCH_REQUESTS {