Use tokio channels and fix compilation errors

This commit is contained in:
johannesd3 2021-02-21 19:38:40 +01:00 committed by Johannesd3
parent f9c0e26f6d
commit d064ffc670
5 changed files with 30 additions and 22 deletions

3
Cargo.lock generated
View file

@ -1416,12 +1416,13 @@ dependencies = [
"log", "log",
"num-bigint", "num-bigint",
"protobuf", "protobuf",
"rand 0.7.3", "rand 0.8.3",
"serde", "serde",
"serde_derive", "serde_derive",
"serde_json", "serde_json",
"sha-1 0.9.4", "sha-1 0.9.4",
"tokio", "tokio",
"tokio-stream",
"url 1.7.2", "url 1.7.2",
] ]

View file

@ -55,7 +55,7 @@ num-bigint = "0.3"
protobuf = "~2.14.0" protobuf = "~2.14.0"
rand = "0.7" rand = "0.7"
rpassword = "5.0" rpassword = "5.0"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "process"] } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "sync", "process"] }
url = "1.7" url = "1.7"
sha-1 = "0.8" sha-1 = "0.8"
hex = "0.4" hex = "0.4"

View file

@ -27,12 +27,13 @@ hyper = { version = "0.14", features = ["server", "http1"] }
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
protobuf = "~2.14.0" protobuf = "~2.14.0"
rand = "0.7" rand = "0.8"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
sha-1 = "0.9" sha-1 = "0.9"
tokio = { version = "1.0", features = ["macros"] } tokio = { version = "1.0", features = ["macros"] }
tokio-stream = { version = "0.1" }
url = "1.7" url = "1.7"
dns-sd = { version = "0.1.3", optional = true } dns-sd = { version = "0.1.3", optional = true }

View file

