From e452abce43decf95b2ee986e19fbfdbcf398461d Mon Sep 17 00:00:00 2001 From: Paul Lietar Date: Tue, 7 Jul 2015 22:40:31 +0100 Subject: [PATCH] Cache audio files to disk. --- Cargo.lock | 22 ++- Cargo.toml | 1 + src/audio_file.rs | 336 +++++++++++++++++++++++------------------ src/audio_key.rs | 4 +- src/main.rs | 10 +- src/metadata.rs | 4 +- src/session.rs | 6 +- src/stream.rs | 2 +- src/util/mod.rs | 14 ++ src/util/spotify_id.rs | 14 +- src/util/zerofile.rs | 44 ++++++ 11 files changed, 293 insertions(+), 164 deletions(-) create mode 100644 src/util/zerofile.rs diff --git a/Cargo.lock b/Cargo.lock index 56283849..bcfda472 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,7 +2,7 @@ name = "librespot" version = "0.1.0" dependencies = [ - "byteorder 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", "lazy_static 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)", "librespot-protocol 0.1.0", "mod_path 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", @@ -15,6 +15,7 @@ dependencies = [ "rust-crypto 0.2.31 (registry+https://github.com/rust-lang/crates.io-index)", "rust-gmp 0.2.0 (git+https://github.com/plietar/rust-gmp.git)", "shannon 0.1.0 (git+https://github.com/plietar/rust-shannon.git)", + "tempfile 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)", "vergen 0.0.13 (registry+https://github.com/rust-lang/crates.io-index)", "vorbis 0.0.11 (git+https://github.com/plietar/vorbis-rs)", @@ -27,12 +28,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "bitflags" -version = "0.3.0" +version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "byteorder" -version = "0.3.10" +version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] @@ -168,7 +169,7 @@ name = "shannon" version = "0.1.0" source = "git+https://github.com/plietar/rust-shannon.git#c6be8a879a523a77d81c50df46faa891b76fea25" dependencies = [ - "byteorder 0.3.10 (registry+https://github.com/rust-lang/crates.io-index)", + "byteorder 0.3.11 (registry+https://github.com/rust-lang/crates.io-index)", "readall 0.1.0 (git+https://github.com/plietar/rust-readall.git)", "shannon-sys 0.1.0 (git+https://github.com/plietar/rust-shannon.git)", ] @@ -181,6 +182,17 @@ dependencies = [ "gcc 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "tempfile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "kernel32-sys 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", + "libc 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", + "winapi 0.1.23 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "time" version = "0.1.30" @@ -196,7 +208,7 @@ name = "vergen" version = "0.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "bitflags 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", + "bitflags 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "time 0.1.30 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index 4dae94e4..fd8012e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ rand = "*" lazy_static = "0.1.*" rust-crypto = "*" time = "*" +tempfile = "*" [dependencies.protobuf] git = "https://github.com/plietar/rust-protobuf.git" diff --git a/src/audio_file.rs b/src/audio_file.rs index 5b736e58..b84f5f84 100644 --- a/src/audio_file.rs +++ b/src/audio_file.rs @@ -1,21 +1,31 @@ use byteorder::{ByteOrder, BigEndian}; use std::cmp::min; -use std::collections::{BitSet, HashMap}; -use std::io::{self, SeekFrom}; -use std::slice::bytes::copy_memory; +use std::collections::BitSet; use std::sync::{Arc, Condvar, Mutex}; use std::sync::mpsc::{self, TryRecvError}; use std::thread; +use std::fs; +use std::io::{self, Read, Write, Seek, SeekFrom}; +use std::path::PathBuf; +use tempfile::TempFile; use stream::StreamEvent; -use util::FileId; +use util::{FileId, IgnoreExt, ZeroFile, mkdir_existing}; use session::Session; -const CHUNK_SIZE : usize = 0x10000; +const CHUNK_SIZE : usize = 0x20000; -pub struct AudioFile<'s> { - position: usize, +pub enum AudioFile<'s> { + Direct(fs::File), + Loading(AudioFileLoading<'s>) +} + +struct AudioFileLoading<'s> { + read_file: TempFile, + + position: u64, seek: mpsc::Sender, + shared: Arc, #[allow(dead_code)] @@ -25,123 +35,17 @@ pub struct AudioFile<'s> { struct AudioFileShared { file_id: FileId, size: usize, - data: Mutex, - cond: Condvar + chunk_count: usize, + cond: Condvar, + bitmap: Mutex, } -struct AudioFileData { - buffer: Vec, - bitmap: BitSet, -} +impl <'s> AudioFileLoading<'s> { + fn new(session: &Session, file_id: FileId) -> AudioFileLoading { + let mut files_iter = TempFile::shared(2).unwrap().into_iter(); + let read_file = files_iter.next().unwrap(); + let mut write_file = files_iter.next().unwrap(); -impl <'s> AudioFile <'s> { - fn new(session: &Session, shared: Arc) -> AudioFile { - let shared_ = shared.clone(); - let (seek_tx, seek_rx) = mpsc::channel(); - - let file = AudioFile { - thread: thread::scoped( move || { AudioFile::fetch(session, shared_, seek_rx); }), - position: 0, - seek: seek_tx, - shared: shared, - }; - - file - } - - fn fetch_chunk(session: &Session, shared: &Arc, index: usize) { - let rx = session.stream(shared.file_id, - (index * CHUNK_SIZE / 4) as u32, - (CHUNK_SIZE / 4) as u32); - - let mut offset = 0usize; - for event in rx.iter() { - match event { - StreamEvent::Header(_,_) => (), - StreamEvent::Data(data) => { - let mut handle = shared.data.lock().unwrap(); - copy_memory(&data, &mut handle.buffer[index * CHUNK_SIZE + offset ..]); - offset += data.len(); - - if offset >= CHUNK_SIZE { - break - } - } - } - } - - { - let mut handle = shared.data.lock().unwrap(); - handle.bitmap.insert(index as usize); - shared.cond.notify_all(); - } - } - - fn fetch(session: &Session, shared: Arc, seek: mpsc::Receiver) { - let mut index = 0; - loop { - index = if index * CHUNK_SIZE < shared.size { - match seek.try_recv() { - Ok(position) => position as usize / CHUNK_SIZE, - Err(TryRecvError::Empty) => index, - Err(TryRecvError::Disconnected) => break - } - } else { - match seek.recv() { - Ok(position) => position as usize / CHUNK_SIZE, - Err(_) => break - } - }; - - { - let handle = shared.data.lock().unwrap(); - while handle.bitmap.contains(&index) && index * CHUNK_SIZE < shared.size { - index += 1; - } - } - - if index * CHUNK_SIZE < shared.size { - AudioFile::fetch_chunk(session, &shared, index) - } - } - } -} - -impl <'s> io::Read for AudioFile <'s> { - fn read(&mut self, output: &mut [u8]) -> io::Result { - let index = self.position / CHUNK_SIZE; - let offset = self.position % CHUNK_SIZE; - let len = min(output.len(), CHUNK_SIZE-offset); - - let mut handle = self.shared.data.lock().unwrap(); - - while !handle.bitmap.contains(&index) { - handle = self.shared.cond.wait(handle).unwrap(); - } - - copy_memory(&handle.buffer[self.position..self.position+len], output); - self.position += len; - - Ok(len) - } -} - -impl <'s> io::Seek for AudioFile <'s> { - fn seek(&mut self, pos: io::SeekFrom) -> io::Result { - let newpos = match pos { - SeekFrom::Start(offset) => offset as i64, - SeekFrom::End(offset) => self.shared.size as i64 + offset, - SeekFrom::Current(offset) => self.position as i64 + offset, - }; - - self.position = min(newpos as usize, self.shared.size); - self.seek.send(self.position as u64).unwrap(); - Ok(self.position as u64) - } -} - -impl AudioFileShared { - fn new(session: &Session, file_id: FileId) -> Arc { let size = session.stream(file_id, 0, 1).into_iter() .filter_map(|event| { match event { @@ -152,42 +56,178 @@ impl AudioFileShared { } }).next().unwrap(); - let bufsize = size + (CHUNK_SIZE - size % CHUNK_SIZE); + let chunk_count = (size + CHUNK_SIZE / 2) / CHUNK_SIZE; - Arc::new(AudioFileShared { + let shared = Arc::new(AudioFileShared { file_id: file_id, size: size, - data: Mutex::new(AudioFileData { - buffer: vec![0u8; bufsize], - bitmap: BitSet::with_capacity(bufsize / CHUNK_SIZE as usize) - }), + chunk_count: chunk_count, cond: Condvar::new(), - }) - } -} + bitmap: Mutex::new(BitSet::with_capacity(chunk_count)), + }); -pub struct AudioFileManager { - cache: HashMap> -} + io::copy(&mut ZeroFile::new(size as u64), &mut write_file).unwrap(); -impl AudioFileManager { - pub fn new() -> AudioFileManager { - AudioFileManager { - cache: HashMap::new() + let (seek_tx, seek_rx) = mpsc::channel(); + + AudioFileLoading { + read_file: read_file, + + position: 0, + seek: seek_tx, + + shared: shared.clone(), + + thread: thread::scoped(move || { + AudioFileLoading::fetch(session, shared, write_file, seek_rx) + }) } } - pub fn request<'a> (&mut self, session: &'a Session, file_id: FileId) -> AudioFile<'a> { - let shared = self.cache - .get(&file_id) - .cloned() - .unwrap_or_else(|| { - println!("Cache miss"); - let shared = AudioFileShared::new(session, file_id.clone()); - self.cache.insert(file_id, shared.clone()); - shared - }); - AudioFile::new(session, shared) + fn fetch(session: &Session, shared: Arc, + mut write_file: TempFile, seek_rx: mpsc::Receiver) { + let mut index = 0; + + loop { + match seek_rx.try_recv() { + Ok(position) => { + index = position as usize / CHUNK_SIZE; + } + Err(TryRecvError::Disconnected) => break, + Err(TryRecvError::Empty) => (), + } + + let bitmap = shared.bitmap.lock().unwrap(); + if bitmap.len() >= shared.chunk_count { + drop(bitmap); + AudioFileLoading::store(session, &shared, &mut write_file); + break; + } + + while bitmap.contains(&index) { + index = (index + 1) % shared.chunk_count; + } + drop(bitmap); + + AudioFileLoading::fetch_chunk(session, &shared, &mut write_file, index); + } + } + + fn fetch_chunk(session: &Session, shared: &Arc, + write_file: &mut TempFile, index: usize) { + + let rx = session.stream(shared.file_id, + (index * CHUNK_SIZE / 4) as u32, + (CHUNK_SIZE / 4) as u32); + + println!("Chunk {}", index); + + write_file.seek(SeekFrom::Start((index * CHUNK_SIZE) as u64)).unwrap(); + + let mut size = 0usize; + for event in rx.iter() { + match event { + StreamEvent::Header(..) => (), + StreamEvent::Data(data) => { + write_file.write_all(&data).unwrap(); + + size += data.len(); + if size >= CHUNK_SIZE { + break + } + } + } + } + + let mut bitmap = shared.bitmap.lock().unwrap(); + bitmap.insert(index as usize); + + shared.cond.notify_all(); + } + + fn store(session: &Session, shared: &AudioFileShared, write_file: &mut TempFile) { + write_file.seek(SeekFrom::Start(0)).unwrap(); + + mkdir_existing(&AudioFileManager::cache_dir(session, shared.file_id)).unwrap(); + + let mut f = fs::File::create(AudioFileManager::cache_path(session, shared.file_id)).unwrap(); + io::copy(write_file, &mut f).unwrap(); + } +} + +impl <'s> Read for AudioFileLoading<'s> { + fn read(&mut self, output: &mut [u8]) -> io::Result { + let index = self.position as usize / CHUNK_SIZE; + let offset = self.position as usize % CHUNK_SIZE; + let len = min(output.len(), CHUNK_SIZE-offset); + + let mut bitmap = self.shared.bitmap.lock().unwrap(); + while !bitmap.contains(&index) { + bitmap = self.shared.cond.wait(bitmap).unwrap(); + } + drop(bitmap); + + let len = try!(self.read_file.read(&mut output[..len])); + + self.position += len as u64; + + Ok(len) + } +} + +impl <'s> Seek for AudioFileLoading<'s> { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + self.position = try!(self.read_file.seek(pos)); + + /* + * Notify the fetch thread to get the correct block + * This can fail if fetch thread has completed, in which case the + * block is ready. Just ignore the error. + */ + self.seek.send(self.position).ignore(); + Ok(self.position as u64) + } +} + +impl <'s> Read for AudioFile<'s> { + fn read(&mut self, output: &mut [u8]) -> io::Result { + match *self { + AudioFile::Direct(ref mut file) => file.read(output), + AudioFile::Loading(ref mut loading) => loading.read(output), + } + } +} + +impl <'s> Seek for AudioFile<'s> { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + match *self { + AudioFile::Direct(ref mut file) => file.seek(pos), + AudioFile::Loading(ref mut loading) => loading.seek(pos), + } + } +} + +pub struct AudioFileManager; +impl AudioFileManager { + pub fn new() -> AudioFileManager { + AudioFileManager + } + + pub fn cache_dir(session: &Session, file_id: FileId) -> PathBuf { + let name = file_id.to_base16(); + session.config.cache_location.join(&name[0..2]) + } + + pub fn cache_path(session: &Session, file_id: FileId) -> PathBuf { + let name = file_id.to_base16(); + AudioFileManager::cache_dir(session, file_id).join(&name[2..]) + } + + pub fn request<'a> (&mut self, session: &'a Session, file_id: FileId) -> AudioFile<'a> { + match fs::File::open(AudioFileManager::cache_path(session, file_id)) { + Ok(f) => AudioFile::Direct(f), + Err(..) => AudioFile::Loading(AudioFileLoading::new(session, file_id)) + } } } diff --git a/src/audio_key.rs b/src/audio_key.rs index 313cead1..b1f1bb63 100644 --- a/src/audio_key.rs +++ b/src/audio_key.rs @@ -52,7 +52,7 @@ impl AudioKeyManager { self.next_seq += 1; let mut data : Vec = Vec::new(); - data.write(&file).unwrap(); + data.write(&file.0).unwrap(); data.write(&track.to_raw()).unwrap(); data.write_u32::(seq).unwrap(); data.write_u16::(0x0000).unwrap(); @@ -84,7 +84,7 @@ impl PacketHandler for AudioKeyManager { if let AudioKeyStatus::Loading(cbs) = status { for cb in cbs { - cb.send(key).unwrap(); + cb.send(key).ignore(); } } } diff --git a/src/main.rs b/src/main.rs index 11d0da28..e86c5004 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ #![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future)] #![allow(deprecated)] -#![allow(unused_imports,dead_code)] +//#![allow(unused_imports,dead_code)] #![plugin(protobuf_macros)] #[macro_use] extern crate lazy_static; @@ -19,6 +19,7 @@ extern crate rand; extern crate readall; extern crate vorbis; extern crate time; +extern crate tempfile; extern crate librespot_protocol; @@ -41,6 +42,7 @@ use std::io::{Read, Write}; use std::path::Path; use protobuf::core::Message; use std::thread; +use std::path::PathBuf; use metadata::{AlbumRef, ArtistRef, TrackRef}; use session::{Config, Session}; @@ -56,6 +58,7 @@ fn main() { let mut appkey_file = File::open(Path::new(&args.next().unwrap())).unwrap(); let username = args.next().unwrap(); let password = args.next().unwrap(); + let cache_location = args.next().unwrap(); let name = args.next().unwrap(); let mut appkey = Vec::new(); @@ -64,7 +67,8 @@ fn main() { let config = Config { application_key: appkey, user_agent: version_string(), - device_id: name.to_string() + device_id: name.clone(), + cache_location: PathBuf::from(cache_location) }; let session = Session::new(config); session.login(username.clone(), password); @@ -86,7 +90,7 @@ fn main() { state_update_id: 0, seq_nr: 0, - name: name.clone(), + name: name, ident: session.config.device_id.clone(), device_type: 5, can_play: true, diff --git a/src/metadata.rs b/src/metadata.rs index 342f3d40..676aea7a 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -35,7 +35,7 @@ impl MetadataTrait for Track { .map(|file| { let mut dst = [0u8; 20]; copy_memory(&file.get_file_id(), &mut dst); - dst + FileId(dst) }) .collect(), } @@ -67,7 +67,7 @@ impl MetadataTrait for Album { .map(|image| { let mut dst = [0u8; 20]; copy_memory(&image.get_file_id(), &mut dst); - dst + FileId(dst) }) .collect(), } diff --git a/src/session.rs b/src/session.rs index 015702c3..46313fd8 100644 --- a/src/session.rs +++ b/src/session.rs @@ -3,11 +3,12 @@ use crypto::sha1::Sha1; use protobuf::{self, Message}; use rand::thread_rng; use std::sync::{Mutex, Arc, Future, mpsc}; +use std::path::PathBuf; use connection::{self, PlainConnection, CipherConnection}; use keys::PrivateKeys; use librespot_protocol as protocol; -use util::{SpotifyId, FileId}; +use util::{SpotifyId, FileId, mkdir_existing}; use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; @@ -22,6 +23,7 @@ pub struct Config { pub application_key: Vec, pub user_agent: String, pub device_id: String, + pub cache_location: PathBuf, } pub struct Session { @@ -46,6 +48,8 @@ impl Session { h.result_str() }; + mkdir_existing(&config.cache_location).unwrap(); + let keys = PrivateKeys::new(); let mut connection = PlainConnection::connect().unwrap(); diff --git a/src/stream.rs b/src/stream.rs index 3414bb2d..3224329b 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -55,7 +55,7 @@ impl StreamManager { data.write_u32::(0x00000000).unwrap(); data.write_u32::(0x00009C40).unwrap(); data.write_u32::(0x00020000).unwrap(); - data.write(&file).unwrap(); + data.write(&file.0).unwrap(); data.write_u32::(offset).unwrap(); data.write_u32::(offset + size).unwrap(); diff --git a/src/util/mod.rs b/src/util/mod.rs index 34bada1c..eb40ad27 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,15 +1,20 @@ use rand::{Rng,Rand}; use time; +use std::io; +use std::fs; +use std::path::Path; mod int128; mod spotify_id; mod arcvec; mod subfile; +mod zerofile; pub use util::int128::u128; pub use util::spotify_id::{SpotifyId, FileId}; pub use util::arcvec::ArcVec; pub use util::subfile::Subfile; +pub use util::zerofile::ZeroFile; #[macro_export] macro_rules! eprintln( @@ -73,3 +78,12 @@ pub fn now_ms() -> i64 { ts.sec * 1000 + ts.nsec as i64 / 1000000 } +pub fn mkdir_existing(path: &Path) -> io::Result<()> { + fs::create_dir(path) + .or_else(|err| if err.kind() == io::ErrorKind::AlreadyExists { + Ok(()) + } else { + Err(err) + }) +} + diff --git a/src/util/spotify_id.rs b/src/util/spotify_id.rs index 9a84f511..aa823468 100644 --- a/src/util/spotify_id.rs +++ b/src/util/spotify_id.rs @@ -3,11 +3,12 @@ use util::u128; use byteorder::{BigEndian,ByteOrder}; use std::ascii::AsciiExt; -pub type FileId = [u8; 20]; - #[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)] pub struct SpotifyId(u128); +#[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)] +pub struct FileId(pub [u8; 20]); + const BASE62_DIGITS: &'static [u8] = b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; const BASE16_DIGITS: &'static [u8] = b"0123456789abcdef"; @@ -77,3 +78,12 @@ impl SpotifyId { } } +impl FileId { + pub fn to_base16(&self) -> String { + self.0.iter() + .map(|b| format!("{:02x}", b)) + .collect::>() + .concat() + } +} + diff --git a/src/util/zerofile.rs b/src/util/zerofile.rs new file mode 100644 index 00000000..7c8c66e2 --- /dev/null +++ b/src/util/zerofile.rs @@ -0,0 +1,44 @@ +use std::io; +use std::cmp::{min, max}; + +pub struct ZeroFile { + position: u64, + size: u64 +} + +impl ZeroFile { + pub fn new(size: u64) -> ZeroFile { + ZeroFile { + position: 0, + size: size + } + } +} + +impl io::Seek for ZeroFile { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let newpos = match pos { + io::SeekFrom::Start(offset) => offset as i64, + io::SeekFrom::End(offset) => self.size as i64 + offset, + io::SeekFrom::Current(offset) => self.position as i64 + offset, + }; + + self.position = max(min(newpos, self.size as i64), 0) as u64; + + Ok(self.position) + } +} + +impl io::Read for ZeroFile { + // TODO optimize with memset or similar + fn read(&mut self, output: &mut [u8]) -> io::Result { + let len = min(output.len(), (self.size - self.position) as usize); + for i in 0..len { + output[i] = 0; + } + + self.position += len as u64; + Ok(len) + } +} +