diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index e3d63dfb..46d8a88a 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use futures::sync::{mpsc, oneshot}; use futures::Stream; use futures::{Async, Future, Poll}; -use std::cmp::min; +use std::cmp::{min, max}; use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; @@ -18,9 +18,67 @@ use futures::sync::mpsc::unbounded; use std::sync::atomic; use std::sync::atomic::AtomicUsize; -const MINIMUM_CHUNK_SIZE: usize = 1024 * 16; -const MAXIMUM_CHUNK_SIZE: usize = 1024 * 128; -const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5; + +const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; +// The minimum size of a block that is requested from the Spotify servers in one request. +// This is the block size that is typically requested while doing a seek() on a file. +// Note: smaller requests can happen if part of the block is downloaded already. + +const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // MUST be divisible by four!!! +// The amount of data that is requested when initially opening a file. +// Note: if the file is opened to play from the beginning, the amount of data to +// read ahead is requested in addition to this amount. If the file is opened to seek to +// another position, then only this amount is requested on the first request. + +const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5; +// The pig time that is used for calculations before a ping time was actually measured. + +const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5; +// If the measured ping time to the Spotify server is larger than this value, it is capped +// to avoid run-away block sizes and pre-fetching. + +pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0; +// Before playback starts, this many seconds of data must be present. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0; +// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are +// obeyed. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 1.0; +// While playing back, this many seconds of data ahead of the current read position are +// requested. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 2.0; +// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0; +// If the amount of data that is pending (requested but not received) is less than a certain amount, +// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more +// data is calculated as +// < PREFETCH_THRESHOLD_FACTOR * * + +const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5; +// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account. +// The formula used is +// < FAST_PREFETCH_THRESHOLD_FACTOR * * +// This mechanism allows for fast downloading of the remainder of the file. The number should be larger +// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster +// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is +// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively +// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted. + + pub enum AudioFile { Cached(fs::File), @@ -40,6 +98,7 @@ pub struct AudioFileOpenStreaming { headers: ChannelHeaders, file_id: FileId, complete_tx: Option>, + streaming_data_rate: usize, } @@ -47,7 +106,6 @@ enum StreamLoaderCommand{ Fetch(Range), // signal the stream loader to fetch a range of the file RandomAccessMode(), // optimise download strategy for random access StreamMode(), // optimise download strategy for streaming - StreamDataRate(usize), // when optimising for streaming, assume a streaming rate of this many bytes per second. Close(), // terminate and don't load any more data } @@ -57,7 +115,6 @@ pub struct StreamLoaderController { channel_tx: Option>, stream_shared: Option>, file_size: usize, - bytes_per_second: usize, } @@ -66,8 +123,6 @@ impl StreamLoaderController { return self.file_size; } - pub fn data_rate(&self) -> usize { return self.bytes_per_second; } - pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { let download_status = shared.download_status.lock().unwrap(); @@ -124,7 +179,7 @@ impl StreamLoaderController { if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) { // For some reason, the requested range is neither downloaded nor requested. // This could be due to a network error. Request it again. - // We can't use self.fetch here because self can't borrowed mutably, so we access the channel directly. + // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly. if let Some(ref mut channel) = self.channel_tx { // ignore the error in case the channel has been closed already. let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range)); @@ -169,12 +224,6 @@ impl StreamLoaderController { self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); } - pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) { - // when optimising for streaming, assume a streaming rate of this many bytes per second. - self.bytes_per_second = bytes_per_second; - self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second)); - } - pub fn close(&mut self) { // terminate stream loading and don't load any more data for this file. self.send_stream_loader_command(StreamLoaderCommand::Close()); @@ -200,11 +249,19 @@ struct AudioFileDownloadStatus { downloaded: RangeSet, } +#[derive(Copy, Clone)] +enum DownloadStrategy { + RandomAccess(), + Streaming(), +} + struct AudioFileShared { file_id: FileId, file_size: usize, + stream_data_rate: usize, cond: Condvar, download_status: Mutex, + download_strategy: Mutex, number_of_open_requests: AtomicUsize, ping_time_ms: AtomicUsize, read_position: AtomicUsize, @@ -216,8 +273,10 @@ impl AudioFileOpenStreaming { let shared = Arc::new(AudioFileShared { file_id: self.file_id, file_size: size, + stream_data_rate: self.streaming_data_rate, cond: Condvar::new(), download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}), + download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise number_of_open_requests: AtomicUsize::new(0), ping_time_ms: AtomicUsize::new(0), read_position: AtomicUsize::new(0), @@ -296,7 +355,7 @@ impl Future for AudioFileOpenStreaming { } impl AudioFile { - pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen { + pub fn open(session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool) -> AudioFileOpen { let cache = session.cache().cloned(); if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) { @@ -307,7 +366,14 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - let initial_data_length = MINIMUM_CHUNK_SIZE; + let mut initial_data_length = if play_from_beginning { + INITIAL_DOWNLOAD_SIZE + max((READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (INITIAL_PING_TIME_ESTIMATE_SECONDS * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * bytes_per_second as f64) as usize) + } else { + INITIAL_DOWNLOAD_SIZE + }; + if initial_data_length % 4 != 0 { + initial_data_length += 4 - (initial_data_length % 4); + } let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); let open = AudioFileOpenStreaming { @@ -320,6 +386,8 @@ impl AudioFile { initial_request_sent_time: Instant::now(), complete_tx: Some(complete_tx), + streaming_data_rate: bytes_per_second, + }; let session_ = session.clone(); @@ -336,27 +404,23 @@ impl AudioFile { .or_else(|oneshot::Canceled| Ok(())) }); - AudioFileOpen::Streaming(open) + return AudioFileOpen::Streaming(open); } - pub fn get_stream_loader_controller(&self, bytes_per_second: usize) -> StreamLoaderController { + pub fn get_stream_loader_controller(&self) -> StreamLoaderController { match self { - AudioFile::Streaming(stream) => { - let mut result = StreamLoaderController { + AudioFile::Streaming(ref stream) => { + return StreamLoaderController { channel_tx: Some(stream.stream_loader_command_tx.clone()), stream_shared: Some(stream.shared.clone()), file_size: stream.shared.file_size, - bytes_per_second: bytes_per_second, }; - result.set_stream_data_rate(bytes_per_second); - return result; } AudioFile::Cached(ref file) => { return StreamLoaderController { channel_tx: None, stream_shared: None, file_size: file.metadata().unwrap().len() as usize, - bytes_per_second: bytes_per_second, } } } @@ -474,8 +538,8 @@ impl Future for AudioFileFetchDataReceiver { if let Some(request_sent_time) = self.request_sent_time { let duration = Instant::now() - request_sent_time; let duration_ms: u64; - if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS { - duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000; + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; } else { duration_ms = duration.as_secs() * 1000 + duration.subsec_millis() as u64; } @@ -520,11 +584,6 @@ impl Future for AudioFileFetchDataReceiver { } -enum DownloadStrategy { - RandomAccess(), - Streaming(), -} - struct AudioFileFetch { session: Session, shared: Arc, @@ -533,11 +592,8 @@ struct AudioFileFetch { file_data_tx: mpsc::UnboundedSender, file_data_rx: mpsc::UnboundedReceiver, - //seek_rx: mpsc::UnboundedReceiver, stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, - download_strategy: DownloadStrategy, - streaming_data_rate: usize, network_response_times_ms: Vec, } @@ -584,16 +640,18 @@ impl AudioFileFetch { stream_loader_command_rx: stream_loader_command_rx, complete_tx: Some(complete_tx), - download_strategy: DownloadStrategy::RandomAccess(), // start with random access mode until someone tells us otherwise - streaming_data_rate: 40, // assume 360 kbit per second unless someone tells us otherwise. network_response_times_ms: Vec::new(), } } + fn get_download_strategy(&mut self) -> DownloadStrategy { + *(self.shared.download_strategy.lock().unwrap()) + } + fn download_range(&mut self, mut offset: usize, mut length: usize) { - if length < MINIMUM_CHUNK_SIZE { - length = MINIMUM_CHUNK_SIZE; + if length < MINIMUM_DOWNLOAD_SIZE { + length = MINIMUM_DOWNLOAD_SIZE; } // ensure the values are within the bounds and align them by 4 for the spotify protocol. @@ -647,35 +705,43 @@ impl AudioFileFetch { } - fn pre_fetch_more_data(&mut self) { + fn pre_fetch_more_data(&mut self, bytes: usize) { - // determine what is still missing - let mut missing_data = RangeSet::new(); - missing_data.add_range(&Range::new(0,self.shared.file_size)); - { - let download_status = self.shared.download_status.lock().unwrap(); - missing_data.subtract_range_set(&download_status.downloaded); - missing_data.subtract_range_set(&download_status.requested); - } + let mut bytes_to_go = bytes; - // download data from after the current read position first - let mut tail_end = RangeSet::new(); - let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); - tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position)); - let tail_end = tail_end.intersection(&missing_data); + while bytes_to_go > 0 { - if ! tail_end.is_empty() { - let range = tail_end.get_range(0); - let offset = range.start; - let length = min(range.length, MAXIMUM_CHUNK_SIZE); - self.download_range(offset, length); + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0, self.shared.file_size)); + { + let download_status = self.shared.download_status.lock().unwrap(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } - } else if ! missing_data.is_empty() { - // ok, the tail is downloaded, download something fom the beginning. - let range = missing_data.get_range(0); - let offset = range.start; - let length = min(range.length, MAXIMUM_CHUNK_SIZE); - self.download_range(offset, length); + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); + tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position)); + let tail_end = tail_end.intersection(&missing_data); + + if !tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + bytes_to_go -= length; + } else if !missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + bytes_to_go -= length; + } else { + return; + } } } @@ -774,13 +840,10 @@ impl AudioFileFetch { self.download_range(request.start, request.length); } Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { - self.download_strategy = DownloadStrategy::RandomAccess(); + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); } Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { - self.download_strategy = DownloadStrategy::Streaming(); - } - Ok(Async::Ready(Some(StreamLoaderCommand::StreamDataRate(rate)))) => { - self.streaming_data_rate = rate; + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); } Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { return Ok(Async::Ready(())); @@ -827,18 +890,22 @@ impl Future for AudioFileFetch { Err(()) => unreachable!(), } - - if let DownloadStrategy::Streaming() = self.download_strategy { + if let DownloadStrategy::Streaming() = self.get_download_strategy() { let bytes_pending: usize = { let download_status = self.shared.download_status.lock().unwrap(); download_status.requested.minus(&download_status.downloaded).len() }; - let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed); + let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - if bytes_pending < 2 * ping_time * self.streaming_data_rate / 1000 { - trace!("Prefetching more data. pending bytes({}) < 2 * ping time ({}) * data rate({}) / 1000.",bytes_pending, ping_time, self.streaming_data_rate); - self.pre_fetch_more_data(); + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.session.channel().get_download_rate_estimate() as f64) as usize + ); + + if bytes_pending < desired_pending_bytes { + trace!("Prefetching more data. pending bytes({}) < {}",bytes_pending, desired_pending_bytes); + self.pre_fetch_more_data(desired_pending_bytes - bytes_pending); } } @@ -857,14 +924,25 @@ impl Read for AudioFileStreaming { let length = min(output.len(), self.shared.file_size - offset); - if length == 0 { - return Ok(0); - } + + let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { + DownloadStrategy::RandomAccess() => { length } + DownloadStrategy::Streaming() => { + // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded. + let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + + let length_to_request = length + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds * self.shared.stream_data_rate as f64) as usize + ); + min(length_to_request, self.shared.file_size - offset) + } + }; let mut ranges_to_request = RangeSet::new(); - ranges_to_request.add_range(&Range::new(offset, length)); + ranges_to_request.add_range(&Range::new(offset, length_to_request)); trace!("reading at postion {} (length : {})", offset, length); @@ -878,6 +956,11 @@ impl Read for AudioFileStreaming { self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap(); } + + if length == 0 { + return Ok(0); + } + while !download_status.downloaded.contains(offset) { trace!("waiting for download"); download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0; diff --git a/audio/src/lib.rs b/audio/src/lib.rs index b178c395..845ba5f9 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -25,6 +25,7 @@ mod range_set; pub use decrypt::AudioDecrypt; pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; +pub use fetch::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; diff --git a/core/src/channel.rs b/core/src/channel.rs index 3238a0a6..62620e36 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -3,6 +3,7 @@ use bytes::Bytes; use futures::sync::{mpsc, BiLock}; use futures::{Async, Poll, Stream}; use std::collections::HashMap; +use std::time::{Instant}; use util::SeqGenerator; @@ -10,6 +11,10 @@ component! { ChannelManager : ChannelManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), channels: HashMap> = HashMap::new(), + download_rate_estimate: usize = 0, + download_measurement_start: Option = None, + download_measurement_bytes: usize = 0, + download_measurement_last_received: Option = None, } } @@ -59,14 +64,46 @@ impl ChannelManager { let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); + + trace!("Received data for channel {}: {} bytes.", id, data.len()); self.lock(|inner| { + + let current_time = Instant::now(); + if let Some(download_measurement_start) = inner.download_measurement_start { + if let Some(download_measurement_last_received) = inner.download_measurement_last_received { + if (current_time - download_measurement_start).as_millis() > 1000 { + if (current_time - download_measurement_last_received).as_millis() <= 500 { + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize; + } else { + inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (500+(download_measurement_last_received - download_measurement_start).as_millis() as usize); + } + + inner.download_measurement_start = Some(current_time); + inner.download_measurement_bytes = 0; + } + } + } else { + inner.download_measurement_start = Some(current_time); + } + + inner.download_measurement_last_received = Some(current_time); + inner.download_measurement_bytes += data.len(); + if let Entry::Occupied(entry) = inner.channels.entry(id) { let _ = entry.get().unbounded_send((cmd, data)); } }); } + + pub fn get_download_rate_estimate(&self) -> usize { + return self.lock(|inner| { + inner.download_rate_estimate + }); + + } + } impl Channel { diff --git a/playback/src/player.rs b/playback/src/player.rs index 32500e30..bdccea38 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -9,12 +9,14 @@ use std::mem; use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError}; use std::thread; use std::time::Duration; +use std::cmp::max; use config::{Bitrate, PlayerConfig}; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; use audio::{AudioDecrypt, AudioFile, StreamLoaderController}; +use audio::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS}; use audio::{VorbisDecoder, VorbisPacket}; use audio_backend::Sink; use metadata::{AudioItem, FileFormat}; @@ -203,6 +205,7 @@ enum PlayerState { end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, }, Playing { track_id: SpotifyId, @@ -210,6 +213,7 @@ enum PlayerState { end_of_track: oneshot::Sender<()>, normalisation_factor: f32, stream_loader_controller: StreamLoaderController, + bytes_per_second: usize, }, EndOfTrack { track_id: SpotifyId, @@ -269,6 +273,7 @@ impl PlayerState { end_of_track, normalisation_factor, stream_loader_controller, + bytes_per_second } => { *self = Playing { track_id: track_id, @@ -276,6 +281,7 @@ impl PlayerState { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } _ => panic!("invalid state"), @@ -291,6 +297,7 @@ impl PlayerState { end_of_track, normalisation_factor, stream_loader_controller, + bytes_per_second, } => { *self = Paused { track_id: track_id, @@ -298,6 +305,7 @@ impl PlayerState { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } _ => panic!("invalid state"), @@ -418,7 +426,7 @@ impl PlayerInternal { } match self.load_track(track_id, position as i64) { - Some((decoder, normalisation_factor, stream_loader_controller)) => { + Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) => { if play { match self.state { PlayerState::Playing { @@ -443,6 +451,7 @@ impl PlayerInternal { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; } else { self.state = PlayerState::Paused { @@ -451,6 +460,7 @@ impl PlayerInternal { end_of_track: end_of_track, normalisation_factor: normalisation_factor, stream_loader_controller: stream_loader_controller, + bytes_per_second: bytes_per_second, }; match self.state { PlayerState::Playing { @@ -493,10 +503,21 @@ impl PlayerInternal { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_stream_mode(); } - if let PlayerState::Playing{..} = self.state { + if let PlayerState::Playing{bytes_per_second, ..} = self.state { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { - let stream_data_rate = stream_loader_controller.data_rate(); - let wait_for_data_length = (2 * stream_loader_controller.ping_time_ms() * stream_data_rate) / 1000; + + // Request our read ahead range + let request_data_length = max( + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + ); + stream_loader_controller.fetch_next(request_data_length); + + // Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete. + let wait_for_data_length = max( + (READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize, + (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize + ); stream_loader_controller.fetch_next_blocking(wait_for_data_length); } } @@ -580,7 +601,7 @@ impl PlayerInternal { } } - fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController)> { + fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController, usize)> { let audio = AudioItem::get_audio_item(&self.session, spotify_id) .wait() .unwrap(); @@ -624,14 +645,17 @@ impl PlayerInternal { } }; + let bytes_per_second = self.stream_data_rate(*format); + let play_from_beginning = position==0; + let key = self.session.audio_key().request(spotify_id, file_id); - let encrypted_file = AudioFile::open(&self.session, file_id); + let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning); let encrypted_file = encrypted_file.wait().unwrap(); - let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(self.stream_data_rate(*format)); + let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(); - if position == 0 { + if play_from_beginning { // No need to seek -> we stream from the beginning stream_loader_controller.set_stream_mode(); } else { @@ -663,7 +687,7 @@ impl PlayerInternal { stream_loader_controller.set_stream_mode(); } info!("<{}> loaded", audio.name); - Some((decoder, normalisation_factor, stream_loader_controller)) + Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) } }