Implement rate limiting

This commit is contained in:
Roderick van Domburg 2022-08-29 23:09:51 +02:00
parent 49e885d158
commit 6c2127bfcd
No known key found for this signature in database
GPG key ID: 87F5FDE8A56219F4
4 changed files with 107 additions and 22 deletions

35
Cargo.lock generated
View file

@ -585,6 +585,12 @@ version = "0.3.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a" checksum = "57c66a976bf5909d801bbef33416c41372779507e7a6b3a5e25e4749c58f776a"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.21" version = "0.3.21"
@ -726,6 +732,21 @@ dependencies = [
"system-deps", "system-deps",
] ]
[[package]]
name = "governor"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19775995ee20209163239355bc3ad2f33f83da35d9ef72dea26e5af753552c87"
dependencies = [
"futures",
"futures-timer",
"no-std-compat",
"nonzero_ext",
"parking_lot 0.12.1",
"rand",
"smallvec",
]
[[package]] [[package]]
name = "gstreamer" name = "gstreamer"
version = "0.18.8" version = "0.18.8"
@ -1404,6 +1425,7 @@ dependencies = [
"form_urlencoded", "form_urlencoded",
"futures-core", "futures-core",
"futures-util", "futures-util",
"governor",
"hex", "hex",
"hmac", "hmac",
"http", "http",
@ -1413,6 +1435,7 @@ dependencies = [
"hyper-rustls 0.23.0", "hyper-rustls 0.23.0",
"librespot-protocol", "librespot-protocol",
"log", "log",
"nonzero_ext",
"num", "num",
"num-bigint", "num-bigint",
"num-derive", "num-derive",
@ -1709,6 +1732,12 @@ dependencies = [
"memoffset", "memoffset",
] ]
[[package]]
name = "no-std-compat"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c"
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.1" version = "7.1.1"
@ -1719,6 +1748,12 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "nonzero_ext"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]] [[package]]
name = "ntapi" name = "ntapi"
version = "0.3.7" version = "0.3.7"

View file

@ -22,6 +22,7 @@ dns-sd = { version = "0.1", optional = true }
form_urlencoded = "1.0" form_urlencoded = "1.0"
futures-core = "0.3" futures-core = "0.3"
futures-util = { version = "0.3", features = ["alloc", "bilock", "sink", "unstable"] } futures-util = { version = "0.3", features = ["alloc", "bilock", "sink", "unstable"] }
governor = { version = "0.4", default-features = false, features = ["std", "jitter"] }
hex = "0.4" hex = "0.4"
hmac = "0.12" hmac = "0.12"
httparse = "1.7" httparse = "1.7"
@ -30,6 +31,7 @@ hyper = { version = "0.14", features = ["client", "http1", "http2", "tcp"] }
hyper-proxy = { version = "0.9", default-features = false, features = ["rustls"] } hyper-proxy = { version = "0.9", default-features = false, features = ["rustls"] }
hyper-rustls = { version = "0.23", features = ["http2"] } hyper-rustls = { version = "0.23", features = ["http2"] }
log = "0.4" log = "0.4"
nonzero_ext = "0.3"
num = "0.4" num = "0.4"
num-bigint = { version = "0.4", features = ["rand"] } num-bigint = { version = "0.4", features = ["rand"] }
num-derive = "0.3" num-derive = "0.3"

View file

