diff --git a/src/lib.rs b/src/lib.rs index 31671d0c..59e46547 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -60,6 +60,7 @@ pub mod player; pub mod stream; pub mod util; pub mod version; +pub mod mixer; #[cfg(feature = "with-syntex")] include!(concat!(env!("OUT_DIR"), "/lib.rs")); #[cfg(not(feature = "with-syntex"))] include!("lib.in.rs"); diff --git a/src/main.rs b/src/main.rs index cfe1b9e8..72e0113c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,8 @@ use librespot::audio_backend::{self, BACKENDS}; use librespot::cache::{Cache, DefaultCache, NoCache}; use librespot::player::Player; use librespot::session::{Bitrate, Config, Session}; +use librespot::mixer::{self, Mixer}; + use librespot::version; fn usage(program: &str, opts: &getopts::Options) -> String { @@ -59,7 +61,7 @@ fn list_backends() { } } -fn setup(args: &[String]) -> (Session, Player) { +fn setup(args: &[String]) -> (Session, Player, Box) { let mut opts = getopts::Options::new(); opts.optopt("c", "cache", "Path to a directory where files will be cached.", "CACHE") .reqopt("n", "name", "Device name", "NAME") @@ -70,7 +72,8 @@ fn setup(args: &[String]) -> (Session, Player) { .optopt("u", "username", "Username to sign in with", "USERNAME") .optopt("p", "password", "Password", "PASSWORD") .optopt("", "backend", "Audio backend to use. Use '?' to list options", "BACKEND") - .optopt("", "device", "Audio device to use. Use '?' to list options", "DEVICE"); + .optopt("", "device", "Audio device to use. Use '?' to list options", "DEVICE") + .optopt("", "mixer", "Mixer to use", "MIXER"); let matches = match opts.parse(&args[1..]) { Ok(m) => m, @@ -119,20 +122,24 @@ fn setup(args: &[String]) -> (Session, Player) { let credentials = get_credentials(&session, matches.opt_str("username"), matches.opt_str("password")); session.login(credentials).unwrap(); + + let mixer_name = matches.opt_str("mixer"); + let mixer = mixer::find(mixer_name.as_ref()).expect("Invalid mixer"); + let audio_filter = mixer.get_audio_filter(); let device_name = matches.opt_str("device"); - let player = Player::new(session.clone(), move || { + let player = Player::new(session.clone(), audio_filter, move || { (backend)(device_name.as_ref().map(AsRef::as_ref)) }); - (session, player) + (session, player, mixer) } fn main() { let args: Vec = std::env::args().collect(); - let (session, player) = setup(&args); + let (session, player, mixer) = setup(&args); - let spirc = SpircManager::new(session.clone(), player); + let spirc = SpircManager::new(session.clone(), player, mixer); let spirc_signal = spirc.clone(); thread::spawn(move || spirc.run()); diff --git a/src/mixer/mod.rs b/src/mixer/mod.rs new file mode 100644 index 00000000..32350af3 --- /dev/null +++ b/src/mixer/mod.rs @@ -0,0 +1,23 @@ +use self::softmixer::SoftMixer; + +pub mod softmixer; + +pub trait Mixer { + fn start(&self); + fn stop(&self); + fn set_volume(&self, volume: u16); + fn volume(&self) -> u16; + fn get_audio_filter(&self) -> Option> { + None + } +} + +pub trait AudioFilter { + fn modify_stream(&self, data: &mut [i16]); +} + +pub fn find>(name: Option) -> Option> { + match name { + _ => Some(Box::new(SoftMixer::new())), + } +} \ No newline at end of file diff --git a/src/mixer/softmixer.rs b/src/mixer/softmixer.rs new file mode 100644 index 00000000..e54e728e --- /dev/null +++ b/src/mixer/softmixer.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use super::Mixer; +use super::AudioFilter; + +pub struct SoftMixer { + volume: Arc +} + +impl SoftMixer { + pub fn new() -> SoftMixer { + SoftMixer { + volume: Arc::new(AtomicUsize::new(0xFFFF)) + } + } +} + +impl Mixer for SoftMixer { + fn start(&self) { + } + fn stop(&self) { + } + fn volume(&self) -> u16 { + self.volume.load(Ordering::Relaxed) as u16 + } + fn set_volume(&self, volume: u16) { + self.volume.store(volume as usize, Ordering::Relaxed); + } + fn get_audio_filter(&self) -> Option> { + Some(Box::new(SoftVolumeApplier { volume: self.volume.clone() })) + } +} + +struct SoftVolumeApplier { + volume: Arc +} + +impl AudioFilter for SoftVolumeApplier { + fn modify_stream(&self, data: &mut [i16]) { + let volume = self.volume.load(Ordering::Relaxed) as u16; + if volume != 0xFFFF { + for x in data.iter_mut() { + *x = (*x as i32 * volume as i32 / 0xFFFF) as i16; + } + } + } +} \ No newline at end of file diff --git a/src/player.rs b/src/player.rs index 60a5ad01..aad23dfa 100644 --- a/src/player.rs +++ b/src/player.rs @@ -9,6 +9,7 @@ use audio_decrypt::AudioDecrypt; use audio_backend::Sink; use metadata::{FileFormat, Track, TrackRef}; use session::{Bitrate, Session}; +use mixer::AudioFilter; use util::{self, ReadSeek, SpotifyId, Subfile}; pub use spirc::PlayStatus; @@ -47,8 +48,6 @@ pub struct PlayerState { pub status: PlayStatus, pub position_ms: u32, pub position_measured_at: i64, - pub update_time: i64, - pub volume: u16, pub track: Option, pub end_of_track: bool, @@ -67,14 +66,13 @@ enum PlayerCommand { Load(SpotifyId, bool, u32), Play, Pause, - Volume(u16), Stop, Seek(u32), SeekAt(u32, i64), } impl Player { - pub fn new(session: Session, sink_builder: F) -> Player + pub fn new(session: Session, stream_editor: Option>, sink_builder: F) -> Player where F: FnOnce() -> Box + Send + 'static { let (cmd_tx, cmd_rx) = mpsc::channel(); @@ -82,8 +80,6 @@ impl Player { status: PlayStatus::kPlayStatusStop, position_ms: 0, position_measured_at: 0, - update_time: util::now_ms(), - volume: 0xFFFF, track: None, end_of_track: false, })); @@ -97,7 +93,7 @@ impl Player { observers: observers.clone(), }; - thread::spawn(move || internal.run(sink_builder())); + thread::spawn(move || internal.run(sink_builder(), stream_editor)); Player { commands: cmd_tx, @@ -138,30 +134,11 @@ impl Player { self.state.lock().unwrap().clone() } - pub fn volume(&self, vol: u16) { - self.command(PlayerCommand::Volume(vol)); - } - pub fn add_observer(&self, observer: PlayerObserver) { self.observers.lock().unwrap().push(observer); } } -fn apply_volume(volume: u16, data: &[i16]) -> Cow<[i16]> { - // Fast path when volume is 100% - if volume == 0xFFFF { - Cow::Borrowed(data) - } else { - Cow::Owned(data.iter() - .map(|&x| { - (x as i32 - * volume as i32 - / 0xFFFF) as i16 - }) - .collect()) - } -} - fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option> { if track.available { Some(Cow::Borrowed(track)) @@ -229,7 +206,7 @@ fn run_onstop(session: &Session) { } impl PlayerInternal { - fn run(self, mut sink: Box) { + fn run(self, mut sink: Box, stream_editor: Option>) { let mut decoder = None; loop { @@ -334,7 +311,6 @@ impl PlayerInternal { Some(PlayerCommand::Pause) => { self.update(|state| { state.status = PlayStatus::kPlayStatusPause; - state.update_time = util::now_ms(); state.position_ms = decoder.as_mut().map(|d| vorbis_time_tell_ms(d).unwrap()).unwrap_or(0) as u32; state.position_measured_at = util::now_ms(); true @@ -343,12 +319,6 @@ impl PlayerInternal { sink.stop().unwrap(); run_onstop(&self.session); } - Some(PlayerCommand::Volume(vol)) => { - self.update(|state| { - state.volume = vol; - true - }); - } Some(PlayerCommand::Stop) => { self.update(|state| { if state.status == PlayStatus::kPlayStatusPlay { @@ -370,10 +340,11 @@ impl PlayerInternal { let packet = decoder.as_mut().unwrap().packets().next(); match packet { - Some(Ok(packet)) => { - let buffer = apply_volume(self.state.lock().unwrap().volume, - &packet.data); - sink.write(&buffer).unwrap(); + Some(Ok(mut packet)) => { + if let Some(ref editor) = stream_editor { + editor.modify_stream(&mut packet.data) + }; + sink.write(&packet.data).unwrap(); self.update(|state| { state.position_ms = vorbis_time_tell_ms(decoder.as_mut().unwrap()).unwrap() as u32; @@ -408,7 +379,6 @@ impl PlayerInternal { let observers = self.observers.lock().unwrap(); if update { - guard.update_time = util::now_ms(); let state = guard.clone(); drop(guard); @@ -428,14 +398,6 @@ impl PlayerState { (self.position_ms, self.position_measured_at) } - pub fn volume(&self) -> u16 { - self.volume - } - - pub fn update_time(&self) -> i64 { - self.update_time - } - pub fn end_of_track(&self) -> bool { self.end_of_track } diff --git a/src/spirc.rs b/src/spirc.rs index 674fd47a..c532f794 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,11 +1,11 @@ use eventual::Async; use protobuf::{self, Message, RepeatedField}; -use std::borrow::Cow; use std::sync::{Mutex, Arc}; use std::collections::HashMap; use mercury::{MercuryRequest, MercuryMethod}; use player::{Player, PlayerState}; +use mixer::Mixer; use session::Session; use util; use util::SpotifyId; @@ -20,6 +20,7 @@ pub struct SpircManager(Arc>); struct SpircInternal { player: Player, session: Session, + mixer: Box, seq_nr: u32, @@ -43,14 +44,62 @@ struct SpircInternal { devices: HashMap, } +#[derive(Clone)] +pub struct State { + pub status: PlayStatus, + pub position_ms: u32, + pub position_measured_at: i64, + pub update_time: i64, + pub volume: u16, + pub track: Option, + pub end_of_track: bool, +} + +impl State { + pub fn new() -> State { + let state = State { + status: PlayStatus::kPlayStatusStop, + position_ms: 0, + position_measured_at: 0, + update_time: 0, + volume: 0, + track: None, + end_of_track: false, + }; + state.update_time() + } + + pub fn update_from_player(mut self, player: &Player) -> State { + let player_state = player.state(); + let (position_ms, position_measured_at) = player_state.position(); + self.status = player_state.status(); + self.position_ms = position_ms; + self.position_measured_at = position_measured_at; + self.track = player_state.track; + self.end_of_track = player_state.end_of_track(); + self.update_time() + } + + pub fn update_from_mixer(mut self, mixer: &Box) -> State { + self.volume = mixer.volume(); + self.update_time() + } + + fn update_time(mut self) -> State { + self.update_time = util::now_ms(); + self + } +} + impl SpircManager { - pub fn new(session: Session, player: Player) -> SpircManager { + pub fn new(session: Session, player: Player, mixer: Box) -> SpircManager { let ident = session.device_id().to_owned(); let name = session.config().device_name.clone(); SpircManager(Arc::new(Mutex::new(SpircInternal { player: player, session: session, + mixer: mixer, seq_nr: 0, @@ -184,7 +233,7 @@ impl SpircInternal { let track = self.tracks[self.index as usize]; self.player.load(track, true, 0); } else { - self.notify_with_player_state(false, None, player_state); + self.notify(false, None); } } @@ -226,9 +275,11 @@ impl SpircInternal { } MessageType::kMessageTypePlay => { self.player.play(); + self.mixer.start(); } MessageType::kMessageTypePause => { self.player.pause(); + self.mixer.stop(); } MessageType::kMessageTypeNext => { self.index = (self.index + 1) % self.tracks.len() as u32; @@ -250,10 +301,12 @@ impl SpircInternal { if self.is_active && frame.get_device_state().get_is_active() { self.is_active = false; self.player.stop(); + self.mixer.stop(); } } MessageType::kMessageTypeVolume => { - self.player.volume(frame.get_volume() as u16); + self.mixer.set_volume(frame.get_volume() as u16); + self.notify(false, None); } MessageType::kMessageTypeGoodbye => { if frame.has_ident() { @@ -287,30 +340,11 @@ impl SpircInternal { cs.send(); } - fn notify_with_player_state(&mut self, - hello: bool, - recipient: Option<&str>, - player_state: &PlayerState) { - let mut cs = CommandSender::new(self, - if hello { - MessageType::kMessageTypeHello - } else { - MessageType::kMessageTypeNotify - }) - .player_state(player_state); - if let Some(s) = recipient { - cs = cs.recipient(&s); - } - cs.send(); - } - - fn spirc_state(&self, player_state: &PlayerState) -> protocol::spirc::State { - let (position_ms, position_measured_at) = player_state.position(); - + fn spirc_state(&self, state: &State) -> protocol::spirc::State { protobuf_init!(protocol::spirc::State::new(), { - status: player_state.status(), - position_ms: position_ms, - position_measured_at: position_measured_at as u64, + status: state.status, + position_ms: state.position_ms, + position_measured_at: state.position_measured_at as u64, playing_track_index: self.index, track: self.tracks.iter().map(|track| { @@ -329,12 +363,12 @@ impl SpircInternal { }) } - fn device_state(&self, player_state: &PlayerState) -> protocol::spirc::DeviceState { + fn device_state(&self, state: &State) -> protocol::spirc::DeviceState { protobuf_init!(protocol::spirc::DeviceState::new(), { sw_version: version::version_string(), is_active: self.is_active, can_play: self.can_play, - volume: player_state.volume() as u32, + volume: state.volume as u32, name: self.name.clone(), error_code: 0, became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 }, @@ -398,7 +432,6 @@ struct CommandSender<'a> { spirc_internal: &'a mut SpircInternal, cmd: MessageType, recipient: Option<&'a str>, - player_state: Option<&'a PlayerState>, state: Option, } @@ -408,7 +441,6 @@ impl<'a> CommandSender<'a> { spirc_internal: spirc_internal, cmd: cmd, recipient: None, - player_state: None, state: None, } } @@ -418,22 +450,15 @@ impl<'a> CommandSender<'a> { self } - fn player_state(mut self, s: &'a PlayerState) -> CommandSender { - self.player_state = Some(s); - self - } - fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> { self.state = Some(s); self } fn send(self) { - let state = self.player_state.map_or_else(|| { - Cow::Owned(self.spirc_internal.player.state()) - }, |s| { - Cow::Borrowed(s) - }); + let state = State::new() + .update_from_player(&self.spirc_internal.player) + .update_from_mixer(&self.spirc_internal.mixer); let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { version: 1, @@ -445,7 +470,7 @@ impl<'a> CommandSender<'a> { self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) ), device_state: self.spirc_internal.device_state(&state), - state_update_id: state.update_time() + state_update_id: state.update_time }); if self.spirc_internal.is_active {