Fix broken streaming mode

This commit is contained in:
johannesd3 2021-04-13 20:34:26 +02:00 committed by Johannesd3
parent 7226bfd55a
commit 4925adb4f1
2 changed files with 46 additions and 52 deletions

View file

@ -248,6 +248,7 @@ struct AudioFileShared {
cond: Condvar,
download_status: Mutex<AudioFileDownloadStatus>,
download_strategy: Mutex<DownloadStrategy>,
number_of_open_requests: AtomicUsize,
ping_time_ms: AtomicUsize,
read_position: AtomicUsize,
}
@ -356,6 +357,7 @@ impl AudioFileStreaming {
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
});

View file

@ -68,12 +68,16 @@ async fn receive_data(
initial_data_offset: usize,
initial_request_length: usize,
request_sent_time: Instant,
mut measure_ping_time: bool,
finish_tx: mpsc::UnboundedSender<()>,
) {
let mut data_offset = initial_data_offset;
let mut request_length = initial_request_length;
let old_number_of_request = shared
.number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst);
let mut measure_ping_time = old_number_of_request == 0;
let result = loop {
let data = match data_rx.next().await {
Some(Ok(data)) => data,
@ -121,7 +125,9 @@ async fn receive_data(
shared.cond.notify_all();
}
let _ = finish_tx.send(());
shared
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
if result.is_err() {
warn!(
@ -144,9 +150,6 @@ struct AudioFileFetch {
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times_ms: Vec<usize>,
number_of_open_requests: usize,
download_finish_tx: mpsc::UnboundedSender<()>,
}
// Might be replaced by enum from std once stable
@ -214,11 +217,7 @@ impl AudioFileFetch {
range.start,
range.length,
Instant::now(),
self.number_of_open_requests == 0,
self.download_finish_tx.clone(),
));
self.number_of_open_requests += 1;
}
}
@ -341,7 +340,6 @@ impl AudioFileFetch {
}
StreamLoaderCommand::StreamMode() => {
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
self.trigger_preload();
}
StreamLoaderCommand::Close() => return ControlFlow::Break,
}
@ -355,36 +353,6 @@ impl AudioFileFetch {
output.seek(SeekFrom::Start(0)).unwrap();
let _ = complete_tx.send(output);
}
fn trigger_preload(&mut self) {
if self.number_of_open_requests >= MAX_PREFETCH_REQUESTS {
return;
}
let max_requests_to_send = MAX_PREFETCH_REQUESTS - self.number_of_open_requests;
let bytes_pending: usize = {
let download_status = self.shared.download_status.lock().unwrap();
download_status
.requested
.minus(&download_status.downloaded)
.len()
};
let ping_time_seconds =
0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let download_rate = self.session.channel().get_download_rate_estimate();
let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64)
as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize,
);
if bytes_pending < desired_pending_bytes {
self.pre_fetch_more_data(desired_pending_bytes - bytes_pending, max_requests_to_send);
}
}
}
pub(super) async fn audio_file_fetch(
@ -399,7 +367,6 @@ pub(super) async fn audio_file_fetch(
complete_tx: oneshot::Sender<NamedTempFile>,
) {
let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel();
let (download_finish_tx, mut download_finish_rx) = mpsc::unbounded_channel();
{
let requested_range = Range::new(0, initial_data_length);
@ -414,8 +381,6 @@ pub(super) async fn audio_file_fetch(
0,
initial_data_length,
initial_request_sent_time,
true,
download_finish_tx.clone(),
));
let mut fetch = AudioFileFetch {
@ -426,9 +391,6 @@ pub(super) async fn audio_file_fetch(
file_data_tx,
complete_tx: Some(complete_tx),
network_response_times_ms: Vec::new(),
number_of_open_requests: 1,
download_finish_tx,
};
loop {
@ -442,12 +404,42 @@ pub(super) async fn audio_file_fetch(
if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) {
break;
}
},
_ = download_finish_rx.recv() => {
fetch.number_of_open_requests -= 1;
}
}
if fetch.get_download_strategy() == DownloadStrategy::Streaming() {
fetch.trigger_preload();
if fetch.get_download_strategy() == DownloadStrategy::Streaming() {
let number_of_open_requests = fetch
.shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst);
if number_of_open_requests < MAX_PREFETCH_REQUESTS {
let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests;
let bytes_pending: usize = {
let download_status = fetch.shared.download_status.lock().unwrap();
download_status
.requested
.minus(&download_status.downloaded)
.len()
};
let ping_time_seconds =
0.001 * fetch.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let download_rate = fetch.session.channel().get_download_rate_estimate();
let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR
* ping_time_seconds
* fetch.shared.stream_data_rate as f64) as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
as usize,
);
if bytes_pending < desired_pending_bytes {
fetch.pre_fetch_more_data(
desired_pending_bytes - bytes_pending,
max_requests_to_send,
);
}
}
}