Merge branch 'new-api' of github.com:librespot-org/librespot into new-api

This commit is contained in:
Roderick van Domburg 2022-01-08 23:29:57 +01:00
commit 42455e0cdd
No known key found for this signature in database
GPG key ID: A9EF5222A26F0451
6 changed files with 65 additions and 85 deletions

View file

@ -5,7 +5,7 @@ use std::{
fs, fs,
io::{self, Read, Seek, SeekFrom}, io::{self, Read, Seek, SeekFrom},
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Arc,
}, },
time::{Duration, Instant}, time::{Duration, Instant},
@ -138,9 +138,9 @@ pub struct StreamingRequest {
#[derive(Debug)] #[derive(Debug)]
pub enum StreamLoaderCommand { pub enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode(), // optimise download strategy for random access RandomAccessMode, // optimise download strategy for random access
StreamMode(), // optimise download strategy for streaming StreamMode, // optimise download strategy for streaming
Close(), // terminate and don't load any more data Close, // terminate and don't load any more data
} }
#[derive(Clone)] #[derive(Clone)]
@ -299,17 +299,17 @@ impl StreamLoaderController {
pub fn set_random_access_mode(&self) { pub fn set_random_access_mode(&self) {
// optimise download strategy for random access // optimise download strategy for random access
self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode()); self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode);
} }
pub fn set_stream_mode(&self) { pub fn set_stream_mode(&self) {
// optimise download strategy for streaming // optimise download strategy for streaming
self.send_stream_loader_command(StreamLoaderCommand::StreamMode()); self.send_stream_loader_command(StreamLoaderCommand::StreamMode);
} }
pub fn close(&self) { pub fn close(&self) {
// terminate stream loading and don't load any more data for this file. // terminate stream loading and don't load any more data for this file.
self.send_stream_loader_command(StreamLoaderCommand::Close()); self.send_stream_loader_command(StreamLoaderCommand::Close);
} }
} }
@ -325,25 +325,13 @@ struct AudioFileDownloadStatus {
downloaded: RangeSet, downloaded: RangeSet,
} }
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum DownloadStrategy {
RandomAccess(),
Streaming(),
}
impl Default for DownloadStrategy {
fn default() -> Self {
Self::Streaming()
}
}
struct AudioFileShared { struct AudioFileShared {
cdn_url: CdnUrl, cdn_url: CdnUrl,
file_size: usize, file_size: usize,
bytes_per_second: usize, bytes_per_second: usize,
cond: Condvar, cond: Condvar,
download_status: Mutex<AudioFileDownloadStatus>, download_status: Mutex<AudioFileDownloadStatus>,
download_strategy: Mutex<DownloadStrategy>, download_streaming: AtomicBool,
number_of_open_requests: AtomicUsize, number_of_open_requests: AtomicUsize,
ping_time_ms: AtomicUsize, ping_time_ms: AtomicUsize,
read_position: AtomicUsize, read_position: AtomicUsize,
@ -462,7 +450,7 @@ impl AudioFileStreaming {
requested: RangeSet::new(), requested: RangeSet::new(),
downloaded: RangeSet::new(), downloaded: RangeSet::new(),
}), }),
download_strategy: Mutex::new(DownloadStrategy::default()), download_streaming: AtomicBool::new(true),
number_of_open_requests: AtomicUsize::new(0), number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize), ping_time_ms: AtomicUsize::new(INITIAL_PING_TIME_ESTIMATE.as_millis() as usize),
read_position: AtomicUsize::new(0), read_position: AtomicUsize::new(0),
@ -507,9 +495,7 @@ impl Read for AudioFileStreaming {
return Ok(0); return Ok(0);
} }
let length_to_request = match *(self.shared.download_strategy.lock()) { let length_to_request = if self.shared.download_streaming.load(Ordering::Acquire) {
DownloadStrategy::RandomAccess() => length,
DownloadStrategy::Streaming() => {
// Due to the read-ahead stuff, we potentially request more than the actual request demanded. // Due to the read-ahead stuff, we potentially request more than the actual request demanded.
let ping_time_seconds = let ping_time_seconds =
Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64) Duration::from_millis(self.shared.ping_time_ms.load(Ordering::Relaxed) as u64)
@ -517,14 +503,15 @@ impl Read for AudioFileStreaming {
let length_to_request = length let length_to_request = length
+ max( + max(
(READ_AHEAD_DURING_PLAYBACK.as_secs_f32() (READ_AHEAD_DURING_PLAYBACK.as_secs_f32() * self.shared.bytes_per_second as f32)
* self.shared.bytes_per_second as f32) as usize, as usize,
(READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
* ping_time_seconds * ping_time_seconds
* self.shared.bytes_per_second as f32) as usize, * self.shared.bytes_per_second as f32) as usize,
); );
min(length_to_request, self.shared.file_size - offset) min(length_to_request, self.shared.file_size - offset)
} } else {
length
}; };
let mut ranges_to_request = RangeSet::new(); let mut ranges_to_request = RangeSet::new();
@ -575,7 +562,7 @@ impl Read for AudioFileStreaming {
impl Seek for AudioFileStreaming { impl Seek for AudioFileStreaming {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> { fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
// If we are already at this position, we don't need to switch download strategy. // If we are already at this position, we don't need to switch download mode.
// These checks and locks are less expensive than interrupting streaming. // These checks and locks are less expensive than interrupting streaming.
let current_position = self.position as i64; let current_position = self.position as i64;
let requested_pos = match pos { let requested_pos = match pos {
@ -595,13 +582,17 @@ impl Seek for AudioFileStreaming {
.downloaded .downloaded
.contains(requested_pos as usize); .contains(requested_pos as usize);
let mut old_strategy = DownloadStrategy::default(); let mut was_streaming = false;
if !available { if !available {
// Ensure random access mode if we need to download this part. // Ensure random access mode if we need to download this part.
old_strategy = std::mem::replace( // Checking whether we are streaming now is a micro-optimization
&mut *(self.shared.download_strategy.lock()), // to save an atomic load.
DownloadStrategy::RandomAccess(), was_streaming = self.shared.download_streaming.load(Ordering::Acquire);
); if was_streaming {
self.shared
.download_streaming
.store(false, Ordering::Release);
}
} }
self.position = self.read_file.seek(pos)?; self.position = self.read_file.seek(pos)?;
@ -609,8 +600,10 @@ impl Seek for AudioFileStreaming {
.read_position .read_position
.store(self.position as usize, Ordering::Release); .store(self.position as usize, Ordering::Release);
if !available && old_strategy != DownloadStrategy::RandomAccess() { if !available && was_streaming {
*(self.shared.download_strategy.lock()) = old_strategy; self.shared
.download_streaming
.store(true, Ordering::Release);
} }
Ok(self.position) Ok(self.position)

View file

@ -16,9 +16,9 @@ use librespot_core::{session::Session, Error};
use crate::range_set::{Range, RangeSet}; use crate::range_set::{Range, RangeSet};
use super::{ use super::{
AudioFileError, AudioFileResult, AudioFileShared, DownloadStrategy, StreamLoaderCommand, AudioFileError, AudioFileResult, AudioFileShared, StreamLoaderCommand, StreamingRequest,
StreamingRequest, FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, FAST_PREFETCH_THRESHOLD_FACTOR, MAXIMUM_ASSUMED_PING_TIME, MAX_PREFETCH_REQUESTS,
MAX_PREFETCH_REQUESTS, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR, MINIMUM_DOWNLOAD_SIZE, PREFETCH_THRESHOLD_FACTOR,
}; };
struct PartialFileData { struct PartialFileData {
@ -157,8 +157,8 @@ enum ControlFlow {
} }
impl AudioFileFetch { impl AudioFileFetch {
fn get_download_strategy(&mut self) -> DownloadStrategy { fn is_download_streaming(&mut self) -> bool {
*(self.shared.download_strategy.lock()) self.shared.download_streaming.load(Ordering::Acquire)
} }
fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult { fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult {
@ -337,15 +337,17 @@ impl AudioFileFetch {
) -> Result<ControlFlow, Error> { ) -> Result<ControlFlow, Error> {
match cmd { match cmd {
StreamLoaderCommand::Fetch(request) => { StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length)?; self.download_range(request.start, request.length)?
} }
StreamLoaderCommand::RandomAccessMode() => { StreamLoaderCommand::RandomAccessMode => self
*(self.shared.download_strategy.lock()) = DownloadStrategy::RandomAccess(); .shared
} .download_streaming
StreamLoaderCommand::StreamMode() => { .store(false, Ordering::Release),
*(self.shared.download_strategy.lock()) = DownloadStrategy::Streaming(); StreamLoaderCommand::StreamMode => self
} .shared
StreamLoaderCommand::Close() => return Ok(ControlFlow::Break), .download_streaming
.store(true, Ordering::Release),
StreamLoaderCommand::Close => return Ok(ControlFlow::Break),
} }
Ok(ControlFlow::Continue) Ok(ControlFlow::Continue)
@ -430,7 +432,7 @@ pub(super) async fn audio_file_fetch(
else => (), else => (),
} }
if fetch.get_download_strategy() == DownloadStrategy::Streaming() { if fetch.is_download_streaming() {
let number_of_open_requests = let number_of_open_requests =
fetch.shared.number_of_open_requests.load(Ordering::SeqCst); fetch.shared.number_of_open_requests.load(Ordering::SeqCst);
if number_of_open_requests < MAX_PREFETCH_REQUESTS { if number_of_open_requests < MAX_PREFETCH_REQUESTS {

View file

@ -56,19 +56,10 @@ impl AudioPacket {
} }
} }
#[derive(Debug, Clone, Eq, PartialEq, Hash, Ord, PartialOrd)]
pub enum AudioPositionKind {
// the position is at the expected packet
Current,
// the decoder skipped some corrupted or invalid data,
// and the position is now later than expected
SkippedTo,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AudioPacketPosition { pub struct AudioPacketPosition {
pub position_ms: u32, pub position_ms: u32,
pub kind: AudioPositionKind, pub skipped: bool,
} }
impl Deref for AudioPacketPosition { impl Deref for AudioPacketPosition {

View file

@ -7,9 +7,7 @@ use std::{
// TODO: move this to the Symphonia Ogg demuxer // TODO: move this to the Symphonia Ogg demuxer
use ogg::{OggReadError, Packet, PacketReader, PacketWriteEndInfo, PacketWriter}; use ogg::{OggReadError, Packet, PacketReader, PacketWriteEndInfo, PacketWriter};
use super::{ use super::{AudioDecoder, AudioPacket, AudioPacketPosition, DecoderError, DecoderResult};
AudioDecoder, AudioPacket, AudioPacketPosition, AudioPositionKind, DecoderError, DecoderResult,
};
use crate::{ use crate::{
metadata::audio::{AudioFileFormat, AudioFiles}, metadata::audio::{AudioFileFormat, AudioFiles},
@ -212,7 +210,7 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
let position_ms = Self::position_pcm_to_ms(pckgp_page); let position_ms = Self::position_pcm_to_ms(pckgp_page);
let packet_position = AudioPacketPosition { let packet_position = AudioPacketPosition {
position_ms, position_ms,
kind: AudioPositionKind::Current, skipped: false,
}; };
let ogg_data = AudioPacket::Raw(std::mem::take(data)); let ogg_data = AudioPacket::Raw(std::mem::take(data));

View file

@ -16,9 +16,7 @@ use symphonia::{
}, },
}; };
use super::{ use super::{AudioDecoder, AudioPacket, AudioPacketPosition, DecoderError, DecoderResult};
AudioDecoder, AudioPacket, AudioPacketPosition, AudioPositionKind, DecoderError, DecoderResult,
};
use crate::{ use crate::{
metadata::audio::{AudioFileFormat, AudioFiles}, metadata::audio::{AudioFileFormat, AudioFiles},
@ -173,7 +171,7 @@ impl AudioDecoder for SymphoniaDecoder {
} }
fn next_packet(&mut self) -> DecoderResult<Option<(AudioPacketPosition, AudioPacket)>> { fn next_packet(&mut self) -> DecoderResult<Option<(AudioPacketPosition, AudioPacket)>> {
let mut position_kind = AudioPositionKind::Current; let mut skipped = false;
loop { loop {
let packet = match self.format.next_packet() { let packet = match self.format.next_packet() {
@ -193,7 +191,7 @@ impl AudioDecoder for SymphoniaDecoder {
let position_ms = self.ts_to_ms(packet.pts()); let position_ms = self.ts_to_ms(packet.pts());
let packet_position = AudioPacketPosition { let packet_position = AudioPacketPosition {
position_ms, position_ms,
kind: position_kind, skipped,
}; };
match self.decoder.decode(&packet) { match self.decoder.decode(&packet) {
@ -215,7 +213,7 @@ impl AudioDecoder for SymphoniaDecoder {
// The packet failed to decode due to corrupted or invalid data, get a new // The packet failed to decode due to corrupted or invalid data, get a new
// packet and try again. // packet and try again.
warn!("Skipping malformed audio packet at {} ms", position_ms); warn!("Skipping malformed audio packet at {} ms", position_ms);
position_kind = AudioPositionKind::SkippedTo; skipped = true;
continue; continue;
} }
Err(err) => return Err(err.into()), Err(err) => return Err(err.into()),

View file

@ -30,8 +30,7 @@ use crate::{
convert::Converter, convert::Converter,
core::{util::SeqGenerator, Error, Session, SpotifyId}, core::{util::SeqGenerator, Error, Session, SpotifyId},
decoder::{ decoder::{
AudioDecoder, AudioPacket, AudioPacketPosition, AudioPositionKind, PassthroughDecoder, AudioDecoder, AudioPacket, AudioPacketPosition, PassthroughDecoder, SymphoniaDecoder,
SymphoniaDecoder,
}, },
metadata::audio::{AudioFileFormat, AudioFiles, AudioItem}, metadata::audio::{AudioFileFormat, AudioFiles, AudioItem},
mixer::AudioFilter, mixer::AudioFilter,
@ -1116,8 +1115,7 @@ impl Future for PlayerInternal {
// Only notify if we're skipped some packets *or* we are behind. // Only notify if we're skipped some packets *or* we are behind.
// If we're ahead it's probably due to a buffer of the backend // If we're ahead it's probably due to a buffer of the backend
// and we're actually in time. // and we're actually in time.
let notify_about_position = packet_position.kind let notify_about_position = packet_position.skipped
!= AudioPositionKind::Current
|| match *reported_nominal_start_time { || match *reported_nominal_start_time {
None => true, None => true,
Some(reported_nominal_start_time) => { Some(reported_nominal_start_time) => {