WIP tokio-core -> tokio migration

This commit is contained in:
ashthespy 2021-01-23 22:21:42 +00:00
parent 962d7af24d
commit 9bbf8c3b26
7 changed files with 40 additions and 49 deletions

View file

@ -328,7 +328,7 @@ impl AudioFileOpenStreaming {
stream_loader_command_rx, stream_loader_command_rx,
complete_tx, complete_tx,
); );
self.session.spawn(move |_| fetcher); self.session.spawn(fetcher);
AudioFileStreaming { AudioFileStreaming {
read_file: read_file, read_file: read_file,
@ -425,7 +425,7 @@ impl AudioFile {
}; };
let session_ = session.clone(); let session_ = session.clone();
session.spawn(move |_| { session.spawn(
complete_rx complete_rx
.map(move |mut file| { .map(move |mut file| {
if let Some(cache) = session_.cache() { if let Some(cache) = session_.cache() {
@ -435,8 +435,8 @@ impl AudioFile {
debug!("File {} complete", file_id); debug!("File {} complete", file_id);
} }
}) })
.or_else(|oneshot::Canceled| Ok(())) .or_else(|oneshot::Canceled| Ok(())),
}); );
return AudioFileOpen::Streaming(open); return AudioFileOpen::Streaming(open);
} }
@ -671,7 +671,7 @@ impl AudioFileFetch {
initial_request_sent_time, initial_request_sent_time,
); );
session.spawn(move |_| initial_data_receiver); session.spawn(initial_data_receiver);
AudioFileFetch { AudioFileFetch {
session: session, session: session,
@ -746,7 +746,7 @@ impl AudioFileFetch {
Instant::now(), Instant::now(),
); );
self.session.spawn(move |_| receiver); self.session.spawn(receiver);
} }
} }

View file

@ -33,7 +33,7 @@ serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
shannon = "0.2.0" shannon = "0.2.0"
tokio-codec = "0.1" tokio-codec = "0.1"
tokio-core = "0.1" tokio = "0.1"
tokio-io = "0.1" tokio-io = "0.1"
url = "1.7" url = "1.7"
uuid = { version = "0.8", features = ["v4"] } uuid = { version = "0.8", features = ["v4"] }

View file

@ -8,9 +8,8 @@ use futures::{Future, Sink, Stream};
use protobuf::{self, Message}; use protobuf::{self, Message};
use std::io; use std::io;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use tokio::net::TcpStream;
use tokio_codec::Framed; use tokio_codec::Framed;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use url::Url; use url::Url;
use crate::authentication::Credentials; use crate::authentication::Credentials;
@ -22,7 +21,6 @@ pub type Transport = Framed<TcpStream, APCodec>;
pub fn connect( pub fn connect(
addr: String, addr: String,
handle: &Handle,
proxy: &Option<Url>, proxy: &Option<Url>,
) -> Box<dyn Future<Item = Transport, Error = io::Error>> { ) -> Box<dyn Future<Item = Transport, Error = io::Error>> {
let (addr, connect_url) = match *proxy { let (addr, connect_url) = match *proxy {
@ -51,7 +49,7 @@ pub fn connect(
} }
}; };
let socket = TcpStream::connect(&addr, handle); let socket = TcpStream::connect(&addr);
if let Some(connect_url) = connect_url { if let Some(connect_url) = connect_url {
let connection = socket let connection = socket
.and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake)); .and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake));

View file

@ -29,8 +29,8 @@ extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate sha1; extern crate sha1;
extern crate shannon; extern crate shannon;
extern crate tokio;
extern crate tokio_codec; extern crate tokio_codec;
extern crate tokio_core;
extern crate tokio_io; extern crate tokio_io;
extern crate url; extern crate url;
extern crate uuid; extern crate uuid;

View file

