Introduce HTTP client

This commit is contained in:
Roderick van Domburg 2021-06-20 23:09:27 +02:00
parent ce4f8dc288
commit 15628842af
No known key found for this signature in database
GPG key ID: 7076AA781B43EFE6
5 changed files with 58 additions and 33 deletions

View file

@ -1,10 +1,7 @@
use std::error::Error; use crate::http_client::HttpClient;
use hyper::{Body, Request};
use hyper::client::HttpConnector;
use hyper::{Body, Client, Request};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use serde::Deserialize; use serde::Deserialize;
use url::Url; use std::error::Error;
const APRESOLVE_ENDPOINT: &str = const APRESOLVE_ENDPOINT: &str =
"http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient"; "http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient";
@ -56,35 +53,21 @@ fn select_ap(data: Vec<String>, fallback: &str, ap_port: Option<u16>) -> SocketA
ap.unwrap_or_else(|| (String::from(fallback), port)) ap.unwrap_or_else(|| (String::from(fallback), port))
} }
async fn try_apresolve(proxy: Option<&Url>) -> Result<ApResolveData, Box<dyn Error>> { async fn try_apresolve(http_client: &HttpClient) -> Result<ApResolveData, Box<dyn Error>> {
let req = Request::builder() let req = Request::builder()
.method("GET") .method("GET")
.uri(APRESOLVE_ENDPOINT) .uri(APRESOLVE_ENDPOINT)
.body(Body::empty()) .body(Body::empty())
.unwrap(); .unwrap();
let response = if let Some(url) = proxy { let body = http_client.request_body(req).await?;
// Panic safety: all URLs are valid URIs
let uri = url.to_string().parse().unwrap();
let proxy = Proxy::new(Intercept::All, uri);
let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy);
Client::builder()
.build(proxy_connector)
.request(req)
.await?
} else {
Client::new().request(req).await?
};
let body = hyper::body::to_bytes(response.into_body()).await?;
let data: ApResolveData = serde_json::from_slice(body.as_ref())?; let data: ApResolveData = serde_json::from_slice(body.as_ref())?;
Ok(data) Ok(data)
} }
pub async fn apresolve(proxy: Option<&Url>, ap_port: Option<u16>) -> AccessPoints { pub async fn apresolve(http_client: &HttpClient, ap_port: Option<u16>) -> AccessPoints {
let data = try_apresolve(proxy).await.unwrap_or_else(|e| { let data = try_apresolve(http_client).await.unwrap_or_else(|e| {
warn!("Failed to resolve access points: {}, using fallbacks.", e); warn!("Failed to resolve access points: {}, using fallbacks.", e);
ApResolveData::default() ApResolveData::default()
}); });
@ -105,10 +88,12 @@ mod test {
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use super::apresolve; use super::apresolve;
use crate::http_client::HttpClient;
#[tokio::test] #[tokio::test]
async fn test_apresolve() { async fn test_apresolve() {
let aps = apresolve(None, None).await; let http_client = HttpClient::new(None);
let aps = apresolve(&http_client, None).await;
// Assert that the result contains a valid host and port // Assert that the result contains a valid host and port
aps.accesspoint.to_socket_addrs().unwrap().next().unwrap(); aps.accesspoint.to_socket_addrs().unwrap().next().unwrap();
@ -118,7 +103,8 @@ mod test {
#[tokio::test] #[tokio::test]
async fn test_apresolve_port_443() { async fn test_apresolve_port_443() {
let aps = apresolve(None, Some(443)).await; let http_client = HttpClient::new(None);
let aps = apresolve(&http_client, Some(443)).await;
let port = aps let port = aps
.accesspoint .accesspoint

34
core/src/http_client.rs Normal file
View file

@ -0,0 +1,34 @@
use hyper::client::HttpConnector;
use hyper::{Body, Client, Request, Response};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use url::Url;
pub struct HttpClient {
proxy: Option<Url>,
}
impl HttpClient {
pub fn new(proxy: Option<&Url>) -> Self {
Self {
proxy: proxy.cloned(),
}
}
pub async fn request(&self, req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
if let Some(url) = &self.proxy {
// Panic safety: all URLs are valid URIs
let uri = url.to_string().parse().unwrap();
let proxy = Proxy::new(Intercept::All, uri);
let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy);
Client::builder().build(proxy_connector).request(req).await
} else {
Client::new().request(req).await
}
}
pub async fn request_body(&self, req: Request<Body>) -> Result<bytes::Bytes, hyper::Error> {
let response = self.request(req).await?;
hyper::body::to_bytes(response.into_body()).await
}
}

View file

@ -19,11 +19,13 @@ mod connection;
mod dealer; mod dealer;
#[doc(hidden)] #[doc(hidden)]
pub mod diffie_hellman; pub mod diffie_hellman;
mod http_client;
pub mod keymaster; pub mod keymaster;
pub mod mercury; pub mod mercury;
mod proxytunnel; mod proxytunnel;
pub mod session; pub mod session;
mod socket; mod socket;
mod spclient;
pub mod spotify_id; pub mod spotify_id;
mod token; mod token;
#[doc(hidden)] #[doc(hidden)]

View file

@ -23,6 +23,7 @@ use crate::cache::Cache;
use crate::channel::ChannelManager; use crate::channel::ChannelManager;
use crate::config::SessionConfig; use crate::config::SessionConfig;
use crate::connection::{self, AuthenticationError}; use crate::connection::{self, AuthenticationError};
use crate::http_client::HttpClient;
use crate::mercury::MercuryManager; use crate::mercury::MercuryManager;
use crate::token::TokenProvider; use crate::token::TokenProvider;
@ -45,6 +46,7 @@ struct SessionInternal {
config: SessionConfig, config: SessionConfig,
data: RwLock<SessionData>, data: RwLock<SessionData>,
http_client: HttpClient,
tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>, tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
audio_key: OnceCell<AudioKeyManager>, audio_key: OnceCell<AudioKeyManager>,
@ -69,22 +71,22 @@ impl Session {
credentials: Credentials, credentials: Credentials,
cache: Option<Cache>, cache: Option<Cache>,
) -> Result<Session, SessionError> { ) -> Result<Session, SessionError> {
let ap = apresolve(config.proxy.as_ref(), config.ap_port) let http_client = HttpClient::new(config.proxy.as_ref());
.await let ap = apresolve(&http_client, config.ap_port).await.accesspoint;
.accesspoint;
info!("Connecting to AP \"{}:{}\"", ap.0, ap.1); info!("Connecting to AP \"{}:{}\"", ap.0, ap.1);
let mut conn = connection::connect(&ap.0, ap.1, config.proxy.as_ref()).await?; let mut transport = connection::connect(&ap.0, ap.1, config.proxy.as_ref()).await?;
let reusable_credentials = let reusable_credentials =
connection::authenticate(&mut conn, credentials, &config.device_id).await?; connection::authenticate(&mut transport, credentials, &config.device_id).await?;
info!("Authenticated as \"{}\" !", reusable_credentials.username); info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(cache) = &cache { if let Some(cache) = &cache {
cache.save_credentials(&reusable_credentials); cache.save_credentials(&reusable_credentials);
} }
let session = Session::create( let session = Session::create(
conn, transport,
http_client,
config, config,
cache, cache,
reusable_credentials.username, reusable_credentials.username,
@ -96,6 +98,7 @@ impl Session {
fn create( fn create(
transport: connection::Transport, transport: connection::Transport,
http_client: HttpClient,
config: SessionConfig, config: SessionConfig,
cache: Option<Cache>, cache: Option<Cache>,
username: String, username: String,
@ -116,6 +119,7 @@ impl Session {
invalid: false, invalid: false,
time_delta: 0, time_delta: 0,
}), }),
http_client,
tx_connection: sender_tx, tx_connection: sender_tx,
cache: cache.map(Arc::new), cache: cache.map(Arc::new),
audio_key: OnceCell::new(), audio_key: OnceCell::new(),

View file

@ -1,2 +1 @@
// https://github.com/librespot-org/librespot-java/blob/27783e06f456f95228c5ac37acf2bff8c1a8a0c4/lib/src/main/java/xyz/gianlu/librespot/dealer/ApiClient.java // https://github.com/librespot-org/librespot-java/blob/27783e06f456f95228c5ac37acf2bff8c1a8a0c4/lib/src/main/java/xyz/gianlu/librespot/dealer/ApiClient.java