mirror of
https://github.com/librespot-org/librespot.git
synced 2024-11-08 16:45:43 +00:00
Rate limit audio file streaming too
This commit is contained in:
parent
56b5f08a32
commit
5451d14972
6 changed files with 123 additions and 41 deletions
|
@ -11,7 +11,7 @@ use hyper::StatusCode;
|
||||||
use tempfile::NamedTempFile;
|
use tempfile::NamedTempFile;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use librespot_core::{session::Session, Error};
|
use librespot_core::{http_client::HttpClient, session::Session, Error};
|
||||||
|
|
||||||
use crate::range_set::{Range, RangeSet};
|
use crate::range_set::{Range, RangeSet};
|
||||||
|
|
||||||
|
@ -64,6 +64,18 @@ async fn receive_data(
|
||||||
|
|
||||||
let code = response.status();
|
let code = response.status();
|
||||||
if code != StatusCode::PARTIAL_CONTENT {
|
if code != StatusCode::PARTIAL_CONTENT {
|
||||||
|
if code == StatusCode::TOO_MANY_REQUESTS {
|
||||||
|
if let Some(duration) = HttpClient::get_retry_after(response.headers()) {
|
||||||
|
warn!(
|
||||||
|
"Rate limiting, retrying in {} seconds...",
|
||||||
|
duration.as_secs()
|
||||||
|
);
|
||||||
|
// sleeping here means we hold onto this streamer "slot"
|
||||||
|
// (we don't decrease the number of open requests)
|
||||||
|
tokio::time::sleep(duration).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
break Err(AudioFileError::StatusCode(code).into());
|
break Err(AudioFileError::StatusCode(code).into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,7 +51,7 @@ sha1 = "0.10"
|
||||||
shannon = "0.2"
|
shannon = "0.2"
|
||||||
sysinfo = { version = "0.25", default-features = false }
|
sysinfo = { version = "0.25", default-features = false }
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
time = "0.3"
|
time = { version = "0.3", features = ["formatting", "parsing"] }
|
||||||
tokio = { version = "1", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] }
|
tokio = { version = "1", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] }
|
||||||
tokio-stream = "0.1"
|
tokio-stream = "0.1"
|
||||||
tokio-tungstenite = { version = "*", default-features = false, features = ["rustls-tls-native-roots"] }
|
tokio-tungstenite = { version = "*", default-features = false, features = ["rustls-tls-native-roots"] }
|
||||||
|
|
|
@ -4,7 +4,10 @@ use std::{
|
||||||
ops::Deref,
|
ops::Deref,
|
||||||
};
|
};
|
||||||
|
|
||||||
use time::{error::ComponentRange, Date as _Date, OffsetDateTime, PrimitiveDateTime, Time};
|
use time::{
|
||||||
|
error::ComponentRange, format_description::well_known::Iso8601, Date as _Date, OffsetDateTime,
|
||||||
|
PrimitiveDateTime, Time,
|
||||||
|
};
|
||||||
|
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
|
|
||||||
|
@ -48,6 +51,11 @@ impl Date {
|
||||||
pub fn now_utc() -> Self {
|
pub fn now_utc() -> Self {
|
||||||
Self(OffsetDateTime::now_utc())
|
Self(OffsetDateTime::now_utc())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn from_iso8601(input: &str) -> Result<Self, Error> {
|
||||||
|
let date_time = OffsetDateTime::parse(input, &Iso8601::DEFAULT)?;
|
||||||
|
Ok(Self(date_time))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<&DateMessage> for Date {
|
impl TryFrom<&DateMessage> for Date {
|
||||||
|
|
|
@ -342,6 +342,12 @@ impl From<hyper::Error> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<time::error::Parse> for Error {
|
||||||
|
fn from(err: time::error::Parse) -> Self {
|
||||||
|
Self::new(ErrorKind::FailedPrecondition, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<quick_xml::Error> for Error {
|
impl From<quick_xml::Error> for Error {
|
||||||
fn from(err: quick_xml::Error) -> Self {
|
fn from(err: quick_xml::Error) -> Self {
|
||||||
Self::new(ErrorKind::FailedPrecondition, err)
|
Self::new(ErrorKind::FailedPrecondition, err)
|
||||||
|
|
|
@ -1,28 +1,31 @@
|
||||||
use std::{env::consts::OS, time::Duration};
|
use std::{
|
||||||
|
collections::HashMap,
|
||||||
|
env::consts::OS,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures_util::{future::IntoStream, FutureExt};
|
use futures_util::{future::IntoStream, FutureExt};
|
||||||
use governor::{
|
use governor::{
|
||||||
clock::MonotonicClock,
|
clock::MonotonicClock, middleware::NoOpMiddleware, state::InMemoryState, Quota, RateLimiter,
|
||||||
middleware::NoOpMiddleware,
|
|
||||||
state::{InMemoryState, NotKeyed},
|
|
||||||
Jitter, Quota, RateLimiter,
|
|
||||||
};
|
};
|
||||||
use http::{header::HeaderValue, Uri};
|
use http::{header::HeaderValue, Uri};
|
||||||
use hyper::{
|
use hyper::{
|
||||||
client::{HttpConnector, ResponseFuture},
|
client::{HttpConnector, ResponseFuture},
|
||||||
header::USER_AGENT,
|
header::USER_AGENT,
|
||||||
Body, Client, Request, Response, StatusCode,
|
Body, Client, HeaderMap, Request, Response, StatusCode,
|
||||||
};
|
};
|
||||||
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
|
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
|
||||||
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
|
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
|
||||||
use nonzero_ext::nonzero;
|
use nonzero_ext::nonzero;
|
||||||
use once_cell::sync::OnceCell;
|
use once_cell::sync::OnceCell;
|
||||||
|
use parking_lot::Mutex;
|
||||||
use sysinfo::{System, SystemExt};
|
use sysinfo::{System, SystemExt};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
date::Date,
|
||||||
version::{spotify_version, FALLBACK_USER_AGENT, VERSION_STRING},
|
version::{spotify_version, FALLBACK_USER_AGENT, VERSION_STRING},
|
||||||
Error,
|
Error,
|
||||||
};
|
};
|
||||||
|
@ -92,7 +95,11 @@ pub struct HttpClient {
|
||||||
user_agent: HeaderValue,
|
user_agent: HeaderValue,
|
||||||
proxy_url: Option<Url>,
|
proxy_url: Option<Url>,
|
||||||
hyper_client: OnceCell<HyperClient>,
|
hyper_client: OnceCell<HyperClient>,
|
||||||
rate_limiter: RateLimiter<NotKeyed, InMemoryState, MonotonicClock, NoOpMiddleware>,
|
|
||||||
|
// while the DashMap variant is more performant, our level of concurrency
|
||||||
|
// is pretty low so we can save pulling in that extra dependency
|
||||||
|
rate_limiter:
|
||||||
|
RateLimiter<String, Mutex<HashMap<String, InMemoryState>>, MonotonicClock, NoOpMiddleware>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpClient {
|
impl HttpClient {
|
||||||
|
@ -128,7 +135,7 @@ impl HttpClient {
|
||||||
let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
|
let quota = Quota::with_period(Duration::from_nanos(replenish_interval_ns as u64))
|
||||||
.expect("replenish interval should be valid")
|
.expect("replenish interval should be valid")
|
||||||
.allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
|
.allow_burst(nonzero![RATE_LIMIT_CALLS_PER_INTERVAL]);
|
||||||
let rate_limiter = RateLimiter::direct(quota);
|
let rate_limiter = RateLimiter::keyed(quota);
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
user_agent,
|
user_agent,
|
||||||
|
@ -178,19 +185,13 @@ impl HttpClient {
|
||||||
.unwrap_or_else(|_| Bytes::new());
|
.unwrap_or_else(|_| Bytes::new());
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let mut req = Request::new(Body::from(body_as_bytes.clone()));
|
let mut req = Request::builder()
|
||||||
*req.method_mut() = parts.method.clone();
|
.method(parts.method.clone())
|
||||||
*req.uri_mut() = parts.uri.clone();
|
.uri(parts.uri.clone())
|
||||||
*req.version_mut() = parts.version;
|
.version(parts.version)
|
||||||
|
.body(Body::from(body_as_bytes.clone()))?;
|
||||||
*req.headers_mut() = parts.headers.clone();
|
*req.headers_mut() = parts.headers.clone();
|
||||||
|
|
||||||
// For rate limiting we cannot *just* depend on Spotify sending us HTTP/429
|
|
||||||
// Retry-After headers. For example, when there is a service interruption
|
|
||||||
// and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure.
|
|
||||||
self.rate_limiter
|
|
||||||
.until_ready_with_jitter(Jitter::up_to(Duration::from_secs(5)))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let request = self.request_fut(req)?;
|
let request = self.request_fut(req)?;
|
||||||
let response = request.await;
|
let response = request.await;
|
||||||
|
|
||||||
|
@ -198,22 +199,13 @@ impl HttpClient {
|
||||||
let code = response.status();
|
let code = response.status();
|
||||||
|
|
||||||
if code == StatusCode::TOO_MANY_REQUESTS {
|
if code == StatusCode::TOO_MANY_REQUESTS {
|
||||||
if let Some(retry_after) = response.headers().get("Retry-After") {
|
if let Some(duration) = Self::get_retry_after(response.headers()) {
|
||||||
if let Ok(retry_after_str) = retry_after.to_str() {
|
warn!(
|
||||||
if let Ok(retry_after_secs) = retry_after_str.parse::<u64>() {
|
"Rate limited by service, retrying in {} seconds...",
|
||||||
let duration = Duration::from_secs(retry_after_secs);
|
duration.as_secs()
|
||||||
if duration <= RATE_LIMIT_MAX_WAIT {
|
);
|
||||||
warn!(
|
tokio::time::sleep(duration).await;
|
||||||
"Rate limiting, retrying in {} seconds...",
|
continue;
|
||||||
retry_after_secs
|
|
||||||
);
|
|
||||||
tokio::time::sleep(duration).await;
|
|
||||||
continue;
|
|
||||||
} else {
|
|
||||||
debug!("Not going to wait {} seconds", retry_after_secs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -239,7 +231,71 @@ impl HttpClient {
|
||||||
let headers_mut = req.headers_mut();
|
let headers_mut = req.headers_mut();
|
||||||
headers_mut.insert(USER_AGENT, self.user_agent.clone());
|
headers_mut.insert(USER_AGENT, self.user_agent.clone());
|
||||||
|
|
||||||
let request = self.hyper_client()?.request(req);
|
// For rate limiting we cannot *just* depend on Spotify sending us HTTP/429
|
||||||
Ok(request)
|
// Retry-After headers. For example, when there is a service interruption
|
||||||
|
// and HTTP/500 is returned, we don't want to DoS the Spotify infrastructure.
|
||||||
|
let domain = match req.uri().host() {
|
||||||
|
Some(host) => {
|
||||||
|
// strip the prefix from *.domain.tld (assume rate limit is per domain, not subdomain)
|
||||||
|
let mut parts = host
|
||||||
|
.split('.')
|
||||||
|
.map(|s| s.to_string())
|
||||||
|
.collect::<Vec<String>>();
|
||||||
|
let n = parts.len().saturating_sub(2);
|
||||||
|
parts.drain(n..).collect()
|
||||||
|
}
|
||||||
|
None => String::from(""),
|
||||||
|
};
|
||||||
|
self.rate_limiter.check_key(&domain).map_err(|e| {
|
||||||
|
Error::resource_exhausted(format!(
|
||||||
|
"rate limited for at least another {} seconds",
|
||||||
|
e.wait_time_from(Instant::now()).as_secs()
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(self.hyper_client()?.request(req))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_retry_after(headers: &HeaderMap<HeaderValue>) -> Option<Duration> {
|
||||||
|
let now = Date::now_utc().as_timestamp_ms();
|
||||||
|
|
||||||
|
let mut retry_after_ms = None;
|
||||||
|
if let Some(header_val) = headers.get("X-RateLimit-Next") {
|
||||||
|
// *.akamaized.net (Akamai)
|
||||||
|
if let Ok(date_str) = header_val.to_str() {
|
||||||
|
if let Ok(target) = Date::from_iso8601(date_str) {
|
||||||
|
retry_after_ms = Some(target.as_timestamp_ms().saturating_sub(now))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some(header_val) = headers.get("Fastly-RateLimit-Reset") {
|
||||||
|
// *.scdn.co (Fastly)
|
||||||
|
if let Ok(timestamp) = header_val.to_str() {
|
||||||
|
if let Ok(target) = timestamp.parse::<i64>() {
|
||||||
|
retry_after_ms = Some(target.saturating_sub(now))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if let Some(header_val) = headers.get("Retry-After") {
|
||||||
|
// Generic RFC compliant (including *.spotify.com)
|
||||||
|
if let Ok(retry_after) = header_val.to_str() {
|
||||||
|
if let Ok(duration) = retry_after.parse::<i64>() {
|
||||||
|
retry_after_ms = Some(duration * 1000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(retry_after) = retry_after_ms {
|
||||||
|
let duration = Duration::from_millis(retry_after as u64);
|
||||||
|
if duration <= RATE_LIMIT_MAX_WAIT {
|
||||||
|
return Some(duration);
|
||||||
|
} else {
|
||||||
|
debug!(
|
||||||
|
"Waiting {} seconds would exceed {} second limit",
|
||||||
|
duration.as_secs(),
|
||||||
|
RATE_LIMIT_MAX_WAIT.as_secs()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ mod dealer;
|
||||||
pub mod diffie_hellman;
|
pub mod diffie_hellman;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod file_id;
|
pub mod file_id;
|
||||||
mod http_client;
|
pub mod http_client;
|
||||||
pub mod mercury;
|
pub mod mercury;
|
||||||
pub mod packet;
|
pub mod packet;
|
||||||
mod proxytunnel;
|
mod proxytunnel;
|
||||||
|
|
Loading…
Reference in a new issue