Merge pull request #569 from philippe44/passthrough-v3

Allow pipeline writer to spit out Ogg directly, including when seeking
This commit is contained in:
Sasha Hilton 2021-02-23 00:16:01 +00:00 committed by GitHub
commit e8204c970e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 404 additions and 98 deletions

5
.gitignore vendored
View file

@ -3,4 +3,7 @@ target
spotify_appkey.key spotify_appkey.key
.vagrant/ .vagrant/
.project .project
.history .history
*.save

1
Cargo.lock generated
View file

@ -1470,6 +1470,7 @@ dependencies = [
"log 0.4.14", "log 0.4.14",
"num-bigint", "num-bigint",
"num-traits", "num-traits",
"ogg",
"tempfile", "tempfile",
"vorbis", "vorbis",
] ]

View file

@ -17,6 +17,7 @@ byteorder = "1.3"
bytes = "0.4" bytes = "0.4"
futures = "0.1" futures = "0.1"
lewton = "0.10" lewton = "0.10"
ogg = "0.8"
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
num-traits = "0.2" num-traits = "0.2"

View file

@ -2,12 +2,12 @@ extern crate lewton;
use self::lewton::inside_ogg::OggStreamReader; use self::lewton::inside_ogg::OggStreamReader;
use super::{AudioDecoder, AudioError, AudioPacket};
use std::error; use std::error;
use std::fmt; use std::fmt;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
pub struct VorbisDecoder<R: Read + Seek>(OggStreamReader<R>); pub struct VorbisDecoder<R: Read + Seek>(OggStreamReader<R>);
pub struct VorbisPacket(Vec<i16>);
pub struct VorbisError(lewton::VorbisError); pub struct VorbisError(lewton::VorbisError);
impl<R> VorbisDecoder<R> impl<R> VorbisDecoder<R>
@ -17,41 +17,38 @@ where
pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> { pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> {
Ok(VorbisDecoder(OggStreamReader::new(input)?)) Ok(VorbisDecoder(OggStreamReader::new(input)?))
} }
}
pub fn seek(&mut self, ms: i64) -> Result<(), VorbisError> { impl<R> AudioDecoder for VorbisDecoder<R>
where
R: Read + Seek,
{
fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
let absgp = ms * 44100 / 1000; let absgp = ms * 44100 / 1000;
self.0.seek_absgp_pg(absgp as u64)?; match self.0.seek_absgp_pg(absgp as u64) {
Ok(()) Ok(_) => return Ok(()),
Err(err) => return Err(AudioError::VorbisError(err.into())),
}
} }
pub fn next_packet(&mut self) -> Result<Option<VorbisPacket>, VorbisError> { fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
use self::lewton::audio::AudioReadError::AudioIsHeader; use self::lewton::audio::AudioReadError::AudioIsHeader;
use self::lewton::OggReadError::NoCapturePatternFound; use self::lewton::OggReadError::NoCapturePatternFound;
use self::lewton::VorbisError::BadAudio; use self::lewton::VorbisError::BadAudio;
use self::lewton::VorbisError::OggError; use self::lewton::VorbisError::OggError;
loop { loop {
match self.0.read_dec_packet_itl() { match self.0.read_dec_packet_itl() {
Ok(Some(packet)) => return Ok(Some(VorbisPacket(packet))), Ok(Some(packet)) => return Ok(Some(AudioPacket::Samples(packet))),
Ok(None) => return Ok(None), Ok(None) => return Ok(None),
Err(BadAudio(AudioIsHeader)) => (), Err(BadAudio(AudioIsHeader)) => (),
Err(OggError(NoCapturePatternFound)) => (), Err(OggError(NoCapturePatternFound)) => (),
Err(err) => return Err(err.into()), Err(err) => return Err(AudioError::VorbisError(err.into())),
} }
} }
} }
} }
impl VorbisPacket {
pub fn data(&self) -> &[i16] {
&self.0
}
pub fn data_mut(&mut self) -> &mut [i16] {
&mut self.0
}
}
impl From<lewton::VorbisError> for VorbisError { impl From<lewton::VorbisError> for VorbisError {
fn from(err: lewton::VorbisError) -> VorbisError { fn from(err: lewton::VorbisError) -> VorbisError {
VorbisError(err) VorbisError(err)

View file

@ -20,6 +20,7 @@ mod fetch;
mod lewton_decoder; mod lewton_decoder;
#[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))]
mod libvorbis_decoder; mod libvorbis_decoder;
mod passthrough_decoder;
mod range_set; mod range_set;
@ -29,8 +30,70 @@ pub use fetch::{
READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
}; };
use std::fmt;
pub enum AudioPacket {
Samples(Vec<i16>),
OggData(Vec<u8>),
}
impl AudioPacket {
pub fn samples(&self) -> &[i16] {
match self {
AudioPacket::Samples(s) => s,
AudioPacket::OggData(_) => panic!("can't return OggData on samples"),
}
}
pub fn oggdata(&self) -> &[u8] {
match self {
AudioPacket::Samples(_) => panic!("can't return samples on OggData"),
AudioPacket::OggData(d) => d,
}
}
pub fn is_empty(&self) -> bool {
match self {
AudioPacket::Samples(s) => s.is_empty(),
AudioPacket::OggData(d) => d.is_empty(),
}
}
}
#[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] #[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))]
pub use crate::lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; pub use crate::lewton_decoder::{VorbisDecoder, VorbisError};
#[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))]
pub use libvorbis_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; pub use libvorbis_decoder::{VorbisDecoder, VorbisError};
pub use passthrough_decoder::{PassthroughDecoder, PassthroughError};
#[derive(Debug)]
pub enum AudioError {
PassthroughError(PassthroughError),
VorbisError(VorbisError),
}
impl fmt::Display for AudioError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AudioError::PassthroughError(err) => write!(f, "PassthroughError({})", err),
AudioError::VorbisError(err) => write!(f, "VorbisError({})", err),
}
}
}
impl From<VorbisError> for AudioError {
fn from(err: VorbisError) -> AudioError {
AudioError::VorbisError(VorbisError::from(err))
}
}
impl From<PassthroughError> for AudioError {
fn from(err: PassthroughError) -> AudioError {
AudioError::PassthroughError(PassthroughError::from(err))
}
}
pub trait AudioDecoder {
fn seek(&mut self, ms: i64) -> Result<(), AudioError>;
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError>;
}

