diff --git a/Cargo.lock b/Cargo.lock index a8577d19..1d687112 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1481,7 +1481,8 @@ dependencies = [ "alsa", "byteorder", "cpal", - "futures", + "futures-executor", + "futures-util", "glib", "gstreamer", "gstreamer-app", @@ -1498,6 +1499,7 @@ dependencies = [ "sdl2", "shell-words", "thiserror", + "tokio", "zerocopy", ] diff --git a/playback/Cargo.toml b/playback/Cargo.toml index 2759ae0e..e32ee7ba 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -18,10 +18,12 @@ path = "../metadata" version = "0.1.6" [dependencies] -futures = "0.3" +futures-executor = { version = "0.3", default_features = false } +futures-util = { version = "0.3", default_features = false } log = "0.4" byteorder = "1.4" shell-words = "1.0.0" +tokio = { version = "1", features = ["sync"] } alsa = { version = "0.4", optional = true } portaudio-rs = { version = "0.3", optional = true } diff --git a/playback/src/lib.rs b/playback/src/lib.rs index f4606430..9613f2e5 100644 --- a/playback/src/lib.rs +++ b/playback/src/lib.rs @@ -1,39 +1,9 @@ #[macro_use] extern crate log; -extern crate byteorder; -extern crate futures; -extern crate shell_words; - -#[cfg(feature = "alsa-backend")] -extern crate alsa; - -#[cfg(feature = "portaudio-backend")] -extern crate portaudio_rs; - -#[cfg(feature = "pulseaudio-backend")] -extern crate libpulse_binding; -#[cfg(feature = "pulseaudio-backend")] -extern crate libpulse_simple_binding; - -#[cfg(feature = "jackaudio-backend")] -extern crate jack; - -#[cfg(feature = "gstreamer-backend")] -extern crate glib; -#[cfg(feature = "gstreamer-backend")] -extern crate gstreamer as gst; -#[cfg(feature = "gstreamer-backend")] -extern crate gstreamer_app as gst_app; -#[cfg(feature = "gstreamer-backend")] -extern crate zerocopy; - -#[cfg(feature = "sdl-backend")] -extern crate sdl2; - -extern crate librespot_audio as audio; -extern crate librespot_core; -extern crate librespot_metadata as metadata; +use librespot_audio as audio; +use librespot_core as core; +use librespot_metadata as metadata; pub mod audio_backend; pub mod config; diff --git a/playback/src/player.rs b/playback/src/player.rs index 861f91b0..bc4a1dda 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -1,3 +1,16 @@ +use std::borrow::Cow; +use std::cmp::max; +use std::future::Future; +use std::io::{self, Read, Seek, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; +use std::{mem, thread}; + +use byteorder::{LittleEndian, ReadBytesExt}; +use futures_util::{future, TryFutureExt}; +use tokio::sync::{mpsc, oneshot}; + use crate::audio::{AudioDecoder, AudioError, AudioPacket, PassthroughDecoder, VorbisDecoder}; use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use crate::audio::{ @@ -7,23 +20,11 @@ use crate::audio::{ use crate::audio_backend::Sink; use crate::config::NormalisationType; use crate::config::{Bitrate, PlayerConfig}; +use crate::core::session::Session; +use crate::core::spotify_id::SpotifyId; +use crate::core::util::SeqGenerator; use crate::metadata::{AudioItem, FileFormat}; use crate::mixer::AudioFilter; -use librespot_core::session::Session; -use librespot_core::spotify_id::SpotifyId; -use librespot_core::util::SeqGenerator; - -use byteorder::{LittleEndian, ReadBytesExt}; -use futures::channel::{mpsc, oneshot}; -use futures::{future, Future, Stream, StreamExt, TryFutureExt}; -use std::borrow::Cow; - -use std::cmp::max; -use std::io::{self, Read, Seek, SeekFrom}; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::{Duration, Instant}; -use std::{mem, thread}; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; @@ -244,8 +245,8 @@ impl Player { where F: FnOnce() -> Box + Send + 'static, { - let (cmd_tx, cmd_rx) = mpsc::unbounded(); - let (event_sender, event_receiver) = mpsc::unbounded(); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); let handle = thread::spawn(move || { debug!("new Player[{}]", session.session_id()); @@ -265,8 +266,8 @@ impl Player { }; // While PlayerInternal is written as a future, it still contains blocking code. - // It must be run by using wait() in a dedicated thread. - futures::executor::block_on(internal); + // It must be run by using block_on() in a dedicated thread. + futures_executor::block_on(internal); debug!("PlayerInternal thread finished."); }); @@ -281,7 +282,7 @@ impl Player { } fn command(&self, cmd: PlayerCommand) { - self.commands.as_ref().unwrap().unbounded_send(cmd).unwrap(); + self.commands.as_ref().unwrap().send(cmd).unwrap(); } pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 { @@ -317,14 +318,14 @@ impl Player { } pub fn get_player_event_channel(&self) -> PlayerEventChannel { - let (event_sender, event_receiver) = mpsc::unbounded(); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); self.command(PlayerCommand::AddEventSender(event_sender)); event_receiver } - pub async fn get_end_of_track_future(&self) { + pub async fn await_end_of_track(&self) { let mut channel = self.get_player_event_channel(); - while let Some(event) = channel.next().await { + while let Some(event) = channel.recv().await { if matches!( event, PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } @@ -797,7 +798,7 @@ impl Future for PlayerInternal { let mut all_futures_completed_or_not_ready = true; // process commands that were sent to us - let cmd = match Pin::new(&mut self.commands).poll_next(cx) { + let cmd = match self.commands.poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down. Poll::Ready(Some(cmd)) => { all_futures_completed_or_not_ready = false; @@ -1580,7 +1581,7 @@ impl PlayerInternal { fn send_event(&mut self, event: PlayerEvent) { let mut index = 0; while index < self.event_senders.len() { - match self.event_senders[index].unbounded_send(event.clone()) { + match self.event_senders[index].send(event.clone()) { Ok(_) => index += 1, Err(_) => { self.event_senders.remove(index); @@ -1608,7 +1609,7 @@ impl PlayerInternal { let (result_tx, result_rx) = oneshot::channel(); std::thread::spawn(move || { - futures::executor::block_on(loader.load_track(spotify_id, position_ms)).and_then( + futures_executor::block_on(loader.load_track(spotify_id, position_ms)).and_then( move |data| { let _ = result_tx.send(data); Some(())