diff --git a/.travis.yml b/.travis.yml index 0c150760..182adfdb 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: rust rust: - - 1.32.0 + - 1.33.0 - stable - beta - nightly diff --git a/Cargo.lock b/Cargo.lock index be3d3a49..1e7cb3f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,6 +811,7 @@ dependencies = [ "aes-ctr 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bit-set 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)", "lewton 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)", "librespot-core 0.1.0", diff --git a/audio/Cargo.toml b/audio/Cargo.toml index 2d67437d..6c485e25 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -12,6 +12,7 @@ version = "0.1.0" [dependencies] bit-set = "0.5" byteorder = "1.3" +bytes = "0.4" futures = "0.1" lewton = "0.9" log = "0.4" diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 51573f6c..69e34d28 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,19 +1,81 @@ -use bit_set::BitSet; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::Stream; use futures::{Async, Future, Poll}; -use std::cmp::min; +use range_set::{Range, RangeSet}; +use std::cmp::{max, min}; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; use tempfile::NamedTempFile; +use futures::sync::mpsc::unbounded; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::session::Session; use librespot_core::spotify_id::FileId; +use std::sync::atomic; +use std::sync::atomic::AtomicUsize; -const CHUNK_SIZE: usize = 0x20000; +const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; +// The minimum size of a block that is requested from the Spotify servers in one request. +// This is the block size that is typically requested while doing a seek() on a file. +// Note: smaller requests can happen if part of the block is downloaded already. + +const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; +// The amount of data that is requested when initially opening a file. +// Note: if the file is opened to play from the beginning, the amount of data to +// read ahead is requested in addition to this amount. If the file is opened to seek to +// another position, then only this amount is requested on the first request. + +const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5; +// The pig time that is used for calculations before a ping time was actually measured. + +const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5; +// If the measured ping time to the Spotify server is larger than this value, it is capped +// to avoid run-away block sizes and pre-fetching. + +pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0; +// Before playback starts, this many seconds of data must be present. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0; +// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are +// obeyed. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 1.0; +// While playing back, this many seconds of data ahead of the current read position are +// requested. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 2.0; +// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0; +// If the amount of data that is pending (requested but not received) is less than a certain amount, +// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more +// data is calculated as +// < PREFETCH_THRESHOLD_FACTOR * * + +const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5; +// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account. +// The formula used is +// < FAST_PREFETCH_THRESHOLD_FACTOR * * +// This mechanism allows for fast downloading of the remainder of the file. The number should be larger +// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster +// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is +// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively +// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted. pub enum AudioFile { Cached(fs::File), @@ -27,37 +89,205 @@ pub enum AudioFileOpen { pub struct AudioFileOpenStreaming { session: Session, - data_rx: Option, + initial_data_rx: Option, + initial_data_length: Option, + initial_request_sent_time: Instant, headers: ChannelHeaders, file_id: FileId, complete_tx: Option>, + streaming_data_rate: usize, +} + +enum StreamLoaderCommand { + Fetch(Range), // signal the stream loader to fetch a range of the file + RandomAccessMode(), // optimise download strategy for random access + StreamMode(), // optimise download strategy for streaming + Close(), // terminate and don't load any more data +} + +#[derive(Clone)] +pub struct StreamLoaderController { + channel_tx: Option>, + stream_shared: Option>, + file_size: usize, +} + +impl StreamLoaderController { + pub fn len(&self) -> usize { + return self.file_size; + } + + pub fn range_available(&self, range: Range) -> bool { + if let Some(ref shared) = self.stream_shared { + let download_status = shared.download_status.lock().unwrap(); + if range.length + <= download_status + .downloaded + .contained_length_from_value(range.start) + { + return true; + } else { + return false; + } + } else { + if range.length <= self.len() - range.start { + return true; + } else { + return false; + } + } + } + + pub fn ping_time_ms(&self) -> usize { + if let Some(ref shared) = self.stream_shared { + return shared.ping_time_ms.load(atomic::Ordering::Relaxed); + } else { + return 0; + } + } + + fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) { + if let Some(ref mut channel) = self.channel_tx { + // ignore the error in case the channel has been closed already. + let _ = channel.unbounded_send(command); + } + } + + pub fn fetch(&mut self, range: Range) { + // signal the stream loader to fetch a range of the file + self.send_stream_loader_command(StreamLoaderCommand::Fetch(range)); + } + + pub fn fetch_blocking(&mut self, mut range: Range) { + // signal the stream loader to tech a range of the file and block until it is loaded. + + // ensure the range is within the file's bounds. + if range.start >= self.len() { + range.length = 0; + } else if range.end() > self.len() { + range.length = self.len() - range.start; + } + + self.fetch(range); + + if let Some(ref shared) = self.stream_shared { + let mut download_status = shared.download_status.lock().unwrap(); + while range.length + > download_status + .downloaded + .contained_length_from_value(range.start) + { + download_status = shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; + if range.length + > (download_status + .downloaded + .union(&download_status.requested) + .contained_length_from_value(range.start)) + { + // For some reason, the requested range is neither downloaded nor requested. + // This could be due to a network error. Request it again. + // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly. + if let Some(ref mut channel) = self.channel_tx { + // ignore the error in case the channel has been closed already. + let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range)); + } + } + } + } + } + + pub fn fetch_next(&mut self, length: usize) { + let range: Range = if let Some(ref shared) = self.stream_shared { + Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length: length, + } + } else { + return; + }; + self.fetch(range); + } + + pub fn fetch_next_blocking(&mut self, length: usize) { + let range: Range = if let Some(ref shared) = self.stream_shared { + Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length: length, + } + } else { + return; + }; + self.fetch_blocking(range); + } + + pub fn set_random_access_mode(&mut self) { + // optimise download strategy for random access + self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); + } + + pub fn set_stream_mode(&mut self) { + // optimise download strategy for streaming + self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); + } + + pub fn close(&mut self) { + // terminate stream loading and don't load any more data for this file. + self.send_stream_loader_command(StreamLoaderCommand::Close()); + } } pub struct AudioFileStreaming { read_file: fs::File, position: u64, - seek: mpsc::UnboundedSender, + + stream_loader_command_tx: mpsc::UnboundedSender, shared: Arc, } +struct AudioFileDownloadStatus { + requested: RangeSet, + downloaded: RangeSet, +} + +#[derive(Copy, Clone)] +enum DownloadStrategy { + RandomAccess(), + Streaming(), +} + struct AudioFileShared { file_id: FileId, - chunk_count: usize, + file_size: usize, + stream_data_rate: usize, cond: Condvar, - bitmap: Mutex, + download_status: Mutex, + download_strategy: Mutex, + number_of_open_requests: AtomicUsize, + ping_time_ms: AtomicUsize, + read_position: AtomicUsize, } impl AudioFileOpenStreaming { fn finish(&mut self, size: usize) -> AudioFileStreaming { - let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE; - let shared = Arc::new(AudioFileShared { file_id: self.file_id, - chunk_count: chunk_count, + file_size: size, + stream_data_rate: self.streaming_data_rate, cond: Condvar::new(), - bitmap: Mutex::new(BitSet::with_capacity(chunk_count)), + download_status: Mutex::new(AudioFileDownloadStatus { + requested: RangeSet::new(), + downloaded: RangeSet::new(), + }), + download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise + number_of_open_requests: AtomicUsize::new(0), + ping_time_ms: AtomicUsize::new(0), + read_position: AtomicUsize::new(0), }); let mut write_file = NamedTempFile::new().unwrap(); @@ -66,16 +296,21 @@ impl AudioFileOpenStreaming { let read_file = write_file.reopen().unwrap(); - let data_rx = self.data_rx.take().unwrap(); + let initial_data_rx = self.initial_data_rx.take().unwrap(); + let initial_data_length = self.initial_data_length.take().unwrap(); let complete_tx = self.complete_tx.take().unwrap(); - let (seek_tx, seek_rx) = mpsc::unbounded(); + //let (seek_tx, seek_rx) = mpsc::unbounded(); + let (stream_loader_command_tx, stream_loader_command_rx) = + mpsc::unbounded::(); let fetcher = AudioFileFetch::new( self.session.clone(), shared.clone(), - data_rx, + initial_data_rx, + self.initial_request_sent_time, + initial_data_length, write_file, - seek_rx, + stream_loader_command_rx, complete_tx, ); self.session.spawn(move |_| fetcher); @@ -84,7 +319,8 @@ impl AudioFileOpenStreaming { read_file: read_file, position: 0, - seek: seek_tx, + //seek: seek_tx, + stream_loader_command_tx: stream_loader_command_tx, shared: shared, } @@ -128,7 +364,12 @@ impl Future for AudioFileOpenStreaming { } impl AudioFile { - pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen { + pub fn open( + session: &Session, + file_id: FileId, + bytes_per_second: usize, + play_from_beginning: bool, + ) -> AudioFileOpen { let cache = session.cache().cloned(); if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) { @@ -139,16 +380,33 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - let (headers, data) = request_chunk(session, file_id, 0).split(); + let mut initial_data_length = if play_from_beginning { + INITIAL_DOWNLOAD_SIZE + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + (INITIAL_PING_TIME_ESTIMATE_SECONDS + * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * bytes_per_second as f64) as usize, + ) + } else { + INITIAL_DOWNLOAD_SIZE + }; + if initial_data_length % 4 != 0 { + initial_data_length += 4 - (initial_data_length % 4); + } + let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); let open = AudioFileOpenStreaming { session: session.clone(), file_id: file_id, headers: headers, - data_rx: Some(data), + initial_data_rx: Some(data), + initial_data_length: Some(initial_data_length), + initial_request_sent_time: Instant::now(), complete_tx: Some(complete_tx), + streaming_data_rate: bytes_per_second, }; let session_ = session.clone(); @@ -165,15 +423,40 @@ impl AudioFile { .or_else(|oneshot::Canceled| Ok(())) }); - AudioFileOpen::Streaming(open) + return AudioFileOpen::Streaming(open); + } + + pub fn get_stream_loader_controller(&self) -> StreamLoaderController { + match self { + AudioFile::Streaming(ref stream) => { + return StreamLoaderController { + channel_tx: Some(stream.stream_loader_command_tx.clone()), + stream_shared: Some(stream.shared.clone()), + file_size: stream.shared.file_size, + }; + } + AudioFile::Cached(ref file) => { + return StreamLoaderController { + channel_tx: None, + stream_shared: None, + file_size: file.metadata().unwrap().len() as usize, + } + } + } } } -fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { - trace!("requesting chunk {}", index); - - let start = (index * CHUNK_SIZE / 4) as u32; - let end = ((index + 1) * CHUNK_SIZE / 4) as u32; +fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { + assert!( + offset % 4 == 0, + "Range request start positions must be aligned by 4 bytes." + ); + assert!( + length % 4 == 0, + "Range request range lengths must be aligned by 4 bytes." + ); + let start = offset / 4; + let end = (offset + length) / 4; let (id, channel) = session.channel().allocate(); @@ -186,71 +469,402 @@ fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { data.write_u32::(0x00009C40).unwrap(); data.write_u32::(0x00020000).unwrap(); data.write(&file.0).unwrap(); - data.write_u32::(start).unwrap(); - data.write_u32::(end).unwrap(); + data.write_u32::(start as u32).unwrap(); + data.write_u32::(end as u32).unwrap(); session.send_packet(0x8, data); channel } +struct PartialFileData { + offset: usize, + data: Bytes, +} + +enum ReceivedData { + ResponseTimeMs(usize), + Data(PartialFileData), +} + +struct AudioFileFetchDataReceiver { + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + data_rx: ChannelData, + initial_data_offset: usize, + initial_request_length: usize, + data_offset: usize, + request_length: usize, + request_sent_time: Option, + measure_ping_time: bool, +} + +impl AudioFileFetchDataReceiver { + fn new( + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + data_rx: ChannelData, + data_offset: usize, + request_length: usize, + request_sent_time: Instant, + ) -> AudioFileFetchDataReceiver { + let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0; + + shared + .number_of_open_requests + .fetch_add(1, atomic::Ordering::SeqCst); + + AudioFileFetchDataReceiver { + shared: shared, + data_rx: data_rx, + file_data_tx: file_data_tx, + initial_data_offset: data_offset, + initial_request_length: request_length, + data_offset: data_offset, + request_length: request_length, + request_sent_time: Some(request_sent_time), + measure_ping_time: measure_ping_time, + } + } +} + +impl AudioFileFetchDataReceiver { + fn finish(&mut self) { + if self.request_length > 0 { + let missing_range = Range::new(self.data_offset, self.request_length); + + let mut download_status = self.shared.download_status.lock().unwrap(); + download_status.requested.subtract_range(&missing_range); + self.shared.cond.notify_all(); + } + + self.shared + .number_of_open_requests + .fetch_sub(1, atomic::Ordering::SeqCst); + } +} + +impl Future for AudioFileFetchDataReceiver { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + match self.data_rx.poll() { + Ok(Async::Ready(Some(data))) => { + if self.measure_ping_time { + if let Some(request_sent_time) = self.request_sent_time { + let duration = Instant::now() - request_sent_time; + let duration_ms: u64; + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS + { + duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; + } else { + duration_ms = duration.as_millis() as u64; + } + let _ = self + .file_data_tx + .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + self.measure_ping_time = false; + } + } + let data_size = data.len(); + let _ = self + .file_data_tx + .unbounded_send(ReceivedData::Data(PartialFileData { + offset: self.data_offset, + data: data, + })); + self.data_offset += data_size; + if self.request_length < data_size { + warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length); + self.request_length = 0; + } else { + self.request_length -= data_size; + } + if self.request_length == 0 { + self.finish(); + return Ok(Async::Ready(())); + } + } + Ok(Async::Ready(None)) => { + if self.request_length > 0 { + warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length); + } + self.finish(); + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + } + Err(ChannelError) => { + warn!( + "Error from channel for data receiver for range {} (+{}).", + self.initial_data_offset, self.initial_request_length + ); + self.finish(); + return Ok(Async::Ready(())); + } + } + } + } +} + struct AudioFileFetch { session: Session, shared: Arc, output: Option, - index: usize, - data_rx: ChannelData, + file_data_tx: mpsc::UnboundedSender, + file_data_rx: mpsc::UnboundedReceiver, - seek_rx: mpsc::UnboundedReceiver, + stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, + network_response_times_ms: Vec, } impl AudioFileFetch { fn new( session: Session, shared: Arc, - data_rx: ChannelData, + initial_data_rx: ChannelData, + initial_request_sent_time: Instant, + initial_data_length: usize, + output: NamedTempFile, - seek_rx: mpsc::UnboundedReceiver, + stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, ) -> AudioFileFetch { + let (file_data_tx, file_data_rx) = unbounded::(); + + { + let requested_range = Range::new(0, initial_data_length); + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.add_range(&requested_range); + } + + let initial_data_receiver = AudioFileFetchDataReceiver::new( + shared.clone(), + file_data_tx.clone(), + initial_data_rx, + 0, + initial_data_length, + initial_request_sent_time, + ); + + session.spawn(move |_| initial_data_receiver); + AudioFileFetch { session: session, shared: shared, output: Some(output), - index: 0, - data_rx: data_rx, + file_data_tx: file_data_tx, + file_data_rx: file_data_rx, - seek_rx: seek_rx, + stream_loader_command_rx: stream_loader_command_rx, complete_tx: Some(complete_tx), + network_response_times_ms: Vec::new(), } } - fn download(&mut self, mut new_index: usize) { - assert!(new_index < self.shared.chunk_count); + fn get_download_strategy(&mut self) -> DownloadStrategy { + *(self.shared.download_strategy.lock().unwrap()) + } - { - let bitmap = self.shared.bitmap.lock().unwrap(); - while bitmap.contains(new_index) { - new_index = (new_index + 1) % self.shared.chunk_count; - } + fn download_range(&mut self, mut offset: usize, mut length: usize) { + if length < MINIMUM_DOWNLOAD_SIZE { + length = MINIMUM_DOWNLOAD_SIZE; } - if self.index != new_index { - self.index = new_index; + // ensure the values are within the bounds and align them by 4 for the spotify protocol. + if offset >= self.shared.file_size { + return; + } - let offset = self.index * CHUNK_SIZE; + if length <= 0 { + return; + } - self.output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(offset as u64)) - .unwrap(); + if offset + length > self.shared.file_size { + length = self.shared.file_size - offset; + } - let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split(); - self.data_rx = data; + if offset % 4 != 0 { + length += offset % 4; + offset -= offset % 4; + } + + if length % 4 != 0 { + length += 4 - (length % 4); + } + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length)); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + for range in ranges_to_request.iter() { + let (_headers, data) = + request_range(&self.session, self.shared.file_id, range.start, range.length).split(); + + download_status.requested.add_range(range); + + let receiver = AudioFileFetchDataReceiver::new( + self.shared.clone(), + self.file_data_tx.clone(), + data, + range.start, + range.length, + Instant::now(), + ); + + self.session.spawn(move |_| receiver); + } + } + + fn pre_fetch_more_data(&mut self, bytes: usize) { + let mut bytes_to_go = bytes; + + while bytes_to_go > 0 { + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0, self.shared.file_size)); + { + let download_status = self.shared.download_status.lock().unwrap(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } + + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); + tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position)); + let tail_end = tail_end.intersection(&missing_data); + + if !tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + bytes_to_go -= length; + } else if !missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + bytes_to_go -= length; + } else { + return; + } + } + } + + fn poll_file_data_rx(&mut self) -> Poll<(), ()> { + loop { + match self.file_data_rx.poll() { + Ok(Async::Ready(None)) => { + return Ok(Async::Ready(())); + } + Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { + trace!("Ping time estimated as: {} ms.", response_time_ms); + + // record the response time + self.network_response_times_ms.push(response_time_ms); + + // prune old response times. Keep at most three. + while self.network_response_times_ms.len() > 3 { + self.network_response_times_ms.remove(0); + } + + // stats::median is experimental. So we calculate the median of up to three ourselves. + let ping_time_ms: usize = match self.network_response_times_ms.len() { + 1 => self.network_response_times_ms[0] as usize, + 2 => { + ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) + as usize + } + 3 => { + let mut times = self.network_response_times_ms.clone(); + times.sort(); + times[1] + } + _ => unreachable!(), + }; + + // store our new estimate for everyone to see + self.shared + .ping_time_ms + .store(ping_time_ms, atomic::Ordering::Relaxed); + } + Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { + self.output + .as_mut() + .unwrap() + .seek(SeekFrom::Start(data.offset as u64)) + .unwrap(); + self.output + .as_mut() + .unwrap() + .write_all(data.data.as_ref()) + .unwrap(); + + let mut full = false; + + { + let mut download_status = self.shared.download_status.lock().unwrap(); + + let received_range = Range::new(data.offset, data.data.len()); + download_status.downloaded.add_range(&received_range); + self.shared.cond.notify_all(); + + if download_status.downloaded.contained_length_from_value(0) + >= self.shared.file_size + { + full = true; + } + + drop(download_status); + } + + if full { + self.finish(); + return Ok(Async::Ready(())); + } + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + } + Err(()) => unreachable!(), + } + } + } + + fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { + loop { + match self.stream_loader_command_rx.poll() { + Ok(Async::Ready(None)) => { + return Ok(Async::Ready(())); + } + Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => { + self.download_range(request.start, request.length); + } + Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); + } + Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); + } + Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { + return Ok(Async::Ready(())); + } + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(()) => unreachable!(), + } } } @@ -268,80 +882,126 @@ impl Future for AudioFileFetch { type Error = (); fn poll(&mut self) -> Poll<(), ()> { - loop { - let mut progress = false; - - match self.seek_rx.poll() { - Ok(Async::Ready(None)) => { - return Ok(Async::Ready(())); - } - Ok(Async::Ready(Some(offset))) => { - progress = true; - let index = offset as usize / CHUNK_SIZE; - self.download(index); - } - Ok(Async::NotReady) => (), - Err(()) => unreachable!(), + match self.poll_stream_loader_command_rx() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + return Ok(Async::Ready(())); } + Err(()) => unreachable!(), + } - match self.data_rx.poll() { - Ok(Async::Ready(Some(data))) => { - progress = true; - - self.output.as_mut().unwrap().write_all(data.as_ref()).unwrap(); - } - Ok(Async::Ready(None)) => { - progress = true; - - trace!("chunk {} / {} complete", self.index, self.shared.chunk_count); - - let full = { - let mut bitmap = self.shared.bitmap.lock().unwrap(); - bitmap.insert(self.index as usize); - self.shared.cond.notify_all(); - - bitmap.len() >= self.shared.chunk_count - }; - - if full { - self.finish(); - return Ok(Async::Ready(())); - } - - let new_index = (self.index + 1) % self.shared.chunk_count; - self.download(new_index); - } - Ok(Async::NotReady) => (), - Err(ChannelError) => { - warn!("error from channel"); - return Ok(Async::Ready(())); - } + match self.poll_file_data_rx() { + Ok(Async::NotReady) => (), + Ok(Async::Ready(_)) => { + return Ok(Async::Ready(())); } + Err(()) => unreachable!(), + } - if !progress { - return Ok(Async::NotReady); + if let DownloadStrategy::Streaming() = self.get_download_strategy() { + let bytes_pending: usize = { + let download_status = self.shared.download_status.lock().unwrap(); + download_status.requested.minus(&download_status.downloaded).len() + }; + + let ping_time_seconds = + 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let download_rate = self.session.channel().get_download_rate_estimate(); + + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) + as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, + ); + + if bytes_pending < desired_pending_bytes { + self.pre_fetch_more_data(desired_pending_bytes - bytes_pending); } } + + return Ok(Async::NotReady); } } impl Read for AudioFileStreaming { fn read(&mut self, output: &mut [u8]) -> io::Result { - let index = self.position as usize / CHUNK_SIZE; - let offset = self.position as usize % CHUNK_SIZE; - let len = min(output.len(), CHUNK_SIZE - offset); + let offset = self.position as usize; - let mut bitmap = self.shared.bitmap.lock().unwrap(); - while !bitmap.contains(index) { - bitmap = self.shared.cond.wait(bitmap).unwrap(); + if offset >= self.shared.file_size { + return Ok(0); } - drop(bitmap); - let read_len = try!(self.read_file.read(&mut output[..len])); + let length = min(output.len(), self.shared.file_size - offset); + + let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { + DownloadStrategy::RandomAccess() => length, + DownloadStrategy::Streaming() => { + // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded. + let ping_time_seconds = + 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + + let length_to_request = length + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) + as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * ping_time_seconds + * self.shared.stream_data_rate as f64) as usize, + ); + min(length_to_request, self.shared.file_size - offset) + } + }; + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length_to_request)); + + let mut download_status = self.shared.download_status.lock().unwrap(); + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + for range in ranges_to_request.iter() { + self.stream_loader_command_tx + .unbounded_send(StreamLoaderCommand::Fetch(range.clone())) + .unwrap(); + } + + if length == 0 { + return Ok(0); + } + + let mut download_message_printed = false; + while !download_status.downloaded.contains(offset) { + if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() { + if !download_message_printed { + debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); + download_message_printed = true; + } + } + download_status = self + .shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; + } + let available_length = download_status.downloaded.contained_length_from_value(offset); + assert!(available_length > 0); + drop(download_status); + + self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); + let read_len = min(length, available_length); + let read_len = try!(self.read_file.read(&mut output[..read_len])); + + if download_message_printed { + debug!("Read at postion {} completed. {} bytes returned, {} bytes were requested.", offset, read_len, output.len()); + } self.position += read_len as u64; + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); - Ok(read_len) + return Ok(read_len); } } @@ -349,15 +1009,9 @@ impl Seek for AudioFileStreaming { fn seek(&mut self, pos: SeekFrom) -> io::Result { self.position = try!(self.read_file.seek(pos)); // Do not seek past EOF - if (self.position as usize % CHUNK_SIZE) != 0 { - // Notify the fetch thread to get the correct block - // This can fail if fetch thread has completed, in which case the - // block is ready. Just ignore the error. - let _ = self.seek.unbounded_send(self.position); - } else { - warn!("Trying to seek past EOF"); - } - + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); Ok(self.position) } } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 4cb2b937..9a82f90e 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -3,12 +3,13 @@ extern crate futures; #[macro_use] extern crate log; +extern crate aes_ctr; extern crate bit_set; extern crate byteorder; +extern crate bytes; extern crate num_bigint; extern crate num_traits; extern crate tempfile; -extern crate aes_ctr; extern crate librespot_core; @@ -20,8 +21,14 @@ mod lewton_decoder; #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] mod libvorbis_decoder; +mod range_set; + pub use decrypt::AudioDecrypt; -pub use fetch::{AudioFile, AudioFileOpen}; +pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; +pub use fetch::{ + READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, + READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, +}; #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs new file mode 100644 index 00000000..448c0971 --- /dev/null +++ b/audio/src/range_set.rs @@ -0,0 +1,240 @@ +use std::cmp::{max, min}; +use std::fmt; +use std::slice::Iter; + +#[derive(Copy, Clone)] +pub struct Range { + pub start: usize, + pub length: usize, +} + +impl fmt::Display for Range { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + return write!(f, "[{}, {}]", self.start, self.start + self.length - 1); + } +} + +impl Range { + pub fn new(start: usize, length: usize) -> Range { + return Range { + start: start, + length: length, + }; + } + + pub fn end(&self) -> usize { + return self.start + self.length; + } +} + +#[derive(Clone)] +pub struct RangeSet { + ranges: Vec, +} + +impl fmt::Display for RangeSet { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "(").unwrap(); + for range in self.ranges.iter() { + write!(f, "{}", range).unwrap(); + } + write!(f, ")") + } +} + +impl RangeSet { + pub fn new() -> RangeSet { + RangeSet { + ranges: Vec::::new(), + } + } + + pub fn is_empty(&self) -> bool { + return self.ranges.is_empty(); + } + + pub fn len(&self) -> usize { + let mut result = 0; + for range in self.ranges.iter() { + result += range.length; + } + return result; + } + + pub fn get_range(&self, index: usize) -> Range { + return self.ranges[index].clone(); + } + + pub fn iter(&self) -> Iter { + return self.ranges.iter(); + } + + pub fn contains(&self, value: usize) -> bool { + for range in self.ranges.iter() { + if value < range.start { + return false; + } else if range.start <= value && value < range.end() { + return true; + } + } + return false; + } + + pub fn contained_length_from_value(&self, value: usize) -> usize { + for range in self.ranges.iter() { + if value < range.start { + return 0; + } else if range.start <= value && value < range.end() { + return range.end() - value; + } + } + return 0; + } + + #[allow(dead_code)] + pub fn contains_range_set(&self, other: &RangeSet) -> bool { + for range in other.ranges.iter() { + if self.contained_length_from_value(range.start) < range.length { + return false; + } + } + return true; + } + + pub fn add_range(&mut self, range: &Range) { + if range.length <= 0 { + // the interval is empty or invalid -> nothing to do. + return; + } + + for index in 0..self.ranges.len() { + // the new range is clear of any ranges we already iterated over. + if range.end() < self.ranges[index].start { + // the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it. + self.ranges.insert(index, range.clone()); + return; + } else if range.start <= self.ranges[index].end() && self.ranges[index].start <= range.end() + { + // the new range overlaps (or touches) the first range. They are to be merged. + // In addition we might have to merge further ranges in as well. + + let mut new_range = range.clone(); + + while index < self.ranges.len() && self.ranges[index].start <= new_range.end() { + let new_end = max(new_range.end(), self.ranges[index].end()); + new_range.start = min(new_range.start, self.ranges[index].start); + new_range.length = new_end - new_range.start; + self.ranges.remove(index); + } + + self.ranges.insert(index, new_range); + return; + } + } + + // the new range is after everything else -> just add it + self.ranges.push(range.clone()); + } + + #[allow(dead_code)] + pub fn add_range_set(&mut self, other: &RangeSet) { + for range in other.ranges.iter() { + self.add_range(range); + } + } + + #[allow(dead_code)] + pub fn union(&self, other: &RangeSet) -> RangeSet { + let mut result = self.clone(); + result.add_range_set(other); + return result; + } + + pub fn subtract_range(&mut self, range: &Range) { + if range.length <= 0 { + return; + } + + for index in 0..self.ranges.len() { + // the ranges we already passed don't overlap with the range to remove + + if range.end() <= self.ranges[index].start { + // the remaining ranges are past the one to subtract. -> we're done. + return; + } else if range.start <= self.ranges[index].start && self.ranges[index].start < range.end() { + // the range to subtract started before the current range and reaches into the current range + // -> we have to remove the beginning of the range or the entire range and do the same for following ranges. + + while index < self.ranges.len() && self.ranges[index].end() <= range.end() { + self.ranges.remove(index); + } + + if index < self.ranges.len() && self.ranges[index].start < range.end() { + self.ranges[index].length -= range.end() - self.ranges[index].start; + self.ranges[index].start = range.end(); + } + + return; + } else if range.end() < self.ranges[index].end() { + // the range to subtract punches a hole into the current range -> we need to create two smaller ranges. + + let first_range = Range { + start: self.ranges[index].start, + length: range.start - self.ranges[index].start, + }; + + self.ranges[index].length -= range.end() - self.ranges[index].start; + self.ranges[index].start = range.end(); + + self.ranges.insert(index, first_range); + + return; + } else if range.start < self.ranges[index].end() { + // the range truncates the existing range -> truncate the range. Let the for loop take care of overlaps with other ranges. + self.ranges[index].length = range.start - self.ranges[index].start; + } + } + } + + pub fn subtract_range_set(&mut self, other: &RangeSet) { + for range in other.ranges.iter() { + self.subtract_range(range); + } + } + + pub fn minus(&self, other: &RangeSet) -> RangeSet { + let mut result = self.clone(); + result.subtract_range_set(other); + return result; + } + + pub fn intersection(&self, other: &RangeSet) -> RangeSet { + let mut result = RangeSet::new(); + + let mut self_index: usize = 0; + let mut other_index: usize = 0; + + while self_index < self.ranges.len() && other_index < other.ranges.len() { + if self.ranges[self_index].end() <= other.ranges[other_index].start { + // skip the interval + self_index += 1; + } else if other.ranges[other_index].end() <= self.ranges[self_index].start { + // skip the interval + other_index += 1; + } else { + // the two intervals overlap. Add the union and advance the index of the one that ends first. + let new_start = max(self.ranges[self_index].start, other.ranges[other_index].start); + let new_end = min(self.ranges[self_index].end(), other.ranges[other_index].end()); + assert!(new_start <= new_end); + result.add_range(&Range::new(new_start, new_end - new_start)); + if self.ranges[self_index].end() <= other.ranges[other_index].end() { + self_index += 1; + } else { + other_index += 1; + } + } + } + + return result; + } +} diff --git a/core/src/channel.rs b/core/src/channel.rs index 57655feb..a4785eb8 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -3,6 +3,7 @@ use bytes::Bytes; use futures::sync::{mpsc, BiLock}; use futures::{Async, Poll, Stream}; use std::collections::HashMap; +use std::time::Instant; use util::SeqGenerator; @@ -10,6 +11,9 @@ component! { ChannelManager : ChannelManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), channels: HashMap> = HashMap::new(), + download_rate_estimate: usize = 0, + download_measurement_start: Option = None, + download_measurement_bytes: usize = 0, } } @@ -60,11 +64,29 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); self.lock(|inner| { + let current_time = Instant::now(); + if let Some(download_measurement_start) = inner.download_measurement_start { + if (current_time - download_measurement_start).as_millis() > 1000 { + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes + / (current_time - download_measurement_start).as_millis() as usize; + inner.download_measurement_start = Some(current_time); + inner.download_measurement_bytes = 0; + } + } else { + inner.download_measurement_start = Some(current_time); + } + + inner.download_measurement_bytes += data.len(); + if let Entry::Occupied(entry) = inner.channels.entry(id) { let _ = entry.get().unbounded_send((cmd, data)); } }); } + + pub fn get_download_rate_estimate(&self) -> usize { + return self.lock(|inner| inner.download_rate_estimate); + } } impl Channel { diff --git a/playback/src/player.rs b/playback/src/player.rs index 95eb549d..a54a577f 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -4,6 +4,7 @@ use futures::sync::oneshot; use futures::{future, Future}; use std; use std::borrow::Cow; +use std::cmp::max; use std::io::{Read, Result, Seek, SeekFrom}; use std::mem; use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; @@ -14,8 +15,12 @@ use config::{Bitrate, PlayerConfig}; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; -use audio::{AudioDecrypt, AudioFile}; +use audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use audio::{VorbisDecoder, VorbisPacket}; +use audio::{ + READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, + READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, +}; use audio_backend::Sink; use metadata::{AudioItem, FileFormat}; use mixer::AudioFilter; @@ -202,12 +207,16 @@ enum PlayerState { decoder: Decoder, end_of_track: oneshot::Sender<()>, normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, }, Playing { track_id: SpotifyId, decoder: Decoder, end_of_track: oneshot::Sender<()>, normalisation_factor: f32, + stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, }, EndOfTrack { track_id: SpotifyId, @@ -234,6 +243,22 @@ impl PlayerState { } } + fn stream_loader_controller(&mut self) -> Option<&mut StreamLoaderController> { + use self::PlayerState::*; + match *self { + Stopped | EndOfTrack { .. } => None, + Paused { + ref mut stream_loader_controller, + .. + } + | Playing { + ref mut stream_loader_controller, + .. + } => Some(stream_loader_controller), + Invalid => panic!("invalid state"), + } + } + fn playing_to_end_of_track(&mut self) { use self::PlayerState::*; match mem::replace(self, Invalid) { @@ -257,12 +282,16 @@ impl PlayerState { decoder, end_of_track, normalisation_factor, + stream_loader_controller, + bytes_per_second, } => { *self = Playing { track_id: track_id, decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } _ => panic!("invalid state"), @@ -277,12 +306,16 @@ impl PlayerState { decoder, end_of_track, normalisation_factor, + stream_loader_controller, + bytes_per_second, } => { *self = Paused { track_id: track_id, decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } _ => panic!("invalid state"), @@ -403,7 +436,12 @@ impl PlayerInternal { } match self.load_track(track_id, position as i64) { - Some((decoder, normalisation_factor)) => { + Some(( + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + )) => { if play { match self.state { PlayerState::Playing { @@ -427,6 +465,8 @@ impl PlayerInternal { decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } else { self.state = PlayerState::Paused { @@ -434,6 +474,8 @@ impl PlayerInternal { decoder: decoder, end_of_track: end_of_track, normalisation_factor: normalisation_factor, + stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; match self.state { PlayerState::Playing { @@ -460,6 +502,9 @@ impl PlayerInternal { } PlayerCommand::Seek(position) => { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_random_access_mode(); + } if let Some(decoder) = self.state.decoder() { match decoder.seek(position as i64) { Ok(_) => (), @@ -468,6 +513,32 @@ impl PlayerInternal { } else { warn!("Player::seek called from invalid state"); } + + // If we're playing, ensure, that we have enough data leaded to avoid a buffer underrun. + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + stream_loader_controller.set_stream_mode(); + } + if let PlayerState::Playing { bytes_per_second, .. } = self.state { + if let Some(stream_loader_controller) = self.state.stream_loader_controller() { + // Request our read ahead range + let request_data_length = max( + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + ); + stream_loader_controller.fetch_next(request_data_length); + + // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. + let wait_for_data_length = max( + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS + * (0.001 * stream_loader_controller.ping_time_ms() as f64) + * bytes_per_second as f64) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + ); + stream_loader_controller.fetch_next_blocking(wait_for_data_length); + } + } } PlayerCommand::Play => { @@ -528,7 +599,30 @@ impl PlayerInternal { } } - fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32)> { + fn stream_data_rate(&self, format: FileFormat) -> usize { + match format { + FileFormat::OGG_VORBIS_96 => 12 * 1024, + FileFormat::OGG_VORBIS_160 => 20 * 1024, + FileFormat::OGG_VORBIS_320 => 40 * 1024, + FileFormat::MP3_256 => 32 * 1024, + FileFormat::MP3_320 => 40 * 1024, + FileFormat::MP3_160 => 20 * 1024, + FileFormat::MP3_96 => 12 * 1024, + FileFormat::MP3_160_ENC => 20 * 1024, + FileFormat::MP4_128_DUAL => 16 * 1024, + FileFormat::OTHER3 => 40 * 1024, // better some high guess than nothing + FileFormat::AAC_160 => 20 * 1024, + FileFormat::AAC_320 => 40 * 1024, + FileFormat::MP4_128 => 16 * 1024, + FileFormat::OTHER5 => 40 * 1024, // better some high guess than nothing + } + } + + fn load_track( + &self, + spotify_id: SpotifyId, + position: i64, + ) -> Option<(Decoder, f32, StreamLoaderController, usize)> { let audio = AudioItem::get_audio_item(&self.session, spotify_id) .wait() .unwrap(); @@ -572,10 +666,25 @@ impl PlayerInternal { } }; + let bytes_per_second = self.stream_data_rate(*format); + let play_from_beginning = position == 0; + let key = self.session.audio_key().request(spotify_id, file_id); - let encrypted_file = AudioFile::open(&self.session, file_id); + let encrypted_file = + AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); let encrypted_file = encrypted_file.wait().unwrap(); + + let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); + + if play_from_beginning { + // No need to seek -> we stream from the beginning + stream_loader_controller.set_stream_mode(); + } else { + // we need to seek -> we set stream mode after the initial seek. + stream_loader_controller.set_random_access_mode(); + } + let key = key.wait().unwrap(); let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); @@ -596,9 +705,15 @@ impl PlayerInternal { Ok(_) => (), Err(err) => error!("Vorbis error: {:?}", err), } + stream_loader_controller.set_stream_mode(); } info!("<{}> loaded", audio.name); - Some((decoder, normalisation_factor)) + Some(( + decoder, + normalisation_factor, + stream_loader_controller, + bytes_per_second, + )) } }