From 0d51fd43dce3206945b8c7bfb98e6a6e19a633f9 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Sat, 18 Dec 2021 23:44:13 +0100 Subject: [PATCH] Remove unwraps from librespot-audio --- audio/src/fetch/mod.rs | 118 +++++++++++++++------- audio/src/fetch/receive.rs | 200 ++++++++++++++++++++++++------------- audio/src/lib.rs | 2 +- audio/src/range_set.rs | 8 +- core/src/cache.rs | 32 +++--- core/src/cdn_url.rs | 2 +- core/src/http_client.rs | 8 +- core/src/version.rs | 3 + playback/src/player.rs | 93 +++++++++++------ 9 files changed, 301 insertions(+), 165 deletions(-) diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index 50029840..09db431f 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -25,16 +25,28 @@ use self::receive::audio_file_fetch; use crate::range_set::{Range, RangeSet}; +pub type AudioFileResult = Result<(), AudioFileError>; + #[derive(Error, Debug)] pub enum AudioFileError { #[error("could not complete CDN request: {0}")] - Cdn(hyper::Error), + Cdn(#[from] hyper::Error), + #[error("channel was disconnected")] + Channel, #[error("empty response")] Empty, + #[error("I/O error: {0}")] + Io(#[from] io::Error), + #[error("output file unavailable")] + Output, #[error("error parsing response")] Parsing, + #[error("mutex was poisoned")] + Poisoned, #[error("could not complete API request: {0}")] SpClient(#[from] SpClientError), + #[error("streamer did not report progress")] + Timeout, #[error("could not get CDN URL: {0}")] Url(#[from] CdnUrlError), } @@ -42,7 +54,7 @@ pub enum AudioFileError { /// The minimum size of a block that is requested from the Spotify servers in one request. /// This is the block size that is typically requested while doing a `seek()` on a file. /// Note: smaller requests can happen if part of the block is downloaded already. -pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 256; +pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 128; /// The amount of data that is requested when initially opening a file. /// Note: if the file is opened to play from the beginning, the amount of data to @@ -142,23 +154,32 @@ impl StreamLoaderController { self.file_size == 0 } - pub fn range_available(&self, range: Range) -> bool { - if let Some(ref shared) = self.stream_shared { - let download_status = shared.download_status.lock().unwrap(); + pub fn range_available(&self, range: Range) -> Result { + let available = if let Some(ref shared) = self.stream_shared { + let download_status = shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; + range.length <= download_status .downloaded .contained_length_from_value(range.start) } else { range.length <= self.len() - range.start - } + }; + + Ok(available) } - pub fn range_to_end_available(&self) -> bool { - self.stream_shared.as_ref().map_or(true, |shared| { - let read_position = shared.read_position.load(atomic::Ordering::Relaxed); - self.range_available(Range::new(read_position, self.len() - read_position)) - }) + pub fn range_to_end_available(&self) -> Result { + match self.stream_shared { + Some(ref shared) => { + let read_position = shared.read_position.load(atomic::Ordering::Relaxed); + self.range_available(Range::new(read_position, self.len() - read_position)) + } + None => Ok(true), + } } pub fn ping_time(&self) -> Duration { @@ -179,7 +200,7 @@ impl StreamLoaderController { self.send_stream_loader_command(StreamLoaderCommand::Fetch(range)); } - pub fn fetch_blocking(&self, mut range: Range) { + pub fn fetch_blocking(&self, mut range: Range) -> AudioFileResult { // signal the stream loader to tech a range of the file and block until it is loaded. // ensure the range is within the file's bounds. @@ -192,7 +213,11 @@ impl StreamLoaderController { self.fetch(range); if let Some(ref shared) = self.stream_shared { - let mut download_status = shared.download_status.lock().unwrap(); + let mut download_status = shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; + while range.length > download_status .downloaded @@ -201,7 +226,7 @@ impl StreamLoaderController { download_status = shared .cond .wait_timeout(download_status, DOWNLOAD_TIMEOUT) - .unwrap() + .map_err(|_| AudioFileError::Timeout)? .0; if range.length > (download_status @@ -215,6 +240,8 @@ impl StreamLoaderController { } } } + + Ok(()) } pub fn fetch_next(&self, length: usize) { @@ -223,17 +250,20 @@ impl StreamLoaderController { start: shared.read_position.load(atomic::Ordering::Relaxed), length, }; - self.fetch(range) + self.fetch(range); } } - pub fn fetch_next_blocking(&self, length: usize) { - if let Some(ref shared) = self.stream_shared { - let range = Range { - start: shared.read_position.load(atomic::Ordering::Relaxed), - length, - }; - self.fetch_blocking(range); + pub fn fetch_next_blocking(&self, length: usize) -> AudioFileResult { + match self.stream_shared { + Some(ref shared) => { + let range = Range { + start: shared.read_position.load(atomic::Ordering::Relaxed), + length, + }; + self.fetch_blocking(range) + } + None => Ok(()), } } @@ -310,11 +340,9 @@ impl AudioFile { let session_ = session.clone(); session.spawn(complete_rx.map_ok(move |mut file| { if let Some(cache) = session_.cache() { - if cache.file_path(file_id).is_some() { - cache.save_file(file_id, &mut file); + if cache.save_file(file_id, &mut file) { debug!("File {} cached to {:?}", file_id, cache.file(file_id)); } - debug!("Downloading file {} complete", file_id); } })); @@ -322,8 +350,8 @@ impl AudioFile { Ok(AudioFile::Streaming(streaming.await?)) } - pub fn get_stream_loader_controller(&self) -> StreamLoaderController { - match self { + pub fn get_stream_loader_controller(&self) -> Result { + let controller = match self { AudioFile::Streaming(ref stream) => StreamLoaderController { channel_tx: Some(stream.stream_loader_command_tx.clone()), stream_shared: Some(stream.shared.clone()), @@ -332,9 +360,11 @@ impl AudioFile { AudioFile::Cached(ref file) => StreamLoaderController { channel_tx: None, stream_shared: None, - file_size: file.metadata().unwrap().len() as usize, + file_size: file.metadata()?.len() as usize, }, - } + }; + + Ok(controller) } pub fn is_cached(&self) -> bool { @@ -410,8 +440,8 @@ impl AudioFileStreaming { read_position: AtomicUsize::new(0), }); - let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone()).unwrap(); - let read_file = write_file.reopen().unwrap(); + let write_file = NamedTempFile::new_in(session.config().tmp_dir.clone())?; + let read_file = write_file.reopen()?; let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded_channel::(); @@ -444,7 +474,12 @@ impl Read for AudioFileStreaming { let length = min(output.len(), self.shared.file_size - offset); - let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { + let length_to_request = match *(self + .shared + .download_strategy + .lock() + .map_err(|_| io::Error::new(io::ErrorKind::Other, "mutex was poisoned"))?) + { DownloadStrategy::RandomAccess() => length, DownloadStrategy::Streaming() => { // Due to the read-ahead stuff, we potentially request more than the actual request demanded. @@ -468,14 +503,18 @@ impl Read for AudioFileStreaming { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length_to_request)); - let mut download_status = self.shared.download_status.lock().unwrap(); + let mut download_status = self + .shared + .download_status + .lock() + .map_err(|_| io::Error::new(io::ErrorKind::Other, "mutex was poisoned"))?; ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); for &range in ranges_to_request.iter() { self.stream_loader_command_tx .send(StreamLoaderCommand::Fetch(range)) - .unwrap(); + .map_err(|_| io::Error::new(io::ErrorKind::Other, "tx channel is disconnected"))?; } if length == 0 { @@ -484,7 +523,12 @@ impl Read for AudioFileStreaming { let mut download_message_printed = false; while !download_status.downloaded.contains(offset) { - if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() { + if let DownloadStrategy::Streaming() = *self + .shared + .download_strategy + .lock() + .map_err(|_| io::Error::new(io::ErrorKind::Other, "mutex was poisoned"))? + { if !download_message_printed { debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded)); download_message_printed = true; @@ -494,7 +538,7 @@ impl Read for AudioFileStreaming { .shared .cond .wait_timeout(download_status, DOWNLOAD_TIMEOUT) - .unwrap() + .map_err(|_| io::Error::new(io::ErrorKind::Other, "timeout acquiring mutex"))? .0; } let available_length = download_status @@ -503,7 +547,7 @@ impl Read for AudioFileStreaming { assert!(available_length > 0); drop(download_status); - self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap(); + self.position = self.read_file.seek(SeekFrom::Start(offset as u64))?; let read_len = min(length, available_length); let read_len = self.read_file.read(&mut output[..read_len])?; diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 6157040f..4eef2b66 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -13,7 +13,10 @@ use librespot_core::session::Session; use crate::range_set::{Range, RangeSet}; -use super::{AudioFileShared, DownloadStrategy, StreamLoaderCommand, StreamingRequest}; +use super::{ + AudioFileError, AudioFileResult, AudioFileShared, DownloadStrategy, StreamLoaderCommand, + StreamingRequest, +}; use super::{ FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, @@ -33,7 +36,7 @@ async fn receive_data( shared: Arc, file_data_tx: mpsc::UnboundedSender, mut request: StreamingRequest, -) { +) -> AudioFileResult { let requested_offset = request.offset; let requested_length = request.length; @@ -97,7 +100,10 @@ async fn receive_data( if request_length > 0 { let missing_range = Range::new(data_offset, request_length); - let mut download_status = shared.download_status.lock().unwrap(); + let mut download_status = shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; download_status.requested.subtract_range(&missing_range); shared.cond.notify_all(); } @@ -106,16 +112,23 @@ async fn receive_data( .number_of_open_requests .fetch_sub(1, Ordering::SeqCst); - if let Err(e) = result { - error!( - "Error from streamer for range {} (+{}): {:?}", - requested_offset, requested_length, e - ); - } else if request_length > 0 { - warn!( - "Streamer for range {} (+{}) received less data from server than requested.", - requested_offset, requested_length - ); + match result { + Ok(()) => { + if request_length > 0 { + warn!( + "Streamer for range {} (+{}) received less data from server than requested.", + requested_offset, requested_length + ); + } + Ok(()) + } + Err(e) => { + error!( + "Error from streamer for range {} (+{}): {:?}", + requested_offset, requested_length, e + ); + Err(e.into()) + } } } @@ -137,24 +150,21 @@ enum ControlFlow { } impl AudioFileFetch { - fn get_download_strategy(&mut self) -> DownloadStrategy { - *(self.shared.download_strategy.lock().unwrap()) + fn get_download_strategy(&mut self) -> Result { + let strategy = self + .shared + .download_strategy + .lock() + .map_err(|_| AudioFileError::Poisoned)?; + + Ok(*(strategy)) } - fn download_range(&mut self, offset: usize, mut length: usize) { + fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult { if length < MINIMUM_DOWNLOAD_SIZE { length = MINIMUM_DOWNLOAD_SIZE; } - // ensure the values are within the bounds - if offset >= self.shared.file_size { - return; - } - - if length == 0 { - return; - } - if offset + length > self.shared.file_size { length = self.shared.file_size - offset; } @@ -162,7 +172,11 @@ impl AudioFileFetch { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length)); - let mut download_status = self.shared.download_status.lock().unwrap(); + let mut download_status = self + .shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); @@ -205,9 +219,15 @@ impl AudioFileFetch { } } } + + Ok(()) } - fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) { + fn pre_fetch_more_data( + &mut self, + bytes: usize, + max_requests_to_send: usize, + ) -> AudioFileResult { let mut bytes_to_go = bytes; let mut requests_to_go = max_requests_to_send; @@ -216,7 +236,11 @@ impl AudioFileFetch { let mut missing_data = RangeSet::new(); missing_data.add_range(&Range::new(0, self.shared.file_size)); { - let download_status = self.shared.download_status.lock().unwrap(); + let download_status = self + .shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; missing_data.subtract_range_set(&download_status.downloaded); missing_data.subtract_range_set(&download_status.requested); } @@ -234,7 +258,7 @@ impl AudioFileFetch { let range = tail_end.get_range(0); let offset = range.start; let length = min(range.length, bytes_to_go); - self.download_range(offset, length); + self.download_range(offset, length)?; requests_to_go -= 1; bytes_to_go -= length; } else if !missing_data.is_empty() { @@ -242,20 +266,20 @@ impl AudioFileFetch { let range = missing_data.get_range(0); let offset = range.start; let length = min(range.length, bytes_to_go); - self.download_range(offset, length); + self.download_range(offset, length)?; requests_to_go -= 1; bytes_to_go -= length; } else { - return; + break; } } + + Ok(()) } - fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow { + fn handle_file_data(&mut self, data: ReceivedData) -> Result { match data { ReceivedData::ResponseTime(response_time) => { - trace!("Ping time estimated as: {} ms", response_time.as_millis()); - // prune old response times. Keep at most two so we can push a third. while self.network_response_times.len() >= 3 { self.network_response_times.remove(0); @@ -276,24 +300,27 @@ impl AudioFileFetch { _ => unreachable!(), }; + trace!("Ping time estimated as: {} ms", ping_time.as_millis()); + // store our new estimate for everyone to see self.shared .ping_time_ms .store(ping_time.as_millis() as usize, Ordering::Relaxed); } ReceivedData::Data(data) => { - self.output - .as_mut() - .unwrap() - .seek(SeekFrom::Start(data.offset as u64)) - .unwrap(); - self.output - .as_mut() - .unwrap() - .write_all(data.data.as_ref()) - .unwrap(); + match self.output.as_mut() { + Some(output) => { + output.seek(SeekFrom::Start(data.offset as u64))?; + output.write_all(data.data.as_ref())?; + } + None => return Err(AudioFileError::Output), + } - let mut download_status = self.shared.download_status.lock().unwrap(); + let mut download_status = self + .shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; let received_range = Range::new(data.offset, data.data.len()); download_status.downloaded.add_range(&received_range); @@ -305,36 +332,50 @@ impl AudioFileFetch { drop(download_status); if full { - self.finish(); - return ControlFlow::Break; + self.finish()?; + return Ok(ControlFlow::Break); } } } - ControlFlow::Continue + + Ok(ControlFlow::Continue) } - fn handle_stream_loader_command(&mut self, cmd: StreamLoaderCommand) -> ControlFlow { + fn handle_stream_loader_command( + &mut self, + cmd: StreamLoaderCommand, + ) -> Result { match cmd { StreamLoaderCommand::Fetch(request) => { - self.download_range(request.start, request.length); + self.download_range(request.start, request.length)?; } StreamLoaderCommand::RandomAccessMode() => { - *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); + *(self + .shared + .download_strategy + .lock() + .map_err(|_| AudioFileError::Poisoned)?) = DownloadStrategy::RandomAccess(); } StreamLoaderCommand::StreamMode() => { - *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); + *(self + .shared + .download_strategy + .lock() + .map_err(|_| AudioFileError::Poisoned)?) = DownloadStrategy::Streaming(); } - StreamLoaderCommand::Close() => return ControlFlow::Break, + StreamLoaderCommand::Close() => return Ok(ControlFlow::Break), } - ControlFlow::Continue + Ok(ControlFlow::Continue) } - fn finish(&mut self) { - let mut output = self.output.take().unwrap(); - let complete_tx = self.complete_tx.take().unwrap(); + fn finish(&mut self) -> AudioFileResult { + let mut output = self.output.take().ok_or(AudioFileError::Output)?; + let complete_tx = self.complete_tx.take().ok_or(AudioFileError::Output)?; - output.seek(SeekFrom::Start(0)).unwrap(); - let _ = complete_tx.send(output); + output.seek(SeekFrom::Start(0))?; + complete_tx + .send(output) + .map_err(|_| AudioFileError::Channel) } } @@ -345,7 +386,7 @@ pub(super) async fn audio_file_fetch( output: NamedTempFile, mut stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, -) { +) -> AudioFileResult { let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); { @@ -353,7 +394,10 @@ pub(super) async fn audio_file_fetch( initial_request.offset, initial_request.offset + initial_request.length, ); - let mut download_status = shared.download_status.lock().unwrap(); + let mut download_status = shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; download_status.requested.add_range(&requested_range); } @@ -376,25 +420,39 @@ pub(super) async fn audio_file_fetch( loop { tokio::select! { cmd = stream_loader_command_rx.recv() => { - if cmd.map_or(true, |cmd| fetch.handle_stream_loader_command(cmd) == ControlFlow::Break) { - break; + match cmd { + Some(cmd) => { + if fetch.handle_stream_loader_command(cmd)? == ControlFlow::Break { + break; + } + } + None => break, + } } - }, - data = file_data_rx.recv() => { - if data.map_or(true, |data| fetch.handle_file_data(data) == ControlFlow::Break) { - break; + data = file_data_rx.recv() => { + match data { + Some(data) => { + if fetch.handle_file_data(data)? == ControlFlow::Break { + break; + } + } + None => break, } } } - if fetch.get_download_strategy() == DownloadStrategy::Streaming() { + if fetch.get_download_strategy()? == DownloadStrategy::Streaming() { let number_of_open_requests = fetch.shared.number_of_open_requests.load(Ordering::SeqCst); if number_of_open_requests < MAX_PREFETCH_REQUESTS { let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests; let bytes_pending: usize = { - let download_status = fetch.shared.download_status.lock().unwrap(); + let download_status = fetch + .shared + .download_status + .lock() + .map_err(|_| AudioFileError::Poisoned)?; download_status .requested .minus(&download_status.downloaded) @@ -418,9 +476,11 @@ pub(super) async fn audio_file_fetch( fetch.pre_fetch_more_data( desired_pending_bytes - bytes_pending, max_requests_to_send, - ); + )?; } } } } + + Ok(()) } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 0c96b0d0..5685486d 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -7,7 +7,7 @@ mod fetch; mod range_set; pub use decrypt::AudioDecrypt; -pub use fetch::{AudioFile, StreamLoaderController}; +pub use fetch::{AudioFile, AudioFileError, StreamLoaderController}; pub use fetch::{ READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index f74058a3..a37b03ae 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -10,7 +10,7 @@ pub struct Range { impl fmt::Display for Range { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - return write!(f, "[{}, {}]", self.start, self.start + self.length - 1); + write!(f, "[{}, {}]", self.start, self.start + self.length - 1) } } @@ -24,16 +24,16 @@ impl Range { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct RangeSet { ranges: Vec, } impl fmt::Display for RangeSet { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "(").unwrap(); + write!(f, "(")?; for range in self.ranges.iter() { - write!(f, "{}", range).unwrap(); + write!(f, "{}", range)?; } write!(f, ")") } diff --git a/core/src/cache.rs b/core/src/cache.rs index af92ab78..aec00e84 100644 --- a/core/src/cache.rs +++ b/core/src/cache.rs @@ -350,7 +350,7 @@ impl Cache { } } - pub fn file_path(&self, file: FileId) -> Option { + fn file_path(&self, file: FileId) -> Option { self.audio_location.as_ref().map(|location| { let name = file.to_base16(); let mut path = location.join(&name[0..2]); @@ -377,24 +377,22 @@ impl Cache { } } - pub fn save_file(&self, file: FileId, contents: &mut F) { - let path = if let Some(path) = self.file_path(file) { - path - } else { - return; - }; - let parent = path.parent().unwrap(); - - let result = fs::create_dir_all(parent) - .and_then(|_| File::create(&path)) - .and_then(|mut file| io::copy(contents, &mut file)); - - if let Ok(size) = result { - if let Some(limiter) = self.size_limiter.as_deref() { - limiter.add(&path, size); - limiter.prune(); + pub fn save_file(&self, file: FileId, contents: &mut F) -> bool { + if let Some(path) = self.file_path(file) { + if let Some(parent) = path.parent() { + if let Ok(size) = fs::create_dir_all(parent) + .and_then(|_| File::create(&path)) + .and_then(|mut file| io::copy(contents, &mut file)) + { + if let Some(limiter) = self.size_limiter.as_deref() { + limiter.add(&path, size); + limiter.prune(); + } + return true; + } } } + false } pub fn remove_file(&self, file: FileId) -> Result<(), RemoveFileError> { diff --git a/core/src/cdn_url.rs b/core/src/cdn_url.rs index 6d87cac9..13f23a37 100644 --- a/core/src/cdn_url.rs +++ b/core/src/cdn_url.rs @@ -80,7 +80,7 @@ impl CdnUrl { return Err(CdnUrlError::Empty); } - // remove expired URLs until the first one is current, or none are left + // prune expired URLs until the first one is current, or none are left let now = Local::now(); while !self.urls.is_empty() { let maybe_expiring = self.urls[0].1; diff --git a/core/src/http_client.rs b/core/src/http_client.rs index ebd7aefd..52206c5c 100644 --- a/core/src/http_client.rs +++ b/core/src/http_client.rs @@ -14,7 +14,9 @@ use url::Url; use std::env::consts::OS; -use crate::version::{SPOTIFY_MOBILE_VERSION, SPOTIFY_VERSION, VERSION_STRING}; +use crate::version::{ + FALLBACK_USER_AGENT, SPOTIFY_MOBILE_VERSION, SPOTIFY_VERSION, VERSION_STRING, +}; pub struct HttpClient { user_agent: HeaderValue, @@ -64,8 +66,8 @@ impl HttpClient { let user_agent = HeaderValue::from_str(user_agent_str).unwrap_or_else(|err| { error!("Invalid user agent <{}>: {}", user_agent_str, err); - error!("Parts of the API will probably not work. Please report this as a bug."); - HeaderValue::from_static("") + error!("Please report this as a bug."); + HeaderValue::from_static(FALLBACK_USER_AGENT) }); // configuring TLS is expensive and should be done once per process diff --git a/core/src/version.rs b/core/src/version.rs index a7e3acd9..98047ef1 100644 --- a/core/src/version.rs +++ b/core/src/version.rs @@ -21,3 +21,6 @@ pub const SPOTIFY_VERSION: u64 = 117300517; /// The protocol version of the Spotify mobile app. pub const SPOTIFY_MOBILE_VERSION: &str = "8.6.84"; + +/// The user agent to fall back to, if one could not be determined dynamically. +pub const FALLBACK_USER_AGENT: &str = "Spotify/117300517 Linux/0 (librespot)"; diff --git a/playback/src/player.rs b/playback/src/player.rs index ed4fc055..f0c4acda 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -10,9 +10,10 @@ use std::{mem, thread}; use byteorder::{LittleEndian, ReadBytesExt}; use futures_util::stream::futures_unordered::FuturesUnordered; use futures_util::{future, StreamExt, TryFutureExt}; +use thiserror::Error; use tokio::sync::{mpsc, oneshot}; -use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; +use crate::audio::{AudioDecrypt, AudioFile, AudioFileError, StreamLoaderController}; use crate::audio::{ READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, @@ -32,6 +33,14 @@ use crate::{MS_PER_PAGE, NUM_CHANNELS, PAGES_PER_MS, SAMPLES_PER_SECOND}; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; pub const DB_VOLTAGE_RATIO: f64 = 20.0; +pub type PlayerResult = Result<(), PlayerError>; + +#[derive(Debug, Error)] +pub enum PlayerError { + #[error("audio file error: {0}")] + AudioFile(#[from] AudioFileError), +} + pub struct Player { commands: Option>, thread_handle: Option>, @@ -216,6 +225,17 @@ pub struct NormalisationData { album_peak: f32, } +impl Default for NormalisationData { + fn default() -> Self { + Self { + track_gain_db: 0.0, + track_peak: 1.0, + album_gain_db: 0.0, + album_peak: 1.0, + } + } +} + impl NormalisationData { fn parse_from_file(mut file: T) -> io::Result { const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144; @@ -698,19 +718,20 @@ impl PlayerTrackLoader { } fn stream_data_rate(&self, format: AudioFileFormat) -> usize { - match format { - AudioFileFormat::OGG_VORBIS_96 => 12 * 1024, - AudioFileFormat::OGG_VORBIS_160 => 20 * 1024, - AudioFileFormat::OGG_VORBIS_320 => 40 * 1024, - AudioFileFormat::MP3_256 => 32 * 1024, - AudioFileFormat::MP3_320 => 40 * 1024, - AudioFileFormat::MP3_160 => 20 * 1024, - AudioFileFormat::MP3_96 => 12 * 1024, - AudioFileFormat::MP3_160_ENC => 20 * 1024, - AudioFileFormat::AAC_24 => 3 * 1024, - AudioFileFormat::AAC_48 => 6 * 1024, - AudioFileFormat::FLAC_FLAC => 112 * 1024, // assume 900 kbps on average - } + let kbps = match format { + AudioFileFormat::OGG_VORBIS_96 => 12, + AudioFileFormat::OGG_VORBIS_160 => 20, + AudioFileFormat::OGG_VORBIS_320 => 40, + AudioFileFormat::MP3_256 => 32, + AudioFileFormat::MP3_320 => 40, + AudioFileFormat::MP3_160 => 20, + AudioFileFormat::MP3_96 => 12, + AudioFileFormat::MP3_160_ENC => 20, + AudioFileFormat::AAC_24 => 3, + AudioFileFormat::AAC_48 => 6, + AudioFileFormat::FLAC_FLAC => 112, // assume 900 kbit/s on average + }; + kbps * 1024 } async fn load_track( @@ -805,9 +826,10 @@ impl PlayerTrackLoader { return None; } }; + let is_cached = encrypted_file.is_cached(); - let stream_loader_controller = encrypted_file.get_stream_loader_controller(); + let stream_loader_controller = encrypted_file.get_stream_loader_controller().ok()?; if play_from_beginning { // No need to seek -> we stream from the beginning @@ -830,13 +852,8 @@ impl PlayerTrackLoader { let normalisation_data = match NormalisationData::parse_from_file(&mut decrypted_file) { Ok(data) => data, Err(_) => { - warn!("Unable to extract normalisation data, using default value."); - NormalisationData { - track_gain_db: 0.0, - track_peak: 1.0, - album_gain_db: 0.0, - album_peak: 1.0, - } + warn!("Unable to extract normalisation data, using default values."); + NormalisationData::default() } }; @@ -929,7 +946,9 @@ impl Future for PlayerInternal { }; if let Some(cmd) = cmd { - self.handle_command(cmd); + if let Err(e) = self.handle_command(cmd) { + error!("Error handling command: {}", e); + } } // Handle loading of a new track to play @@ -1109,7 +1128,9 @@ impl Future for PlayerInternal { if (!*suggested_to_preload_next_track) && ((duration_ms as i64 - Self::position_pcm_to_ms(stream_position_pcm) as i64) < PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS as i64) - && stream_loader_controller.range_to_end_available() + && stream_loader_controller + .range_to_end_available() + .unwrap_or(false) { *suggested_to_preload_next_track = true; self.send_event(PlayerEvent::TimeToPreloadNextTrack { @@ -1785,7 +1806,7 @@ impl PlayerInternal { } } - fn handle_command_seek(&mut self, position_ms: u32) { + fn handle_command_seek(&mut self, position_ms: u32) -> PlayerResult { if let Some(stream_loader_controller) = self.state.stream_loader_controller() { stream_loader_controller.set_random_access_mode(); } @@ -1818,7 +1839,7 @@ impl PlayerInternal { } // ensure we have a bit of a buffer of downloaded data - self.preload_data_before_playback(); + self.preload_data_before_playback()?; if let PlayerState::Playing { track_id, @@ -1851,11 +1872,13 @@ impl PlayerInternal { duration_ms, }); } + + Ok(()) } - fn handle_command(&mut self, cmd: PlayerCommand) { + fn handle_command(&mut self, cmd: PlayerCommand) -> PlayerResult { debug!("command={:?}", cmd); - match cmd { + let result = match cmd { PlayerCommand::Load { track_id, play_request_id, @@ -1865,7 +1888,7 @@ impl PlayerInternal { PlayerCommand::Preload { track_id } => self.handle_command_preload(track_id), - PlayerCommand::Seek(position_ms) => self.handle_command_seek(position_ms), + PlayerCommand::Seek(position_ms) => self.handle_command_seek(position_ms)?, PlayerCommand::Play => self.handle_play(), @@ -1884,7 +1907,9 @@ impl PlayerInternal { PlayerCommand::SetAutoNormaliseAsAlbum(setting) => { self.auto_normalise_as_album = setting } - } + }; + + Ok(result) } fn send_event(&mut self, event: PlayerEvent) { @@ -1928,7 +1953,7 @@ impl PlayerInternal { result_rx.map_err(|_| ()) } - fn preload_data_before_playback(&mut self) { + fn preload_data_before_playback(&mut self) -> Result<(), PlayerError> { if let PlayerState::Playing { bytes_per_second, ref mut stream_loader_controller, @@ -1951,7 +1976,11 @@ impl PlayerInternal { * bytes_per_second as f32) as usize, (READ_AHEAD_BEFORE_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize, ); - stream_loader_controller.fetch_next_blocking(wait_for_data_length); + stream_loader_controller + .fetch_next_blocking(wait_for_data_length) + .map_err(|e| e.into()) + } else { + Ok(()) } } }