Prepare for gapless play.

- change communication between player and spirc to use player events channel.
- enhance player events channel
- have spirc send loading messages to Spotify
- enable preloading of tracks in the player
This commit is contained in:
Konstantin Seiler 2020-02-01 08:41:11 +11:00
parent 3672214e31
commit 5784b4652c
6 changed files with 1375 additions and 398 deletions

View file

@ -144,6 +144,15 @@ impl StreamLoaderController {
}
}
pub fn range_to_end_available(&self) -> bool {
if let Some(ref shared) = self.stream_shared {
let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
self.range_available(Range::new(read_position, self.len() - read_position))
} else {
true
}
}
pub fn ping_time_ms(&self) -> usize {
if let Some(ref shared) = self.stream_shared {
return shared.ping_time_ms.load(atomic::Ordering::Relaxed);

View file

@ -2,7 +2,7 @@ use std;
use std::time::{SystemTime, UNIX_EPOCH};
use futures::future;
use futures::sync::{mpsc, oneshot};
use futures::sync::mpsc;
use futures::{Async, Future, Poll, Sink, Stream};
use protobuf::{self, Message};
use rand;
@ -11,7 +11,7 @@ use serde_json;
use crate::context::StationContext;
use crate::playback::mixer::Mixer;
use crate::playback::player::Player;
use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
use crate::protocol;
use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
use librespot_core::config::ConnectConfig;
@ -22,6 +22,14 @@ use librespot_core::util::SeqGenerator;
use librespot_core::version;
use librespot_core::volume::Volume;
enum SpircPlayStatus {
Stopped,
LoadingPlay { position_ms: u32 },
LoadingPause { position_ms: u32 },
Playing { nominal_start_time: i64 },
Paused { position_ms: u32 },
}
pub struct SpircTask {
player: Player,
mixer: Box<dyn Mixer>,
@ -32,11 +40,14 @@ pub struct SpircTask {
ident: String,
device: DeviceState,
state: State,
play_request_id: Option<u64>,
mixer_started: bool,
play_status: SpircPlayStatus,
subscription: Box<dyn Stream<Item = Frame, Error = MercuryError>>,
sender: Box<dyn Sink<SinkItem = Frame, SinkError = MercuryError>>,
commands: mpsc::UnboundedReceiver<SpircCommand>,
end_of_track: Box<dyn Future<Item = (), Error = oneshot::Canceled>>,
player_events: PlayerEventChannel,
shutdown: bool,
session: Session,
@ -255,6 +266,8 @@ impl Spirc {
};
let device = initial_device_state(config);
let player_events = player.get_player_event_channel();
let mut task = SpircTask {
player: player,
mixer: mixer,
@ -266,11 +279,14 @@ impl Spirc {
device: device,
state: initial_state(),
play_request_id: None,
mixer_started: false,
play_status: SpircPlayStatus::Stopped,
subscription: subscription,
sender: sender,
commands: cmd_rx,
end_of_track: Box::new(future::empty()),
player_events: player_events,
shutdown: false,
session: session.clone(),
@ -350,13 +366,14 @@ impl Future for SpircTask {
Async::NotReady => (),
}
match self.end_of_track.poll() {
Ok(Async::Ready(())) => {
progress = true;
self.handle_end_of_track();
}
match self.player_events.poll() {
Ok(Async::NotReady) => (),
Err(oneshot::Canceled) => self.end_of_track = Box::new(future::empty()),
Ok(Async::Ready(None)) => (),
Err(_) => (),
Ok(Async::Ready(Some(event))) => {
progress = true;
self.handle_player_event(event);
}
}
// TODO: Refactor
match self.context_fut.poll() {
@ -431,6 +448,26 @@ impl SpircTask {
+ (dur.subsec_nanos() / 1000_000) as i64)
}
fn ensure_mixer_started(&mut self) {
if !self.mixer_started {
self.mixer.start();
self.mixer_started = true;
}
}
fn ensure_mixer_stopped(&mut self) {
if self.mixer_started {
self.mixer.stop();
self.mixer_started = false;
}
}
fn update_state_position(&mut self, position_ms: u32) {
let now = self.now_ms();
self.state.set_position_measured_at(now as u64);
self.state.set_position_ms(position_ms);
}
fn handle_command(&mut self, cmd: SpircCommand) {
let active = self.device.get_is_active();
match cmd {
@ -498,14 +535,112 @@ impl SpircTask {
}
}
fn handle_player_event(&mut self, event: PlayerEvent) {
// we only process events if the play_request_id matches. If it doesn't, it is
// an event that belongs to a previous track and only arrives now due to a race
// condition. In this case we have updated the state already and don't want to
// mess with it.
if let Some(play_request_id) = event.get_play_request_id() {
if Some(play_request_id) == self.play_request_id {
match event {
PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track(),
PlayerEvent::Loading { .. } => (),
PlayerEvent::Playing { position_ms, .. } => {
let new_nominal_start_time =
self.now_ms() - position_ms as i64;
match self.play_status {
SpircPlayStatus::Playing { nominal_start_time } => {
if (nominal_start_time - new_nominal_start_time)
.abs()
> 100
{
self.update_state_position(position_ms);
self.notify(None);
self.play_status = SpircPlayStatus::Playing {
nominal_start_time: new_nominal_start_time,
};
}
}
SpircPlayStatus::LoadingPlay { .. }
| SpircPlayStatus::LoadingPause { .. } => {
self.state.set_status(PlayStatus::kPlayStatusPlay);
self.update_state_position(position_ms);
self.notify(None);
self.play_status = SpircPlayStatus::Playing {
nominal_start_time: new_nominal_start_time,
};
}
_ => (),
};
trace!("==> kPlayStatusPlay");
}
PlayerEvent::Paused {
position_ms: new_position_ms,
..
} => {
match self.play_status {
SpircPlayStatus::Paused {
ref mut position_ms,
} => {
if *position_ms != new_position_ms {
*position_ms = new_position_ms;
self.update_state_position(new_position_ms);
self.notify(None);
}
}
SpircPlayStatus::LoadingPlay { .. }
| SpircPlayStatus::LoadingPause { .. } => {
self.state.set_status(PlayStatus::kPlayStatusPause);
self.update_state_position(new_position_ms);
self.notify(None);
self.play_status = SpircPlayStatus::Paused {
position_ms: new_position_ms,
};
}
_ => (),
}
trace!("==> kPlayStatusPause");
}
PlayerEvent::Stopped { .. } => match self.play_status {
SpircPlayStatus::Stopped => (),
_ => {
warn!("The player has stopped unexpentedly.");
self.state.set_status(PlayStatus::kPlayStatusStop);
self.ensure_mixer_stopped();
self.notify(None);
self.play_status = SpircPlayStatus::Stopped;
}
},
PlayerEvent::TimeToPreloadNextTrack {..} => match self.play_status {
SpircPlayStatus::Paused {..}|SpircPlayStatus::Playing {..}| SpircPlayStatus::LoadingPause{..}|SpircPlayStatus::LoadingPlay {..} => {
if let Some(track_id) = self.preview_next_track() {
self.player.preload(track_id);
}
}
SpircPlayStatus::Stopped => (),
}
_ => (),
}
}
}
}
fn handle_frame(&mut self, frame: Frame) {
let state_string = match frame.get_state().get_status() {
PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
PlayStatus::kPlayStatusPause => "kPlayStatusPause",
PlayStatus::kPlayStatusStop => "kPlayStatusStop",
PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
};
debug!(
"{:?} {:?} {} {} {}",
"{:?} {:?} {} {} {} {}",
frame.get_typ(),
frame.get_device_state().get_name(),
frame.get_ident(),
frame.get_seq_nr(),
frame.get_state_update_id()
frame.get_state_update_id(),
state_string,
);
if frame.get_ident() == self.ident
@ -529,18 +664,15 @@ impl SpircTask {
self.update_tracks(&frame);
if self.state.get_track().len() > 0 {
let now = self.now_ms();
self.state
.set_position_ms(frame.get_state().get_position_ms());
self.state.set_position_measured_at(now as u64);
let play = frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
self.load_track(play);
let start_playing =
frame.get_state().get_status() == PlayStatus::kPlayStatusPlay;
self.load_track(start_playing, frame.get_state().get_position_ms());
} else {
info!("No more tracks left in queue");
self.state.set_status(PlayStatus::kPlayStatusStop);
self.player.stop();
self.mixer.stop();
self.play_status = SpircPlayStatus::Stopped;
}
self.notify(None);
@ -607,12 +739,7 @@ impl SpircTask {
}
MessageType::kMessageTypeSeek => {
let position = frame.get_position();
let now = self.now_ms();
self.state.set_position_ms(position);
self.state.set_position_measured_at(now as u64);
self.player.seek(position);
self.handle_seek(frame.get_position());
self.notify(None);
}
@ -631,7 +758,8 @@ impl SpircTask {
self.device.set_is_active(false);
self.state.set_status(PlayStatus::kPlayStatusStop);
self.player.stop();
self.mixer.stop();
self.ensure_mixer_stopped();
self.play_status = SpircPlayStatus::Stopped;
}
}
@ -640,39 +768,75 @@ impl SpircTask {
}
fn handle_play(&mut self) {
if self.state.get_status() == PlayStatus::kPlayStatusPause {
self.mixer.start();
self.player.play();
self.state.set_status(PlayStatus::kPlayStatusPlay);
let now = self.now_ms();
self.state.set_position_measured_at(now as u64);
match self.play_status {
SpircPlayStatus::Paused { position_ms } => {
self.ensure_mixer_started();
self.player.play();
self.state.set_status(PlayStatus::kPlayStatusPlay);
self.update_state_position(position_ms);
self.play_status = SpircPlayStatus::Playing {
nominal_start_time: self.now_ms() as i64 - position_ms as i64,
};
}
SpircPlayStatus::LoadingPause { position_ms } => {
self.ensure_mixer_started();
self.player.play();
self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
}
_ => (),
}
}
fn handle_play_pause(&mut self) {
match self.state.get_status() {
PlayStatus::kPlayStatusPlay => self.handle_pause(),
PlayStatus::kPlayStatusPause => self.handle_play(),
match self.play_status {
SpircPlayStatus::Paused { .. } | SpircPlayStatus::LoadingPause { .. } => {
self.handle_play()
}
SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => {
self.handle_play()
}
_ => (),
}
}
fn handle_pause(&mut self) {
if self.state.get_status() == PlayStatus::kPlayStatusPlay {
self.player.pause();
self.mixer.stop();
self.state.set_status(PlayStatus::kPlayStatusPause);
let now = self.now_ms() as u64;
let position = self.state.get_position_ms();
let diff = now - self.state.get_position_measured_at();
self.state.set_position_ms(position + diff as u32);
self.state.set_position_measured_at(now);
match self.play_status {
SpircPlayStatus::Playing { nominal_start_time } => {
self.player.pause();
self.state.set_status(PlayStatus::kPlayStatusPause);
let position_ms = (self.now_ms() - nominal_start_time) as u32;
self.update_state_position(position_ms);
self.play_status = SpircPlayStatus::Paused { position_ms };
}
SpircPlayStatus::LoadingPlay { position_ms } => {
self.player.pause();
self.play_status = SpircPlayStatus::LoadingPause { position_ms };
}
_ => (),
}
}
fn handle_seek(&mut self, position_ms: u32) {
self.update_state_position(position_ms);
self.player.seek(position_ms);
let now = self.now_ms();
match self.play_status {
SpircPlayStatus::Stopped => (),
SpircPlayStatus::LoadingPause {
position_ms: ref mut position,
}
| SpircPlayStatus::LoadingPlay {
position_ms: ref mut position,
}
| SpircPlayStatus::Paused {
position_ms: ref mut position,
} => *position = position_ms,
SpircPlayStatus::Playing {
ref mut nominal_start_time,
} => *nominal_start_time = now - position_ms as i64,
};
}
fn consume_queued_track(&mut self) -> usize {
// Removes current track if it is queued
// Returns the index of the next track
@ -687,6 +851,10 @@ impl SpircTask {
}
}
fn preview_next_track(&mut self) -> Option<SpotifyId> {
None
}
fn handle_next(&mut self) {
let mut new_index = self.consume_queued_track() as u32;
let mut continue_playing = true;
@ -720,17 +888,14 @@ impl SpircTask {
if tracks_len > 0 {
self.state.set_playing_track_index(new_index);
self.state.set_position_ms(0);
let now = self.now_ms();
self.state.set_position_measured_at(now as u64);
self.load_track(continue_playing);
self.load_track(continue_playing, 0);
} else {
info!("Not playing next track because there are no more tracks left in queue.");
self.state.set_playing_track_index(0);
self.state.set_status(PlayStatus::kPlayStatusStop);
self.player.stop();
self.mixer.stop();
self.ensure_mixer_stopped();
self.play_status = SpircPlayStatus::Stopped;
}
}
@ -765,17 +930,11 @@ impl SpircTask {
pos += 1;
}
let now = self.now_ms();
self.state.set_playing_track_index(new_index);
self.state.set_position_ms(0);
self.state.set_position_measured_at(now as u64);
self.load_track(true);
self.load_track(true, 0);
} else {
let now = self.now_ms();
self.state.set_position_ms(0);
self.state.set_position_measured_at(now as u64);
self.player.seek(0);
self.handle_seek(0);
}
}
@ -801,8 +960,15 @@ impl SpircTask {
}
fn position(&mut self) -> u32 {
let diff = self.now_ms() as u64 - self.state.get_position_measured_at();
self.state.get_position_ms() + diff as u32
match self.play_status {
SpircPlayStatus::Stopped => 0,
SpircPlayStatus::LoadingPlay { position_ms }
| SpircPlayStatus::LoadingPause { position_ms }
| SpircPlayStatus::Paused { position_ms } => position_ms,
SpircPlayStatus::Playing { nominal_start_time } => {
(self.now_ms() - nominal_start_time) as u32
}
}
}
fn resolve_station(
@ -912,7 +1078,7 @@ impl SpircTask {
})
}
fn load_track(&mut self, play: bool) {
fn load_track(&mut self, start_playing: bool, position_ms: u32) {
let context_uri = self.state.get_context_uri().to_owned();
let mut index = self.state.get_playing_track_index();
let start_index = index;
@ -949,16 +1115,15 @@ impl SpircTask {
}
.expect("Invalid SpotifyId");
let position = self.state.get_position_ms();
let end_of_track = self.player.load(track, play, position);
self.play_request_id = Some(self.player.load(track, start_playing, position_ms));
if play {
self.state.set_status(PlayStatus::kPlayStatusPlay);
self.update_state_position(position_ms);
self.state.set_status(PlayStatus::kPlayStatusLoading);
if start_playing {
self.play_status = SpircPlayStatus::LoadingPlay { position_ms };
} else {
self.state.set_status(PlayStatus::kPlayStatusPause);
self.play_status = SpircPlayStatus::LoadingPause { position_ms };
}
self.end_of_track = Box::new(end_of_track);
}
fn hello(&mut self) {
@ -966,6 +1131,13 @@ impl SpircTask {
}
fn notify(&mut self, recipient: Option<&str>) {
let status_string = match self.state.get_status() {
PlayStatus::kPlayStatusLoading => "kPlayStatusLoading",
PlayStatus::kPlayStatusPause => "kPlayStatusPause",
PlayStatus::kPlayStatusStop => "kPlayStatusStop",
PlayStatus::kPlayStatusPlay => "kPlayStatusPlay",
};
trace!("Sending status to server: [{}]", status_string);
let mut cs = CommandSender::new(self, MessageType::kMessageTypeNotify);
if let Some(s) = recipient {
cs = cs.recipient(&s);
@ -980,6 +1152,7 @@ impl SpircTask {
if let Some(cache) = self.session.cache() {
cache.save_volume(Volume { volume })
}
self.player.emit_volume_set_event(volume);
}
}

View file

@ -7,8 +7,53 @@ use librespot::core::session::Session;
use librespot::core::spotify_id::SpotifyId;
use librespot::playback::config::PlayerConfig;
use futures::stream::Stream;
use futures::{Async, Future, Poll};
use librespot::playback::audio_backend;
use librespot::playback::player::Player;
use librespot::playback::player::{Player, PlayerEvent, PlayerEventChannel};
pub struct SingleTrackPlayer {
play_request_id: u64,
event_channel: PlayerEventChannel,
}
impl SingleTrackPlayer {
pub fn new(ref mut player: Player, track_id: SpotifyId) -> SingleTrackPlayer {
let event_channel = player.get_player_event_channel();
let play_request_id = player.load(track_id, true, 0);
SingleTrackPlayer {
play_request_id,
event_channel,
}
}
}
impl Future for SingleTrackPlayer {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.event_channel.poll().unwrap() {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(None) => return Ok(Async::Ready(())),
Async::Ready(Some(event)) => match event {
PlayerEvent::EndOfTrack {
play_request_id, ..
}
| PlayerEvent::Stopped {
play_request_id, ..
} => {
if play_request_id == self.play_request_id {
return Ok(Async::Ready(()));
}
}
_ => (),
},
}
}
}
}
fn main() {
let mut core = Core::new().unwrap();
@ -39,7 +84,7 @@ fn main() {
});
println!("Playing...");
core.run(player.load(track, true, 0)).unwrap();
core.run(SingleTrackPlayer::new(player, track)).unwrap();
println!("Done");
}

View file

@ -63,6 +63,7 @@ pub struct AudioItem {
pub uri: String,
pub files: LinearMap<FileFormat, FileId>,
pub name: String,
pub duration: i32,
pub available: bool,
pub alternatives: Option<Vec<SpotifyId>>,
}
@ -100,6 +101,7 @@ impl AudioFiles for Track {
uri: format!("spotify:track:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: Some(item.alternatives),
})
@ -118,6 +120,7 @@ impl AudioFiles for Episode {
uri: format!("spotify:episode:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: None,
})

File diff suppressed because it is too large Load diff

View file

@ -25,14 +25,15 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> io::Result<Ch
env_vars.insert("OLD_TRACK_ID", old_track_id.to_base62());
env_vars.insert("TRACK_ID", new_track_id.to_base62());
}
PlayerEvent::Started { track_id } => {
PlayerEvent::Started { track_id, .. } => {
env_vars.insert("PLAYER_EVENT", "start".to_string());
env_vars.insert("TRACK_ID", track_id.to_base62());
}
PlayerEvent::Stopped { track_id } => {
PlayerEvent::Stopped { track_id, .. } => {
env_vars.insert("PLAYER_EVENT", "stop".to_string());
env_vars.insert("TRACK_ID", track_id.to_base62());
}
_ => (),
}
run_program(onevent, env_vars)
}