From 80184c4b551080f4960b8d9655aeaa10b7dbe742 Mon Sep 17 00:00:00 2001 From: Frank Villaro-Dixon Date: Tue, 27 Aug 2024 23:21:22 +0200 Subject: [PATCH] this shit fucking works Signed-off-by: Frank Villaro-Dixon --- client.py | 45 ++++++++++++++++++++-------- conn.py | 40 ++++++++++++++++++++----- pod.py | 88 ++++++++++++++++++++----------------------------------- 3 files changed, 96 insertions(+), 77 deletions(-) diff --git a/client.py b/client.py index ca998a0..d47455a 100644 --- a/client.py +++ b/client.py @@ -13,43 +13,62 @@ WEBSOCKET_URL = 'ws://localhost:9999/data' ALREADY_DONE = False socket_id = -1 -async def handle_client(client_reader, client_writer): +async def handle_client(socket_reader, socket_writer): global socket_id socket_id += 1 + try: async with websockets.connect(WEBSOCKET_URL) as websocket: + + print(f"New client connected socket {socket_id}: {socket_reader} {socket_writer}") + m = conn.WSMsg(socket_id, conn.MsgType.CONNECT) + await websocket.send(m.to_bytes()) + # Forwarding data from client to WebSocket async def client_to_websocket(): while True: - data = await client_reader.read(2024) + data = await socket_reader.read(2024) print(f'TCP{socket_id}>WS: ', hashlib.md5(data).hexdigest()) if not data: break - c = conn.Conn(socket_id, data) - await websocket.send(c.to_ws_bytes()) - + c = conn.WSMsg(socket_id, conn.MsgType.DATA, data) + await websocket.send(c.to_bytes()) + # Forwarding data from WebSocket to client async def websocket_to_client(): while True: message = await websocket.recv() - c = conn.Conn.from_ws_bytes(message) + c = conn.WSMsg.from_bytes(message) + + # XXX this is ugly, because it means that the data is sent twice or more if 2+ connections.. if c.socketid == socket_id: - # XXX this is ugly, because it means that the data is sent twice or more if 2+ connections.. - print(f'WS>TCP@{socket_id}: ', hashlib.md5(message).hexdigest()) - client_writer.write(c.data) - await client_writer.drain() - + if c.msg == conn.MsgType.DISCONNECT: + print(f"Client {socket_id} disconnected") + break + else: + print(f'WS>TCP@{socket_id}: ', hashlib.md5(message).hexdigest()) + socket_writer.write(c.payload) + await socket_writer.drain() + else: + print(f'WS>TCP@{socket_id}: ', hashlib.md5(message).hexdigest(), 'skipping') + + # Run both tasks concurrently await asyncio.gather(client_to_websocket(), websocket_to_client()) + print(f">>>> Client {socket_id} disconnected") except Exception as e: print(f"Error: {e}") import traceback traceback.print_exc() finally: - client_writer.close() - await client_writer.wait_closed() + print(f">>>>>>>>>> Closing client {socket_id}") + m = conn.WSMsg(socket_id, conn.MsgType.DISCONNECT) + await websocket.send(m.to_bytes()) + + socket_writer.close() + await socket_writer.wait_closed() async def main(): port = PORT diff --git a/conn.py b/conn.py index bb333a5..c6f32b7 100644 --- a/conn.py +++ b/conn.py @@ -1,17 +1,41 @@ +from typing import Optional import bencoder +from enum import Enum -class Conn: + + +class MsgType(Enum): + CONNECT = 0 + DISCONNECT = 1 + DATA = 2 + + +class WSMsg: socketid: int - data: bytes + msg: MsgType + payload: Optional[bytes] - def __init__(self, socketid: int, data: bytes): + def __init__(self, socketid: int, msg: MsgType, payload: Optional[bytes] = None): self.socketid = socketid - self.data = data + self.msg = msg + self.payload = payload - def to_ws_bytes(self) -> bytes: - return bencoder.encode({b"socketid": self.socketid, b"data": self.data}) @staticmethod - def from_ws_bytes(b: bytes) -> 'Conn': + def from_bytes(b: bytes) -> 'WSMsg': d = bencoder.decode(b) - return Conn(d[b"socketid"], d[b"data"]) + socketid = d[b"sid"] + mtype = MsgType(d[b"mt"]) + + if mtype == MsgType.CONNECT: + return WSMsg(socketid, mtype) + elif mtype == MsgType.DISCONNECT: + return WSMsg(socketid, mtype) + elif mtype == MsgType.DATA: + return WSMsg(socketid, mtype, d[b"data"]) + + def to_bytes(self) -> bytes: + d = {b"sid": self.socketid, b"mt": self.msg.value} + if self.payload: + d[b"data"] = self.payload + return bencoder.encode(d) diff --git a/pod.py b/pod.py index e6cada9..a782abe 100644 --- a/pod.py +++ b/pod.py @@ -9,60 +9,24 @@ HOST = '192.168.21.30' PORT = 6443 WEBSOCKET_URL = 'ws://localhost:9999/data' -async def handle_client(tcpreader, tcpwriter, ws): - sockets = {} - - print(f"New client connected: {tcpreader} {tcpwriter}") - try: - # Forwarding data from client to WebSocket - async def tcp_to_websocket(): - print("tcp_to_websocket...") - while True: - data = await tcpreader.read(2024) - c = conn.Conn(0, data) - socketid = c.socketid - print(f'TCP@{socketid}>WS: ', hashlib.md5(data).hexdigest()) - if not data: - break - await ws.send(c.to_ws_bytes()) - - # Forwarding data from WebSocket to client - async def websocket_to_tcp(): - print("websocket_to_tcp...") - while True: - message = await ws.recv() - c = conn.Conn.from_ws_bytes(message) - socketid = c.socketid - print(f'WS>TCP@{socketid}: ', hashlib.md5(message).hexdigest()) - tcpwriter.write(c.data) - await tcpwriter.drain() - - print("Running both tasks concurrently...") - # Run both tasks concurrently - await asyncio.gather(tcp_to_websocket(), websocket_to_tcp()) - print('done') - - except Exception as e: - print(f"ASYNCIOD Error: {e}") - async def handle_socket_read(socketid, tcpreader, ws): try: print(f"New socket: {socketid}. Waiting on recv") - print(tcpreader) while True: data = await tcpreader.read(2024) - print('GOT DATA', data) if data == b'': - print(f"Connection closed: {socketid}") + print(f"TCP@{socketid} Connection closed") + c = conn.WSMsg(socketid, conn.MsgType.DISCONNECT) + await ws.send(c.to_bytes()) break - print(f'TCP@{socketid}>WS: RAW', data) - c = conn.Conn(socketid, data) + c = conn.WSMsg(socketid, conn.MsgType.DATA, data) print(f'TCP@{socketid}>WS: ', hashlib.md5(data).hexdigest()) - await ws.send(c.to_ws_bytes()) + await ws.send(c.to_bytes()) + except Exception as e: - print(f"Error: {e}") + print(f"{socketid} Error: {e}") import traceback traceback.print_exc() @@ -70,22 +34,34 @@ async def handle_socket_read(socketid, tcpreader, ws): async def handle_ws_incoming(ws, sockets): data = await ws.recv() - c = conn.Conn.from_ws_bytes(data) + c = conn.WSMsg.from_bytes(data) + print(f'NEW DATA: {c.socketid} {c.msg} {c.payload}') socketid = c.socketid - if socketid not in sockets: - print(f"New socket: {socketid}") - tcpreader, tcpwriter = await asyncio.open_connection(HOST, PORT) - sockets[socketid] = (tcpreader, tcpwriter) - print(f'TCPR: {tcpreader}') - asyncio.create_task(handle_socket_read(socketid, tcpreader, ws)) - else: + if c.msg == conn.MsgType.CONNECT: + if socketid in sockets: + print(f"Socket {socketid} already connected") + return + else: + print(f"New socket: {socketid}") + tcpreader, tcpwriter = await asyncio.open_connection(HOST, PORT) + sockets[socketid] = (tcpreader, tcpwriter) + asyncio.create_task(handle_socket_read(socketid, tcpreader, ws)) + + elif c.msg == conn.MsgType.DISCONNECT: + if socketid not in sockets: + print(f"Socket {socketid} not connected") + return + else: + print(f"Socket {socketid} disconnected") + del sockets[socketid] + tcpwriter.close() + await tcpwriter.wait_closed() + + elif c.msg == conn.MsgType.DATA: tcpreader, tcpwriter = sockets[socketid] - - print(f'WS@{socketid}>TCP: ', hashlib.md5(data).hexdigest()) - print(tcpwriter) - - tcpwriter.write(c.data) + print(f'WS@{socketid}>TCP: ', hashlib.md5(data).hexdigest()) + tcpwriter.write(c.payload)