Convert discovery to hyper 1.x

This commit is contained in:
George Hahn 2024-06-06 03:05:42 -06:00
parent 6a4053e871
commit da0deb1de6
4 changed files with 86 additions and 50 deletions

10
Cargo.lock generated
View file

@ -942,7 +942,7 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9"
dependencies = [
"base64 0.21.7",
"base64",
"bytes",
"headers-core",
"http",
@ -1459,7 +1459,7 @@ name = "librespot-core"
version = "0.5.0-dev"
dependencies = [
"aes",
"base64 0.21.7",
"base64",
"byteorder",
"bytes",
"data-encoding",
@ -1513,7 +1513,8 @@ name = "librespot-discovery"
version = "0.5.0-dev"
dependencies = [
"aes",
"base64 0.21.7",
"base64",
"bytes",
"cfg-if",
"ctr",
"dns-sd",
@ -1523,6 +1524,7 @@ dependencies = [
"futures-util",
"hex",
"hmac",
"http-body-util",
"hyper",
"hyper-util",
"libmdns",
@ -2455,7 +2457,7 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c333bb734fcdedcea57de1602543590f545f127dc8b533324318fd492c5c70b"
dependencies = [
"base64 0.21.7",
"base64",
"rustls-pki-types",
]

View file

@ -11,6 +11,7 @@ edition = "2021"
[dependencies]
aes = "0.8"
base64 = "0.21"
bytes = "1"
cfg-if = "1.0"
ctr = "0.9"
dns-sd = { version = "0.1.3", optional = true }
@ -18,7 +19,9 @@ form_urlencoded = "1.0"
futures-core = "0.3"
futures-util = "0.3"
hmac = "0.12"
hyper = { version = "0.14", features = ["http1", "server", "tcp", "backports", "deprecated"] }
hyper = { version = "1.3", features = ["http1"] }
hyper-util = { version = "0.1", features = ["server-auto", "server-graceful", "service"] }
http-body-util = "0.1.1"
libmdns = "0.8"
log = "0.4"
rand = "0.8"

View file

@ -124,7 +124,7 @@ impl Builder {
pub fn launch(self) -> Result<Discovery, Error> {
let mut port = self.port;
let name = self.server_config.name.clone().into_owned();
let server = DiscoveryServer::new(self.server_config, &mut port)??;
let server = DiscoveryServer::new(self.server_config, &mut port)?;
let _zeroconf_ip = self.zeroconf_ip;
let svc;

View file

@ -2,7 +2,7 @@ use std::{
borrow::Cow,
collections::BTreeMap,
convert::Infallible,
net::{Ipv4Addr, SocketAddr},
net::{Ipv4Addr, SocketAddr, TcpListener},
pin::Pin,
sync::Arc,
task::{Context, Poll},
@ -11,13 +11,16 @@ use std::{
use aes::cipher::{KeyIvInit, StreamCipher};
use base64::engine::general_purpose::STANDARD as BASE64;
use base64::engine::Engine as _;
use bytes::Bytes;
use futures_core::Stream;
use futures_util::{FutureExt, TryFutureExt};
use hmac::{Hmac, Mac};
use http_body_util::{BodyExt, Full};
use hyper::{
body::HttpBody, service::{make_service_fn, service_fn}, Body, Method, Request, Response, StatusCode
body::Incoming, Method, Request, Response, StatusCode
};
use hyper_util::{rt::TokioIo, server::graceful::GracefulShutdown};
use log::{debug, error, warn};
use serde_json::json;
use sha1::{Digest, Sha1};
@ -62,7 +65,7 @@ impl RequestHandler {
(discovery, rx)
}
fn handle_get_info(&self) -> Response<hyper::Body> {
fn handle_get_info(&self) -> Response<Full<Bytes>> {
let public_key = BASE64.encode(self.keys.public_key());
let device_type: &str = self.config.device_type.into();
let mut active_user = String::new();
@ -106,11 +109,11 @@ impl RequestHandler {
// - "deviceAPI_isGroup": False
})
.to_string();
Response::new(Body::from(body))
let body = Bytes::from(body);
Response::new(Full::new(body))
}
fn handle_add_user(&self, params: &Params<'_>) -> Result<Response<hyper::Body>, Error> {
fn handle_add_user(&self, params: &Params<'_>) -> Result<Response<Full<Bytes>>, Error> {
let username_key = "userName";
let username = params
.get(username_key)
@ -170,7 +173,8 @@ impl RequestHandler {
});
let body = result.to_string();
return Ok(Response::new(Body::from(body)));
let body = Bytes::from(body);
return Ok(Response::new(Full::new(body)));
}
let decrypted = {
@ -192,10 +196,11 @@ impl RequestHandler {
});
let body = result.to_string();
Ok(Response::new(Body::from(body)))
let body = Bytes::from(body);
Ok(Response::new(Full::new(body)))
}
fn not_found(&self) -> Response<hyper::Body> {
fn not_found(&self) -> Response<Full<Bytes>> {
let mut res = Response::default();
*res.status_mut() = StatusCode::NOT_FOUND;
res
@ -203,8 +208,8 @@ impl RequestHandler {
async fn handle(
self: Arc<Self>,
request: Request<Body>,
) -> Result<hyper::Result<Response<Body>>, Error> {
request: Request<Incoming>,
) -> Result<hyper::Result<Response<Full<Bytes>>>, Error> {
let mut params = Params::new();
let (parts, body) = request.into_parts();
@ -238,52 +243,78 @@ pub struct DiscoveryServer {
}
impl DiscoveryServer {
pub fn new(config: Config, port: &mut u16) -> Result<hyper::Result<Self>, Error> {
pub fn new(config: Config, port: &mut u16) -> Result<Self, Error> {
let (discovery, cred_rx) = RequestHandler::new(config);
let discovery = Arc::new(discovery);
let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), *port);
let (close_tx, close_rx) = oneshot::channel();
let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), *port);
let listener = match TcpListener::bind(address) {
Ok(listener) => listener,
Err(e) => {
warn!("Discovery server failed to start: {e}");
return Err(e.into());
}
};
let make_service = make_service_fn(move |_| {
listener.set_nonblocking(true)?;
let listener = tokio::net::TcpListener::from_std(listener)?;
match listener.local_addr() {
Ok(addr) => {
*port = addr.port();
debug!("Zeroconf server listening on 0.0.0.0:{}", *port);
}
Err(e) => {
warn!("Discovery server failed to start: {e}");
return Err(e.into());
}
}
tokio::spawn(async move {
let discovery = Arc::new(discovery);
let server = hyper::server::conn::http1::Builder::new();
let graceful = GracefulShutdown::new();
let mut close_rx = std::pin::pin!(close_rx);
loop {
tokio::select! {
Ok((stream, _)) = listener.accept() => {
let io = TokioIo::new(stream);
let discovery = discovery.clone();
async move {
Ok::<_, hyper::Error>(service_fn(move |request| {
let svc = hyper::service::service_fn(move |request| {
discovery
.clone()
.handle(request)
.inspect_err(|e| error!("could not handle discovery request: {}", e))
.and_then(|x| async move { Ok(x) })
.map(Result::unwrap) // guaranteed by `and_then` above
}))
}
});
let server = hyper::Server::try_bind(&address)?.serve(make_service);
*port = server.local_addr().port();
debug!("Zeroconf server listening on 0.0.0.0:{}", *port);
tokio::spawn(async {
let result = server
.with_graceful_shutdown(async {
if let Err(e) = close_rx.await {
debug!("unable to close discovery Rx channel completely: {e}");
let conn = server.serve_connection(io, svc);
let fut = graceful.watch(conn);
tokio::spawn(async move {
// Errors are logged in the service_fn
let _ = fut.await;
});
}
_ = &mut close_rx => {
debug!("Shutting down discovery server");
})
.await;
if let Err(e) = result {
warn!("Discovery server failed: {}", e);
break;
}
}
}
graceful.shutdown().await;
debug!("Discovery server stopped");
});
Ok(Ok(Self {
Ok(Self {
cred_rx,
_close_tx: close_tx,
}))
})
}
}