add session timeout handling (#1129)

This commit is contained in:
eladyn 2023-06-01 21:39:35 +02:00 committed by GitHub
parent 310725d2fb
commit 4d402e690c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 64 additions and 22 deletions

View file

@ -87,6 +87,7 @@ https://github.com/librespot-org/librespot
- [core] Support downloading of lyrics - [core] Support downloading of lyrics
- [core] Support parsing `SpotifyId` for local files - [core] Support parsing `SpotifyId` for local files
- [core] Support parsing `SpotifyId` for named playlists - [core] Support parsing `SpotifyId` for named playlists
- [core] Add checks and handling for stale server connections.
- [main] Add all player events to `player_event_handler.rs` - [main] Add all player events to `player_event_handler.rs`
- [main] Add an event worker thread that runs async to the main thread(s) but - [main] Add an event worker thread that runs async to the main thread(s) but
sync to itself to prevent potential data races for event consumers sync to itself to prevent potential data races for event consumers

View file

@ -1,7 +1,6 @@
use std::{ use std::{
collections::HashMap, collections::HashMap,
future::Future, future::Future,
mem,
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
@ -200,7 +199,7 @@ impl MercuryManager {
for i in 0..count { for i in 0..count {
let mut part = Self::parse_part(&mut data); let mut part = Self::parse_part(&mut data);
if let Some(mut partial) = mem::replace(&mut pending.partial, None) { if let Some(mut partial) = pending.partial.take() {
partial.extend_from_slice(&part); partial.extend_from_slice(&part);
part = partial; part = partial;
} }

57
core/src/session.rs Normal file → Executable file
View file

@ -6,7 +6,7 @@ use std::{
process::exit, process::exit,
sync::{Arc, Weak}, sync::{Arc, Weak},
task::{Context, Poll}, task::{Context, Poll},
time::{SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
@ -18,7 +18,7 @@ use once_cell::sync::OnceCell;
use parking_lot::RwLock; use parking_lot::RwLock;
use quick_xml::events::Event; use quick_xml::events::Event;
use thiserror::Error; use thiserror::Error;
use tokio::sync::mpsc; use tokio::{sync::mpsc, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::{ use crate::{
@ -80,6 +80,7 @@ struct SessionData {
time_delta: i64, time_delta: i64,
invalid: bool, invalid: bool,
user_data: UserData, user_data: UserData,
last_ping: Option<Instant>,
} }
struct SessionInternal { struct SessionInternal {
@ -100,6 +101,15 @@ struct SessionInternal {
handle: tokio::runtime::Handle, handle: tokio::runtime::Handle,
} }
/// A shared reference to a Spotify session.
///
/// After instantiating, you need to login via [Session::connect].
/// You can either implement the whole playback logic yourself by using
/// this structs interface directly or hand it to a
/// `Player`.
///
/// *Note*: [Session] instances cannot yet be reused once invalidated. After
/// an unexpectedly closed connection, you'll need to create a new [Session].
#[derive(Clone)] #[derive(Clone)]
pub struct Session(Arc<SessionInternal>); pub struct Session(Arc<SessionInternal>);
@ -181,9 +191,10 @@ impl Session {
.map(Ok) .map(Ok)
.forward(sink); .forward(sink);
let receiver_task = DispatchTask(stream, self.weak()); let receiver_task = DispatchTask(stream, self.weak());
let timeout_task = Session::session_timeout(self.weak());
tokio::spawn(async move { tokio::spawn(async move {
let result = future::try_join(sender_task, receiver_task).await; let result = future::try_join3(sender_task, receiver_task, timeout_task).await;
if let Err(e) = result { if let Err(e) = result {
error!("{}", e); error!("{}", e);
@ -231,6 +242,33 @@ impl Session {
.get_or_init(|| TokenProvider::new(self.weak())) .get_or_init(|| TokenProvider::new(self.weak()))
} }
/// Returns an error, when we haven't received a ping for too long (2 minutes),
/// which means that we silently lost connection to Spotify servers.
async fn session_timeout(session: SessionWeak) -> io::Result<()> {
// pings are sent every 2 minutes and a 5 second margin should be fine
const SESSION_TIMEOUT: Duration = Duration::from_secs(125);
while let Some(session) = session.try_upgrade() {
if session.is_invalid() {
break;
}
let last_ping = session.0.data.read().last_ping.unwrap_or_else(Instant::now);
if last_ping.elapsed() >= SESSION_TIMEOUT {
session.shutdown();
// TODO: Optionally reconnect (with cached/last credentials?)
return Err(io::Error::new(
io::ErrorKind::TimedOut,
"session lost connection to server",
));
}
// drop the strong reference before sleeping
drop(session);
// a potential timeout cannot occur at least until SESSION_TIMEOUT after the last_ping
tokio::time::sleep_until(last_ping + SESSION_TIMEOUT).await;
}
Ok(())
}
pub fn time_delta(&self) -> i64 { pub fn time_delta(&self) -> i64 {
self.0.data.read().time_delta self.0.data.read().time_delta
} }
@ -278,13 +316,16 @@ impl Session {
match packet_type { match packet_type {
Some(Ping) => { Some(Ping) => {
let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64; let server_timestamp = BigEndian::read_u32(data.as_ref()) as i64;
let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) { let timestamp = SystemTime::now()
Ok(dur) => dur, .duration_since(UNIX_EPOCH)
Err(err) => err.duration(), .unwrap_or(Duration::ZERO)
}
.as_secs() as i64; .as_secs() as i64;
self.0.data.write().time_delta = server_timestamp - timestamp; {
let mut data = self.0.data.write();
data.time_delta = server_timestamp.saturating_sub(timestamp);
data.last_ping = Some(Instant::now());
}
self.debug_info(); self.debug_info();
self.send_packet(Pong, vec![0, 0, 0, 0]) self.send_packet(Pong, vec![0, 0, 0, 0])

View file

@ -1,4 +1,4 @@
/// Version string of the form "librespot-<sha>" /// Version string of the form "librespot-\<sha\>"
pub const VERSION_STRING: &str = concat!("librespot-", env!("VERGEN_GIT_SHA")); pub const VERSION_STRING: &str = concat!("librespot-", env!("VERGEN_GIT_SHA"));
/// Generate a timestamp string representing the build date (UTC). /// Generate a timestamp string representing the build date (UTC).

View file

@ -1662,7 +1662,7 @@ async fn main() {
let mut connecting = false; let mut connecting = false;
let mut _event_handler: Option<EventHandler> = None; let mut _event_handler: Option<EventHandler> = None;
let session = Session::new(setup.session_config.clone(), setup.cache.clone()); let mut session = Session::new(setup.session_config.clone(), setup.cache.clone());
if setup.enable_discovery { if setup.enable_discovery {
let device_id = setup.session_config.device_id.clone(); let device_id = setup.session_config.device_id.clone();
@ -1721,6 +1721,10 @@ async fn main() {
} }
}, },
_ = async {}, if connecting && last_credentials.is_some() => { _ = async {}, if connecting && last_credentials.is_some() => {
if session.is_invalid() {
session = Session::new(setup.session_config.clone(), setup.cache.clone());
}
let mixer_config = setup.mixer_config.clone(); let mixer_config = setup.mixer_config.clone();
let mixer = (setup.mixer)(mixer_config); let mixer = (setup.mixer)(mixer_config);
let player_config = setup.player_config.clone(); let player_config = setup.player_config.clone();
@ -1770,15 +1774,12 @@ async fn main() {
auto_connect_times.len() > RECONNECT_RATE_LIMIT auto_connect_times.len() > RECONNECT_RATE_LIMIT
}; };
match last_credentials.clone() { if last_credentials.is_some() && !reconnect_exceeds_rate_limit() {
Some(_) if !reconnect_exceeds_rate_limit() => {
auto_connect_times.push(Instant::now()); auto_connect_times.push(Instant::now());
connecting = true; connecting = true;
}, } else {
_ => {
error!("Spirc shut down too often. Not reconnecting automatically."); error!("Spirc shut down too often. Not reconnecting automatically.");
exit(1); exit(1);
},
} }
}, },
_ = tokio::signal::ctrl_c() => { _ = tokio::signal::ctrl_c() => {