Migrated playback crate to futures 0.3

This commit is contained in:
johannesd3 2021-01-21 22:22:32 +01:00
parent 90905b81bb
commit 0895f17f8a
2 changed files with 109 additions and 106 deletions

View file

@ -18,9 +18,9 @@ path = "../metadata"
version = "0.1.3" version = "0.1.3"
[dependencies] [dependencies]
futures = "0.1" futures = "0.3"
log = "0.4" log = "0.4"
byteorder = "1.3" byteorder = "1.4"
shell-words = "1.0.0" shell-words = "1.0.0"
alsa = { version = "0.2", optional = true } alsa = { version = "0.2", optional = true }

View file

@ -1,20 +1,3 @@
use byteorder::{LittleEndian, ReadBytesExt};
use futures;
use futures::{future, Async, Future, Poll, Stream};
use std;
use std::borrow::Cow;
use std::cmp::max;
use std::io::{Read, Result, Seek, SeekFrom};
use std::mem;
use std::thread;
use std::time::{Duration, Instant};
use crate::config::{Bitrate, PlayerConfig};
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController};
use crate::audio::{VorbisDecoder, VorbisPacket}; use crate::audio::{VorbisDecoder, VorbisPacket};
use crate::audio::{ use crate::audio::{
@ -22,13 +5,33 @@ use crate::audio::{
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
}; };
use crate::audio_backend::Sink; use crate::audio_backend::Sink;
use crate::config::{Bitrate, PlayerConfig};
use crate::metadata::{AudioItem, FileFormat}; use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter; use crate::mixer::AudioFilter;
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use byteorder::{LittleEndian, ReadBytesExt};
use futures::{
channel::{mpsc, oneshot},
future, Future, Stream, StreamExt,
};
use std::io::{Read, Seek, SeekFrom};
use std::mem;
use std::thread;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io};
use std::{
cmp::max,
pin::Pin,
task::{Context, Poll},
};
const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
pub struct Player { pub struct Player {
commands: Option<futures::sync::mpsc::UnboundedSender<PlayerCommand>>, commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
thread_handle: Option<thread::JoinHandle<()>>, thread_handle: Option<thread::JoinHandle<()>>,
play_request_id_generator: SeqGenerator<u64>, play_request_id_generator: SeqGenerator<u64>,
} }
@ -45,7 +48,7 @@ pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;
struct PlayerInternal { struct PlayerInternal {
session: Session, session: Session,
config: PlayerConfig, config: PlayerConfig,
commands: futures::sync::mpsc::UnboundedReceiver<PlayerCommand>, commands: mpsc::UnboundedReceiver<PlayerCommand>,
state: PlayerState, state: PlayerState,
preload: PlayerPreload, preload: PlayerPreload,
@ -53,7 +56,7 @@ struct PlayerInternal {
sink_status: SinkStatus, sink_status: SinkStatus,
sink_event_callback: Option<SinkEventCallback>, sink_event_callback: Option<SinkEventCallback>,
audio_filter: Option<Box<dyn AudioFilter + Send>>, audio_filter: Option<Box<dyn AudioFilter + Send>>,
event_senders: Vec<futures::sync::mpsc::UnboundedSender<PlayerEvent>>, event_senders: Vec<mpsc::UnboundedSender<PlayerEvent>>,
} }
enum PlayerCommand { enum PlayerCommand {
@ -70,7 +73,7 @@ enum PlayerCommand {
Pause, Pause,
Stop, Stop,
Seek(u32), Seek(u32),
AddEventSender(futures::sync::mpsc::UnboundedSender<PlayerEvent>), AddEventSender(mpsc::UnboundedSender<PlayerEvent>),
SetSinkEventCallback(Option<SinkEventCallback>), SetSinkEventCallback(Option<SinkEventCallback>),
EmitVolumeSetEvent(u16), EmitVolumeSetEvent(u16),
} }
@ -182,7 +185,7 @@ impl PlayerEvent {
} }
} }
pub type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver<PlayerEvent>; pub type PlayerEventChannel = mpsc::UnboundedReceiver<PlayerEvent>;
#[derive(Clone, Copy, Debug)] #[derive(Clone, Copy, Debug)]
struct NormalisationData { struct NormalisationData {
@ -193,7 +196,7 @@ struct NormalisationData {
} }
impl NormalisationData { impl NormalisationData {
fn parse_from_file<T: Read + Seek>(mut file: T) -> Result<NormalisationData> { fn parse_from_file<T: Read + Seek>(mut file: T) -> io::Result<NormalisationData> {
const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144; const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144;
file.seek(SeekFrom::Start(SPOTIFY_NORMALIZATION_HEADER_START_OFFSET)) file.seek(SeekFrom::Start(SPOTIFY_NORMALIZATION_HEADER_START_OFFSET))
.unwrap(); .unwrap();
@ -241,8 +244,8 @@ impl Player {
where where
F: FnOnce() -> Box<dyn Sink> + Send + 'static, F: FnOnce() -> Box<dyn Sink> + Send + 'static,
{ {
let (cmd_tx, cmd_rx) = futures::sync::mpsc::unbounded(); let (cmd_tx, cmd_rx) = mpsc::unbounded();
let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id()); debug!("new Player[{}]", session.session_id());
@ -263,7 +266,7 @@ impl Player {
// While PlayerInternal is written as a future, it still contains blocking code. // While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread. // It must be run by using wait() in a dedicated thread.
let _ = internal.wait(); todo!("How to block in futures 0.3?");
debug!("PlayerInternal thread finished."); debug!("PlayerInternal thread finished.");
}); });
@ -314,22 +317,21 @@ impl Player {
} }
pub fn get_player_event_channel(&self) -> PlayerEventChannel { pub fn get_player_event_channel(&self) -> PlayerEventChannel {
let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded();
self.command(PlayerCommand::AddEventSender(event_sender)); self.command(PlayerCommand::AddEventSender(event_sender));
event_receiver event_receiver
} }
pub fn get_end_of_track_future(&self) -> Box<dyn Future<Item = (), Error = ()>> { pub async fn get_end_of_track_future(&self) {
let result = self self.get_player_event_channel()
.get_player_event_channel() .filter(|event| {
.filter(|event| match event { future::ready(matches!(
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } => true, event,
_ => false, PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
))
}) })
.into_future() .for_each(|_| future::ready(()))
.map_err(|_| ()) .await
.map(|_| ());
Box::new(result)
} }
pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) { pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) {
@ -367,7 +369,7 @@ enum PlayerPreload {
None, None,
Loading { Loading {
track_id: SpotifyId, track_id: SpotifyId,
loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>, loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>>>>,
}, },
Ready { Ready {
track_id: SpotifyId, track_id: SpotifyId,
@ -383,7 +385,7 @@ enum PlayerState {
track_id: SpotifyId, track_id: SpotifyId,
play_request_id: u64, play_request_id: u64,
start_playback: bool, start_playback: bool,
loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>, loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>>>>,
}, },
Paused { Paused {
track_id: SpotifyId, track_id: SpotifyId,
@ -573,22 +575,23 @@ struct PlayerTrackLoader {
} }
impl PlayerTrackLoader { impl PlayerTrackLoader {
fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option<Cow<'a, AudioItem>> { async fn find_available_alternative<'a>(
&self,
audio: &'a AudioItem,
) -> Option<Cow<'a, AudioItem>> {
if audio.available { if audio.available {
Some(Cow::Borrowed(audio)) Some(Cow::Borrowed(audio))
} else if let Some(alternatives) = &audio.alternatives {
let alternatives = alternatives
.iter()
.map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id));
let alternatives = future::try_join_all(alternatives).await.unwrap();
alternatives
.into_iter()
.find(|alt| alt.available)
.map(Cow::Owned)
} else { } else {
if let Some(alternatives) = &audio.alternatives { None
let alternatives = alternatives
.iter()
.map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id));
let alternatives = future::join_all(alternatives).wait().unwrap();
alternatives
.into_iter()
.find(|alt| alt.available)
.map(Cow::Owned)
} else {
None
}
} }
} }
@ -611,8 +614,12 @@ impl PlayerTrackLoader {
} }
} }
fn load_track(&self, spotify_id: SpotifyId, position_ms: u32) -> Option<PlayerLoadedTrackData> { async fn load_track(
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { &self,
spotify_id: SpotifyId,
position_ms: u32,
) -> Option<PlayerLoadedTrackData> {
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).await {
Ok(audio) => audio, Ok(audio) => audio,
Err(_) => { Err(_) => {
error!("Unable to load audio item."); error!("Unable to load audio item.");
@ -622,7 +629,7 @@ impl PlayerTrackLoader {
info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri);
let audio = match self.find_available_alternative(&audio) { let audio = match self.find_available_alternative(&audio).await {
Some(audio) => audio, Some(audio) => audio,
None => { None => {
warn!("<{}> is not available", audio.uri); warn!("<{}> is not available", audio.uri);
@ -675,7 +682,7 @@ impl PlayerTrackLoader {
play_from_beginning, play_from_beginning,
); );
let encrypted_file = match encrypted_file.wait() { let encrypted_file = match encrypted_file.await {
Ok(encrypted_file) => encrypted_file, Ok(encrypted_file) => encrypted_file,
Err(_) => { Err(_) => {
error!("Unable to load encrypted file."); error!("Unable to load encrypted file.");
@ -693,7 +700,7 @@ impl PlayerTrackLoader {
stream_loader_controller.set_random_access_mode(); stream_loader_controller.set_random_access_mode();
} }
let key = match key.wait() { let key = match key.await {
Ok(key) => key, Ok(key) => key,
Err(_) => { Err(_) => {
error!("Unable to load decryption key"); error!("Unable to load decryption key");
@ -738,10 +745,9 @@ impl PlayerTrackLoader {
} }
impl Future for PlayerInternal { impl Future for PlayerInternal {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// While this is written as a future, it still contains blocking code. // While this is written as a future, it still contains blocking code.
// It must be run on its own thread. // It must be run on its own thread.
@ -749,14 +755,13 @@ impl Future for PlayerInternal {
let mut all_futures_completed_or_not_ready = true; let mut all_futures_completed_or_not_ready = true;
// process commands that were sent to us // process commands that were sent to us
let cmd = match self.commands.poll() { let cmd = match Pin::new(&mut self.commands).poll_next(cx) {
Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // client has disconnected - shut down. Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down.
Ok(Async::Ready(Some(cmd))) => { Poll::Ready(Some(cmd)) => {
all_futures_completed_or_not_ready = false; all_futures_completed_or_not_ready = false;
Some(cmd) Some(cmd)
} }
Ok(Async::NotReady) => None, _ => None,
Err(_) => None,
}; };
if let Some(cmd) = cmd { if let Some(cmd) = cmd {
@ -771,8 +776,8 @@ impl Future for PlayerInternal {
play_request_id, play_request_id,
} = self.state } = self.state
{ {
match loader.poll() { match loader.as_mut().poll(cx) {
Ok(Async::Ready(loaded_track)) => { Poll::Ready(Ok(loaded_track)) => {
self.start_playback( self.start_playback(
track_id, track_id,
play_request_id, play_request_id,
@ -783,8 +788,7 @@ impl Future for PlayerInternal {
panic!("The state wasn't changed by start_playback()"); panic!("The state wasn't changed by start_playback()");
} }
} }
Ok(Async::NotReady) => (), Poll::Ready(Err(_)) => {
Err(_) => {
warn!("Unable to load <{:?}>\nSkipping to next track", track_id); warn!("Unable to load <{:?}>\nSkipping to next track", track_id);
assert!(self.state.is_loading()); assert!(self.state.is_loading());
self.send_event(PlayerEvent::EndOfTrack { self.send_event(PlayerEvent::EndOfTrack {
@ -792,6 +796,7 @@ impl Future for PlayerInternal {
play_request_id, play_request_id,
}) })
} }
Poll::Pending => (),
} }
} }
@ -801,16 +806,15 @@ impl Future for PlayerInternal {
track_id, track_id,
} = self.preload } = self.preload
{ {
match loader.poll() { match loader.as_mut().poll(cx) {
Ok(Async::Ready(loaded_track)) => { Poll::Ready(Ok(loaded_track)) => {
self.send_event(PlayerEvent::Preloading { track_id }); self.send_event(PlayerEvent::Preloading { track_id });
self.preload = PlayerPreload::Ready { self.preload = PlayerPreload::Ready {
track_id, track_id,
loaded_track, loaded_track,
}; };
} }
Ok(Async::NotReady) => (), Poll::Ready(Err(_)) => {
Err(_) => {
debug!("Unable to preload {:?}", track_id); debug!("Unable to preload {:?}", track_id);
self.preload = PlayerPreload::None; self.preload = PlayerPreload::None;
// Let Spirc know that the track was unavailable. // Let Spirc know that the track was unavailable.
@ -827,6 +831,7 @@ impl Future for PlayerInternal {
}); });
} }
} }
Poll::Pending => (),
} }
} }
@ -847,8 +852,7 @@ impl Future for PlayerInternal {
let packet = decoder.next_packet().expect("Vorbis error"); let packet = decoder.next_packet().expect("Vorbis error");
if let Some(ref packet) = packet { if let Some(ref packet) = packet {
*stream_position_pcm = *stream_position_pcm += (packet.data().len() / 2) as u64;
*stream_position_pcm + (packet.data().len() / 2) as u64;
let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm); let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm);
let notify_about_position = match *reported_nominal_start_time { let notify_about_position = match *reported_nominal_start_time {
@ -858,11 +862,7 @@ impl Future for PlayerInternal {
let lag = (Instant::now() - reported_nominal_start_time).as_millis() let lag = (Instant::now() - reported_nominal_start_time).as_millis()
as i64 as i64
- stream_position_millis as i64; - stream_position_millis as i64;
if lag > 1000 { lag > 1000
true
} else {
false
}
} }
}; };
if notify_about_position { if notify_about_position {
@ -918,11 +918,11 @@ impl Future for PlayerInternal {
} }
if self.session.is_invalid() { if self.session.is_invalid() {
return Ok(Async::Ready(())); return Poll::Ready(());
} }
if (!self.state.is_playing()) && all_futures_completed_or_not_ready { if (!self.state.is_playing()) && all_futures_completed_or_not_ready {
return Ok(Async::NotReady); return Poll::Pending;
} }
} }
} }
@ -1066,7 +1066,9 @@ impl PlayerInternal {
editor.modify_stream(&mut packet.data_mut()) editor.modify_stream(&mut packet.data_mut())
}; };
if self.config.normalisation && normalisation_factor != 1.0 { if self.config.normalisation
&& (normalisation_factor - 1.0).abs() < f32::EPSILON
{
for x in packet.data_mut().iter_mut() { for x in packet.data_mut().iter_mut() {
*x = (*x as f32 * normalisation_factor) as i16; *x = (*x as f32 * normalisation_factor) as i16;
} }
@ -1363,9 +1365,7 @@ impl PlayerInternal {
self.preload = PlayerPreload::None; self.preload = PlayerPreload::None;
// If we don't have a loader yet, create one from scratch. // If we don't have a loader yet, create one from scratch.
let loader = loader let loader = loader.unwrap_or_else(|| Box::pin(self.load_track(track_id, position_ms)));
.or_else(|| Some(self.load_track(track_id, position_ms)))
.unwrap();
// Set ourselves to a loading state. // Set ourselves to a loading state.
self.state = PlayerState::Loading { self.state = PlayerState::Loading {
@ -1420,7 +1420,10 @@ impl PlayerInternal {
// schedule the preload of the current track if desired. // schedule the preload of the current track if desired.
if preload_track { if preload_track {
let loader = self.load_track(track_id, 0); let loader = self.load_track(track_id, 0);
self.preload = PlayerPreload::Loading { track_id, loader } self.preload = PlayerPreload::Loading {
track_id,
loader: Box::pin(loader),
}
} }
} }
@ -1532,34 +1535,34 @@ impl PlayerInternal {
} }
} }
fn load_track( pub fn load_track(
&self, &self,
spotify_id: SpotifyId, spotify_id: SpotifyId,
position_ms: u32, position_ms: u32,
) -> Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>> { ) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + 'static {
// This method creates a future that returns the loaded stream and associated info. // This method creates a future that returns the loaded stream and associated info.
// Ideally all work should be done using asynchronous code. However, seek() on the // Ideally all work should be done using asynchronous code. However, seek() on the
// audio stream is implemented in a blocking fashion. Thus, we can't turn it into future // audio stream is implemented in a blocking fashion. Thus, we can't turn it into future
// easily. Instead we spawn a thread to do the work and return a one-shot channel as the // easily. Instead we spawn a thread to do the work and return a one-shot channel as the
// future to work with. // future to work with.
let loader = PlayerTrackLoader { let session = self.session.clone();
session: self.session.clone(), let config = self.config.clone();
config: self.config.clone(),
};
let (result_tx, result_rx) = futures::sync::oneshot::channel(); async move {
let loader = PlayerTrackLoader { session, config };
std::thread::spawn(move || { let (result_tx, result_rx) = oneshot::channel();
loader
.load_track(spotify_id, position_ms) std::thread::spawn(move || {
.and_then(move |data| { todo!("How to block in futures 0.3?")
/*if let Some(data) = block_on(loader.load_track(spotify_id, position_ms)) {
let _ = result_tx.send(data); let _ = result_tx.send(data);
Some(()) }*/
}); });
});
Box::new(result_rx.map_err(|_| ())) result_rx.await.map_err(|_| ())
}
} }
fn preload_data_before_playback(&mut self) { fn preload_data_before_playback(&mut self) {
@ -1689,13 +1692,13 @@ impl<T: Read + Seek> Subfile<T> {
} }
impl<T: Read + Seek> Read for Subfile<T> { impl<T: Read + Seek> Read for Subfile<T> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> { fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stream.read(buf) self.stream.read(buf)
} }
} }
impl<T: Read + Seek> Seek for Subfile<T> { impl<T: Read + Seek> Seek for Subfile<T> {
fn seek(&mut self, mut pos: SeekFrom) -> Result<u64> { fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
pos = match pos { pos = match pos {
SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset), SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset),
x => x, x => x,