From fe371868046222690f6104ba2742ce73589c2e25 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Fri, 22 Jan 2021 22:51:41 +0100 Subject: [PATCH] Make librespot_playback work --- playback/src/audio_backend/mod.rs | 8 +- playback/src/audio_backend/pipe.rs | 2 +- playback/src/player.rs | 118 ++++++++++++++--------------- src/lib.rs | 2 +- 4 files changed, 64 insertions(+), 66 deletions(-) diff --git a/playback/src/audio_backend/mod.rs b/playback/src/audio_backend/mod.rs index a9840d42..21ee3c05 100644 --- a/playback/src/audio_backend/mod.rs +++ b/playback/src/audio_backend/mod.rs @@ -10,7 +10,9 @@ pub trait Sink { fn write(&mut self, data: &[i16]) -> io::Result<()>; } -fn mk_sink(device: Option) -> Box { +pub type SinkBuilder = fn(Option) -> Box; + +fn mk_sink(device: Option) -> Box { Box::new(S::open(device)) } @@ -54,7 +56,7 @@ use self::pipe::StdoutSink; mod subprocess; use self::subprocess::SubprocessSink; -pub const BACKENDS: &'static [(&'static str, fn(Option) -> Box)] = &[ +pub const BACKENDS: &'static [(&'static str, SinkBuilder)] = &[ #[cfg(feature = "alsa-backend")] ("alsa", mk_sink::), #[cfg(feature = "portaudio-backend")] @@ -73,7 +75,7 @@ pub const BACKENDS: &'static [(&'static str, fn(Option) -> Box ("subprocess", mk_sink::), ]; -pub fn find(name: Option) -> Option) -> Box> { +pub fn find(name: Option) -> Option { if let Some(name) = name { BACKENDS .iter() diff --git a/playback/src/audio_backend/pipe.rs b/playback/src/audio_backend/pipe.rs index 2adafe11..02b8faf5 100644 --- a/playback/src/audio_backend/pipe.rs +++ b/playback/src/audio_backend/pipe.rs @@ -4,7 +4,7 @@ use std::io::{self, Write}; use std::mem; use std::slice; -pub struct StdoutSink(Box); +pub struct StdoutSink(Box); impl Open for StdoutSink { fn open(path: Option) -> StdoutSink { diff --git a/playback/src/player.rs b/playback/src/player.rs index df442f0a..ff0fba24 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -6,6 +6,7 @@ use crate::audio::{ }; use crate::audio_backend::Sink; use crate::config::{Bitrate, PlayerConfig}; +use crate::librespot_core::tokio; use crate::metadata::{AudioItem, FileFormat}; use crate::mixer::AudioFilter; use librespot_core::session::Session; @@ -19,7 +20,6 @@ use futures::{ }; use std::io::{Read, Seek, SeekFrom}; use std::mem; -use std::thread; use std::time::{Duration, Instant}; use std::{borrow::Cow, io}; use std::{ @@ -32,7 +32,7 @@ const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; pub struct Player { commands: Option>, - thread_handle: Option>, + task_handle: Option>, play_request_id_generator: SeqGenerator, } @@ -52,7 +52,7 @@ struct PlayerInternal { state: PlayerState, preload: PlayerPreload, - sink: Box, + sink: Box, sink_status: SinkStatus, sink_event_callback: Option, audio_filter: Option>, @@ -242,38 +242,38 @@ impl Player { sink_builder: F, ) -> (Player, PlayerEventChannel) where - F: FnOnce() -> Box + Send + 'static, + F: FnOnce() -> Box + Send + 'static, { let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded(); - let handle = thread::spawn(move || { - debug!("new Player[{}]", session.session_id()); + debug!("new Player[{}]", session.session_id()); - let internal = PlayerInternal { - session: session, - config: config, - commands: cmd_rx, + let internal = PlayerInternal { + session: session, + config: config, + commands: cmd_rx, - state: PlayerState::Stopped, - preload: PlayerPreload::None, - sink: sink_builder(), - sink_status: SinkStatus::Closed, - sink_event_callback: None, - audio_filter: audio_filter, - event_senders: [event_sender].to_vec(), - }; + state: PlayerState::Stopped, + preload: PlayerPreload::None, + sink: sink_builder(), + sink_status: SinkStatus::Closed, + sink_event_callback: None, + audio_filter: audio_filter, + event_senders: [event_sender].to_vec(), + }; - // While PlayerInternal is written as a future, it still contains blocking code. - // It must be run by using wait() in a dedicated thread. - todo!("How to block in futures 0.3?"); + // While PlayerInternal is written as a future, it still contains blocking code. + // It must be run by using wait() in a dedicated thread. + let handle = tokio::spawn(async move { + internal.await; debug!("PlayerInternal thread finished."); }); ( Player { commands: Some(cmd_tx), - thread_handle: Some(handle), + task_handle: Some(handle), play_request_id_generator: SeqGenerator::new(0), }, event_receiver, @@ -347,11 +347,13 @@ impl Drop for Player { fn drop(&mut self) { debug!("Shutting down player thread ..."); self.commands = None; - if let Some(handle) = self.thread_handle.take() { - match handle.join() { - Ok(_) => (), - Err(_) => error!("Player thread panicked!"), - } + if let Some(handle) = self.task_handle.take() { + tokio::spawn(async { + match handle.await { + Ok(_) => (), + Err(_) => error!("Player thread panicked!"), + } + }); } } } @@ -369,11 +371,11 @@ enum PlayerPreload { None, Loading { track_id: SpotifyId, - loader: Pin>>>, + loader: Pin> + Send>>, }, Ready { track_id: SpotifyId, - loaded_track: PlayerLoadedTrackData, + loaded_track: Box, }, } @@ -385,7 +387,7 @@ enum PlayerState { track_id: SpotifyId, play_request_id: u64, start_playback: bool, - loader: Pin>>>, + loader: Pin> + Send>>, }, Paused { track_id: SpotifyId, @@ -430,23 +432,15 @@ impl PlayerState { #[allow(dead_code)] fn is_stopped(&self) -> bool { - use self::PlayerState::*; - match *self { - Stopped => true, - _ => false, - } + matches!(self, Self::Stopped) } fn is_loading(&self) -> bool { - use self::PlayerState::*; - match *self { - Loading { .. } => true, - _ => false, - } + matches!(self, Self::Loading { .. }) } fn decoder(&mut self) -> Option<&mut Decoder> { - use self::PlayerState::*; + use PlayerState::*; match *self { Stopped | EndOfTrack { .. } | Loading { .. } => None, Paused { @@ -575,10 +569,10 @@ struct PlayerTrackLoader { } impl PlayerTrackLoader { - async fn find_available_alternative<'a>( - &self, - audio: &'a AudioItem, - ) -> Option> { + async fn find_available_alternative<'a, 'b>( + &'a self, + audio: &'b AudioItem, + ) -> Option> { if audio.available { Some(Cow::Borrowed(audio)) } else if let Some(alternatives) = &audio.alternatives { @@ -716,7 +710,7 @@ impl PlayerTrackLoader { } Err(_) => { warn!("Unable to extract normalisation data, using default value."); - 1.0 as f32 + 1.0_f32 } }; @@ -811,7 +805,7 @@ impl Future for PlayerInternal { self.send_event(PlayerEvent::Preloading { track_id }); self.preload = PlayerPreload::Ready { track_id, - loaded_track, + loaded_track: Box::new(loaded_track), }; } Poll::Ready(Err(_)) => { @@ -1061,7 +1055,7 @@ impl PlayerInternal { fn handle_packet(&mut self, packet: Option, normalisation_factor: f32) { match packet { Some(mut packet) => { - if packet.data().len() > 0 { + if !packet.data().is_empty() { if let Some(ref editor) = self.audio_filter { editor.modify_stream(&mut packet.data_mut()) }; @@ -1216,10 +1210,9 @@ impl PlayerInternal { loaded_track .stream_loader_controller .set_random_access_mode(); - let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking. - // But most likely the track is fully - // loaded already because we played - // to the end of it. + let _ = tokio::task::block_in_place(|| { + loaded_track.decoder.seek(position_ms as i64) + }); loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); } @@ -1252,7 +1245,7 @@ impl PlayerInternal { // we can use the current decoder. Ensure it's at the correct position. if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { stream_loader_controller.set_random_access_mode(); - let _ = decoder.seek(position_ms as i64); // This may be blocking. + let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64)); stream_loader_controller.set_stream_mode(); *stream_position_pcm = Self::position_ms_to_pcm(position_ms); } @@ -1320,10 +1313,12 @@ impl PlayerInternal { loaded_track .stream_loader_controller .set_random_access_mode(); - let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking + let _ = tokio::task::block_in_place(|| { + loaded_track.decoder.seek(position_ms as i64) + }); loaded_track.stream_loader_controller.set_stream_mode(); } - self.start_playback(track_id, play_request_id, loaded_track, play); + self.start_playback(track_id, play_request_id, *loaded_track, play); return; } else { unreachable!(); @@ -1539,7 +1534,7 @@ impl PlayerInternal { &self, spotify_id: SpotifyId, position_ms: u32, - ) -> impl Future> + 'static { + ) -> impl Future> + Send + 'static { // This method creates a future that returns the loaded stream and associated info. // Ideally all work should be done using asynchronous code. However, seek() on the // audio stream is implemented in a blocking fashion. Thus, we can't turn it into future @@ -1554,11 +1549,10 @@ impl PlayerInternal { let (result_tx, result_rx) = oneshot::channel(); - std::thread::spawn(move || { - todo!("How to block in futures 0.3?") - /*if let Some(data) = block_on(loader.load_track(spotify_id, position_ms)) { + tokio::spawn(async move { + if let Some(data) = loader.load_track(spotify_id, position_ms).await { let _ = result_tx.send(data); - }*/ + } }); result_rx.await.map_err(|_| ()) @@ -1588,7 +1582,9 @@ impl PlayerInternal { * bytes_per_second as f64) as usize, (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, ); - stream_loader_controller.fetch_next_blocking(wait_for_data_length); + tokio::task::block_in_place(|| { + stream_loader_controller.fetch_next_blocking(wait_for_data_length) + }); } } } diff --git a/src/lib.rs b/src/lib.rs index 610062e2..31bac343 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ #![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))] pub extern crate librespot_audio as audio; -pub extern crate librespot_connect as connect; +// pub extern crate librespot_connect as connect; pub extern crate librespot_core as core; pub extern crate librespot_metadata as metadata; pub extern crate librespot_playback as playback;