librespot/audio/src/fetch.rs

1015 lines
36 KiB
Rust
Raw Normal View History

2018-02-26 01:50:41 +00:00
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
2019-11-01 19:46:28 +00:00
use bytes::Bytes;
2018-02-26 01:50:41 +00:00
use futures::sync::{mpsc, oneshot};
2018-04-25 17:29:50 +00:00
use futures::Stream;
use futures::{Async, Future, Poll};
use std::cmp::{min, max};
2015-07-07 21:40:31 +00:00
use std::fs;
2018-02-26 01:50:41 +00:00
use std::io::{self, Read, Seek, SeekFrom, Write};
2017-01-19 22:45:24 +00:00
use std::sync::{Arc, Condvar, Mutex};
2019-11-01 19:46:28 +00:00
use std::time::{Duration, Instant};
2016-01-12 23:29:31 +00:00
use tempfile::NamedTempFile;
2019-11-01 19:46:28 +00:00
use range_set::{Range, RangeSet};
2015-06-23 14:38:29 +00:00
2019-09-16 19:00:09 +00:00
use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use librespot_core::session::Session;
use librespot_core::spotify_id::FileId;
2019-11-01 19:46:28 +00:00
use futures::sync::mpsc::unbounded;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
2015-06-23 14:38:29 +00:00
// Tuning constants for the download strategy.
// NOTE: in this section each explanatory comment FOLLOWS the constant it describes.
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
// Note: smaller requests can happen if part of the block is downloaded already.
const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; // MUST be divisible by four!!!
// The amount of data that is requested when initially opening a file.
// Note: if the file is opened to play from the beginning, the amount of data to
// read ahead is requested in addition to this amount. If the file is opened to seek to
// another position, then only this amount is requested on the first request.
const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;
// The ping time that is used for calculations before a ping time was actually measured.
const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;
// If the measured ping time to the Spotify server is larger than this value, it is capped
// to avoid run-away block sizes and pre-fetching.
pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;
// Before playback starts, this many seconds of data must be present.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
// obeyed.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 1.0;
// While playing back, this many seconds of data ahead of the current read position are
// requested.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;
// If the amount of data that is pending (requested but not received) is less than a certain amount,
// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
// data is calculated as
// <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>
const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;
// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
// The formula used is
// <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
// This mechanism allows for fast downloading of the remainder of the file. The number should be larger
// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.
2015-06-23 14:38:29 +00:00
/// An opened audio file, readable and seekable through `Read` and `Seek`.
pub enum AudioFile {
    /// A complete file handed out by the session's cache.
    Cached(fs::File),
    /// A file that is downloaded on demand while it is being read.
    Streaming(AudioFileStreaming),
}
2015-07-07 21:40:31 +00:00
pub enum AudioFileOpen {
2017-08-03 19:37:04 +00:00
Cached(Option<fs::File>),
Streaming(AudioFileOpenStreaming),
2015-06-24 00:41:39 +00:00
}
2015-06-23 14:38:29 +00:00
pub struct AudioFileOpenStreaming {
2017-01-19 22:45:24 +00:00
session: Session,
2019-11-01 19:46:28 +00:00
initial_data_rx: Option<ChannelData>,
initial_data_length: Option<usize>,
initial_request_sent_time: Instant,
2017-01-19 22:45:24 +00:00
headers: ChannelHeaders,
file_id: FileId,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
streaming_data_rate: usize,
}
2019-11-01 19:46:28 +00:00
/// Commands sent over the stream loader command channel to the background
/// `AudioFileFetch` task.
enum StreamLoaderCommand{
    Fetch(Range), // signal the stream loader to fetch a range of the file
    RandomAccessMode(), // optimise download strategy for random access
    StreamMode(), // optimise download strategy for streaming
    Close(), // terminate and don't load any more data
}
/// Cloneable handle for steering the download of an `AudioFile`.
///
/// For a cached file both optional fields are `None` and only `file_size`
/// carries information.
#[derive(Clone)]
pub struct StreamLoaderController {
    // Command channel to the fetcher; `None` for cached files.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    // State shared with the fetcher; `None` for cached files.
    stream_shared: Option<Arc<AudioFileShared>>,
    // Total size of the file in bytes.
    file_size: usize,
}
impl StreamLoaderController {
    /// Returns the total size of the file in bytes.
    pub fn len(&self) -> usize {
        self.file_size
    }

