Remove code to notify spirc manager from the mixer

This commit is contained in:
Daniel Romero 2017-02-03 17:08:35 +01:00
parent a7aba5c8e7
commit 10f9da410e
9 changed files with 35 additions and 181 deletions

View file

@ -8,9 +8,6 @@
#![cfg_attr(feature="clippy", feature(plugin))] #![cfg_attr(feature="clippy", feature(plugin))]
#![cfg_attr(feature="clippy", plugin(clippy))] #![cfg_attr(feature="clippy", plugin(clippy))]
#[macro_use]
pub mod util;
#[macro_use] extern crate lazy_static; #[macro_use] extern crate lazy_static;
#[macro_use] extern crate log; #[macro_use] extern crate log;
@ -61,9 +58,9 @@ pub mod link;
pub mod metadata; pub mod metadata;
pub mod player; pub mod player;
pub mod stream; pub mod stream;
pub mod util;
pub mod version; pub mod version;
pub mod mixer; pub mod mixer;
pub mod messaging;
#[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs")); #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs"));
#[cfg(not(feature = "with-syntex"))] include!("lib.in.rs"); #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs");

View file

@ -4,10 +4,10 @@ use protobuf::{self, Message};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Cursor, Read, Write}; use std::io::{Cursor, Read, Write};
use std::mem::replace; use std::mem::replace;
use std::sync::mpsc;
use protocol; use protocol;
use session::{Session, PacketHandler}; use session::{Session, PacketHandler};
use messaging::{MercuryResponse, MercuryResponseSender};
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub enum MercuryMethod { pub enum MercuryMethod {
@ -24,9 +24,15 @@ pub struct MercuryRequest {
pub payload: Vec<Vec<u8>>, pub payload: Vec<Vec<u8>>,
} }
#[derive(Debug)]
pub struct MercuryResponse {
pub uri: String,
pub payload: Vec<Vec<u8>>,
}
enum MercuryCallback { enum MercuryCallback {
Future(eventual::Complete<MercuryResponse, ()>), Future(eventual::Complete<MercuryResponse, ()>),
Subscription(MercuryResponseSender), Subscription(mpsc::Sender<MercuryResponse>),
Channel, Channel,
} }
@ -39,7 +45,7 @@ pub struct MercuryPending {
pub struct MercuryManager { pub struct MercuryManager {
next_seq: u32, next_seq: u32,
pending: HashMap<Vec<u8>, MercuryPending>, pending: HashMap<Vec<u8>, MercuryPending>,
subscriptions: HashMap<String, MercuryResponseSender>, subscriptions: HashMap<String, mpsc::Sender<MercuryResponse>>,
} }
impl ToString for MercuryMethod { impl ToString for MercuryMethod {
@ -97,7 +103,9 @@ impl MercuryManager {
rx rx
} }
pub fn subscribe(&mut self, session: &Session, uri: String, tx: MercuryResponseSender) { pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver<MercuryResponse> {
let (tx, rx) = mpsc::channel();
self.request_with_callback(session, self.request_with_callback(session,
MercuryRequest { MercuryRequest {
method: MercuryMethod::SUB, method: MercuryMethod::SUB,
@ -106,6 +114,8 @@ impl MercuryManager {
payload: Vec::new(), payload: Vec::new(),
}, },
MercuryCallback::Subscription(tx)); MercuryCallback::Subscription(tx));
rx
} }
fn parse_part(mut s: &mut Read) -> Vec<u8> { fn parse_part(mut s: &mut Read) -> Vec<u8> {
@ -118,7 +128,7 @@ impl MercuryManager {
fn complete_subscription(&mut self, fn complete_subscription(&mut self,
response: MercuryResponse, response: MercuryResponse,
tx: MercuryResponseSender) { tx: mpsc::Sender<MercuryResponse>) {
for sub_data in response.payload { for sub_data in response.payload {
if let Ok(mut sub) = if let Ok(mut sub) =
protobuf::parse_from_bytes::<protocol::pubsub::Subscription>(&sub_data) { protobuf::parse_from_bytes::<protocol::pubsub::Subscription>(&sub_data) {

View file

@ -1,22 +0,0 @@
pub struct UpdateMessage;
#[derive(Debug)]
pub struct MercuryResponse {
pub uri: String,
pub payload: Vec<Vec<u8>>,
}
pub enum SpircMessage {
MercuryMsg(MercuryResponse),
UpdateMsg(UpdateMessage)
}
implement_sender!(name => MercuryResponseSender,
wrap => MercuryResponse,
with => SpircMessage,
variant => MercuryMsg);
implement_sender!(name => UpdateMessageSender,
wrap => UpdateMessage,
with => SpircMessage,
variant => UpdateMsg);

View file

@ -1,11 +1,8 @@
use messaging::UpdateMessageSender;
use self::softmixer::SoftMixer; use self::softmixer::SoftMixer;
pub mod softmixer; pub mod softmixer;
pub trait Mixer { pub trait Mixer {
fn init(&mut self, UpdateMessageSender);
fn start(&self); fn start(&self);
fn stop(&self); fn stop(&self);
fn set_volume(&self, volume: u16); fn set_volume(&self, volume: u16);

View file

@ -1,29 +1,22 @@
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use messaging::{UpdateMessage, UpdateMessageSender};
use super::Mixer; use super::Mixer;
use super::AudioFilter; use super::AudioFilter;
pub struct SoftMixer { pub struct SoftMixer {
volume: Arc<AtomicUsize>, volume: Arc<AtomicUsize>
tx: Option<UpdateMessageSender>
} }
impl SoftMixer { impl SoftMixer {
pub fn new() -> SoftMixer { pub fn new() -> SoftMixer {
SoftMixer { SoftMixer {
volume: Arc::new(AtomicUsize::new(0xFFFF)), volume: Arc::new(AtomicUsize::new(0xFFFF))
tx: None
} }
} }
} }
impl Mixer for SoftMixer { impl Mixer for SoftMixer {
fn init(&mut self, tx: UpdateMessageSender) {
self.tx = Some(tx);
}
fn start(&self) { fn start(&self) {
} }
fn stop(&self) { fn stop(&self) {
@ -33,8 +26,6 @@ impl Mixer for SoftMixer {
} }
fn set_volume(&self, volume: u16) { fn set_volume(&self, volume: u16) {
self.volume.store(volume as usize, Ordering::Relaxed); self.volume.store(volume as usize, Ordering::Relaxed);
let tx = self.tx.as_ref().expect("SoftMixer not initialized");
tx.send(UpdateMessage).unwrap();
} }
fn get_audio_filter(&self) -> Option<Box<AudioFilter + Send>> { fn get_audio_filter(&self) -> Option<Box<AudioFilter + Send>> {
Some(Box::new(SoftVolumeApplier { volume: self.volume.clone() })) Some(Box::new(SoftVolumeApplier { volume: self.volume.clone() }))

View file

@ -9,7 +9,7 @@ use protobuf::{self, Message};
use rand::thread_rng; use rand::thread_rng;
use std::io::{Read, Write, Cursor}; use std::io::{Read, Write, Cursor};
use std::result::Result; use std::result::Result;
use std::sync::{Mutex, RwLock, Arc}; use std::sync::{Mutex, RwLock, Arc, mpsc};
use std::str::FromStr; use std::str::FromStr;
use album_cover::AlbumCover; use album_cover::AlbumCover;
@ -20,11 +20,10 @@ use authentication::Credentials;
use cache::Cache; use cache::Cache;
use connection::{self, PlainConnection, CipherConnection}; use connection::{self, PlainConnection, CipherConnection};
use diffie_hellman::DHLocalKeys; use diffie_hellman::DHLocalKeys;
use mercury::{MercuryManager, MercuryRequest}; use mercury::{MercuryManager, MercuryRequest, MercuryResponse};
use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use metadata::{MetadataManager, MetadataRef, MetadataTrait};
use protocol; use protocol;
use stream::StreamManager; use stream::StreamManager;
use messaging::{MercuryResponse, MercuryResponseSender};
use util::{self, SpotifyId, FileId, ReadSeek}; use util::{self, SpotifyId, FileId, ReadSeek};
use version; use version;
@ -321,8 +320,8 @@ impl Session {
self.0.mercury.lock().unwrap().request(self, req) self.0.mercury.lock().unwrap().request(self, req)
} }
pub fn mercury_sub(&self, uri: String, tx: MercuryResponseSender) { pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver<MercuryResponse> {
self.0.mercury.lock().unwrap().subscribe(self, uri, tx) self.0.mercury.lock().unwrap().subscribe(self, uri)
} }
pub fn cache(&self) -> &Cache { pub fn cache(&self) -> &Cache {

View file

@ -1,10 +1,9 @@
use eventual::Async; use eventual::Async;
use protobuf::{self, Message, RepeatedField}; use protobuf::{self, Message, RepeatedField};
use std::sync::{mpsc, Mutex, Arc}; use std::sync::{Mutex, Arc};
use std::collections::HashMap; use std::collections::HashMap;
use mercury::{MercuryRequest, MercuryMethod}; use mercury::{MercuryRequest, MercuryMethod};
use messaging::{SpircMessage, MercuryResponseSender, UpdateMessageSender};
use player::{Player, PlayerState}; use player::{Player, PlayerState};
use mixer::Mixer; use mixer::Mixer;
use session::Session; use session::Session;
@ -128,15 +127,8 @@ impl SpircManager {
pub fn run(&self) { pub fn run(&self) {
let rx = { let rx = {
let mut internal = self.0.lock().unwrap(); let mut internal = self.0.lock().unwrap();
let (tx, rx) = mpsc::channel::<SpircMessage>();
let mercury_response_sender = MercuryResponseSender::create(tx.clone()); let rx = internal.session.mercury_sub(internal.uri());
internal.session.mercury_sub(internal.uri(), mercury_response_sender);
let update_message_sender = UpdateMessageSender::create(tx.clone());
internal.mixer.init(update_message_sender);
internal.notify(true, None); internal.notify(true, None);
@ -153,25 +145,18 @@ impl SpircManager {
rx rx
}; };
for msg in rx { for pkt in rx {
match msg { let data = pkt.payload.first().unwrap();
SpircMessage::MercuryMsg(pkt) => { let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
let data = pkt.payload.first().unwrap();
let frame = protobuf::parse_from_bytes::<protocol::spirc::Frame>(data).unwrap();
debug!("{:?} {:?} {} {} {}", debug!("{:?} {:?} {} {} {}",
frame.get_typ(), frame.get_typ(),
frame.get_device_state().get_name(), frame.get_device_state().get_name(),
frame.get_ident(), frame.get_ident(),
frame.get_seq_nr(), frame.get_seq_nr(),
frame.get_state_update_id()); frame.get_state_update_id());
self.0.lock().unwrap().handle(frame); self.0.lock().unwrap().handle(frame);
}
SpircMessage::UpdateMsg(_) => {
self.0.lock().unwrap().notify(false, None);
}
}
} }
} }
@ -321,6 +306,7 @@ impl SpircInternal {
} }
MessageType::kMessageTypeVolume => { MessageType::kMessageTypeVolume => {
self.mixer.set_volume(frame.get_volume() as u16); self.mixer.set_volume(frame.get_volume() as u16);
self.notify(false, None);
} }
MessageType::kMessageTypeGoodbye => { MessageType::kMessageTypeGoodbye => {
if frame.has_ident() { if frame.has_ident() {

View file

@ -1,102 +0,0 @@
/// Creates an implentation of sender which can be used to share an async channel for multiple message types.
///
/// # Examples
///
/// ```
/// struct D;
/// struct A;
///
/// enum Msg {
/// FirstType(D),
/// SecondType(A)
/// }
///
/// fn main() {
/// let (tx, rx) = channel::<Msg>();
///
/// let d_sender = DSender::create(tx.clone());
/// let a_sender = ASender::create(tx.clone());
/// subscribe(d_sender.clone());
/// subscribe2(d_sender.clone());
/// subscribe3(a_sender.clone());
//
/// let mut i = 0;
//
/// for m in rx {
/// i += 1;
/// match m {
/// Msg::FirstType(_) => println!("m: D {}", i),
/// Msg::SecondType(_) => println!("m: A {}", i)
/// };
/// }
/// }
///
/// fn subscribe(sender: DSender) {
/// thread::spawn(move|| {
/// sender.send(D).unwrap();
/// });
/// }
/// fn subscribe2(sender: DSender) {
/// thread::spawn(move|| {
/// thread::sleep(time::Duration::from_millis(10));
/// sender.send(D).unwrap();
/// });
/// }
/// fn subscribe3(sender: ASender) {
/// thread::spawn(move|| {
/// sender.send(A).unwrap();
/// });
/// }
/// implement_sender!(name => DSender,
/// wrap => D,
/// with => Msg,
/// variant => FirstType)
/// implement_sender!(name => ASender,
/// wrap => A,
/// with => Msg,
/// variant => SecondType)
/// ```
macro_rules! implement_sender {
(name => $name:ident,
wrap => $wrap_type:ident,
with => $with_type:ident,
variant => $variant:ident) => {
pub struct $name {
wrapped_sender: ::std::sync::mpsc::Sender<$with_type>,
}
impl $name {
pub fn create(sender: ::std::sync::mpsc::Sender<$with_type>) -> $name {
$name {
wrapped_sender: sender
}
}
pub fn send(&self, t: $wrap_type) -> Result<(), ::std::sync::mpsc::SendError<$wrap_type>> {
let wrapped = self.wrap(t);
let result = self.wrapped_sender.send(wrapped);
result.map_err(|senderror| {
let ::std::sync::mpsc::SendError(z) = senderror;
::std::sync::mpsc::SendError(self.unwrap(z))
})
}
fn wrap(&self, d: $wrap_type) -> $with_type {
$with_type::$variant(d)
}
fn unwrap(&self, msg: $with_type) -> $wrap_type {
let d = match msg {
$with_type::$variant(d) => d,
_ => unreachable!()
};
d
}
}
impl Clone for $name {
fn clone(&self) -> $name {
$name {
wrapped_sender: self.wrapped_sender.clone()
}
}
}
}
}

View file

@ -11,8 +11,6 @@ mod int128;
mod spotify_id; mod spotify_id;
mod arcvec; mod arcvec;
mod subfile; mod subfile;
#[macro_use]
mod channel;
pub use util::int128::u128; pub use util::int128::u128;
pub use util::spotify_id::{SpotifyId, FileId}; pub use util::spotify_id::{SpotifyId, FileId};