Migrate to tokio 0.1

This commit is contained in:
ashthespy 2021-01-23 22:21:42 +00:00
parent 9bbf8c3b26
commit 53b4ab05ba
3 changed files with 52 additions and 17 deletions

View file

@ -49,7 +49,8 @@ num-bigint = "0.3"
protobuf = "~2.14.0" protobuf = "~2.14.0"
rand = "0.7" rand = "0.7"
rpassword = "5.0" rpassword = "5.0"
tokio-core = "0.1" rpassword = "3.0"
tokio = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
tokio-process = "0.2" tokio-process = "0.2"
tokio-signal = "0.2" tokio-signal = "0.2"

View file

@ -1,14 +1,13 @@
use std::io; use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak}; use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Stream}; use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio::runtime::current_thread; use tokio::runtime::{current_thread, current_thread::Handle};
// use tokio::runtime::current_thread::Handle;
use crate::apresolve::apresolve_or_fallback; use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager; use crate::audio_key::AudioKeyManager;
@ -38,7 +37,7 @@ struct SessionInternal {
mercury: Lazy<MercuryManager>, mercury: Lazy<MercuryManager>,
cache: Option<Arc<Cache>>, cache: Option<Arc<Cache>>,
// handle: Handle, handle: Mutex<Handle>,
session_id: usize, session_id: usize,
} }
@ -52,7 +51,7 @@ impl Session {
config: SessionConfig, config: SessionConfig,
credentials: Credentials, credentials: Credentials,
cache: Option<Cache>, cache: Option<Cache>,
// handle: Handle, handle: Handle,
) -> Box<dyn Future<Item = Session, Error = io::Error>> { ) -> Box<dyn Future<Item = Session, Error = io::Error>> {
let access_point = apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port); let access_point = apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port);
@ -74,7 +73,7 @@ impl Session {
} }
let (session, task) = Session::create( let (session, task) = Session::create(
// &handle, &handle,
transport, transport,
config, config,
cache, cache,
@ -93,7 +92,7 @@ impl Session {
} }
fn create( fn create(
// handle: &Handle, handle: &Handle,
transport: connection::Transport, transport: connection::Transport,
config: SessionConfig, config: SessionConfig,
cache: Option<Cache>, cache: Option<Cache>,
@ -123,7 +122,7 @@ impl Session {
channel: Lazy::new(), channel: Lazy::new(),
mercury: Lazy::new(), mercury: Lazy::new(),
// handle: handle.clone(), handle: Mutex::new(handle.clone()),
session_id: session_id, session_id: session_id,
})); }));
@ -158,12 +157,39 @@ impl Session {
self.0.data.read().unwrap().time_delta self.0.data.read().unwrap().time_delta
} }
// Spawn a future directly
pub fn spawn<F>(&self, f: F) pub fn spawn<F>(&self, f: F)
where where
F: Future<Item = (), Error = ()> + 'static, F: Future<Item = (), Error = ()> + Send + 'static,
{ {
current_thread::spawn(f); let handle = self.0.handle.lock().unwrap();
let spawn_res = handle.spawn(f);
match spawn_res {
Ok(_) => (),
Err(e) => error!("Session SpawnErr {:?}", e),
} }
}
// pub fn spawn<F, R>(&self, f: F)
// where
// F: FnOnce() -> R + Send + 'static,
// R: Future<Item = (), Error = ()> + Send + 'static,
// {
// // This fails when called from a different thread
// // current_thread::spawn(future::lazy(|| f()));
//
// // These fail when the Future doesn't implement Send
// let handle = self.0.handle.lock().unwrap();
// let spawn_res = handle.spawn(lazy(|| f()));
//
// // let mut te = current_thread::TaskExecutor::current();
// // let spawn_res = te.spawn_local(Box::new(future::lazy(|| f())));
//
// match spawn_res {
// Ok(_) => (),
// Err(e) => error!("Session SpawnErr {:?}", e),
// }
// }
fn debug_info(&self) { fn debug_info(&self) {
debug!( debug!(

View file

@ -9,7 +9,6 @@ use std::path::PathBuf;
use std::process::exit; use std::process::exit;
use std::str::FromStr; use std::str::FromStr;
use std::time::Instant; use std::time::Instant;
// use tokio_core::reactor::{Core, Handle};
use tokio_io::IoStream; use tokio_io::IoStream;
use url::Url; use url::Url;
@ -26,7 +25,10 @@ use librespot::playback::config::{Bitrate, PlayerConfig};
use librespot::playback::mixer::{self, Mixer, MixerConfig}; use librespot::playback::mixer::{self, Mixer, MixerConfig};
use librespot::playback::player::{Player, PlayerEvent}; use librespot::playback::player::{Player, PlayerEvent};
use tokio::runtime::current_thread; use tokio::runtime::{
current_thread,
current_thread::{Handle, Runtime},
};
mod player_event_handler; mod player_event_handler;
use crate::player_event_handler::{emit_sink_event, run_program_on_events}; use crate::player_event_handler::{emit_sink_event, run_program_on_events};
@ -401,6 +403,7 @@ struct Main {
device: Option<String>, device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>, mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,
mixer_config: MixerConfig, mixer_config: MixerConfig,
handle: Handle,
discovery: Option<DiscoveryStream>, discovery: Option<DiscoveryStream>,
signal: IoStream<()>, signal: IoStream<()>,
@ -418,8 +421,9 @@ struct Main {
} }
impl Main { impl Main {
fn new(setup: Setup) -> Main { fn new(handle: Handle, setup: Setup) -> Main {
let mut task = Main { let mut task = Main {
handle: handle,
cache: setup.cache, cache: setup.cache,
session_config: setup.session_config, session_config: setup.session_config,
player_config: setup.player_config, player_config: setup.player_config,
@ -460,8 +464,8 @@ impl Main {
fn credentials(&mut self, credentials: Credentials) { fn credentials(&mut self, credentials: Credentials) {
self.last_credentials = Some(credentials.clone()); self.last_credentials = Some(credentials.clone());
let config = self.session_config.clone(); let config = self.session_config.clone();
let handle = self.handle.clone();
let connection = Session::connect(config, credentials, self.cache.clone()); let connection = Session::connect(config, credentials, self.cache.clone(), handle);
self.connect = connection; self.connect = connection;
self.spirc = None; self.spirc = None;
@ -611,5 +615,9 @@ fn main() {
let args: Vec<String> = std::env::args().collect(); let args: Vec<String> = std::env::args().collect();
current_thread::block_on_all(Main::new(setup(&args))).unwrap() let mut runtime = Runtime::new().unwrap();
let handle = runtime.handle();
runtime.block_on(Main::new(handle, setup(&args))).unwrap();
runtime.run().unwrap();
// current_thread::block_on_all(Main::new(setup(&args))).unwrap()
} }