From 5451d149720f7a5888a1d4c774e7fdad16bb2976 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Thu, 1 Sep 2022 22:35:03 +0200 Subject: [PATCH] Rate limit audio file streaming too --- audio/src/fetch/receive.rs | 14 +++- core/Cargo.toml | 2 +- core/src/date.rs | 10 ++- core/src/error.rs | 6 ++ core/src/http_client.rs | 130 ++++++++++++++++++++++++++----------- core/src/lib.rs | 2 +- 6 files changed, 123 insertions(+), 41 deletions(-) diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index af12810c..2c58fbf8 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -11,7 +11,7 @@ use hyper::StatusCode; use tempfile::NamedTempFile; use tokio::sync::{mpsc, oneshot}; -use librespot_core::{session::Session, Error}; +use librespot_core::{http_client::HttpClient, session::Session, Error}; use crate::range_set::{Range, RangeSet}; @@ -64,6 +64,18 @@ async fn receive_data( let code = response.status(); if code != StatusCode::PARTIAL_CONTENT { + if code == StatusCode::TOO_MANY_REQUESTS { + if let Some(duration) = HttpClient::get_retry_after(response.headers()) { + warn!( + "Rate limiting, retrying in {} seconds...", + duration.as_secs() + ); + // sleeping here means we hold onto this streamer "slot" + // (we don't decrease the number of open requests) + tokio::time::sleep(duration).await; + } + } + break Err(AudioFileError::StatusCode(code).into()); } diff --git a/core/Cargo.toml b/core/Cargo.toml index d0bcda02..5974ef2a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -51,7 +51,7 @@ sha1 = "0.10" shannon = "0.2" sysinfo = { version = "0.25", default-features = false } thiserror = "1.0" -time = "0.3" +time = { version = "0.3", features = ["formatting", "parsing"] } tokio = { version = "1", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] } tokio-stream = "0.1" tokio-tungstenite = { version = "*", default-features = false, features = ["rustls-tls-native-roots"] } diff --git a/core/src/date.rs b/core/src/date.rs index c9aadce8..a3c1b8d7 100644 --- a/core/src/date.rs +++ b/core/src/date.rs @@ -4,7 +4,10 @@ use std::{ ops::Deref, }; -use time::{error::ComponentRange, Date as _Date, OffsetDateTime, PrimitiveDateTime, Time}; +use time::{ + error::ComponentRange, format_description::well_known::Iso8601, Date as _Date, OffsetDateTime, + PrimitiveDateTime, Time, +}; use crate::Error; @@ -48,6 +51,11 @@ impl Date { pub fn now_utc() -> Self { Self(OffsetDateTime::now_utc()) } + + pub fn from_iso8601(input: &str) -> Result { + let date_time = OffsetDateTime::parse(input, &Iso8601::DEFAULT)?; + Ok(Self(date_time)) + } } impl TryFrom<&DateMessage> for Date { diff --git a/core/src/error.rs b/core/src/error.rs index d032bd2a..5614a5d3 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -342,6 +342,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: time::error::Parse) -> Self { + Self::new(ErrorKind::FailedPrecondition, err) + } +} + impl From for Error { fn from(err: quick_xml::Error) -> Self { Self::new(ErrorKind::FailedPrecondition, err) diff --git a/core/src/http_client.rs b/core/src/http_client.rs index d63910b6..63f1d7a8 100644 --- a/core/src/http_client.rs +++ b/core/src/http_client.rs @@ -1,28 +1,31 @@ -use std::{env::consts::OS, time::Duration}; +use std::{ + collections::HashMap, + env::consts::OS, + time::{Duration, Instant}, +}; use bytes::Bytes; use futures_util::{future::IntoStream, FutureExt}; use governor::{ - clock::MonotonicClock, - middleware::NoOpMiddleware, - state::{InMemoryState, NotKeyed}, - Jitter, Quota, RateLimiter, + clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState, Quota, RateLimiter, }; use http::{header::HeaderValue, Uri}; use hyper::{ client::{HttpConnector, ResponseFuture}, header::USER_AGENT, - Body, Client, Request, Response, StatusCode, + Body, Client, HeaderMap, Request, Response, StatusCode, }; use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; use nonzero_ext::nonzero; use once_cell::sync::OnceCell; +use parking_lot::Mutex; use sysinfo::{System, SystemExt}; use thiserror::Error; use url::Url; use crate::{ + date::Date, version::{spotify_version, FALLBACK_USER_AGENT, VERSION_STRING}, Error, }; @@ -92,7 +95,11 @@ pub struct HttpClient { user_agent: HeaderValue, proxy_url: Option, hyper_client: OnceCell, - rate_limiter: RateLimiter, + + // while the DashMap variant is more performant, our level of concurrency + // is pretty low so we can save pulling in that extra dependency + rate_limiter: + RateLimiter>, MonotonicClock, NoOpMiddleware>, } impl HttpClient { @@ -128,7 +135,7 @@ impl HttpClient { let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64)) .expect("replenish interval should be valid") .allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]); - let rate_limiter = RateLimiter::direct(quota); + let rate_limiter = RateLimiter::keyed(quota); Self { user_agent, @@ -178,19 +185,13 @@ impl HttpClient { .unwrap_or_else(|_| Bytes::new()); loop { - let mut req = Request::new(Body::from(body_as_bytes.clone())); - *req.method_mut() = parts.method.clone(); - *req.uri_mut() = parts.uri.clone(); - *req.version_mut() = parts.version; + let mut req = Request::builder() + .method(parts.method.clone()) + .uri(parts.uri.clone()) + .version(parts.version) + .body(Body::from(body_as_bytes.clone()))?; *req.headers_mut() = parts.headers.clone(); - // For rate limiting we cannot *just* depend on Spotify sending us HTTP/429 - // Retry-After headers. For example, when there is a service interruption - // and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure. - self.rate_limiter - .until_ready_with_jitter(Jitter::up_to(Duration::from_secs(5))) - .await; - let request = self.request_fut(req)?; let response = request.await; @@ -198,22 +199,13 @@ impl HttpClient { let code = response.status(); if code == StatusCode::TOO_MANY_REQUESTS { - if let Some(retry_after) = response.headers().get("Retry-After") { - if let Ok(retry_after_str) = retry_after.to_str() { - if let Ok(retry_after_secs) = retry_after_str.parse::() { - let duration = Duration::from_secs(retry_after_secs); - if duration <= RATE_LIMIT_MAX_WAIT { - warn!( - "Rate limiting, retrying in {} seconds...", - retry_after_secs - ); - tokio::time::sleep(duration).await; - continue; - } else { - debug!("Not going to wait {} seconds", retry_after_secs); - } - } - } + if let Some(duration) = Self::get_retry_after(response.headers()) { + warn!( + "Rate limited by service, retrying in {} seconds...", + duration.as_secs() + ); + tokio::time::sleep(duration).await; + continue; } } @@ -239,7 +231,71 @@ impl HttpClient { let headers_mut = req.headers_mut(); headers_mut.insert(USER_AGENT, self.user_agent.clone()); - let request = self.hyper_client()?.request(req); - Ok(request) + // For rate limiting we cannot *just* depend on Spotify sending us HTTP/429 + // Retry-After headers. For example, when there is a service interruption + // and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure. + let domain = match req.uri().host() { + Some(host) => { + // strip the prefix from *.domain.tld (assume rate limit is per domain, not subdomain) + let mut parts = host + .split('.') + .map(|s| s.to_string()) + .collect::>(); + let n = parts.len().saturating_sub(2); + parts.drain(n..).collect() + } + None => String::from(""), + }; + self.rate_limiter.check_key(&domain).map_err(|e| { + Error::resource_exhausted(format!( + "rate limited for at least another {} seconds", + e.wait_time_from(Instant::now()).as_secs() + )) + })?; + + Ok(self.hyper_client()?.request(req)) + } + + pub fn get_retry_after(headers: &HeaderMap) -> Option { + let now = Date::now_utc().as_timestamp_ms(); + + let mut retry_after_ms = None; + if let Some(header_val) = headers.get("X-RateLimit-Next") { + // *.akamaized.net (Akamai) + if let Ok(date_str) = header_val.to_str() { + if let Ok(target) = Date::from_iso8601(date_str) { + retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now)) + } + } + } else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") { + // *.scdn.co (Fastly) + if let Ok(timestamp) = header_val.to_str() { + if let Ok(target) = timestamp.parse::() { + retry_after_ms = Some(target.saturating_sub(now)) + } + } + } else if let Some(header_val) = headers.get("Retry-After") { + // Generic RFC compliant (including *.spotify.com) + if let Ok(retry_after) = header_val.to_str() { + if let Ok(duration) = retry_after.parse::() { + retry_after_ms = Some(duration * 1000) + } + } + } + + if let Some(retry_after) = retry_after_ms { + let duration = Duration::from_millis(retry_after as u64); + if duration <= RATE_LIMIT_MAX_WAIT { + return Some(duration); + } else { + debug!( + "Waiting {} seconds would exceed {} second limit", + duration.as_secs(), + RATE_LIMIT_MAX_WAIT.as_secs() + ); + } + } + + None } } diff --git a/core/src/lib.rs b/core/src/lib.rs index d476d917..f0ee345c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -21,7 +21,7 @@ mod dealer; pub mod diffie_hellman; pub mod error; pub mod file_id; -mod http_client; +pub mod http_client; pub mod mercury; pub mod packet; mod proxytunnel;