Implement CDN for audio files

This commit is contained in:
Roderick van Domburg 2021-12-16 22:42:37 +01:00
parent 9a93cca562
commit 2f7b9863d9
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
23 changed files with 518 additions and 251 deletions

68
Cargo.lock generated
View file

@ -447,9 +447,9 @@ dependencies = [
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd0210d8c325c245ff06fd95a3b13689a1a276ac8cfa8e8720cb840bfb84b9e" checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -462,9 +462,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fc8cd39e3dbf865f7340dce6a2d401d24fd37c6fe6c4f0ee0de8bfca2252d27" checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -472,15 +472,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "629316e42fe7c2a0b9a65b47d159ceaa5453ab14e8f0a3c5eedbb8cd55b4a445" checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b808bf53348a36cab739d7e04755909b9fcaaa69b7d7e588b37b6ec62704c97" checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@ -489,16 +489,18 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e481354db6b5c353246ccf6a728b0c5511d752c08da7260546fc0933869daa11" checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a89f17b21645bc4ed773c69af9c9a0effd4a3f1a3876eadd453469f8854e7fdd" checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb"
dependencies = [ dependencies = [
"autocfg",
"proc-macro-hack",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn", "syn",
@ -506,22 +508,23 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "996c6442437b62d21a32cd9906f9c41e7dc1e19a9579843fad948696769305af" checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dabf1872aaab32c886832f2276d2f5399887e2bd613698a02359e4ea83f8de12" checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.18" version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41d22213122356472061ac0f1ab2cee28d2bac8491410fd68c2af53d1cedb83e" checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481"
dependencies = [ dependencies = [
"autocfg",
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"futures-io", "futures-io",
@ -531,6 +534,8 @@ dependencies = [
"memchr", "memchr",
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab", "slab",
] ]
@ -726,9 +731,9 @@ dependencies = [
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.7" version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fd819562fcebdac5afc5c113c3ec36f902840b70fd4fc458799c8ce4607ae55" checksum = "8f072413d126e57991455e0a922b31e4c8ba7c2ffbebf6b78b4f8521397d65cd"
dependencies = [ dependencies = [
"bytes", "bytes",
"fnv", "fnv",
@ -861,9 +866,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "0.14.15" version = "0.14.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "436ec0091e4f20e655156a30a0df3770fe2900aa301e548e08446ec794b6953c" checksum = "b7ec3e62bdc98a2f0393a5048e4c30ef659440ea6e0e572965103e72bd836f55"
dependencies = [ dependencies = [
"bytes", "bytes",
"futures-channel", "futures-channel",
@ -1215,10 +1220,14 @@ dependencies = [
"aes-ctr", "aes-ctr",
"byteorder", "byteorder",
"bytes", "bytes",
"futures-core",
"futures-executor",
"futures-util", "futures-util",
"hyper",
"librespot-core", "librespot-core",
"log", "log",
"tempfile", "tempfile",
"thiserror",
"tokio", "tokio",
] ]
@ -1249,6 +1258,7 @@ dependencies = [
"base64", "base64",
"byteorder", "byteorder",
"bytes", "bytes",
"chrono",
"env_logger", "env_logger",
"form_urlencoded", "form_urlencoded",
"futures-core", "futures-core",
@ -1272,6 +1282,8 @@ dependencies = [
"protobuf", "protobuf",
"quick-xml", "quick-xml",
"rand", "rand",
"rustls",
"rustls-native-certs",
"serde", "serde",
"serde_json", "serde_json",
"sha-1", "sha-1",
@ -1917,6 +1929,18 @@ dependencies = [
"version_check", "version_check",
] ]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.33" version = "1.0.33"

View file

@ -14,7 +14,11 @@ version = "0.3.1"
aes-ctr = "0.6" aes-ctr = "0.6"
byteorder = "1.4" byteorder = "1.4"
bytes = "1.0" bytes = "1.0"
log = "0.4" futures-core = { version = "0.3", default-features = false }
futures-executor = "0.3"
futures-util = { version = "0.3", default_features = false } futures-util = { version = "0.3", default_features = false }
hyper = { version = "0.14", features = ["client"] }
log = "0.4"
tempfile = "3.1" tempfile = "3.1"
thiserror = "1.0"
tokio = { version = "1", features = ["sync", "macros"] } tokio = { version = "1", features = ["sync", "macros"] }

View file

@ -7,36 +7,55 @@ use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Condvar, Mutex}; use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use byteorder::{BigEndian, ByteOrder}; use futures_util::future::IntoStream;
use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt}; use futures_util::{StreamExt, TryFutureExt};
use hyper::client::ResponseFuture;
use hyper::header::CONTENT_RANGE;
use hyper::Body;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use librespot_core::channel::{ChannelData, ChannelError, ChannelHeaders}; use librespot_core::cdn_url::{CdnUrl, CdnUrlError};
use librespot_core::file_id::FileId; use librespot_core::file_id::FileId;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spclient::SpClientError;
use self::receive::{audio_file_fetch, request_range}; use self::receive::audio_file_fetch;
use crate::range_set::{Range, RangeSet}; use crate::range_set::{Range, RangeSet};
#[derive(Error, Debug)]
pub enum AudioFileError {
#[error("could not complete CDN request: {0}")]
Cdn(hyper::Error),
#[error("empty response")]
Empty,
#[error("error parsing response")]
Parsing,
#[error("could not complete API request: {0}")]
SpClient(#[from] SpClientError),
#[error("could not get CDN URL: {0}")]
Url(#[from] CdnUrlError),
}
/// The minimum size of a block that is requested from the Spotify servers in one request. /// The minimum size of a block that is requested from the Spotify servers in one request.
/// This is the block size that is typically requested while doing a `seek()` on a file. /// This is the block size that is typically requested while doing a `seek()` on a file.
/// Note: smaller requests can happen if part of the block is downloaded already. /// Note: smaller requests can happen if part of the block is downloaded already.
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 256;
/// The amount of data that is requested when initially opening a file. /// The amount of data that is requested when initially opening a file.
/// Note: if the file is opened to play from the beginning, the amount of data to /// Note: if the file is opened to play from the beginning, the amount of data to
/// read ahead is requested in addition to this amount. If the file is opened to seek to /// read ahead is requested in addition to this amount. If the file is opened to seek to
/// another position, then only this amount is requested on the first request. /// another position, then only this amount is requested on the first request.
const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16; pub const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 128;
/// The ping time that is used for calculations before a ping time was actually measured. /// The ping time that is used for calculations before a ping time was actually measured.
const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500); pub const INITIAL_PING_TIME_ESTIMATE: Duration = Duration::from_millis(500);
/// If the measured ping time to the Spotify server is larger than this value, it is capped /// If the measured ping time to the Spotify server is larger than this value, it is capped
/// to avoid run-away block sizes and pre-fetching. /// to avoid run-away block sizes and pre-fetching.
const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500); pub const MAXIMUM_ASSUMED_PING_TIME: Duration = Duration::from_millis(1500);
/// Before playback starts, this many seconds of data must be present. /// Before playback starts, this many seconds of data must be present.
/// Note: the calculations are done using the nominal bitrate of the file. The actual amount /// Note: the calculations are done using the nominal bitrate of the file. The actual amount
@ -65,7 +84,7 @@ pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f32 = 10.0;
/// If the amount of data that is pending (requested but not received) is less than a certain amount, /// If the amount of data that is pending (requested but not received) is less than a certain amount,
/// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more /// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
/// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>` /// data is calculated as `<pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>`
const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0; pub const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0;
/// Similar to `PREFETCH_THRESHOLD_FACTOR`, but it also takes the current download rate into account. /// Similar to `PREFETCH_THRESHOLD_FACTOR`, but it also takes the current download rate into account.
/// The formula used is `<pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>` /// The formula used is `<pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>`
@ -74,16 +93,16 @@ const PREFETCH_THRESHOLD_FACTOR: f32 = 4.0;
/// the download rate ramps up. However, this comes at the cost that it might hurt ping time if a seek is /// the download rate ramps up. However, this comes at the cost that it might hurt ping time if a seek is
/// performed while downloading. Values smaller than `1.0` cause the download rate to collapse and effectively /// performed while downloading. Values smaller than `1.0` cause the download rate to collapse and effectively
/// only `PREFETCH_THRESHOLD_FACTOR` is in effect. Thus, set to `0.0` if bandwidth saturation is not wanted. /// only `PREFETCH_THRESHOLD_FACTOR` is in effect. Thus, set to `0.0` if bandwidth saturation is not wanted.
const FAST_PREFETCH_THRESHOLD_FACTOR: f32 = 1.5; pub const FAST_PREFETCH_THRESHOLD_FACTOR: f32 = 1.5;
/// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending /// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending
/// requests share bandwidth. Thus, havint too many requests can lead to the one that is needed next /// requests share bandwidth. Thus, having too many requests can lead to the one that is needed next
/// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new /// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new
/// pre-fetch request is only sent if less than `MAX_PREFETCH_REQUESTS` are pending. /// pre-fetch request is only sent if less than `MAX_PREFETCH_REQUESTS` are pending.
const MAX_PREFETCH_REQUESTS: usize = 4; pub const MAX_PREFETCH_REQUESTS: usize = 4;
/// The time we will wait to obtain status updates on downloading. /// The time we will wait to obtain status updates on downloading.
const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(1); pub const DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(1);
pub enum AudioFile { pub enum AudioFile {
Cached(fs::File), Cached(fs::File),
@ -91,7 +110,16 @@ pub enum AudioFile {
} }
#[derive(Debug)] #[derive(Debug)]
enum StreamLoaderCommand { pub struct StreamingRequest {
streamer: IntoStream<ResponseFuture>,
initial_body: Option<Body>,
offset: usize,
length: usize,
request_time: Instant,
}
#[derive(Debug)]
pub enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode(), // optimise download strategy for random access RandomAccessMode(), // optimise download strategy for random access
StreamMode(), // optimise download strategy for streaming StreamMode(), // optimise download strategy for streaming
@ -244,9 +272,9 @@ enum DownloadStrategy {
} }
struct AudioFileShared { struct AudioFileShared {
file_id: FileId, cdn_url: CdnUrl,
file_size: usize, file_size: usize,
stream_data_rate: usize, bytes_per_second: usize,
cond: Condvar, cond: Condvar,
download_status: Mutex<AudioFileDownloadStatus>, download_status: Mutex<AudioFileDownloadStatus>,
download_strategy: Mutex<DownloadStrategy>, download_strategy: Mutex<DownloadStrategy>,
@ -255,19 +283,13 @@ struct AudioFileShared {
read_position: AtomicUsize, read_position: AtomicUsize,
} }
pub struct InitialData {
rx: ChannelData,
length: usize,
request_sent_time: Instant,
}
impl AudioFile { impl AudioFile {
pub async fn open( pub async fn open(
session: &Session, session: &Session,
file_id: FileId, file_id: FileId,
bytes_per_second: usize, bytes_per_second: usize,
play_from_beginning: bool, play_from_beginning: bool,
) -> Result<AudioFile, ChannelError> { ) -> Result<AudioFile, AudioFileError> {
if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) { if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
debug!("File {} already in cache", file_id); debug!("File {} already in cache", file_id);
return Ok(AudioFile::Cached(file)); return Ok(AudioFile::Cached(file));
@ -276,35 +298,13 @@ impl AudioFile {
debug!("Downloading file {}", file_id); debug!("Downloading file {}", file_id);
let (complete_tx, complete_rx) = oneshot::channel(); let (complete_tx, complete_rx) = oneshot::channel();
let mut length = if play_from_beginning {
INITIAL_DOWNLOAD_SIZE
+ max(
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize,
(INITIAL_PING_TIME_ESTIMATE.as_secs_f32()
* READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* bytes_per_second as f32) as usize,
)
} else {
INITIAL_DOWNLOAD_SIZE
};
if length % 4 != 0 {
length += 4 - (length % 4);
}
let (headers, rx) = request_range(session, file_id, 0, length).split();
let initial_data = InitialData {
rx,
length,
request_sent_time: Instant::now(),
};
let streaming = AudioFileStreaming::open( let streaming = AudioFileStreaming::open(
session.clone(), session.clone(),
initial_data,
headers,
file_id, file_id,
complete_tx, complete_tx,
bytes_per_second, bytes_per_second,
play_from_beginning,
); );
let session_ = session.clone(); let session_ = session.clone();
@ -343,24 +343,58 @@ impl AudioFile {
impl AudioFileStreaming { impl AudioFileStreaming {
pub async fn open( pub async fn open(
session: Session, session: Session,
initial_data: InitialData,
headers: ChannelHeaders,
file_id: FileId, file_id: FileId,
complete_tx: oneshot::Sender<NamedTempFile>, complete_tx: oneshot::Sender<NamedTempFile>,
streaming_data_rate: usize, bytes_per_second: usize,
) -> Result<AudioFileStreaming, ChannelError> { play_from_beginning: bool,
let (_, data) = headers ) -> Result<AudioFileStreaming, AudioFileError> {
.try_filter(|(id, _)| future::ready(*id == 0x3)) let download_size = if play_from_beginning {
.next() INITIAL_DOWNLOAD_SIZE
.await + max(
.unwrap()?; (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * bytes_per_second as f32) as usize,
(INITIAL_PING_TIME_ESTIMATE.as_secs_f32()
* READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* bytes_per_second as f32) as usize,
)
} else {
INITIAL_DOWNLOAD_SIZE
};
let size = BigEndian::read_u32(&data) as usize * 4; let mut cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
let url = cdn_url.get_url()?;
let mut streamer = session.spclient().stream_file(url, 0, download_size)?;
let request_time = Instant::now();
// Get the first chunk with the headers to get the file size.
// The remainder of that chunk with possibly also a response body is then
// further processed in `audio_file_fetch`.
let response = match streamer.next().await {
Some(Ok(data)) => data,
Some(Err(e)) => return Err(AudioFileError::Cdn(e)),
None => return Err(AudioFileError::Empty),
};
let header_value = response
.headers()
.get(CONTENT_RANGE)
.ok_or(AudioFileError::Parsing)?;
let str_value = header_value.to_str().map_err(|_| AudioFileError::Parsing)?;
let file_size_str = str_value.split('/').last().ok_or(AudioFileError::Parsing)?;
let file_size = file_size_str.parse().map_err(|_| AudioFileError::Parsing)?;
let initial_request = StreamingRequest {
streamer,
initial_body: Some(response.into_body()),
offset: 0,
length: download_size,
request_time,
};
let shared = Arc::new(AudioFileShared { let shared = Arc::new(AudioFileShared {
file_id, cdn_url,
file_size: size, file_size,
stream_data_rate: streaming_data_rate, bytes_per_second,
cond: Condvar::new(), cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus { download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(), requested: RangeSet::new(),
@ -372,20 +406,17 @@ impl AudioFileStreaming {
read_position: AtomicUsize::new(0), read_position: AtomicUsize::new(0),
}); });
let mut write_file = NamedTempFile::new().unwrap(); // TODO : use new_in() to store securely in librespot directory
write_file.as_file().set_len(size as u64).unwrap(); let write_file = NamedTempFile::new().unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let read_file = write_file.reopen().unwrap(); let read_file = write_file.reopen().unwrap();
// let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) = let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded_channel::<StreamLoaderCommand>(); mpsc::unbounded_channel::<StreamLoaderCommand>();
session.spawn(audio_file_fetch( session.spawn(audio_file_fetch(
session.clone(), session.clone(),
shared.clone(), shared.clone(),
initial_data, initial_request,
write_file, write_file,
stream_loader_command_rx, stream_loader_command_rx,
complete_tx, complete_tx,
@ -422,10 +453,10 @@ impl Read for AudioFileStreaming {
let length_to_request = length let length_to_request = length
+ max( + max(
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() (READ_AHEAD_DURING_PLAYBACK.as_secs_f32()
* self.shared.stream_data_rate as f32) as usize, * self.shared.bytes_per_second as f32) as usize,
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* ping_time_seconds * ping_time_seconds
* self.shared.stream_data_rate as f32) as usize, * self.shared.bytes_per_second as f32) as usize,
); );
min(length_to_request, self.shared.file_size - offset) min(length_to_request, self.shared.file_size - offset)
} }

View file

@ -4,56 +4,21 @@ use std::sync::{atomic, Arc};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use atomic::Ordering; use atomic::Ordering;
use byteorder::{BigEndian, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures_util::StreamExt; use futures_util::StreamExt;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use librespot_core::channel::{Channel, ChannelData};
use librespot_core::file_id::FileId;
use librespot_core::packet::PacketType;
use librespot_core::session::Session; use librespot_core::session::Session;
use crate::range_set::{Range, RangeSet}; use crate::range_set::{Range, RangeSet};
use super::{AudioFileShared, DownloadStrategy, InitialData, StreamLoaderCommand}; use super::{AudioFileShared, DownloadStrategy, StreamLoaderCommand, StreamingRequest};
use super::{ use super::{
FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS, FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS,
MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
}; };
pub fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
assert!(
offset % 4 == 0,
"Range request start positions must be aligned by 4 bytes."
);
assert!(
length % 4 == 0,
"Range request range lengths must be aligned by 4 bytes."
);
let start = offset / 4;
let end = (offset + length) / 4;
let (id, channel) = session.channel().allocate();
let mut data: Vec<u8> = Vec::new();
data.write_u16::<BigEndian>(id).unwrap();
data.write_u8(0).unwrap();
data.write_u8(1).unwrap();
data.write_u16::<BigEndian>(0x0000).unwrap();
data.write_u32::<BigEndian>(0x00000000).unwrap();
data.write_u32::<BigEndian>(0x00009C40).unwrap();
data.write_u32::<BigEndian>(0x00020000).unwrap();
data.write_all(&file.0).unwrap();
data.write_u32::<BigEndian>(start as u32).unwrap();
data.write_u32::<BigEndian>(end as u32).unwrap();
session.send_packet(PacketType::StreamChunk, data);
channel
}
struct PartialFileData { struct PartialFileData {
offset: usize, offset: usize,
data: Bytes, data: Bytes,
@ -67,13 +32,13 @@ enum ReceivedData {
async fn receive_data( async fn receive_data(
shared: Arc<AudioFileShared>, shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>, file_data_tx: mpsc::UnboundedSender<ReceivedData>,
mut data_rx: ChannelData, mut request: StreamingRequest,
initial_data_offset: usize,
initial_request_length: usize,
request_sent_time: Instant,
) { ) {
let mut data_offset = initial_data_offset; let requested_offset = request.offset;
let mut request_length = initial_request_length; let requested_length = request.length;
let mut data_offset = requested_offset;
let mut request_length = requested_length;
let old_number_of_request = shared let old_number_of_request = shared
.number_of_open_requests .number_of_open_requests
@ -82,21 +47,31 @@ async fn receive_data(
let mut measure_ping_time = old_number_of_request == 0; let mut measure_ping_time = old_number_of_request == 0;
let result = loop { let result = loop {
let data = match data_rx.next().await { let body = match request.initial_body.take() {
Some(Ok(data)) => data, Some(data) => data,
Some(Err(e)) => break Err(e), None => match request.streamer.next().await {
None => break Ok(()), Some(Ok(response)) => response.into_body(),
Some(Err(e)) => break Err(e),
None => break Ok(()),
},
};
let data = match hyper::body::to_bytes(body).await {
Ok(bytes) => bytes,
Err(e) => break Err(e),
}; };
if measure_ping_time { if measure_ping_time {
let mut duration = Instant::now() - request_sent_time; let mut duration = Instant::now() - request.request_time;
if duration > MAXIMUM_ASSUMED_PING_TIME { if duration > MAXIMUM_ASSUMED_PING_TIME {
duration = MAXIMUM_ASSUMED_PING_TIME; duration = MAXIMUM_ASSUMED_PING_TIME;
} }
let _ = file_data_tx.send(ReceivedData::ResponseTime(duration)); let _ = file_data_tx.send(ReceivedData::ResponseTime(duration));
measure_ping_time = false; measure_ping_time = false;
} }
let data_size = data.len(); let data_size = data.len();
let _ = file_data_tx.send(ReceivedData::Data(PartialFileData { let _ = file_data_tx.send(ReceivedData::Data(PartialFileData {
offset: data_offset, offset: data_offset,
data, data,
@ -104,8 +79,8 @@ async fn receive_data(
data_offset += data_size; data_offset += data_size;
if request_length < data_size { if request_length < data_size {
warn!( warn!(
"Data receiver for range {} (+{}) received more data from server than requested.", "Data receiver for range {} (+{}) received more data from server than requested ({} instead of {}).",
initial_data_offset, initial_request_length requested_offset, requested_length, data_size, request_length
); );
request_length = 0; request_length = 0;
} else { } else {
@ -117,6 +92,8 @@ async fn receive_data(
} }
}; };
drop(request.streamer);
if request_length > 0 { if request_length > 0 {
let missing_range = Range::new(data_offset, request_length); let missing_range = Range::new(data_offset, request_length);
@ -129,15 +106,15 @@ async fn receive_data(
.number_of_open_requests .number_of_open_requests
.fetch_sub(1, Ordering::SeqCst); .fetch_sub(1, Ordering::SeqCst);
if result.is_err() { if let Err(e) = result {
warn!( error!(
"Error from channel for data receiver for range {} (+{}).", "Error from streamer for range {} (+{}): {:?}",
initial_data_offset, initial_request_length requested_offset, requested_length, e
); );
} else if request_length > 0 { } else if request_length > 0 {
warn!( warn!(
"Data receiver for range {} (+{}) received less data from server than requested.", "Streamer for range {} (+{}) received less data from server than requested.",
initial_data_offset, initial_request_length requested_offset, requested_length
); );
} }
} }
@ -164,12 +141,12 @@ impl AudioFileFetch {
*(self.shared.download_strategy.lock().unwrap()) *(self.shared.download_strategy.lock().unwrap())
} }
fn download_range(&mut self, mut offset: usize, mut length: usize) { fn download_range(&mut self, offset: usize, mut length: usize) {
if length < MINIMUM_DOWNLOAD_SIZE { if length < MINIMUM_DOWNLOAD_SIZE {
length = MINIMUM_DOWNLOAD_SIZE; length = MINIMUM_DOWNLOAD_SIZE;
} }
// ensure the values are within the bounds and align them by 4 for the spotify protocol. // ensure the values are within the bounds
if offset >= self.shared.file_size { if offset >= self.shared.file_size {
return; return;
} }
@ -182,15 +159,6 @@ impl AudioFileFetch {
length = self.shared.file_size - offset; length = self.shared.file_size - offset;
} }
if offset % 4 != 0 {
length += offset % 4;
offset -= offset % 4;
}
if length % 4 != 0 {
length += 4 - (length % 4);
}
let mut ranges_to_request = RangeSet::new(); let mut ranges_to_request = RangeSet::new();
ranges_to_request.add_range(&Range::new(offset, length)); ranges_to_request.add_range(&Range::new(offset, length));
@ -199,25 +167,43 @@ impl AudioFileFetch {
ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested); ranges_to_request.subtract_range_set(&download_status.requested);
let cdn_url = &self.shared.cdn_url;
let file_id = cdn_url.file_id;
for range in ranges_to_request.iter() { for range in ranges_to_request.iter() {
let (_headers, data) = request_range( match cdn_url.urls.first() {
&self.session, Some(url) => {
self.shared.file_id, match self
range.start, .session
range.length, .spclient()
) .stream_file(&url.0, range.start, range.length)
.split(); {
Ok(streamer) => {
download_status.requested.add_range(range);
download_status.requested.add_range(range); let streaming_request = StreamingRequest {
streamer,
initial_body: None,
offset: range.start,
length: range.length,
request_time: Instant::now(),
};
self.session.spawn(receive_data( self.session.spawn(receive_data(
self.shared.clone(), self.shared.clone(),
self.file_data_tx.clone(), self.file_data_tx.clone(),
data, streaming_request,
range.start, ));
range.length, }
Instant::now(), Err(e) => {
)); error!("Unable to open stream for track <{}>: {:?}", file_id, e);
}
}
}
None => {
error!("Unable to get CDN URL for track <{}>", file_id);
}
}
} }
} }
@ -268,8 +254,7 @@ impl AudioFileFetch {
fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow { fn handle_file_data(&mut self, data: ReceivedData) -> ControlFlow {
match data { match data {
ReceivedData::ResponseTime(response_time) => { ReceivedData::ResponseTime(response_time) => {
// chatty trace!("Ping time estimated as: {} ms", response_time.as_millis());
// trace!("Ping time estimated as: {}ms", response_time.as_millis());
// prune old response times. Keep at most two so we can push a third. // prune old response times. Keep at most two so we can push a third.
while self.network_response_times.len() >= 3 { while self.network_response_times.len() >= 3 {
@ -356,7 +341,7 @@ impl AudioFileFetch {
pub(super) async fn audio_file_fetch( pub(super) async fn audio_file_fetch(
session: Session, session: Session,
shared: Arc<AudioFileShared>, shared: Arc<AudioFileShared>,
initial_data: InitialData, initial_request: StreamingRequest,
output: NamedTempFile, output: NamedTempFile,
mut stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, mut stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: oneshot::Sender<NamedTempFile>, complete_tx: oneshot::Sender<NamedTempFile>,
@ -364,7 +349,10 @@ pub(super) async fn audio_file_fetch(
let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel(); let (file_data_tx, mut file_data_rx) = mpsc::unbounded_channel();
{ {
let requested_range = Range::new(0, initial_data.length); let requested_range = Range::new(
initial_request.offset,
initial_request.offset + initial_request.length,
);
let mut download_status = shared.download_status.lock().unwrap(); let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.add_range(&requested_range); download_status.requested.add_range(&requested_range);
} }
@ -372,14 +360,11 @@ pub(super) async fn audio_file_fetch(
session.spawn(receive_data( session.spawn(receive_data(
shared.clone(), shared.clone(),
file_data_tx.clone(), file_data_tx.clone(),
initial_data.rx, initial_request,
0,
initial_data.length,
initial_data.request_sent_time,
)); ));
let mut fetch = AudioFileFetch { let mut fetch = AudioFileFetch {
session, session: session.clone(),
shared, shared,
output: Some(output), output: Some(output),
@ -424,7 +409,7 @@ pub(super) async fn audio_file_fetch(
let desired_pending_bytes = max( let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR (PREFETCH_THRESHOLD_FACTOR
* ping_time_seconds * ping_time_seconds
* fetch.shared.stream_data_rate as f32) as usize, * fetch.shared.bytes_per_second as f32) as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f32) (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f32)
as usize, as usize,
); );

View file

@ -17,6 +17,7 @@ aes = "0.6"
base64 = "0.13" base64 = "0.13"
byteorder = "1.4" byteorder = "1.4"
bytes = "1" bytes = "1"
chrono = "0.4"
form_urlencoded = "1.0" form_urlencoded = "1.0"
futures-core = { version = "0.3", default-features = false } futures-core = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] } futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] }
@ -38,11 +39,13 @@ priority-queue = "1.1"
protobuf = "2.14.0" protobuf = "2.14.0"
quick-xml = { version = "0.22", features = [ "serialize" ] } quick-xml = { version = "0.22", features = [ "serialize" ] }
rand = "0.8" rand = "0.8"
rustls = "0.19"
rustls-native-certs = "0.5"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha-1 = "0.9" sha-1 = "0.9"
shannon = "0.2.0" shannon = "0.2.0"
thiserror = "1.0.7" thiserror = "1.0"
tokio = { version = "1.5", features = ["io-util", "macros", "net", "rt", "time", "sync"] } tokio = { version = "1.5", features = ["io-util", "macros", "net", "rt", "time", "sync"] }
tokio-stream = "0.1.1" tokio-stream = "0.1.1"
tokio-tungstenite = { version = "0.14", default-features = false, features = ["rustls-tls"] } tokio-tungstenite = { version = "0.14", default-features = false, features = ["rustls-tls"] }

View file

@ -1,4 +1,4 @@
use hyper::{Body, Request}; use hyper::{Body, Method, Request};
use serde::Deserialize; use serde::Deserialize;
use std::error::Error; use std::error::Error;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
@ -69,7 +69,7 @@ impl ApResolver {
pub async fn try_apresolve(&self) -> Result<ApResolveData, Box<dyn Error>> { pub async fn try_apresolve(&self) -> Result<ApResolveData, Box<dyn Error>> {
let req = Request::builder() let req = Request::builder()
.method("GET") .method(Method::GET)
.uri("http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient") .uri("http://apresolve.spotify.com/?type=accesspoint&type=dealer&type=spclient")
.body(Body::empty())?; .body(Body::empty())?;

151
core/src/cdn_url.rs Normal file
View file

@ -0,0 +1,151 @@
use chrono::Local;
use protobuf::{Message, ProtobufError};
use thiserror::Error;
use url::Url;
use std::convert::{TryFrom, TryInto};
use std::ops::{Deref, DerefMut};
use super::date::Date;
use super::file_id::FileId;
use super::session::Session;
use super::spclient::SpClientError;
use librespot_protocol as protocol;
use protocol::storage_resolve::StorageResolveResponse as CdnUrlMessage;
use protocol::storage_resolve::StorageResolveResponse_Result;
/// Errors that can occur while resolving or selecting a CDN URL for an
/// audio file.
#[derive(Error, Debug)]
pub enum CdnUrlError {
    /// The URL list is empty: nothing has been resolved yet, or the
    /// storage-resolve response contained no URLs.
    #[error("no URLs available")]
    Empty,
    /// Every resolved URL carried an expiry time that has already passed.
    #[error("all tokens expired")]
    Expired,
    /// The storage-resolve response (or one of its URLs) could not be
    /// interpreted — wrong result type, malformed URL, or an expiry token
    /// in an unrecognized format.
    #[error("error parsing response")]
    Parsing,
    /// The raw response bytes could not be decoded as a protobuf message.
    #[error("could not parse protobuf: {0}")]
    Protobuf(#[from] ProtobufError),
    /// The underlying spclient HTTP request failed.
    #[error("could not complete API request: {0}")]
    SpClient(#[from] SpClientError),
}
/// A CDN URL paired with an optional expiry time.
///
/// `None` means the URL has no expiry token and never goes stale.
#[derive(Debug, Clone)]
pub struct MaybeExpiringUrl(pub String, pub Option<Date>);

/// The list of CDN URLs for a single file, in the order returned by the
/// storage-resolve endpoint (order is preserved by the `TryFrom` conversion).
#[derive(Debug, Clone)]
pub struct MaybeExpiringUrls(pub Vec<MaybeExpiringUrl>);
// Deref/DerefMut to the inner Vec so callers can use slice/Vec methods
// (`is_empty`, indexing, `remove`, `first`, …) directly on the wrapper.
impl Deref for MaybeExpiringUrls {
    type Target = Vec<MaybeExpiringUrl>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for MaybeExpiringUrls {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
/// The resolved CDN locations of a single audio file.
///
/// Created empty with [`CdnUrl::new`]; the URL list is populated by
/// `resolve_audio`, and a usable URL is then obtained via `get_url`.
#[derive(Debug, Clone)]
pub struct CdnUrl {
    // Spotify file identifier this set of URLs belongs to.
    pub file_id: FileId,
    // Candidate URLs with their optional expiry times.
    pub urls: MaybeExpiringUrls,
}
impl CdnUrl {
    /// Creates a `CdnUrl` for `file_id` with an empty URL list.
    ///
    /// Call [`CdnUrl::resolve_audio`] to populate the URLs.
    pub fn new(file_id: FileId) -> Self {
        Self {
            file_id,
            urls: MaybeExpiringUrls(Vec::new()),
        }
    }

    /// Resolves the CDN URLs for this file via the storage-resolve endpoint.
    ///
    /// Returns a new `CdnUrl` carrying the resolved URL list; `self` is not
    /// modified.
    ///
    /// # Errors
    /// - [`CdnUrlError::SpClient`] if the API request fails.
    /// - [`CdnUrlError::Protobuf`] if the response bytes are not a valid
    ///   `StorageResolveResponse`.
    /// - [`CdnUrlError::Parsing`] if the response is not a CDN result or a
    ///   URL/expiry token cannot be interpreted.
    pub async fn resolve_audio(&self, session: &Session) -> Result<Self, CdnUrlError> {
        let file_id = self.file_id;
        let response = session.spclient().get_audio_urls(file_id).await?;
        let msg = CdnUrlMessage::parse_from_bytes(&response)?;
        let urls = MaybeExpiringUrls::try_from(msg)?;

        let cdn_url = Self { file_id, urls };

        trace!("Resolved CDN storage: {:#?}", cdn_url);

        Ok(cdn_url)
    }

    /// Returns the first URL that has not yet expired, discarding expired
    /// URLs from the front of the list as a side effect.
    ///
    /// # Errors
    /// - [`CdnUrlError::Empty`] if no URLs were ever resolved.
    /// - [`CdnUrlError::Expired`] if all resolved URLs have expired.
    pub fn get_url(&mut self) -> Result<&str, CdnUrlError> {
        if self.urls.is_empty() {
            return Err(CdnUrlError::Empty);
        }

        // remove expired URLs until the first one is current, or none are left
        let now = Local::now();
        while !self.urls.is_empty() {
            match self.urls[0].1 {
                // BUGFIX: a URL without an expiry never goes stale. The
                // previous `if let Some(expiry)` had no `None` arm, so a
                // non-expiring URL at the front neither broke out of the loop
                // nor was removed — an infinite loop.
                None => break,
                // chrono allows comparing DateTime<Local> with DateTime<Utc>;
                // both represent the same instant on the timeline.
                Some(expiry) if now < expiry.as_utc() => break,
                Some(_) => {
                    self.urls.remove(0);
                }
            }
        }

        if let Some(cdn_url) = self.urls.first() {
            Ok(&cdn_url.0)
        } else {
            Err(CdnUrlError::Expired)
        }
    }
}
impl TryFrom<CdnUrlMessage> for MaybeExpiringUrls {
    type Error = CdnUrlError;
    /// Converts a storage-resolve response into a list of URLs with optional
    /// expiry times, preserving the server's URL order.
    fn try_from(msg: CdnUrlMessage) -> Result<Self, Self::Error> {
        // Only CDN results carry usable URLs; RESTRICTED/other results are
        // rejected wholesale.
        if !matches!(msg.get_result(), StorageResolveResponse_Result::CDN) {
            return Err(CdnUrlError::Parsing);
        }

        // NOTE(review): a non-empty fileid is taken to signal that the URLs
        // carry expiry tokens — assumption inferred from this code only;
        // confirm against the storage-resolve protocol.
        let is_expiring = !msg.get_fileid().is_empty();

        let result = msg
            .get_cdnurl()
            .iter()
            .map(|cdn_url| {
                let url = Url::parse(cdn_url).map_err(|_| CdnUrlError::Parsing)?;

                if is_expiring {
                    // Two expiry-token formats are recognized:
                    //   1) a "__token__" query parameter containing
                    //      "...exp=<unix-seconds>~..." (Akamai-style), or
                    //   2) a bare query string whose first '_'-separated
                    //      field is the unix timestamp.
                    let expiry_str = if let Some(token) = url
                        .query_pairs()
                        .into_iter()
                        .find(|(key, _value)| key == "__token__")
                    {
                        // Slice out the digits between "exp=" and the
                        // following '~' separator.
                        let start = token.1.find("exp=").ok_or(CdnUrlError::Parsing)?;
                        let slice = &token.1[start + 4..];
                        let end = slice.find('~').ok_or(CdnUrlError::Parsing)?;
                        String::from(&slice[..end])
                    } else if let Some(query) = url.query() {
                        // First field of the raw query string; `split` always
                        // yields at least one item, so the error arm is a
                        // defensive guard.
                        let mut items = query.split('_');
                        String::from(items.next().ok_or(CdnUrlError::Parsing)?)
                    } else {
                        return Err(CdnUrlError::Parsing);
                    };

                    // Treat the URL as expired 5 minutes early, so a request
                    // started just before the deadline still completes.
                    let mut expiry: i64 = expiry_str.parse().map_err(|_| CdnUrlError::Parsing)?;
                    expiry -= 5 * 60; // seconds

                    Ok(MaybeExpiringUrl(
                        cdn_url.to_owned(),
                        // i64 -> Date conversion can fail on out-of-range
                        // timestamps; mapped to a parsing error.
                        Some(expiry.try_into().map_err(|_| CdnUrlError::Parsing)?),
                    ))
                } else {
                    // No expiry information: URL is treated as永 valid — er,
                    // permanently valid (None expiry).
                    Ok(MaybeExpiringUrl(cdn_url.to_owned(), None))
                }
            })
            // First failing URL aborts the whole conversion.
            .collect::<Result<Vec<MaybeExpiringUrl>, CdnUrlError>>()?;
        Ok(Self(result))
    }
}

View file

@ -4,13 +4,17 @@ use std::ops::Deref;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use thiserror::Error;
use crate::error::MetadataError;
use librespot_protocol as protocol; use librespot_protocol as protocol;
use protocol::metadata::Date as DateMessage; use protocol::metadata::Date as DateMessage;
#[derive(Debug, Error)]
pub enum DateError {
#[error("item has invalid date")]
InvalidTimestamp,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Date(pub DateTime<Utc>); pub struct Date(pub DateTime<Utc>);
@ -26,11 +30,11 @@ impl Date {
self.0.timestamp() self.0.timestamp()
} }
pub fn from_timestamp(timestamp: i64) -> Result<Self, MetadataError> { pub fn from_timestamp(timestamp: i64) -> Result<Self, DateError> {
if let Some(date_time) = NaiveDateTime::from_timestamp_opt(timestamp, 0) { if let Some(date_time) = NaiveDateTime::from_timestamp_opt(timestamp, 0) {
Ok(Self::from_utc(date_time)) Ok(Self::from_utc(date_time))
} else { } else {
Err(MetadataError::InvalidTimestamp) Err(DateError::InvalidTimestamp)
} }
} }
@ -63,7 +67,7 @@ impl From<DateTime<Utc>> for Date {
} }
impl TryFrom<i64> for Date { impl TryFrom<i64> for Date {
type Error = MetadataError; type Error = DateError;
fn try_from(timestamp: i64) -> Result<Self, Self::Error> { fn try_from(timestamp: i64) -> Result<Self, Self::Error> {
Self::from_timestamp(timestamp) Self::from_timestamp(timestamp)
} }

View file

@ -1,10 +1,14 @@
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::IntoStream;
use futures_util::FutureExt;
use http::header::HeaderValue; use http::header::HeaderValue;
use http::uri::InvalidUri; use http::uri::InvalidUri;
use hyper::header::InvalidHeaderValue; use hyper::client::{HttpConnector, ResponseFuture};
use hyper::header::{InvalidHeaderValue, USER_AGENT};
use hyper::{Body, Client, Request, Response, StatusCode}; use hyper::{Body, Client, Request, Response, StatusCode};
use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use hyper_rustls::HttpsConnector; use hyper_rustls::HttpsConnector;
use rustls::ClientConfig;
use std::env::consts::OS; use std::env::consts::OS;
use thiserror::Error; use thiserror::Error;
use url::Url; use url::Url;
@ -13,6 +17,7 @@ use crate::version::{SPOTIFY_MOBILE_VERSION, SPOTIFY_VERSION, VERSION_STRING};
pub struct HttpClient { pub struct HttpClient {
proxy: Option<Url>, proxy: Option<Url>,
tls_config: ClientConfig,
} }
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -43,15 +48,60 @@ impl From<InvalidUri> for HttpClientError {
impl HttpClient { impl HttpClient {
pub fn new(proxy: Option<&Url>) -> Self { pub fn new(proxy: Option<&Url>) -> Self {
// configuring TLS is expensive and should be done once per process
let root_store = match rustls_native_certs::load_native_certs() {
Ok(store) => store,
Err((Some(store), err)) => {
warn!("Could not load all certificates: {:?}", err);
store
}
Err((None, err)) => Err(err).expect("cannot access native cert store"),
};
let mut tls_config = ClientConfig::new();
tls_config.root_store = root_store;
tls_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
Self { Self {
proxy: proxy.cloned(), proxy: proxy.cloned(),
tls_config,
} }
} }
pub async fn request(&self, mut req: Request<Body>) -> Result<Response<Body>, HttpClientError> { pub async fn request(&self, req: Request<Body>) -> Result<Response<Body>, HttpClientError> {
let request = self.request_fut(req)?;
{
let response = request.await;
if let Ok(response) = &response {
let status = response.status();
if status != StatusCode::OK {
return Err(HttpClientError::NotOK(status.into()));
}
}
response.map_err(HttpClientError::Response)
}
}
pub async fn request_body(&self, req: Request<Body>) -> Result<Bytes, HttpClientError> {
let response = self.request(req).await?;
hyper::body::to_bytes(response.into_body())
.await
.map_err(HttpClientError::Response)
}
pub fn request_stream(
&self,
req: Request<Body>,
) -> Result<IntoStream<ResponseFuture>, HttpClientError> {
Ok(self.request_fut(req)?.into_stream())
}
pub fn request_fut(&self, mut req: Request<Body>) -> Result<ResponseFuture, HttpClientError> {
trace!("Requesting {:?}", req.uri().to_string()); trace!("Requesting {:?}", req.uri().to_string());
let connector = HttpsConnector::with_native_roots(); let mut http = HttpConnector::new();
http.enforce_http(false);
let connector = HttpsConnector::from((http, self.tls_config.clone()));
let spotify_version = match OS { let spotify_version = match OS {
"android" | "ios" => SPOTIFY_MOBILE_VERSION.to_owned(), "android" | "ios" => SPOTIFY_MOBILE_VERSION.to_owned(),
@ -68,7 +118,7 @@ impl HttpClient {
let headers_mut = req.headers_mut(); let headers_mut = req.headers_mut();
headers_mut.insert( headers_mut.insert(
"User-Agent", USER_AGENT,
// Some features like lyrics are version-gated and require an official version string. // Some features like lyrics are version-gated and require an official version string.
HeaderValue::from_str(&format!( HeaderValue::from_str(&format!(
"Spotify/{} {} ({})", "Spotify/{} {} ({})",
@ -76,38 +126,16 @@ impl HttpClient {
))?, ))?,
); );
let response = if let Some(url) = &self.proxy { let request = if let Some(url) = &self.proxy {
let proxy_uri = url.to_string().parse()?; let proxy_uri = url.to_string().parse()?;
let proxy = Proxy::new(Intercept::All, proxy_uri); let proxy = Proxy::new(Intercept::All, proxy_uri);
let proxy_connector = ProxyConnector::from_proxy(connector, proxy)?; let proxy_connector = ProxyConnector::from_proxy(connector, proxy)?;
Client::builder() Client::builder().build(proxy_connector).request(req)
.build(proxy_connector)
.request(req)
.await
.map_err(HttpClientError::Request)
} else { } else {
Client::builder() Client::builder().build(connector).request(req)
.build(connector)
.request(req)
.await
.map_err(HttpClientError::Request)
}; };
if let Ok(response) = &response { Ok(request)
let status = response.status();
if status != StatusCode::OK {
return Err(HttpClientError::NotOK(status.into()));
}
}
response
}
pub async fn request_body(&self, req: Request<Body>) -> Result<Bytes, HttpClientError> {
let response = self.request(req).await?;
hyper::body::to_bytes(response.into_body())
.await
.map_err(HttpClientError::Response)
} }
} }

View file

@ -11,9 +11,11 @@ pub mod apresolve;
pub mod audio_key; pub mod audio_key;
pub mod authentication; pub mod authentication;
pub mod cache; pub mod cache;
pub mod cdn_url;
pub mod channel; pub mod channel;
pub mod config; pub mod config;
mod connection; mod connection;
pub mod date;
#[allow(dead_code)] #[allow(dead_code)]
mod dealer; mod dealer;
#[doc(hidden)] #[doc(hidden)]

View file

@ -8,9 +8,11 @@ use crate::protocol::extended_metadata::BatchedEntityRequest;
use crate::spotify_id::SpotifyId; use crate::spotify_id::SpotifyId;
use bytes::Bytes; use bytes::Bytes;
use futures_util::future::IntoStream;
use http::header::HeaderValue; use http::header::HeaderValue;
use hyper::header::InvalidHeaderValue; use hyper::client::ResponseFuture;
use hyper::{Body, HeaderMap, Request}; use hyper::header::{InvalidHeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE, RANGE};
use hyper::{Body, HeaderMap, Method, Request};
use protobuf::Message; use protobuf::Message;
use rand::Rng; use rand::Rng;
use std::time::Duration; use std::time::Duration;
@ -86,7 +88,7 @@ impl SpClient {
pub async fn request_with_protobuf( pub async fn request_with_protobuf(
&self, &self,
method: &str, method: &Method,
endpoint: &str, endpoint: &str,
headers: Option<HeaderMap>, headers: Option<HeaderMap>,
message: &dyn Message, message: &dyn Message,
@ -94,7 +96,7 @@ impl SpClient {
let body = protobuf::text_format::print_to_string(message); let body = protobuf::text_format::print_to_string(message);
let mut headers = headers.unwrap_or_else(HeaderMap::new); let mut headers = headers.unwrap_or_else(HeaderMap::new);
headers.insert("Content-Type", "application/protobuf".parse()?); headers.insert(CONTENT_TYPE, "application/protobuf".parse()?);
self.request(method, endpoint, Some(headers), Some(body)) self.request(method, endpoint, Some(headers), Some(body))
.await .await
@ -102,20 +104,20 @@ impl SpClient {
pub async fn request_as_json( pub async fn request_as_json(
&self, &self,
method: &str, method: &Method,
endpoint: &str, endpoint: &str,
headers: Option<HeaderMap>, headers: Option<HeaderMap>,
body: Option<String>, body: Option<String>,
) -> SpClientResult { ) -> SpClientResult {
let mut headers = headers.unwrap_or_else(HeaderMap::new); let mut headers = headers.unwrap_or_else(HeaderMap::new);
headers.insert("Accept", "application/json".parse()?); headers.insert(ACCEPT, "application/json".parse()?);
self.request(method, endpoint, Some(headers), body).await self.request(method, endpoint, Some(headers), body).await
} }
pub async fn request( pub async fn request(
&self, &self,
method: &str, method: &Method,
endpoint: &str, endpoint: &str,
headers: Option<HeaderMap>, headers: Option<HeaderMap>,
body: Option<String>, body: Option<String>,
@ -130,12 +132,12 @@ impl SpClient {
// Reconnection logic: retrieve the endpoint every iteration, so we can try // Reconnection logic: retrieve the endpoint every iteration, so we can try
// another access point when we are experiencing network issues (see below). // another access point when we are experiencing network issues (see below).
let mut uri = self.base_url().await; let mut url = self.base_url().await;
uri.push_str(endpoint); url.push_str(endpoint);
let mut request = Request::builder() let mut request = Request::builder()
.method(method) .method(method)
.uri(uri) .uri(url)
.body(Body::from(body.clone()))?; .body(Body::from(body.clone()))?;
// Reconnection logic: keep getting (cached) tokens because they might have expired. // Reconnection logic: keep getting (cached) tokens because they might have expired.
@ -144,7 +146,7 @@ impl SpClient {
*headers_mut = hdrs.clone(); *headers_mut = hdrs.clone();
} }
headers_mut.insert( headers_mut.insert(
"Authorization", AUTHORIZATION,
HeaderValue::from_str(&format!( HeaderValue::from_str(&format!(
"Bearer {}", "Bearer {}",
self.session() self.session()
@ -212,13 +214,13 @@ impl SpClient {
let mut headers = HeaderMap::new(); let mut headers = HeaderMap::new();
headers.insert("X-Spotify-Connection-Id", connection_id.parse()?); headers.insert("X-Spotify-Connection-Id", connection_id.parse()?);
self.request_with_protobuf("PUT", &endpoint, Some(headers), &state) self.request_with_protobuf(&Method::PUT, &endpoint, Some(headers), &state)
.await .await
} }
pub async fn get_metadata(&self, scope: &str, id: SpotifyId) -> SpClientResult { pub async fn get_metadata(&self, scope: &str, id: SpotifyId) -> SpClientResult {
let endpoint = format!("/metadata/4/{}/{}", scope, id.to_base16()); let endpoint = format!("/metadata/4/{}/{}", scope, id.to_base16());
self.request("GET", &endpoint, None, None).await self.request(&Method::GET, &endpoint, None, None).await
} }
pub async fn get_track_metadata(&self, track_id: SpotifyId) -> SpClientResult { pub async fn get_track_metadata(&self, track_id: SpotifyId) -> SpClientResult {
@ -244,7 +246,8 @@ impl SpClient {
pub async fn get_lyrics(&self, track_id: SpotifyId) -> SpClientResult { pub async fn get_lyrics(&self, track_id: SpotifyId) -> SpClientResult {
let endpoint = format!("/color-lyrics/v1/track/{}", track_id.to_base62(),); let endpoint = format!("/color-lyrics/v1/track/{}", track_id.to_base62(),);
self.request_as_json("GET", &endpoint, None, None).await self.request_as_json(&Method::GET, &endpoint, None, None)
.await
} }
pub async fn get_lyrics_for_image( pub async fn get_lyrics_for_image(
@ -258,19 +261,48 @@ impl SpClient {
image_id image_id
); );
self.request_as_json("GET", &endpoint, None, None).await self.request_as_json(&Method::GET, &endpoint, None, None)
.await
} }
// TODO: Find endpoint for newer canvas.proto and upgrade to that. // TODO: Find endpoint for newer canvas.proto and upgrade to that.
pub async fn get_canvases(&self, request: EntityCanvazRequest) -> SpClientResult { pub async fn get_canvases(&self, request: EntityCanvazRequest) -> SpClientResult {
let endpoint = "/canvaz-cache/v0/canvases"; let endpoint = "/canvaz-cache/v0/canvases";
self.request_with_protobuf("POST", endpoint, None, &request) self.request_with_protobuf(&Method::POST, endpoint, None, &request)
.await .await
} }
pub async fn get_extended_metadata(&self, request: BatchedEntityRequest) -> SpClientResult { pub async fn get_extended_metadata(&self, request: BatchedEntityRequest) -> SpClientResult {
let endpoint = "/extended-metadata/v0/extended-metadata"; let endpoint = "/extended-metadata/v0/extended-metadata";
self.request_with_protobuf("POST", endpoint, None, &request) self.request_with_protobuf(&Method::POST, endpoint, None, &request)
.await .await
} }
pub async fn get_audio_urls(&self, file_id: FileId) -> SpClientResult {
let endpoint = format!(
"/storage-resolve/files/audio/interactive/{}",
file_id.to_base16()
);
self.request(&Method::GET, &endpoint, None, None).await
}
pub fn stream_file(
&self,
url: &str,
offset: usize,
length: usize,
) -> Result<IntoStream<ResponseFuture>, SpClientError> {
let req = Request::builder()
.method(&Method::GET)
.uri(url)
.header(
RANGE,
HeaderValue::from_str(&format!("bytes={}-{}", offset, offset + length - 1))?,
)
.body(Body::empty())?;
let stream = self.session().http_client().request_stream(req)?;
Ok(stream)
}
} }

View file

@ -6,7 +6,6 @@ use crate::{
artist::Artists, artist::Artists,
availability::Availabilities, availability::Availabilities,
copyright::Copyrights, copyright::Copyrights,
date::Date,
error::{MetadataError, RequestError}, error::{MetadataError, RequestError},
external_id::ExternalIds, external_id::ExternalIds,
image::Images, image::Images,
@ -18,6 +17,7 @@ use crate::{
Metadata, Metadata,
}; };
use librespot_core::date::Date;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId; use librespot_core::spotify_id::SpotifyId;
use librespot_protocol as protocol; use librespot_protocol as protocol;

View file

@ -3,8 +3,9 @@ use std::ops::Deref;
use thiserror::Error; use thiserror::Error;
use crate::{date::Date, util::from_repeated_message}; use crate::util::from_repeated_message;
use librespot_core::date::Date;
use librespot_protocol as protocol; use librespot_protocol as protocol;
use protocol::metadata::Availability as AvailabilityMessage; use protocol::metadata::Availability as AvailabilityMessage;

View file

@ -9,7 +9,6 @@ use crate::{
}, },
availability::Availabilities, availability::Availabilities,
content_rating::ContentRatings, content_rating::ContentRatings,
date::Date,
error::{MetadataError, RequestError}, error::{MetadataError, RequestError},
image::Images, image::Images,
request::RequestResult, request::RequestResult,
@ -19,6 +18,7 @@ use crate::{
Metadata, Metadata,
}; };
use librespot_core::date::Date;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId; use librespot_core::spotify_id::SpotifyId;
use librespot_protocol as protocol; use librespot_protocol as protocol;

View file

@ -3,6 +3,7 @@ use thiserror::Error;
use protobuf::ProtobufError; use protobuf::ProtobufError;
use librespot_core::date::DateError;
use librespot_core::mercury::MercuryError; use librespot_core::mercury::MercuryError;
use librespot_core::spclient::SpClientError; use librespot_core::spclient::SpClientError;
use librespot_core::spotify_id::SpotifyIdError; use librespot_core::spotify_id::SpotifyIdError;
@ -22,7 +23,7 @@ pub enum MetadataError {
#[error("{0}")] #[error("{0}")]
InvalidSpotifyId(#[from] SpotifyIdError), InvalidSpotifyId(#[from] SpotifyIdError),
#[error("item has invalid date")] #[error("item has invalid date")]
InvalidTimestamp, InvalidTimestamp(#[from] DateError),
#[error("audio item is non-playable")] #[error("audio item is non-playable")]
NonPlayable, NonPlayable,
#[error("could not parse protobuf: {0}")] #[error("could not parse protobuf: {0}")]

View file

@ -15,7 +15,6 @@ pub mod audio;
pub mod availability; pub mod availability;
pub mod content_rating; pub mod content_rating;
pub mod copyright; pub mod copyright;
pub mod date;
pub mod episode; pub mod episode;
pub mod error; pub mod error;
pub mod external_id; pub mod external_id;

View file

@ -3,8 +3,9 @@ use std::convert::{TryFrom, TryInto};
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Deref; use std::ops::Deref;
use crate::{date::Date, error::MetadataError, image::PictureSizes, util::from_repeated_enum}; use crate::{error::MetadataError, image::PictureSizes, util::from_repeated_enum};
use librespot_core::date::Date;
use librespot_core::spotify_id::SpotifyId; use librespot_core::spotify_id::SpotifyId;
use librespot_protocol as protocol; use librespot_protocol as protocol;

View file

@ -2,10 +2,11 @@ use std::convert::{TryFrom, TryInto};
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Deref; use std::ops::Deref;
use crate::{date::Date, error::MetadataError, util::try_from_repeated_message}; use crate::{error::MetadataError, util::try_from_repeated_message};
use super::attribute::{PlaylistAttributes, PlaylistItemAttributes}; use super::attribute::{PlaylistAttributes, PlaylistItemAttributes};
use librespot_core::date::Date;
use librespot_core::spotify_id::SpotifyId; use librespot_core::spotify_id::SpotifyId;
use librespot_protocol as protocol; use librespot_protocol as protocol;

View file

@ -5,7 +5,6 @@ use std::ops::Deref;
use protobuf::Message; use protobuf::Message;
use crate::{ use crate::{
date::Date,
error::MetadataError, error::MetadataError,
request::{MercuryRequest, RequestResult}, request::{MercuryRequest, RequestResult},
util::{from_repeated_enum, try_from_repeated_message}, util::{from_repeated_enum, try_from_repeated_message},
@ -17,6 +16,7 @@ use super::{
permission::Capabilities, permission::Capabilities,
}; };
use librespot_core::date::Date;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::{NamedSpotifyId, SpotifyId}; use librespot_core::spotify_id::{NamedSpotifyId, SpotifyId};
use librespot_protocol as protocol; use librespot_protocol as protocol;

View file

@ -1,8 +1,9 @@
use std::fmt::Debug; use std::fmt::Debug;
use std::ops::Deref; use std::ops::Deref;
use crate::{date::Date, restriction::Restrictions, util::from_repeated_message}; use crate::{restriction::Restrictions, util::from_repeated_message};
use librespot_core::date::Date;
use librespot_protocol as protocol; use librespot_protocol as protocol;
use protocol::metadata::SalePeriod as SalePeriodMessage; use protocol::metadata::SalePeriod as SalePeriodMessage;

View file

@ -13,7 +13,6 @@ use crate::{
}, },
availability::{Availabilities, UnavailabilityReason}, availability::{Availabilities, UnavailabilityReason},
content_rating::ContentRatings, content_rating::ContentRatings,
date::Date,
error::RequestError, error::RequestError,
external_id::ExternalIds, external_id::ExternalIds,
restriction::Restrictions, restriction::Restrictions,
@ -22,6 +21,7 @@ use crate::{
Metadata, MetadataError, RequestResult, Metadata, MetadataError, RequestResult,
}; };
use librespot_core::date::Date;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId; use librespot_core::spotify_id::SpotifyId;
use librespot_protocol as protocol; use librespot_protocol as protocol;

View file

@ -341,8 +341,6 @@ impl Player {
// While PlayerInternal is written as a future, it still contains blocking code. // While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using block_on() in a dedicated thread. // It must be run by using block_on() in a dedicated thread.
// futures_executor::block_on(internal);
let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime"); let runtime = tokio::runtime::Runtime::new().expect("Failed to create Tokio runtime");
runtime.block_on(internal); runtime.block_on(internal);
@ -1904,7 +1902,7 @@ impl PlayerInternal {
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
let handle = tokio::runtime::Handle::current(); let handle = tokio::runtime::Handle::current();
std::thread::spawn(move || { thread::spawn(move || {
let data = handle.block_on(loader.load_track(spotify_id, position_ms)); let data = handle.block_on(loader.load_track(spotify_id, position_ms));
if let Some(data) = data { if let Some(data) = data {
let _ = result_tx.send(data); let _ = result_tx.send(data);

View file

@ -26,6 +26,7 @@ fn compile() {
proto_dir.join("playlist_annotate3.proto"), proto_dir.join("playlist_annotate3.proto"),
proto_dir.join("playlist_permission.proto"), proto_dir.join("playlist_permission.proto"),
proto_dir.join("playlist4_external.proto"), proto_dir.join("playlist4_external.proto"),
proto_dir.join("storage-resolve.proto"),
proto_dir.join("user_attributes.proto"), proto_dir.join("user_attributes.proto"),
// TODO: remove these legacy protobufs when we are on the new API completely // TODO: remove these legacy protobufs when we are on the new API completely
proto_dir.join("authentication.proto"), proto_dir.join("authentication.proto"),