From 07514c9dcca59bd13b149cd19a276fa27c6d6986 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Mon, 25 Jan 2021 20:55:49 +0100 Subject: [PATCH 1/5] Add proxy support to apresolve --- Cargo.lock | 46 +++++++++++++++++++++++++++++++++++++++++++ core/Cargo.toml | 1 + core/src/apresolve.rs | 27 ++++++++++++------------- core/src/lib.rs | 1 + 4 files changed, 61 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5813a707..3754fd69 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -962,6 +962,31 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +[[package]] +name = "headers" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62689dc57c7456e69712607ffcbd0aa1dfcccf9af73727e9b25bc1825375cac3" +dependencies = [ + "base64 0.13.0", + "bitflags 1.2.1", + "bytes", + "headers-core", + "http", + "mime", + "sha-1", + "time 0.1.43", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.3.2" @@ -1047,6 +1072,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-proxy" +version = "0.8.0" +source = "git+https://github.com/e00E/hyper-proxy.git?branch=upgrade-tokio#4be706f2f0297bd3d14f301b6ea0be8f3078bb17" +dependencies = [ + "bytes", + "futures", + "headers", + "http", + "hyper", + "tokio", + "tower-service", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -1323,6 +1362,7 @@ dependencies = [ "hmac", "httparse", "hyper", + "hyper-proxy", "librespot-protocol", "log", "num-bigint", @@ -1452,6 +1492,12 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "miniz_oxide" version = "0.4.3" diff --git a/core/Cargo.toml b/core/Cargo.toml index c092c04d..4ff46936 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -21,6 +21,7 @@ futures = { version = "0.3", features = ["bilock", "unstable"] } hmac = "0.7" httparse = "1.3" hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] } +hyper-proxy = { git = "https://github.com/e00E/hyper-proxy.git", branch="upgrade-tokio", default_features = false } log = "0.4" num-bigint = "0.3" num-integer = "0.1" diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index 07c2958f..d35b2091 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,7 +1,8 @@ const AP_FALLBACK: &'static str = "ap.spotify.com:443"; const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; -use hyper::{Body, Client, Method, Request, Uri}; +use hyper::{client::HttpConnector, Body, Client, Method, Request, Uri}; +use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use std::error::Error; use url::Url; @@ -13,7 +14,7 @@ pub struct APResolveData { async fn apresolve(proxy: &Option, ap_port: &Option) -> Result> { let port = ap_port.unwrap_or(443); - let req = Request::builder() + let mut req = Request::builder() .method(Method::GET) .uri( APRESOLVE_ENDPOINT @@ -22,25 +23,22 @@ async fn apresolve(proxy: &Option, ap_port: &Option) -> Result, ap_port: &Option) -> Result Date: Mon, 25 Jan 2021 20:56:22 +0100 Subject: [PATCH 2/5] Replaced .fold(0, add) by .sum() --- audio/src/range_set.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index 8712dfd4..d01d888e 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -54,7 +54,7 @@ impl RangeSet { } pub fn len(&self) -> usize { - self.ranges.iter().map(|r| r.length).fold(0, std::ops::Add::add) + self.ranges.iter().map(|r| r.length).sum() } pub fn get_range(&self, index: usize) -> Range { From a45fe85c27512b9d1a25b191731dc90103905f62 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sat, 30 Jan 2021 13:53:44 +0100 Subject: [PATCH 3/5] Enable logging in test --- Cargo.lock | 61 +++++++++++++++++++++++++++++++++++++++++++ core/Cargo.toml | 1 + core/tests/connect.rs | 1 + 3 files changed, 63 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 3754fd69..e4176518 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -64,6 +64,15 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "aho-corasick" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5" +dependencies = [ + "memchr", +] + [[package]] name = "alsa" version = "0.2.2" @@ -152,6 +161,17 @@ dependencies = [ "syn", ] +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -567,6 +587,19 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "env_logger" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26ecb66b4bdca6c1409b40fb255eefc2bd4f6d135dab3c3124f80ffa2a9661e" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + [[package]] name = "error-chain" version = "0.12.4" @@ -1048,6 +1081,12 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "494b4d60369511e7dea41cf646832512a94e542f68bb9c49e54518e0f468eb47" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.2" @@ -1358,6 +1397,7 @@ dependencies = [ "base64 0.13.0", "byteorder", "bytes", + "env_logger", "futures", "hmac", "httparse", @@ -2172,7 +2212,10 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9251239e129e16308e70d853559389de218ac275b515068abc96829d05b948a" dependencies = [ + "aho-corasick", + "memchr", "regex-syntax", + "thread_local", ] [[package]] @@ -2598,6 +2641,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "termcolor" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dfed899f0eb03f32ee8c6a0aabdb8a7949659e3466561fc0adf54e26d88c5f4" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.23" @@ -2618,6 +2670,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8208a331e1cb318dd5bd76951d2b8fc48ca38a69f5f4e4af1b6a9f8c6236915" +dependencies = [ + "once_cell", +] + [[package]] name = "time" version = "0.1.43" diff --git a/core/Cargo.toml b/core/Cargo.toml index 4ff46936..1f7b9afc 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -46,4 +46,5 @@ rand = "0.7" vergen = "3.0.4" [dev-dependencies] +env_logger = "*" tokio = {version = "1.0", features = ["macros"] } \ No newline at end of file diff --git a/core/tests/connect.rs b/core/tests/connect.rs index 44d418a1..4ea2a1fe 100644 --- a/core/tests/connect.rs +++ b/core/tests/connect.rs @@ -7,6 +7,7 @@ mod tests { use apresolve::apresolve_or_fallback; #[tokio::test] async fn test_ap_resolve() { + env_logger::init(); let ap = apresolve_or_fallback(&None, &None).await; println!("AP: {:?}", ap); } From c1d62d72a7bd12f757d7d5b09b02b2c2387eda38 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sat, 30 Jan 2021 14:03:34 +0100 Subject: [PATCH 4/5] Fixed ProxyTunnel --- core/src/proxytunnel.rs | 55 ++++++++++++++++++++++++----------------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/core/src/proxytunnel.rs b/core/src/proxytunnel.rs index 508de7f8..c8e9eab6 100644 --- a/core/src/proxytunnel.rs +++ b/core/src/proxytunnel.rs @@ -17,29 +17,40 @@ pub async fn connect( .into_bytes(); connection.write_all(buffer.as_ref()).await?; - buffer.clear(); - connection.read_to_end(&mut buffer).await?; - if buffer.is_empty() { - return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); - } + buffer.resize(buffer.capacity(), 0); - let mut headers = [httparse::EMPTY_HEADER; 16]; - let mut response = httparse::Response::new(&mut headers); - - response - .parse(&buffer[..]) - .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?; - - match response.code { - Some(200) => Ok(connection), // Proxy says all is well - Some(code) => { - let reason = response.reason.unwrap_or("no reason"); - let msg = format!("Proxy responded with {}: {}", code, reason); - Err(io::Error::new(io::ErrorKind::Other, msg)) + let mut offset = 0; + loop { + let bytes_read = connection.read(&mut buffer[offset..]).await?; + if bytes_read == 0 { + return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); + } + offset += bytes_read; + + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut response = httparse::Response::new(&mut headers); + + let status = response + .parse(&buffer[..offset]) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; + + if status.is_complete() { + return match response.code { + Some(200) => Ok(connection), // Proxy says all is well + Some(code) => { + let reason = response.reason.unwrap_or("no reason"); + let msg = format!("Proxy responded with {}: {}", code, reason); + Err(io::Error::new(io::ErrorKind::Other, msg)) + } + None => Err(io::Error::new( + io::ErrorKind::Other, + "Malformed response from proxy", + )), + }; + } + + if offset >= buffer.len() { + buffer.resize(buffer.len() * 2, 0); } - None => Err(io::Error::new( - io::ErrorKind::Other, - "Malformed response from proxy", - )), } } From bb44b99c92f16c2770651e75647a3e19cad8b77a Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sat, 30 Jan 2021 14:45:31 +0100 Subject: [PATCH 5/5] Use proxytunnel in apresolve Implementing the tower_service::Service trait for a newly created ProxyTunnel struct, so it can be used as connector in hyper. --- Cargo.lock | 47 +--------------------- core/Cargo.toml | 2 +- core/src/apresolve.rs | 25 +++++------- core/src/connection/mod.rs | 18 ++++++++- core/src/lib.rs | 2 +- core/src/proxytunnel.rs | 82 ++++++++++++++++++++++++++++++-------- 6 files changed, 95 insertions(+), 81 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e4176518..7606ed82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -995,31 +995,6 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" -[[package]] -name = "headers" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62689dc57c7456e69712607ffcbd0aa1dfcccf9af73727e9b25bc1825375cac3" -dependencies = [ - "base64 0.13.0", - "bitflags 1.2.1", - "bytes", - "headers-core", - "http", - "mime", - "sha-1", - "time 0.1.43", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http", -] - [[package]] name = "heck" version = "0.3.2" @@ -1111,20 +1086,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-proxy" -version = "0.8.0" -source = "git+https://github.com/e00E/hyper-proxy.git?branch=upgrade-tokio#4be706f2f0297bd3d14f301b6ea0be8f3078bb17" -dependencies = [ - "bytes", - "futures", - "headers", - "http", - "hyper", - "tokio", - "tower-service", -] - [[package]] name = "ident_case" version = "1.0.1" @@ -1402,7 +1363,6 @@ dependencies = [ "hmac", "httparse", "hyper", - "hyper-proxy", "librespot-protocol", "log", "num-bigint", @@ -1420,6 +1380,7 @@ dependencies = [ "shannon", "tokio", "tokio-util", + "tower-service", "url 1.7.2", "uuid", "vergen", @@ -1532,12 +1493,6 @@ version = "2.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" -[[package]] -name = "mime" -version = "0.3.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" - [[package]] name = "miniz_oxide" version = "0.4.3" diff --git a/core/Cargo.toml b/core/Cargo.toml index 1f7b9afc..e0d79527 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -21,7 +21,6 @@ futures = { version = "0.3", features = ["bilock", "unstable"] } hmac = "0.7" httparse = "1.3" hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] } -hyper-proxy = { git = "https://github.com/e00E/hyper-proxy.git", branch="upgrade-tokio", default_features = false } log = "0.4" num-bigint = "0.3" num-integer = "0.1" @@ -38,6 +37,7 @@ sha-1 = "~0.8" shannon = "0.2.0" tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] } tokio-util = { version = "0.6", features = ["codec"] } +tower-service = "0.3" url = "1.7" uuid = { version = "0.8", features = ["v4"] } diff --git a/core/src/apresolve.rs b/core/src/apresolve.rs index d35b2091..81340c9d 100644 --- a/core/src/apresolve.rs +++ b/core/src/apresolve.rs @@ -1,11 +1,12 @@ const AP_FALLBACK: &'static str = "ap.spotify.com:443"; -const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; +const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com:80"; -use hyper::{client::HttpConnector, Body, Client, Method, Request, Uri}; -use hyper_proxy::{Intercept, Proxy, ProxyConnector}; +use hyper::{Body, Client, Method, Request, Uri}; use std::error::Error; use url::Url; +use crate::proxytunnel::ProxyTunnel; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct APResolveData { ap_list: Vec, @@ -14,7 +15,7 @@ pub struct APResolveData { async fn apresolve(proxy: &Option, ap_port: &Option) -> Result> { let port = ap_port.unwrap_or(443); - let mut req = Request::builder() + let req = Request::builder() .method(Method::GET) .uri( APRESOLVE_ENDPOINT @@ -24,18 +25,10 @@ async fn apresolve(proxy: &Option, ap_port: &Option) -> Result; pub async fn connect(addr: String, proxy: &Option) -> io::Result { let socket = if let Some(proxy) = proxy { info!("Using proxy \"{}\"", proxy); + + let mut split = addr.rsplit(':'); + + let port = split + .next() + .unwrap() // will never panic, split iterator contains at least one element + .parse() + .map_err(|e| { + io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e)) + })?; + + let host = split + .next() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?; + let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| { iter.next().ok_or_else(|| { io::Error::new( @@ -31,7 +46,8 @@ pub async fn connect(addr: String, proxy: &Option) -> io::Result }) })?; let socket = TcpStream::connect(&socket_addr).await?; - proxytunnel::connect(socket, &addr).await? + + proxytunnel::connect(socket, host, port).await? } else { let socket_addr = addr.to_socket_addrs().and_then(|mut iter| { iter.next().ok_or_else(|| { diff --git a/core/src/lib.rs b/core/src/lib.rs index 4fb632a0..3e332c28 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -14,7 +14,6 @@ extern crate futures; extern crate hmac; extern crate httparse; extern crate hyper; -extern crate hyper_proxy; extern crate num_bigint; extern crate num_integer; extern crate num_traits; @@ -28,6 +27,7 @@ extern crate sha1; extern crate shannon; pub extern crate tokio; extern crate tokio_util; +extern crate tower_service; extern crate url; extern crate uuid; diff --git a/core/src/proxytunnel.rs b/core/src/proxytunnel.rs index c8e9eab6..c2033c85 100644 --- a/core/src/proxytunnel.rs +++ b/core/src/proxytunnel.rs @@ -1,27 +1,36 @@ -use std::io; - +use futures::Future; use hyper::Uri; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use std::{ + io, + net::{SocketAddr, ToSocketAddrs}, + pin::Pin, + task::Poll, +}; +use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, + net::TcpStream, +}; +use tower_service::Service; pub async fn connect( - mut connection: T, - connect_url: &str, + mut proxy_connection: T, + connect_host: &str, + connect_port: u16, ) -> io::Result { - let uri = connect_url.parse::().unwrap(); - let mut buffer = format!( - "CONNECT {0}:{1} HTTP/1.1\r\n\ - \r\n", - uri.host().unwrap_or_else(|| panic!("No host in {}", uri)), - uri.port().unwrap_or_else(|| panic!("No port in {}", uri)) - ) - .into_bytes(); - connection.write_all(buffer.as_ref()).await?; + let mut buffer = Vec::new(); + buffer.extend_from_slice(b"CONNECT "); + buffer.extend_from_slice(connect_host.as_bytes()); + buffer.push(b':'); + buffer.extend_from_slice(connect_port.to_string().as_bytes()); + buffer.extend_from_slice(b" HTTP/1.1\r\n\r\n"); + + proxy_connection.write_all(buffer.as_ref()).await?; buffer.resize(buffer.capacity(), 0); let mut offset = 0; loop { - let bytes_read = connection.read(&mut buffer[offset..]).await?; + let bytes_read = proxy_connection.read(&mut buffer[offset..]).await?; if bytes_read == 0 { return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); } @@ -36,7 +45,7 @@ pub async fn connect( if status.is_complete() { return match response.code { - Some(200) => Ok(connection), // Proxy says all is well + Some(200) => Ok(proxy_connection), // Proxy says all is well Some(code) => { let reason = response.reason.unwrap_or("no reason"); let msg = format!("Proxy responded with {}: {}", code, reason); @@ -54,3 +63,44 @@ pub async fn connect( } } } + +#[derive(Clone)] +pub struct ProxyTunnel { + proxy_addr: SocketAddr, +} + +impl ProxyTunnel { + pub fn new(addr: T) -> io::Result { + let addr = addr.to_socket_addrs()?.next().ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "No socket address given") + })?; + Ok(Self { proxy_addr: addr }) + } +} + +impl Service for ProxyTunnel { + type Response = TcpStream; + type Error = io::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, url: Uri) -> Self::Future { + let proxy_addr = self.proxy_addr; + let fut = async move { + let host = url + .host() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Host is missing"))?; + let port = url + .port() + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Port is missing"))?; + + let conn = TcpStream::connect(proxy_addr).await?; + connect(conn, host, port.as_u16()).await + }; + + Box::pin(fut) + } +}