@ -7,7 +7,8 @@ use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::mpsc; use futures::sync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Stream}; use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio_core::reactor::{Handle, Remote}; use tokio::runtime::current_thread;
// use tokio::runtime::current_thread::Handle;
use crate::apresolve::apresolve_or_fallback; use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager; use crate::audio_key::AudioKeyManager;
@ -37,8 +38,7 @@ struct SessionInternal {
mercury: Lazy<MercuryManager>, mercury: Lazy<MercuryManager>,
cache: Option<Arc<Cache>>, cache: Option<Arc<Cache>>,
handle: Remote, // handle: Handle,
session_id: usize, session_id: usize,
} }
@ -52,15 +52,14 @@ impl Session {
config: SessionConfig, config: SessionConfig,
credentials: Credentials, credentials: Credentials,
cache: Option<Cache>, cache: Option<Cache>,
handle: Handle, // handle: Handle,
) -> Box<dyn Future<Item = Session, Error = io::Error>> { ) -> Box<dyn Future<Item = Session, Error = io::Error>> {
let access_point = apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port); let access_point = apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port);
let handle_ = handle.clone();
let proxy = config.proxy.clone(); let proxy = config.proxy.clone();
let connection = access_point.and_then(move |addr| { let connection = access_point.and_then(move |addr| {
info!("Connecting to AP \"{}\"", addr); info!("Connecting to AP \"{}\"", addr);
connection::connect(addr, &handle_, &proxy) connection::connect(addr, &proxy)
}); });
let device_id = config.device_id.clone(); let device_id = config.device_id.clone();
@ -75,15 +74,16 @@ impl Session {
} }
let (session, task) = Session::create( let (session, task) = Session::create(
&handle, // &handle,
transport, transport,
config, config,
cache, cache,
reusable_credentials.username.clone(), reusable_credentials.username.clone(),
); );
handle.spawn(task.map_err(|e| { current_thread::spawn(task.map_err(|e| {
error!("{:?}", e); error!("SessionError: {}", e.to_string());
std::process::exit(0);
})); }));
session session
@ -93,7 +93,7 @@ impl Session {
} }
fn create( fn create(
handle: &Handle, // handle: &Handle,
transport: connection::Transport, transport: connection::Transport,
config: SessionConfig, config: SessionConfig,
cache: Option<Cache>, cache: Option<Cache>,
@ -123,8 +123,7 @@ impl Session {
channel: Lazy::new(), channel: Lazy::new(),
mercury: Lazy::new(), mercury: Lazy::new(),
handle: handle.remote().clone(), // handle: handle.clone(),
session_id: session_id, session_id: session_id,
})); }));
@ -159,13 +158,11 @@ impl Session {
self.0.data.read().unwrap().time_delta self.0.data.read().unwrap().time_delta
} }
pub fn spawn<F, R>(&self, f: F) pub fn spawn<F>(&self, f: F)
where where
F: FnOnce(&Handle) -> R + Send + 'static, F: Future<Item = (), Error = ()> + 'static,
R: IntoFuture<Item = (), Error = ()>,
R::Future: 'static,
{ {
self.0.handle.spawn(f) current_thread::spawn(f);
} }
fn debug_info(&self) { fn debug_info(&self) {
@ -203,7 +200,7 @@ impl Session {
0x9 | 0xa => self.channel().dispatch(cmd, data), 0x9 | 0xa => self.channel().dispatch(cmd, data),
0xd | 0xe => self.audio_key().dispatch(cmd, data), 0xd | 0xe => self.audio_key().dispatch(cmd, data),
0xb2..=0xb6 => self.mercury().dispatch(cmd, data), 0xb2..=0xb6 => self.mercury().dispatch(cmd, data),
_ => (), _ => trace!("Unknown dispatch cmd :{:?} {:?}", cmd, data),
} }
} }

View file

@ -1,3 +1,4 @@
# max_width = 105 # max_width = 105
reorder_imports = true reorder_imports = true
reorder_modules = true reorder_modules = true
edition = "2018"

View file

