Introduce caching ApResolver component

This commit is contained in:
Roderick van Domburg 2021-06-21 23:49:37 +02:00
parent b6357a27a5
commit eee79f2a1e
No known key found for this signature in database
GPG key ID: 7076AA781B43EFE6
2 changed files with 139 additions and 140 deletions

View file

@ -1,118 +1,124 @@
use crate::http_client::HttpClient;
use hyper::{Body, Request}; use hyper::{Body, Request};
use serde::Deserialize; use serde::Deserialize;
use std::error::Error; use std::error::Error;
const APRESOLVE_ENDPOINT: &str =
"http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient";
// These addresses probably do some geo-location based traffic management or at least DNS-based
// load balancing. They are known to fail when the normal resolvers are up, so that's why they
// should only be used as fallback.
const AP_FALLBACK: &str = "ap.spotify.com";
const DEALER_FALLBACK: &str = "dealer.spotify.com";
const SPCLIENT_FALLBACK: &str = "spclient.wg.spotify.com";
const FALLBACK_PORT: u16 = 443;
pub type SocketAddress = (String, u16); pub type SocketAddress = (String, u16);
#[derive(Clone, Debug, Default, Deserialize)] #[derive(Default)]
struct AccessPoints {
accesspoint: Vec<SocketAddress>,
dealer: Vec<SocketAddress>,
spclient: Vec<SocketAddress>,
}
#[derive(Deserialize)]
struct ApResolveData { struct ApResolveData {
accesspoint: Vec<String>, accesspoint: Vec<String>,
dealer: Vec<String>, dealer: Vec<String>,
spclient: Vec<String>, spclient: Vec<String>,
} }
#[derive(Clone, Debug, Deserialize)] // These addresses probably do some geo-location based traffic management or at least DNS-based
pub struct AccessPoints { // load balancing. They are known to fail when the normal resolvers are up, so that's why they
pub accesspoint: SocketAddress, // should only be used as fallback.
pub dealer: SocketAddress, impl Default for ApResolveData {
pub spclient: SocketAddress, fn default() -> Self {
} Self {
accesspoint: vec![String::from("ap.spotify.com:443")],
fn select_ap(data: Vec<String>, fallback: &str, ap_port: Option<u16>) -> SocketAddress { dealer: vec![String::from("dealer.spotify.com:443")],
let port = ap_port.unwrap_or(FALLBACK_PORT); spclient: vec![String::from("spclient.wg.spotify.com:443")],
}
let mut aps = data.into_iter().filter_map(|ap| {
let mut split = ap.rsplitn(2, ':');
let port = split
.next()
.expect("rsplitn should not return empty iterator");
let host = split.next()?.to_owned();
let port: u16 = port.parse().ok()?;
Some((host, port))
});
let ap = if ap_port.is_some() {
aps.find(|(_, p)| *p == port)
} else {
aps.next()
};
ap.unwrap_or_else(|| (String::from(fallback), port))
}
async fn try_apresolve(http_client: &HttpClient) -> Result<ApResolveData, Box<dyn Error>> {
let req = Request::builder()
.method("GET")
.uri(APRESOLVE_ENDPOINT)
.body(Body::empty())
.unwrap();
let body = http_client.request_body(req).await?;
let data: ApResolveData = serde_json::from_slice(body.as_ref())?;
Ok(data)
}
pub async fn apresolve(http_client: &HttpClient, ap_port: Option<u16>) -> AccessPoints {
let data = try_apresolve(http_client).await.unwrap_or_else(|e| {
warn!("Failed to resolve access points: {}, using fallbacks.", e);
ApResolveData::default()
});
let accesspoint = select_ap(data.accesspoint, AP_FALLBACK, ap_port);
let dealer = select_ap(data.dealer, DEALER_FALLBACK, ap_port);
let spclient = select_ap(data.spclient, SPCLIENT_FALLBACK, ap_port);
AccessPoints {
accesspoint,
dealer,
spclient,
} }
} }
#[cfg(test)] component! {
mod test { ApResolver : ApResolverInner {
use std::net::ToSocketAddrs; data: AccessPoints = AccessPoints::default(),
}
use super::apresolve; }
use crate::http_client::HttpClient;
impl ApResolver {
#[tokio::test] fn split_aps(data: Vec<String>) -> Vec<SocketAddress> {
async fn test_apresolve() { data.into_iter()
let http_client = HttpClient::new(None); .filter_map(|ap| {
let aps = apresolve(&http_client, None).await; let mut split = ap.rsplitn(2, ':');
let port = split
// Assert that the result contains a valid host and port .next()
aps.accesspoint.to_socket_addrs().unwrap().next().unwrap(); .expect("rsplitn should not return empty iterator");
aps.dealer.to_socket_addrs().unwrap().next().unwrap(); let host = split.next()?.to_owned();
aps.spclient.to_socket_addrs().unwrap().next().unwrap(); let port: u16 = port.parse().ok()?;
} Some((host, port))
})
#[tokio::test] .collect()
async fn test_apresolve_port_443() { }
let http_client = HttpClient::new(None);
let aps = apresolve(&http_client, Some(443)).await; fn find_ap(&self, data: &[SocketAddress]) -> usize {
match self.session().config().proxy {
let port = aps Some(_) => data
.accesspoint .iter()
.to_socket_addrs() .position(|(_, port)| *port == self.session().config().ap_port.unwrap_or(443))
.unwrap() .expect("No access points available with that proxy port."),
.next() None => 0, // just pick the first one
.unwrap() }
.port(); }
assert_eq!(port, 443);
async fn try_apresolve(&self) -> Result<ApResolveData, Box<dyn Error>> {
let req = Request::builder()
.method("GET")
.uri("http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient")
.body(Body::empty())
.unwrap();
let body = self.session().http_client().request_body(req).await?;
let data: ApResolveData = serde_json::from_slice(body.as_ref())?;
Ok(data)
}
async fn apresolve(&self) {
let result = self.try_apresolve().await;
self.lock(|inner| {
let data = match result {
Ok(data) => data,
Err(e) => {
warn!("Failed to resolve access points, using fallbacks: {}", e);
ApResolveData::default()
}
};
inner.data.accesspoint = Self::split_aps(data.accesspoint);
inner.data.dealer = Self::split_aps(data.dealer);
inner.data.spclient = Self::split_aps(data.spclient);
})
}
fn is_empty(&self) -> bool {
self.lock(|inner| {
inner.data.accesspoint.is_empty()
|| inner.data.dealer.is_empty()
|| inner.data.spclient.is_empty()
})
}
pub async fn resolve(&self, endpoint: &str) -> SocketAddress {
if self.is_empty() {
self.apresolve().await;
}
self.lock(|inner| match endpoint {
"accesspoint" => {
let pos = self.find_ap(&inner.data.accesspoint);
inner.data.accesspoint.remove(pos)
}
"dealer" => {
let pos = self.find_ap(&inner.data.dealer);
inner.data.dealer.remove(pos)
}
"spclient" => {
let pos = self.find_ap(&inner.data.spclient);
inner.data.spclient.remove(pos)
}
_ => unimplemented!(),
})
} }
} }

