diff --git a/Cargo.lock b/Cargo.lock index d8f80b48..16da85e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1416,12 +1416,13 @@ dependencies = [ "log", "num-bigint", "protobuf", - "rand 0.7.3", + "rand 0.8.3", "serde", "serde_derive", "serde_json", "sha-1 0.9.4", "tokio", + "tokio-stream", "url 1.7.2", ] diff --git a/Cargo.toml b/Cargo.toml index a26db2ed..93866e29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ num-bigint = "0.3" protobuf = "~2.14.0" rand = "0.7" rpassword = "5.0" -tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "process"] } +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "sync", "process"] } url = "1.7" sha-1 = "0.8" hex = "0.4" diff --git a/connect/Cargo.toml b/connect/Cargo.toml index 1ca6d7db..4997c5fb 100644 --- a/connect/Cargo.toml +++ b/connect/Cargo.toml @@ -27,12 +27,13 @@ hyper = { version = "0.14", features = ["server", "http1"] } log = "0.4" num-bigint = "0.3" protobuf = "~2.14.0" -rand = "0.7" +rand = "0.8" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" sha-1 = "0.9" tokio = { version = "1.0", features = ["macros"] } +tokio-stream = { version = "0.1" } url = "1.7" dns-sd = { version = "0.1.3", optional = true } diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 50307595..dd495d82 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -6,7 +6,7 @@ use crate::playback::mixer::Mixer; use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; use crate::protocol; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; -use futures::channel::mpsc; + use futures::future::{self, FusedFuture}; use futures::stream::FusedStream; use futures::{Future, FutureExt, StreamExt}; @@ -21,6 +21,8 @@ use protobuf::{self, Message}; use rand; use rand::seq::SliceRandom; use serde_json; +use tokio::sync::mpsc; +use tokio_stream::wrappers::UnboundedReceiverStream; enum SpircPlayStatus { Stopped, @@ -59,8 +61,8 @@ struct SpircTask { subscription: BoxedStream, sender: MercurySender, - commands: mpsc::UnboundedReceiver, - player_events: PlayerEventChannel, + commands: Option>, + player_events: Option, shutdown: bool, session: Session, @@ -263,6 +265,7 @@ impl Spirc { .mercury() .subscribe(uri.clone()) .map(Result::unwrap) + .map(UnboundedReceiverStream::new) .flatten_stream() .map(|response| -> Frame { let data = response.payload.first().unwrap(); @@ -272,7 +275,7 @@ impl Spirc { let sender = session.mercury().sender(uri); - let (cmd_tx, cmd_rx) = mpsc::unbounded(); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel(); let volume = config.volume; let task_config = SpircTaskConfig { @@ -301,8 +304,8 @@ impl Spirc { subscription: subscription, sender: sender, - commands: cmd_rx, - player_events: player_events, + commands: Some(cmd_rx), + player_events: Some(player_events), shutdown: false, session: session, @@ -322,34 +325,36 @@ impl Spirc { } pub fn play(&self) { - let _ = self.commands.unbounded_send(SpircCommand::Play); + let _ = self.commands.send(SpircCommand::Play); } pub fn play_pause(&self) { - let _ = self.commands.unbounded_send(SpircCommand::PlayPause); + let _ = self.commands.send(SpircCommand::PlayPause); } pub fn pause(&self) { - let _ = self.commands.unbounded_send(SpircCommand::Pause); + let _ = self.commands.send(SpircCommand::Pause); } pub fn prev(&self) { - let _ = self.commands.unbounded_send(SpircCommand::Prev); + let _ = self.commands.send(SpircCommand::Prev); } pub fn next(&self) { - let _ = self.commands.unbounded_send(SpircCommand::Next); + let _ = self.commands.send(SpircCommand::Next); } pub fn volume_up(&self) { - let _ = self.commands.unbounded_send(SpircCommand::VolumeUp); + let _ = self.commands.send(SpircCommand::VolumeUp); } pub fn volume_down(&self) { - let _ = self.commands.unbounded_send(SpircCommand::VolumeDown); + let _ = self.commands.send(SpircCommand::VolumeDown); } pub fn shutdown(&self) { - let _ = self.commands.unbounded_send(SpircCommand::Shutdown); + let _ = self.commands.send(SpircCommand::Shutdown); } } impl SpircTask { async fn run(mut self) { while !self.session.is_invalid() && !self.shutdown { + let commands = self.commands.as_mut(); + let player_events = self.player_events.as_mut(); tokio::select! { frame = self.subscription.next() => match frame { Some(frame) => self.handle_frame(frame), @@ -358,10 +363,10 @@ impl SpircTask { break; } }, - cmd = self.commands.next(), if !self.commands.is_terminated() => if let Some(cmd) = cmd { + cmd = async { commands.unwrap().recv().await }, if commands.is_some() => if let Some(cmd) = cmd { self.handle_command(cmd); }, - event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event { + event = async { player_events.unwrap().recv().await }, if player_events.is_some() => if let Some(event) = event { self.handle_player_event(event) }, result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() { @@ -508,7 +513,7 @@ impl SpircTask { SpircCommand::Shutdown => { CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); self.shutdown = true; - self.commands.close(); + self.commands.as_mut().map(|rx| rx.close()); } } } diff --git a/src/main.rs b/src/main.rs index 31ef3001..7882203b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use futures::{channel::mpsc::UnboundedReceiver, future::FusedFuture, FutureExt, StreamExt}; +use futures::{future::FusedFuture, FutureExt, StreamExt}; use librespot_playback::player::PlayerEvent; use log::{error, info, warn}; use sha1::{Digest, Sha1}; @@ -10,6 +10,7 @@ use std::{ io::{stderr, Write}, pin::Pin, }; +use tokio::sync::mpsc::UnboundedReceiver; use url::Url; use librespot::core::authentication::Credentials; @@ -589,7 +590,7 @@ async fn main() { } } }, - event = async { player_event_channel.as_mut().unwrap().next().await }, if player_event_channel.is_some() => match event { + event = async { player_event_channel.as_mut().unwrap().recv().await }, if player_event_channel.is_some() => match event { Some(event) => { if let Some(program) = &setupp.player_event_program { if let Some(child) = run_program_on_events(event, program) {