diff --git a/Cargo.lock b/Cargo.lock index 37cbae56..7eddf8df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "aes" version = "0.6.0" @@ -82,9 +84,9 @@ checksum = "28b2cd92db5cbd74e8e5028f7e27dd7aa3090e89e4f2a197cc7c8dfb69c7063b" [[package]] name = "async-trait" -version = "0.1.50" +version = "0.1.51" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b98e84bbb4cbcdd97da190ba0c58a1bb0de2c1fdf67d159e192ed766aeca722" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" dependencies = [ "proc-macro2", "quote", @@ -162,9 +164,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" +checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" [[package]] name = "cc" @@ -256,12 +258,28 @@ dependencies = [ "memchr", ] +[[package]] +name = "core-foundation" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6888e10551bb93e424d8df1d07f1a8b4fceb0001a3a4b048bfc47554946f47b3" +dependencies = [ + "core-foundation-sys 0.8.3", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7ca8a5221364ef15ce201e8ed2f609fc312682a8f4e0e3d4aa5879764e0fa3b" +[[package]] +name = "core-foundation-sys" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc" + [[package]] name = "coreaudio-rs" version = "0.10.0" @@ -288,7 +306,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8351ddf2aaa3c583fa388029f8b3d26f3c7035a20911fdd5f2e2ed7ab57dad25" dependencies = [ "alsa", - "core-foundation-sys", + "core-foundation-sys 0.6.2", "coreaudio-rs", "jack 0.6.6", "jni", @@ -326,6 +344,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "ct-logs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1a816186fa68d9e426e3cb4ae4dff1fcd8e4a2c34b781bf7a822574a0d0aac8" +dependencies = [ + "sct", +] + [[package]] name = "ctr" version = "0.6.0" @@ -719,6 +746,25 @@ dependencies = [ "system-deps", ] +[[package]] +name = "h2" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fd819562fcebdac5afc5c113c3ec36f902840b70fd4fc458799c8ce4607ae55" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.9.1" @@ -797,9 +843,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11" +checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b" dependencies = [ "bytes", "fnv", @@ -819,9 +865,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.4.1" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3a87b616e37e93c22fb19bcd386f02f3af5ea98a25670ad0fce773de23c5e68" +checksum = "acd94fdbe1d4ff688b67b04eee2e17bd50995534a61539e45adfefb45e5e5503" [[package]] name = "httpdate" @@ -837,20 +883,21 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.8" +version = "0.14.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3f71a7eea53a3f8257a7b4795373ff886397178cd634430ea94e12d7fe4fe34" +checksum = "436ec0091e4f20e655156a30a0df3770fe2900aa301e548e08446ec794b6953c" dependencies = [ "bytes", "futures-channel", "futures-core", "futures-util", + "h2", "http", "http-body", "httparse", "httpdate", "itoa", - "pin-project", + "pin-project-lite", "socket2", "tokio", "tower-service", @@ -869,8 +916,29 @@ dependencies = [ "headers", "http", "hyper", + "hyper-rustls", + "rustls-native-certs", "tokio", + "tokio-rustls", "tower-service", + "webpki", +] + +[[package]] +name = "hyper-rustls" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" +dependencies = [ + "ct-logs", + "futures-util", + "hyper", + "log", + "rustls", + "rustls-native-certs", + "tokio", + "tokio-rustls", + "webpki", ] [[package]] @@ -1052,9 +1120,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.95" +version = "0.2.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "789da6d93f1b866ffe175afc5322a4d76c038605a1c3319bb57b06967ca98a36" +checksum = "8521a1b57e76b1ec69af7599e75e38e7b7fad6610f037db8c79b127201b5d119" [[package]] name = "libloading" @@ -1223,6 +1291,7 @@ dependencies = [ "httparse", "hyper", "hyper-proxy", + "hyper-rustls", "librespot-protocol", "log", "num", @@ -1280,10 +1349,12 @@ version = "0.2.0" dependencies = [ "async-trait", "byteorder", + "bytes", "librespot-core", "librespot-protocol", "log", "protobuf", + "thiserror", ] [[package]] @@ -1667,6 +1738,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl-probe" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28988d872ab76095a6e6ac88d99b54fd267702734fd7ffe610ca27f533ddb95a" + [[package]] name = "parking_lot" version = "0.11.1" @@ -1866,24 +1943,24 @@ dependencies = [ [[package]] name = "protobuf" -version = "2.23.0" +version = "2.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45604fc7a88158e7d514d8e22e14ac746081e7a70d7690074dd0029ee37458d6" +checksum = "47c327e191621a2158159df97cdbc2e7074bb4e940275e35abf38eb3d2595754" [[package]] name = "protobuf-codegen" -version = "2.23.0" +version = "2.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cb87f342b585958c1c086313dbc468dcac3edf5e90362111c26d7a58127ac095" +checksum = "3df8c98c08bd4d6653c2dbae00bd68c1d1d82a360265a5b0bbc73d48c63cb853" dependencies = [ "protobuf", ] [[package]] name = "protobuf-codegen-pure" -version = "2.23.0" +version = "2.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ca6e0e2f898f7856a6328650abc9b2df71b7c1a5f39be0800d19051ad0214b2" +checksum = "394a73e2a819405364df8d30042c0f1174737a763e0170497ec9d36f8a2ea8f7" dependencies = [ "protobuf", "protobuf-codegen", @@ -2045,6 +2122,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls-native-certs" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092" +dependencies = [ + "openssl-probe", + "rustls", + "schannel", + "security-framework", +] + [[package]] name = "ryu" version = "1.0.5" @@ -2060,6 +2149,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -2099,6 +2198,29 @@ dependencies = [ "version-compare", ] +[[package]] +name = "security-framework" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23a2ac85147a3a11d77ecf1bc7166ec0b92febfa4461c37944e180f319ece467" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys 0.8.3", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" +dependencies = [ + "core-foundation-sys 0.8.3", + "libc", +] + [[package]] name = "semver" version = "0.11.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 3c239034..64467366 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -16,15 +16,16 @@ version = "0.2.0" aes = "0.6" base64 = "0.13" byteorder = "1.4" -bytes = "1.0" +bytes = "1" form_urlencoded = "1.0" futures-core = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] } hmac = "0.11" httparse = "1.3" http = "0.2" -hyper = { version = "0.14", features = ["client", "tcp", "http1"] } -hyper-proxy = { version = "0.9.1", default-features = false } +hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] } +hyper-proxy = { version = "0.9.1", default-features = false, features = ["rustls"] } +hyper-rustls = { version = "0.22", default-features = false, features = ["native-tokio"] } log = "0.4" num = "0.4" num-bigint = { version = "0.4", features = ["rand"] } diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index 623c7cb3..d39c3101 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -6,14 +6,14 @@ use std::sync::atomic::{AtomicUsize, Ordering}; pub type SocketAddress = (String, u16); #[derive(Default)] -struct AccessPoints { +pub struct AccessPoints { accesspoint: Vec, dealer: Vec, spclient: Vec, } #[derive(Deserialize)] -struct ApResolveData { +pub struct ApResolveData { accesspoint: Vec, dealer: Vec, spclient: Vec, @@ -42,7 +42,7 @@ component! { impl ApResolver { // return a port if a proxy URL and/or a proxy port was specified. This is useful even when // there is no proxy, but firewalls only allow certain ports (e.g. 443 and not 4070). - fn port_config(&self) -> Option { + pub fn port_config(&self) -> Option { if self.session().config().proxy.is_some() || self.session().config().ap_port.is_some() { Some(self.session().config().ap_port.unwrap_or(443)) } else { @@ -54,9 +54,7 @@ impl ApResolver { data.into_iter() .filter_map(|ap| { let mut split = ap.rsplitn(2, ':'); - let port = split - .next() - .expect("rsplitn should not return empty iterator"); + let port = split.next()?; let host = split.next()?.to_owned(); let port: u16 = port.parse().ok()?; if let Some(p) = self.port_config() { @@ -69,12 +67,11 @@ impl ApResolver { .collect() } - async fn try_apresolve(&self) -> Result> { + pub async fn try_apresolve(&self) -> Result> { let req = Request::builder() .method("GET") .uri("http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient") - .body(Body::empty()) - .unwrap(); + .body(Body::empty())?; let body = self.session().http_client().request_body(req).await?; let data: ApResolveData = serde_json::from_slice(body.as_ref())?; diff --git a/core/src/dealer/mod.rs b/core/src/dealer/mod.rs index bca1ec20..ba1e68df 100644 --- a/core/src/dealer/mod.rs +++ b/core/src/dealer/mod.rs @@ -401,7 +401,7 @@ async fn connect( // Spawn a task that will forward messages from the channel to the websocket. let send_task = { - let shared = Arc::clone(&shared); + let shared = Arc::clone(shared); tokio::spawn(async move { let result = loop { @@ -450,7 +450,7 @@ async fn connect( }) }; - let shared = Arc::clone(&shared); + let shared = Arc::clone(shared); // A task that receives messages from the web socket. let receive_task = tokio::spawn(async { diff --git a/core/src/http_client.rs b/core/src/http_client.rs index 5f8ef780..ab1366a8 100644 --- a/core/src/http_client.rs +++ b/core/src/http_client.rs @@ -1,12 +1,25 @@ -use hyper::client::HttpConnector; -use hyper::{Body, Client, Request, Response}; +use hyper::{Body, Client, Request, Response, StatusCode}; use hyper_proxy::{Intercept, Proxy, ProxyConnector}; +use hyper_rustls::HttpsConnector; +use thiserror::Error; use url::Url; pub struct HttpClient { proxy: Option, } +#[derive(Error, Debug)] +pub enum HttpClientError { + #[error("could not parse request: {0}")] + Parsing(#[from] http::uri::InvalidUri), + #[error("could not send request: {0}")] + Request(hyper::Error), + #[error("could not read response: {0}")] + Response(hyper::Error), + #[error("could not build proxy connector: {0}")] + ProxyBuilder(#[from] std::io::Error), +} + impl HttpClient { pub fn new(proxy: Option<&Url>) -> Self { Self { @@ -14,21 +27,41 @@ impl HttpClient { } } - pub async fn request(&self, req: Request) -> Result, hyper::Error> { - if let Some(url) = &self.proxy { - // Panic safety: all URLs are valid URIs - let uri = url.to_string().parse().unwrap(); + pub async fn request(&self, req: Request) -> Result, HttpClientError> { + let connector = HttpsConnector::with_native_roots(); + let uri = req.uri().clone(); + + let response = if let Some(url) = &self.proxy { + let uri = url.to_string().parse()?; let proxy = Proxy::new(Intercept::All, uri); - let connector = HttpConnector::new(); - let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy); - Client::builder().build(proxy_connector).request(req).await + let proxy_connector = ProxyConnector::from_proxy(connector, proxy)?; + + Client::builder() + .build(proxy_connector) + .request(req) + .await + .map_err(HttpClientError::Request) } else { - Client::new().request(req).await + Client::builder() + .build(connector) + .request(req) + .await + .map_err(HttpClientError::Request) + }; + + if let Ok(response) = &response { + if response.status() != StatusCode::OK { + debug!("{} returned status {}", uri, response.status()); + } } + + response } - pub async fn request_body(&self, req: Request) -> Result { + pub async fn request_body(&self, req: Request) -> Result { let response = self.request(req).await?; - hyper::body::to_bytes(response.into_body()).await + hyper::body::to_bytes(response.into_body()) + .await + .map_err(HttpClientError::Response) } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 9c92c235..c928f32b 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -7,7 +7,7 @@ use librespot_protocol as protocol; #[macro_use] mod component; -mod apresolve; +pub mod apresolve; pub mod audio_key; pub mod authentication; pub mod cache; @@ -24,9 +24,10 @@ pub mod packet; mod proxytunnel; pub mod session; mod socket; -mod spclient; +#[allow(dead_code)] +pub mod spclient; pub mod spotify_id; -mod token; +pub mod token; #[doc(hidden)] pub mod util; pub mod version; diff --git a/core/src/mercury/types.rs b/core/src/mercury/types.rs index 1d6b5b15..007ffb38 100644 --- a/core/src/mercury/types.rs +++ b/core/src/mercury/types.rs @@ -1,6 +1,8 @@ use byteorder::{BigEndian, WriteBytesExt}; use protobuf::Message; +use std::fmt; use std::io::Write; +use thiserror::Error; use crate::packet::PacketType; use crate::protocol; @@ -28,9 +30,15 @@ pub struct MercuryResponse { pub payload: Vec>, } -#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)] +#[derive(Debug, Error, Hash, PartialEq, Eq, Copy, Clone)] pub struct MercuryError; +impl fmt::Display for MercuryError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Mercury error") + } +} + impl ToString for MercuryMethod { fn to_string(&self) -> String { match *self { @@ -55,6 +63,7 @@ impl MercuryMethod { } impl MercuryRequest { + // TODO: change into Result and remove unwraps pub fn encode(&self, seq: &[u8]) -> Vec { let mut packet = Vec::new(); packet.write_u16::(seq.len() as u16).unwrap(); diff --git a/core/src/session.rs b/core/src/session.rs index 81975a80..f683960a 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -27,6 +27,7 @@ use crate::connection::{self, AuthenticationError}; use crate::http_client::HttpClient; use crate::mercury::MercuryManager; use crate::packet::PacketType; +use crate::spclient::SpClient; use crate::token::TokenProvider; #[derive(Debug, Error)] @@ -55,6 +56,7 @@ struct SessionInternal { audio_key: OnceCell, channel: OnceCell, mercury: OnceCell, + spclient: OnceCell, token_provider: OnceCell, cache: Option>, @@ -95,6 +97,7 @@ impl Session { audio_key: OnceCell::new(), channel: OnceCell::new(), mercury: OnceCell::new(), + spclient: OnceCell::new(), token_provider: OnceCell::new(), handle: tokio::runtime::Handle::current(), session_id, @@ -159,6 +162,10 @@ impl Session { .get_or_init(|| MercuryManager::new(self.weak())) } + pub fn spclient(&self) -> &SpClient { + self.0.spclient.get_or_init(|| SpClient::new(self.weak())) + } + pub fn token_provider(&self) -> &TokenProvider { self.0 .token_provider diff --git a/core/src/spclient.rs b/core/src/spclient.rs index eb7b3f0f..77585bb9 100644 --- a/core/src/spclient.rs +++ b/core/src/spclient.rs @@ -1 +1,255 @@ -// https://github.com/librespot-org/librespot-java/blob/27783e06f456f95228c5ac37acf2bff8c1a8a0c4/lib/src/main/java/xyz/gianlu/librespot/dealer/ApiClient.java +use crate::apresolve::SocketAddress; +use crate::http_client::HttpClientError; +use crate::mercury::MercuryError; +use crate::protocol; +use crate::spotify_id::SpotifyId; + +use hyper::header::InvalidHeaderValue; +use hyper::{Body, HeaderMap, Request}; +use rand::Rng; +use std::time::Duration; +use thiserror::Error; + +component! { + SpClient : SpClientInner { + accesspoint: Option = None, + strategy: RequestStrategy = RequestStrategy::default(), + } +} + +pub type SpClientResult = Result; + +#[derive(Error, Debug)] +pub enum SpClientError { + #[error("could not get authorization token")] + Token(#[from] MercuryError), + #[error("could not parse request: {0}")] + Parsing(#[from] http::Error), + #[error("could not complete request: {0}")] + Network(#[from] HttpClientError), +} + +impl From for SpClientError { + fn from(err: InvalidHeaderValue) -> Self { + Self::Parsing(err.into()) + } +} + +#[derive(Copy, Clone, Debug)] +pub enum RequestStrategy { + TryTimes(usize), + Infinitely, +} + +impl Default for RequestStrategy { + fn default() -> Self { + RequestStrategy::TryTimes(10) + } +} + +impl SpClient { + pub fn set_strategy(&self, strategy: RequestStrategy) { + self.lock(|inner| inner.strategy = strategy) + } + + pub async fn flush_accesspoint(&self) { + self.lock(|inner| inner.accesspoint = None) + } + + pub async fn get_accesspoint(&self) -> SocketAddress { + // Memoize the current access point. + let ap = self.lock(|inner| inner.accesspoint.clone()); + match ap { + Some(tuple) => tuple, + None => { + let tuple = self.session().apresolver().resolve("spclient").await; + self.lock(|inner| inner.accesspoint = Some(tuple.clone())); + info!( + "Resolved \"{}:{}\" as spclient access point", + tuple.0, tuple.1 + ); + tuple + } + } + } + + pub async fn base_url(&self) -> String { + let ap = self.get_accesspoint().await; + format!("https://{}:{}", ap.0, ap.1) + } + + pub async fn protobuf_request( + &self, + method: &str, + endpoint: &str, + headers: Option, + message: &dyn protobuf::Message, + ) -> SpClientResult { + let body = protobuf::text_format::print_to_string(message); + + let mut headers = headers.unwrap_or_else(HeaderMap::new); + headers.insert("Content-Type", "application/protobuf".parse()?); + + self.request(method, endpoint, Some(headers), Some(body)) + .await + } + + pub async fn request( + &self, + method: &str, + endpoint: &str, + headers: Option, + body: Option, + ) -> SpClientResult { + let mut tries: usize = 0; + let mut last_response; + + let body = body.unwrap_or_else(String::new); + + loop { + tries += 1; + + // Reconnection logic: retrieve the endpoint every iteration, so we can try + // another access point when we are experiencing network issues (see below). + let mut uri = self.base_url().await; + uri.push_str(endpoint); + + let mut request = Request::builder() + .method(method) + .uri(uri) + .body(Body::from(body.clone()))?; + + // Reconnection logic: keep getting (cached) tokens because they might have expired. + let headers_mut = request.headers_mut(); + if let Some(ref hdrs) = headers { + *headers_mut = hdrs.clone(); + } + headers_mut.insert( + "Authorization", + http::header::HeaderValue::from_str(&format!( + "Bearer {}", + self.session() + .token_provider() + .get_token("playlist-read") + .await? + .access_token + ))?, + ); + + last_response = self + .session() + .http_client() + .request_body(request) + .await + .map_err(SpClientError::Network); + if last_response.is_ok() { + return last_response; + } + + // Break before the reconnection logic below, so that the current access point + // is retained when max_tries == 1. Leave it up to the caller when to flush. + if let RequestStrategy::TryTimes(max_tries) = self.lock(|inner| inner.strategy) { + if tries >= max_tries { + break; + } + } + + // Reconnection logic: drop the current access point if we are experiencing issues. + // This will cause the next call to base_url() to resolve a new one. + if let Err(SpClientError::Network(ref network_error)) = last_response { + match network_error { + HttpClientError::Response(_) | HttpClientError::Request(_) => { + // Keep trying the current access point three times before dropping it. + if tries % 3 == 0 { + self.flush_accesspoint().await + } + } + _ => break, // if we can't build the request now, then we won't ever + } + } + + // When retrying, avoid hammering the Spotify infrastructure by sleeping a while. + // The backoff time is chosen randomly from an ever-increasing range. + let max_seconds = u64::pow(tries as u64, 2) * 3; + let backoff = Duration::from_secs(rand::thread_rng().gen_range(1..=max_seconds)); + warn!( + "Unable to complete API request, waiting {} seconds before retrying...", + backoff.as_secs(), + ); + debug!("Error was: {:?}", last_response); + tokio::time::sleep(backoff).await; + } + + last_response + } + + pub async fn put_connect_state( + &self, + connection_id: String, + state: protocol::connect::PutStateRequest, + ) -> SpClientResult { + let endpoint = format!("/connect-state/v1/devices/{}", self.session().device_id()); + + let mut headers = HeaderMap::new(); + headers.insert("X-Spotify-Connection-Id", connection_id.parse()?); + + self.protobuf_request("PUT", &endpoint, Some(headers), &state) + .await + } + + pub async fn get_metadata(&self, scope: &str, id: SpotifyId) -> SpClientResult { + let endpoint = format!("/metadata/4/{}/{}", scope, id.to_base16()); + self.request("GET", &endpoint, None, None).await + } + + pub async fn get_track_metadata(&self, track_id: SpotifyId) -> SpClientResult { + self.get_metadata("track", track_id).await + } + + pub async fn get_episode_metadata(&self, episode_id: SpotifyId) -> SpClientResult { + self.get_metadata("episode", episode_id).await + } + + pub async fn get_album_metadata(&self, album_id: SpotifyId) -> SpClientResult { + self.get_metadata("album", album_id).await + } + + pub async fn get_artist_metadata(&self, artist_id: SpotifyId) -> SpClientResult { + self.get_metadata("artist", artist_id).await + } + + pub async fn get_show_metadata(&self, show_id: SpotifyId) -> SpClientResult { + self.get_metadata("show", show_id).await + } + + // TODO: Not working at the moment, always returns 400. + pub async fn get_lyrics(&self, track_id: SpotifyId) -> SpClientResult { + // /color-lyrics/v2/track/22L7bfCiAkJo5xGSQgmiIO/image/spotify:image:ab67616d0000b273d9194aa18fa4c9362b47464f?clientLanguage=en + // https://spclient.wg.spotify.com/color-lyrics/v2/track/{track_id}/image/spotify:image:{image_id}?clientLanguage=en + let endpoint = format!("/color-lyrics/v2/track/{}", track_id.to_base16()); + + let mut headers = HeaderMap::new(); + headers.insert("Content-Type", "application/json".parse()?); + + self.request("GET", &endpoint, Some(headers), None).await + } + + // TODO: Find endpoint for newer canvas.proto and upgrade to that. + pub async fn get_canvases( + &self, + request: protocol::canvaz::EntityCanvazRequest, + ) -> SpClientResult { + let endpoint = "/canvaz-cache/v0/canvases"; + self.protobuf_request("POST", endpoint, None, &request) + .await + } + + pub async fn get_extended_metadata( + &self, + request: protocol::extended_metadata::BatchedEntityRequest, + ) -> SpClientResult { + let endpoint = "/extended-metadata/v0/extended-metadata"; + self.protobuf_request("POST", endpoint, None, &request) + .await + } +} diff --git a/core/src/token.rs b/core/src/token.rs index 824fcc3b..91a395fd 100644 --- a/core/src/token.rs +++ b/core/src/token.rs @@ -23,11 +23,11 @@ component! { #[derive(Clone, Debug)] pub struct Token { - access_token: String, - expires_in: Duration, - token_type: String, - scopes: Vec, - timestamp: Instant, + pub access_token: String, + pub expires_in: Duration, + pub token_type: String, + pub scopes: Vec, + pub timestamp: Instant, } #[derive(Deserialize)] diff --git a/metadata/Cargo.toml b/metadata/Cargo.toml index 6e181a1a..9409bae6 100644 --- a/metadata/Cargo.toml +++ b/metadata/Cargo.toml @@ -10,12 +10,15 @@ edition = "2018" [dependencies] async-trait = "0.1" byteorder = "1.3" -protobuf = "2.14.0" +bytes = "1.0" log = "0.4" +protobuf = "2.14.0" +thiserror = "1" [dependencies.librespot-core] path = "../core" version = "0.2.0" + [dependencies.librespot-protocol] path = "../protocol" version = "0.2.0" diff --git a/metadata/src/lib.rs b/metadata/src/lib.rs index e7595f59..039bea83 100644 --- a/metadata/src/lib.rs +++ b/metadata/src/lib.rs @@ -12,9 +12,12 @@ use std::collections::HashMap; use librespot_core::mercury::MercuryError; use librespot_core::session::Session; +use librespot_core::spclient::SpClientError; use librespot_core::spotify_id::{FileId, SpotifyAudioType, SpotifyId}; use librespot_protocol as protocol; -use protobuf::Message; +use protobuf::{Message, ProtobufError}; + +use thiserror::Error; pub use crate::protocol::metadata::AudioFile_Format as FileFormat; @@ -48,9 +51,8 @@ where } } - (has_forbidden || has_allowed) - && (!has_forbidden || !countrylist_contains(forbidden.as_str(), country)) - && (!has_allowed || countrylist_contains(allowed.as_str(), country)) + !(has_forbidden && countrylist_contains(forbidden.as_str(), country) + || has_allowed && !countrylist_contains(allowed.as_str(), country)) } // A wrapper with fields the player needs @@ -66,24 +68,34 @@ pub struct AudioItem { } impl AudioItem { - pub async fn get_audio_item(session: &Session, id: SpotifyId) -> Result { + pub async fn get_audio_item(session: &Session, id: SpotifyId) -> Result { match id.audio_type { SpotifyAudioType::Track => Track::get_audio_item(session, id).await, SpotifyAudioType::Podcast => Episode::get_audio_item(session, id).await, - SpotifyAudioType::NonPlayable => Err(MercuryError), + SpotifyAudioType::NonPlayable => Err(MetadataError::NonPlayable), } } } +pub type AudioItemResult = Result; + #[async_trait] trait AudioFiles { - async fn get_audio_item(session: &Session, id: SpotifyId) -> Result; + async fn get_audio_item(session: &Session, id: SpotifyId) -> AudioItemResult; } #[async_trait] impl AudioFiles for Track { - async fn get_audio_item(session: &Session, id: SpotifyId) -> Result { + async fn get_audio_item(session: &Session, id: SpotifyId) -> AudioItemResult { let item = Self::get(session, id).await?; + let alternatives = { + if item.alternatives.is_empty() { + None + } else { + Some(item.alternatives) + } + }; + Ok(AudioItem { id, uri: format!("spotify:track:{}", id.to_base62()), @@ -91,14 +103,14 @@ impl AudioFiles for Track { name: item.name, duration: item.duration, available: item.available, - alternatives: Some(item.alternatives), + alternatives, }) } } #[async_trait] impl AudioFiles for Episode { - async fn get_audio_item(session: &Session, id: SpotifyId) -> Result { + async fn get_audio_item(session: &Session, id: SpotifyId) -> AudioItemResult { let item = Self::get(session, id).await?; Ok(AudioItem { @@ -113,23 +125,38 @@ impl AudioFiles for Episode { } } +#[derive(Debug, Error)] +pub enum MetadataError { + #[error("could not get metadata over HTTP: {0}")] + Http(#[from] SpClientError), + #[error("could not get metadata over Mercury: {0}")] + Mercury(#[from] MercuryError), + #[error("could not parse metadata: {0}")] + Parsing(#[from] ProtobufError), + #[error("response was empty")] + Empty, + #[error("audio item is non-playable")] + NonPlayable, +} + +pub type MetadataResult = Result; + #[async_trait] pub trait Metadata: Send + Sized + 'static { type Message: protobuf::Message; - fn request_url(id: SpotifyId) -> String; + async fn request(session: &Session, id: SpotifyId) -> MetadataResult; fn parse(msg: &Self::Message, session: &Session) -> Self; - async fn get(session: &Session, id: SpotifyId) -> Result { - let uri = Self::request_url(id); - let response = session.mercury().get(uri).await?; - let data = response.payload.first().expect("Empty payload"); - let msg = Self::Message::parse_from_bytes(data).unwrap(); - - Ok(Self::parse(&msg, &session)) + async fn get(session: &Session, id: SpotifyId) -> Result { + let response = Self::request(session, id).await?; + let msg = Self::Message::parse_from_bytes(&response)?; + Ok(Self::parse(&msg, session)) } } +// TODO: expose more fields available in the protobufs + #[derive(Debug, Clone)] pub struct Track { pub id: SpotifyId, @@ -189,14 +216,20 @@ pub struct Artist { pub top_tracks: Vec, } +#[async_trait] impl Metadata for Track { type Message = protocol::metadata::Track; - fn request_url(id: SpotifyId) -> String { - format!("hm://metadata/3/track/{}", id.to_base16()) + async fn request(session: &Session, track_id: SpotifyId) -> MetadataResult { + session + .spclient() + .get_track_metadata(track_id) + .await + .map_err(MetadataError::Http) } fn parse(msg: &Self::Message, session: &Session) -> Self { + debug!("MESSAGE: {:?}", msg); let country = session.country(); let artists = msg @@ -234,11 +267,16 @@ impl Metadata for Track { } } +#[async_trait] impl Metadata for Album { type Message = protocol::metadata::Album; - fn request_url(id: SpotifyId) -> String { - format!("hm://metadata/3/album/{}", id.to_base16()) + async fn request(session: &Session, album_id: SpotifyId) -> MetadataResult { + session + .spclient() + .get_album_metadata(album_id) + .await + .map_err(MetadataError::Http) } fn parse(msg: &Self::Message, _: &Session) -> Self { @@ -279,11 +317,20 @@ impl Metadata for Album { } } +#[async_trait] impl Metadata for Playlist { type Message = protocol::playlist4changes::SelectedListContent; - fn request_url(id: SpotifyId) -> String { - format!("hm://playlist/v2/playlist/{}", id.to_base62()) + // TODO: + // * Add PlaylistAnnotate3 annotations. + // * Find spclient endpoint and upgrade to that. + async fn request(session: &Session, playlist_id: SpotifyId) -> MetadataResult { + let uri = format!("hm://playlist/v2/playlist/{}", playlist_id.to_base62()); + let response = session.mercury().get(uri).await?; + match response.payload.first() { + Some(data) => Ok(data.to_vec().into()), + None => Err(MetadataError::Empty), + } } fn parse(msg: &Self::Message, _: &Session) -> Self { @@ -315,11 +362,16 @@ impl Metadata for Playlist { } } +#[async_trait] impl Metadata for Artist { type Message = protocol::metadata::Artist; - fn request_url(id: SpotifyId) -> String { - format!("hm://metadata/3/artist/{}", id.to_base16()) + async fn request(session: &Session, artist_id: SpotifyId) -> MetadataResult { + session + .spclient() + .get_artist_metadata(artist_id) + .await + .map_err(MetadataError::Http) } fn parse(msg: &Self::Message, session: &Session) -> Self { @@ -348,11 +400,16 @@ impl Metadata for Artist { } // Podcast +#[async_trait] impl Metadata for Episode { type Message = protocol::metadata::Episode; - fn request_url(id: SpotifyId) -> String { - format!("hm://metadata/3/episode/{}", id.to_base16()) + async fn request(session: &Session, episode_id: SpotifyId) -> MetadataResult { + session + .spclient() + .get_album_metadata(episode_id) + .await + .map_err(MetadataError::Http) } fn parse(msg: &Self::Message, session: &Session) -> Self { @@ -396,11 +453,16 @@ impl Metadata for Episode { } } +#[async_trait] impl Metadata for Show { type Message = protocol::metadata::Show; - fn request_url(id: SpotifyId) -> String { - format!("hm://metadata/3/show/{}", id.to_base16()) + async fn request(session: &Session, show_id: SpotifyId) -> MetadataResult { + session + .spclient() + .get_show_metadata(show_id) + .await + .map_err(MetadataError::Http) } fn parse(msg: &Self::Message, _: &Session) -> Self { diff --git a/playback/src/player.rs b/playback/src/player.rs index 0249db9c..1395b99a 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -331,7 +331,11 @@ impl Player { // While PlayerInternal is written as a future, it still contains blocking code. // It must be run by using block_on() in a dedicated thread. - futures_executor::block_on(internal); + // futures_executor::block_on(internal); + + let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"); + runtime.block_on(internal); + debug!("PlayerInternal thread finished."); }); @@ -1789,8 +1793,9 @@ impl PlayerInternal { let (result_tx, result_rx) = oneshot::channel(); + let handle = tokio::runtime::Handle::current(); std::thread::spawn(move || { - let data = futures_executor::block_on(loader.load_track(spotify_id, position_ms)); + let data = handle.block_on(loader.load_track(spotify_id, position_ms)); if let Some(data) = data { let _ = result_tx.send(data); } diff --git a/protocol/Cargo.toml b/protocol/Cargo.toml index 5c3ae084..2628ecd1 100644 --- a/protocol/Cargo.toml +++ b/protocol/Cargo.toml @@ -9,8 +9,8 @@ repository = "https://github.com/librespot-org/librespot" edition = "2018" [dependencies] -protobuf = "2.14.0" +protobuf = "2.25" [build-dependencies] -protobuf-codegen-pure = "2.14.0" +protobuf-codegen-pure = "2.25" glob = "0.3.0" diff --git a/protocol/build.rs b/protocol/build.rs index 53e04bc7..37be7000 100644 --- a/protocol/build.rs +++ b/protocol/build.rs @@ -16,9 +16,17 @@ fn compile() { let proto_dir = Path::new(&env::var("CARGO_MANIFEST_DIR").expect("env")).join("proto"); let files = &[ + proto_dir.join("connect.proto"), + proto_dir.join("devices.proto"), + proto_dir.join("entity_extension_data.proto"), + proto_dir.join("extended_metadata.proto"), + proto_dir.join("extension_kind.proto"), proto_dir.join("metadata.proto"), + proto_dir.join("player.proto"), // TODO: remove these legacy protobufs when we are on the new API completely proto_dir.join("authentication.proto"), + proto_dir.join("canvaz.proto"), + proto_dir.join("canvaz-meta.proto"), proto_dir.join("keyexchange.proto"), proto_dir.join("mercury.proto"), proto_dir.join("playlist4changes.proto"), diff --git a/protocol/proto/canvaz-meta.proto b/protocol/proto/canvaz-meta.proto new file mode 100644 index 00000000..540daeb6 --- /dev/null +++ b/protocol/proto/canvaz-meta.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package com.spotify.canvaz; + +option optimize_for = CODE_SIZE; +option java_package = "com.spotify.canvaz"; + +enum Type { + IMAGE = 0; + VIDEO = 1; + VIDEO_LOOPING = 2; + VIDEO_LOOPING_RANDOM = 3; + GIF = 4; +} \ No newline at end of file diff --git a/protocol/proto/canvaz.proto b/protocol/proto/canvaz.proto new file mode 100644 index 00000000..ca283ab5 --- /dev/null +++ b/protocol/proto/canvaz.proto @@ -0,0 +1,40 @@ +syntax = "proto3"; + +package com.spotify.canvazcache; + +import "canvaz-meta.proto"; + +option optimize_for = CODE_SIZE; +option java_package = "com.spotify.canvaz"; + +message Artist { + string uri = 1; + string name = 2; + string avatar = 3; +} + +message EntityCanvazResponse { + repeated Canvaz canvases = 1; + message Canvaz { + string id = 1; + string url = 2; + string file_id = 3; + com.spotify.canvaz.Type type = 4; + string entity_uri = 5; + Artist artist = 6; + bool explicit = 7; + string uploaded_by = 8; + string etag = 9; + string canvas_uri = 11; + } + + int64 ttl_in_seconds = 2; +} + +message EntityCanvazRequest { + repeated Entity entities = 1; + message Entity { + string entity_uri = 1; + string etag = 2; + } +} \ No newline at end of file diff --git a/protocol/proto/connect.proto b/protocol/proto/connect.proto index 310a5b55..dae2561a 100644 --- a/protocol/proto/connect.proto +++ b/protocol/proto/connect.proto @@ -70,7 +70,7 @@ message DeviceInfo { Capabilities capabilities = 4; repeated DeviceMetadata metadata = 5; string device_software_version = 6; - devices.DeviceType device_type = 7; + spotify.connectstate.devices.DeviceType device_type = 7; string spirc_version = 9; string device_id = 10; bool is_private_session = 11; @@ -82,7 +82,7 @@ message DeviceInfo { string product_id = 17; string deduplication_id = 18; uint32 selected_alias_id = 19; - map device_aliases = 20; + map device_aliases = 20; bool is_offline = 21; string public_ip = 22; string license = 23; @@ -134,7 +134,7 @@ message Capabilities { bool supports_set_options_command = 25; CapabilitySupportDetails supports_hifi = 26; - reserved 1, 4, 24, "supported_contexts", "supports_lossless_audio"; + // reserved 1, 4, 24, "supported_contexts", "supports_lossless_audio"; } message CapabilitySupportDetails { diff --git a/src/main.rs b/src/main.rs index a3687aaa..185a9bf2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -606,15 +606,11 @@ fn get_setup(args: &[String]) -> Setup { match Url::parse(&s) { Ok(url) => { if url.host().is_none() || url.port_or_known_default().is_none() { - panic!("Invalid proxy url, only URLs on the format \"http://host:port\" are allowed"); - } - - if url.scheme() != "http" { - panic!("Only unsecure http:// proxies are supported"); + panic!("Invalid proxy url, only URLs on the format \"http(s)://host:port\" are allowed"); } url }, - Err(err) => panic!("Invalid proxy URL: {}, only URLs in the format \"http://host:port\" are allowed", err) + Err(err) => panic!("Invalid proxy URL: {}, only URLs in the format \"http(s)://host:port\" are allowed", err) } }, ),