diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index a791841c..80df21a7 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -3,28 +3,27 @@ use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::Stream; use futures::{Async, Future, Poll}; -use std::cmp::{min, max}; +use range_set::{Range, RangeSet}; +use std::cmp::{max, min}; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; use tempfile::NamedTempFile; -use range_set::{Range, RangeSet}; +use futures::sync::mpsc::unbounded; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::session::Session; use librespot_core::spotify_id::FileId; -use futures::sync::mpsc::unbounded; use std::sync::atomic; use std::sync::atomic::AtomicUsize; - const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; // The minimum size of a block that is requested from the Spotify servers in one request. // This is the block size that is typically requested while doing a seek() on a file. // Note: smaller requests can happen if part of the block is downloaded already. -const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // MUST be divisible by four!!! +const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // The amount of data that is requested when initially opening a file. // Note: if the file is opened to play from the beginning, the amount of data to // read ahead is requested in addition to this amount. If the file is opened to seek to @@ -78,8 +77,6 @@ const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5; // performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively // only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted. - - pub enum AudioFile { Cached(fs::File), Streaming(AudioFileStreaming), @@ -101,15 +98,13 @@ pub struct AudioFileOpenStreaming { streaming_data_rate: usize, } - -enum StreamLoaderCommand{ - Fetch(Range), // signal the stream loader to fetch a range of the file +enum StreamLoaderCommand { + Fetch(Range), // signal the stream loader to fetch a range of the file RandomAccessMode(), // optimise download strategy for random access - StreamMode(), // optimise download strategy for streaming - Close(), // terminate and don't load any more data + StreamMode(), // optimise download strategy for streaming + Close(), // terminate and don't load any more data } - #[derive(Clone)] pub struct StreamLoaderController { channel_tx: Option>, @@ -117,7 +112,6 @@ pub struct StreamLoaderController { file_size: usize, } - impl StreamLoaderController { pub fn len(&self) -> usize { return self.file_size; @@ -126,7 +120,11 @@ impl StreamLoaderController { pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { let download_status = shared.download_status.lock().unwrap(); - if range.length <= download_status.downloaded.contained_length_from_value(range.start) { + if range.length + <= download_status + .downloaded + .contained_length_from_value(range.start) + { return true; } else { return false; @@ -174,9 +172,22 @@ impl StreamLoaderController { if let Some(ref shared) = self.stream_shared { let mut download_status = shared.download_status.lock().unwrap(); - while range.length > download_status.downloaded.contained_length_from_value(range.start) { - download_status = shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; - if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) { + while range.length + > download_status + .downloaded + .contained_length_from_value(range.start) + { + download_status = shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; + if range.length + > (download_status + .downloaded + .union(&download_status.requested) + .contained_length_from_value(range.start)) + { // For some reason, the requested range is neither downloaded nor requested. // This could be due to a network error. Request it again. // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly. @@ -187,11 +198,10 @@ impl StreamLoaderController { } } } - } pub fn fetch_next(&mut self, length: usize) { - let range:Range = if let Some(ref shared) = self.stream_shared { + let range: Range = if let Some(ref shared) = self.stream_shared { Range { start: shared.read_position.load(atomic::Ordering::Relaxed), length: length, @@ -203,7 +213,7 @@ impl StreamLoaderController { } pub fn fetch_next_blocking(&mut self, length: usize) { - let range:Range = if let Some(ref shared) = self.stream_shared { + let range: Range = if let Some(ref shared) = self.stream_shared { Range { start: shared.read_position.load(atomic::Ordering::Relaxed), length: length, @@ -228,11 +238,8 @@ impl StreamLoaderController { // terminate stream loading and don't load any more data for this file. self.send_stream_loader_command(StreamLoaderCommand::Close()); } - - } - pub struct AudioFileStreaming { read_file: fs::File, @@ -243,7 +250,6 @@ pub struct AudioFileStreaming { shared: Arc, } - struct AudioFileDownloadStatus { requested: RangeSet, downloaded: RangeSet, @@ -269,13 +275,15 @@ struct AudioFileShared { impl AudioFileOpenStreaming { fn finish(&mut self, size: usize) -> AudioFileStreaming { - let shared = Arc::new(AudioFileShared { file_id: self.file_id, file_size: size, stream_data_rate: self.streaming_data_rate, cond: Condvar::new(), - download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + download_status: Mutex::new(AudioFileDownloadStatus { + requested: RangeSet::new(), + downloaded: RangeSet::new(), + }), download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), @@ -292,7 +300,8 @@ impl AudioFileOpenStreaming { let initial_data_length = self.initial_data_length.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); //let (seek_tx, seek_rx) = mpsc::unbounded(); - let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::(); + let (stream_loader_command_tx, stream_loader_command_rx) = + mpsc::unbounded::(); let fetcher = AudioFileFetch::new( self.session.clone(), @@ -355,7 +364,12 @@ impl Future for AudioFileOpenStreaming { } impl AudioFile { - pub fn open(session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool) -> AudioFileOpen { + pub fn open( + session: &Session, + file_id: FileId, + bytes_per_second: usize, + play_from_beginning: bool, + ) -> AudioFileOpen { let cache = session.cache().cloned(); if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) { @@ -367,10 +381,16 @@ impl AudioFile { let (complete_tx, complete_rx) = oneshot::channel(); let mut initial_data_length = if play_from_beginning { - INITIAL_DOWNLOAD_SIZE + max((READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (INITIAL_PING_TIME_ESTIMATE_SECONDS * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * bytes_per_second as f64) as usize) - } else { - INITIAL_DOWNLOAD_SIZE - }; + INITIAL_DOWNLOAD_SIZE + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + (INITIAL_PING_TIME_ESTIMATE_SECONDS + * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * bytes_per_second as f64) as usize, + ) + } else { + INITIAL_DOWNLOAD_SIZE + }; if initial_data_length % 4 != 0 { initial_data_length += 4 - (initial_data_length % 4); } @@ -387,7 +407,6 @@ impl AudioFile { complete_tx: Some(complete_tx), streaming_data_rate: bytes_per_second, - }; let session_ = session.clone(); @@ -427,17 +446,26 @@ impl AudioFile { } } - fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { - - assert!(offset % 4 == 0, "Range request start positions must be aligned by 4 bytes."); - assert!(length % 4 == 0, "Range request range lengths must be aligned by 4 bytes."); + assert!( + offset % 4 == 0, + "Range request start positions must be aligned by 4 bytes." + ); + assert!( + length % 4 == 0, + "Range request range lengths must be aligned by 4 bytes." + ); let start = offset / 4; - let end = (offset+length) / 4; + let end = (offset + length) / 4; let (id, channel) = session.channel().allocate(); - trace!("requesting range starting at {} of length {} on channel {}.", offset, length, id); + trace!( + "requesting range starting at {} of length {} on channel {}.", + offset, + length, + id + ); let mut data: Vec = Vec::new(); data.write_u16::(id).unwrap(); @@ -456,8 +484,6 @@ fn request_range(session: &Session, file: FileId, offset: usize, length: usize) channel } - - struct PartialFileData { offset: usize, data: Bytes, @@ -489,10 +515,11 @@ impl AudioFileFetchDataReceiver { request_length: usize, request_sent_time: Instant, ) -> AudioFileFetchDataReceiver { - let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0; - shared.number_of_open_requests.fetch_add(1, atomic::Ordering::SeqCst); + shared + .number_of_open_requests + .fetch_add(1, atomic::Ordering::SeqCst); AudioFileFetchDataReceiver { shared: shared, @@ -508,12 +535,9 @@ impl AudioFileFetchDataReceiver { } } - - impl AudioFileFetchDataReceiver { fn finish(&mut self) { if self.request_length > 0 { - let missing_range = Range::new(self.data_offset, self.request_length); let mut download_status = self.shared.download_status.lock().unwrap(); @@ -521,8 +545,9 @@ impl AudioFileFetchDataReceiver { self.shared.cond.notify_all(); } - self.shared.number_of_open_requests.fetch_sub(1, atomic::Ordering::SeqCst); - + self.shared + .number_of_open_requests + .fetch_sub(1, atomic::Ordering::SeqCst); } } @@ -538,18 +563,26 @@ impl Future for AudioFileFetchDataReceiver { if let Some(request_sent_time) = self.request_sent_time { let duration = Instant::now() - request_sent_time; let duration_ms: u64; - if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS + { duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; } else { duration_ms = duration.as_millis() as u64; } - let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + let _ = self + .file_data_tx + .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); self.measure_ping_time = false; } } let data_size = data.len(); trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size); - let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, })); + let _ = self + .file_data_tx + .unbounded_send(ReceivedData::Data(PartialFileData { + offset: self.data_offset, + data: data, + })); self.data_offset += data_size; if self.request_length < data_size { warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length); @@ -558,7 +591,11 @@ impl Future for AudioFileFetchDataReceiver { self.request_length -= data_size; } if self.request_length == 0 { - trace!("Data receiver for range {} (+{}) completed.", self.initial_data_offset, self.initial_request_length); + trace!( + "Data receiver for range {} (+{}) completed.", + self.initial_data_offset, + self.initial_request_length + ); self.finish(); return Ok(Async::Ready(())); } @@ -574,7 +611,10 @@ impl Future for AudioFileFetchDataReceiver { return Ok(Async::NotReady); } Err(ChannelError) => { - warn!("Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length); + warn!( + "Error from channel for data receiver for range {} (+{}).", + self.initial_data_offset, self.initial_request_length + ); self.finish(); return Ok(Async::Ready(())); } @@ -583,7 +623,6 @@ impl Future for AudioFileFetchDataReceiver { } } - struct AudioFileFetch { session: Session, shared: Arc, @@ -609,7 +648,6 @@ impl AudioFileFetch { stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, ) -> AudioFileFetch { - let (file_data_tx, file_data_rx) = unbounded::(); { @@ -618,7 +656,6 @@ impl AudioFileFetch { download_status.requested.add_range(&requested_range); } - let initial_data_receiver = AudioFileFetchDataReceiver::new( shared.clone(), file_data_tx.clone(), @@ -649,7 +686,6 @@ impl AudioFileFetch { } fn download_range(&mut self, mut offset: usize, mut length: usize) { - if length < MINIMUM_DOWNLOAD_SIZE { length = MINIMUM_DOWNLOAD_SIZE; } @@ -684,13 +720,12 @@ impl AudioFileFetch { ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); - for range in ranges_to_request.iter() { - let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split(); + let (_headers, data) = + request_range(&self.session, self.shared.file_id, range.start, range.length).split(); download_status.requested.add_range(range); - let receiver = AudioFileFetchDataReceiver::new( self.shared.clone(), self.file_data_tx.clone(), @@ -702,15 +737,12 @@ impl AudioFileFetch { self.session.spawn(move |_| receiver); } - } fn pre_fetch_more_data(&mut self, bytes: usize) { - let mut bytes_to_go = bytes; while bytes_to_go > 0 { - // determine what is still missing let mut missing_data = RangeSet::new(); missing_data.add_range(&Range::new(0, self.shared.file_size)); @@ -743,12 +775,9 @@ impl AudioFileFetch { return; } } - } - fn poll_file_data_rx(&mut self) -> Poll<(), ()> { - loop { match self.file_data_rx.poll() { Ok(Async::Ready(None)) => { @@ -768,7 +797,10 @@ impl AudioFileFetch { // stats::median is experimental. So we calculate the median of up to three ourselves. let ping_time_ms: usize = match self.network_response_times_ms.len() { 1 => self.network_response_times_ms[0] as usize, - 2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize, + 2 => { + ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) + as usize + } 3 => { let mut times = self.network_response_times_ms.clone(); times.sort(); @@ -778,20 +810,21 @@ impl AudioFileFetch { }; // store our new estimate for everyone to see - self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed); - - }, + self.shared + .ping_time_ms + .store(ping_time_ms, atomic::Ordering::Relaxed); + } Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { - - self.output .as_mut() .unwrap() .seek(SeekFrom::Start(data.offset as u64)) .unwrap(); - self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap(); - - + self.output + .as_mut() + .unwrap() + .write_all(data.data.as_ref()) + .unwrap(); let mut full = false; @@ -802,11 +835,17 @@ impl AudioFileFetch { download_status.downloaded.add_range(&received_range); self.shared.cond.notify_all(); - if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size { + if download_status.downloaded.contained_length_from_value(0) + >= self.shared.file_size + { full = true; } - trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); + trace!( + "Downloaded: {} Requested: {}", + download_status.downloaded, + download_status.requested.minus(&download_status.downloaded) + ); drop(download_status); } @@ -815,22 +854,16 @@ impl AudioFileFetch { self.finish(); return Ok(Async::Ready(())); } - - } Ok(Async::NotReady) => { return Ok(Async::NotReady); - }, + } Err(()) => unreachable!(), } - } - } - fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { - loop { match self.stream_loader_command_rx.poll() { Ok(Async::Ready(None)) => { @@ -848,13 +881,10 @@ impl AudioFileFetch { Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { return Ok(Async::Ready(())); } - Ok(Async::NotReady) => { - return Ok(Async::NotReady) - }, + Ok(Async::NotReady) => return Ok(Async::NotReady), Err(()) => unreachable!(), } } - } fn finish(&mut self) { @@ -865,7 +895,6 @@ impl AudioFileFetch { output.seek(SeekFrom::Start(0)).unwrap(); let _ = complete_tx.send(output); } - } impl Future for AudioFileFetch { @@ -873,7 +902,6 @@ impl Future for AudioFileFetch { type Error = (); fn poll(&mut self) -> Poll<(), ()> { - match self.poll_stream_loader_command_rx() { Ok(Async::NotReady) => (), Ok(Async::Ready(_)) => { @@ -896,22 +924,29 @@ impl Future for AudioFileFetch { download_status.requested.minus(&download_status.downloaded).len() }; - let ping_time_seconds = 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let ping_time_seconds = + 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; let download_rate = self.session.channel().get_download_rate_estimate(); let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) + as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, ); if bytes_pending < desired_pending_bytes { - trace!("Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", bytes_pending, desired_pending_bytes, ping_time_seconds, download_rate); + trace!( + "Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", + bytes_pending, + desired_pending_bytes, + ping_time_seconds, + download_rate + ); self.pre_fetch_more_data(desired_pending_bytes - bytes_pending); } } - - return Ok(Async::NotReady) + return Ok(Async::NotReady); } } @@ -925,23 +960,25 @@ impl Read for AudioFileStreaming { let length = min(output.len(), self.shared.file_size - offset); - let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { - DownloadStrategy::RandomAccess() => { length } + DownloadStrategy::RandomAccess() => length, DownloadStrategy::Streaming() => { // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded. - let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let ping_time_seconds = + 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let length_to_request = length + max( - (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) as usize, - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds * self.shared.stream_data_rate as f64) as usize - ); + let length_to_request = length + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) + as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * ping_time_seconds + * self.shared.stream_data_rate as f64) as usize, + ); min(length_to_request, self.shared.file_size - offset) } }; - - let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length_to_request)); @@ -951,27 +988,35 @@ impl Read for AudioFileStreaming { ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); - for range in ranges_to_request.iter() { - trace!("requesting data at position {} (length : {})", range.start, range.length); - self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap(); + trace!( + "requesting data at position {} (length : {})", + range.start, + range.length + ); + self.stream_loader_command_tx + .unbounded_send(StreamLoaderCommand::Fetch(range.clone())) + .unwrap(); } - if length == 0 { return Ok(0); } while !download_status.downloaded.contains(offset) { trace!("waiting for download"); - download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; + download_status = self + .shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; trace!("re-checking data availability at offset {}.", offset); } let available_length = download_status.downloaded.contained_length_from_value(offset); assert!(available_length > 0); drop(download_status); - self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); let read_len = min(length, available_length); let read_len = try!(self.read_file.read(&mut output[..read_len])); @@ -979,8 +1024,9 @@ impl Read for AudioFileStreaming { trace!("read successfully at postion {} (length : {})", offset, read_len); self.position += read_len as u64; - self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); - + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); return Ok(read_len); } @@ -990,7 +1036,9 @@ impl Seek for AudioFileStreaming { fn seek(&mut self, pos: SeekFrom) -> io::Result { self.position = try!(self.read_file.seek(pos)); // Do not seek past EOF - self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed); + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); Ok(self.position) } } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 845ba5f9..9a82f90e 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -3,13 +3,13 @@ extern crate futures; #[macro_use] extern crate log; +extern crate aes_ctr; extern crate bit_set; extern crate byteorder; extern crate bytes; extern crate num_bigint; extern crate num_traits; extern crate tempfile; -extern crate aes_ctr; extern crate librespot_core; @@ -25,7 +25,10 @@ mod range_set; pub use decrypt::AudioDecrypt; pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; -pub use fetch::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; +pub use fetch::{ + READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, + READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, +}; #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index 835477be..448c0971 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -1,9 +1,6 @@ - -use std::cmp::{max,min}; -use std::slice::Iter; +use std::cmp::{max, min}; use std::fmt; - - +use std::slice::Iter; #[derive(Copy, Clone)] pub struct Range { @@ -13,27 +10,23 @@ pub struct Range { impl fmt::Display for Range { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - return write!(f, "[{}, {}]", self.start, self.start+self.length-1); + return write!(f, "[{}, {}]", self.start, self.start + self.length - 1); } } - impl Range { - pub fn new(start: usize, length: usize) -> Range { return Range { start: start, length: length, - } + }; } pub fn end(&self) -> usize { return self.start + self.length; } - } - #[derive(Clone)] pub struct RangeSet { ranges: Vec, @@ -49,11 +42,9 @@ impl fmt::Display for RangeSet { } } - - impl RangeSet { pub fn new() -> RangeSet { - RangeSet{ + RangeSet { ranges: Vec::::new(), } } @@ -98,7 +89,6 @@ impl RangeSet { } } return 0; - } #[allow(dead_code)] @@ -111,23 +101,20 @@ impl RangeSet { return true; } - - pub fn add_range(&mut self, range:&Range) { - + pub fn add_range(&mut self, range: &Range) { if range.length <= 0 { // the interval is empty or invalid -> nothing to do. return; } - for index in 0..self.ranges.len() { // the new range is clear of any ranges we already iterated over. - if range.end() < self.ranges[index].start{ + if range.end() < self.ranges[index].start { // the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it. self.ranges.insert(index, range.clone()); return; - - } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end() { + } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end() + { // the new range overlaps (or touches) the first range. They are to be merged. // In addition we might have to merge further ranges in as well. @@ -142,7 +129,6 @@ impl RangeSet { self.ranges.insert(index, new_range); return; - } } @@ -165,7 +151,6 @@ impl RangeSet { } pub fn subtract_range(&mut self, range: &Range) { - if range.length <= 0 { return; } @@ -175,8 +160,7 @@ impl RangeSet { if range.end() <= self.ranges[index].start { // the remaining ranges are past the one to subtract. -> we're done. - return - + return; } else if range.start <= self.ranges[index].start && self.ranges[index].start < range.end() { // the range to subtract started before the current range and reaches into the current range // -> we have to remove the beginning of the range or the entire range and do the same for following ranges. @@ -191,7 +175,6 @@ impl RangeSet { } return; - } else if range.end() < self.ranges[index].end() { // the range to subtract punches a hole into the current range -> we need to create two smaller ranges. @@ -206,11 +189,9 @@ impl RangeSet { self.ranges.insert(index, first_range); return; - } else if range.start < self.ranges[index].end() { // the range truncates the existing range -> truncate the range. Let the for loop take care of overlaps with other ranges. self.ranges[index].length = range.start - self.ranges[index].start; - } } } @@ -245,19 +226,15 @@ impl RangeSet { let new_start = max(self.ranges[self_index].start, other.ranges[other_index].start); let new_end = min(self.ranges[self_index].end(), other.ranges[other_index].end()); assert!(new_start <= new_end); - result.add_range(&Range::new(new_start, new_end-new_start)); + result.add_range(&Range::new(new_start, new_end - new_start)); if self.ranges[self_index].end() <= other.ranges[other_index].end() { self_index += 1; } else { other_index += 1; } - } - } return result; } - } - diff --git a/core/src/channel.rs b/core/src/channel.rs index 276f2bfc..a4785eb8 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use futures::sync::{mpsc, BiLock}; use futures::{Async, Poll, Stream}; use std::collections::HashMap; -use std::time::{Instant}; +use std::time::Instant; use util::SeqGenerator; @@ -64,11 +64,11 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); self.lock(|inner| { - let current_time = Instant::now(); if let Some(download_measurement_start) = inner.download_measurement_start { if (current_time - download_measurement_start).as_millis() > 1000 { - inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize; + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes + / (current_time - download_measurement_start).as_millis() as usize; inner.download_measurement_start = Some(current_time); inner.download_measurement_bytes = 0; } @@ -85,12 +85,8 @@ impl ChannelManager { } pub fn get_download_rate_estimate(&self) -> usize { - return self.lock(|inner| { - inner.download_rate_estimate - }); - + return self.lock(|inner| inner.download_rate_estimate); } - } impl Channel { diff --git a/playback/src/player.rs b/playback/src/player.rs index bdccea38..a54a577f 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -4,20 +4,23 @@ use futures::sync::oneshot; use futures::{future, Future}; use std; use std::borrow::Cow; +use std::cmp::max; use std::io::{Read, Result, Seek, SeekFrom}; use std::mem; use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::thread; use std::time::Duration; -use std::cmp::max; use config::{Bitrate, PlayerConfig}; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; use audio::{AudioDecrypt, AudioFile, StreamLoaderController}; -use audio::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; use audio::{VorbisDecoder, VorbisPacket}; +use audio::{ + READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, + READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, +}; use audio_backend::Sink; use metadata::{AudioItem, FileFormat}; use mixer::AudioFilter; @@ -244,7 +247,14 @@ impl PlayerState { use self::PlayerState::*; match *self { Stopped | EndOfTrack { .. } => None, - Paused { ref mut stream_loader_controller, .. } | Playing { ref mut stream_loader_controller, .. } => Some(stream_loader_controller), + Paused { + ref mut stream_loader_controller, + .. + } + | Playing { + ref mut stream_loader_controller, + .. + } => Some(stream_loader_controller), Invalid => panic!("invalid state"), } } @@ -273,7 +283,7 @@ impl PlayerState { end_of_track, normalisation_factor, stream_loader_controller, - bytes_per_second + bytes_per_second, } => { *self = Playing { track_id: track_id, @@ -426,7 +436,12 @@ impl PlayerInternal { } match self.load_track(track_id, position as i64) { - Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) => { + Some(( + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + )) => { if play { match self.state { PlayerState::Playing { @@ -503,25 +518,27 @@ impl PlayerInternal { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_stream_mode(); } - if let PlayerState::Playing{bytes_per_second, ..} = self.state { + if let PlayerState::Playing { bytes_per_second, .. } = self.state { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - // Request our read ahead range let request_data_length = max( - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, - (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, ); stream_loader_controller.fetch_next(request_data_length); // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. let wait_for_data_length = max( - (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, - (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, ); stream_loader_controller.fetch_next_blocking(wait_for_data_length); } } - } PlayerCommand::Play => { @@ -584,9 +601,9 @@ impl PlayerInternal { fn stream_data_rate(&self, format: FileFormat) -> usize { match format { - FileFormat::OGG_VORBIS_96 => 12 * 1024, + FileFormat::OGG_VORBIS_96 => 12 * 1024, FileFormat::OGG_VORBIS_160 => 20 * 1024, - FileFormat::OGG_VORBIS_320=> 40 * 1024, + FileFormat::OGG_VORBIS_320 => 40 * 1024, FileFormat::MP3_256 => 32 * 1024, FileFormat::MP3_320 => 40 * 1024, FileFormat::MP3_160 => 20 * 1024, @@ -601,7 +618,11 @@ impl PlayerInternal { } } - fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController, usize)> { + fn load_track( + &self, + spotify_id: SpotifyId, + position: i64, + ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { let audio = AudioItem::get_audio_item(&self.session, spotify_id) .wait() .unwrap(); @@ -646,10 +667,11 @@ impl PlayerInternal { }; let bytes_per_second = self.stream_data_rate(*format); - let play_from_beginning = position==0; + let play_from_beginning = position == 0; let key = self.session.audio_key().request(spotify_id, file_id); - let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); + let encrypted_file = + AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); let encrypted_file = encrypted_file.wait().unwrap(); @@ -663,7 +685,6 @@ impl PlayerInternal { stream_loader_controller.set_random_access_mode(); } - let key = key.wait().unwrap(); let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); @@ -687,7 +708,12 @@ impl PlayerInternal { stream_loader_controller.set_stream_mode(); } info!("<{}> loaded", audio.name); - Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) + Some(( + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + )) } }