Use proxytunnel in apresolve

Implementing the tower_service::Service trait for a newly created
ProxyTunnel struct, so it can be used as connector in hyper.
This commit is contained in:
johannesd3 2021-01-30 14:45:31 +01:00
parent c1d62d72a7
commit bb44b99c92
6 changed files with 95 additions and 81 deletions

47
Cargo.lock generated
View file

@ -995,31 +995,6 @@ version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04"
[[package]]
name = "headers"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62689dc57c7456e69712607ffcbd0aa1dfcccf9af73727e9b25bc1825375cac3"
dependencies = [
"base64 0.13.0",
"bitflags 1.2.1",
"bytes",
"headers-core",
"http",
"mime",
"sha-1",
"time 0.1.43",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http",
]
[[package]] [[package]]
name = "heck" name = "heck"
version = "0.3.2" version = "0.3.2"
@ -1111,20 +1086,6 @@ dependencies = [
"want", "want",
] ]
[[package]]
name = "hyper-proxy"
version = "0.8.0"
source = "git+https://github.com/e00E/hyper-proxy.git?branch=upgrade-tokio#4be706f2f0297bd3d14f301b6ea0be8f3078bb17"
dependencies = [
"bytes",
"futures",
"headers",
"http",
"hyper",
"tokio",
"tower-service",
]
[[package]] [[package]]
name = "ident_case" name = "ident_case"
version = "1.0.1" version = "1.0.1"
@ -1402,7 +1363,6 @@ dependencies = [
"hmac", "hmac",
"httparse", "httparse",
"hyper", "hyper",
"hyper-proxy",
"librespot-protocol", "librespot-protocol",
"log", "log",
"num-bigint", "num-bigint",
@ -1420,6 +1380,7 @@ dependencies = [
"shannon", "shannon",
"tokio", "tokio",
"tokio-util", "tokio-util",
"tower-service",
"url 1.7.2", "url 1.7.2",
"uuid", "uuid",
"vergen", "vergen",
@ -1532,12 +1493,6 @@ version = "2.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525"
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]] [[package]]
name = "miniz_oxide" name = "miniz_oxide"
version = "0.4.3" version = "0.4.3"

View file

@ -21,7 +21,6 @@ futures = { version = "0.3", features = ["bilock", "unstable"] }
hmac = "0.7" hmac = "0.7"
httparse = "1.3" httparse = "1.3"
hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] } hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] }
hyper-proxy = { git = "https://github.com/e00E/hyper-proxy.git", branch="upgrade-tokio", default_features = false }
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
num-integer = "0.1" num-integer = "0.1"
@ -38,6 +37,7 @@ sha-1 = "~0.8"
shannon = "0.2.0" shannon = "0.2.0"
tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] } tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] }
tokio-util = { version = "0.6", features = ["codec"] } tokio-util = { version = "0.6", features = ["codec"] }
tower-service = "0.3"
url = "1.7" url = "1.7"
uuid = { version = "0.8", features = ["v4"] } uuid = { version = "0.8", features = ["v4"] }

View file

