Call stream_from_cdn with CdnUrl

This commit is contained in:
Roderick van Domburg 2021-12-27 21:37:22 +01:00
parent 4646ff3075
commit 0f78fc277e
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
4 changed files with 26 additions and 15 deletions

View file

@ -399,12 +399,13 @@ impl AudioFileStreaming {
INITIAL_DOWNLOAD_SIZE INITIAL_DOWNLOAD_SIZE
}; };
trace!("Streaming {}", file_id);
let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?; let cdn_url = CdnUrl::new(file_id).resolve_audio(&session).await?;
let url = cdn_url.try_get_url()?;
trace!("Streaming {:?}", url); let mut streamer = session
.spclient()
let mut streamer = session.spclient().stream_file(url, 0, download_size)?; .stream_from_cdn(&cdn_url, 0, download_size)?;
let request_time = Instant::now(); let request_time = Instant::now();
// Get the first chunk with the headers to get the file size. // Get the first chunk with the headers to get the file size.

View file

@ -43,6 +43,8 @@ async fn receive_data(
let mut data_offset = requested_offset; let mut data_offset = requested_offset;
let mut request_length = requested_length; let mut request_length = requested_length;
// TODO : check Content-Length and Content-Range headers
let old_number_of_request = shared let old_number_of_request = shared
.number_of_open_requests .number_of_open_requests
.fetch_add(1, Ordering::SeqCst); .fetch_add(1, Ordering::SeqCst);
@ -180,14 +182,14 @@ impl AudioFileFetch {
ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested); ranges_to_request.subtract_range_set(&download_status.requested);
// Likewise, checking for the URL expiry once will guarantee validity long enough. // TODO : refresh cdn_url when the token expired
let url = self.shared.cdn_url.try_get_url()?;
for range in ranges_to_request.iter() { for range in ranges_to_request.iter() {
let streamer = self let streamer = self.session.spclient().stream_from_cdn(
.session &self.shared.cdn_url,
.spclient() range.start,
.stream_file(url, range.start, range.length)?; range.length,
)?;
download_status.requested.add_range(range); download_status.requested.add_range(range);

View file

@ -39,13 +39,15 @@ pub enum CdnUrlError {
Expired, Expired,
#[error("resolved storage is not for CDN")] #[error("resolved storage is not for CDN")]
Storage, Storage,
#[error("no URLs resolved")]
Unresolved,
} }
impl From<CdnUrlError> for Error { impl From<CdnUrlError> for Error {
fn from(err: CdnUrlError) -> Self { fn from(err: CdnUrlError) -> Self {
match err { match err {
CdnUrlError::Expired => Error::deadline_exceeded(err), CdnUrlError::Expired => Error::deadline_exceeded(err),
CdnUrlError::Storage => Error::unavailable(err), CdnUrlError::Storage | CdnUrlError::Unresolved => Error::unavailable(err),
} }
} }
} }
@ -66,7 +68,7 @@ impl CdnUrl {
pub async fn resolve_audio(&self, session: &Session) -> Result<Self, Error> { pub async fn resolve_audio(&self, session: &Session) -> Result<Self, Error> {
let file_id = self.file_id; let file_id = self.file_id;
let response = session.spclient().get_audio_urls(file_id).await?; let response = session.spclient().get_audio_storage(file_id).await?;
let msg = CdnUrlMessage::parse_from_bytes(&response)?; let msg = CdnUrlMessage::parse_from_bytes(&response)?;
let urls = MaybeExpiringUrls::try_from(msg)?; let urls = MaybeExpiringUrls::try_from(msg)?;
@ -78,6 +80,10 @@ impl CdnUrl {
} }
pub fn try_get_url(&self) -> Result<&str, Error> { pub fn try_get_url(&self) -> Result<&str, Error> {
if self.urls.is_empty() {
return Err(CdnUrlError::Unresolved.into());
}
let now = Local::now(); let now = Local::now();
let url = self.urls.iter().find(|url| match url.1 { let url = self.urls.iter().find(|url| match url.1 {
Some(expiry) => now < expiry.as_utc(), Some(expiry) => now < expiry.as_utc(),

View file

@ -13,6 +13,7 @@ use rand::Rng;
use crate::{ use crate::{
apresolve::SocketAddress, apresolve::SocketAddress,
cdn_url::CdnUrl,
error::ErrorKind, error::ErrorKind,
protocol::{ protocol::{
canvaz::EntityCanvazRequest, connect::PutStateRequest, canvaz::EntityCanvazRequest, connect::PutStateRequest,
@ -261,7 +262,7 @@ impl SpClient {
.await .await
} }
pub async fn get_audio_urls(&self, file_id: FileId) -> SpClientResult { pub async fn get_audio_storage(&self, file_id: FileId) -> SpClientResult {
let endpoint = format!( let endpoint = format!(
"/storage-resolve/files/audio/interactive/{}", "/storage-resolve/files/audio/interactive/{}",
file_id.to_base16() file_id.to_base16()
@ -269,12 +270,13 @@ impl SpClient {
self.request(&Method::GET, &endpoint, None, None).await self.request(&Method::GET, &endpoint, None, None).await
} }
pub fn stream_file( pub fn stream_from_cdn(
&self, &self,
url: &str, cdn_url: &CdnUrl,
offset: usize, offset: usize,
length: usize, length: usize,
) -> Result<IntoStream<ResponseFuture>, Error> { ) -> Result<IntoStream<ResponseFuture>, Error> {
let url = cdn_url.try_get_url()?;
let req = Request::builder() let req = Request::builder()
.method(&Method::GET) .method(&Method::GET)
.uri(url) .uri(url)