From 94503e351b20421b6616b7ec2cdbc3cd33881636 Mon Sep 17 00:00:00 2001 From: Paul Lietar Date: Thu, 9 Jul 2015 22:04:19 +0100 Subject: [PATCH] Remove busy waiting in SpircManager. --- src/lib.rs | 2 +- src/player.rs | 131 ++++++++++++++++++++++++++++++++------------------ src/spirc.rs | 52 +++++++++----------- 3 files changed, 109 insertions(+), 76 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4c9f2264..0ef28231 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ #![crate_name = "librespot"] -#![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future)] +#![feature(plugin,scoped,zero_one,iter_arith,slice_position_elem,slice_bytes,bitset,arc_weak,append,future,mpsc_select)] #![allow(deprecated)] //#![allow(unused_imports,dead_code)] diff --git a/src/player.rs b/src/player.rs index b9c1713e..d88c3cca 100644 --- a/src/player.rs +++ b/src/player.rs @@ -76,28 +76,25 @@ impl <'s> PlayerInternal<'s> { portaudio::initialize().unwrap(); let stream = portaudio::stream::Stream::::open_default( - 0, - 2, - 44100.0, + 0, 2, 44100.0, portaudio::stream::FRAMES_PER_BUFFER_UNSPECIFIED, None - ).unwrap(); + ).unwrap(); let mut decoder = None; loop { match self.commands.try_recv() { Ok(PlayerCommand::Load(id, play, position)) => { - println!("Load"); - let mut h = self.state.0.lock().unwrap(); - if h.status == PlayStatus::kPlayStatusPlay { - stream.stop().unwrap(); - } - h.status = PlayStatus::kPlayStatusLoading; - h.position_ms = position; - h.position_measured_at = util::now_ms(); - h.update_time = util::now_ms(); - drop(h); + self.update(|state| { + if state.status == PlayStatus::kPlayStatusPlay { + stream.stop().unwrap(); + } + state.status = PlayStatus::kPlayStatusLoading; + state.position_ms = position; + state.position_measured_at = util::now_ms(); + return true; + }); let track : TrackRef = self.session.metadata(id); let file_id = *track.wait().unwrap().files.first().unwrap(); @@ -109,48 +106,54 @@ impl <'s> PlayerInternal<'s> { self.session.audio_file(file_id)), 0xa7)).unwrap()); decoder.as_mut().unwrap().time_seek(position as f64 / 1000f64).unwrap(); - let mut h = self.state.0.lock().unwrap(); - h.status = if play { - stream.start().unwrap(); - PlayStatus::kPlayStatusPlay - } else { - PlayStatus::kPlayStatusPause - }; - h.position_ms = position; - h.position_measured_at = util::now_ms(); - h.update_time = util::now_ms(); + self.update(|state| { + state.status = if play { + stream.start().unwrap(); + PlayStatus::kPlayStatusPlay + } else { + PlayStatus::kPlayStatusPause + }; + state.position_ms = position; + state.position_measured_at = util::now_ms(); + + return true; + }); println!("Load Done"); } Ok(PlayerCommand::Seek(ms)) => { - let mut h = self.state.0.lock().unwrap(); decoder.as_mut().unwrap().time_seek(ms as f64 / 1000f64).unwrap(); - h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32; - h.position_measured_at = util::now_ms(); - h.update_time = util::now_ms(); + self.update(|state| { + state.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32; + state.position_measured_at = util::now_ms(); + return true; + }); }, Ok(PlayerCommand::Play) => { - println!("Play"); - let mut h = self.state.0.lock().unwrap(); - h.status = PlayStatus::kPlayStatusPlay; - h.update_time = util::now_ms(); + self.update(|state| { + state.status = PlayStatus::kPlayStatusPlay; + return true; + }); stream.start().unwrap(); }, Ok(PlayerCommand::Pause) => { - let mut h = self.state.0.lock().unwrap(); - h.status = PlayStatus::kPlayStatusPause; - h.update_time = util::now_ms(); + self.update(|state| { + state.status = PlayStatus::kPlayStatusPause; + state.update_time = util::now_ms(); + return true; + }); stream.stop().unwrap(); }, Ok(PlayerCommand::Stop) => { - let mut h = self.state.0.lock().unwrap(); - if h.status == PlayStatus::kPlayStatusPlay { - stream.stop().unwrap(); - } + self.update(|state| { + if state.status == PlayStatus::kPlayStatusPlay { + state.status = PlayStatus::kPlayStatusPause; + } + return true; + }); - h.status = PlayStatus::kPlayStatusPause; - h.update_time = util::now_ms(); + stream.stop().unwrap(); decoder = None; }, Err(..) => (), @@ -170,9 +173,17 @@ impl <'s> PlayerInternal<'s> { Err(e) => panic!("Vorbis error {:?}", e) } - let mut h = self.state.0.lock().unwrap(); - h.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32; - h.position_measured_at = util::now_ms(); + self.update(|state| { + let now = util::now_ms(); + + if now - state.position_measured_at > 5000 { + state.position_ms = (decoder.as_mut().unwrap().time_tell().unwrap() * 1000f64) as u32; + state.position_measured_at = now; + return true; + } else { + return false; + } + }); } } @@ -180,6 +191,17 @@ impl <'s> PlayerInternal<'s> { portaudio::terminate().unwrap(); } + + fn update(&self, f: F) + where F: FnOnce(&mut MutexGuard) -> bool { + let mut guard = self.state.0.lock().unwrap(); + let update = f(&mut guard); + if update { + guard.update_time = util::now_ms(); + self.state.1.notify_all(); + + } + } } impl <'s> SpircDelegate for Player<'s> { @@ -210,9 +232,24 @@ impl <'s> SpircDelegate for Player<'s> { self.state.0.lock().unwrap() } - fn wait_update<'a>(&'a self, guard: MutexGuard<'a, Self::State>) - -> MutexGuard<'a, Self::State> { - self.state.1.wait(guard).unwrap() + fn updates(&self) -> mpsc::Receiver { + let state = self.state.clone(); + let (update_tx, update_rx) = mpsc::channel(); + + thread::spawn(move || { + let mut guard = state.0.lock().unwrap(); + let mut last_update; + loop { + last_update = guard.update_time; + update_tx.send(guard.update_time).unwrap(); + + while last_update >= guard.update_time { + guard = state.1.wait(guard).unwrap(); + } + } + }); + + return update_rx; } } diff --git a/src/spirc.rs b/src/spirc.rs index c60400d3..17256bc6 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,11 +1,11 @@ use protobuf::{self, Message}; +use std::sync::{mpsc, MutexGuard}; use util; use session::Session; use util::SpotifyId; use util::version::version_string; use mercury::{MercuryRequest, MercuryMethod}; -use std::sync::MutexGuard; use librespot_protocol as protocol; pub use librespot_protocol::spirc::PlayStatus; @@ -47,8 +47,7 @@ pub trait SpircDelegate { fn stop(&self); fn state(&self) -> MutexGuard; - fn wait_update<'a>(&'a self, guard: MutexGuard<'a, Self::State>) - -> MutexGuard<'a, Self::State>; + fn updates(&self) -> mpsc::Receiver; } pub trait SpircState { @@ -89,33 +88,30 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> { pub fn run(&mut self) { let rx = self.session.mercury_sub(format!("hm://remote/user/{}/v23", self.username)); - - self.notify(None); + let updates = self.delegate.updates(); loop { - if let Ok(pkt) = rx.try_recv() { - let frame = protobuf::parse_from_bytes::( - pkt.payload.front().unwrap()).unwrap(); - println!("{:?} {} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); - if frame.get_ident() != self.ident && - (frame.get_recipient().len() == 0 || - frame.get_recipient().contains(&self.ident)) { - self.handle(frame); + select! { + pkt = rx.recv() => { + let frame = protobuf::parse_from_bytes::( + pkt.unwrap().payload.front().unwrap()).unwrap(); + println!("{:?} {} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); + if frame.get_ident() != self.ident && + (frame.get_recipient().len() == 0 || + frame.get_recipient().contains(&self.ident)) { + self.handle(frame); + } + }, + update_time = updates.recv() => { + self.state_update_id = update_time.unwrap(); + self.notify(None); } } - - if { - let state = self.delegate.state(); - state.update_time() > self.state_update_id - } { - self.state_update_id = util::now_ms(); - self.notify(None); - } } } @@ -170,7 +166,7 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> { recipient: protobuf::RepeatedField::from_vec( recipient.map(|r| vec![r.to_string()] ).unwrap_or(vec![]) ), - state_update_id: self.state_update_id + state_update_id: self.state_update_id as i64 }); if self.is_active { @@ -219,7 +215,7 @@ impl <'s, D: SpircDelegate> SpircManager<'s, D> { volume: self.volume as u32, name: self.name.clone(), error_code: 0, - became_active_at: if self.is_active { self.became_active_at } else { 0 }, + became_active_at: if self.is_active { self.became_active_at as i64 } else { 0 }, capabilities => [ @{ typ: protocol::spirc::CapabilityType::kCanBePlayer,