Merge pull request #427 from kaymes/connection-lost-crash

Gracefully handle lost network connections
This commit is contained in:
Sasha Hilton 2020-01-30 02:04:37 +01:00 committed by GitHub
commit 83140bea88
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 153 additions and 45 deletions

View file

@ -333,7 +333,11 @@ impl Future for SpircTask {
progress = true;
self.handle_frame(frame);
}
Async::Ready(None) => panic!("subscription terminated"),
Async::Ready(None) => {
error!("subscription terminated");
self.shutdown = true;
self.commands.close();
}
Async::NotReady => (),
}

View file

@ -14,6 +14,7 @@ component! {
download_rate_estimate: usize = 0,
download_measurement_start: Option<Instant> = None,
download_measurement_bytes: usize = 0,
invalid: bool = false,
}
}
@ -46,7 +47,9 @@ impl ChannelManager {
let seq = self.lock(|inner| {
let seq = inner.sequence.get();
inner.channels.insert(seq, tx);
if !inner.invalid {
inner.channels.insert(seq, tx);
}
seq
});
@ -87,12 +90,21 @@ impl ChannelManager {
pub fn get_download_rate_estimate(&self) -> usize {
return self.lock(|inner| inner.download_rate_estimate);
}
pub(crate) fn shutdown(&self) {
self.lock(|inner| {
inner.invalid = true;
// destroy the sending halves of the channels to signal everyone who is waiting for something.
inner.channels.clear();
});
}
}
impl Channel {
fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> {
let (cmd, packet) = match self.receiver.poll() {
Ok(Async::Ready(t)) => t.expect("channel closed"),
Ok(Async::Ready(Some(t))) => t,
Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed.
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
};

View file

@ -28,9 +28,27 @@ pub fn connect(
let (addr, connect_url) = match *proxy {
Some(ref url) => {
info!("Using proxy \"{}\"", url);
(url.to_socket_addrs().unwrap().next().unwrap(), Some(addr))
match url.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"Can't resolve proxy server address",
))
}) {
Ok(socket_addr) => (socket_addr, Some(addr)),
Err(error) => return Box::new(futures::future::err(error)),
}
}
None => {
match addr.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"Can't resolve server address",
))
}) {
Ok(socket_addr) => (socket_addr, None),
Err(error) => return Box::new(futures::future::err(error)),
}
}
None => (addr.to_socket_addrs().unwrap().next().unwrap(), None),
};
let socket = TcpStream::connect(&addr, handle);

View file

@ -20,6 +20,7 @@ component! {
sequence: SeqGenerator<u64> = SeqGenerator::new(0),
pending: HashMap<Vec<u8>, MercuryPending> = HashMap::new(),
subscriptions: Vec<(String, mpsc::UnboundedSender<MercuryResponse>)> = Vec::new(),
invalid: bool = false,
}
}
@ -61,7 +62,11 @@ impl MercuryManager {
};
let seq = self.next_seq();
self.lock(|inner| inner.pending.insert(seq.clone(), pending));
self.lock(|inner| {
if !inner.invalid {
inner.pending.insert(seq.clone(), pending);
}
});
let cmd = req.method.command();
let data = req.encode(&seq);
@ -110,21 +115,23 @@ impl MercuryManager {
let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if response.payload.len() > 0 {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if response.payload.len() > 0 {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed sub_uri={}", sub_uri);
debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.push((sub_uri, tx.clone()));
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
});
@ -223,4 +230,13 @@ impl MercuryManager {
}
}
}
pub(crate) fn shutdown(&self) {
self.lock(|inner| {
inner.invalid = true;
// destroy the sending halves of the channels to signal everyone who is waiting for something.
inner.pending.clear();
inner.subscriptions.clear();
});
}
}

View file