View file

@ -16,7 +16,7 @@ use thiserror::Error;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::apresolve::apresolve; use crate::apresolve::ApResolver;
use crate::audio_key::AudioKeyManager; use crate::audio_key::AudioKeyManager;
use crate::authentication::Credentials; use crate::authentication::Credentials;
use crate::cache::Cache; use crate::cache::Cache;
@ -49,6 +49,7 @@ struct SessionInternal {
http_client: HttpClient, http_client: HttpClient,
tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>, tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
apresolver: OnceCell<ApResolver>,
audio_key: OnceCell<AudioKeyManager>, audio_key: OnceCell<AudioKeyManager>,
channel: OnceCell<ChannelManager>, channel: OnceCell<ChannelManager>,
mercury: OnceCell<MercuryManager>, mercury: OnceCell<MercuryManager>,
@ -72,40 +73,6 @@ impl Session {
cache: Option<Cache>, cache: Option<Cache>,
) -> Result<Session, SessionError> { ) -> Result<Session, SessionError> {
let http_client = HttpClient::new(config.proxy.as_ref()); let http_client = HttpClient::new(config.proxy.as_ref());
let ap = apresolve(&http_client, config.ap_port).await.accesspoint;
info!("Connecting to AP \"{}:{}\"", ap.0, ap.1);
let mut transport = connection::connect(&ap.0, ap.1, config.proxy.as_ref()).await?;
let reusable_credentials =
connection::authenticate(&mut transport, credentials, &config.device_id).await?;
info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(cache) = &cache {
cache.save_credentials(&reusable_credentials);
}
let session = Session::create(
transport,
http_client,
config,
cache,
reusable_credentials.username,
tokio::runtime::Handle::current(),
);
Ok(session)
}
fn create(
transport: connection::Transport,
http_client: HttpClient,
config: SessionConfig,
cache: Option<Cache>,
username: String,
handle: tokio::runtime::Handle,
) -> Session {
let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded_channel(); let (sender_tx, sender_rx) = mpsc::unbounded_channel();
let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed); let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
@ -115,21 +82,37 @@ impl Session {
config, config,
data: RwLock::new(SessionData { data: RwLock::new(SessionData {
country: String::new(), country: String::new(),
canonical_username: username, canonical_username: String::new(),
invalid: false, invalid: false,
time_delta: 0, time_delta: 0,
}), }),
http_client, http_client,
tx_connection: sender_tx, tx_connection: sender_tx,
cache: cache.map(Arc::new), cache: cache.map(Arc::new),
apresolver: OnceCell::new(),
audio_key: OnceCell::new(), audio_key: OnceCell::new(),
channel: OnceCell::new(), channel: OnceCell::new(),
mercury: OnceCell::new(), mercury: OnceCell::new(),
token_provider: OnceCell::new(), token_provider: OnceCell::new(),
handle, handle: tokio::runtime::Handle::current(),
session_id, session_id,
})); }));
let ap = session.apresolver().resolve("accesspoint").await;
info!("Connecting to AP \"{}:{}\"", ap.0, ap.1);
let mut transport =
connection::connect(&ap.0, ap.1, session.config().proxy.as_ref()).await?;
let reusable_credentials =
connection::authenticate(&mut transport, credentials, &session.config().device_id)
.await?;
info!("Authenticated as \"{}\" !", reusable_credentials.username);
session.0.data.write().unwrap().canonical_username = reusable_credentials.username.clone();
if let Some(cache) = session.cache() {
cache.save_credentials(&reusable_credentials);
}
let (sink, stream) = transport.split();
let sender_task = UnboundedReceiverStream::new(sender_rx) let sender_task = UnboundedReceiverStream::new(sender_rx)
.map(Ok) .map(Ok)
.forward(sink); .forward(sink);
@ -143,7 +126,13 @@ impl Session {
} }
}); });
session Ok(session)
}
pub fn apresolver(&self) -> &ApResolver {
self.0
.apresolver
.get_or_init(|| ApResolver::new(self.weak()))
} }
pub fn audio_key(&self) -> &AudioKeyManager { pub fn audio_key(&self) -> &AudioKeyManager {
@ -158,6 +147,10 @@ impl Session {
.get_or_init(|| ChannelManager::new(self.weak())) .get_or_init(|| ChannelManager::new(self.weak()))
} }
pub fn http_client(&self) -> &HttpClient {
&self.0.http_client
}
pub fn mercury(&self) -> &MercuryManager { pub fn mercury(&self) -> &MercuryManager {
self.0 self.0
.mercury .mercury
@ -230,7 +223,7 @@ impl Session {
self.0.cache.as_ref() self.0.cache.as_ref()
} }
fn config(&self) -> &SessionConfig { pub fn config(&self) -> &SessionConfig {
&self.0.config &self.0.config
} }