diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs deleted file mode 100644 index 5fdf9e74..00000000 --- a/audio/src/fetch.rs +++ /dev/null @@ -1,1112 +0,0 @@ -use std::cmp::{max, min}; -use std::fs; -use std::future::Future; -use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::pin::Pin; -use std::sync::atomic::{self, AtomicUsize}; -use std::sync::{Arc, Condvar, Mutex}; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; - -use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; -use bytes::Bytes; -use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt}; -use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; -use librespot_core::session::Session; -use librespot_core::spotify_id::FileId; -use tempfile::NamedTempFile; -use tokio::sync::{mpsc, oneshot}; - -use crate::range_set::{Range, RangeSet}; - -const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; -// The minimum size of a block that is requested from the Spotify servers in one request. -// This is the block size that is typically requested while doing a seek() on a file. -// Note: smaller requests can happen if part of the block is downloaded already. - -const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; -// The amount of data that is requested when initially opening a file. -// Note: if the file is opened to play from the beginning, the amount of data to -// read ahead is requested in addition to this amount. If the file is opened to seek to -// another position, then only this amount is requested on the first request. - -const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5; -// The pig time that is used for calculations before a ping time was actually measured. - -const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5; -// If the measured ping time to the Spotify server is larger than this value, it is capped -// to avoid run-away block sizes and pre-fetching. - -pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0; -// Before playback starts, this many seconds of data must be present. -// Note: the calculations are done using the nominal bitrate of the file. The actual amount -// of audio data may be larger or smaller. - -pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0; -// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping -// time to the Spotify server. -// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are -// obeyed. -// Note: the calculations are done using the nominal bitrate of the file. The actual amount -// of audio data may be larger or smaller. - -pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0; -// While playing back, this many seconds of data ahead of the current read position are -// requested. -// Note: the calculations are done using the nominal bitrate of the file. The actual amount -// of audio data may be larger or smaller. - -pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0; -// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping -// time to the Spotify server. -// Note: the calculations are done using the nominal bitrate of the file. The actual amount -// of audio data may be larger or smaller. - -const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0; -// If the amount of data that is pending (requested but not received) is less than a certain amount, -// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more -// data is calculated as -// < PREFETCH_THRESHOLD_FACTOR * * - -const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5; -// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account. -// The formula used is -// < FAST_PREFETCH_THRESHOLD_FACTOR * * -// This mechanism allows for fast downloading of the remainder of the file. The number should be larger -// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster -// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is -// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively -// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted. - -const MAX_PREFETCH_REQUESTS: usize = 4; -// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending -// requests share bandwidth. Thus, havint too many requests can lead to the one that is needed next -// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new -// pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are pending. - -pub enum AudioFile { - Cached(fs::File), - Streaming(AudioFileStreaming), -} - -#[derive(Debug)] -enum StreamLoaderCommand { - Fetch(Range), // signal the stream loader to fetch a range of the file - RandomAccessMode(), // optimise download strategy for random access - StreamMode(), // optimise download strategy for streaming - Close(), // terminate and don't load any more data -} - -#[derive(Clone)] -pub struct StreamLoaderController { - channel_tx: Option>, - stream_shared: Option>, - file_size: usize, -} - -impl StreamLoaderController { - pub fn len(&self) -> usize { - self.file_size - } - - pub fn is_empty(&self) -> bool { - self.file_size == 0 - } - - pub fn range_available(&self, range: Range) -> bool { - if let Some(ref shared) = self.stream_shared { - let download_status = shared.download_status.lock().unwrap(); - range.length - <= download_status - .downloaded - .contained_length_from_value(range.start) - } else { - range.length <= self.len() - range.start - } - } - - pub fn range_to_end_available(&self) -> bool { - self.stream_shared.as_ref().map_or(true, |shared| { - let read_position = shared.read_position.load(atomic::Ordering::Relaxed); - self.range_available(Range::new(read_position, self.len() - read_position)) - }) - } - - pub fn ping_time_ms(&self) -> usize { - self.stream_shared.as_ref().map_or(0, |shared| { - shared.ping_time_ms.load(atomic::Ordering::Relaxed) - }) - } - - fn send_stream_loader_command(&self, command: StreamLoaderCommand) { - if let Some(ref channel) = self.channel_tx { - // ignore the error in case the channel has been closed already. - let _ = channel.send(command); - } - } - - pub fn fetch(&self, range: Range) { - // signal the stream loader to fetch a range of the file - self.send_stream_loader_command(StreamLoaderCommand::Fetch(range)); - } - - pub fn fetch_blocking(&self, mut range: Range) { - // signal the stream loader to tech a range of the file and block until it is loaded. - - // ensure the range is within the file's bounds. - if range.start >= self.len() { - range.length = 0; - } else if range.end() > self.len() { - range.length = self.len() - range.start; - } - - self.fetch(range); - - if let Some(ref shared) = self.stream_shared { - let mut download_status = shared.download_status.lock().unwrap(); - while range.length - > download_status - .downloaded - .contained_length_from_value(range.start) - { - download_status = shared - .cond - .wait_timeout(download_status, Duration::from_millis(1000)) - .unwrap() - .0; - if range.length - > (download_status - .downloaded - .union(&download_status.requested) - .contained_length_from_value(range.start)) - { - // For some reason, the requested range is neither downloaded nor requested. - // This could be due to a network error. Request it again. - self.fetch(range); - } - } - } - } - - pub fn fetch_next(&self, length: usize) { - if let Some(ref shared) = self.stream_shared { - let range = Range { - start: shared.read_position.load(atomic::Ordering::Relaxed), - length, - }; - self.fetch(range) - } - } - - pub fn fetch_next_blocking(&self, length: usize) { - if let Some(ref shared) = self.stream_shared { - let range = Range { - start: shared.read_position.load(atomic::Ordering::Relaxed), - length, - }; - self.fetch_blocking(range); - } - } - - pub fn set_random_access_mode(&self) { - // optimise download strategy for random access - self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); - } - - pub fn set_stream_mode(&self) { - // optimise download strategy for streaming - self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); - } - - pub fn close(&self) { - // terminate stream loading and don't load any more data for this file. - self.send_stream_loader_command(StreamLoaderCommand::Close()); - } -} - -pub struct AudioFileStreaming { - read_file: fs::File, - position: u64, - stream_loader_command_tx: mpsc::UnboundedSender, - shared: Arc, -} - -struct AudioFileDownloadStatus { - requested: RangeSet, - downloaded: RangeSet, -} - -#[derive(Copy, Clone)] -enum DownloadStrategy { - RandomAccess(), - Streaming(), -} - -struct AudioFileShared { - file_id: FileId, - file_size: usize, - stream_data_rate: usize, - cond: Condvar, - download_status: Mutex, - download_strategy: Mutex, - number_of_open_requests: AtomicUsize, - ping_time_ms: AtomicUsize, - read_position: AtomicUsize, -} - -impl AudioFile { - pub async fn open( - session: &Session, - file_id: FileId, - bytes_per_second: usize, - play_from_beginning: bool, - ) -> Result { - if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) { - debug!("File {} already in cache", file_id); - return Ok(AudioFile::Cached(file)); - } - - debug!("Downloading file {}", file_id); - - let (complete_tx, complete_rx) = oneshot::channel(); - let mut initial_data_length = if play_from_beginning { - INITIAL_DOWNLOAD_SIZE - + max( - (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, - (INITIAL_PING_TIME_ESTIMATE_SECONDS - * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * bytes_per_second as f64) as usize, - ) - } else { - INITIAL_DOWNLOAD_SIZE - }; - if initial_data_length % 4 != 0 { - initial_data_length += 4 - (initial_data_length % 4); - } - let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); - - let streaming = AudioFileStreaming::open( - session.clone(), - data, - initial_data_length, - Instant::now(), - headers, - file_id, - complete_tx, - bytes_per_second, - ); - - let session_ = session.clone(); - session.spawn(complete_rx.map_ok(move |mut file| { - if let Some(cache) = session_.cache() { - debug!("File {} complete, saving to cache", file_id); - cache.save_file(file_id, &mut file); - } else { - debug!("File {} complete", file_id); - } - })); - - Ok(AudioFile::Streaming(streaming.await?)) - } - - pub fn get_stream_loader_controller(&self) -> StreamLoaderController { - match self { - AudioFile::Streaming(ref stream) => StreamLoaderController { - channel_tx: Some(stream.stream_loader_command_tx.clone()), - stream_shared: Some(stream.shared.clone()), - file_size: stream.shared.file_size, - }, - AudioFile::Cached(ref file) => StreamLoaderController { - channel_tx: None, - stream_shared: None, - file_size: file.metadata().unwrap().len() as usize, - }, - } - } - - pub fn is_cached(&self) -> bool { - matches!(self, AudioFile::Cached { .. }) - } -} - -impl AudioFileStreaming { - pub async fn open( - session: Session, - initial_data_rx: ChannelData, - initial_data_length: usize, - initial_request_sent_time: Instant, - headers: ChannelHeaders, - file_id: FileId, - complete_tx: oneshot::Sender, - streaming_data_rate: usize, - ) -> Result { - let (_, data) = headers - .try_filter(|(id, _)| future::ready(*id == 0x3)) - .next() - .await - .unwrap()?; - - let size = BigEndian::read_u32(&data) as usize * 4; - - let shared = Arc::new(AudioFileShared { - file_id, - file_size: size, - stream_data_rate: streaming_data_rate, - cond: Condvar::new(), - download_status: Mutex::new(AudioFileDownloadStatus { - requested: RangeSet::new(), - downloaded: RangeSet::new(), - }), - download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise - number_of_open_requests: AtomicUsize::new(0), - ping_time_ms: AtomicUsize::new(0), - read_position: AtomicUsize::new(0), - }); - - let mut write_file = NamedTempFile::new().unwrap(); - write_file.as_file().set_len(size as u64).unwrap(); - write_file.seek(SeekFrom::Start(0)).unwrap(); - - let read_file = write_file.reopen().unwrap(); - - //let (seek_tx, seek_rx) = mpsc::unbounded(); - let (stream_loader_command_tx, stream_loader_command_rx) = - mpsc::unbounded_channel::(); - - let fetcher = AudioFileFetch::new( - session.clone(), - shared.clone(), - initial_data_rx, - initial_request_sent_time, - initial_data_length, - write_file, - stream_loader_command_rx, - complete_tx, - ); - - session.spawn(fetcher); - Ok(AudioFileStreaming { - read_file, - position: 0, - stream_loader_command_tx, - shared, - }) - } -} - -fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { - assert!( - offset % 4 == 0, - "Range request start positions must be aligned by 4 bytes." - ); - assert!( - length % 4 == 0, - "Range request range lengths must be aligned by 4 bytes." - ); - let start = offset / 4; - let end = (offset + length) / 4; - - let (id, channel) = session.channel().allocate(); - - let mut data: Vec = Vec::new(); - data.write_u16::(id).unwrap(); - data.write_u8(0).unwrap(); - data.write_u8(1).unwrap(); - data.write_u16::(0x0000).unwrap(); - data.write_u32::(0x00000000).unwrap(); - data.write_u32::(0x00009C40).unwrap(); - data.write_u32::(0x00020000).unwrap(); - data.write(&file.0).unwrap(); - data.write_u32::(start as u32).unwrap(); - data.write_u32::(end as u32).unwrap(); - - session.send_packet(0x8, data); - - channel -} - -struct PartialFileData { - offset: usize, - data: Bytes, -} - -enum ReceivedData { - ResponseTimeMs(usize), - Data(PartialFileData), -} - -async fn audio_file_fetch_receive_data( - shared: Arc, - file_data_tx: mpsc::UnboundedSender, - mut data_rx: ChannelData, - initial_data_offset: usize, - initial_request_length: usize, - request_sent_time: Instant, -) { - let mut data_offset = initial_data_offset; - let mut request_length = initial_request_length; - let mut measure_ping_time = shared - .number_of_open_requests - .load(atomic::Ordering::SeqCst) - == 0; - - shared - .number_of_open_requests - .fetch_add(1, atomic::Ordering::SeqCst); - - let result = loop { - let data = match data_rx.next().await { - Some(Ok(data)) => data, - Some(Err(e)) => break Err(e), - None => break Ok(()), - }; - - if measure_ping_time { - let duration = Instant::now() - request_sent_time; - let duration_ms: u64; - if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { - duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; - } else { - duration_ms = duration.as_millis() as u64; - } - let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize)); - measure_ping_time = false; - } - let data_size = data.len(); - let _ = file_data_tx.send(ReceivedData::Data(PartialFileData { - offset: data_offset, - data, - })); - data_offset += data_size; - if request_length < data_size { - warn!( - "Data receiver for range {} (+{}) received more data from server than requested.", - initial_data_offset, initial_request_length - ); - request_length = 0; - } else { - request_length -= data_size; - } - - if request_length == 0 { - break Ok(()); - } - }; - - if request_length > 0 { - let missing_range = Range::new(data_offset, request_length); - - let mut download_status = shared.download_status.lock().unwrap(); - download_status.requested.subtract_range(&missing_range); - shared.cond.notify_all(); - } - - shared - .number_of_open_requests - .fetch_sub(1, atomic::Ordering::SeqCst); - - if result.is_err() { - warn!( - "Error from channel for data receiver for range {} (+{}).", - initial_data_offset, initial_request_length - ); - } else if request_length > 0 { - warn!( - "Data receiver for range {} (+{}) received less data from server than requested.", - initial_data_offset, initial_request_length - ); - } -} -/* -async fn audio_file_fetch( - session: Session, - shared: Arc, - initial_data_rx: ChannelData, - initial_request_sent_time: Instant, - initial_data_length: usize, - - output: NamedTempFile, - stream_loader_command_rx: mpsc::UnboundedReceiver, - complete_tx: oneshot::Sender, -) { - let (file_data_tx, file_data_rx) = unbounded::(); - - let requested_range = Range::new(0, initial_data_length); - let mut download_status = shared.download_status.lock().unwrap(); - download_status.requested.add_range(&requested_range); - - session.spawn(audio_file_fetch_receive_data( - shared.clone(), - file_data_tx.clone(), - initial_data_rx, - 0, - initial_data_length, - initial_request_sent_time, - )); - - let mut network_response_times_ms: Vec::new(); - - let f1 = file_data_rx.map(|x| Ok::<_, ()>(x)).try_for_each(|x| { - match x { - ReceivedData::ResponseTimeMs(response_time_ms) => { - trace!("Ping time estimated as: {} ms.", response_time_ms); - - // record the response time - network_response_times_ms.push(response_time_ms); - - // prune old response times. Keep at most three. - while network_response_times_ms.len() > 3 { - network_response_times_ms.remove(0); - } - - // stats::median is experimental. So we calculate the median of up to three ourselves. - let ping_time_ms: usize = match network_response_times_ms.len() { - 1 => network_response_times_ms[0] as usize, - 2 => { - ((network_response_times_ms[0] + network_response_times_ms[1]) / 2) as usize - } - 3 => { - let mut times = network_response_times_ms.clone(); - times.sort(); - times[1] - } - _ => unreachable!(), - }; - - // store our new estimate for everyone to see - shared - .ping_time_ms - .store(ping_time_ms, atomic::Ordering::Relaxed); - } - ReceivedData::Data(data) => { - output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(data.offset as u64)) - .unwrap(); - output - .as_mut() - .unwrap() - .write_all(data.data.as_ref()) - .unwrap(); - - let mut full = false; - - { - let mut download_status = shared.download_status.lock().unwrap(); - - let received_range = Range::new(data.offset, data.data.len()); - download_status.downloaded.add_range(&received_range); - shared.cond.notify_all(); - - if download_status.downloaded.contained_length_from_value(0) - >= shared.file_size - { - full = true; - } - - drop(download_status); - } - - if full { - self.finish(); - return future::ready(Err(())); - } - } - } - future::ready(Ok(())) - }); - - let f2 = stream_loader_command_rx.map(Ok::<_, ()>).try_for_each(|x| { - match cmd { - StreamLoaderCommand::Fetch(request) => { - self.download_range(request.start, request.length); - } - StreamLoaderCommand::RandomAccessMode() => { - *(shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); - } - StreamLoaderCommand::StreamMode() => { - *(shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); - } - StreamLoaderCommand::Close() => return future::ready(Err(())), - } - Ok(()) - }); - - let f3 = future::poll_fn(|_| { - if let DownloadStrategy::Streaming() = self.get_download_strategy() { - let number_of_open_requests = shared - .number_of_open_requests - .load(atomic::Ordering::SeqCst); - let max_requests_to_send = - MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests); - - if max_requests_to_send > 0 { - let bytes_pending: usize = { - let download_status = shared.download_status.lock().unwrap(); - download_status - .requested - .minus(&download_status.downloaded) - .len() - }; - - let ping_time_seconds = - 0.001 * shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let download_rate = session.channel().get_download_rate_estimate(); - - let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * shared.stream_data_rate as f64) - as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) - as usize, - ); - - if bytes_pending < desired_pending_bytes { - self.pre_fetch_more_data( - desired_pending_bytes - bytes_pending, - max_requests_to_send, - ); - } - } - } - Poll::Pending - }); - future::select_all(vec![f1, f2, f3]).await -}*/ - -struct AudioFileFetch { - session: Session, - shared: Arc, - output: Option, - - file_data_tx: mpsc::UnboundedSender, - file_data_rx: mpsc::UnboundedReceiver, - - stream_loader_command_rx: mpsc::UnboundedReceiver, - complete_tx: Option>, - network_response_times_ms: Vec, -} - -impl AudioFileFetch { - fn new( - session: Session, - shared: Arc, - initial_data_rx: ChannelData, - initial_request_sent_time: Instant, - initial_data_length: usize, - - output: NamedTempFile, - stream_loader_command_rx: mpsc::UnboundedReceiver, - complete_tx: oneshot::Sender, - ) -> AudioFileFetch { - let (file_data_tx, file_data_rx) = mpsc::unbounded_channel::(); - - { - let requested_range = Range::new(0, initial_data_length); - let mut download_status = shared.download_status.lock().unwrap(); - download_status.requested.add_range(&requested_range); - } - - session.spawn(audio_file_fetch_receive_data( - shared.clone(), - file_data_tx.clone(), - initial_data_rx, - 0, - initial_data_length, - initial_request_sent_time, - )); - - AudioFileFetch { - session, - shared, - output: Some(output), - file_data_tx, - file_data_rx, - stream_loader_command_rx, - complete_tx: Some(complete_tx), - network_response_times_ms: Vec::new(), - } - } - - fn get_download_strategy(&mut self) -> DownloadStrategy { - *(self.shared.download_strategy.lock().unwrap()) - } - - fn download_range(&mut self, mut offset: usize, mut length: usize) { - if length < MINIMUM_DOWNLOAD_SIZE { - length = MINIMUM_DOWNLOAD_SIZE; - } - - // ensure the values are within the bounds and align them by 4 for the spotify protocol. - if offset >= self.shared.file_size { - return; - } - - if length == 0 { - return; - } - - if offset + length > self.shared.file_size { - length = self.shared.file_size - offset; - } - - if offset % 4 != 0 { - length += offset % 4; - offset -= offset % 4; - } - - if length % 4 != 0 { - length += 4 - (length % 4); - } - - let mut ranges_to_request = RangeSet::new(); - ranges_to_request.add_range(&Range::new(offset, length)); - - let mut download_status = self.shared.download_status.lock().unwrap(); - - ranges_to_request.subtract_range_set(&download_status.downloaded); - ranges_to_request.subtract_range_set(&download_status.requested); - - for range in ranges_to_request.iter() { - let (_headers, data) = request_range( - &self.session, - self.shared.file_id, - range.start, - range.length, - ) - .split(); - - download_status.requested.add_range(range); - - self.session.spawn(audio_file_fetch_receive_data( - self.shared.clone(), - self.file_data_tx.clone(), - data, - range.start, - range.length, - Instant::now(), - )); - } - } - - fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) { - let mut bytes_to_go = bytes; - let mut requests_to_go = max_requests_to_send; - - while bytes_to_go > 0 && requests_to_go > 0 { - // determine what is still missing - let mut missing_data = RangeSet::new(); - missing_data.add_range(&Range::new(0, self.shared.file_size)); - { - let download_status = self.shared.download_status.lock().unwrap(); - missing_data.subtract_range_set(&download_status.downloaded); - missing_data.subtract_range_set(&download_status.requested); - } - - // download data from after the current read position first - let mut tail_end = RangeSet::new(); - let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); - tail_end.add_range(&Range::new( - read_position, - self.shared.file_size - read_position, - )); - let tail_end = tail_end.intersection(&missing_data); - - if !tail_end.is_empty() { - let range = tail_end.get_range(0); - let offset = range.start; - let length = min(range.length, bytes_to_go); - self.download_range(offset, length); - requests_to_go -= 1; - bytes_to_go -= length; - } else if !missing_data.is_empty() { - // ok, the tail is downloaded, download something fom the beginning. - let range = missing_data.get_range(0); - let offset = range.start; - let length = min(range.length, bytes_to_go); - self.download_range(offset, length); - requests_to_go -= 1; - bytes_to_go -= length; - } else { - return; - } - } - } - - fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { - loop { - match self.file_data_rx.poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => { - trace!("Ping time estimated as: {} ms.", response_time_ms); - - // record the response time - self.network_response_times_ms.push(response_time_ms); - - // prune old response times. Keep at most three. - while self.network_response_times_ms.len() > 3 { - self.network_response_times_ms.remove(0); - } - - // stats::median is experimental. So we calculate the median of up to three ourselves. - let ping_time_ms: usize = match self.network_response_times_ms.len() { - 1 => self.network_response_times_ms[0] as usize, - 2 => { - ((self.network_response_times_ms[0] - + self.network_response_times_ms[1]) - / 2) as usize - } - 3 => { - let mut times = self.network_response_times_ms.clone(); - times.sort_unstable(); - times[1] - } - _ => unreachable!(), - }; - - // store our new estimate for everyone to see - self.shared - .ping_time_ms - .store(ping_time_ms, atomic::Ordering::Relaxed); - } - Poll::Ready(Some(ReceivedData::Data(data))) => { - self.output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(data.offset as u64)) - .unwrap(); - self.output - .as_mut() - .unwrap() - .write_all(data.data.as_ref()) - .unwrap(); - - let mut full = false; - - { - let mut download_status = self.shared.download_status.lock().unwrap(); - - let received_range = Range::new(data.offset, data.data.len()); - download_status.downloaded.add_range(&received_range); - self.shared.cond.notify_all(); - - if download_status.downloaded.contained_length_from_value(0) - >= self.shared.file_size - { - full = true; - } - - drop(download_status); - } - - if full { - self.finish(); - return Poll::Ready(()); - } - } - Poll::Pending => return Poll::Pending, - } - } - } - - fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { - loop { - match self.stream_loader_command_rx.poll_recv(cx) { - Poll::Ready(None) => return Poll::Ready(()), - Poll::Ready(Some(cmd)) => match cmd { - StreamLoaderCommand::Fetch(request) => { - self.download_range(request.start, request.length); - } - StreamLoaderCommand::RandomAccessMode() => { - *(self.shared.download_strategy.lock().unwrap()) = - DownloadStrategy::RandomAccess(); - } - StreamLoaderCommand::StreamMode() => { - *(self.shared.download_strategy.lock().unwrap()) = - DownloadStrategy::Streaming(); - } - StreamLoaderCommand::Close() => return Poll::Ready(()), - }, - Poll::Pending => return Poll::Pending, - } - } - } - - fn finish(&mut self) { - let mut output = self.output.take().unwrap(); - let complete_tx = self.complete_tx.take().unwrap(); - - output.seek(SeekFrom::Start(0)).unwrap(); - let _ = complete_tx.send(output); - } -} -impl Future for AudioFileFetch { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if let Poll::Ready(()) = self.poll_stream_loader_command_rx(cx) { - return Poll::Ready(()); - } - - if let Poll::Ready(()) = self.poll_file_data_rx(cx) { - return Poll::Ready(()); - } - - if let DownloadStrategy::Streaming() = self.get_download_strategy() { - let number_of_open_requests = self - .shared - .number_of_open_requests - .load(atomic::Ordering::SeqCst); - let max_requests_to_send = - MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests); - - if max_requests_to_send > 0 { - let bytes_pending: usize = { - let download_status = self.shared.download_status.lock().unwrap(); - download_status - .requested - .minus(&download_status.downloaded) - .len() - }; - - let ping_time_seconds = - 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - let download_rate = self.session.channel().get_download_rate_estimate(); - - let desired_pending_bytes = max( - (PREFETCH_THRESHOLD_FACTOR - * ping_time_seconds - * self.shared.stream_data_rate as f64) as usize, - (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) - as usize, - ); - - if bytes_pending < desired_pending_bytes { - self.pre_fetch_more_data( - desired_pending_bytes - bytes_pending, - max_requests_to_send, - ); - } - } - } - Poll::Pending - } -} - -impl Read for AudioFileStreaming { - fn read(&mut self, output: &mut [u8]) -> io::Result { - let offset = self.position as usize; - - if offset >= self.shared.file_size { - return Ok(0); - } - - let length = min(output.len(), self.shared.file_size - offset); - - let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { - DownloadStrategy::RandomAccess() => length, - DownloadStrategy::Streaming() => { - // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded. - let ping_time_seconds = - 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; - - let length_to_request = length - + max( - (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) - as usize, - (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * ping_time_seconds - * self.shared.stream_data_rate as f64) as usize, - ); - min(length_to_request, self.shared.file_size - offset) - } - }; - - let mut ranges_to_request = RangeSet::new(); - ranges_to_request.add_range(&Range::new(offset, length_to_request)); - - let mut download_status = self.shared.download_status.lock().unwrap(); - ranges_to_request.subtract_range_set(&download_status.downloaded); - ranges_to_request.subtract_range_set(&download_status.requested); - - for &range in ranges_to_request.iter() { - self.stream_loader_command_tx - .send(StreamLoaderCommand::Fetch(range)) - .unwrap(); - } - - if length == 0 { - return Ok(0); - } - - let mut download_message_printed = false; - while !download_status.downloaded.contains(offset) { - if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() { - if !download_message_printed { - debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); - download_message_printed = true; - } - } - download_status = self - .shared - .cond - .wait_timeout(download_status, Duration::from_millis(1000)) - .unwrap() - .0; - } - let available_length = download_status - .downloaded - .contained_length_from_value(offset); - assert!(available_length > 0); - drop(download_status); - - self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); - let read_len = min(length, available_length); - let read_len = self.read_file.read(&mut output[..read_len])?; - - if download_message_printed { - debug!( - "Read at postion {} completed. {} bytes returned, {} bytes were requested.", - offset, - read_len, - output.len() - ); - } - - self.position += read_len as u64; - self.shared - .read_position - .store(self.position as usize, atomic::Ordering::Relaxed); - - Ok(read_len) - } -} - -impl Seek for AudioFileStreaming { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - self.position = self.read_file.seek(pos)?; - // Do not seek past EOF - self.shared - .read_position - .store(self.position as usize, atomic::Ordering::Relaxed); - Ok(self.position) - } -} - -impl Read for AudioFile { - fn read(&mut self, output: &mut [u8]) -> io::Result { - match *self { - AudioFile::Cached(ref mut file) => file.read(output), - AudioFile::Streaming(ref mut file) => file.read(output), - } - } -} - -impl Seek for AudioFile { - fn seek(&mut self, pos: SeekFrom) -> io::Result { - match *self { - AudioFile::Cached(ref mut file) => file.seek(pos), - AudioFile::Streaming(ref mut file) => file.seek(pos), - } - } -} diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs new file mode 100644 index 00000000..c19fac2e --- /dev/null +++ b/audio/src/fetch/mod.rs @@ -0,0 +1,509 @@ +mod receive; + +use std::cmp::{max, min}; +use std::fs; +use std::io::{self, Read, Seek, SeekFrom}; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::{Arc, Condvar, Mutex}; +use std::time::{Duration, Instant}; + +use byteorder::{BigEndian, ByteOrder}; +use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt}; +use librespot_core::channel::{ChannelData, ChannelError, ChannelHeaders}; +use librespot_core::session::Session; +use librespot_core::spotify_id::FileId; +use tempfile::NamedTempFile; +use tokio::sync::{mpsc, oneshot}; + +use self::receive::{audio_file_fetch, request_range}; +use crate::range_set::{Range, RangeSet}; + +const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; +// The minimum size of a block that is requested from the Spotify servers in one request. +// This is the block size that is typically requested while doing a seek() on a file. +// Note: smaller requests can happen if part of the block is downloaded already. + +const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; +// The amount of data that is requested when initially opening a file. +// Note: if the file is opened to play from the beginning, the amount of data to +// read ahead is requested in addition to this amount. If the file is opened to seek to +// another position, then only this amount is requested on the first request. + +const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5; +// The pig time that is used for calculations before a ping time was actually measured. + +const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5; +// If the measured ping time to the Spotify server is larger than this value, it is capped +// to avoid run-away block sizes and pre-fetching. + +pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0; +// Before playback starts, this many seconds of data must be present. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0; +// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are +// obeyed. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0; +// While playing back, this many seconds of data ahead of the current read position are +// requested. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0; +// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping +// time to the Spotify server. +// Note: the calculations are done using the nominal bitrate of the file. The actual amount +// of audio data may be larger or smaller. + +const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0; +// If the amount of data that is pending (requested but not received) is less than a certain amount, +// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more +// data is calculated as +// < PREFETCH_THRESHOLD_FACTOR * * + +const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5; +// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account. +// The formula used is +// < FAST_PREFETCH_THRESHOLD_FACTOR * * +// This mechanism allows for fast downloading of the remainder of the file. The number should be larger +// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster +// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is +// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively +// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted. + +const MAX_PREFETCH_REQUESTS: usize = 4; +// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending +// requests share bandwidth. Thus, havint too many requests can lead to the one that is needed next +// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new +// pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are pending. + +pub enum AudioFile { + Cached(fs::File), + Streaming(AudioFileStreaming), +} + +#[derive(Debug)] +enum StreamLoaderCommand { + Fetch(Range), // signal the stream loader to fetch a range of the file + RandomAccessMode(), // optimise download strategy for random access + StreamMode(), // optimise download strategy for streaming + Close(), // terminate and don't load any more data +} + +#[derive(Clone)] +pub struct StreamLoaderController { + channel_tx: Option>, + stream_shared: Option>, + file_size: usize, +} + +impl StreamLoaderController { + pub fn len(&self) -> usize { + self.file_size + } + + pub fn is_empty(&self) -> bool { + self.file_size == 0 + } + + pub fn range_available(&self, range: Range) -> bool { + if let Some(ref shared) = self.stream_shared { + let download_status = shared.download_status.lock().unwrap(); + range.length + <= download_status + .downloaded + .contained_length_from_value(range.start) + } else { + range.length <= self.len() - range.start + } + } + + pub fn range_to_end_available(&self) -> bool { + self.stream_shared.as_ref().map_or(true, |shared| { + let read_position = shared.read_position.load(atomic::Ordering::Relaxed); + self.range_available(Range::new(read_position, self.len() - read_position)) + }) + } + + pub fn ping_time_ms(&self) -> usize { + self.stream_shared.as_ref().map_or(0, |shared| { + shared.ping_time_ms.load(atomic::Ordering::Relaxed) + }) + } + + fn send_stream_loader_command(&self, command: StreamLoaderCommand) { + if let Some(ref channel) = self.channel_tx { + // ignore the error in case the channel has been closed already. + let _ = channel.send(command); + } + } + + pub fn fetch(&self, range: Range) { + // signal the stream loader to fetch a range of the file + self.send_stream_loader_command(StreamLoaderCommand::Fetch(range)); + } + + pub fn fetch_blocking(&self, mut range: Range) { + // signal the stream loader to tech a range of the file and block until it is loaded. + + // ensure the range is within the file's bounds. + if range.start >= self.len() { + range.length = 0; + } else if range.end() > self.len() { + range.length = self.len() - range.start; + } + + self.fetch(range); + + if let Some(ref shared) = self.stream_shared { + let mut download_status = shared.download_status.lock().unwrap(); + while range.length + > download_status + .downloaded + .contained_length_from_value(range.start) + { + download_status = shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; + if range.length + > (download_status + .downloaded + .union(&download_status.requested) + .contained_length_from_value(range.start)) + { + // For some reason, the requested range is neither downloaded nor requested. + // This could be due to a network error. Request it again. + self.fetch(range); + } + } + } + } + + pub fn fetch_next(&self, length: usize) { + if let Some(ref shared) = self.stream_shared { + let range = Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length, + }; + self.fetch(range) + } + } + + pub fn fetch_next_blocking(&self, length: usize) { + if let Some(ref shared) = self.stream_shared { + let range = Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length, + }; + self.fetch_blocking(range); + } + } + + pub fn set_random_access_mode(&self) { + // optimise download strategy for random access + self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); + } + + pub fn set_stream_mode(&self) { + // optimise download strategy for streaming + self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); + } + + pub fn close(&self) { + // terminate stream loading and don't load any more data for this file. + self.send_stream_loader_command(StreamLoaderCommand::Close()); + } +} + +pub struct AudioFileStreaming { + read_file: fs::File, + position: u64, + stream_loader_command_tx: mpsc::UnboundedSender, + shared: Arc, +} + +struct AudioFileDownloadStatus { + requested: RangeSet, + downloaded: RangeSet, +} + +#[derive(Copy, Clone, PartialEq, Eq)] +enum DownloadStrategy { + RandomAccess(), + Streaming(), +} + +struct AudioFileShared { + file_id: FileId, + file_size: usize, + stream_data_rate: usize, + cond: Condvar, + download_status: Mutex, + download_strategy: Mutex, + ping_time_ms: AtomicUsize, + read_position: AtomicUsize, +} + +impl AudioFile { + pub async fn open( + session: &Session, + file_id: FileId, + bytes_per_second: usize, + play_from_beginning: bool, + ) -> Result { + if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) { + debug!("File {} already in cache", file_id); + return Ok(AudioFile::Cached(file)); + } + + debug!("Downloading file {}", file_id); + + let (complete_tx, complete_rx) = oneshot::channel(); + let mut initial_data_length = if play_from_beginning { + INITIAL_DOWNLOAD_SIZE + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + (INITIAL_PING_TIME_ESTIMATE_SECONDS + * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * bytes_per_second as f64) as usize, + ) + } else { + INITIAL_DOWNLOAD_SIZE + }; + if initial_data_length % 4 != 0 { + initial_data_length += 4 - (initial_data_length % 4); + } + let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); + + let streaming = AudioFileStreaming::open( + session.clone(), + data, + initial_data_length, + Instant::now(), + headers, + file_id, + complete_tx, + bytes_per_second, + ); + + let session_ = session.clone(); + session.spawn(complete_rx.map_ok(move |mut file| { + if let Some(cache) = session_.cache() { + debug!("File {} complete, saving to cache", file_id); + cache.save_file(file_id, &mut file); + } else { + debug!("File {} complete", file_id); + } + })); + + Ok(AudioFile::Streaming(streaming.await?)) + } + + pub fn get_stream_loader_controller(&self) -> StreamLoaderController { + match self { + AudioFile::Streaming(ref stream) => StreamLoaderController { + channel_tx: Some(stream.stream_loader_command_tx.clone()), + stream_shared: Some(stream.shared.clone()), + file_size: stream.shared.file_size, + }, + AudioFile::Cached(ref file) => StreamLoaderController { + channel_tx: None, + stream_shared: None, + file_size: file.metadata().unwrap().len() as usize, + }, + } + } + + pub fn is_cached(&self) -> bool { + matches!(self, AudioFile::Cached { .. }) + } +} + +impl AudioFileStreaming { + pub async fn open( + session: Session, + initial_data_rx: ChannelData, + initial_data_length: usize, + initial_request_sent_time: Instant, + headers: ChannelHeaders, + file_id: FileId, + complete_tx: oneshot::Sender, + streaming_data_rate: usize, + ) -> Result { + let (_, data) = headers + .try_filter(|(id, _)| future::ready(*id == 0x3)) + .next() + .await + .unwrap()?; + + let size = BigEndian::read_u32(&data) as usize * 4; + + let shared = Arc::new(AudioFileShared { + file_id, + file_size: size, + stream_data_rate: streaming_data_rate, + cond: Condvar::new(), + download_status: Mutex::new(AudioFileDownloadStatus { + requested: RangeSet::new(), + downloaded: RangeSet::new(), + }), + download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise + ping_time_ms: AtomicUsize::new(0), + read_position: AtomicUsize::new(0), + }); + + let mut write_file = NamedTempFile::new().unwrap(); + write_file.as_file().set_len(size as u64).unwrap(); + write_file.seek(SeekFrom::Start(0)).unwrap(); + + let read_file = write_file.reopen().unwrap(); + + //let (seek_tx, seek_rx) = mpsc::unbounded(); + let (stream_loader_command_tx, stream_loader_command_rx) = + mpsc::unbounded_channel::(); + + session.spawn(audio_file_fetch( + session.clone(), + shared.clone(), + initial_data_rx, + initial_request_sent_time, + initial_data_length, + write_file, + stream_loader_command_rx, + complete_tx, + )); + + Ok(AudioFileStreaming { + read_file, + position: 0, + stream_loader_command_tx, + shared, + }) + } +} + +impl Read for AudioFileStreaming { + fn read(&mut self, output: &mut [u8]) -> io::Result { + let offset = self.position as usize; + + if offset >= self.shared.file_size { + return Ok(0); + } + + let length = min(output.len(), self.shared.file_size - offset); + + let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { + DownloadStrategy::RandomAccess() => length, + DownloadStrategy::Streaming() => { + // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded. + let ping_time_seconds = + 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + + let length_to_request = length + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64) + as usize, + (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * ping_time_seconds + * self.shared.stream_data_rate as f64) as usize, + ); + min(length_to_request, self.shared.file_size - offset) + } + }; + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length_to_request)); + + let mut download_status = self.shared.download_status.lock().unwrap(); + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + for &range in ranges_to_request.iter() { + self.stream_loader_command_tx + .send(StreamLoaderCommand::Fetch(range)) + .unwrap(); + } + + if length == 0 { + return Ok(0); + } + + let mut download_message_printed = false; + while !download_status.downloaded.contains(offset) { + if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() { + if !download_message_printed { + debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); + download_message_printed = true; + } + } + download_status = self + .shared + .cond + .wait_timeout(download_status, Duration::from_millis(1000)) + .unwrap() + .0; + } + let available_length = download_status + .downloaded + .contained_length_from_value(offset); + assert!(available_length > 0); + drop(download_status); + + self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); + let read_len = min(length, available_length); + let read_len = self.read_file.read(&mut output[..read_len])?; + + if download_message_printed { + debug!( + "Read at postion {} completed. {} bytes returned, {} bytes were requested.", + offset, + read_len, + output.len() + ); + } + + self.position += read_len as u64; + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); + + Ok(read_len) + } +} + +impl Seek for AudioFileStreaming { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.position = self.read_file.seek(pos)?; + // Do not seek past EOF + self.shared + .read_position + .store(self.position as usize, atomic::Ordering::Relaxed); + Ok(self.position) + } +} + +impl Read for AudioFile { + fn read(&mut self, output: &mut [u8]) -> io::Result { + match *self { + AudioFile::Cached(ref mut file) => file.read(output), + AudioFile::Streaming(ref mut file) => file.read(output), + } + } +} + +impl Seek for AudioFile { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + match *self { + AudioFile::Cached(ref mut file) => file.seek(pos), + AudioFile::Streaming(ref mut file) => file.seek(pos), + } + } +} diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs new file mode 100644 index 00000000..17f884f5 --- /dev/null +++ b/audio/src/fetch/receive.rs @@ -0,0 +1,455 @@ +use std::cmp::{max, min}; +use std::io::{Seek, SeekFrom, Write}; +use std::sync::{atomic, Arc}; +use std::time::Instant; + +use byteorder::{BigEndian, WriteBytesExt}; +use bytes::Bytes; +use futures_util::StreamExt; +use librespot_core::channel::{Channel, ChannelData}; +use librespot_core::session::Session; +use librespot_core::spotify_id::FileId; +use tempfile::NamedTempFile; +use tokio::sync::{mpsc, oneshot}; + +use crate::range_set::{Range, RangeSet}; + +use super::{AudioFileShared, DownloadStrategy, StreamLoaderCommand}; +use super::{ + FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME_SECONDS, MAX_PREFETCH_REQUESTS, + MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, +}; + +pub fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { + assert!( + offset % 4 == 0, + "Range request start positions must be aligned by 4 bytes." + ); + assert!( + length % 4 == 0, + "Range request range lengths must be aligned by 4 bytes." + ); + let start = offset / 4; + let end = (offset + length) / 4; + + let (id, channel) = session.channel().allocate(); + + let mut data: Vec = Vec::new(); + data.write_u16::(id).unwrap(); + data.write_u8(0).unwrap(); + data.write_u8(1).unwrap(); + data.write_u16::(0x0000).unwrap(); + data.write_u32::(0x00000000).unwrap(); + data.write_u32::(0x00009C40).unwrap(); + data.write_u32::(0x00020000).unwrap(); + data.write(&file.0).unwrap(); + data.write_u32::(start as u32).unwrap(); + data.write_u32::(end as u32).unwrap(); + + session.send_packet(0x8, data); + + channel +} + +struct PartialFileData { + offset: usize, + data: Bytes, +} + +enum ReceivedData { + ResponseTimeMs(usize), + Data(PartialFileData), +} + +async fn receive_data( + shared: Arc, + file_data_tx: mpsc::UnboundedSender, + mut data_rx: ChannelData, + initial_data_offset: usize, + initial_request_length: usize, + request_sent_time: Instant, + mut measure_ping_time: bool, + finish_tx: mpsc::UnboundedSender<()>, +) { + let mut data_offset = initial_data_offset; + let mut request_length = initial_request_length; + + let result = loop { + let data = match data_rx.next().await { + Some(Ok(data)) => data, + Some(Err(e)) => break Err(e), + None => break Ok(()), + }; + + if measure_ping_time { + let duration = Instant::now() - request_sent_time; + let duration_ms: u64; + if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS { + duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; + } else { + duration_ms = duration.as_millis() as u64; + } + let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + measure_ping_time = false; + } + let data_size = data.len(); + let _ = file_data_tx.send(ReceivedData::Data(PartialFileData { + offset: data_offset, + data, + })); + data_offset += data_size; + if request_length < data_size { + warn!( + "Data receiver for range {} (+{}) received more data from server than requested.", + initial_data_offset, initial_request_length + ); + request_length = 0; + } else { + request_length -= data_size; + } + + if request_length == 0 { + break Ok(()); + } + }; + + if request_length > 0 { + let missing_range = Range::new(data_offset, request_length); + + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.subtract_range(&missing_range); + shared.cond.notify_all(); + } + + let _ = finish_tx.send(()); + + if result.is_err() { + warn!( + "Error from channel for data receiver for range {} (+{}).", + initial_data_offset, initial_request_length + ); + } else if request_length > 0 { + warn!( + "Data receiver for range {} (+{}) received less data from server than requested.", + initial_data_offset, initial_request_length + ); + } +} + +struct AudioFileFetch { + session: Session, + shared: Arc, + output: Option, + + file_data_tx: mpsc::UnboundedSender, + complete_tx: Option>, + network_response_times_ms: Vec, + number_of_open_requests: usize, + + download_finish_tx: mpsc::UnboundedSender<()>, +} + +// Might be replaced by enum from std once stable +#[derive(PartialEq, Eq)] +enum ControlFlow { + Break, + Continue, +} + +impl AudioFileFetch { + fn get_download_strategy(&mut self) -> DownloadStrategy { + *(self.shared.download_strategy.lock().unwrap()) + } + + fn download_range(&mut self, mut offset: usize, mut length: usize) { + if length < MINIMUM_DOWNLOAD_SIZE { + length = MINIMUM_DOWNLOAD_SIZE; + } + + // ensure the values are within the bounds and align them by 4 for the spotify protocol. + if offset >= self.shared.file_size { + return; + } + + if length == 0 { + return; + } + + if offset + length > self.shared.file_size { + length = self.shared.file_size - offset; + } + + if offset % 4 != 0 { + length += offset % 4; + offset -= offset % 4; + } + + if length % 4 != 0 { + length += 4 - (length % 4); + } + + let mut ranges_to_request = RangeSet::new(); + ranges_to_request.add_range(&Range::new(offset, length)); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + ranges_to_request.subtract_range_set(&download_status.downloaded); + ranges_to_request.subtract_range_set(&download_status.requested); + + for range in ranges_to_request.iter() { + let (_headers, data) = request_range( + &self.session, + self.shared.file_id, + range.start, + range.length, + ) + .split(); + + download_status.requested.add_range(range); + + self.session.spawn(receive_data( + self.shared.clone(), + self.file_data_tx.clone(), + data, + range.start, + range.length, + Instant::now(), + self.number_of_open_requests == 0, + self.download_finish_tx.clone(), + )); + + self.number_of_open_requests += 1; + } + } + + fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) { + let mut bytes_to_go = bytes; + let mut requests_to_go = max_requests_to_send; + + while bytes_to_go > 0 && requests_to_go > 0 { + // determine what is still missing + let mut missing_data = RangeSet::new(); + missing_data.add_range(&Range::new(0, self.shared.file_size)); + { + let download_status = self.shared.download_status.lock().unwrap(); + missing_data.subtract_range_set(&download_status.downloaded); + missing_data.subtract_range_set(&download_status.requested); + } + + // download data from after the current read position first + let mut tail_end = RangeSet::new(); + let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed); + tail_end.add_range(&Range::new( + read_position, + self.shared.file_size - read_position, + )); + let tail_end = tail_end.intersection(&missing_data); + + if !tail_end.is_empty() { + let range = tail_end.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + requests_to_go -= 1; + bytes_to_go -= length; + } else if !missing_data.is_empty() { + // ok, the tail is downloaded, download something fom the beginning. + let range = missing_data.get_range(0); + let offset = range.start; + let length = min(range.length, bytes_to_go); + self.download_range(offset, length); + requests_to_go -= 1; + bytes_to_go -= length; + } else { + return; + } + } + } + + fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow { + match data { + ReceivedData::ResponseTimeMs(response_time_ms) => { + trace!("Ping time estimated as: {} ms.", response_time_ms); + + // record the response time + self.network_response_times_ms.push(response_time_ms); + + // prune old response times. Keep at most three. + while self.network_response_times_ms.len() > 3 { + self.network_response_times_ms.remove(0); + } + + // stats::median is experimental. So we calculate the median of up to three ourselves. + let ping_time_ms: usize = match self.network_response_times_ms.len() { + 1 => self.network_response_times_ms[0] as usize, + 2 => { + ((self.network_response_times_ms[0] + self.network_response_times_ms[1]) + / 2) as usize + } + 3 => { + let mut times = self.network_response_times_ms.clone(); + times.sort_unstable(); + times[1] + } + _ => unreachable!(), + }; + + // store our new estimate for everyone to see + self.shared + .ping_time_ms + .store(ping_time_ms, atomic::Ordering::Relaxed); + } + ReceivedData::Data(data) => { + self.output + .as_mut() + .unwrap() + .seek(SeekFrom::Start(data.offset as u64)) + .unwrap(); + self.output + .as_mut() + .unwrap() + .write_all(data.data.as_ref()) + .unwrap(); + + let mut download_status = self.shared.download_status.lock().unwrap(); + + let received_range = Range::new(data.offset, data.data.len()); + download_status.downloaded.add_range(&received_range); + self.shared.cond.notify_all(); + + let full = download_status.downloaded.contained_length_from_value(0) + >= self.shared.file_size; + + drop(download_status); + + if full { + self.finish(); + return ControlFlow::Break; + } + } + } + ControlFlow::Continue + } + + fn handle_stream_loader_command(&mut self, cmd: StreamLoaderCommand) -> ControlFlow { + match cmd { + StreamLoaderCommand::Fetch(request) => { + self.download_range(request.start, request.length); + } + StreamLoaderCommand::RandomAccessMode() => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); + } + StreamLoaderCommand::StreamMode() => { + *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); + self.trigger_preload(); + } + StreamLoaderCommand::Close() => return ControlFlow::Break, + } + ControlFlow::Continue + } + + fn finish(&mut self) { + let mut output = self.output.take().unwrap(); + let complete_tx = self.complete_tx.take().unwrap(); + + output.seek(SeekFrom::Start(0)).unwrap(); + let _ = complete_tx.send(output); + } + + fn trigger_preload(&mut self) { + if self.number_of_open_requests >= MAX_PREFETCH_REQUESTS { + return; + } + + let max_requests_to_send = MAX_PREFETCH_REQUESTS - self.number_of_open_requests; + + let bytes_pending: usize = { + let download_status = self.shared.download_status.lock().unwrap(); + download_status + .requested + .minus(&download_status.downloaded) + .len() + }; + + let ping_time_seconds = + 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let download_rate = self.session.channel().get_download_rate_estimate(); + + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * self.shared.stream_data_rate as f64) + as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) as usize, + ); + + if bytes_pending < desired_pending_bytes { + self.pre_fetch_more_data(desired_pending_bytes - bytes_pending, max_requests_to_send); + } + } +} + +pub(super) async fn audio_file_fetch( + session: Session, + shared: Arc, + initial_data_rx: ChannelData, + initial_request_sent_time: Instant, + initial_data_length: usize, + + output: NamedTempFile, + mut stream_loader_command_rx: mpsc::UnboundedReceiver, + complete_tx: oneshot::Sender, +) { + let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); + let (download_finish_tx, mut download_finish_rx) = mpsc::unbounded_channel(); + + { + let requested_range = Range::new(0, initial_data_length); + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.add_range(&requested_range); + } + + session.spawn(receive_data( + shared.clone(), + file_data_tx.clone(), + initial_data_rx, + 0, + initial_data_length, + initial_request_sent_time, + true, + download_finish_tx.clone(), + )); + + let mut fetch = AudioFileFetch { + session, + shared, + output: Some(output), + + file_data_tx, + complete_tx: Some(complete_tx), + network_response_times_ms: Vec::new(), + number_of_open_requests: 1, + + download_finish_tx, + }; + + loop { + tokio::select! { + cmd = stream_loader_command_rx.recv() => { + if cmd.map_or(true, |cmd| fetch.handle_stream_loader_command(cmd) == ControlFlow::Break) { + break; + } + }, + data = file_data_rx.recv() => { + if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) { + break; + } + }, + _ = download_finish_rx.recv() => { + fetch.number_of_open_requests -= 1; + + if fetch.get_download_strategy() == DownloadStrategy::Streaming() { + fetch.trigger_preload(); + } + } + } + } +}