diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 4bb53e77..a99a47e3 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1,8 +1,15 @@ +use std; +use std::time::{SystemTime, UNIX_EPOCH}; + +use futures::{Async, Future, Poll, Sink, Stream}; use futures::future; use futures::sync::{mpsc, oneshot}; -use futures::{Async, Future, Poll, Sink, Stream}; use protobuf::{self, Message}; +use rand; +use rand::seq::SliceRandom; +use serde_json; +use context::StationContext; use core::config::ConnectConfig; use core::mercury::MercuryError; use core::session::Session; @@ -10,19 +17,10 @@ use core::spotify_id::SpotifyId; use core::util::SeqGenerator; use core::version; use core::volume::Volume; - -use protocol; -use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State}; - use playback::mixer::Mixer; use playback::player::Player; -use serde_json; - -use context::StationContext; -use rand; -use rand::seq::SliceRandom; -use std; -use std::time::{SystemTime, UNIX_EPOCH}; +use protocol; +use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State}; pub struct SpircTask { player: Player, @@ -64,14 +62,6 @@ pub struct Spirc { commands: mpsc::UnboundedSender, } -fn now_ms() -> i64 { - let dur = match SystemTime::now().duration_since(UNIX_EPOCH) { - Ok(dur) => dur, - Err(err) => err.duration(), - }; - (dur.as_secs() * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64 -} - fn initial_state() -> State { let mut frame = protocol::spirc::State::new(); frame.set_repeat(false); @@ -404,6 +394,14 @@ impl Future for SpircTask { } impl SpircTask { + fn now_ms(&mut self) -> i64 { + let dur = match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(dur) => dur, + Err(err) => err.duration(), + }; + ((dur.as_secs() + self.session.time_delta()) * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64 + } + fn handle_command(&mut self, cmd: SpircCommand) { let active = self.device.get_is_active(); match cmd { @@ -494,15 +492,17 @@ impl SpircTask { MessageType::kMessageTypeLoad => { if !self.device.get_is_active() { + let now = self.now_ms(); self.device.set_is_active(true); - self.device.set_became_active_at(now_ms()); + self.device.set_became_active_at(now); } self.update_tracks(&frame); if self.state.get_track().len() > 0 { + let now = self.now_ms(); self.state.set_position_ms(frame.get_state().get_position_ms()); - self.state.set_position_measured_at(now_ms() as u64); + self.state.set_position_measured_at(now as u64); let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; self.load_track(play); @@ -577,8 +577,9 @@ impl SpircTask { MessageType::kMessageTypeSeek => { let position = frame.get_position(); + let now = self.now_ms(); self.state.set_position_ms(position); - self.state.set_position_measured_at(now_ms() as u64); + self.state.set_position_measured_at(now as u64); self.player.seek(position); self.notify(None); } @@ -611,7 +612,8 @@ impl SpircTask { self.mixer.start(); self.player.play(); self.state.set_status(PlayStatus::kPlayStatusPlay); - self.state.set_position_measured_at(now_ms() as u64); + let now = self.now_ms(); + self.state.set_position_measured_at(now as u64); } } @@ -629,7 +631,7 @@ impl SpircTask { self.mixer.stop(); self.state.set_status(PlayStatus::kPlayStatusPause); - let now = now_ms() as u64; + let now = self.now_ms() as u64; let position = self.state.get_position_ms(); let diff = now - self.state.get_position_measured_at(); @@ -674,7 +676,8 @@ impl SpircTask { } self.state.set_playing_track_index(new_index); self.state.set_position_ms(0); - self.state.set_position_measured_at(now_ms() as u64); + let now = self.now_ms(); + self.state.set_position_measured_at(now as u64); self.load_track(continue_playing); } @@ -710,14 +713,16 @@ impl SpircTask { pos += 1; } + let now = self.now_ms(); self.state.set_playing_track_index(new_index); self.state.set_position_ms(0); - self.state.set_position_measured_at(now_ms() as u64); + self.state.set_position_measured_at(now as u64); self.load_track(true); } else { + let now = self.now_ms(); self.state.set_position_ms(0); - self.state.set_position_measured_at(now_ms() as u64); + self.state.set_position_measured_at(now as u64); self.player.seek(0); } } @@ -744,7 +749,7 @@ impl SpircTask { } fn position(&mut self) -> u32 { - let diff = now_ms() as u64 - self.state.get_position_measured_at(); + let diff = self.now_ms() as u64 - self.state.get_position_measured_at(); self.state.get_position_ms() + diff as u32 } @@ -881,7 +886,7 @@ impl<'a> CommandSender<'a> { frame.set_seq_nr(spirc.sequence.get()); frame.set_typ(cmd); frame.set_device_state(spirc.device.clone()); - frame.set_state_update_id(now_ms()); + frame.set_state_update_id(spirc.now_ms()); CommandSender { spirc: spirc, frame: frame, diff --git a/core/src/session.rs b/core/src/session.rs index 931b60c7..13042965 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -1,24 +1,27 @@ -use bytes::Bytes; -use futures::sync::mpsc; -use futures::{Async, Future, IntoFuture, Poll, Stream}; use std::io; -use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT}; use std::sync::{Arc, RwLock, Weak}; +use std::sync::atomic::{ATOMIC_USIZE_INIT, AtomicUsize, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use byteorder::{BigEndian, ByteOrder}; +use bytes::Bytes; +use futures::{Async, Future, IntoFuture, Poll, Stream}; +use futures::sync::mpsc; use tokio_core::reactor::{Handle, Remote}; use apresolve::apresolve_or_fallback; +use audio_key::AudioKeyManager; use authentication::Credentials; use cache::Cache; +use channel::ChannelManager; use component::Lazy; use config::SessionConfig; use connection; - -use audio_key::AudioKeyManager; -use channel::ChannelManager; use mercury::MercuryManager; struct SessionData { country: String, + time_delta: u64, canonical_username: String, invalid: bool, } @@ -108,6 +111,7 @@ impl Session { country: String::new(), canonical_username: username, invalid: false, + time_delta: 0, }), tx_connection: sender_tx, @@ -146,6 +150,10 @@ impl Session { self.0.mercury.get(|| MercuryManager::new(self.weak())) } + pub fn time_delta(&self) -> u64 { + self.0.data.read().unwrap().time_delta + } + pub fn spawn(&self, f: F) where F: FnOnce(&Handle) -> R + Send + 'static, @@ -168,8 +176,16 @@ impl Session { fn dispatch(&self, cmd: u8, data: Bytes) { match cmd { 0x4 => { + let server_timestamp = BigEndian::read_u32(data.as_ref()) as u64; + let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(dur) => dur, + Err(err) => err.duration(), + }.as_secs() as u64; + + self.0.data.write().unwrap().time_delta = server_timestamp - timestamp; + self.debug_info(); - self.send_packet(0x49, data.as_ref().to_owned()); + self.send_packet(0x49, vec![0, 0, 0, 0]); } 0x4a => (), 0x1b => {