librespot/audio/src/fetch/mod.rs

512 lines
19 KiB
Rust
Raw Normal View History

2021-02-28 10:36:15 +00:00
mod receive;
use std::cmp::{max, min};
2015-07-07 21:40:31 +00:00
use std::fs;
2021-02-28 10:36:15 +00:00
use std::io::{self, Read, Seek, SeekFrom};
use std::sync::atomic::{self, AtomicUsize};
2017-01-19 22:45:24 +00:00
use std::sync::{Arc, Condvar, Mutex};
2019-11-01 19:46:28 +00:00
use std::time::{Duration, Instant};
2015-06-23 14:38:29 +00:00
2021-02-28 10:36:15 +00:00
use byteorder::{BigEndian, ByteOrder};
use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
2021-02-28 10:36:15 +00:00
use librespot_core::channel::{ChannelData, ChannelError, ChannelHeaders};
2019-09-16 19:00:09 +00:00
use librespot_core::session::Session;
use librespot_core::spotify_id::FileId;
use tempfile::NamedTempFile;
use tokio::sync::{mpsc, oneshot};
2021-02-28 10:36:15 +00:00
use self::receive::{audio_file_fetch, request_range};
use crate::range_set::{Range, RangeSet};
2015-06-23 14:38:29 +00:00
/// The minimum size of a block that is requested from the Spotify servers in one request.
/// This is the block size that is typically requested while doing a `seek()` on a file.
/// Note: smaller requests can happen if part of the block is downloaded already.
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;

/// The amount of data that is requested when initially opening a file.
/// Note: if the file is opened to play from the beginning, the amount of data to
/// read ahead is requested in addition to this amount. If the file is opened to seek to
/// another position, then only this amount is requested on the first request.
const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16;

/// The ping time that is used for calculations before a ping time was actually measured.
const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;

/// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching.
const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;

/// Before playback starts, this many seconds of data must be present.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;

/// Same as `READ_AHEAD_BEFORE_PLAYBACK_SECONDS`, but the time is taken as a factor of the ping
/// time to the Spotify server.
/// Both `READ_AHEAD_BEFORE_PLAYBACK_SECONDS` and `READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS` are
/// obeyed.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;

/// While playing back, this many seconds of data ahead of the current read position are
/// requested.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;

/// Same as `READ_AHEAD_DURING_PLAYBACK_SECONDS`, but the time is taken as a factor of the ping
/// time to the Spotify server.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount
/// of audio data may be larger or smaller.
pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;

/// If the amount of data that is pending (requested but not received) is less than a certain
/// amount, data is pre-fetched in addition to the read ahead settings above. The threshold for
/// requesting more data is calculated as
/// `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;

/// Similar to `PREFETCH_THRESHOLD_FACTOR`, but it also takes the current download rate into
/// account. The formula used is
/// `<pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>`
/// This mechanism allows for fast downloading of the remainder of the file. The number should be
/// larger than 1 so the download rate ramps up until the bandwidth is saturated. The larger the
/// value, the faster the download rate ramps up. However, this comes at the cost that it might
/// hurt ping-time if a seek is performed while downloading. Values smaller than 1 cause the
/// download rate to collapse and effectively only `PREFETCH_THRESHOLD_FACTOR` is in effect. Thus,
/// set to zero if bandwidth saturation is not wanted.
const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;

