diff --git a/src/main.rs b/src/main.rs index c8883d58..d1e13e6e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,14 +9,13 @@ extern crate tokio_core; use env_logger::LogBuilder; use std::io::{stderr, Write}; use std::process::exit; -use std::thread; use std::env; use std::path::PathBuf; use std::str::FromStr; use futures::Future; use tokio_core::reactor::Core; -use librespot::spirc::SpircManager; +use librespot::spirc::Spirc; use librespot::authentication::{get_credentials, Credentials}; use librespot::audio_backend::{self, Sink, BACKENDS}; use librespot::cache::{Cache, DefaultCache, NoCache}; @@ -158,22 +157,19 @@ fn main() { let connection = Session::connect(config, credentials, cache, handle); - let task = connection.and_then(move |(session, task)| { + let task = connection.and_then(move |session| { let player = Player::new(session.clone(), move || { (backend)(device) }); - let spirc = SpircManager::new(session.clone(), player); - let spirc_signal = spirc.clone(); + let (spirc, task) = Spirc::new(session.clone(), player); + let spirc = ::std::cell::RefCell::new(spirc); ctrlc::set_handler(move || { - spirc_signal.send_goodbye(); - exit(0); + spirc.borrow_mut().shutdown(); }); - thread::spawn(move || spirc.run()); - - task + task.map_err(|()| panic!("spirc error")) }); core.run(task).unwrap() diff --git a/src/mercury/sender.rs b/src/mercury/sender.rs index be660db3..67b4dc08 100644 --- a/src/mercury/sender.rs +++ b/src/mercury/sender.rs @@ -20,6 +20,16 @@ impl MercurySender { } } +impl Clone for MercurySender { + fn clone(&self) -> MercurySender { + MercurySender { + mercury: self.mercury.clone(), + uri: self.uri.clone(), + pending: VecDeque::new(), + } + } +} + impl Sink for MercurySender { type SinkItem = Vec; type SinkError = MercuryError; diff --git a/src/player.rs b/src/player.rs index 9ccc8b16..20600281 100644 --- a/src/player.rs +++ b/src/player.rs @@ -1,9 +1,11 @@ -use std::borrow::Cow; -use std::sync::{mpsc, Mutex, Arc, MutexGuard}; -use std::thread; -use std::io::{Read, Seek}; -use vorbis; use futures::{future, Future}; +use futures::sync::mpsc; +use std; +use std::borrow::Cow; +use std::io::{Read, Seek}; +use std::sync::{Mutex, Arc, MutexGuard}; +use std::thread; +use vorbis; use audio_file::AudioFile; use audio_decrypt::AudioDecrypt; @@ -33,14 +35,12 @@ fn vorbis_time_tell_ms(decoder: &mut vorbis::Decoder) -> Result; - #[derive(Clone)] pub struct Player { state: Arc>, - observers: Arc>>, + observers: Arc>>>, - commands: mpsc::Sender, + commands: std::sync::mpsc::Sender, } #[derive(Clone)] @@ -57,10 +57,10 @@ pub struct PlayerState { struct PlayerInternal { state: Arc>, - observers: Arc>>, + observers: Arc>>>, session: Session, - commands: mpsc::Receiver, + commands: std::sync::mpsc::Receiver, } #[derive(Debug)] @@ -77,7 +77,7 @@ enum PlayerCommand { impl Player { pub fn new(session: Session, sink_builder: F) -> Player where F: FnOnce() -> Box + Send + 'static { - let (cmd_tx, cmd_rx) = mpsc::channel(); + let (cmd_tx, cmd_rx) = std::sync::mpsc::channel(); let state = Arc::new(Mutex::new(PlayerState { status: PlayStatus::kPlayStatusStop, @@ -143,8 +143,11 @@ impl Player { self.command(PlayerCommand::Volume(vol)); } - pub fn add_observer(&self, observer: PlayerObserver) { - self.observers.lock().unwrap().push(observer); + pub fn observe(&self) -> mpsc::UnboundedReceiver { + let (tx, rx) = mpsc::unbounded(); + self.observers.lock().unwrap().push(tx); + + rx } } @@ -419,7 +422,7 @@ impl PlayerInternal { drop(guard); for observer in observers.iter() { - observer(&state); + observer.send(state.clone()).unwrap(); } } } diff --git a/src/session.rs b/src/session.rs index 3c50d1e7..5b9945b2 100644 --- a/src/session.rs +++ b/src/session.rs @@ -85,7 +85,7 @@ pub fn device_id(name: &str) -> String { impl Session { pub fn connect(config: Config, credentials: Credentials, cache: Box, handle: Handle) - -> Box), Error=io::Error>> + -> Box> { let access_point = apresolve_or_fallback::(&handle); @@ -108,7 +108,9 @@ impl Session { &handle, transport, config, cache, reusable_credentials.username.clone() ); - (session, task) + handle.spawn(task.map_err(|e| panic!(e))); + + session }); Box::new(result) diff --git a/src/spirc.rs b/src/spirc.rs index f226c029..27959ed4 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,26 +1,23 @@ use protobuf::{self, Message, RepeatedField}; use std::borrow::Cow; -use std::sync::{Mutex, Arc}; -use std::collections::HashMap; +use futures::{Future, Stream, Sink, Async, Poll}; +use futures::stream::BoxStream; +use futures::sink::BoxSink; +use futures::sync::mpsc; +use mercury::MercuryError; use player::{Player, PlayerState}; use session::Session; -use util; -use util::SpotifyId; +use util::{now_ms, SpotifyId, SeqGenerator}; use version; -use futures::{Future, Stream}; use protocol; -pub use protocol::spirc::{PlayStatus, MessageType}; +pub use protocol::spirc::{PlayStatus, MessageType, Frame}; -#[derive(Clone)] -pub struct SpircManager(Arc>); - -struct SpircInternal { +pub struct SpircTask { player: Player, - session: Session, - seq_nr: u32, + sequence: SeqGenerator, name: String, ident: String, @@ -39,19 +36,49 @@ struct SpircInternal { tracks: Vec, index: u32, - devices: HashMap, + subscription: BoxStream, + sender: BoxSink, + + updates: mpsc::UnboundedReceiver, + commands: mpsc::UnboundedReceiver, + + shutdown: bool, } -impl SpircManager { - pub fn new(session: Session, player: Player) -> SpircManager { +pub enum SpircCommand { + Shutdown +} + +pub struct Spirc { + commands: mpsc::UnboundedSender, +} + +impl Spirc { + pub fn new(session: Session, player: Player) -> (Spirc, SpircTask) { let ident = session.device_id().to_owned(); let name = session.config().name.clone(); - SpircManager(Arc::new(Mutex::new(SpircInternal { - player: player, - session: session, + let uri = format!("hm://remote/user/{}", session.username()); - seq_nr: 0, + let subscription = session.mercury().subscribe(&uri as &str); + let subscription = subscription.map(|stream| stream.map_err(|_| MercuryError)).flatten_stream(); + let subscription = subscription.map(|response| -> Frame { + let data = response.payload.first().unwrap(); + protobuf::parse_from_bytes(data).unwrap() + }).boxed(); + + let sender = Box::new(session.mercury().sender(uri).with(|frame: Frame| { + Ok(frame.write_to_bytes().unwrap()) + })); + + let updates = player.observe(); + + let (cmd_tx, cmd_rx) = mpsc::unbounded(); + + let mut task = SpircTask { + player: player, + + sequence: SeqGenerator::new(1), name: name, ident: ident, @@ -70,125 +97,111 @@ impl SpircManager { tracks: Vec::new(), index: 0, - devices: HashMap::new(), - }))) - } + subscription: subscription, + sender: sender, + updates: updates, + commands: cmd_rx, - pub fn run(&self) { - let rx = { - let mut internal = self.0.lock().unwrap(); - - let rx = internal.session.mercury().subscribe(internal.uri()); - let rx = rx.map_err(|_| ()).flatten_stream().wait(); - - internal.notify(true, None); - - // Use a weak pointer to avoid creating an Rc cycle between the player and the - // SpircManager - let _self = Arc::downgrade(&self.0); - internal.player.add_observer(Box::new(move |state| { - if let Some(_self) = _self.upgrade() { - let mut internal = _self.lock().unwrap(); - internal.on_update(state); - } - })); - - rx + shutdown: false, }; - for pkt in rx { - let data = pkt.as_ref().unwrap().payload.first().unwrap(); - let frame = protobuf::parse_from_bytes::(data).unwrap(); + let spirc = Spirc { + commands: cmd_tx, + }; - debug!("{:?} {:?} {} {} {}", - frame.get_typ(), - frame.get_device_state().get_name(), - frame.get_ident(), - frame.get_seq_nr(), - frame.get_state_update_id()); + task.notify(true, None); - self.0.lock().unwrap().handle(frame); - } + (spirc, task) } - pub fn devices(&self) -> HashMap { - self.0.lock().unwrap().devices.clone() - } - - pub fn send_play(&self, recipient: &str) { - let mut internal = self.0.lock().unwrap(); - CommandSender::new(&mut *internal, MessageType::kMessageTypePlay) - .recipient(recipient) - .send(); - } - - pub fn send_pause(&self, recipient: &str) { - let mut internal = self.0.lock().unwrap(); - CommandSender::new(&mut *internal, MessageType::kMessageTypePause) - .recipient(recipient) - .send(); - } - - pub fn send_prev(&self, recipient: &str) { - let mut internal = self.0.lock().unwrap(); - CommandSender::new(&mut *internal, MessageType::kMessageTypePrev) - .recipient(recipient) - .send(); - } - - pub fn send_next(&self, recipient: &str) { - let mut internal = self.0.lock().unwrap(); - CommandSender::new(&mut *internal, MessageType::kMessageTypeNext) - .recipient(recipient) - .send(); - } - - pub fn send_replace_tracks>(&mut self, - recipient: &str, - track_ids: I) { - let state = track_ids_to_state(track_ids); - let mut internal = self.0.lock().unwrap(); - CommandSender::new(&mut *internal, MessageType::kMessageTypeReplace) - .recipient(recipient) - .state(state) - .send(); - } - - pub fn send_load_tracks>(&mut self, - recipient: &str, - track_ids: I) { - let state = track_ids_to_state(track_ids); - let mut internal = self.0.lock().unwrap(); - CommandSender::new(&mut *internal, MessageType::kMessageTypeLoad) - .recipient(recipient) - .state(state) - .send(); - } - - pub fn send_goodbye(&self) { - let mut internal = self.0.lock().unwrap(); - CommandSender::new(&mut *internal, MessageType::kMessageTypeGoodbye) - .send(); - } - - pub fn get_queue(&self) -> Vec { - self.0.lock().unwrap().tracks.clone() + pub fn shutdown(&mut self) { + mpsc::UnboundedSender::send(&mut self.commands, SpircCommand::Shutdown).unwrap(); } } -impl SpircInternal { - fn on_update(&mut self, player_state: &PlayerState) { +impl Future for SpircTask { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + let mut progress = false; + + if !self.shutdown { + match self.subscription.poll().unwrap() { + Async::Ready(Some(frame)) => { + progress = true; + self.handle_frame(frame); + } + Async::Ready(None) => panic!("subscription terminated"), + Async::NotReady => (), + } + + match self.updates.poll().unwrap() { + Async::Ready(Some(state)) => { + progress = true; + self.handle_update(state); + } + Async::Ready(None) => panic!("player terminated"), + Async::NotReady => (), + } + + match self.commands.poll().unwrap() { + Async::Ready(Some(command)) => { + progress = true; + self.handle_command(command); + } + Async::Ready(None) => (), + Async::NotReady => (), + } + } + + let poll_sender = self.sender.poll_complete().unwrap(); + + // Only shutdown once we've flushed out all our messages + if self.shutdown && poll_sender.is_ready() { + return Ok(Async::Ready(())); + } + + if !progress { + + return Ok(Async::NotReady); + } + } + } +} + +impl SpircTask { + fn handle_update(&mut self, player_state: PlayerState) { let end_of_track = player_state.end_of_track(); if end_of_track { self.index = (self.index + 1) % self.tracks.len() as u32; let track = self.tracks[self.index as usize]; self.player.load(track, true, 0); } else { - self.notify_with_player_state(false, None, player_state); + self.notify_with_player_state(false, None, &player_state); } } - fn handle(&mut self, frame: protocol::spirc::Frame) { + fn handle_command(&mut self, cmd: SpircCommand) { + match cmd { + SpircCommand::Shutdown => { + CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); + self.shutdown = true; + self.commands.close(); + self.updates.close(); + } + } + } + + fn handle_frame(&mut self, frame: Frame) { + debug!("{:?} {:?} {} {} {}", + frame.get_typ(), + frame.get_device_state().get_name(), + frame.get_ident(), + frame.get_seq_nr(), + frame.get_state_update_id()); + if frame.get_ident() == self.ident || (frame.get_recipient().len() > 0 && !frame.get_recipient().contains(&self.ident)) { return; @@ -199,11 +212,6 @@ impl SpircInternal { self.last_command_msgid = frame.get_seq_nr(); } - if frame.has_ident() && !frame.has_goodbye() && frame.has_device_state() { - self.devices.insert(frame.get_ident().into(), - frame.get_device_state().get_name().into()); - } - match frame.get_typ() { MessageType::kMessageTypeHello => { self.notify(false, Some(frame.get_ident())); @@ -211,7 +219,7 @@ impl SpircInternal { MessageType::kMessageTypeLoad => { if !self.is_active { self.is_active = true; - self.became_active_at = util::now_ms(); + self.became_active_at = now_ms(); } self.reload_tracks(&frame); @@ -255,11 +263,7 @@ impl SpircInternal { MessageType::kMessageTypeVolume => { self.player.volume(frame.get_volume() as u16); } - MessageType::kMessageTypeGoodbye => { - if frame.has_ident() { - self.devices.remove(frame.get_ident()); - } - } + MessageType::kMessageTypeGoodbye => (), _ => (), } } @@ -388,14 +392,10 @@ impl SpircInternal { ], }) } - - fn uri(&self) -> String { - format!("hm://remote/user/{}", self.session.username()) - } } struct CommandSender<'a> { - spirc_internal: &'a mut SpircInternal, + spirc: &'a mut SpircTask, cmd: MessageType, recipient: Option<&'a str>, player_state: Option<&'a PlayerState>, @@ -403,9 +403,9 @@ struct CommandSender<'a> { } impl<'a> CommandSender<'a> { - fn new(spirc_internal: &'a mut SpircInternal, cmd: MessageType) -> CommandSender { + fn new(spirc: &'a mut SpircTask, cmd: MessageType) -> CommandSender { CommandSender { - spirc_internal: spirc_internal, + spirc: spirc, cmd: cmd, recipient: None, player_state: None, @@ -423,6 +423,7 @@ impl<'a> CommandSender<'a> { self } + #[allow(dead_code)] fn state(mut self, s: protocol::spirc::State) -> CommandSender<'a> { self.state = Some(s); self @@ -430,35 +431,34 @@ impl<'a> CommandSender<'a> { fn send(self) { let state = self.player_state.map_or_else(|| { - Cow::Owned(self.spirc_internal.player.state()) + Cow::Owned(self.spirc.player.state()) }, |s| { Cow::Borrowed(s) }); - let mut pkt = protobuf_init!(protocol::spirc::Frame::new(), { + let mut frame = protobuf_init!(Frame::new(), { version: 1, - ident: self.spirc_internal.ident.clone(), + ident: self.spirc.ident.clone(), protocol_version: "2.0.0", - seq_nr: { self.spirc_internal.seq_nr += 1; self.spirc_internal.seq_nr }, + seq_nr: self.spirc.sequence.get(), typ: self.cmd, recipient: RepeatedField::from_vec( self.recipient.map(|r| vec![r.to_owned()] ).unwrap_or(vec![]) ), - device_state: self.spirc_internal.device_state(&state), + device_state: self.spirc.device_state(&state), state_update_id: state.update_time() }); - if self.spirc_internal.is_active { - pkt.set_state(self.spirc_internal.spirc_state(&state)); + if self.spirc.is_active { + frame.set_state(self.spirc.spirc_state(&state)); } - let payload = pkt.write_to_bytes().unwrap(); - let uri = self.spirc_internal.uri(); - self.spirc_internal.session.mercury() - .send(uri, payload).wait().unwrap(); + let ready = self.spirc.sender.start_send(frame).unwrap().is_ready(); + assert!(ready); } } +#[allow(dead_code)] fn track_ids_to_state>(track_ids: I) -> protocol::spirc::State { let tracks: Vec = track_ids.map(|i| {