Remove that last couple unwraps from main

Also:

* Don't just hang if Spirc shuts down too often.

* Replace the while loop with Vec retain.

* Be more explicit with the rate limit.
This commit is contained in:
JasonLG1979 2021-12-23 20:56:16 -06:00
parent 4370aa1cbe
commit 1f43e9e389

View file

@ -1568,6 +1568,9 @@ fn get_setup() -> Setup {
#[tokio::main(flavor = "current_thread")] #[tokio::main(flavor = "current_thread")]
async fn main() { async fn main() {
const RUST_BACKTRACE: &str = "RUST_BACKTRACE"; const RUST_BACKTRACE: &str = "RUST_BACKTRACE";
const RECONNECT_RATE_LIMIT_WINDOW: Duration = Duration::from_secs(600);
const RECONNECT_RATE_LIMIT: usize = 5;
if env::var(RUST_BACKTRACE).is_err() { if env::var(RUST_BACKTRACE).is_err() {
env::set_var(RUST_BACKTRACE, "full") env::set_var(RUST_BACKTRACE, "full")
} }
@ -1585,14 +1588,18 @@ async fn main() {
if setup.enable_discovery { if setup.enable_discovery {
let device_id = setup.session_config.device_id.clone(); let device_id = setup.session_config.device_id.clone();
discovery = Some( discovery = match librespot::discovery::Discovery::builder(device_id)
librespot::discovery::Discovery::builder(device_id) .name(setup.connect_config.name.clone())
.name(setup.connect_config.name.clone()) .device_type(setup.connect_config.device_type)
.device_type(setup.connect_config.device_type) .port(setup.zeroconf_port)
.port(setup.zeroconf_port) .launch()
.launch() {
.unwrap(), Ok(d) => Some(d),
); Err(e) => {
error!("Discovery Error: {}", e);
exit(1);
}
}
} }
if let Some(credentials) = setup.credentials { if let Some(credentials) = setup.credentials {
@ -1609,7 +1616,12 @@ async fn main() {
loop { loop {
tokio::select! { tokio::select! {
credentials = async { discovery.as_mut().unwrap().next().await }, if discovery.is_some() => { credentials = async {
match discovery.as_mut() {
Some(d) => d.next().await,
_ => None
}
}, if discovery.is_some() => {
match credentials { match credentials {
Some(credentials) => { Some(credentials) => {
last_credentials = Some(credentials.clone()); last_credentials = Some(credentials.clone());
@ -1630,8 +1642,8 @@ async fn main() {
).fuse()); ).fuse());
}, },
None => { None => {
warn!("Discovery stopped!"); error!("Discovery stopped unexpectedly");
discovery = None; exit(1);
} }
} }
}, },
@ -1682,20 +1694,22 @@ async fn main() {
exit(1); exit(1);
} }
}, },
_ = async { spirc_task.as_mut().unwrap().await }, if spirc_task.is_some() => { _ = async {
if let Some(task) = spirc_task.as_mut() {
task.await;
}
}, if spirc_task.is_some() => {
spirc_task = None; spirc_task = None;
warn!("Spirc shut down unexpectedly"); warn!("Spirc shut down unexpectedly");
while !auto_connect_times.is_empty()
&& ((Instant::now() - auto_connect_times[0]).as_secs() > 600)
{
let _ = auto_connect_times.remove(0);
}
if let Some(credentials) = last_credentials.clone() { let mut reconnect_exceeds_rate_limit = || {
if auto_connect_times.len() >= 5 { auto_connect_times.retain(|&t| t.elapsed() < RECONNECT_RATE_LIMIT_WINDOW);
warn!("Spirc shut down too often. Not reconnecting automatically."); auto_connect_times.len() > RECONNECT_RATE_LIMIT
} else { };
match last_credentials.clone() {
Some(credentials) if !reconnect_exceeds_rate_limit() => {
auto_connect_times.push(Instant::now()); auto_connect_times.push(Instant::now());
connecting = Box::pin(Session::connect( connecting = Box::pin(Session::connect(
@ -1703,19 +1717,25 @@ async fn main() {
credentials, credentials,
setup.cache.clone(), setup.cache.clone(),
).fuse()); ).fuse());
} },
_ => {
error!("Spirc shut down too often. Not reconnecting automatically.");
exit(1);
},
} }
}, },
event = async { player_event_channel.as_mut().unwrap().recv().await }, if player_event_channel.is_some() => match event { event = async {
match player_event_channel.as_mut() {
Some(p) => p.recv().await,
_ => None
}
}, if player_event_channel.is_some() => match event {
Some(event) => { Some(event) => {
if let Some(program) = &setup.player_event_program { if let Some(program) = &setup.player_event_program {
if let Some(child) = run_program_on_events(event, program) { if let Some(child) = run_program_on_events(event, program) {
if child.is_ok() { if let Ok(mut child) = child {
let mut child = child.unwrap();
tokio::spawn(async move { tokio::spawn(async move {
match child.wait().await { match child.wait().await {
Ok(e) if e.success() => (), Ok(e) if e.success() => (),
Ok(e) => { Ok(e) => {
if let Some(code) = e.code() { if let Some(code) = e.code() {
@ -1741,7 +1761,8 @@ async fn main() {
}, },
_ = tokio::signal::ctrl_c() => { _ = tokio::signal::ctrl_c() => {
break; break;
} },
else => break,
} }
} }
@ -1754,7 +1775,8 @@ async fn main() {
if let Some(mut spirc_task) = spirc_task { if let Some(mut spirc_task) = spirc_task {
tokio::select! { tokio::select! {
_ = tokio::signal::ctrl_c() => (), _ = tokio::signal::ctrl_c() => (),
_ = spirc_task.as_mut() => () _ = spirc_task.as_mut() => (),
else => (),
} }
} }
} }