/// Limit the number of requests that are pending simultaneously before pre-fetching data.
/// Pending requests share bandwidth. Thus, having too many requests can lead to the one that is
/// needed next for playback to be delayed, leading to a buffer underrun. This limit has the
/// effect that a new pre-fetch request is only sent if less than `MAX_PREFETCH_REQUESTS` are
/// pending.
const MAX_PREFETCH_REQUESTS: usize = 4;
/// An audio file that can be read and seeked, backed either by a cache file or by a stream
/// that is still being downloaded.
pub enum AudioFile {
    /// The file was returned by the session cache; reads go straight to that file.
    Cached(fs::File),
    /// The file is being downloaded; reads go through [`AudioFileStreaming`].
    Streaming(AudioFileStreaming),
}
2015-07-07 21:40:31 +00:00
#[derive(Debug)]
2019-11-11 07:22:41 +00:00
enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file
2019-11-01 19:46:28 +00:00
RandomAccessMode(), // optimise download strategy for random access
2019-11-11 07:22:41 +00:00
StreamMode(), // optimise download strategy for streaming
Close(), // terminate and don't load any more data
2019-11-01 19:46:28 +00:00
}
/// Cloneable handle for querying and steering the download of an [`AudioFile`].
///
/// For a cached file both optional fields are `None` and only `file_size` is meaningful.
#[derive(Clone)]
pub struct StreamLoaderController {
    /// Sender side of the stream loader command channel, if the file is streamed.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    /// State shared with the streaming file, if the file is streamed.
    stream_shared: Option<Arc<AudioFileShared>>,
    /// Total size of the file in bytes.
    file_size: usize,
}
impl StreamLoaderController {
pub fn len(&self) -> usize {
2021-01-21 21:12:35 +00:00
self.file_size
}
pub fn is_empty(&self) -> bool {
self.file_size == 0
2019-11-01 19:46:28 +00:00
}
pub fn range_available(&self, range: Range) -> bool {
if let Some(ref shared) = self.stream_shared {
2019-11-01 21:38:46 +00:00
let download_status = shared.download_status.lock().unwrap();
2021-01-21 21:12:35 +00:00
range.length
2019-11-11 07:22:41 +00:00
<= download_status
.downloaded
.contained_length_from_value(range.start)
2019-11-01 19:46:28 +00:00
} else {
2021-01-21 21:12:35 +00:00
range.length <= self.len() - range.start
2019-11-01 19:46:28 +00:00
}
}
pub fn range_to_end_available(&self) -> bool {
2021-01-21 21:12:35 +00:00
self.stream_shared.as_ref().map_or(true, |shared| {
let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
self.range_available(Range::new(read_position, self.len() - read_position))
2021-01-21 21:12:35 +00:00
})
}
2019-11-01 19:46:28 +00:00
pub fn ping_time_ms(&self) -> usize {
2021-01-21 21:12:35 +00:00
self.stream_shared.as_ref().map_or(0, |shared| {
shared.ping_time_ms.load(atomic::Ordering::Relaxed)
})
2019-11-01 19:46:28 +00:00
}
2021-03-01 02:37:22 +00:00
fn send_stream_loader_command(&self, command: StreamLoaderCommand) {
if let Some(ref channel) = self.channel_tx {
2019-11-01 19:46:28 +00:00
// ignore the error in case the channel has been closed already.
let _ = channel.send(command);
2019-11-01 19:46:28 +00:00
}
}
2021-03-01 02:37:22 +00:00
pub fn fetch(&self, range: Range) {
2019-11-01 19:46:28 +00:00
// signal the stream loader to fetch a range of the file
self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
}
2021-03-01 02:37:22 +00:00
pub fn fetch_blocking(&self, mut range: Range) {
2019-11-01 19:46:28 +00:00
// signal the stream loader to tech a range of the file and block until it is loaded.
// ensure the range is within the file's bounds.
if range.start >= self.len() {
range.length = 0;
} else if range.end() > self.len() {
range.length = self.len() - range.start;
}
self.fetch(range);
if let Some(ref shared) = self.stream_shared {
let mut download_status = shared.download_status.lock().unwrap();
2019-11-11 07:22:41 +00:00
while range.length
> download_status
.downloaded
.contained_length_from_value(range.start)
{
download_status = shared
.cond
.wait_timeout(download_status, Duration::from_millis(1000))
.unwrap()
.0;
if range.length
> (download_status
.downloaded
.union(&download_status.requested)
.contained_length_from_value(range.start))
{
2019-11-01 19:46:28 +00:00
// For some reason, the requested range is neither downloaded nor requested.
// This could be due to a network error. Request it again.
2021-03-01 02:37:22 +00:00
self.fetch(range);
2019-11-01 19:46:28 +00:00
}
}
}
}
2021-03-01 02:37:22 +00:00
pub fn fetch_next(&self, length: usize) {
2021-01-21 21:12:35 +00:00
if let Some(ref shared) = self.stream_shared {
let range = Range {
2019-11-01 19:46:28 +00:00
start: shared.read_position.load(atomic::Ordering::Relaxed),
2021-03-01 02:37:22 +00:00
length,
2021-01-21 21:12:35 +00:00
};
self.fetch(range)
}
2019-11-01 19:46:28 +00:00
}
2021-03-01 02:37:22 +00:00
pub fn fetch_next_blocking(&self, length: usize) {
2021-01-21 21:12:35 +00:00
if let Some(ref shared) = self.stream_shared {
let range = Range {
2019-11-01 19:46:28 +00:00
start: shared.read_position.load(atomic::Ordering::Relaxed),
2021-03-01 02:37:22 +00:00
length,
2021-01-21 21:12:35 +00:00
};
self.fetch_blocking(range);
}
2019-11-01 19:46:28 +00:00
}
2021-03-01 02:37:22 +00:00
pub fn set_random_access_mode(&self) {
2019-11-01 19:46:28 +00:00
// optimise download strategy for random access
self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
}
2021-03-01 02:37:22 +00:00
pub fn set_stream_mode(&self) {
2019-11-01 19:46:28 +00:00
// optimise download strategy for streaming
self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
}
2021-03-01 02:37:22 +00:00
pub fn close(&self) {
2019-11-01 19:46:28 +00:00
// terminate stream loading and don't load any more data for this file.
self.send_stream_loader_command(StreamLoaderCommand::Close());
}
}
pub struct AudioFileStreaming {
read_file: fs::File,
position: u64,
2019-11-01 19:46:28 +00:00
stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
shared: Arc<AudioFileShared>,
}
2019-11-01 19:46:28 +00:00
/// Bookkeeping of which parts of the file have been asked for and which have arrived.
struct AudioFileDownloadStatus {
    /// Ranges that have been requested.
    requested: RangeSet,
    /// Ranges that have been downloaded and can be read.
    downloaded: RangeSet,
}
/// How much data a read requests from the stream loader.
#[derive(Copy, Clone, PartialEq, Eq)]
enum DownloadStrategy {
    /// Reads request exactly the bytes they need.
    RandomAccess(),
    /// Reads additionally request read-ahead data past the bytes they need.
    Streaming(),
}
struct AudioFileShared {
2017-01-19 22:45:24 +00:00
file_id: FileId,
2019-11-01 19:46:28 +00:00
file_size: usize,
stream_data_rate: usize,
2015-07-07 21:40:31 +00:00
cond: Condvar,
2019-11-01 19:46:28 +00:00
download_status: Mutex<AudioFileDownloadStatus>,
download_strategy: Mutex<DownloadStrategy>,
2021-04-13 18:34:26 +00:00
number_of_open_requests: AtomicUsize,
2019-11-01 19:46:28 +00:00
ping_time_ms: AtomicUsize,
read_position: AtomicUsize,
2015-06-23 14:38:29 +00:00
}
2021-01-21 21:12:35 +00:00
impl AudioFile {
    /// Opens the audio file `file_id`.
    ///
    /// If the session cache already holds the file, it is returned as [`AudioFile::Cached`].
    /// Otherwise an initial range is requested and the file is returned as
    /// [`AudioFile::Streaming`]. Once the download signals completion, the file is saved to
    /// the cache if the session has one.
    ///
    /// * `bytes_per_second` - nominal data rate, used to size the read-ahead.
    /// * `play_from_beginning` - when `true`, the read-ahead amount is requested in addition
    ///   to `INITIAL_DOWNLOAD_SIZE` on the first request.
    ///
    /// # Errors
    ///
    /// Returns the `ChannelError` produced while opening the stream.
    pub async fn open(
        session: &Session,
        file_id: FileId,
        bytes_per_second: usize,
        play_from_beginning: bool,
    ) -> Result<AudioFile, ChannelError> {
        if let Some(cached_file) = session.cache().and_then(|cache| cache.file(file_id)) {
            debug!("File {} already in cache", file_id);
            return Ok(AudioFile::Cached(cached_file));
        }

        debug!("Downloading file {}", file_id);

        let (complete_tx, complete_rx) = oneshot::channel();

        // Extra data on top of the initial block, only when playing from the start.
        let read_ahead = if play_from_beginning {
            let by_seconds =
                (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize;
            let by_roundtrips = (INITIAL_PING_TIME_ESTIMATE_SECONDS
                * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
                * bytes_per_second as f64) as usize;
            max(by_seconds, by_roundtrips)
        } else {
            0
        };

        // Round the request length up to the next multiple of four.
        let unaligned_length = INITIAL_DOWNLOAD_SIZE + read_ahead;
        let initial_data_length = match unaligned_length % 4 {
            0 => unaligned_length,
            remainder => unaligned_length + (4 - remainder),
        };

        let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();

        let streaming = AudioFileStreaming::open(
            session.clone(),
            data,
            initial_data_length,
            Instant::now(),
            headers,
            file_id,
            complete_tx,
            bytes_per_second,
        );

        let cache_session = session.clone();
        session.spawn(complete_rx.map_ok(move |mut file| {
            match cache_session.cache() {
                Some(cache) => {
                    debug!("File {} complete, saving to cache", file_id);
                    cache.save_file(file_id, &mut file);
                }
                None => {
                    debug!("File {} complete", file_id);
                }
            }
        }));

        streaming.await.map(AudioFile::Streaming)
    }

    /// Creates a controller for this file.
    ///
    /// For a cached file the controller carries no command channel and no shared state, and
    /// the file size is taken from the file's metadata.
    ///
    /// # Panics
    ///
    /// Panics if the metadata of a cached file cannot be read.
    pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
        let (channel_tx, stream_shared, file_size) = match self {
            AudioFile::Streaming(stream) => (
                Some(stream.stream_loader_command_tx.clone()),
                Some(stream.shared.clone()),
                stream.shared.file_size,
            ),
            AudioFile::Cached(file) => (None, None, file.metadata().unwrap().len() as usize),
        };

        StreamLoaderController {
            channel_tx,
            stream_shared,
            file_size,
        }
    }

