From f9c0e26f6df22fa0bbb1e37e9546c5ed778195ec Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sat, 13 Feb 2021 15:38:05 +0100 Subject: [PATCH] Simplify code --- audio/src/fetch.rs | 77 ++++++++++++++++++++++------------------------ 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 0ec9b01d..ccbb8989 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -449,7 +449,7 @@ enum ReceivedData { async fn audio_file_fetch_receive_data( shared: Arc, file_data_tx: mpsc::UnboundedSender, - data_rx: ChannelData, + mut data_rx: ChannelData, initial_data_offset: usize, initial_request_length: usize, request_sent_time: Instant, @@ -465,49 +465,44 @@ async fn audio_file_fetch_receive_data( .number_of_open_requests .fetch_add(1, atomic::Ordering::SeqCst); - enum TryFoldErr { - ChannelError, - FinishEarly, - } + let result = loop { + let data = match data_rx.next().await { + Some(Ok(data)) => data, + Some(Err(e)) => break Err(e), + None => break Ok(()), + }; - let result = data_rx - .map_err(|_| TryFoldErr::ChannelError) - .try_for_each(|data| { - if measure_ping_time { - let duration = Instant::now() - request_sent_time; - let duration_ms: u64; - if 0.001 * (duration.as_millis() as f64) - > MAXIMUM_ASSUMED_PING_TIME_SECONDS - { - duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; - } else { - duration_ms = duration.as_millis() as u64; - } - let _ = file_data_tx - .send(ReceivedData::ResponseTimeMs(duration_ms as usize)); - measure_ping_time = false; - } - let data_size = data.len(); - let _ = file_data_tx - .send(ReceivedData::Data(PartialFileData { - offset: data_offset, - data: data, - })); - data_offset += data_size; - if request_length < data_size { - warn!("Data receiver for range {} (+{}) received more data from server than requested.", initial_data_offset, initial_request_length); - request_length = 0; + if measure_ping_time { + let duration = Instant::now() - request_sent_time; + let duration_ms: u64; + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; } else { - request_length -= data_size; + duration_ms = duration.as_millis() as u64; } + let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + measure_ping_time = false; + } + let data_size = data.len(); + let _ = file_data_tx.send(ReceivedData::Data(PartialFileData { + offset: data_offset, + data: data, + })); + data_offset += data_size; + if request_length < data_size { + warn!( + "Data receiver for range {} (+{}) received more data from server than requested.", + initial_data_offset, initial_request_length + ); + request_length = 0; + } else { + request_length -= data_size; + } - future::ready(if request_length == 0 { - Err(TryFoldErr::FinishEarly) - } else { - Ok(()) - }) - }) - .await; + if request_length == 0 { + break Ok(()); + } + }; if request_length > 0 { let missing_range = Range::new(data_offset, request_length); @@ -521,7 +516,7 @@ async fn audio_file_fetch_receive_data( .number_of_open_requests .fetch_sub(1, atomic::Ordering::SeqCst); - if let Err(TryFoldErr::ChannelError) = result { + if result.is_err() { warn!( "Error from channel for data receiver for range {} (+{}).", initial_data_offset, initial_request_length