diff --git a/api/main.py b/api/main.py index 094efef..1dcd6ca 100644 --- a/api/main.py +++ b/api/main.py @@ -6,16 +6,12 @@ import sqlite3 import requests import time import socket +import asyncio -from util.Sqlite import Sqlite +from common.Message import Message app = Flask(__name__) -# init db -# sqlite = Sqlite('./db/3lips.db') -# schema = "api TEXT PRIMARY KEY, timestamp INTEGER NOT NULL" -# sqlite.create_table("data", schema) - # store state data servers = [ {"name": "radar4", "url": "radar4.30hours.dev"}, @@ -40,6 +36,9 @@ adsbs = [ {"name": "None", "url": ""} ] +# init messaging +message = Message('event', 6969) + @app.route("/") def index(): return render_template("index.html", servers=servers, \ @@ -55,9 +54,7 @@ def serve_static(file): @app.route("/api") def api(): api = request.query_string.decode('utf-8') - # timestamp = time.time()*1000 - # sqlite.add_entry("data", api, timestamp) - send_data_to_event(api) + message.send_message(api) urls = request.args.getlist("url") data = [{"url": 'http://' + url} for url in urls] @@ -86,11 +83,5 @@ def serve_cesium_content(file): print(f"Error fetching content from Apache server: {e}") return Response('Error fetching content from Apache server', status=500, content_type='text/plain') -def send_data_to_event(data): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - # Use the service name 'event' as specified in your Docker Compose file - s.connect(('event', 12345)) - s.sendall(data.encode()) - if __name__ == "__main__": app.run(debug=True) diff --git a/common/Message.py b/common/Message.py new file mode 100644 index 0000000..0e0b071 --- /dev/null +++ b/common/Message.py @@ -0,0 +1,102 @@ +""" +@file Message.py +@author 30hours +""" + +import socket +import threading +import asyncio + +class Message: + + """ + @class Message + @brief A class for simple TCP messaging using a listener and sender. + """ + + def __init__(self, host, port): + + """ + @brief Constructor for Message. + @param host (str): The host to bind the listener to. + @param port (int): The port to bind the listener to. + """ + + self.host = host + self.port = port + self.server_socket = None + self.callback_message_received = None + + def start_listener(self): + + """ + @brief Start the TCP listener to accept incoming connections. + @details Ensure this function is run in a separate thread. + @return None. + """ + + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.bind((self.host, self.port)) + self.server_socket.listen() + + print(f"Listener is waiting for connections on {self.host}:{self.port}") + + while True: + conn, addr = self.server_socket.accept() + thread = threading.Thread(target=self.handle_client, args=(conn, addr)) + thread.start() + + def handle_client(self, conn, addr): + + """ + @brief Handle communication with a connected client. + @param conn (socket.socket): The socket object for the connected client. + @param addr (tuple): The address (host, port) of the connected client. + @return None. + """ + + with conn: + print(f"Connected by {addr}") + + while True: + data = conn.recv(1024) + if not data: + break + decoded_data = data.decode() + print(f"Received data: {decoded_data}") + + # Call the callback function if set + if self.callback_message_received: + asyncio.run(self.callback_message_received(decoded_data)) + + def send_message(self, message): + + """ + @brief Send a message to the specified host and port. + @param message (str): The message to be sent. + @return None. + """ + + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket: + client_socket.connect((self.host, self.port)) + client_socket.sendall(message.encode()) + + def set_callback_message_received(self, callback): + + """ + @brief Set callback function when a message is received. + @param callback (function): The callback function. + @return None. + """ + + self.callback_message_received = callback + + def close_listener(self): + + """ + @brief Close the listener socket. + @return None. + """ + + if self.server_socket: + self.server_socket.close() \ No newline at end of file diff --git a/common/__pycache__/Message.cpython-39.pyc b/common/__pycache__/Message.cpython-39.pyc new file mode 100644 index 0000000..bbfd2fe Binary files /dev/null and b/common/__pycache__/Message.cpython-39.pyc differ diff --git a/docker-compose.yml b/docker-compose.yml index 3c162b0..ecee4d9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,8 @@ services: - 49156:5000 networks: - 3lips + volumes: + - ./common:/app/common container_name: 3lips-api event: @@ -26,7 +28,7 @@ services: networks: - 3lips volumes: - - ./db:/app/db + - ./common:/app/common container_name: 3lips-event cesium-apache: @@ -37,6 +39,4 @@ services: image: cesium-apache networks: - 3lips - volumes: - - ./db:/app/db container_name: 3lips-cesium \ No newline at end of file diff --git a/event/event.py b/event/event.py index 36ff831..44157fd 100644 --- a/event/event.py +++ b/event/event.py @@ -3,37 +3,30 @@ import sqlite3 import requests import threading import socket -from datetime import datetime +import asyncio +import time from algorithm.associator.AdsbAssociator import AdsbAssociator from algorithm.coordreg.EllipseParametric import EllipseParametric -from util.Sqlite import Sqlite - -# init db -sqlite = Sqlite('./db/3lips.db') +from common.Message import Message # init event loop api = [] api_update = [] async def event(): - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - print("Event triggered at: " + timestamp, flush=True) + + timestamp = int(time.time()*1000) + print("Event triggered at: " + str(timestamp), flush=True) # main event loop #for api_x in api: # request data from API - # response = requests.get(data_i["url_radar_detections"]) - # data_i["radar_detections"] = # API management - #if sqlite.table_exists('data'): # add new API requests - # rows = sqlite.fetch_all_rows("SELECT * FROM data") - # for row in rows: - # print(row) # delete old API requests @@ -47,28 +40,14 @@ async def main(): await asyncio.sleep(1) api = api_update -def start_event_listener(): - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket: - server_socket.bind(('event', 12345)) - server_socket.listen() +# message received callback +async def callback_message_received(message): + print(f"Callback: Received message in event.py: {message}", flush=True) - print("Event listener is waiting for connections...") - - while True: - conn, addr = server_socket.accept() - thread = threading.Thread(target=handle_client, args=(conn, addr)) - thread.start() - -def handle_client(conn, addr): - with conn: - print(f"Connected by {addr}") - - while True: - data = conn.recv(1024) - if not data: - break - print(f"Received data: {data.decode()}") +# init messaging +message = Message('event', 6969) +message.set_callback_message_received(callback_message_received) if __name__ == "__main__": - threading.Thread(target=start_event_listener).start() + threading.Thread(target=message.start_listener).start() asyncio.run(main()) \ No newline at end of file