diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 8ca37c64..b15033b3 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -303,10 +303,8 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - trace!("calling request_chunk"); let initial_data_length = MINIMUM_CHUNK_SIZE; let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); - trace!("returned from request_chunk"); let open = AudioFileOpenStreaming { session: session.clone(), @@ -320,7 +318,6 @@ impl AudioFile { complete_tx: Some(complete_tx), }; - trace!("cloning into cache session"); let session_ = session.clone(); session.spawn(move |_| { complete_rx @@ -335,7 +332,6 @@ impl AudioFile { .or_else(|oneshot::Canceled| Ok(())) }); - trace!("returning open stream"); AudioFileOpen::Streaming(open) } @@ -361,17 +357,15 @@ impl AudioFile { fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { - trace!("requesting range starting at {} of length {}", offset, length); + assert!(offset % 4 == 0, "Range request start positions must be aligned by 4 bytes."); + assert!(length % 4 == 0, "Range request range lengths must be aligned by 4 bytes."); let start = offset / 4; - let mut end = (offset+length) / 4; - if (offset+length) % 4 != 0 { - end += 1; - } + let end = (offset+length) / 4; let (id, channel) = session.channel().allocate(); - trace!("allocated channel {}", id); + trace!("requesting range starting at {} of length {} on channel {}.", offset, length, id); let mut data: Vec = Vec::new(); data.write_u16::(id).unwrap(); @@ -390,32 +384,6 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize) channel } -//fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { -// trace!("requesting chunk {}", index); -// -// let start = (index * CHUNK_SIZE / 4) as u32; -// let end = ((index + 1) * CHUNK_SIZE / 4) as u32; -// -// let (id, channel) = session.channel().allocate(); -// -// trace!("allocated channel {}", id); -// -// let mut data: Vec = Vec::new(); -// data.write_u16::(id).unwrap(); -// data.write_u8(0).unwrap(); -// data.write_u8(1).unwrap(); -// data.write_u16::(0x0000).unwrap(); -// data.write_u32::(0x00000000).unwrap(); -// data.write_u32::(0x00009C40).unwrap(); -// data.write_u32::(0x00020000).unwrap(); -// data.write(&file.0).unwrap(); -// data.write_u32::(start).unwrap(); -// data.write_u32::(end).unwrap(); -// -// session.send_packet(0x8, data); -// -// channel -//} struct PartialFileData { @@ -432,6 +400,8 @@ struct AudioFileFetchDataReceiver { shared: Arc, file_data_tx: mpsc::UnboundedSender, data_rx: ChannelData, + initial_data_offset: usize, + initial_request_length: usize, data_offset: usize, request_length: usize, request_sent_time: Option, @@ -456,6 +426,8 @@ impl AudioFileFetchDataReceiver { shared: shared, data_rx: data_rx, file_data_tx: file_data_tx, + initial_data_offset: data_offset, + initial_request_length: request_length, data_offset: data_offset, request_length: request_length, request_sent_time: Some(request_sent_time), @@ -505,34 +477,33 @@ impl Future for AudioFileFetchDataReceiver { } } let data_size = data.len(); - trace!("data_receiver got {} bytes of data", data_size); + trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size); let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, })); self.data_offset += data_size; if self.request_length < data_size { - warn!("Received more data from server than requested."); + warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length); self.request_length = 0; } else { self.request_length -= data_size; } if self.request_length == 0 { - trace!("Data receiver completed at position {}", self.data_offset); + trace!("Data receiver for range {} (+{}) completed.", self.initial_data_offset, self.initial_request_length); self.finish(); return Ok(Async::Ready(())); } } Ok(Async::Ready(None)) => { if self.request_length > 0 { - warn!("Received less data from server than requested."); + warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length); } self.finish(); return Ok(Async::Ready(())); } Ok(Async::NotReady) => { - //trace!("No more data for data_receiver at the moment."); return Ok(Async::NotReady); } Err(ChannelError) => { - warn!("error from channel"); + warn!("Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length); self.finish(); return Ok(Async::Ready(())); } @@ -702,45 +673,16 @@ impl AudioFileFetch { } -// fn download(&mut self, mut new_index: usize) { -// assert!(new_index < self.shared.chunk_count); -// -// { -// let download_status = self.shared.download_status.lock().unwrap(); -// while download_status.downloaded.contains(new_index) { -// new_index = (new_index + 1) % self.shared.chunk_count; -// debug!("Download iterated to new_index {}", new_index); -// } -// } -// -// trace!("== download called for chunk {} of {}", new_index, self.shared.chunk_count); -// -// if self.index != new_index { -// self.index = new_index; -// -// let offset = self.index * CHUNK_SIZE; -// -// self.output -// .as_mut() -// .unwrap() -// .seek(SeekFrom::Start(offset as u64)) -// .unwrap(); -// -// let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split(); -// self.data_rx = data; -// } -// } fn poll_file_data_rx(&mut self) -> Poll<(), ()> { loop { match self.file_data_rx.poll() { Ok(Async::Ready(None)) => { - trace!("File data channel closed."); return Ok(Async::Ready(())); } Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { - trace!("Received ping time information: {} ms.", response_time_ms); + trace!("Received ping time estimate: {} ms.", response_time_ms); // record the response time self.network_response_times_ms.push(response_time_ms); @@ -768,7 +710,6 @@ impl AudioFileFetch { }, Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { - trace!("Writing data to file: offset {}, length {}", data.offset, data.data.len()); self.output .as_mut() @@ -791,6 +732,9 @@ impl AudioFileFetch { if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size { full = true; } + + trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested); + drop(download_status); } @@ -860,8 +804,6 @@ impl Future for AudioFileFetch { fn poll(&mut self) -> Poll<(), ()> { - trace!("Polling AudioFileFetch"); - match self.poll_stream_loader_command_rx() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { @@ -942,7 +884,7 @@ impl Read for AudioFileStreaming { let read_len = min(length, available_length); let read_len = try!(self.read_file.read(&mut output[..read_len])); - trace!("read at postion {} (length : {})", offset, read_len); + trace!("read successfully at postion {} (length : {})", offset, read_len); self.position += read_len as u64; self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index 378725f6..12b82997 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -1,6 +1,7 @@ use std::cmp::{max,min}; use std::slice::Iter; +use std::fmt; @@ -10,6 +11,13 @@ pub struct Range { pub length: usize, } +impl fmt::Display for Range { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + return write!(f, "[{}, {}]", self.start, self.start+self.length-1); + } +} + + impl Range { pub fn new(start: usize, length: usize) -> Range { @@ -25,11 +33,23 @@ impl Range { } + #[derive(Clone)] pub struct RangeSet { ranges: Vec, } +impl fmt::Display for RangeSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "(").unwrap(); + for range in self.ranges.iter() { + write!(f, "{}", range).unwrap(); + } + write!(f, ")") + } +} + + impl RangeSet { pub fn new() -> RangeSet {