From 2a2f227bef1a146ba4148f75663fe11412927592 Mon Sep 17 00:00:00 2001 From: Paul Lietar Date: Thu, 2 Jul 2015 19:24:25 +0200 Subject: [PATCH] Refactor the whole architecture. Use less threads, makes it much simpler to reason about. --- src/audio_file.rs | 87 +++++++++++-------------- src/audio_key.rs | 96 +++++++-------------------- src/connection.rs | 66 ++----------------- src/main.rs | 66 +++++++++---------- src/mercury.rs | 163 +++++++++++++++++++--------------------------- src/metadata.rs | 112 ++++++++++--------------------- src/player.rs | 17 +---- src/session.rs | 131 ++++++++++++++++++------------------- src/stream.rs | 86 +++++++----------------- src/util/mod.rs | 17 +++-- 10 files changed, 299 insertions(+), 542 deletions(-) diff --git a/src/audio_file.rs b/src/audio_file.rs index c594f202..f4cb4e13 100644 --- a/src/audio_file.rs +++ b/src/audio_file.rs @@ -5,22 +5,25 @@ use std::io::{self, SeekFrom}; use std::slice::bytes::copy_memory; use std::sync::{Arc, Condvar, Mutex}; use std::sync::mpsc::{self, TryRecvError}; - -use stream::{StreamRequest, StreamEvent}; -use util::FileId; use std::thread; +use stream::StreamEvent; +use util::FileId; +use session::Session; + const CHUNK_SIZE : usize = 0x40000; -#[derive(Clone)] -pub struct AudioFile { +pub struct AudioFile<'s> { position: usize, seek: mpsc::Sender, shared: Arc, + + #[allow(dead_code)] + thread: thread::JoinGuard<'s, ()>, } struct AudioFileShared { - fileid: FileId, + file_id: FileId, size: usize, data: Mutex, cond: Condvar @@ -31,38 +34,24 @@ struct AudioFileData { bitmap: BitSet, } -impl AudioFile { - pub fn new(fileid: FileId, streams: mpsc::Sender) -> AudioFile { - let (tx, rx) = mpsc::channel(); - - streams.send(StreamRequest { - id: fileid, - offset: 0, - size: 1, - callback: tx - }).unwrap(); - - let size = { - let mut size = None; - for event in rx.iter() { +impl <'s> AudioFile <'s> { + pub fn new(session: &Session, file_id: FileId) -> AudioFile { + let mut it = session.stream(file_id, 0, 1).into_iter() + .filter_map(|event| { match event { - StreamEvent::Header(id, data) => { - if id == 0x3 { - size = Some(BigEndian::read_u32(&data) * 4); - break; - } - }, - StreamEvent::Data(_) => break + StreamEvent::Header(id, ref data) if id == 0x3 => { + Some(BigEndian::read_u32(data) as usize * 4) + } + _ => None } - } - size.unwrap() as usize - }; + }); + + let size = it.next().unwrap(); let bufsize = size + (CHUNK_SIZE - size % CHUNK_SIZE); - let (tx, rx) = mpsc::channel(); let shared = Arc::new(AudioFileShared { - fileid: fileid, + file_id: file_id, size: size, data: Mutex::new(AudioFileData { buffer: vec![0u8; bufsize], @@ -71,25 +60,23 @@ impl AudioFile { cond: Condvar::new(), }); - let file = AudioFile { - position: 0, - seek: tx, - shared: shared.clone(), - }; + let shared_ = shared.clone(); + let (seek_tx, seek_rx) = mpsc::channel(); - thread::spawn( move || { AudioFile::fetch(shared, streams, rx); }); + let file = AudioFile { + thread: thread::scoped( move || { AudioFile::fetch(session, shared_, seek_rx); }), + position: 0, + seek: seek_tx, + shared: shared, + }; file } - fn fetch_chunk(shared: &Arc, streams: &mpsc::Sender, index: usize) { - let (tx, rx) = mpsc::channel(); - streams.send(StreamRequest { - id: shared.fileid, - offset: (index * CHUNK_SIZE / 4) as u32, - size: (CHUNK_SIZE / 4) as u32, - callback: tx - }).unwrap(); + fn fetch_chunk(session: &Session, shared: &Arc, index: usize) { + let rx = session.stream(shared.file_id, + (index * CHUNK_SIZE / 4) as u32, + (CHUNK_SIZE / 4) as u32); let mut offset = 0usize; for event in rx.iter() { @@ -114,7 +101,7 @@ impl AudioFile { } } - fn fetch(shared: Arc, streams: mpsc::Sender, seek: mpsc::Receiver) { + fn fetch(session: &Session, shared: Arc, seek: mpsc::Receiver) { let mut index = 0; loop { index = if index * CHUNK_SIZE < shared.size { @@ -138,13 +125,13 @@ impl AudioFile { } if index * CHUNK_SIZE < shared.size { - AudioFile::fetch_chunk(&shared, &streams, index) + AudioFile::fetch_chunk(session, &shared, index) } } } } -impl io::Read for AudioFile { +impl <'s> io::Read for AudioFile <'s> { fn read(&mut self, output: &mut [u8]) -> io::Result { let index = self.position / CHUNK_SIZE; let offset = self.position % CHUNK_SIZE; @@ -163,7 +150,7 @@ impl io::Read for AudioFile { } } -impl io::Seek for AudioFile { +impl <'s> io::Seek for AudioFile <'s> { fn seek(&mut self, pos: io::SeekFrom) -> io::Result { let newpos = match pos { SeekFrom::Start(offset) => offset as i64, diff --git a/src/audio_key.rs b/src/audio_key.rs index 10e02496..118e051b 100644 --- a/src/audio_key.rs +++ b/src/audio_key.rs @@ -1,109 +1,63 @@ use std::collections::HashMap; -use std::sync::mpsc; +use std::sync::{mpsc, Future}; use std::io::{Cursor, Write}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; use readall::ReadAllExt; -use connection::Packet; -use util::{SpotifyId, FileId}; -use util::Either::{Left, Right}; -use subsystem::Subsystem; +use util::{SpotifyId, FileId, IgnoreExt}; +use session::Session; +use connection::PacketHandler; -pub struct AudioKeyRequest { - pub track: SpotifyId, - pub file: FileId, - pub callback: AudioKeyCallback, -} pub type AudioKey = [u8; 16]; -pub struct AudioKeyResponse(pub AudioKey); -pub type AudioKeyCallback = mpsc::Sender; - type AudioKeyId = u32; + pub struct AudioKeyManager { next_seq: AudioKeyId, - callbacks: HashMap, - - requests: mpsc::Receiver, - packet_rx: mpsc::Receiver, - packet_tx: mpsc::Sender, + callbacks: HashMap>, } impl AudioKeyManager { - pub fn new(tx: mpsc::Sender) -> (AudioKeyManager, - mpsc::Sender, - mpsc::Sender) { - let (req_tx, req_rx) = mpsc::channel(); - let (pkt_tx, pkt_rx) = mpsc::channel(); - - (AudioKeyManager { + pub fn new() -> AudioKeyManager { + AudioKeyManager { next_seq: 1, callbacks: HashMap::new(), - - requests: req_rx, - packet_rx: pkt_rx, - packet_tx: tx - }, req_tx, pkt_tx) + } } - fn request(&mut self, req: AudioKeyRequest) { + pub fn request(&mut self, session: &Session, track: SpotifyId, file: FileId) + -> Future { + let (tx, rx) = mpsc::channel(); + let seq = self.next_seq; self.next_seq += 1; let mut data : Vec = Vec::new(); - data.write(&req.file).unwrap(); - data.write(&req.track.to_raw()).unwrap(); + data.write(&file).unwrap(); + data.write(&track.to_raw()).unwrap(); data.write_u32::(seq).unwrap(); data.write_u16::(0x0000).unwrap(); - self.packet_tx.send(Packet { - cmd: 0xc, - data: data - }).unwrap(); + session.send_packet(0xc, &data).unwrap(); - self.callbacks.insert(seq, req.callback); + self.callbacks.insert(seq, tx); + + Future::from_receiver(rx) } +} - fn packet(&mut self, packet: Packet) { - assert_eq!(packet.cmd, 0xd); +impl PacketHandler for AudioKeyManager { + fn handle(&mut self, cmd: u8, data: Vec) { + assert_eq!(cmd, 0xd); - let mut data = Cursor::new(&packet.data as &[u8]); + let mut data = Cursor::new(data); let seq = data.read_u32::().unwrap(); let mut key = [0u8; 16]; data.read_all(&mut key).unwrap(); match self.callbacks.remove(&seq) { - Some(callback) => callback.send(AudioKeyResponse(key)).unwrap(), + Some(callback) => callback.send(key).ignore(), None => () }; } } - -impl Subsystem for AudioKeyManager { - fn run(mut self) { - loop { - match { - let requests = &self.requests; - let packets = &self.packet_rx; - - select!{ - r = requests.recv() => { - Left(r.unwrap()) - }, - p = packets.recv() => { - Right(p.unwrap()) - } - } - } { - Left(req) => { - self.request(req); - } - Right(pkt) => { - self.packet(pkt); - } - } - - } - } -} - diff --git a/src/connection.rs b/src/connection.rs index 8d4e6cce..704a6395 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -6,7 +6,6 @@ use std::io; use std::io::Write; use std::net::TcpStream; use std::result; -use std::sync::mpsc; use keys::SharedKeys; @@ -84,7 +83,7 @@ impl PlainConnection { } impl CipherConnection { - pub fn send_encrypted_packet(&mut self, cmd: u8, data: &[u8]) -> Result<()> { + pub fn send_packet(&mut self, cmd: u8, data: &[u8]) -> Result<()> { try!(self.stream.write_u8(cmd)); try!(self.stream.write_u16::(data.len() as u16)); try!(self.stream.write(data)); @@ -107,70 +106,15 @@ impl CipherConnection { } } -pub struct Packet { - pub cmd: u8, - pub data: Vec +pub trait PacketHandler { + fn handle(&mut self, cmd: u8, data: Vec); } -pub struct SendThread { - connection: CipherConnection, - receiver: mpsc::Receiver, -} -impl SendThread { - pub fn new(connection: CipherConnection) - -> (SendThread, mpsc::Sender) { - let (tx, rx) = mpsc::channel(); - (SendThread { - connection: connection, - receiver: rx - }, tx) - } - - pub fn run(mut self) { - for req in self.receiver { - self.connection.send_encrypted_packet( - req.cmd, &req.data).unwrap(); - } - } -} - -pub struct PacketDispatch { - pub main: mpsc::Sender, - pub stream: mpsc::Sender, - pub mercury: mpsc::Sender, - pub audio_key: mpsc::Sender, -} - -pub struct RecvThread { - connection: CipherConnection, - dispatch: PacketDispatch -} - -impl RecvThread { - pub fn new(connection: CipherConnection, dispatch: PacketDispatch) - -> RecvThread { - RecvThread { - connection: connection, - dispatch: dispatch - } - } - - pub fn run(mut self) { - loop { - let (cmd, data) = self.connection.recv_packet().unwrap(); - let packet = Packet { - cmd: cmd, - data: data - }; - +/* match packet.cmd { 0x09 => &self.dispatch.stream, 0xd | 0xe => &self.dispatch.audio_key, 0xb2...0xb6 => &self.dispatch.mercury, _ => &self.dispatch.main, }.send(packet).unwrap(); - - } - } -} - + */ diff --git a/src/main.rs b/src/main.rs index aef53bab..48fcd45a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,8 @@ #![crate_name = "librespot"] -#![feature(plugin,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,mpsc_select,arc_weak,append)] -#![allow(unused_imports,dead_code)] +#![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future)] +#![allow(deprecated)] +//#![allow(unused_imports,dead_code)] #![plugin(protobuf_macros)] #[macro_use] extern crate lazy_static; @@ -38,10 +39,10 @@ use std::clone::Clone; use std::fs::File; use std::io::{Read, Write}; use std::path::Path; -use std::sync::mpsc; use protobuf::core::Message; +use std::thread; -use metadata::{MetadataCache, AlbumRef, ArtistRef, TrackRef}; +use metadata::{AlbumRef, ArtistRef, TrackRef}; use session::{Config, Session}; use util::SpotifyId; use util::version::version_string; @@ -69,12 +70,17 @@ fn main() { session.login(username.clone(), password); session.poll(); - let ident = session.config.device_id.clone(); - SpircManager{ - session: session, + let poll_thread = thread::scoped(|| { + loop { + session.poll(); + } + }); + + SpircManager { + session: &session, username: username.clone(), name: name.clone(), - ident: ident, + ident: session.config.device_id.clone(), device_type: 5, state_update_id: 0, @@ -88,21 +94,17 @@ fn main() { state: PlayerState::new() }.run(); - /* - loop { - session.poll(); - } - */ + poll_thread.join(); } -fn print_track(cache: &mut MetadataCache, track_id: SpotifyId) { - let track : TrackRef = cache.get(track_id); +fn print_track(session: &Session, track_id: SpotifyId) { + let track : TrackRef = session.metadata(track_id); let album : AlbumRef = { let handle = track.wait(); let data = handle.unwrap(); eprintln!("{}", data.name); - cache.get(data.album) + session.metadata(data.album) }; let artists : Vec = { @@ -110,7 +112,7 @@ fn print_track(cache: &mut MetadataCache, track_id: SpotifyId) { let data = handle.unwrap(); eprintln!("{}", data.name); data.artists.iter().map(|id| { - cache.get(*id) + session.metadata(*id) }).collect() }; @@ -159,7 +161,6 @@ impl PlayerState { } fn import(&mut self, state: &protocol::spirc::State) { - //println!("{:?}", state); self.status = state.get_status(); self.context_uri = state.get_context_uri().to_string(); @@ -203,8 +204,8 @@ impl PlayerState { } } -struct SpircManager { - session: Session, +struct SpircManager<'s> { + session: &'s Session, username: String, state_update_id: i64, seq_nr: u32, @@ -221,24 +222,16 @@ struct SpircManager { state: PlayerState } -impl SpircManager { +impl <'s> SpircManager<'s> { fn run(&mut self) { - let (tx, rx) = mpsc::channel(); - - self.session.mercury.send(MercuryRequest{ - method: MercuryMethod::SUB, - uri: format!("hm://remote/user/{}/v23", self.username), - content_type: None, - callback: Some(tx), - payload: Vec::new() - }).unwrap(); + let rx = self.session + .mercury_sub(format!("hm://remote/user/{}/v23", self.username)) + .into_iter().map(|pkt| { + protobuf::parse_from_bytes::(pkt.payload.front().unwrap()).unwrap() + }); self.notify(None); - let rx = rx.into_iter().map(|pkt| { - protobuf::parse_from_bytes::(pkt.payload.front().unwrap()).unwrap() - }); - for frame in rx { println!("{:?} {} {} {} {}", frame.get_typ(), @@ -328,13 +321,12 @@ impl SpircManager { pkt.set_state(self.state.export()); } - self.session.mercury.send(MercuryRequest{ + self.session.mercury(MercuryRequest{ method: MercuryMethod::SEND, uri: format!("hm://remote/user/{}", self.username), content_type: None, - callback: None, payload: vec![ pkt.write_to_bytes().unwrap() ] - }).unwrap(); + }); } fn device_state(&mut self) -> protocol::spirc::DeviceState { diff --git a/src/mercury.rs b/src/mercury.rs index bf495b15..b0d06c52 100644 --- a/src/mercury.rs +++ b/src/mercury.rs @@ -5,12 +5,12 @@ use std::collections::{HashMap, LinkedList}; use std::io::{Cursor, Read, Write}; use std::fmt; use std::mem::replace; -use std::sync::mpsc; +use std::sync::{mpsc, Future}; -use connection::Packet; use librespot_protocol as protocol; -use subsystem::Subsystem; -use util::Either::{Left, Right}; +use session::Session; +use connection::PacketHandler; +use util::IgnoreExt; #[derive(Debug, PartialEq, Eq)] pub enum MercuryMethod { @@ -24,7 +24,6 @@ pub struct MercuryRequest { pub method: MercuryMethod, pub uri: String, pub content_type: Option, - pub callback: Option, pub payload: Vec> } @@ -34,22 +33,16 @@ pub struct MercuryResponse { pub payload: LinkedList> } -pub type MercuryCallback = mpsc::Sender; - pub struct MercuryPending { parts: LinkedList>, partial: Option>, - callback: Option + callback: Option> } pub struct MercuryManager { next_seq: u32, pending: HashMap, MercuryPending>, - subscriptions: HashMap, - - requests: mpsc::Receiver, - packet_tx: mpsc::Sender, - packet_rx: mpsc::Receiver, + subscriptions: HashMap>, } impl fmt::Display for MercuryMethod { @@ -64,24 +57,17 @@ impl fmt::Display for MercuryMethod { } impl MercuryManager { - pub fn new(tx: mpsc::Sender) -> (MercuryManager, - mpsc::Sender, - mpsc::Sender) { - let (req_tx, req_rx) = mpsc::channel(); - let (pkt_tx, pkt_rx) = mpsc::channel(); - - (MercuryManager { + pub fn new() -> MercuryManager { + MercuryManager { next_seq: 0, pending: HashMap::new(), subscriptions: HashMap::new(), - - requests: req_rx, - packet_rx: pkt_rx, - packet_tx: tx, - }, req_tx, pkt_tx) + } } - fn request(&mut self, req: MercuryRequest) { + pub fn request(&mut self, session: &Session, req: MercuryRequest) + -> Future { + let mut seq = [0u8; 4]; BigEndian::write_u32(&mut seq, self.next_seq); self.next_seq += 1; @@ -93,20 +79,31 @@ impl MercuryManager { _ => 0xb2, }; - self.packet_tx.send(Packet { - cmd: cmd, - data: data - }).unwrap(); + session.send_packet(cmd, &data).unwrap(); - if req.method != MercuryMethod::SUB { - self.pending.insert(seq.to_vec(), MercuryPending{ - parts: LinkedList::new(), - partial: None, - callback: req.callback, - }); - } else if let Some(cb) = req.callback { - self.subscriptions.insert(req.uri, cb); - } + let (tx, rx) = mpsc::channel(); + self.pending.insert(seq.to_vec(), MercuryPending{ + parts: LinkedList::new(), + partial: None, + callback: Some(tx), + }); + + Future::from_receiver(rx) + } + + pub fn subscribe(&mut self, session: &Session, uri: String) + -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(); + self.subscriptions.insert(uri.clone(), tx); + + self.request(session, MercuryRequest{ + method: MercuryMethod::SUB, + uri: uri, + content_type: None, + payload: Vec::new() + }); + + rx } fn parse_part(mut s: &mut Read) -> Vec { @@ -133,14 +130,44 @@ impl MercuryManager { }; if let Some(ref ch) = callback { + // Ignore send error. + // It simply means the receiver was closed ch.send(MercuryResponse{ uri: header.get_uri().to_string(), payload: pending.parts - }).unwrap(); + }).ignore(); } } - fn handle_packet(&mut self, cmd: u8, data: Vec) { + fn encode_request(&self, seq: &[u8], req: &MercuryRequest) -> Vec { + let mut packet = Vec::new(); + packet.write_u16::(seq.len() as u16).unwrap(); + packet.write_all(seq).unwrap(); + packet.write_u8(1).unwrap(); // Flags: FINAL + packet.write_u16::(1 + req.payload.len() as u16).unwrap(); // Part count + + let mut header = protobuf_init!(protocol::mercury::Header::new(), { + uri: req.uri.clone(), + method: req.method.to_string(), + }); + if let Some(ref content_type) = req.content_type { + header.set_content_type(content_type.clone()); + } + + packet.write_u16::(header.compute_size() as u16).unwrap(); + header.write_to_writer(&mut packet).unwrap(); + + for p in &req.payload { + packet.write_u16::(p.len() as u16).unwrap(); + packet.write(&p).unwrap(); + } + + packet + } +} + +impl PacketHandler for MercuryManager { + fn handle(&mut self, cmd: u8, data: Vec) { let mut packet = Cursor::new(data); let seq = { @@ -185,59 +212,5 @@ impl MercuryManager { self.pending.insert(seq, pending); } } - - fn encode_request(&self, seq: &[u8], req: &MercuryRequest) -> Vec { - let mut packet = Vec::new(); - packet.write_u16::(seq.len() as u16).unwrap(); - packet.write_all(seq).unwrap(); - packet.write_u8(1).unwrap(); // Flags: FINAL - packet.write_u16::(1 + req.payload.len() as u16).unwrap(); // Part count - - let mut header = protobuf_init!(protocol::mercury::Header::new(), { - uri: req.uri.clone(), - method: req.method.to_string(), - }); - if let Some(ref content_type) = req.content_type { - header.set_content_type(content_type.clone()); - } - - packet.write_u16::(header.compute_size() as u16).unwrap(); - header.write_to_writer(&mut packet).unwrap(); - - for p in &req.payload { - packet.write_u16::(p.len() as u16).unwrap(); - packet.write(&p).unwrap(); - } - - packet - } -} - -impl Subsystem for MercuryManager { - fn run(mut self) { - loop { - match { - let requests = &self.requests; - let packets = &self.packet_rx; - - select!{ - r = requests.recv() => { - Left(r.unwrap()) - }, - p = packets.recv() => { - Right(p.unwrap()) - } - } - } { - Left(req) => { - self.request(req); - } - Right(pkt) => { - self.handle_packet(pkt.cmd, pkt.data); - } - } - - } - } } diff --git a/src/metadata.rs b/src/metadata.rs index a0e33f07..342f3d40 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -3,13 +3,13 @@ use std::any::{Any, TypeId}; use std::collections::HashMap; use std::fmt; use std::slice::bytes::copy_memory; -use std::sync::{mpsc, Arc, Condvar, Mutex, MutexGuard, Weak}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard, Weak}; use std::thread; use librespot_protocol as protocol; use mercury::{MercuryRequest, MercuryMethod}; -use subsystem::Subsystem; use util::{SpotifyId, FileId}; +use session::Session; pub trait MetadataTrait : Send + Any + 'static { type Message: protobuf::MessageStatic; @@ -119,40 +119,6 @@ pub type TrackRef = MetadataRef; pub type AlbumRef = MetadataRef; pub type ArtistRef = MetadataRef; -pub struct MetadataCache { - metadata: mpsc::Sender, - cache: HashMap<(SpotifyId, TypeId), Box> -} - -impl MetadataCache { - pub fn new(metadata: mpsc::Sender) -> MetadataCache { - MetadataCache { - metadata: metadata, - cache: HashMap::new() - } - } - - pub fn get(&mut self, id: SpotifyId) - -> MetadataRef { - let key = (id, TypeId::of::()); - - self.cache.get(&key) - .and_then(|x| x.downcast_ref::>>()) - .and_then(|x| x.upgrade()) - .unwrap_or_else(|| { - let x : MetadataRef = Arc::new(Metadata{ - id: id, - state: Mutex::new(MetadataState::Loading), - cond: Condvar::new() - }); - - self.cache.insert(key, Box::new(x.downgrade())); - self.metadata.send(T::request(x.clone())).unwrap(); - x - }) - } -} - impl Metadata { pub fn id(&self) -> SpotifyId { self.id @@ -214,34 +180,46 @@ pub enum MetadataRequest { } pub struct MetadataManager { - requests: mpsc::Receiver, - mercury: mpsc::Sender + cache: HashMap<(SpotifyId, TypeId), Box> } impl MetadataManager { - pub fn new(mercury: mpsc::Sender) -> (MetadataManager, - mpsc::Sender) { - let (tx, rx) = mpsc::channel(); - (MetadataManager { - requests: rx, - mercury: mercury - }, tx) + pub fn new() -> MetadataManager { + MetadataManager { + cache: HashMap::new() + } } - fn load (&self, object: MetadataRef) { - let mercury = self.mercury.clone(); - thread::spawn(move || { - let (tx, rx) = mpsc::channel(); - - mercury.send(MercuryRequest { - method: MercuryMethod::GET, - uri: format!("{}/{}", T::base_url(), object.id.to_base16()), - content_type: None, - callback: Some(tx), - payload: Vec::new() - }).unwrap(); + pub fn get(&mut self, session: &Session, id: SpotifyId) + -> MetadataRef { + let key = (id, TypeId::of::()); - let response = rx.recv().unwrap(); + self.cache.get(&key) + .and_then(|x| x.downcast_ref::>>()) + .and_then(|x| x.upgrade()) + .unwrap_or_else(|| { + let x : MetadataRef = Arc::new(Metadata{ + id: id, + state: Mutex::new(MetadataState::Loading), + cond: Condvar::new() + }); + + self.cache.insert(key, Box::new(x.downgrade())); + self.load(session, x.clone()); + x + }) + } + + fn load (&self, session: &Session, object: MetadataRef) { + let rx = session.mercury(MercuryRequest { + method: MercuryMethod::GET, + uri: format!("{}/{}", T::base_url(), object.id.to_base16()), + content_type: None, + payload: Vec::new() + }); + + thread::spawn(move || { + let response = rx.into_inner(); let msg : T::Message = protobuf::parse_from_bytes( response.payload.front().unwrap()).unwrap(); @@ -251,21 +229,3 @@ impl MetadataManager { } } -impl Subsystem for MetadataManager { - fn run(self) { - for req in self.requests.iter() { - match req { - MetadataRequest::Artist(artist) => { - self.load(artist) - } - MetadataRequest::Album(album) => { - self.load(album) - } - MetadataRequest::Track(track) => { - self.load(track) - } - } - } - } -} - diff --git a/src/player.rs b/src/player.rs index 896fdc89..979ba51c 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,8 +1,6 @@ use portaudio; -use std::sync::mpsc; use vorbis; -use audio_key::{AudioKeyRequest, AudioKeyResponse}; use metadata::TrackRef; use session::Session; use audio_file::AudioFile; @@ -15,24 +13,13 @@ impl Player { pub fn play(session: &Session, track: TrackRef) { let file_id = *track.wait().unwrap().files.first().unwrap(); - let key = { - let (tx, rx) = mpsc::channel(); - - session.audio_key.send(AudioKeyRequest { - track: track.id(), - file: file_id, - callback: tx - }).unwrap(); - - let AudioKeyResponse(key) = rx.recv().unwrap(); - key - }; + let key = session.audio_key(track.id(), file_id).into_inner(); let mut decoder = vorbis::Decoder::new( Subfile::new( AudioDecrypt::new(key, - AudioFile::new(file_id, session.stream.clone())), 0xa7)).unwrap(); + AudioFile::new(session, file_id)), 0xa7)).unwrap(); //decoder.time_seek(60f64).unwrap(); portaudio::initialize().unwrap(); diff --git a/src/session.rs b/src/session.rs index b38aa218..8a5ae63b 100644 --- a/src/session.rs +++ b/src/session.rs @@ -2,17 +2,19 @@ use crypto::digest::Digest; use crypto::sha1::Sha1; use protobuf::{self, Message}; use rand::thread_rng; -use std::sync::mpsc; -use std::thread; +use std::sync::{Mutex, Arc, Future, mpsc}; -use audio_key; -use connection::{PlainConnection, Packet, PacketDispatch, SendThread, RecvThread}; +use connection::{self, PlainConnection, CipherConnection}; use keys::PrivateKeys; use librespot_protocol as protocol; -use mercury; -use metadata; -use stream; -use subsystem::Subsystem; +use util::{SpotifyId, FileId}; + +use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; +use metadata::{MetadataManager, MetadataRef, MetadataTrait}; +use stream::{StreamManager, StreamEvent}; +use audio_key::{AudioKeyManager, AudioKey}; +use connection::PacketHandler; + use util; pub struct Config { @@ -24,15 +26,16 @@ pub struct Config { pub struct Session { pub config: Config, - packet_rx: mpsc::Receiver, - pub packet_tx: mpsc::Sender, - - pub audio_key: mpsc::Sender, - pub mercury: mpsc::Sender, - pub metadata: mpsc::Sender, - pub stream: mpsc::Sender, + mercury: Mutex, + metadata: Mutex, + stream: Mutex, + audio_key: Mutex, + rx_connection: Mutex, + tx_connection: Mutex, } +type SessionRef = Arc; + impl Session { pub fn new(mut config: Config) -> Session { config.device_id = { @@ -105,41 +108,16 @@ impl Session { let cipher_connection = connection.setup_cipher(shared_keys); - let (send_thread, tx) = SendThread::new(cipher_connection.clone()); - - let (main_tx, rx) = mpsc::channel(); - let (mercury, mercury_req, mercury_pkt) - = mercury::MercuryManager::new(tx.clone()); - let (metadata, metadata_req) - = metadata::MetadataManager::new(mercury_req.clone()); - let (stream, stream_req, stream_pkt) - = stream::StreamManager::new(tx.clone()); - let (audio_key, audio_key_req, audio_key_pkt) - = audio_key::AudioKeyManager::new(tx.clone()); - - let recv_thread = RecvThread::new(cipher_connection, PacketDispatch { - main: main_tx, - stream: stream_pkt, - mercury: mercury_pkt, - audio_key: audio_key_pkt - }); - - thread::spawn(move || send_thread.run()); - thread::spawn(move || recv_thread.run()); - - mercury.start(); - metadata.start(); - stream.start(); - audio_key.start(); - Session { config: config, - packet_rx: rx, - packet_tx: tx, - mercury: mercury_req, - metadata: metadata_req, - stream: stream_req, - audio_key: audio_key_req, + + rx_connection: Mutex::new(cipher_connection.clone()), + tx_connection: Mutex::new(cipher_connection), + + mercury: Mutex::new(MercuryManager::new()), + metadata: Mutex::new(MetadataManager::new()), + stream: Mutex::new(StreamManager::new()), + audio_key: Mutex::new(AudioKeyManager::new()), } } @@ -166,32 +144,47 @@ impl Session { } }); - self.packet_tx.send(Packet { - cmd: 0xab, - data: packet.write_to_bytes().unwrap() - }).unwrap(); + self.send_packet(0xab, &packet.write_to_bytes().unwrap()).unwrap(); } pub fn poll(&self) { - let packet = self.packet_rx.recv().unwrap(); + let (cmd, data) = + self.rx_connection.lock().unwrap().recv_packet().unwrap(); - match packet.cmd { - 0x4 => { // PING - self.packet_tx.send(Packet { - cmd: 0x49, - data: packet.data - }).unwrap(); - } - 0x4a => { // PONG - } - 0xac => { // AUTHENTICATED - eprintln!("Authentication succeedded"); - } - 0xad => { - eprintln!("Authentication failed"); - } + match cmd { + 0x4 => self.send_packet(0x49, &data).unwrap(), + 0x4a => (), + 0x9 => self.stream.lock().unwrap().handle(cmd, data), + 0xd | 0xe => self.audio_key.lock().unwrap().handle(cmd, data), + 0xb2...0xb6 => self.mercury.lock().unwrap().handle(cmd, data), + 0xac => eprintln!("Authentication succeedded"), + 0xad => eprintln!("Authentication failed"), _ => () - }; + } + } + + pub fn send_packet(&self, cmd: u8, data: &[u8]) -> connection::Result<()> { + self.tx_connection.lock().unwrap().send_packet(cmd, data) + } + + pub fn audio_key(&self, track: SpotifyId, file: FileId) -> Future { + self.audio_key.lock().unwrap().request(self, track, file) + } + + pub fn stream(&self, file: FileId, offset: u32, size: u32) -> mpsc::Receiver { + self.stream.lock().unwrap().request(self, file, offset, size) + } + + pub fn metadata(&self, id: SpotifyId) -> MetadataRef { + self.metadata.lock().unwrap().get(self, id) + } + + pub fn mercury(&self, req: MercuryRequest) -> Future { + self.mercury.lock().unwrap().request(self, req) + } + + pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver { + self.mercury.lock().unwrap().subscribe(self, uri) } } diff --git a/src/stream.rs b/src/stream.rs index 581b4514..c2d4b115 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -3,18 +3,9 @@ use std::collections::HashMap; use std::io::{Cursor, Seek, SeekFrom, Write}; use std::sync::mpsc; -use connection::Packet; use util::{ArcVec, FileId}; -use util::Either::{Left, Right}; -use subsystem::Subsystem; - -pub type StreamCallback = mpsc::Sender; -pub struct StreamRequest { - pub id: FileId, - pub offset: u32, - pub size: u32, - pub callback: StreamCallback -} +use connection::PacketHandler; +use session::Session; #[derive(Debug)] pub enum StreamEvent { @@ -31,36 +22,28 @@ enum ChannelMode { struct Channel { mode: ChannelMode, - callback: StreamCallback + callback: mpsc::Sender } pub struct StreamManager { next_id: ChannelId, channels: HashMap, - - requests: mpsc::Receiver, - packet_rx: mpsc::Receiver, - packet_tx: mpsc::Sender, } impl StreamManager { - pub fn new(tx: mpsc::Sender) -> (StreamManager, - mpsc::Sender, - mpsc::Sender) { - let (req_tx, req_rx) = mpsc::channel(); - let (pkt_tx, pkt_rx) = mpsc::channel(); - - (StreamManager { + pub fn new() -> StreamManager { + StreamManager { next_id: 0, channels: HashMap::new(), - - requests: req_rx, - packet_rx: pkt_rx, - packet_tx: tx - }, req_tx, pkt_tx) + } } - fn request(&mut self, req: StreamRequest) { + pub fn request(&mut self, session: &Session, + file: FileId, offset: u32, size: u32) + -> mpsc::Receiver { + + let (tx, rx) = mpsc::channel(); + let channel_id = self.next_id; self.next_id += 1; @@ -72,22 +55,23 @@ impl StreamManager { data.write_u32::(0x00000000).unwrap(); data.write_u32::(0x00009C40).unwrap(); data.write_u32::(0x00020000).unwrap(); - data.write(&req.id).unwrap(); - data.write_u32::(req.offset).unwrap(); - data.write_u32::(req.offset + req.size).unwrap(); + data.write(&file).unwrap(); + data.write_u32::(offset).unwrap(); + data.write_u32::(offset + size).unwrap(); - self.packet_tx.send(Packet { - cmd: 0x8, - data: data - }).unwrap(); + session.send_packet(0x8, &data).unwrap(); self.channels.insert(channel_id, Channel { mode: ChannelMode::Header, - callback: req.callback + callback: tx }); - } - fn packet(&mut self, data: Vec) { + rx + } +} + +impl PacketHandler for StreamManager { + fn handle(&mut self, _cmd: u8, data: Vec) { let data = ArcVec::new(data); let mut packet = Cursor::new(&data as &[u8]); @@ -140,27 +124,3 @@ impl StreamManager { } } -impl Subsystem for StreamManager { - fn run(mut self) { - loop { - match { - let requests = &self.requests; - let packets = &self.packet_rx; - - select!{ - r = requests.recv() => { - Left(r.unwrap()) - }, - p = packets.recv() => { - Right(p.unwrap()) - } - } - } { - Left(req) => self.request(req), - Right(pkt) => self.packet(pkt.data) - } - } - } -} - - diff --git a/src/util/mod.rs b/src/util/mod.rs index 23d54ef4..061b2dda 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -47,11 +47,6 @@ pub mod version { } } -pub enum Either { - Left(S), - Right(T) -} - pub fn hexdump(data: &[u8]) { for b in data.iter() { eprint!("{:02X} ", b); @@ -59,3 +54,15 @@ pub fn hexdump(data: &[u8]) { eprintln!(""); } +pub trait IgnoreExt { + fn ignore(self); +} + +impl IgnoreExt for Result { + fn ignore(self) { + match self { + Ok(_) => (), + Err(_) => (), + } + } +}