From da0deb1de6ce499f8ebfd4d4d76931efc3d25273 Mon Sep 17 00:00:00 2001 From: George Hahn Date: Thu, 6 Jun 2024 03:05:42 -0600 Subject: [PATCH] Convert discovery to hyper 1.x --- Cargo.lock | 10 ++-- discovery/Cargo.toml | 5 +- discovery/src/lib.rs | 2 +- discovery/src/server.rs | 119 +++++++++++++++++++++++++--------------- 4 files changed, 86 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6d8c4ea..1fa25197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -942,7 +942,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" dependencies = [ - "base64 0.21.7", + "base64", "bytes", "headers-core", "http", @@ -1459,7 +1459,7 @@ name = "librespot-core" version = "0.5.0-dev" dependencies = [ "aes", - "base64 0.21.7", + "base64", "byteorder", "bytes", "data-encoding", @@ -1513,7 +1513,8 @@ name = "librespot-discovery" version = "0.5.0-dev" dependencies = [ "aes", - "base64 0.21.7", + "base64", + "bytes", "cfg-if", "ctr", "dns-sd", @@ -1523,6 +1524,7 @@ dependencies = [ "futures-util", "hex", "hmac", + "http-body-util", "hyper", "hyper-util", "libmdns", @@ -2455,7 +2457,7 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c333bb734fcdedcea57de1602543590f545f127dc8b533324318fd492c5c70b" dependencies = [ - "base64 0.21.7", + "base64", "rustls-pki-types", ] diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index 396a3f1c..345859b1 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -11,6 +11,7 @@ edition = "2021" [dependencies] aes = "0.8" base64 = "0.21" +bytes = "1" cfg-if = "1.0" ctr = "0.9" dns-sd = { version = "0.1.3", optional = true } @@ -18,7 +19,9 @@ form_urlencoded = "1.0" futures-core = "0.3" futures-util = "0.3" hmac = "0.12" -hyper = { version = "0.14", features = ["http1", "server", "tcp", "backports", "deprecated"] } +hyper = { version = "1.3", features = ["http1"] } +hyper-util = { version = "0.1", features = ["server-auto", "server-graceful", "service"] } +http-body-util = "0.1.1" libmdns = "0.8" log = "0.4" rand = "0.8" diff --git a/discovery/src/lib.rs b/discovery/src/lib.rs index 1764640b..8caeadfb 100644 --- a/discovery/src/lib.rs +++ b/discovery/src/lib.rs @@ -124,7 +124,7 @@ impl Builder { pub fn launch(self) -> Result { let mut port = self.port; let name = self.server_config.name.clone().into_owned(); - let server = DiscoveryServer::new(self.server_config, &mut port)??; + let server = DiscoveryServer::new(self.server_config, &mut port)?; let _zeroconf_ip = self.zeroconf_ip; let svc; diff --git a/discovery/src/server.rs b/discovery/src/server.rs index b1fe6ae0..29fcbcff 100644 --- a/discovery/src/server.rs +++ b/discovery/src/server.rs @@ -2,7 +2,7 @@ use std::{ borrow::Cow, collections::BTreeMap, convert::Infallible, - net::{Ipv4Addr, SocketAddr}, + net::{Ipv4Addr, SocketAddr, TcpListener}, pin::Pin, sync::Arc, task::{Context, Poll}, @@ -11,13 +11,16 @@ use std::{ use aes::cipher::{KeyIvInit, StreamCipher}; use base64::engine::general_purpose::STANDARD as BASE64; use base64::engine::Engine as _; +use bytes::Bytes; use futures_core::Stream; use futures_util::{FutureExt, TryFutureExt}; use hmac::{Hmac, Mac}; +use http_body_util::{BodyExt, Full}; use hyper::{ - body::HttpBody, service::{make_service_fn, service_fn}, Body, Method, Request, Response, StatusCode + body::Incoming, Method, Request, Response, StatusCode }; +use hyper_util::{rt::TokioIo, server::graceful::GracefulShutdown}; use log::{debug, error, warn}; use serde_json::json; use sha1::{Digest, Sha1}; @@ -62,7 +65,7 @@ impl RequestHandler { (discovery, rx) } - fn handle_get_info(&self) -> Response { + fn handle_get_info(&self) -> Response> { let public_key = BASE64.encode(self.keys.public_key()); let device_type: &str = self.config.device_type.into(); let mut active_user = String::new(); @@ -106,11 +109,11 @@ impl RequestHandler { // - "deviceAPI_isGroup": False }) .to_string(); - - Response::new(Body::from(body)) + let body = Bytes::from(body); + Response::new(Full::new(body)) } - fn handle_add_user(&self, params: &Params<'_>) -> Result, Error> { + fn handle_add_user(&self, params: &Params<'_>) -> Result>, Error> { let username_key = "userName"; let username = params .get(username_key) @@ -170,7 +173,8 @@ impl RequestHandler { }); let body = result.to_string(); - return Ok(Response::new(Body::from(body))); + let body = Bytes::from(body); + return Ok(Response::new(Full::new(body))); } let decrypted = { @@ -192,10 +196,11 @@ impl RequestHandler { }); let body = result.to_string(); - Ok(Response::new(Body::from(body))) + let body = Bytes::from(body); + Ok(Response::new(Full::new(body))) } - fn not_found(&self) -> Response { + fn not_found(&self) -> Response> { let mut res = Response::default(); *res.status_mut() = StatusCode::NOT_FOUND; res @@ -203,8 +208,8 @@ impl RequestHandler { async fn handle( self: Arc, - request: Request, - ) -> Result>, Error> { + request: Request, + ) -> Result>>, Error> { let mut params = Params::new(); let (parts, body) = request.into_parts(); @@ -238,52 +243,78 @@ pub struct DiscoveryServer { } impl DiscoveryServer { - pub fn new(config: Config, port: &mut u16) -> Result, Error> { + pub fn new(config: Config, port: &mut u16) -> Result { let (discovery, cred_rx) = RequestHandler::new(config); - let discovery = Arc::new(discovery); - + let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), *port); + let (close_tx, close_rx) = oneshot::channel(); - let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), *port); - - let make_service = make_service_fn(move |_| { - let discovery = discovery.clone(); - async move { - Ok::<_, hyper::Error>(service_fn(move |request| { - discovery - .clone() - .handle(request) - .inspect_err(|e| error!("could not handle discovery request: {}", e)) - .and_then(|x| async move { Ok(x) }) - .map(Result::unwrap) // guaranteed by `and_then` above - })) + let listener = match TcpListener::bind(address) { + Ok(listener) => listener, + Err(e) => { + warn!("Discovery server failed to start: {e}"); + return Err(e.into()); } - }); + }; - let server = hyper::Server::try_bind(&address)?.serve(make_service); + listener.set_nonblocking(true)?; + let listener = tokio::net::TcpListener::from_std(listener)?; - *port = server.local_addr().port(); - debug!("Zeroconf server listening on 0.0.0.0:{}", *port); + match listener.local_addr() { + Ok(addr) => { + *port = addr.port(); + debug!("Zeroconf server listening on 0.0.0.0:{}", *port); + } + Err(e) => { + warn!("Discovery server failed to start: {e}"); + return Err(e.into()); + } + } - tokio::spawn(async { - let result = server - .with_graceful_shutdown(async { - if let Err(e) = close_rx.await { - debug!("unable to close discovery Rx channel completely: {e}"); + + tokio::spawn(async move { + let discovery = Arc::new(discovery); + + let server = hyper::server::conn::http1::Builder::new(); + let graceful = GracefulShutdown::new(); + let mut close_rx = std::pin::pin!(close_rx); + loop { + tokio::select! { + Ok((stream, _)) = listener.accept() => { + let io = TokioIo::new(stream); + let discovery = discovery.clone(); + + let svc = hyper::service::service_fn(move |request| { + discovery + .clone() + .handle(request) + .inspect_err(|e| error!("could not handle discovery request: {}", e)) + .and_then(|x| async move { Ok(x) }) + .map(Result::unwrap) // guaranteed by `and_then` above + }); + + let conn = server.serve_connection(io, svc); + let fut = graceful.watch(conn); + tokio::spawn(async move { + // Errors are logged in the service_fn + let _ = fut.await; + }); } - debug!("Shutting down discovery server"); - }) - .await; - - if let Err(e) = result { - warn!("Discovery server failed: {}", e); + _ = &mut close_rx => { + debug!("Shutting down discovery server"); + break; + } + } } + + graceful.shutdown().await; + debug!("Discovery server stopped"); }); - Ok(Ok(Self { + Ok(Self { cred_rx, _close_tx: close_tx, - })) + }) } }