diff --git a/src/mercury.rs b/src/mercury.rs index 37a908c6..2b1ee564 100644 --- a/src/mercury.rs +++ b/src/mercury.rs @@ -4,10 +4,10 @@ use protobuf::{self, Message}; use std::collections::HashMap; use std::io::{Cursor, Read, Write}; use std::mem::replace; -use std::sync::mpsc; use protocol; use session::{Session, PacketHandler}; +use spirc::MercuryResponseSender; #[derive(Debug, PartialEq, Eq)] pub enum MercuryMethod { @@ -32,7 +32,7 @@ pub struct MercuryResponse { enum MercuryCallback { Future(eventual::Complete), - Subscription(mpsc::Sender), + Subscription(MercuryResponseSender), Channel, } @@ -45,7 +45,7 @@ pub struct MercuryPending { pub struct MercuryManager { next_seq: u32, pending: HashMap, MercuryPending>, - subscriptions: HashMap>, + subscriptions: HashMap, } impl ToString for MercuryMethod { @@ -103,9 +103,7 @@ impl MercuryManager { rx } - pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(); - + pub fn subscribe(&mut self, session: &Session, uri: String, tx: MercuryResponseSender) { self.request_with_callback(session, MercuryRequest { method: MercuryMethod::SUB, @@ -114,8 +112,6 @@ impl MercuryManager { payload: Vec::new(), }, MercuryCallback::Subscription(tx)); - - rx } fn parse_part(mut s: &mut Read) -> Vec { @@ -128,7 +124,7 @@ impl MercuryManager { fn complete_subscription(&mut self, response: MercuryResponse, - tx: mpsc::Sender) { + tx: MercuryResponseSender) { for sub_data in response.payload { if let Ok(mut sub) = protobuf::parse_from_bytes::(&sub_data) { diff --git a/src/session.rs b/src/session.rs index 59167614..75976ba2 100644 --- a/src/session.rs +++ b/src/session.rs @@ -9,7 +9,7 @@ use protobuf::{self, Message}; use rand::thread_rng; use std::io::{Read, Write, Cursor}; use std::result::Result; -use std::sync::{Mutex, RwLock, Arc, mpsc}; +use std::sync::{Mutex, RwLock, Arc}; use std::str::FromStr; use album_cover::AlbumCover; @@ -24,6 +24,7 @@ use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use protocol; use stream::StreamManager; +use spirc::MercuryResponseSender; use util::{self, SpotifyId, FileId, ReadSeek}; use version; @@ -320,8 +321,8 @@ impl Session { self.0.mercury.lock().unwrap().request(self, req) } - pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver { - self.0.mercury.lock().unwrap().subscribe(self, uri) + pub fn mercury_sub(&self, uri: String, tx: MercuryResponseSender) { + self.0.mercury.lock().unwrap().subscribe(self, uri, tx) } pub fn cache(&self) -> &Cache { diff --git a/src/spirc.rs b/src/spirc.rs index 6688e834..faee0add 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,10 +1,10 @@ use eventual::Async; use protobuf::{self, Message, RepeatedField}; use std::borrow::Cow; -use std::sync::{Mutex, Arc}; +use std::sync::{mpsc, Mutex, Arc}; use std::collections::HashMap; -use mercury::{MercuryRequest, MercuryMethod}; +use mercury::{MercuryRequest, MercuryMethod, MercuryResponse}; use player::{Player, PlayerState}; use mixer::Mixer; use session::Session; @@ -56,6 +56,18 @@ pub struct State { pub end_of_track: bool, } +pub struct UpdateMessage; + +pub enum SpircMessage { + MercuryMsg(MercuryResponse), + UpdateMsg(UpdateMessage) +} + +implement_sender!(name => MercuryResponseSender, + wrap => MercuryResponse, + with => SpircMessage, + variant => MercuryMsg); + impl SpircManager { pub fn new(session: Session, player: Player, mixer: Box) -> SpircManager { let ident = session.device_id().to_owned(); @@ -92,8 +104,11 @@ impl SpircManager { pub fn run(&self) { let rx = { let mut internal = self.0.lock().unwrap(); + let (tx, rx) = mpsc::channel::(); - let rx = internal.session.mercury_sub(internal.uri()); + let mercury_response_sender = MercuryResponseSender::create(tx.clone()); + + internal.session.mercury_sub(internal.uri(), mercury_response_sender); internal.notify(true, None); @@ -110,18 +125,23 @@ impl SpircManager { rx }; - for pkt in rx { - let data = pkt.payload.first().unwrap(); - let frame = protobuf::parse_from_bytes::(data).unwrap(); + for msg in rx { + match msg { + SpircMessage::MercuryMsg(pkt) => { + let data = pkt.payload.first().unwrap(); + let frame = protobuf::parse_from_bytes::(data).unwrap(); - debug!("{:?} {:?} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); + debug!("{:?} {:?} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); - self.0.lock().unwrap().handle(frame); + self.0.lock().unwrap().handle(frame); + } + _ => {} + } } }