@ -6,7 +6,7 @@ use crate::playback::mixer::Mixer;
use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
use crate::protocol; use crate::protocol;
use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
use futures::channel::mpsc;
use futures::future::{self, FusedFuture}; use futures::future::{self, FusedFuture};
use futures::stream::FusedStream; use futures::stream::FusedStream;
use futures::{Future, FutureExt, StreamExt}; use futures::{Future, FutureExt, StreamExt};
@ -21,6 +21,8 @@ use protobuf::{self, Message};
use rand; use rand;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use serde_json; use serde_json;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
enum SpircPlayStatus { enum SpircPlayStatus {
Stopped, Stopped,
@ -59,8 +61,8 @@ struct SpircTask {
subscription: BoxedStream<Frame>, subscription: BoxedStream<Frame>,
sender: MercurySender, sender: MercurySender,
commands: mpsc::UnboundedReceiver<SpircCommand>, commands: Option<mpsc::UnboundedReceiver<SpircCommand>>,
player_events: PlayerEventChannel, player_events: Option<PlayerEventChannel>,
shutdown: bool, shutdown: bool,
session: Session, session: Session,
@ -263,6 +265,7 @@ impl Spirc {
.mercury() .mercury()
.subscribe(uri.clone()) .subscribe(uri.clone())
.map(Result::unwrap) .map(Result::unwrap)
.map(UnboundedReceiverStream::new)
.flatten_stream() .flatten_stream()
.map(|response| -> Frame { .map(|response| -> Frame {
let data = response.payload.first().unwrap(); let data = response.payload.first().unwrap();
@ -272,7 +275,7 @@ impl Spirc {
let sender = session.mercury().sender(uri); let sender = session.mercury().sender(uri);
let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let volume = config.volume; let volume = config.volume;
let task_config = SpircTaskConfig { let task_config = SpircTaskConfig {
@ -301,8 +304,8 @@ impl Spirc {
subscription: subscription, subscription: subscription,
sender: sender, sender: sender,
commands: cmd_rx, commands: Some(cmd_rx),
player_events: player_events, player_events: Some(player_events),
shutdown: false, shutdown: false,
session: session, session: session,
@ -322,34 +325,36 @@ impl Spirc {
} }
pub fn play(&self) { pub fn play(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Play); let _ = self.commands.send(SpircCommand::Play);
} }
pub fn play_pause(&self) { pub fn play_pause(&self) {
let _ = self.commands.unbounded_send(SpircCommand::PlayPause); let _ = self.commands.send(SpircCommand::PlayPause);
} }
pub fn pause(&self) { pub fn pause(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Pause); let _ = self.commands.send(SpircCommand::Pause);
} }
pub fn prev(&self) { pub fn prev(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Prev); let _ = self.commands.send(SpircCommand::Prev);
} }
pub fn next(&self) { pub fn next(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Next); let _ = self.commands.send(SpircCommand::Next);
} }
pub fn volume_up(&self) { pub fn volume_up(&self) {
let _ = self.commands.unbounded_send(SpircCommand::VolumeUp); let _ = self.commands.send(SpircCommand::VolumeUp);
} }
pub fn volume_down(&self) { pub fn volume_down(&self) {
let _ = self.commands.unbounded_send(SpircCommand::VolumeDown); let _ = self.commands.send(SpircCommand::VolumeDown);
} }
pub fn shutdown(&self) { pub fn shutdown(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Shutdown); let _ = self.commands.send(SpircCommand::Shutdown);
} }
} }
impl SpircTask { impl SpircTask {
async fn run(mut self) { async fn run(mut self) {
while !self.session.is_invalid() && !self.shutdown { while !self.session.is_invalid() && !self.shutdown {
let commands = self.commands.as_mut();
let player_events = self.player_events.as_mut();
tokio::select! { tokio::select! {
frame = self.subscription.next() => match frame { frame = self.subscription.next() => match frame {
Some(frame) => self.handle_frame(frame), Some(frame) => self.handle_frame(frame),
@ -358,10 +363,10 @@ impl SpircTask {
break; break;
} }
}, },
cmd = self.commands.next(), if !self.commands.is_terminated() => if let Some(cmd) = cmd { cmd = async { commands.unwrap().recv().await }, if commands.is_some() => if let Some(cmd) = cmd {
self.handle_command(cmd); self.handle_command(cmd);
}, },
event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event { event = async { player_events.unwrap().recv().await }, if player_events.is_some() => if let Some(event) = event {
self.handle_player_event(event) self.handle_player_event(event)
}, },
result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() { result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() {
@ -508,7 +513,7 @@ impl SpircTask {
SpircCommand::Shutdown => { SpircCommand::Shutdown => {
CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
self.shutdown = true; self.shutdown = true;
self.commands.close(); self.commands.as_mut().map(|rx| rx.close());
} }
} }
} }

View file

@ -1,4 +1,4 @@
use futures::{channel::mpsc::UnboundedReceiver, future::FusedFuture, FutureExt, StreamExt}; use futures::{future::FusedFuture, FutureExt, StreamExt};
use librespot_playback::player::PlayerEvent; use librespot_playback::player::PlayerEvent;
use log::{error, info, warn}; use log::{error, info, warn};
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
@ -10,6 +10,7 @@ use std::{
io::{stderr, Write}, io::{stderr, Write},
pin::Pin, pin::Pin,
}; };
use tokio::sync::mpsc::UnboundedReceiver;
use url::Url; use url::Url;
use librespot::core::authentication::Credentials; use librespot::core::authentication::Credentials;
@ -589,7 +590,7 @@ async fn main() {
} }
} }
}, },
event = async { player_event_channel.as_mut().unwrap().next().await }, if player_event_channel.is_some() => match event { event = async { player_event_channel.as_mut().unwrap().recv().await }, if player_event_channel.is_some() => match event {
Some(event) => { Some(event) => {
if let Some(program) = &setupp.player_event_program { if let Some(program) = &setupp.player_event_program {
if let Some(child) = run_program_on_events(event, program) { if let Some(child) = run_program_on_events(event, program) {