From 53b4ab05ba9f1627d568c0193cc246fb9cae3fca Mon Sep 17 00:00:00 2001 From: ashthespy Date: Sat, 23 Jan 2021 22:21:42 +0000 Subject: [PATCH] Migrate to `tokio` 0.1 --- Cargo.toml | 3 ++- core/src/session.rs | 46 +++++++++++++++++++++++++++++++++++---------- src/main.rs | 20 ++++++++++++++------ 3 files changed, 52 insertions(+), 17 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 89c4b469..f56649d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,7 +49,8 @@ num-bigint = "0.3" protobuf = "~2.14.0" rand = "0.7" rpassword = "5.0" -tokio-core = "0.1" +rpassword = "3.0" +tokio = "0.1" tokio-io = "0.1" tokio-process = "0.2" tokio-signal = "0.2" diff --git a/core/src/session.rs b/core/src/session.rs index 4db2f1dc..1f8313f3 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -1,14 +1,13 @@ use std::io; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, RwLock, Weak}; +use std::sync::{Arc, Mutex, RwLock, Weak}; use std::time::{SystemTime, UNIX_EPOCH}; use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; use futures::sync::mpsc; use futures::{Async, Future, IntoFuture, Poll, Stream}; -use tokio::runtime::current_thread; -// use tokio::runtime::current_thread::Handle; +use tokio::runtime::{current_thread, current_thread::Handle}; use crate::apresolve::apresolve_or_fallback; use crate::audio_key::AudioKeyManager; @@ -38,7 +37,7 @@ struct SessionInternal { mercury: Lazy, cache: Option>, - // handle: Handle, + handle: Mutex, session_id: usize, } @@ -52,7 +51,7 @@ impl Session { config: SessionConfig, credentials: Credentials, cache: Option, - // handle: Handle, + handle: Handle, ) -> Box> { let access_point = apresolve_or_fallback::(&config.proxy, &config.ap_port); @@ -74,7 +73,7 @@ impl Session { } let (session, task) = Session::create( - // &handle, + &handle, transport, config, cache, @@ -93,7 +92,7 @@ impl Session { } fn create( - // handle: &Handle, + handle: &Handle, transport: connection::Transport, config: SessionConfig, cache: Option, @@ -123,7 +122,7 @@ impl Session { channel: Lazy::new(), mercury: Lazy::new(), - // handle: handle.clone(), + handle: Mutex::new(handle.clone()), session_id: session_id, })); @@ -158,13 +157,40 @@ impl Session { self.0.data.read().unwrap().time_delta } + // Spawn a future directly pub fn spawn(&self, f: F) where - F: Future + 'static, + F: Future + Send + 'static, { - current_thread::spawn(f); + let handle = self.0.handle.lock().unwrap(); + let spawn_res = handle.spawn(f); + match spawn_res { + Ok(_) => (), + Err(e) => error!("Session SpawnErr {:?}", e), + } } + // pub fn spawn(&self, f: F) + // where + // F: FnOnce() -> R + Send + 'static, + // R: Future + Send + 'static, + // { + // // This fails when called from a different thread + // // current_thread::spawn(future::lazy(|| f())); + // + // // These fail when the Future doesn't implement Send + // let handle = self.0.handle.lock().unwrap(); + // let spawn_res = handle.spawn(lazy(|| f())); + // + // // let mut te = current_thread::TaskExecutor::current(); + // // let spawn_res = te.spawn_local(Box::new(future::lazy(|| f()))); + // + // match spawn_res { + // Ok(_) => (), + // Err(e) => error!("Session SpawnErr {:?}", e), + // } + // } + fn debug_info(&self) { debug!( "Session[{}] strong={} weak={}", diff --git a/src/main.rs b/src/main.rs index 8cc06903..6673310b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,6 @@ use std::path::PathBuf; use std::process::exit; use std::str::FromStr; use std::time::Instant; -// use tokio_core::reactor::{Core, Handle}; use tokio_io::IoStream; use url::Url; @@ -26,7 +25,10 @@ use librespot::playback::config::{Bitrate, PlayerConfig}; use librespot::playback::mixer::{self, Mixer, MixerConfig}; use librespot::playback::player::{Player, PlayerEvent}; -use tokio::runtime::current_thread; +use tokio::runtime::{ + current_thread, + current_thread::{Handle, Runtime}, +}; mod player_event_handler; use crate::player_event_handler::{emit_sink_event, run_program_on_events}; @@ -401,6 +403,7 @@ struct Main { device: Option, mixer: fn(Option) -> Box, mixer_config: MixerConfig, + handle: Handle, discovery: Option, signal: IoStream<()>, @@ -418,8 +421,9 @@ struct Main { } impl Main { - fn new(setup: Setup) -> Main { + fn new(handle: Handle, setup: Setup) -> Main { let mut task = Main { + handle: handle, cache: setup.cache, session_config: setup.session_config, player_config: setup.player_config, @@ -460,8 +464,8 @@ impl Main { fn credentials(&mut self, credentials: Credentials) { self.last_credentials = Some(credentials.clone()); let config = self.session_config.clone(); - - let connection = Session::connect(config, credentials, self.cache.clone()); + let handle = self.handle.clone(); + let connection = Session::connect(config, credentials, self.cache.clone(), handle); self.connect = connection; self.spirc = None; @@ -611,5 +615,9 @@ fn main() { let args: Vec = std::env::args().collect(); - current_thread::block_on_all(Main::new(setup(&args))).unwrap() + let mut runtime = Runtime::new().unwrap(); + let handle = runtime.handle(); + runtime.block_on(Main::new(handle, setup(&args))).unwrap(); + runtime.run().unwrap(); + // current_thread::block_on_all(Main::new(setup(&args))).unwrap() }