From 347bf05dbe26d04e1e7a50c1acefb7049614787a Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 20 Jan 2017 20:39:05 +0100 Subject: [PATCH 01/18] Refactored to have apply_volume in a specifix mixer --- src/lib.rs | 1 + src/main.rs | 5 ++++- src/mixer/mod.rs | 14 +++++++++++++ src/mixer/softmixer.rs | 46 ++++++++++++++++++++++++++++++++++++++++++ src/player.rs | 33 +++++++++--------------------- 5 files changed, 75 insertions(+), 24 deletions(-) create mode 100644 src/mixer/mod.rs create mode 100644 src/mixer/softmixer.rs diff --git a/src/lib.rs b/src/lib.rs index 31671d0c..59e46547 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,6 +60,7 @@ pub mod player; pub mod stream; pub mod util; pub mod version; +pub mod mixer; #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs")); #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs"); diff --git a/src/main.rs b/src/main.rs index cfe1b9e8..038d2cea 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ use librespot::audio_backend::{self, BACKENDS}; use librespot::cache::{Cache, DefaultCache, NoCache}; use librespot::player::Player; use librespot::session::{Bitrate, Config, Session}; +use librespot::mixer::softmixer::SoftMixer; use librespot::version; fn usage(program: &str, opts: &getopts::Options) -> String { @@ -120,8 +121,10 @@ fn setup(args: &[String]) -> (Session, Player) { matches.opt_str("password")); session.login(credentials).unwrap(); + let mixer = SoftMixer::new(); + let device_name = matches.opt_str("device"); - let player = Player::new(session.clone(), move || { + let player = Player::new(session.clone(), Box::new(mixer), move || { (backend)(device_name.as_ref().map(AsRef::as_ref)) }); diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs new file mode 100644 index 00000000..27bbdacd --- /dev/null +++ b/src/mixer/mod.rs @@ -0,0 +1,14 @@ +use std::borrow::Cow; + +pub mod softmixer; + +pub trait Mixer { + fn init(&mut self); + fn inuse(&mut self); + fn release(&mut self); + fn set(&mut self, volume: u16); + fn volume(&self) -> u16; + fn apply_volume<'a>(&mut self, data: &'a [i16]) -> Cow<'a, [i16]> { + Cow::Borrowed(data) + } +} \ No newline at end of file diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs new file mode 100644 index 00000000..2dedf67b --- /dev/null +++ b/src/mixer/softmixer.rs @@ -0,0 +1,46 @@ +use super::Mixer; +use std::borrow::Cow; + +pub struct SoftMixer { + volume: u16, +} + +impl SoftMixer { + pub fn new() -> SoftMixer { + SoftMixer { + volume: 0xFFFF + } + } +} + +impl Mixer for SoftMixer { + fn init(&mut self) { + } + + fn inuse(&mut self) { + } + + fn release(&mut self) { + } + + fn set(&mut self, volume: u16) { + self.volume = volume; + } + + fn volume(&self) -> u16 { + self.volume + } + fn apply_volume<'a>(&mut self, data: &'a [i16]) -> Cow<'a, [i16]> { + if self.volume == 0xFFFF { + Cow::Borrowed(data) + } else { + Cow::Owned(data.iter() + .map(|&x| { + (x as i32 + * self.volume as i32 + / 0xFFFF) as i16 + }) + .collect()) + } + } +} \ No newline at end of file diff --git a/src/player.rs b/src/player.rs index 60a5ad01..430a38d6 100644 --- a/src/player.rs +++ b/src/player.rs @@ -7,6 +7,7 @@ use vorbis; use audio_decrypt::AudioDecrypt; use audio_backend::Sink; +use mixer::Mixer; use metadata::{FileFormat, Track, TrackRef}; use session::{Bitrate, Session}; use util::{self, ReadSeek, SpotifyId, Subfile}; @@ -74,8 +75,9 @@ enum PlayerCommand { } impl Player { - pub fn new(session: Session, sink_builder: F) -> Player - where F: FnOnce() -> Box + Send + 'static { + pub fn new(session: Session, mixer: Box, sink_builder: F) -> Player + where F: FnOnce() -> Box + Send + 'static, + M: Mixer + Send + 'static { let (cmd_tx, cmd_rx) = mpsc::channel(); let state = Arc::new(Mutex::new(PlayerState { @@ -83,7 +85,7 @@ impl Player { position_ms: 0, position_measured_at: 0, update_time: util::now_ms(), - volume: 0xFFFF, + volume: mixer.volume(), track: None, end_of_track: false, })); @@ -97,7 +99,7 @@ impl Player { observers: observers.clone(), }; - thread::spawn(move || internal.run(sink_builder())); + thread::spawn(move || internal.run(sink_builder(), mixer)); Player { commands: cmd_tx, @@ -147,21 +149,6 @@ impl Player { } } -fn apply_volume(volume: u16, data: &[i16]) -> Cow<[i16]> { - // Fast path when volume is 100% - if volume == 0xFFFF { - Cow::Borrowed(data) - } else { - Cow::Owned(data.iter() - .map(|&x| { - (x as i32 - * volume as i32 - / 0xFFFF) as i16 - }) - .collect()) - } -} - fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option> { if track.available { Some(Cow::Borrowed(track)) @@ -229,7 +216,7 @@ fn run_onstop(session: &Session) { } impl PlayerInternal { - fn run(self, mut sink: Box) { + fn run(self, mut sink: Box, mut mixer: Box) { let mut decoder = None; loop { @@ -344,8 +331,9 @@ impl PlayerInternal { run_onstop(&self.session); } Some(PlayerCommand::Volume(vol)) => { + mixer.set(vol); self.update(|state| { - state.volume = vol; + state.volume = mixer.volume(); true }); } @@ -371,8 +359,7 @@ impl PlayerInternal { match packet { Some(Ok(packet)) => { - let buffer = apply_volume(self.state.lock().unwrap().volume, - &packet.data); + let buffer = mixer.apply_volume(&packet.data); sink.write(&buffer).unwrap(); self.update(|state| { From 59398b3ceecdc1cb20a3485b6d534d144550f283 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Wed, 25 Jan 2017 22:09:03 +0100 Subject: [PATCH 02/18] Remove mixer from Player and add it to SpircManager --- src/main.rs | 12 ++++--- src/mixer/mod.rs | 2 +- src/mixer/softmixer.rs | 2 +- src/player.rs | 39 ++++------------------ src/spirc.rs | 73 +++++++++++++++++++----------------------- 5 files changed, 48 insertions(+), 80 deletions(-) diff --git a/src/main.rs b/src/main.rs index 038d2cea..defac467 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,6 +19,8 @@ use librespot::cache::{Cache, DefaultCache, NoCache}; use librespot::player::Player; use librespot::session::{Bitrate, Config, Session}; use librespot::mixer::softmixer::SoftMixer; +use librespot::mixer::Mixer; + use librespot::version; fn usage(program: &str, opts: &getopts::Options) -> String { @@ -60,7 +62,7 @@ fn list_backends() { } } -fn setup(args: &[String]) -> (Session, Player) { +fn setup(args: &[String]) -> (Session, Player, Box) { let mut opts = getopts::Options::new(); opts.optopt("c", "cache", "Path to a directory where files will be cached.", "CACHE") .reqopt("n", "name", "Device name", "NAME") @@ -124,18 +126,18 @@ fn setup(args: &[String]) -> (Session, Player) { let mixer = SoftMixer::new(); let device_name = matches.opt_str("device"); - let player = Player::new(session.clone(), Box::new(mixer), move || { + let player = Player::new(session.clone(), move || { (backend)(device_name.as_ref().map(AsRef::as_ref)) }); - (session, player) + (session, player, Box::new(mixer)) } fn main() { let args: Vec = std::env::args().collect(); - let (session, player) = setup(&args); + let (session, player, mixer) = setup(&args); - let spirc = SpircManager::new(session.clone(), player); + let spirc = SpircManager::new(session.clone(), player, mixer); let spirc_signal = spirc.clone(); thread::spawn(move || spirc.run()); diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index 27bbdacd..285bde0c 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -6,7 +6,7 @@ pub trait Mixer { fn init(&mut self); fn inuse(&mut self); fn release(&mut self); - fn set(&mut self, volume: u16); + fn set_volume(&mut self, volume: u16); fn volume(&self) -> u16; fn apply_volume<'a>(&mut self, data: &'a [i16]) -> Cow<'a, [i16]> { Cow::Borrowed(data) diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 2dedf67b..8a36fa90 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -23,7 +23,7 @@ impl Mixer for SoftMixer { fn release(&mut self) { } - fn set(&mut self, volume: u16) { + fn set_volume(&mut self, volume: u16) { self.volume = volume; } diff --git a/src/player.rs b/src/player.rs index 430a38d6..bc38263b 100644 --- a/src/player.rs +++ b/src/player.rs @@ -7,7 +7,6 @@ use vorbis; use audio_decrypt::AudioDecrypt; use audio_backend::Sink; -use mixer::Mixer; use metadata::{FileFormat, Track, TrackRef}; use session::{Bitrate, Session}; use util::{self, ReadSeek, SpotifyId, Subfile}; @@ -48,8 +47,6 @@ pub struct PlayerState { pub status: PlayStatus, pub position_ms: u32, pub position_measured_at: i64, - pub update_time: i64, - pub volume: u16, pub track: Option, pub end_of_track: bool, @@ -68,24 +65,20 @@ enum PlayerCommand { Load(SpotifyId, bool, u32), Play, Pause, - Volume(u16), Stop, Seek(u32), SeekAt(u32, i64), } impl Player { - pub fn new(session: Session, mixer: Box, sink_builder: F) -> Player - where F: FnOnce() -> Box + Send + 'static, - M: Mixer + Send + 'static { + pub fn new(session: Session, sink_builder: F) -> Player + where F: FnOnce() -> Box + Send + 'static { let (cmd_tx, cmd_rx) = mpsc::channel(); let state = Arc::new(Mutex::new(PlayerState { status: PlayStatus::kPlayStatusStop, position_ms: 0, position_measured_at: 0, - update_time: util::now_ms(), - volume: mixer.volume(), track: None, end_of_track: false, })); @@ -99,7 +92,7 @@ impl Player { observers: observers.clone(), }; - thread::spawn(move || internal.run(sink_builder(), mixer)); + thread::spawn(move || internal.run(sink_builder())); Player { commands: cmd_tx, @@ -140,10 +133,6 @@ impl Player { self.state.lock().unwrap().clone() } - pub fn volume(&self, vol: u16) { - self.command(PlayerCommand::Volume(vol)); - } - pub fn add_observer(&self, observer: PlayerObserver) { self.observers.lock().unwrap().push(observer); } @@ -216,7 +205,7 @@ fn run_onstop(session: &Session) { } impl PlayerInternal { - fn run(self, mut sink: Box, mut mixer: Box) { + fn run(self, mut sink: Box) { let mut decoder = None; loop { @@ -321,7 +310,6 @@ impl PlayerInternal { Some(PlayerCommand::Pause) => { self.update(|state| { state.status = PlayStatus::kPlayStatusPause; - state.update_time = util::now_ms(); state.position_ms = decoder.as_mut().map(|d| vorbis_time_tell_ms(d).unwrap()).unwrap_or(0) as u32; state.position_measured_at = util::now_ms(); true @@ -330,13 +318,6 @@ impl PlayerInternal { sink.stop().unwrap(); run_onstop(&self.session); } - Some(PlayerCommand::Volume(vol)) => { - mixer.set(vol); - self.update(|state| { - state.volume = mixer.volume(); - true - }); - } Some(PlayerCommand::Stop) => { self.update(|state| { if state.status == PlayStatus::kPlayStatusPlay { @@ -359,7 +340,8 @@ impl PlayerInternal { match packet { Some(Ok(packet)) => { - let buffer = mixer.apply_volume(&packet.data); + //let buffer = mixer.apply_volume(&packet.data); + let buffer = Cow::Borrowed(&packet.data); sink.write(&buffer).unwrap(); self.update(|state| { @@ -395,7 +377,6 @@ impl PlayerInternal { let observers = self.observers.lock().unwrap(); if update { - guard.update_time = util::now_ms(); let state = guard.clone(); drop(guard); @@ -415,14 +396,6 @@ impl PlayerState { (self.position_ms, self.position_measured_at) } - pub fn volume(&self) -> u16 { - self.volume - } - - pub fn update_time(&self) -> i64 { - self.update_time - } - pub fn end_of_track(&self) -> bool { self.end_of_track } diff --git a/src/spirc.rs b/src/spirc.rs index 674fd47a..6688e834 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -6,6 +6,7 @@ use std::collections::HashMap; use mercury::{MercuryRequest, MercuryMethod}; use player::{Player, PlayerState}; +use mixer::Mixer; use session::Session; use util; use util::SpotifyId; @@ -20,6 +21,7 @@ pub struct SpircManager(Arc>); struct SpircInternal { player: Player, session: Session, + mixer: Box, seq_nr: u32, @@ -43,14 +45,26 @@ struct SpircInternal { devices: HashMap, } +#[derive(Clone)] +pub struct State { + pub status: PlayStatus, + pub position_ms: u32, + pub position_measured_at: i64, + pub update_time: i64, + pub volume: u16, + pub track: Option, + pub end_of_track: bool, +} + impl SpircManager { - pub fn new(session: Session, player: Player) -> SpircManager { + pub fn new(session: Session, player: Player, mixer: Box) -> SpircManager { let ident = session.device_id().to_owned(); let name = session.config().device_name.clone(); SpircManager(Arc::new(Mutex::new(SpircInternal { player: player, session: session, + mixer: mixer, seq_nr: 0, @@ -184,7 +198,7 @@ impl SpircInternal { let track = self.tracks[self.index as usize]; self.player.load(track, true, 0); } else { - self.notify_with_player_state(false, None, player_state); + self.notify(false, None); } } @@ -253,7 +267,7 @@ impl SpircInternal { } } MessageType::kMessageTypeVolume => { - self.player.volume(frame.get_volume() as u16); + self.mixer.set_volume(frame.get_volume() as u16); } MessageType::kMessageTypeGoodbye => { if frame.has_ident() { @@ -287,30 +301,11 @@ impl SpircInternal { cs.send(); } - fn notify_with_player_state(&mut self, - hello: bool, - recipient: Option<&str>, - player_state: &PlayerState) { - let mut cs = CommandSender::new(self, - if hello { - MessageType::kMessageTypeHello - } else { - MessageType::kMessageTypeNotify - }) - .player_state(player_state); - if let Some(s) = recipient { - cs = cs.recipient(&s); - } - cs.send(); - } - - fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State { - let (position_ms, position_measured_at) = player_state.position(); - + fn spirc_state(&self, state: &State) -> protocol::spirc::State { protobuf_init!(protocol::spirc::State::new(), { - status: player_state.status(), - position_ms: position_ms, - position_measured_at: position_measured_at as u64, + status: state.status, + position_ms: state.position_ms, + position_measured_at: state.position_measured_at as u64, playing_track_index: self.index, track: self.tracks.iter().map(|track| { @@ -329,12 +324,12 @@ impl SpircInternal { }) } - fn device_state(&self, player_state: &PlayerState) -> protocol::spirc::DeviceState { + fn device_state(&self, state: &State) -> protocol::spirc::DeviceState { protobuf_init!(protocol::spirc::DeviceState::new(), { sw_version: version::version_string(), is_active: self.is_active, can_play: self.can_play, - volume: player_state.volume() as u32, + volume: state.volume as u32, name: self.name.clone(), error_code: 0, became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 }, @@ -398,7 +393,6 @@ struct CommandSender<'a> { spirc_internal: &'a mut SpircInternal, cmd: MessageType, recipient: Option<&'a str>, - player_state: Option<&'a PlayerState>, state: Option, } @@ -408,7 +402,6 @@ impl<'a> CommandSender<'a> { spirc_internal: spirc_internal, cmd: cmd, recipient: None, - player_state: None, state: None, } } @@ -418,21 +411,21 @@ impl<'a> CommandSender<'a> { self } - fn player_state(mut self, s: &'a PlayerState) -> CommandSender { - self.player_state = Some(s); - self - } - fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> { self.state = Some(s); self } fn send(self) { - let state = self.player_state.map_or_else(|| { - Cow::Owned(self.spirc_internal.player.state()) - }, |s| { - Cow::Borrowed(s) + //TODO: get data + let state = Cow::Owned(State { + status: PlayStatus::kPlayStatusStop, + position_ms: 0, + position_measured_at: 0, + update_time: util::now_ms(), + volume: 0, + track: None, + end_of_track: false, }); let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { @@ -445,7 +438,7 @@ impl<'a> CommandSender<'a> { self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) ), device_state: self.spirc_internal.device_state(&state), - state_update_id: state.update_time() + state_update_id: state.update_time }); if self.spirc_internal.is_active { From 464e85e285273341a30038ae112331913ac198f1 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Wed, 25 Jan 2017 22:22:53 +0100 Subject: [PATCH 03/18] Split mixer into Mixer and StreamEditor --- src/mixer/mod.rs | 17 +++++++++++------ src/mixer/softmixer.rs | 42 ++++++++++++++++++++++++++---------------- 2 files changed, 37 insertions(+), 22 deletions(-) diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index 285bde0c..cb67a6d7 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -3,12 +3,17 @@ use std::borrow::Cow; pub mod softmixer; pub trait Mixer { - fn init(&mut self); - fn inuse(&mut self); - fn release(&mut self); - fn set_volume(&mut self, volume: u16); + fn init(&self); + fn start(&self); + fn stop(&self); + fn set_volume(&self, volume: u16); fn volume(&self) -> u16; - fn apply_volume<'a>(&mut self, data: &'a [i16]) -> Cow<'a, [i16]> { - Cow::Borrowed(data) + fn get_stream_editor(&self) -> Option> + { + None } +} + +pub trait StreamEditor { + fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]>; } \ No newline at end of file diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 8a36fa90..7dfd1f6b 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,43 +1,53 @@ use super::Mixer; +use super::StreamEditor; use std::borrow::Cow; +use std::sync::{Arc, RwLock}; pub struct SoftMixer { - volume: u16, + volume: Arc> } impl SoftMixer { pub fn new() -> SoftMixer { SoftMixer { - volume: 0xFFFF + volume: Arc::new(RwLock::new(0xFFFF)) } } } impl Mixer for SoftMixer { - fn init(&mut self) { + fn init(&self) { } - - fn inuse(&mut self) { + fn start(&self) { } - - fn release(&mut self) { + fn stop(&self) { } - - fn set_volume(&mut self, volume: u16) { - self.volume = volume; - } - fn volume(&self) -> u16 { - self.volume + *self.volume.read().unwrap() } - fn apply_volume<'a>(&mut self, data: &'a [i16]) -> Cow<'a, [i16]> { - if self.volume == 0xFFFF { + fn set_volume(&self, volume: u16) { + *self.volume.write().unwrap() = volume; + } + fn get_stream_editor(&self) -> Option> { + let vol = self.volume.clone(); + Some(Box::new(SoftVolumeApplier { get_volume: Box::new(move || *vol.read().unwrap() ) })) + } +} + +struct SoftVolumeApplier { + get_volume: Box u16> +} + +impl StreamEditor for SoftVolumeApplier { + fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]> { + let volume = (self.get_volume)(); + if volume == 0xFFFF { Cow::Borrowed(data) } else { Cow::Owned(data.iter() .map(|&x| { (x as i32 - * self.volume as i32 + * volume as i32 / 0xFFFF) as i16 }) .collect()) From 6df2af0ac9cd7eddaea134bed1444b7a02ff2c26 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Wed, 25 Jan 2017 22:49:18 +0100 Subject: [PATCH 04/18] Select mixer dynamically --- src/main.rs | 13 ++++++++----- src/mixer/mod.rs | 7 +++++++ 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/main.rs b/src/main.rs index defac467..ac460665 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,8 +18,7 @@ use librespot::audio_backend::{self, BACKENDS}; use librespot::cache::{Cache, DefaultCache, NoCache}; use librespot::player::Player; use librespot::session::{Bitrate, Config, Session}; -use librespot::mixer::softmixer::SoftMixer; -use librespot::mixer::Mixer; +use librespot::mixer::{self, Mixer}; use librespot::version; @@ -73,7 +72,8 @@ fn setup(args: &[String]) -> (Session, Player, Box) { .optopt("u", "username", "Username to sign in with", "USERNAME") .optopt("p", "password", "Password", "PASSWORD") .optopt("", "backend", "Audio backend to use. Use '?' to list options", "BACKEND") - .optopt("", "device", "Audio device to use. Use '?' to list options", "DEVICE"); + .optopt("", "device", "Audio device to use. Use '?' to list options", "DEVICE") + .optopt("", "mixer", "Mixer to use", "MIXER"); let matches = match opts.parse(&args[1..]) { Ok(m) => m, @@ -123,14 +123,17 @@ fn setup(args: &[String]) -> (Session, Player, Box) { matches.opt_str("password")); session.login(credentials).unwrap(); - let mixer = SoftMixer::new(); + + let mixer_name = matches.opt_str("mixer").unwrap_or("SoftMixer".to_owned()); + + let mixer = mixer::find(&mixer_name).unwrap(); let device_name = matches.opt_str("device"); let player = Player::new(session.clone(), move || { (backend)(device_name.as_ref().map(AsRef::as_ref)) }); - (session, player, Box::new(mixer)) + (session, player, mixer) } fn main() { diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index cb67a6d7..e4a2cb76 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -16,4 +16,11 @@ pub trait Mixer { pub trait StreamEditor { fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]>; +} + +pub fn find(s: &str) -> Option> { + match s { + "SoftMixer" => Some(Box::new(softmixer::SoftMixer::new())), + _ => None, + } } \ No newline at end of file From 636de3fe71b84aed447724b2a8e03a17f330849f Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Wed, 25 Jan 2017 22:56:06 +0100 Subject: [PATCH 05/18] Use stream_editor in player --- src/main.rs | 2 +- src/mixer/mod.rs | 2 +- src/mixer/softmixer.rs | 4 ++-- src/player.rs | 18 +++++++++++++----- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index ac460665..7fbad5fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -129,7 +129,7 @@ fn setup(args: &[String]) -> (Session, Player, Box) { let mixer = mixer::find(&mixer_name).unwrap(); let device_name = matches.opt_str("device"); - let player = Player::new(session.clone(), move || { + let player = Player::new(session.clone(), mixer.get_stream_editor(), move || { (backend)(device_name.as_ref().map(AsRef::as_ref)) }); diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index e4a2cb76..bcd4564c 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -8,7 +8,7 @@ pub trait Mixer { fn stop(&self); fn set_volume(&self, volume: u16); fn volume(&self) -> u16; - fn get_stream_editor(&self) -> Option> + fn get_stream_editor(&self) -> Option> { None } diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 7dfd1f6b..15333873 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -28,14 +28,14 @@ impl Mixer for SoftMixer { fn set_volume(&self, volume: u16) { *self.volume.write().unwrap() = volume; } - fn get_stream_editor(&self) -> Option> { + fn get_stream_editor(&self) -> Option> { let vol = self.volume.clone(); Some(Box::new(SoftVolumeApplier { get_volume: Box::new(move || *vol.read().unwrap() ) })) } } struct SoftVolumeApplier { - get_volume: Box u16> + get_volume: Box u16 + Send> } impl StreamEditor for SoftVolumeApplier { diff --git a/src/player.rs b/src/player.rs index bc38263b..a41ab489 100644 --- a/src/player.rs +++ b/src/player.rs @@ -9,6 +9,7 @@ use audio_decrypt::AudioDecrypt; use audio_backend::Sink; use metadata::{FileFormat, Track, TrackRef}; use session::{Bitrate, Session}; +use mixer::StreamEditor; use util::{self, ReadSeek, SpotifyId, Subfile}; pub use spirc::PlayStatus; @@ -71,7 +72,7 @@ enum PlayerCommand { } impl Player { - pub fn new(session: Session, sink_builder: F) -> Player + pub fn new(session: Session, stream_editor: Option>, sink_builder: F) -> Player where F: FnOnce() -> Box + Send + 'static { let (cmd_tx, cmd_rx) = mpsc::channel(); @@ -92,7 +93,7 @@ impl Player { observers: observers.clone(), }; - thread::spawn(move || internal.run(sink_builder())); + thread::spawn(move || internal.run(sink_builder(), stream_editor)); Player { commands: cmd_tx, @@ -138,6 +139,10 @@ impl Player { } } +fn borrow_data<'a>(data: &'a [i16]) -> Cow<'a, [i16]> { + Cow::Borrowed(&data) +} + fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option> { if track.available { Some(Cow::Borrowed(track)) @@ -205,7 +210,7 @@ fn run_onstop(session: &Session) { } impl PlayerInternal { - fn run(self, mut sink: Box) { + fn run(self, mut sink: Box, stream_editor: Option>) { let mut decoder = None; loop { @@ -340,8 +345,11 @@ impl PlayerInternal { match packet { Some(Ok(packet)) => { - //let buffer = mixer.apply_volume(&packet.data); - let buffer = Cow::Borrowed(&packet.data); + let buffer = if let Some(ref editor) = stream_editor { + editor.modify_stream(&packet.data) + } else { + borrow_data(&packet.data) + }; sink.write(&buffer).unwrap(); self.update(|state| { From e547a0c3da8764abeba92c7e51e07ad2097e0df5 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 27 Jan 2017 14:20:31 +0100 Subject: [PATCH 06/18] Changes code review --- src/main.rs | 11 +++++------ src/mixer/mod.rs | 13 ++++++------- src/mixer/softmixer.rs | 9 +++++---- src/player.rs | 6 +++--- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7fbad5fe..72e0113c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -122,14 +122,13 @@ fn setup(args: &[String]) -> (Session, Player, Box) { let credentials = get_credentials(&session, matches.opt_str("username"), matches.opt_str("password")); session.login(credentials).unwrap(); - - - let mixer_name = matches.opt_str("mixer").unwrap_or("SoftMixer".to_owned()); - - let mixer = mixer::find(&mixer_name).unwrap(); + + let mixer_name = matches.opt_str("mixer"); + let mixer = mixer::find(mixer_name.as_ref()).expect("Invalid mixer"); + let audio_filter = mixer.get_audio_filter(); let device_name = matches.opt_str("device"); - let player = Player::new(session.clone(), mixer.get_stream_editor(), move || { + let player = Player::new(session.clone(), audio_filter, move || { (backend)(device_name.as_ref().map(AsRef::as_ref)) }); diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index bcd4564c..bc19bf8e 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -1,4 +1,5 @@ use std::borrow::Cow; +use self::softmixer::SoftMixer; pub mod softmixer; @@ -8,19 +9,17 @@ pub trait Mixer { fn stop(&self); fn set_volume(&self, volume: u16); fn volume(&self) -> u16; - fn get_stream_editor(&self) -> Option> - { + fn get_audio_filter(&self) -> Option> { None } } -pub trait StreamEditor { +pub trait AudioFilter { fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]>; } -pub fn find(s: &str) -> Option> { - match s { - "SoftMixer" => Some(Box::new(softmixer::SoftMixer::new())), - _ => None, +pub fn find>(name: Option) -> Option> { + match name { + _ => Some(Box::new(SoftMixer::new())), } } \ No newline at end of file diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 15333873..cbe7477f 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,5 +1,5 @@ use super::Mixer; -use super::StreamEditor; +use super::AudioFilter; use std::borrow::Cow; use std::sync::{Arc, RwLock}; @@ -28,9 +28,10 @@ impl Mixer for SoftMixer { fn set_volume(&self, volume: u16) { *self.volume.write().unwrap() = volume; } - fn get_stream_editor(&self) -> Option> { + fn get_audio_filter(&self) -> Option> { let vol = self.volume.clone(); - Some(Box::new(SoftVolumeApplier { get_volume: Box::new(move || *vol.read().unwrap() ) })) + let get_volume = Box::new(move || *vol.read().unwrap()); + Some(Box::new(SoftVolumeApplier { get_volume: get_volume })) } } @@ -38,7 +39,7 @@ struct SoftVolumeApplier { get_volume: Box u16 + Send> } -impl StreamEditor for SoftVolumeApplier { +impl AudioFilter for SoftVolumeApplier { fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]> { let volume = (self.get_volume)(); if volume == 0xFFFF { diff --git a/src/player.rs b/src/player.rs index a41ab489..f03a592b 100644 --- a/src/player.rs +++ b/src/player.rs @@ -9,7 +9,7 @@ use audio_decrypt::AudioDecrypt; use audio_backend::Sink; use metadata::{FileFormat, Track, TrackRef}; use session::{Bitrate, Session}; -use mixer::StreamEditor; +use mixer::AudioFilter; use util::{self, ReadSeek, SpotifyId, Subfile}; pub use spirc::PlayStatus; @@ -72,7 +72,7 @@ enum PlayerCommand { } impl Player { - pub fn new(session: Session, stream_editor: Option>, sink_builder: F) -> Player + pub fn new(session: Session, stream_editor: Option>, sink_builder: F) -> Player where F: FnOnce() -> Box + Send + 'static { let (cmd_tx, cmd_rx) = mpsc::channel(); @@ -210,7 +210,7 @@ fn run_onstop(session: &Session) { } impl PlayerInternal { - fn run(self, mut sink: Box, stream_editor: Option>) { + fn run(self, mut sink: Box, stream_editor: Option>) { let mut decoder = None; loop { From 37916330b41443b64e34c606c73eb975f3ee9e90 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Tue, 31 Jan 2017 21:37:58 +0100 Subject: [PATCH 07/18] Add macro to be able to create wrapped senders to send multiple message types to one channel --- src/lib.rs | 4 +++- src/util/channel.rs | 44 ++++++++++++++++++++++++++++++++++++++++++++ src/util/mod.rs | 2 ++ 3 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 src/util/channel.rs diff --git a/src/lib.rs b/src/lib.rs index 59e46547..bdf86147 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,9 @@ #![cfg_attr(feature="clippy", feature(plugin))] #![cfg_attr(feature="clippy", plugin(clippy))] +#[macro_use] +pub mod util; + #[macro_use] extern crate lazy_static; #[macro_use] extern crate log; @@ -58,7 +61,6 @@ pub mod link; pub mod metadata; pub mod player; pub mod stream; -pub mod util; pub mod version; pub mod mixer; diff --git a/src/util/channel.rs b/src/util/channel.rs new file mode 100644 index 00000000..982b4c27 --- /dev/null +++ b/src/util/channel.rs @@ -0,0 +1,44 @@ +macro_rules! implement_sender { + (name => $name:ident, + wrap => $wrap_type:ident, + with => $with_type:ident, + variant => $variant:ident) => { + pub struct $name { + wrapped_sender: ::std::sync::mpsc::Sender<$with_type>, + } + + impl $name { + pub fn create(sender: ::std::sync::mpsc::Sender<$with_type>) -> $name { + $name { + wrapped_sender: sender + } + } + pub fn send(&self, t: $wrap_type) -> Result<(), ::std::sync::mpsc::SendError<$wrap_type>> { + let wrapped = self.wrap(t); + let result = self.wrapped_sender.send(wrapped); + result.map_err(|senderror| { + let ::std::sync::mpsc::SendError(z) = senderror; + ::std::sync::mpsc::SendError(self.unwrap(z)) + }) + } + fn wrap(&self, d: $wrap_type) -> $with_type { + $with_type::$variant(d) + } + fn unwrap(&self, msg: $with_type) -> $wrap_type { + let d = match msg { + $with_type::$variant(d) => d, + _ => unreachable!() + }; + d + } + } + + impl Clone for $name { + fn clone(&self) -> $name { + $name { + wrapped_sender: self.wrapped_sender.clone() + } + } + } + } +} \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs index 0683f6a0..f550f526 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -11,6 +11,8 @@ mod int128; mod spotify_id; mod arcvec; mod subfile; +#[macro_use] +mod channel; pub use util::int128::u128; pub use util::spotify_id::{SpotifyId, FileId}; From c8ee08663d7b5f723df419ab15cde01e123f45c7 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Tue, 31 Jan 2017 21:38:52 +0100 Subject: [PATCH 08/18] Create channel in spirc instead of in mercury and use MercuryResponseSender implemented by macro --- src/mercury.rs | 14 +++++--------- src/session.rs | 7 ++++--- src/spirc.rs | 46 +++++++++++++++++++++++++++++++++------------- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/src/mercury.rs b/src/mercury.rs index 37a908c6..2b1ee564 100644 --- a/src/mercury.rs +++ b/src/mercury.rs @@ -4,10 +4,10 @@ use protobuf::{self, Message}; use std::collections::HashMap; use std::io::{Cursor, Read, Write}; use std::mem::replace; -use std::sync::mpsc; use protocol; use session::{Session, PacketHandler}; +use spirc::MercuryResponseSender; #[derive(Debug, PartialEq, Eq)] pub enum MercuryMethod { @@ -32,7 +32,7 @@ pub struct MercuryResponse { enum MercuryCallback { Future(eventual::Complete), - Subscription(mpsc::Sender), + Subscription(MercuryResponseSender), Channel, } @@ -45,7 +45,7 @@ pub struct MercuryPending { pub struct MercuryManager { next_seq: u32, pending: HashMap, MercuryPending>, - subscriptions: HashMap>, + subscriptions: HashMap, } impl ToString for MercuryMethod { @@ -103,9 +103,7 @@ impl MercuryManager { rx } - pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(); - + pub fn subscribe(&mut self, session: &Session, uri: String, tx: MercuryResponseSender) { self.request_with_callback(session, MercuryRequest { method: MercuryMethod::SUB, @@ -114,8 +112,6 @@ impl MercuryManager { payload: Vec::new(), }, MercuryCallback::Subscription(tx)); - - rx } fn parse_part(mut s: &mut Read) -> Vec { @@ -128,7 +124,7 @@ impl MercuryManager { fn complete_subscription(&mut self, response: MercuryResponse, - tx: mpsc::Sender) { + tx: MercuryResponseSender) { for sub_data in response.payload { if let Ok(mut sub) = protobuf::parse_from_bytes::(&sub_data) { diff --git a/src/session.rs b/src/session.rs index 59167614..75976ba2 100644 --- a/src/session.rs +++ b/src/session.rs @@ -9,7 +9,7 @@ use protobuf::{self, Message}; use rand::thread_rng; use std::io::{Read, Write, Cursor}; use std::result::Result; -use std::sync::{Mutex, RwLock, Arc, mpsc}; +use std::sync::{Mutex, RwLock, Arc}; use std::str::FromStr; use album_cover::AlbumCover; @@ -24,6 +24,7 @@ use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use protocol; use stream::StreamManager; +use spirc::MercuryResponseSender; use util::{self, SpotifyId, FileId, ReadSeek}; use version; @@ -320,8 +321,8 @@ impl Session { self.0.mercury.lock().unwrap().request(self, req) } - pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver { - self.0.mercury.lock().unwrap().subscribe(self, uri) + pub fn mercury_sub(&self, uri: String, tx: MercuryResponseSender) { + self.0.mercury.lock().unwrap().subscribe(self, uri, tx) } pub fn cache(&self) -> &Cache { diff --git a/src/spirc.rs b/src/spirc.rs index 6688e834..faee0add 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,10 +1,10 @@ use eventual::Async; use protobuf::{self, Message, RepeatedField}; use std::borrow::Cow; -use std::sync::{Mutex, Arc}; +use std::sync::{mpsc, Mutex, Arc}; use std::collections::HashMap; -use mercury::{MercuryRequest, MercuryMethod}; +use mercury::{MercuryRequest, MercuryMethod, MercuryResponse}; use player::{Player, PlayerState}; use mixer::Mixer; use session::Session; @@ -56,6 +56,18 @@ pub struct State { pub end_of_track: bool, } +pub struct UpdateMessage; + +pub enum SpircMessage { + MercuryMsg(MercuryResponse), + UpdateMsg(UpdateMessage) +} + +implement_sender!(name => MercuryResponseSender, + wrap => MercuryResponse, + with => SpircMessage, + variant => MercuryMsg); + impl SpircManager { pub fn new(session: Session, player: Player, mixer: Box) -> SpircManager { let ident = session.device_id().to_owned(); @@ -92,8 +104,11 @@ impl SpircManager { pub fn run(&self) { let rx = { let mut internal = self.0.lock().unwrap(); + let (tx, rx) = mpsc::channel::(); - let rx = internal.session.mercury_sub(internal.uri()); + let mercury_response_sender = MercuryResponseSender::create(tx.clone()); + + internal.session.mercury_sub(internal.uri(), mercury_response_sender); internal.notify(true, None); @@ -110,18 +125,23 @@ impl SpircManager { rx }; - for pkt in rx { - let data = pkt.payload.first().unwrap(); - let frame = protobuf::parse_from_bytes::(data).unwrap(); + for msg in rx { + match msg { + SpircMessage::MercuryMsg(pkt) => { + let data = pkt.payload.first().unwrap(); + let frame = protobuf::parse_from_bytes::(data).unwrap(); - debug!("{:?} {:?} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); + debug!("{:?} {:?} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); - self.0.lock().unwrap().handle(frame); + self.0.lock().unwrap().handle(frame); + } + _ => {} + } } } From 134239d2981f95735d7e248734ace23b823845d3 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Wed, 1 Feb 2017 21:32:14 +0100 Subject: [PATCH 09/18] Send update messages from mixer to SpircManager --- src/mixer/mod.rs | 5 ++++- src/mixer/softmixer.rs | 19 ++++++++++++++----- src/spirc.rs | 35 +++++++++++++++++++++++------------ 3 files changed, 41 insertions(+), 18 deletions(-) diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index bc19bf8e..a0e991e1 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -1,10 +1,13 @@ use std::borrow::Cow; + +use spirc::UpdateMessageSender; + use self::softmixer::SoftMixer; pub mod softmixer; pub trait Mixer { - fn init(&self); + fn init(&mut self, UpdateMessageSender); fn start(&self); fn stop(&self); fn set_volume(&self, volume: u16); diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index cbe7477f..343b06b6 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,22 +1,29 @@ -use super::Mixer; -use super::AudioFilter; use std::borrow::Cow; use std::sync::{Arc, RwLock}; +use spirc::UpdateMessageSender; +use spirc::UpdateMessage; + +use super::Mixer; +use super::AudioFilter; + pub struct SoftMixer { - volume: Arc> + volume: Arc>, + tx: Option } impl SoftMixer { pub fn new() -> SoftMixer { SoftMixer { - volume: Arc::new(RwLock::new(0xFFFF)) + volume: Arc::new(RwLock::new(0xFFFF)), + tx: None } } } impl Mixer for SoftMixer { - fn init(&self) { + fn init(&mut self, tx: UpdateMessageSender) { + self.tx = Some(tx); } fn start(&self) { } @@ -27,6 +34,8 @@ impl Mixer for SoftMixer { } fn set_volume(&self, volume: u16) { *self.volume.write().unwrap() = volume; + let tx = self.tx.as_ref().expect("SoftMixer not initialized"); + tx.send(UpdateMessage).unwrap(); } fn get_audio_filter(&self) -> Option> { let vol = self.volume.clone(); diff --git a/src/spirc.rs b/src/spirc.rs index faee0add..37fbe04f 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -68,6 +68,11 @@ implement_sender!(name => MercuryResponseSender, with => SpircMessage, variant => MercuryMsg); +implement_sender!(name => UpdateMessageSender, + wrap => UpdateMessage, + with => SpircMessage, + variant => UpdateMsg); + impl SpircManager { pub fn new(session: Session, player: Player, mixer: Box) -> SpircManager { let ident = session.device_id().to_owned(); @@ -110,6 +115,10 @@ impl SpircManager { internal.session.mercury_sub(internal.uri(), mercury_response_sender); + let update_message_sender = UpdateMessageSender::create(tx.clone()); + + internal.mixer.init(update_message_sender); + internal.notify(true, None); // Use a weak pointer to avoid creating an Rc cycle between the player and the @@ -127,20 +136,22 @@ impl SpircManager { for msg in rx { match msg { - SpircMessage::MercuryMsg(pkt) => { - let data = pkt.payload.first().unwrap(); - let frame = protobuf::parse_from_bytes::(data).unwrap(); + SpircMessage::MercuryMsg(pkt) => { + let data = pkt.payload.first().unwrap(); + let frame = protobuf::parse_from_bytes::(data).unwrap(); - debug!("{:?} {:?} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); + debug!("{:?} {:?} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); - self.0.lock().unwrap().handle(frame); - } - _ => {} + self.0.lock().unwrap().handle(frame); + } + SpircMessage::UpdateMsg(_) => { + self.0.lock().unwrap().notify(false, None); + } } } } From 2de5d10a2f108661b8e657678d2a33f6ac109df4 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Wed, 1 Feb 2017 21:45:31 +0100 Subject: [PATCH 10/18] SoftMixer: Change volume to AtomicUsize and pass AtomicUsize to SoftVolumeApplier --- src/mixer/softmixer.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 343b06b6..d2b75f6e 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,5 +1,6 @@ use std::borrow::Cow; -use std::sync::{Arc, RwLock}; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; use spirc::UpdateMessageSender; use spirc::UpdateMessage; @@ -8,14 +9,14 @@ use super::Mixer; use super::AudioFilter; pub struct SoftMixer { - volume: Arc>, + volume: Arc, tx: Option } impl SoftMixer { pub fn new() -> SoftMixer { SoftMixer { - volume: Arc::new(RwLock::new(0xFFFF)), + volume: Arc::new(AtomicUsize::new(0xFFFF)), tx: None } } @@ -30,27 +31,25 @@ impl Mixer for SoftMixer { fn stop(&self) { } fn volume(&self) -> u16 { - *self.volume.read().unwrap() + self.volume.load(Ordering::Relaxed) as u16 } fn set_volume(&self, volume: u16) { - *self.volume.write().unwrap() = volume; + self.volume.store(volume as usize, Ordering::Relaxed); let tx = self.tx.as_ref().expect("SoftMixer not initialized"); tx.send(UpdateMessage).unwrap(); } fn get_audio_filter(&self) -> Option> { - let vol = self.volume.clone(); - let get_volume = Box::new(move || *vol.read().unwrap()); - Some(Box::new(SoftVolumeApplier { get_volume: get_volume })) + Some(Box::new(SoftVolumeApplier { volume: self.volume.clone() })) } } struct SoftVolumeApplier { - get_volume: Box u16 + Send> + volume: Arc } impl AudioFilter for SoftVolumeApplier { fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]> { - let volume = (self.get_volume)(); + let volume = self.volume.load(Ordering::Relaxed) as u16; if volume == 0xFFFF { Cow::Borrowed(data) } else { From da537b57f41790eb0cf9ecbe7ed99cf949c1db65 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Wed, 1 Feb 2017 22:16:28 +0100 Subject: [PATCH 11/18] AudioFilter: Modify fn modify_stream(&self, data: &mut [i16]); --- src/mixer/mod.rs | 4 +--- src/mixer/softmixer.rs | 18 ++++++------------ src/player.rs | 12 ++++-------- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index a0e991e1..fdadf4fe 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -1,5 +1,3 @@ -use std::borrow::Cow; - use spirc::UpdateMessageSender; use self::softmixer::SoftMixer; @@ -18,7 +16,7 @@ pub trait Mixer { } pub trait AudioFilter { - fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]>; + fn modify_stream(&self, data: &mut [i16]); } pub fn find>(name: Option) -> Option> { diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index d2b75f6e..94b03f48 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -48,18 +47,13 @@ struct SoftVolumeApplier { } impl AudioFilter for SoftVolumeApplier { - fn modify_stream<'a>(&self, data: &'a [i16]) -> Cow<'a, [i16]> { + fn modify_stream(&self, data: &mut [i16]) { let volume = self.volume.load(Ordering::Relaxed) as u16; - if volume == 0xFFFF { - Cow::Borrowed(data) - } else { - Cow::Owned(data.iter() - .map(|&x| { - (x as i32 - * volume as i32 - / 0xFFFF) as i16 - }) - .collect()) + if volume != 0xFFFF { + let factor = volume as i32 / 0xFFFF; + for x in data.iter_mut() { + *x = (*x as i32 * factor) as i16; + } } } } \ No newline at end of file diff --git a/src/player.rs b/src/player.rs index f03a592b..5a4f0c98 100644 --- a/src/player.rs +++ b/src/player.rs @@ -139,10 +139,6 @@ impl Player { } } -fn borrow_data<'a>(data: &'a [i16]) -> Cow<'a, [i16]> { - Cow::Borrowed(&data) -} - fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option> { if track.available { Some(Cow::Borrowed(track)) @@ -345,10 +341,10 @@ impl PlayerInternal { match packet { Some(Ok(packet)) => { - let buffer = if let Some(ref editor) = stream_editor { - editor.modify_stream(&packet.data) - } else { - borrow_data(&packet.data) + let mut buffer = packet.data.to_vec(); + + if let Some(ref editor) = stream_editor { + editor.modify_stream(&mut buffer) }; sink.write(&buffer).unwrap(); From 48a43f4948edccc96e84e8aaeee808bf77ee84d8 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 10:06:09 +0100 Subject: [PATCH 12/18] Move structs which are send across threads to own module --- src/lib.rs | 1 + src/mercury.rs | 8 +------- src/messaging/mod.rs | 22 ++++++++++++++++++++++ src/mixer/mod.rs | 2 +- src/mixer/softmixer.rs | 3 +-- src/session.rs | 4 ++-- src/spirc.rs | 20 ++------------------ 7 files changed, 30 insertions(+), 30 deletions(-) create mode 100644 src/messaging/mod.rs diff --git a/src/lib.rs b/src/lib.rs index bdf86147..aea2411f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -63,6 +63,7 @@ pub mod player; pub mod stream; pub mod version; pub mod mixer; +pub mod messaging; #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs")); #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs"); diff --git a/src/mercury.rs b/src/mercury.rs index 2b1ee564..3d9add8f 100644 --- a/src/mercury.rs +++ b/src/mercury.rs @@ -7,7 +7,7 @@ use std::mem::replace; use protocol; use session::{Session, PacketHandler}; -use spirc::MercuryResponseSender; +use messaging::{MercuryResponse, MercuryResponseSender}; #[derive(Debug, PartialEq, Eq)] pub enum MercuryMethod { @@ -24,12 +24,6 @@ pub struct MercuryRequest { pub payload: Vec>, } -#[derive(Debug)] -pub struct MercuryResponse { - pub uri: String, - pub payload: Vec>, -} - enum MercuryCallback { Future(eventual::Complete), Subscription(MercuryResponseSender), diff --git a/src/messaging/mod.rs b/src/messaging/mod.rs new file mode 100644 index 00000000..9bf43c92 --- /dev/null +++ b/src/messaging/mod.rs @@ -0,0 +1,22 @@ +pub struct UpdateMessage; + +#[derive(Debug)] +pub struct MercuryResponse { + pub uri: String, + pub payload: Vec>, +} + +pub enum SpircMessage { + MercuryMsg(MercuryResponse), + UpdateMsg(UpdateMessage) +} + +implement_sender!(name => MercuryResponseSender, + wrap => MercuryResponse, + with => SpircMessage, + variant => MercuryMsg); + +implement_sender!(name => UpdateMessageSender, + wrap => UpdateMessage, + with => SpircMessage, + variant => UpdateMsg); \ No newline at end of file diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index fdadf4fe..6b2a3f8a 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -1,4 +1,4 @@ -use spirc::UpdateMessageSender; +use messaging::UpdateMessageSender; use self::softmixer::SoftMixer; diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 94b03f48..789a5fff 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,8 +1,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use spirc::UpdateMessageSender; -use spirc::UpdateMessage; +use messaging::{UpdateMessage, UpdateMessageSender}; use super::Mixer; use super::AudioFilter; diff --git a/src/session.rs b/src/session.rs index 75976ba2..76157a77 100644 --- a/src/session.rs +++ b/src/session.rs @@ -20,11 +20,11 @@ use authentication::Credentials; use cache::Cache; use connection::{self, PlainConnection, CipherConnection}; use diffie_hellman::DHLocalKeys; -use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; +use mercury::{MercuryManager, MercuryRequest}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use protocol; use stream::StreamManager; -use spirc::MercuryResponseSender; +use messaging::{MercuryResponse, MercuryResponseSender}; use util::{self, SpotifyId, FileId, ReadSeek}; use version; diff --git a/src/spirc.rs b/src/spirc.rs index 37fbe04f..39b43cff 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -4,7 +4,8 @@ use std::borrow::Cow; use std::sync::{mpsc, Mutex, Arc}; use std::collections::HashMap; -use mercury::{MercuryRequest, MercuryMethod, MercuryResponse}; +use mercury::{MercuryRequest, MercuryMethod}; +use messaging::{SpircMessage, MercuryResponseSender, UpdateMessageSender}; use player::{Player, PlayerState}; use mixer::Mixer; use session::Session; @@ -56,23 +57,6 @@ pub struct State { pub end_of_track: bool, } -pub struct UpdateMessage; - -pub enum SpircMessage { - MercuryMsg(MercuryResponse), - UpdateMsg(UpdateMessage) -} - -implement_sender!(name => MercuryResponseSender, - wrap => MercuryResponse, - with => SpircMessage, - variant => MercuryMsg); - -implement_sender!(name => UpdateMessageSender, - wrap => UpdateMessage, - with => SpircMessage, - variant => UpdateMsg); - impl SpircManager { pub fn new(session: Session, player: Player, mixer: Box) -> SpircManager { let ident = session.device_id().to_owned(); From 2c187eb3aed3311b79470fb04e7aa3e71e7eb5af Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 11:05:52 +0100 Subject: [PATCH 13/18] Avoid copying the data array in the player --- src/player.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/player.rs b/src/player.rs index 5a4f0c98..aad23dfa 100644 --- a/src/player.rs +++ b/src/player.rs @@ -340,13 +340,11 @@ impl PlayerInternal { let packet = decoder.as_mut().unwrap().packets().next(); match packet { - Some(Ok(packet)) => { - let mut buffer = packet.data.to_vec(); - + Some(Ok(mut packet)) => { if let Some(ref editor) = stream_editor { - editor.modify_stream(&mut buffer) + editor.modify_stream(&mut packet.data) }; - sink.write(&buffer).unwrap(); + sink.write(&packet.data).unwrap(); self.update(|state| { state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32; From ec0e81f0ae3a4a475999e64493d83d2e160019be Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 11:15:26 +0100 Subject: [PATCH 14/18] Add documentation for channel macro --- src/util/channel.rs | 58 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/util/channel.rs b/src/util/channel.rs index 982b4c27..1d277f42 100644 --- a/src/util/channel.rs +++ b/src/util/channel.rs @@ -1,3 +1,61 @@ +/// Creates an implentation of sender which can be used to share an async channel for multiple message types. +/// +/// # Examples +/// +/// ``` +/// struct D; +/// struct A; +/// +/// enum Msg { +/// FirstType(D), +/// SecondType(A) +/// } +/// +/// fn main() { +/// let (tx, rx) = channel::(); +/// +/// let d_sender = DSender::create(tx.clone()); +/// let a_sender = ASender::create(tx.clone()); +/// subscribe(d_sender.clone()); +/// subscribe2(d_sender.clone()); +/// subscribe3(a_sender.clone()); + // +/// let mut i = 0; + // +/// for m in rx { +/// i += 1; +/// match m { +/// Msg::FirstType(_) => println!("m: D {}", i), +/// Msg::SecondType(_) => println!("m: A {}", i) +/// }; +/// } +/// } +/// +/// fn subscribe(sender: DSender) { +/// thread::spawn(move|| { +/// sender.send(D).unwrap(); +/// }); +/// } +/// fn subscribe2(sender: DSender) { +/// thread::spawn(move|| { +/// thread::sleep(time::Duration::from_millis(10)); +/// sender.send(D).unwrap(); +/// }); +/// } +/// fn subscribe3(sender: ASender) { +/// thread::spawn(move|| { +/// sender.send(A).unwrap(); +/// }); +/// } +/// implement_sender!(name => DSender, +/// wrap => D, +/// with => Msg, +/// variant => FirstType) +/// implement_sender!(name => ASender, +/// wrap => A, +/// with => Msg, +/// variant => SecondType) +/// ``` macro_rules! implement_sender { (name => $name:ident, wrap => $wrap_type:ident, From 5ef28bf2a52ae37a766142248f73a9d2427196a2 Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 13:20:49 +0100 Subject: [PATCH 15/18] SpircManager: Update state from player and mixer before sending info --- src/spirc.rs | 50 +++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/src/spirc.rs b/src/spirc.rs index 39b43cff..293a7e7a 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,6 +1,5 @@ use eventual::Async; use protobuf::{self, Message, RepeatedField}; -use std::borrow::Cow; use std::sync::{mpsc, Mutex, Arc}; use std::collections::HashMap; @@ -57,6 +56,42 @@ pub struct State { pub end_of_track: bool, } +impl State { + pub fn new() -> State { + let state = State { + status: PlayStatus::kPlayStatusStop, + position_ms: 0, + position_measured_at: 0, + update_time: 0, + volume: 0, + track: None, + end_of_track: false, + }; + state.update_time() + } + + pub fn update_from_player(mut self, player: &Player) -> State { + let player_state = player.state(); + let (position_ms, position_measured_at) = player_state.position(); + self.status = player_state.status(); + self.position_ms = position_ms; + self.position_measured_at = position_measured_at; + self.track = player_state.track; + self.end_of_track = player_state.end_of_track(); + self.update_time() + } + + pub fn update_from_mixer(mut self, mixer: &Box) -> State { + self.volume = mixer.volume(); + self.update_time() + } + + fn update_time(mut self) -> State { + self.update_time = util::now_ms(); + self + } +} + impl SpircManager { pub fn new(session: Session, player: Player, mixer: Box) -> SpircManager { let ident = session.device_id().to_owned(); @@ -432,16 +467,9 @@ impl<'a> CommandSender<'a> { } fn send(self) { - //TODO: get data - let state = Cow::Owned(State { - status: PlayStatus::kPlayStatusStop, - position_ms: 0, - position_measured_at: 0, - update_time: util::now_ms(), - volume: 0, - track: None, - end_of_track: false, - }); + let state = State::new() + .update_from_player(&self.spirc_internal.player) + .update_from_mixer(&self.spirc_internal.mixer); let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { version: 1, From 44467a44d6700b70033741f0f1b97da90341f7be Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 13:30:30 +0100 Subject: [PATCH 16/18] Call start and stop on mixer --- src/spirc.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/spirc.rs b/src/spirc.rs index 293a7e7a..21894f3a 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -290,9 +290,11 @@ impl SpircInternal { } MessageType::kMessageTypePlay => { self.player.play(); + self.mixer.start(); } MessageType::kMessageTypePause => { self.player.pause(); + self.mixer.stop(); } MessageType::kMessageTypeNext => { self.index = (self.index + 1) % self.tracks.len() as u32; @@ -314,6 +316,7 @@ impl SpircInternal { if self.is_active && frame.get_device_state().get_is_active() { self.is_active = false; self.player.stop(); + self.mixer.stop(); } } MessageType::kMessageTypeVolume => { From a7aba5c8e7bd398d2b7ed48916c996fadb8c725a Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 14:19:30 +0100 Subject: [PATCH 17/18] SoftVolumeMixer: move factor in again, otherwise i32 division always results in 0 --- src/mixer/softmixer.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 789a5fff..184ed563 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -49,9 +49,8 @@ impl AudioFilter for SoftVolumeApplier { fn modify_stream(&self, data: &mut [i16]) { let volume = self.volume.load(Ordering::Relaxed) as u16; if volume != 0xFFFF { - let factor = volume as i32 / 0xFFFF; for x in data.iter_mut() { - *x = (*x as i32 * factor) as i16; + *x = (*x as i32 * volume as i32 / 0xFFFF) as i16; } } } From 10f9da410ef970e9ba8b9c0bb5e74ca415a98dab Mon Sep 17 00:00:00 2001 From: Daniel Romero Date: Fri, 3 Feb 2017 17:08:35 +0100 Subject: [PATCH 18/18] Remove code to notify spirc manager from the mixer --- src/lib.rs | 5 +- src/mercury.rs | 20 ++++++-- src/messaging/mod.rs | 22 --------- src/mixer/mod.rs | 3 -- src/mixer/softmixer.rs | 13 +----- src/session.rs | 9 ++-- src/spirc.rs | 40 ++++++---------- src/util/channel.rs | 102 ----------------------------------------- src/util/mod.rs | 2 - 9 files changed, 35 insertions(+), 181 deletions(-) delete mode 100644 src/messaging/mod.rs delete mode 100644 src/util/channel.rs diff --git a/src/lib.rs b/src/lib.rs index aea2411f..59e46547 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,9 +8,6 @@ #![cfg_attr(feature="clippy", feature(plugin))] #![cfg_attr(feature="clippy", plugin(clippy))] -#[macro_use] -pub mod util; - #[macro_use] extern crate lazy_static; #[macro_use] extern crate log; @@ -61,9 +58,9 @@ pub mod link; pub mod metadata; pub mod player; pub mod stream; +pub mod util; pub mod version; pub mod mixer; -pub mod messaging; #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs")); #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs"); diff --git a/src/mercury.rs b/src/mercury.rs index 3d9add8f..37a908c6 100644 --- a/src/mercury.rs +++ b/src/mercury.rs @@ -4,10 +4,10 @@ use protobuf::{self, Message}; use std::collections::HashMap; use std::io::{Cursor, Read, Write}; use std::mem::replace; +use std::sync::mpsc; use protocol; use session::{Session, PacketHandler}; -use messaging::{MercuryResponse, MercuryResponseSender}; #[derive(Debug, PartialEq, Eq)] pub enum MercuryMethod { @@ -24,9 +24,15 @@ pub struct MercuryRequest { pub payload: Vec>, } +#[derive(Debug)] +pub struct MercuryResponse { + pub uri: String, + pub payload: Vec>, +} + enum MercuryCallback { Future(eventual::Complete), - Subscription(MercuryResponseSender), + Subscription(mpsc::Sender), Channel, } @@ -39,7 +45,7 @@ pub struct MercuryPending { pub struct MercuryManager { next_seq: u32, pending: HashMap, MercuryPending>, - subscriptions: HashMap, + subscriptions: HashMap>, } impl ToString for MercuryMethod { @@ -97,7 +103,9 @@ impl MercuryManager { rx } - pub fn subscribe(&mut self, session: &Session, uri: String, tx: MercuryResponseSender) { + pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver { + let (tx, rx) = mpsc::channel(); + self.request_with_callback(session, MercuryRequest { method: MercuryMethod::SUB, @@ -106,6 +114,8 @@ impl MercuryManager { payload: Vec::new(), }, MercuryCallback::Subscription(tx)); + + rx } fn parse_part(mut s: &mut Read) -> Vec { @@ -118,7 +128,7 @@ impl MercuryManager { fn complete_subscription(&mut self, response: MercuryResponse, - tx: MercuryResponseSender) { + tx: mpsc::Sender) { for sub_data in response.payload { if let Ok(mut sub) = protobuf::parse_from_bytes::(&sub_data) { diff --git a/src/messaging/mod.rs b/src/messaging/mod.rs deleted file mode 100644 index 9bf43c92..00000000 --- a/src/messaging/mod.rs +++ /dev/null @@ -1,22 +0,0 @@ -pub struct UpdateMessage; - -#[derive(Debug)] -pub struct MercuryResponse { - pub uri: String, - pub payload: Vec>, -} - -pub enum SpircMessage { - MercuryMsg(MercuryResponse), - UpdateMsg(UpdateMessage) -} - -implement_sender!(name => MercuryResponseSender, - wrap => MercuryResponse, - with => SpircMessage, - variant => MercuryMsg); - -implement_sender!(name => UpdateMessageSender, - wrap => UpdateMessage, - with => SpircMessage, - variant => UpdateMsg); \ No newline at end of file diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs index 6b2a3f8a..32350af3 100644 --- a/src/mixer/mod.rs +++ b/src/mixer/mod.rs @@ -1,11 +1,8 @@ -use messaging::UpdateMessageSender; - use self::softmixer::SoftMixer; pub mod softmixer; pub trait Mixer { - fn init(&mut self, UpdateMessageSender); fn start(&self); fn stop(&self); fn set_volume(&self, volume: u16); diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs index 184ed563..e54e728e 100644 --- a/src/mixer/softmixer.rs +++ b/src/mixer/softmixer.rs @@ -1,29 +1,22 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; -use messaging::{UpdateMessage, UpdateMessageSender}; - use super::Mixer; use super::AudioFilter; pub struct SoftMixer { - volume: Arc, - tx: Option + volume: Arc } impl SoftMixer { pub fn new() -> SoftMixer { SoftMixer { - volume: Arc::new(AtomicUsize::new(0xFFFF)), - tx: None + volume: Arc::new(AtomicUsize::new(0xFFFF)) } } } impl Mixer for SoftMixer { - fn init(&mut self, tx: UpdateMessageSender) { - self.tx = Some(tx); - } fn start(&self) { } fn stop(&self) { @@ -33,8 +26,6 @@ impl Mixer for SoftMixer { } fn set_volume(&self, volume: u16) { self.volume.store(volume as usize, Ordering::Relaxed); - let tx = self.tx.as_ref().expect("SoftMixer not initialized"); - tx.send(UpdateMessage).unwrap(); } fn get_audio_filter(&self) -> Option> { Some(Box::new(SoftVolumeApplier { volume: self.volume.clone() })) diff --git a/src/session.rs b/src/session.rs index 76157a77..59167614 100644 --- a/src/session.rs +++ b/src/session.rs @@ -9,7 +9,7 @@ use protobuf::{self, Message}; use rand::thread_rng; use std::io::{Read, Write, Cursor}; use std::result::Result; -use std::sync::{Mutex, RwLock, Arc}; +use std::sync::{Mutex, RwLock, Arc, mpsc}; use std::str::FromStr; use album_cover::AlbumCover; @@ -20,11 +20,10 @@ use authentication::Credentials; use cache::Cache; use connection::{self, PlainConnection, CipherConnection}; use diffie_hellman::DHLocalKeys; -use mercury::{MercuryManager, MercuryRequest}; +use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use protocol; use stream::StreamManager; -use messaging::{MercuryResponse, MercuryResponseSender}; use util::{self, SpotifyId, FileId, ReadSeek}; use version; @@ -321,8 +320,8 @@ impl Session { self.0.mercury.lock().unwrap().request(self, req) } - pub fn mercury_sub(&self, uri: String, tx: MercuryResponseSender) { - self.0.mercury.lock().unwrap().subscribe(self, uri, tx) + pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver { + self.0.mercury.lock().unwrap().subscribe(self, uri) } pub fn cache(&self) -> &Cache { diff --git a/src/spirc.rs b/src/spirc.rs index 21894f3a..c532f794 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,10 +1,9 @@ use eventual::Async; use protobuf::{self, Message, RepeatedField}; -use std::sync::{mpsc, Mutex, Arc}; +use std::sync::{Mutex, Arc}; use std::collections::HashMap; use mercury::{MercuryRequest, MercuryMethod}; -use messaging::{SpircMessage, MercuryResponseSender, UpdateMessageSender}; use player::{Player, PlayerState}; use mixer::Mixer; use session::Session; @@ -128,15 +127,8 @@ impl SpircManager { pub fn run(&self) { let rx = { let mut internal = self.0.lock().unwrap(); - let (tx, rx) = mpsc::channel::(); - let mercury_response_sender = MercuryResponseSender::create(tx.clone()); - - internal.session.mercury_sub(internal.uri(), mercury_response_sender); - - let update_message_sender = UpdateMessageSender::create(tx.clone()); - - internal.mixer.init(update_message_sender); + let rx = internal.session.mercury_sub(internal.uri()); internal.notify(true, None); @@ -153,25 +145,18 @@ impl SpircManager { rx }; - for msg in rx { - match msg { - SpircMessage::MercuryMsg(pkt) => { - let data = pkt.payload.first().unwrap(); - let frame = protobuf::parse_from_bytes::(data).unwrap(); + for pkt in rx { + let data = pkt.payload.first().unwrap(); + let frame = protobuf::parse_from_bytes::(data).unwrap(); - debug!("{:?} {:?} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); + debug!("{:?} {:?} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); - self.0.lock().unwrap().handle(frame); - } - SpircMessage::UpdateMsg(_) => { - self.0.lock().unwrap().notify(false, None); - } - } + self.0.lock().unwrap().handle(frame); } } @@ -321,6 +306,7 @@ impl SpircInternal { } MessageType::kMessageTypeVolume => { self.mixer.set_volume(frame.get_volume() as u16); + self.notify(false, None); } MessageType::kMessageTypeGoodbye => { if frame.has_ident() { diff --git a/src/util/channel.rs b/src/util/channel.rs deleted file mode 100644 index 1d277f42..00000000 --- a/src/util/channel.rs +++ /dev/null @@ -1,102 +0,0 @@ -/// Creates an implentation of sender which can be used to share an async channel for multiple message types. -/// -/// # Examples -/// -/// ``` -/// struct D; -/// struct A; -/// -/// enum Msg { -/// FirstType(D), -/// SecondType(A) -/// } -/// -/// fn main() { -/// let (tx, rx) = channel::(); -/// -/// let d_sender = DSender::create(tx.clone()); -/// let a_sender = ASender::create(tx.clone()); -/// subscribe(d_sender.clone()); -/// subscribe2(d_sender.clone()); -/// subscribe3(a_sender.clone()); - // -/// let mut i = 0; - // -/// for m in rx { -/// i += 1; -/// match m { -/// Msg::FirstType(_) => println!("m: D {}", i), -/// Msg::SecondType(_) => println!("m: A {}", i) -/// }; -/// } -/// } -/// -/// fn subscribe(sender: DSender) { -/// thread::spawn(move|| { -/// sender.send(D).unwrap(); -/// }); -/// } -/// fn subscribe2(sender: DSender) { -/// thread::spawn(move|| { -/// thread::sleep(time::Duration::from_millis(10)); -/// sender.send(D).unwrap(); -/// }); -/// } -/// fn subscribe3(sender: ASender) { -/// thread::spawn(move|| { -/// sender.send(A).unwrap(); -/// }); -/// } -/// implement_sender!(name => DSender, -/// wrap => D, -/// with => Msg, -/// variant => FirstType) -/// implement_sender!(name => ASender, -/// wrap => A, -/// with => Msg, -/// variant => SecondType) -/// ``` -macro_rules! implement_sender { - (name => $name:ident, - wrap => $wrap_type:ident, - with => $with_type:ident, - variant => $variant:ident) => { - pub struct $name { - wrapped_sender: ::std::sync::mpsc::Sender<$with_type>, - } - - impl $name { - pub fn create(sender: ::std::sync::mpsc::Sender<$with_type>) -> $name { - $name { - wrapped_sender: sender - } - } - pub fn send(&self, t: $wrap_type) -> Result<(), ::std::sync::mpsc::SendError<$wrap_type>> { - let wrapped = self.wrap(t); - let result = self.wrapped_sender.send(wrapped); - result.map_err(|senderror| { - let ::std::sync::mpsc::SendError(z) = senderror; - ::std::sync::mpsc::SendError(self.unwrap(z)) - }) - } - fn wrap(&self, d: $wrap_type) -> $with_type { - $with_type::$variant(d) - } - fn unwrap(&self, msg: $with_type) -> $wrap_type { - let d = match msg { - $with_type::$variant(d) => d, - _ => unreachable!() - }; - d - } - } - - impl Clone for $name { - fn clone(&self) -> $name { - $name { - wrapped_sender: self.wrapped_sender.clone() - } - } - } - } -} \ No newline at end of file diff --git a/src/util/mod.rs b/src/util/mod.rs index f550f526..0683f6a0 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -11,8 +11,6 @@ mod int128; mod spotify_id; mod arcvec; mod subfile; -#[macro_use] -mod channel; pub use util::int128::u128; pub use util::spotify_id::{SpotifyId, FileId};