kube-escape/conn.py
Frank Villaro-Dixon 80184c4b55 this shit fucking works
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-08-27 23:21:22 +02:00

42 lines
990 B
Python

from typing import Optional
import bencoder
from enum import Enum
class MsgType(Enum):
    """Message kinds understood by WSMsg.

    The integer value of each member is what WSMsg stores under the
    ``mt`` key when serialising, so these numbers must not change.
    """

    # CONNECT and DISCONNECT are decoded without a payload.
    CONNECT = 0
    DISCONNECT = 1
    # DATA is the only kind for which WSMsg.from_bytes reads a payload.
    DATA = 2
class WSMsg:
    """A message tagged with a socket id and a MsgType, serialised with bencoder.

    Encoded form is a dictionary with these byte-string keys:

    * ``sid``  -- the socket id
    * ``mt``   -- the integer value of the MsgType
    * ``data`` -- the payload; present only when a payload is set
    """

    socketid: int
    msg: MsgType
    payload: Optional[bytes]

    def __init__(self, socketid: int, msg: MsgType, payload: Optional[bytes] = None):
        """Store the socket id, message type and optional payload as given."""
        self.socketid = socketid
        self.msg = msg
        self.payload = payload

    @staticmethod
    def from_bytes(b: bytes) -> 'WSMsg':
        """Decode a message previously produced by ``to_bytes``.

        Raises:
            KeyError: if the ``sid`` or ``mt`` key is missing.
            ValueError: if ``mt`` is not a valid MsgType value.
        """
        d = bencoder.decode(b)
        socketid = d[b"sid"]
        mtype = MsgType(d[b"mt"])
        if mtype is MsgType.DATA:
            # Earlier versions of to_bytes dropped the "data" key when the
            # payload was empty; treat a missing key as an empty payload
            # rather than failing with KeyError.
            return WSMsg(socketid, mtype, d.get(b"data", b""))
        # CONNECT and DISCONNECT carry no payload.
        return WSMsg(socketid, mtype)

    def to_bytes(self) -> bytes:
        """Serialise this message with bencoder."""
        d = {b"sid": self.socketid, b"mt": self.msg.value}
        # Compare against None, not truthiness: an empty b"" payload is a
        # real payload and must survive a round-trip through from_bytes.
        if self.payload is not None:
            d[b"data"] = self.payload
        return bencoder.encode(d)