From 3ce9854df5f16b6229bb3213340c90dbe2ff2963 Mon Sep 17 00:00:00 2001 From: Guillaume Desmottes Date: Tue, 28 Dec 2021 00:04:09 +0100 Subject: [PATCH] player: ensure load threads are done when dropping PlayerInternal Fix a race where the load operation was trying to use a disposed tokio context, resulting in panic. --- Cargo.lock | 1 + playback/Cargo.toml | 1 + playback/src/player.rs | 30 ++++++++++++++++++++++++++++-- 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81f083ff..1e5bc36f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1406,6 +1406,7 @@ dependencies = [ "librespot-metadata", "log", "ogg", + "parking_lot", "portaudio-rs", "rand", "rand_distr", diff --git a/playback/Cargo.toml b/playback/Cargo.toml index fee4dd51..92452d3c 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -26,6 +26,7 @@ shell-words = "1.0.0" thiserror = "1.0" tokio = { version = "1", features = ["parking_lot", "rt", "rt-multi-thread", "sync"] } zerocopy = { version = "0.3" } +parking_lot = { version = "0.11", features = ["deadlock_detection"] } # Backends alsa = { version = "0.5", optional = true } diff --git a/playback/src/player.rs b/playback/src/player.rs index 747c4967..f7788fda 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1,11 +1,13 @@ use std::{ cmp::max, + collections::HashMap, fmt, future::Future, io::{self, Read, Seek, SeekFrom}, mem, pin::Pin, process::exit, + sync::Arc, task::{Context, Poll}, thread, time::{Duration, Instant}, @@ -13,6 +15,7 @@ use std::{ use byteorder::{LittleEndian, ReadBytesExt}; use futures_util::{future, stream::futures_unordered::FuturesUnordered, StreamExt, TryFutureExt}; +use parking_lot::Mutex; use tokio::sync::{mpsc, oneshot}; use crate::{ @@ -56,6 +59,7 @@ struct PlayerInternal { session: Session, config: PlayerConfig, commands: mpsc::UnboundedReceiver, + load_handles: Arc>>>, state: PlayerState, preload: PlayerPreload, @@ -344,6 +348,7 @@ impl Player { session, config, commands: cmd_rx, + load_handles: Arc::new(Mutex::new(HashMap::new())), state: PlayerState::Stopped, preload: PlayerPreload::None, @@ -1953,7 +1958,7 @@ impl PlayerInternal { } fn load_track( - &self, + &mut self, spotify_id: SpotifyId, position_ms: u32, ) -> impl Future> + Send + 'static { @@ -1970,14 +1975,21 @@ impl PlayerInternal { let (result_tx, result_rx) = oneshot::channel(); + let load_handles_clone = self.load_handles.clone(); let handle = tokio::runtime::Handle::current(); - thread::spawn(move || { + let load_handle = thread::spawn(move || { let data = handle.block_on(loader.load_track(spotify_id, position_ms)); if let Some(data) = data { let _ = result_tx.send(data); } + + let mut load_handles = load_handles_clone.lock(); + load_handles.remove(&thread::current().id()); }); + let mut load_handles = self.load_handles.lock(); + load_handles.insert(load_handle.thread().id(), load_handle); + result_rx.map_err(|_| ()) } @@ -2016,6 +2028,20 @@ impl PlayerInternal { impl Drop for PlayerInternal { fn drop(&mut self) { debug!("drop PlayerInternal[{}]", self.session.session_id()); + + let handles: Vec> = { + // waiting for the thread while holding the mutex would result in a deadlock + let mut load_handles = self.load_handles.lock(); + + load_handles + .drain() + .map(|(_thread_id, handle)| handle) + .collect() + }; + + for handle in handles { + let _ = handle.join(); + } } }