View file

@ -3,12 +3,12 @@ extern crate librespot_tremor as vorbis;
#[cfg(not(feature = "with-tremor"))] #[cfg(not(feature = "with-tremor"))]
extern crate vorbis; extern crate vorbis;
use super::{AudioDecoder, AudioError, AudioPacket};
use std::error; use std::error;
use std::fmt; use std::fmt;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
pub struct VorbisDecoder<R: Read + Seek>(vorbis::Decoder<R>); pub struct VorbisDecoder<R: Read + Seek>(vorbis::Decoder<R>);
pub struct VorbisPacket(vorbis::Packet);
pub struct VorbisError(vorbis::VorbisError); pub struct VorbisError(vorbis::VorbisError);
impl<R> VorbisDecoder<R> impl<R> VorbisDecoder<R>
@ -18,23 +18,28 @@ where
pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> { pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> {
Ok(VorbisDecoder(vorbis::Decoder::new(input)?)) Ok(VorbisDecoder(vorbis::Decoder::new(input)?))
} }
}
impl<R> AudioDecoder for VorbisDecoder<R>
where
R: Read + Seek,
{
#[cfg(not(feature = "with-tremor"))] #[cfg(not(feature = "with-tremor"))]
pub fn seek(&mut self, ms: i64) -> Result<(), VorbisError> { fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
self.0.time_seek(ms as f64 / 1000f64)?; self.0.time_seek(ms as f64 / 1000f64)?;
Ok(()) Ok(())
} }
#[cfg(feature = "with-tremor")] #[cfg(feature = "with-tremor")]
pub fn seek(&mut self, ms: i64) -> Result<(), VorbisError> { fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
self.0.time_seek(ms)?; self.0.time_seek(ms)?;
Ok(()) Ok(())
} }
pub fn next_packet(&mut self) -> Result<Option<VorbisPacket>, VorbisError> { fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
loop { loop {
match self.0.packets().next() { match self.0.packets().next() {
Some(Ok(packet)) => return Ok(Some(VorbisPacket(packet))), Some(Ok(packet)) => return Ok(Some(AudioPacket::Samples(packet.data))),
None => return Ok(None), None => return Ok(None),
Some(Err(vorbis::VorbisError::Hole)) => (), Some(Err(vorbis::VorbisError::Hole)) => (),
@ -44,16 +49,6 @@ where
} }
} }
impl VorbisPacket {
pub fn data(&self) -> &[i16] {
&self.0.data
}
pub fn data_mut(&mut self) -> &mut [i16] {
&mut self.0.data
}
}
impl From<vorbis::VorbisError> for VorbisError { impl From<vorbis::VorbisError> for VorbisError {
fn from(err: vorbis::VorbisError) -> VorbisError { fn from(err: vorbis::VorbisError) -> VorbisError {
VorbisError(err) VorbisError(err)
@ -77,3 +72,9 @@ impl error::Error for VorbisError {
error::Error::source(&self.0) error::Error::source(&self.0)
} }
} }
impl From<vorbis::VorbisError> for AudioError {
fn from(err: vorbis::VorbisError) -> AudioError {
AudioError::VorbisError(VorbisError(err))
}
}

View file

@ -0,0 +1,191 @@
// Passthrough decoder for librespot
use super::{AudioDecoder, AudioError, AudioPacket};
use ogg::{OggReadError, Packet, PacketReader, PacketWriteEndInfo, PacketWriter};
use std::fmt;
use std::io::{Read, Seek};
use std::time::{SystemTime, UNIX_EPOCH};
fn write_headers<T: Read + Seek>(
rdr: &mut PacketReader<T>,
wtr: &mut PacketWriter<Vec<u8>>,
) -> Result<u32, PassthroughError> {
let mut stream_serial: u32 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u32;
// search for ident, comment, setup
get_header(1, rdr, wtr, &mut stream_serial, PacketWriteEndInfo::EndPage)?;
get_header(
3,
rdr,
wtr,
&mut stream_serial,
PacketWriteEndInfo::NormalPacket,
)?;
get_header(5, rdr, wtr, &mut stream_serial, PacketWriteEndInfo::EndPage)?;
// remove un-needed packets
rdr.delete_unread_packets();
return Ok(stream_serial);
}
fn get_header<T>(
code: u8,
rdr: &mut PacketReader<T>,
wtr: &mut PacketWriter<Vec<u8>>,
stream_serial: &mut u32,
info: PacketWriteEndInfo,
) -> Result<u32, PassthroughError>
where
T: Read + Seek,
{
let pck: Packet = rdr.read_packet_expected()?;
// set a unique serial number
if pck.stream_serial() != 0 {
*stream_serial = pck.stream_serial();
}
let pkt_type = pck.data[0];
debug!("Vorbis header type{}", &pkt_type);
// all headers are mandatory
if pkt_type != code {
return Err(PassthroughError(OggReadError::InvalidData));
}
// headers keep original granule number
let absgp_page = pck.absgp_page();
wtr.write_packet(
pck.data.into_boxed_slice(),
*stream_serial,
info,
absgp_page,
)
.unwrap();
return Ok(*stream_serial);
}
pub struct PassthroughDecoder<R: Read + Seek> {
rdr: PacketReader<R>,
wtr: PacketWriter<Vec<u8>>,
lastgp_page: Option<u64>,
absgp_page: u64,
stream_serial: u32,
}
pub struct PassthroughError(ogg::OggReadError);
impl<R: Read + Seek> PassthroughDecoder<R> {
/// Constructs a new Decoder from a given implementation of `Read + Seek`.
pub fn new(rdr: R) -> Result<Self, PassthroughError> {
let mut rdr = PacketReader::new(rdr);
let mut wtr = PacketWriter::new(Vec::new());
let stream_serial = write_headers(&mut rdr, &mut wtr)?;
info!("Starting passthrough track with serial {}", stream_serial);
return Ok(PassthroughDecoder {
rdr,
wtr,
lastgp_page: Some(0),
absgp_page: 0,
stream_serial,
});
}
}
impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
info!("Seeking to {}", ms);
self.lastgp_page = match ms {
0 => Some(0),
_ => None,
};
// hard-coded to 44.1 kHz
match self.rdr.seek_absgp(None, (ms * 44100 / 1000) as u64) {
Ok(_) => return Ok(()),
Err(err) => return Err(AudioError::PassthroughError(err.into())),
}
}
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
let mut skip = self.lastgp_page.is_none();
loop {
let pck = match self.rdr.read_packet() {
Ok(Some(pck)) => pck,
Ok(None) | Err(OggReadError::NoCapturePatternFound) => {
info!("end of streaming");
return Ok(None);
}
Err(err) => return Err(AudioError::PassthroughError(err.into())),
};
let pckgp_page = pck.absgp_page();
let lastgp_page = self.lastgp_page.get_or_insert(pckgp_page);
// consume packets till next page to get a granule reference
if skip {
if *lastgp_page == pckgp_page {
debug!("skipping packet");
continue;
}
skip = false;
info!("skipped at {}", pckgp_page);
}
// now we can calculate absolute granule
self.absgp_page += pckgp_page - *lastgp_page;
self.lastgp_page = Some(pckgp_page);
// set packet type
let inf = if pck.last_in_stream() {
self.lastgp_page = Some(0);
PacketWriteEndInfo::EndStream
} else if pck.last_in_page() {
PacketWriteEndInfo::EndPage
} else {
PacketWriteEndInfo::NormalPacket
};
self.wtr
.write_packet(
pck.data.into_boxed_slice(),
self.stream_serial,
inf,
self.absgp_page,
)
.unwrap();
let data = self.wtr.inner_mut();
if data.len() > 0 {
let result = AudioPacket::OggData(std::mem::take(data));
return Ok(Some(result));
}
}
}
}
impl fmt::Debug for PassthroughError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<ogg::OggReadError> for PassthroughError {
fn from(err: OggReadError) -> PassthroughError {
PassthroughError(err)
}
}
impl fmt::Display for PassthroughError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use alsa::device_name::HintIter; use alsa::device_name::HintIter;
use alsa::pcm::{Access, Format, Frames, HwParams, PCM}; use alsa::pcm::{Access, Format, Frames, HwParams, PCM};
use alsa::{Direction, Error, ValueOr}; use alsa::{Direction, Error, ValueOr};
@ -124,8 +125,9 @@ impl Sink for AlsaSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let mut processed_data = 0; let mut processed_data = 0;
let data = packet.samples();
while processed_data < data.len() { while processed_data < data.len() {
let data_to_buffer = min( let data_to_buffer = min(
self.buffer.capacity() - self.buffer.len(), self.buffer.capacity() - self.buffer.len(),

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use gst::prelude::*; use gst::prelude::*;
use gst::*; use gst::*;
use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::mpsc::{sync_channel, SyncSender};
@ -104,9 +105,9 @@ impl Sink for GstreamerSink {
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> io::Result<()> {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
// Copy expensively (in to_vec()) to avoid thread synchronization // Copy expensively (in to_vec()) to avoid thread synchronization
let deighta: &[u8] = data.as_bytes(); let deighta: &[u8] = packet.samples().as_bytes();
self.tx self.tx
.send(deighta.to_vec()) .send(deighta.to_vec())
.expect("tx send failed in write function"); .expect("tx send failed in write function");

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use jack::{ use jack::{
AsyncClient, AudioOut, Client, ClientOptions, Control, Port, ProcessHandler, ProcessScope, AsyncClient, AudioOut, Client, ClientOptions, Control, Port, ProcessHandler, ProcessScope,
}; };
@ -73,8 +74,8 @@ impl Sink for JackSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
for s in data.iter() { for s in packet.samples().iter() {
let res = self.send.send(*s); let res = self.send.send(*s);
if res.is_err() { if res.is_err() {
error!("jackaudio: cannot write to channel"); error!("jackaudio: cannot write to channel");

View file

@ -1,3 +1,4 @@
use crate::audio::AudioPacket;
use std::io; use std::io;
pub trait Open { pub trait Open {
@ -7,7 +8,7 @@ pub trait Open {
pub trait Sink { pub trait Sink {
fn start(&mut self) -> io::Result<()>; fn start(&mut self) -> io::Result<()>;
fn stop(&mut self) -> io::Result<()>; fn stop(&mut self) -> io::Result<()>;
fn write(&mut self, data: &[i16]) -> io::Result<()>; fn write(&mut self, packet: &AudioPacket) -> io::Result<()>;
} }
fn mk_sink<S: Sink + Open + 'static>(device: Option<String>) -> Box<dyn Sink> { fn mk_sink<S: Sink + Open + 'static>(device: Option<String>) -> Box<dyn Sink> {

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{self, Write}; use std::io::{self, Write};
use std::mem; use std::mem;
@ -26,12 +27,15 @@ impl Sink for StdoutSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let data: &[u8] = unsafe { let data: &[u8] = match packet {
slice::from_raw_parts( AudioPacket::Samples(data) => unsafe {
data.as_ptr() as *const u8, slice::from_raw_parts(
data.len() * mem::size_of::<i16>(), data.as_ptr() as *const u8,
) data.len() * mem::size_of::<i16>(),
)
},
AudioPacket::OggData(data) => data,
}; };
self.0.write_all(data)?; self.0.write_all(data)?;

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use portaudio_rs; use portaudio_rs;
use portaudio_rs::device::{get_default_output_index, DeviceIndex, DeviceInfo}; use portaudio_rs::device::{get_default_output_index, DeviceIndex, DeviceInfo};
use portaudio_rs::stream::*; use portaudio_rs::stream::*;
@ -95,8 +96,8 @@ impl<'a> Sink for PortAudioSink<'a> {
self.0 = None; self.0 = None;
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
match self.0.as_mut().unwrap().write(data) { match self.0.as_mut().unwrap().write(packet.samples()) {
Ok(_) => (), Ok(_) => (),
Err(portaudio_rs::PaError::OutputUnderflowed) => error!("PortAudio write underflow"), Err(portaudio_rs::PaError::OutputUnderflowed) => error!("PortAudio write underflow"),
Err(e) => panic!("PA Error {}", e), Err(e) => panic!("PA Error {}", e),

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use libpulse_binding::{self as pulse, stream::Direction}; use libpulse_binding::{self as pulse, stream::Direction};
use libpulse_simple_binding::Simple; use libpulse_simple_binding::Simple;
use std::io; use std::io;
@ -65,13 +66,17 @@ impl Sink for PulseAudioSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
if let Some(s) = &self.s { if let Some(s) = &self.s {
// SAFETY: An i16 consists of two bytes, so that the given slice can be interpreted // SAFETY: An i16 consists of two bytes, so that the given slice can be interpreted
// as a byte array of double length. Each byte pointer is validly aligned, and so // as a byte array of double length. Each byte pointer is validly aligned, and so
// is the newly created slice. // is the newly created slice.
let d: &[u8] = let d: &[u8] = unsafe {
unsafe { std::slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * 2) }; std::slice::from_raw_parts(
packet.samples().as_ptr() as *const u8,
packet.samples().len() * 2,
)
};
match s.write(d) { match s.write(d) {
Ok(_) => Ok(()), Ok(_) => Ok(()),

View file

@ -1,6 +1,7 @@
use super::{Open, Sink}; use super::{Open, Sink};
extern crate cpal; extern crate cpal;
extern crate rodio; extern crate rodio;
use crate::audio::AudioPacket;
use cpal::traits::{DeviceTrait, HostTrait}; use cpal::traits::{DeviceTrait, HostTrait};
use std::process::exit; use std::process::exit;
use std::{io, thread, time}; use std::{io, thread, time};
@ -164,8 +165,8 @@ impl Sink for RodioSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let source = rodio::buffer::SamplesBuffer::new(2, 44100, data); let source = rodio::buffer::SamplesBuffer::new(2, 44100, packet.samples());
self.rodio_sink.append(source); self.rodio_sink.append(source);
// Chunk sizes seem to be about 256 to 3000 ish items long. // Chunk sizes seem to be about 256 to 3000 ish items long.

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use sdl2::audio::{AudioQueue, AudioSpecDesired}; use sdl2::audio::{AudioQueue, AudioSpecDesired};
use std::{io, thread, time}; use std::{io, thread, time};
@ -45,12 +46,12 @@ impl Sink for SdlSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
while self.queue.size() > (2 * 2 * 44_100) { while self.queue.size() > (2 * 2 * 44_100) {
// sleep and wait for sdl thread to drain the queue a bit // sleep and wait for sdl thread to drain the queue a bit
thread::sleep(time::Duration::from_millis(10)); thread::sleep(time::Duration::from_millis(10));
} }
self.queue.queue(data); self.queue.queue(packet.samples());
Ok(()) Ok(())
} }
} }

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use shell_words::split; use shell_words::split;
use std::io::{self, Write}; use std::io::{self, Write};
use std::mem; use std::mem;
@ -43,11 +44,11 @@ impl Sink for SubprocessSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let data: &[u8] = unsafe { let data: &[u8] = unsafe {
slice::from_raw_parts( slice::from_raw_parts(
data.as_ptr() as *const u8, packet.samples().as_ptr() as *const u8,
data.len() * mem::size_of::<i16>(), packet.samples().len() * mem::size_of::<i16>(),
) )
}; };
if let Some(child) = &mut self.child { if let Some(child) = &mut self.child {

View file

@ -55,6 +55,7 @@ pub struct PlayerConfig {
pub normalisation_type: NormalisationType, pub normalisation_type: NormalisationType,
pub normalisation_pregain: f32, pub normalisation_pregain: f32,
pub gapless: bool, pub gapless: bool,
pub passthrough: bool,
} }
impl Default for PlayerConfig { impl Default for PlayerConfig {
@ -65,6 +66,7 @@ impl Default for PlayerConfig {
normalisation_type: NormalisationType::default(), normalisation_type: NormalisationType::default(),
normalisation_pregain: 0.0, normalisation_pregain: 0.0,
gapless: true, gapless: true,
passthrough: false,
} }
} }
} }

View file

@ -15,8 +15,8 @@ use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator; use librespot_core::util::SeqGenerator;
use crate::audio::{AudioDecoder, AudioError, AudioPacket, PassthroughDecoder, VorbisDecoder};
use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController};
use crate::audio::{VorbisDecoder, VorbisPacket};
use crate::audio::{ use crate::audio::{
READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
@ -378,7 +378,7 @@ enum PlayerPreload {
}, },
} }
type Decoder = VorbisDecoder<Subfile<AudioDecrypt<AudioFile>>>; type Decoder = Box<dyn AudioDecoder + Send>;
enum PlayerState { enum PlayerState {
Stopped, Stopped,
@ -723,7 +723,19 @@ impl PlayerTrackLoader {
let audio_file = Subfile::new(decrypted_file, 0xa7); let audio_file = Subfile::new(decrypted_file, 0xa7);
let mut decoder = match VorbisDecoder::new(audio_file) { let result = if self.config.passthrough {
match PassthroughDecoder::new(audio_file) {
Ok(result) => Ok(Box::new(result) as Decoder),
Err(e) => Err(AudioError::PassthroughError(e)),
}
} else {
match VorbisDecoder::new(audio_file) {
Ok(result) => Ok(Box::new(result) as Decoder),
Err(e) => Err(AudioError::VorbisError(e)),
}
};
let mut decoder = match result {
Ok(decoder) => decoder, Ok(decoder) => decoder,
Err(e) if is_cached => { Err(e) if is_cached => {
warn!( warn!(
@ -873,37 +885,44 @@ impl Future for PlayerInternal {
{ {
let packet = decoder.next_packet().expect("Vorbis error"); let packet = decoder.next_packet().expect("Vorbis error");
if let Some(ref packet) = packet { if !self.config.passthrough {
*stream_position_pcm = if let Some(ref packet) = packet {
*stream_position_pcm + (packet.data().len() / 2) as u64; *stream_position_pcm =
let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm); *stream_position_pcm + (packet.samples().len() / 2) as u64;
let stream_position_millis =
Self::position_pcm_to_ms(*stream_position_pcm);
let notify_about_position = match *reported_nominal_start_time { let notify_about_position = match *reported_nominal_start_time {
None => true, None => true,
Some(reported_nominal_start_time) => { Some(reported_nominal_start_time) => {
// only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we;re actually in time. // only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we;re actually in time.
let lag = (Instant::now() - reported_nominal_start_time).as_millis() let lag = (Instant::now() - reported_nominal_start_time)
as i64 .as_millis()
- stream_position_millis as i64; as i64
if lag > 1000 { - stream_position_millis as i64;
true if lag > 1000 {
} else { true
false } else {
false
}
} }
};
if notify_about_position {
*reported_nominal_start_time = Some(
Instant::now()
- Duration::from_millis(stream_position_millis as u64),
);
self.send_event(PlayerEvent::Playing {
track_id,
play_request_id,
position_ms: stream_position_millis as u32,
duration_ms,
});
} }
};
if notify_about_position {
*reported_nominal_start_time = Some(
Instant::now()
- Duration::from_millis(stream_position_millis as u64),
);
self.send_event(PlayerEvent::Playing {
track_id,
play_request_id,
position_ms: stream_position_millis as u32,
duration_ms,
});
} }
} else {
// position, even if irrelevant, must be set so that seek() is called
*stream_position_pcm = duration_ms.into();
} }
self.handle_packet(packet, normalisation_factor); self.handle_packet(packet, normalisation_factor);
@ -1085,21 +1104,23 @@ impl PlayerInternal {
} }
} }
fn handle_packet(&mut self, packet: Option<VorbisPacket>, normalisation_factor: f32) { fn handle_packet(&mut self, packet: Option<AudioPacket>, normalisation_factor: f32) {
match packet { match packet {
Some(mut packet) => { Some(mut packet) => {
if packet.data().len() > 0 { if !packet.is_empty() {
if let Some(ref editor) = self.audio_filter { if let AudioPacket::Samples(ref mut data) = packet {
editor.modify_stream(&mut packet.data_mut()) if let Some(ref editor) = self.audio_filter {
}; editor.modify_stream(data)
}
if self.config.normalisation && normalisation_factor != 1.0 { if self.config.normalisation && normalisation_factor != 1.0 {
for x in packet.data_mut().iter_mut() { for x in data.iter_mut() {
*x = (*x as f32 * normalisation_factor) as i16; *x = (*x as f32 * normalisation_factor) as i16;
}
} }
} }
if let Err(err) = self.sink.write(&packet.data()) { if let Err(err) = self.sink.write(&packet) {
error!("Could not write audio: {}", err); error!("Could not write audio: {}", err);
self.ensure_sink_stopped(false); self.ensure_sink_stopped(false);
} }

View file

@ -204,6 +204,11 @@ fn setup(args: &[String]) -> Setup {
"", "",
"disable-gapless", "disable-gapless",
"disable gapless playback.", "disable gapless playback.",
)
.optflag(
"",
"passthrough",
"Pass raw stream to output, only works for \"pipe\"."
); );
let matches = match opts.parse(&args[1..]) { let matches = match opts.parse(&args[1..]) {
@ -354,6 +359,8 @@ fn setup(args: &[String]) -> Setup {
} }
}; };
let passthrough = matches.opt_present("passthrough");
let player_config = { let player_config = {
let bitrate = matches let bitrate = matches
.opt_str("b") .opt_str("b")
@ -376,6 +383,7 @@ fn setup(args: &[String]) -> Setup {
.opt_str("normalisation-pregain") .opt_str("normalisation-pregain")
.map(|pregain| pregain.parse::<f32>().expect("Invalid pregain float value")) .map(|pregain| pregain.parse::<f32>().expect("Invalid pregain float value"))
.unwrap_or(PlayerConfig::default().normalisation_pregain), .unwrap_or(PlayerConfig::default().normalisation_pregain),
passthrough,
} }
}; };