From e9dc9cd8393443633bdf0c2d237ad7bfa682012c Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Fri, 19 Mar 2021 23:09:38 +0100 Subject: [PATCH 1/3] Add size limit to cache --- Cargo.lock | 27 ++++++ core/Cargo.toml | 1 + core/src/cache.rs | 204 ++++++++++++++++++++++++++++++++++------- playback/src/player.rs | 3 +- src/main.rs | 2 +- 5 files changed, 202 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 50778598..dfb1aa95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -727,6 +727,12 @@ dependencies = [ "system-deps", ] +[[package]] +name = "hashbrown" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" + [[package]] name = "headers" version = "0.3.4" @@ -912,6 +918,16 @@ dependencies = [ "libc", ] +[[package]] +name = "indexmap" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824845a0bf897a9042383849b02c1bc219c2383772efcd5c6f9766fa4b81aef3" +dependencies = [ + "autocfg", + "hashbrown", +] + [[package]] name = "instant" version = "0.1.9" @@ -1207,6 +1223,7 @@ dependencies = [ "num-traits", "once_cell", "pbkdf2", + "priority-queue", "protobuf", "rand", "serde", @@ -1723,6 +1740,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc5c99d529f0d30937f6f4b8a86d988047327bb88d04d2c4afc356de74722131" +[[package]] +name = "priority-queue" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f16f1277a63996195ef38361e2c909314614c6f25f2ac4968f87dfd94a625d3d" +dependencies = [ + "autocfg", + "indexmap", +] + [[package]] name = "proc-macro-crate" version = "0.1.5" diff --git a/core/Cargo.toml b/core/Cargo.toml index 29f4f332..8700875a 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -31,6 +31,7 @@ num-integer = "0.1" num-traits = "0.2" once_cell = "1.5.2" pbkdf2 = { version = "0.7", default-features = 
false, features = ["hmac"] } +priority-queue = "1.1" protobuf = "~2.14.0" rand = "0.8" serde = { version = "1.0", features = ["derive"] } diff --git a/core/src/cache.rs b/core/src/cache.rs index 55c9ab01..d96e79a2 100644 --- a/core/src/cache.rs +++ b/core/src/cache.rs @@ -1,30 +1,173 @@ -use std::fs; -use std::fs::File; +use std::cmp::Reverse; +use std::collections::HashMap; +use std::fs::{self, File}; use std::io::{self, Error, ErrorKind, Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::SystemTime; + +use priority_queue::PriorityQueue; use crate::authentication::Credentials; use crate::spotify_id::FileId; +struct SizeLimiter { + queue: PriorityQueue>, + sizes: HashMap, + size_limit: u64, + in_use: u64, +} + +impl SizeLimiter { + fn new(limit: u64) -> Self { + Self { + queue: PriorityQueue::new(), + sizes: HashMap::new(), + size_limit: limit, + in_use: 0, + } + } + + /// Adds an entry to this data structure. + /// + /// If this file is already contained, it will be updated accordingly. + fn add(&mut self, file: &Path, size: u64, accessed: SystemTime) { + self.in_use += size; + self.queue.push(file.to_owned(), Reverse(accessed)); + if let Some(old_size) = self.sizes.insert(file.to_owned(), size) { + // It's important that decreasing happens after + // increasing the size, to prevent an overflow. + self.in_use -= old_size; + } + } + + /// Returns the least recently accessed file if the size of the cache exceeds + /// the limit. + /// + /// The entry is removed from the data structure, but the caller is responsible + /// to delete the file in the file system. + fn pop(&mut self) -> Option { + if self.in_use > self.size_limit { + let (next, _) = self.queue.pop()?; + // panic safety: It is guaranteed that `queue` and `sizes` have the same keys. 
+ let size = self.sizes.remove(&next).unwrap(); + self.in_use -= size; + Some(next) + } else { + None + } + } + + fn update(&mut self, file: &Path, access_time: SystemTime) -> bool { + self.queue + .change_priority(file, Reverse(access_time)) + .is_some() + } + + fn remove(&mut self, file: &Path) { + if self.queue.remove(file).is_none() { + return; + } + let size = self.sizes.remove(file).unwrap(); + self.in_use -= size; + } +} + +struct FsSizeLimiter { + limiter: Mutex, +} + +impl FsSizeLimiter { + fn get_metadata(file: &Path) -> io::Result<(SystemTime, u64)> { + let metadata = file.metadata()?; + let access_time = metadata + .accessed() + .or_else(|_| metadata.created()) + .unwrap_or_else(|_| SystemTime::now()); + let size = metadata.len(); + + Ok((access_time, size)) + } + + fn init_dir(limiter: &mut SizeLimiter, path: &Path) { + for entry in fs::read_dir(path).into_iter().flatten().flatten() { + if let Ok(file_type) = entry.file_type() { + if file_type.is_dir() { + Self::init_dir(limiter, &entry.path()) + } else if file_type.is_file() { + let path = entry.path(); + if let Ok((access_time, size)) = Self::get_metadata(&path) { + limiter.add(&path, size, access_time); + } + } + } + } + } + + fn add(&self, file: &Path, size: u64) { + self.limiter + .lock() + .unwrap() + .add(file, size, SystemTime::now()); + } + + fn touch(&self, file: &Path) -> bool { + self.limiter.lock().unwrap().update(file, SystemTime::now()) + } + + fn remove(&self, file: &Path) { + self.limiter.lock().unwrap().remove(file); + } + + fn shrink(&self) { + while let Some(file) = self.limiter.lock().unwrap().pop() { + let _ = fs::remove_file(file); + } + } + + fn new(path: &Path, limit: u64) -> Self { + let mut limiter = SizeLimiter::new(limit); + Self::init_dir(&mut limiter, path); + + while let Some(file) = limiter.pop() { + let _ = fs::remove_file(file); + } + + Self { + limiter: Mutex::new(limiter), + } + } +} + /// A cache for volume, credentials and audio files. 
#[derive(Clone)] pub struct Cache { credentials_location: Option, volume_location: Option, audio_location: Option, + size_limiter: Option>, } +pub struct RemoveFileError(()); + impl Cache { pub fn new>( system_location: Option