    /// Returns `true` if every byte of `range` can be read without waiting.
    ///
    /// For a streamed file the set of downloaded ranges is consulted. A
    /// cached file has no streaming state, so the range only has to lie
    /// within the file.
    pub fn range_available(&self, range: Range) -> bool {
        if let Some(ref shared) = self.stream_shared {
            let download_status = shared.download_status.lock().unwrap();
            range.length <= download_status.downloaded.contained_length_from_value(range.start)
        } else {
            // `checked_sub` keeps a start position beyond the end of the file
            // from underflowing; such a range is never available.
            match self.len().checked_sub(range.start) {
                Some(remaining) => range.length <= remaining,
                None => false,
            }
        }
    }

    /// Returns the current ping time estimate in milliseconds (0 for a cached file).
    pub fn ping_time_ms(&self) -> usize {
        match self.stream_shared {
            Some(ref shared) => shared.ping_time_ms.load(atomic::Ordering::Relaxed),
            None => 0,
        }
    }

    /// Sends `command` to the stream loader, if there is one.
    fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
        if let Some(ref mut channel) = self.channel_tx {
            // ignore the error in case the channel has been closed already.
            let _ = channel.unbounded_send(command);
        }
    }

    /// Builds a range of `length` bytes starting at the shared read position.
    /// Returns `None` for a cached file.
    fn range_at_read_position(&self, length: usize) -> Option<Range> {
        self.stream_shared.as_ref().map(|shared| Range {
            start: shared.read_position.load(atomic::Ordering::Relaxed),
            length: length,
        })
    }

    /// Signals the stream loader to fetch a range of the file.
    pub fn fetch(&mut self, range: Range) {
        self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
    }

    /// Signals the stream loader to fetch a range of the file and blocks
    /// until it is loaded. The range is clamped to the file's bounds first.
    pub fn fetch_blocking(&mut self, mut range: Range) {
        // ensure the range is within the file's bounds.
        if range.start >= self.len() {
            range.length = 0;
        } else if range.end() > self.len() {
            range.length = self.len() - range.start;
        }

        self.fetch(range);

        if let Some(ref shared) = self.stream_shared {
            let mut download_status = shared.download_status.lock().unwrap();
            while range.length > download_status.downloaded.contained_length_from_value(range.start) {
                download_status = shared
                    .cond
                    .wait_timeout(download_status, Duration::from_millis(1000))
                    .unwrap()
                    .0;
                let known_length = download_status
                    .downloaded
                    .union(&download_status.requested)
                    .contained_length_from_value(range.start);
                if range.length > known_length {
                    // For some reason, the requested range is neither downloaded nor requested.
                    // This could be due to a network error. Request it again.
                    // We can't use self.fetch here because self can't be borrowed mutably,
                    // so we access the channel directly.
                    if let Some(ref mut channel) = self.channel_tx {
                        // ignore the error in case the channel has been closed already.
                        let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
                    }
                }
            }
        }
    }

    /// Requests `length` bytes starting at the current read position.
    /// Does nothing for a cached file.
    pub fn fetch_next(&mut self, length: usize) {
        if let Some(range) = self.range_at_read_position(length) {
            self.fetch(range);
        }
    }

    /// Like `fetch_next`, but blocks until the data is loaded.
    pub fn fetch_next_blocking(&mut self, length: usize) {
        if let Some(range) = self.range_at_read_position(length) {
            self.fetch_blocking(range);
        }
    }

    /// Optimises the download strategy for random access.
    pub fn set_random_access_mode(&mut self) {
        self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
    }

    /// Optimises the download strategy for streaming.
    pub fn set_stream_mode(&mut self) {
        self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
    }

    /// Terminates stream loading; no more data is loaded for this file.
    pub fn close(&mut self) {
        self.send_stream_loader_command(StreamLoaderCommand::Close());
    }
}
pub struct AudioFileStreaming {
read_file: fs::File,
position: u64,
2019-11-01 19:46:28 +00:00
stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
shared: Arc<AudioFileShared>,
}
2019-11-01 19:46:28 +00:00
/// Bookkeeping of which byte ranges of the file have been asked for and
/// which have been written to the temporary file.
struct AudioFileDownloadStatus {
    // Ranges for which a request has been sent.
    requested: RangeSet,
    // Ranges whose data has been received and written.
    downloaded: RangeSet,
}
/// How reads and the fetcher decide what to request.
#[derive(Copy, Clone)]
enum DownloadStrategy {
    // Reads request only what they need; the fetcher does not pre-fetch.
    RandomAccess(),
    // Reads request read-ahead data and the fetcher pre-fetches.
    Streaming(),
}
struct AudioFileShared {
2017-01-19 22:45:24 +00:00
file_id: FileId,
2019-11-01 19:46:28 +00:00
file_size: usize,
stream_data_rate: usize,
2015-07-07 21:40:31 +00:00
cond: Condvar,
2019-11-01 19:46:28 +00:00
download_status: Mutex<AudioFileDownloadStatus>,
download_strategy: Mutex<DownloadStrategy>,
number_of_open_requests: AtomicUsize,
2019-11-01 19:46:28 +00:00
ping_time_ms: AtomicUsize,
read_position: AtomicUsize,
2015-06-23 14:38:29 +00:00
}
impl AudioFileOpenStreaming {
fn finish(&mut self, size: usize) -> AudioFileStreaming {
2015-07-07 21:40:31 +00:00
let shared = Arc::new(AudioFileShared {
2017-01-19 22:45:24 +00:00
file_id: self.file_id,
2019-11-01 19:46:28 +00:00
file_size: size,
stream_data_rate: self.streaming_data_rate,
2015-07-07 21:40:31 +00:00
cond: Condvar::new(),
2019-11-01 19:46:28 +00:00
download_status: Mutex::new(AudioFileDownloadStatus {requested: RangeSet::new(), downloaded: RangeSet::new()}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
2019-11-01 19:46:28 +00:00
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
2015-07-07 21:40:31 +00:00
});
2017-01-19 22:45:24 +00:00
let mut write_file = NamedTempFile::new().unwrap();
2019-11-01 21:38:46 +00:00
write_file.as_file().set_len(size as u64).unwrap();
2017-01-19 22:45:24 +00:00
write_file.seek(SeekFrom::Start(0)).unwrap();
2017-01-19 22:45:24 +00:00
let read_file = write_file.reopen().unwrap();
2015-09-01 11:20:37 +00:00
2019-11-01 19:46:28 +00:00
let initial_data_rx = self.initial_data_rx.take().unwrap();
let initial_data_length = self.initial_data_length.take().unwrap();
2017-01-19 22:45:24 +00:00
let complete_tx = self.complete_tx.take().unwrap();
2019-11-01 19:46:28 +00:00
//let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::<StreamLoaderCommand>();
2015-07-07 21:40:31 +00:00
2017-01-19 22:45:24 +00:00
let fetcher = AudioFileFetch::new(
2018-02-26 01:50:41 +00:00
self.session.clone(),
shared.clone(),
2019-11-01 19:46:28 +00:00
initial_data_rx,
self.initial_request_sent_time,
initial_data_length,
2018-02-26 01:50:41 +00:00
write_file,
2019-11-01 19:46:28 +00:00
stream_loader_command_rx,
2018-02-26 01:50:41 +00:00
complete_tx,
2017-01-19 22:45:24 +00:00
);
self.session.spawn(move |_| fetcher);
2015-06-23 14:38:29 +00:00
AudioFileStreaming {
2017-01-19 22:45:24 +00:00
read_file: read_file,
2017-01-19 22:45:24 +00:00
position: 0,
2019-11-01 19:46:28 +00:00
//seek: seek_tx,
stream_loader_command_tx: stream_loader_command_tx,
2017-01-19 22:45:24 +00:00
shared: shared,
}
2015-07-07 21:40:31 +00:00
}
}
2017-01-19 22:45:24 +00:00
impl Future for AudioFileOpen {
type Item = AudioFile;
type Error = ChannelError;
2017-01-19 22:45:24 +00:00
fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
match *self {
AudioFileOpen::Streaming(ref mut open) => {
let file = try_ready!(open.poll());
Ok(Async::Ready(AudioFile::Streaming(file)))
}
2017-08-03 19:37:04 +00:00
AudioFileOpen::Cached(ref mut file) => {
let file = file.take().unwrap();
Ok(Async::Ready(AudioFile::Cached(file)))
}
}
}
}
impl Future for AudioFileOpenStreaming {
type Item = AudioFileStreaming;
type Error = ChannelError;
fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
2017-01-19 22:45:24 +00:00
loop {
let (id, data) = try_ready!(self.headers.poll()).unwrap();
2015-07-07 21:40:31 +00:00
2017-01-19 22:45:24 +00:00
if id == 0x3 {
let size = BigEndian::read_u32(&data) as usize * 4;
let file = self.finish(size);
2018-02-12 20:02:27 +00:00
2017-01-19 22:45:24 +00:00
return Ok(Async::Ready(file));
2015-07-07 21:40:31 +00:00
}
}
}
2017-01-19 22:45:24 +00:00
}
impl AudioFile {
pub fn open(session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool) -> AudioFileOpen {
let cache = session.cache().cloned();
if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
2017-01-29 17:54:32 +00:00
debug!("File {} already in cache", file_id);
2017-08-03 19:37:04 +00:00
return AudioFileOpen::Cached(Some(file));
}
2017-01-29 17:54:32 +00:00
debug!("Downloading file {}", file_id);
2017-01-19 22:45:24 +00:00
let (complete_tx, complete_rx) = oneshot::channel();
let mut initial_data_length = if play_from_beginning {
INITIAL_DOWNLOAD_SIZE + max((READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (INITIAL_PING_TIME_ESTIMATE_SECONDS * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * bytes_per_second as f64) as usize)
} else {
INITIAL_DOWNLOAD_SIZE
};
if initial_data_length % 4 != 0 {
initial_data_length += 4 - (initial_data_length % 4);
}
2019-11-01 19:46:28 +00:00
let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
2015-07-07 21:40:31 +00:00
let open = AudioFileOpenStreaming {
session: session.clone(),
2017-01-19 22:45:24 +00:00
file_id: file_id,
headers: headers,
2019-11-01 19:46:28 +00:00
initial_data_rx: Some(data),
initial_data_length: Some(initial_data_length),
initial_request_sent_time: Instant::now(),
2015-07-07 21:40:31 +00:00
2017-01-19 22:45:24 +00:00
complete_tx: Some(complete_tx),
streaming_data_rate: bytes_per_second,
};
let session_ = session.clone();
session.spawn(move |_| {
2018-02-26 01:50:41 +00:00
complete_rx
.map(move |mut file| {
if let Some(cache) = session_.cache() {
cache.save_file(file_id, &mut file);
debug!("File {} complete, saving to cache", file_id);
} else {
debug!("File {} complete", file_id);
}
})
.or_else(|oneshot::Canceled| Ok(()))
});
return AudioFileOpen::Streaming(open);
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
2019-11-01 19:46:28 +00:00
match self {
AudioFile::Streaming(ref stream) => {
return StreamLoaderController {
2019-11-01 19:46:28 +00:00
channel_tx: Some(stream.stream_loader_command_tx.clone()),
stream_shared: Some(stream.shared.clone()),
file_size: stream.shared.file_size,
};
2019-11-01 19:46:28 +00:00
}
AudioFile::Cached(ref file) => {
return StreamLoaderController {
channel_tx: None,
stream_shared: None,
file_size: file.metadata().unwrap().len() as usize,
}
}
}
}
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
2019-11-05 11:58:00 +00:00
assert!(offset % 4 == 0, "Range request start positions must be aligned by 4 bytes.");
assert!(length % 4 == 0, "Range request range lengths must be aligned by 4 bytes.");
2019-11-01 19:46:28 +00:00
let start = offset / 4;
2019-11-05 11:58:00 +00:00
let end = (offset+length) / 4;
2017-01-19 22:45:24 +00:00
let (id, channel) = session.channel().allocate();
2019-11-05 11:58:00 +00:00
trace!("requesting range starting at {} of length {} on channel {}.", offset, length, id);
2019-11-01 19:48:18 +00:00
2017-01-19 22:45:24 +00:00
let mut data: Vec<u8> = Vec::new();
data.write_u16::<BigEndian>(id).unwrap();
data.write_u8(0).unwrap();
data.write_u8(1).unwrap();
data.write_u16::<BigEndian>(0x0000).unwrap();
data.write_u32::<BigEndian>(0x00000000).unwrap();
data.write_u32::<BigEndian>(0x00009C40).unwrap();
data.write_u32::<BigEndian>(0x00020000).unwrap();
data.write(&file.0).unwrap();
2019-11-01 19:46:28 +00:00
data.write_u32::<BigEndian>(start as u32).unwrap();
data.write_u32::<BigEndian>(end as u32).unwrap();
2017-01-19 22:45:24 +00:00
session.send_packet(0x8, data);
channel
}
2019-11-01 19:46:28 +00:00
/// A chunk of file data received from the network.
struct PartialFileData {
    // Byte position in the file at which `data` starts.
    offset: usize,
    data: Bytes,
}
/// Messages sent from a data receiver to the `AudioFileFetch` task.
enum ReceivedData {
    // A measured response time in milliseconds.
    ResponseTimeMs(usize),
    // A chunk of file data.
    Data(PartialFileData),
}
struct AudioFileFetchDataReceiver {
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
2019-11-05 11:58:00 +00:00
initial_data_offset: usize,
initial_request_length: usize,
2019-11-01 19:46:28 +00:00
data_offset: usize,
request_length: usize,
request_sent_time: Option<Instant>,
measure_ping_time: bool,
2019-11-01 19:46:28 +00:00
}
impl AudioFileFetchDataReceiver {
fn new(
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
data_offset: usize,
request_length: usize,
request_sent_time: Instant,
) -> AudioFileFetchDataReceiver {
let measure_ping_time = shared.number_of_open_requests.load(atomic::Ordering::SeqCst) == 0;
shared.number_of_open_requests.fetch_add(1, atomic::Ordering::SeqCst);
2019-11-01 19:46:28 +00:00
AudioFileFetchDataReceiver {
shared: shared,
data_rx: data_rx,
file_data_tx: file_data_tx,
2019-11-05 11:58:00 +00:00
initial_data_offset: data_offset,
initial_request_length: request_length,
2019-11-01 19:46:28 +00:00
data_offset: data_offset,
request_length: request_length,
request_sent_time: Some(request_sent_time),
measure_ping_time: measure_ping_time,
2019-11-01 19:46:28 +00:00
}
}
}
impl AudioFileFetchDataReceiver {
    /// Wraps up the request: any part of the range that never arrived is
    /// removed from the requested set (and waiters are notified), then the
    /// open request counter is decremented.
    fn finish(&mut self) {
        let outstanding = self.request_length;
        if outstanding != 0 {
            let unfulfilled = Range::new(self.data_offset, outstanding);
            let mut status = self.shared.download_status.lock().unwrap();
            status.requested.subtract_range(&unfulfilled);
            self.shared.cond.notify_all();
        }

        self.shared
            .number_of_open_requests
            .fetch_sub(1, atomic::Ordering::SeqCst);
    }
}
impl Future for AudioFileFetchDataReceiver {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.data_rx.poll() {
Ok(Async::Ready(Some(data))) => {
if self.measure_ping_time {
if let Some(request_sent_time) = self.request_sent_time {
let duration = Instant::now() - request_sent_time;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
2019-11-07 21:58:17 +00:00
duration_ms = duration.as_millis() as u64;
}
let _ = self.file_data_tx.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
self.measure_ping_time = false;
2019-11-01 19:46:28 +00:00
}
}
let data_size = data.len();
2019-11-05 11:58:00 +00:00
trace!("data_receiver for range {} (+{}) got {} bytes of data starting at {}. ({} bytes pending).", self.initial_data_offset, self.initial_request_length, data_size, self.data_offset, self.request_length - data_size);
2019-11-01 19:46:28 +00:00
let _ = self.file_data_tx.unbounded_send(ReceivedData::Data(PartialFileData { offset: self.data_offset, data: data, }));
self.data_offset += data_size;
if self.request_length < data_size {
2019-11-05 11:58:00 +00:00
warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
2019-11-01 19:46:28 +00:00
self.request_length = 0;
} else {
self.request_length -= data_size;
}
if self.request_length == 0 {
2019-11-05 11:58:00 +00:00
trace!("Data receiver for range {} (+{}) completed.", self.initial_data_offset, self.initial_request_length);
self.finish();
2019-11-01 19:46:28 +00:00
return Ok(Async::Ready(()));
}
}
Ok(Async::Ready(None)) => {
if self.request_length > 0 {
2019-11-05 11:58:00 +00:00
warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
2019-11-01 19:46:28 +00:00
}
self.finish();
2019-11-01 19:46:28 +00:00
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(ChannelError) => {
2019-11-05 11:58:00 +00:00
warn!("Error from channel for data receiver for range {} (+{}).", self.initial_data_offset, self.initial_request_length);
2019-11-01 19:46:28 +00:00
self.finish();
return Ok(Async::Ready(()));
}
}
}
}
}
2017-01-19 22:45:24 +00:00
struct AudioFileFetch {
session: Session,
shared: Arc<AudioFileShared>,
output: Option<NamedTempFile>,
2019-11-01 19:46:28 +00:00
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
2017-01-19 22:45:24 +00:00
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
2019-11-01 19:46:28 +00:00
network_response_times_ms: Vec<usize>,
2017-01-19 22:45:24 +00:00
}
impl AudioFileFetch {
2018-02-26 01:50:41 +00:00
fn new(
session: Session,
shared: Arc<AudioFileShared>,
2019-11-01 19:46:28 +00:00
initial_data_rx: ChannelData,
initial_request_sent_time: Instant,
initial_data_length: usize,
2018-02-26 01:50:41 +00:00
output: NamedTempFile,
2019-11-01 19:46:28 +00:00
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
2018-02-26 01:50:41 +00:00
complete_tx: oneshot::Sender<NamedTempFile>,
) -> AudioFileFetch {
2019-11-01 19:46:28 +00:00
let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
{
let requested_range = Range::new(0, initial_data_length);
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.add_range(&requested_range);
}
let initial_data_receiver = AudioFileFetchDataReceiver::new(
shared.clone(),
file_data_tx.clone(),
initial_data_rx,
0,
initial_data_length,
initial_request_sent_time,
);
session.spawn(move |_| initial_data_receiver);
2017-01-19 22:45:24 +00:00
AudioFileFetch {
session: session,
shared: shared,
output: Some(output),
2019-11-01 19:46:28 +00:00
file_data_tx: file_data_tx,
file_data_rx: file_data_rx,
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
stream_loader_command_rx: stream_loader_command_rx,
2017-01-19 22:45:24 +00:00
complete_tx: Some(complete_tx),
2019-11-01 19:46:28 +00:00
network_response_times_ms: Vec::new(),
2015-07-07 21:40:31 +00:00
}
2015-06-23 17:34:48 +00:00
}
2015-06-24 00:41:39 +00:00
fn get_download_strategy(&mut self) -> DownloadStrategy {
*(self.shared.download_strategy.lock().unwrap())
}
2019-11-01 19:46:28 +00:00
fn download_range(&mut self, mut offset: usize, mut length: usize) {
2015-06-23 17:34:48 +00:00
if length < MINIMUM_DOWNLOAD_SIZE {
length = MINIMUM_DOWNLOAD_SIZE;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
// ensure the values are within the bounds and align them by 4 for the spotify protocol.
if offset >= self.shared.file_size {
return;
}
2015-06-24 00:41:39 +00:00
2019-11-01 19:46:28 +00:00
if length <= 0 {
return;
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
if offset + length > self.shared.file_size {
length = self.shared.file_size - offset;
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
if offset % 4 != 0 {
length += offset % 4;
offset -= offset % 4;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
if length % 4 != 0 {
length += 4 - (length % 4);
}
let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length));
let mut download_status = self.shared.download_status.lock().unwrap();
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
for range in ranges_to_request.iter() {
let (_headers, data) = request_range(&self.session, self.shared.file_id, range.start, range.length).split();
download_status.requested.add_range(range);
let receiver = AudioFileFetchDataReceiver::new(
self.shared.clone(),
self.file_data_tx.clone(),
data,
range.start,
range.length,
Instant::now(),
);
self.session.spawn(move |_| receiver);
}
}
2015-07-07 21:40:31 +00:00
fn pre_fetch_more_data(&mut self, bytes: usize) {
2019-11-01 19:46:28 +00:00
let mut bytes_to_go = bytes;
2019-11-01 19:46:28 +00:00
while bytes_to_go > 0 {
2019-11-01 19:46:28 +00:00
// determine what is still missing
let mut missing_data = RangeSet::new();
missing_data.add_range(&Range::new(0, self.shared.file_size));
{
let download_status = self.shared.download_status.lock().unwrap();
missing_data.subtract_range_set(&download_status.downloaded);
missing_data.subtract_range_set(&download_status.requested);
}
2019-11-01 19:46:28 +00:00
// download data from after the current read position first
let mut tail_end = RangeSet::new();
let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
tail_end.add_range(&Range::new(read_position, self.shared.file_size - read_position));
let tail_end = tail_end.intersection(&missing_data);
if !tail_end.is_empty() {
let range = tail_end.get_range(0);
let offset = range.start;
let length = min(range.length, bytes_to_go);
self.download_range(offset, length);
bytes_to_go -= length;
} else if !missing_data.is_empty() {
// ok, the tail is downloaded, download something fom the beginning.
let range = missing_data.get_range(0);
let offset = range.start;
let length = min(range.length, bytes_to_go);
self.download_range(offset, length);
bytes_to_go -= length;
} else {
return;
}
2019-11-01 19:46:28 +00:00
}
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:48:18 +00:00
2019-11-01 19:46:28 +00:00
fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
2017-01-19 22:45:24 +00:00
loop {
2019-11-01 19:46:28 +00:00
match self.file_data_rx.poll() {
2017-01-19 22:45:24 +00:00
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(()));
}
2019-11-01 19:46:28 +00:00
Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
2019-11-05 11:58:00 +00:00
trace!("Received ping time estimate: {} ms.", response_time_ms);
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
// record the response time
self.network_response_times_ms.push(response_time_ms);
2017-01-19 22:45:24 +00:00
2019-11-01 23:00:08 +00:00
// prune old response times. Keep at most three.
2019-11-01 19:46:28 +00:00
while self.network_response_times_ms.len() > 3 {
self.network_response_times_ms.remove(0);
}
// stats::median is experimental. So we calculate the median of up to three ourselves.
let ping_time_ms: usize = match self.network_response_times_ms.len() {
1 => self.network_response_times_ms[0] as usize,
2 => ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) / 2) as usize,
3 => {
let mut times = self.network_response_times_ms.clone();
times.sort();
times[1]
}
_ => unreachable!(),
};
// store our new estimate for everyone to see
self.shared.ping_time_ms.store(ping_time_ms, atomic::Ordering::Relaxed);
},
Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
self.output
.as_mut()
.unwrap()
.seek(SeekFrom::Start(data.offset as u64))
.unwrap();
self.output.as_mut().unwrap().write_all(data.data.as_ref()).unwrap();
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
let mut full = false;
{
let mut download_status = self.shared.download_status.lock().unwrap();
let received_range = Range::new(data.offset, data.data.len());
download_status.downloaded.add_range(&received_range);
2017-01-19 22:45:24 +00:00
self.shared.cond.notify_all();
2019-11-01 19:46:28 +00:00
if download_status.downloaded.contained_length_from_value(0) >= self.shared.file_size {
full = true;
}
2019-11-05 11:58:00 +00:00
2019-11-05 12:58:35 +00:00
trace!("Downloaded: {} Requested: {}", download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
2019-11-05 11:58:00 +00:00
2019-11-01 19:46:28 +00:00
drop(download_status);
}
2017-01-19 22:45:24 +00:00
if full {
self.finish();
return Ok(Async::Ready(()));
}
2019-11-01 19:46:28 +00:00
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
},
Err(()) => unreachable!(),
}
}
}
fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
loop {
match self.stream_loader_command_rx.poll() {
2019-11-02 00:04:46 +00:00
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(()));
}
2019-11-01 19:46:28 +00:00
Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
self.download_range(request.start, request.length);
}
Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess();
2019-11-01 19:46:28 +00:00
}
Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
*(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
2019-11-01 19:46:28 +00:00
}
Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
2017-01-19 22:45:24 +00:00
return Ok(Async::Ready(()));
2018-02-26 01:50:41 +00:00
}
2019-11-01 19:46:28 +00:00
Ok(Async::NotReady) => {
return Ok(Async::NotReady)
},
Err(()) => unreachable!(),
}
}
}
fn finish(&mut self) {
trace!("====== FINISHED DOWNLOADING FILE! ======");
let mut output = self.output.take().unwrap();
let complete_tx = self.complete_tx.take().unwrap();
output.seek(SeekFrom::Start(0)).unwrap();
let _ = complete_tx.send(output);
}
}
impl Future for AudioFileFetch {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
match self.poll_stream_loader_command_rx() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
Err(()) => unreachable!(),
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
match self.poll_file_data_rx() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
Err(()) => unreachable!(),
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
if let DownloadStrategy::Streaming() = self.get_download_strategy() {
2019-11-01 19:46:28 +00:00
let bytes_pending: usize = {
let download_status = self.shared.download_status.lock().unwrap();
download_status.requested.minus(&download_status.downloaded).len()
};
2019-11-07 21:58:17 +00:00
let ping_time_seconds = 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let download_rate = self.session.channel().get_download_rate_estimate();
2019-11-01 19:46:28 +00:00
let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) as usize,
2019-11-07 21:58:17 +00:00
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize
);
if bytes_pending < desired_pending_bytes {
2019-11-07 21:58:17 +00:00
trace!("Prefetching more data. pending: {}, desired: {}, ping: {}, rate: {}", bytes_pending, desired_pending_bytes, ping_time_seconds, download_rate);
self.pre_fetch_more_data(desired_pending_bytes - bytes_pending);
2019-11-01 19:46:28 +00:00
}
}
return Ok(Async::NotReady)
2015-06-23 17:34:48 +00:00
}
2015-06-23 14:38:29 +00:00
}
impl Read for AudioFileStreaming {
2015-06-23 14:38:29 +00:00
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
2019-11-01 19:46:28 +00:00
let offset = self.position as usize;
if offset >= self.shared.file_size {
return Ok(0);
}
let length = min(output.len(), self.shared.file_size - offset);
2015-06-23 14:38:29 +00:00
let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
DownloadStrategy::RandomAccess() => { length }
DownloadStrategy::Streaming() => {
// Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
let ping_time_seconds = 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let length_to_request = length + max(
(READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) as usize,
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds * self.shared.stream_data_rate as f64) as usize
);
min(length_to_request, self.shared.file_size - offset)
}
};
2015-07-07 21:40:31 +00:00
2019-11-01 19:46:28 +00:00
let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length_to_request));
2019-11-01 19:46:28 +00:00
2019-11-02 00:04:46 +00:00
trace!("reading at postion {} (length : {})", offset, length);
2019-11-01 19:46:28 +00:00
let mut download_status = self.shared.download_status.lock().unwrap();
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
for range in ranges_to_request.iter() {
2019-11-02 00:04:46 +00:00
trace!("requesting data at position {} (length : {})", range.start, range.length);
2019-11-01 19:46:28 +00:00
self.stream_loader_command_tx.unbounded_send(StreamLoaderCommand::Fetch(range.clone())).unwrap();
}
if length == 0 {
return Ok(0);
}
2019-11-01 19:46:28 +00:00
while !download_status.downloaded.contains(offset) {
2019-11-02 00:04:46 +00:00
trace!("waiting for download");
2019-11-01 19:46:28 +00:00
download_status = self.shared.cond.wait_timeout(download_status, Duration::from_millis(1000)).unwrap().0;
2019-11-02 00:04:46 +00:00
trace!("re-checking data availability at offset {}.", offset);
2019-11-01 19:46:28 +00:00
}
let available_length = download_status.downloaded.contained_length_from_value(offset);
assert!(available_length > 0);
drop(download_status);
self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
let read_len = min(length, available_length);
let read_len = try!(self.read_file.read(&mut output[..read_len]));
2019-11-05 11:58:00 +00:00
trace!("read successfully at postion {} (length : {})", offset, read_len);
2015-06-23 14:38:29 +00:00
2015-09-01 11:20:37 +00:00
self.position += read_len as u64;
2019-11-01 19:46:28 +00:00
self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
2015-06-23 14:38:29 +00:00
2019-11-01 19:46:28 +00:00
return Ok(read_len);
2015-06-23 14:38:29 +00:00
}
}
impl Seek for AudioFileStreaming {
2015-07-07 21:40:31 +00:00
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.position = try!(self.read_file.seek(pos));
// Do not seek past EOF
2019-11-01 19:46:28 +00:00
self.shared.read_position.store(self.position as usize, atomic::Ordering::Relaxed);
2017-01-19 22:45:24 +00:00
Ok(self.position)
2015-06-23 14:38:29 +00:00
}
}
impl Read for AudioFile {
    /// Delegates to the underlying cached or streaming file.
    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
        match *self {
            AudioFile::Streaming(ref mut streaming) => streaming.read(output),
            AudioFile::Cached(ref mut cached) => cached.read(output),
        }
    }
}
impl Seek for AudioFile {
    /// Delegates to the underlying cached or streaming file.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        match *self {
            AudioFile::Streaming(ref mut streaming) => streaming.seek(pos),
            AudioFile::Cached(ref mut cached) => cached.seek(pos),
        }
    }
}