diff --git a/playback/Cargo.toml b/playback/Cargo.toml index edd0951f..10451851 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -18,9 +18,9 @@ path = "../metadata" version = "0.1.3" [dependencies] -futures = "0.1" +futures = "0.3" log = "0.4" -byteorder = "1.3" +byteorder = "1.4" shell-words = "1.0.0" alsa = { version = "0.2", optional = true } diff --git a/playback/src/player.rs b/playback/src/player.rs index 125184a0..df442f0a 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1,20 +1,3 @@ -use byteorder::{LittleEndian, ReadBytesExt}; -use futures; -use futures::{future, Async, Future, Poll, Stream}; -use std; -use std::borrow::Cow; -use std::cmp::max; -use std::io::{Read, Result, Seek, SeekFrom}; -use std::mem; -use std::thread; -use std::time::{Duration, Instant}; - -use crate::config::{Bitrate, PlayerConfig}; -use librespot_core::session::Session; -use librespot_core::spotify_id::SpotifyId; - -use librespot_core::util::SeqGenerator; - use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use crate::audio::{VorbisDecoder, VorbisPacket}; use crate::audio::{ @@ -22,13 +5,33 @@ use crate::audio::{ READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, }; use crate::audio_backend::Sink; +use crate::config::{Bitrate, PlayerConfig}; use crate::metadata::{AudioItem, FileFormat}; use crate::mixer::AudioFilter; +use librespot_core::session::Session; +use librespot_core::spotify_id::SpotifyId; +use librespot_core::util::SeqGenerator; + +use byteorder::{LittleEndian, ReadBytesExt}; +use futures::{ + channel::{mpsc, oneshot}, + future, Future, Stream, StreamExt, +}; +use std::io::{Read, Seek, SeekFrom}; +use std::mem; +use std::thread; +use std::time::{Duration, Instant}; +use std::{borrow::Cow, io}; +use std::{ + cmp::max, + pin::Pin, + task::{Context, Poll}, +}; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; pub struct Player { - commands: Option>, + commands: Option>, thread_handle: Option>, play_request_id_generator: SeqGenerator, } @@ -45,7 +48,7 @@ pub type SinkEventCallback = Box; struct PlayerInternal { session: Session, config: PlayerConfig, - commands: futures::sync::mpsc::UnboundedReceiver, + commands: mpsc::UnboundedReceiver, state: PlayerState, preload: PlayerPreload, @@ -53,7 +56,7 @@ struct PlayerInternal { sink_status: SinkStatus, sink_event_callback: Option, audio_filter: Option>, - event_senders: Vec>, + event_senders: Vec>, } enum PlayerCommand { @@ -70,7 +73,7 @@ enum PlayerCommand { Pause, Stop, Seek(u32), - AddEventSender(futures::sync::mpsc::UnboundedSender), + AddEventSender(mpsc::UnboundedSender), SetSinkEventCallback(Option), EmitVolumeSetEvent(u16), } @@ -182,7 +185,7 @@ impl PlayerEvent { } } -pub type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver; +pub type PlayerEventChannel = mpsc::UnboundedReceiver; #[derive(Clone, Copy, Debug)] struct NormalisationData { @@ -193,7 +196,7 @@ struct NormalisationData { } impl NormalisationData { - fn parse_from_file(mut file: T) -> Result { + fn parse_from_file(mut file: T) -> io::Result { const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144; file.seek(SeekFrom::Start(SPOTIFY_NORMALIZATION_HEADER_START_OFFSET)) .unwrap(); @@ -241,8 +244,8 @@ impl Player { where F: FnOnce() -> Box + Send + 'static, { - let (cmd_tx, cmd_rx) = futures::sync::mpsc::unbounded(); - let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); + let (cmd_tx, cmd_rx) = mpsc::unbounded(); + let (event_sender, event_receiver) = mpsc::unbounded(); let handle = thread::spawn(move || { debug!("new Player[{}]", session.session_id()); @@ -263,7 +266,7 @@ impl Player { // While PlayerInternal is written as a future, it still contains blocking code. // It must be run by using wait() in a dedicated thread. - let _ = internal.wait(); + todo!("How to block in futures 0.3?"); debug!("PlayerInternal thread finished."); }); @@ -314,22 +317,21 @@ impl Player { } pub fn get_player_event_channel(&self) -> PlayerEventChannel { - let (event_sender, event_receiver) = futures::sync::mpsc::unbounded(); + let (event_sender, event_receiver) = mpsc::unbounded(); self.command(PlayerCommand::AddEventSender(event_sender)); event_receiver } - pub fn get_end_of_track_future(&self) -> Box> { - let result = self - .get_player_event_channel() - .filter(|event| match event { - PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } => true, - _ => false, + pub async fn get_end_of_track_future(&self) { + self.get_player_event_channel() + .filter(|event| { + future::ready(matches!( + event, + PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } + )) }) - .into_future() - .map_err(|_| ()) - .map(|_| ()); - Box::new(result) + .for_each(|_| future::ready(())) + .await } pub fn set_sink_event_callback(&self, callback: Option) { @@ -367,7 +369,7 @@ enum PlayerPreload { None, Loading { track_id: SpotifyId, - loader: Box>, + loader: Pin>>>, }, Ready { track_id: SpotifyId, @@ -383,7 +385,7 @@ enum PlayerState { track_id: SpotifyId, play_request_id: u64, start_playback: bool, - loader: Box>, + loader: Pin>>>, }, Paused { track_id: SpotifyId, @@ -573,22 +575,23 @@ struct PlayerTrackLoader { } impl PlayerTrackLoader { - fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option> { + async fn find_available_alternative<'a>( + &self, + audio: &'a AudioItem, + ) -> Option> { if audio.available { Some(Cow::Borrowed(audio)) + } else if let Some(alternatives) = &audio.alternatives { + let alternatives = alternatives + .iter() + .map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id)); + let alternatives = future::try_join_all(alternatives).await.unwrap(); + alternatives + .into_iter() + .find(|alt| alt.available) + .map(Cow::Owned) } else { - if let Some(alternatives) = &audio.alternatives { - let alternatives = alternatives - .iter() - .map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id)); - let alternatives = future::join_all(alternatives).wait().unwrap(); - alternatives - .into_iter() - .find(|alt| alt.available) - .map(Cow::Owned) - } else { - None - } + None } } @@ -611,8 +614,12 @@ impl PlayerTrackLoader { } } - fn load_track(&self, spotify_id: SpotifyId, position_ms: u32) -> Option { - let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() { + async fn load_track( + &self, + spotify_id: SpotifyId, + position_ms: u32, + ) -> Option { + let audio = match AudioItem::get_audio_item(&self.session, spotify_id).await { Ok(audio) => audio, Err(_) => { error!("Unable to load audio item."); @@ -622,7 +629,7 @@ impl PlayerTrackLoader { info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); - let audio = match self.find_available_alternative(&audio) { + let audio = match self.find_available_alternative(&audio).await { Some(audio) => audio, None => { warn!("<{}> is not available", audio.uri); @@ -675,7 +682,7 @@ impl PlayerTrackLoader { play_from_beginning, ); - let encrypted_file = match encrypted_file.wait() { + let encrypted_file = match encrypted_file.await { Ok(encrypted_file) => encrypted_file, Err(_) => { error!("Unable to load encrypted file."); @@ -693,7 +700,7 @@ impl PlayerTrackLoader { stream_loader_controller.set_random_access_mode(); } - let key = match key.wait() { + let key = match key.await { Ok(key) => key, Err(_) => { error!("Unable to load decryption key"); @@ -738,10 +745,9 @@ impl PlayerTrackLoader { } impl Future for PlayerInternal { - type Item = (); - type Error = (); + type Output = (); - fn poll(&mut self) -> Poll<(), ()> { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { // While this is written as a future, it still contains blocking code. // It must be run on its own thread. @@ -749,14 +755,13 @@ impl Future for PlayerInternal { let mut all_futures_completed_or_not_ready = true; // process commands that were sent to us - let cmd = match self.commands.poll() { - Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // client has disconnected - shut down. - Ok(Async::Ready(Some(cmd))) => { + let cmd = match Pin::new(&mut self.commands).poll_next(cx) { + Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down. + Poll::Ready(Some(cmd)) => { all_futures_completed_or_not_ready = false; Some(cmd) } - Ok(Async::NotReady) => None, - Err(_) => None, + _ => None, }; if let Some(cmd) = cmd { @@ -771,8 +776,8 @@ impl Future for PlayerInternal { play_request_id, } = self.state { - match loader.poll() { - Ok(Async::Ready(loaded_track)) => { + match loader.as_mut().poll(cx) { + Poll::Ready(Ok(loaded_track)) => { self.start_playback( track_id, play_request_id, @@ -783,8 +788,7 @@ impl Future for PlayerInternal { panic!("The state wasn't changed by start_playback()"); } } - Ok(Async::NotReady) => (), - Err(_) => { + Poll::Ready(Err(_)) => { warn!("Unable to load <{:?}>\nSkipping to next track", track_id); assert!(self.state.is_loading()); self.send_event(PlayerEvent::EndOfTrack { @@ -792,6 +796,7 @@ impl Future for PlayerInternal { play_request_id, }) } + Poll::Pending => (), } } @@ -801,16 +806,15 @@ impl Future for PlayerInternal { track_id, } = self.preload { - match loader.poll() { - Ok(Async::Ready(loaded_track)) => { + match loader.as_mut().poll(cx) { + Poll::Ready(Ok(loaded_track)) => { self.send_event(PlayerEvent::Preloading { track_id }); self.preload = PlayerPreload::Ready { track_id, loaded_track, }; } - Ok(Async::NotReady) => (), - Err(_) => { + Poll::Ready(Err(_)) => { debug!("Unable to preload {:?}", track_id); self.preload = PlayerPreload::None; // Let Spirc know that the track was unavailable. @@ -827,6 +831,7 @@ impl Future for PlayerInternal { }); } } + Poll::Pending => (), } } @@ -847,8 +852,7 @@ impl Future for PlayerInternal { let packet = decoder.next_packet().expect("Vorbis error"); if let Some(ref packet) = packet { - *stream_position_pcm = - *stream_position_pcm + (packet.data().len() / 2) as u64; + *stream_position_pcm += (packet.data().len() / 2) as u64; let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm); let notify_about_position = match *reported_nominal_start_time { @@ -858,11 +862,7 @@ impl Future for PlayerInternal { let lag = (Instant::now() - reported_nominal_start_time).as_millis() as i64 - stream_position_millis as i64; - if lag > 1000 { - true - } else { - false - } + lag > 1000 } }; if notify_about_position { @@ -918,11 +918,11 @@ impl Future for PlayerInternal { } if self.session.is_invalid() { - return Ok(Async::Ready(())); + return Poll::Ready(()); } if (!self.state.is_playing()) && all_futures_completed_or_not_ready { - return Ok(Async::NotReady); + return Poll::Pending; } } } @@ -1066,7 +1066,9 @@ impl PlayerInternal { editor.modify_stream(&mut packet.data_mut()) }; - if self.config.normalisation && normalisation_factor != 1.0 { + if self.config.normalisation + && (normalisation_factor - 1.0).abs() < f32::EPSILON + { for x in packet.data_mut().iter_mut() { *x = (*x as f32 * normalisation_factor) as i16; } @@ -1363,9 +1365,7 @@ impl PlayerInternal { self.preload = PlayerPreload::None; // If we don't have a loader yet, create one from scratch. - let loader = loader - .or_else(|| Some(self.load_track(track_id, position_ms))) - .unwrap(); + let loader = loader.unwrap_or_else(|| Box::pin(self.load_track(track_id, position_ms))); // Set ourselves to a loading state. self.state = PlayerState::Loading { @@ -1420,7 +1420,10 @@ impl PlayerInternal { // schedule the preload of the current track if desired. if preload_track { let loader = self.load_track(track_id, 0); - self.preload = PlayerPreload::Loading { track_id, loader } + self.preload = PlayerPreload::Loading { + track_id, + loader: Box::pin(loader), + } } } @@ -1532,34 +1535,34 @@ impl PlayerInternal { } } - fn load_track( + pub fn load_track( &self, spotify_id: SpotifyId, position_ms: u32, - ) -> Box> { + ) -> impl Future> + 'static { // This method creates a future that returns the loaded stream and associated info. // Ideally all work should be done using asynchronous code. However, seek() on the // audio stream is implemented in a blocking fashion. Thus, we can't turn it into future // easily. Instead we spawn a thread to do the work and return a one-shot channel as the // future to work with. - let loader = PlayerTrackLoader { - session: self.session.clone(), - config: self.config.clone(), - }; + let session = self.session.clone(); + let config = self.config.clone(); - let (result_tx, result_rx) = futures::sync::oneshot::channel(); + async move { + let loader = PlayerTrackLoader { session, config }; - std::thread::spawn(move || { - loader - .load_track(spotify_id, position_ms) - .and_then(move |data| { + let (result_tx, result_rx) = oneshot::channel(); + + std::thread::spawn(move || { + todo!("How to block in futures 0.3?") + /*if let Some(data) = block_on(loader.load_track(spotify_id, position_ms)) { let _ = result_tx.send(data); - Some(()) - }); - }); + }*/ + }); - Box::new(result_rx.map_err(|_| ())) + result_rx.await.map_err(|_| ()) + } } fn preload_data_before_playback(&mut self) { @@ -1689,13 +1692,13 @@ impl Subfile { } impl Read for Subfile { - fn read(&mut self, buf: &mut [u8]) -> Result { + fn read(&mut self, buf: &mut [u8]) -> io::Result { self.stream.read(buf) } } impl Seek for Subfile { - fn seek(&mut self, mut pos: SeekFrom) -> Result { + fn seek(&mut self, mut pos: SeekFrom) -> io::Result { pos = match pos { SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset), x => x,