Set better log messages.

This commit is contained in:
Konstantin Seiler 2019-11-05 22:58:00 +11:00
parent c991974f82
commit 393df6475e
2 changed files with 38 additions and 76 deletions

View file

@ -303,10 +303,8 @@ impl AudioFile {
debug!("Downloading file {}", file_id);
let (complete_tx, complete_rx) = oneshot::channel();
trace!("calling request_chunk");
let initial_data_length = MINIMUM_CHUNK_SIZE;
let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
trace!("returned from request_chunk");
let open = AudioFileOpenStreaming {
session: session.clone(),
@ -320,7 +318,6 @@ impl AudioFile {
complete_tx: Some(complete_tx),
};
trace!("cloning into cache session");
let session_ = session.clone();
session.spawn(move |_| {
complete_rx
@ -335,7 +332,6 @@ impl AudioFile {
.or_else(|oneshot::Canceled| Ok(()))
});
trace!("returning open stream");
AudioFileOpen::Streaming(open)
}
@ -361,17 +357,15 @@ impl AudioFile {
fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
trace!("requesting range starting at {} of length {}", offset, length);
assert!(offset % 4 == 0, "Range request start positions must be aligned by 4 bytes.");
assert!(length % 4 == 0, "Range request range lengths must be aligned by 4 bytes.");
let start = offset / 4;
let mut end = (offset+length) / 4;
if (offset+length) % 4 != 0 {
end += 1;
}
let end = (offset+length) / 4;
let (id, channel) = session.channel().allocate();
trace!("allocated channel {}", id);
trace!("requesting range starting at {} of length {} on channel {}.", offset, length, id);
let mut data: Vec<u8> = Vec::new();
data.write_u16::<BigEndian>(id).unwrap();
@ -390,32 +384,6 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize)
channel
}
//fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
// trace!("requesting chunk {}", index);
//
// let start = (index * CHUNK_SIZE / 4) as u32;
// let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
//
// let (id, channel) = session.channel().allocate();
//
// trace!("allocated channel {}", id);
//
// let mut data: Vec<u8> = Vec::new();
// data.write_u16::<BigEndian>(id).unwrap();
// data.write_u8(0).unwrap();
// data.write_u8(1).unwrap();
// data.write_u16::<BigEndian>(0x0000).unwrap();
// data.write_u32::<BigEndian>(0x00000000).unwrap();
// data.write_u32::<BigEndian>(0x00009C40).unwrap();
// data.write_u32::<BigEndian>(0x00020000).unwrap();
// data.write(&file.0).unwrap();
// data.write_u32::<BigEndian>(start).unwrap();
// data.write_u32::<BigEndian>(end).unwrap();
//
// session.send_packet(0x8, data);
//
// channel
//}
struct PartialFileData {
@ -432,6 +400,8 @@ struct AudioFileFetchDataReceiver {
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
initial_data_offset: usize,
initial_request_length: usize,
data_offset: usize,
request_length: usize,
request_sent_time: Option<Instant>,
@ -456,6 +426,8 @@ impl AudioFileFetchDataReceiver {
shared: shared,
data_rx: data_rx,
file_data_tx: file_data_tx,
initial_data_offset: data_offset,
initial_request_length: request_length,
data_offset: data_offset,
request_length: request_length,
request_sent_time: Some(request_sent_time),
@ -505,34 +477,33 @@ impl Future for AudioFileFetchDataReceiver {
}
}
let data_size = data.len();
trace!("data_receiver got {} bytes of data", data_size);
trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size);
let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, }));
self.data_offset += data_size;
if self.request_length < data_size {
warn!("Received more data from server than requested.");
warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
self.request_length = 0;
} else {
self.request_length -= data_size;
}
if self.request_length == 0 {
trace!("Data receiver completed at position {}", self.data_offset);
trace!("Data receiver for range {} (+{}) completed.", self.initial_data_offset, self.initial_request_length);
self.finish();
return Ok(Async::Ready(()));
}
}
Ok(Async::Ready(None)) => {
if self.request_length > 0 {
warn!("Received less data from server than requested.");
warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
}
self.finish();
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => {
//trace!("No more data for data_receiver at the moment.");
return Ok(Async::NotReady);
}
Err(ChannelError) => {
warn!("error from channel");
warn!("Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length);
self.finish();
return Ok(Async::Ready(()));
}
@ -702,45 +673,16 @@ impl AudioFileFetch {
}
// fn download(&mut self, mut new_index: usize) {
// assert!(new_index < self.shared.chunk_count);
//
// {
// let download_status = self.shared.download_status.lock().unwrap();
// while download_status.downloaded.contains(new_index) {
// new_index = (new_index + 1) % self.shared.chunk_count;
// debug!("Download iterated to new_index {}", new_index);
// }
// }
//
// trace!("== download called for chunk {} of {}", new_index, self.shared.chunk_count);
//
// if self.index != new_index {
// self.index = new_index;
//
// let offset = self.index * CHUNK_SIZE;
//
// self.output
// .as_mut()
// .unwrap()
// .seek(SeekFrom::Start(offset as u64))
// .unwrap();
//
// let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split();
// self.data_rx = data;
// }
// }
fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
loop {
match self.file_data_rx.poll() {
Ok(Async::Ready(None)) => {
trace!("File data channel closed.");
return Ok(Async::Ready(()));
}
Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
trace!("Received ping time information: {} ms.", response_time_ms);
trace!("Received ping time estimate: {} ms.", response_time_ms);
// record the response time
self.network_response_times_ms.push(response_time_ms);
@ -768,7 +710,6 @@ impl AudioFileFetch {
},
Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
trace!("Writing data to file: offset {}, length {}", data.offset, data.data.len());
self.output
.as_mut()
@ -791,6 +732,9 @@ impl AudioFileFetch {
if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size {
full = true;
}
trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested);
drop(download_status);
}
@ -860,8 +804,6 @@ impl Future for AudioFileFetch {
fn poll(&mut self) -> Poll<(), ()> {
trace!("Polling AudioFileFetch");
match self.poll_stream_loader_command_rx() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
@ -942,7 +884,7 @@ impl Read for AudioFileStreaming {
let read_len = min(length, available_length);
let read_len = try!(self.read_file.read(&mut output[..read_len]));
trace!("read at postion {} (length : {})", offset, read_len);
trace!("read successfully at postion {} (length : {})", offset, read_len);
self.position += read_len as u64;
self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);

View file

@ -1,6 +1,7 @@
use std::cmp::{max,min};
use std::slice::Iter;
use std::fmt;
@ -10,6 +11,13 @@ pub struct Range {
pub length: usize,
}
impl fmt::Display for Range {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
return write!(f, "[{}, {}]", self.start, self.start+self.length-1);
}
}
impl Range {
pub fn new(start: usize, length: usize) -> Range {
@ -25,11 +33,23 @@ impl Range {
}
#[derive(Clone)]
pub struct RangeSet {
ranges: Vec<Range>,
}
impl fmt::Display for RangeSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "(").unwrap();
for range in self.ranges.iter() {
write!(f, "{}", range).unwrap();
}
write!(f, ")")
}
}
impl RangeSet {
pub fn new() -> RangeSet {