diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index c19fac2e..8e076ebc 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -248,6 +248,7 @@ struct AudioFileShared { cond: Condvar, download_status: Mutex, download_strategy: Mutex, + number_of_open_requests: AtomicUsize, ping_time_ms: AtomicUsize, read_position: AtomicUsize, } @@ -356,6 +357,7 @@ impl AudioFileStreaming { downloaded: RangeSet::new(), }), download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise + number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), read_position: AtomicUsize::new(0), }); diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 17f884f5..0f056c96 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -68,12 +68,16 @@ async fn receive_data( initial_data_offset: usize, initial_request_length: usize, request_sent_time: Instant, - mut measure_ping_time: bool, - finish_tx: mpsc::UnboundedSender<()>, ) { let mut data_offset = initial_data_offset; let mut request_length = initial_request_length; + let old_number_of_request = shared + .number_of_open_requests + .fetch_add(1, atomic::Ordering::SeqCst); + + let mut measure_ping_time = old_number_of_request == 0; + let result = loop { let data = match data_rx.next().await { Some(Ok(data)) => data, @@ -121,7 +125,9 @@ async fn receive_data( shared.cond.notify_all(); } - let _ = finish_tx.send(()); + shared + .number_of_open_requests + .fetch_sub(1, atomic::Ordering::SeqCst); if result.is_err() { warn!( @@ -144,9 +150,6 @@ struct AudioFileFetch { file_data_tx: mpsc::UnboundedSender, complete_tx: Option>, network_response_times_ms: Vec, - number_of_open_requests: usize, - - download_finish_tx: mpsc::UnboundedSender<()>, } // Might be replaced by enum from std once stable @@ -214,11 +217,7 @@ impl AudioFileFetch { range.start, range.length, Instant::now(), - self.number_of_open_requests == 0, - self.download_finish_tx.clone(), )); - - self.number_of_open_requests += 1; } } @@ -341,7 +340,6 @@ impl AudioFileFetch { } StreamLoaderCommand::StreamMode() => { *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); - self.trigger_preload(); } StreamLoaderCommand::Close() => return ControlFlow::Break, } @@ -355,36 +353,6 @@ impl AudioFileFetch { output.seek(SeekFrom::Start(0)).unwrap(); let _ = complete_tx.send(output); } - - fn trigger_preload(&mut self) { - if self.number_of_open_requests >= MAX_PREFETCH_REQUESTS { - return; - } - - let max_requests_to_send = MAX_PREFETCH_REQUESTS - self.number_of_open_requests; - - let bytes_pending: usize = { - let download_status = self.shared.download_status.lock().unwrap(); - download_status - .requested - .minus(&download_status.downloaded) - .len() - }; - - let ping_time_seconds = - 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let download_rate = self.session.channel().get_download_rate_estimate(); - - let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) - as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, - ); - - if bytes_pending < desired_pending_bytes { - self.pre_fetch_more_data(desired_pending_bytes - bytes_pending, max_requests_to_send); - } - } } pub(super) async fn audio_file_fetch( @@ -399,7 +367,6 @@ pub(super) async fn audio_file_fetch( complete_tx: oneshot::Sender, ) { let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); - let (download_finish_tx, mut download_finish_rx) = mpsc::unbounded_channel(); { let requested_range = Range::new(0, initial_data_length); @@ -414,8 +381,6 @@ pub(super) async fn audio_file_fetch( 0, initial_data_length, initial_request_sent_time, - true, - download_finish_tx.clone(), )); let mut fetch = AudioFileFetch { @@ -426,9 +391,6 @@ pub(super) async fn audio_file_fetch( file_data_tx, complete_tx: Some(complete_tx), network_response_times_ms: Vec::new(), - number_of_open_requests: 1, - - download_finish_tx, }; loop { @@ -442,12 +404,42 @@ pub(super) async fn audio_file_fetch( if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) { break; } - }, - _ = download_finish_rx.recv() => { - fetch.number_of_open_requests -= 1; + } + } - if fetch.get_download_strategy() == DownloadStrategy::Streaming() { - fetch.trigger_preload(); + if fetch.get_download_strategy() == DownloadStrategy::Streaming() { + let number_of_open_requests = fetch + .shared + .number_of_open_requests + .load(atomic::Ordering::SeqCst); + if number_of_open_requests < MAX_PREFETCH_REQUESTS { + let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests; + + let bytes_pending: usize = { + let download_status = fetch.shared.download_status.lock().unwrap(); + download_status + .requested + .minus(&download_status.downloaded) + .len() + }; + + let ping_time_seconds = + 0.001 * fetch.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let download_rate = fetch.session.channel().get_download_rate_estimate(); + + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR + * ping_time_seconds + * fetch.shared.stream_data_rate as f64) as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) + as usize, + ); + + if bytes_pending < desired_pending_bytes { + fetch.pre_fetch_more_data( + desired_pending_bytes - bytes_pending, + max_requests_to_send, + ); } } }