@ -9,7 +9,7 @@ use std::path::PathBuf;
use std::process::exit; use std::process::exit;
use std::str::FromStr; use std::str::FromStr;
use std::time::Instant; use std::time::Instant;
use tokio_core::reactor::{Core, Handle}; // use tokio_core::reactor::{Core, Handle};
use tokio_io::IoStream; use tokio_io::IoStream;
use url::Url; use url::Url;
@ -26,6 +26,8 @@ use librespot::playback::config::{Bitrate, PlayerConfig};
use librespot::playback::mixer::{self, Mixer, MixerConfig}; use librespot::playback::mixer::{self, Mixer, MixerConfig};
use librespot::playback::player::{Player, PlayerEvent}; use librespot::playback::player::{Player, PlayerEvent};
use tokio::runtime::current_thread;
mod player_event_handler; mod player_event_handler;
use crate::player_event_handler::{emit_sink_event, run_program_on_events}; use crate::player_event_handler::{emit_sink_event, run_program_on_events};
@ -399,8 +401,6 @@ struct Main {
device: Option<String>, device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>, mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,
mixer_config: MixerConfig, mixer_config: MixerConfig,
handle: Handle,
discovery: Option<DiscoveryStream>, discovery: Option<DiscoveryStream>,
signal: IoStream<()>, signal: IoStream<()>,
@ -418,9 +418,8 @@ struct Main {
} }
impl Main { impl Main {
fn new(handle: Handle, setup: Setup) -> Main { fn new(setup: Setup) -> Main {
let mut task = Main { let mut task = Main {
handle: handle.clone(),
cache: setup.cache, cache: setup.cache,
session_config: setup.session_config, session_config: setup.session_config,
player_config: setup.player_config, player_config: setup.player_config,
@ -444,13 +443,12 @@ impl Main {
emit_sink_events: setup.emit_sink_events, emit_sink_events: setup.emit_sink_events,
}; };
if setup.enable_discovery { // if setup.enable_discovery {
let config = task.connect_config.clone(); // let config = task.connect_config.clone();
let device_id = task.session_config.device_id.clone(); // let device_id = task.session_config.device_id.clone();
//
task.discovery = // task.discovery = Some(discovery(config, device_id, setup.zeroconf_port).unwrap());
Some(discovery(&handle, config, device_id, setup.zeroconf_port).unwrap()); // }
}
if let Some(credentials) = setup.credentials { if let Some(credentials) = setup.credentials {
task.credentials(credentials); task.credentials(credentials);
@ -462,15 +460,14 @@ impl Main {
fn credentials(&mut self, credentials: Credentials) { fn credentials(&mut self, credentials: Credentials) {
self.last_credentials = Some(credentials.clone()); self.last_credentials = Some(credentials.clone());
let config = self.session_config.clone(); let config = self.session_config.clone();
let handle = self.handle.clone();
let connection = Session::connect(config, credentials, self.cache.clone(), handle); let connection = Session::connect(config, credentials, self.cache.clone());
self.connect = connection; self.connect = connection;
self.spirc = None; self.spirc = None;
let task = mem::replace(&mut self.spirc_task, None); let task = mem::replace(&mut self.spirc_task, None);
if let Some(task) = task { if let Some(task) = task {
self.handle.spawn(task); current_thread::spawn(Box::new(task));
} }
} }
} }
@ -594,7 +591,7 @@ impl Future for Main {
}) })
.map_err(|e| error!("failed to wait on child process: {}", e)); .map_err(|e| error!("failed to wait on child process: {}", e));
self.handle.spawn(child); current_thread::spawn(child);
} }
} }
} }
@ -611,10 +608,8 @@ fn main() {
if env::var("RUST_BACKTRACE").is_err() { if env::var("RUST_BACKTRACE").is_err() {
env::set_var("RUST_BACKTRACE", "full") env::set_var("RUST_BACKTRACE", "full")
} }
let mut core = Core::new().unwrap();
let handle = core.handle();
let args: Vec<String> = std::env::args().collect(); let args: Vec<String> = std::env::args().collect();
core.run(Main::new(handle, setup(&args))).unwrap() current_thread::block_on_all(Main::new(setup(&args))).unwrap()
} }