    /// Returns `true` if this file is the [`AudioFile::Cached`] variant.
    pub fn is_cached(&self) -> bool {
        match self {
            AudioFile::Cached(_) => true,
            AudioFile::Streaming(_) => false,
        }
    }
}
impl AudioFileStreaming {
pub async fn open(
session: Session,
initial_data_rx: ChannelData,
initial_data_length: usize,
initial_request_sent_time: Instant,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: oneshot::Sender<NamedTempFile>,
streaming_data_rate: usize,
) -> Result<AudioFileStreaming, ChannelError> {
let (_, data) = headers
.try_filter(|(id, _)| future::ready(*id == 0x3))
.next()
.await
.unwrap()?;
let size = BigEndian::read_u32(&data) as usize * 4;
2015-07-07 21:40:31 +00:00
let shared = Arc::new(AudioFileShared {
2021-03-01 02:37:22 +00:00
file_id,
2019-11-01 19:46:28 +00:00
file_size: size,
2021-01-21 21:12:35 +00:00
stream_data_rate: streaming_data_rate,
2015-07-07 21:40:31 +00:00
cond: Condvar::new(),
2019-11-11 07:22:41 +00:00
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
2021-04-13 18:34:26 +00:00
number_of_open_requests: AtomicUsize::new(0),
2019-11-01 19:46:28 +00:00
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
2015-07-07 21:40:31 +00:00
});
2017-01-19 22:45:24 +00:00
let mut write_file = NamedTempFile::new().unwrap();
2019-11-01 21:38:46 +00:00
write_file.as_file().set_len(size as u64).unwrap();
2017-01-19 22:45:24 +00:00
write_file.seek(SeekFrom::Start(0)).unwrap();
2017-01-19 22:45:24 +00:00
let read_file = write_file.reopen().unwrap();
2015-09-01 11:20:37 +00:00
2019-11-01 19:46:28 +00:00
//let (seek_tx, seek_rx) = mpsc::unbounded();
2019-11-11 07:22:41 +00:00
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded_channel::<StreamLoaderCommand>();
2015-07-07 21:40:31 +00:00
session.spawn(audio_file_fetch(
2021-01-21 21:12:35 +00:00
session.clone(),
2018-02-26 01:50:41 +00:00
shared.clone(),
2019-11-01 19:46:28 +00:00
initial_data_rx,
2021-01-21 21:12:35 +00:00
initial_request_sent_time,
2019-11-01 19:46:28 +00:00
initial_data_length,
2018-02-26 01:50:41 +00:00
write_file,
2019-11-01 19:46:28 +00:00
stream_loader_command_rx,
2018-02-26 01:50:41 +00:00
complete_tx,
));
2015-06-23 14:38:29 +00:00
2021-01-21 21:12:35 +00:00
Ok(AudioFileStreaming {
2021-03-01 02:37:22 +00:00
read_file,
2017-01-19 22:45:24 +00:00
position: 0,
2021-03-01 02:37:22 +00:00
stream_loader_command_tx,
shared,
2021-01-21 21:12:35 +00:00
})
2019-11-01 19:46:28 +00:00
}
2017-01-19 22:45:24 +00:00
}
impl Read for AudioFileStreaming {
2015-06-23 14:38:29 +00:00
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
2019-11-01 19:46:28 +00:00
let offset = self.position as usize;
if offset >= self.shared.file_size {
return Ok(0);
}
let length = min(output.len(), self.shared.file_size - offset);
2015-06-23 14:38:29 +00:00
let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
2019-11-11 07:22:41 +00:00
DownloadStrategy::RandomAccess() => length,
DownloadStrategy::Streaming() => {
// Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
2019-11-11 07:22:41 +00:00
let ping_time_seconds =
0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let length_to_request = length
+ max(
(READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
as usize,
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* ping_time_seconds
* self.shared.stream_data_rate as f64) as usize,
);
min(length_to_request, self.shared.file_size - offset)
}
};
2015-07-07 21:40:31 +00:00
2019-11-01 19:46:28 +00:00
let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length_to_request));
2019-11-01 19:46:28 +00:00
let mut download_status = self.shared.download_status.lock().unwrap();
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
2021-01-21 21:12:35 +00:00
for &range in ranges_to_request.iter() {
2019-11-11 07:22:41 +00:00
self.stream_loader_command_tx
.send(StreamLoaderCommand::Fetch(range))
2019-11-11 07:22:41 +00:00
.unwrap();
2019-11-01 19:46:28 +00:00
}
if length == 0 {
return Ok(0);
}
2019-11-11 07:43:41 +00:00
let mut download_message_printed = false;
2019-11-01 19:46:28 +00:00
while !download_status.downloaded.contains(offset) {
if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
if !download_message_printed {
debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
download_message_printed = true;
}
2019-11-11 07:43:41 +00:00
}
2019-11-11 07:22:41 +00:00
download_status = self
.shared
.cond
.wait_timeout(download_status, Duration::from_millis(1000))
.unwrap()
.0;
2019-11-01 19:46:28 +00:00
}
2020-01-17 17:11:07 +00:00
let available_length = download_status
.downloaded
.contained_length_from_value(offset);
2019-11-01 19:46:28 +00:00
assert!(available_length > 0);
drop(download_status);
self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
let read_len = min(length, available_length);
let read_len = self.read_file.read(&mut output[..read_len])?;
2019-11-01 19:46:28 +00:00
2019-11-11 07:43:41 +00:00
if download_message_printed {
2019-11-18 00:08:34 +00:00
debug!(
"Read at postion {} completed. {} bytes returned, {} bytes were requested.",
offset,
read_len,
output.len()
);
2019-11-11 07:43:41 +00:00
}
2015-06-23 14:38:29 +00:00
2015-09-01 11:20:37 +00:00
self.position += read_len as u64;
2019-11-11 07:22:41 +00:00
self.shared
.read_position
.store(self.position as usize, atomic::Ordering::Relaxed);
2019-11-01 19:46:28 +00:00
2021-01-21 21:12:35 +00:00
Ok(read_len)
2015-06-23 14:38:29 +00:00
}
}
impl Seek for AudioFileStreaming {
2015-07-07 21:40:31 +00:00
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.position = self.read_file.seek(pos)?;
// Do not seek past EOF
2019-11-11 07:22:41 +00:00
self.shared
.read_position
.store(self.position as usize, atomic::Ordering::Relaxed);
2017-01-19 22:45:24 +00:00
Ok(self.position)
2015-06-23 14:38:29 +00:00
}
}
impl Read for AudioFile {
    /// Forwards the read to the cached file or to the streaming reader.
    fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
        match self {
            AudioFile::Cached(cached) => cached.read(output),
            AudioFile::Streaming(streaming) => streaming.read(output),
        }
    }
}
impl Seek for AudioFile {
    /// Forwards the seek to the cached file or to the streaming reader.
    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
        match self {
            AudioFile::Cached(cached) => cached.seek(pos),
            AudioFile::Streaming(streaming) => streaming.seek(pos),
        }
    }
}