librespot/src/audio_file.rs

236 lines
7 KiB
Rust
Raw Normal View History

2015-09-01 11:20:37 +00:00
use bit_set::BitSet;
2015-06-23 14:38:29 +00:00
use byteorder::{ByteOrder, BigEndian};
use std::cmp::min;
2015-06-23 17:34:48 +00:00
use std::sync::{Arc, Condvar, Mutex};
use std::sync::mpsc::{self, TryRecvError};
use std::thread;
2015-07-07 21:40:31 +00:00
use std::fs;
use std::io::{self, Read, Write, Seek, SeekFrom};
use std::path::PathBuf;
2016-01-12 23:29:31 +00:00
use tempfile::NamedTempFile;
2015-06-23 14:38:29 +00:00
2016-01-12 23:29:31 +00:00
use util::{FileId, IgnoreExt, mkdir_existing};
use session::Session;
2015-09-01 11:20:37 +00:00
use stream::StreamEvent;
2015-06-23 14:38:29 +00:00
2016-01-02 15:19:39 +00:00
const CHUNK_SIZE: usize = 0x20000;
2015-06-23 14:38:29 +00:00
2015-09-01 11:20:37 +00:00
pub enum AudioFile {
2015-07-07 21:40:31 +00:00
Direct(fs::File),
2016-01-02 15:19:39 +00:00
Loading(AudioFileLoading),
2015-07-07 21:40:31 +00:00
}
2015-09-01 11:20:37 +00:00
pub struct AudioFileLoading {
2016-01-12 23:29:31 +00:00
read_file: fs::File,
2015-07-07 21:40:31 +00:00
position: u64,
2015-06-23 17:34:48 +00:00
seek: mpsc::Sender<u64>,
2015-07-07 21:40:31 +00:00
2015-06-24 00:41:39 +00:00
shared: Arc<AudioFileShared>,
}
2015-06-23 14:38:29 +00:00
2015-06-24 00:41:39 +00:00
struct AudioFileShared {
file_id: FileId,
2015-06-24 00:41:39 +00:00
size: usize,
2015-07-07 21:40:31 +00:00
chunk_count: usize,
cond: Condvar,
bitmap: Mutex<BitSet>,
2015-06-23 14:38:29 +00:00
}
2015-09-01 11:20:37 +00:00
impl AudioFileLoading {
2015-07-07 21:40:31 +00:00
fn new(session: &Session, file_id: FileId) -> AudioFileLoading {
2016-01-02 15:19:39 +00:00
let size = session.stream(file_id, 0, 1)
.into_iter()
.filter_map(|event| {
match event {
StreamEvent::Header(id, ref data) if id == 0x3 => {
Some(BigEndian::read_u32(data) as usize * 4)
}
_ => None,
}
})
.next()
.unwrap();
2015-07-07 21:40:31 +00:00
let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
2015-07-07 21:40:31 +00:00
2016-01-12 23:29:31 +00:00
2015-07-07 21:40:31 +00:00
let shared = Arc::new(AudioFileShared {
file_id: file_id,
size: size,
chunk_count: chunk_count,
cond: Condvar::new(),
bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
});
2016-01-12 23:29:31 +00:00
let write_file = NamedTempFile::new().unwrap();
write_file.set_len(size as u64).unwrap();
let read_file = write_file.reopen().unwrap();
2015-06-23 14:38:29 +00:00
let (seek_tx, seek_rx) = mpsc::channel();
2016-01-02 15:48:44 +00:00
{
let shared = shared.clone();
let session = session.clone();
thread::spawn(move || AudioFileLoading::fetch(&session, shared, write_file, seek_rx));
}
2015-09-01 11:20:37 +00:00
2015-07-07 21:40:31 +00:00
AudioFileLoading {
read_file: read_file,
2015-06-24 00:41:39 +00:00
position: 0,
seek: seek_tx,
2015-06-23 14:38:29 +00:00
2016-01-02 15:19:39 +00:00
shared: shared,
2015-07-07 21:40:31 +00:00
}
}
2016-01-02 15:19:39 +00:00
fn fetch(session: &Session,
shared: Arc<AudioFileShared>,
2016-01-12 23:29:31 +00:00
mut write_file: NamedTempFile,
2016-01-02 15:19:39 +00:00
seek_rx: mpsc::Receiver<u64>) {
2015-07-07 21:40:31 +00:00
let mut index = 0;
loop {
match seek_rx.try_recv() {
Ok(position) => {
index = position as usize / CHUNK_SIZE;
}
Err(TryRecvError::Disconnected) => break,
Err(TryRecvError::Empty) => (),
}
let bitmap = shared.bitmap.lock().unwrap();
if bitmap.len() >= shared.chunk_count {
drop(bitmap);
AudioFileLoading::persist_to_cache(session, &shared, &mut write_file);
2015-07-07 21:40:31 +00:00
break;
}
while bitmap.contains(&index) {
index = (index + 1) % shared.chunk_count;
}
drop(bitmap);
AudioFileLoading::fetch_chunk(session, &shared, &mut write_file, index);
}
2015-06-23 17:34:48 +00:00
}
2015-06-24 00:41:39 +00:00
2016-01-02 15:19:39 +00:00
fn fetch_chunk(session: &Session,
shared: &Arc<AudioFileShared>,
2016-01-12 23:29:31 +00:00
write_file: &mut NamedTempFile,
2016-01-02 15:19:39 +00:00
index: usize) {
2015-07-07 21:40:31 +00:00
let rx = session.stream(shared.file_id,
2016-01-02 15:19:39 +00:00
(index * CHUNK_SIZE / 4) as u32,
(CHUNK_SIZE / 4) as u32);
2015-06-23 14:38:29 +00:00
2015-07-07 21:40:31 +00:00
println!("Chunk {}", index);
write_file.seek(SeekFrom::Start((index * CHUNK_SIZE) as u64)).unwrap();
let mut size = 0usize;
2015-06-23 17:34:48 +00:00
for event in rx.iter() {
match event {
2015-07-07 21:40:31 +00:00
StreamEvent::Header(..) => (),
2015-06-23 17:34:48 +00:00
StreamEvent::Data(data) => {
2015-07-07 21:40:31 +00:00
write_file.write_all(&data).unwrap();
2015-06-23 17:34:48 +00:00
2015-07-07 21:40:31 +00:00
size += data.len();
if size >= CHUNK_SIZE {
2016-01-02 15:19:39 +00:00
break;
2015-06-23 14:38:29 +00:00
}
}
}
2015-06-23 17:34:48 +00:00
}
2015-06-24 00:41:39 +00:00
2015-07-07 21:40:31 +00:00
let mut bitmap = shared.bitmap.lock().unwrap();
bitmap.insert(index as usize);
shared.cond.notify_all();
2015-06-23 17:34:48 +00:00
}
fn persist_to_cache(session: &Session, shared: &AudioFileShared, write_file: &mut NamedTempFile) {
if let Some(path) = AudioFileManager::cache_path(session, shared.file_id) {
write_file.seek(SeekFrom::Start(0)).unwrap();
mkdir_existing(path.parent().unwrap()).unwrap();
2015-06-23 17:34:48 +00:00
let mut cache_file = fs::File::create(path).unwrap();
io::copy(write_file, &mut cache_file).unwrap();
}
2015-06-23 14:38:29 +00:00
}
}
2015-09-01 11:20:37 +00:00
impl Read for AudioFileLoading {
2015-06-23 14:38:29 +00:00
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
2015-07-07 21:40:31 +00:00
let index = self.position as usize / CHUNK_SIZE;
let offset = self.position as usize % CHUNK_SIZE;
2016-01-02 15:19:39 +00:00
let len = min(output.len(), CHUNK_SIZE - offset);
2015-06-23 14:38:29 +00:00
2015-07-07 21:40:31 +00:00
let mut bitmap = self.shared.bitmap.lock().unwrap();
while !bitmap.contains(&index) {
bitmap = self.shared.cond.wait(bitmap).unwrap();
2015-06-23 14:38:29 +00:00
}
2015-07-07 21:40:31 +00:00
drop(bitmap);
2015-09-01 11:20:37 +00:00
let read_len = try!(self.read_file.read(&mut output[..len]));
2015-06-23 14:38:29 +00:00
2015-09-01 11:20:37 +00:00
self.position += read_len as u64;
2015-06-23 14:38:29 +00:00
2015-09-01 11:20:37 +00:00
Ok(read_len)
2015-06-23 14:38:29 +00:00
}
}
2015-09-01 11:20:37 +00:00
impl Seek for AudioFileLoading {
2015-07-07 21:40:31 +00:00
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.position = try!(self.read_file.seek(pos));
2016-01-02 15:19:39 +00:00
// Notify the fetch thread to get the correct block
// This can fail if fetch thread has completed, in which case the
// block is ready. Just ignore the error.
2015-07-07 21:40:31 +00:00
self.seek.send(self.position).ignore();
2015-06-23 17:34:48 +00:00
Ok(self.position as u64)
2015-06-23 14:38:29 +00:00
}
}
2015-09-01 11:20:37 +00:00
impl Read for AudioFile {
2015-07-07 21:40:31 +00:00
fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
match *self {
AudioFile::Direct(ref mut file) => file.read(output),
AudioFile::Loading(ref mut loading) => loading.read(output),
}
2015-07-03 00:23:49 +00:00
}
}
2015-09-01 11:20:37 +00:00
impl Seek for AudioFile {
2015-07-07 21:40:31 +00:00
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
match *self {
AudioFile::Direct(ref mut file) => file.seek(pos),
AudioFile::Loading(ref mut loading) => loading.seek(pos),
}
}
2015-07-03 00:23:49 +00:00
}
2015-07-07 21:40:31 +00:00
pub struct AudioFileManager;
2015-07-03 00:23:49 +00:00
impl AudioFileManager {
pub fn new() -> AudioFileManager {
2015-07-07 21:40:31 +00:00
AudioFileManager
}
pub fn cache_path(session: &Session, file_id: FileId) -> Option<PathBuf> {
session.config().cache_location.as_ref().map(|cache| {
let name = file_id.to_base16();
cache.join(&name[0..2]).join(&name[2..])
})
2015-07-03 00:23:49 +00:00
}
2016-01-02 15:19:39 +00:00
pub fn request(&mut self, session: &Session, file_id: FileId) -> AudioFile {
let cache_path = AudioFileManager::cache_path(session, file_id);
let cache_file = cache_path.and_then(|p| fs::File::open(p).ok());
cache_file.map(AudioFile::Direct).unwrap_or_else(|| {
AudioFile::Loading(AudioFileLoading::new(session, file_id))
})
2015-07-03 00:23:49 +00:00
}
}