, audio_location: Option

, + size_limit: Option, ) -> io::Result { if let Some(location) = &system_location { fs::create_dir_all(location)?; } + let mut size_limiter = None; + if let Some(location) = &audio_location { fs::create_dir_all(location)?; + if let Some(limit) = size_limit { + let limiter = FsSizeLimiter::new(location.as_ref(), limit); + size_limiter = Some(Arc::new(limiter)); + } } let audio_location = audio_location.map(|p| p.as_ref().to_owned()); @@ -37,6 +180,7 @@ impl Cache { credentials_location, volume_location, audio_location, + size_limiter, }; Ok(cache) @@ -121,13 +265,21 @@ impl Cache { } pub fn file(&self, file: FileId) -> Option { - File::open(self.file_path(file)?) - .map_err(|e| { + let path = self.file_path(file)?; + match File::open(&path) { + Ok(file) => { + if let Some(limiter) = self.size_limiter.as_deref() { + limiter.touch(&path); + } + Some(file) + } + Err(e) => { if e.kind() != ErrorKind::NotFound { warn!("Error reading file from cache: {}", e) } - }) - .ok() + None + } + } } pub fn save_file(&self, file: FileId, contents: &mut F) { @@ -142,37 +294,25 @@ impl Cache { .and_then(|_| File::create(&path)) .and_then(|mut file| io::copy(contents, &mut file)); - if let Err(e) = result { - if e.kind() == ErrorKind::Other { - // Perhaps there's no space left in the cache - // TODO: try to narrow down the error (platform-dependently) - info!("An error occured while writing to cache, trying to flush the cache"); - - if fs::remove_dir_all(self.audio_location.as_ref().unwrap()) - .and_then(|_| fs::create_dir_all(parent)) - .and_then(|_| File::create(&path)) - .and_then(|mut file| io::copy(contents, &mut file)) - .is_ok() - { - // It worked, there's no need to print a warning - return; - } + if let Ok(size) = result { + if let Some(limiter) = self.size_limiter.as_deref() { + limiter.add(&path, size); + limiter.shrink(); } - - warn!("Cannot save file to cache: {}", e) } } - pub fn remove_file(&self, file: FileId) -> bool { - if let Some(path) = self.file_path(file) { - if 
let Err(err) = fs::remove_file(path) { - warn!("Unable to remove file from cache: {}", err); - false - } else { - true - } + pub fn remove_file(&self, file: FileId) -> Result<(), RemoveFileError> { + let path = self.file_path(file).ok_or(RemoveFileError(()))?; + + if let Err(err) = fs::remove_file(&path) { + warn!("Unable to remove file from cache: {}", err); + Err(RemoveFileError(())) } else { - false + if let Some(limiter) = self.size_limiter.as_deref() { + limiter.remove(&path); + } + Ok(()) } } } diff --git a/playback/src/player.rs b/playback/src/player.rs index 3f0778f9..e13d9cbc 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -793,8 +793,7 @@ impl PlayerTrackLoader { e ); - // unwrap safety: The file is cached, so session must have a cache - if !self.session.cache().unwrap().remove_file(file_id) { + if self.session.cache().unwrap().remove_file(file_id).is_err() { return None; } diff --git a/src/main.rs b/src/main.rs index 78e7e2f9..7f5ecdbd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -367,7 +367,7 @@ fn get_setup(args: &[String]) -> Setup { .map(|p| p.into()); } - match Cache::new(system_dir, audio_dir) { + match Cache::new(system_dir, audio_dir, Some(50_000_000)) { Ok(cache) => Some(cache), Err(e) => { warn!("Cannot create cache: {}", e); From de6bc32deac30afd681a656ca9ea1464af18e1e0 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sat, 10 Apr 2021 19:00:25 +0200 Subject: [PATCH 2/3] Add documentation, logging and tests --- core/src/cache.rs | 182 ++++++++++++++++++++++++++++++++++++----- playback/src/player.rs | 8 +- 2 files changed, 167 insertions(+), 23 deletions(-) diff --git a/core/src/cache.rs b/core/src/cache.rs index d96e79a2..612b7c39 100644 --- a/core/src/cache.rs +++ b/core/src/cache.rs @@ -11,6 +11,9 @@ use priority_queue::PriorityQueue; use crate::authentication::Credentials; use crate::spotify_id::FileId; +/// Some kind of data structure that holds some paths, the size of these files and a timestamp. 
+/// It keeps track of the file sizes and is able to pop the path with the oldest timestamp if +/// a given limit is exceeded. struct SizeLimiter { queue: PriorityQueue>, sizes: HashMap, @@ -19,6 +22,7 @@ struct SizeLimiter { } impl SizeLimiter { + /// Creates a new instance with the given size limit. fn new(limit: u64) -> Self { Self { queue: PriorityQueue::new(), @@ -41,16 +45,26 @@ impl SizeLimiter { } } + /// Returns true if the limit is exceeded. + fn exceeds_limit(&self) -> bool { + self.in_use > self.size_limit + } + /// Returns the least recently accessed file if the size of the cache exceeds /// the limit. /// /// The entry is removed from the data structure, but the caller is responsible /// to delete the file in the file system. fn pop(&mut self) -> Option { - if self.in_use > self.size_limit { - let (next, _) = self.queue.pop()?; - // panic safety: It is guaranteed that `queue` and `sizes` have the same keys. - let size = self.sizes.remove(&next).unwrap(); + if self.exceeds_limit() { + let (next, _) = self + .queue + .pop() + .expect("in_use was > 0, so the queue should have contained an item."); + let size = self + .sizes + .remove(&next) + .expect("`queue` and `sizes` should have the same keys."); self.in_use -= size; Some(next) } else { @@ -58,18 +72,26 @@ impl SizeLimiter { } } + /// Updates the timestamp of an existing element. Returns `true` if the item did exist. fn update(&mut self, file: &Path, access_time: SystemTime) -> bool { self.queue .change_priority(file, Reverse(access_time)) .is_some() } - fn remove(&mut self, file: &Path) { + /// Removes an element with the specified path. Returns `true` if the item did exist. 
+ fn remove(&mut self, file: &Path) -> bool { if self.queue.remove(file).is_none() { - return; + return false; } - let size = self.sizes.remove(file).unwrap(); + + let size = self + .sizes + .remove(file) + .expect("`queue` and `sizes` should have the same keys."); self.in_use -= size; + + true } } @@ -78,29 +100,75 @@ struct FsSizeLimiter { } impl FsSizeLimiter { + /// Returns access time and file size of a given path. fn get_metadata(file: &Path) -> io::Result<(SystemTime, u64)> { let metadata = file.metadata()?; + + // The first of the following timestamps which is available will be chosen as access time: + // 1. Access time + // 2. Modification time + // 3. Creation time + // 4. Current time let access_time = metadata .accessed() + .or_else(|_| metadata.modified()) .or_else(|_| metadata.created()) .unwrap_or_else(|_| SystemTime::now()); + let size = metadata.len(); Ok((access_time, size)) } + /// Recursively search a directory for files and add them to the `limiter` struct. fn init_dir(limiter: &mut SizeLimiter, path: &Path) { - for entry in fs::read_dir(path).into_iter().flatten().flatten() { - if let Ok(file_type) = entry.file_type() { - if file_type.is_dir() { + let list_dir = match fs::read_dir(path) { + Ok(list_dir) => list_dir, + Err(e) => { + warn!("Could not read directory {:?} in cache dir: {}", path, e); + return; + } + }; + + for entry in list_dir { + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + warn!("Could not read directory {:?} in cache dir: {}", path, e); + return; + } + }; + + match entry.file_type() { + Ok(file_type) if file_type.is_dir() || file_type.is_symlink() => { Self::init_dir(limiter, &entry.path()) - } else if file_type.is_file() { + } + Ok(file_type) if file_type.is_file() => { let path = entry.path(); - if let Ok((access_time, size)) = Self::get_metadata(&path) { - limiter.add(&path, size, access_time); + match Self::get_metadata(&path) { + Ok((access_time, size)) => { + limiter.add(&path, size, access_time); + } + 
Err(e) => { + warn!("Could not read file {:?} in cache dir: {}", path, e) + } } } - } + Ok(ft) => { + warn!( + "File {:?} in cache dir has unsupported type {:?}", + entry.path(), + ft + ) + } + Err(e) => { + warn!( + "Could not get type of file {:?} in cache dir: {}", + entry.path(), + e + ) + } + }; } } @@ -119,19 +187,37 @@ impl FsSizeLimiter { self.limiter.lock().unwrap().remove(file); } - fn shrink(&self) { - while let Some(file) = self.limiter.lock().unwrap().pop() { - let _ = fs::remove_file(file); + fn prune_internal Option>(mut pop: F) { + let mut first = true; + let mut count = 0; + + while let Some(file) = pop() { + if first { + debug!("Cache dir exceeds limit, removing least recently used files."); + first = false; + } + + if let Err(e) = fs::remove_file(&file) { + warn!("Could not remove file {:?} from cache dir: {}", file, e); + } else { + count += 1; + } } + + if count > 0 { + info!("Removed {} cache files.", count); + } + } + + fn prune(&self) { + Self::prune_internal(|| self.limiter.lock().unwrap().pop()) } fn new(path: &Path, limit: u64) -> Self { let mut limiter = SizeLimiter::new(limit); - Self::init_dir(&mut limiter, path); - while let Some(file) = limiter.pop() { - let _ = fs::remove_file(file); - } + Self::init_dir(&mut limiter, path); + Self::prune_internal(|| limiter.pop()); Self { limiter: Mutex::new(limiter), @@ -297,7 +383,7 @@ impl Cache { if let Ok(size) = result { if let Some(limiter) = self.size_limiter.as_deref() { limiter.add(&path, size); - limiter.shrink(); + limiter.prune(); } } } @@ -316,3 +402,55 @@ impl Cache { } } } + +#[cfg(test)] +mod test { + use super::*; + use std::time::Duration; + + fn ordered_time(v: u64) -> SystemTime { + SystemTime::UNIX_EPOCH + Duration::from_secs(v) + } + + #[test] + fn test_size_limiter() { + let mut limiter = SizeLimiter::new(1000); + + limiter.add(Path::new("a"), 500, ordered_time(2)); + limiter.add(Path::new("b"), 500, ordered_time(1)); + + // b (500) -> a (500) => sum: 1000 <= 1000 + 
assert!(!limiter.exceeds_limit()); + assert_eq!(limiter.pop(), None); + + limiter.add(Path::new("c"), 1000, ordered_time(3)); + + // b (500) -> a (500) -> c (1000) => sum: 2000 > 1000 + assert!(limiter.exceeds_limit()); + assert_eq!(limiter.pop().as_deref(), Some(Path::new("b"))); + // a (500) -> c (1000) => sum: 1500 > 1000 + assert_eq!(limiter.pop().as_deref(), Some(Path::new("a"))); + // c (1000) => sum: 1000 <= 1000 + assert_eq!(limiter.pop().as_deref(), None); + + limiter.add(Path::new("d"), 5, ordered_time(2)); + // d (5) -> c (1000) => sum: 1005 > 1000 + assert_eq!(limiter.pop().as_deref(), Some(Path::new("d"))); + // c (1000) => sum: 1000 <= 1000 + assert_eq!(limiter.pop().as_deref(), None); + + // Test updating + + limiter.add(Path::new("e"), 500, ordered_time(3)); + // c (1000) -> e (500) => sum: 1500 > 1000 + assert!(limiter.update(Path::new("c"), ordered_time(4))); + // e (500) -> c (1000) => sum: 1500 > 1000 + assert_eq!(limiter.pop().as_deref(), Some(Path::new("e"))); + // c (1000) => sum: 1000 <= 1000 + + // Test removing + limiter.add(Path::new("f"), 500, ordered_time(2)); + assert!(limiter.remove(Path::new("c"))); + assert!(!limiter.exceeds_limit()); + } +} diff --git a/playback/src/player.rs b/playback/src/player.rs index e13d9cbc..2daf4ff5 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -793,7 +793,13 @@ impl PlayerTrackLoader { e ); - if self.session.cache().unwrap().remove_file(file_id).is_err() { + if self + .session + .cache() + .expect("If the audio file is cached, a cache should exist") + .remove_file(file_id) + .is_err() + { return None; } From e355d4a4f16affaf711b4f5944b4b0bd7da8045b Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Tue, 13 Apr 2021 15:16:48 +0200 Subject: [PATCH 3/3] Add cache-size-limit command line argument --- Cargo.lock | 1 + Cargo.toml | 1 + src/main.rs | 83 ++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock 
index dfb1aa95..f8f5a4e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1147,6 +1147,7 @@ dependencies = [ "log", "rpassword", "sha-1", + "thiserror", "tokio", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 14e33a83..1352a7a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ hex = "0.4" hyper = "0.14" log = "0.4" rpassword = "5.0" +thiserror = "1.0" tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "sync", "process"] } url = "2.1" sha-1 = "0.9" diff --git a/src/main.rs b/src/main.rs index 7f5ecdbd..9a9556f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ use futures_util::{future, FutureExt, StreamExt}; use librespot_playback::player::PlayerEvent; use log::{error, info, warn}; use sha1::{Digest, Sha1}; +use thiserror::Error; use tokio::sync::mpsc::UnboundedReceiver; use url::Url; @@ -98,6 +99,66 @@ pub fn get_credentials Option>( } } +#[derive(Debug, Error)] +pub enum ParseFileSizeError { + #[error("empty argument")] + EmptyInput, + #[error("invalid suffix")] + InvalidSuffix, + #[error("invalid number: {0}")] + InvalidNumber(#[from] std::num::ParseFloatError), + #[error("non-finite number specified")] + NotFinite(f64), +} + +pub fn parse_file_size(input: &str) -> Result { + use ParseFileSizeError::*; + + let mut iter = input.chars(); + let mut suffix = iter.next_back().ok_or(EmptyInput)?; + let mut suffix_len = 0; + + let iec = matches!(suffix, 'i' | 'I'); + + if iec { + suffix_len += 1; + suffix = iter.next_back().ok_or(InvalidSuffix)?; + } + + let base: u64 = if iec { 1024 } else { 1000 }; + + suffix_len += 1; + let exponent = match suffix.to_ascii_uppercase() { + '0'..='9' if !iec => { + suffix_len -= 1; + 0 + } + 'K' => 1, + 'M' => 2, + 'G' => 3, + 'T' => 4, + 'P' => 5, + 'E' => 6, + 'Z' => 7, + 'Y' => 8, + _ => return Err(InvalidSuffix), + }; + + let num = { + let mut iter = input.chars(); + + for _ in (&mut iter).rev().take(suffix_len) {} + + iter.as_str().parse::()? 
+ }; + + if !num.is_finite() { + return Err(NotFinite(num)); + } + + Ok((num * base.pow(exponent) as f64) as u64) +} + fn print_version() { println!( "librespot {semver} {sha} (Built on {build_date}, Build ID: {build_id})", @@ -140,6 +201,11 @@ fn get_setup(args: &[String]) -> Setup { "system-cache", "Path to a directory where system files (credentials, volume) will be cached. Can be different from cache option value", "SYTEMCACHE", + ).optopt( + "", + "cache-size-limit", + "Limits the size of the cache for audio files.", + "CACHE_SIZE_LIMIT" ).optflag("", "disable-audio-cache", "Disable caching of the audio data.") .optopt("n", "name", "Device name", "NAME") .optopt("", "device-type", "Displayed device type", "DEVICE_TYPE") @@ -367,7 +433,22 @@ fn get_setup(args: &[String]) -> Setup { .map(|p| p.into()); } - match Cache::new(system_dir, audio_dir, Some(50_000_000)) { + let limit = if audio_dir.is_some() { + matches + .opt_str("cache-size-limit") + .as_deref() + .map(parse_file_size) + .map(|e| { + e.unwrap_or_else(|e| { + eprintln!("Invalid argument passed as cache size limit: {}", e); + exit(1); + }) + }) + } else { + None + }; + + match Cache::new(system_dir, audio_dir, limit) { Ok(cache) => Some(cache), Err(e) => { warn!("Cannot create cache: {}", e);