From 0d4367fca23971dc8d790eae52e8f41044278bf4 Mon Sep 17 00:00:00 2001 From: JasonLG1979 Date: Mon, 20 Jun 2022 15:55:06 -0500 Subject: [PATCH] Improve the subprocess backend Better error handling. Move the checking of the shell command to start so a proper error can be thrown if it's None. Use write instead of write_all for finer grained error handling and the ability to attempt a restart on write errors. Use try_wait to skip flushing and killing the process if it's already dead. Stop the player on shutdown to *mostly* prevent write errors from spamming the logs during shutdown. Previously Ctrl+c always resulted in a write error. --- connect/src/spirc.rs | 1 + playback/src/audio_backend/subprocess.rs | 197 ++++++++++++++++++----- 2 files changed, 158 insertions(+), 40 deletions(-) diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index b574ff52..698ca46a 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -478,6 +478,7 @@ impl SpircTask { } SpircCommand::Shutdown => { CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); + self.player.stop(); self.shutdown = true; if let Some(rx) = self.commands.as_mut() { rx.close() diff --git a/playback/src/audio_backend/subprocess.rs b/playback/src/audio_backend/subprocess.rs index 63fc5d88..6ce545da 100644 --- a/playback/src/audio_backend/subprocess.rs +++ b/playback/src/audio_backend/subprocess.rs @@ -4,30 +4,75 @@ use crate::convert::Converter; use crate::decoder::AudioPacket; use shell_words::split; -use std::io::Write; +use std::io::{ErrorKind, Write}; use std::process::{exit, Child, Command, Stdio}; +use thiserror::Error; + +#[derive(Debug, Error)] +enum SubprocessError { + #[error(" {0}")] + OnWrite(std::io::Error), + + #[error(" Command {command} Can Not be Executed, {e}")] + SpawnFailure { command: String, e: std::io::Error }, + + #[error(" Failed to Parse Command args for {command}, {e}")] + InvalidArgs { + command: String, + e: shell_words::ParseError, + }, + + #[error(" Failed to Flush the Subprocess, {0}")] + FlushFailure(std::io::Error), + + #[error(" Failed to Kill the Subprocess, {0}")] + KillFailure(std::io::Error), + + #[error(" Failed to Wait for the Subprocess to Exit, {0}")] + WaitFailure(std::io::Error), + + #[error(" The Subprocess is no longer able to accept Bytes")] + WriteZero, + + #[error(" Missing Required Shell Command")] + MissingCommand, + + #[error(" The Subprocess is None")] + NoChild, + + #[error(" The Subprocess's stdin is None")] + NoStdin, +} + +impl From for SinkError { + fn from(e: SubprocessError) -> SinkError { + use SubprocessError::*; + let es = e.to_string(); + match e { + FlushFailure(_) | KillFailure(_) | WaitFailure(_) | OnWrite(_) | WriteZero => { + SinkError::OnWrite(es) + } + SpawnFailure { .. } => SinkError::ConnectionRefused(es), + MissingCommand | InvalidArgs { .. } => SinkError::InvalidParams(es), + NoChild | NoStdin => SinkError::NotConnected(es), + } + } +} pub struct SubprocessSink { - shell_command: String, + shell_command: Option, child: Option, format: AudioFormat, } impl Open for SubprocessSink { fn open(shell_command: Option, format: AudioFormat) -> Self { - let shell_command = match shell_command.as_deref() { - Some("?") => { - info!("Usage: --backend subprocess --device {{shell_command}}"); - exit(0); - } - Some(cmd) => cmd.to_owned(), - None => { - error!("subprocess sink requires specifying a shell command"); - exit(1); - } - }; + if let Some("?") = shell_command.as_deref() { + println!("\nUsage:\n\nOutput to a Subprocess:\n\n\t--backend subprocess --device {{shell_command}}\n"); + exit(0); + } - info!("Using subprocess sink with format: {:?}", format); + info!("Using SubprocessSink with format: {:?}", format); Self { shell_command, @@ -39,26 +84,53 @@ impl Open for SubprocessSink { impl Sink for SubprocessSink { fn start(&mut self) -> SinkResult<()> { - let args = split(&self.shell_command).unwrap(); - let child = Command::new(&args[0]) - .args(&args[1..]) - .stdin(Stdio::piped()) - .spawn() - .map_err(|e| SinkError::ConnectionRefused(e.to_string()))?; - self.child = Some(child); + self.child.get_or_insert({ + match self.shell_command.as_deref() { + Some(command) => { + let args = split(command).map_err(|e| SubprocessError::InvalidArgs { + command: command.to_string(), + e, + })?; + + Command::new(&args[0]) + .args(&args[1..]) + .stdin(Stdio::piped()) + .spawn() + .map_err(|e| SubprocessError::SpawnFailure { + command: command.to_string(), + e, + })? + } + None => return Err(SubprocessError::MissingCommand.into()), + } + }); + Ok(()) } fn stop(&mut self) -> SinkResult<()> { - if let Some(child) = &mut self.child.take() { - child - .kill() - .map_err(|e| SinkError::OnWrite(e.to_string()))?; - child - .wait() - .map_err(|e| SinkError::OnWrite(e.to_string()))?; + let child = &mut self.child.take().ok_or(SubprocessError::NoChild)?; + + match child.try_wait() { + // The process has already exited + // nothing to do. + Ok(Some(_)) => Ok(()), + Ok(_) => { + // The process Must DIE!!! + child + .stdin + .take() + .ok_or(SubprocessError::NoStdin)? + .flush() + .map_err(SubprocessError::FlushFailure)?; + + child.kill().map_err(SubprocessError::KillFailure)?; + child.wait().map_err(SubprocessError::WaitFailure)?; + + Ok(()) + } + Err(e) => Err(SubprocessError::WaitFailure(e).into()), } - Ok(()) } sink_as_bytes!(); @@ -66,22 +138,67 @@ impl Sink for SubprocessSink { impl SinkAsBytes for SubprocessSink { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { - if let Some(child) = &mut self.child { - let child_stdin = child + // We get one attempted restart per write. + // We don't want to get stuck in a restart loop. + let mut restarted = false; + let mut start_index = 0; + let data_len = data.len(); + let mut end_index = data_len; + + loop { + match self + .child + .as_ref() + .ok_or(SubprocessError::NoChild)? .stdin - .as_mut() - .ok_or_else(|| SinkError::NotConnected("Child is None".to_string()))?; - child_stdin - .write_all(data) - .map_err(|e| SinkError::OnWrite(e.to_string()))?; - child_stdin - .flush() - .map_err(|e| SinkError::OnWrite(e.to_string()))?; + .as_ref() + .ok_or(SubprocessError::NoStdin)? + .write(&data[start_index..end_index]) + { + Ok(0) => { + // Potentially fatal. + // As per the docs a return value of 0 + // means we shouldn't try to write to the + // process anymore so let's try a restart + // if we haven't already. + self.try_restart(SubprocessError::WriteZero, &mut restarted)?; + + continue; + } + Ok(bytes_written) => { + // What we want, a successful write. + start_index = data_len.min(start_index + bytes_written); + end_index = data_len.min(start_index + bytes_written); + + if end_index == data_len { + break Ok(()); + } + } + // Non-fatal, retry the write. + Err(ref e) if e.kind() == ErrorKind::Interrupted => continue, + Err(e) => { + // Very possibly fatal, + // but let's try a restart anyway if we haven't already. + self.try_restart(SubprocessError::OnWrite(e), &mut restarted)?; + + continue; + } + } } - Ok(()) } } impl SubprocessSink { pub const NAME: &'static str = "subprocess"; + + fn try_restart(&mut self, e: SubprocessError, restarted: &mut bool) -> SinkResult<()> { + // If the restart fails throw the original error back. + if !*restarted && self.stop().is_ok() && self.start().is_ok() { + *restarted = true; + + Ok(()) + } else { + Err(e.into()) + } + } }