player: ensure load threads are done when dropping PlayerInternal

Fix a race where the load operation was trying to use a disposed tokio
context, resulting in panic.
This commit is contained in:
Guillaume Desmottes 2021-12-28 00:04:09 +01:00
parent afa2a021db
commit 3ce9854df5
3 changed files with 30 additions and 2 deletions

1
Cargo.lock generated
View file

@ -1406,6 +1406,7 @@ dependencies = [
"librespot-metadata", "librespot-metadata",
"log", "log",
"ogg", "ogg",
"parking_lot",
"portaudio-rs", "portaudio-rs",
"rand", "rand",
"rand_distr", "rand_distr",

View file

@ -26,6 +26,7 @@ shell-words = "1.0.0"
thiserror = "1.0" thiserror = "1.0"
tokio = { version = "1", features = ["parking_lot", "rt", "rt-multi-thread", "sync"] } tokio = { version = "1", features = ["parking_lot", "rt", "rt-multi-thread", "sync"] }
zerocopy = { version = "0.3" } zerocopy = { version = "0.3" }
parking_lot = { version = "0.11", features = ["deadlock_detection"] }
# Backends # Backends
alsa = { version = "0.5", optional = true } alsa = { version = "0.5", optional = true }

View file

@ -1,11 +1,13 @@
use std::{ use std::{
cmp::max, cmp::max,
collections::HashMap,
fmt, fmt,
future::Future, future::Future,
io::{self, Read, Seek, SeekFrom}, io::{self, Read, Seek, SeekFrom},
mem, mem,
pin::Pin, pin::Pin,
process::exit, process::exit,
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
@ -13,6 +15,7 @@ use std::{
use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
use futures_util::{future, stream::futures_unordered::FuturesUnordered, StreamExt, TryFutureExt}; use futures_util::{future, stream::futures_unordered::FuturesUnordered, StreamExt, TryFutureExt};
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::{ use crate::{
@ -56,6 +59,7 @@ struct PlayerInternal {
session: Session, session: Session,
config: PlayerConfig, config: PlayerConfig,
commands: mpsc::UnboundedReceiver<PlayerCommand>, commands: mpsc::UnboundedReceiver<PlayerCommand>,
load_handles: Arc<Mutex<HashMap<thread::ThreadId, thread::JoinHandle<()>>>>,
state: PlayerState, state: PlayerState,
preload: PlayerPreload, preload: PlayerPreload,
@ -344,6 +348,7 @@ impl Player {
session, session,
config, config,
commands: cmd_rx, commands: cmd_rx,
load_handles: Arc::new(Mutex::new(HashMap::new())),
state: PlayerState::Stopped, state: PlayerState::Stopped,
preload: PlayerPreload::None, preload: PlayerPreload::None,
@ -1953,7 +1958,7 @@ impl PlayerInternal {
} }
fn load_track( fn load_track(
&self, &mut self,
spotify_id: SpotifyId, spotify_id: SpotifyId,
position_ms: u32, position_ms: u32,
) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static { ) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static {
@ -1970,14 +1975,21 @@ impl PlayerInternal {
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
let load_handles_clone = self.load_handles.clone();
let handle = tokio::runtime::Handle::current(); let handle = tokio::runtime::Handle::current();
thread::spawn(move || { let load_handle = thread::spawn(move || {
let data = handle.block_on(loader.load_track(spotify_id, position_ms)); let data = handle.block_on(loader.load_track(spotify_id, position_ms));
if let Some(data) = data { if let Some(data) = data {
let _ = result_tx.send(data); let _ = result_tx.send(data);
} }
let mut load_handles = load_handles_clone.lock();
load_handles.remove(&thread::current().id());
}); });
let mut load_handles = self.load_handles.lock();
load_handles.insert(load_handle.thread().id(), load_handle);
result_rx.map_err(|_| ()) result_rx.map_err(|_| ())
} }
@ -2016,6 +2028,20 @@ impl PlayerInternal {
impl Drop for PlayerInternal { impl Drop for PlayerInternal {
fn drop(&mut self) { fn drop(&mut self) {
debug!("drop PlayerInternal[{}]", self.session.session_id()); debug!("drop PlayerInternal[{}]", self.session.session_id());
let handles: Vec<thread::JoinHandle<()>> = {
// waiting for the thread while holding the mutex would result in a deadlock
let mut load_handles = self.load_handles.lock();
load_handles
.drain()
.map(|(_thread_id, handle)| handle)
.collect()
};
for handle in handles {
let _ = handle.join();
}
} }
} }