From 855a7e87a77506d6c9339b900becbd3c32e085dd Mon Sep 17 00:00:00 2001 From: Paul Lietar Date: Wed, 18 Jan 2017 20:39:46 +0000 Subject: [PATCH] Move AudioKeyManager to tokio --- src/audio_decrypt.rs | 4 +- src/audio_key.rs | 114 +++++++++++++++---------------------- src/cache/default_cache.rs | 4 +- src/component.rs | 29 ++++++++++ src/lib.rs | 1 + src/player.rs | 3 +- src/session.rs | 39 ++++++++++--- src/util/mod.rs | 53 +++++++++++++++++ 8 files changed, 164 insertions(+), 83 deletions(-) create mode 100644 src/component.rs diff --git a/src/audio_decrypt.rs b/src/audio_decrypt.rs index ab4e512c..e2df0c2d 100644 --- a/src/audio_decrypt.rs +++ b/src/audio_decrypt.rs @@ -17,7 +17,7 @@ pub struct AudioDecrypt { impl AudioDecrypt { pub fn new(key: AudioKey, reader: T) -> AudioDecrypt { - let cipher = aes::ctr(aes::KeySize::KeySize128, &key, AUDIO_AESIV); + let cipher = aes::ctr(aes::KeySize::KeySize128, &key.0, AUDIO_AESIV); AudioDecrypt { cipher: cipher, key: key, @@ -45,7 +45,7 @@ impl io::Seek for AudioDecrypt { let iv = BigUint::from_bytes_be(AUDIO_AESIV) .add(BigUint::from_u64(newpos / 16).unwrap()) .to_bytes_be(); - self.cipher = aes::ctr(aes::KeySize::KeySize128, &self.key, &iv); + self.cipher = aes::ctr(aes::KeySize::KeySize128, &self.key.0, &iv); let buf = vec![0u8; skip as usize]; let mut buf2 = vec![0u8; skip as usize]; diff --git a/src/audio_key.rs b/src/audio_key.rs index da9c3a73..7fe4b779 100644 --- a/src/audio_key.rs +++ b/src/audio_key.rs @@ -1,92 +1,68 @@ -use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use eventual; +use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use futures::sync::oneshot; use std::collections::HashMap; -use std::io::{Cursor, Read, Write}; +use std::io::Write; +use util::SeqGenerator; use util::{SpotifyId, FileId}; -use session::{Session, PacketHandler}; -pub type AudioKey = [u8; 16]; +#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] +pub struct AudioKey(pub [u8; 16]); + #[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] pub struct AudioKeyError; -#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] -struct AudioKeyId(SpotifyId, FileId); +type Result = ::std::result::Result; -pub struct AudioKeyManager { - next_seq: u32, - pending: HashMap, - cache: HashMap>>, +component! { + AudioKeyManager : AudioKeyManagerInner { + sequence: SeqGenerator = SeqGenerator::new(0), + pending: HashMap> = HashMap::new(), + } } impl AudioKeyManager { - pub fn new() -> AudioKeyManager { - AudioKeyManager { - next_seq: 1, - pending: HashMap::new(), - cache: HashMap::new(), + pub fn dispatch(&self, cmd: u8, data: Vec) { + let seq = BigEndian::read_u32(&data[..4]); + + let sender = self.lock(|inner| inner.pending.remove(&seq)); + + if let Some(sender) = sender { + match cmd { + 0xd => { + let mut key = [0u8; 16]; + key.copy_from_slice(&data[4..20]); + sender.complete(Ok(AudioKey(key))); + } + 0xe => { + warn!("error audio key {:x} {:x}", data[4], data[5]); + sender.complete(Err(AudioKeyError)); + } + _ => (), + } } } - fn send_key_request(&mut self, session: &Session, track: SpotifyId, file: FileId) -> u32 { - let seq = self.next_seq; - self.next_seq += 1; + pub fn request<'a>(&self, track: SpotifyId, file: FileId) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + let seq = self.lock(move |inner| { + let seq = inner.sequence.get(); + inner.pending.insert(seq, tx); + seq + }); + + self.send_key_request(seq, track, file); + rx + } + + fn send_key_request<'a>(&self, seq: u32, track: SpotifyId, file: FileId) { let mut data: Vec = Vec::new(); data.write(&file.0).unwrap(); data.write(&track.to_raw()).unwrap(); data.write_u32::(seq).unwrap(); data.write_u16::(0x0000).unwrap(); - session.send_packet(0xc, data); - - seq - } - - pub fn request(&mut self, - session: &Session, - track: SpotifyId, - file: FileId) - -> eventual::Future { - - let id = AudioKeyId(track, file); - self.cache - .get_mut(&id) - .map(|ref mut requests| { - let (tx, rx) = eventual::Future::pair(); - requests.push(tx); - rx - }) - .unwrap_or_else(|| { - let seq = self.send_key_request(session, track, file); - self.pending.insert(seq, id.clone()); - - let (tx, rx) = eventual::Future::pair(); - self.cache.insert(id, vec![tx]); - rx - }) - } -} - -impl PacketHandler for AudioKeyManager { - fn handle(&mut self, cmd: u8, data: Vec, _session: &Session) { - let mut data = Cursor::new(data); - let seq = data.read_u32::().unwrap(); - - if let Some(callbacks) = self.pending.remove(&seq).and_then(|id| self.cache.remove(&id)) { - if cmd == 0xd { - let mut key = [0u8; 16]; - data.read_exact(&mut key).unwrap(); - - for cb in callbacks { - cb.complete(key); - } - } else if cmd == 0xe { - let error = AudioKeyError; - for cb in callbacks { - cb.fail(error); - } - } - } + self.session().send_packet(0xc, data) } } diff --git a/src/cache/default_cache.rs b/src/cache/default_cache.rs index 34a92e6e..1a6c1b8f 100644 --- a/src/cache/default_cache.rs +++ b/src/cache/default_cache.rs @@ -56,7 +56,7 @@ impl Cache for DefaultCache { value.and_then(|value| if value.len() == 16 { let mut result = [0u8; 16]; result.clone_from_slice(&value); - Some(result) + Some(AudioKey(result)) } else { None }) @@ -73,7 +73,7 @@ impl Cache for DefaultCache { key.extend_from_slice(&track.to_raw()); key.extend_from_slice(&file.0); - db.set(&key, &audio_key.as_ref()).unwrap(); + db.set(&key, &audio_key.0.as_ref()).unwrap(); } xact.commit().unwrap(); diff --git a/src/component.rs b/src/component.rs new file mode 100644 index 00000000..f4058e43 --- /dev/null +++ b/src/component.rs @@ -0,0 +1,29 @@ +macro_rules! component { + ($name:ident : $inner:ident { $($key:ident : $ty:ty = $value:expr,)* }) => { + #[derive(Clone)] + pub struct $name(::std::sync::Arc<($crate::session::SessionWeak, ::std::sync::Mutex<$inner>)>); + impl $name { + #[allow(dead_code)] + pub fn new(session: $crate::session::SessionWeak) -> $name { + $name(::std::sync::Arc::new((session, ::std::sync::Mutex::new($inner { + $($key : $value,)* + })))) + } + + #[allow(dead_code)] + fn lock R, R>(&self, f: F) -> R { + let mut inner = (self.0).1.lock().expect("Mutex poisoned"); + f(&mut inner) + } + + #[allow(dead_code)] + fn session(&self) -> $crate::session::Session { + (self.0).0.upgrade() + } + } + + struct $inner { + $($key : $ty,)* + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 1ce4b166..aa9821cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ extern crate portaudio; #[cfg(feature = "libpulse-sys")] extern crate libpulse_sys; +#[macro_use] mod component; pub mod album_cover; pub mod audio_backend; pub mod audio_decrypt; diff --git a/src/player.rs b/src/player.rs index 60a5ad01..b3f948a4 100644 --- a/src/player.rs +++ b/src/player.rs @@ -4,6 +4,7 @@ use std::sync::{mpsc, Mutex, Arc, MutexGuard}; use std::thread; use std::io::{Read, Seek}; use vorbis; +use futures::Future; use audio_decrypt::AudioDecrypt; use audio_backend::Sink; @@ -206,7 +207,7 @@ fn load_track(session: &Session, track_id: SpotifyId) -> Option, metadata: Mutex, stream: Mutex, - audio_key: Mutex, rx_connection: Mutex), io::Error>>, tx_connection: Mutex)>>, + + audio_key: Lazy, } #[derive(Clone)] pub struct Session(pub Arc); +#[derive(Clone)] +pub struct SessionWeak(pub Weak); + pub fn device_id(name: &str) -> String { let mut h = Sha1::new(); h.input_str(&name); @@ -82,7 +86,7 @@ pub fn device_id(name: &str) -> String { impl Session { pub fn connect(config: Config, credentials: Credentials, cache: Box, handle: Handle) - -> Box>), Error=io::Error>> + -> Box), Error=io::Error>> { let access_point = apresolve_or_fallback::(&handle); @@ -108,7 +112,8 @@ impl Session { } fn create(transport: connection::Transport, config: Config, - cache: Box, username: String) -> (Session, Box>) + cache: Box, username: String) + -> (Session, BoxFuture<(), io::Error>) { let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned())); let (tx, rx, task) = adaptor::adapt(transport); @@ -127,12 +132,16 @@ impl Session { mercury: Mutex::new(MercuryManager::new()), metadata: Mutex::new(MetadataManager::new()), stream: Mutex::new(StreamManager::new()), - audio_key: Mutex::new(AudioKeyManager::new()), + + audio_key: Lazy::new(), })); (session, task) } + pub fn audio_key(&self) -> &AudioKeyManager { + self.0.audio_key.get(|| AudioKeyManager::new(self.weak())) + } pub fn poll(&self) { let (cmd, data) = self.recv(); @@ -141,7 +150,7 @@ impl Session { 0x4 => self.send_packet(0x49, data), 0x4a => (), 0x9 | 0xa => self.0.stream.lock().unwrap().handle(cmd, data, self), - 0xd | 0xe => self.0.audio_key.lock().unwrap().handle(cmd, data, self), + 0xd | 0xe => self.audio_key().dispatch(cmd, data), 0x1b => { self.0.data.write().unwrap().country = String::from_utf8(data).unwrap(); } @@ -158,6 +167,7 @@ impl Session { self.0.tx_connection.lock().unwrap().send((cmd, data)) } + /* pub fn audio_key(&self, track: SpotifyId, file_id: FileId) -> Future { self.0.cache .get_audio_key(track, file_id) @@ -172,6 +182,7 @@ impl Session { }) }) } + */ pub fn audio_file(&self, file_id: FileId) -> Box { self.0.cache @@ -241,6 +252,16 @@ impl Session { pub fn device_id(&self) -> &str { &self.config().device_id } + + pub fn weak(&self) -> SessionWeak { + SessionWeak(Arc::downgrade(&self.0)) + } +} + +impl SessionWeak { + pub fn upgrade(&self) -> Session { + Session(self.0.upgrade().expect("Session died")) + } } pub trait PacketHandler { diff --git a/src/util/mod.rs b/src/util/mod.rs index 0683f6a0..b2afdcf0 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -1,6 +1,7 @@ use num::{BigUint, Integer, Zero, One}; use rand::{Rng, Rand}; use std::io; +use std::mem; use std::ops::{Mul, Rem, Shr}; use std::fs; use std::path::Path; @@ -107,3 +108,55 @@ impl<'s> Iterator for StrChunks<'s> { pub trait ReadSeek : ::std::io::Read + ::std::io::Seek { } impl ReadSeek for T { } +pub trait Seq { + fn next(&self) -> Self; +} + +macro_rules! impl_seq { + ($($ty:ty)*) => { $( + impl Seq for $ty { + fn next(&self) -> Self { *self + 1 } + } + )* } +} + +impl_seq!(u8 u16 u32 u64 usize); + +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Default)] +pub struct SeqGenerator(T); + +impl SeqGenerator { + pub fn new(value: T) -> Self { + SeqGenerator(value) + } + + pub fn get(&mut self) -> T { + let value = self.0.next(); + mem::replace(&mut self.0, value) + } +} + +use std::sync::Mutex; +use std::cell::UnsafeCell; + +pub struct Lazy(Mutex, UnsafeCell>); +unsafe impl Sync for Lazy {} +unsafe impl Send for Lazy {} + +impl Lazy { + pub fn new() -> Lazy { + Lazy(Mutex::new(false), UnsafeCell::new(None)) + } + + pub fn get T>(&self, f: F) -> &T { + let mut inner = self.0.lock().unwrap(); + if !*inner { + unsafe { + *self.1.get() = Some(f()); + } + *inner = true; + } + + unsafe { &*self.1.get() }.as_ref().unwrap() + } +}