Fixed issue with time alignment

This commit is contained in:
devgianlu 2019-03-24 07:15:14 -07:00
parent 4e64934318
commit dc9b2de314
2 changed files with 59 additions and 38 deletions

View file

@ -1,8 +1,15 @@
use std;
use std::time::{SystemTime, UNIX_EPOCH};
use futures::{Async, Future, Poll, Sink, Stream};
use futures::future; use futures::future;
use futures::sync::{mpsc, oneshot}; use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll, Sink, Stream};
use protobuf::{self, Message}; use protobuf::{self, Message};
use rand;
use rand::seq::SliceRandom;
use serde_json;
use context::StationContext;
use core::config::ConnectConfig; use core::config::ConnectConfig;
use core::mercury::MercuryError; use core::mercury::MercuryError;
use core::session::Session; use core::session::Session;
@ -10,19 +17,10 @@ use core::spotify_id::SpotifyId;
use core::util::SeqGenerator; use core::util::SeqGenerator;
use core::version; use core::version;
use core::volume::Volume; use core::volume::Volume;
use protocol;
use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
use playback::mixer::Mixer; use playback::mixer::Mixer;
use playback::player::Player; use playback::player::Player;
use serde_json; use protocol;
use protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State};
use context::StationContext;
use rand;
use rand::seq::SliceRandom;
use std;
use std::time::{SystemTime, UNIX_EPOCH};
pub struct SpircTask { pub struct SpircTask {
player: Player, player: Player,
@ -64,14 +62,6 @@ pub struct Spirc {
commands: mpsc::UnboundedSender<SpircCommand>, commands: mpsc::UnboundedSender<SpircCommand>,
} }
fn now_ms() -> i64 {
let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(dur) => dur,
Err(err) => err.duration(),
};
(dur.as_secs() * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
}
fn initial_state() -> State { fn initial_state() -> State {
let mut frame = protocol::spirc::State::new(); let mut frame = protocol::spirc::State::new();
frame.set_repeat(false); frame.set_repeat(false);
@ -404,6 +394,14 @@ impl Future for SpircTask {
} }
impl SpircTask { impl SpircTask {
fn now_ms(&mut self) -> i64 {
let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(dur) => dur,
Err(err) => err.duration(),
};
((dur.as_secs() + self.session.time_delta()) * 1000 + (dur.subsec_nanos() / 1000_000) as u64) as i64
}
fn handle_command(&mut self, cmd: SpircCommand) { fn handle_command(&mut self, cmd: SpircCommand) {
let active = self.device.get_is_active(); let active = self.device.get_is_active();
match cmd { match cmd {
@ -494,15 +492,17 @@ impl SpircTask {
MessageType::kMessageTypeLoad => { MessageType::kMessageTypeLoad => {
if !self.device.get_is_active() { if !self.device.get_is_active() {
let now = self.now_ms();
self.device.set_is_active(true); self.device.set_is_active(true);
self.device.set_became_active_at(now_ms()); self.device.set_became_active_at(now);
} }
self.update_tracks(&frame); self.update_tracks(&frame);
if self.state.get_track().len() > 0 { if self.state.get_track().len() > 0 {
let now = self.now_ms();
self.state.set_position_ms(frame.get_state().get_position_ms()); self.state.set_position_ms(frame.get_state().get_position_ms());
self.state.set_position_measured_at(now_ms() as u64); self.state.set_position_measured_at(now as u64);
let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay; let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
self.load_track(play); self.load_track(play);
@ -577,8 +577,9 @@ impl SpircTask {
MessageType::kMessageTypeSeek => { MessageType::kMessageTypeSeek => {
let position = frame.get_position(); let position = frame.get_position();
let now = self.now_ms();
self.state.set_position_ms(position); self.state.set_position_ms(position);
self.state.set_position_measured_at(now_ms() as u64); self.state.set_position_measured_at(now as u64);
self.player.seek(position); self.player.seek(position);
self.notify(None); self.notify(None);
} }
@ -611,7 +612,8 @@ impl SpircTask {
self.mixer.start(); self.mixer.start();
self.player.play(); self.player.play();
self.state.set_status(PlayStatus::kPlayStatusPlay); self.state.set_status(PlayStatus::kPlayStatusPlay);
self.state.set_position_measured_at(now_ms() as u64); let now = self.now_ms();
self.state.set_position_measured_at(now as u64);
} }
} }
@ -629,7 +631,7 @@ impl SpircTask {
self.mixer.stop(); self.mixer.stop();
self.state.set_status(PlayStatus::kPlayStatusPause); self.state.set_status(PlayStatus::kPlayStatusPause);
let now = now_ms() as u64; let now = self.now_ms() as u64;
let position = self.state.get_position_ms(); let position = self.state.get_position_ms();
let diff = now - self.state.get_position_measured_at(); let diff = now - self.state.get_position_measured_at();
@ -674,7 +676,8 @@ impl SpircTask {
} }
self.state.set_playing_track_index(new_index); self.state.set_playing_track_index(new_index);
self.state.set_position_ms(0); self.state.set_position_ms(0);
self.state.set_position_measured_at(now_ms() as u64); let now = self.now_ms();
self.state.set_position_measured_at(now as u64);
self.load_track(continue_playing); self.load_track(continue_playing);
} }
@ -710,14 +713,16 @@ impl SpircTask {
pos += 1; pos += 1;
} }
let now = self.now_ms();
self.state.set_playing_track_index(new_index); self.state.set_playing_track_index(new_index);
self.state.set_position_ms(0); self.state.set_position_ms(0);
self.state.set_position_measured_at(now_ms() as u64); self.state.set_position_measured_at(now as u64);
self.load_track(true); self.load_track(true);
} else { } else {
let now = self.now_ms();
self.state.set_position_ms(0); self.state.set_position_ms(0);
self.state.set_position_measured_at(now_ms() as u64); self.state.set_position_measured_at(now as u64);
self.player.seek(0); self.player.seek(0);
} }
} }
@ -744,7 +749,7 @@ impl SpircTask {
} }
fn position(&mut self) -> u32 { fn position(&mut self) -> u32 {
let diff = now_ms() as u64 - self.state.get_position_measured_at(); let diff = self.now_ms() as u64 - self.state.get_position_measured_at();
self.state.get_position_ms() + diff as u32 self.state.get_position_ms() + diff as u32
} }
@ -881,7 +886,7 @@ impl<'a> CommandSender<'a> {
frame.set_seq_nr(spirc.sequence.get()); frame.set_seq_nr(spirc.sequence.get());
frame.set_typ(cmd); frame.set_typ(cmd);
frame.set_device_state(spirc.device.clone()); frame.set_device_state(spirc.device.clone());
frame.set_state_update_id(now_ms()); frame.set_state_update_id(spirc.now_ms());
CommandSender { CommandSender {
spirc: spirc, spirc: spirc,
frame: frame, frame: frame,

View file

@ -1,24 +1,27 @@
use bytes::Bytes;
use futures::sync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::sync::{Arc, RwLock, Weak}; use std::sync::{Arc, RwLock, Weak};
use std::sync::atomic::{ATOMIC_USIZE_INIT, AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use futures::sync::mpsc;
use tokio_core::reactor::{Handle, Remote}; use tokio_core::reactor::{Handle, Remote};
use apresolve::apresolve_or_fallback; use apresolve::apresolve_or_fallback;
use audio_key::AudioKeyManager;
use authentication::Credentials; use authentication::Credentials;
use cache::Cache; use cache::Cache;
use channel::ChannelManager;
use component::Lazy; use component::Lazy;
use config::SessionConfig; use config::SessionConfig;
use connection; use connection;
use audio_key::AudioKeyManager;
use channel::ChannelManager;
use mercury::MercuryManager; use mercury::MercuryManager;
struct SessionData { struct SessionData {
country: String, country: String,
time_delta: u64,
canonical_username: String, canonical_username: String,
invalid: bool, invalid: bool,
} }
@ -108,6 +111,7 @@ impl Session {
country: String::new(), country: String::new(),
canonical_username: username, canonical_username: username,
invalid: false, invalid: false,
time_delta: 0,
}), }),
tx_connection: sender_tx, tx_connection: sender_tx,
@ -146,6 +150,10 @@ impl Session {
self.0.mercury.get(|| MercuryManager::new(self.weak())) self.0.mercury.get(|| MercuryManager::new(self.weak()))
} }
pub fn time_delta(&self) -> u64 {
self.0.data.read().unwrap().time_delta
}
pub fn spawn<F, R>(&self, f: F) pub fn spawn<F, R>(&self, f: F)
where where
F: FnOnce(&Handle) -> R + Send + 'static, F: FnOnce(&Handle) -> R + Send + 'static,
@ -168,8 +176,16 @@ impl Session {
fn dispatch(&self, cmd: u8, data: Bytes) { fn dispatch(&self, cmd: u8, data: Bytes) {
match cmd { match cmd {
0x4 => { 0x4 => {
let server_timestamp = BigEndian::read_u32(data.as_ref()) as u64;
let timestamp = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(dur) => dur,
Err(err) => err.duration(),
}.as_secs() as u64;
self.0.data.write().unwrap().time_delta = server_timestamp - timestamp;
self.debug_info(); self.debug_info();
self.send_packet(0x49, data.as_ref().to_owned()); self.send_packet(0x49, vec![0, 0, 0, 0]);
} }
0x4a => (), 0x4a => (),
0x1b => { 0x1b => {