Implement common SinkError and SinkResult (#820)

* Make error messages more consistent and concise.

* `impl From<AlsaError> for io::Error` so `AlsaErrors` can be thrown to player as `io::Errors`. This little bit of boilerplate goes a long way to simplifying things further down in the code. And will make any needed future changes easier.

* Bonus: handle ALSA backend buffer sizing a little better.
This commit is contained in:
Jason Gray 2021-09-27 13:46:26 -05:00 committed by GitHub
parent 57937a10d9
commit 8d70fd910e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 275 additions and 220 deletions

View file

@ -1,4 +1,4 @@
use super::{Open, Sink, SinkAsBytes}; use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
@ -7,7 +7,6 @@ use alsa::device_name::HintIter;
use alsa::pcm::{Access, Format, HwParams, PCM}; use alsa::pcm::{Access, Format, HwParams, PCM};
use alsa::{Direction, ValueOr}; use alsa::{Direction, ValueOr};
use std::cmp::min; use std::cmp::min;
use std::io;
use std::process::exit; use std::process::exit;
use std::time::Duration; use std::time::Duration;
use thiserror::Error; use thiserror::Error;
@ -18,34 +17,67 @@ const BUFFER_TIME: Duration = Duration::from_millis(500);
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum AlsaError { enum AlsaError {
#[error("AlsaSink, device {device} may be invalid or busy, {err}")] #[error("<AlsaSink> Device {device} Unsupported Format {alsa_format:?} ({format:?}), {e}")]
PcmSetUp { device: String, err: alsa::Error },
#[error("AlsaSink, device {device} unsupported access type RWInterleaved, {err}")]
UnsupportedAccessType { device: String, err: alsa::Error },
#[error("AlsaSink, device {device} unsupported format {format:?}, {err}")]
UnsupportedFormat { UnsupportedFormat {
device: String, device: String,
alsa_format: Format,
format: AudioFormat, format: AudioFormat,
err: alsa::Error, e: alsa::Error,
}, },
#[error("AlsaSink, device {device} unsupported sample rate {samplerate}, {err}")]
UnsupportedSampleRate { #[error("<AlsaSink> Device {device} Unsupported Channel Count {channel_count}, {e}")]
device: String,
samplerate: u32,
err: alsa::Error,
},
#[error("AlsaSink, device {device} unsupported channel count {channel_count}, {err}")]
UnsupportedChannelCount { UnsupportedChannelCount {
device: String, device: String,
channel_count: u8, channel_count: u8,
err: alsa::Error, e: alsa::Error,
}, },
#[error("AlsaSink Hardware Parameters Error, {0}")]
#[error("<AlsaSink> Device {device} Unsupported Sample Rate {samplerate}, {e}")]
UnsupportedSampleRate {
device: String,
samplerate: u32,
e: alsa::Error,
},
#[error("<AlsaSink> Device {device} Unsupported Access Type RWInterleaved, {e}")]
UnsupportedAccessType { device: String, e: alsa::Error },
#[error("<AlsaSink> Device {device} May be Invalid, Busy, or Already in Use, {e}")]
PcmSetUp { device: String, e: alsa::Error },
#[error("<AlsaSink> Failed to Drain PCM Buffer, {0}")]
DrainFailure(alsa::Error),
#[error("<AlsaSink> {0}")]
OnWrite(alsa::Error),
#[error("<AlsaSink> Hardware, {0}")]
HwParams(alsa::Error), HwParams(alsa::Error),
#[error("AlsaSink Software Parameters Error, {0}")]
#[error("<AlsaSink> Software, {0}")]
SwParams(alsa::Error), SwParams(alsa::Error),
#[error("AlsaSink PCM Error, {0}")]
#[error("<AlsaSink> PCM, {0}")]
Pcm(alsa::Error), Pcm(alsa::Error),
#[error("<AlsaSink> Could Not Parse Ouput Name(s) and/or Description(s)")]
Parsing,
#[error("<AlsaSink>")]
NotConnected,
}
impl From<AlsaError> for SinkError {
fn from(e: AlsaError) -> SinkError {
use AlsaError::*;
let es = e.to_string();
match e {
DrainFailure(_) | OnWrite(_) => SinkError::OnWrite(es),
PcmSetUp { .. } => SinkError::ConnectionRefused(es),
NotConnected => SinkError::NotConnected(es),
_ => SinkError::InvalidParams(es),
}
}
} }
pub struct AlsaSink { pub struct AlsaSink {
@ -55,25 +87,19 @@ pub struct AlsaSink {
period_buffer: Vec<u8>, period_buffer: Vec<u8>,
} }
fn list_outputs() -> io::Result<()> { fn list_outputs() -> SinkResult<()> {
println!("Listing available Alsa outputs:"); println!("Listing available Alsa outputs:");
for t in &["pcm", "ctl", "hwdep"] { for t in &["pcm", "ctl", "hwdep"] {
println!("{} devices:", t); println!("{} devices:", t);
let i = match HintIter::new_str(None, t) {
Ok(i) => i, let i = HintIter::new_str(None, &t).map_err(|_| AlsaError::Parsing)?;
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, e));
}
};
for a in i { for a in i {
if let Some(Direction::Playback) = a.direction { if let Some(Direction::Playback) = a.direction {
// mimic aplay -L // mimic aplay -L
let name = a let name = a.name.ok_or(AlsaError::Parsing)?;
.name let desc = a.desc.ok_or(AlsaError::Parsing)?;
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Could not parse name"))?;
let desc = a
.desc
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Could not parse desc"))?;
println!("{}\n\t{}\n", name, desc.replace("\n", "\n\t")); println!("{}\n\t{}\n", name, desc.replace("\n", "\n\t"));
} }
} }
@ -82,10 +108,10 @@ fn list_outputs() -> io::Result<()> {
Ok(()) Ok(())
} }
fn open_device(dev_name: &str, format: AudioFormat) -> Result<(PCM, usize), AlsaError> { fn open_device(dev_name: &str, format: AudioFormat) -> SinkResult<(PCM, usize)> {
let pcm = PCM::new(dev_name, Direction::Playback, false).map_err(|e| AlsaError::PcmSetUp { let pcm = PCM::new(dev_name, Direction::Playback, false).map_err(|e| AlsaError::PcmSetUp {
device: dev_name.to_string(), device: dev_name.to_string(),
err: e, e,
})?; })?;
let alsa_format = match format { let alsa_format = match format {
@ -103,24 +129,26 @@ fn open_device(dev_name: &str, format: AudioFormat) -> Result<(PCM, usize), Alsa
let bytes_per_period = { let bytes_per_period = {
let hwp = HwParams::any(&pcm).map_err(AlsaError::HwParams)?; let hwp = HwParams::any(&pcm).map_err(AlsaError::HwParams)?;
hwp.set_access(Access::RWInterleaved) hwp.set_access(Access::RWInterleaved)
.map_err(|e| AlsaError::UnsupportedAccessType { .map_err(|e| AlsaError::UnsupportedAccessType {
device: dev_name.to_string(), device: dev_name.to_string(),
err: e, e,
})?; })?;
hwp.set_format(alsa_format) hwp.set_format(alsa_format)
.map_err(|e| AlsaError::UnsupportedFormat { .map_err(|e| AlsaError::UnsupportedFormat {
device: dev_name.to_string(), device: dev_name.to_string(),
alsa_format,
format, format,
err: e, e,
})?; })?;
hwp.set_rate(SAMPLE_RATE, ValueOr::Nearest).map_err(|e| { hwp.set_rate(SAMPLE_RATE, ValueOr::Nearest).map_err(|e| {
AlsaError::UnsupportedSampleRate { AlsaError::UnsupportedSampleRate {
device: dev_name.to_string(), device: dev_name.to_string(),
samplerate: SAMPLE_RATE, samplerate: SAMPLE_RATE,
err: e, e,
} }
})?; })?;
@ -128,7 +156,7 @@ fn open_device(dev_name: &str, format: AudioFormat) -> Result<(PCM, usize), Alsa
.map_err(|e| AlsaError::UnsupportedChannelCount { .map_err(|e| AlsaError::UnsupportedChannelCount {
device: dev_name.to_string(), device: dev_name.to_string(),
channel_count: NUM_CHANNELS, channel_count: NUM_CHANNELS,
err: e, e,
})?; })?;
hwp.set_buffer_time_near(BUFFER_TIME.as_micros() as u32, ValueOr::Nearest) hwp.set_buffer_time_near(BUFFER_TIME.as_micros() as u32, ValueOr::Nearest)
@ -141,8 +169,7 @@ fn open_device(dev_name: &str, format: AudioFormat) -> Result<(PCM, usize), Alsa
let swp = pcm.sw_params_current().map_err(AlsaError::Pcm)?; let swp = pcm.sw_params_current().map_err(AlsaError::Pcm)?;
// Don't assume we got what we wanted. // Don't assume we got what we wanted. Ask to make sure.
// Ask to make sure.
let frames_per_period = hwp.get_period_size().map_err(AlsaError::HwParams)?; let frames_per_period = hwp.get_period_size().map_err(AlsaError::HwParams)?;
let frames_per_buffer = hwp.get_buffer_size().map_err(AlsaError::HwParams)?; let frames_per_buffer = hwp.get_buffer_size().map_err(AlsaError::HwParams)?;
@ -171,8 +198,8 @@ impl Open for AlsaSink {
Ok(_) => { Ok(_) => {
exit(0); exit(0);
} }
Err(err) => { Err(e) => {
error!("Error listing Alsa outputs, {}", err); error!("{}", e);
exit(1); exit(1);
} }
}, },
@ -193,21 +220,20 @@ impl Open for AlsaSink {
} }
impl Sink for AlsaSink { impl Sink for AlsaSink {
fn start(&mut self) -> io::Result<()> { fn start(&mut self) -> SinkResult<()> {
if self.pcm.is_none() { if self.pcm.is_none() {
match open_device(&self.device, self.format) { let (pcm, bytes_per_period) = open_device(&self.device, self.format)?;
Ok((pcm, bytes_per_period)) => {
self.pcm = Some(pcm); self.pcm = Some(pcm);
// If the capacity is greater than we want shrink it
// to it's current len (which should be zero) before let current_capacity = self.period_buffer.capacity();
// setting the capacity with reserve_exact.
if self.period_buffer.capacity() > bytes_per_period { if current_capacity > bytes_per_period {
self.period_buffer.truncate(bytes_per_period);
self.period_buffer.shrink_to_fit(); self.period_buffer.shrink_to_fit();
} else if current_capacity < bytes_per_period {
let extra = bytes_per_period - self.period_buffer.len();
self.period_buffer.reserve_exact(extra);
} }
// This does nothing if the capacity is already sufficient.
// Len should always be zero, but for the sake of being thorough...
self.period_buffer
.reserve_exact(bytes_per_period - self.period_buffer.len());
// Should always match the "Period Buffer size in bytes: " trace! message. // Should always match the "Period Buffer size in bytes: " trace! message.
trace!( trace!(
@ -215,31 +241,19 @@ impl Sink for AlsaSink {
self.period_buffer.capacity() self.period_buffer.capacity()
); );
} }
Err(e) => {
return Err(io::Error::new(io::ErrorKind::Other, e));
}
}
}
Ok(()) Ok(())
} }
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> SinkResult<()> {
// Zero fill the remainder of the period buffer and // Zero fill the remainder of the period buffer and
// write any leftover data before draining the actual PCM buffer. // write any leftover data before draining the actual PCM buffer.
self.period_buffer.resize(self.period_buffer.capacity(), 0); self.period_buffer.resize(self.period_buffer.capacity(), 0);
self.write_buf()?; self.write_buf()?;
let pcm = self.pcm.as_mut().ok_or_else(|| { let pcm = self.pcm.as_mut().ok_or(AlsaError::NotConnected)?;
io::Error::new(io::ErrorKind::Other, "Error stopping AlsaSink, PCM is None")
})?;
pcm.drain().map_err(|e| { pcm.drain().map_err(AlsaError::DrainFailure)?;
io::Error::new(
io::ErrorKind::Other,
format!("Error stopping AlsaSink {}", e),
)
})?;
self.pcm = None; self.pcm = None;
Ok(()) Ok(())
@ -249,23 +263,28 @@ impl Sink for AlsaSink {
} }
impl SinkAsBytes for AlsaSink { impl SinkAsBytes for AlsaSink {
fn write_bytes(&mut self, data: &[u8]) -> io::Result<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
let mut start_index = 0; let mut start_index = 0;
let data_len = data.len(); let data_len = data.len();
let capacity = self.period_buffer.capacity(); let capacity = self.period_buffer.capacity();
loop { loop {
let data_left = data_len - start_index; let data_left = data_len - start_index;
let space_left = capacity - self.period_buffer.len(); let space_left = capacity - self.period_buffer.len();
let data_to_buffer = min(data_left, space_left); let data_to_buffer = min(data_left, space_left);
let end_index = start_index + data_to_buffer; let end_index = start_index + data_to_buffer;
self.period_buffer self.period_buffer
.extend_from_slice(&data[start_index..end_index]); .extend_from_slice(&data[start_index..end_index]);
if self.period_buffer.len() == capacity { if self.period_buffer.len() == capacity {
self.write_buf()?; self.write_buf()?;
} }
if end_index == data_len { if end_index == data_len {
break Ok(()); break Ok(());
} }
start_index = end_index; start_index = end_index;
} }
} }
@ -274,30 +293,18 @@ impl SinkAsBytes for AlsaSink {
impl AlsaSink { impl AlsaSink {
pub const NAME: &'static str = "alsa"; pub const NAME: &'static str = "alsa";
fn write_buf(&mut self) -> io::Result<()> { fn write_buf(&mut self) -> SinkResult<()> {
let pcm = self.pcm.as_mut().ok_or_else(|| { let pcm = self.pcm.as_mut().ok_or(AlsaError::NotConnected)?;
io::Error::new(
io::ErrorKind::Other, if let Err(e) = pcm.io_bytes().writei(&self.period_buffer) {
"Error writing from AlsaSink buffer to PCM, PCM is None",
)
})?;
let io = pcm.io_bytes();
if let Err(err) = io.writei(&self.period_buffer) {
// Capture and log the original error as a warning, and then try to recover. // Capture and log the original error as a warning, and then try to recover.
// If recovery fails then forward that error back to player. // If recovery fails then forward that error back to player.
warn!( warn!(
"Error writing from AlsaSink buffer to PCM, trying to recover {}", "Error writing from AlsaSink buffer to PCM, trying to recover, {}",
err
);
pcm.try_recover(err, false).map_err(|e| {
io::Error::new(
io::ErrorKind::Other,
format!(
"Error writing from AlsaSink buffer to PCM, recovery failed {}",
e e
), );
)
})? pcm.try_recover(e, false).map_err(AlsaError::OnWrite)?
} }
self.period_buffer.clear(); self.period_buffer.clear();

View file

@ -1,4 +1,4 @@
use super::{Open, Sink, SinkAsBytes}; use super::{Open, Sink, SinkAsBytes, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
@ -11,7 +11,7 @@ use gst::prelude::*;
use zerocopy::AsBytes; use zerocopy::AsBytes;
use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::mpsc::{sync_channel, SyncSender};
use std::{io, thread}; use std::thread;
#[allow(dead_code)] #[allow(dead_code)]
pub struct GstreamerSink { pub struct GstreamerSink {
@ -131,7 +131,7 @@ impl Sink for GstreamerSink {
} }
impl SinkAsBytes for GstreamerSink { impl SinkAsBytes for GstreamerSink {
fn write_bytes(&mut self, data: &[u8]) -> io::Result<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
// Copy expensively (in to_vec()) to avoid thread synchronization // Copy expensively (in to_vec()) to avoid thread synchronization
self.tx self.tx
.send(data.to_vec()) .send(data.to_vec())

View file

@ -1,4 +1,4 @@
use super::{Open, Sink}; use super::{Open, Sink, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
@ -6,7 +6,6 @@ use crate::NUM_CHANNELS;
use jack::{ use jack::{
AsyncClient, AudioOut, Client, ClientOptions, Control, Port, ProcessHandler, ProcessScope, AsyncClient, AudioOut, Client, ClientOptions, Control, Port, ProcessHandler, ProcessScope,
}; };
use std::io;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender}; use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
pub struct JackSink { pub struct JackSink {
@ -70,10 +69,11 @@ impl Open for JackSink {
} }
impl Sink for JackSink { impl Sink for JackSink {
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> SinkResult<()> {
let samples = packet let samples = packet
.samples() .samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| SinkError::OnWrite(e.to_string()))?;
let samples_f32: &[f32] = &converter.f64_to_f32(samples); let samples_f32: &[f32] = &converter.f64_to_f32(samples);
for sample in samples_f32.iter() { for sample in samples_f32.iter() {
let res = self.send.send(*sample); let res = self.send.send(*sample);

View file

@ -1,26 +1,40 @@
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
use std::io; use thiserror::Error;
#[derive(Debug, Error)]
pub enum SinkError {
#[error("Audio Sink Error Not Connected: {0}")]
NotConnected(String),
#[error("Audio Sink Error Connection Refused: {0}")]
ConnectionRefused(String),
#[error("Audio Sink Error On Write: {0}")]
OnWrite(String),
#[error("Audio Sink Error Invalid Parameters: {0}")]
InvalidParams(String),
}
pub type SinkResult<T> = Result<T, SinkError>;
pub trait Open { pub trait Open {
fn open(_: Option<String>, format: AudioFormat) -> Self; fn open(_: Option<String>, format: AudioFormat) -> Self;
} }
pub trait Sink { pub trait Sink {
fn start(&mut self) -> io::Result<()> { fn start(&mut self) -> SinkResult<()> {
Ok(()) Ok(())
} }
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> SinkResult<()> {
Ok(()) Ok(())
} }
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()>; fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> SinkResult<()>;
} }
pub type SinkBuilder = fn(Option<String>, AudioFormat) -> Box<dyn Sink>; pub type SinkBuilder = fn(Option<String>, AudioFormat) -> Box<dyn Sink>;
pub trait SinkAsBytes { pub trait SinkAsBytes {
fn write_bytes(&mut self, data: &[u8]) -> io::Result<()>; fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()>;
} }
fn mk_sink<S: Sink + Open + 'static>(device: Option<String>, format: AudioFormat) -> Box<dyn Sink> { fn mk_sink<S: Sink + Open + 'static>(device: Option<String>, format: AudioFormat) -> Box<dyn Sink> {
@ -30,7 +44,7 @@ fn mk_sink<S: Sink + Open + 'static>(device: Option<String>, format: AudioFormat
// reuse code for various backends // reuse code for various backends
macro_rules! sink_as_bytes { macro_rules! sink_as_bytes {
() => { () => {
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> SinkResult<()> {
use crate::convert::i24; use crate::convert::i24;
use zerocopy::AsBytes; use zerocopy::AsBytes;
match packet { match packet {

View file

@ -1,4 +1,4 @@
use super::{Open, Sink, SinkAsBytes}; use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
@ -23,14 +23,14 @@ impl Open for StdoutSink {
} }
impl Sink for StdoutSink { impl Sink for StdoutSink {
fn start(&mut self) -> io::Result<()> { fn start(&mut self) -> SinkResult<()> {
if self.output.is_none() { if self.output.is_none() {
let output: Box<dyn Write> = match self.path.as_deref() { let output: Box<dyn Write> = match self.path.as_deref() {
Some(path) => { Some(path) => {
let open_op = OpenOptions::new() let open_op = OpenOptions::new()
.write(true) .write(true)
.open(path) .open(path)
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; .map_err(|e| SinkError::ConnectionRefused(e.to_string()))?;
Box::new(open_op) Box::new(open_op)
} }
None => Box::new(io::stdout()), None => Box::new(io::stdout()),
@ -46,14 +46,18 @@ impl Sink for StdoutSink {
} }
impl SinkAsBytes for StdoutSink { impl SinkAsBytes for StdoutSink {
fn write_bytes(&mut self, data: &[u8]) -> io::Result<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
match self.output.as_deref_mut() { match self.output.as_deref_mut() {
Some(output) => { Some(output) => {
output.write_all(data)?; output
output.flush()?; .write_all(data)
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
output
.flush()
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
} }
None => { None => {
return Err(io::Error::new(io::ErrorKind::Other, "Output is None")); return Err(SinkError::NotConnected("Output is None".to_string()));
} }
} }

View file

@ -1,11 +1,10 @@
use super::{Open, Sink}; use super::{Open, Sink, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
use crate::{NUM_CHANNELS, SAMPLE_RATE}; use crate::{NUM_CHANNELS, SAMPLE_RATE};
use portaudio_rs::device::{get_default_output_index, DeviceIndex, DeviceInfo}; use portaudio_rs::device::{get_default_output_index, DeviceIndex, DeviceInfo};
use portaudio_rs::stream::*; use portaudio_rs::stream::*;
use std::io;
use std::process::exit; use std::process::exit;
use std::time::Duration; use std::time::Duration;
@ -96,7 +95,7 @@ impl<'a> Open for PortAudioSink<'a> {
} }
impl<'a> Sink for PortAudioSink<'a> { impl<'a> Sink for PortAudioSink<'a> {
fn start(&mut self) -> io::Result<()> { fn start(&mut self) -> SinkResult<()> {
macro_rules! start_sink { macro_rules! start_sink {
(ref mut $stream: ident, ref $parameters: ident) => {{ (ref mut $stream: ident, ref $parameters: ident) => {{
if $stream.is_none() { if $stream.is_none() {
@ -125,7 +124,7 @@ impl<'a> Sink for PortAudioSink<'a> {
Ok(()) Ok(())
} }
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> SinkResult<()> {
macro_rules! stop_sink { macro_rules! stop_sink {
(ref mut $stream: ident) => {{ (ref mut $stream: ident) => {{
$stream.as_mut().unwrap().stop().unwrap(); $stream.as_mut().unwrap().stop().unwrap();
@ -141,7 +140,7 @@ impl<'a> Sink for PortAudioSink<'a> {
Ok(()) Ok(())
} }
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> SinkResult<()> {
macro_rules! write_sink { macro_rules! write_sink {
(ref mut $stream: expr, $samples: expr) => { (ref mut $stream: expr, $samples: expr) => {
$stream.as_mut().unwrap().write($samples) $stream.as_mut().unwrap().write($samples)
@ -150,7 +149,7 @@ impl<'a> Sink for PortAudioSink<'a> {
let samples = packet let samples = packet
.samples() .samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| SinkError::OnWrite(e.to_string()))?;
let result = match self { let result = match self {
Self::F32(stream, _parameters) => { Self::F32(stream, _parameters) => {

View file

@ -1,11 +1,10 @@
use super::{Open, Sink, SinkAsBytes}; use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
use crate::{NUM_CHANNELS, SAMPLE_RATE}; use crate::{NUM_CHANNELS, SAMPLE_RATE};
use libpulse_binding::{self as pulse, error::PAErr, stream::Direction}; use libpulse_binding::{self as pulse, error::PAErr, stream::Direction};
use libpulse_simple_binding::Simple; use libpulse_simple_binding::Simple;
use std::io;
use thiserror::Error; use thiserror::Error;
const APP_NAME: &str = "librespot"; const APP_NAME: &str = "librespot";
@ -13,18 +12,40 @@ const STREAM_NAME: &str = "Spotify endpoint";
#[derive(Debug, Error)] #[derive(Debug, Error)]
enum PulseError { enum PulseError {
#[error("Error starting PulseAudioSink, invalid PulseAudio sample spec")] #[error("<PulseAudioSink> Unsupported Pulseaudio Sample Spec, Format {pulse_format:?} ({format:?}), Channels {channels}, Rate {rate}")]
InvalidSampleSpec, InvalidSampleSpec {
#[error("Error starting PulseAudioSink, could not connect to PulseAudio server, {0}")] pulse_format: pulse::sample::Format,
format: AudioFormat,
channels: u8,
rate: u32,
},
#[error("<PulseAudioSink> {0}")]
ConnectionRefused(PAErr), ConnectionRefused(PAErr),
#[error("Error stopping PulseAudioSink, failed to drain PulseAudio server buffer, {0}")]
#[error("<PulseAudioSink> Failed to Drain Pulseaudio Buffer, {0}")]
DrainFailure(PAErr), DrainFailure(PAErr),
#[error("Error in PulseAudioSink, Not connected to PulseAudio server")]
#[error("<PulseAudioSink>")]
NotConnected, NotConnected,
#[error("Error writing from PulseAudioSink to PulseAudio server, {0}")]
#[error("<PulseAudioSink> {0}")]
OnWrite(PAErr), OnWrite(PAErr),
} }
impl From<PulseError> for SinkError {
fn from(e: PulseError) -> SinkError {
use PulseError::*;
let es = e.to_string();
match e {
DrainFailure(_) | OnWrite(_) => SinkError::OnWrite(es),
ConnectionRefused(_) => SinkError::ConnectionRefused(es),
NotConnected => SinkError::NotConnected(es),
InvalidSampleSpec { .. } => SinkError::InvalidParams(es),
}
}
}
pub struct PulseAudioSink { pub struct PulseAudioSink {
s: Option<Simple>, s: Option<Simple>,
device: Option<String>, device: Option<String>,
@ -51,11 +72,8 @@ impl Open for PulseAudioSink {
} }
impl Sink for PulseAudioSink { impl Sink for PulseAudioSink {
fn start(&mut self) -> io::Result<()> { fn start(&mut self) -> SinkResult<()> {
if self.s.is_some() { if self.s.is_none() {
return Ok(());
}
// PulseAudio calls S24 and S24_3 different from the rest of the world // PulseAudio calls S24 and S24_3 different from the rest of the world
let pulse_format = match self.format { let pulse_format = match self.format {
AudioFormat::F32 => pulse::sample::Format::FLOAT32NE, AudioFormat::F32 => pulse::sample::Format::FLOAT32NE,
@ -73,13 +91,17 @@ impl Sink for PulseAudioSink {
}; };
if !ss.is_valid() { if !ss.is_valid() {
return Err(io::Error::new( let pulse_error = PulseError::InvalidSampleSpec {
io::ErrorKind::Other, pulse_format,
PulseError::InvalidSampleSpec, format: self.format,
)); channels: NUM_CHANNELS,
rate: SAMPLE_RATE,
};
return Err(SinkError::from(pulse_error));
} }
let result = Simple::new( let s = Simple::new(
None, // Use the default server. None, // Use the default server.
APP_NAME, // Our application's name. APP_NAME, // Our application's name.
Direction::Playback, // Direction. Direction::Playback, // Direction.
@ -88,31 +110,19 @@ impl Sink for PulseAudioSink {
&ss, // Our sample format. &ss, // Our sample format.
None, // Use default channel map. None, // Use default channel map.
None, // Use default buffering attributes. None, // Use default buffering attributes.
); )
.map_err(PulseError::ConnectionRefused)?;
match result {
Ok(s) => {
self.s = Some(s); self.s = Some(s);
} }
Err(e) => {
return Err(io::Error::new(
io::ErrorKind::ConnectionRefused,
PulseError::ConnectionRefused(e),
));
}
}
Ok(()) Ok(())
} }
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> SinkResult<()> {
let s = self let s = self.s.as_mut().ok_or(PulseError::NotConnected)?;
.s
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, PulseError::NotConnected))?;
s.drain() s.drain().map_err(PulseError::DrainFailure)?;
.map_err(|e| io::Error::new(io::ErrorKind::Other, PulseError::DrainFailure(e)))?;
self.s = None; self.s = None;
Ok(()) Ok(())
@ -122,14 +132,10 @@ impl Sink for PulseAudioSink {
} }
impl SinkAsBytes for PulseAudioSink { impl SinkAsBytes for PulseAudioSink {
fn write_bytes(&mut self, data: &[u8]) -> io::Result<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
let s = self let s = self.s.as_mut().ok_or(PulseError::NotConnected)?;
.s
.as_mut()
.ok_or_else(|| io::Error::new(io::ErrorKind::NotConnected, PulseError::NotConnected))?;
s.write(data) s.write(data).map_err(PulseError::OnWrite)?;
.map_err(|e| io::Error::new(io::ErrorKind::Other, PulseError::OnWrite(e)))?;
Ok(()) Ok(())
} }

View file

@ -1,11 +1,11 @@
use std::process::exit; use std::process::exit;
use std::thread;
use std::time::Duration; use std::time::Duration;
use std::{io, thread};
use cpal::traits::{DeviceTrait, HostTrait}; use cpal::traits::{DeviceTrait, HostTrait};
use thiserror::Error; use thiserror::Error;
use super::Sink; use super::{Sink, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
@ -33,16 +33,30 @@ pub fn mk_rodiojack(device: Option<String>, format: AudioFormat) -> Box<dyn Sink
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum RodioError { pub enum RodioError {
#[error("Rodio: no device available")] #[error("<RodioSink> No Device Available")]
NoDeviceAvailable, NoDeviceAvailable,
#[error("Rodio: device \"{0}\" is not available")] #[error("<RodioSink> device \"{0}\" is Not Available")]
DeviceNotAvailable(String), DeviceNotAvailable(String),
#[error("Rodio play error: {0}")] #[error("<RodioSink> Play Error: {0}")]
PlayError(#[from] rodio::PlayError), PlayError(#[from] rodio::PlayError),
#[error("Rodio stream error: {0}")] #[error("<RodioSink> Stream Error: {0}")]
StreamError(#[from] rodio::StreamError), StreamError(#[from] rodio::StreamError),
#[error("Cannot get audio devices: {0}")] #[error("<RodioSink> Cannot Get Audio Devices: {0}")]
DevicesError(#[from] cpal::DevicesError), DevicesError(#[from] cpal::DevicesError),
#[error("<RodioSink> {0}")]
Samples(String),
}
impl From<RodioError> for SinkError {
fn from(e: RodioError) -> SinkError {
use RodioError::*;
let es = e.to_string();
match e {
StreamError(_) | PlayError(_) | Samples(_) => SinkError::OnWrite(es),
NoDeviceAvailable | DeviceNotAvailable(_) => SinkError::ConnectionRefused(es),
DevicesError(_) => SinkError::InvalidParams(es),
}
}
} }
pub struct RodioSink { pub struct RodioSink {
@ -175,10 +189,10 @@ pub fn open(host: cpal::Host, device: Option<String>, format: AudioFormat) -> Ro
} }
impl Sink for RodioSink { impl Sink for RodioSink {
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> SinkResult<()> {
let samples = packet let samples = packet
.samples() .samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| RodioError::Samples(e.to_string()))?;
match self.format { match self.format {
AudioFormat::F32 => { AudioFormat::F32 => {
let samples_f32: &[f32] = &converter.f64_to_f32(samples); let samples_f32: &[f32] = &converter.f64_to_f32(samples);

View file

@ -1,11 +1,11 @@
use super::{Open, Sink}; use super::{Open, Sink, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
use crate::{NUM_CHANNELS, SAMPLE_RATE}; use crate::{NUM_CHANNELS, SAMPLE_RATE};
use sdl2::audio::{AudioQueue, AudioSpecDesired}; use sdl2::audio::{AudioQueue, AudioSpecDesired};
use std::thread;
use std::time::Duration; use std::time::Duration;
use std::{io, thread};
pub enum SdlSink { pub enum SdlSink {
F32(AudioQueue<f32>), F32(AudioQueue<f32>),
@ -52,7 +52,7 @@ impl Open for SdlSink {
} }
impl Sink for SdlSink { impl Sink for SdlSink {
fn start(&mut self) -> io::Result<()> { fn start(&mut self) -> SinkResult<()> {
macro_rules! start_sink { macro_rules! start_sink {
($queue: expr) => {{ ($queue: expr) => {{
$queue.clear(); $queue.clear();
@ -67,7 +67,7 @@ impl Sink for SdlSink {
Ok(()) Ok(())
} }
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> SinkResult<()> {
macro_rules! stop_sink { macro_rules! stop_sink {
($queue: expr) => {{ ($queue: expr) => {{
$queue.pause(); $queue.pause();
@ -82,7 +82,7 @@ impl Sink for SdlSink {
Ok(()) Ok(())
} }
fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket, converter: &mut Converter) -> SinkResult<()> {
macro_rules! drain_sink { macro_rules! drain_sink {
($queue: expr, $size: expr) => {{ ($queue: expr, $size: expr) => {{
// sleep and wait for sdl thread to drain the queue a bit // sleep and wait for sdl thread to drain the queue a bit
@ -94,7 +94,7 @@ impl Sink for SdlSink {
let samples = packet let samples = packet
.samples() .samples()
.map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?; .map_err(|e| SinkError::OnWrite(e.to_string()))?;
match self { match self {
Self::F32(queue) => { Self::F32(queue) => {
let samples_f32: &[f32] = &converter.f64_to_f32(samples); let samples_f32: &[f32] = &converter.f64_to_f32(samples);

View file

@ -1,10 +1,10 @@
use super::{Open, Sink, SinkAsBytes}; use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult};
use crate::config::AudioFormat; use crate::config::AudioFormat;
use crate::convert::Converter; use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
use shell_words::split; use shell_words::split;
use std::io::{self, Write}; use std::io::Write;
use std::process::{Child, Command, Stdio}; use std::process::{Child, Command, Stdio};
pub struct SubprocessSink { pub struct SubprocessSink {
@ -30,21 +30,25 @@ impl Open for SubprocessSink {
} }
impl Sink for SubprocessSink { impl Sink for SubprocessSink {
fn start(&mut self) -> io::Result<()> { fn start(&mut self) -> SinkResult<()> {
let args = split(&self.shell_command).unwrap(); let args = split(&self.shell_command).unwrap();
self.child = Some( let child = Command::new(&args[0])
Command::new(&args[0])
.args(&args[1..]) .args(&args[1..])
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.spawn()?, .spawn()
); .map_err(|e| SinkError::ConnectionRefused(e.to_string()))?;
self.child = Some(child);
Ok(()) Ok(())
} }
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> SinkResult<()> {
if let Some(child) = &mut self.child.take() { if let Some(child) = &mut self.child.take() {
child.kill()?; child
child.wait()?; .kill()
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
child
.wait()
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
} }
Ok(()) Ok(())
} }
@ -53,11 +57,18 @@ impl Sink for SubprocessSink {
} }
impl SinkAsBytes for SubprocessSink { impl SinkAsBytes for SubprocessSink {
fn write_bytes(&mut self, data: &[u8]) -> io::Result<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
if let Some(child) = &mut self.child { if let Some(child) = &mut self.child {
let child_stdin = child.stdin.as_mut().unwrap(); let child_stdin = child
child_stdin.write_all(data)?; .stdin
child_stdin.flush()?; .as_mut()
.ok_or_else(|| SinkError::NotConnected("Child is None".to_string()))?;
child_stdin
.write_all(data)
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
child_stdin
.flush()
.map_err(|e| SinkError::OnWrite(e.to_string()))?;
} }
Ok(()) Ok(())
} }