From 0892587c0ef88d7ee6b1aaab604525b88c760b43 Mon Sep 17 00:00:00 2001 From: ashthespy Date: Sat, 23 Jan 2021 22:21:42 +0000 Subject: [PATCH] [Core] WIP: Sessions --- core/src/session.rs | 214 +++++++++++++++++++++----------------------- 1 file changed, 102 insertions(+), 112 deletions(-) diff --git a/core/src/session.rs b/core/src/session.rs index 821ae874..9a8df2b7 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -10,13 +10,7 @@ use bytes::Bytes; // use tokio::runtime::{current_thread, current_thread::Handle}; // use futures::future::{IntoFuture, Remote}; -use futures::{ - channel::mpsc, - // future::{IntoFuture, Remote}, - Future, - Stream, - TryFutureExt, -}; +use futures::{channel::mpsc, future, Future, Stream, StreamExt, TryFutureExt}; use std::{ pin::Pin, task::{Context, Poll}, @@ -25,14 +19,14 @@ use std::{ use tokio::runtime::Handle; use crate::apresolve::apresolve_or_fallback; -use crate::audio_key::AudioKeyManager; +// use crate::audio_key::AudioKeyManager; use crate::authentication::Credentials; use crate::cache::Cache; -use crate::channel::ChannelManager; -use crate::component::Lazy; +// use crate::channel::ChannelManager; +// use crate::component::Lazy; use crate::config::SessionConfig; use crate::connection; -use crate::mercury::MercuryManager; +// use crate::mercury::MercuryManager; struct SessionData { country: String, @@ -45,13 +39,12 @@ struct SessionInternal { config: SessionConfig, data: RwLock, - tx_connection: mpsc::UnboundedSender<(u8, Vec)>, + tx_connection: mpsc::UnboundedSender)>>, - audio_key: Lazy, - channel: Lazy, - mercury: Lazy, + // audio_key: Lazy, + // channel: Lazy, + // mercury: Lazy, cache: Option>, - handle: Mutex, session_id: usize, } @@ -71,42 +64,44 @@ impl Session { cache: Option, handle: Handle, ) -> Result { - unimplemented!() - // let access_point_addr = - // apresolve_or_fallback::(&config.proxy, &config.ap_port).await?; - // - // let proxy = config.proxy.clone(); - // info!("Connecting to AP \"{}\"", access_point_addr); - // let connection = connection::connect(access_point_addr, &proxy); - // - // let device_id = config.device_id.clone(); - // let authentication = connection.and_then(move |connection| { - // connection::authenticate(connection, credentials, device_id) - // }); - // - // let result = authentication.map(move |(transport, reusable_credentials)| { - // info!("Authenticated as \"{}\" !", reusable_credentials.username); - // if let Some(ref cache) = cache { - // cache.save_credentials(&reusable_credentials); - // } - // - // let (session, task) = Session::create( - // &handle, - // transport, - // config, - // cache, - // reusable_credentials.username.clone(), - // ); - // - // tokio::spawn(task.map_err(|e| { - // error!("SessionError: {}", e.to_string()); - // std::process::exit(0); - // })); - // - // session - // }); - // - // result + let access_point_addr = + apresolve_or_fallback::(&config.proxy, &config.ap_port).await?; + + let proxy = config.proxy.clone(); + info!("Connecting to AP \"{}\"", access_point_addr); + let connection = connection::connect(access_point_addr, &proxy); + + let device_id = config.device_id.clone(); + let authentication = connection.and_then(move |connection| { + connection::authenticate(connection, credentials, device_id) + }); + + let result = match authentication.await { + Ok((transport, reusable_credentials)) => { + info!("Authenticated as \"{}\" !", reusable_credentials.username); + if let Some(ref cache) = cache { + cache.save_credentials(&reusable_credentials); + } + + let (session, tasks) = Session::create( + &handle, + transport, + config, + cache, + reusable_credentials.username.clone(), + ); + + tokio::task::spawn_local(async move { tasks }); + + Ok(session) + } + Err(e) => { + error!("Unable to Connect"); + Err(e.into()) + } + }; + + result } fn create( @@ -115,7 +110,7 @@ impl Session { config: SessionConfig, cache: Option, username: String, - ) -> (Session, Box>>) { + ) -> (Session, Box, Result<()>)>>) { let (sink, stream) = transport.split(); let (sender_tx, sender_rx) = mpsc::unbounded(); @@ -124,7 +119,7 @@ impl Session { debug!("new Session[{}]", session_id); let session = Session(Arc::new(SessionInternal { - config: config, + config, data: RwLock::new(SessionData { country: String::new(), canonical_username: username, @@ -136,57 +131,52 @@ impl Session { cache: cache.map(Arc::new), - audio_key: Lazy::new(), - channel: Lazy::new(), - mercury: Lazy::new(), - + // audio_key: Lazy::new(), + // channel: Lazy::new(), + // mercury: Lazy::new(), handle: Mutex::new(handle.clone()), - session_id: session_id, + session_id, })); let sender_task = sender_rx - .map_err(|e| -> io::Error { panic!(e) }) .forward(sink) - .map(|_| ()); + .map_err(|e| -> Box { Box::new(e) }); + let receiver_task = DispatchTask(stream, session.weak()); - let task = Box::new( - (receiver_task, sender_task) - .into_future() - .map(|((), ())| ()), - ); + let task = Box::new(future::join(receiver_task, sender_task)); (session, task) } - pub fn audio_key(&self) -> &AudioKeyManager { - self.0.audio_key.get(|| AudioKeyManager::new(self.weak())) - } + // pub fn audio_key(&self) -> &AudioKeyManager { + // self.0.audio_key.get(|| AudioKeyManager::new(self.weak())) + // } - pub fn channel(&self) -> &ChannelManager { - self.0.channel.get(|| ChannelManager::new(self.weak())) - } + // pub fn channel(&self) -> &ChannelManager { + // self.0.channel.get(|| ChannelManager::new(self.weak())) + // } - pub fn mercury(&self) -> &MercuryManager { - self.0.mercury.get(|| MercuryManager::new(self.weak())) - } + // pub fn mercury(&self) -> &MercuryManager { + // self.0.mercury.get(|| MercuryManager::new(self.weak())) + // } pub fn time_delta(&self) -> i64 { self.0.data.read().unwrap().time_delta } // Spawn a future directly - pub fn spawn(&self, f: F) - where - F: Future + Send + 'static, - { - let handle = self.0.handle.lock().unwrap(); - let spawn_res = handle.spawn(f); - match spawn_res { - Ok(_) => (), - Err(e) => error!("Session SpawnErr {:?}", e), - } - } + // pub fn spawn(&self, f: F) + // where + // F: Future + Send + 'static, + // { + // let handle = self.0.handle.lock().unwrap(); + // let spawn_res = handle.spawn(f); + // match spawn_res { + // Ok(_) => (), + // Err(e) => error!("Session SpawnErr {:?}", e), + // } + // } // pub fn spawn(&self, f: F) // where @@ -218,7 +208,7 @@ impl Session { ); } - #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))] + // #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))] fn dispatch(&self, cmd: u8, data: Bytes) { match cmd { 0x4 => { @@ -241,15 +231,18 @@ impl Session { self.0.data.write().unwrap().country = country; } - 0x9 | 0xa => self.channel().dispatch(cmd, data), - 0xd | 0xe => self.audio_key().dispatch(cmd, data), - 0xb2..=0xb6 => self.mercury().dispatch(cmd, data), + // 0x9 | 0xa => self.channel().dispatch(cmd, data), + // 0xd | 0xe => self.audio_key().dispatch(cmd, data), + // 0xb2..=0xb6 => self.mercury().dispatch(cmd, data), _ => trace!("Unknown dispatch cmd :{:?} {:?}", cmd, data), } } pub fn send_packet(&self, cmd: u8, data: Vec) { - self.0.tx_connection.unbounded_send((cmd, data)).unwrap(); + self.0 + .tx_connection + .unbounded_send(Ok((cmd, data))) + .unwrap(); } pub fn cache(&self) -> Option<&Arc> { @@ -283,8 +276,8 @@ impl Session { pub fn shutdown(&self) { debug!("Invalidating session[{}]", self.0.session_id); self.0.data.write().unwrap().invalid = true; - self.mercury().shutdown(); - self.channel().shutdown(); + // self.mercury().shutdown(); + // self.channel().shutdown(); } pub fn is_invalid(&self) -> bool { @@ -311,40 +304,37 @@ impl Drop for SessionInternal { } } -// type SErr = ::std::fmt::Debug; - struct DispatchTask(S, SessionWeak) where - S: Stream>; + S: Stream> + Unpin; -impl Future for DispatchTask +impl>> Future for DispatchTask where - // SErr: ::std::fmt::Debug, - S: Stream>, + S: Stream> + Unpin, { - type Output = Result<((), ())>; + type Output = Result<()>; - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { let session = match self.1.try_upgrade() { Some(session) => session, - None => return Poll::Ready(()), + None => return Poll::Ready(Ok(())), }; loop { - let (cmd, data) = match self.unwrap().0.poll() { - Poll::Ready(Ok(Some(t))) => t, - Poll::Ready(Ok(None)) => { + let (cmd, data) = match Pin::new(&mut self.0).poll_next(cx) { + Poll::Ready(Some(Ok(t))) => t, + Poll::Ready(Some(Err(e))) => { + warn!("Server Connectioned errored"); + session.shutdown(); + return Poll::Ready(Err(Box::new(e))); + } + Poll::Ready(None) => { warn!("Connection to server closed."); session.shutdown(); - return Ok(Poll::Ready(())); + return Poll::Ready(Ok(())); } Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => { - session.shutdown(); - return Err(From::from(e)); - } }; - session.dispatch(cmd, data); } } @@ -352,7 +342,7 @@ where impl Drop for DispatchTask where - S: Stream>, + S: Stream> + Unpin, { fn drop(&mut self) { debug!("drop Dispatch");