Merge pull request #1014 from JasonLG1979/improve-subprocess-backend

Improve the subprocess backend
This commit is contained in:
Roderick van Domburg 2022-07-18 22:21:12 +02:00 committed by GitHub
commit 9385dc840f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 158 additions and 40 deletions

View file

@ -478,6 +478,7 @@ impl SpircTask {
} }
SpircCommand::Shutdown => { SpircCommand::Shutdown => {
CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
self.player.stop();
self.shutdown = true; self.shutdown = true;
if let Some(rx) = self.commands.as_mut() { if let Some(rx) = self.commands.as_mut() {
rx.close() rx.close()

View file

@ -4,30 +4,75 @@ use crate::convert::Converter;
use crate::decoder::AudioPacket; use crate::decoder::AudioPacket;
use shell_words::split; use shell_words::split;
use std::io::Write; use std::io::{ErrorKind, Write};
use std::process::{exit, Child, Command, Stdio}; use std::process::{exit, Child, Command, Stdio};
use thiserror::Error;
#[derive(Debug, Error)]
enum SubprocessError {
#[error("<SubprocessSink> {0}")]
OnWrite(std::io::Error),
#[error("<SubprocessSink> Command {command} Can Not be Executed, {e}")]
SpawnFailure { command: String, e: std::io::Error },
#[error("<SubprocessSink> Failed to Parse Command args for {command}, {e}")]
InvalidArgs {
command: String,
e: shell_words::ParseError,
},
#[error("<SubprocessSink> Failed to Flush the Subprocess, {0}")]
FlushFailure(std::io::Error),
#[error("<SubprocessSink> Failed to Kill the Subprocess, {0}")]
KillFailure(std::io::Error),
#[error("<SubprocessSink> Failed to Wait for the Subprocess to Exit, {0}")]
WaitFailure(std::io::Error),
#[error("<SubprocessSink> The Subprocess is no longer able to accept Bytes")]
WriteZero,
#[error("<SubprocessSink> Missing Required Shell Command")]
MissingCommand,
#[error("<SubprocessSink> The Subprocess is None")]
NoChild,
#[error("<SubprocessSink> The Subprocess's stdin is None")]
NoStdin,
}
impl From<SubprocessError> for SinkError {
fn from(e: SubprocessError) -> SinkError {
use SubprocessError::*;
let es = e.to_string();
match e {
FlushFailure(_) | KillFailure(_) | WaitFailure(_) | OnWrite(_) | WriteZero => {
SinkError::OnWrite(es)
}
SpawnFailure { .. } => SinkError::ConnectionRefused(es),
MissingCommand | InvalidArgs { .. } => SinkError::InvalidParams(es),
NoChild | NoStdin => SinkError::NotConnected(es),
}
}
}
pub struct SubprocessSink { pub struct SubprocessSink {
shell_command: String, shell_command: Option<String>,
child: Option<Child>, child: Option<Child>,
format: AudioFormat, format: AudioFormat,
} }
impl Open for SubprocessSink { impl Open for SubprocessSink {
fn open(shell_command: Option<String>, format: AudioFormat) -> Self { fn open(shell_command: Option<String>, format: AudioFormat) -> Self {
let shell_command = match shell_command.as_deref() { if let Some("?") = shell_command.as_deref() {
Some("?") => { println!("\nUsage:\n\nOutput to a Subprocess:\n\n\t--backend subprocess --device {{shell_command}}\n");
info!("Usage: --backend subprocess --device {{shell_command}}"); exit(0);
exit(0); }
}
Some(cmd) => cmd.to_owned(),
None => {
error!("subprocess sink requires specifying a shell command");
exit(1);
}
};
info!("Using subprocess sink with format: {:?}", format); info!("Using SubprocessSink with format: {:?}", format);
Self { Self {
shell_command, shell_command,
@ -39,26 +84,53 @@ impl Open for SubprocessSink {
impl Sink for SubprocessSink { impl Sink for SubprocessSink {
fn start(&mut self) -> SinkResult<()> { fn start(&mut self) -> SinkResult<()> {
let args = split(&self.shell_command).unwrap(); self.child.get_or_insert({
let child = Command::new(&args[0]) match self.shell_command.as_deref() {
.args(&args[1..]) Some(command) => {
.stdin(Stdio::piped()) let args = split(command).map_err(|e| SubprocessError::InvalidArgs {
.spawn() command: command.to_string(),
.map_err(|e| SinkError::ConnectionRefused(e.to_string()))?; e,
self.child = Some(child); })?;
Command::new(&args[0])
.args(&args[1..])
.stdin(Stdio::piped())
.spawn()
.map_err(|e| SubprocessError::SpawnFailure {
command: command.to_string(),
e,
})?
}
None => return Err(SubprocessError::MissingCommand.into()),
}
});
Ok(()) Ok(())
} }
fn stop(&mut self) -> SinkResult<()> { fn stop(&mut self) -> SinkResult<()> {
if let Some(child) = &mut self.child.take() { let child = &mut self.child.take().ok_or(SubprocessError::NoChild)?;
child
.kill() match child.try_wait() {
.map_err(|e| SinkError::OnWrite(e.to_string()))?; // The process has already exited
child // nothing to do.
.wait() Ok(Some(_)) => Ok(()),
.map_err(|e| SinkError::OnWrite(e.to_string()))?; Ok(_) => {
// The process Must DIE!!!
child
.stdin
.take()
.ok_or(SubprocessError::NoStdin)?
.flush()
.map_err(SubprocessError::FlushFailure)?;
child.kill().map_err(SubprocessError::KillFailure)?;
child.wait().map_err(SubprocessError::WaitFailure)?;
Ok(())
}
Err(e) => Err(SubprocessError::WaitFailure(e).into()),
} }
Ok(())
} }
sink_as_bytes!(); sink_as_bytes!();
@ -66,22 +138,67 @@ impl Sink for SubprocessSink {
impl SinkAsBytes for SubprocessSink { impl SinkAsBytes for SubprocessSink {
fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> {
if let Some(child) = &mut self.child { // We get one attempted restart per write.
let child_stdin = child // We don't want to get stuck in a restart loop.
let mut restarted = false;
let mut start_index = 0;
let data_len = data.len();
let mut end_index = data_len;
loop {
match self
.child
.as_ref()
.ok_or(SubprocessError::NoChild)?
.stdin .stdin
.as_mut() .as_ref()
.ok_or_else(|| SinkError::NotConnected("Child is None".to_string()))?; .ok_or(SubprocessError::NoStdin)?
child_stdin .write(&data[start_index..end_index])
.write_all(data) {
.map_err(|e| SinkError::OnWrite(e.to_string()))?; Ok(0) => {
child_stdin // Potentially fatal.
.flush() // As per the docs a return value of 0
.map_err(|e| SinkError::OnWrite(e.to_string()))?; // means we shouldn't try to write to the
// process anymore so let's try a restart
// if we haven't already.
self.try_restart(SubprocessError::WriteZero, &mut restarted)?;
continue;
}
Ok(bytes_written) => {
// What we want, a successful write.
start_index = data_len.min(start_index + bytes_written);
end_index = data_len.min(start_index + bytes_written);
if end_index == data_len {
break Ok(());
}
}
// Non-fatal, retry the write.
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(e) => {
// Very possibly fatal,
// but let's try a restart anyway if we haven't already.
self.try_restart(SubprocessError::OnWrite(e), &mut restarted)?;
continue;
}
}
} }
Ok(())
} }
} }
impl SubprocessSink { impl SubprocessSink {
pub const NAME: &'static str = "subprocess"; pub const NAME: &'static str = "subprocess";
fn try_restart(&mut self, e: SubprocessError, restarted: &mut bool) -> SinkResult<()> {
// If the restart fails throw the original error back.
if !*restarted && self.stop().is_ok() && self.start().is_ok() {
*restarted = true;
Ok(())
} else {
Err(e.into())
}
}
} }