Improve checking of download chunks

This commit is contained in:
Roderick van Domburg 2022-01-13 20:11:03 +01:00
parent 78216eb6ee
commit ab67370dc8
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
2 changed files with 29 additions and 57 deletions

View file

@ -430,7 +430,10 @@ impl AudioFileStreaming {
let code = response.status(); let code = response.status();
if code != StatusCode::PARTIAL_CONTENT { if code != StatusCode::PARTIAL_CONTENT {
debug!("Streamer expected partial content but got: {}", code); debug!(
"Opening audio file expected partial content but got: {}",
code
);
return Err(AudioFileError::StatusCode(code).into()); return Err(AudioFileError::StatusCode(code).into());
} }

View file

@ -36,13 +36,8 @@ async fn receive_data(
file_data_tx: mpsc::UnboundedSender<ReceivedData>, file_data_tx: mpsc::UnboundedSender<ReceivedData>,
mut request: StreamingRequest, mut request: StreamingRequest,
) -> AudioFileResult { ) -> AudioFileResult {
let requested_offset = request.offset; let mut offset = request.offset;
let requested_length = request.length; let mut actual_length = 0;
let mut data_offset = requested_offset;
let mut request_length = requested_length;
// TODO : check Content-Length and Content-Range headers
let old_number_of_request = shared let old_number_of_request = shared
.number_of_open_requests .number_of_open_requests
@ -56,13 +51,20 @@ async fn receive_data(
None => match request.streamer.next().await { None => match request.streamer.next().await {
Some(Ok(response)) => response, Some(Ok(response)) => response,
Some(Err(e)) => break Err(e.into()), Some(Err(e)) => break Err(e.into()),
None => break Ok(()), None => {
if actual_length != request.length {
let msg =
format!("did not expect body to contain {} bytes", actual_length,);
break Err(Error::data_loss(msg));
}
break Ok(());
}
}, },
}; };
let code = response.status(); let code = response.status();
if code != StatusCode::PARTIAL_CONTENT { if code != StatusCode::PARTIAL_CONTENT {
debug!("Streamer expected partial content but got: {}", code);
break Err(AudioFileError::StatusCode(code).into()); break Err(AudioFileError::StatusCode(code).into());
} }
@ -72,6 +74,12 @@ async fn receive_data(
Err(e) => break Err(e.into()), Err(e) => break Err(e.into()),
}; };
let data_size = data.len();
file_data_tx.send(ReceivedData::Data(PartialFileData { offset, data }))?;
actual_length += data_size;
offset += data_size;
if measure_ping_time { if measure_ping_time {
let mut duration = Instant::now() - request.request_time; let mut duration = Instant::now() - request.request_time;
if duration > MAXIMUM_ASSUMED_PING_TIME { if duration > MAXIMUM_ASSUMED_PING_TIME {
@ -80,62 +88,23 @@ async fn receive_data(
file_data_tx.send(ReceivedData::ResponseTime(duration))?; file_data_tx.send(ReceivedData::ResponseTime(duration))?;
measure_ping_time = false; measure_ping_time = false;
} }
let data_size = data.len();
file_data_tx.send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data,
}))?;
data_offset += data_size;
if request_length < data_size {
warn!(
"Data receiver for range {} (+{}) received more data from server than requested ({} instead of {}).",
requested_offset, requested_length, data_size, request_length
);
request_length = 0;
} else {
request_length -= data_size;
}
if request_length == 0 {
break Ok(());
}
}; };
drop(request.streamer); drop(request.streamer);
if request_length > 0 {
{
let missing_range = Range::new(data_offset, request_length);
let mut download_status = shared.download_status.lock();
download_status.requested.subtract_range(&missing_range);
shared.cond.notify_all();
}
}
shared shared
.number_of_open_requests .number_of_open_requests
.fetch_sub(1, Ordering::SeqCst); .fetch_sub(1, Ordering::SeqCst);
match result { if let Err(e) = result {
Ok(()) => { error!(
if request_length > 0 { "Streamer error requesting range {} +{}: {:?}",
warn!( request.offset, request.length, e
"Streamer for range {} (+{}) received less data from server than requested.", );
requested_offset, requested_length return Err(e);
);
}
Ok(())
}
Err(e) => {
error!(
"Error from streamer for range {} (+{}): {:?}",
requested_offset, requested_length, e
);
Err(e)
}
} }
Ok(())
} }
struct AudioFileFetch { struct AudioFileFetch {