2020-01-17 18:09:10 +00:00
use crate ::range_set ::{ Range , RangeSet } ;
2018-02-26 01:50:41 +00:00
use byteorder ::{ BigEndian , ByteOrder , WriteBytesExt } ;
2019-11-01 19:46:28 +00:00
use bytes ::Bytes ;
2018-02-26 01:50:41 +00:00
use futures ::sync ::{ mpsc , oneshot } ;
2018-04-25 17:29:50 +00:00
use futures ::Stream ;
2018-03-23 05:13:01 +00:00
use futures ::{ Async , Future , Poll } ;
2019-11-11 07:22:41 +00:00
use std ::cmp ::{ max , min } ;
2015-07-07 21:40:31 +00:00
use std ::fs ;
2018-02-26 01:50:41 +00:00
use std ::io ::{ self , Read , Seek , SeekFrom , Write } ;
2017-01-19 22:45:24 +00:00
use std ::sync ::{ Arc , Condvar , Mutex } ;
2019-11-01 19:46:28 +00:00
use std ::time ::{ Duration , Instant } ;
2016-01-12 23:29:31 +00:00
use tempfile ::NamedTempFile ;
2015-06-23 14:38:29 +00:00
2019-11-11 07:22:41 +00:00
use futures ::sync ::mpsc ::unbounded ;
2019-09-16 19:00:09 +00:00
use librespot_core ::channel ::{ Channel , ChannelData , ChannelError , ChannelHeaders } ;
use librespot_core ::session ::Session ;
use librespot_core ::spotify_id ::FileId ;
2019-11-01 19:46:28 +00:00
use std ::sync ::atomic ;
use std ::sync ::atomic ::AtomicUsize ;
2015-06-23 14:38:29 +00:00
2019-11-07 13:02:53 +00:00
const MINIMUM_DOWNLOAD_SIZE : usize = 1024 * 16 ;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
// Note: smaller requests can happen if part of the block is downloaded already.
2019-11-11 07:22:41 +00:00
const INITIAL_DOWNLOAD_SIZE : usize = 1024 * 16 ;
2019-11-07 13:02:53 +00:00
// The amount of data that is requested when initially opening a file.
// Note: if the file is opened to play from the beginning, the amount of data to
// read ahead is requested in addition to this amount. If the file is opened to seek to
// another position, then only this amount is requested on the first request.
const INITIAL_PING_TIME_ESTIMATE_SECONDS : f64 = 0.5 ;
// The pig time that is used for calculations before a ping time was actually measured.
const MAXIMUM_ASSUMED_PING_TIME_SECONDS : f64 = 1.5 ;
// If the measured ping time to the Spotify server is larger than this value, it is capped
// to avoid run-away block sizes and pre-fetching.
pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS : f64 = 1.0 ;
// Before playback starts, this many seconds of data must be present.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS : f64 = 2.0 ;
// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
// obeyed.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
2019-11-17 23:54:44 +00:00
pub const READ_AHEAD_DURING_PLAYBACK_SECONDS : f64 = 5.0 ;
2019-11-07 13:02:53 +00:00
// While playing back, this many seconds of data ahead of the current read position are
// requested.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
2019-11-17 23:54:44 +00:00
pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS : f64 = 10.0 ;
2019-11-07 13:02:53 +00:00
// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.
const PREFETCH_THRESHOLD_FACTOR : f64 = 4.0 ;
// If the amount of data that is pending (requested but not received) is less than a certain amount,
// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
// data is calculated as
// <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>
const FAST_PREFETCH_THRESHOLD_FACTOR : f64 = 1.5 ;
// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
// The formula used is
// <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
// This mechanism allows for fast downloading of the remainder of the file. The number should be larger
// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.
2019-11-17 23:54:44 +00:00
const MAX_PREFETCH_REQUESTS : usize = 4 ;
// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending
// requests share bandwidth. Thus, havint too many requests can lead to the one that is needed next
// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new
// pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are pending.
2017-01-29 15:36:39 +00:00
pub enum AudioFile {
Cached ( fs ::File ) ,
Streaming ( AudioFileStreaming ) ,
}
2015-07-07 21:40:31 +00:00
2017-01-29 15:36:39 +00:00
pub enum AudioFileOpen {
2017-08-03 19:37:04 +00:00
Cached ( Option < fs ::File > ) ,
2017-01-29 15:36:39 +00:00
Streaming ( AudioFileOpenStreaming ) ,
2015-06-24 00:41:39 +00:00
}
2015-06-23 14:38:29 +00:00
2017-01-29 15:36:39 +00:00
pub struct AudioFileOpenStreaming {
2017-01-19 22:45:24 +00:00
session : Session ,
2019-11-01 19:46:28 +00:00
initial_data_rx : Option < ChannelData > ,
initial_data_length : Option < usize > ,
initial_request_sent_time : Instant ,
2017-01-19 22:45:24 +00:00
headers : ChannelHeaders ,
file_id : FileId ,
complete_tx : Option < oneshot ::Sender < NamedTempFile > > ,
2019-11-07 13:02:53 +00:00
streaming_data_rate : usize ,
2016-05-09 11:22:51 +00:00
}
2019-11-11 07:22:41 +00:00
enum StreamLoaderCommand {
Fetch ( Range ) , // signal the stream loader to fetch a range of the file
2019-11-01 19:46:28 +00:00
RandomAccessMode ( ) , // optimise download strategy for random access
2019-11-11 07:22:41 +00:00
StreamMode ( ) , // optimise download strategy for streaming
Close ( ) , // terminate and don't load any more data
2019-11-01 19:46:28 +00:00
}
#[ derive(Clone) ]
pub struct StreamLoaderController {
channel_tx : Option < mpsc ::UnboundedSender < StreamLoaderCommand > > ,
stream_shared : Option < Arc < AudioFileShared > > ,
file_size : usize ,
}
impl StreamLoaderController {
pub fn len ( & self ) -> usize {
return self . file_size ;
}
pub fn range_available ( & self , range : Range ) -> bool {
if let Some ( ref shared ) = self . stream_shared {
2019-11-01 21:38:46 +00:00
let download_status = shared . download_status . lock ( ) . unwrap ( ) ;
2019-11-11 07:22:41 +00:00
if range . length
< = download_status
. downloaded
. contained_length_from_value ( range . start )
{
2019-11-01 19:46:28 +00:00
return true ;
} else {
return false ;
}
} else {
if range . length < = self . len ( ) - range . start {
return true ;
} else {
return false ;
}
}
}
pub fn ping_time_ms ( & self ) -> usize {
if let Some ( ref shared ) = self . stream_shared {
return shared . ping_time_ms . load ( atomic ::Ordering ::Relaxed ) ;
} else {
return 0 ;
}
}
fn send_stream_loader_command ( & mut self , command : StreamLoaderCommand ) {
if let Some ( ref mut channel ) = self . channel_tx {
// ignore the error in case the channel has been closed already.
let _ = channel . unbounded_send ( command ) ;
}
}
pub fn fetch ( & mut self , range : Range ) {
// signal the stream loader to fetch a range of the file
self . send_stream_loader_command ( StreamLoaderCommand ::Fetch ( range ) ) ;
}
pub fn fetch_blocking ( & mut self , mut range : Range ) {
// signal the stream loader to tech a range of the file and block until it is loaded.
// ensure the range is within the file's bounds.
if range . start > = self . len ( ) {
range . length = 0 ;
} else if range . end ( ) > self . len ( ) {
range . length = self . len ( ) - range . start ;
}
self . fetch ( range ) ;
if let Some ( ref shared ) = self . stream_shared {
let mut download_status = shared . download_status . lock ( ) . unwrap ( ) ;
2019-11-11 07:22:41 +00:00
while range . length
> download_status
. downloaded
. contained_length_from_value ( range . start )
{
download_status = shared
. cond
. wait_timeout ( download_status , Duration ::from_millis ( 1000 ) )
. unwrap ( )
. 0 ;
if range . length
> ( download_status
. downloaded
. union ( & download_status . requested )
. contained_length_from_value ( range . start ) )
{
2019-11-01 19:46:28 +00:00
// For some reason, the requested range is neither downloaded nor requested.
// This could be due to a network error. Request it again.
2019-11-07 13:02:53 +00:00
// We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
2019-11-01 19:46:28 +00:00
if let Some ( ref mut channel ) = self . channel_tx {
// ignore the error in case the channel has been closed already.
let _ = channel . unbounded_send ( StreamLoaderCommand ::Fetch ( range ) ) ;
}
}
}
}
}
pub fn fetch_next ( & mut self , length : usize ) {
2019-11-11 07:22:41 +00:00
let range : Range = if let Some ( ref shared ) = self . stream_shared {
2019-11-01 19:46:28 +00:00
Range {
start : shared . read_position . load ( atomic ::Ordering ::Relaxed ) ,
length : length ,
}
} else {
return ;
} ;
self . fetch ( range ) ;
}
pub fn fetch_next_blocking ( & mut self , length : usize ) {
2019-11-11 07:22:41 +00:00
let range : Range = if let Some ( ref shared ) = self . stream_shared {
2019-11-01 19:46:28 +00:00
Range {
start : shared . read_position . load ( atomic ::Ordering ::Relaxed ) ,
length : length ,
}
} else {
return ;
} ;
self . fetch_blocking ( range ) ;
}
pub fn set_random_access_mode ( & mut self ) {
// optimise download strategy for random access
self . send_stream_loader_command ( StreamLoaderCommand ::RandomAccessMode ( ) ) ;
}
pub fn set_stream_mode ( & mut self ) {
// optimise download strategy for streaming
self . send_stream_loader_command ( StreamLoaderCommand ::StreamMode ( ) ) ;
}
pub fn close ( & mut self ) {
// terminate stream loading and don't load any more data for this file.
self . send_stream_loader_command ( StreamLoaderCommand ::Close ( ) ) ;
}
}
2017-01-29 15:36:39 +00:00
pub struct AudioFileStreaming {
read_file : fs ::File ,
position : u64 ,
2019-11-01 19:46:28 +00:00
stream_loader_command_tx : mpsc ::UnboundedSender < StreamLoaderCommand > ,
2017-01-29 15:36:39 +00:00
shared : Arc < AudioFileShared > ,
}
2019-11-01 19:46:28 +00:00
struct AudioFileDownloadStatus {
requested : RangeSet ,
downloaded : RangeSet ,
}
2019-11-07 13:02:53 +00:00
#[ derive(Copy, Clone) ]
enum DownloadStrategy {
RandomAccess ( ) ,
Streaming ( ) ,
}
2016-05-09 11:22:51 +00:00
struct AudioFileShared {
2017-01-19 22:45:24 +00:00
file_id : FileId ,
2019-11-01 19:46:28 +00:00
file_size : usize ,
2019-11-07 13:02:53 +00:00
stream_data_rate : usize ,
2015-07-07 21:40:31 +00:00
cond : Condvar ,
2019-11-01 19:46:28 +00:00
download_status : Mutex < AudioFileDownloadStatus > ,
2019-11-07 13:02:53 +00:00
download_strategy : Mutex < DownloadStrategy > ,
2019-11-02 06:19:31 +00:00
number_of_open_requests : AtomicUsize ,
2019-11-01 19:46:28 +00:00
ping_time_ms : AtomicUsize ,
read_position : AtomicUsize ,
2015-06-23 14:38:29 +00:00
}
2017-01-29 15:36:39 +00:00
impl AudioFileOpenStreaming {
fn finish ( & mut self , size : usize ) -> AudioFileStreaming {
2015-07-07 21:40:31 +00:00
let shared = Arc ::new ( AudioFileShared {
2017-01-19 22:45:24 +00:00
file_id : self . file_id ,
2019-11-01 19:46:28 +00:00
file_size : size ,
2019-11-07 13:02:53 +00:00
stream_data_rate : self . streaming_data_rate ,
2015-07-07 21:40:31 +00:00
cond : Condvar ::new ( ) ,
2019-11-11 07:22:41 +00:00
download_status : Mutex ::new ( AudioFileDownloadStatus {
requested : RangeSet ::new ( ) ,
downloaded : RangeSet ::new ( ) ,
} ) ,
2019-11-07 13:02:53 +00:00
download_strategy : Mutex ::new ( DownloadStrategy ::RandomAccess ( ) ) , // start with random access mode until someone tells us otherwise
2019-11-02 06:19:31 +00:00
number_of_open_requests : AtomicUsize ::new ( 0 ) ,
2019-11-01 19:46:28 +00:00
ping_time_ms : AtomicUsize ::new ( 0 ) ,
read_position : AtomicUsize ::new ( 0 ) ,
2015-07-07 21:40:31 +00:00
} ) ;
2017-01-19 22:45:24 +00:00
let mut write_file = NamedTempFile ::new ( ) . unwrap ( ) ;
2019-11-01 21:38:46 +00:00
write_file . as_file ( ) . set_len ( size as u64 ) . unwrap ( ) ;
2017-01-19 22:45:24 +00:00
write_file . seek ( SeekFrom ::Start ( 0 ) ) . unwrap ( ) ;
2015-07-02 17:24:25 +00:00
2017-01-19 22:45:24 +00:00
let read_file = write_file . reopen ( ) . unwrap ( ) ;
2015-09-01 11:20:37 +00:00
2019-11-01 19:46:28 +00:00
let initial_data_rx = self . initial_data_rx . take ( ) . unwrap ( ) ;
let initial_data_length = self . initial_data_length . take ( ) . unwrap ( ) ;
2017-01-19 22:45:24 +00:00
let complete_tx = self . complete_tx . take ( ) . unwrap ( ) ;
2019-11-01 19:46:28 +00:00
//let (seek_tx, seek_rx) = mpsc::unbounded();
2019-11-11 07:22:41 +00:00
let ( stream_loader_command_tx , stream_loader_command_rx ) =
mpsc ::unbounded ::< StreamLoaderCommand > ( ) ;
2015-07-07 21:40:31 +00:00
2017-01-19 22:45:24 +00:00
let fetcher = AudioFileFetch ::new (
2018-02-26 01:50:41 +00:00
self . session . clone ( ) ,
shared . clone ( ) ,
2019-11-01 19:46:28 +00:00
initial_data_rx ,
self . initial_request_sent_time ,
initial_data_length ,
2018-02-26 01:50:41 +00:00
write_file ,
2019-11-01 19:46:28 +00:00
stream_loader_command_rx ,
2018-02-26 01:50:41 +00:00
complete_tx ,
2017-01-19 22:45:24 +00:00
) ;
self . session . spawn ( move | _ | fetcher ) ;
2015-06-23 14:38:29 +00:00
2017-01-29 15:36:39 +00:00
AudioFileStreaming {
2017-01-19 22:45:24 +00:00
read_file : read_file ,
2016-05-09 11:22:51 +00:00
2017-01-19 22:45:24 +00:00
position : 0 ,
2019-11-01 19:46:28 +00:00
//seek: seek_tx,
stream_loader_command_tx : stream_loader_command_tx ,
2017-01-19 22:45:24 +00:00
shared : shared ,
}
2015-07-07 21:40:31 +00:00
}
2016-05-09 11:22:51 +00:00
}
2017-01-19 22:45:24 +00:00
impl Future for AudioFileOpen {
type Item = AudioFile ;
type Error = ChannelError ;
2016-05-09 11:22:51 +00:00
2017-01-19 22:45:24 +00:00
fn poll ( & mut self ) -> Poll < AudioFile , ChannelError > {
2017-01-29 15:36:39 +00:00
match * self {
AudioFileOpen ::Streaming ( ref mut open ) = > {
let file = try_ready! ( open . poll ( ) ) ;
Ok ( Async ::Ready ( AudioFile ::Streaming ( file ) ) )
}
2017-08-03 19:37:04 +00:00
AudioFileOpen ::Cached ( ref mut file ) = > {
let file = file . take ( ) . unwrap ( ) ;
2017-01-29 15:36:39 +00:00
Ok ( Async ::Ready ( AudioFile ::Cached ( file ) ) )
}
}
}
}
impl Future for AudioFileOpenStreaming {
type Item = AudioFileStreaming ;
type Error = ChannelError ;
fn poll ( & mut self ) -> Poll < AudioFileStreaming , ChannelError > {
2017-01-19 22:45:24 +00:00
loop {
let ( id , data ) = try_ready! ( self . headers . poll ( ) ) . unwrap ( ) ;
2015-07-07 21:40:31 +00:00
2017-01-19 22:45:24 +00:00
if id = = 0x3 {
let size = BigEndian ::read_u32 ( & data ) as usize * 4 ;
let file = self . finish ( size ) ;
2018-02-12 20:02:27 +00:00
2017-01-19 22:45:24 +00:00
return Ok ( Async ::Ready ( file ) ) ;
2015-07-07 21:40:31 +00:00
}
2016-05-09 11:22:51 +00:00
}
}
2017-01-19 22:45:24 +00:00
}
2016-05-09 11:22:51 +00:00
2017-05-16 00:04:10 +00:00
impl AudioFile {
2019-11-11 07:22:41 +00:00
pub fn open (
session : & Session ,
file_id : FileId ,
bytes_per_second : usize ,
play_from_beginning : bool ,
) -> AudioFileOpen {
2017-05-16 00:04:10 +00:00
let cache = session . cache ( ) . cloned ( ) ;
2017-01-29 15:36:39 +00:00
if let Some ( file ) = cache . as_ref ( ) . and_then ( | cache | cache . file ( file_id ) ) {
2017-01-29 17:54:32 +00:00
debug! ( " File {} already in cache " , file_id ) ;
2017-08-03 19:37:04 +00:00
return AudioFileOpen ::Cached ( Some ( file ) ) ;
2017-01-29 15:36:39 +00:00
}
2017-01-29 17:54:32 +00:00
debug! ( " Downloading file {} " , file_id ) ;
2017-01-19 22:45:24 +00:00
let ( complete_tx , complete_rx ) = oneshot ::channel ( ) ;
2019-11-07 13:02:53 +00:00
let mut initial_data_length = if play_from_beginning {
2019-11-11 07:22:41 +00:00
INITIAL_DOWNLOAD_SIZE
+ max (
( READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64 ) as usize ,
( INITIAL_PING_TIME_ESTIMATE_SECONDS
* READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* bytes_per_second as f64 ) as usize ,
)
} else {
INITIAL_DOWNLOAD_SIZE
} ;
2019-11-07 13:02:53 +00:00
if initial_data_length % 4 ! = 0 {
initial_data_length + = 4 - ( initial_data_length % 4 ) ;
}
2019-11-01 19:46:28 +00:00
let ( headers , data ) = request_range ( session , file_id , 0 , initial_data_length ) . split ( ) ;
2015-07-07 21:40:31 +00:00
2017-01-29 15:36:39 +00:00
let open = AudioFileOpenStreaming {
2017-05-16 00:04:10 +00:00
session : session . clone ( ) ,
2017-01-19 22:45:24 +00:00
file_id : file_id ,
headers : headers ,
2019-11-01 19:46:28 +00:00
initial_data_rx : Some ( data ) ,
initial_data_length : Some ( initial_data_length ) ,
initial_request_sent_time : Instant ::now ( ) ,
2015-07-07 21:40:31 +00:00
2017-01-19 22:45:24 +00:00
complete_tx : Some ( complete_tx ) ,
2019-11-07 13:02:53 +00:00
streaming_data_rate : bytes_per_second ,
2016-05-09 11:22:51 +00:00
} ;
2017-05-16 00:04:10 +00:00
let session_ = session . clone ( ) ;
session . spawn ( move | _ | {
2018-02-26 01:50:41 +00:00
complete_rx
. map ( move | mut file | {
if let Some ( cache ) = session_ . cache ( ) {
cache . save_file ( file_id , & mut file ) ;
debug! ( " File {} complete, saving to cache " , file_id ) ;
} else {
debug! ( " File {} complete " , file_id ) ;
}
} )
. or_else ( | oneshot ::Canceled | Ok ( ( ) ) )
2017-01-29 15:36:39 +00:00
} ) ;
2019-11-07 13:02:53 +00:00
return AudioFileOpen ::Streaming ( open ) ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
2019-11-07 13:02:53 +00:00
pub fn get_stream_loader_controller ( & self ) -> StreamLoaderController {
2019-11-01 19:46:28 +00:00
match self {
2019-11-07 13:02:53 +00:00
AudioFile ::Streaming ( ref stream ) = > {
return StreamLoaderController {
2019-11-01 19:46:28 +00:00
channel_tx : Some ( stream . stream_loader_command_tx . clone ( ) ) ,
stream_shared : Some ( stream . shared . clone ( ) ) ,
file_size : stream . shared . file_size ,
2019-11-05 21:16:01 +00:00
} ;
2019-11-01 19:46:28 +00:00
}
AudioFile ::Cached ( ref file ) = > {
return StreamLoaderController {
channel_tx : None ,
stream_shared : None ,
file_size : file . metadata ( ) . unwrap ( ) . len ( ) as usize ,
2020-01-17 17:11:07 +00:00
} ;
2019-11-01 19:46:28 +00:00
}
}
}
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
fn request_range ( session : & Session , file : FileId , offset : usize , length : usize ) -> Channel {
2019-11-11 07:22:41 +00:00
assert! (
offset % 4 = = 0 ,
" Range request start positions must be aligned by 4 bytes. "
) ;
assert! (
length % 4 = = 0 ,
" Range request range lengths must be aligned by 4 bytes. "
) ;
2019-11-01 19:46:28 +00:00
let start = offset / 4 ;
2019-11-11 07:22:41 +00:00
let end = ( offset + length ) / 4 ;
2017-01-19 22:45:24 +00:00
let ( id , channel ) = session . channel ( ) . allocate ( ) ;
let mut data : Vec < u8 > = Vec ::new ( ) ;
data . write_u16 ::< BigEndian > ( id ) . unwrap ( ) ;
data . write_u8 ( 0 ) . unwrap ( ) ;
data . write_u8 ( 1 ) . unwrap ( ) ;
data . write_u16 ::< BigEndian > ( 0x0000 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( 0x00000000 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( 0x00009C40 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( 0x00020000 ) . unwrap ( ) ;
data . write ( & file . 0 ) . unwrap ( ) ;
2019-11-01 19:46:28 +00:00
data . write_u32 ::< BigEndian > ( start as u32 ) . unwrap ( ) ;
data . write_u32 ::< BigEndian > ( end as u32 ) . unwrap ( ) ;
2017-01-19 22:45:24 +00:00
session . send_packet ( 0x8 , data ) ;
channel
}
2019-11-01 19:46:28 +00:00
struct PartialFileData {
offset : usize ,
data : Bytes ,
}
enum ReceivedData {
ResponseTimeMs ( usize ) ,
Data ( PartialFileData ) ,
}
struct AudioFileFetchDataReceiver {
shared : Arc < AudioFileShared > ,
file_data_tx : mpsc ::UnboundedSender < ReceivedData > ,
data_rx : ChannelData ,
2019-11-05 11:58:00 +00:00
initial_data_offset : usize ,
initial_request_length : usize ,
2019-11-01 19:46:28 +00:00
data_offset : usize ,
request_length : usize ,
request_sent_time : Option < Instant > ,
2019-11-02 06:19:31 +00:00
measure_ping_time : bool ,
2019-11-01 19:46:28 +00:00
}
impl AudioFileFetchDataReceiver {
fn new (
shared : Arc < AudioFileShared > ,
file_data_tx : mpsc ::UnboundedSender < ReceivedData > ,
data_rx : ChannelData ,
data_offset : usize ,
request_length : usize ,
request_sent_time : Instant ,
) -> AudioFileFetchDataReceiver {
2020-01-17 17:11:07 +00:00
let measure_ping_time = shared
. number_of_open_requests
. load ( atomic ::Ordering ::SeqCst )
= = 0 ;
2019-11-02 06:19:31 +00:00
2019-11-11 07:22:41 +00:00
shared
. number_of_open_requests
. fetch_add ( 1 , atomic ::Ordering ::SeqCst ) ;
2019-11-02 06:19:31 +00:00
2019-11-01 19:46:28 +00:00
AudioFileFetchDataReceiver {
shared : shared ,
data_rx : data_rx ,
file_data_tx : file_data_tx ,
2019-11-05 11:58:00 +00:00
initial_data_offset : data_offset ,
initial_request_length : request_length ,
2019-11-01 19:46:28 +00:00
data_offset : data_offset ,
request_length : request_length ,
request_sent_time : Some ( request_sent_time ) ,
2019-11-02 06:19:31 +00:00
measure_ping_time : measure_ping_time ,
2019-11-01 19:46:28 +00:00
}
}
}
impl AudioFileFetchDataReceiver {
fn finish ( & mut self ) {
if self . request_length > 0 {
let missing_range = Range ::new ( self . data_offset , self . request_length ) ;
let mut download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
download_status . requested . subtract_range ( & missing_range ) ;
self . shared . cond . notify_all ( ) ;
}
2019-11-02 06:19:31 +00:00
2019-11-11 07:22:41 +00:00
self . shared
. number_of_open_requests
. fetch_sub ( 1 , atomic ::Ordering ::SeqCst ) ;
2019-11-01 19:46:28 +00:00
}
}
impl Future for AudioFileFetchDataReceiver {
type Item = ( ) ;
type Error = ( ) ;
fn poll ( & mut self ) -> Poll < ( ) , ( ) > {
loop {
match self . data_rx . poll ( ) {
Ok ( Async ::Ready ( Some ( data ) ) ) = > {
2019-11-02 06:19:31 +00:00
if self . measure_ping_time {
if let Some ( request_sent_time ) = self . request_sent_time {
let duration = Instant ::now ( ) - request_sent_time ;
let duration_ms : u64 ;
2020-01-17 17:11:07 +00:00
if 0.001 * ( duration . as_millis ( ) as f64 )
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
2019-11-11 07:22:41 +00:00
{
2019-11-07 13:02:53 +00:00
duration_ms = ( MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0 ) as u64 ;
2019-11-02 06:19:31 +00:00
} else {
2019-11-07 21:58:17 +00:00
duration_ms = duration . as_millis ( ) as u64 ;
2019-11-02 06:19:31 +00:00
}
2019-11-11 07:22:41 +00:00
let _ = self
. file_data_tx
. unbounded_send ( ReceivedData ::ResponseTimeMs ( duration_ms as usize ) ) ;
2019-11-02 06:19:31 +00:00
self . measure_ping_time = false ;
2019-11-01 19:46:28 +00:00
}
}
let data_size = data . len ( ) ;
2019-11-11 07:22:41 +00:00
let _ = self
. file_data_tx
. unbounded_send ( ReceivedData ::Data ( PartialFileData {
offset : self . data_offset ,
data : data ,
} ) ) ;
2019-11-01 19:46:28 +00:00
self . data_offset + = data_size ;
if self . request_length < data_size {
2019-11-05 11:58:00 +00:00
warn! ( " Data receiver for range {} (+{}) received more data from server than requested. " , self . initial_data_offset , self . initial_request_length ) ;
2019-11-01 19:46:28 +00:00
self . request_length = 0 ;
} else {
self . request_length - = data_size ;
}
if self . request_length = = 0 {
2019-11-02 06:19:31 +00:00
self . finish ( ) ;
2019-11-01 19:46:28 +00:00
return Ok ( Async ::Ready ( ( ) ) ) ;
}
}
Ok ( Async ::Ready ( None ) ) = > {
if self . request_length > 0 {
2019-11-05 11:58:00 +00:00
warn! ( " Data receiver for range {} (+{}) received less data from server than requested. " , self . initial_data_offset , self . initial_request_length ) ;
2019-11-01 19:46:28 +00:00
}
2019-11-02 06:19:31 +00:00
self . finish ( ) ;
2019-11-01 19:46:28 +00:00
return Ok ( Async ::Ready ( ( ) ) ) ;
}
Ok ( Async ::NotReady ) = > {
return Ok ( Async ::NotReady ) ;
}
Err ( ChannelError ) = > {
2019-11-11 07:22:41 +00:00
warn! (
" Error from channel for data receiver for range {} (+{}). " ,
self . initial_data_offset , self . initial_request_length
) ;
2019-11-01 19:46:28 +00:00
self . finish ( ) ;
return Ok ( Async ::Ready ( ( ) ) ) ;
}
}
}
}
}
2017-01-19 22:45:24 +00:00
struct AudioFileFetch {
session : Session ,
shared : Arc < AudioFileShared > ,
output : Option < NamedTempFile > ,
2019-11-01 19:46:28 +00:00
file_data_tx : mpsc ::UnboundedSender < ReceivedData > ,
file_data_rx : mpsc ::UnboundedReceiver < ReceivedData > ,
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
stream_loader_command_rx : mpsc ::UnboundedReceiver < StreamLoaderCommand > ,
2017-01-19 22:45:24 +00:00
complete_tx : Option < oneshot ::Sender < NamedTempFile > > ,
2019-11-01 19:46:28 +00:00
network_response_times_ms : Vec < usize > ,
2017-01-19 22:45:24 +00:00
}
impl AudioFileFetch {
2018-02-26 01:50:41 +00:00
fn new (
session : Session ,
shared : Arc < AudioFileShared > ,
2019-11-01 19:46:28 +00:00
initial_data_rx : ChannelData ,
initial_request_sent_time : Instant ,
initial_data_length : usize ,
2018-02-26 01:50:41 +00:00
output : NamedTempFile ,
2019-11-01 19:46:28 +00:00
stream_loader_command_rx : mpsc ::UnboundedReceiver < StreamLoaderCommand > ,
2018-02-26 01:50:41 +00:00
complete_tx : oneshot ::Sender < NamedTempFile > ,
) -> AudioFileFetch {
2019-11-01 19:46:28 +00:00
let ( file_data_tx , file_data_rx ) = unbounded ::< ReceivedData > ( ) ;
{
let requested_range = Range ::new ( 0 , initial_data_length ) ;
let mut download_status = shared . download_status . lock ( ) . unwrap ( ) ;
download_status . requested . add_range ( & requested_range ) ;
}
let initial_data_receiver = AudioFileFetchDataReceiver ::new (
shared . clone ( ) ,
file_data_tx . clone ( ) ,
initial_data_rx ,
0 ,
initial_data_length ,
initial_request_sent_time ,
) ;
session . spawn ( move | _ | initial_data_receiver ) ;
2017-01-19 22:45:24 +00:00
AudioFileFetch {
session : session ,
shared : shared ,
output : Some ( output ) ,
2019-11-01 19:46:28 +00:00
file_data_tx : file_data_tx ,
file_data_rx : file_data_rx ,
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
stream_loader_command_rx : stream_loader_command_rx ,
2017-01-19 22:45:24 +00:00
complete_tx : Some ( complete_tx ) ,
2019-11-01 19:46:28 +00:00
network_response_times_ms : Vec ::new ( ) ,
2015-07-07 21:40:31 +00:00
}
2015-06-23 17:34:48 +00:00
}
2015-06-24 00:41:39 +00:00
2019-11-07 13:02:53 +00:00
fn get_download_strategy ( & mut self ) -> DownloadStrategy {
* ( self . shared . download_strategy . lock ( ) . unwrap ( ) )
}
2019-11-01 19:46:28 +00:00
fn download_range ( & mut self , mut offset : usize , mut length : usize ) {
2019-11-07 13:02:53 +00:00
if length < MINIMUM_DOWNLOAD_SIZE {
length = MINIMUM_DOWNLOAD_SIZE ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
// ensure the values are within the bounds and align them by 4 for the spotify protocol.
if offset > = self . shared . file_size {
return ;
}
2015-06-24 00:41:39 +00:00
2019-11-01 19:46:28 +00:00
if length < = 0 {
return ;
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
if offset + length > self . shared . file_size {
length = self . shared . file_size - offset ;
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
if offset % 4 ! = 0 {
length + = offset % 4 ;
offset - = offset % 4 ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
if length % 4 ! = 0 {
length + = 4 - ( length % 4 ) ;
}
let mut ranges_to_request = RangeSet ::new ( ) ;
ranges_to_request . add_range ( & Range ::new ( offset , length ) ) ;
let mut download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
ranges_to_request . subtract_range_set ( & download_status . downloaded ) ;
ranges_to_request . subtract_range_set ( & download_status . requested ) ;
for range in ranges_to_request . iter ( ) {
2020-01-17 17:11:07 +00:00
let ( _headers , data ) = request_range (
& self . session ,
self . shared . file_id ,
range . start ,
range . length ,
)
. split ( ) ;
2019-11-01 19:46:28 +00:00
download_status . requested . add_range ( range ) ;
let receiver = AudioFileFetchDataReceiver ::new (
self . shared . clone ( ) ,
self . file_data_tx . clone ( ) ,
data ,
range . start ,
range . length ,
Instant ::now ( ) ,
) ;
self . session . spawn ( move | _ | receiver ) ;
}
2016-05-09 11:22:51 +00:00
}
2015-07-07 21:40:31 +00:00
2019-11-17 23:54:44 +00:00
fn pre_fetch_more_data ( & mut self , bytes : usize , max_requests_to_send : usize ) {
2019-11-07 13:02:53 +00:00
let mut bytes_to_go = bytes ;
2019-11-17 23:54:44 +00:00
let mut requests_to_go = max_requests_to_send ;
2019-11-01 19:46:28 +00:00
2019-11-17 23:54:44 +00:00
while bytes_to_go > 0 & & requests_to_go > 0 {
2019-11-07 13:02:53 +00:00
// determine what is still missing
let mut missing_data = RangeSet ::new ( ) ;
missing_data . add_range ( & Range ::new ( 0 , self . shared . file_size ) ) ;
{
let download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
missing_data . subtract_range_set ( & download_status . downloaded ) ;
missing_data . subtract_range_set ( & download_status . requested ) ;
}
2019-11-01 19:46:28 +00:00
2019-11-07 13:02:53 +00:00
// download data from after the current read position first
let mut tail_end = RangeSet ::new ( ) ;
let read_position = self . shared . read_position . load ( atomic ::Ordering ::Relaxed ) ;
2020-01-17 17:11:07 +00:00
tail_end . add_range ( & Range ::new (
read_position ,
self . shared . file_size - read_position ,
) ) ;
2019-11-07 13:02:53 +00:00
let tail_end = tail_end . intersection ( & missing_data ) ;
if ! tail_end . is_empty ( ) {
let range = tail_end . get_range ( 0 ) ;
let offset = range . start ;
let length = min ( range . length , bytes_to_go ) ;
self . download_range ( offset , length ) ;
2019-11-18 00:08:34 +00:00
requests_to_go - = 1 ;
2019-11-07 13:02:53 +00:00
bytes_to_go - = length ;
} else if ! missing_data . is_empty ( ) {
// ok, the tail is downloaded, download something fom the beginning.
let range = missing_data . get_range ( 0 ) ;
let offset = range . start ;
let length = min ( range . length , bytes_to_go ) ;
self . download_range ( offset , length ) ;
2019-11-18 00:08:34 +00:00
requests_to_go - = 1 ;
2019-11-07 13:02:53 +00:00
bytes_to_go - = length ;
} else {
return ;
}
2019-11-01 19:46:28 +00:00
}
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
fn poll_file_data_rx ( & mut self ) -> Poll < ( ) , ( ) > {
2017-01-19 22:45:24 +00:00
loop {
2019-11-01 19:46:28 +00:00
match self . file_data_rx . poll ( ) {
2017-01-19 22:45:24 +00:00
Ok ( Async ::Ready ( None ) ) = > {
return Ok ( Async ::Ready ( ( ) ) ) ;
}
2019-11-01 19:46:28 +00:00
Ok ( Async ::Ready ( Some ( ReceivedData ::ResponseTimeMs ( response_time_ms ) ) ) ) = > {
2019-11-11 07:43:41 +00:00
trace! ( " Ping time estimated as: {} ms. " , response_time_ms ) ;
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
// record the response time
self . network_response_times_ms . push ( response_time_ms ) ;
2017-01-19 22:45:24 +00:00
2019-11-01 23:00:08 +00:00
// prune old response times. Keep at most three.
2019-11-01 19:46:28 +00:00
while self . network_response_times_ms . len ( ) > 3 {
self . network_response_times_ms . remove ( 0 ) ;
}
// stats::median is experimental. So we calculate the median of up to three ourselves.
let ping_time_ms : usize = match self . network_response_times_ms . len ( ) {
1 = > self . network_response_times_ms [ 0 ] as usize ,
2019-11-11 07:22:41 +00:00
2 = > {
2020-01-17 17:11:07 +00:00
( ( self . network_response_times_ms [ 0 ]
+ self . network_response_times_ms [ 1 ] )
/ 2 ) as usize
2019-11-11 07:22:41 +00:00
}
2019-11-01 19:46:28 +00:00
3 = > {
let mut times = self . network_response_times_ms . clone ( ) ;
times . sort ( ) ;
times [ 1 ]
}
_ = > unreachable! ( ) ,
} ;
// store our new estimate for everyone to see
2019-11-11 07:22:41 +00:00
self . shared
. ping_time_ms
. store ( ping_time_ms , atomic ::Ordering ::Relaxed ) ;
}
2019-11-01 19:46:28 +00:00
Ok ( Async ::Ready ( Some ( ReceivedData ::Data ( data ) ) ) ) = > {
self . output
. as_mut ( )
. unwrap ( )
. seek ( SeekFrom ::Start ( data . offset as u64 ) )
. unwrap ( ) ;
2019-11-11 07:22:41 +00:00
self . output
. as_mut ( )
. unwrap ( )
. write_all ( data . data . as_ref ( ) )
. unwrap ( ) ;
2019-11-01 19:46:28 +00:00
let mut full = false ;
{
let mut download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
let received_range = Range ::new ( data . offset , data . data . len ( ) ) ;
download_status . downloaded . add_range ( & received_range ) ;
2017-01-19 22:45:24 +00:00
self . shared . cond . notify_all ( ) ;
2019-11-11 07:22:41 +00:00
if download_status . downloaded . contained_length_from_value ( 0 )
> = self . shared . file_size
{
2019-11-01 19:46:28 +00:00
full = true ;
}
2019-11-05 11:58:00 +00:00
2019-11-01 19:46:28 +00:00
drop ( download_status ) ;
}
2017-01-19 22:45:24 +00:00
if full {
self . finish ( ) ;
return Ok ( Async ::Ready ( ( ) ) ) ;
}
}
2019-11-01 19:46:28 +00:00
Ok ( Async ::NotReady ) = > {
return Ok ( Async ::NotReady ) ;
2019-11-11 07:22:41 +00:00
}
2019-11-01 19:46:28 +00:00
Err ( ( ) ) = > unreachable! ( ) ,
}
}
}
fn poll_stream_loader_command_rx ( & mut self ) -> Poll < ( ) , ( ) > {
loop {
match self . stream_loader_command_rx . poll ( ) {
2019-11-02 00:04:46 +00:00
Ok ( Async ::Ready ( None ) ) = > {
return Ok ( Async ::Ready ( ( ) ) ) ;
}
2019-11-01 19:46:28 +00:00
Ok ( Async ::Ready ( Some ( StreamLoaderCommand ::Fetch ( request ) ) ) ) = > {
self . download_range ( request . start , request . length ) ;
}
Ok ( Async ::Ready ( Some ( StreamLoaderCommand ::RandomAccessMode ( ) ) ) ) = > {
2020-01-17 17:11:07 +00:00
* ( self . shared . download_strategy . lock ( ) . unwrap ( ) ) =
DownloadStrategy ::RandomAccess ( ) ;
2019-11-01 19:46:28 +00:00
}
Ok ( Async ::Ready ( Some ( StreamLoaderCommand ::StreamMode ( ) ) ) ) = > {
2020-01-17 17:11:07 +00:00
* ( self . shared . download_strategy . lock ( ) . unwrap ( ) ) =
DownloadStrategy ::Streaming ( ) ;
2019-11-01 19:46:28 +00:00
}
Ok ( Async ::Ready ( Some ( StreamLoaderCommand ::Close ( ) ) ) ) = > {
2017-01-19 22:45:24 +00:00
return Ok ( Async ::Ready ( ( ) ) ) ;
2018-02-26 01:50:41 +00:00
}
2019-11-11 07:22:41 +00:00
Ok ( Async ::NotReady ) = > return Ok ( Async ::NotReady ) ,
2019-11-01 19:46:28 +00:00
Err ( ( ) ) = > unreachable! ( ) ,
}
}
}
fn finish ( & mut self ) {
let mut output = self . output . take ( ) . unwrap ( ) ;
let complete_tx = self . complete_tx . take ( ) . unwrap ( ) ;
output . seek ( SeekFrom ::Start ( 0 ) ) . unwrap ( ) ;
let _ = complete_tx . send ( output ) ;
}
}
impl Future for AudioFileFetch {
type Item = ( ) ;
type Error = ( ) ;
fn poll ( & mut self ) -> Poll < ( ) , ( ) > {
match self . poll_stream_loader_command_rx ( ) {
Ok ( Async ::NotReady ) = > ( ) ,
Ok ( Async ::Ready ( _ ) ) = > {
return Ok ( Async ::Ready ( ( ) ) ) ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
Err ( ( ) ) = > unreachable! ( ) ,
}
2017-01-19 22:45:24 +00:00
2019-11-01 19:46:28 +00:00
match self . poll_file_data_rx ( ) {
Ok ( Async ::NotReady ) = > ( ) ,
Ok ( Async ::Ready ( _ ) ) = > {
return Ok ( Async ::Ready ( ( ) ) ) ;
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
Err ( ( ) ) = > unreachable! ( ) ,
2017-01-19 22:45:24 +00:00
}
2019-11-01 19:46:28 +00:00
2019-11-07 13:02:53 +00:00
if let DownloadStrategy ::Streaming ( ) = self . get_download_strategy ( ) {
2020-01-17 17:11:07 +00:00
let number_of_open_requests = self
. shared
. number_of_open_requests
. load ( atomic ::Ordering ::SeqCst ) ;
2019-11-18 00:08:34 +00:00
let max_requests_to_send =
MAX_PREFETCH_REQUESTS - min ( MAX_PREFETCH_REQUESTS , number_of_open_requests ) ;
2019-11-01 19:46:28 +00:00
2019-11-18 00:08:34 +00:00
if max_requests_to_send > 0 {
2019-11-17 23:54:44 +00:00
let bytes_pending : usize = {
let download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
2020-01-17 17:11:07 +00:00
download_status
. requested
. minus ( & download_status . downloaded )
. len ( )
2019-11-17 23:54:44 +00:00
} ;
let ping_time_seconds =
0.001 * self . shared . ping_time_ms . load ( atomic ::Ordering ::Relaxed ) as f64 ;
let download_rate = self . session . channel ( ) . get_download_rate_estimate ( ) ;
2019-11-07 13:02:53 +00:00
2019-11-17 23:54:44 +00:00
let desired_pending_bytes = max (
2020-01-17 17:11:07 +00:00
( PREFETCH_THRESHOLD_FACTOR
* ping_time_seconds
* self . shared . stream_data_rate as f64 ) as usize ,
( FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64 )
2019-11-17 23:54:44 +00:00
as usize ,
) ;
if bytes_pending < desired_pending_bytes {
2019-11-18 00:08:34 +00:00
self . pre_fetch_more_data (
desired_pending_bytes - bytes_pending ,
max_requests_to_send ,
) ;
2019-11-17 23:54:44 +00:00
}
2019-11-01 19:46:28 +00:00
}
}
2019-11-11 07:22:41 +00:00
return Ok ( Async ::NotReady ) ;
2015-06-23 17:34:48 +00:00
}
2015-06-23 14:38:29 +00:00
}
2017-01-29 15:36:39 +00:00
impl Read for AudioFileStreaming {
2015-06-23 14:38:29 +00:00
fn read ( & mut self , output : & mut [ u8 ] ) -> io ::Result < usize > {
2019-11-01 19:46:28 +00:00
let offset = self . position as usize ;
if offset > = self . shared . file_size {
return Ok ( 0 ) ;
}
let length = min ( output . len ( ) , self . shared . file_size - offset ) ;
2015-06-23 14:38:29 +00:00
2019-11-07 13:02:53 +00:00
let length_to_request = match * ( self . shared . download_strategy . lock ( ) . unwrap ( ) ) {
2019-11-11 07:22:41 +00:00
DownloadStrategy ::RandomAccess ( ) = > length ,
2019-11-07 13:02:53 +00:00
DownloadStrategy ::Streaming ( ) = > {
// Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
2019-11-11 07:22:41 +00:00
let ping_time_seconds =
0.0001 * self . shared . ping_time_ms . load ( atomic ::Ordering ::Relaxed ) as f64 ;
let length_to_request = length
+ max (
( READ_AHEAD_DURING_PLAYBACK_SECONDS * self . shared . stream_data_rate as f64 )
as usize ,
( READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* ping_time_seconds
* self . shared . stream_data_rate as f64 ) as usize ,
) ;
2019-11-07 13:02:53 +00:00
min ( length_to_request , self . shared . file_size - offset )
}
} ;
2015-07-07 21:40:31 +00:00
2019-11-01 19:46:28 +00:00
let mut ranges_to_request = RangeSet ::new ( ) ;
2019-11-07 13:02:53 +00:00
ranges_to_request . add_range ( & Range ::new ( offset , length_to_request ) ) ;
2019-11-01 19:46:28 +00:00
let mut download_status = self . shared . download_status . lock ( ) . unwrap ( ) ;
ranges_to_request . subtract_range_set ( & download_status . downloaded ) ;
ranges_to_request . subtract_range_set ( & download_status . requested ) ;
for range in ranges_to_request . iter ( ) {
2019-11-11 07:22:41 +00:00
self . stream_loader_command_tx
. unbounded_send ( StreamLoaderCommand ::Fetch ( range . clone ( ) ) )
. unwrap ( ) ;
2019-11-01 19:46:28 +00:00
}
2019-11-07 13:02:53 +00:00
if length = = 0 {
return Ok ( 0 ) ;
}
2019-11-11 07:43:41 +00:00
let mut download_message_printed = false ;
2019-11-01 19:46:28 +00:00
while ! download_status . downloaded . contains ( offset ) {
2019-11-11 08:00:19 +00:00
if let DownloadStrategy ::Streaming ( ) = * self . shared . download_strategy . lock ( ) . unwrap ( ) {
if ! download_message_printed {
debug! ( " Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {} " , offset , download_status . downloaded , download_status . requested . minus ( & download_status . downloaded ) ) ;
download_message_printed = true ;
}
2019-11-11 07:43:41 +00:00
}
2019-11-11 07:22:41 +00:00
download_status = self
. shared
. cond
. wait_timeout ( download_status , Duration ::from_millis ( 1000 ) )
. unwrap ( )
. 0 ;
2019-11-01 19:46:28 +00:00
}
2020-01-17 17:11:07 +00:00
let available_length = download_status
. downloaded
. contained_length_from_value ( offset ) ;
2019-11-01 19:46:28 +00:00
assert! ( available_length > 0 ) ;
drop ( download_status ) ;
self . position = self . read_file . seek ( SeekFrom ::Start ( offset as u64 ) ) . unwrap ( ) ;
let read_len = min ( length , available_length ) ;
2019-10-08 09:31:18 +00:00
let read_len = self . read_file . read ( & mut output [ .. read_len ] ) ? ;
2019-11-01 19:46:28 +00:00
2019-11-11 07:43:41 +00:00
if download_message_printed {
2019-11-18 00:08:34 +00:00
debug! (
" Read at postion {} completed. {} bytes returned, {} bytes were requested. " ,
offset ,
read_len ,
output . len ( )
) ;
2019-11-11 07:43:41 +00:00
}
2015-06-23 14:38:29 +00:00
2015-09-01 11:20:37 +00:00
self . position + = read_len as u64 ;
2019-11-11 07:22:41 +00:00
self . shared
. read_position
. store ( self . position as usize , atomic ::Ordering ::Relaxed ) ;
2019-11-01 19:46:28 +00:00
return Ok ( read_len ) ;
2015-06-23 14:38:29 +00:00
}
}
2017-01-29 15:36:39 +00:00
impl Seek for AudioFileStreaming {
2015-07-07 21:40:31 +00:00
fn seek ( & mut self , pos : SeekFrom ) -> io ::Result < u64 > {
2019-10-08 09:31:18 +00:00
self . position = self . read_file . seek ( pos ) ? ;
2018-11-10 20:31:03 +00:00
// Do not seek past EOF
2019-11-11 07:22:41 +00:00
self . shared
. read_position
. store ( self . position as usize , atomic ::Ordering ::Relaxed ) ;
2017-01-19 22:45:24 +00:00
Ok ( self . position )
2015-06-23 14:38:29 +00:00
}
}
2017-01-29 15:36:39 +00:00
impl Read for AudioFile {
fn read ( & mut self , output : & mut [ u8 ] ) -> io ::Result < usize > {
match * self {
AudioFile ::Cached ( ref mut file ) = > file . read ( output ) ,
AudioFile ::Streaming ( ref mut file ) = > file . read ( output ) ,
}
}
}
impl Seek for AudioFile {
fn seek ( & mut self , pos : SeekFrom ) -> io ::Result < u64 > {
match * self {
AudioFile ::Cached ( ref mut file ) = > file . seek ( pos ) ,
AudioFile ::Streaming ( ref mut file ) = > file . seek ( pos ) ,
}
}
}