From ab67370dc8ea620895e510078f4dd2913b0598d7 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Thu, 13 Jan 2022 20:11:03 +0100 Subject: [PATCH] Improve checking of download chunks --- audio/src/fetch/mod.rs | 5 ++- audio/src/fetch/receive.rs | 81 ++++++++++++-------------------------- 2 files changed, 29 insertions(+), 57 deletions(-) diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index ef19a45a..bbf2a974 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -430,7 +430,10 @@ impl AudioFileStreaming { let code = response.status(); if code != StatusCode::PARTIAL_CONTENT { - debug!("Streamer expected partial content but got: {}", code); + debug!( + "Opening audio file expected partial content but got: {}", + code + ); return Err(AudioFileError::StatusCode(code).into()); } diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 568efafd..cc70a4f0 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -36,13 +36,8 @@ async fn receive_data( file_data_tx: mpsc::UnboundedSender, mut request: StreamingRequest, ) -> AudioFileResult { - let requested_offset = request.offset; - let requested_length = request.length; - - let mut data_offset = requested_offset; - let mut request_length = requested_length; - - // TODO : check Content-Length and Content-Range headers + let mut offset = request.offset; + let mut actual_length = 0; let old_number_of_request = shared .number_of_open_requests @@ -56,13 +51,20 @@ async fn receive_data( None => match request.streamer.next().await { Some(Ok(response)) => response, Some(Err(e)) => break Err(e.into()), - None => break Ok(()), + None => { + if actual_length != request.length { + let msg = + format!("did not expect body to contain {} bytes", actual_length,); + break Err(Error::data_loss(msg)); + } + + break Ok(()); + } }, }; let code = response.status(); if code != StatusCode::PARTIAL_CONTENT { - debug!("Streamer expected partial content but got: {}", code); break Err(AudioFileError::StatusCode(code).into()); } @@ -72,6 +74,12 @@ async fn receive_data( Err(e) => break Err(e.into()), }; + let data_size = data.len(); + file_data_tx.send(ReceivedData::Data(PartialFileData { offset, data }))?; + + actual_length += data_size; + offset += data_size; + if measure_ping_time { let mut duration = Instant::now() - request.request_time; if duration > MAXIMUM_ASSUMED_PING_TIME { @@ -80,62 +88,23 @@ async fn receive_data( file_data_tx.send(ReceivedData::ResponseTime(duration))?; measure_ping_time = false; } - - let data_size = data.len(); - - file_data_tx.send(ReceivedData::Data(PartialFileData { - offset: data_offset, - data, - }))?; - data_offset += data_size; - if request_length < data_size { - warn!( - "Data receiver for range {} (+{}) received more data from server than requested ({} instead of {}).", - requested_offset, requested_length, data_size, request_length - ); - request_length = 0; - } else { - request_length -= data_size; - } - - if request_length == 0 { - break Ok(()); - } }; drop(request.streamer); - if request_length > 0 { - { - let missing_range = Range::new(data_offset, request_length); - let mut download_status = shared.download_status.lock(); - download_status.requested.subtract_range(&missing_range); - shared.cond.notify_all(); - } - } - shared .number_of_open_requests .fetch_sub(1, Ordering::SeqCst); - match result { - Ok(()) => { - if request_length > 0 { - warn!( - "Streamer for range {} (+{}) received less data from server than requested.", - requested_offset, requested_length - ); - } - Ok(()) - } - Err(e) => { - error!( - "Error from streamer for range {} (+{}): {:?}", - requested_offset, requested_length, e - ); - Err(e) - } + if let Err(e) = result { + error!( + "Streamer error requesting range {} +{}: {:?}", + request.offset, request.length, e + ); + return Err(e); } + + Ok(()) } struct AudioFileFetch {