From f144d7a986213e6c244bd4a014a492fe5510316d Mon Sep 17 00:00:00 2001 From: 30hours Date: Fri, 9 Feb 2024 14:47:13 +0000 Subject: [PATCH] Message over TCP now in class form --- api/main.py | 21 ++--- common/Message.py | 102 ++++++++++++++++++++++ common/__pycache__/Message.cpython-39.pyc | Bin 0 -> 3074 bytes docker-compose.yml | 6 +- event/event.py | 47 +++------- 5 files changed, 124 insertions(+), 52 deletions(-) create mode 100644 common/Message.py create mode 100644 common/__pycache__/Message.cpython-39.pyc diff --git a/api/main.py b/api/main.py index 094efef..1dcd6ca 100644 --- a/api/main.py +++ b/api/main.py @@ -6,16 +6,12 @@ import sqlite3 import requests import time import socket +import asyncio -from util.Sqlite import Sqlite +from common.Message import Message app = Flask(__name__) -# init db -# sqlite = Sqlite('./db/3lips.db') -# schema = "api TEXT PRIMARY KEY, timestamp INTEGER NOT NULL" -# sqlite.create_table("data", schema) - # store state data servers = [ {"name": "radar4", "url": "radar4.30hours.dev"}, @@ -40,6 +36,9 @@ adsbs = [ {"name": "None", "url": ""} ] +# init messaging +message = Message('event', 6969) + @app.route("/") def index(): return render_template("index.html", servers=servers, \ @@ -55,9 +54,7 @@ def serve_static(file): @app.route("/api") def api(): api = request.query_string.decode('utf-8') - # timestamp = time.time()*1000 - # sqlite.add_entry("data", api, timestamp) - send_data_to_event(api) + message.send_message(api) urls = request.args.getlist("url") data = [{"url": 'http://' + url} for url in urls] @@ -86,11 +83,5 @@ def serve_cesium_content(file): print(f"Error fetching content from Apache server: {e}") return Response('Error fetching content from Apache server', status=500, content_type='text/plain') -def send_data_to_event(data): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - # Use the service name 'event' as specified in your Docker Compose file - s.connect(('event', 12345)) - s.sendall(data.encode()) - if __name__ == "__main__": app.run(debug=True) diff --git a/common/Message.py b/common/Message.py new file mode 100644 index 0000000..0e0b071 --- /dev/null +++ b/common/Message.py @@ -0,0 +1,102 @@ +""" +@file Message.py +@author 30hours +""" + +import socket +import threading +import asyncio + +class Message: + + """ + @class Message + @brief A class for simple TCP messaging using a listener and sender. + """ + + def __init__(self, host, port): + + """ + @brief Constructor for Message. + @param host (str): The host to bind the listener to. + @param port (int): The port to bind the listener to. + """ + + self.host = host + self.port = port + self.server_socket = None + self.callback_message_received = None + + def start_listener(self): + + """ + @brief Start the TCP listener to accept incoming connections. + @details Ensure this function is run in a separate thread. + @return None. + """ + + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.bind((self.host, self.port)) + self.server_socket.listen() + + print(f"Listener is waiting for connections on {self.host}:{self.port}") + + while True: + conn, addr = self.server_socket.accept() + thread = threading.Thread(target=self.handle_client, args=(conn, addr)) + thread.start() + + def handle_client(self, conn, addr): + + """ + @brief Handle communication with a connected client. + @param conn (socket.socket): The socket object for the connected client. + @param addr (tuple): The address (host, port) of the connected client. + @return None. + """ + + with conn: + print(f"Connected by {addr}") + + while True: + data = conn.recv(1024) + if not data: + break + decoded_data = data.decode() + print(f"Received data: {decoded_data}") + + # Call the callback function if set + if self.callback_message_received: + asyncio.run(self.callback_message_received(decoded_data)) + + def send_message(self, message): + + """ + @brief Send a message to the specified host and port. + @param message (str): The message to be sent. + @return None. + """ + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket: + client_socket.connect((self.host, self.port)) + client_socket.sendall(message.encode()) + + def set_callback_message_received(self, callback): + + """ + @brief Set callback function when a message is received. + @param callback (function): The callback function. + @return None. + """ + + self.callback_message_received = callback + + def close_listener(self): + + """ + @brief Close the listener socket. + @return None. + """ + + if self.server_socket: + self.server_socket.close() \ No newline at end of file diff --git a/common/__pycache__/Message.cpython-39.pyc b/common/__pycache__/Message.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..bbfd2fe4d70c2148acc083930c08a6248d37333c GIT binary patch literal 3074 zcmai0-EZ4A5GN^FR-8EP+O%DVeGtQjM%a+F*vn81LyDkXi>=J|)6^!rHE2dnBlhKWyxD zlD_13rPk2{xivg$>_j8e%@qIaYBw7x-S`Lel2(j#@nP_>15G251ok7iSKARxI8O=e z;EEdbHQ|Xm^q%m=3iNf+5UbGpVofxmUlHqI$(g)fnlO$U9G;!HA8B3LRZJbIM0WTM zUMzLMMx6|XVEz8?cYI(^kfaazNF$ASKhZ{}Qt>DiT+38QwPh8>=(Fmi?pDi`&s~xE*JML6&Y;M;#vJ zei$ZcV#4q(;L^xSe2?NgK3lO>EHTC9@t`VPB042g!G)L3$zB! z>#!3_iJhyT5K4@X@#)o`69L>k?>YI+eNu6(oRItMm>x6J=&i!)8bp_M0HMFIV`t(_ zo2cRNws7rj_5K$5Mm1f;!zySq2Wmiu0XiOS(?h&Bmjnm;0bA>q_6qS zRF9MdMTzE}QEK@(jMXRw6+8lXVw?@KC>e-`}ykLx4U15d;9lp-ngqU9J!0Q zoX0=+iqq${p#m7@Ug3azr7#$_&%J$Xd*1BAh3m^O?k5sBYZ{#xxTWSwWU{D-c%Rejn!aIuPQYqZ_or-gefZA789swamVBx$z{3MGGzig9&|hA0Pus za>4*8%mRf@A!VLW1cd(R@E*W|oseS}xSWYQ;{p(>4dYG$gj1M5p$Nmnjqf3$y2K?< z97@u&*NE<11xVrh1GExY|DFccZR;Y4)?hG7lQ=@)@W+Yi0v5^;N&$%uJva~hPLT|d z0Wyib#9KvIriJ2zX9qpNqQwJZ_kR^)H29_&K`t#7;S3~HzKPl6Q;yjq;919Bg*T{s zHCUUw#pQq#9USoloJGcG?v<&Ai^xRRc`NYLc?eMMLJoSAdqTz;By10->XAN5<0Q)+ zxGzD?0!dwl74?plKn?j?At6HiIt3|p3HYu-)8~MY6_2hv4eC-xeHcNCx=>+h3A!=X zh3au&FoV)Qd@SdwXIGEegaI%}&t|GJOR=dpp;O3H2o`_}U4qK3C8!kj<=4|t*@Maj zs2CI#nnTjZnChX7lTHFIwRHtc90utntW;tPg{PXr@&SsL5%$UmwgO+_4OC~4oCT8C zOB5_*h&t9-s3^H7Au9?QIK`E(mm9X51)j_Db1Fvp8j4qy=r-)3G5C1t&(ZP5-SP)z zk!dMD7^_(w$6uzRj+a&Y`O984yefb5l>E3WF&kGN!VF!)^d-m6gg2{%US6HqLN&i2 z`Wo+2i}QdL{_-mgr#)CwB-Ly|@V8HRZy3&P^+D1NmZ7$&G10*Q+Dt)>CMPcKY7zg|R K1S%mt`RqS#78gta literal 0 HcmV?d00001 diff --git a/docker-compose.yml b/docker-compose.yml index 3c162b0..ecee4d9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,8 @@ services: - 49156:5000 networks: - 3lips + volumes: + - ./common:/app/common container_name: 3lips-api event: @@ -26,7 +28,7 @@ services: networks: - 3lips volumes: - - ./db:/app/db + - ./common:/app/common container_name: 3lips-event cesium-apache: @@ -37,6 +39,4 @@ services: image: cesium-apache networks: - 3lips - volumes: - - ./db:/app/db container_name: 3lips-cesium \ No newline at end of file diff --git a/event/event.py b/event/event.py index 36ff831..44157fd 100644 --- a/event/event.py +++ b/event/event.py @@ -3,37 +3,30 @@ import sqlite3 import requests import threading import socket -from datetime import datetime +import asyncio +import time from algorithm.associator.AdsbAssociator import AdsbAssociator from algorithm.coordreg.EllipseParametric import EllipseParametric -from util.Sqlite import Sqlite - -# init db -sqlite = Sqlite('./db/3lips.db') +from common.Message import Message # init event loop api = [] api_update = [] async def event(): - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print("Event triggered at: " + timestamp, flush=True) + + timestamp = int(time.time()*1000) + print("Event triggered at: " + str(timestamp), flush=True) # main event loop #for api_x in api: # request data from API - # response = requests.get(data_i["url_radar_detections"]) - # data_i["radar_detections"] = # API management - #if sqlite.table_exists('data'): # add new API requests - # rows = sqlite.fetch_all_rows("SELECT * FROM data") - # for row in rows: - # print(row) # delete old API requests @@ -47,28 +40,14 @@ async def main(): await asyncio.sleep(1) api = api_update -def start_event_listener(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: - server_socket.bind(('event', 12345)) - server_socket.listen() +# message received callback +async def callback_message_received(message): + print(f"Callback: Received message in event.py: {message}", flush=True) - print("Event listener is waiting for connections...") - - while True: - conn, addr = server_socket.accept() - thread = threading.Thread(target=handle_client, args=(conn, addr)) - thread.start() - -def handle_client(conn, addr): - with conn: - print(f"Connected by {addr}") - - while True: - data = conn.recv(1024) - if not data: - break - print(f"Received data: {data.decode()}") +# init messaging +message = Message('event', 6969) +message.set_callback_message_received(callback_message_received) if __name__ == "__main__": - threading.Thread(target=start_event_listener).start() + threading.Thread(target=message.start_listener).start() asyncio.run(main()) \ No newline at end of file