diff --git a/Cargo.lock b/Cargo.lock index cce06c16..1d507689 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aes" version = "0.6.0" @@ -110,6 +125,21 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backtrace" +version = "0.3.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "321629d8ba6513061f26707241fa9bc89524ff1cd7a915a97ef0c62c666ce1b6" +dependencies = [ + "addr2line", + "cc", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.13.0" @@ -429,6 +459,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "fixedbitset" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" + [[package]] name = "fnv" version = "1.0.7" @@ -569,6 +605,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "glib" version = "0.10.3" @@ -1226,6 +1268,7 @@ dependencies = [ "hyper", "librespot-core", "log", + "parking_lot", "tempfile", "thiserror", "tokio", @@ -1278,6 +1321,7 @@ dependencies = [ "num-integer", "num-traits", "once_cell", + "parking_lot", "pbkdf2", "priority-queue", "protobuf", @@ -1432,6 +1476,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.7.14" @@ -1704,6 +1758,15 @@ dependencies = [ "syn", ] +[[package]] +name = "object" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9" +dependencies = [ + "memchr", +] + [[package]] name = "oboe" version = "0.4.4" @@ -1771,11 +1834,14 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ + "backtrace", "cfg-if 1.0.0", "instant", "libc", + "petgraph", "redox_syscall", "smallvec", + "thread-id", "winapi", ] @@ -1807,6 +1873,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "petgraph" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project" version = "1.0.8" @@ -2115,6 +2191,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -2467,6 +2549,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thread-id" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f" +dependencies = [ + "libc", + "redox_syscall", + "winapi", +] + [[package]] name = "time" version = "0.1.43" @@ -2505,6 +2598,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", + "parking_lot", "pin-project-lite", "signal-hook-registry", "tokio-macros", diff --git a/Cargo.toml b/Cargo.toml index 8429ba2e..bf453cff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ version = "0.3.1" [dependencies] base64 = "0.13" -env_logger = {version = "0.8", default-features = false, features = ["termcolor","humantime","atty"]} +env_logger = { version = "0.8", default-features = false, features = ["termcolor", "humantime", "atty"] } futures-util = { version = "0.3", default_features = false } getopts = "0.2.21" hex = "0.4" @@ -58,7 +58,7 @@ hyper = "0.14" log = "0.4" rpassword = "5.0" thiserror = "1.0" -tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "sync", "process"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "sync", "parking_lot", "process"] } url = "2.2" sha-1 = "0.9" diff --git a/audio/Cargo.toml b/audio/Cargo.toml index d5a7a074..c7cf0d7b 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -3,7 +3,7 @@ name = "librespot-audio" version = "0.3.1" authors = ["Paul Lietar "] description="The audio fetching and processing logic for librespot" -license="MIT" +license = "MIT" edition = "2018" [dependencies.librespot-core] @@ -19,6 +19,7 @@ futures-executor = "0.3" futures-util = { version = "0.3", default_features = false } hyper = { version = "0.14", features = ["client"] } log = "0.4" +parking_lot = { version = "0.11", features = ["deadlock_detection"] } tempfile = "3.1" thiserror = "1.0" -tokio = { version = "1", features = ["sync", "macros"] } +tokio = { version = "1", features = ["macros", "parking_lot", "sync"] } diff --git a/audio/src/fetch/mod.rs b/audio/src/fetch/mod.rs index dc5bcdf4..3efdc1e9 100644 --- a/audio/src/fetch/mod.rs +++ b/audio/src/fetch/mod.rs @@ -6,13 +6,14 @@ use std::{ io::{self, Read, Seek, SeekFrom}, sync::{ atomic::{self, AtomicUsize}, - Arc, Condvar, Mutex, + Arc, }, time::{Duration, Instant}, }; use futures_util::{future::IntoStream, StreamExt, TryFutureExt}; use hyper::{client::ResponseFuture, header::CONTENT_RANGE, Body, Response, StatusCode}; +use parking_lot::{Condvar, Mutex}; use tempfile::NamedTempFile; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; @@ -159,7 +160,7 @@ impl StreamLoaderController { pub fn range_available(&self, range: Range) -> bool { let available = if let Some(ref shared) = self.stream_shared { - let download_status = shared.download_status.lock().unwrap(); + let download_status = shared.download_status.lock(); range.length <= download_status @@ -214,18 +215,21 @@ impl StreamLoaderController { self.fetch(range); if let Some(ref shared) = self.stream_shared { - let mut download_status = shared.download_status.lock().unwrap(); + let mut download_status = shared.download_status.lock(); while range.length > download_status .downloaded .contained_length_from_value(range.start) { - download_status = shared + if shared .cond - .wait_timeout(download_status, DOWNLOAD_TIMEOUT) - .map_err(|_| AudioFileError::WaitTimeout)? - .0; + .wait_for(&mut download_status, DOWNLOAD_TIMEOUT) + .timed_out() + { + return Err(AudioFileError::WaitTimeout.into()); + } + if range.length > (download_status .downloaded @@ -473,7 +477,7 @@ impl Read for AudioFileStreaming { let length = min(output.len(), self.shared.file_size - offset); - let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) { + let length_to_request = match *(self.shared.download_strategy.lock()) { DownloadStrategy::RandomAccess() => length, DownloadStrategy::Streaming() => { // Due to the read-ahead stuff, we potentially request more than the actual request demanded. @@ -497,7 +501,7 @@ impl Read for AudioFileStreaming { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length_to_request)); - let mut download_status = self.shared.download_status.lock().unwrap(); + let mut download_status = self.shared.download_status.lock(); ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); @@ -513,17 +517,17 @@ impl Read for AudioFileStreaming { } while !download_status.downloaded.contains(offset) { - download_status = self + if self .shared .cond - .wait_timeout(download_status, DOWNLOAD_TIMEOUT) - .map_err(|_| { - io::Error::new( - io::ErrorKind::TimedOut, - Error::deadline_exceeded(AudioFileError::WaitTimeout), - ) - })? - .0; + .wait_for(&mut download_status, DOWNLOAD_TIMEOUT) + .timed_out() + { + return Err(io::Error::new( + io::ErrorKind::TimedOut, + Error::deadline_exceeded(AudioFileError::WaitTimeout), + )); + } } let available_length = download_status .downloaded diff --git a/audio/src/fetch/receive.rs b/audio/src/fetch/receive.rs index f26c95f8..38851129 100644 --- a/audio/src/fetch/receive.rs +++ b/audio/src/fetch/receive.rs @@ -108,7 +108,7 @@ async fn receive_data( if request_length > 0 { let missing_range = Range::new(data_offset, request_length); - let mut download_status = shared.download_status.lock().unwrap(); + let mut download_status = shared.download_status.lock(); download_status.requested.subtract_range(&missing_range); shared.cond.notify_all(); @@ -157,7 +157,7 @@ enum ControlFlow { impl AudioFileFetch { fn get_download_strategy(&mut self) -> DownloadStrategy { - *(self.shared.download_strategy.lock().unwrap()) + *(self.shared.download_strategy.lock()) } fn download_range(&mut self, offset: usize, mut length: usize) -> AudioFileResult { @@ -172,7 +172,7 @@ impl AudioFileFetch { let mut ranges_to_request = RangeSet::new(); ranges_to_request.add_range(&Range::new(offset, length)); - let mut download_status = self.shared.download_status.lock().unwrap(); + let mut download_status = self.shared.download_status.lock(); ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); @@ -218,7 +218,7 @@ impl AudioFileFetch { let mut missing_data = RangeSet::new(); missing_data.add_range(&Range::new(0, self.shared.file_size)); { - let download_status = self.shared.download_status.lock().unwrap(); + let download_status = self.shared.download_status.lock(); missing_data.subtract_range_set(&download_status.downloaded); missing_data.subtract_range_set(&download_status.requested); @@ -306,7 +306,7 @@ impl AudioFileFetch { None => return Err(AudioFileError::Output.into()), } - let mut download_status = self.shared.download_status.lock().unwrap(); + let mut download_status = self.shared.download_status.lock(); let received_range = Range::new(data.offset, data.data.len()); download_status.downloaded.add_range(&received_range); @@ -336,10 +336,10 @@ impl AudioFileFetch { self.download_range(request.start, request.length)?; } StreamLoaderCommand::RandomAccessMode() => { - *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); + *(self.shared.download_strategy.lock()) = DownloadStrategy::RandomAccess(); } StreamLoaderCommand::StreamMode() => { - *(self.shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); + *(self.shared.download_strategy.lock()) = DownloadStrategy::Streaming(); } StreamLoaderCommand::Close() => return Ok(ControlFlow::Break), } @@ -380,7 +380,7 @@ pub(super) async fn audio_file_fetch( initial_request.offset, initial_request.offset + initial_request.length, ); - let mut download_status = shared.download_status.lock().unwrap(); + let mut download_status = shared.download_status.lock(); download_status.requested.add_range(&requested_range); } @@ -432,7 +432,7 @@ pub(super) async fn audio_file_fetch( let max_requests_to_send = MAX_PREFETCH_REQUESTS - number_of_open_requests; let bytes_pending: usize = { - let download_status = fetch.shared.download_status.lock().unwrap(); + let download_status = fetch.shared.download_status.lock(); download_status .requested diff --git a/connect/Cargo.toml b/connect/Cargo.toml index b0878c1c..ab425a66 100644 --- a/connect/Cargo.toml +++ b/connect/Cargo.toml @@ -16,7 +16,7 @@ rand = "0.8" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" -tokio = { version = "1.0", features = ["macros", "sync"] } +tokio = { version = "1.0", features = ["macros", "parking_lot", "sync"] } tokio-stream = "0.1.1" [dependencies.librespot-core] diff --git a/core/Cargo.toml b/core/Cargo.toml index 876a0038..798a5762 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -20,11 +20,11 @@ bytes = "1" chrono = "0.4" form_urlencoded = "1.0" futures-core = { version = "0.3", default-features = false } -futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] } +futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "sink", "unstable"] } hmac = "0.11" httparse = "1.3" http = "0.2" -hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] } +hyper = { version = "0.14", features = ["client", "http1", "http2", "tcp"] } hyper-proxy = { version = "0.9.1", default-features = false, features = ["rustls"] } hyper-rustls = { version = "0.22", default-features = false, features = ["native-tokio"] } log = "0.4" @@ -34,10 +34,11 @@ num-derive = "0.3" num-integer = "0.1" num-traits = "0.2" once_cell = "1.5.2" +parking_lot = { version = "0.11", features = ["deadlock_detection"] } pbkdf2 = { version = "0.8", default-features = false, features = ["hmac"] } priority-queue = "1.1" protobuf = "2.14.0" -quick-xml = { version = "0.22", features = [ "serialize" ] } +quick-xml = { version = "0.22", features = ["serialize"] } rand = "0.8" rustls = "0.19" rustls-native-certs = "0.5" @@ -46,7 +47,7 @@ serde_json = "1.0" sha-1 = "0.9" shannon = "0.2.0" thiserror = "1.0" -tokio = { version = "1.5", features = ["io-util", "macros", "net", "rt", "time", "sync"] } +tokio = { version = "1.5", features = ["io-util", "macros", "net", "parking_lot", "rt", "sync", "time"] } tokio-stream = "0.1.1" tokio-tungstenite = { version = "0.14", default-features = false, features = ["rustls-tls"] } tokio-util = { version = "0.6", features = ["codec"] } @@ -59,4 +60,4 @@ vergen = "3.0.4" [dev-dependencies] env_logger = "0.8" -tokio = {version = "1.0", features = ["macros"] } +tokio = { version = "1.0", features = ["macros", "parking_lot"] } diff --git a/core/src/cache.rs b/core/src/cache.rs index ed7cf83e..7a3c0fc4 100644 --- a/core/src/cache.rs +++ b/core/src/cache.rs @@ -4,10 +4,11 @@ use std::{ fs::{self, File}, io::{self, Read, Write}, path::{Path, PathBuf}, - sync::{Arc, Mutex}, + sync::Arc, time::SystemTime, }; +use parking_lot::Mutex; use priority_queue::PriorityQueue; use thiserror::Error; @@ -187,50 +188,42 @@ impl FsSizeLimiter { } } - fn add(&self, file: &Path, size: u64) -> Result<(), Error> { - self.limiter - .lock() - .unwrap() - .add(file, size, SystemTime::now()); - Ok(()) + fn add(&self, file: &Path, size: u64) { + self.limiter.lock().add(file, size, SystemTime::now()); } - fn touch(&self, file: &Path) -> Result { - Ok(self.limiter.lock().unwrap().update(file, SystemTime::now())) + fn touch(&self, file: &Path) -> bool { + self.limiter.lock().update(file, SystemTime::now()) } - fn remove(&self, file: &Path) -> Result { - Ok(self.limiter.lock().unwrap().remove(file)) + fn remove(&self, file: &Path) -> bool { + self.limiter.lock().remove(file) } - fn prune_internal Result, Error>>( - mut pop: F, - ) -> Result<(), Error> { + fn prune_internal Option>(mut pop: F) -> Result<(), Error> { let mut first = true; let mut count = 0; let mut last_error = None; - while let Ok(result) = pop() { - if let Some(file) = result { - if first { - debug!("Cache dir exceeds limit, removing least recently used files."); - first = false; - } - - let res = fs::remove_file(&file); - if let Err(e) = res { - warn!("Could not remove file {:?} from cache dir: {}", file, e); - last_error = Some(e); - } else { - count += 1; - } + while let Some(file) = pop() { + if first { + debug!("Cache dir exceeds limit, removing least recently used files."); + first = false; } - if count > 0 { - info!("Removed {} cache files.", count); + let res = fs::remove_file(&file); + if let Err(e) = res { + warn!("Could not remove file {:?} from cache dir: {}", file, e); + last_error = Some(e); + } else { + count += 1; } } + if count > 0 { + info!("Removed {} cache files.", count); + } + if let Some(err) = last_error { Err(err.into()) } else { @@ -239,14 +232,14 @@ impl FsSizeLimiter { } fn prune(&self) -> Result<(), Error> { - Self::prune_internal(|| Ok(self.limiter.lock().unwrap().pop())) + Self::prune_internal(|| self.limiter.lock().pop()) } fn new(path: &Path, limit: u64) -> Result { let mut limiter = SizeLimiter::new(limit); Self::init_dir(&mut limiter, path); - Self::prune_internal(|| Ok(limiter.pop()))?; + Self::prune_internal(|| limiter.pop())?; Ok(Self { limiter: Mutex::new(limiter), @@ -388,8 +381,8 @@ impl Cache { match File::open(&path) { Ok(file) => { if let Some(limiter) = self.size_limiter.as_deref() { - if let Err(e) = limiter.touch(&path) { - error!("limiter could not touch {:?}: {}", path, e); + if !limiter.touch(&path) { + error!("limiter could not touch {:?}", path); } } Some(file) @@ -411,8 +404,8 @@ impl Cache { .and_then(|mut file| io::copy(contents, &mut file)) { if let Some(limiter) = self.size_limiter.as_deref() { - limiter.add(&path, size)?; - limiter.prune()? + limiter.add(&path, size); + limiter.prune()?; } return Ok(()); } @@ -426,7 +419,7 @@ impl Cache { fs::remove_file(&path)?; if let Some(limiter) = self.size_limiter.as_deref() { - limiter.remove(&path)?; + limiter.remove(&path); } Ok(()) diff --git a/core/src/component.rs b/core/src/component.rs index aa1da840..ebe42e8d 100644 --- a/core/src/component.rs +++ b/core/src/component.rs @@ -1,20 +1,20 @@ macro_rules! component { ($name:ident : $inner:ident { $($key:ident : $ty:ty = $value:expr,)* }) => { #[derive(Clone)] - pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::std::sync::Mutex<$inner>)>); + pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::parking_lot::Mutex<$inner>)>); impl $name { #[allow(dead_code)] pub(crate) fn new(session: $crate::session::SessionWeak) -> $name { debug!(target:"librespot::component", "new {}", stringify!($name)); - $name(::std::sync::Arc::new((session, ::std::sync::Mutex::new($inner { + $name(::std::sync::Arc::new((session, ::parking_lot::Mutex::new($inner { $($key : $value,)* })))) } #[allow(dead_code)] fn lock R, R>(&self, f: F) -> R { - let mut inner = (self.0).1.lock().unwrap(); + let mut inner = (self.0).1.lock(); f(&mut inner) } diff --git a/core/src/dealer/mod.rs b/core/src/dealer/mod.rs index ac19fd6d..c1a9c94d 100644 --- a/core/src/dealer/mod.rs +++ b/core/src/dealer/mod.rs @@ -6,7 +6,7 @@ use std::{ pin::Pin, sync::{ atomic::{self, AtomicBool}, - Arc, Mutex, + Arc, }, task::Poll, time::Duration, @@ -14,6 +14,7 @@ use std::{ use futures_core::{Future, Stream}; use futures_util::{future::join_all, SinkExt, StreamExt}; +use parking_lot::Mutex; use thiserror::Error; use tokio::{ select, @@ -310,7 +311,6 @@ impl DealerShared { if let Some(split) = split_uri(&msg.uri) { self.message_handlers .lock() - .unwrap() .retain(split, &mut |tx| tx.send(msg.clone()).is_ok()); } } @@ -330,7 +330,7 @@ impl DealerShared { }; { - let handler_map = self.request_handlers.lock().unwrap(); + let handler_map = self.request_handlers.lock(); if let Some(handler) = handler_map.get(split) { handler.handle_request(request, responder); @@ -349,7 +349,9 @@ impl DealerShared { } async fn closed(&self) { - self.notify_drop.acquire().await.unwrap_err(); + if self.notify_drop.acquire().await.is_ok() { + error!("should never have gotten a permit"); + } } fn is_closed(&self) -> bool { @@ -367,19 +369,15 @@ impl Dealer { where H: RequestHandler, { - add_handler( - &mut self.shared.request_handlers.lock().unwrap(), - uri, - handler, - ) + add_handler(&mut self.shared.request_handlers.lock(), uri, handler) } pub fn remove_handler(&self, uri: &str) -> Option> { - remove_handler(&mut self.shared.request_handlers.lock().unwrap(), uri) + remove_handler(&mut self.shared.request_handlers.lock(), uri) } pub fn subscribe(&self, uris: &[&str]) -> Result { - subscribe(&mut self.shared.message_handlers.lock().unwrap(), uris) + subscribe(&mut self.shared.message_handlers.lock(), uris) } pub async fn close(mut self) { diff --git a/core/src/session.rs b/core/src/session.rs index 72805551..f1136e53 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -6,7 +6,7 @@ use std::{ process::exit, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, RwLock, Weak, + Arc, Weak, }, task::{Context, Poll}, time::{SystemTime, UNIX_EPOCH}, @@ -18,6 +18,7 @@ use futures_core::TryStream; use futures_util::{future, ready, StreamExt, TryStreamExt}; use num_traits::FromPrimitive; use once_cell::sync::OnceCell; +use parking_lot::RwLock; use quick_xml::events::Event; use thiserror::Error; use tokio::sync::mpsc; @@ -138,8 +139,7 @@ impl Session { connection::authenticate(&mut transport, credentials, &session.config().device_id) .await?; info!("Authenticated as \"{}\" !", reusable_credentials.username); - session.0.data.write().unwrap().user_data.canonical_username = - reusable_credentials.username.clone(); + session.0.data.write().user_data.canonical_username = reusable_credentials.username.clone(); if let Some(cache) = session.cache() { cache.save_credentials(&reusable_credentials); } @@ -200,7 +200,7 @@ impl Session { } pub fn time_delta(&self) -> i64 { - self.0.data.read().unwrap().time_delta + self.0.data.read().time_delta } pub fn spawn(&self, task: T) @@ -253,7 +253,7 @@ impl Session { } .as_secs() as i64; - self.0.data.write().unwrap().time_delta = server_timestamp - timestamp; + self.0.data.write().time_delta = server_timestamp - timestamp; self.debug_info(); self.send_packet(Pong, vec![0, 0, 0, 0]) @@ -261,7 +261,7 @@ impl Session { Some(CountryCode) => { let country = String::from_utf8(data.as_ref().to_owned())?; info!("Country: {:?}", country); - self.0.data.write().unwrap().user_data.country = country; + self.0.data.write().user_data.country = country; Ok(()) } Some(StreamChunkRes) | Some(ChannelError) => self.channel().dispatch(cmd, data), @@ -306,7 +306,7 @@ impl Session { trace!("Received product info: {:#?}", user_attributes); Self::check_catalogue(&user_attributes); - self.0.data.write().unwrap().user_data.attributes = user_attributes; + self.0.data.write().user_data.attributes = user_attributes; Ok(()) } Some(PongAck) @@ -335,7 +335,7 @@ impl Session { } pub fn user_data(&self) -> UserData { - self.0.data.read().unwrap().user_data.clone() + self.0.data.read().user_data.clone() } pub fn device_id(&self) -> &str { @@ -343,21 +343,15 @@ impl Session { } pub fn connection_id(&self) -> String { - self.0.data.read().unwrap().connection_id.clone() + self.0.data.read().connection_id.clone() } pub fn set_connection_id(&self, connection_id: String) { - self.0.data.write().unwrap().connection_id = connection_id; + self.0.data.write().connection_id = connection_id; } pub fn username(&self) -> String { - self.0 - .data - .read() - .unwrap() - .user_data - .canonical_username - .clone() + self.0.data.read().user_data.canonical_username.clone() } pub fn set_user_attribute(&self, key: &str, value: &str) -> Option { @@ -368,7 +362,6 @@ impl Session { self.0 .data .write() - .unwrap() .user_data .attributes .insert(key.to_owned(), value.to_owned()) @@ -377,13 +370,7 @@ impl Session { pub fn set_user_attributes(&self, attributes: UserAttributes) { Self::check_catalogue(&attributes); - self.0 - .data - .write() - .unwrap() - .user_data - .attributes - .extend(attributes) + self.0.data.write().user_data.attributes.extend(attributes) } fn weak(&self) -> SessionWeak { @@ -395,14 +382,14 @@ impl Session { } pub fn shutdown(&self) { - debug!("Invalidating session[{}]", self.0.session_id); - self.0.data.write().unwrap().invalid = true; + debug!("Invalidating session [{}]", self.0.session_id); + self.0.data.write().invalid = true; self.mercury().shutdown(); self.channel().shutdown(); } pub fn is_invalid(&self) -> bool { - self.0.data.read().unwrap().invalid + self.0.data.read().invalid } } @@ -415,7 +402,8 @@ impl SessionWeak { } pub(crate) fn upgrade(&self) -> Session { - self.try_upgrade().expect("Session died") // TODO + self.try_upgrade() + .expect("session was dropped and so should have this component") } } diff --git a/discovery/Cargo.toml b/discovery/Cargo.toml index 7edd934a..a5c56bbb 100644 --- a/discovery/Cargo.toml +++ b/discovery/Cargo.toml @@ -15,14 +15,14 @@ form_urlencoded = "1.0" futures-core = "0.3" futures-util = "0.3" hmac = "0.11" -hyper = { version = "0.14", features = ["server", "http1", "tcp"] } +hyper = { version = "0.14", features = ["http1", "server", "tcp"] } libmdns = "0.6" log = "0.4" rand = "0.8" serde_json = "1.0.25" sha-1 = "0.9" thiserror = "1.0" -tokio = { version = "1.0", features = ["sync", "rt"] } +tokio = { version = "1.0", features = ["parking_lot", "sync", "rt"] } dns-sd = { version = "0.1.3", optional = true } @@ -34,7 +34,7 @@ version = "0.3.1" [dev-dependencies] futures = "0.3" hex = "0.4" -tokio = { version = "1.0", features = ["macros", "rt"] } +tokio = { version = "1.0", features = ["macros", "parking_lot", "rt"] } [features] with-dns-sd = ["dns-sd"] diff --git a/playback/Cargo.toml b/playback/Cargo.toml index 1cd589a5..fee4dd51 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -24,7 +24,7 @@ log = "0.4" byteorder = "1.4" shell-words = "1.0.0" thiserror = "1.0" -tokio = { version = "1", features = ["rt", "rt-multi-thread", "sync"] } +tokio = { version = "1", features = ["parking_lot", "rt", "rt-multi-thread", "sync"] } zerocopy = { version = "0.3" } # Backends