From 10f9da410ef970e9ba8b9c0bb5e74ca415a98dab Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 17:08:35 +0100 Subject: [PATCH] Remove code to notify spirc manager from the mixer --- src/lib.rs | 5 +- src/mercury.rs | 20 ++++++-- src/messaging/mod.rs | 22 --------- src/mixer/mod.rs | 3 -- src/mixer/softmixer.rs | 13 +----- src/session.rs | 9 ++-- src/spirc.rs | 40 ++++++---------- src/util/channel.rs | 102 ----------------------------------------- src/util/mod.rs | 2 - 9 files changed, 35 insertions(+), 181 deletions(-) delete mode 100644 src/messaging/mod.rs delete mode 100644 src/util/channel.rs diff --git a/src/lib.rs b/src/lib.rs index aea2411f..59e46547 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,9 +8,6 @@ #![cfg_attr(feature="clippy", feature(plugin))] #![cfg_attr(feature="clippy", plugin(clippy))] -#[macro_use] -pub mod util; - #[macro_use] extern crate lazy_static; #[macro_use] extern crate log; @@ -61,9 +58,9 @@ pub mod link; pub mod metadata; pub mod player; pub mod stream; +pub mod util; pub mod version; pub mod mixer; -pub mod messaging; #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs")); #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs"); diff --git a/src/mercury.rs b/src/mercury.rs index 3d9add8f..37a908c6 100644 --- a/src/mercury.rs +++ b/src/mercury.rs @@ -4,10 +4,10 @@ use protobuf::{self, Message}; use std::collections::HashMap; use std::io::{Cursor, Read, Write}; use std::mem::replace; +use std::sync::mpsc; use protocol; use session::{Session, PacketHandler}; -use messaging::{MercuryResponse, MercuryResponseSender}; #[derive(Debug, PartialEq, Eq)] pub enum MercuryMethod { @@ -24,9 +24,15 @@ pub struct MercuryRequest { pub payload: Vec>, } +#[derive(Debug)] +pub struct MercuryResponse { + pub uri: String, + pub payload: Vec>, +} + enum MercuryCallback { Future(eventual::Complete), - Subscription(MercuryResponseSender), + Subscription(mpsc::Sender), Channel, } @@ -39,7 +45,7 @@ pub struct MercuryPending { pub struct MercuryManager { next_seq: u32, pending: HashMap, MercuryPending>, - subscriptions: HashMap, + subscriptions: HashMap>, } impl ToString for MercuryMethod { @@ -97,7 +103,9 @@ impl MercuryManager { rx } - pub fn subscribe(&mut self, session: &Session, uri: String, tx: MercuryResponseSender) { + pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(); + self.request_with_callback(session, MercuryRequest { method: MercuryMethod::SUB, @@ -106,6 +114,8 @@ impl MercuryManager { payload: Vec::new(), }, MercuryCallback::Subscription(tx)); + + rx } fn parse_part(mut s: &mut Read) -> Vec { @@ -118,7 +128,7 @@ impl MercuryManager { fn complete_subscription(&mut self, response: MercuryResponse, - tx: MercuryResponseSender) { + tx: mpsc::Sender) { for sub_data in response.payload { if let Ok(mut sub) = protobuf::parse_from_bytes::(&sub_data) { diff --git a/src/messaging/mod.rs b/src/messaging/mod.rs deleted file mode 100644 index 9bf43c92..00000000 --- a/src/messaging/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -pub struct UpdateMessage; - -#[derive(Debug)] -pub struct MercuryResponse { - pub uri: String, - pub payload: Vec>, -} - -pub enum SpircMessage { - MercuryMsg(MercuryResponse), - UpdateMsg(UpdateMessage) -} - -implement_sender!(name => MercuryResponseSender, - wrap => MercuryResponse, - with => SpircMessage, - variant => MercuryMsg); - -implement_sender!(name => UpdateMessageSender, - wrap => UpdateMessage, - with => SpircMessage, - variant => UpdateMsg); \ No newline at end of file diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index 6b2a3f8a..32350af3 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -1,11 +1,8 @@ -use messaging::UpdateMessageSender; - use self::softmixer::SoftMixer; pub mod softmixer; pub trait Mixer { - fn init(&mut self, UpdateMessageSender); fn start(&self); fn stop(&self); fn set_volume(&self, volume: u16); diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 184ed563..e54e728e 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,29 +1,22 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use messaging::{UpdateMessage, UpdateMessageSender}; - use super::Mixer; use super::AudioFilter; pub struct SoftMixer { - volume: Arc, - tx: Option + volume: Arc } impl SoftMixer { pub fn new() -> SoftMixer { SoftMixer { - volume: Arc::new(AtomicUsize::new(0xFFFF)), - tx: None + volume: Arc::new(AtomicUsize::new(0xFFFF)) } } } impl Mixer for SoftMixer { - fn init(&mut self, tx: UpdateMessageSender) { - self.tx = Some(tx); - } fn start(&self) { } fn stop(&self) { @@ -33,8 +26,6 @@ impl Mixer for SoftMixer { } fn set_volume(&self, volume: u16) { self.volume.store(volume as usize, Ordering::Relaxed); - let tx = self.tx.as_ref().expect("SoftMixer not initialized"); - tx.send(UpdateMessage).unwrap(); } fn get_audio_filter(&self) -> Option> { Some(Box::new(SoftVolumeApplier { volume: self.volume.clone() })) diff --git a/src/session.rs b/src/session.rs index 76157a77..59167614 100644 --- a/src/session.rs +++ b/src/session.rs @@ -9,7 +9,7 @@ use protobuf::{self, Message}; use rand::thread_rng; use std::io::{Read, Write, Cursor}; use std::result::Result; -use std::sync::{Mutex, RwLock, Arc}; +use std::sync::{Mutex, RwLock, Arc, mpsc}; use std::str::FromStr; use album_cover::AlbumCover; @@ -20,11 +20,10 @@ use authentication::Credentials; use cache::Cache; use connection::{self, PlainConnection, CipherConnection}; use diffie_hellman::DHLocalKeys; -use mercury::{MercuryManager, MercuryRequest}; +use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use protocol; use stream::StreamManager; -use messaging::{MercuryResponse, MercuryResponseSender}; use util::{self, SpotifyId, FileId, ReadSeek}; use version; @@ -321,8 +320,8 @@ impl Session { self.0.mercury.lock().unwrap().request(self, req) } - pub fn mercury_sub(&self, uri: String, tx: MercuryResponseSender) { - self.0.mercury.lock().unwrap().subscribe(self, uri, tx) + pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver { + self.0.mercury.lock().unwrap().subscribe(self, uri) } pub fn cache(&self) -> &Cache { diff --git a/src/spirc.rs b/src/spirc.rs index 21894f3a..c532f794 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,10 +1,9 @@ use eventual::Async; use protobuf::{self, Message, RepeatedField}; -use std::sync::{mpsc, Mutex, Arc}; +use std::sync::{Mutex, Arc}; use std::collections::HashMap; use mercury::{MercuryRequest, MercuryMethod}; -use messaging::{SpircMessage, MercuryResponseSender, UpdateMessageSender}; use player::{Player, PlayerState}; use mixer::Mixer; use session::Session; @@ -128,15 +127,8 @@ impl SpircManager { pub fn run(&self) { let rx = { let mut internal = self.0.lock().unwrap(); - let (tx, rx) = mpsc::channel::(); - let mercury_response_sender = MercuryResponseSender::create(tx.clone()); - - internal.session.mercury_sub(internal.uri(), mercury_response_sender); - - let update_message_sender = UpdateMessageSender::create(tx.clone()); - - internal.mixer.init(update_message_sender); + let rx = internal.session.mercury_sub(internal.uri()); internal.notify(true, None); @@ -153,25 +145,18 @@ impl SpircManager { rx }; - for msg in rx { - match msg { - SpircMessage::MercuryMsg(pkt) => { - let data = pkt.payload.first().unwrap(); - let frame = protobuf::parse_from_bytes::(data).unwrap(); + for pkt in rx { + let data = pkt.payload.first().unwrap(); + let frame = protobuf::parse_from_bytes::(data).unwrap(); - debug!("{:?} {:?} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); + debug!("{:?} {:?} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); - self.0.lock().unwrap().handle(frame); - } - SpircMessage::UpdateMsg(_) => { - self.0.lock().unwrap().notify(false, None); - } - } + self.0.lock().unwrap().handle(frame); } } @@ -321,6 +306,7 @@ impl SpircInternal { } MessageType::kMessageTypeVolume => { self.mixer.set_volume(frame.get_volume() as u16); + self.notify(false, None); } MessageType::kMessageTypeGoodbye => { if frame.has_ident() { diff --git a/src/util/channel.rs b/src/util/channel.rs deleted file mode 100644 index 1d277f42..00000000 --- a/src/util/channel.rs +++ /dev/null @@ -1,102 +0,0 @@ -/// Creates an implentation of sender which can be used to share an async channel for multiple message types. -/// -/// # Examples -/// -/// ``` -/// struct D; -/// struct A; -/// -/// enum Msg { -/// FirstType(D), -/// SecondType(A) -/// } -/// -/// fn main() { -/// let (tx, rx) = channel::(); -/// -/// let d_sender = DSender::create(tx.clone()); -/// let a_sender = ASender::create(tx.clone()); -/// subscribe(d_sender.clone()); -/// subscribe2(d_sender.clone()); -/// subscribe3(a_sender.clone()); - // -/// let mut i = 0; - // -/// for m in rx { -/// i += 1; -/// match m { -/// Msg::FirstType(_) => println!("m: D {}", i), -/// Msg::SecondType(_) => println!("m: A {}", i) -/// }; -/// } -/// } -/// -/// fn subscribe(sender: DSender) { -/// thread::spawn(move|| { -/// sender.send(D).unwrap(); -/// }); -/// } -/// fn subscribe2(sender: DSender) { -/// thread::spawn(move|| { -/// thread::sleep(time::Duration::from_millis(10)); -/// sender.send(D).unwrap(); -/// }); -/// } -/// fn subscribe3(sender: ASender) { -/// thread::spawn(move|| { -/// sender.send(A).unwrap(); -/// }); -/// } -/// implement_sender!(name => DSender, -/// wrap => D, -/// with => Msg, -/// variant => FirstType) -/// implement_sender!(name => ASender, -/// wrap => A, -/// with => Msg, -/// variant => SecondType) -/// ``` -macro_rules! implement_sender { - (name => $name:ident, - wrap => $wrap_type:ident, - with => $with_type:ident, - variant => $variant:ident) => { - pub struct $name { - wrapped_sender: ::std::sync::mpsc::Sender<$with_type>, - } - - impl $name { - pub fn create(sender: ::std::sync::mpsc::Sender<$with_type>) -> $name { - $name { - wrapped_sender: sender - } - } - pub fn send(&self, t: $wrap_type) -> Result<(), ::std::sync::mpsc::SendError<$wrap_type>> { - let wrapped = self.wrap(t); - let result = self.wrapped_sender.send(wrapped); - result.map_err(|senderror| { - let ::std::sync::mpsc::SendError(z) = senderror; - ::std::sync::mpsc::SendError(self.unwrap(z)) - }) - } - fn wrap(&self, d: $wrap_type) -> $with_type { - $with_type::$variant(d) - } - fn unwrap(&self, msg: $with_type) -> $wrap_type { - let d = match msg { - $with_type::$variant(d) => d, - _ => unreachable!() - }; - d - } - } - - impl Clone for $name { - fn clone(&self) -> $name { - $name { - wrapped_sender: self.wrapped_sender.clone() - } - } - } - } -} \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs index f550f526..0683f6a0 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -11,8 +11,6 @@ mod int128; mod spotify_id; mod arcvec; mod subfile; -#[macro_use] -mod channel; pub use util::int128::u128; pub use util::spotify_id::{SpotifyId, FileId};