Restore original blocking player behaviour

This commit is contained in:
johannesd3 2021-02-20 22:14:15 +01:00 committed by Johannes Dertmann
parent 1c4d57c6da
commit 007e653f3d
2 changed files with 78 additions and 67 deletions

View file

@ -39,6 +39,8 @@ struct SessionInternal {
mercury: OnceCell<MercuryManager>, mercury: OnceCell<MercuryManager>,
cache: Option<Arc<Cache>>, cache: Option<Arc<Cache>>,
handle: tokio::runtime::Handle,
session_id: usize, session_id: usize,
} }
@ -65,7 +67,13 @@ impl Session {
cache.save_credentials(&reusable_credentials); cache.save_credentials(&reusable_credentials);
} }
let session = Session::create(conn, config, cache, reusable_credentials.username); let session = Session::create(
conn,
config,
cache,
reusable_credentials.username,
tokio::runtime::Handle::current(),
);
Ok(session) Ok(session)
} }
@ -75,6 +83,7 @@ impl Session {
config: SessionConfig, config: SessionConfig,
cache: Option<Cache>, cache: Option<Cache>,
username: String, username: String,
handle: tokio::runtime::Handle,
) -> Session { ) -> Session {
let (sink, stream) = transport.split(); let (sink, stream) = transport.split();
@ -100,6 +109,8 @@ impl Session {
channel: OnceCell::new(), channel: OnceCell::new(),
mercury: OnceCell::new(), mercury: OnceCell::new(),
handle,
session_id: session_id, session_id: session_id,
})); }));
@ -139,7 +150,7 @@ impl Session {
T: Future + Send + 'static, T: Future + Send + 'static,
T::Output: Send + 'static, T::Output: Send + 'static,
{ {
tokio::spawn(task); self.0.handle.spawn(task);
} }
fn debug_info(&self) { fn debug_info(&self) {

View file

@ -7,7 +7,6 @@ use crate::audio::{
use crate::audio_backend::Sink; use crate::audio_backend::Sink;
use crate::config::NormalisationType; use crate::config::NormalisationType;
use crate::config::{Bitrate, PlayerConfig}; use crate::config::{Bitrate, PlayerConfig};
use crate::librespot_core::tokio;
use crate::metadata::{AudioItem, FileFormat}; use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter; use crate::mixer::AudioFilter;
use librespot_core::session::Session; use librespot_core::session::Session;
@ -15,25 +14,22 @@ use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator; use librespot_core::util::SeqGenerator;
use byteorder::{LittleEndian, ReadBytesExt}; use byteorder::{LittleEndian, ReadBytesExt};
use futures::{ use futures::channel::{mpsc, oneshot};
channel::{mpsc, oneshot}, use futures::{future, Future, Stream, StreamExt, TryFutureExt};
future, Future, Stream, StreamExt, use std::borrow::Cow;
};
use std::io::{Read, Seek, SeekFrom}; use std::cmp::max;
use std::mem; use std::io::{self, Read, Seek, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{borrow::Cow, io}; use std::{mem, thread};
use std::{
cmp::max,
pin::Pin,
task::{Context, Poll},
};
const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
pub struct Player { pub struct Player {
commands: Option<mpsc::UnboundedSender<PlayerCommand>>, commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
task_handle: Option<tokio::task::JoinHandle<()>>, thread_handle: Option<thread::JoinHandle<()>>,
play_request_id_generator: SeqGenerator<u64>, play_request_id_generator: SeqGenerator<u64>,
} }
@ -251,33 +247,33 @@ impl Player {
let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (cmd_tx, cmd_rx) = mpsc::unbounded();
let (event_sender, event_receiver) = mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded();
debug!("new Player[{}]", session.session_id()); let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id());
let internal = PlayerInternal { let internal = PlayerInternal {
session: session, session: session,
config: config, config: config,
commands: cmd_rx, commands: cmd_rx,
state: PlayerState::Stopped, state: PlayerState::Stopped,
preload: PlayerPreload::None, preload: PlayerPreload::None,
sink: sink_builder(), sink: sink_builder(),
sink_status: SinkStatus::Closed, sink_status: SinkStatus::Closed,
sink_event_callback: None, sink_event_callback: None,
audio_filter: audio_filter, audio_filter: audio_filter,
event_senders: [event_sender].to_vec(), event_senders: [event_sender].to_vec(),
}; };
// While PlayerInternal is written as a future, it still contains blocking code. // While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread. // It must be run by using wait() in a dedicated thread.
let handle = tokio::spawn(async move { futures::executor::block_on(internal);
internal.await;
debug!("PlayerInternal thread finished."); debug!("PlayerInternal thread finished.");
}); });
( (
Player { Player {
commands: Some(cmd_tx), commands: Some(cmd_tx),
task_handle: Some(handle), thread_handle: Some(handle),
play_request_id_generator: SeqGenerator::new(0), play_request_id_generator: SeqGenerator::new(0),
}, },
event_receiver, event_receiver,
@ -351,13 +347,11 @@ impl Drop for Player {
fn drop(&mut self) { fn drop(&mut self) {
debug!("Shutting down player thread ..."); debug!("Shutting down player thread ...");
self.commands = None; self.commands = None;
if let Some(handle) = self.task_handle.take() { if let Some(handle) = self.thread_handle.take() {
tokio::spawn(async { match handle.join() {
match handle.await { Ok(_) => (),
Ok(_) => (), Err(_) => error!("Player thread panicked!"),
Err(_) => error!("Player thread panicked!"), }
}
});
} }
} }
} }
@ -436,15 +430,23 @@ impl PlayerState {
#[allow(dead_code)] #[allow(dead_code)]
fn is_stopped(&self) -> bool { fn is_stopped(&self) -> bool {
matches!(self, Self::Stopped) use self::PlayerState::*;
match *self {
Stopped => true,
_ => false,
}
} }
fn is_loading(&self) -> bool { fn is_loading(&self) -> bool {
matches!(self, Self::Loading { .. }) use self::PlayerState::*;
match *self {
Loading { .. } => true,
_ => false,
}
} }
fn decoder(&mut self) -> Option<&mut Decoder> { fn decoder(&mut self) -> Option<&mut Decoder> {
use PlayerState::*; use self::PlayerState::*;
match *self { match *self {
Stopped | EndOfTrack { .. } | Loading { .. } => None, Stopped | EndOfTrack { .. } | Loading { .. } => None,
Paused { Paused {
@ -1243,9 +1245,10 @@ impl PlayerInternal {
loaded_track loaded_track
.stream_loader_controller .stream_loader_controller
.set_random_access_mode(); .set_random_access_mode();
let _ = tokio::task::block_in_place(|| { let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking.
loaded_track.decoder.seek(position_ms as i64) // But most likely the track is fully
}); // loaded already because we played
// to the end of it.
loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_loader_controller.set_stream_mode();
loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
} }
@ -1278,7 +1281,7 @@ impl PlayerInternal {
// we can use the current decoder. Ensure it's at the correct position. // we can use the current decoder. Ensure it's at the correct position.
if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
stream_loader_controller.set_random_access_mode(); stream_loader_controller.set_random_access_mode();
let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64)); let _ = decoder.seek(position_ms as i64); // This may be blocking.
stream_loader_controller.set_stream_mode(); stream_loader_controller.set_stream_mode();
*stream_position_pcm = Self::position_ms_to_pcm(position_ms); *stream_position_pcm = Self::position_ms_to_pcm(position_ms);
} }
@ -1346,9 +1349,7 @@ impl PlayerInternal {
loaded_track loaded_track
.stream_loader_controller .stream_loader_controller
.set_random_access_mode(); .set_random_access_mode();
let _ = tokio::task::block_in_place(|| { let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
loaded_track.decoder.seek(position_ms as i64)
});
loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_loader_controller.set_stream_mode();
} }
self.start_playback(track_id, play_request_id, *loaded_track, play); self.start_playback(track_id, play_request_id, *loaded_track, play);
@ -1563,7 +1564,7 @@ impl PlayerInternal {
} }
} }
pub fn load_track( fn load_track(
&self, &self,
spotify_id: SpotifyId, spotify_id: SpotifyId,
position_ms: u32, position_ms: u32,
@ -1574,22 +1575,23 @@ impl PlayerInternal {
// easily. Instead we spawn a thread to do the work and return a one-shot channel as the // easily. Instead we spawn a thread to do the work and return a one-shot channel as the
// future to work with. // future to work with.
let session = self.session.clone(); let loader = PlayerTrackLoader {
let config = self.config.clone(); session: self.session.clone(),
config: self.config.clone(),
};
async move { let (result_tx, result_rx) = oneshot::channel();
let loader = PlayerTrackLoader { session, config };
let (result_tx, result_rx) = oneshot::channel(); std::thread::spawn(move || {
futures::executor::block_on(loader.load_track(spotify_id, position_ms)).and_then(
tokio::spawn(async move { move |data| {
if let Some(data) = loader.load_track(spotify_id, position_ms).await {
let _ = result_tx.send(data); let _ = result_tx.send(data);
} Some(())
}); },
);
});
result_rx.await.map_err(|_| ()) result_rx.map_err(|_| ())
}
} }
fn preload_data_before_playback(&mut self) { fn preload_data_before_playback(&mut self) {
@ -1615,9 +1617,7 @@ impl PlayerInternal {
* bytes_per_second as f64) as usize, * bytes_per_second as f64) as usize,
(READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
); );
tokio::task::block_in_place(|| { stream_loader_controller.fetch_next_blocking(wait_for_data_length);
stream_loader_controller.fetch_next_blocking(wait_for_data_length)
});
} }
} }
} }