Merge pull request #601 from Johannesd3/tokio_migration

[Tokio migration] Make RodioSink Send and other improvements
This commit is contained in:
Ash 2021-02-20 10:41:45 +01:00 committed by GitHub
commit afacaea15f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 101 deletions

1
Cargo.lock generated
View file

@ -1375,6 +1375,7 @@ dependencies = [
"rodio", "rodio",
"sdl2", "sdl2",
"shell-words", "shell-words",
"thiserror",
"zerocopy", "zerocopy",
] ]

View file

@ -32,7 +32,7 @@ pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport>
.map_err(|e| { .map_err(|e| {
io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e)) io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e))
})?; })?;
let host = split let host = split
.next() .next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?; .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?;

View file

@ -29,19 +29,22 @@ libpulse-binding = { version = "2.13", optional = true, default-features
libpulse-simple-binding = { version = "2.13", optional = true, default-features = false } libpulse-simple-binding = { version = "2.13", optional = true, default-features = false }
jack = { version = "0.6", optional = true } jack = { version = "0.6", optional = true }
libc = { version = "0.2", optional = true } libc = { version = "0.2", optional = true }
rodio = { version = "0.13", optional = true, default-features = false }
cpal = { version = "0.13", optional = true }
sdl2 = { version = "0.34", optional = true } sdl2 = { version = "0.34", optional = true }
gstreamer = { version = "0.16", optional = true } gstreamer = { version = "0.16", optional = true }
gstreamer-app = { version = "0.16", optional = true } gstreamer-app = { version = "0.16", optional = true }
glib = { version = "0.10", optional = true } glib = { version = "0.10", optional = true }
zerocopy = { version = "0.3", optional = true } zerocopy = { version = "0.3", optional = true }
# Rodio dependencies
rodio = { version = "0.13", optional = true, default-features = false }
cpal = { version = "0.13", optional = true }
thiserror = { version = "1", optional = true }
[features] [features]
alsa-backend = ["alsa"] alsa-backend = ["alsa"]
portaudio-backend = ["portaudio-rs"] portaudio-backend = ["portaudio-rs"]
pulseaudio-backend = ["libpulse-binding", "libpulse-simple-binding"] pulseaudio-backend = ["libpulse-binding", "libpulse-simple-binding"]
jackaudio-backend = ["jack"] jackaudio-backend = ["jack"]
rodio-backend = ["rodio", "cpal"] rodio-backend = ["rodio", "cpal", "thiserror"]
sdl-backend = ["sdl2"] sdl-backend = ["sdl2"]
gstreamer-backend = ["gstreamer", "gstreamer-app", "glib", "zerocopy"] gstreamer-backend = ["gstreamer", "gstreamer-app", "glib", "zerocopy"]

View file

@ -1,109 +1,165 @@
use super::{Open, Sink};
extern crate cpal;
extern crate rodio;
use cpal::traits::{DeviceTrait, HostTrait};
use std::process::exit; use std::process::exit;
use std::{convert::Infallible, sync::mpsc};
use std::{io, thread, time}; use std::{io, thread, time};
use cpal::traits::{DeviceTrait, HostTrait};
use thiserror::Error;
use super::{Open, Sink};
#[derive(Debug, Error)]
pub enum RodioError {
#[error("Rodio: no device available")]
NoDeviceAvailable,
#[error("Rodio: device \"{0}\" is not available")]
DeviceNotAvailable(String),
#[error("Rodio play error: {0}")]
PlayError(#[from] rodio::PlayError),
#[error("Rodio stream error: {0}")]
StreamError(#[from] rodio::StreamError),
#[error("Cannot get audio devices: {0}")]
DevicesError(#[from] cpal::DevicesError),
}
pub struct RodioSink { pub struct RodioSink {
rodio_sink: rodio::Sink, rodio_sink: rodio::Sink,
// We have to keep hold of this object, or the Sink can't play...
#[allow(dead_code)] // will produce a TryRecvError on the receiver side when it is dropped.
stream: rodio::OutputStream, _close_tx: mpsc::SyncSender<Infallible>,
} }
fn list_formats(ref device: &rodio::Device) { fn list_formats(device: &rodio::Device) {
let default_fmt = match device.default_output_config() { match device.default_output_config() {
Ok(fmt) => cpal::SupportedStreamConfig::from(fmt), Ok(cfg) => {
Err(e) => { debug!(" Default config:");
warn!("Error getting default rodio::Sink config: {}", e); debug!(" {:?}", cfg);
return;
} }
};
debug!(" Default config:");
debug!(" {:?}", default_fmt);
let mut output_configs = match device.supported_output_configs() {
Ok(f) => f.peekable(),
Err(e) => { Err(e) => {
warn!("Error getting supported rodio::Sink configs: {}", e); // Use loglevel debug, since even the output is only debug
return; debug!("Error getting default rodio::Sink config: {}", e);
} }
}; };
if output_configs.peek().is_some() { match device.supported_output_configs() {
debug!(" Available configs:"); Ok(mut cfgs) => {
for format in output_configs { if let Some(first) = cfgs.next() {
debug!(" {:?}", format); debug!(" Available configs:");
} debug!(" {:?}", first);
} } else {
} return;
fn list_outputs() {
let default_device = get_default_device();
let default_device_name = default_device.name().expect("cannot get output name");
println!("Default Audio Device:\n {}", default_device_name);
list_formats(&default_device);
println!("Other Available Audio Devices:");
for device in cpal::default_host()
.output_devices()
.expect("cannot get list of output devices")
{
let device_name = device.name().expect("cannot get output name");
if device_name != default_device_name {
println!(" {}", device_name);
list_formats(&device);
}
}
}
fn get_default_device() -> rodio::Device {
cpal::default_host()
.default_output_device()
.expect("no default output device available")
}
fn match_device(device: Option<String>) -> rodio::Device {
match device {
Some(device_name) => {
if device_name == "?".to_string() {
list_outputs();
exit(0)
} }
for d in cpal::default_host()
.output_devices() for cfg in cfgs {
.expect("cannot get list of output devices") debug!(" {:?}", cfg);
{ }
if d.name().expect("cannot get output name") == device_name { }
return d; Err(e) => {
debug!("Error getting supported rodio::Sink configs: {}", e);
}
}
}
fn list_outputs() -> Result<(), cpal::DevicesError> {
let mut default_device_name = None;
if let Some(default_device) = get_default_device() {
default_device_name = default_device.name().ok();
println!(
"Default Audio Device:\n {}",
default_device_name.as_deref().unwrap_or("[unknown name]")
);
list_formats(&default_device);
println!("Other Available Audio Devices:");
} else {
warn!("No default device was found");
}
for device in cpal::default_host().output_devices()? {
match device.name() {
Ok(name) if Some(&name) == default_device_name.as_ref() => (),
Ok(name) => {
println!(" {}", name);
list_formats(&device);
}
Err(e) => {
warn!("Cannot get device name: {}", e);
println!(" [unknown name]");
list_formats(&device);
}
}
}
Ok(())
}
fn get_default_device() -> Option<rodio::Device> {
cpal::default_host().default_output_device()
}
fn create_sink(device: Option<String>) -> Result<(rodio::Sink, rodio::OutputStream), RodioError> {
let rodio_device = match device {
Some(ask) if &ask == "?" => {
let exit_code = match list_outputs() {
Ok(()) => 0,
Err(e) => {
error!("{}", e);
1
} }
} };
println!("No output sink matching '{}' found.", device_name); exit(exit_code)
exit(0)
} }
None => return get_default_device(), Some(device_name) => {
} cpal::default_host()
.output_devices()?
.find(|d| d.name().ok().map_or(false, |name| name == device_name)) // Ignore devices for which getting name fails
.ok_or(RodioError::DeviceNotAvailable(device_name))?
}
None => get_default_device().ok_or(RodioError::NoDeviceAvailable)?,
};
let name = rodio_device.name().ok();
info!(
"Using audio device: {}",
name.as_deref().unwrap_or("[unknown name]")
);
let (stream, handle) = rodio::OutputStream::try_from_device(&rodio_device)?;
let sink = rodio::Sink::try_new(&handle)?;
Ok((sink, stream))
} }
impl Open for RodioSink { impl Open for RodioSink {
fn open(device: Option<String>) -> RodioSink { fn open(device: Option<String>) -> RodioSink {
debug!( debug!(
"Using rodio sink with cpal host: {:?}", "Using rodio sink with cpal host: {:?}",
cpal::default_host().id() cpal::default_host().id().name()
); );
let rodio_device = match_device(device); let (sink_tx, sink_rx) = mpsc::sync_channel(1);
debug!("Using cpal device"); let (close_tx, close_rx) = mpsc::sync_channel(1);
let stream = rodio::OutputStream::try_from_device(&rodio_device)
.expect("Couldn't open output stream.");
debug!("Using rodio stream");
let sink = rodio::Sink::try_new(&stream.1).expect("Couldn't create output sink.");
debug!("Using rodio sink");
std::thread::spawn(move || match create_sink(device) {
Ok((sink, stream)) => {
sink_tx.send(Ok(sink)).unwrap();
close_rx.recv().unwrap_err(); // This will fail as soon as the sender is dropped
debug!("drop rodio::OutputStream");
drop(stream);
}
Err(e) => {
sink_tx.send(Err(e)).unwrap();
}
});
// Instead of the second `unwrap`, better error handling could be introduced
let sink = sink_rx.recv().unwrap().unwrap();
debug!("Rodio sink was created");
RodioSink { RodioSink {
rodio_sink: sink, rodio_sink: sink,
stream: stream.0, _close_tx: close_tx,
} }
} }
} }

View file

@ -327,15 +327,15 @@ impl Player {
} }
pub async fn get_end_of_track_future(&self) { pub async fn get_end_of_track_future(&self) {
self.get_player_event_channel() let mut channel = self.get_player_event_channel();
.filter(|event| { while let Some(event) = channel.next().await {
future::ready(matches!( if matches!(
event, event,
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
)) ) {
}) return;
.for_each(|_| future::ready(())) }
.await }
} }
pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) { pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) {
@ -676,14 +676,6 @@ impl PlayerTrackLoader {
let bytes_per_second = self.stream_data_rate(format); let bytes_per_second = self.stream_data_rate(format);
let play_from_beginning = position_ms == 0; let play_from_beginning = position_ms == 0;
let key = match self.session.audio_key().request(spotify_id, file_id).await {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
return None;
}
};
// This is only a loop to be able to reload the file if an error occured // This is only a loop to be able to reload the file if an error occured
// while opening a cached file. // while opening a cached file.
loop { loop {
@ -713,6 +705,14 @@ impl PlayerTrackLoader {
stream_loader_controller.set_random_access_mode(); stream_loader_controller.set_random_access_mode();
} }
let key = match self.session.audio_key().request(spotify_id, file_id).await {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
return None;
}
};
let mut decrypted_file = AudioDecrypt::new(key, encrypted_file); let mut decrypted_file = AudioDecrypt::new(key, encrypted_file);
let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file) let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file)