From 9bbf8c3b26cef284eea5e193ce5755c4897b9d73 Mon Sep 17 00:00:00 2001 From: ashthespy Date: Sat, 23 Jan 2021 22:21:42 +0000 Subject: [PATCH] WIP tokio-core -> tokio migration --- audio/src/fetch.rs | 12 ++++++------ core/Cargo.toml | 2 +- core/src/connection/mod.rs | 6 ++---- core/src/lib.rs | 2 +- core/src/session.rs | 33 +++++++++++++++------------------ rustfmt.toml | 1 + src/main.rs | 33 ++++++++++++++------------------- 7 files changed, 40 insertions(+), 49 deletions(-) diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index c47cb4d3..2214cdef 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -328,7 +328,7 @@ impl AudioFileOpenStreaming { stream_loader_command_rx, complete_tx, ); - self.session.spawn(move |_| fetcher); + self.session.spawn(fetcher); AudioFileStreaming { read_file: read_file, @@ -425,7 +425,7 @@ impl AudioFile { }; let session_ = session.clone(); - session.spawn(move |_| { + session.spawn( complete_rx .map(move |mut file| { if let Some(cache) = session_.cache() { @@ -435,8 +435,8 @@ impl AudioFile { debug!("File {} complete", file_id); } }) - .or_else(|oneshot::Canceled| Ok(())) - }); + .or_else(|oneshot::Canceled| Ok(())), + ); return AudioFileOpen::Streaming(open); } @@ -671,7 +671,7 @@ impl AudioFileFetch { initial_request_sent_time, ); - session.spawn(move |_| initial_data_receiver); + session.spawn(initial_data_receiver); AudioFileFetch { session: session, @@ -746,7 +746,7 @@ impl AudioFileFetch { Instant::now(), ); - self.session.spawn(move |_| receiver); + self.session.spawn(receiver); } } diff --git a/core/Cargo.toml b/core/Cargo.toml index d2eec63f..65ba0477 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -33,7 +33,7 @@ serde_derive = "1.0" serde_json = "1.0" shannon = "0.2.0" tokio-codec = "0.1" -tokio-core = "0.1" +tokio = "0.1" tokio-io = "0.1" url = "1.7" uuid = { version = "0.8", features = ["v4"] } diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index 72497795..c0e95f52 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -8,9 +8,8 @@ use futures::{Future, Sink, Stream}; use protobuf::{self, Message}; use std::io; use std::net::ToSocketAddrs; +use tokio::net::TcpStream; use tokio_codec::Framed; -use tokio_core::net::TcpStream; -use tokio_core::reactor::Handle; use url::Url; use crate::authentication::Credentials; @@ -22,7 +21,6 @@ pub type Transport = Framed; pub fn connect( addr: String, - handle: &Handle, proxy: &Option, ) -> Box> { let (addr, connect_url) = match *proxy { @@ -51,7 +49,7 @@ pub fn connect( } }; - let socket = TcpStream::connect(&addr, handle); + let socket = TcpStream::connect(&addr); if let Some(connect_url) = connect_url { let connection = socket .and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake)); diff --git a/core/src/lib.rs b/core/src/lib.rs index c65878c2..278478c1 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -29,8 +29,8 @@ extern crate serde; extern crate serde_json; extern crate sha1; extern crate shannon; +extern crate tokio; extern crate tokio_codec; -extern crate tokio_core; extern crate tokio_io; extern crate url; extern crate uuid; diff --git a/core/src/session.rs b/core/src/session.rs index f1a24c1f..4db2f1dc 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -7,7 +7,8 @@ use byteorder::{BigEndian, ByteOrder}; use bytes::Bytes; use futures::sync::mpsc; use futures::{Async, Future, IntoFuture, Poll, Stream}; -use tokio_core::reactor::{Handle, Remote}; +use tokio::runtime::current_thread; +// use tokio::runtime::current_thread::Handle; use crate::apresolve::apresolve_or_fallback; use crate::audio_key::AudioKeyManager; @@ -37,8 +38,7 @@ struct SessionInternal { mercury: Lazy, cache: Option>, - handle: Remote, - + // handle: Handle, session_id: usize, } @@ -52,15 +52,14 @@ impl Session { config: SessionConfig, credentials: Credentials, cache: Option, - handle: Handle, + // handle: Handle, ) -> Box> { let access_point = apresolve_or_fallback::(&config.proxy, &config.ap_port); - let handle_ = handle.clone(); let proxy = config.proxy.clone(); let connection = access_point.and_then(move |addr| { info!("Connecting to AP \"{}\"", addr); - connection::connect(addr, &handle_, &proxy) + connection::connect(addr, &proxy) }); let device_id = config.device_id.clone(); @@ -75,15 +74,16 @@ impl Session { } let (session, task) = Session::create( - &handle, + // &handle, transport, config, cache, reusable_credentials.username.clone(), ); - handle.spawn(task.map_err(|e| { - error!("{:?}", e); + current_thread::spawn(task.map_err(|e| { + error!("SessionError: {}", e.to_string()); + std::process::exit(0); })); session @@ -93,7 +93,7 @@ impl Session { } fn create( - handle: &Handle, + // handle: &Handle, transport: connection::Transport, config: SessionConfig, cache: Option, @@ -123,8 +123,7 @@ impl Session { channel: Lazy::new(), mercury: Lazy::new(), - handle: handle.remote().clone(), - + // handle: handle.clone(), session_id: session_id, })); @@ -159,13 +158,11 @@ impl Session { self.0.data.read().unwrap().time_delta } - pub fn spawn(&self, f: F) + pub fn spawn(&self, f: F) where - F: FnOnce(&Handle) -> R + Send + 'static, - R: IntoFuture, - R::Future: 'static, + F: Future + 'static, { - self.0.handle.spawn(f) + current_thread::spawn(f); } fn debug_info(&self) { @@ -203,7 +200,7 @@ impl Session { 0x9 | 0xa => self.channel().dispatch(cmd, data), 0xd | 0xe => self.audio_key().dispatch(cmd, data), 0xb2..=0xb6 => self.mercury().dispatch(cmd, data), - _ => (), + _ => trace!("Unknown dispatch cmd :{:?} {:?}", cmd, data), } } diff --git a/rustfmt.toml b/rustfmt.toml index 25c1fc1e..aefd6aa8 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,3 +1,4 @@ # max_width = 105 reorder_imports = true reorder_modules = true +edition = "2018" diff --git a/src/main.rs b/src/main.rs index 4f80657e..8cc06903 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,7 +9,7 @@ use std::path::PathBuf; use std::process::exit; use std::str::FromStr; use std::time::Instant; -use tokio_core::reactor::{Core, Handle}; +// use tokio_core::reactor::{Core, Handle}; use tokio_io::IoStream; use url::Url; @@ -26,6 +26,8 @@ use librespot::playback::config::{Bitrate, PlayerConfig}; use librespot::playback::mixer::{self, Mixer, MixerConfig}; use librespot::playback::player::{Player, PlayerEvent}; +use tokio::runtime::current_thread; + mod player_event_handler; use crate::player_event_handler::{emit_sink_event, run_program_on_events}; @@ -399,8 +401,6 @@ struct Main { device: Option, mixer: fn(Option) -> Box, mixer_config: MixerConfig, - handle: Handle, - discovery: Option, signal: IoStream<()>, @@ -418,9 +418,8 @@ struct Main { } impl Main { - fn new(handle: Handle, setup: Setup) -> Main { + fn new(setup: Setup) -> Main { let mut task = Main { - handle: handle.clone(), cache: setup.cache, session_config: setup.session_config, player_config: setup.player_config, @@ -444,13 +443,12 @@ impl Main { emit_sink_events: setup.emit_sink_events, }; - if setup.enable_discovery { - let config = task.connect_config.clone(); - let device_id = task.session_config.device_id.clone(); - - task.discovery = - Some(discovery(&handle, config, device_id, setup.zeroconf_port).unwrap()); - } + // if setup.enable_discovery { + // let config = task.connect_config.clone(); + // let device_id = task.session_config.device_id.clone(); + // + // task.discovery = Some(discovery(config, device_id, setup.zeroconf_port).unwrap()); + // } if let Some(credentials) = setup.credentials { task.credentials(credentials); @@ -462,15 +460,14 @@ impl Main { fn credentials(&mut self, credentials: Credentials) { self.last_credentials = Some(credentials.clone()); let config = self.session_config.clone(); - let handle = self.handle.clone(); - let connection = Session::connect(config, credentials, self.cache.clone(), handle); + let connection = Session::connect(config, credentials, self.cache.clone()); self.connect = connection; self.spirc = None; let task = mem::replace(&mut self.spirc_task, None); if let Some(task) = task { - self.handle.spawn(task); + current_thread::spawn(Box::new(task)); } } } @@ -594,7 +591,7 @@ impl Future for Main { }) .map_err(|e| error!("failed to wait on child process: {}", e)); - self.handle.spawn(child); + current_thread::spawn(child); } } } @@ -611,10 +608,8 @@ fn main() { if env::var("RUST_BACKTRACE").is_err() { env::set_var("RUST_BACKTRACE", "full") } - let mut core = Core::new().unwrap(); - let handle = core.handle(); let args: Vec = std::env::args().collect(); - core.run(Main::new(handle, setup(&args))).unwrap() + current_thread::block_on_all(Main::new(setup(&args))).unwrap() }