Change counting to spirc and player

They can be reinstantiated, unlike the `session` which is now
intended to be constructed once.
This commit is contained in:
Roderick van Domburg 2022-01-16 21:29:59 +01:00
parent fcb21df81f
commit 8851951f04
No known key found for this signature in database
GPG key ID: FE2585E713F9F30A
3 changed files with 30 additions and 27 deletions

View file

@ -2,6 +2,7 @@ use std::{
convert::TryFrom, convert::TryFrom,
future::Future, future::Future,
pin::Pin, pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
@ -106,8 +107,12 @@ struct SpircTask {
context_fut: BoxedFuture<Result<serde_json::Value, Error>>, context_fut: BoxedFuture<Result<serde_json::Value, Error>>,
autoplay_fut: BoxedFuture<Result<String, Error>>, autoplay_fut: BoxedFuture<Result<String, Error>>,
context: Option<StationContext>, context: Option<StationContext>,
spirc_id: usize,
} }
static SPIRC_COUNTER: AtomicUsize = AtomicUsize::new(0);
pub enum SpircCommand { pub enum SpircCommand {
Play, Play,
PlayPause, PlayPause,
@ -263,7 +268,8 @@ impl Spirc {
player: Player, player: Player,
mixer: Box<dyn Mixer>, mixer: Box<dyn Mixer>,
) -> Result<(Spirc, impl Future<Output = ()>), Error> { ) -> Result<(Spirc, impl Future<Output = ()>), Error> {
debug!("new Spirc[{}]", session.session_id()); let spirc_id = SPIRC_COUNTER.fetch_add(1, Ordering::AcqRel);
debug!("new Spirc[{}]", spirc_id);
let ident = session.device_id().to_owned(); let ident = session.device_id().to_owned();
@ -368,6 +374,8 @@ impl Spirc {
context_fut: Box::pin(future::pending()), context_fut: Box::pin(future::pending()),
autoplay_fut: Box::pin(future::pending()), autoplay_fut: Box::pin(future::pending()),
context: None, context: None,
spirc_id,
}; };
if let Some(volume) = initial_volume { if let Some(volume) = initial_volume {
@ -1427,7 +1435,7 @@ impl SpircTask {
impl Drop for SpircTask { impl Drop for SpircTask {
fn drop(&mut self) { fn drop(&mut self) {
debug!("drop Spirc[{}]", self.session.session_id()); debug!("drop Spirc[{}]", self.spirc_id);
} }
} }

View file

@ -4,10 +4,7 @@ use std::{
io, io,
pin::Pin, pin::Pin,
process::exit, process::exit,
sync::{ sync::{Arc, Weak},
atomic::{AtomicUsize, Ordering},
Arc, Weak,
},
task::{Context, Poll}, task::{Context, Poll},
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
}; };
@ -97,12 +94,8 @@ struct SessionInternal {
cache: Option<Arc<Cache>>, cache: Option<Arc<Cache>>,
handle: tokio::runtime::Handle, handle: tokio::runtime::Handle,
session_id: usize,
} }
static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone)] #[derive(Clone)]
pub struct Session(Arc<SessionInternal>); pub struct Session(Arc<SessionInternal>);
@ -110,8 +103,7 @@ impl Session {
pub fn new(config: SessionConfig, cache: Option<Cache>) -> Self { pub fn new(config: SessionConfig, cache: Option<Cache>) -> Self {
let http_client = HttpClient::new(config.proxy.as_ref()); let http_client = HttpClient::new(config.proxy.as_ref());
let session_id = SESSION_COUNTER.fetch_add(1, Ordering::AcqRel); debug!("new Session");
debug!("new Session[{}]", session_id);
Self(Arc::new(SessionInternal { Self(Arc::new(SessionInternal {
config, config,
@ -126,7 +118,6 @@ impl Session {
spclient: OnceCell::new(), spclient: OnceCell::new(),
token_provider: OnceCell::new(), token_provider: OnceCell::new(),
handle: tokio::runtime::Handle::current(), handle: tokio::runtime::Handle::current(),
session_id,
})) }))
} }
@ -218,8 +209,7 @@ impl Session {
fn debug_info(&self) { fn debug_info(&self) {
debug!( debug!(
"Session[{}] strong={} weak={}", "Session strong={} weak={}",
self.0.session_id,
Arc::strong_count(&self.0), Arc::strong_count(&self.0),
Arc::weak_count(&self.0) Arc::weak_count(&self.0)
); );
@ -413,12 +403,8 @@ impl Session {
SessionWeak(Arc::downgrade(&self.0)) SessionWeak(Arc::downgrade(&self.0))
} }
pub fn session_id(&self) -> usize {
self.0.session_id
}
pub fn shutdown(&self) { pub fn shutdown(&self) {
debug!("Invalidating session [{}]", self.0.session_id); debug!("Invalidating session");
self.0.data.write().invalid = true; self.0.data.write().invalid = true;
self.mercury().shutdown(); self.mercury().shutdown();
self.channel().shutdown(); self.channel().shutdown();
@ -445,7 +431,7 @@ impl SessionWeak {
impl Drop for SessionInternal { impl Drop for SessionInternal {
fn drop(&mut self) { fn drop(&mut self) {
debug!("drop Session[{}]", self.session_id); debug!("drop Session");
} }
} }

View file

@ -7,7 +7,10 @@ use std::{
mem, mem,
pin::Pin, pin::Pin,
process::exit, process::exit,
sync::Arc, sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll}, task::{Context, Poll},
thread, thread,
time::{Duration, Instant}, time::{Duration, Instant},
@ -84,8 +87,12 @@ struct PlayerInternal {
normalisation_peak: f64, normalisation_peak: f64,
auto_normalise_as_album: bool, auto_normalise_as_album: bool,
player_id: usize,
} }
static PLAYER_COUNTER: AtomicUsize = AtomicUsize::new(0);
enum PlayerCommand { enum PlayerCommand {
Load { Load {
track_id: SpotifyId, track_id: SpotifyId,
@ -365,7 +372,8 @@ impl Player {
} }
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id()); let player_id = PLAYER_COUNTER.fetch_add(1, Ordering::AcqRel);
debug!("new Player [{}]", player_id);
let converter = Converter::new(config.ditherer); let converter = Converter::new(config.ditherer);
@ -388,6 +396,8 @@ impl Player {
normalisation_integrator: 0.0, normalisation_integrator: 0.0,
auto_normalise_as_album: false, auto_normalise_as_album: false,
player_id,
}; };
// While PlayerInternal is written as a future, it still contains blocking code. // While PlayerInternal is written as a future, it still contains blocking code.
@ -488,9 +498,8 @@ impl Drop for Player {
debug!("Shutting down player thread ..."); debug!("Shutting down player thread ...");
self.commands = None; self.commands = None;
if let Some(handle) = self.thread_handle.take() { if let Some(handle) = self.thread_handle.take() {
match handle.join() { if let Err(e) = handle.join() {
Ok(_) => (), error!("Player thread Error: {:?}", e);
Err(e) => error!("Player thread Error: {:?}", e),
} }
} }
} }
@ -2043,7 +2052,7 @@ impl PlayerInternal {
impl Drop for PlayerInternal { impl Drop for PlayerInternal {
fn drop(&mut self) { fn drop(&mut self) {
debug!("drop PlayerInternal[{}]", self.session.session_id()); debug!("drop PlayerInternal[{}]", self.player_id);
let handles: Vec<thread::JoinHandle<()>> = { let handles: Vec<thread::JoinHandle<()>> = {
// waiting for the thread while holding the mutex would result in a deadlock // waiting for the thread while holding the mutex would result in a deadlock