From 7921f239276099ac1175233fc45252e14030ea52 Mon Sep 17 00:00:00 2001 From: Roderick van Domburg Date: Mon, 3 Jan 2022 00:13:28 +0100 Subject: [PATCH] Improve format handling and support MP3 - Switch from `lewton` to `Symphonia`. This is a pure Rust demuxer and decoder in active development that supports a wide range of formats, including Ogg Vorbis, MP3, AAC and FLAC for future HiFi support. At the moment only Ogg Vorbis and MP3 are enabled; all AAC files are DRM-protected. - Bump MSRV to 1.51, required for `Symphonia`. - Filter out all files whose format is not specified. - Not all episodes seem to be encrypted. If we can't get an audio key, try and see if we can play the file without decryption. - After seeking, report the actual position instead of the target. - Remove the 0xa7 bytes offset from `Subfile`, `Symphonia` does not balk at Spotify's custom Ogg packet before it. This also simplifies handling of formats other than Ogg Vorbis. - When there is no next track to load, signal the UI that the player has stopped. Before, the player would get stuck in an infinite reloading loop when there was only one track in the queue and that track could not be loaded. --- Cargo.lock | 119 +++++++++- audio/src/decrypt.rs | 24 +- audio/src/fetch/mod.rs | 1 + audio/src/lib.rs | 4 +- connect/src/spirc.rs | 34 ++- metadata/src/audio/file.rs | 10 +- playback/Cargo.toml | 10 +- playback/src/audio_backend/mod.rs | 2 +- playback/src/decoder/lewton_decoder.rs | 4 +- playback/src/decoder/mod.rs | 62 ++++-- playback/src/decoder/passthrough_decoder.rs | 33 ++- playback/src/decoder/symphonia_decoder.rs | 229 ++++++++++++-------- playback/src/player.rs | 153 +++++++------ 13 files changed, 445 insertions(+), 240 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1e5bc36f..a7f5093d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -97,6 +97,12 @@ version = "1.0.51" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b26702f315f53b6071259e15dd9d64528213b44d61de1ec926eca7715d62203" +[[package]] +name = "arrayvec" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" + [[package]] name = "async-trait" version = "0.1.51" @@ -186,6 +192,12 @@ version = "3.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c" +[[package]] +name = "bytemuck" +version = "1.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439989e6b8c38d1b6570a384ef1e49c8848128f5a97f3914baef02920842712f" + [[package]] name = "byteorder" version = "1.4.3" @@ -446,6 +458,15 @@ version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" +[[package]] +name = "encoding_rs" +version = "0.8.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df" +dependencies = [ + "cfg-if 1.0.0", +] + [[package]] name = "env_logger" version = "0.8.4" @@ -1121,17 +1142,6 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" -[[package]] -name = "lewton" -version = "0.10.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "777b48df9aaab155475a83a7df3070395ea1ac6902f5cd062b8f2b028075c030" -dependencies = [ - "byteorder", - "ogg", - "tinyvec", -] - [[package]] name = "libc" version = "0.2.109" @@ -1398,7 +1408,6 @@ dependencies = [ "gstreamer", "gstreamer-app", "jack", - "lewton", "libpulse-binding", "libpulse-simple-binding", "librespot-audio", @@ -1413,6 +1422,7 @@ dependencies = [ "rodio", "sdl2", "shell-words", + "symphonia", "thiserror", "tokio", "zerocopy", @@ -2470,6 +2480,91 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" +[[package]] +name = "symphonia" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7e5f38aa07e792f4eebb0faa93cee088ec82c48222dd332897aae1569d9a4b7" +dependencies = [ + "lazy_static", + "symphonia-bundle-mp3", + "symphonia-codec-vorbis", + "symphonia-core", + "symphonia-format-ogg", + "symphonia-metadata", +] + +[[package]] +name = "symphonia-bundle-mp3" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec4d97c4a61ece4651751dddb393ebecb7579169d9e758ae808fe507a5250790" +dependencies = [ + "bitflags", + "lazy_static", + "log", + "symphonia-core", + "symphonia-metadata", +] + +[[package]] +name = "symphonia-codec-vorbis" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a29ed6748078effb35a05064a451493a78038918981dc1a76bdf5a2752d441fa" +dependencies = [ + "log", + "symphonia-core", + "symphonia-utils-xiph", +] + +[[package]] +name = "symphonia-core" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa135e97be0f4a666c31dfe5ef4c75435ba3d355fd6a73d2100aa79b14c104c9" +dependencies = [ + "arrayvec", + "bitflags", + "bytemuck", + "lazy_static", + "log", +] + +[[package]] +name = "symphonia-format-ogg" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7b2357288a79adfec532cfd86049696cfa5c58efeff83bd51687a528f18a519" +dependencies = [ + "log", + "symphonia-core", + "symphonia-metadata", + "symphonia-utils-xiph", +] + +[[package]] +name = "symphonia-metadata" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5260599daba18d8fe905ca3eb3b42ba210529a6276886632412cc74984e79b1a" +dependencies = [ + "encoding_rs", + "lazy_static", + "log", + "symphonia-core", +] + +[[package]] +name = "symphonia-utils-xiph" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37026c6948ff842e0bf94b4008579cc71ab16ed0ff9ca70a331f60f4f1e1e9" +dependencies = [ + "symphonia-core", + "symphonia-metadata", +] + [[package]] name = "syn" version = "1.0.82" diff --git a/audio/src/decrypt.rs b/audio/src/decrypt.rs index 95dc7c08..e11241a9 100644 --- a/audio/src/decrypt.rs +++ b/audio/src/decrypt.rs @@ -14,16 +14,20 @@ const AUDIO_AESIV: [u8; 16] = [ ]; pub struct AudioDecrypt { - cipher: Aes128Ctr, + // a `None` cipher is a convenience to make `AudioDecrypt` pass files unaltered + cipher: Option, reader: T, } impl AudioDecrypt { - pub fn new(key: AudioKey, reader: T) -> AudioDecrypt { - let cipher = Aes128Ctr::new( - GenericArray::from_slice(&key.0), - GenericArray::from_slice(&AUDIO_AESIV), - ); + pub fn new(key: Option, reader: T) -> AudioDecrypt { + let cipher = key.map(|key| { + Aes128Ctr::new( + GenericArray::from_slice(&key.0), + GenericArray::from_slice(&AUDIO_AESIV), + ) + }); + AudioDecrypt { cipher, reader } } } @@ -32,7 +36,9 @@ impl io::Read for AudioDecrypt { fn read(&mut self, output: &mut [u8]) -> io::Result { let len = self.reader.read(output)?; - self.cipher.apply_keystream(&mut output[..len]); + if let Some(ref mut cipher) = self.cipher { + cipher.apply_keystream(&mut output[..len]); + } Ok(len) } @@ -42,7 +48,9 @@ impl io::Seek for AudioDecrypt { fn seek(&mut self, pos: io::SeekFrom) -> io::Result { let newpos = self.reader.seek(pos)?; - self.cipher.seek(newpos); + if let Some(ref mut cipher) = self.cipher { + cipher.seek(newpos); + } Ok(newpos) } diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index 2f478bba..f3229574 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -57,6 +57,7 @@ impl From for Error { /// The minimum size of a block that is requested from the Spotify servers in one request. /// This is the block size that is typically requested while doing a `seek()` on a file. +/// The Symphonia decoder requires this to be a power of 2 and > 32 kB. /// Note: smaller requests can happen if part of the block is downloaded already. pub const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 128; diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 5685486d..22bf2f0a 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -9,6 +9,6 @@ mod range_set; pub use decrypt::AudioDecrypt; pub use fetch::{AudioFile, AudioFileError, StreamLoaderController}; pub use fetch::{ - READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK, - READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, + MINIMUM_DOWNLOAD_SIZE, READ_AHEAD_BEFORE_PLAYBACK, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, + READ_AHEAD_DURING_PLAYBACK, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, }; diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index eedb6cbd..427555ff 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -687,7 +687,6 @@ impl SpircTask { match self.play_status { SpircPlayStatus::Stopped => Ok(()), _ => { - warn!("The player has stopped unexpectedly."); self.state.set_status(PlayStatus::kPlayStatusStop); self.play_status = SpircPlayStatus::Stopped; self.notify(None) @@ -801,9 +800,7 @@ impl SpircTask { self.load_track(start_playing, update.get_state().get_position_ms()); } else { info!("No more tracks left in queue"); - self.state.set_status(PlayStatus::kPlayStatusStop); - self.player.stop(); - self.play_status = SpircPlayStatus::Stopped; + self.handle_stop(); } self.notify(None) @@ -909,9 +906,7 @@ impl SpircTask { <= update.get_device_state().get_became_active_at() { self.device.set_is_active(false); - self.state.set_status(PlayStatus::kPlayStatusStop); - self.player.stop(); - self.play_status = SpircPlayStatus::Stopped; + self.handle_stop(); } Ok(()) } @@ -920,6 +915,10 @@ impl SpircTask { } } + fn handle_stop(&mut self) { + self.player.stop(); + } + fn handle_play(&mut self) { match self.play_status { SpircPlayStatus::Paused { @@ -1036,13 +1035,14 @@ impl SpircTask { .. } => { *preloading_of_next_track_triggered = true; - if let Some(track_id) = self.preview_next_track() { - self.player.preload(track_id); - } } - SpircPlayStatus::LoadingPause { .. } - | SpircPlayStatus::LoadingPlay { .. } - | SpircPlayStatus::Stopped => (), + _ => (), + } + + if let Some(track_id) = self.preview_next_track() { + self.player.preload(track_id); + } else { + self.handle_stop(); } } @@ -1122,9 +1122,7 @@ impl SpircTask { } else { info!("Not playing next track because there are no more tracks left in queue."); self.state.set_playing_track_index(0); - self.state.set_status(PlayStatus::kPlayStatusStop); - self.player.stop(); - self.play_status = SpircPlayStatus::Stopped; + self.handle_stop(); } } @@ -1392,9 +1390,7 @@ impl SpircTask { } } None => { - self.state.set_status(PlayStatus::kPlayStatusStop); - self.player.stop(); - self.play_status = SpircPlayStatus::Stopped; + self.handle_stop(); } } } diff --git a/metadata/src/audio/file.rs b/metadata/src/audio/file.rs index d3ce69b7..65608814 100644 --- a/metadata/src/audio/file.rs +++ b/metadata/src/audio/file.rs @@ -20,7 +20,15 @@ impl From<&[AudioFileMessage]> for AudioFiles { fn from(files: &[AudioFileMessage]) -> Self { let audio_files = files .iter() - .map(|file| (file.get_format(), FileId::from(file.get_file_id()))) + .filter_map(|file| { + let file_id = FileId::from(file.get_file_id()); + if file.has_format() { + Some((file.get_format(), file_id)) + } else { + trace!("Ignoring file <{}> with unspecified format", file_id); + None + } + }) .collect(); AudioFiles(audio_files) diff --git a/playback/Cargo.toml b/playback/Cargo.toml index 92452d3c..262312c0 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -18,15 +18,15 @@ path = "../metadata" version = "0.3.1" [dependencies] +byteorder = "1.4" futures-executor = "0.3" futures-util = { version = "0.3", default_features = false, features = ["alloc"] } log = "0.4" -byteorder = "1.4" +parking_lot = { version = "0.11", features = ["deadlock_detection"] } shell-words = "1.0.0" thiserror = "1.0" tokio = { version = "1", features = ["parking_lot", "rt", "rt-multi-thread", "sync"] } zerocopy = { version = "0.3" } -parking_lot = { version = "0.11", features = ["deadlock_detection"] } # Backends alsa = { version = "0.5", optional = true } @@ -43,8 +43,10 @@ glib = { version = "0.10", optional = true } rodio = { version = "0.14", optional = true, default-features = false } cpal = { version = "0.13", optional = true } -# Decoder -lewton = "0.10" +# Container and audio decoder +symphonia = { version = "0.4", default-features = false, features = ["mp3", "ogg", "vorbis"] } + +# Legacy Ogg container decoder for the passthrough decoder ogg = "0.8" # Dithering diff --git a/playback/src/audio_backend/mod.rs b/playback/src/audio_backend/mod.rs index dc21fb3d..aab43476 100644 --- a/playback/src/audio_backend/mod.rs +++ b/playback/src/audio_backend/mod.rs @@ -71,7 +71,7 @@ macro_rules! sink_as_bytes { self.write_bytes(samples_s16.as_bytes()) } }, - AudioPacket::OggData(samples) => self.write_bytes(samples), + AudioPacket::Raw(samples) => self.write_bytes(samples), } } }; diff --git a/playback/src/decoder/lewton_decoder.rs b/playback/src/decoder/lewton_decoder.rs index bc90b992..9e79c1e3 100644 --- a/playback/src/decoder/lewton_decoder.rs +++ b/playback/src/decoder/lewton_decoder.rs @@ -25,11 +25,11 @@ impl AudioDecoder for VorbisDecoder where R: Read + Seek, { - fn seek(&mut self, absgp: u64) -> DecoderResult<()> { + fn seek(&mut self, absgp: u64) -> Result { self.0 .seek_absgp_pg(absgp) .map_err(|e| DecoderError::LewtonDecoder(e.to_string()))?; - Ok(()) + Ok(absgp) } fn next_packet(&mut self) -> DecoderResult> { diff --git a/playback/src/decoder/mod.rs b/playback/src/decoder/mod.rs index 087bba4c..c0ede5a0 100644 --- a/playback/src/decoder/mod.rs +++ b/playback/src/decoder/mod.rs @@ -1,26 +1,28 @@ use thiserror::Error; -mod lewton_decoder; -pub use lewton_decoder::VorbisDecoder; +use crate::metadata::audio::AudioFileFormat; mod passthrough_decoder; pub use passthrough_decoder::PassthroughDecoder; +mod symphonia_decoder; +pub use symphonia_decoder::SymphoniaDecoder; + #[derive(Error, Debug)] pub enum DecoderError { - #[error("Lewton Decoder Error: {0}")] - LewtonDecoder(String), #[error("Passthrough Decoder Error: {0}")] PassthroughDecoder(String), + #[error("Symphonia Decoder Error: {0}")] + SymphoniaDecoder(String), } pub type DecoderResult = Result; #[derive(Error, Debug)] pub enum AudioPacketError { - #[error("Decoder OggData Error: Can't return OggData on Samples")] - OggData, - #[error("Decoder Samples Error: Can't return Samples on OggData")] + #[error("Decoder Raw Error: Can't return Raw on Samples")] + Raw, + #[error("Decoder Samples Error: Can't return Samples on Raw")] Samples, } @@ -28,25 +30,20 @@ pub type AudioPacketResult = Result; pub enum AudioPacket { Samples(Vec), - OggData(Vec), + Raw(Vec), } impl AudioPacket { - pub fn samples_from_f32(f32_samples: Vec) -> Self { - let f64_samples = f32_samples.iter().map(|sample| *sample as f64).collect(); - AudioPacket::Samples(f64_samples) - } - pub fn samples(&self) -> AudioPacketResult<&[f64]> { match self { AudioPacket::Samples(s) => Ok(s), - AudioPacket::OggData(_) => Err(AudioPacketError::OggData), + AudioPacket::Raw(_) => Err(AudioPacketError::Raw), } } pub fn oggdata(&self) -> AudioPacketResult<&[u8]> { match self { - AudioPacket::OggData(d) => Ok(d), + AudioPacket::Raw(d) => Ok(d), AudioPacket::Samples(_) => Err(AudioPacketError::Samples), } } @@ -54,12 +51,43 @@ impl AudioPacket { pub fn is_empty(&self) -> bool { match self { AudioPacket::Samples(s) => s.is_empty(), - AudioPacket::OggData(d) => d.is_empty(), + AudioPacket::Raw(d) => d.is_empty(), } } } pub trait AudioDecoder { - fn seek(&mut self, absgp: u64) -> DecoderResult<()>; + fn seek(&mut self, absgp: u64) -> Result; fn next_packet(&mut self) -> DecoderResult>; + + fn is_ogg_vorbis(format: AudioFileFormat) -> bool + where + Self: Sized, + { + matches!( + format, + AudioFileFormat::OGG_VORBIS_320 + | AudioFileFormat::OGG_VORBIS_160 + | AudioFileFormat::OGG_VORBIS_96 + ) + } + + fn is_mp3(format: AudioFileFormat) -> bool + where + Self: Sized, + { + matches!( + format, + AudioFileFormat::MP3_320 + | AudioFileFormat::MP3_256 + | AudioFileFormat::MP3_160 + | AudioFileFormat::MP3_96 + ) + } +} + +impl From for DecoderError { + fn from(err: symphonia::core::errors::Error) -> Self { + Self::SymphoniaDecoder(err.to_string()) + } } diff --git a/playback/src/decoder/passthrough_decoder.rs b/playback/src/decoder/passthrough_decoder.rs index dd8e3b32..9b8eedf8 100644 --- a/playback/src/decoder/passthrough_decoder.rs +++ b/playback/src/decoder/passthrough_decoder.rs @@ -1,8 +1,15 @@ // Passthrough decoder for librespot -use super::{AudioDecoder, AudioPacket, DecoderError, DecoderResult}; +use std::{ + io::{Read, Seek}, + time::{SystemTime, UNIX_EPOCH}, +}; + +// TODO: move this to the Symphonia Ogg demuxer use ogg::{OggReadError, Packet, PacketReader, PacketWriteEndInfo, PacketWriter}; -use std::io::{Read, Seek}; -use std::time::{SystemTime, UNIX_EPOCH}; + +use super::{AudioDecoder, AudioPacket, DecoderError, DecoderResult}; + +use crate::metadata::audio::AudioFileFormat; fn get_header(code: u8, rdr: &mut PacketReader) -> DecoderResult> where @@ -36,7 +43,14 @@ pub struct PassthroughDecoder { impl PassthroughDecoder { /// Constructs a new Decoder from a given implementation of `Read + Seek`. - pub fn new(rdr: R) -> DecoderResult { + pub fn new(rdr: R, format: AudioFileFormat) -> DecoderResult { + if !Self::is_ogg_vorbis(format) { + return Err(DecoderError::PassthroughDecoder(format!( + "Passthrough decoder is not implemented for format {:?}", + format + ))); + } + let mut rdr = PacketReader::new(rdr); let since_epoch = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -68,7 +82,7 @@ impl PassthroughDecoder { } impl AudioDecoder for PassthroughDecoder { - fn seek(&mut self, absgp: u64) -> DecoderResult<()> { + fn seek(&mut self, absgp: u64) -> Result { // add an eos to previous stream if missing if self.bos && !self.eos { match self.rdr.read_packet() { @@ -101,9 +115,10 @@ impl AudioDecoder for PassthroughDecoder { .map_err(|e| DecoderError::PassthroughDecoder(e.to_string()))?; match pck { Some(pck) => { - self.ofsgp_page = pck.absgp_page(); - debug!("Seek to offset page {}", self.ofsgp_page); - Ok(()) + let new_page = pck.absgp_page(); + self.ofsgp_page = new_page; + debug!("Seek to offset page {}", new_page); + Ok(new_page) } None => Err(DecoderError::PassthroughDecoder( "Packet is None".to_string(), @@ -184,7 +199,7 @@ impl AudioDecoder for PassthroughDecoder { let data = self.wtr.inner_mut(); if !data.is_empty() { - let ogg_data = AudioPacket::OggData(std::mem::take(data)); + let ogg_data = AudioPacket::Raw(std::mem::take(data)); return Ok(Some(ogg_data)); } } diff --git a/playback/src/decoder/symphonia_decoder.rs b/playback/src/decoder/symphonia_decoder.rs index 309c495d..5546faa5 100644 --- a/playback/src/decoder/symphonia_decoder.rs +++ b/playback/src/decoder/symphonia_decoder.rs @@ -1,136 +1,173 @@ +use std::io; + +use symphonia::core::{ + audio::{SampleBuffer, SignalSpec}, + codecs::{Decoder, DecoderOptions}, + errors::Error, + formats::{FormatReader, SeekMode, SeekTo}, + io::{MediaSource, MediaSourceStream, MediaSourceStreamOptions}, + meta::{MetadataOptions, StandardTagKey, Value}, + probe::Hint, +}; + use super::{AudioDecoder, AudioPacket, DecoderError, DecoderResult}; -use crate::audio::AudioFile; - -use symphonia::core::audio::{AudioBufferRef, Channels}; -use symphonia::core::codecs::Decoder; -use symphonia::core::errors::Error as SymphoniaError; -use symphonia::core::formats::{FormatReader, SeekMode, SeekTo}; -use symphonia::core::io::{MediaSource, MediaSourceStream}; -use symphonia::core::units::TimeStamp; -use symphonia::default::{codecs::VorbisDecoder, formats::OggReader}; - -use std::io::{Read, Seek, SeekFrom}; - -impl MediaSource for FileWithConstSize -where - R: Read + Seek + Send, -{ - fn is_seekable(&self) -> bool { - true - } - - fn byte_len(&self) -> Option { - Some(self.len()) - } -} - -pub struct FileWithConstSize { - stream: T, - len: u64, -} - -impl FileWithConstSize { - pub fn len(&self) -> u64 { - self.len - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl FileWithConstSize -where - T: Seek, -{ - pub fn new(mut stream: T) -> Self { - stream.seek(SeekFrom::End(0)).unwrap(); - let len = stream.stream_position().unwrap(); - stream.seek(SeekFrom::Start(0)).unwrap(); - Self { stream, len } - } -} - -impl Read for FileWithConstSize -where - T: Read, -{ - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.stream.read(buf) - } -} - -impl Seek for FileWithConstSize -where - T: Seek, -{ - fn seek(&mut self, pos: SeekFrom) -> std::io::Result { - self.stream.seek(pos) - } -} +use crate::{metadata::audio::AudioFileFormat, player::NormalisationData}; pub struct SymphoniaDecoder { track_id: u32, decoder: Box, format: Box, - position: TimeStamp, + sample_buffer: SampleBuffer, } impl SymphoniaDecoder { - pub fn new(input: R) -> DecoderResult + pub fn new(input: R, format: AudioFileFormat) -> DecoderResult where - R: Read + Seek, + R: MediaSource + 'static, { - let mss_opts = Default::default(); - let mss = MediaSourceStream::new(Box::new(FileWithConstSize::new(input)), mss_opts); + let mss_opts = MediaSourceStreamOptions { + buffer_len: librespot_audio::MINIMUM_DOWNLOAD_SIZE, + }; + let mss = MediaSourceStream::new(Box::new(input), mss_opts); + + // Not necessary, but speeds up loading. + let mut hint = Hint::new(); + if Self::is_ogg_vorbis(format) { + hint.with_extension("ogg"); + hint.mime_type("audio/ogg"); + } else if Self::is_mp3(format) { + hint.with_extension("mp3"); + hint.mime_type("audio/mp3"); + } let format_opts = Default::default(); - let format = OggReader::try_new(mss, &format_opts).map_err(|e| DecoderError::SymphoniaDecoder(e.to_string()))?; + let metadata_opts: MetadataOptions = Default::default(); + let decoder_opts: DecoderOptions = Default::default(); - let track = format.default_track().unwrap(); - let decoder_opts = Default::default(); - let decoder = VorbisDecoder::try_new(&track.codec_params, &decoder_opts)?; + let probed = + symphonia::default::get_probe().format(&hint, mss, &format_opts, &metadata_opts)?; + let format = probed.format; + + let track = format.default_track().ok_or_else(|| { + DecoderError::SymphoniaDecoder("Could not retrieve default track".into()) + })?; + + let decoder = symphonia::default::get_codecs().make(&track.codec_params, &decoder_opts)?; + + let codec_params = decoder.codec_params(); + let rate = codec_params.sample_rate.ok_or_else(|| { + DecoderError::SymphoniaDecoder("Could not retrieve sample rate".into()) + })?; + let channels = codec_params.channels.ok_or_else(|| { + DecoderError::SymphoniaDecoder("Could not retrieve channel configuration".into()) + })?; + + if rate != crate::SAMPLE_RATE { + return Err(DecoderError::SymphoniaDecoder(format!( + "Unsupported sample rate: {}", + rate + ))); + } + + if channels.count() != crate::NUM_CHANNELS as usize { + return Err(DecoderError::SymphoniaDecoder(format!( + "Unsupported number of channels: {}", + channels + ))); + } + + // TODO: settle on a sane default depending on the format + let max_frames = decoder.codec_params().max_frames_per_packet.unwrap_or(8192); + let sample_buffer = SampleBuffer::new(max_frames, SignalSpec { rate, channels }); Ok(Self { track_id: track.id, - decoder: Box::new(decoder), - format: Box::new(format), - position: 0, + decoder, + format, + sample_buffer, }) } + + pub fn normalisation_data(&mut self) -> Option { + let mut metadata = self.format.metadata(); + loop { + if let Some(_discarded_revision) = metadata.pop() { + // Advance to the latest metadata revision. + continue; + } else { + let revision = metadata.current()?; + let tags = revision.tags(); + + if tags.is_empty() { + // The latest metadata entry in the log is empty. + return None; + } + + let mut data = NormalisationData::default(); + let mut i = 0; + while i < tags.len() { + if let Value::Float(value) = tags[i].value { + #[allow(non_snake_case)] + match tags[i].std_key { + Some(StandardTagKey::ReplayGainAlbumGain) => data.album_gain_db = value, + Some(StandardTagKey::ReplayGainAlbumPeak) => data.album_peak = value, + Some(StandardTagKey::ReplayGainTrackGain) => data.track_gain_db = value, + Some(StandardTagKey::ReplayGainTrackPeak) => data.track_peak = value, + _ => (), + } + } + i += 1; + } + + break Some(data); + } + } + } } impl AudioDecoder for SymphoniaDecoder { - fn seek(&mut self, absgp: u64) -> DecoderResult<()> { + // TODO : change to position ms + fn seek(&mut self, absgp: u64) -> Result { let seeked_to = self.format.seek( SeekMode::Accurate, - SeekTo::Time { - time: absgp, // TODO : move to Duration - track_id: Some(self.track_id), + SeekTo::TimeStamp { + ts: absgp, // TODO : move to Duration + track_id: self.track_id, }, )?; - self.position = seeked_to.actual_ts; - // TODO : Ok(self.position) - Ok(()) + Ok(seeked_to.actual_ts) } fn next_packet(&mut self) -> DecoderResult> { let packet = match self.format.next_packet() { Ok(packet) => packet, - Err(e) => { - log::error!("format error: {}", err); - return Err(DecoderError::SymphoniaDecoder(e.to_string())), + Err(Error::IoError(err)) => { + if err.kind() == io::ErrorKind::UnexpectedEof { + return Ok(None); + } else { + return Err(DecoderError::SymphoniaDecoder(err.to_string())); + } + } + Err(err) => { + return Err(err.into()); } }; + match self.decoder.decode(&packet) { Ok(audio_buf) => { - self.position += packet.frames() as TimeStamp; - Ok(Some(packet)) + // TODO : track current playback position + self.sample_buffer.copy_interleaved_ref(audio_buf); + Ok(Some(AudioPacket::Samples( + self.sample_buffer.samples().to_vec(), + ))) } - // TODO: Handle non-fatal decoding errors and retry. - Err(e) => - return Err(DecoderError::SymphoniaDecoder(e.to_string())), + Err(Error::ResetRequired) => { + // This may happen after a seek. + self.decoder.reset(); + self.next_packet() + } + Err(err) => Err(err.into()), } } } diff --git a/playback/src/player.rs b/playback/src/player.rs index 211e1795..f9120b83 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -16,6 +16,7 @@ use std::{ use byteorder::{LittleEndian, ReadBytesExt}; use futures_util::{future, stream::futures_unordered::FuturesUnordered, StreamExt, TryFutureExt}; use parking_lot::Mutex; +use symphonia::core::io::MediaSource; use tokio::sync::{mpsc, oneshot}; use crate::{ @@ -28,7 +29,7 @@ use crate::{ config::{Bitrate, NormalisationMethod, NormalisationType, PlayerConfig}, convert::Converter, core::{util::SeqGenerator, Error, Session, SpotifyId}, - decoder::{AudioDecoder, AudioPacket, DecoderError, PassthroughDecoder, VorbisDecoder}, + decoder::{AudioDecoder, AudioPacket, PassthroughDecoder, SymphoniaDecoder}, metadata::audio::{AudioFileFormat, AudioItem}, mixer::AudioFilter, }; @@ -220,10 +221,12 @@ pub fn ratio_to_db(ratio: f64) -> f64 { #[derive(Clone, Copy, Debug)] pub struct NormalisationData { - track_gain_db: f32, - track_peak: f32, - album_gain_db: f32, - album_peak: f32, + // Spotify provides these as `f32`, but audio metadata can contain up to `f64`. + // Also, this negates the need for casting during sample processing. + pub track_gain_db: f64, + pub track_peak: f64, + pub album_gain_db: f64, + pub album_peak: f64, } impl Default for NormalisationData { @@ -238,7 +241,7 @@ impl Default for NormalisationData { } impl NormalisationData { - fn parse_from_file(mut file: T) -> io::Result { + fn parse_from_ogg(mut file: T) -> io::Result { const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144; let newpos = file.seek(SeekFrom::Start(SPOTIFY_NORMALIZATION_HEADER_START_OFFSET))?; @@ -251,10 +254,10 @@ impl NormalisationData { return Ok(NormalisationData::default()); } - let track_gain_db = file.read_f32::()?; - let track_peak = file.read_f32::()?; - let album_gain_db = file.read_f32::()?; - let album_peak = file.read_f32::()?; + let track_gain_db = file.read_f32::()? as f64; + let track_peak = file.read_f32::()? as f64; + let album_gain_db = file.read_f32::()? as f64; + let album_peak = file.read_f32::()? as f64; let r = NormalisationData { track_gain_db, @@ -277,11 +280,11 @@ impl NormalisationData { [data.track_gain_db, data.track_peak] }; - let normalisation_power = gain_db as f64 + config.normalisation_pregain; + let normalisation_power = gain_db + config.normalisation_pregain; let mut normalisation_factor = db_to_ratio(normalisation_power); - if normalisation_factor * gain_peak as f64 > config.normalisation_threshold { - let limited_normalisation_factor = config.normalisation_threshold / gain_peak as f64; + if normalisation_factor * gain_peak > config.normalisation_threshold { + let limited_normalisation_factor = config.normalisation_threshold / gain_peak; let limited_normalisation_power = ratio_to_db(limited_normalisation_factor); if config.normalisation_method == NormalisationMethod::Basic { @@ -304,7 +307,7 @@ impl NormalisationData { normalisation_factor * 100.0 ); - normalisation_factor as f64 + normalisation_factor } } @@ -820,23 +823,34 @@ impl PlayerTrackLoader { } let duration_ms = audio.duration as u32; - // (Most) podcasts seem to support only 96 kbps Vorbis, so fall back to it - // TODO: update this logic once we also support MP3 and/or FLAC + // (Most) podcasts seem to support only 96 kbps Ogg Vorbis, so fall back to it let formats = match self.config.bitrate { Bitrate::Bitrate96 => [ AudioFileFormat::OGG_VORBIS_96, + AudioFileFormat::MP3_96, AudioFileFormat::OGG_VORBIS_160, + AudioFileFormat::MP3_160, + AudioFileFormat::MP3_256, AudioFileFormat::OGG_VORBIS_320, + AudioFileFormat::MP3_320, ], Bitrate::Bitrate160 => [ AudioFileFormat::OGG_VORBIS_160, + AudioFileFormat::MP3_160, AudioFileFormat::OGG_VORBIS_96, + AudioFileFormat::MP3_96, + AudioFileFormat::MP3_256, AudioFileFormat::OGG_VORBIS_320, + AudioFileFormat::MP3_320, ], Bitrate::Bitrate320 => [ AudioFileFormat::OGG_VORBIS_320, + AudioFileFormat::MP3_320, + AudioFileFormat::MP3_256, AudioFileFormat::OGG_VORBIS_160, + AudioFileFormat::MP3_160, AudioFileFormat::OGG_VORBIS_96, + AudioFileFormat::MP3_96, ], }; @@ -879,43 +893,48 @@ impl PlayerTrackLoader { let is_cached = encrypted_file.is_cached(); + // Setting up demuxing and decoding will trigger a seek() so always start in random access mode. let stream_loader_controller = encrypted_file.get_stream_loader_controller().ok()?; - - let key = match self.session.audio_key().request(spotify_id, file_id).await { - Ok(key) => key, - Err(e) => { - error!("Unable to load decryption key: {:?}", e); - return None; - } - }; - - let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); - - // Parsing normalisation data will trigger a seek() so always start in random access mode. stream_loader_controller.set_random_access_mode(); - let normalisation_data = match NormalisationData::parse_from_file(&mut decrypted_file) { - Ok(data) => data, - Err(_) => { - warn!("Unable to extract normalisation data, using default values."); - NormalisationData::default() + // Not all audio files are encrypted. If we can't get a key, try loading the track + // without decryption. If the file was encrypted after all, the decoder will fail + // parsing and bail out, so we should be safe from outputting ear-piercing noise. + let key = match self.session.audio_key().request(spotify_id, file_id).await { + Ok(key) => Some(key), + Err(e) => { + warn!("Unable to load key, continuing without decryption: {}", e); + None } }; + let decrypted_file = AudioDecrypt::new(key, encrypted_file); + let mut audio_file = + Subfile::new(decrypted_file, stream_loader_controller.len() as u64); - let audio_file = Subfile::new(decrypted_file, 0xa7); - + let mut normalisation_data = None; let result = if self.config.passthrough { - match PassthroughDecoder::new(audio_file) { - Ok(result) => Ok(Box::new(result) as Decoder), - Err(e) => Err(DecoderError::PassthroughDecoder(e.to_string())), - } + PassthroughDecoder::new(audio_file, format).map(|x| Box::new(x) as Decoder) } else { - match VorbisDecoder::new(audio_file) { - Ok(result) => Ok(Box::new(result) as Decoder), - Err(e) => Err(DecoderError::LewtonDecoder(e.to_string())), + // Spotify stores normalisation data in a custom Ogg packet instead of Vorbis comments. + if SymphoniaDecoder::is_ogg_vorbis(format) { + normalisation_data = NormalisationData::parse_from_ogg(&mut audio_file).ok(); } + + SymphoniaDecoder::new(audio_file, format).map(|mut decoder| { + // For formats other that Vorbis, we'll try getting normalisation data from + // ReplayGain metadata fields, if present. + if normalisation_data.is_none() { + normalisation_data = decoder.normalisation_data(); + } + Box::new(decoder) as Decoder + }) }; + let normalisation_data = normalisation_data.unwrap_or_else(|| { + warn!("Unable to get normalisation data, continuing with defaults."); + NormalisationData::default() + }); + let mut decoder = match result { Ok(decoder) => decoder, Err(e) if is_cached => { @@ -1035,7 +1054,7 @@ impl Future for PlayerInternal { track_id, e ); debug_assert!(self.state.is_loading()); - self.send_event(PlayerEvent::EndOfTrack { + self.send_event(PlayerEvent::Unavailable { track_id, play_request_id, }) @@ -2184,27 +2203,24 @@ impl fmt::Debug for PlayerState { } } } + struct Subfile { stream: T, - offset: u64, + length: u64, } impl Subfile { - pub fn new(mut stream: T, offset: u64) -> Subfile { - let target = SeekFrom::Start(offset); - match stream.seek(target) { + pub fn new(mut stream: T, length: u64) -> Subfile { + match stream.seek(SeekFrom::Start(0)) { Ok(pos) => { - if pos != offset { - error!( - "Subfile::new seeking to {:?} but position is now {:?}", - target, pos - ); + if pos != 0 { + error!("Subfile::new seeking to 0 but position is now {:?}", pos); } } Err(e) => error!("Subfile new Error: {}", e), } - Subfile { stream, offset } + Subfile { stream, length } } } @@ -2215,21 +2231,20 @@ impl Read for Subfile { } impl Seek for Subfile { - fn seek(&mut self, mut pos: SeekFrom) -> io::Result { - pos = match pos { - SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset), - x => x, - }; - - let newpos = self.stream.seek(pos)?; - - if newpos >= self.offset { - Ok(newpos - self.offset) - } else { - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "newpos < self.offset", - )) - } + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.stream.seek(pos) + } +} + +impl MediaSource for Subfile +where + R: Read + Seek + Send, +{ + fn is_seekable(&self) -> bool { + true + } + + fn byte_len(&self) -> Option { + Some(self.length) } }