@ -243,6 +243,8 @@ impl Session {
pub fn shutdown(&self) {
debug!("Invalidating session[{}]", self.0.session_id);
self.0.data.write().unwrap().invalid = true;
self.mercury().shutdown();
self.channel().shutdown();
}
pub fn is_invalid(&self) -> bool {
@ -289,14 +291,18 @@ where
loop {
let (cmd, data) = match self.0.poll() {
Ok(Async::Ready(t)) => t,
Ok(Async::Ready(Some(t))) => t,
Ok(Async::Ready(None)) => {
warn!("Connection to server closed.");
session.shutdown();
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
session.shutdown();
return Err(From::from(e));
}
}
.expect("connection closed");
};
session.dispatch(cmd, data);
}

View file

@ -636,9 +636,14 @@ impl PlayerInternal {
spotify_id: SpotifyId,
position: i64,
) -> Option<(Decoder, f32, StreamLoaderController, usize)> {
let audio = AudioItem::get_audio_item(&self.session, spotify_id)
.wait()
.unwrap();
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() {
Ok(audio) => audio,
Err(_) => {
error!("Unable to load audio item.");
return None;
}
};
info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri);
let audio = match self.find_available_alternative(&audio) {
@ -690,7 +695,13 @@ impl PlayerInternal {
play_from_beginning,
);
let encrypted_file = encrypted_file.wait().unwrap();
let encrypted_file = match encrypted_file.wait() {
Ok(encrypted_file) => encrypted_file,
Err(_) => {
error!("Unable to load encrypted file.");
return None;
}
};
let mut stream_loader_controller = encrypted_file.get_stream_loader_controller();
@ -702,7 +713,14 @@ impl PlayerInternal {
stream_loader_controller.set_random_access_mode();
}
let key = key.wait().unwrap();
let key = match key.wait() {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
return None;
}
};
let mut decrypted_file = AudioDecrypt::new(key, encrypted_file);
let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file) {

View file

@ -8,6 +8,7 @@ use std::mem;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Instant;
use tokio_core::reactor::{Core, Handle};
use tokio_io::IoStream;
use url::Url;
@ -375,6 +376,8 @@ struct Main {
connect: Box<dyn Future<Item = Session, Error = io::Error>>,
shutdown: bool,
last_credentials: Option<Credentials>,
auto_connect_times: Vec<Instant>,
player_event_channel: Option<UnboundedReceiver<PlayerEvent>>,
player_event_program: Option<String>,
@ -398,6 +401,8 @@ impl Main {
spirc: None,
spirc_task: None,
shutdown: false,
last_credentials: None,
auto_connect_times: Vec::new(),
signal: Box::new(tokio_signal::ctrl_c().flatten_stream()),
player_event_channel: None,
@ -420,6 +425,7 @@ impl Main {
}
fn credentials(&mut self, credentials: Credentials) {
self.last_credentials = Some(credentials.clone());
let config = self.session_config.clone();
let handle = self.handle.clone();
@ -448,32 +454,40 @@ impl Future for Main {
if let Some(ref spirc) = self.spirc {
spirc.shutdown();
}
self.auto_connect_times.clear();
self.credentials(creds);
progress = true;
}
if let Async::Ready(session) = self.connect.poll().unwrap() {
self.connect = Box::new(futures::future::empty());
let mixer_config = self.mixer_config.clone();
let mixer = (self.mixer)(Some(mixer_config));
let player_config = self.player_config.clone();
let connect_config = self.connect_config.clone();
match self.connect.poll() {
Ok(Async::Ready(session)) => {
self.connect = Box::new(futures::future::empty());
let mixer_config = self.mixer_config.clone();
let mixer = (self.mixer)(Some(mixer_config));
let player_config = self.player_config.clone();
let connect_config = self.connect_config.clone();
let audio_filter = mixer.get_audio_filter();
let backend = self.backend;
let device = self.device.clone();
let (player, event_channel) =
Player::new(player_config, session.clone(), audio_filter, move || {
(backend)(device)
});
let audio_filter = mixer.get_audio_filter();
let backend = self.backend;
let device = self.device.clone();
let (player, event_channel) =
Player::new(player_config, session.clone(), audio_filter, move || {
(backend)(device)
});
let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer);
self.spirc = Some(spirc);
self.spirc_task = Some(spirc_task);
self.player_event_channel = Some(event_channel);
let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer);
self.spirc = Some(spirc);
self.spirc_task = Some(spirc_task);
self.player_event_channel = Some(event_channel);
progress = true;
progress = true;
}
Ok(Async::NotReady) => (),
Err(error) => {
error!("Could not connect to server: {}", error);
self.connect = Box::new(futures::future::empty());
}
}
if let Async::Ready(Some(())) = self.signal.poll().unwrap() {
@ -492,12 +506,32 @@ impl Future for Main {
progress = true;
}
let mut drop_spirc_and_try_to_reconnect = false;
if let Some(ref mut spirc_task) = self.spirc_task {
if let Async::Ready(()) = spirc_task.poll().unwrap() {
if self.shutdown {
return Ok(Async::Ready(()));
} else {
panic!("Spirc shut down unexpectedly");
warn!("Spirc shut down unexpectedly");
drop_spirc_and_try_to_reconnect = true;
}
progress = true;
}
}
if drop_spirc_and_try_to_reconnect {
self.spirc_task = None;
while (!self.auto_connect_times.is_empty())
&& ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600)
{
let _ = self.auto_connect_times.remove(0);
}
if let Some(credentials) = self.last_credentials.clone() {
if self.auto_connect_times.len() >= 5 {
warn!("Spirc shut down too often. Not reconnecting automatically.");
} else {
self.auto_connect_times.push(Instant::now());
self.credentials(credentials);
}
}
}