Simplify code

This commit is contained in:
johannesd3 2021-02-13 15:38:05 +01:00 committed by Johannesd3
parent 27f308b82f
commit f9c0e26f6d

View file

@ -449,7 +449,7 @@ enum ReceivedData {
async fn audio_file_fetch_receive_data(
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
mut data_rx: ChannelData,
initial_data_offset: usize,
initial_request_length: usize,
request_sent_time: Instant,
@ -465,49 +465,44 @@ async fn audio_file_fetch_receive_data(
.number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst);
enum TryFoldErr {
ChannelError,
FinishEarly,
}
let result = loop {
let data = match data_rx.next().await {
Some(Ok(data)) => data,
Some(Err(e)) => break Err(e),
None => break Ok(()),
};
let result = data_rx
.map_err(|_| TryFoldErr::ChannelError)
.try_for_each(|data| {
if measure_ping_time {
let duration = Instant::now() - request_sent_time;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64)
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
{
if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
}
let _ = file_data_tx
.send(ReceivedData::ResponseTimeMs(duration_ms as usize));
let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize));
measure_ping_time = false;
}
let data_size = data.len();
let _ = file_data_tx
.send(ReceivedData::Data(PartialFileData {
let _ = file_data_tx.send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data: data,
}));
data_offset += data_size;
if request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", initial_data_offset, initial_request_length);
warn!(
"Data receiver for range {} (+{}) received more data from server than requested.",
initial_data_offset, initial_request_length
);
request_length = 0;
} else {
request_length -= data_size;
}
future::ready(if request_length == 0 {
Err(TryFoldErr::FinishEarly)
} else {
Ok(())
})
})
.await;
if request_length == 0 {
break Ok(());
}
};
if request_length > 0 {
let missing_range = Range::new(data_offset, request_length);
@ -521,7 +516,7 @@ async fn audio_file_fetch_receive_data(
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
if let Err(TryFoldErr::ChannelError) = result {
if result.is_err() {
warn!(
"Error from channel for data receiver for range {} (+{}).",
initial_data_offset, initial_request_length