[Core] WIP: Sessions

This commit is contained in:
ashthespy 2021-01-23 22:21:42 +00:00
parent 20dd94fe20
commit 0892587c0e

View file

@ -10,13 +10,7 @@ use bytes::Bytes;
// use tokio::runtime::{current_thread, current_thread::Handle};
// use futures::future::{IntoFuture, Remote};
use futures::{
channel::mpsc,
// future::{IntoFuture, Remote},
Future,
Stream,
TryFutureExt,
};
use futures::{channel::mpsc, future, Future, Stream, StreamExt, TryFutureExt};
use std::{
pin::Pin,
task::{Context, Poll},
@ -25,14 +19,14 @@ use std::{
use tokio::runtime::Handle;
use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager;
// use crate::audio_key::AudioKeyManager;
use crate::authentication::Credentials;
use crate::cache::Cache;
use crate::channel::ChannelManager;
use crate::component::Lazy;
// use crate::channel::ChannelManager;
// use crate::component::Lazy;
use crate::config::SessionConfig;
use crate::connection;
use crate::mercury::MercuryManager;
// use crate::mercury::MercuryManager;
struct SessionData {
country: String,
@ -45,13 +39,12 @@ struct SessionInternal {
config: SessionConfig,
data: RwLock<SessionData>,
tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
tx_connection: mpsc::UnboundedSender<io::Result<(u8, Vec<u8>)>>,
audio_key: Lazy<AudioKeyManager>,
channel: Lazy<ChannelManager>,
mercury: Lazy<MercuryManager>,
// audio_key: Lazy<AudioKeyManager>,
// channel: Lazy<ChannelManager>,
// mercury: Lazy<MercuryManager>,
cache: Option<Arc<Cache>>,
handle: Mutex<Handle>,
session_id: usize,
}
@ -71,42 +64,44 @@ impl Session {
cache: Option<Cache>,
handle: Handle,
) -> Result<Session> {
unimplemented!()
// let access_point_addr =
// apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port).await?;
//
// let proxy = config.proxy.clone();
// info!("Connecting to AP \"{}\"", access_point_addr);
// let connection = connection::connect(access_point_addr, &proxy);
//
// let device_id = config.device_id.clone();
// let authentication = connection.and_then(move |connection| {
// connection::authenticate(connection, credentials, device_id)
// });
//
// let result = authentication.map(move |(transport, reusable_credentials)| {
// info!("Authenticated as \"{}\" !", reusable_credentials.username);
// if let Some(ref cache) = cache {
// cache.save_credentials(&reusable_credentials);
// }
//
// let (session, task) = Session::create(
// &handle,
// transport,
// config,
// cache,
// reusable_credentials.username.clone(),
// );
//
// tokio::spawn(task.map_err(|e| {
// error!("SessionError: {}", e.to_string());
// std::process::exit(0);
// }));
//
// session
// });
//
// result
let access_point_addr =
apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port).await?;
let proxy = config.proxy.clone();
info!("Connecting to AP \"{}\"", access_point_addr);
let connection = connection::connect(access_point_addr, &proxy);
let device_id = config.device_id.clone();
let authentication = connection.and_then(move |connection| {
connection::authenticate(connection, credentials, device_id)
});
let result = match authentication.await {
Ok((transport, reusable_credentials)) => {
info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(ref cache) = cache {
cache.save_credentials(&reusable_credentials);
}
let (session, tasks) = Session::create(
&handle,
transport,
config,
cache,
reusable_credentials.username.clone(),
);
tokio::task::spawn_local(async move { tasks });
Ok(session)
}
Err(e) => {
error!("Unable to Connect");
Err(e.into())
}
};
result
}
fn create(
@ -115,7 +110,7 @@ impl Session {
config: SessionConfig,
cache: Option<Cache>,
username: String,
) -> (Session, Box<dyn Future<Output = Result<()>>>) {
) -> (Session, Box<dyn Future<Output = (Result<()>, Result<()>)>>) {
let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded();
@ -124,7 +119,7 @@ impl Session {
debug!("new Session[{}]", session_id);
let session = Session(Arc::new(SessionInternal {
config: config,
config,
data: RwLock::new(SessionData {
country: String::new(),
canonical_username: username,
@ -136,57 +131,52 @@ impl Session {
cache: cache.map(Arc::new),
audio_key: Lazy::new(),
channel: Lazy::new(),
mercury: Lazy::new(),
// audio_key: Lazy::new(),
// channel: Lazy::new(),
// mercury: Lazy::new(),
handle: Mutex::new(handle.clone()),
session_id: session_id,
session_id,
}));
let sender_task = sender_rx
.map_err(|e| -> io::Error { panic!(e) })
.forward(sink)
.map(|_| ());
.map_err(|e| -> Box<dyn std::error::Error> { Box::new(e) });
let receiver_task = DispatchTask(stream, session.weak());
let task = Box::new(
(receiver_task, sender_task)
.into_future()
.map(|((), ())| ()),
);
let task = Box::new(future::join(receiver_task, sender_task));
(session, task)
}
pub fn audio_key(&self) -> &AudioKeyManager {
self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
}
// pub fn audio_key(&self) -> &AudioKeyManager {
// self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
// }
pub fn channel(&self) -> &ChannelManager {
self.0.channel.get(|| ChannelManager::new(self.weak()))
}
// pub fn channel(&self) -> &ChannelManager {
// self.0.channel.get(|| ChannelManager::new(self.weak()))
// }
pub fn mercury(&self) -> &MercuryManager {
self.0.mercury.get(|| MercuryManager::new(self.weak()))
}
// pub fn mercury(&self) -> &MercuryManager {
// self.0.mercury.get(|| MercuryManager::new(self.weak()))
// }
pub fn time_delta(&self) -> i64 {
self.0.data.read().unwrap().time_delta
}
// Spawn a future directly
pub fn spawn<F>(&self, f: F)
where
F: Future<Output = ()> + Send + 'static,
{
let handle = self.0.handle.lock().unwrap();
let spawn_res = handle.spawn(f);
match spawn_res {
Ok(_) => (),
Err(e) => error!("Session SpawnErr {:?}", e),
}
}
// pub fn spawn<F>(&self, f: F)
// where
// F: Future<Output = ()> + Send + 'static,
// {
// let handle = self.0.handle.lock().unwrap();
// let spawn_res = handle.spawn(f);
// match spawn_res {
// Ok(_) => (),
// Err(e) => error!("Session SpawnErr {:?}", e),
// }
// }
// pub fn spawn<F, R>(&self, f: F)
// where
@ -218,7 +208,7 @@ impl Session {
);
}
#[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
// #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
fn dispatch(&self, cmd: u8, data: Bytes) {
match cmd {
0x4 => {
@ -241,15 +231,18 @@ impl Session {
self.0.data.write().unwrap().country = country;
}
0x9 | 0xa => self.channel().dispatch(cmd, data),
0xd | 0xe => self.audio_key().dispatch(cmd, data),
0xb2..=0xb6 => self.mercury().dispatch(cmd, data),
// 0x9 | 0xa => self.channel().dispatch(cmd, data),
// 0xd | 0xe => self.audio_key().dispatch(cmd, data),
// 0xb2..=0xb6 => self.mercury().dispatch(cmd, data),
_ => trace!("Unknown dispatch cmd :{:?} {:?}", cmd, data),
}
}
pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
self.0.tx_connection.unbounded_send((cmd, data)).unwrap();
self.0
.tx_connection
.unbounded_send(Ok((cmd, data)))
.unwrap();
}
pub fn cache(&self) -> Option<&Arc<Cache>> {
@ -283,8 +276,8 @@ impl Session {
pub fn shutdown(&self) {
debug!("Invalidating session[{}]", self.0.session_id);
self.0.data.write().unwrap().invalid = true;
self.mercury().shutdown();
self.channel().shutdown();
// self.mercury().shutdown();
// self.channel().shutdown();
}
pub fn is_invalid(&self) -> bool {
@ -311,40 +304,37 @@ impl Drop for SessionInternal {
}
}
// type SErr = ::std::fmt::Debug;
struct DispatchTask<S>(S, SessionWeak)
where
S: Stream<Item = Result<((u8, Bytes), ())>>;
S: Stream<Item = io::Result<(u8, Bytes)>> + Unpin;
impl<S> Future for DispatchTask<S>
impl<S: Stream<Item = io::Result<(u8, Bytes)>>> Future for DispatchTask<S>
where
// SErr: ::std::fmt::Debug,
S: Stream<Item = Result<((u8, Bytes), ())>>,
S: Stream<Item = io::Result<(u8, Bytes)>> + Unpin,
{
type Output = Result<((), ())>;
type Output = Result<()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let session = match self.1.try_upgrade() {
Some(session) => session,
None => return Poll::Ready(()),
None => return Poll::Ready(Ok(())),
};
loop {
let (cmd, data) = match self.unwrap().0.poll() {
Poll::Ready(Ok(Some(t))) => t,
Poll::Ready(Ok(None)) => {
let (cmd, data) = match Pin::new(&mut self.0).poll_next(cx) {
Poll::Ready(Some(Ok(t))) => t,
Poll::Ready(Some(Err(e))) => {
warn!("Server Connectioned errored");
session.shutdown();
return Poll::Ready(Err(Box::new(e)));
}
Poll::Ready(None) => {
warn!("Connection to server closed.");
session.shutdown();
return Ok(Poll::Ready(()));
return Poll::Ready(Ok(()));
}
Poll::Pending => return Poll::Pending,
Poll::Ready(Err(e)) => {
session.shutdown();
return Err(From::from(e));
}
};
session.dispatch(cmd, data);
}
}
@ -352,7 +342,7 @@ where
impl<S> Drop for DispatchTask<S>
where
S: Stream<Item = Result<((u8, Bytes), ())>>,
S: Stream<Item = io::Result<(u8, Bytes)>> + Unpin,
{
fn drop(&mut self) {
debug!("drop Dispatch");