Improve player (#823)

* Improve error handling
* Harmonize `Seek`: Make the decoders and player use the same math for converting between samples and milliseconds
* Reduce duplicate calls: Make decoder seek in PCM, not ms
* Simplify decoder errors with `thiserror`
This commit is contained in:
Jason Gray 2021-09-20 12:29:12 -05:00 committed by GitHub
parent 949ca4fded
commit 89577d1fc1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 280 additions and 242 deletions

View file

@ -15,7 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- [playback] Add `--normalisation-type auto` that switches between album and track automatically
### Changed
- [audio, playback] Moved `VorbisDecoder`, `VorbisError`, `AudioPacket`, `PassthroughDecoder`, `PassthroughError`, `AudioError`, `AudioDecoder` and the `convert` module from `librespot-audio` to `librespot-playback`. The underlying crates `vorbis`, `librespot-tremor`, `lewton` and `ogg` should be used directly. (breaking)
- [audio, playback] Moved `VorbisDecoder`, `VorbisError`, `AudioPacket`, `PassthroughDecoder`, `PassthroughError`, `DecoderError`, `AudioDecoder` and the `convert` module from `librespot-audio` to `librespot-playback`. The underlying crates `vorbis`, `librespot-tremor`, `lewton` and `ogg` should be used directly. (breaking)
- [audio, playback] Use `Duration` for time constants and functions (breaking)
- [connect, playback] Moved volume controls from `librespot-connect` to `librespot-playback` crate
- [connect] Synchronize player volume with mixer volume on playback

View file

@ -25,6 +25,7 @@ byteorder = "1.4"
shell-words = "1.0.0"
tokio = { version = "1", features = ["sync"] }
zerocopy = { version = "0.3" }
thiserror = { version = "1" }
# Backends
alsa = { version = "0.5", optional = true }
@ -40,7 +41,6 @@ glib = { version = "0.10", optional = true }
# Rodio dependencies
rodio = { version = "0.14", optional = true, default-features = false }
cpal = { version = "0.13", optional = true }
thiserror = { version = "1", optional = true }
# Decoder
lewton = "0.10"
@ -51,11 +51,11 @@ rand = "0.8"
rand_distr = "0.4"
[features]
alsa-backend = ["alsa", "thiserror"]
alsa-backend = ["alsa"]
portaudio-backend = ["portaudio-rs"]
pulseaudio-backend = ["libpulse-binding", "libpulse-simple-binding", "thiserror"]
pulseaudio-backend = ["libpulse-binding", "libpulse-simple-binding"]
jackaudio-backend = ["jack"]
rodio-backend = ["rodio", "cpal", "thiserror"]
rodiojack-backend = ["rodio", "cpal/jack", "thiserror"]
rodio-backend = ["rodio", "cpal"]
rodiojack-backend = ["rodio", "cpal/jack"]
sdl-backend = ["sdl2"]
gstreamer-backend = ["gstreamer", "gstreamer-app", "glib"]

View file

@ -71,7 +71,10 @@ impl Open for JackSink {
impl Sink for JackSink {
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()> {
let samples_f32: &[f32] = &converter.f64_to_f32(packet.samples());
let samples = packet
.samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let samples_f32: &[f32] = &converter.f64_to_f32(samples);
for sample in samples_f32.iter() {
let res = self.send.send(*sample);
if res.is_err() {

View file

@ -148,7 +148,10 @@ impl<'a> Sink for PortAudioSink<'a> {
};
}
let samples = packet.samples();
let samples = packet
.samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
let result = match self {
Self::F32(stream, _parameters) => {
let samples_f32: &[f32] = &converter.f64_to_f32(samples);

View file

@ -176,7 +176,9 @@ pub fn open(host: cpal::Host, device: Option<String>, format: AudioFormat) -> Ro
impl Sink for RodioSink {
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()> {
let samples = packet.samples();
let samples = packet
.samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
match self.format {
AudioFormat::F32 => {
let samples_f32: &[f32] = &converter.f64_to_f32(samples);

View file

@ -92,7 +92,9 @@ impl Sink for SdlSink {
}};
}
let samples = packet.samples();
let samples = packet
.samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
match self {
Self::F32(queue) => {
let samples_f32: &[f32] = &converter.f64_to_f32(samples);

View file

@ -1,22 +1,23 @@
use super::{AudioDecoder, AudioError, AudioPacket};
use super::{AudioDecoder, AudioPacket, DecoderError, DecoderResult};
use lewton::audio::AudioReadError::AudioIsHeader;
use lewton::inside_ogg::OggStreamReader;
use lewton::samples::InterleavedSamples;
use lewton::OggReadError::NoCapturePatternFound;
use lewton::VorbisError::{BadAudio, OggError};
use std::error;
use std::fmt;
use std::io::{Read, Seek};
use std::time::Duration;
pub struct VorbisDecoder<R: Read + Seek>(OggStreamReader<R>);
pub struct VorbisError(lewton::VorbisError);
impl<R> VorbisDecoder<R>
where
R: Read + Seek,
{
pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> {
Ok(VorbisDecoder(OggStreamReader::new(input)?))
pub fn new(input: R) -> DecoderResult<VorbisDecoder<R>> {
let reader =
OggStreamReader::new(input).map_err(|e| DecoderError::LewtonDecoder(e.to_string()))?;
Ok(VorbisDecoder(reader))
}
}
@ -24,51 +25,22 @@ impl<R> AudioDecoder for VorbisDecoder<R>
where
R: Read + Seek,
{
fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
let absgp = Duration::from_millis(ms as u64 * crate::SAMPLE_RATE as u64).as_secs();
match self.0.seek_absgp_pg(absgp as u64) {
Ok(_) => Ok(()),
Err(err) => Err(AudioError::VorbisError(err.into())),
}
fn seek(&mut self, absgp: u64) -> DecoderResult<()> {
self.0
.seek_absgp_pg(absgp)
.map_err(|e| DecoderError::LewtonDecoder(e.to_string()))?;
Ok(())
}
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
use lewton::audio::AudioReadError::AudioIsHeader;
use lewton::OggReadError::NoCapturePatternFound;
use lewton::VorbisError::{BadAudio, OggError};
fn next_packet(&mut self) -> DecoderResult<Option<AudioPacket>> {
loop {
match self.0.read_dec_packet_generic::<InterleavedSamples<f32>>() {
Ok(Some(packet)) => return Ok(Some(AudioPacket::samples_from_f32(packet.samples))),
Ok(None) => return Ok(None),
Err(BadAudio(AudioIsHeader)) => (),
Err(OggError(NoCapturePatternFound)) => (),
Err(err) => return Err(AudioError::VorbisError(err.into())),
Err(e) => return Err(DecoderError::LewtonDecoder(e.to_string())),
}
}
}
}
impl From<lewton::VorbisError> for VorbisError {
fn from(err: lewton::VorbisError) -> VorbisError {
VorbisError(err)
}
}
impl fmt::Debug for VorbisError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl fmt::Display for VorbisError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}
impl error::Error for VorbisError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
error::Error::source(&self.0)
}
}

View file

@ -1,10 +1,30 @@
use std::fmt;
use thiserror::Error;
mod lewton_decoder;
pub use lewton_decoder::{VorbisDecoder, VorbisError};
pub use lewton_decoder::VorbisDecoder;
mod passthrough_decoder;
pub use passthrough_decoder::{PassthroughDecoder, PassthroughError};
pub use passthrough_decoder::PassthroughDecoder;
#[derive(Error, Debug)]
pub enum DecoderError {
#[error("Lewton Decoder Error: {0}")]
LewtonDecoder(String),
#[error("Passthrough Decoder Error: {0}")]
PassthroughDecoder(String),
}
pub type DecoderResult<T> = Result<T, DecoderError>;
#[derive(Error, Debug)]
pub enum AudioPacketError {
#[error("Decoder OggData Error: Can't return OggData on Samples")]
OggData,
#[error("Decoder Samples Error: Can't return Samples on OggData")]
Samples,
}
pub type AudioPacketResult<T> = Result<T, AudioPacketError>;
pub enum AudioPacket {
Samples(Vec<f64>),
@ -17,17 +37,17 @@ impl AudioPacket {
AudioPacket::Samples(f64_samples)
}
pub fn samples(&self) -> &[f64] {
pub fn samples(&self) -> AudioPacketResult<&[f64]> {
match self {
AudioPacket::Samples(s) => s,
AudioPacket::OggData(_) => panic!("can't return OggData on samples"),
AudioPacket::Samples(s) => Ok(s),
AudioPacket::OggData(_) => Err(AudioPacketError::OggData),
}
}
pub fn oggdata(&self) -> &[u8] {
pub fn oggdata(&self) -> AudioPacketResult<&[u8]> {
match self {
AudioPacket::Samples(_) => panic!("can't return samples on OggData"),
AudioPacket::OggData(d) => d,
AudioPacket::OggData(d) => Ok(d),
AudioPacket::Samples(_) => Err(AudioPacketError::Samples),
}
}
@ -39,34 +59,7 @@ impl AudioPacket {
}
}
#[derive(Debug)]
pub enum AudioError {
PassthroughError(PassthroughError),
VorbisError(VorbisError),
}
impl fmt::Display for AudioError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AudioError::PassthroughError(err) => write!(f, "PassthroughError({})", err),
AudioError::VorbisError(err) => write!(f, "VorbisError({})", err),
}
}
}
impl From<VorbisError> for AudioError {
fn from(err: VorbisError) -> AudioError {
AudioError::VorbisError(err)
}
}
impl From<PassthroughError> for AudioError {
fn from(err: PassthroughError) -> AudioError {
AudioError::PassthroughError(err)
}
}
pub trait AudioDecoder {
fn seek(&mut self, ms: i64) -> Result<(), AudioError>;
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError>;
fn seek(&mut self, absgp: u64) -> DecoderResult<()>;
fn next_packet(&mut self) -> DecoderResult<Option<AudioPacket>>;
}

View file

@ -1,23 +1,22 @@
// Passthrough decoder for librespot
use super::{AudioDecoder, AudioError, AudioPacket};
use crate::SAMPLE_RATE;
use super::{AudioDecoder, AudioPacket, DecoderError, DecoderResult};
use ogg::{OggReadError, Packet, PacketReader, PacketWriteEndInfo, PacketWriter};
use std::fmt;
use std::io::{Read, Seek};
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
fn get_header<T>(code: u8, rdr: &mut PacketReader<T>) -> Result<Box<[u8]>, PassthroughError>
fn get_header<T>(code: u8, rdr: &mut PacketReader<T>) -> DecoderResult<Box<[u8]>>
where
T: Read + Seek,
{
let pck: Packet = rdr.read_packet_expected()?;
let pck: Packet = rdr
.read_packet_expected()
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
let pkt_type = pck.data[0];
debug!("Vorbis header type {}", &pkt_type);
if pkt_type != code {
return Err(PassthroughError(OggReadError::InvalidData));
return Err(DecoderError::PassthroughDecoder("Invalid Data".to_string()));
}
Ok(pck.data.into_boxed_slice())
@ -35,16 +34,14 @@ pub struct PassthroughDecoder<R: Read + Seek> {
setup: Box<[u8]>,
}
pub struct PassthroughError(ogg::OggReadError);
impl<R: Read + Seek> PassthroughDecoder<R> {
/// Constructs a new Decoder from a given implementation of `Read + Seek`.
pub fn new(rdr: R) -> Result<Self, PassthroughError> {
pub fn new(rdr: R) -> DecoderResult<Self> {
let mut rdr = PacketReader::new(rdr);
let stream_serial = SystemTime::now()
let since_epoch = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u32;
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
let stream_serial = since_epoch.as_millis() as u32;
info!("Starting passthrough track with serial {}", stream_serial);
@ -71,9 +68,7 @@ impl<R: Read + Seek> PassthroughDecoder<R> {
}
impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
info!("Seeking to {}", ms);
fn seek(&mut self, absgp: u64) -> DecoderResult<()> {
// add an eos to previous stream if missing
if self.bos && !self.eos {
match self.rdr.read_packet() {
@ -86,7 +81,7 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
PacketWriteEndInfo::EndStream,
absgp_page,
)
.unwrap();
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
}
_ => warn! {"Cannot write EoS after seeking"},
};
@ -97,23 +92,29 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
self.ofsgp_page = 0;
self.stream_serial += 1;
// hard-coded to 44.1 kHz
match self.rdr.seek_absgp(
None,
Duration::from_millis(ms as u64 * SAMPLE_RATE as u64).as_secs(),
) {
match self.rdr.seek_absgp(None, absgp) {
Ok(_) => {
// need to set some offset for next_page()
let pck = self.rdr.read_packet().unwrap().unwrap();
let pck = self
.rdr
.read_packet()
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
match pck {
Some(pck) => {
self.ofsgp_page = pck.absgp_page();
debug!("Seek to offset page {}", self.ofsgp_page);
Ok(())
}
Err(err) => Err(AudioError::PassthroughError(err.into())),
None => Err(DecoderError::PassthroughDecoder(
"Packet is None".to_string(),
)),
}
}
Err(e) => Err(DecoderError::PassthroughDecoder(e.to_string())),
}
}
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
fn next_packet(&mut self) -> DecoderResult<Option<AudioPacket>> {
// write headers if we are (re)starting
if !self.bos {
self.wtr
@ -123,7 +124,7 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
PacketWriteEndInfo::EndPage,
0,
)
.unwrap();
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
self.wtr
.write_packet(
self.comment.clone(),
@ -131,7 +132,7 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
PacketWriteEndInfo::NormalPacket,
0,
)
.unwrap();
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
self.wtr
.write_packet(
self.setup.clone(),
@ -139,7 +140,7 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
PacketWriteEndInfo::EndPage,
0,
)
.unwrap();
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
self.bos = true;
debug!("Wrote Ogg headers");
}
@ -151,7 +152,7 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
info!("end of streaming");
return Ok(None);
}
Err(err) => return Err(AudioError::PassthroughError(err.into())),
Err(e) => return Err(DecoderError::PassthroughDecoder(e.to_string())),
};
let pckgp_page = pck.absgp_page();
@ -178,32 +179,14 @@ impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
inf,
pckgp_page - self.ofsgp_page,
)
.unwrap();
.map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?;
let data = self.wtr.inner_mut();
if !data.is_empty() {
let result = AudioPacket::OggData(std::mem::take(data));
return Ok(Some(result));
let ogg_data = AudioPacket::OggData(std::mem::take(data));
return Ok(Some(ogg_data));
}
}
}
}
impl fmt::Debug for PassthroughError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<ogg::OggReadError> for PassthroughError {
fn from(err: OggReadError) -> PassthroughError {
PassthroughError(err)
}
}
impl fmt::Display for PassthroughError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}

View file

@ -16,3 +16,5 @@ pub mod player;
pub const SAMPLE_RATE: u32 = 44100;
pub const NUM_CHANNELS: u8 = 2;
pub const SAMPLES_PER_SECOND: u32 = SAMPLE_RATE as u32 * NUM_CHANNELS as u32;
pub const PAGES_PER_MS: f64 = SAMPLE_RATE as f64 / 1000.0;
pub const MS_PER_PAGE: f64 = 1000.0 / SAMPLE_RATE as f64;

View file

@ -23,11 +23,11 @@ use crate::convert::Converter;
use crate::core::session::Session;
use crate::core::spotify_id::SpotifyId;
use crate::core::util::SeqGenerator;
use crate::decoder::{AudioDecoder, AudioError, AudioPacket, PassthroughDecoder, VorbisDecoder};
use crate::decoder::{AudioDecoder, AudioPacket, DecoderError, PassthroughDecoder, VorbisDecoder};
use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter;
use crate::{NUM_CHANNELS, SAMPLES_PER_SECOND};
use crate::{MS_PER_PAGE, NUM_CHANNELS, PAGES_PER_MS, SAMPLES_PER_SECOND};
const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
pub const DB_VOLTAGE_RATIO: f64 = 20.0;
@ -356,7 +356,11 @@ impl Player {
}
fn command(&self, cmd: PlayerCommand) {
self.commands.as_ref().unwrap().send(cmd).unwrap();
if let Some(commands) = self.commands.as_ref() {
if let Err(e) = commands.send(cmd) {
error!("Player Commands Error: {}", e);
}
}
}
pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 {
@ -429,7 +433,7 @@ impl Drop for Player {
if let Some(handle) = self.thread_handle.take() {
match handle.join() {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
Err(e) => error!("Player thread Error: {:?}", e),
}
}
}
@ -505,7 +509,10 @@ impl PlayerState {
match *self {
Stopped | EndOfTrack { .. } | Paused { .. } | Loading { .. } => false,
Playing { .. } => true,
Invalid => panic!("invalid state"),
Invalid => {
error!("PlayerState is_playing: invalid state");
exit(1);
}
}
}
@ -530,7 +537,10 @@ impl PlayerState {
| Playing {
ref mut decoder, ..
} => Some(decoder),
Invalid => panic!("invalid state"),
Invalid => {
error!("PlayerState decoder: invalid state");
exit(1);
}
}
}
@ -546,7 +556,10 @@ impl PlayerState {
ref mut stream_loader_controller,
..
} => Some(stream_loader_controller),
Invalid => panic!("invalid state"),
Invalid => {
error!("PlayerState stream_loader_controller: invalid state");
exit(1);
}
}
}
@ -577,7 +590,10 @@ impl PlayerState {
},
};
}
_ => panic!("Called playing_to_end_of_track in non-playing state."),
_ => {
error!("Called playing_to_end_of_track in non-playing state.");
exit(1);
}
}
}
@ -610,7 +626,10 @@ impl PlayerState {
suggested_to_preload_next_track,
};
}
_ => panic!("invalid state"),
_ => {
error!("PlayerState paused_to_playing: invalid state");
exit(1);
}
}
}
@ -643,7 +662,10 @@ impl PlayerState {
suggested_to_preload_next_track,
};
}
_ => panic!("invalid state"),
_ => {
error!("PlayerState playing_to_paused: invalid state");
exit(1);
}
}
}
}
@ -699,8 +721,8 @@ impl PlayerTrackLoader {
) -> Option<PlayerLoadedTrackData> {
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).await {
Ok(audio) => audio,
Err(_) => {
error!("Unable to load audio item.");
Err(e) => {
error!("Unable to load audio item: {:?}", e);
return None;
}
};
@ -768,8 +790,8 @@ impl PlayerTrackLoader {
let encrypted_file = match encrypted_file.await {
Ok(encrypted_file) => encrypted_file,
Err(_) => {
error!("Unable to load encrypted file.");
Err(e) => {
error!("Unable to load encrypted file: {:?}", e);
return None;
}
};
@ -787,8 +809,8 @@ impl PlayerTrackLoader {
let key = match self.session.audio_key().request(spotify_id, file_id).await {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
Err(e) => {
error!("Unable to load decryption key: {:?}", e);
return None;
}
};
@ -813,12 +835,12 @@ impl PlayerTrackLoader {
let result = if self.config.passthrough {
match PassthroughDecoder::new(audio_file) {
Ok(result) => Ok(Box::new(result) as Decoder),
Err(e) => Err(AudioError::PassthroughError(e)),
Err(e) => Err(DecoderError::PassthroughDecoder(e.to_string())),
}
} else {
match VorbisDecoder::new(audio_file) {
Ok(result) => Ok(Box::new(result) as Decoder),
Err(e) => Err(AudioError::VorbisError(e)),
Err(e) => Err(DecoderError::LewtonDecoder(e.to_string())),
}
};
@ -830,15 +852,18 @@ impl PlayerTrackLoader {
e
);
if self
.session
.cache()
.expect("If the audio file is cached, a cache should exist")
.remove_file(file_id)
.is_err()
{
match self.session.cache() {
Some(cache) => {
if cache.remove_file(file_id).is_err() {
error!("Error removing file from cache");
return None;
}
}
None => {
error!("If the audio file is cached, a cache should exist");
return None;
}
}
// Just try it again
continue;
@ -849,13 +874,15 @@ impl PlayerTrackLoader {
}
};
if position_ms != 0 {
if let Err(err) = decoder.seek(position_ms as i64) {
error!("Vorbis error: {}", err);
let position_pcm = PlayerInternal::position_ms_to_pcm(position_ms);
if position_pcm != 0 {
if let Err(e) = decoder.seek(position_pcm) {
error!("PlayerTrackLoader load_track: {}", e);
}
stream_loader_controller.set_stream_mode();
}
let stream_position_pcm = PlayerInternal::position_ms_to_pcm(position_ms);
let stream_position_pcm = position_pcm;
info!("<{}> ({} ms) loaded", audio.name, audio.duration);
return Some(PlayerLoadedTrackData {
@ -912,7 +939,8 @@ impl Future for PlayerInternal {
start_playback,
);
if let PlayerState::Loading { .. } = self.state {
panic!("The state wasn't changed by start_playback()");
error!("The state wasn't changed by start_playback()");
exit(1);
}
}
Poll::Ready(Err(_)) => {
@ -976,30 +1004,37 @@ impl Future for PlayerInternal {
..
} = self.state
{
let packet = decoder.next_packet().expect("Vorbis error");
match decoder.next_packet() {
Ok(packet) => {
if !passthrough {
if let Some(ref packet) = packet {
match packet.samples() {
Ok(samples) => {
*stream_position_pcm +=
(packet.samples().len() / NUM_CHANNELS as usize) as u64;
(samples.len() / NUM_CHANNELS as usize) as u64;
let stream_position_millis =
Self::position_pcm_to_ms(*stream_position_pcm);
let notify_about_position = match *reported_nominal_start_time {
let notify_about_position =
match *reported_nominal_start_time {
None => true,
Some(reported_nominal_start_time) => {
// only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we're actually in time.
let lag = (Instant::now() - reported_nominal_start_time)
let lag = (Instant::now()
- reported_nominal_start_time)
.as_millis()
as i64
- stream_position_millis as i64;
lag > Duration::from_secs(1).as_millis() as i64
lag > Duration::from_secs(1).as_millis()
as i64
}
};
if notify_about_position {
*reported_nominal_start_time = Some(
Instant::now()
- Duration::from_millis(stream_position_millis as u64),
- Duration::from_millis(
stream_position_millis as u64,
),
);
self.send_event(PlayerEvent::Playing {
track_id,
@ -1009,14 +1044,27 @@ impl Future for PlayerInternal {
});
}
}
Err(e) => {
error!("PlayerInternal poll: {}", e);
exit(1);
}
}
}
} else {
// position, even if irrelevant, must be set so that seek() is called
*stream_position_pcm = duration_ms.into();
}
self.handle_packet(packet, normalisation_factor);
}
Err(e) => {
error!("PlayerInternal poll: {}", e);
exit(1);
}
}
} else {
unreachable!();
error!("PlayerInternal poll: Invalid PlayerState");
exit(1);
};
}
@ -1065,11 +1113,11 @@ impl Future for PlayerInternal {
impl PlayerInternal {
fn position_pcm_to_ms(position_pcm: u64) -> u32 {
(position_pcm * 10 / 441) as u32
(position_pcm as f64 * MS_PER_PAGE) as u32
}
fn position_ms_to_pcm(position_ms: u32) -> u64 {
position_ms as u64 * 441 / 10
(position_ms as f64 * PAGES_PER_MS) as u64
}
fn ensure_sink_running(&mut self) {
@ -1080,8 +1128,8 @@ impl PlayerInternal {
}
match self.sink.start() {
Ok(()) => self.sink_status = SinkStatus::Running,
Err(err) => {
error!("Fatal error, could not start audio sink: {}", err);
Err(e) => {
error!("{}", e);
exit(1);
}
}
@ -1103,8 +1151,8 @@ impl PlayerInternal {
callback(self.sink_status);
}
}
Err(err) => {
error!("Fatal error, could not stop audio sink: {}", err);
Err(e) => {
error!("{}", e);
exit(1);
}
}
@ -1151,7 +1199,10 @@ impl PlayerInternal {
self.state = PlayerState::Stopped;
}
PlayerState::Stopped => (),
PlayerState::Invalid => panic!("invalid state"),
PlayerState::Invalid => {
error!("PlayerInternal handle_player_stop: invalid state");
exit(1);
}
}
}
@ -1317,8 +1368,8 @@ impl PlayerInternal {
}
}
if let Err(err) = self.sink.write(&packet, &mut self.converter) {
error!("Fatal error, could not write audio to audio sink: {}", err);
if let Err(e) = self.sink.write(&packet, &mut self.converter) {
error!("{}", e);
exit(1);
}
}
@ -1337,7 +1388,8 @@ impl PlayerInternal {
play_request_id,
})
} else {
unreachable!();
error!("PlayerInternal handle_packet: Invalid PlayerState");
exit(1);
}
}
}
@ -1458,7 +1510,10 @@ impl PlayerInternal {
play_request_id,
position_ms,
}),
PlayerState::Invalid { .. } => panic!("Player is in an invalid state."),
PlayerState::Invalid { .. } => {
error!("PlayerInternal handle_command_load: invalid state");
exit(1);
}
}
// Now we check at different positions whether we already have a pre-loaded version
@ -1474,24 +1529,30 @@ impl PlayerInternal {
if previous_track_id == track_id {
let mut loaded_track = match mem::replace(&mut self.state, PlayerState::Invalid) {
PlayerState::EndOfTrack { loaded_track, .. } => loaded_track,
_ => unreachable!(),
_ => {
error!("PlayerInternal handle_command_load: Invalid PlayerState");
exit(1);
}
};
if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm {
let position_pcm = Self::position_ms_to_pcm(position_ms);
if position_pcm != loaded_track.stream_position_pcm {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking.
// But most likely the track is fully
// loaded already because we played
// to the end of it.
if let Err(e) = loaded_track.decoder.seek(position_pcm) {
// This may be blocking.
error!("PlayerInternal handle_command_load: {}", e);
}
loaded_track.stream_loader_controller.set_stream_mode();
loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
loaded_track.stream_position_pcm = position_pcm;
}
self.preload = PlayerPreload::None;
self.start_playback(track_id, play_request_id, loaded_track, play);
if let PlayerState::Invalid = self.state {
panic!("start_playback() hasn't set a valid player state.");
error!("start_playback() hasn't set a valid player state.");
exit(1);
}
return;
}
@ -1515,11 +1576,16 @@ impl PlayerInternal {
{
if current_track_id == track_id {
// we can use the current decoder. Ensure it's at the correct position.
if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
let position_pcm = Self::position_ms_to_pcm(position_ms);
if position_pcm != *stream_position_pcm {
stream_loader_controller.set_random_access_mode();
let _ = decoder.seek(position_ms as i64); // This may be blocking.
if let Err(e) = decoder.seek(position_pcm) {
// This may be blocking.
error!("PlayerInternal handle_command_load: {}", e);
}
stream_loader_controller.set_stream_mode();
*stream_position_pcm = Self::position_ms_to_pcm(position_ms);
*stream_position_pcm = position_pcm;
}
// Move the info from the current state into a PlayerLoadedTrackData so we can use
@ -1558,12 +1624,14 @@ impl PlayerInternal {
self.start_playback(track_id, play_request_id, loaded_track, play);
if let PlayerState::Invalid = self.state {
panic!("start_playback() hasn't set a valid player state.");
error!("start_playback() hasn't set a valid player state.");
exit(1);
}
return;
} else {
unreachable!();
error!("PlayerInternal handle_command_load: Invalid PlayerState");
exit(1);
}
}
}
@ -1581,17 +1649,23 @@ impl PlayerInternal {
mut loaded_track,
} = preload
{
if Self::position_ms_to_pcm(position_ms) != loaded_track.stream_position_pcm {
let position_pcm = Self::position_ms_to_pcm(position_ms);
if position_pcm != loaded_track.stream_position_pcm {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
if let Err(e) = loaded_track.decoder.seek(position_pcm) {
// This may be blocking
error!("PlayerInternal handle_command_load: {}", e);
}
loaded_track.stream_loader_controller.set_stream_mode();
}
self.start_playback(track_id, play_request_id, *loaded_track, play);
return;
} else {
unreachable!();
error!("PlayerInternal handle_command_load: Invalid PlayerState");
exit(1);
}
}
}
@ -1697,7 +1771,9 @@ impl PlayerInternal {
stream_loader_controller.set_random_access_mode();
}
if let Some(decoder) = self.state.decoder() {
match decoder.seek(position_ms as i64) {
let position_pcm = Self::position_ms_to_pcm(position_ms);
match decoder.seek(position_pcm) {
Ok(_) => {
if let PlayerState::Playing {
ref mut stream_position_pcm,
@ -1708,10 +1784,10 @@ impl PlayerInternal {
..
} = self.state
{
*stream_position_pcm = Self::position_ms_to_pcm(position_ms);
*stream_position_pcm = position_pcm;
}
}
Err(err) => error!("Vorbis error: {:?}", err),
Err(e) => error!("PlayerInternal handle_command_seek: {}", e),
}
} else {
warn!("Player::seek called from invalid state");
@ -1954,7 +2030,9 @@ struct Subfile<T: Read + Seek> {
impl<T: Read + Seek> Subfile<T> {
pub fn new(mut stream: T, offset: u64) -> Subfile<T> {
stream.seek(SeekFrom::Start(offset)).unwrap();
if let Err(e) = stream.seek(SeekFrom::Start(offset)) {
error!("Subfile new Error: {}", e);
}
Subfile { stream, offset }
}
}