@ -1,11 +1,12 @@
const AP_FALLBACK: &'static str = "ap.spotify.com:443"; const AP_FALLBACK: &'static str = "ap.spotify.com:443";
const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com:80";
use hyper::{client::HttpConnector, Body, Client, Method, Request, Uri}; use hyper::{Body, Client, Method, Request, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use std::error::Error; use std::error::Error;
use url::Url; use url::Url;
use crate::proxytunnel::ProxyTunnel;
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct APResolveData { pub struct APResolveData {
ap_list: Vec<String>, ap_list: Vec<String>,
@ -14,7 +15,7 @@ pub struct APResolveData {
async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> { async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> {
let port = ap_port.unwrap_or(443); let port = ap_port.unwrap_or(443);
let mut req = Request::builder() let req = Request::builder()
.method(Method::GET) .method(Method::GET)
.uri( .uri(
APRESOLVE_ENDPOINT APRESOLVE_ENDPOINT
@ -24,18 +25,10 @@ async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String,
.body(Body::empty())?; .body(Body::empty())?;
let response = if let Some(url) = proxy { let response = if let Some(url) = proxy {
let proxy = { Client::builder()
let proxy_url = url.as_str().parse().expect("invalid http proxy"); .build(ProxyTunnel::new(url)?)
let proxy = Proxy::new(Intercept::All, proxy_url); .request(req)
let connector = HttpConnector::new(); .await?
ProxyConnector::from_proxy_unsecured(connector, proxy)
};
if let Some(headers) = proxy.http_headers(&APRESOLVE_ENDPOINT.parse().unwrap()) {
req.headers_mut().extend(headers.clone());
};
Client::builder().build(proxy).request(req).await?
} else { } else {
Client::new().request(req).await? Client::new().request(req).await?
}; };

View file

@ -22,6 +22,21 @@ pub type Transport = Framed<TcpStream, APCodec>;
pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> { pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> {
let socket = if let Some(proxy) = proxy { let socket = if let Some(proxy) = proxy {
info!("Using proxy \"{}\"", proxy); info!("Using proxy \"{}\"", proxy);
let mut split = addr.rsplit(':');
let port = split
.next()
.unwrap() // will never panic, split iterator contains at least one element
.parse()
.map_err(|e| {
io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e))
})?;
let host = split
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?;
let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| { let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or_else(|| { iter.next().ok_or_else(|| {
io::Error::new( io::Error::new(
@ -31,7 +46,8 @@ pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport>
}) })
})?; })?;
let socket = TcpStream::connect(&socket_addr).await?; let socket = TcpStream::connect(&socket_addr).await?;
proxytunnel::connect(socket, &addr).await?
proxytunnel::connect(socket, host, port).await?
} else { } else {
let socket_addr = addr.to_socket_addrs().and_then(|mut iter| { let socket_addr = addr.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or_else(|| { iter.next().ok_or_else(|| {

View file

@ -14,7 +14,6 @@ extern crate futures;
extern crate hmac; extern crate hmac;
extern crate httparse; extern crate httparse;
extern crate hyper; extern crate hyper;
extern crate hyper_proxy;
extern crate num_bigint; extern crate num_bigint;
extern crate num_integer; extern crate num_integer;
extern crate num_traits; extern crate num_traits;
@ -28,6 +27,7 @@ extern crate sha1;
extern crate shannon; extern crate shannon;
pub extern crate tokio; pub extern crate tokio;
extern crate tokio_util; extern crate tokio_util;
extern crate tower_service;
extern crate url; extern crate url;
extern crate uuid; extern crate uuid;

View file

@ -1,27 +1,36 @@
use std::io; use futures::Future;
use hyper::Uri; use hyper::Uri;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use std::{
io,
net::{SocketAddr, ToSocketAddrs},
pin::Pin,
task::Poll,
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::TcpStream,
};
use tower_service::Service;
pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>( pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
mut connection: T, mut proxy_connection: T,
connect_url: &str, connect_host: &str,
connect_port: u16,
) -> io::Result<T> { ) -> io::Result<T> {
let uri = connect_url.parse::<Uri>().unwrap(); let mut buffer = Vec::new();
let mut buffer = format!( buffer.extend_from_slice(b"CONNECT ");
"CONNECT {0}:{1} HTTP/1.1\r\n\ buffer.extend_from_slice(connect_host.as_bytes());
\r\n", buffer.push(b':');
uri.host().unwrap_or_else(|| panic!("No host in {}", uri)), buffer.extend_from_slice(connect_port.to_string().as_bytes());
uri.port().unwrap_or_else(|| panic!("No port in {}", uri)) buffer.extend_from_slice(b" HTTP/1.1\r\n\r\n");
)
.into_bytes(); proxy_connection.write_all(buffer.as_ref()).await?;
connection.write_all(buffer.as_ref()).await?;
buffer.resize(buffer.capacity(), 0); buffer.resize(buffer.capacity(), 0);
let mut offset = 0; let mut offset = 0;
loop { loop {
let bytes_read = connection.read(&mut buffer[offset..]).await?; let bytes_read = proxy_connection.read(&mut buffer[offset..]).await?;
if bytes_read == 0 { if bytes_read == 0 {
return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
} }
@ -36,7 +45,7 @@ pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
if status.is_complete() { if status.is_complete() {
return match response.code { return match response.code {
Some(200) => Ok(connection), // Proxy says all is well Some(200) => Ok(proxy_connection), // Proxy says all is well
Some(code) => { Some(code) => {
let reason = response.reason.unwrap_or("no reason"); let reason = response.reason.unwrap_or("no reason");
let msg = format!("Proxy responded with {}: {}", code, reason); let msg = format!("Proxy responded with {}: {}", code, reason);
@ -54,3 +63,44 @@ pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
} }
} }
} }
#[derive(Clone)]
pub struct ProxyTunnel {
proxy_addr: SocketAddr,
}
impl ProxyTunnel {
pub fn new<T: ToSocketAddrs>(addr: T) -> io::Result<Self> {
let addr = addr.to_socket_addrs()?.next().ok_or_else(|| {
io::Error::new(io::ErrorKind::InvalidInput, "No socket address given")
})?;
Ok(Self { proxy_addr: addr })
}
}
impl Service<Uri> for ProxyTunnel {
type Response = TcpStream;
type Error = io::Error;
type Future = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>;
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, url: Uri) -> Self::Future {
let proxy_addr = self.proxy_addr;
let fut = async move {
let host = url
.host()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Host is missing"))?;
let port = url
.port()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Port is missing"))?;
let conn = TcpStream::connect(proxy_addr).await?;
connect(conn, host, port.as_u16()).await
};
Box::pin(fut)
}
}