mirror of
https://github.com/librespot-org/librespot.git
synced 2024-12-18 17:11:53 +00:00
Merge pull request #919 from gdesmott/race-fix
player: ensure `load_track` threads are done when dropping `PlayerInternal`
This commit is contained in:
commit
e5938c7e24
3 changed files with 30 additions and 2 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1406,6 +1406,7 @@ dependencies = [
|
||||||
"librespot-metadata",
|
"librespot-metadata",
|
||||||
"log",
|
"log",
|
||||||
"ogg",
|
"ogg",
|
||||||
|
"parking_lot",
|
||||||
"portaudio-rs",
|
"portaudio-rs",
|
||||||
"rand",
|
"rand",
|
||||||
"rand_distr",
|
"rand_distr",
|
||||||
|
|
|
@ -26,6 +26,7 @@ shell-words = "1.0.0"
|
||||||
thiserror = "1.0"
|
thiserror = "1.0"
|
||||||
tokio = { version = "1", features = ["parking_lot", "rt", "rt-multi-thread", "sync"] }
|
tokio = { version = "1", features = ["parking_lot", "rt", "rt-multi-thread", "sync"] }
|
||||||
zerocopy = { version = "0.3" }
|
zerocopy = { version = "0.3" }
|
||||||
|
parking_lot = { version = "0.11", features = ["deadlock_detection"] }
|
||||||
|
|
||||||
# Backends
|
# Backends
|
||||||
alsa = { version = "0.5", optional = true }
|
alsa = { version = "0.5", optional = true }
|
||||||
|
|
|
@ -1,11 +1,13 @@
|
||||||
use std::{
|
use std::{
|
||||||
cmp::max,
|
cmp::max,
|
||||||
|
collections::HashMap,
|
||||||
fmt,
|
fmt,
|
||||||
future::Future,
|
future::Future,
|
||||||
io::{self, Read, Seek, SeekFrom},
|
io::{self, Read, Seek, SeekFrom},
|
||||||
mem,
|
mem,
|
||||||
pin::Pin,
|
pin::Pin,
|
||||||
process::exit,
|
process::exit,
|
||||||
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
thread,
|
thread,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
|
@ -13,6 +15,7 @@ use std::{
|
||||||
|
|
||||||
use byteorder::{LittleEndian, ReadBytesExt};
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
use futures_util::{future, stream::futures_unordered::FuturesUnordered, StreamExt, TryFutureExt};
|
use futures_util::{future, stream::futures_unordered::FuturesUnordered, StreamExt, TryFutureExt};
|
||||||
|
use parking_lot::Mutex;
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
|
@ -56,6 +59,7 @@ struct PlayerInternal {
|
||||||
session: Session,
|
session: Session,
|
||||||
config: PlayerConfig,
|
config: PlayerConfig,
|
||||||
commands: mpsc::UnboundedReceiver<PlayerCommand>,
|
commands: mpsc::UnboundedReceiver<PlayerCommand>,
|
||||||
|
load_handles: Arc<Mutex<HashMap<thread::ThreadId, thread::JoinHandle<()>>>>,
|
||||||
|
|
||||||
state: PlayerState,
|
state: PlayerState,
|
||||||
preload: PlayerPreload,
|
preload: PlayerPreload,
|
||||||
|
@ -344,6 +348,7 @@ impl Player {
|
||||||
session,
|
session,
|
||||||
config,
|
config,
|
||||||
commands: cmd_rx,
|
commands: cmd_rx,
|
||||||
|
load_handles: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
|
||||||
state: PlayerState::Stopped,
|
state: PlayerState::Stopped,
|
||||||
preload: PlayerPreload::None,
|
preload: PlayerPreload::None,
|
||||||
|
@ -1953,7 +1958,7 @@ impl PlayerInternal {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_track(
|
fn load_track(
|
||||||
&self,
|
&mut self,
|
||||||
spotify_id: SpotifyId,
|
spotify_id: SpotifyId,
|
||||||
position_ms: u32,
|
position_ms: u32,
|
||||||
) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static {
|
) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static {
|
||||||
|
@ -1970,14 +1975,21 @@ impl PlayerInternal {
|
||||||
|
|
||||||
let (result_tx, result_rx) = oneshot::channel();
|
let (result_tx, result_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
let load_handles_clone = self.load_handles.clone();
|
||||||
let handle = tokio::runtime::Handle::current();
|
let handle = tokio::runtime::Handle::current();
|
||||||
thread::spawn(move || {
|
let load_handle = thread::spawn(move || {
|
||||||
let data = handle.block_on(loader.load_track(spotify_id, position_ms));
|
let data = handle.block_on(loader.load_track(spotify_id, position_ms));
|
||||||
if let Some(data) = data {
|
if let Some(data) = data {
|
||||||
let _ = result_tx.send(data);
|
let _ = result_tx.send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut load_handles = load_handles_clone.lock();
|
||||||
|
load_handles.remove(&thread::current().id());
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let mut load_handles = self.load_handles.lock();
|
||||||
|
load_handles.insert(load_handle.thread().id(), load_handle);
|
||||||
|
|
||||||
result_rx.map_err(|_| ())
|
result_rx.map_err(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2016,6 +2028,20 @@ impl PlayerInternal {
|
||||||
impl Drop for PlayerInternal {
|
impl Drop for PlayerInternal {
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
debug!("drop PlayerInternal[{}]", self.session.session_id());
|
debug!("drop PlayerInternal[{}]", self.session.session_id());
|
||||||
|
|
||||||
|
let handles: Vec<thread::JoinHandle<()>> = {
|
||||||
|
// waiting for the thread while holding the mutex would result in a deadlock
|
||||||
|
let mut load_handles = self.load_handles.lock();
|
||||||
|
|
||||||
|
load_handles
|
||||||
|
.drain()
|
||||||
|
.map(|(_thread_id, handle)| handle)
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
for handle in handles {
|
||||||
|
let _ = handle.join();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue