mirror of
https://github.com/librespot-org/librespot.git
synced 2024-12-18 17:11:53 +00:00
Refine file downloading heuristics to use data rates and ping times everywhere.
This commit is contained in:
parent
c50fc9885a
commit
6422dcef78
4 changed files with 233 additions and 88 deletions
|
@ -3,7 +3,7 @@ use bytes::Bytes;
|
|||
use futures::sync::{mpsc, oneshot};
|
||||
use futures::Stream;
|
||||
use futures::{Async, Future, Poll};
|
||||
use std::cmp::min;
|
||||
use std::cmp::{min, max};
|
||||
use std::fs;
|
||||
use std::io::{self, Read, Seek, SeekFrom, Write};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
|
@ -18,9 +18,67 @@ use futures::sync::mpsc::unbounded;
|
|||
use std::sync::atomic;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
|
||||
const MINIMUM_CHUNK_SIZE: usize = 1024 * 16;
|
||||
const MAXIMUM_CHUNK_SIZE: usize = 1024 * 128;
|
||||
const MAXIMUM_ASSUMED_PING_TIME_SECONDS: u64 = 5;
|
||||
|
||||
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
|
||||
// The minimum size of a block that is requested from the Spotify servers in one request.
|
||||
// This is the block size that is typically requested while doing a seek() on a file.
|
||||
// Note: smaller requests can happen if part of the block is downloaded already.
|
||||
|
||||
const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // MUST be divisible by four!!!
|
||||
// The amount of data that is requested when initially opening a file.
|
||||
// Note: if the file is opened to play from the beginning, the amount of data to
|
||||
// read ahead is requested in addition to this amount. If the file is opened to seek to
|
||||
// another position, then only this amount is requested on the first request.
|
||||
|
||||
const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;
|
||||
// The pig time that is used for calculations before a ping time was actually measured.
|
||||
|
||||
const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;
|
||||
// If the measured ping time to the Spotify server is larger than this value, it is capped
|
||||
// to avoid run-away block sizes and pre-fetching.
|
||||
|
||||
pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;
|
||||
// Before playback starts, this many seconds of data must be present.
|
||||
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
|
||||
// of audio data may be larger or smaller.
|
||||
|
||||
pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
|
||||
// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
|
||||
// time to the Spotify server.
|
||||
// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
|
||||
// obeyed.
|
||||
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
|
||||
// of audio data may be larger or smaller.
|
||||
|
||||
pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 1.0;
|
||||
// While playing back, this many seconds of data ahead of the current read position are
|
||||
// requested.
|
||||
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
|
||||
// of audio data may be larger or smaller.
|
||||
|
||||
pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
|
||||
// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
|
||||
// time to the Spotify server.
|
||||
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
|
||||
// of audio data may be larger or smaller.
|
||||
|
||||
const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;
|
||||
// If the amount of data that is pending (requested but not received) is less than a certain amount,
|
||||
// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
|
||||
// data is calculated as
|
||||
// <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>
|
||||
|
||||
const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;
|
||||
// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
|
||||
// The formula used is
|
||||
// <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
|
||||
// This mechanism allows for fast downloading of the remainder of the file. The number should be larger
|
||||
// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
|
||||
// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
|
||||
// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
|
||||
// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.
|
||||
|
||||
|
||||
|
||||
pub enum AudioFile {
|
||||
Cached(fs::File),
|
||||
|
@ -40,6 +98,7 @@ pub struct AudioFileOpenStreaming {
|
|||
headers: ChannelHeaders,
|
||||
file_id: FileId,
|
||||
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
|
||||
streaming_data_rate: usize,
|
||||
}
|
||||
|
||||
|
||||
|
@ -47,7 +106,6 @@ enum StreamLoaderCommand{
|
|||
Fetch(Range), // signal the stream loader to fetch a range of the file
|
||||
RandomAccessMode(), // optimise download strategy for random access
|
||||
StreamMode(), // optimise download strategy for streaming
|
||||
StreamDataRate(usize), // when optimising for streaming, assume a streaming rate of this many bytes per second.
|
||||
Close(), // terminate and don't load any more data
|
||||
}
|
||||
|
||||
|
@ -57,7 +115,6 @@ pub struct StreamLoaderController {
|
|||
channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
|
||||
stream_shared: Option<Arc<AudioFileShared>>,
|
||||
file_size: usize,
|
||||
bytes_per_second: usize,
|
||||
}
|
||||
|
||||
|
||||
|
@ -66,8 +123,6 @@ impl StreamLoaderController {
|
|||
return self.file_size;
|
||||
}
|
||||
|
||||
pub fn data_rate(&self) -> usize { return self.bytes_per_second; }
|
||||
|
||||
pub fn range_available(&self, range: Range) -> bool {
|
||||
if let Some(ref shared) = self.stream_shared {
|
||||
let download_status = shared.download_status.lock().unwrap();
|
||||
|
@ -124,7 +179,7 @@ impl StreamLoaderController {
|
|||
if range.length > (download_status.downloaded.union(&download_status.requested).contained_length_from_value(range.start)) {
|
||||
// For some reason, the requested range is neither downloaded nor requested.
|
||||
// This could be due to a network error. Request it again.
|
||||
// We can't use self.fetch here because self can't borrowed mutably, so we access the channel directly.
|
||||
// We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
|
||||
if let Some(ref mut channel) = self.channel_tx {
|
||||
// ignore the error in case the channel has been closed already.
|
||||
let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
|
||||
|
@ -169,12 +224,6 @@ impl StreamLoaderController {
|
|||
self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
|
||||
}
|
||||
|
||||
pub fn set_stream_data_rate(&mut self, bytes_per_second: usize) {
|
||||
// when optimising for streaming, assume a streaming rate of this many bytes per second.
|
||||
self.bytes_per_second = bytes_per_second;
|
||||
self.send_stream_loader_command(StreamLoaderCommand::StreamDataRate(bytes_per_second));
|
||||
}
|
||||
|
||||
pub fn close(&mut self) {
|
||||
// terminate stream loading and don't load any more data for this file.
|
||||
self.send_stream_loader_command(StreamLoaderCommand::Close());
|
||||
|
@ -200,11 +249,19 @@ struct AudioFileDownloadStatus {
|
|||
downloaded: RangeSet,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
enum DownloadStrategy {
|
||||
RandomAccess(),
|
||||
Streaming(),
|
||||
}
|
||||
|
||||
struct AudioFileShared {
|
||||
file_id: FileId,
|
||||
file_size: usize,
|
||||
stream_data_rate: usize,
|
||||
cond: Condvar,
|
||||
download_status: Mutex<AudioFileDownloadStatus>,
|
||||
download_strategy: Mutex<DownloadStrategy>,
|
||||
number_of_open_requests: AtomicUsize,
|
||||
ping_time_ms: AtomicUsize,
|
||||
read_position: AtomicUsize,
|
||||
|
@ -216,8 +273,10 @@ impl AudioFileOpenStreaming {
|
|||
let shared = Arc::new(AudioFileShared {
|
||||
file_id: self.file_id,
|
||||
file_size: size,
|
||||
stream_data_rate: self.streaming_data_rate,
|
||||
cond: Condvar::new(),
|
||||
download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}),
|
||||
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
|
||||
number_of_open_requests: AtomicUsize::new(0),
|
||||
ping_time_ms: AtomicUsize::new(0),
|
||||
read_position: AtomicUsize::new(0),
|
||||
|
@ -296,7 +355,7 @@ impl Future for AudioFileOpenStreaming {
|
|||
}
|
||||
|
||||
impl AudioFile {
|
||||
pub fn open(session: &Session, file_id: FileId) -> AudioFileOpen {
|
||||
pub fn open(session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool) -> AudioFileOpen {
|
||||
let cache = session.cache().cloned();
|
||||
|
||||
if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
|
||||
|
@ -307,7 +366,14 @@ impl AudioFile {
|
|||
debug!("Downloading file {}", file_id);
|
||||
|
||||
let (complete_tx, complete_rx) = oneshot::channel();
|
||||
let initial_data_length = MINIMUM_CHUNK_SIZE;
|
||||
let mut initial_data_length = if play_from_beginning {
|
||||
INITIAL_DOWNLOAD_SIZE + max((READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (INITIAL_PING_TIME_ESTIMATE_SECONDS * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * bytes_per_second as f64) as usize)
|
||||
} else {
|
||||
INITIAL_DOWNLOAD_SIZE
|
||||
};
|
||||
if initial_data_length % 4 != 0 {
|
||||
initial_data_length += 4 - (initial_data_length % 4);
|
||||
}
|
||||
let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
|
||||
|
||||
let open = AudioFileOpenStreaming {
|
||||
|
@ -320,6 +386,8 @@ impl AudioFile {
|
|||
initial_request_sent_time: Instant::now(),
|
||||
|
||||
complete_tx: Some(complete_tx),
|
||||
streaming_data_rate: bytes_per_second,
|
||||
|
||||
};
|
||||
|
||||
let session_ = session.clone();
|
||||
|
@ -336,27 +404,23 @@ impl AudioFile {
|
|||
.or_else(|oneshot::Canceled| Ok(()))
|
||||
});
|
||||
|
||||
AudioFileOpen::Streaming(open)
|
||||
return AudioFileOpen::Streaming(open);
|
||||
}
|
||||
|
||||
pub fn get_stream_loader_controller(&self, bytes_per_second: usize) -> StreamLoaderController {
|
||||
pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
|
||||
match self {
|
||||
AudioFile::Streaming(stream) => {
|
||||
let mut result = StreamLoaderController {
|
||||
AudioFile::Streaming(ref stream) => {
|
||||
return StreamLoaderController {
|
||||
channel_tx: Some(stream.stream_loader_command_tx.clone()),
|
||||
stream_shared: Some(stream.shared.clone()),
|
||||
file_size: stream.shared.file_size,
|
||||
bytes_per_second: bytes_per_second,
|
||||
};
|
||||
result.set_stream_data_rate(bytes_per_second);
|
||||
return result;
|
||||
}
|
||||
AudioFile::Cached(ref file) => {
|
||||
return StreamLoaderController {
|
||||
channel_tx: None,
|
||||
stream_shared: None,
|
||||
file_size: file.metadata().unwrap().len() as usize,
|
||||
bytes_per_second: bytes_per_second,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -474,8 +538,8 @@ impl Future for AudioFileFetchDataReceiver {
|
|||
if let Some(request_sent_time) = self.request_sent_time {
|
||||
let duration = Instant::now() - request_sent_time;
|
||||
let duration_ms: u64;
|
||||
if duration.as_secs() > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
|
||||
duration_ms = MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000;
|
||||
if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
|
||||
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
|
||||
} else {
|
||||
duration_ms = duration.as_secs() * 1000 + duration.subsec_millis() as u64;
|
||||
}
|
||||
|
@ -520,11 +584,6 @@ impl Future for AudioFileFetchDataReceiver {
|
|||
}
|
||||
|
||||
|
||||
enum DownloadStrategy {
|
||||
RandomAccess(),
|
||||
Streaming(),
|
||||
}
|
||||
|
||||
struct AudioFileFetch {
|
||||
session: Session,
|
||||
shared: Arc<AudioFileShared>,
|
||||
|
@ -533,11 +592,8 @@ struct AudioFileFetch {
|
|||
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
|
||||
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
|
||||
|
||||
//seek_rx: mpsc::UnboundedReceiver<u64>,
|
||||
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
|
||||
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
|
||||
download_strategy: DownloadStrategy,
|
||||
streaming_data_rate: usize,
|
||||
network_response_times_ms: Vec<usize>,
|
||||
}
|
||||
|
||||
|
@ -584,16 +640,18 @@ impl AudioFileFetch {
|
|||
|
||||
stream_loader_command_rx: stream_loader_command_rx,
|
||||
complete_tx: Some(complete_tx),
|
||||
download_strategy: DownloadStrategy::RandomAccess(), // start with random access mode until someone tells us otherwise
|
||||
streaming_data_rate: 40, // assume 360 kbit per second unless someone tells us otherwise.
|
||||
network_response_times_ms: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn get_download_strategy(&mut self) -> DownloadStrategy {
|
||||
*(self.shared.download_strategy.lock().unwrap())
|
||||
}
|
||||
|
||||
fn download_range(&mut self, mut offset: usize, mut length: usize) {
|
||||
|
||||
if length < MINIMUM_CHUNK_SIZE {
|
||||
length = MINIMUM_CHUNK_SIZE;
|
||||
if length < MINIMUM_DOWNLOAD_SIZE {
|
||||
length = MINIMUM_DOWNLOAD_SIZE;
|
||||
}
|
||||
|
||||
// ensure the values are within the bounds and align them by 4 for the spotify protocol.
|
||||
|
@ -647,35 +705,43 @@ impl AudioFileFetch {
|
|||
|
||||
}
|
||||
|
||||
fn pre_fetch_more_data(&mut self) {
|
||||
fn pre_fetch_more_data(&mut self, bytes: usize) {
|
||||
|
||||
// determine what is still missing
|
||||
let mut missing_data = RangeSet::new();
|
||||
missing_data.add_range(&Range::new(0,self.shared.file_size));
|
||||
{
|
||||
let download_status = self.shared.download_status.lock().unwrap();
|
||||
missing_data.subtract_range_set(&download_status.downloaded);
|
||||
missing_data.subtract_range_set(&download_status.requested);
|
||||
}
|
||||
let mut bytes_to_go = bytes;
|
||||
|
||||
// download data from after the current read position first
|
||||
let mut tail_end = RangeSet::new();
|
||||
let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
|
||||
tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position));
|
||||
let tail_end = tail_end.intersection(&missing_data);
|
||||
while bytes_to_go > 0 {
|
||||
|
||||
if ! tail_end.is_empty() {
|
||||
let range = tail_end.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, MAXIMUM_CHUNK_SIZE);
|
||||
self.download_range(offset, length);
|
||||
// determine what is still missing
|
||||
let mut missing_data = RangeSet::new();
|
||||
missing_data.add_range(&Range::new(0, self.shared.file_size));
|
||||
{
|
||||
let download_status = self.shared.download_status.lock().unwrap();
|
||||
missing_data.subtract_range_set(&download_status.downloaded);
|
||||
missing_data.subtract_range_set(&download_status.requested);
|
||||
}
|
||||
|
||||
} else if ! missing_data.is_empty() {
|
||||
// ok, the tail is downloaded, download something fom the beginning.
|
||||
let range = missing_data.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, MAXIMUM_CHUNK_SIZE);
|
||||
self.download_range(offset, length);
|
||||
// download data from after the current read position first
|
||||
let mut tail_end = RangeSet::new();
|
||||
let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
|
||||
tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position));
|
||||
let tail_end = tail_end.intersection(&missing_data);
|
||||
|
||||
if !tail_end.is_empty() {
|
||||
let range = tail_end.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, bytes_to_go);
|
||||
self.download_range(offset, length);
|
||||
bytes_to_go -= length;
|
||||
} else if !missing_data.is_empty() {
|
||||
// ok, the tail is downloaded, download something fom the beginning.
|
||||
let range = missing_data.get_range(0);
|
||||
let offset = range.start;
|
||||
let length = min(range.length, bytes_to_go);
|
||||
self.download_range(offset, length);
|
||||
bytes_to_go -= length;
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -774,13 +840,10 @@ impl AudioFileFetch {
|
|||
self.download_range(request.start, request.length);
|
||||
}
|
||||
Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
|
||||
self.download_strategy = DownloadStrategy::RandomAccess();
|
||||
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess();
|
||||
}
|
||||
Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
|
||||
self.download_strategy = DownloadStrategy::Streaming();
|
||||
}
|
||||
Ok(Async::Ready(Some(StreamLoaderCommand::StreamDataRate(rate)))) => {
|
||||
self.streaming_data_rate = rate;
|
||||
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
|
||||
}
|
||||
Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
|
||||
return Ok(Async::Ready(()));
|
||||
|
@ -827,18 +890,22 @@ impl Future for AudioFileFetch {
|
|||
Err(()) => unreachable!(),
|
||||
}
|
||||
|
||||
|
||||
if let DownloadStrategy::Streaming() = self.download_strategy {
|
||||
if let DownloadStrategy::Streaming() = self.get_download_strategy() {
|
||||
let bytes_pending: usize = {
|
||||
let download_status = self.shared.download_status.lock().unwrap();
|
||||
download_status.requested.minus(&download_status.downloaded).len()
|
||||
};
|
||||
|
||||
let ping_time = self.shared.ping_time_ms.load(atomic::Ordering::Relaxed);
|
||||
let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
|
||||
|
||||
if bytes_pending < 2 * ping_time * self.streaming_data_rate / 1000 {
|
||||
trace!("Prefetching more data. pending bytes({}) < 2 * ping time ({}) * data rate({}) / 1000.",bytes_pending, ping_time, self.streaming_data_rate);
|
||||
self.pre_fetch_more_data();
|
||||
let desired_pending_bytes = max(
|
||||
(PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize,
|
||||
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.session.channel().get_download_rate_estimate() as f64) as usize
|
||||
);
|
||||
|
||||
if bytes_pending < desired_pending_bytes {
|
||||
trace!("Prefetching more data. pending bytes({}) < {}",bytes_pending, desired_pending_bytes);
|
||||
self.pre_fetch_more_data(desired_pending_bytes - bytes_pending);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -857,14 +924,25 @@ impl Read for AudioFileStreaming {
|
|||
|
||||
let length = min(output.len(), self.shared.file_size - offset);
|
||||
|
||||
if length == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
|
||||
DownloadStrategy::RandomAccess() => { length }
|
||||
DownloadStrategy::Streaming() => {
|
||||
// Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
|
||||
let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
|
||||
|
||||
let length_to_request = length + max(
|
||||
(READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) as usize,
|
||||
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds * self.shared.stream_data_rate as f64) as usize
|
||||
);
|
||||
min(length_to_request, self.shared.file_size - offset)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
|
||||
let mut ranges_to_request = RangeSet::new();
|
||||
ranges_to_request.add_range(&Range::new(offset, length));
|
||||
ranges_to_request.add_range(&Range::new(offset, length_to_request));
|
||||
|
||||
trace!("reading at postion {} (length : {})", offset, length);
|
||||
|
||||
|
@ -878,6 +956,11 @@ impl Read for AudioFileStreaming {
|
|||
self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap();
|
||||
}
|
||||
|
||||
|
||||
if length == 0 {
|
||||
return Ok(0);
|
||||
}
|
||||
|
||||
while !download_status.downloaded.contains(offset) {
|
||||
trace!("waiting for download");
|
||||
download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
|
||||
|
|
|
@ -25,6 +25,7 @@ mod range_set;
|
|||
|
||||
pub use decrypt::AudioDecrypt;
|
||||
pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController};
|
||||
pub use fetch::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS};
|
||||
|
||||
#[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))]
|
||||
pub use lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket};
|
||||
|
|
|
@ -3,6 +3,7 @@ use bytes::Bytes;
|
|||
use futures::sync::{mpsc, BiLock};
|
||||
use futures::{Async, Poll, Stream};
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Instant};
|
||||
|
||||
use util::SeqGenerator;
|
||||
|
||||
|
@ -10,6 +11,10 @@ component! {
|
|||
ChannelManager : ChannelManagerInner {
|
||||
sequence: SeqGenerator<u16> = SeqGenerator::new(0),
|
||||
channels: HashMap<u16, mpsc::UnboundedSender<(u8, Bytes)>> = HashMap::new(),
|
||||
download_rate_estimate: usize = 0,
|
||||
download_measurement_start: Option<Instant> = None,
|
||||
download_measurement_bytes: usize = 0,
|
||||
download_measurement_last_received: Option<Instant> = None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -59,14 +64,46 @@ impl ChannelManager {
|
|||
|
||||
let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
|
||||
|
||||
|
||||
|
||||
trace!("Received data for channel {}: {} bytes.", id, data.len());
|
||||
|
||||
self.lock(|inner| {
|
||||
|
||||
let current_time = Instant::now();
|
||||
if let Some(download_measurement_start) = inner.download_measurement_start {
|
||||
if let Some(download_measurement_last_received) = inner.download_measurement_last_received {
|
||||
if (current_time - download_measurement_start).as_millis() > 1000 {
|
||||
if (current_time - download_measurement_last_received).as_millis() <= 500 {
|
||||
inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (current_time - download_measurement_start).as_millis() as usize;
|
||||
} else {
|
||||
inner.download_rate_estimate = 1000 * inner.download_measurement_bytes / (500+(download_measurement_last_received - download_measurement_start).as_millis() as usize);
|
||||
}
|
||||
|
||||
inner.download_measurement_start = Some(current_time);
|
||||
inner.download_measurement_bytes = 0;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
inner.download_measurement_start = Some(current_time);
|
||||
}
|
||||
|
||||
inner.download_measurement_last_received = Some(current_time);
|
||||
inner.download_measurement_bytes += data.len();
|
||||
|
||||
if let Entry::Occupied(entry) = inner.channels.entry(id) {
|
||||
let _ = entry.get().unbounded_send((cmd, data));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn get_download_rate_estimate(&self) -> usize {
|
||||
return self.lock(|inner| {
|
||||
inner.download_rate_estimate
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
|
|
|
@ -9,12 +9,14 @@ use std::mem;
|
|||
use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::cmp::max;
|
||||
|
||||
use config::{Bitrate, PlayerConfig};
|
||||
use librespot_core::session::Session;
|
||||
use librespot_core::spotify_id::SpotifyId;
|
||||
|
||||
use audio::{AudioDecrypt, AudioFile, StreamLoaderController};
|
||||
use audio::{READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS};
|
||||
use audio::{VorbisDecoder, VorbisPacket};
|
||||
use audio_backend::Sink;
|
||||
use metadata::{AudioItem, FileFormat};
|
||||
|
@ -203,6 +205,7 @@ enum PlayerState {
|
|||
end_of_track: oneshot::Sender<()>,
|
||||
normalisation_factor: f32,
|
||||
stream_loader_controller: StreamLoaderController,
|
||||
bytes_per_second: usize,
|
||||
},
|
||||
Playing {
|
||||
track_id: SpotifyId,
|
||||
|
@ -210,6 +213,7 @@ enum PlayerState {
|
|||
end_of_track: oneshot::Sender<()>,
|
||||
normalisation_factor: f32,
|
||||
stream_loader_controller: StreamLoaderController,
|
||||
bytes_per_second: usize,
|
||||
},
|
||||
EndOfTrack {
|
||||
track_id: SpotifyId,
|
||||
|
@ -269,6 +273,7 @@ impl PlayerState {
|
|||
end_of_track,
|
||||
normalisation_factor,
|
||||
stream_loader_controller,
|
||||
bytes_per_second
|
||||
} => {
|
||||
*self = Playing {
|
||||
track_id: track_id,
|
||||
|
@ -276,6 +281,7 @@ impl PlayerState {
|
|||
end_of_track: end_of_track,
|
||||
normalisation_factor: normalisation_factor,
|
||||
stream_loader_controller: stream_loader_controller,
|
||||
bytes_per_second: bytes_per_second,
|
||||
};
|
||||
}
|
||||
_ => panic!("invalid state"),
|
||||
|
@ -291,6 +297,7 @@ impl PlayerState {
|
|||
end_of_track,
|
||||
normalisation_factor,
|
||||
stream_loader_controller,
|
||||
bytes_per_second,
|
||||
} => {
|
||||
*self = Paused {
|
||||
track_id: track_id,
|
||||
|
@ -298,6 +305,7 @@ impl PlayerState {
|
|||
end_of_track: end_of_track,
|
||||
normalisation_factor: normalisation_factor,
|
||||
stream_loader_controller: stream_loader_controller,
|
||||
bytes_per_second: bytes_per_second,
|
||||
};
|
||||
}
|
||||
_ => panic!("invalid state"),
|
||||
|
@ -418,7 +426,7 @@ impl PlayerInternal {
|
|||
}
|
||||
|
||||
match self.load_track(track_id, position as i64) {
|
||||
Some((decoder, normalisation_factor, stream_loader_controller)) => {
|
||||
Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second)) => {
|
||||
if play {
|
||||
match self.state {
|
||||
PlayerState::Playing {
|
||||
|
@ -443,6 +451,7 @@ impl PlayerInternal {
|
|||
end_of_track: end_of_track,
|
||||
normalisation_factor: normalisation_factor,
|
||||
stream_loader_controller: stream_loader_controller,
|
||||
bytes_per_second: bytes_per_second,
|
||||
};
|
||||
} else {
|
||||
self.state = PlayerState::Paused {
|
||||
|
@ -451,6 +460,7 @@ impl PlayerInternal {
|
|||
end_of_track: end_of_track,
|
||||
normalisation_factor: normalisation_factor,
|
||||
stream_loader_controller: stream_loader_controller,
|
||||
bytes_per_second: bytes_per_second,
|
||||
};
|
||||
match self.state {
|
||||
PlayerState::Playing {
|
||||
|
@ -493,10 +503,21 @@ impl PlayerInternal {
|
|||
if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
|
||||
stream_loader_controller.set_stream_mode();
|
||||
}
|
||||
if let PlayerState::Playing{..} = self.state {
|
||||
if let PlayerState::Playing{bytes_per_second, ..} = self.state {
|
||||
if let Some(stream_loader_controller) = self.state.stream_loader_controller() {
|
||||
let stream_data_rate = stream_loader_controller.data_rate();
|
||||
let wait_for_data_length = (2 * stream_loader_controller.ping_time_ms() * stream_data_rate) / 1000;
|
||||
|
||||
// Request our read ahead range
|
||||
let request_data_length = max(
|
||||
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize,
|
||||
(READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize
|
||||
);
|
||||
stream_loader_controller.fetch_next(request_data_length);
|
||||
|
||||
// Request the part we want to wait for blocking. This effecively means we wait for the previous request to partially complete.
|
||||
let wait_for_data_length = max(
|
||||
(READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS * (0.001 * stream_loader_controller.ping_time_ms() as f64) * bytes_per_second as f64) as usize,
|
||||
(READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize
|
||||
);
|
||||
stream_loader_controller.fetch_next_blocking(wait_for_data_length);
|
||||
}
|
||||
}
|
||||
|
@ -580,7 +601,7 @@ impl PlayerInternal {
|
|||
}
|
||||
}
|
||||
|
||||
fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController)> {
|
||||
fn load_track(&self, spotify_id: SpotifyId, position: i64) -> Option<(Decoder, f32, StreamLoaderController, usize)> {
|
||||
let audio = AudioItem::get_audio_item(&self.session, spotify_id)
|
||||
.wait()
|
||||
.unwrap();
|
||||
|
@ -624,14 +645,17 @@ impl PlayerInternal {
|
|||
}
|
||||
};
|
||||
|
||||
let bytes_per_second = self.stream_data_rate(*format);
|
||||
let play_from_beginning = position==0;
|
||||
|
||||
let key = self.session.audio_key().request(spotify_id, file_id);
|
||||
let encrypted_file = AudioFile::open(&self.session, file_id);
|
||||
let encrypted_file = AudioFile::open(&self.session, file_id, bytes_per_second, play_from_beginning);
|
||||
|
||||
let encrypted_file = encrypted_file.wait().unwrap();
|
||||
|
||||
let mut stream_loader_controller = encrypted_file.get_stream_loader_controller(self.stream_data_rate(*format));
|
||||
let mut stream_loader_controller = encrypted_file.get_stream_loader_controller();
|
||||
|
||||
if position == 0 {
|
||||
if play_from_beginning {
|
||||
// No need to seek -> we stream from the beginning
|
||||
stream_loader_controller.set_stream_mode();
|
||||
} else {
|
||||
|
@ -663,7 +687,7 @@ impl PlayerInternal {
|
|||
stream_loader_controller.set_stream_mode();
|
||||
}
|
||||
info!("<{}> loaded", audio.name);
|
||||
Some((decoder, normalisation_factor, stream_loader_controller))
|
||||
Some((decoder, normalisation_factor, stream_loader_controller, bytes_per_second))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue