Message over TCP now in class form

This commit is contained in:
30hours 2024-02-09 14:47:13 +00:00
parent 63af00a036
commit f144d7a986
5 changed files with 124 additions and 52 deletions

View file

@ -6,16 +6,12 @@ import sqlite3
import requests
import time
import socket
import asyncio
from util.Sqlite import Sqlite
from common.Message import Message
app = Flask(__name__)
# init db
# sqlite = Sqlite('./db/3lips.db')
# schema = "api TEXT PRIMARY KEY, timestamp INTEGER NOT NULL"
# sqlite.create_table("data", schema)
# store state data
servers = [
{"name": "radar4", "url": "radar4.30hours.dev"},
@ -40,6 +36,9 @@ adsbs = [
{"name": "None", "url": ""}
]
# init messaging
message = Message('event', 6969)
@app.route("/")
def index():
return render_template("index.html", servers=servers, \
@ -55,9 +54,7 @@ def serve_static(file):
@app.route("/api")
def api():
api = request.query_string.decode('utf-8')
# timestamp = time.time()*1000
# sqlite.add_entry("data", api, timestamp)
send_data_to_event(api)
message.send_message(api)
urls = request.args.getlist("url")
data = [{"url": 'http://' + url} for url in urls]
@ -86,11 +83,5 @@ def serve_cesium_content(file):
print(f"Error fetching content from Apache server: {e}")
return Response('Error fetching content from Apache server', status=500, content_type='text/plain')
def send_data_to_event(data):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# Use the service name 'event' as specified in your Docker Compose file
s.connect(('event', 12345))
s.sendall(data.encode())
if __name__ == "__main__":
app.run(debug=True)

102
common/Message.py Normal file
View file

@ -0,0 +1,102 @@
"""
@file Message.py
@author 30hours
"""
import socket
import threading
import asyncio
class Message:
"""
@class Message
@brief A class for simple TCP messaging using a listener and sender.
"""
def __init__(self, host, port):
"""
@brief Constructor for Message.
@param host (str): The host to bind the listener to.
@param port (int): The port to bind the listener to.
"""
self.host = host
self.port = port
self.server_socket = None
self.callback_message_received = None
def start_listener(self):
"""
@brief Start the TCP listener to accept incoming connections.
@details Ensure this function is run in a separate thread.
@return None.
"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen()
print(f"Listener is waiting for connections on {self.host}:{self.port}")
while True:
conn, addr = self.server_socket.accept()
thread = threading.Thread(target=self.handle_client, args=(conn, addr))
thread.start()
def handle_client(self, conn, addr):
"""
@brief Handle communication with a connected client.
@param conn (socket.socket): The socket object for the connected client.
@param addr (tuple): The address (host, port) of the connected client.
@return None.
"""
with conn:
print(f"Connected by {addr}")
while True:
data = conn.recv(1024)
if not data:
break
decoded_data = data.decode()
print(f"Received data: {decoded_data}")
# Call the callback function if set
if self.callback_message_received:
asyncio.run(self.callback_message_received(decoded_data))
def send_message(self, message):
"""
@brief Send a message to the specified host and port.
@param message (str): The message to be sent.
@return None.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.connect((self.host, self.port))
client_socket.sendall(message.encode())
def set_callback_message_received(self, callback):
"""
@brief Set callback function when a message is received.
@param callback (function): The callback function.
@return None.
"""
self.callback_message_received = callback
def close_listener(self):
"""
@brief Close the listener socket.
@return None.
"""
if self.server_socket:
self.server_socket.close()

Binary file not shown.

View file

@ -15,6 +15,8 @@ services:
- 49156:5000
networks:
- 3lips
volumes:
- ./common:/app/common
container_name: 3lips-api
event:
@ -26,7 +28,7 @@ services:
networks:
- 3lips
volumes:
- ./db:/app/db
- ./common:/app/common
container_name: 3lips-event
cesium-apache:
@ -37,6 +39,4 @@ services:
image: cesium-apache
networks:
- 3lips
volumes:
- ./db:/app/db
container_name: 3lips-cesium

View file

@ -3,37 +3,30 @@ import sqlite3
import requests
import threading
import socket
from datetime import datetime
import asyncio
import time
from algorithm.associator.AdsbAssociator import AdsbAssociator
from algorithm.coordreg.EllipseParametric import EllipseParametric
from util.Sqlite import Sqlite
# init db
sqlite = Sqlite('./db/3lips.db')
from common.Message import Message
# init event loop
api = []
api_update = []
async def event():
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("Event triggered at: " + timestamp, flush=True)
timestamp = int(time.time()*1000)
print("Event triggered at: " + str(timestamp), flush=True)
# main event loop
#for api_x in api:
# request data from API
# response = requests.get(data_i["url_radar_detections"])
# data_i["radar_detections"] =
# API management
#if sqlite.table_exists('data'):
# add new API requests
# rows = sqlite.fetch_all_rows("SELECT * FROM data")
# for row in rows:
# print(row)
# delete old API requests
@ -47,28 +40,14 @@ async def main():
await asyncio.sleep(1)
api = api_update
def start_event_listener():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
server_socket.bind(('event', 12345))
server_socket.listen()
# message received callback
async def callback_message_received(message):
print(f"Callback: Received message in event.py: {message}", flush=True)
print("Event listener is waiting for connections...")
while True:
conn, addr = server_socket.accept()
thread = threading.Thread(target=handle_client, args=(conn, addr))
thread.start()
def handle_client(conn, addr):
with conn:
print(f"Connected by {addr}")
while True:
data = conn.recv(1024)
if not data:
break
print(f"Received data: {data.decode()}")
# init messaging
message = Message('event', 6969)
message.set_callback_message_received(callback_message_received)
if __name__ == "__main__":
threading.Thread(target=start_event_listener).start()
threading.Thread(target=message.start_listener).start()
asyncio.run(main())