@ -1,7 +1,13 @@
use std::env::consts::OS; use std::{env::consts::OS, time::Duration};
use bytes::Bytes; use bytes::Bytes;
use futures_util::{future::IntoStream, FutureExt}; use futures_util::{future::IntoStream, FutureExt};
use governor::{
clock::MonotonicClock,
middleware::NoOpMiddleware,
state::{InMemoryState, NotKeyed},
Jitter, Quota, RateLimiter,
};
use http::{header::HeaderValue, Uri}; use http::{header::HeaderValue, Uri};
use hyper::{ use hyper::{
client::{HttpConnector, ResponseFuture}, client::{HttpConnector, ResponseFuture},
@ -10,6 +16,7 @@ use hyper::{
}; };
use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
use nonzero_ext::nonzero;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use sysinfo::{System, SystemExt}; use sysinfo::{System, SystemExt};
use thiserror::Error; use thiserror::Error;
@ -20,6 +27,12 @@ use crate::{
Error, Error,
}; };
// The 30 seconds interval is documented by Spotify, but the calls per interval
// is a guesstimate and probably subject to licensing (purchasing extra calls)
// and may change at any time.
pub const RATE_LIMIT_INTERVAL: u64 = 30; // seconds
pub const RATE_LIMIT_CALLS_PER_INTERVAL: u32 = 300;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum HttpClientError { pub enum HttpClientError {
#[error("Response status code: {0}")] #[error("Response status code: {0}")]
@ -74,11 +87,11 @@ impl From<HttpClientError> for Error {
type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Body>; type HyperClient = Client<ProxyConnector<HttpsConnector<HttpConnector>>, Body>;
#[derive(Clone)]
pub struct HttpClient { pub struct HttpClient {
user_agent: HeaderValue, user_agent: HeaderValue,
proxy_url: Option<Url>, proxy_url: Option<Url>,
hyper_client: OnceCell<HyperClient>, hyper_client: OnceCell<HyperClient>,
rate_limiter: RateLimiter<NotKeyed, InMemoryState, MonotonicClock, NoOpMiddleware>,
} }
impl HttpClient { impl HttpClient {
@ -109,10 +122,18 @@ impl HttpClient {
HeaderValue::from_static(FALLBACK_USER_AGENT) HeaderValue::from_static(FALLBACK_USER_AGENT)
}); });
let replenish_interval_ns = Duration::from_secs(RATE_LIMIT_INTERVAL).as_nanos()
/ RATE_LIMIT_CALLS_PER_INTERVAL as u128;
let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
.expect("replenish interval should be valid")
.allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
let rate_limiter = RateLimiter::direct(quota);
Self { Self {
user_agent, user_agent,
proxy_url: proxy_url.cloned(), proxy_url: proxy_url.cloned(),
hyper_client: OnceCell::new(), hyper_client: OnceCell::new(),
rate_limiter,
} }
} }
@ -147,17 +168,54 @@ impl HttpClient {
pub async fn request(&self, req: Request<Body>) -> Result<Response<Body>, Error> { pub async fn request(&self, req: Request<Body>) -> Result<Response<Body>, Error> {
debug!("Requesting {}", req.uri().to_string()); debug!("Requesting {}", req.uri().to_string());
// `Request` does not implement `Clone` because its `Body` may be a single-shot stream.
// As correct as that may be technically, we now need all this boilerplate to clone it
// ourselves, as any `Request` is moved in the loop.
let (parts, body) = req.into_parts();
let body_as_bytes = hyper::body::to_bytes(body)
.await
.unwrap_or_else(|_| Bytes::new());
loop {
let mut req = Request::new(Body::from(body_as_bytes.clone()));
*req.method_mut() = parts.method.clone();
*req.uri_mut() = parts.uri.clone();
*req.version_mut() = parts.version;
*req.headers_mut() = parts.headers.clone();
// For rate limiting we cannot *just* depend on Spotify sending us HTTP/429
// Retry-After headers. For example, when there is a service interruption
// and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure.
self.rate_limiter
.until_ready_with_jitter(Jitter::up_to(Duration::from_secs(5)))
.await;
let request = self.request_fut(req)?; let request = self.request_fut(req)?;
let response = request.await; let response = request.await;
if let Ok(response) = &response { if let Ok(response) = &response {
let code = response.status(); let code = response.status();
if code == StatusCode::TOO_MANY_REQUESTS {
if let Some(retry_after) = response.headers().get("Retry-After") {
if let Ok(retry_after_str) = retry_after.to_str() {
if let Ok(retry_after_secs) = retry_after_str.parse::<u64>() {
warn!("Rate limiting, retrying in {} seconds...", retry_after_secs);
let duration = Duration::from_secs(retry_after_secs);
tokio::time::sleep(duration).await;
continue;
}
}
}
}
if code != StatusCode::OK { if code != StatusCode::OK {
return Err(HttpClientError::StatusCode(code).into()); return Err(HttpClientError::StatusCode(code).into());
} }
} }
Ok(response?) return Ok(response?);
}
} }
pub async fn request_body(&self, req: Request<Body>) -> Result<Bytes, Error> { pub async fn request_body(&self, req: Request<Body>) -> Result<Bytes, Error> {

View file

@ -15,7 +15,6 @@ use hyper::{
Body, HeaderMap, Method, Request, Body, HeaderMap, Method, Request,
}; };
use protobuf::{Message, ProtobufEnum}; use protobuf::{Message, ProtobufEnum};
use rand::Rng;
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
use sysinfo::{System, SystemExt}; use sysinfo::{System, SystemExt};
use thiserror::Error; use thiserror::Error;
@ -176,7 +175,7 @@ impl SpClient {
return Ok(client_token.access_token); return Ok(client_token.access_token);
} }
trace!("Client token unavailable or expired, requesting new token."); debug!("Client token unavailable or expired, requesting new token.");
let mut request = ClientTokenRequest::new(); let mut request = ClientTokenRequest::new();
request.set_request_type(ClientTokenRequestType::REQUEST_CLIENT_DATA_REQUEST); request.set_request_type(ClientTokenRequestType::REQUEST_CLIENT_DATA_REQUEST);
@ -270,7 +269,7 @@ impl SpClient {
// or are presented a hash cash challenge to solve first // or are presented a hash cash challenge to solve first
Some(ClientTokenResponseType::RESPONSE_GRANTED_TOKEN_RESPONSE) => break message, Some(ClientTokenResponseType::RESPONSE_GRANTED_TOKEN_RESPONSE) => break message,
Some(ClientTokenResponseType::RESPONSE_CHALLENGES_RESPONSE) => { Some(ClientTokenResponseType::RESPONSE_CHALLENGES_RESPONSE) => {
trace!("Received a hash cash challenge, solving..."); debug!("Received a hash cash challenge, solving...");
let challenges = message.get_challenges().clone(); let challenges = message.get_challenges().clone();
let state = challenges.get_state(); let state = challenges.get_state();
@ -500,16 +499,7 @@ impl SpClient {
} }
} }
// When retrying, avoid hammering the Spotify infrastructure by sleeping a while.
// The backoff time is chosen randomly from an ever-increasing range.
let max_seconds = u64::pow(tries as u64, 2) * 3;
let backoff = Duration::from_secs(rand::thread_rng().gen_range(1..=max_seconds));
warn!(
"Unable to complete API request, waiting {} seconds before retrying...",
backoff.as_secs(),
);
debug!("Error was: {:?}", last_response); debug!("Error was: {:?}", last_response);
tokio::time::sleep(backoff).await;
} }
last_response last_response