2020-01-17 18:09:10 +00:00
use crate ::range_set ::{ Range , RangeSet } ;
2018-02-26 01:50:41 +00:00
use byteorder ::{ BigEndian , ByteOrder , WriteBytesExt } ;
2019-11-01 19:46:28 +00:00
use bytes ::Bytes ;
2021-01-21 21:12:35 +00:00
use futures ::{
channel ::{ mpsc , oneshot } ,
future ,
} ;
use futures ::{ Future , Stream , StreamExt , TryFutureExt , TryStreamExt } ;
2015-07-07 21:40:31 +00:00
use std ::fs ;
2018-02-26 01:50:41 +00:00
use std ::io ::{ self , Read , Seek , SeekFrom , Write } ;
2017-01-19 22:45:24 +00:00
use std ::sync ::{ Arc , Condvar , Mutex } ;
2021-01-21 21:12:35 +00:00
use std ::task ::Poll ;
2019-11-01 19:46:28 +00:00
use std ::time ::{ Duration , Instant } ;
2021-01-21 21:12:35 +00:00
use std ::{
cmp ::{ max , min } ,
pin ::Pin ,
task ::Context ,
} ;
2016-01-12 23:29:31 +00:00
use tempfile ::NamedTempFile ;
2015-06-23 14:38:29 +00:00
2021-01-21 21:12:35 +00:00
use futures ::channel ::mpsc ::unbounded ;
2019-09-16 19:00:09 +00:00
use librespot_core ::channel ::{ Channel , ChannelData , ChannelError , ChannelHeaders } ;
use librespot_core ::session ::Session ;
use librespot_core ::spotify_id ::FileId ;
2019-11-01 19:46:28 +00:00
use std ::sync ::atomic ;
use std ::sync ::atomic ::AtomicUsize ;
2015-06-23 14:38:29 +00:00
2019-11-07 13:02:53 +00:00
const MINIMUM_DOWNLOAD_SIZE : usize = 1024 * 16 ;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
// Note: smaller requests can happen if part of the block is downloaded already.
2019-11-11 07:22:41 +00:00
const INITIAL_DOWNLOAD_SIZE : usize = 1024 * 16 ;
2019-11-07 13:02:53 +00:00
// The amount of data that is requested when initially opening a file.
// Note: if the file is opened to play from the beginning, the amount of data to
// read ahead is requested in addition to this amount. If the file is opened to seek to
// another position, then only this amount is requested on the first request.
const INITIAL_PING_TIME_ESTIMATE_SECONDS : f64 = 0.5 ;
// The ping time that is used for calculations before a ping time was actually measured.
const MAXIMUM_ASSUMED_PING_TIME_SECONDS : f64 = 1.5 ;
// If the measured ping time to the Spotify server is larger than this value, it is capped
// to avoid run-away block sizes and pre-fetching.
pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS : f64 = 1.0 ;
// Before playback starts, this many seconds of data must be present.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS : f64 = 2.0 ;
// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Both READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
// obeyed.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
2019-11-17 23:54:44 +00:00
pub const READ_AHEAD_DURING_PLAYBACK_SECONDS : f64 = 5.0 ;
2019-11-07 13:02:53 +00:00
// While playing back, this many seconds of data ahead of the current read position are
// requested.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
2019-11-17 23:54:44 +00:00
pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS : f64 = 10.0 ;
2019-11-07 13:02:53 +00:00
// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
const PREFETCH_THRESHOLD_FACTOR : f64 = 4.0 ;
// If the amount of data that is pending (requested but not received) is less than a certain amount,
// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
// data is calculated as
// <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>
const FAST_PREFETCH_THRESHOLD_FACTOR : f64 = 1.5 ;
// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
// The formula used is
// <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
// This mechanism allows for fast downloading of the remainder of the file. The number should be larger
// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.
2019-11-17 23:54:44 +00:00
const MAX_PREFETCH_REQUESTS : usize = 4 ;
// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending
// requests share bandwidth. Thus, having too many requests can lead to the one that is needed next
// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new
// pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are pending.
2017-01-29 15:36:39 +00:00
/// An audio file that is either read from the local cache or streamed from the network.
///
/// Instances are created by `AudioFile::open`.
pub enum AudioFile {
/// The file was found in the session's cache; wraps the handle returned by the cache.
Cached ( fs ::File ) ,
/// The file is being downloaded; wraps the streaming reader state.
Streaming ( AudioFileStreaming ) ,
}
2015-07-07 21:40:31 +00:00
2019-11-11 07:22:41 +00:00
// Commands sent from a `StreamLoaderController` to the stream loader over an
// unbounded channel (see `StreamLoaderController::send_stream_loader_command`).
enum StreamLoaderCommand {
Fetch ( Range ) , // signal the stream loader to fetch a range of the file
2019-11-01 19:46:28 +00:00
RandomAccessMode ( ) , // optimise download strategy for random access
2019-11-11 07:22:41 +00:00
StreamMode ( ) , // optimise download strategy for streaming
Close ( ) , // terminate and don't load any more data
2019-11-01 19:46:28 +00:00
}
/// Handle used to query download progress of an `AudioFile` and to send commands
/// to its stream loader.
///
/// Obtained from `AudioFile::get_stream_loader_controller`. For a cached file both
/// optional fields are `None` and only `file_size` is meaningful.
#[ derive(Clone) ]
pub struct StreamLoaderController {
// Sender for `StreamLoaderCommand`s; `None` when the file is served from the cache.
channel_tx : Option < mpsc ::UnboundedSender < StreamLoaderCommand > > ,
// State shared with the stream loader; `None` when the file is served from the cache.
stream_shared : Option < Arc < AudioFileShared > > ,
// Total size of the file in bytes.
file_size : usize ,
}
impl StreamLoaderController {
pub fn len ( & self ) -> usize {
2021-01-21 21:12:35 +00:00
self . file_size
}
pub fn is_empty ( & self ) -> bool {
self . file_size = = 0
2019-11-01 19:46:28 +00:00
}
pub fn range_available ( & self , range : Range ) -> bool {
if let Some ( ref shared ) = self . stream_shared {
2019-11-01 21:38:46 +00:00
let download_status = shared . download_status . lock ( ) . unwrap ( ) ;
2021-01-21 21:12:35 +00:00
range . length
2019-11-11 07:22:41 +00:00
< = download_status
. downloaded
. contained_length_from_value ( range . start )
2019-11-01 19:46:28 +00:00
} else {
2021-01-21 21:12:35 +00:00
range . length < = self . len ( ) - range . start
2019-11-01 19:46:28 +00:00
}
}
2020-01-31 21:41:11 +00:00
pub fn range_to_end_available ( & self ) -> bool {
2021-01-21 21:12:35 +00:00
self . stream_shared . as_ref ( ) . map_or ( true , | shared | {
2020-01-31 21:41:11 +00:00
let read_position = shared . read_position . load ( atomic ::Ordering ::Relaxed ) ;
self . range_available ( Range ::new ( read_position , self . len ( ) - read_position ) )
2021-01-21 21:12:35 +00:00
} )
2020-01-31 21:41:11 +00:00
}
2019-11-01 19:46:28 +00:00
pub fn ping_time_ms ( & self ) -> usize {
2021-01-21 21:12:35 +00:00
self . stream_shared . as_ref ( ) . map_or ( 0 , | shared | {
shared . ping_time_ms . load ( atomic ::Ordering ::Relaxed )
} )
2019-11-01 19:46:28 +00:00
}
fn send_stream_loader_command ( & mut self , command : StreamLoaderCommand ) {
if let Some ( ref mut channel ) = self . channel_tx {
// ignore the error in case the channel has been closed already.
let _ = channel . unbounded_send ( command ) ;
}
}
pub fn fetch ( & mut self , range : Range ) {
// signal the stream loader to fetch a range of the file
self . send_stream_loader_command ( StreamLoaderCommand ::Fetch ( range ) ) ;
}
pub fn fetch_blocking ( & mut self , mut range : Range ) {
// signal the stream loader to tech a range of the file and block until it is loaded.
// ensure the range is within the file's bounds.
if range . start > = self . len ( ) {
range . length = 0 ;
} else if range . end ( ) > self . len ( ) {
range . length = self . len ( ) - range . start ;
}
self . fetch ( range ) ;
if let Some ( ref shared ) = self . stream_shared {
let mut download_status = shared . download_status . lock ( ) . unwrap ( ) ;
2019-11-11 07:22:41 +00:00
while range . length
> download_status
. downloaded
. contained_length_from_value ( range . start )
{
download_status = shared
. cond
. wait_timeout ( download_status , Duration ::from_millis ( 1000 ) )
. unwrap ( )
. 0 ;
if range . length
> ( download_status
. downloaded
. union ( & download_status . requested )
. contained_length_from_value ( range . start ) )
{
2019-11-01 19:46:28 +00:00
// For some reason, the requested range is neither downloaded nor requested.
// This could be due to a network error. Request it again.
2019-11-07 13:02:53 +00:00
// We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
2019-11-01 19:46:28 +00:00
if let Some ( ref mut channel ) = self . channel_tx {
// ignore the error in case the channel has been closed already.
let _ = channel . unbounded_send ( StreamLoaderCommand ::Fetch ( range ) ) ;
}
}
}
}
}
pub fn fetch_next ( & mut self , length : usize ) {
2021-01-21 21:12:35 +00:00
if let Some ( ref shared ) = self . stream_shared {
let range = Range {
2019-11-01 19:46:28 +00:00
start : shared . read_position . load ( atomic ::Ordering ::Relaxed ) ,
length : length ,
2021-01-21 21:12:35 +00:00
} ;
self . fetch ( range )
}
2019-11-01 19:46:28 +00:00
}
pub fn fetch_next_blocking ( & mut self , length : usize ) {
2021-01-21 21:12:35 +00:00
if let Some ( ref shared ) = self . stream_shared {
let range = Range {
2019-11-01 19:46:28 +00:00
start : shared . read_position . load ( atomic ::Ordering ::Relaxed ) ,
length : length ,
2021-01-21 21:12:35 +00:00
} ;
self . fetch_blocking ( range ) ;
}
2019-11-01 19:46:28 +00:00
}
pub fn set_random_access_mode ( & mut self ) {
// optimise download strategy for random access
self . send_stream_loader_command ( StreamLoaderCommand ::RandomAccessMode ( ) ) ;
}
pub fn set_stream_mode ( & mut self ) {
// optimise download strategy for streaming
self . send_stream_loader_command ( StreamLoaderCommand ::StreamMode ( ) ) ;
}
pub fn close ( & mut self ) {
// terminate stream loading and don't load any more data for this file.
self . send_stream_loader_command ( StreamLoaderCommand ::Close ( ) ) ;
}
}
2017-01-29 15:36:39 +00:00
/// Reader-side state of an audio file that is being downloaded.
///
/// Built by `AudioFileStreaming::open`, which also spawns the fetcher that writes
/// the downloaded data into the backing temporary file.
pub struct AudioFileStreaming {
// Read handle obtained by reopening the temporary file the fetcher writes to.
read_file : fs ::File ,
// Starts at 0. Presumably the current read offset; confirm against the
// Read/Seek implementation, which is outside this excerpt.
position : u64 ,
2019-11-01 19:46:28 +00:00
// Sender used to hand `StreamLoaderCommand`s to the fetcher.
stream_loader_command_tx : mpsc ::UnboundedSender < StreamLoaderCommand > ,
2017-01-29 15:36:39 +00:00
// State shared with the fetcher and with `StreamLoaderController`s.
shared : Arc < AudioFileShared > ,
}
2019-11-01 19:46:28 +00:00
// Bookkeeping of which byte ranges of the file have been requested and received.
// Kept behind the `download_status` mutex of `AudioFileShared`.
struct AudioFileDownloadStatus {
// Ranges for which a request was sent. A receiver removes the part of its range
// that the server did not deliver, so it can be requested again.
requested : RangeSet ,
// Ranges consulted to decide whether data is available for reading.
downloaded : RangeSet ,
}
2019-11-07 13:02:53 +00:00
// Download strategy of the stream loader, switched by the `RandomAccessMode` and
// `StreamMode` commands.
#[ derive(Copy, Clone) ]
enum DownloadStrategy {
// Initial strategy of every newly opened streaming file.
RandomAccess ( ) ,
// Strategy for sequential playback. Presumably enables pre-fetching; the code that
// acts on it is outside this excerpt.
Streaming ( ) ,
}
2016-05-09 11:22:51 +00:00
// State shared (via `Arc`) between the reader, the fetcher, the per-request data
// receivers and any `StreamLoaderController`.
struct AudioFileShared {
2017-01-19 22:45:24 +00:00
// Identifier of the file being downloaded.
file_id : FileId ,
2019-11-01 19:46:28 +00:00
// Total file size in bytes, taken from the size header of the first response.
file_size : usize ,
2019-11-07 13:02:53 +00:00
// Nominal data rate of the stream in bytes per second, as passed to `AudioFile::open`.
stream_data_rate : usize ,
2015-07-07 21:40:31 +00:00
// Notified when the download status changes; waited on by `fetch_blocking`.
cond : Condvar ,
2019-11-01 19:46:28 +00:00
// Requested and downloaded ranges.
download_status : Mutex < AudioFileDownloadStatus > ,
2019-11-07 13:02:53 +00:00
// Currently selected download strategy.
download_strategy : Mutex < DownloadStrategy > ,
2019-11-02 06:19:31 +00:00
// Number of data receivers currently running; incremented when one starts and
// decremented when it finishes.
number_of_open_requests : AtomicUsize ,
2019-11-01 19:46:28 +00:00
// Current ping time estimate in milliseconds; 0 until a measurement is stored.
ping_time_ms : AtomicUsize ,
// Byte position used as the start for `fetch_next` and for pre-fetching.
read_position : AtomicUsize ,
2015-06-23 14:38:29 +00:00
}
2021-01-21 21:12:35 +00:00
impl AudioFile {
pub async fn open (
session : & Session ,
file_id : FileId ,
bytes_per_second : usize ,
play_from_beginning : bool ,
) -> Result < AudioFile , ChannelError > {
if let Some ( file ) = session . cache ( ) . and_then ( | cache | cache . file ( file_id ) ) {
debug! ( " File {} already in cache " , file_id ) ;
return Ok ( AudioFile ::Cached ( file ) ) ;
}
debug! ( " Downloading file {} " , file_id ) ;
let ( complete_tx , complete_rx ) = oneshot ::channel ( ) ;
let mut initial_data_length = if play_from_beginning {
INITIAL_DOWNLOAD_SIZE
+ max (
( READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64 ) as usize ,
( INITIAL_PING_TIME_ESTIMATE_SECONDS
* READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* bytes_per_second as f64 ) as usize ,
)
} else {
INITIAL_DOWNLOAD_SIZE
} ;
if initial_data_length % 4 ! = 0 {
initial_data_length + = 4 - ( initial_data_length % 4 ) ;
}
let ( headers , data ) = request_range ( session , file_id , 0 , initial_data_length ) . split ( ) ;
let streaming = AudioFileStreaming ::open (
session . clone ( ) ,
data ,
initial_data_length ,
Instant ::now ( ) ,
headers ,
file_id ,
complete_tx ,
bytes_per_second ,
) ;
let session_ = session . clone ( ) ;
session . spawn ( complete_rx . map_ok ( move | mut file | {
if let Some ( cache ) = session_ . cache ( ) {
debug! ( " File {} complete, saving to cache " , file_id ) ;
2021-02-10 20:51:33 +00:00
cache . save_file ( file_id , & mut file ) ;
2021-01-21 21:12:35 +00:00
} else {
debug! ( " File {} complete " , file_id ) ;
}
} ) ) ;
Ok ( AudioFile ::Streaming ( streaming . await ? ) )
}
pub fn get_stream_loader_controller ( & self ) -> StreamLoaderController {
match self {
AudioFile ::Streaming ( ref stream ) = > StreamLoaderController {
channel_tx : Some ( stream . stream_loader_command_tx . clone ( ) ) ,
stream_shared : Some ( stream . shared . clone ( ) ) ,
file_size : stream . shared . file_size ,
} ,
AudioFile ::Cached ( ref file ) = > StreamLoaderController {
channel_tx : None ,
stream_shared : None ,
file_size : file . metadata ( ) . unwrap ( ) . len ( ) as usize ,
} ,
}
}
2021-02-02 01:18:58 +00:00
pub fn is_cached ( & self ) -> bool {
2021-02-09 08:14:32 +00:00
match self {
AudioFile ::Cached { .. } = > true ,
_ = > false ,
}
2021-02-02 01:18:58 +00:00
}
2021-01-21 21:12:35 +00:00
}
impl AudioFileStreaming {
pub async fn open (
session : Session ,
initial_data_rx : ChannelData ,
initial_data_length : usize ,
initial_request_sent_time : Instant ,
headers : ChannelHeaders ,
file_id : FileId ,
complete_tx : oneshot ::Sender < NamedTempFile > ,
streaming_data_rate : usize ,
) -> Result < AudioFileStreaming , ChannelError > {
let ( _ , data ) = headers
. try_filter ( | ( id , _ ) | future ::ready ( * id = = 0x3 ) )
. next ( )
. await
. unwrap ( ) ? ;
let size = BigEndian ::read_u32 ( & data ) as usize * 4 ;
2015-07-07 21:40:31 +00:00
let shared = Arc ::new ( AudioFileShared {
2021-01-21 21:12:35 +00:00
file_id : file_id ,
2019-11-01 19:46:28 +00:00
file_size : size ,
2021-01-21 21:12:35 +00:00
stream_data_rate : streaming_data_rate ,
2015-07-07 21:40:31 +00:00
cond : Condvar ::new ( ) ,
2019-11-11 07:22:41 +00:00
download_status : Mutex ::new ( AudioFileDownloadStatus {
requested : RangeSet ::new ( ) ,
downloaded : RangeSet ::new ( ) ,
} ) ,
2019-11-07 13:02:53 +00:00
download_strategy : Mutex ::new ( DownloadStrategy ::RandomAccess ( ) ) , // start with random access mode until someone tells us otherwise
2019-11-02 06:19:31 +00:00
number_of_open_requests : AtomicUsize ::new ( 0 ) ,
2019-11-01 19:46:28 +00:00
ping_time_ms : AtomicUsize ::new ( 0 ) ,
read_position : AtomicUsize ::new ( 0 ) ,
2015-07-07 21:40:31 +00:00
} ) ;
2017-01-19 22:45:24 +00:00
let mut write_file = NamedTempFile ::new ( ) . unwrap ( ) ;
2019-11-01 21:38:46 +00:00
write_file . as_file ( ) . set_len ( size as u64 ) . unwrap ( ) ;
2017-01-19 22:45:24 +00:00
write_file . seek ( SeekFrom ::Start ( 0 ) ) . unwrap ( ) ;
2015-07-02 17:24:25 +00:00
2017-01-19 22:45:24 +00:00
let read_file = write_file . reopen ( ) . unwrap ( ) ;
2015-09-01 11:20:37 +00:00
2019-11-01 19:46:28 +00:00
//let (seek_tx, seek_rx) = mpsc::unbounded();
2019-11-11 07:22:41 +00:00
let ( stream_loader_command_tx , stream_loader_command_rx ) =
mpsc ::unbounded ::< StreamLoaderCommand > ( ) ;
2015-07-07 21:40:31 +00:00
2017-01-19 22:45:24 +00:00
let fetcher = AudioFileFetch ::new (
2021-01-21 21:12:35 +00:00
session . clone ( ) ,
2018-02-26 01:50:41 +00:00
shared . clone ( ) ,
2019-11-01 19:46:28 +00:00
initial_data_rx ,
2021-01-21 21:12:35 +00:00
initial_request_sent_time ,
2019-11-01 19:46:28 +00:00
initial_data_length ,
2018-02-26 01:50:41 +00:00
write_file ,
2019-11-01 19:46:28 +00:00
stream_loader_command_rx ,
2018-02-26 01:50:41 +00:00
complete_tx ,
2017-01-19 22:45:24 +00:00
) ;
2015-06-23 14:38:29 +00:00
2021-01-21 21:12:35 +00:00
session . spawn ( fetcher ) ;
Ok ( AudioFileStreaming {
2017-01-19 22:45:24 +00:00
read_file : read_file ,
position : 0 ,
2019-11-01 19:46:28 +00:00
//seek: seek_tx,
stream_loader_command_tx : stream_loader_command_tx ,
2017-01-19 22:45:24 +00:00
shared : shared ,
2021-01-21 21:12:35 +00:00
} )
2019-11-01 19:46:28 +00:00
}
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
fn request_range ( session : & Session , file : FileId , offset : usize , length : usize ) -> Channel {
2019-11-11 07:22:41 +00:00
assert! (
offset % 4 = = 0 ,
" Range request start positions must be aligned by 4 bytes. "
) ;
assert! (
length % 4 = = 0 ,
" Range request range lengths must be aligned by 4 bytes. "
) ;
2019-11-01 19:46:28 +00:00
let start = offset / 4 ;
2019-11-11 07:22:41 +00:00
let end = ( offset + length ) / 4 ;
2017-01-19 22:45:24 +00:00
let ( id , channel ) = session . channel ( ) . allocate ( ) ;
let mut data : Vec < u8 > = Vec ::new ( ) ;
data . write_u16 ::< BigEndian > ( id ) . unwrap ( ) ;
data . write_u8 ( 0 ) . unwrap ( ) ;
data . write_u8 ( 1 ) . unwrap ( ) ;
data . write_u16 ::< BigEndian > ( 0x0000 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( 0x00000000 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( 0x00009C40 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( 0x00020000 ) . unwrap ( ) ;
data . write ( & file . 0 ) . unwrap ( ) ;
2019-11-01 19:46:28 +00:00
data . write_u32 ::< BigEndian > ( start as u32 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( end as u32 ) . unwrap ( ) ;
2017-01-19 22:45:24 +00:00
session . send_packet ( 0x8 , data ) ;
channel
}
2019-11-01 19:46:28 +00:00
// A chunk of file data received from the server.
struct PartialFileData {
// Byte offset within the file at which `data` starts.
offset : usize ,
// The received bytes.
data : Bytes ,
}
// Messages sent from a data receiver (`audio_file_fetch_receive_data`) to the fetcher.
enum ReceivedData {
// Time in milliseconds between sending a request and receiving its first data,
// capped at MAXIMUM_ASSUMED_PING_TIME_SECONDS.
ResponseTimeMs ( usize ) ,
// A chunk of file data together with its offset.
Data ( PartialFileData ) ,
}
2021-01-21 21:12:35 +00:00
async fn audio_file_fetch_receive_data (
2019-11-01 19:46:28 +00:00
shared : Arc < AudioFileShared > ,
file_data_tx : mpsc ::UnboundedSender < ReceivedData > ,
data_rx : ChannelData ,
2019-11-05 11:58:00 +00:00
initial_data_offset : usize ,
initial_request_length : usize ,
2021-01-21 21:12:35 +00:00
request_sent_time : Instant ,
) {
let mut data_offset = initial_data_offset ;
let mut request_length = initial_request_length ;
let mut measure_ping_time = shared
. number_of_open_requests
. load ( atomic ::Ordering ::SeqCst )
= = 0 ;
shared
. number_of_open_requests
. fetch_add ( 1 , atomic ::Ordering ::SeqCst ) ;
enum TryFoldErr {
ChannelError ,
FinishEarly ,
2019-11-01 19:46:28 +00:00
}
2021-01-21 21:12:35 +00:00
let result = data_rx
. map_err ( | _ | TryFoldErr ::ChannelError )
. try_for_each ( | data | {
if measure_ping_time {
let duration = Instant ::now ( ) - request_sent_time ;
let duration_ms : u64 ;
if 0.001 * ( duration . as_millis ( ) as f64 )
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
{
duration_ms = ( MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0 ) as u64 ;
} else {
duration_ms = duration . as_millis ( ) as u64 ;
}
let _ = file_data_tx
. unbounded_send ( ReceivedData ::ResponseTimeMs ( duration_ms as usize ) ) ;
measure_ping_time = false ;
}
let data_size = data . len ( ) ;
let _ = file_data_tx
. unbounded_send ( ReceivedData ::Data ( PartialFileData {
offset : data_offset ,
data : data ,
} ) ) ;
data_offset + = data_size ;
if request_length < data_size {
warn! ( " Data receiver for range {} (+{}) received more data from server than requested. " , initial_data_offset , initial_request_length ) ;
request_length = 0 ;
} else {
request_length - = data_size ;
}
2019-11-01 19:46:28 +00:00
2021-01-21 21:12:35 +00:00
future ::ready ( if request_length = = 0 {
Err ( TryFoldErr ::FinishEarly )
2021-01-22 21:32:45 +00:00
} else {
Ok ( ( ) )
2021-01-21 21:12:35 +00:00
} )
} )
. await ;
2019-11-02 06:19:31 +00:00
2021-01-21 21:12:35 +00:00
if request_length > 0 {
let missing_range = Range ::new ( data_offset , request_length ) ;
let mut download_status = shared . download_status . lock ( ) . unwrap ( ) ;
download_status . requested . subtract_range ( & missing_range ) ;
shared . cond . notify_all ( ) ;
}
shared
. number_of_open_requests
. fetch_sub ( 1 , atomic ::Ordering ::SeqCst ) ;
if let Err ( TryFoldErr ::ChannelError ) = result {
warn! (
" Error from channel for data receiver for range {} (+{}). " ,
initial_data_offset , initial_request_length
) ;
} else if request_length > 0 {
warn! (
" Data receiver for range {} (+{}) received less data from server than requested. " ,
initial_data_offset , initial_request_length
) ;
2019-11-01 19:46:28 +00:00
}
}
2021-01-22 21:32:45 +00:00
/*
2021-01-21 21:12:35 +00:00
async fn audio_file_fetch (
session : Session ,
shared : Arc < AudioFileShared > ,
initial_data_rx : ChannelData ,
initial_request_sent_time : Instant ,
initial_data_length : usize ,
2019-11-01 19:46:28 +00:00
2021-01-21 21:12:35 +00:00
output : NamedTempFile ,
stream_loader_command_rx : mpsc ::UnboundedReceiver < StreamLoaderCommand > ,
complete_tx : oneshot ::Sender < NamedTempFile > ,
) {
let ( file_data_tx , file_data_rx ) = unbounded ::< ReceivedData > ( ) ;
let requested_range = Range ::new ( 0 , initial_data_length ) ;
let mut download_status = shared . download_status . lock ( ) . unwrap ( ) ;
download_status . requested . add_range ( & requested_range ) ;
session . spawn ( audio_file_fetch_receive_data (
shared . clone ( ) ,
file_data_tx . clone ( ) ,
initial_data_rx ,
0 ,
initial_data_length ,
initial_request_sent_time ,
) ) ;
let mut network_response_times_ms : Vec ::new ( ) ;
let f1 = file_data_rx . map ( | x | Ok ::< _ , ( ) > ( x ) ) . try_for_each ( | x | {
match x {
ReceivedData ::ResponseTimeMs ( response_time_ms ) = > {
trace! ( " Ping time estimated as: {} ms. " , response_time_ms ) ;
// record the response time
network_response_times_ms . push ( response_time_ms ) ;
// prune old response times. Keep at most three.
while network_response_times_ms . len ( ) > 3 {
network_response_times_ms . remove ( 0 ) ;
}
2019-11-01 19:46:28 +00:00
2021-01-21 21:12:35 +00:00
// stats::median is experimental. So we calculate the median of up to three ourselves.
let ping_time_ms : usize = match network_response_times_ms . len ( ) {
1 = > network_response_times_ms [ 0 ] as usize ,
2 = > {
( ( network_response_times_ms [ 0 ] + network_response_times_ms [ 1 ] ) / 2 ) as usize
2019-11-01 19:46:28 +00:00
}
2021-01-21 21:12:35 +00:00
3 = > {
let mut times = network_response_times_ms . clone ( ) ;
times . sort ( ) ;
times [ 1 ]
2019-11-01 19:46:28 +00:00
}
2021-01-21 21:12:35 +00:00
_ = > unreachable! ( ) ,
} ;
// store our new estimate for everyone to see
shared
. ping_time_ms
. store ( ping_time_ms , atomic ::Ordering ::Relaxed ) ;
}
ReceivedData ::Data ( data ) = > {
output
. as_mut ( )
. unwrap ( )
. seek ( SeekFrom ::Start ( data . offset as u64 ) )
. unwrap ( ) ;
output
. as_mut ( )
. unwrap ( )
. write_all ( data . data . as_ref ( ) )
. unwrap ( ) ;
let mut full = false ;
{
let mut download_status = shared . download_status . lock ( ) . unwrap ( ) ;
let received_range = Range ::new ( data . offset , data . data . len ( ) ) ;
download_status . downloaded . add_range ( & received_range ) ;
shared . cond . notify_all ( ) ;
if download_status . downloaded . contained_length_from_value ( 0 )
> = shared . file_size
{
full = true ;
2019-11-01 19:46:28 +00:00
}
2021-01-21 21:12:35 +00:00
drop ( download_status ) ;
2019-11-01 19:46:28 +00:00
}
2021-01-21 21:12:35 +00:00
if full {
2019-11-02 06:19:31 +00:00
self . finish ( ) ;
2021-01-21 21:12:35 +00:00
return future ::ready ( Err ( ( ) ) ) ;
2019-11-01 19:46:28 +00:00
}
2021-01-21 21:12:35 +00:00
}
}
future ::ready ( Ok ( ( ) ) )
} ) ;
let f2 = stream_loader_command_rx . map ( Ok ::< _ , ( ) > ) . try_for_each ( | x | {
match cmd {
StreamLoaderCommand ::Fetch ( request ) = > {
self . download_range ( request . start , request . length ) ;
}
StreamLoaderCommand ::RandomAccessMode ( ) = > {
* ( shared . download_strategy . lock ( ) . unwrap ( ) ) = DownloadStrategy ::RandomAccess ( ) ;
}
StreamLoaderCommand ::StreamMode ( ) = > {
* ( shared . download_strategy . lock ( ) . unwrap ( ) ) = DownloadStrategy ::Streaming ( ) ;
}
StreamLoaderCommand ::Close ( ) = > return future ::ready ( Err ( ( ) ) ) ,
}
Ok ( ( ) )
} ) ;
let f3 = future ::poll_fn ( | _ | {
if let DownloadStrategy ::Streaming ( ) = self . get_download_strategy ( ) {
let number_of_open_requests = shared
. number_of_open_requests
. load ( atomic ::Ordering ::SeqCst ) ;
let max_requests_to_send =
MAX_PREFETCH_REQUESTS - min ( MAX_PREFETCH_REQUESTS , number_of_open_requests ) ;
if max_requests_to_send > 0 {
let bytes_pending : usize = {
let download_status = shared . download_status . lock ( ) . unwrap ( ) ;
download_status
. requested
. minus ( & download_status . downloaded )
. len ( )
} ;
let ping_time_seconds =
0.001 * shared . ping_time_ms . load ( atomic ::Ordering ::Relaxed ) as f64 ;
let download_rate = session . channel ( ) . get_download_rate_estimate ( ) ;
let desired_pending_bytes = max (
( PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * shared . stream_data_rate as f64 )
as usize ,
( FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64 )
as usize ,
) ;
if bytes_pending < desired_pending_bytes {
self . pre_fetch_more_data (
desired_pending_bytes - bytes_pending ,
max_requests_to_send ,
2019-11-11 07:22:41 +00:00
) ;
2019-11-01 19:46:28 +00:00
}
}
}
2021-01-21 21:12:35 +00:00
Poll ::Pending
} ) ;
future ::select_all ( vec! [ f1 , f2 , f3 ] ) . await
} * /
2019-11-01 19:46:28 +00:00
2021-01-22 21:32:45 +00:00
// State of the fetcher that is spawned for every streaming file. It owns the
// write side of the temporary file and both receiving channel ends.
pin_project! {
struct AudioFileFetch {
// Session used to send range requests and to spawn data receivers.
session : Session ,
// State shared with the reader and with controllers.
shared : Arc < AudioFileShared > ,
// The temporary file the download is stored in.
output : Option < NamedTempFile > ,
2017-01-19 22:45:24 +00:00
2021-01-22 21:32:45 +00:00
// Sender cloned into every data receiver that is spawned.
file_data_tx : mpsc ::UnboundedSender < ReceivedData > ,
#[ pin ]
// Receives ping measurements and file data from the data receivers.
file_data_rx : mpsc ::UnboundedReceiver < ReceivedData > ,
2017-01-19 22:45:24 +00:00
2021-01-22 21:32:45 +00:00
#[ pin ]
// Receives commands sent through `StreamLoaderController`.
stream_loader_command_rx : mpsc ::UnboundedReceiver < StreamLoaderCommand > ,
// Sender on which the temporary file is handed over; `AudioFile::open` listens
// on the other end to save the completed file to the cache.
complete_tx : Option < oneshot ::Sender < NamedTempFile > > ,
// Most recent ping time measurements in milliseconds (at most three are kept).
network_response_times_ms : Vec < usize > ,
}
2017-01-19 22:45:24 +00:00
}
impl AudioFileFetch {
2018-02-26 01:50:41 +00:00
fn new (
session : Session ,
shared : Arc < AudioFileShared > ,
2019-11-01 19:46:28 +00:00
initial_data_rx : ChannelData ,
initial_request_sent_time : Instant ,
initial_data_length : usize ,
2018-02-26 01:50:41 +00:00
output : NamedTempFile ,
2019-11-01 19:46:28 +00:00
stream_loader_command_rx : mpsc ::UnboundedReceiver < StreamLoaderCommand > ,
2018-02-26 01:50:41 +00:00
complete_tx : oneshot ::Sender < NamedTempFile > ,
) -> AudioFileFetch {
2019-11-01 19:46:28 +00:00
let ( file_data_tx , file_data_rx ) = unbounded ::< ReceivedData > ( ) ;
{
let requested_range = Range ::new ( 0 , initial_data_length ) ;
let mut download_status = shared . download_status . lock ( ) . unwrap ( ) ;
download_status . requested . add_range ( & requested_range ) ;
}
2021-01-21 21:12:35 +00:00
session . spawn ( audio_file_fetch_receive_data (
2019-11-01 19:46:28 +00:00
shared . clone ( ) ,
file_data_tx . clone ( ) ,
initial_data_rx ,
0 ,
initial_data_length ,
initial_request_sent_time ,
2021-01-21 21:12:35 +00:00
) ) ;
2019-11-01 19:46:28 +00:00
2017-01-19 22:45:24 +00:00
AudioFileFetch {
session : session ,
shared : shared ,
output : Some ( output ) ,
2019-11-01 19:46:28 +00:00
file_data_tx : file_data_tx ,
file_data_rx : file_data_rx ,
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
stream_loader_command_rx : stream_loader_command_rx ,
2017-01-19 22:45:24 +00:00
complete_tx : Some ( complete_tx ) ,
2019-11-01 19:46:28 +00:00
network_response_times_ms : Vec ::new ( ) ,
2015-07-07 21:40:31 +00:00
}
2015-06-23 17:34:48 +00:00
}
2015-06-24 00:41:39 +00:00
2019-11-07 13:02:53 +00:00
/// Returns a copy of the currently selected download strategy.
fn get_download_strategy(&mut self) -> DownloadStrategy {
    let strategy = self.shared.download_strategy.lock().unwrap();
    *strategy
}
2019-11-01 19:46:28 +00:00
fn download_range ( & mut self , mut offset : usize , mut length : usize ) {
2019-11-07 13:02:53 +00:00
if length < MINIMUM_DOWNLOAD_SIZE {
length = MINIMUM_DOWNLOAD_SIZE ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
// ensure the values are within the bounds and align them by 4 for the spotify protocol.
if offset > = self . shared . file_size {
return ;
}
2015-06-24 00:41:39 +00:00
2021-01-21 21:12:35 +00:00
if length = = 0 {
2019-11-01 19:46:28 +00:00
return ;
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
if offset + length > self . shared . file_size {
length = self . shared . file_size - offset ;
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
if offset % 4 ! = 0 {
length + = offset % 4 ;
offset - = offset % 4 ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
if length % 4 ! = 0 {
length + = 4 - ( length % 4 ) ;
}
let mut ranges_to_request = RangeSet ::new ( ) ;
ranges_to_request . add_range ( & Range ::new ( offset , length ) ) ;
let mut download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
ranges_to_request . subtract_range_set ( & download_status . downloaded ) ;
ranges_to_request . subtract_range_set ( & download_status . requested ) ;
for range in ranges_to_request . iter ( ) {
2020-01-17 17:11:07 +00:00
let ( _headers , data ) = request_range (
& self . session ,
self . shared . file_id ,
range . start ,
range . length ,
)
. split ( ) ;
2019-11-01 19:46:28 +00:00
download_status . requested . add_range ( range ) ;
2021-01-21 21:12:35 +00:00
self . session . spawn ( audio_file_fetch_receive_data (
2019-11-01 19:46:28 +00:00
self . shared . clone ( ) ,
self . file_data_tx . clone ( ) ,
data ,
range . start ,
range . length ,
Instant ::now ( ) ,
2021-01-21 21:12:35 +00:00
) ) ;
2019-11-01 19:46:28 +00:00
}
2016-05-09 11:22:51 +00:00
}
2015-07-07 21:40:31 +00:00
2019-11-17 23:54:44 +00:00
fn pre_fetch_more_data ( & mut self , bytes : usize , max_requests_to_send : usize ) {
2019-11-07 13:02:53 +00:00
let mut bytes_to_go = bytes ;
2019-11-17 23:54:44 +00:00
let mut requests_to_go = max_requests_to_send ;
2019-11-01 19:46:28 +00:00
2019-11-17 23:54:44 +00:00
while bytes_to_go > 0 & & requests_to_go > 0 {
2019-11-07 13:02:53 +00:00
// determine what is still missing
let mut missing_data = RangeSet ::new ( ) ;
missing_data . add_range ( & Range ::new ( 0 , self . shared . file_size ) ) ;
{
let download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
missing_data . subtract_range_set ( & download_status . downloaded ) ;
missing_data . subtract_range_set ( & download_status . requested ) ;
}
2019-11-01 19:46:28 +00:00
2019-11-07 13:02:53 +00:00
// download data from after the current read position first
let mut tail_end = RangeSet ::new ( ) ;
let read_position = self . shared . read_position . load ( atomic ::Ordering ::Relaxed ) ;
2020-01-17 17:11:07 +00:00
tail_end . add_range ( & Range ::new (
read_position ,
self . shared . file_size - read_position ,
) ) ;
2019-11-07 13:02:53 +00:00
let tail_end = tail_end . intersection ( & missing_data ) ;
if ! tail_end . is_empty ( ) {
let range = tail_end . get_range ( 0 ) ;
let offset = range . start ;
let length = min ( range . length , bytes_to_go ) ;
self . download_range ( offset , length ) ;
2019-11-18 00:08:34 +00:00
requests_to_go - = 1 ;
2019-11-07 13:02:53 +00:00
bytes_to_go - = length ;
} else if ! missing_data . is_empty ( ) {
// ok, the tail is downloaded, download something fom the beginning.
let range = missing_data . get_range ( 0 ) ;
let offset = range . start ;
let length = min ( range . length , bytes_to_go ) ;
self . download_range ( offset , length ) ;
2019-11-18 00:08:34 +00:00
requests_to_go - = 1 ;
2019-11-07 13:02:53 +00:00
bytes_to_go - = length ;
} else {
return ;
}
2019-11-01 19:46:28 +00:00
}
2017-01-19 22:45:24 +00:00
}
2021-01-21 21:12:35 +00:00
fn poll_file_data_rx ( & mut self , cx : & mut Context < '_ > ) -> Poll < ( ) > {
2017-01-19 22:45:24 +00:00
loop {
2021-01-21 21:12:35 +00:00
match Pin ::new ( & mut self . file_data_rx ) . poll_next ( cx ) {
Poll ::Ready ( None ) = > return Poll ::Ready ( ( ) ) ,
Poll ::Ready ( Some ( ReceivedData ::ResponseTimeMs ( response_time_ms ) ) ) = > {
2019-11-11 07:43:41 +00:00
trace! ( " Ping time estimated as: {} ms. " , response_time_ms ) ;
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
// record the response time
self . network_response_times_ms . push ( response_time_ms ) ;
2017-01-19 22:45:24 +00:00
2019-11-01 23:00:08 +00:00
// prune old response times. Keep at most three.
2019-11-01 19:46:28 +00:00
while self . network_response_times_ms . len ( ) > 3 {
self . network_response_times_ms . remove ( 0 ) ;
}
// stats::median is experimental. So we calculate the median of up to three ourselves.
let ping_time_ms : usize = match self . network_response_times_ms . len ( ) {
1 = > self . network_response_times_ms [ 0 ] as usize ,
2019-11-11 07:22:41 +00:00
2 = > {
2020-01-17 17:11:07 +00:00
( ( self . network_response_times_ms [ 0 ]
+ self . network_response_times_ms [ 1 ] )
/ 2 ) as usize
2019-11-11 07:22:41 +00:00
}
2019-11-01 19:46:28 +00:00
3 = > {
let mut times = self . network_response_times_ms . clone ( ) ;
2021-01-21 21:12:35 +00:00
times . sort_unstable ( ) ;
2019-11-01 19:46:28 +00:00
times [ 1 ]
}
_ = > unreachable! ( ) ,
} ;
// store our new estimate for everyone to see
2019-11-11 07:22:41 +00:00
self . shared
. ping_time_ms
. store ( ping_time_ms , atomic ::Ordering ::Relaxed ) ;
}
2021-01-21 21:12:35 +00:00
Poll ::Ready ( Some ( ReceivedData ::Data ( data ) ) ) = > {
2019-11-01 19:46:28 +00:00
self . output
. as_mut ( )
. unwrap ( )
. seek ( SeekFrom ::Start ( data . offset as u64 ) )
. unwrap ( ) ;
2019-11-11 07:22:41 +00:00
self . output
. as_mut ( )
. unwrap ( )
. write_all ( data . data . as_ref ( ) )
. unwrap ( ) ;
2019-11-01 19:46:28 +00:00
let mut full = false ;
{
let mut download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
let received_range = Range ::new ( data . offset , data . data . len ( ) ) ;
download_status . downloaded . add_range ( & received_range ) ;
2017-01-19 22:45:24 +00:00
self . shared . cond . notify_all ( ) ;
2019-11-11 07:22:41 +00:00
if download_status . downloaded . contained_length_from_value ( 0 )
> = self . shared . file_size
{
2019-11-01 19:46:28 +00:00
full = true ;
}
2019-11-05 11:58:00 +00:00
2019-11-01 19:46:28 +00:00
drop ( download_status ) ;
}
2017-01-19 22:45:24 +00:00
if full {
self . finish ( ) ;
2021-01-22 21:32:45 +00:00
return Poll ::Ready ( ( ) ) ;
2017-01-19 22:45:24 +00:00
}
}
2021-01-22 21:32:45 +00:00
Poll ::Pending = > return Poll ::Pending ,
2019-11-01 19:46:28 +00:00
}
}
}
2021-01-21 21:12:35 +00:00
fn poll_stream_loader_command_rx ( & mut self , cx : & mut Context < '_ > ) -> Poll < ( ) > {
2019-11-01 19:46:28 +00:00
loop {
2021-01-21 21:12:35 +00:00
match Pin ::new ( & mut self . stream_loader_command_rx ) . poll_next ( cx ) {
2021-01-22 21:32:45 +00:00
Poll ::Ready ( None ) = > return Poll ::Ready ( ( ) ) ,
Poll ::Ready ( Some ( cmd ) ) = > match cmd {
StreamLoaderCommand ::Fetch ( request ) = > {
self . download_range ( request . start , request . length ) ;
}
StreamLoaderCommand ::RandomAccessMode ( ) = > {
* ( self . shared . download_strategy . lock ( ) . unwrap ( ) ) =
2021-01-21 21:12:35 +00:00
DownloadStrategy ::RandomAccess ( ) ;
2021-01-22 21:32:45 +00:00
}
StreamLoaderCommand ::StreamMode ( ) = > {
* ( self . shared . download_strategy . lock ( ) . unwrap ( ) ) =
2021-01-21 21:12:35 +00:00
DownloadStrategy ::Streaming ( ) ;
}
2021-01-22 21:32:45 +00:00
StreamLoaderCommand ::Close ( ) = > return Poll ::Ready ( ( ) ) ,
} ,
Poll ::Pending = > return Poll ::Pending ,
2019-11-01 19:46:28 +00:00
}
}
}
/// Hands the output file over through `complete_tx`.
///
/// The file is rewound to its start before it is sent. Both `output` and
/// `complete_tx` are taken out of `self`; this panics if either is already
/// gone or if rewinding fails.
fn finish(&mut self) {
    let mut file = self.output.take().unwrap();
    let notifier = self.complete_tx.take().unwrap();

    file.seek(SeekFrom::Start(0)).unwrap();

    // A failed send is deliberately ignored.
    let _ = notifier.send(file);
}
}
impl Future for AudioFileFetch {
2021-01-21 21:12:35 +00:00
type Output = ( ) ;
fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < '_ > ) -> Poll < ( ) > {
if let Poll ::Ready ( ( ) ) = self . poll_stream_loader_command_rx ( cx ) {
2021-01-22 21:32:45 +00:00
return Poll ::Ready ( ( ) ) ;
2019-11-01 19:46:28 +00:00
}
2017-01-19 22:45:24 +00:00
2021-01-21 21:12:35 +00:00
if let Poll ::Ready ( ( ) ) = self . poll_file_data_rx ( cx ) {
2021-01-22 21:32:45 +00:00
return Poll ::Ready ( ( ) ) ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
2019-11-07 13:02:53 +00:00
if let DownloadStrategy ::Streaming ( ) = self . get_download_strategy ( ) {
2020-01-17 17:11:07 +00:00
let number_of_open_requests = self
. shared
. number_of_open_requests
. load ( atomic ::Ordering ::SeqCst ) ;
2019-11-18 00:08:34 +00:00
let max_requests_to_send =
MAX_PREFETCH_REQUESTS - min ( MAX_PREFETCH_REQUESTS , number_of_open_requests ) ;
2019-11-01 19:46:28 +00:00
2019-11-18 00:08:34 +00:00
if max_requests_to_send > 0 {
2019-11-17 23:54:44 +00:00
let bytes_pending : usize = {
let download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
2020-01-17 17:11:07 +00:00
download_status
. requested
. minus ( & download_status . downloaded )
. len ( )
2019-11-17 23:54:44 +00:00
} ;
let ping_time_seconds =
0.001 * self . shared . ping_time_ms . load ( atomic ::Ordering ::Relaxed ) as f64 ;
let download_rate = self . session . channel ( ) . get_download_rate_estimate ( ) ;
2019-11-07 13:02:53 +00:00
2019-11-17 23:54:44 +00:00
let desired_pending_bytes = max (
2020-01-17 17:11:07 +00:00
( PREFETCH_THRESHOLD_FACTOR
* ping_time_seconds
* self . shared . stream_data_rate as f64 ) as usize ,
( FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64 )
2019-11-17 23:54:44 +00:00
as usize ,
) ;
if bytes_pending < desired_pending_bytes {
2019-11-18 00:08:34 +00:00
self . pre_fetch_more_data (
desired_pending_bytes - bytes_pending ,
max_requests_to_send ,
) ;
2019-11-17 23:54:44 +00:00
}
2019-11-01 19:46:28 +00:00
}
}
2021-01-21 21:12:35 +00:00
Poll ::Pending
2015-06-23 17:34:48 +00:00
}
2015-06-23 14:38:29 +00:00
}
2017-01-29 15:36:39 +00:00
impl Read for AudioFileStreaming {
2015-06-23 14:38:29 +00:00
fn read ( & mut self , output : & mut [ u8 ] ) -> io ::Result < usize > {
2019-11-01 19:46:28 +00:00
let offset = self . position as usize ;
if offset > = self . shared . file_size {
return Ok ( 0 ) ;
}
let length = min ( output . len ( ) , self . shared . file_size - offset ) ;
2015-06-23 14:38:29 +00:00
2019-11-07 13:02:53 +00:00
let length_to_request = match * ( self . shared . download_strategy . lock ( ) . unwrap ( ) ) {
2019-11-11 07:22:41 +00:00
DownloadStrategy ::RandomAccess ( ) = > length ,
2019-11-07 13:02:53 +00:00
DownloadStrategy ::Streaming ( ) = > {
// Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
2019-11-11 07:22:41 +00:00
let ping_time_seconds =
0.0001 * self . shared . ping_time_ms . load ( atomic ::Ordering ::Relaxed ) as f64 ;
let length_to_request = length
+ max (
( READ_AHEAD_DURING_PLAYBACK_SECONDS * self . shared . stream_data_rate as f64 )
as usize ,
( READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* ping_time_seconds
* self . shared . stream_data_rate as f64 ) as usize ,
) ;
2019-11-07 13:02:53 +00:00
min ( length_to_request , self . shared . file_size - offset )
}
} ;
2015-07-07 21:40:31 +00:00
2019-11-01 19:46:28 +00:00
let mut ranges_to_request = RangeSet ::new ( ) ;
2019-11-07 13:02:53 +00:00
ranges_to_request . add_range ( & Range ::new ( offset , length_to_request ) ) ;
2019-11-01 19:46:28 +00:00
let mut download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
ranges_to_request . subtract_range_set ( & download_status . downloaded ) ;
ranges_to_request . subtract_range_set ( & download_status . requested ) ;
2021-01-21 21:12:35 +00:00
for & range in ranges_to_request . iter ( ) {
2019-11-11 07:22:41 +00:00
self . stream_loader_command_tx
2021-01-21 21:12:35 +00:00
. unbounded_send ( StreamLoaderCommand ::Fetch ( range ) )
2019-11-11 07:22:41 +00:00
. unwrap ( ) ;
2019-11-01 19:46:28 +00:00
}
2019-11-07 13:02:53 +00:00
if length = = 0 {
return Ok ( 0 ) ;
}
2019-11-11 07:43:41 +00:00
let mut download_message_printed = false ;
2019-11-01 19:46:28 +00:00
while ! download_status . downloaded . contains ( offset ) {
2019-11-11 08:00:19 +00:00
if let DownloadStrategy ::Streaming ( ) = * self . shared . download_strategy . lock ( ) . unwrap ( ) {
if ! download_message_printed {
debug! ( " Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {} " , offset , download_status . downloaded , download_status . requested . minus ( & download_status . downloaded ) ) ;
download_message_printed = true ;
}
2019-11-11 07:43:41 +00:00
}
2019-11-11 07:22:41 +00:00
download_status = self
. shared
. cond
. wait_timeout ( download_status , Duration ::from_millis ( 1000 ) )
. unwrap ( )
. 0 ;
2019-11-01 19:46:28 +00:00
}
2020-01-17 17:11:07 +00:00
let available_length = download_status
. downloaded
. contained_length_from_value ( offset ) ;
2019-11-01 19:46:28 +00:00
assert! ( available_length > 0 ) ;
drop ( download_status ) ;
self . position = self . read_file . seek ( SeekFrom ::Start ( offset as u64 ) ) . unwrap ( ) ;
let read_len = min ( length , available_length ) ;
2019-10-08 09:31:18 +00:00
let read_len = self . read_file . read ( & mut output [ .. read_len ] ) ? ;
2019-11-01 19:46:28 +00:00
2019-11-11 07:43:41 +00:00
if download_message_printed {
2019-11-18 00:08:34 +00:00
debug! (
" Read at postion {} completed. {} bytes returned, {} bytes were requested. " ,
offset ,
read_len ,
output . len ( )
) ;
2019-11-11 07:43:41 +00:00
}
2015-06-23 14:38:29 +00:00
2015-09-01 11:20:37 +00:00
self . position + = read_len as u64 ;
2019-11-11 07:22:41 +00:00
self . shared
. read_position
. store ( self . position as usize , atomic ::Ordering ::Relaxed ) ;
2019-11-01 19:46:28 +00:00
2021-01-21 21:12:35 +00:00
Ok ( read_len )
2015-06-23 14:38:29 +00:00
}
}
2017-01-29 15:36:39 +00:00
impl Seek for AudioFileStreaming {
2015-07-07 21:40:31 +00:00
fn seek ( & mut self , pos : SeekFrom ) -> io ::Result < u64 > {
2019-10-08 09:31:18 +00:00
self . position = self . read_file . seek ( pos ) ? ;
2018-11-10 20:31:03 +00:00
// Do not seek past EOF
2019-11-11 07:22:41 +00:00
self . shared
. read_position
. store ( self . position as usize , atomic ::Ordering ::Relaxed ) ;
2017-01-19 22:45:24 +00:00
Ok ( self . position )
2015-06-23 14:38:29 +00:00
}
}
2017-01-29 15:36:39 +00:00
impl Read for AudioFile {
fn read ( & mut self , output : & mut [ u8 ] ) -> io ::Result < usize > {
match * self {
AudioFile ::Cached ( ref mut file ) = > file . read ( output ) ,
AudioFile ::Streaming ( ref mut file ) = > file . read ( output ) ,
}
}
}
impl Seek for AudioFile {
fn seek ( & mut self , pos : SeekFrom ) -> io ::Result < u64 > {
match * self {
AudioFile ::Cached ( ref mut file ) = > file . seek ( pos ) ,
AudioFile ::Streaming ( ref mut file ) = > file . seek ( pos ) ,
}
}
}