diff --git a/Cargo.lock b/Cargo.lock index 5aa66853..3e28c806 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -447,9 +447,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e" +checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca" dependencies = [ "futures-channel", "futures-core", @@ -462,9 +462,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27" +checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888" dependencies = [ "futures-core", "futures-sink", @@ -472,15 +472,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445" +checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" [[package]] name = "futures-executor" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97" +checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c" dependencies = [ "futures-core", "futures-task", @@ -489,16 +489,18 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" [[package]] name = "futures-macro" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd" +checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb" dependencies = [ + "autocfg", + "proc-macro-hack", "proc-macro2", "quote", "syn", @@ -506,22 +508,23 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af" +checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11" [[package]] name = "futures-task" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12" +checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" [[package]] name = "futures-util" -version = "0.3.18" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e" +checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481" dependencies = [ + "autocfg", "futures-channel", "futures-core", "futures-io", @@ -531,6 +534,8 @@ dependencies = [ "memchr", "pin-project-lite", "pin-utils", + "proc-macro-hack", + "proc-macro-nested", "slab", ] @@ -726,9 +731,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.7" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fd819562fcebdac5afc5c113c3ec36f902840b70fd4fc458799c8ce4607ae55" +checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd" dependencies = [ "bytes", "fnv", @@ -861,9 +866,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.15" +version = "0.14.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "436ec0091e4f20e655156a30a0df3770fe2900aa301e548e08446ec794b6953c" +checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55" dependencies = [ "bytes", "futures-channel", @@ -1215,10 +1220,14 @@ dependencies = [ "aes-ctr", "byteorder", "bytes", + "futures-core", + "futures-executor", "futures-util", + "hyper", "librespot-core", "log", "tempfile", + "thiserror", "tokio", ] @@ -1249,6 +1258,7 @@ dependencies = [ "base64", "byteorder", "bytes", + "chrono", "env_logger", "form_urlencoded", "futures-core", @@ -1272,6 +1282,8 @@ dependencies = [ "protobuf", "quick-xml", "rand", + "rustls", + "rustls-native-certs", "serde", "serde_json", "sha-1", @@ -1917,6 +1929,18 @@ dependencies = [ "version_check", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + [[package]] name = "proc-macro2" version = "1.0.33" diff --git a/audio/Cargo.toml b/audio/Cargo.toml index 77855e62..d5a7a074 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -14,7 +14,11 @@ version = "0.3.1" aes-ctr = "0.6" byteorder = "1.4" bytes = "1.0" -log = "0.4" +futures-core = { version = "0.3", default-features = false } +futures-executor = "0.3" futures-util = { version = "0.3", default_features = false } +hyper = { version = "0.14", features = ["client"] } +log = "0.4" tempfile = "3.1" +thiserror = "1.0" tokio = { version = "1", features = ["sync", "macros"] } diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index b68f6858..97037d6e 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -7,36 +7,55 @@ use std::sync::atomic::{self, AtomicUsize}; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; -use byteorder::{BigEndian, ByteOrder}; -use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt}; +use futures_util::future::IntoStream; +use futures_util::{StreamExt, TryFutureExt}; +use hyper::client::ResponseFuture; +use hyper::header::CONTENT_RANGE; +use hyper::Body; use tempfile::NamedTempFile; +use thiserror::Error; use tokio::sync::{mpsc, oneshot}; -use librespot_core::channel::{ChannelData, ChannelError, ChannelHeaders}; +use librespot_core::cdn_url::{CdnUrl, CdnUrlError}; use librespot_core::file_id::FileId; use librespot_core::session::Session; +use librespot_core::spclient::SpClientError; -use self::receive::{audio_file_fetch, request_range}; +use self::receive::audio_file_fetch; use crate::range_set::{Range, RangeSet}; +#[derive(Error, Debug)] +pub enum AudioFileError { + #[error("could not complete CDN request: {0}")] + Cdn(hyper::Error), + #[error("empty response")] + Empty, + #[error("error parsing response")] + Parsing, + #[error("could not complete API request: {0}")] + SpClient(#[from] SpClientError), + #[error("could not get CDN URL: {0}")] + Url(#[from] CdnUrlError), +} + /// The minimum size of a block that is requested from the Spotify servers in one request. /// This is the block size that is typically requested while doing a `seek()` on a file. /// Note: smaller requests can happen if part of the block is downloaded already. -const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; +pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 256; /// The amount of data that is requested when initially opening a file. /// Note: if the file is opened to play from the beginning, the amount of data to /// read ahead is requested in addition to this amount. If the file is opened to seek to /// another position, then only this amount is requested on the first request. -const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; +pub const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 128; /// The ping time that is used for calculations before a ping time was actually measured. -const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500); +pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500); /// If the measured ping time to the Spotify server is larger than this value, it is capped /// to avoid run-away block sizes and pre-fetching. -const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500); +pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500); /// Before playback starts, this many seconds of data must be present. /// Note: the calculations are done using the nominal bitrate of the file. The actual amount @@ -65,7 +84,7 @@ pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f32 = 10.0; /// If the amount of data that is pending (requested but not received) is less than a certain amount, /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more /// data is calculated as ` < PREFETCH_THRESHOLD_FACTOR * * ` -const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0; +pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0; /// Similar to `PREFETCH_THRESHOLD_FACTOR`, but it also takes the current download rate into account. /// The formula used is ` < FAST_PREFETCH_THRESHOLD_FACTOR * * ` @@ -74,16 +93,16 @@ const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0; /// the download rate ramps up. However, this comes at the cost that it might hurt ping time if a seek is /// performed while downloading. Values smaller than `1.0` cause the download rate to collapse and effectively /// only `PREFETCH_THRESHOLD_FACTOR` is in effect. Thus, set to `0.0` if bandwidth saturation is not wanted. -const FAST_PREFETCH_THRESHOLD_FACTOR: f32 = 1.5; +pub const FAST_PREFETCH_THRESHOLD_FACTOR: f32 = 1.5; /// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending -/// requests share bandwidth. Thus, havint too many requests can lead to the one that is needed next +/// requests share bandwidth. Thus, having too many requests can lead to the one that is needed next /// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new /// pre-fetch request is only sent if less than `MAX_PREFETCH_REQUESTS` are pending. -const MAX_PREFETCH_REQUESTS: usize = 4; +pub const MAX_PREFETCH_REQUESTS: usize = 4; /// The time we will wait to obtain status updates on downloading. -const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(1); +pub const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(1); pub enum AudioFile { Cached(fs::File), @@ -91,7 +110,16 @@ pub enum AudioFile { } #[derive(Debug)] -enum StreamLoaderCommand { +pub struct StreamingRequest { + streamer: IntoStream, + initial_body: Option, + offset: usize, + length: usize, + request_time: Instant, +} + +#[derive(Debug)] +pub enum StreamLoaderCommand { Fetch(Range), // signal the stream loader to fetch a range of the file RandomAccessMode(), // optimise download strategy for random access StreamMode(), // optimise download strategy for streaming @@ -244,9 +272,9 @@ enum DownloadStrategy { } struct AudioFileShared { - file_id: FileId, + cdn_url: CdnUrl, file_size: usize, - stream_data_rate: usize, + bytes_per_second: usize, cond: Condvar, download_status: Mutex, download_strategy: Mutex, @@ -255,19 +283,13 @@ struct AudioFileShared { read_position: AtomicUsize, } -pub struct InitialData { - rx: ChannelData, - length: usize, - request_sent_time: Instant, -} - impl AudioFile { pub async fn open( session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool, - ) -> Result { + ) -> Result { if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) { debug!("File {} already in cache", file_id); return Ok(AudioFile::Cached(file)); @@ -276,35 +298,13 @@ impl AudioFile { debug!("Downloading file {}", file_id); let (complete_tx, complete_rx) = oneshot::channel(); - let mut length = if play_from_beginning { - INITIAL_DOWNLOAD_SIZE - + max( - (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize, - (INITIAL_PING_TIME_ESTIMATE.as_secs_f32() - * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * bytes_per_second as f32) as usize, - ) - } else { - INITIAL_DOWNLOAD_SIZE - }; - if length % 4 != 0 { - length += 4 - (length % 4); - } - let (headers, rx) = request_range(session, file_id, 0, length).split(); - - let initial_data = InitialData { - rx, - length, - request_sent_time: Instant::now(), - }; let streaming = AudioFileStreaming::open( session.clone(), - initial_data, - headers, file_id, complete_tx, bytes_per_second, + play_from_beginning, ); let session_ = session.clone(); @@ -343,24 +343,58 @@ impl AudioFile { impl AudioFileStreaming { pub async fn open( session: Session, - initial_data: InitialData, - headers: ChannelHeaders, file_id: FileId, complete_tx: oneshot::Sender, - streaming_data_rate: usize, - ) -> Result { - let (_, data) = headers - .try_filter(|(id, _)| future::ready(*id == 0x3)) - .next() - .await - .unwrap()?; + bytes_per_second: usize, + play_from_beginning: bool, + ) -> Result { + let download_size = if play_from_beginning { + INITIAL_DOWNLOAD_SIZE + + max( + (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize, + (INITIAL_PING_TIME_ESTIMATE.as_secs_f32() + * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * bytes_per_second as f32) as usize, + ) + } else { + INITIAL_DOWNLOAD_SIZE + }; - let size = BigEndian::read_u32(&data) as usize * 4; + let mut cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?; + let url = cdn_url.get_url()?; + + let mut streamer = session.spclient().stream_file(url, 0, download_size)?; + let request_time = Instant::now(); + + // Get the first chunk with the headers to get the file size. + // The remainder of that chunk with possibly also a response body is then + // further processed in `audio_file_fetch`. + let response = match streamer.next().await { + Some(Ok(data)) => data, + Some(Err(e)) => return Err(AudioFileError::Cdn(e)), + None => return Err(AudioFileError::Empty), + }; + let header_value = response + .headers() + .get(CONTENT_RANGE) + .ok_or(AudioFileError::Parsing)?; + + let str_value = header_value.to_str().map_err(|_| AudioFileError::Parsing)?; + let file_size_str = str_value.split('/').last().ok_or(AudioFileError::Parsing)?; + let file_size = file_size_str.parse().map_err(|_| AudioFileError::Parsing)?; + + let initial_request = StreamingRequest { + streamer, + initial_body: Some(response.into_body()), + offset: 0, + length: download_size, + request_time, + }; let shared = Arc::new(AudioFileShared { - file_id, - file_size: size, - stream_data_rate: streaming_data_rate, + cdn_url, + file_size, + bytes_per_second, cond: Condvar::new(), download_status: Mutex::new(AudioFileDownloadStatus { requested: RangeSet::new(), @@ -372,20 +406,17 @@ impl AudioFileStreaming { read_position: AtomicUsize::new(0), }); - let mut write_file = NamedTempFile::new().unwrap(); - write_file.as_file().set_len(size as u64).unwrap(); - write_file.seek(SeekFrom::Start(0)).unwrap(); - + // TODO : use new_in() to store securely in librespot directory + let write_file = NamedTempFile::new().unwrap(); let read_file = write_file.reopen().unwrap(); - // let (seek_tx, seek_rx) = mpsc::unbounded(); let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded_channel::(); session.spawn(audio_file_fetch( session.clone(), shared.clone(), - initial_data, + initial_request, write_file, stream_loader_command_rx, complete_tx, @@ -422,10 +453,10 @@ impl Read for AudioFileStreaming { let length_to_request = length + max( (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() - * self.shared.stream_data_rate as f32) as usize, + * self.shared.bytes_per_second as f32) as usize, (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS * ping_time_seconds - * self.shared.stream_data_rate as f32) as usize, + * self.shared.bytes_per_second as f32) as usize, ); min(length_to_request, self.shared.file_size - offset) } diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index 7b797b02..6157040f 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -4,56 +4,21 @@ use std::sync::{atomic, Arc}; use std::time::{Duration, Instant}; use atomic::Ordering; -use byteorder::{BigEndian, WriteBytesExt}; use bytes::Bytes; use futures_util::StreamExt; use tempfile::NamedTempFile; use tokio::sync::{mpsc, oneshot}; -use librespot_core::channel::{Channel, ChannelData}; -use librespot_core::file_id::FileId; -use librespot_core::packet::PacketType; use librespot_core::session::Session; use crate::range_set::{Range, RangeSet}; -use super::{AudioFileShared, DownloadStrategy, InitialData, StreamLoaderCommand}; +use super::{AudioFileShared, DownloadStrategy, StreamLoaderCommand, StreamingRequest}; use super::{ FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, }; -pub fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { - assert!( - offset % 4 == 0, - "Range request start positions must be aligned by 4 bytes." - ); - assert!( - length % 4 == 0, - "Range request range lengths must be aligned by 4 bytes." - ); - let start = offset / 4; - let end = (offset + length) / 4; - - let (id, channel) = session.channel().allocate(); - - let mut data: Vec = Vec::new(); - data.write_u16::(id).unwrap(); - data.write_u8(0).unwrap(); - data.write_u8(1).unwrap(); - data.write_u16::(0x0000).unwrap(); - data.write_u32::(0x00000000).unwrap(); - data.write_u32::(0x00009C40).unwrap(); - data.write_u32::(0x00020000).unwrap(); - data.write_all(&file.0).unwrap(); - data.write_u32::(start as u32).unwrap(); - data.write_u32::(end as u32).unwrap(); - - session.send_packet(PacketType::StreamChunk, data); - - channel -} - struct PartialFileData { offset: usize, data: Bytes, @@ -67,13 +32,13 @@ enum ReceivedData { async fn receive_data( shared: Arc, file_data_tx: mpsc::UnboundedSender, - mut data_rx: ChannelData, - initial_data_offset: usize, - initial_request_length: usize, - request_sent_time: Instant, + mut request: StreamingRequest, ) { - let mut data_offset = initial_data_offset; - let mut request_length = initial_request_length; + let requested_offset = request.offset; + let requested_length = request.length; + + let mut data_offset = requested_offset; + let mut request_length = requested_length; let old_number_of_request = shared .number_of_open_requests @@ -82,21 +47,31 @@ async fn receive_data( let mut measure_ping_time = old_number_of_request == 0; let result = loop { - let data = match data_rx.next().await { - Some(Ok(data)) => data, - Some(Err(e)) => break Err(e), - None => break Ok(()), + let body = match request.initial_body.take() { + Some(data) => data, + None => match request.streamer.next().await { + Some(Ok(response)) => response.into_body(), + Some(Err(e)) => break Err(e), + None => break Ok(()), + }, + }; + + let data = match hyper::body::to_bytes(body).await { + Ok(bytes) => bytes, + Err(e) => break Err(e), }; if measure_ping_time { - let mut duration = Instant::now() - request_sent_time; + let mut duration = Instant::now() - request.request_time; if duration > MAXIMUM_ASSUMED_PING_TIME { duration = MAXIMUM_ASSUMED_PING_TIME; } let _ = file_data_tx.send(ReceivedData::ResponseTime(duration)); measure_ping_time = false; } + let data_size = data.len(); + let _ = file_data_tx.send(ReceivedData::Data(PartialFileData { offset: data_offset, data, @@ -104,8 +79,8 @@ async fn receive_data( data_offset += data_size; if request_length < data_size { warn!( - "Data receiver for range {} (+{}) received more data from server than requested.", - initial_data_offset, initial_request_length + "Data receiver for range {} (+{}) received more data from server than requested ({} instead of {}).", + requested_offset, requested_length, data_size, request_length ); request_length = 0; } else { @@ -117,6 +92,8 @@ async fn receive_data( } }; + drop(request.streamer); + if request_length > 0 { let missing_range = Range::new(data_offset, request_length); @@ -129,15 +106,15 @@ async fn receive_data( .number_of_open_requests .fetch_sub(1, Ordering::SeqCst); - if result.is_err() { - warn!( - "Error from channel for data receiver for range {} (+{}).", - initial_data_offset, initial_request_length + if let Err(e) = result { + error!( + "Error from streamer for range {} (+{}): {:?}", + requested_offset, requested_length, e ); } else if request_length > 0 { warn!( - "Data receiver for range {} (+{}) received less data from server than requested.", - initial_data_offset, initial_request_length + "Streamer for range {} (+{}) received less data from server than requested.", + requested_offset, requested_length ); } } @@ -164,12 +141,12 @@ impl AudioFileFetch { *(self.shared.download_strategy.lock().unwrap()) } - fn download_range(&mut self, mut offset: usize, mut length: usize) { + fn download_range(&mut self, offset: usize, mut length: usize) { if length < MINIMUM_DOWNLOAD_SIZE { length = MINIMUM_DOWNLOAD_SIZE; } - // ensure the values are within the bounds and align them by 4 for the spotify protocol. + // ensure the values are within the bounds if offset >= self.shared.file_size { return; } @@ -182,15 +159,6 @@ impl AudioFileFetch { length = self.shared.file_size - offset; } - if offset % 4 != 0 { - length += offset % 4; - offset -= offset % 4; - } - - if length % 4 != 0 { - length += 4 - (length % 4); - } - let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length)); @@ -199,25 +167,43 @@ impl AudioFileFetch { ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); + let cdn_url = &self.shared.cdn_url; + let file_id = cdn_url.file_id; + for range in ranges_to_request.iter() { - let (_headers, data) = request_range( - &self.session, - self.shared.file_id, - range.start, - range.length, - ) - .split(); + match cdn_url.urls.first() { + Some(url) => { + match self + .session + .spclient() + .stream_file(&url.0, range.start, range.length) + { + Ok(streamer) => { + download_status.requested.add_range(range); - download_status.requested.add_range(range); + let streaming_request = StreamingRequest { + streamer, + initial_body: None, + offset: range.start, + length: range.length, + request_time: Instant::now(), + }; - self.session.spawn(receive_data( - self.shared.clone(), - self.file_data_tx.clone(), - data, - range.start, - range.length, - Instant::now(), - )); + self.session.spawn(receive_data( + self.shared.clone(), + self.file_data_tx.clone(), + streaming_request, + )); + } + Err(e) => { + error!("Unable to open stream for track <{}>: {:?}", file_id, e); + } + } + } + None => { + error!("Unable to get CDN URL for track <{}>", file_id); + } + } } } @@ -268,8 +254,7 @@ impl AudioFileFetch { fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow { match data { ReceivedData::ResponseTime(response_time) => { - // chatty - // trace!("Ping time estimated as: {}ms", response_time.as_millis()); + trace!("Ping time estimated as: {} ms", response_time.as_millis()); // prune old response times. Keep at most two so we can push a third. while self.network_response_times.len() >= 3 { @@ -356,7 +341,7 @@ impl AudioFileFetch { pub(super) async fn audio_file_fetch( session: Session, shared: Arc, - initial_data: InitialData, + initial_request: StreamingRequest, output: NamedTempFile, mut stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, @@ -364,7 +349,10 @@ pub(super) async fn audio_file_fetch( let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); { - let requested_range = Range::new(0, initial_data.length); + let requested_range = Range::new( + initial_request.offset, + initial_request.offset + initial_request.length, + ); let mut download_status = shared.download_status.lock().unwrap(); download_status.requested.add_range(&requested_range); } @@ -372,14 +360,11 @@ pub(super) async fn audio_file_fetch( session.spawn(receive_data( shared.clone(), file_data_tx.clone(), - initial_data.rx, - 0, - initial_data.length, - initial_data.request_sent_time, + initial_request, )); let mut fetch = AudioFileFetch { - session, + session: session.clone(), shared, output: Some(output), @@ -424,7 +409,7 @@ pub(super) async fn audio_file_fetch( let desired_pending_bytes = max( (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds - * fetch.shared.stream_data_rate as f32) as usize, + * fetch.shared.bytes_per_second as f32) as usize, (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f32) as usize, ); diff --git a/core/Cargo.toml b/core/Cargo.toml index 54fc1de7..876a0038 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -17,6 +17,7 @@ aes = "0.6" base64 = "0.13" byteorder = "1.4" bytes = "1" +chrono = "0.4" form_urlencoded = "1.0" futures-core = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] } @@ -38,11 +39,13 @@ priority-queue = "1.1" protobuf = "2.14.0" quick-xml = { version = "0.22", features = [ "serialize" ] } rand = "0.8" +rustls = "0.19" +rustls-native-certs = "0.5" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha-1 = "0.9" shannon = "0.2.0" -thiserror = "1.0.7" +thiserror = "1.0" tokio = { version = "1.5", features = ["io-util", "macros", "net", "rt", "time", "sync"] } tokio-stream = "0.1.1" tokio-tungstenite = { version = "0.14", default-features = false, features = ["rustls-tls"] } diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index d39c3101..e78a272c 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,4 +1,4 @@ -use hyper::{Body, Request}; +use hyper::{Body, Method, Request}; use serde::Deserialize; use std::error::Error; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -69,7 +69,7 @@ impl ApResolver { pub async fn try_apresolve(&self) -> Result> { let req = Request::builder() - .method("GET") + .method(Method::GET) .uri("http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient") .body(Body::empty())?; diff --git a/core/src/cdn_url.rs b/core/src/cdn_url.rs new file mode 100644 index 00000000..6d87cac9 --- /dev/null +++ b/core/src/cdn_url.rs @@ -0,0 +1,151 @@ +use chrono::Local; +use protobuf::{Message, ProtobufError}; +use thiserror::Error; +use url::Url; + +use std::convert::{TryFrom, TryInto}; +use std::ops::{Deref, DerefMut}; + +use super::date::Date; +use super::file_id::FileId; +use super::session::Session; +use super::spclient::SpClientError; + +use librespot_protocol as protocol; +use protocol::storage_resolve::StorageResolveResponse as CdnUrlMessage; +use protocol::storage_resolve::StorageResolveResponse_Result; + +#[derive(Error, Debug)] +pub enum CdnUrlError { + #[error("no URLs available")] + Empty, + #[error("all tokens expired")] + Expired, + #[error("error parsing response")] + Parsing, + #[error("could not parse protobuf: {0}")] + Protobuf(#[from] ProtobufError), + #[error("could not complete API request: {0}")] + SpClient(#[from] SpClientError), +} + +#[derive(Debug, Clone)] +pub struct MaybeExpiringUrl(pub String, pub Option); + +#[derive(Debug, Clone)] +pub struct MaybeExpiringUrls(pub Vec); + +impl Deref for MaybeExpiringUrls { + type Target = Vec; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for MaybeExpiringUrls { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[derive(Debug, Clone)] +pub struct CdnUrl { + pub file_id: FileId, + pub urls: MaybeExpiringUrls, +} + +impl CdnUrl { + pub fn new(file_id: FileId) -> Self { + Self { + file_id, + urls: MaybeExpiringUrls(Vec::new()), + } + } + + pub async fn resolve_audio(&self, session: &Session) -> Result { + let file_id = self.file_id; + let response = session.spclient().get_audio_urls(file_id).await?; + let msg = CdnUrlMessage::parse_from_bytes(&response)?; + let urls = MaybeExpiringUrls::try_from(msg)?; + + let cdn_url = Self { file_id, urls }; + + trace!("Resolved CDN storage: {:#?}", cdn_url); + + Ok(cdn_url) + } + + pub fn get_url(&mut self) -> Result<&str, CdnUrlError> { + if self.urls.is_empty() { + return Err(CdnUrlError::Empty); + } + + // remove expired URLs until the first one is current, or none are left + let now = Local::now(); + while !self.urls.is_empty() { + let maybe_expiring = self.urls[0].1; + if let Some(expiry) = maybe_expiring { + if now < expiry.as_utc() { + break; + } else { + self.urls.remove(0); + } + } + } + + if let Some(cdn_url) = self.urls.first() { + Ok(&cdn_url.0) + } else { + Err(CdnUrlError::Expired) + } + } +} + +impl TryFrom for MaybeExpiringUrls { + type Error = CdnUrlError; + fn try_from(msg: CdnUrlMessage) -> Result { + if !matches!(msg.get_result(), StorageResolveResponse_Result::CDN) { + return Err(CdnUrlError::Parsing); + } + + let is_expiring = !msg.get_fileid().is_empty(); + + let result = msg + .get_cdnurl() + .iter() + .map(|cdn_url| { + let url = Url::parse(cdn_url).map_err(|_| CdnUrlError::Parsing)?; + + if is_expiring { + let expiry_str = if let Some(token) = url + .query_pairs() + .into_iter() + .find(|(key, _value)| key == "__token__") + { + let start = token.1.find("exp=").ok_or(CdnUrlError::Parsing)?; + let slice = &token.1[start + 4..]; + let end = slice.find('~').ok_or(CdnUrlError::Parsing)?; + String::from(&slice[..end]) + } else if let Some(query) = url.query() { + let mut items = query.split('_'); + String::from(items.next().ok_or(CdnUrlError::Parsing)?) + } else { + return Err(CdnUrlError::Parsing); + }; + + let mut expiry: i64 = expiry_str.parse().map_err(|_| CdnUrlError::Parsing)?; + expiry -= 5 * 60; // seconds + + Ok(MaybeExpiringUrl( + cdn_url.to_owned(), + Some(expiry.try_into().map_err(|_| CdnUrlError::Parsing)?), + )) + } else { + Ok(MaybeExpiringUrl(cdn_url.to_owned(), None)) + } + }) + .collect::, CdnUrlError>>()?; + + Ok(Self(result)) + } +} diff --git a/metadata/src/date.rs b/core/src/date.rs similarity index 85% rename from metadata/src/date.rs rename to core/src/date.rs index c402c05f..a84da606 100644 --- a/metadata/src/date.rs +++ b/core/src/date.rs @@ -4,13 +4,17 @@ use std::ops::Deref; use chrono::{DateTime, Utc}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; - -use crate::error::MetadataError; +use thiserror::Error; use librespot_protocol as protocol; - use protocol::metadata::Date as DateMessage; +#[derive(Debug, Error)] +pub enum DateError { + #[error("item has invalid date")] + InvalidTimestamp, +} + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct Date(pub DateTime); @@ -26,11 +30,11 @@ impl Date { self.0.timestamp() } - pub fn from_timestamp(timestamp: i64) -> Result { + pub fn from_timestamp(timestamp: i64) -> Result { if let Some(date_time) = NaiveDateTime::from_timestamp_opt(timestamp, 0) { Ok(Self::from_utc(date_time)) } else { - Err(MetadataError::InvalidTimestamp) + Err(DateError::InvalidTimestamp) } } @@ -63,7 +67,7 @@ impl From> for Date { } impl TryFrom for Date { - type Error = MetadataError; + type Error = DateError; fn try_from(timestamp: i64) -> Result { Self::from_timestamp(timestamp) } diff --git a/core/src/http_client.rs b/core/src/http_client.rs index 7b8aad72..21624e1a 100644 --- a/core/src/http_client.rs +++ b/core/src/http_client.rs @@ -1,10 +1,14 @@ use bytes::Bytes; +use futures_util::future::IntoStream; +use futures_util::FutureExt; use http::header::HeaderValue; use http::uri::InvalidUri; -use hyper::header::InvalidHeaderValue; +use hyper::client::{HttpConnector, ResponseFuture}; +use hyper::header::{InvalidHeaderValue, USER_AGENT}; use hyper::{Body, Client, Request, Response, StatusCode}; use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use hyper_rustls::HttpsConnector; +use rustls::ClientConfig; use std::env::consts::OS; use thiserror::Error; use url::Url; @@ -13,6 +17,7 @@ use crate::version::{SPOTIFY_MOBILE_VERSION, SPOTIFY_VERSION, VERSION_STRING}; pub struct HttpClient { proxy: Option, + tls_config: ClientConfig, } #[derive(Error, Debug)] @@ -43,15 +48,60 @@ impl From for HttpClientError { impl HttpClient { pub fn new(proxy: Option<&Url>) -> Self { + // configuring TLS is expensive and should be done once per process + let root_store = match rustls_native_certs::load_native_certs() { + Ok(store) => store, + Err((Some(store), err)) => { + warn!("Could not load all certificates: {:?}", err); + store + } + Err((None, err)) => Err(err).expect("cannot access native cert store"), + }; + + let mut tls_config = ClientConfig::new(); + tls_config.root_store = root_store; + tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + Self { proxy: proxy.cloned(), + tls_config, } } - pub async fn request(&self, mut req: Request) -> Result, HttpClientError> { + pub async fn request(&self, req: Request) -> Result, HttpClientError> { + let request = self.request_fut(req)?; + { + let response = request.await; + if let Ok(response) = &response { + let status = response.status(); + if status != StatusCode::OK { + return Err(HttpClientError::NotOK(status.into())); + } + } + response.map_err(HttpClientError::Response) + } + } + + pub async fn request_body(&self, req: Request) -> Result { + let response = self.request(req).await?; + hyper::body::to_bytes(response.into_body()) + .await + .map_err(HttpClientError::Response) + } + + pub fn request_stream( + &self, + req: Request, + ) -> Result, HttpClientError> { + Ok(self.request_fut(req)?.into_stream()) + } + + pub fn request_fut(&self, mut req: Request) -> Result { trace!("Requesting {:?}", req.uri().to_string()); - let connector = HttpsConnector::with_native_roots(); + let mut http = HttpConnector::new(); + http.enforce_http(false); + let connector = HttpsConnector::from((http, self.tls_config.clone())); let spotify_version = match OS { "android" | "ios" => SPOTIFY_MOBILE_VERSION.to_owned(), @@ -68,7 +118,7 @@ impl HttpClient { let headers_mut = req.headers_mut(); headers_mut.insert( - "User-Agent", + USER_AGENT, // Some features like lyrics are version-gated and require an official version string. HeaderValue::from_str(&format!( "Spotify/{} {} ({})", @@ -76,38 +126,16 @@ impl HttpClient { ))?, ); - let response = if let Some(url) = &self.proxy { + let request = if let Some(url) = &self.proxy { let proxy_uri = url.to_string().parse()?; let proxy = Proxy::new(Intercept::All, proxy_uri); let proxy_connector = ProxyConnector::from_proxy(connector, proxy)?; - Client::builder() - .build(proxy_connector) - .request(req) - .await - .map_err(HttpClientError::Request) + Client::builder().build(proxy_connector).request(req) } else { - Client::builder() - .build(connector) - .request(req) - .await - .map_err(HttpClientError::Request) + Client::builder().build(connector).request(req) }; - if let Ok(response) = &response { - let status = response.status(); - if status != StatusCode::OK { - return Err(HttpClientError::NotOK(status.into())); - } - } - - response - } - - pub async fn request_body(&self, req: Request) -> Result { - let response = self.request(req).await?; - hyper::body::to_bytes(response.into_body()) - .await - .map_err(HttpClientError::Response) + Ok(request) } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 09275d80..76ddbd37 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -11,9 +11,11 @@ pub mod apresolve; pub mod audio_key; pub mod authentication; pub mod cache; +pub mod cdn_url; pub mod channel; pub mod config; mod connection; +pub mod date; #[allow(dead_code)] mod dealer; #[doc(hidden)] diff --git a/core/src/spclient.rs b/core/src/spclient.rs index 3a40c1a7..c0336690 100644 --- a/core/src/spclient.rs +++ b/core/src/spclient.rs @@ -8,9 +8,11 @@ use crate::protocol::extended_metadata::BatchedEntityRequest; use crate::spotify_id::SpotifyId; use bytes::Bytes; +use futures_util::future::IntoStream; use http::header::HeaderValue; -use hyper::header::InvalidHeaderValue; -use hyper::{Body, HeaderMap, Request}; +use hyper::client::ResponseFuture; +use hyper::header::{InvalidHeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE, RANGE}; +use hyper::{Body, HeaderMap, Method, Request}; use protobuf::Message; use rand::Rng; use std::time::Duration; @@ -86,7 +88,7 @@ impl SpClient { pub async fn request_with_protobuf( &self, - method: &str, + method: &Method, endpoint: &str, headers: Option, message: &dyn Message, @@ -94,7 +96,7 @@ impl SpClient { let body = protobuf::text_format::print_to_string(message); let mut headers = headers.unwrap_or_else(HeaderMap::new); - headers.insert("Content-Type", "application/protobuf".parse()?); + headers.insert(CONTENT_TYPE, "application/protobuf".parse()?); self.request(method, endpoint, Some(headers), Some(body)) .await @@ -102,20 +104,20 @@ impl SpClient { pub async fn request_as_json( &self, - method: &str, + method: &Method, endpoint: &str, headers: Option, body: Option, ) -> SpClientResult { let mut headers = headers.unwrap_or_else(HeaderMap::new); - headers.insert("Accept", "application/json".parse()?); + headers.insert(ACCEPT, "application/json".parse()?); self.request(method, endpoint, Some(headers), body).await } pub async fn request( &self, - method: &str, + method: &Method, endpoint: &str, headers: Option, body: Option, @@ -130,12 +132,12 @@ impl SpClient { // Reconnection logic: retrieve the endpoint every iteration, so we can try // another access point when we are experiencing network issues (see below). - let mut uri = self.base_url().await; - uri.push_str(endpoint); + let mut url = self.base_url().await; + url.push_str(endpoint); let mut request = Request::builder() .method(method) - .uri(uri) + .uri(url) .body(Body::from(body.clone()))?; // Reconnection logic: keep getting (cached) tokens because they might have expired. @@ -144,7 +146,7 @@ impl SpClient { *headers_mut = hdrs.clone(); } headers_mut.insert( - "Authorization", + AUTHORIZATION, HeaderValue::from_str(&format!( "Bearer {}", self.session() @@ -212,13 +214,13 @@ impl SpClient { let mut headers = HeaderMap::new(); headers.insert("X-Spotify-Connection-Id", connection_id.parse()?); - self.request_with_protobuf("PUT", &endpoint, Some(headers), &state) + self.request_with_protobuf(&Method::PUT, &endpoint, Some(headers), &state) .await } pub async fn get_metadata(&self, scope: &str, id: SpotifyId) -> SpClientResult { let endpoint = format!("/metadata/4/{}/{}", scope, id.to_base16()); - self.request("GET", &endpoint, None, None).await + self.request(&Method::GET, &endpoint, None, None).await } pub async fn get_track_metadata(&self, track_id: SpotifyId) -> SpClientResult { @@ -244,7 +246,8 @@ impl SpClient { pub async fn get_lyrics(&self, track_id: SpotifyId) -> SpClientResult { let endpoint = format!("/color-lyrics/v1/track/{}", track_id.to_base62(),); - self.request_as_json("GET", &endpoint, None, None).await + self.request_as_json(&Method::GET, &endpoint, None, None) + .await } pub async fn get_lyrics_for_image( @@ -258,19 +261,48 @@ impl SpClient { image_id ); - self.request_as_json("GET", &endpoint, None, None).await + self.request_as_json(&Method::GET, &endpoint, None, None) + .await } // TODO: Find endpoint for newer canvas.proto and upgrade to that. pub async fn get_canvases(&self, request: EntityCanvazRequest) -> SpClientResult { let endpoint = "/canvaz-cache/v0/canvases"; - self.request_with_protobuf("POST", endpoint, None, &request) + self.request_with_protobuf(&Method::POST, endpoint, None, &request) .await } pub async fn get_extended_metadata(&self, request: BatchedEntityRequest) -> SpClientResult { let endpoint = "/extended-metadata/v0/extended-metadata"; - self.request_with_protobuf("POST", endpoint, None, &request) + self.request_with_protobuf(&Method::POST, endpoint, None, &request) .await } + + pub async fn get_audio_urls(&self, file_id: FileId) -> SpClientResult { + let endpoint = format!( + "/storage-resolve/files/audio/interactive/{}", + file_id.to_base16() + ); + self.request(&Method::GET, &endpoint, None, None).await + } + + pub fn stream_file( + &self, + url: &str, + offset: usize, + length: usize, + ) -> Result, SpClientError> { + let req = Request::builder() + .method(&Method::GET) + .uri(url) + .header( + RANGE, + HeaderValue::from_str(&format!("bytes={}-{}", offset, offset + length - 1))?, + ) + .body(Body::empty())?; + + let stream = self.session().http_client().request_stream(req)?; + + Ok(stream) + } } diff --git a/metadata/src/album.rs b/metadata/src/album.rs index fe01ee2b..ac6fec20 100644 --- a/metadata/src/album.rs +++ b/metadata/src/album.rs @@ -6,7 +6,6 @@ use crate::{ artist::Artists, availability::Availabilities, copyright::Copyrights, - date::Date, error::{MetadataError, RequestError}, external_id::ExternalIds, image::Images, @@ -18,6 +17,7 @@ use crate::{ Metadata, }; +use librespot_core::date::Date; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; use librespot_protocol as protocol; diff --git a/metadata/src/availability.rs b/metadata/src/availability.rs index eb2b5fdd..27a85eed 100644 --- a/metadata/src/availability.rs +++ b/metadata/src/availability.rs @@ -3,8 +3,9 @@ use std::ops::Deref; use thiserror::Error; -use crate::{date::Date, util::from_repeated_message}; +use crate::util::from_repeated_message; +use librespot_core::date::Date; use librespot_protocol as protocol; use protocol::metadata::Availability as AvailabilityMessage; diff --git a/metadata/src/episode.rs b/metadata/src/episode.rs index 7032999b..05d68aaf 100644 --- a/metadata/src/episode.rs +++ b/metadata/src/episode.rs @@ -9,7 +9,6 @@ use crate::{ }, availability::Availabilities, content_rating::ContentRatings, - date::Date, error::{MetadataError, RequestError}, image::Images, request::RequestResult, @@ -19,6 +18,7 @@ use crate::{ Metadata, }; +use librespot_core::date::Date; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; use librespot_protocol as protocol; diff --git a/metadata/src/error.rs b/metadata/src/error.rs index 2aeaef1e..d1f6cc0b 100644 --- a/metadata/src/error.rs +++ b/metadata/src/error.rs @@ -3,6 +3,7 @@ use thiserror::Error; use protobuf::ProtobufError; +use librespot_core::date::DateError; use librespot_core::mercury::MercuryError; use librespot_core::spclient::SpClientError; use librespot_core::spotify_id::SpotifyIdError; @@ -22,7 +23,7 @@ pub enum MetadataError { #[error("{0}")] InvalidSpotifyId(#[from] SpotifyIdError), #[error("item has invalid date")] - InvalidTimestamp, + InvalidTimestamp(#[from] DateError), #[error("audio item is non-playable")] NonPlayable, #[error("could not parse protobuf: {0}")] diff --git a/metadata/src/lib.rs b/metadata/src/lib.rs index 15b68e1f..af9c80ec 100644 --- a/metadata/src/lib.rs +++ b/metadata/src/lib.rs @@ -15,7 +15,6 @@ pub mod audio; pub mod availability; pub mod content_rating; pub mod copyright; -pub mod date; pub mod episode; pub mod error; pub mod external_id; diff --git a/metadata/src/playlist/attribute.rs b/metadata/src/playlist/attribute.rs index f00a2b13..ac2eef65 100644 --- a/metadata/src/playlist/attribute.rs +++ b/metadata/src/playlist/attribute.rs @@ -3,8 +3,9 @@ use std::convert::{TryFrom, TryInto}; use std::fmt::Debug; use std::ops::Deref; -use crate::{date::Date, error::MetadataError, image::PictureSizes, util::from_repeated_enum}; +use crate::{error::MetadataError, image::PictureSizes, util::from_repeated_enum}; +use librespot_core::date::Date; use librespot_core::spotify_id::SpotifyId; use librespot_protocol as protocol; diff --git a/metadata/src/playlist/item.rs b/metadata/src/playlist/item.rs index de2dc6db..5b97c382 100644 --- a/metadata/src/playlist/item.rs +++ b/metadata/src/playlist/item.rs @@ -2,10 +2,11 @@ use std::convert::{TryFrom, TryInto}; use std::fmt::Debug; use std::ops::Deref; -use crate::{date::Date, error::MetadataError, util::try_from_repeated_message}; +use crate::{error::MetadataError, util::try_from_repeated_message}; use super::attribute::{PlaylistAttributes, PlaylistItemAttributes}; +use librespot_core::date::Date; use librespot_core::spotify_id::SpotifyId; use librespot_protocol as protocol; diff --git a/metadata/src/playlist/list.rs b/metadata/src/playlist/list.rs index 625373db..5df839b1 100644 --- a/metadata/src/playlist/list.rs +++ b/metadata/src/playlist/list.rs @@ -5,7 +5,6 @@ use std::ops::Deref; use protobuf::Message; use crate::{ - date::Date, error::MetadataError, request::{MercuryRequest, RequestResult}, util::{from_repeated_enum, try_from_repeated_message}, @@ -17,6 +16,7 @@ use super::{ permission::Capabilities, }; +use librespot_core::date::Date; use librespot_core::session::Session; use librespot_core::spotify_id::{NamedSpotifyId, SpotifyId}; use librespot_protocol as protocol; diff --git a/metadata/src/sale_period.rs b/metadata/src/sale_period.rs index 6152b901..9040d71e 100644 --- a/metadata/src/sale_period.rs +++ b/metadata/src/sale_period.rs @@ -1,8 +1,9 @@ use std::fmt::Debug; use std::ops::Deref; -use crate::{date::Date, restriction::Restrictions, util::from_repeated_message}; +use crate::{restriction::Restrictions, util::from_repeated_message}; +use librespot_core::date::Date; use librespot_protocol as protocol; use protocol::metadata::SalePeriod as SalePeriodMessage; diff --git a/metadata/src/track.rs b/metadata/src/track.rs index d0639c82..fc9c131e 100644 --- a/metadata/src/track.rs +++ b/metadata/src/track.rs @@ -13,7 +13,6 @@ use crate::{ }, availability::{Availabilities, UnavailabilityReason}, content_rating::ContentRatings, - date::Date, error::RequestError, external_id::ExternalIds, restriction::Restrictions, @@ -22,6 +21,7 @@ use crate::{ Metadata, MetadataError, RequestResult, }; +use librespot_core::date::Date; use librespot_core::session::Session; use librespot_core::spotify_id::SpotifyId; use librespot_protocol as protocol; diff --git a/playback/src/player.rs b/playback/src/player.rs index 6dec6a56..50493185 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -341,8 +341,6 @@ impl Player { // While PlayerInternal is written as a future, it still contains blocking code. // It must be run by using block_on() in a dedicated thread. - // futures_executor::block_on(internal); - let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"); runtime.block_on(internal); @@ -1904,7 +1902,7 @@ impl PlayerInternal { let (result_tx, result_rx) = oneshot::channel(); let handle = tokio::runtime::Handle::current(); - std::thread::spawn(move || { + thread::spawn(move || { let data = handle.block_on(loader.load_track(spotify_id, position_ms)); if let Some(data) = data { let _ = result_tx.send(data); diff --git a/protocol/build.rs b/protocol/build.rs index a4ca4c37..aa107607 100644 --- a/protocol/build.rs +++ b/protocol/build.rs @@ -26,6 +26,7 @@ fn compile() { proto_dir.join("playlist_annotate3.proto"), proto_dir.join("playlist_permission.proto"), proto_dir.join("playlist4_external.proto"), + proto_dir.join("storage-resolve.proto"), proto_dir.join("user_attributes.proto"), // TODO: remove these legacy protobufs when we are on the new API completely proto_dir.join("authentication.proto"),