Fix indentation

This commit is contained in:
30hours 2024-03-18 10:55:12 +00:00
parent ad19fe9754
commit 543d5dcb60
17 changed files with 1504 additions and 1531 deletions

View file

@ -17,13 +17,13 @@ app = Flask(__name__)
# init config file
try:
with open('config/config.yml', 'r') as file:
config = yaml.safe_load(file)
radar_data = config['radar']
with open('config/config.yml', 'r') as file:
config = yaml.safe_load(file)
radar_data = config['radar']
except FileNotFoundError:
print("Error: Configuration file not found.")
print("Error: Configuration file not found.")
except yaml.YAMLError as e:
print("Error reading YAML configuration:", e)
print("Error reading YAML configuration:", e)
# store state data
servers = []
@ -57,71 +57,71 @@ valid['adsbs'] = [item['url'] for item in adsbs]
# message received callback
async def callback_message_received(msg):
print(f"Callback: Received message in main.py: {msg}", flush=True)
print(f"Callback: Received message in main.py: {msg}", flush=True)
# init messaging
message_api_request = Message('event', 6969)
@app.route("/")
def index():
return render_template("index.html", servers=servers, \
associators=associators, localisations=localisations, adsbs=adsbs)
return render_template("index.html", servers=servers, \
associators=associators, localisations=localisations, adsbs=adsbs)
# serve static files from the /app/public folder
@app.route('/public/<path:file>')
def serve_static(file):
base_dir = os.path.abspath(os.path.dirname(__file__))
public_folder = os.path.join(base_dir, 'public')
return send_from_directory(public_folder, file)
base_dir = os.path.abspath(os.path.dirname(__file__))
public_folder = os.path.join(base_dir, 'public')
return send_from_directory(public_folder, file)
@app.route("/api")
def api():
api = request.query_string.decode('utf-8')
# input protection
servers_api = request.args.getlist('server')
associators_api = request.args.getlist('associator')
localisations_api = request.args.getlist('localisation')
adsbs_api = request.args.getlist('adsb')
if not all(item in valid['servers'] for item in servers_api):
return 'Invalid server'
if not all(item in valid['associators'] for item in associators_api):
return 'Invalid associator'
if not all(item in valid['localisations'] for item in localisations_api):
return 'Invalid localisation'
if not all(item in valid['adsbs'] for item in adsbs_api):
return 'Invalid ADSB']
# send to event handler
try:
reply_chunks = message_api_request.send_message(api)
reply = ''.join(reply_chunks)
print(reply, flush=True)
return reply
except Exception as e:
reply = "Exception: " + str(e)
return jsonify(error=reply), 500
api = request.query_string.decode('utf-8')
# input protection
servers_api = request.args.getlist('server')
associators_api = request.args.getlist('associator')
localisations_api = request.args.getlist('localisation')
adsbs_api = request.args.getlist('adsb')
if not all(item in valid['servers'] for item in servers_api):
return 'Invalid server'
if not all(item in valid['associators'] for item in associators_api):
return 'Invalid associator'
if not all(item in valid['localisations'] for item in localisations_api):
return 'Invalid localisation'
if not all(item in valid['adsbs'] for item in adsbs_api):
return 'Invalid ADSB'
# send to event handler
try:
reply_chunks = message_api_request.send_message(api)
reply = ''.join(reply_chunks)
print(reply, flush=True)
return reply
except Exception as e:
reply = "Exception: " + str(e)
return jsonify(error=reply), 500
@app.route("/map/<path:file>")
def serve_map(file):
base_dir = os.path.abspath(os.path.dirname(__file__))
public_folder = os.path.join(base_dir, 'map')
return send_from_directory(public_folder, file)
base_dir = os.path.abspath(os.path.dirname(__file__))
public_folder = os.path.join(base_dir, 'map')
return send_from_directory(public_folder, file)
# handle /cesium/ specifically
@app.route('/cesium/')
def serve_cesium_index():
return redirect('/cesium/index.html')
return redirect('/cesium/index.html')
@app.route('/cesium/<path:file>')
def serve_cesium_content(file):
apache_url = 'http://cesium-apache/' + file
try:
response = requests.get(apache_url)
if response.status_code == 200:
return Response(response.content, content_type=response.headers['content-type'])
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Error fetching content from Apache server: {e}")
return Response('Error fetching content from Apache server', status=500, content_type='text/plain')
apache_url = 'http://cesium-apache/' + file
try:
response = requests.get(apache_url)
if response.status_code == 200:
return Response(response.content, content_type=response.headers['content-type'])
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"Error fetching content from Apache server: {e}")
return Response('Error fetching content from Apache server', status=500, content_type='text/plain')
if __name__ == "__main__":
app.run()
app.run()

View file

@ -1,25 +1,24 @@
function event_adsb() {
fetch(adsb_url)
.then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');
}
return response.json();
})
.then(data => {
// Update aircraft points based on new data
updateAircraftPoints(data);
})
.catch(error => {
// Handle errors during fetch
console.error('Error during fetch:', error);
})
.finally(() => {
// Schedule the next fetch after a delay (e.g., 5 seconds)
setTimeout(event_adsb, 1000);
});
.then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');
}
return response.json();
})
.then(data => {
// Update aircraft points based on new data
updateAircraftPoints(data);
})
.catch(error => {
// Handle errors during fetch
console.error('Error during fetch:', error);
})
.finally(() => {
// Schedule the next fetch after a delay (e.g., 5 seconds)
setTimeout(event_adsb, 1000);
});
}
// Function to update aircraft points

View file

@ -4,52 +4,52 @@ function event_ellipsoid() {
'/api' + window.location.search;
fetch(radar_url)
.then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');
}
return response.json();
})
.then(data => {
if (!data["ellipsoids"]) {
return;
}
if (Object.keys(data["ellipsoids"]).length !== 0) {
removeEntitiesByType("ellipsoids");
}
else {
removeEntitiesOlderThanAndFade("ellipsoids", 10, 0.5);
}
for (const key in data["ellipsoids"]) {
if (data["ellipsoids"].hasOwnProperty(key)) {
const points = data["ellipsoids"][key];
for (const point in points) {
addPoint(
points[point][0],
points[point][1],
points[point][2],
"ellipsoids",
style_ellipsoid.color,
style_ellipsoid.pointSize,
style_ellipsoid.type,
Date.now()
);
}
.then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');
}
}
})
.catch(error => {
// Handle errors during fetch
console.error('Error during fetch:', error);
})
.finally(() => {
// Schedule the next fetch after a delay (e.g., 5 seconds)
setTimeout(event_ellipsoid, 1000);
});
return response.json();
})
.then(data => {
if (!data["ellipsoids"]) {
return;
}
if (Object.keys(data["ellipsoids"]).length !== 0) {
removeEntitiesByType("ellipsoids");
}
else {
removeEntitiesOlderThanAndFade("ellipsoids", 10, 0.5);
}
for (const key in data["ellipsoids"]) {
if (data["ellipsoids"].hasOwnProperty(key)) {
const points = data["ellipsoids"][key];
for (const point in points) {
addPoint(
points[point][0],
points[point][1],
points[point][2],
"ellipsoids",
style_ellipsoid.color,
style_ellipsoid.pointSize,
style_ellipsoid.type,
Date.now()
);
}
}
}
})
.catch(error => {
// Handle errors during fetch
console.error('Error during fetch:', error);
})
.finally(() => {
// Schedule the next fetch after a delay (e.g., 5 seconds)
setTimeout(event_ellipsoid, 1000);
});
}

View file

@ -4,49 +4,49 @@ function event_radar() {
'/api' + window.location.search;
fetch(radar_url)
.then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');
}
return response.json();
})
.then(data => {
if (!data["detections_localised"]) {
return;
}
removeEntitiesOlderThanAndFade("detection", 10, 0.5);
for (const key in data["detections_localised"]) {
if (data["detections_localised"].hasOwnProperty(key)) {
const target = data["detections_localised"][key];
const points = target["points"];
for (const point in points) {
addPoint(
points[point][0],
points[point][1],
points[point][2],
"detection",
style_point.color,
style_point.pointSize,
style_point.type,
Date.now()
);
}
.then(response => {
if (!response.ok) {
throw new Error('Network response was not ok');
}
}
})
.catch(error => {
// Handle errors during fetch
console.error('Error during fetch:', error);
})
.finally(() => {
// Schedule the next fetch after a delay (e.g., 5 seconds)
setTimeout(event_radar, 1000);
});
return response.json();
})
.then(data => {
if (!data["detections_localised"]) {
return;
}
removeEntitiesOlderThanAndFade("detection", 10, 0.5);
for (const key in data["detections_localised"]) {
if (data["detections_localised"].hasOwnProperty(key)) {
const target = data["detections_localised"][key];
const points = target["points"];
for (const point in points) {
addPoint(
points[point][0],
points[point][1],
points[point][2],
"detection",
style_point.color,
style_point.pointSize,
style_point.type,
Date.now()
);
}
}
}
})
.catch(error => {
// Handle errors during fetch
console.error('Error during fetch:', error);
})
.finally(() => {
// Schedule the next fetch after a delay (e.g., 5 seconds)
setTimeout(event_radar, 1000);
});
}

View file

@ -37,7 +37,7 @@ function toggle_button(button) {
}
// redirect to map with REST API
document.getElementById('buttonMap').addEventListener('click', function() {
document.getElementById('buttonMap').addEventListener('click', function () {
// Get the form values
var servers = document.querySelectorAll('.toggle-button.active');
var associator = document.querySelector('[name="associator"]').value;

View file

@ -9,111 +9,115 @@ import asyncio
class Message:
"""
@class Message
@brief A class for simple TCP messaging using a listener and sender.
"""
def __init__(self, host, port):
"""
@class Message
@brief A class for simple TCP messaging using a listener and sender.
@brief Constructor for Message.
@param host (str): The host to bind the listener to.
@param port (int): The port to bind the listener to.
"""
def __init__(self, host, port):
self.host = host
self.port = port
self.server_socket = None
self.callback_message_received = None
"""
@brief Constructor for Message.
@param host (str): The host to bind the listener to.
@param port (int): The port to bind the listener to.
"""
def start_listener(self):
self.host = host
self.port = port
self.server_socket = None
self.callback_message_received = None
"""
@brief Start the TCP listener to accept incoming connections.
@details Ensure this function is run in a separate thread.
@return None.
"""
def start_listener(self):
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen()
"""
@brief Start the TCP listener to accept incoming connections.
@details Ensure this function is run in a separate thread.
@return None.
"""
print(f"Listener is waiting for connections on {self.host}:{self.port}")
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen()
while True:
conn, addr = self.server_socket.accept()
thread = threading.Thread(target=self.handle_client, args=(conn, addr))
thread.start()
print(f"Listener is waiting for connections on {self.host}:{self.port}")
def handle_client(self, conn, addr):
"""
@brief Handle communication with a connected client.
@param conn (socket.socket): The socket object for the connected client.
@param addr (tuple): The address (host, port) of the connected client.
@return None.
"""
with conn:
while True:
conn, addr = self.server_socket.accept()
thread = threading.Thread(target=self.handle_client, args=(conn, addr))
thread.start()
data = conn.recv(8096)
if not data:
break
def handle_client(self, conn, addr):
"""
Handle communication with a connected client.
:param conn (socket.socket): The socket object for the connected client.
:param addr (tuple): The address (host, port) of the connected client.
:return None.
"""
with conn:
while True:
# Process data in chunks
decoded_data = ""
while data:
chunk = data.decode()
decoded_data += chunk
data = conn.recv(8096)
# Call the callback function if set
if self.callback_message_received:
reply = asyncio.run(self.callback_message_received(decoded_data))
if reply:
# Send the reply in chunks
for i in range(0, len(reply), 8096):
conn.sendall(reply[i:i + 8096].encode())
def send_message(self, message):
"""
@brief Send a message to the specified host and port.
@param message (str): The message to be sent.
@return generator: A generator yielding chunks of the reply.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.settimeout(3)
try:
client_socket.connect((self.host, self.port))
# Send the message in chunks
for i in range(0, len(message), 8096):
client_socket.sendall(message[i:i + 8096].encode())
# Indicate the end of transmission
client_socket.shutdown(socket.SHUT_WR)
# Receive the reply in chunks
while True:
data = client_socket.recv(8096)
if not data:
break
yield data.decode()
except ConnectionRefusedError:
print(f"Connection to {self.host}:{self.port} refused.")
# Process data in chunks
decoded_data = ""
while data:
chunk = data.decode()
decoded_data += chunk
data = conn.recv(8096)
def set_callback_message_received(self, callback):
# Call the callback function if set
if self.callback_message_received:
reply = asyncio.run(self.callback_message_received(decoded_data))
if reply:
# Send the reply in chunks
for i in range(0, len(reply), 8096):
conn.sendall(reply[i:i + 8096].encode())
"""
@brief Set callback function when a message is received.
@param callback (function): The callback function.
@return None.
"""
def send_message(self, message):
"""
Send a message to the specified host and port.
:param message (str): The message to be sent.
:return generator: A generator yielding chunks of the reply.
"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:
client_socket.settimeout(3)
try:
client_socket.connect((self.host, self.port))
# Send the message in chunks
for i in range(0, len(message), 8096):
client_socket.sendall(message[i:i + 8096].encode())
# Indicate the end of transmission
client_socket.shutdown(socket.SHUT_WR)
# Receive the reply in chunks
while True:
data = client_socket.recv(8096)
if not data:
break
yield data.decode()
except ConnectionRefusedError:
print(f"Connection to {self.host}:{self.port} refused.")
self.callback_message_received = callback
def set_callback_message_received(self, callback):
def close_listener(self):
"""
@brief Set callback function when a message is received.
@param callback (function): The callback function.
@return None.
"""
"""
@brief Close the listener socket.
@return None.
"""
self.callback_message_received = callback
def close_listener(self):
"""
@brief Close the listener socket.
@return None.
"""
if self.server_socket:
self.server_socket.close()
if self.server_socket:
self.server_socket.close()

View file

@ -8,168 +8,168 @@ import math
class AdsbAssociator:
"""
@class AdsbAssociator
@brief A class for associating detections of the same target.
@details First associate ADS-B truth with each radar detection.
Then associate over multiple radars.
@see blah2 at https://github.com/30hours/blah2.
Uses truth data in delay-Doppler space from an adsb2dd server.
@see adsb2dd at https://github.com/30hours/adsb2dd.
@todo Add adjustable window for associating truth/detections.
"""
def __init__(self):
"""
@class AdsbAssociator
@brief A class for associating detections of the same target.
@details First associate ADS-B truth with each radar detection.
Then associate over multiple radars.
@see blah2 at https://github.com/30hours/blah2.
Uses truth data in delay-Doppler space from an adsb2dd server.
@brief Constructor for the AdsbAssociator class.
"""
def process(self, radar_list, radar_data, timestamp):
"""
@brief Associate detections from 2+ radars.
@param radar_list (list): List of radars to associate.
@param radar_data (dict): Radar data for list of radars.
@param timestamp (int): Timestamp to compute delays at (ms).
@return dict: Associated detections by [hex][radar].
"""
assoc_detections = {}
assoc_detections_radar = []
for radar in radar_list:
valid_config = radar_data[radar]["config"] is not None
valid_detection = radar_data[radar]["detection"] is not None
if valid_config and valid_detection:
# get URL for adsb2truth
url = self.generate_api_url(radar, radar_data[radar])
# get ADSB detections
try:
response = requests.get(url, timeout=1)
response.raise_for_status()
data = response.json()
adsb_detections = data
except requests.exceptions.RequestException as e:
print(f"Error fetching data from {url}: {e}")
adsb_detections = None
continue
# associate radar and truth
assoc_detections_radar.append(self.process_1_radar(
radar, radar_data[radar]["detection"],
adsb_detections, timestamp, radar_data[radar]["config"]["capture"]["fc"]))
# associate detections between radars
output = {}
for entry in assoc_detections_radar:
for key, value in entry.items():
if key not in output:
output[key] = [value]
else:
output[key].append(value)
#output = {key: values for key, values in output.items() if len(values) > 1}
return output
def process_1_radar(self, radar, radar_detections, adsb_detections, timestamp, fc):
"""
@brief Associate detections between 1 radar/truth pair.
@details Output 1 detection per truth point.
@param radar (str): Name of radar to process.
@param radar_detections (dict): blah2 radar detections.
@param adsb_detections (dict): adsb2dd truth detections.
@return dict: Associated detections.
"""
assoc_detections = {}
distance_window = 10
for aircraft in adsb_detections:
if 'delay' in radar_detections:
if 'delay' in adsb_detections[aircraft] and len(radar_detections['delay']) >= 1:
# extrapolate delay to current time
# TODO extrapolate Doppler too
for i in range(len(radar_detections['delay'])):
delta_t = (timestamp - radar_detections['timestamp'])/1000
delay = (1000*radar_detections['delay'][i] + \
(-radar_detections['doppler'][i]*(299792458/fc))*delta_t)/1000
radar_detections['delay'][i] = delay
# distance from aircraft to all detections
closest_point, distance = self.closest_point(
adsb_detections[aircraft]['delay'],
adsb_detections[aircraft]['doppler'],
radar_detections['delay'],
radar_detections['doppler']
)
if distance < distance_window:
assoc_detections[aircraft] = {
'radar': radar,
'delay': closest_point[0],
'doppler': closest_point[1],
'timestamp': adsb_detections[aircraft]['timestamp']
}
return assoc_detections
def generate_api_url(self, radar, radar_data):
"""
@brief Generate an adsb2dd API endpoint for each radar.
@see adsb2dd at https://github.com/30hours/adsb2dd.
@todo Add adjustable window for associating truth/detections.
@param radar (str): Radar to run adsb2dd.
@param radar_data (dict): Radar data for this radar.
@return str: adsb2dd API for radar.
"""
def __init__(self):
rx_lat = radar_data['config']['location']['rx']['latitude']
rx_lon = radar_data['config']['location']['rx']['longitude']
rx_alt = radar_data['config']['location']['rx']['altitude']
tx_lat = radar_data['config']['location']['tx']['latitude']
tx_lon = radar_data['config']['location']['tx']['longitude']
tx_alt = radar_data['config']['location']['tx']['altitude']
fc = radar_data['config']['capture']['fc']
"""
@brief Constructor for the AdsbAssociator class.
"""
adsb = radar_data['config']['truth']['adsb']['tar1090']
def process(self, radar_list, radar_data, timestamp):
api_url = "http://adsb2dd.30hours.dev/api/dd"
"""
@brief Associate detections from 2+ radars.
@param radar_list (list): List of radars to associate.
@param radar_data (dict): Radar data for list of radars.
@param timestamp (int): Timestamp to compute delays at (ms).
@return dict: Associated detections by [hex][radar].
"""
api_query = (
api_url +
"?rx=" + str(rx_lat) + "," +
str(rx_lon) + "," +
str(rx_alt) +
"&tx=" + str(tx_lat) + "," +
str(tx_lon) + "," +
str(tx_alt) +
"&fc=" + str(fc/1000000) +
"&server=" + "http://" + str(adsb)
)
assoc_detections = {}
assoc_detections_radar = []
return api_query
for radar in radar_list:
def closest_point(self, x1, y1, x_coords, y_coords):
valid_config = radar_data[radar]["config"] is not None
valid_detection = radar_data[radar]["detection"] is not None
x1, y1 = float(x1), float(y1)
x_coords = [float(x) for x in x_coords]
y_coords = [float(y) for y in y_coords]
if valid_config and valid_detection:
distances = [math.sqrt((x - x1)**2 + (y - y1)**2) for x, y in zip(x_coords, y_coords)]
min_distance_index = distances.index(min(distances))
# get URL for adsb2truth
url = self.generate_api_url(radar, radar_data[radar])
closest_x = x_coords[min_distance_index]
closest_y = y_coords[min_distance_index]
distance = distances[min_distance_index]
# get ADSB detections
try:
response = requests.get(url, timeout=1)
response.raise_for_status()
data = response.json()
adsb_detections = data
except requests.exceptions.RequestException as e:
print(f"Error fetching data from {url}: {e}")
adsb_detections = None
continue
# associate radar and truth
assoc_detections_radar.append(self.process_1_radar(
radar, radar_data[radar]["detection"],
adsb_detections, timestamp, radar_data[radar]["config"]["capture"]["fc"]))
# associate detections between radars
output = {}
for entry in assoc_detections_radar:
for key, value in entry.items():
if key not in output:
output[key] = [value]
else:
output[key].append(value)
#output = {key: values for key, values in output.items() if len(values) > 1}
return output
def process_1_radar(self, radar, radar_detections, adsb_detections, timestamp, fc):
"""
@brief Associate detections between 1 radar/truth pair.
@details Output 1 detection per truth point.
@param radar (str): Name of radar to process.
@param radar_detections (dict): blah2 radar detections.
@param adsb_detections (dict): adsb2dd truth detections.
@return dict: Associated detections.
"""
assoc_detections = {}
distance_window = 10
for aircraft in adsb_detections:
if 'delay' in radar_detections:
if 'delay' in adsb_detections[aircraft] and len(radar_detections['delay']) >= 1:
# extrapolate delay to current time
# TODO extrapolate Doppler too
for i in range(len(radar_detections['delay'])):
delta_t = (timestamp - radar_detections['timestamp'])/1000
delay = (1000*radar_detections['delay'][i] + \
(-radar_detections['doppler'][i]*(299792458/fc))*delta_t)/1000
radar_detections['delay'][i] = delay
# distance from aircraft to all detections
closest_point, distance = self.closest_point(
adsb_detections[aircraft]['delay'],
adsb_detections[aircraft]['doppler'],
radar_detections['delay'],
radar_detections['doppler']
)
if distance < distance_window:
assoc_detections[aircraft] = {
'radar': radar,
'delay': closest_point[0],
'doppler': closest_point[1],
'timestamp': adsb_detections[aircraft]['timestamp']
}
return assoc_detections
def generate_api_url(self, radar, radar_data):
"""
@brief Generate an adsb2dd API endpoint for each radar.
@see adsb2dd at https://github.com/30hours/adsb2dd.
@param radar (str): Radar to run adsb2dd.
@param radar_data (dict): Radar data for this radar.
@return str: adsb2dd API for radar.
"""
rx_lat = radar_data['config']['location']['rx']['latitude']
rx_lon = radar_data['config']['location']['rx']['longitude']
rx_alt = radar_data['config']['location']['rx']['altitude']
tx_lat = radar_data['config']['location']['tx']['latitude']
tx_lon = radar_data['config']['location']['tx']['longitude']
tx_alt = radar_data['config']['location']['tx']['altitude']
fc = radar_data['config']['capture']['fc']
adsb = radar_data['config']['truth']['adsb']['tar1090']
api_url = "http://adsb2dd.30hours.dev/api/dd"
api_query = (
api_url +
"?rx=" + str(rx_lat) + "," +
str(rx_lon) + "," +
str(rx_alt) +
"&tx=" + str(tx_lat) + "," +
str(tx_lon) + "," +
str(tx_alt) +
"&fc=" + str(fc/1000000) +
"&server=" + "http://" + str(adsb)
)
return api_query
def closest_point(self, x1, y1, x_coords, y_coords):
x1, y1 = float(x1), float(y1)
x_coords = [float(x) for x in x_coords]
y_coords = [float(y) for y in y_coords]
distances = [math.sqrt((x - x1)**2 + (y - y1)**2) for x, y in zip(x_coords, y_coords)]
min_distance_index = distances.index(min(distances))
closest_x = x_coords[min_distance_index]
closest_y = y_coords[min_distance_index]
distance = distances[min_distance_index]
return [closest_x, closest_y], distance
return [closest_x, closest_y], distance

View file

@ -7,189 +7,198 @@ import math
class Geometry:
"""
@class Geometry
@brief A class to store geometric functions.
@details Assumes WGS-84 ellipsoid for all functions.
"""
def __init__(self):
"""
@class Geometry
@brief A class to store geometric functions.
@details Assumes WGS-84 ellipsoid for all functions.
@brief Constructor for the Geometry class.
"""
def __init__(self):
def lla2ecef(lat, lon, alt):
"""
@brief Constructor for the Geometry class.
"""
"""
@brief Converts geodetic coordinates (latitude, longitude, altitude) to ECEF coordinates.
@param lat (float): Geodetic latitude in degrees.
@param lon (float): Geodetic longitude in degrees.
@param alt (float): Altitude above the ellipsoid in meters.
@return ecef_x (float): ECEF x-coordinate in meters.
@return ecef_y (float): ECEF y-coordinate in meters.
@return ecef_z (float): ECEF z-coordinate in meters.
"""
def lla2ecef(lat, lon, alt):
# WGS84 constants
a = 6378137.0 # semi-major axis in meters
f = 1 / 298.257223563 # flattening
e = 0.081819190842622
# WGS84 constants
a = 6378137.0 # semi-major axis in meters
f = 1 / 298.257223563 # flattening
e = 0.081819190842622
lat_rad = math.radians(lat)
lon_rad = math.radians(lon)
# Convert latitude and longitude to radians
lat_rad = math.radians(lat)
lon_rad = math.radians(lon)
cos_lat = math.cos(lat_rad)
sin_lat = math.sin(lat_rad)
N = a / math.sqrt(1 - f * (2 - f) * sin_lat**2)
# Calculate the auxiliary values
cos_lat = math.cos(lat_rad)
sin_lat = math.sin(lat_rad)
N = a / math.sqrt(1 - f * (2 - f) * sin_lat**2)
# calculate ECEF coordinates
ecef_x = (N + alt) * cos_lat * math.cos(lon_rad)
ecef_y = (N + alt) * cos_lat * math.sin(lon_rad)
ecef_z = ((1-(e**2)) * N + alt) * sin_lat
# Calculate ECEF coordinates
ecef_x = (N + alt) * cos_lat * math.cos(lon_rad)
ecef_y = (N + alt) * cos_lat * math.sin(lon_rad)
ecef_z = ((1-(e**2)) * N + alt) * sin_lat
return ecef_x, ecef_y, ecef_z
return ecef_x, ecef_y, ecef_z
def ecef2lla(x, y, z):
def ecef2lla(x, y, z):
# WGS84 ellipsoid constants:
a = 6378137
e = 8.1819190842622e-2
"""
@brief Converts ECEF coordinates to geodetic coordinates (latitude, longitude, altitude).
@param x (float): ECEF x-coordinate in meters.
@param y (float): ECEF y-coordinate in meters.
@param z (float): ECEF z-coordinate in meters.
@return lat (float): Geodetic latitude in degrees.
@return lon (float): Geodetic longitude in degrees.
@return alt (float): Altitude above the ellipsoid in meters.
"""
# Calculations:
b = math.sqrt(a**2 * (1 - e**2))
ep = math.sqrt((a**2 - b**2) / b**2)
p = math.sqrt(x**2 + y**2)
th = math.atan2(a * z, b * p)
lon = math.atan2(y, x)
lat = math.atan2((z + ep**2 * b * math.sin(th)**3), (p - e**2 * a * math.cos(th)**3))
N = a / math.sqrt(1 - e**2 * math.sin(lat)**2)
alt = p / math.cos(lat) - N
# WGS84 ellipsoid constants:
a = 6378137
e = 8.1819190842622e-2
# Return lon in range [0, 2*pi)
lon = lon % (2 * math.pi)
b = math.sqrt(a**2 * (1 - e**2))
ep = math.sqrt((a**2 - b**2) / b**2)
p = math.sqrt(x**2 + y**2)
th = math.atan2(a * z, b * p)
lon = math.atan2(y, x)
lat = math.atan2((z + ep**2 * b * math.sin(th)**3), (p - e**2 * a * math.cos(th)**3))
N = a / math.sqrt(1 - e**2 * math.sin(lat)**2)
alt = p / math.cos(lat) - N
# Correct for numerical instability in altitude near exact poles:
# (after this correction, error is about 2 millimeters, which is about
# the same as the numerical precision of the overall function)
k = abs(x) < 1e-10 and abs(y) < 1e-10
alt = abs(z) - b if k else alt
# return lon in range [0, 2*pi)
lon = lon % (2 * math.pi)
lat = math.degrees(lat)
lon = math.degrees(lon)
# correct for numerical instability in altitude near exact poles:
k = abs(x) < 1e-10 and abs(y) < 1e-10
alt = abs(z) - b if k else alt
return lat, lon, alt
lat = math.degrees(lat)
lon = math.degrees(lon)
def enu2ecef(e1, n1, u1, lat, lon, alt):
"""
ENU to ECEF
return lat, lon, alt
Parameters
----------
def enu2ecef(e1, n1, u1, lat, lon, alt):
e1 : float
target east ENU coordinate (meters)
n1 : float
target north ENU coordinate (meters)
u1 : float
target up ENU coordinate (meters)
lat0 : float
Observer geodetic latitude
lon0 : float
Observer geodetic longitude
h0 : float
observer altitude above geodetic ellipsoid (meters)
"""
@brief Converts East-North-Up (ENU) coordinates to ECEF coordinates.
@param e1 (float): Target east ENU coordinate in meters.
@param n1 (float): Target north ENU coordinate in meters.
@param u1 (float): Target up ENU coordinate in meters.
@param lat (float): Observer geodetic latitude in degrees.
@param lon (float): Observer geodetic longitude in degrees.
@param alt (float): Observer geodetic altitude in meters.
@return x (float): Target x ECEF coordinate in meters.
@return y (float): Target y ECEF coordinate in meters.
@return z (float): Target z ECEF coordinate in meters.
"""
x0, y0, z0 = Geometry.lla2ecef(lat, lon, alt)
dx, dy, dz = Geometry.enu2uvw(e1, n1, u1, lat, lon)
Results
-------
x : float
target x ECEF coordinate (meters)
y : float
target y ECEF coordinate (meters)
z : float
target z ECEF coordinate (meters)
"""
x0, y0, z0 = Geometry.lla2ecef(lat, lon, alt)
dx, dy, dz = Geometry.enu2uvw(e1, n1, u1, lat, lon)
return x0 + dx, y0 + dy, z0 + dz
return x0 + dx, y0 + dy, z0 + dz
def enu2uvw(east, north, up, lat, lon):
def enu2uvw(east, north, up, lat, lon):
"""
Parameters
----------
"""
@brief Converts East-North-Up (ENU) coordinates to UVW coordinates.
@param east (float): Target east ENU coordinate in meters.
@param north (float): Target north ENU coordinate in meters.
@param up (float): Target up ENU coordinate in meters.
@param lat (float): Observer geodetic latitude in degrees.
@param lon (float): Observer geodetic longitude in degrees.
@return u (float): Target u coordinate in meters.
@return v (float): Target v coordinate in meters.
@return w (float): Target w coordinate in meters.
"""
e1 : float
target east ENU coordinate (meters)
n1 : float
target north ENU coordinate (meters)
u1 : float
target up ENU coordinate (meters)
lat = math.radians(lat)
lon = math.radians(lon)
Results
-------
t = math.cos(lat) * up - math.sin(lat) * north
w = math.sin(lat) * up + math.cos(lat) * north
u : float
v : float
w : float
"""
u = math.cos(lon) * t - math.sin(lon) * east
v = math.sin(lon) * t + math.cos(lon) * east
lat = math.radians(lat)
lon = math.radians(lon)
return u, v, w
t = math.cos(lat) * up - math.sin(lat) * north
w = math.sin(lat) * up + math.cos(lat) * north
def ecef2enu(x, y, z, lat, lon, alt):
u = math.cos(lon) * t - math.sin(lon) * east
v = math.sin(lon) * t + math.cos(lon) * east
"""
@brief Converts ECEF coordinates to East-North-Up (ENU) coordinates.
@param x (float): Target x ECEF coordinate in meters.
@param y (float): Target y ECEF coordinate in meters.
@param z (float): Target z ECEF coordinate in meters.
@param lat (float): Observer geodetic latitude in degrees.
@param lon (float): Observer geodetic longitude in degrees.
@param alt (float): Observer geodetic altitude in meters.
@return east (float): Target east ENU coordinate in meters.
@return north (float): Target north ENU coordinate in meters.
@return up (float): Target up ENU coordinate in meters.
"""
return u, v, w
x0, y0, z0 = Geometry.lla2ecef(lat, lon, alt)
return Geometry.uvw2enu(x - x0, y - y0, z - z0, lat, lon)
def ecef2enu(x, y, z, lat, lon, alt):
def uvw2enu(u, v, w, lat, lon):
"""
@brief From observer to target, ECEF => ENU.
@param x (float): Target x ECEF coordinate (m).
@param y (float): Target y ECEF coordinate (m).
@param z (float): Target z ECEF coordinate (m).
@param lat (float): Observer geodetic latitude (deg).
@param lon (float): Observer geodetic longitude (deg).
@param alt (float): Observer geodetic altituder (m).
@return east (float): Target east ENU coordinate (m).
@return north (float): Target north ENU coordinate (m).
@return up (float): Target up ENU coordinate (m).
"""
"""
@brief Converts UVW coordinates to East-North-Up (ENU) coordinates.
@param u (float): Shifted ECEF coordinate in the u-direction (m).
@param v (float): Shifted ECEF coordinate in the v-direction (m).
@param w (float): Shifted ECEF coordinate in the w-direction (m).
@param lat (float): Observer geodetic latitude in degrees.
@param lon (float): Observer geodetic longitude in degrees.
@return e (float): Target east ENU coordinate in meters.
@return n (float): Target north ENU coordinate in meters.
@return u (float): Target up ENU coordinate in meters.
"""
x0, y0, z0 = Geometry.lla2ecef(lat, lon, alt)
return Geometry.uvw2enu(x - x0, y - y0, z - z0, lat, lon)
lat = math.radians(lat)
lon = math.radians(lon)
def uvw2enu(u, v, w, lat, lon):
cos_lat = math.cos(lat)
sin_lat = math.sin(lat)
cos_lon = math.cos(lon)
sin_lon = math.sin(lon)
"""
@brief
@param u (float): Shifted ECEF coordinate (m).
@param v (float): Shifted ECEF coordinate (m).
@param w (float): Shifted ECEF coordinate (m).
@param lat (float): Observer geodetic latitude (deg).
@param lon (float): Observer geodetic longitude (deg).
@param e (float): Target east ENU coordinate (m).
@param n (float): Target north ENU coordinate (m).
@param u (float): Target up ENU coordinate (m).
"""
t = cos_lon * u + sin_lon * v
e = -sin_lon * u + cos_lon * v
u = cos_lat * t + sin_lat * w
n = -sin_lat * t + cos_lat * w
lat = math.radians(lat)
lon = math.radians(lon)
return e, n, u
cos_lat = math.cos(lat)
sin_lat = math.sin(lat)
cos_lon = math.cos(lon)
sin_lon = math.sin(lon)
def distance_ecef(point1, point2):
t = cos_lon * u + sin_lon * v
e = -sin_lon * u + cos_lon * v
u = cos_lat * t + sin_lat * w
n = -sin_lat * t + cos_lat * w
"""
@brief Computes the Euclidean distance between two points in ECEF coordinates.
@param point1 (tuple): Coordinates of the first point (x, y, z) in meters.
@param point2 (tuple): Coordinates of the second point (x, y, z) in meters.
@return distance (float): Euclidean distance between the two points in meters.
"""
return e, n, u
return math.sqrt(
(point2[0]-point1[0])**2 +
(point2[1]-point1[1])**2 +
(point2[2]-point1[2])**2)
def distance_ecef(point1, point2):
def average_points(points):
return math.sqrt(
(point2[0]-point1[0])**2 +
(point2[1]-point1[1])**2 +
(point2[2]-point1[2])**2)
"""
@brief Computes the average point from a list of points.
@param points (list): List of points, where each point is a tuple of coordinates (x, y, z) in meters.
@return average_point (list): Coordinates of the average point (x_avg, y_avg, z_avg) in meters.
"""
def average_points(points):
return [sum(coord) / len(coord) for coord in zip(*points)]
return [sum(coord) / len(coord) for coord in zip(*points)]

View file

@ -13,183 +13,182 @@ from concurrent.futures import ThreadPoolExecutor
class EllipseParametric:
"""
@class EllipseParametric
@brief A class for intersecting ellipses using a parametric approx.
@details Uses associated detections from multiple radars.
@see blah2 at https://github.com/30hours/blah2.
"""
def __init__(self, method="mean", nSamples=150, threshold=500):
"""
@class EllipseParametric
@brief A class for intersecting ellipses using a parametric approx.
@details Uses associated detections from multiple radars.
@see blah2 at https://github.com/30hours/blah2.
@brief Constructor for the EllipseParametric class.
"""
def __init__(self, method="mean", nSamples=150, threshold=500):
self.ellipsoids = []
self.nSamples = nSamples
self.threshold = threshold
self.method = method
"""
@brief Constructor for the EllipseParametric class.
"""
def process(self, assoc_detections, radar_data):
self.ellipsoids = []
self.nSamples = nSamples
self.threshold = threshold
self.method = method
"""
@brief Perform target localisation using the ellipse parametric method.
@details Generate a (non arc-length) parametric ellipse for each node.
@param assoc_detections (dict): JSON of blah2 radar detections.
@param radar_data (dict): JSON of adsb2dd truth detections.
@return dict: Dict of associated detections.
"""
def process(self, assoc_detections, radar_data):
output = {}
"""
@brief Perform target localisation using the ellipse parametric method.
@details Generate a (non arc-length) parametric ellipse for each node.
@param assoc_detections (dict): JSON of blah2 radar detections.
@param radar_data (dict): JSON of adsb2dd truth detections.
@return dict: Dict of associated detections.
"""
# return if no detections
if not assoc_detections:
return output
output = {}
for target in assoc_detections:
# return if no detections
if not assoc_detections:
return output
target_samples = {}
target_samples[target] = {}
for target in assoc_detections:
for radar in assoc_detections[target]:
target_samples = {}
target_samples[target] = {}
# create ellipsoid for radar
ellipsoid = next((
item for item in self.ellipsoids
if item.name == radar["radar"]), None)
for radar in assoc_detections[target]:
if ellipsoid is None:
config = radar_data[radar["radar"]]["config"]
x_tx, y_tx, z_tx = Geometry.lla2ecef(
config['location']['tx']['latitude'],
config['location']['tx']['longitude'],
config['location']['tx']['altitude']
)
x_rx, y_rx, z_rx = Geometry.lla2ecef(
config['location']['rx']['latitude'],
config['location']['rx']['longitude'],
config['location']['rx']['altitude']
)
ellipsoid = Ellipsoid(
[x_tx, y_tx, z_tx],
[x_rx, y_rx, z_rx],
radar["radar"]
)
# create ellipsoid for radar
ellipsoid = next((
item for item in self.ellipsoids
if item.name == radar["radar"]), None)
samples = self.sample(ellipsoid, radar["delay"]*1000, self.nSamples)
target_samples[target][radar["radar"]] = samples
if ellipsoid is None:
config = radar_data[radar["radar"]]["config"]
x_tx, y_tx, z_tx = Geometry.lla2ecef(
config['location']['tx']['latitude'],
config['location']['tx']['longitude'],
config['location']['tx']['altitude']
)
x_rx, y_rx, z_rx = Geometry.lla2ecef(
config['location']['rx']['latitude'],
config['location']['rx']['longitude'],
config['location']['rx']['altitude']
)
ellipsoid = Ellipsoid(
[x_tx, y_tx, z_tx],
[x_rx, y_rx, z_rx],
radar["radar"]
)
# find close points, ellipse 1 is master
radar_keys = list(target_samples[target].keys())
samples_intersect = []
samples = self.sample(ellipsoid, radar["delay"]*1000, self.nSamples)
target_samples[target][radar["radar"]] = samples
if self.method == "mean":
# find close points, ellipse 1 is master
radar_keys = list(target_samples[target].keys())
samples_intersect = []
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
# loop over each other list
for i in range(1, len(radar_keys)):
# loop points in other list
if not any(Geometry.distance_ecef(point1, point2) < self.threshold
for point2 in target_samples[target][radar_keys[i]]):
valid_point = False
break
if valid_point:
samples_intersect.append(point1)
if self.method == "mean":
if len(samples_intersect) == 0:
continue
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
# loop over each other list
for i in range(1, len(radar_keys)):
# loop points in other list
if not any(Geometry.distance_ecef(point1, point2) < self.threshold
for point2 in target_samples[target][radar_keys[i]]):
valid_point = False
break
if valid_point:
samples_intersect.append(point1)
average_point = Geometry.average_points(samples_intersect)
samples_intersect = [average_point]
if len(samples_intersect) == 0:
continue
elif self.method == "minimum":
average_point = Geometry.average_points(samples_intersect)
samples_intersect = [average_point]
elif self.method == "minimum":
min_distance = self.threshold
min_point1 = None
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
distance_from_point1 = [self.threshold]*(len(radar_keys)-1)
# loop over each other list
for i in range(1, len(radar_keys)):
if i > 1 and distance_from_point1[i-1] > self.threshold:
valid_point = False
break
# loop points in other list
for point2 in target_samples[target][radar_keys[i]]:
distance = Geometry.distance_ecef(point1, point2)
if distance < distance_from_point1[i-1]:
distance_from_point1[i-1] = distance
norm = math.sqrt(sum(x ** 2 for x in distance_from_point1))
if valid_point and norm < min_distance:
min_distance = norm
min_point1 = point1
if min_point1 is not None:
samples_intersect.append(min_point1)
else:
continue
min_distance = self.threshold
min_point1 = None
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
distance_from_point1 = [self.threshold]*(len(radar_keys)-1)
# loop over each other list
for i in range(1, len(radar_keys)):
if i > 1 and distance_from_point1[i-1] > self.threshold:
valid_point = False
break
# loop points in other list
for point2 in target_samples[target][radar_keys[i]]:
distance = Geometry.distance_ecef(point1, point2)
if distance < distance_from_point1[i-1]:
distance_from_point1[i-1] = distance
norm = math.sqrt(sum(x ** 2 for x in distance_from_point1))
if valid_point and norm < min_distance:
min_distance = norm
min_point1 = point1
if min_point1 is not None:
samples_intersect.append(min_point1)
else:
print('Invalid method.')
return output
continue
# remove duplicates and convert to LLA
output[target] = {}
output[target]["points"] = []
for i in range(len(samples_intersect)):
samples_intersect[i] = Geometry.ecef2lla(
samples_intersect[i][0],
samples_intersect[i][1],
samples_intersect[i][2])
output[target]["points"].append([
round(samples_intersect[i][0], 3),
round(samples_intersect[i][1], 3),
0])
else:
print('Invalid method.')
return output
return output
# remove duplicates and convert to LLA
output[target] = {}
output[target]["points"] = []
for i in range(len(samples_intersect)):
samples_intersect[i] = Geometry.ecef2lla(
samples_intersect[i][0],
samples_intersect[i][1],
samples_intersect[i][2])
output[target]["points"].append([
round(samples_intersect[i][0], 3),
round(samples_intersect[i][1], 3),
0])
def sample(self, ellipsoid, bistatic_range, n):
return output
"""
@brief Generate a set of ECEF points for the ellipse.
@details No arc length parametrisation.
@details Use ECEF because distance measure is simple over LLA.
@param ellipsoid (Ellipsoid): The ellipsoid object to use.
@param bistatic_range (float): Bistatic range for ellipse.
@param n (int): Number of points to generate.
@return list: Samples with size [n, 3].
"""
def sample(self, ellipsoid, bistatic_range, n):
# rotation matrix
theta = ellipsoid.yaw
R = np.array([
[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]
])
"""
@brief Generate a set of ECEF points for the ellipse.
@details No arc length parametrisation.
@details Use ECEF because distance measure is simple over LLA.
@param ellipsoid (Ellipsoid): The ellipsoid object to use.
@param bistatic_range (float): Bistatic range for ellipse.
@param n (int): Number of points to generate.
@return list: Samples with size [n, 3].
"""
# compute samples vectorised
a = (bistatic_range+ellipsoid.distance)/2
b = np.sqrt(a**2 - (ellipsoid.distance/2)**2)
u = np.linspace(0, 2 * np.pi, n)
x = a * np.cos(u)
y = b * np.sin(u)
r = np.stack([x, y], axis=-1).reshape(-1, 2)
# rotation matrix
theta = ellipsoid.yaw
R = np.array([
[np.cos(theta), -np.sin(theta)],
[np.sin(theta), np.cos(theta)]
])
r_1 = np.dot(r, R)
output = []
# compute samples vectorised
a = (bistatic_range+ellipsoid.distance)/2
b = np.sqrt(a**2 - (ellipsoid.distance/2)**2)
u = np.linspace(0, 2 * np.pi, n)
x = a * np.cos(u)
y = b * np.sin(u)
r = np.stack([x, y], axis=-1).reshape(-1, 2)
for i in range(len(r_1)):
# points to ECEF
x, y, z = Geometry.enu2ecef(
r_1[i][0], r_1[i][1], 100,
ellipsoid.midpoint_lla[0],
ellipsoid.midpoint_lla[1],
ellipsoid.midpoint_lla[2])
output.append([x, y, z])
r_1 = np.dot(r, R)
output = []
return output
for i in range(len(r_1)):
# points to ECEF
x, y, z = Geometry.enu2ecef(
r_1[i][0], r_1[i][1], 100,
ellipsoid.midpoint_lla[0],
ellipsoid.midpoint_lla[1],
ellipsoid.midpoint_lla[2])
output.append([x, y, z])
return output

View file

@ -10,194 +10,194 @@ import math
class EllipsoidParametric:
"""
@class EllipsoidParametric
@brief A class for intersecting ellipsoids using a parametric approx.
@details Uses associated detections from multiple radars.
@see blah2 at https://github.com/30hours/blah2.
"""
def __init__(self, method="mean", nSamples=100, threshold=500):
"""
@class EllipsoidParametric
@brief A class for intersecting ellipsoids using a parametric approx.
@details Uses associated detections from multiple radars.
@see blah2 at https://github.com/30hours/blah2.
@brief Constructor for the EllipsoidParametric class.
"""
def __init__(self, method="mean", nSamples=100, threshold=500):
self.ellipsoids = []
self.nSamples = nSamples
self.threshold = threshold
self.method = method
"""
@brief Constructor for the EllipsoidParametric class.
"""
def process(self, assoc_detections, radar_data):
self.ellipsoids = []
self.nSamples = nSamples
self.threshold = threshold
self.method = method
"""
@brief Perform target localisation using the ellipsoid parametric method.
@details Generate a (non arc-length) parametric ellipsoid for each node.
@param assoc_detections (dict): JSON of blah2 radar detections.
@param radar_data (dict): JSON of adsb2dd truth detections.
@return dict: Dict of associated detections.
"""
def process(self, assoc_detections, radar_data):
output = {}
"""
@brief Perform target localisation using the ellipsoid parametric method.
@details Generate a (non arc-length) parametric ellipsoid for each node.
@param assoc_detections (dict): JSON of blah2 radar detections.
@param radar_data (dict): JSON of adsb2dd truth detections.
@return dict: Dict of associated detections.
"""
# return if no detections
if not assoc_detections:
return output
output = {}
for target in assoc_detections:
# return if no detections
if not assoc_detections:
return output
target_samples = {}
target_samples[target] = {}
for target in assoc_detections:
for radar in assoc_detections[target]:
target_samples = {}
target_samples[target] = {}
# create ellipsoid for radar
ellipsoid = next((
item for item in self.ellipsoids
if item.name == radar["radar"]), None)
for radar in assoc_detections[target]:
if ellipsoid is None:
config = radar_data[radar["radar"]]["config"]
x_tx, y_tx, z_tx = Geometry.lla2ecef(
config['location']['tx']['latitude'],
config['location']['tx']['longitude'],
config['location']['tx']['altitude']
)
x_rx, y_rx, z_rx = Geometry.lla2ecef(
config['location']['rx']['latitude'],
config['location']['rx']['longitude'],
config['location']['rx']['altitude']
)
ellipsoid = Ellipsoid(
[x_tx, y_tx, z_tx],
[x_rx, y_rx, z_rx],
radar["radar"]
)
# create ellipsoid for radar
ellipsoid = next((
item for item in self.ellipsoids
if item.name == radar["radar"]), None)
samples = self.sample(ellipsoid, radar["delay"]*1000, self.nSamples)
target_samples[target][radar["radar"]] = samples
if ellipsoid is None:
config = radar_data[radar["radar"]]["config"]
x_tx, y_tx, z_tx = Geometry.lla2ecef(
config['location']['tx']['latitude'],
config['location']['tx']['longitude'],
config['location']['tx']['altitude']
)
x_rx, y_rx, z_rx = Geometry.lla2ecef(
config['location']['rx']['latitude'],
config['location']['rx']['longitude'],
config['location']['rx']['altitude']
)
ellipsoid = Ellipsoid(
[x_tx, y_tx, z_tx],
[x_rx, y_rx, z_rx],
radar["radar"]
)
# find close points, ellipsoid 1 is master
radar_keys = list(target_samples[target].keys())
samples_intersect = []
samples = self.sample(ellipsoid, radar["delay"]*1000, self.nSamples)
target_samples[target][radar["radar"]] = samples
if self.method == "mean":
# find close points, ellipsoid 1 is master
radar_keys = list(target_samples[target].keys())
samples_intersect = []
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
# loop over each other list
for i in range(1, len(radar_keys)):
# loop points in other list
if not any(Geometry.distance_ecef(point1, point2) < self.threshold
for point2 in target_samples[target][radar_keys[i]]):
valid_point = False
break
if valid_point:
samples_intersect.append(point1)
if self.method == "mean":
if len(samples_intersect) == 0:
continue
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
# loop over each other list
for i in range(1, len(radar_keys)):
# loop points in other list
if not any(Geometry.distance_ecef(point1, point2) < self.threshold
for point2 in target_samples[target][radar_keys[i]]):
valid_point = False
break
if valid_point:
samples_intersect.append(point1)
average_point = Geometry.average_points(samples_intersect)
samples_intersect = [average_point]
if len(samples_intersect) == 0:
continue
elif self.method == "minimum":
average_point = Geometry.average_points(samples_intersect)
samples_intersect = [average_point]
elif self.method == "minimum":
min_distance = self.threshold
min_point1 = None
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
distance_from_point1 = [self.threshold]*(len(radar_keys)-1)
# loop over each other list
for i in range(1, len(radar_keys)):
if i > 1 and distance_from_point1[i-1] > self.threshold:
valid_point = False
break
# loop points in other list
for point2 in target_samples[target][radar_keys[i]]:
distance = Geometry.distance_ecef(point1, point2)
if distance < distance_from_point1[i-1]:
distance_from_point1[i-1] = distance
norm = math.sqrt(sum(x ** 2 for x in distance_from_point1))
if valid_point and norm < min_distance:
min_distance = norm
min_point1 = point1
if min_point1 is not None:
samples_intersect.append(min_point1)
else:
continue
min_distance = self.threshold
min_point1 = None
# loop points in main ellipsoid
for point1 in target_samples[target][radar_keys[0]]:
valid_point = True
distance_from_point1 = [self.threshold]*(len(radar_keys)-1)
# loop over each other list
for i in range(1, len(radar_keys)):
if i > 1 and distance_from_point1[i-1] > self.threshold:
valid_point = False
break
# loop points in other list
for point2 in target_samples[target][radar_keys[i]]:
distance = Geometry.distance_ecef(point1, point2)
if distance < distance_from_point1[i-1]:
distance_from_point1[i-1] = distance
norm = math.sqrt(sum(x ** 2 for x in distance_from_point1))
if valid_point and norm < min_distance:
min_distance = norm
min_point1 = point1
if min_point1 is not None:
samples_intersect.append(min_point1)
else:
print('Invalid method.')
return output
continue
# remove duplicates and convert to LLA
output[target] = {}
output[target]["points"] = []
else:
print('Invalid method.')
return output
for i in range(len(samples_intersect)):
samples_intersect[i] = Geometry.ecef2lla(
samples_intersect[i][0],
samples_intersect[i][1],
samples_intersect[i][2])
output[target]["points"].append([
round(samples_intersect[i][0], 3),
round(samples_intersect[i][1], 3),
round(samples_intersect[i][2])])
# remove duplicates and convert to LLA
output[target] = {}
output[target]["points"] = []
return output
for i in range(len(samples_intersect)):
samples_intersect[i] = Geometry.ecef2lla(
samples_intersect[i][0],
samples_intersect[i][1],
samples_intersect[i][2])
output[target]["points"].append([
round(samples_intersect[i][0], 3),
round(samples_intersect[i][1], 3),
round(samples_intersect[i][2])])
def sample(self, ellipsoid, bistatic_range, n):
return output
"""
@brief Generate a set of ECEF points for the ellipsoid.
@details No arc length parametrisation.
@details Use ECEF because distance measure is simple over LLA.
@param ellipsoid (Ellipsoid): The ellipsoid object to use.
@param bistatic_range (float): Bistatic range for ellipsoid.
@param n (int): Number of points to generate.
@return list: Samples with size [n, 3].
"""
def sample(self, ellipsoid, bistatic_range, n):
# rotation matrix
phi = ellipsoid.pitch
theta = ellipsoid.yaw
R = np.array([
[np.cos(theta), -np.sin(theta)*np.cos(phi), np.sin(theta)*np.sin(phi)],
[np.sin(theta), np.cos(theta)*np.cos(phi), -np.cos(theta)*np.sin(phi)],
[0, np.sin(phi), np.cos(phi)]
])
"""
@brief Generate a set of ECEF points for the ellipsoid.
@details No arc length parametrisation.
@details Use ECEF because distance measure is simple over LLA.
@param ellipsoid (Ellipsoid): The ellipsoid object to use.
@param bistatic_range (float): Bistatic range for ellipsoid.
@param n (int): Number of points to generate.
@return list: Samples with size [n, 3].
"""
# compute samples vectorised
a = (bistatic_range+ellipsoid.distance)/2
b = np.sqrt(a**2 - (ellipsoid.distance/2)**2)
u_values = np.linspace(0, 2 * np.pi, n)
v_values = np.linspace(-np.pi/2, np.pi/2, int(n/2))
u, v = np.meshgrid(u_values, v_values, indexing='ij')
x = a * np.cos(u)
y = b * np.sin(u) * np.cos(v)
z = b * np.sin(u) * np.sin(v)
r = np.stack([x, y, z], axis=-1).reshape(-1, 3)
# rotation matrix
phi = ellipsoid.pitch
theta = ellipsoid.yaw
R = np.array([
[np.cos(theta), -np.sin(theta)*np.cos(phi), np.sin(theta)*np.sin(phi)],
[np.sin(theta), np.cos(theta)*np.cos(phi), -np.cos(theta)*np.sin(phi)],
[0, np.sin(phi), np.cos(phi)]
])
r_1 = np.dot(r, R)
output = []
# compute samples vectorised
a = (bistatic_range+ellipsoid.distance)/2
b = np.sqrt(a**2 - (ellipsoid.distance/2)**2)
u_values = np.linspace(0, 2 * np.pi, n)
v_values = np.linspace(-np.pi/2, np.pi/2, int(n/2))
u, v = np.meshgrid(u_values, v_values, indexing='ij')
x = a * np.cos(u)
y = b * np.sin(u) * np.cos(v)
z = b * np.sin(u) * np.sin(v)
r = np.stack([x, y, z], axis=-1).reshape(-1, 3)
for i in range(len(r_1)):
# points to ECEF
x, y, z = Geometry.enu2ecef(
r_1[i][0], r_1[i][1], r_1[i][2],
ellipsoid.midpoint_lla[0],
ellipsoid.midpoint_lla[1],
ellipsoid.midpoint_lla[2])
# points to LLA
[x, y, z] = Geometry.ecef2lla(x, y, z)
# only store points above ground
if z > 0:
# convert back to ECEF for simple distance measurements
[x, y, z] = Geometry.lla2ecef(x, y, z)
output.append([round(x, 3), round(y, 3), round(z)])
r_1 = np.dot(r, R)
output = []
return output
for i in range(len(r_1)):
# points to ECEF
x, y, z = Geometry.enu2ecef(
r_1[i][0], r_1[i][1], r_1[i][2],
ellipsoid.midpoint_lla[0],
ellipsoid.midpoint_lla[1],
ellipsoid.midpoint_lla[2])
# points to LLA
[x, y, z] = Geometry.ecef2lla(x, y, z)
# only store points above ground
if z > 0:
# convert back to ECEF for simple distance measurements
[x, y, z] = Geometry.lla2ecef(x, y, z)
output.append([round(x, 3), round(y, 3), round(z)])
return output

View file

@ -9,118 +9,118 @@ import math
class SphericalIntersection:
"""
@class SphericalIntersection
@brief A class for intersecting ellipsoids using SX method.
@details Uses associated detections from multiple radars.
@see https://ieeexplore.ieee.org/document/6129656
"""
def __init__(self):
"""
@class SphericalIntersection
@brief A class for intersecting ellipsoids using SX method.
@details Uses associated detections from multiple radars.
@see https://ieeexplore.ieee.org/document/6129656
@brief Constructor for the SphericalIntersection class.
"""
def __init__(self):
self.type = "rx"
self.not_type = "rx" if self.type == "tx" else "tx"
"""
@brief Constructor for the SphericalIntersection class.
"""
def process(self, assoc_detections, radar_data):
self.type = "rx"
self.not_type = "rx" if self.type == "tx" else "tx"
"""
@brief Perform target localisation using the SX method.
@param assoc_detections (dict): JSON of blah2 radar detections.
@param radar_data (dict): JSON of adsb2dd truth detections.
@return dict: Dict of associated detections.
"""
def process(self, assoc_detections, radar_data):
output = {}
"""
@brief Perform target localisation using the SX method.
@param assoc_detections (dict): JSON of blah2 radar detections.
@param radar_data (dict): JSON of adsb2dd truth detections.
@return dict: Dict of associated detections.
"""
# return if no detections
if not assoc_detections:
return output
output = {}
# pick first radar rx node as ENU reference (arbitrary)
radar = next(iter(radar_data))
reference_lla = [
radar_data[radar]["config"]["location"][self.not_type]["latitude"],
radar_data[radar]["config"]["location"][self.not_type]["longitude"],
radar_data[radar]["config"]["location"][self.not_type]["altitude"]]
reference_ecef = Geometry.lla2ecef(reference_lla[0],
reference_lla[1], reference_lla[2])
# return if no detections
if not assoc_detections:
return output
for target in assoc_detections:
# pick first radar rx node as ENU reference (arbitrary)
radar = next(iter(radar_data))
reference_lla = [
radar_data[radar]["config"]["location"][self.not_type]["latitude"],
radar_data[radar]["config"]["location"][self.not_type]["longitude"],
radar_data[radar]["config"]["location"][self.not_type]["altitude"]]
reference_ecef = Geometry.lla2ecef(reference_lla[0],
reference_lla[1], reference_lla[2])
nDetections = len(assoc_detections[target])
for target in assoc_detections:
# matrix of positions of non-constant node
S = np.zeros((nDetections, 3))
nDetections = len(assoc_detections[target])
# additional vector
z_vec = np.zeros((nDetections, 1))
# matrix of positions of non-constant node
S = np.zeros((nDetections, 3))
# bistatic range vector r
r = np.zeros((nDetections, 1))
# additional vector
z_vec = np.zeros((nDetections, 1))
for index, radar in enumerate(assoc_detections[target]):
# bistatic range vector r
r = np.zeros((nDetections, 1))
# convert position to ENU and add to S
config = radar_data[radar["radar"]]["config"]
x, y, z = Geometry.lla2ecef(
config['location'][self.type]['latitude'],
config['location'][self.type]['longitude'],
config['location'][self.type]['altitude'])
x_enu, y_enu, z_enu = Geometry.ecef2enu(x, y, z,
reference_lla[0],
reference_lla[1],
reference_lla[2])
S[index, :] = [x_enu, y_enu, z_enu]
for index, radar in enumerate(assoc_detections[target]):
# add to z
distance = Geometry.distance_ecef(
[x, y, z], [reference_ecef[0],
reference_ecef[1], reference_ecef[2]])
R_i = (radar["delay"]*1000) + distance
z_vec[index, :] = (x_enu**2 + y_enu**2 + z_enu**2 - R_i**2)/2
# convert position to ENU and add to S
config = radar_data[radar["radar"]]["config"]
x, y, z = Geometry.lla2ecef(
config['location'][self.type]['latitude'],
config['location'][self.type]['longitude'],
config['location'][self.type]['altitude'])
x_enu, y_enu, z_enu = Geometry.ecef2enu(x, y, z,
reference_lla[0],
reference_lla[1],
reference_lla[2])
S[index, :] = [x_enu, y_enu, z_enu]
# add to r
r[index, :] = R_i
# add to z
distance = Geometry.distance_ecef(
[x, y, z], [reference_ecef[0],
reference_ecef[1], reference_ecef[2]])
R_i = (radar["delay"]*1000) + distance
z_vec[index, :] = (x_enu**2 + y_enu**2 + z_enu**2 - R_i**2)/2
# now compute matrix math
S_star = np.linalg.inv(S.T @ S) @ S.T
a = S_star @ z_vec
b = S_star @ r
R_t = [0, 0]
discriminant = 4*((a.T @ b)**2) - 4*((b.T @ b) - 1)*(a.T @ a)
if discriminant >= 0:
R_t[0] = (-2*(a.T @ b) - np.sqrt(discriminant))/(2*((b.T @ b)-1))
R_t[1] = (-2*(a.T @ b) + np.sqrt(discriminant))/(2*((b.T @ b)-1))
else:
R_t[0] = np.real((-2*(a.T @ b) - np.sqrt(discriminant + 0j))/(2*((b.T @ b)-1)))
R_t[1] = np.real((-2*(a.T @ b) + np.sqrt(discriminant + 0j))/(2*((b.T @ b)-1)))
x_t = [0, 0]
x_t[0] = S_star @ (z_vec + r*R_t[0])
x_t[1] = S_star @ (z_vec + r*R_t[1])
# add to r
r[index, :] = R_i
# use solution with highest altitude
output[target] = {}
output[target]["points"] = []
x_t_list = [np.squeeze(arr).tolist() for arr in x_t]
# now compute matrix math
S_star = np.linalg.inv(S.T @ S) @ S.T
a = S_star @ z_vec
b = S_star @ r
R_t = [0, 0]
discriminant = 4*((a.T @ b)**2) - 4*((b.T @ b) - 1)*(a.T @ a)
if discriminant >= 0:
R_t[0] = (-2*(a.T @ b) - np.sqrt(discriminant))/(2*((b.T @ b)-1))
R_t[1] = (-2*(a.T @ b) + np.sqrt(discriminant))/(2*((b.T @ b)-1))
else:
R_t[0] = np.real((-2*(a.T @ b) - np.sqrt(discriminant + 0j))/(2*((b.T @ b)-1)))
R_t[1] = np.real((-2*(a.T @ b) + np.sqrt(discriminant + 0j))/(2*((b.T @ b)-1)))
x_t = [0, 0]
x_t[0] = S_star @ (z_vec + r*R_t[0])
x_t[1] = S_star @ (z_vec + r*R_t[1])
# convert points back to LLA
for index in range(len(x_t_list)):
x, y, z = Geometry.enu2ecef(x_t_list[index][0],
x_t_list[index][1],
x_t_list[index][2],
reference_lla[0],
reference_lla[1],
reference_lla[2])
lat, lon, alt = Geometry.ecef2lla(x, y, z)
x_t_list[index] = [lat, lon, alt]
# use solution with highest altitude
output[target] = {}
output[target]["points"] = []
x_t_list = [np.squeeze(arr).tolist() for arr in x_t]
if x_t[0][2] > x_t[1][2]:
output[target]["points"].append(x_t_list[0])
else:
output[target]["points"].append(x_t_list[1])
# convert points back to LLA
for index in range(len(x_t_list)):
x, y, z = Geometry.enu2ecef(x_t_list[index][0],
x_t_list[index][1],
x_t_list[index][2],
reference_lla[0],
reference_lla[1],
reference_lla[2])
lat, lon, alt = Geometry.ecef2lla(x, y, z)
x_t_list[index] = [lat, lon, alt]
if x_t[0][2] > x_t[1][2]:
output[target]["points"].append(x_t_list[0])
else:
output[target]["points"].append(x_t_list[1])
return output
return output

View file

@ -46,21 +46,17 @@ class AdsbTruth:
# store relevant data
if adsb:
# loop over aircraft
for aircraft in adsb["aircraft"]:
if aircraft.get("seen_pos") and \
aircraft.get("alt_geom") and \
aircraft.get("flight") and \
aircraft.get("seen_pos") < self.seen_pos_limit:
output[aircraft["hex"]] = {}
output[aircraft["hex"]]["lat"] = aircraft["lat"]
output[aircraft["hex"]]["lon"] = aircraft["lon"]
output[aircraft["hex"]]["alt"] = aircraft["alt_geom"]
output[aircraft["hex"]]["flight"] = aircraft["flight"]
output[aircraft["hex"]]["timestamp"] = \
adsb["now"] - aircraft["seen_pos"]
if aircraft.get("seen_pos") and \
aircraft.get("alt_geom") and \
aircraft.get("flight") and \
aircraft.get("seen_pos") < self.seen_pos_limit:
output[aircraft["hex"]] = {}
output[aircraft["hex"]]["lat"] = aircraft["lat"]
output[aircraft["hex"]]["lon"] = aircraft["lon"]
output[aircraft["hex"]]["alt"] = aircraft["alt_geom"]
output[aircraft["hex"]]["flight"] = aircraft["flight"]
output[aircraft["hex"]]["timestamp"] = \
adsb["now"] - aircraft["seen_pos"]
return output

View file

@ -4,40 +4,41 @@
"""
import math
from algorithm.geometry.Geometry import Geometry
class Ellipsoid:
"""
@class Ellipsoid
@brief A class to store ellipsoid parameters for bistatic radar.
@details Stores foci, midpoint, pitch, yaw and distance.
"""
def __init__(self, f1, f2, name):
"""
@class Ellipsoid
@brief A class to store ellipsoid parameters for bistatic radar.
@details Stores foci, midpoint, pitch, yaw and distance.
@brief Constructor for the Ellipsoid class.
@param f1 (list): [x, y, z] of foci 1 in ECEF.
@param f2 (list): [x, y, z] of foci 2 in ECEF.
@param name (str): Name to associate with shape.
"""
def __init__(self, f1, f2, name):
self.f1 = f1
self.f2 = f2
self.name = name
"""
@brief Constructor for the Ellipsoid class.
@param f1 (list): [x, y, z] of foci 1 in ECEF.
@param f2 (list): [x, y, z] of foci 2 in ECEF.
@param name (str): Name to associate with shape.
"""
self.f1 = f1
self.f2 = f2
self.name = name
# dependent members
self.midpoint = [(f1[0]+f2[0])/2,
(f1[1]+f2[1])/2, (f1[2]+f2[2])/2]
self.midpoint_lla = Geometry.ecef2lla(
self.midpoint[0], self.midpoint[1], self.midpoint[2])
vector_enu = Geometry.ecef2enu(f1[0], f1[1], f1[2],
self.midpoint_lla[0], self.midpoint_lla[1], self.midpoint_lla[2])
self.yaw = -math.atan2(vector_enu[1], vector_enu[0])
self.pitch = math.atan2(vector_enu[2],
math.sqrt(vector_enu[0]**2 + vector_enu[1]**2))
self.distance = math.sqrt(
(f2[0] - f1[0])**2 +
(f2[1] - f1[1])**2 +
(f2[2] - f1[2])**2)
# dependent members
self.midpoint = [(f1[0]+f2[0])/2,
(f1[1]+f2[1])/2, (f1[2]+f2[2])/2]
self.midpoint_lla = Geometry.ecef2lla(
self.midpoint[0], self.midpoint[1], self.midpoint[2])
vector_enu = Geometry.ecef2enu(f1[0], f1[1], f1[2],
self.midpoint_lla[0], self.midpoint_lla[1], self.midpoint_lla[2])
self.yaw = -math.atan2(vector_enu[1], vector_enu[0])
self.pitch = math.atan2(vector_enu[2],
math.sqrt(vector_enu[0]**2 + vector_enu[1]**2))
self.distance = math.sqrt(
(f2[0] - f1[0])**2 +
(f2[1] - f1[1])**2 +
(f2[2] - f1[2])**2)

View file

@ -26,21 +26,21 @@ from algorithm.geometry.Geometry import Geometry
# init config file
try:
with open('config/config.yml', 'r') as file:
config = yaml.safe_load(file)
nSamplesEllipse = config['localisation']['ellipse']['nSamples']
thresholdEllipse = config['localisation']['ellipse']['threshold']
nDisplayEllipse = config['localisation']['ellipse']['nDisplay']
nSamplesEllipsoid = config['localisation']['ellipsoid']['nSamples']
thresholdEllipsoid = config['localisation']['ellipsoid']['threshold']
nDisplayEllipsoid = config['localisation']['ellipsoid']['nDisplay']
tDeleteAdsb = config['associate']['adsb']['tDelete']
save = config['3lips']['save']
tDelete = config['3lips']['tDelete']
with open('config/config.yml', 'r') as file:
config = yaml.safe_load(file)
nSamplesEllipse = config['localisation']['ellipse']['nSamples']
thresholdEllipse = config['localisation']['ellipse']['threshold']
nDisplayEllipse = config['localisation']['ellipse']['nDisplay']
nSamplesEllipsoid = config['localisation']['ellipsoid']['nSamples']
thresholdEllipsoid = config['localisation']['ellipsoid']['threshold']
nDisplayEllipsoid = config['localisation']['ellipsoid']['nDisplay']
tDeleteAdsb = config['associate']['adsb']['tDelete']
save = config['3lips']['save']
tDelete = config['3lips']['tDelete']
except FileNotFoundError:
print("Error: Configuration file not found.")
print("Error: Configuration file not found.")
except yaml.YAMLError as e:
print("Error reading YAML configuration:", e)
print("Error reading YAML configuration:", e)
# init event loop
api = []
@ -58,245 +58,245 @@ saveFile = '/app/save/' + str(int(time.time())) + '.ndjson'
async def event():
print('Start event', flush=True)
print('Start event', flush=True)
global api, save
timestamp = int(time.time()*1000)
api_event = copy.copy(api)
global api, save
timestamp = int(time.time()*1000)
api_event = copy.copy(api)
# list all blah2 radars
radar_names = []
for item in api_event:
for radar in item["server"]:
radar_names.append(radar)
radar_names = list(set(radar_names))
# list all blah2 radars
radar_names = []
for item in api_event:
for radar in item["server"]:
radar_names.append(radar)
radar_names = list(set(radar_names))
# get detections all radar
radar_detections_url = [
"http://" + radar_name + "/api/detection" for radar_name in radar_names]
radar_detections = []
for url in radar_detections_url:
try:
response = requests.get(url, timeout=1)
response.raise_for_status()
data = response.json()
radar_detections.append(data)
except requests.exceptions.RequestException as e:
print(f"Error fetching data from {url}: {e}")
radar_detections.append(None)
# get detections all radar
radar_detections_url = [
"http://" + radar_name + "/api/detection" for radar_name in radar_names]
radar_detections = []
for url in radar_detections_url:
try:
response = requests.get(url, timeout=1)
response.raise_for_status()
data = response.json()
radar_detections.append(data)
except requests.exceptions.RequestException as e:
print(f"Error fetching data from {url}: {e}")
radar_detections.append(None)
# get config all radar
radar_config_url = [
"http://" + radar_name + "/api/config" for radar_name in radar_names]
radar_config = []
for url in radar_config_url:
try:
response = requests.get(url, timeout=1)
response.raise_for_status()
data = response.json()
radar_config.append(data)
except requests.exceptions.RequestException as e:
print(f"Error fetching data from {url}: {e}")
radar_config.append(None)
# get config all radar
radar_config_url = [
"http://" + radar_name + "/api/config" for radar_name in radar_names]
radar_config = []
for url in radar_config_url:
try:
response = requests.get(url, timeout=1)
response.raise_for_status()
data = response.json()
radar_config.append(data)
except requests.exceptions.RequestException as e:
print(f"Error fetching data from {url}: {e}")
radar_config.append(None)
# store detections in dict
radar_dict = {}
for i in range(len(radar_names)):
radar_dict[radar_names[i]] = {
"detection": radar_detections[i],
"config": radar_config[i]
}
# store detections in dict
radar_dict = {}
for i in range(len(radar_names)):
radar_dict[radar_names[i]] = {
"detection": radar_detections[i],
"config": radar_config[i]
}
# store truth in dict
truth_adsb = {}
adsb_urls = []
for item in api_event:
adsb_urls.append(item["adsb"])
adsb_urls = list(set(adsb_urls))
for url in adsb_urls:
truth_adsb[url] = adsbTruth.process(url)
# store truth in dict
truth_adsb = {}
adsb_urls = []
for item in api_event:
adsb_urls.append(item["adsb"])
adsb_urls = list(set(adsb_urls))
for url in adsb_urls:
truth_adsb[url] = adsbTruth.process(url)
# main processing
for item in api_event:
# main processing
for item in api_event:
start_time = time.time()
start_time = time.time()
# extract dict for item
radar_dict_item = {
key: radar_dict[key]
for key in item["server"]
if key in radar_dict
}
# extract dict for item
radar_dict_item = {
key: radar_dict[key]
for key in item["server"]
if key in radar_dict
}
# associator selection
if item["associator"] == "adsb-associator":
associator = adsbAssociator
else:
print("Error: Associator invalid.")
return
# associator selection
if item["associator"] == "adsb-associator":
associator = adsbAssociator
else:
print("Error: Associator invalid.")
return
# localisation selection
if item["localisation"] == "ellipse-parametric-mean":
localisation = ellipseParametricMean
elif item["localisation"] == "ellipse-parametric-min":
localisation = ellipseParametricMin
elif item["localisation"] == "ellipsoid-parametric-mean":
localisation = ellipsoidParametricMean
elif item["localisation"] == "ellipsoid-parametric-min":
localisation = ellipsoidParametricMin
elif item["localisation"] == "spherical-intersection":
localisation = sphericalIntersection
else:
print("Error: Localisation invalid.")
return
# localisation selection
if item["localisation"] == "ellipse-parametric-mean":
localisation = ellipseParametricMean
elif item["localisation"] == "ellipse-parametric-min":
localisation = ellipseParametricMin
elif item["localisation"] == "ellipsoid-parametric-mean":
localisation = ellipsoidParametricMean
elif item["localisation"] == "ellipsoid-parametric-min":
localisation = ellipsoidParametricMin
elif item["localisation"] == "spherical-intersection":
localisation = sphericalIntersection
else:
print("Error: Localisation invalid.")
return
# processing
associated_dets = associator.process(item["server"], radar_dict_item, timestamp)
associated_dets_3_radars = {
key: value
for key, value in associated_dets.items()
if isinstance(value, list) and len(value) >= 3
}
if associated_dets_3_radars:
print('Detections from 3 or more radars availble.')
print(associated_dets_3_radars)
associated_dets_2_radars = {
key: value
for key, value in associated_dets.items()
if isinstance(value, list) and len(value) >= 2
}
localised_dets = localisation.process(associated_dets_3_radars, radar_dict_item)
# processing
associated_dets = associator.process(item["server"], radar_dict_item, timestamp)
associated_dets_3_radars = {
key: value
for key, value in associated_dets.items()
if isinstance(value, list) and len(value) >= 3
}
if associated_dets_3_radars:
print('Detections from 3 or more radars availble.')
print(associated_dets_3_radars)
associated_dets_2_radars = {
key: value
for key, value in associated_dets.items()
if isinstance(value, list) and len(value) >= 2
}
localised_dets = localisation.process(associated_dets_3_radars, radar_dict_item)
if associated_dets:
print(associated_dets, flush=True)
if associated_dets:
print(associated_dets, flush=True)
# show ellipsoids of associated detections for 1 target
ellipsoids = {}
if item["localisation"] == "ellipse-parametric-mean" or \
item["localisation"] == "ellipsoid-parametric-mean" or \
item["localisation"] == "ellipse-parametric-min" or \
item["localisation"] == "ellipsoid-parametric-min":
if associated_dets_2_radars:
# get first target key
key = next(iter(associated_dets_2_radars))
ellipsoid_radars = []
for radar in associated_dets_2_radars[key]:
ellipsoid_radars.append(radar["radar"])
x_tx, y_tx, z_tx = Geometry.lla2ecef(
radar_dict_item[radar["radar"]]["config"]['location']['tx']['latitude'],
radar_dict_item[radar["radar"]]["config"]['location']['tx']['longitude'],
radar_dict_item[radar["radar"]]["config"]['location']['tx']['altitude']
)
x_rx, y_rx, z_rx = Geometry.lla2ecef(
radar_dict_item[radar["radar"]]["config"]['location']['rx']['latitude'],
radar_dict_item[radar["radar"]]["config"]['location']['rx']['longitude'],
radar_dict_item[radar["radar"]]["config"]['location']['rx']['altitude']
)
ellipsoid = Ellipsoid(
[x_tx, y_tx, z_tx],
[x_rx, y_rx, z_rx],
radar["radar"]
)
points = localisation.sample(ellipsoid, radar["delay"]*1000, nDisplayEllipse)
for i in range(len(points)):
lat, lon, alt = Geometry.ecef2lla(points[i][0], points[i][1], points[i][2])
if item["localisation"] == "ellipsoid-parametric-mean" or \
item["localisation"] == "ellipsoid-parametric-min":
alt = round(alt)
if item["localisation"] == "ellipse-parametric-mean" or \
item["localisation"] == "ellipse-parametric-min":
alt = 0
points[i] = ([round(lat, 3), round(lon, 3), alt])
ellipsoids[radar["radar"]] = points
# show ellipsoids of associated detections for 1 target
ellipsoids = {}
if item["localisation"] == "ellipse-parametric-mean" or \
item["localisation"] == "ellipsoid-parametric-mean" or \
item["localisation"] == "ellipse-parametric-min" or \
item["localisation"] == "ellipsoid-parametric-min":
if associated_dets_2_radars:
# get first target key
key = next(iter(associated_dets_2_radars))
ellipsoid_radars = []
for radar in associated_dets_2_radars[key]:
ellipsoid_radars.append(radar["radar"])
x_tx, y_tx, z_tx = Geometry.lla2ecef(
radar_dict_item[radar["radar"]]["config"]['location']['tx']['latitude'],
radar_dict_item[radar["radar"]]["config"]['location']['tx']['longitude'],
radar_dict_item[radar["radar"]]["config"]['location']['tx']['altitude']
)
x_rx, y_rx, z_rx = Geometry.lla2ecef(
radar_dict_item[radar["radar"]]["config"]['location']['rx']['latitude'],
radar_dict_item[radar["radar"]]["config"]['location']['rx']['longitude'],
radar_dict_item[radar["radar"]]["config"]['location']['rx']['altitude']
)
ellipsoid = Ellipsoid(
[x_tx, y_tx, z_tx],
[x_rx, y_rx, z_rx],
radar["radar"]
)
points = localisation.sample(ellipsoid, radar["delay"]*1000, nDisplayEllipse)
for i in range(len(points)):
lat, lon, alt = Geometry.ecef2lla(points[i][0], points[i][1], points[i][2])
if item["localisation"] == "ellipsoid-parametric-mean" or \
item["localisation"] == "ellipsoid-parametric-min":
alt = round(alt)
if item["localisation"] == "ellipse-parametric-mean" or \
item["localisation"] == "ellipse-parametric-min":
alt = 0
points[i] = ([round(lat, 3), round(lon, 3), alt])
ellipsoids[radar["radar"]] = points
stop_time = time.time()
stop_time = time.time()
# output data to API
item["timestamp_event"] = timestamp
item["truth"] = truth_adsb[item["adsb"]]
item["detections_associated"] = associated_dets
item["detections_localised"] = localised_dets
item["ellipsoids"] = ellipsoids
item["time"] = stop_time - start_time
# output data to API
item["timestamp_event"] = timestamp
item["truth"] = truth_adsb[item["adsb"]]
item["detections_associated"] = associated_dets
item["detections_localised"] = localised_dets
item["ellipsoids"] = ellipsoids
item["time"] = stop_time - start_time
print('Method: ' + item["localisation"], flush=True)
print(item["time"], flush=True)
print('Method: ' + item["localisation"], flush=True)
print(item["time"], flush=True)
# delete old API requests
api_event = [
item for item in api_event if timestamp - item["timestamp"] <= tDelete*1000]
# delete old API requests
api_event = [
item for item in api_event if timestamp - item["timestamp"] <= tDelete*1000]
# update API
api = api_event
# update API
api = api_event
# save to file
if save:
append_api_to_file(api)
# save to file
if save:
append_api_to_file(api)
# event loop
async def main():
while True:
await event()
await asyncio.sleep(1)
while True:
await event()
await asyncio.sleep(1)
def append_api_to_file(api_object, filename=saveFile):
if not os.path.exists(filename):
with open(filename, 'w') as new_file:
pass
if not os.path.exists(filename):
with open(filename, 'w') as new_file:
pass
with open(filename, 'a') as json_file:
json.dump(api_object, json_file)
json_file.write('\n')
with open(filename, 'a') as json_file:
json.dump(api_object, json_file)
json_file.write('\n')
def short_hash(input_string, length=10):
hash_object = hashlib.sha256(input_string.encode())
short_hash = hash_object.hexdigest()[:length]
return short_hash
hash_object = hashlib.sha256(input_string.encode())
short_hash = hash_object.hexdigest()[:length]
return short_hash
# message received callback
async def callback_message_received(msg):
timestamp = int(time.time()*1000)
timestamp = int(time.time()*1000)
# update timestamp if API entry exists
for x in api:
if x["hash"] == short_hash(msg):
x["timestamp"] = timestamp
break
# update timestamp if API entry exists
for x in api:
if x["hash"] == short_hash(msg):
x["timestamp"] = timestamp
break
# add API entry if does not exist, split URL
if not any(x.get("hash") == short_hash(msg) for x in api):
api.append({})
api[-1]["hash"] = short_hash(msg)
url_parts = msg.split("&")
for part in url_parts:
key, value = part.split("=")
if key in api[-1]:
if not isinstance(api[-1][key], list):
api[-1][key] = [api[-1][key]]
api[-1][key].append(value)
else:
api[-1][key] = value
api[-1]["timestamp"] = timestamp
if not isinstance(api[-1]["server"], list):
api[-1]["server"] = [api[-1]["server"]]
# add API entry if does not exist, split URL
if not any(x.get("hash") == short_hash(msg) for x in api):
api.append({})
api[-1]["hash"] = short_hash(msg)
url_parts = msg.split("&")
for part in url_parts:
key, value = part.split("=")
if key in api[-1]:
if not isinstance(api[-1][key], list):
api[-1][key] = [api[-1][key]]
api[-1][key].append(value)
else:
api[-1][key] = value
api[-1]["timestamp"] = timestamp
if not isinstance(api[-1]["server"], list):
api[-1]["server"] = [api[-1]["server"]]
# json dump
for item in api:
if item["hash"] == short_hash(msg):
output = json.dumps(item)
break
# json dump
for item in api:
if item["hash"] == short_hash(msg):
output = json.dumps(item)
break
return output
return output
# init messaging
message_api_request = Message('event', 6969)
message_api_request.set_callback_message_received(callback_message_received)
if __name__ == "__main__":
threading.Thread(target=message_api_request.start_listener).start()
asyncio.run(main())
threading.Thread(target=message_api_request.start_listener).start()
asyncio.run(main())

View file

@ -2,221 +2,197 @@ import argparse
import json
import sys
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt
from geometry.Geometry import Geometry
def parse_posix_time(value):
try:
return int(value)
except ValueError:
raise argparse.ArgumentTypeError("Invalid POSIX time format")
try:
return int(value)
except ValueError:
raise argparse.ArgumentTypeError("Invalid POSIX time format")
def parse_command_line_arguments():
parser = argparse.ArgumentParser(description="Process command line arguments.")
parser.add_argument("json_file", type=str, help="Input JSON file path")
parser.add_argument("target_name", type=str, help="Target name")
parser.add_argument("--start_time", type=parse_posix_time, help="Optional start time in POSIX seconds")
parser.add_argument("--stop_time", type=parse_posix_time, help="Optional stop time in POSIX seconds")
return parser.parse_args()
parser = argparse.ArgumentParser(description="Process command line arguments.")
parser.add_argument("json_file", type=str, help="Input JSON file path")
parser.add_argument("target_name", type=str, help="Target name")
parser.add_argument("--start_time", type=parse_posix_time, help="Optional start time in POSIX seconds")
parser.add_argument("--stop_time", type=parse_posix_time, help="Optional stop time in POSIX seconds")
return parser.parse_args()
def interpolate_positions(timestamp_vector, truth_timestamp, truth_position):
# Convert lists to NumPy arrays for easier manipulation
truth_timestamp = np.array(truth_timestamp)
truth_position = np.array(truth_position)
# Interpolate positions for the new timestamp vector
interpolated_positions = np.zeros((len(timestamp_vector), truth_position.shape[1]))
# convert lists to NumPy arrays for easier manipulation
truth_timestamp = np.array(truth_timestamp)
truth_position = np.array(truth_position)
for i in range(truth_position.shape[1]):
interpolated_positions[:, i] = np.interp(timestamp_vector, truth_timestamp, truth_position[:, i])
return interpolated_positions
# interpolate positions for the new timestamp vector
interpolated_positions = np.zeros((len(timestamp_vector), truth_position.shape[1]))
for i in range(truth_position.shape[1]):
interpolated_positions[:, i] = np.interp(timestamp_vector, truth_timestamp, truth_position[:, i])
return interpolated_positions
def calculate_rmse(actual_values, predicted_values):
# Convert lists to NumPy arrays for easy calculations
actual_values = np.array(actual_values)
predicted_values = np.array(predicted_values)
# Calculate the squared differences
squared_diff = (actual_values - predicted_values) ** 2
# convert to numpy arrays
actual_values = np.array(actual_values)
predicted_values = np.array(predicted_values)
# Calculate the mean squared error
mean_squared_error = np.mean(squared_diff)
# rms error
squared_diff = (actual_values - predicted_values) ** 2
mean_squared_error = np.mean(squared_diff)
rmse = np.sqrt(mean_squared_error)
# Calculate the root mean squared error
rmse = np.sqrt(mean_squared_error)
return rmse
return rmse
def main():
# input handling
args = parse_command_line_arguments()
json_data = []
with open(args.json_file, 'r') as json_file:
for line in json_file:
try:
json_object = json.loads(line)
json_data.append(json_object)
except json.JSONDecodeError:
print(f"Error decoding JSON from line: {line}")
json_data = [item for item in json_data if item]
start_time = args.start_time if args.start_time else None
stop_time = args.stop_time if args.stop_time else None
print("JSON String (Last Non-Empty Data):", json_data[-1])
print("Target Name:", args.target_name)
print("Start Time:", start_time)
print("Stop Time:", stop_time)
# input handling
args = parse_command_line_arguments()
json_data = []
with open(args.json_file, 'r') as json_file:
for line in json_file:
try:
json_object = json.loads(line)
json_data.append(json_object)
except json.JSONDecodeError:
print(f"Error decoding JSON from line: {line}")
json_data = [item for item in json_data if item]
start_time = args.start_time if args.start_time else None
stop_time = args.stop_time if args.stop_time else None
print("JSON String (Last Non-Empty Data):", json_data[-1])
print("Target Name:", args.target_name)
print("Start Time:", start_time)
print("Stop Time:", stop_time)
# get LLA coords from first radar
radar4_lla = [-34.91041, 138.68924, 210]
# get LLA coords from first radar or Adelaide CBD
radar4_lla = [-34.9286, 138.5999, 50]
# extract data of interest
server = json_data[0][0]["server"]
timestamp = []
position = {}
detected = {}
truth_timestamp = []
truth_position = []
for item in json_data:
# extract data of interest
server = json_data[0][0]["server"]
timestamp = []
position = {}
detected = {}
truth_timestamp = []
truth_position = []
for item in json_data:
for method in item:
if method["server"] != server:
continue
if start_time and method["timestamp_event"]/1000 < start_time:
continue
if stop_time and method["timestamp_event"]/1000 > stop_time:
continue
for method in item:
# store target data
method_localisation = method["localisation"]
if method_localisation not in position:
position[method_localisation] = {}
position[method_localisation]["timestamp"] = []
position[method_localisation]["detections"] = []
else:
if args.target_name in method["detections_localised"] and \
len(method["detections_localised"][args.target_name]["points"]) > 0:
position[method_localisation]["timestamp"].append(
method["timestamp_event"]/1000)
position[method_localisation]["detections"].append(
method["detections_localised"][args.target_name]["points"][0])
# covert to ENU
x, y, z = Geometry.lla2ecef(
position[method_localisation]["detections"][-1][0],
position[method_localisation]["detections"][-1][1],
position[method_localisation]["detections"][-1][2])
x, y, z = Geometry.ecef2enu(x, y, z, radar4_lla[0],
radar4_lla[1], radar4_lla[2])
if not "detections_enu" in position[method_localisation]:
position[method_localisation]["detections_enu"] = []
position[method_localisation]["detections_enu"].append([x, y, z])
if method["server"] != server:
continue
# store truth data
if args.target_name in method["truth"]:
truth_timestamp.append(
method["truth"][args.target_name]["timestamp"])
truth_position.append([
method["truth"][args.target_name]["lat"],
method["truth"][args.target_name]["lon"],
method["truth"][args.target_name]["alt"]])
if start_time and method["timestamp_event"]/1000 < start_time:
continue
# store event timestamp
timestamp.append(method["timestamp_event"])
if stop_time and method["timestamp_event"]/1000 > stop_time:
continue
# remove duplicates in truth data
timestamp = list(dict.fromkeys(timestamp))
timestamp = [element/1000 for element in timestamp]
truth_timestamp_unique = []
truth_position_unique = []
for t, p in zip(truth_timestamp, truth_position):
if t not in truth_timestamp_unique:
truth_timestamp_unique.append(t)
truth_position_unique.append(p)
truth_timestamp = truth_timestamp_unique
truth_position = truth_position_unique
# store target data
method_localisation = method["localisation"]
# resample truth to event time (position already sampled correct)
for i in reversed(range(len(timestamp))):
if timestamp[i] < min(truth_timestamp) or timestamp[i] > max(truth_timestamp):
del timestamp[i]
truth_position_resampled = interpolate_positions(
timestamp, truth_timestamp, truth_position)
# override skip a method
#if method_localisation == "spherical-intersection":
#continue
# convert truth to ENU
truth_position_resampled_enu = []
for pos in truth_position_resampled:
x, y, z = Geometry.lla2ecef(pos[0], pos[1], pos[2])
truth_position_resampled_enu.append(
Geometry.ecef2enu(x, y, z,
radar4_lla[0], radar4_lla[1], radar4_lla[2]))
if method_localisation not in position:
position[method_localisation] = {}
position[method_localisation]["timestamp"] = []
position[method_localisation]["detections"] = []
else:
if args.target_name in method["detections_localised"] and \
len(method["detections_localised"][args.target_name]["points"]) > 0:
position[method_localisation]["timestamp"].append(
method["timestamp_event"]/1000)
position[method_localisation]["detections"].append(
method["detections_localised"][args.target_name]["points"][0])
# covert to ENU
x, y, z = Geometry.lla2ecef(
position[method_localisation]["detections"][-1][0],
position[method_localisation]["detections"][-1][1],
position[method_localisation]["detections"][-1][2])
x, y, z = Geometry.ecef2enu(x, y, z, radar4_lla[0],
radar4_lla[1], radar4_lla[2])
if not "detections_enu" in position[method_localisation]:
position[method_localisation]["detections_enu"] = []
position[method_localisation]["detections_enu"].append([x, y, z])
# store truth data
if args.target_name in method["truth"]:
truth_timestamp.append(
method["truth"][args.target_name]["timestamp"])
truth_position.append([
method["truth"][args.target_name]["lat"],
method["truth"][args.target_name]["lon"],
method["truth"][args.target_name]["alt"]])
timestamp.append(method["timestamp_event"])
# remove duplicates in truth data
timestamp = list(dict.fromkeys(timestamp))
timestamp = [element/1000 for element in timestamp]
truth_timestamp_unique = []
truth_position_unique = []
for t, p in zip(truth_timestamp, truth_position):
if t not in truth_timestamp_unique:
truth_timestamp_unique.append(t)
truth_position_unique.append(p)
truth_timestamp = truth_timestamp_unique
truth_position = truth_position_unique
# resample truth to event time (position already sampled correct)
for i in reversed(range(len(timestamp))):
if timestamp[i] < min(truth_timestamp) or timestamp[i] > max(truth_timestamp):
del timestamp[i]
truth_position_resampled = interpolate_positions(
timestamp, truth_timestamp, truth_position)
# convert truth to ENU
truth_position_resampled_enu = []
for pos in truth_position_resampled:
x, y, z = Geometry.lla2ecef(pos[0], pos[1], pos[2])
truth_position_resampled_enu.append(
Geometry.ecef2enu(x, y, z,
radar4_lla[0], radar4_lla[1], radar4_lla[2]))
# plot x, y, z
#plt.figure(figsize=(5,7))
position2 = {}
position2["ellipse-parametric-mean"] = position["ellipse-parametric-mean"]
position2["ellipsoid-parametric-mean"] = position["ellipsoid-parametric-mean"]
position2["spherical-intersection"] = position["spherical-intersection"]
mark = ['x', 'o', 's']
position_reord = ["ellipse-parametric-mean", "ellipsoid-parametric-mean", "spherical-intersection"]
fig, axes = plt.subplots(3, 1, figsize=(5, 7), sharex=True)
# plot x, y, z
mark = ['x', 'o', 's']
position_reord = ["ellipse-parametric-mean", "ellipsoid-parametric-mean", "spherical-intersection"]
fig, axes = plt.subplots(3, 1, figsize=(5, 7), sharex=True)
for i in range(3):
yaxis_truth = [pos[i] for pos in truth_position_resampled_enu]
plt.subplot(3, 1, i+1)
plt.plot(timestamp, yaxis_truth, label="ADS-B Truth")
for method in position_reord:
if "detections_enu" not in position[method]:
continue
for i in range(3):
yaxis_truth = [pos[i] for pos in truth_position_resampled_enu]
plt.subplot(3, 1, i+1)
plt.plot(timestamp, yaxis_truth, label="ADS-B Truth")
for method in position_reord:
print(position[method])
if "detections_enu" not in position[method]:
continue
for i in range(3):
#print(position)
yaxis_target = [pos[i] for pos in position[method]["detections_enu"]]
plt.subplot(3, 1, i+1)
plt.plot(position[method]["timestamp"], yaxis_target, marker=mark[i], label=method)
plt.xlabel('Timestamp')
if i == 0:
plt.ylabel('ENU X (m)')
if i == 1:
plt.ylabel('ENU Y (m)')
if i == 2:
plt.ylabel('ENU Z (m)')
yaxis_target = [pos[i] for pos in position[method]["detections_enu"]]
plt.subplot(3, 1, i+1)
plt.plot(position[method]["timestamp"],
yaxis_target, marker=mark[i], label=method)
plt.xlabel('Timestamp')
if i == 0:
plt.ylabel('ENU X (m)')
if i == 1:
plt.ylabel('ENU Y (m)')
if i == 2:
plt.ylabel('ENU Z (m)')
plt.subplot(3, 1, 1)
plt.legend(prop = {"size": 8})
plt.tight_layout()
filename = 'plot_accuracy_' + args.target_name + '.png'
plt.savefig('save/' + filename, bbox_inches='tight', pad_inches=0.01)
plt.subplot(3, 1, 1)
plt.legend(prop = {"size": 8})
plt.tight_layout()
filename = 'plot_accuracy_' + args.target_name + '.png'
plt.savefig('save/' + filename, bbox_inches='tight', pad_inches=0.01)
# save tabular data
table = {}
for method in position:
if "detections_enu" not in position[method]:
continue
table[method] = {}
for i in range(3):
yaxis_truth = np.array([pos[i] for pos in truth_position_resampled_enu])
matching_indices = np.isin(np.array(timestamp), np.array(position[method]["timestamp"]))
yaxis_truth_target = yaxis_truth[matching_indices]
yaxis_target = [pos[i] for pos in position[method]["detections_enu"]]
table[method][str(i)] = calculate_rmse(yaxis_target, yaxis_truth_target)
#print('test')
#print(yaxis_target)
#print(yaxis_truth_target)
print(table)
# save tabular data
table = {}
for method in position:
if "detections_enu" not in position[method]:
continue
table[method] = {}
for i in range(3):
yaxis_truth = np.array([pos[i] for pos in truth_position_resampled_enu])
matching_indices = np.isin(np.array(timestamp), np.array(position[method]["timestamp"]))
yaxis_truth_target = yaxis_truth[matching_indices]
yaxis_target = [pos[i] for pos in position[method]["detections_enu"]]
table[method][str(i)] = calculate_rmse(yaxis_target, yaxis_truth_target)
print(table)
if __name__ == "__main__":
main()
main()

View file

@ -2,113 +2,101 @@ import argparse
import json
import sys
from datetime import datetime
import numpy as np
import matplotlib.pyplot as plt
from geometry.Geometry import Geometry
def parse_posix_time(value):
try:
return int(value)
except ValueError:
raise argparse.ArgumentTypeError("Invalid POSIX time format")
try:
return int(value)
except ValueError:
raise argparse.ArgumentTypeError("Invalid POSIX time format")
def parse_command_line_arguments():
parser = argparse.ArgumentParser(description="Process command line arguments.")
parser.add_argument("json_file", type=str, help="Input JSON file path")
parser.add_argument("target_name", type=str, help="Target name")
parser.add_argument("--start_time", type=parse_posix_time, help="Optional start time in POSIX seconds")
parser.add_argument("--stop_time", type=parse_posix_time, help="Optional stop time in POSIX seconds")
return parser.parse_args()
parser = argparse.ArgumentParser(description="Process command line arguments.")
parser.add_argument("json_file", type=str, help="Input JSON file path")
parser.add_argument("target_name", type=str, help="Target name")
parser.add_argument("--start_time", type=parse_posix_time, help="Optional start time in POSIX seconds")
parser.add_argument("--stop_time", type=parse_posix_time, help="Optional stop time in POSIX seconds")
return parser.parse_args()
def main():
# input handling
args = parse_command_line_arguments()
json_data = []
with open(args.json_file, 'r') as json_file:
for line in json_file:
try:
json_object = json.loads(line)
json_data.append(json_object)
except json.JSONDecodeError:
print(f"Error decoding JSON from line: {line}")
json_data = [item for item in json_data if item]
start_time = args.start_time if args.start_time else None
stop_time = args.stop_time if args.stop_time else None
print("JSON String (Last Non-Empty Data):", json_data[-1])
print("Target Name:", args.target_name)
print("Start Time:", start_time)
print("Stop Time:", stop_time)
# input handling
args = parse_command_line_arguments()
json_data = []
with open(args.json_file, 'r') as json_file:
for line in json_file:
try:
json_object = json.loads(line)
json_data.append(json_object)
except json.JSONDecodeError:
print(f"Error decoding JSON from line: {line}")
json_data = [item for item in json_data if item]
start_time = args.start_time if args.start_time else None
stop_time = args.stop_time if args.stop_time else None
print("JSON String (Last Non-Empty Data):", json_data[-1])
print("Target Name:", args.target_name)
print("Start Time:", start_time)
print("Stop Time:", stop_time)
# extract data of interest
server = json_data[0][0]["server"]
timestamp = []
associated = {}
for item in json_data:
# extract data of interest
server = json_data[0][0]["server"]
timestamp = []
associated = {}
for item in json_data:
first_result = item[0]
if first_result["server"] != server:
print('error')
sys.exit(-1)
if start_time and first_result["timestamp_event"]/1000 < start_time:
continue
if stop_time and first_result["timestamp_event"]/1000 > stop_time:
continue
# store association data
if "detections_associated" in first_result:
if args.target_name in first_result["detections_associated"]:
for radar in first_result["detections_associated"][args.target_name]:
if radar['radar'] not in associated:
associated[radar['radar']] = []
else:
associated[radar['radar']].append(first_result["timestamp_event"])
timestamp.append(first_result["timestamp_event"])
first_result = item[0]
# data massaging
timestamp = list(dict.fromkeys(timestamp))
associated = dict(sorted(associated.items(), key=lambda x: x[0]))
radars = list(associated.keys())
radar_label = []
for radar in radars:
radar_label.append(radar.split('.', 1)[0])
if first_result["server"] != server:
print('error')
sys.exit(-1)
# get start and stop times from data
start_time = min(min(arr) for arr in associated.values())
stop_time = max(max(arr) for arr in associated.values())
timestamp = [value for value in timestamp if value >= start_time]
timestamp = [value for value in timestamp if value <= stop_time]
if start_time and first_result["timestamp_event"]/1000 < start_time:
continue
data = []
for radar in radars:
result = [1 if value in associated[radar] else 0 for value in timestamp]
data.append(result)
if stop_time and first_result["timestamp_event"]/1000 > stop_time:
continue
# store association data
if "detections_associated" in first_result:
if args.target_name in first_result["detections_associated"]:
for radar in first_result["detections_associated"][args.target_name]:
if radar['radar'] not in associated:
associated[radar['radar']] = []
else:
associated[radar['radar']].append(first_result["timestamp_event"])
timestamp.append(first_result["timestamp_event"])
# data massaging
timestamp = list(dict.fromkeys(timestamp))
associated = dict(sorted(associated.items(), key=lambda x: x[0]))
radars = list(associated.keys())
radar_label = []
for radar in radars:
radar_label.append(radar.split('.', 1)[0])
# get start and stop times from data
start_time = min(min(arr) for arr in associated.values())
stop_time = max(max(arr) for arr in associated.values())
timestamp = [value for value in timestamp if value >= start_time]
timestamp = [value for value in timestamp if value <= stop_time]
print(associated)
data = []
for radar in radars:
result = [1 if value in associated[radar] else 0 for value in timestamp]
data.append(result)
print(data)
# plot x, y, z
plt.figure(figsize=(8,4))
img = plt.imshow(data, aspect='auto', interpolation='none')
y_extent = plt.gca().get_ylim()
img.set_extent([start_time/1000, stop_time/1000, y_extent[1], y_extent[0]])
plt.yticks(np.arange(len(radar_label)), radar_label[::-1], rotation='vertical')
plt.xlabel('Timestamp')
plt.ylabel('Radar')
plt.tight_layout()
filename = 'plot_associate_' + args.target_name + '.png'
plt.savefig('save/' + filename, bbox_inches='tight', pad_inches=0.01)
# plot x, y, z
plt.figure(figsize=(8,4))
img = plt.imshow(data, aspect='auto', interpolation='none')
y_extent = plt.gca().get_ylim()
img.set_extent([start_time/1000, stop_time/1000, y_extent[1], y_extent[0]])
plt.yticks(np.arange(len(radar_label)), radar_label[::-1], rotation='vertical')
plt.xlabel('Timestamp')
plt.ylabel('Radar')
plt.tight_layout()
filename = 'plot_associate_' + args.target_name + '.png'
plt.savefig('save/' + filename, bbox_inches='tight', pad_inches=0.01)
if __name__ == "__main__":
main()
main()

View file

@ -3,46 +3,47 @@ from algorithm.geometry.Geometry import Geometry
class TestGeometry(unittest.TestCase):
def test_lla2ecef(self):
# Test case 1
result = Geometry.lla2ecef(-34.9286, 138.5999, 50)
self.assertAlmostEqual(result[0], -3926830.77177051, places=3)
self.assertAlmostEqual(result[1], 3461979.19806774, places=3)
self.assertAlmostEqual(result[2], -3631404.11418915, places=3)
def test_lla2ecef(self):
# Test case 2
result = Geometry.lla2ecef(0, 0, 0)
self.assertAlmostEqual(result[0], 6378137.0, places=3)
self.assertAlmostEqual(result[1], 0, places=3)
self.assertAlmostEqual(result[2], 0, places=3)
# test case 1
result = Geometry.lla2ecef(-34.9286, 138.5999, 50)
self.assertAlmostEqual(result[0], -3926830.77177051, places=3)
self.assertAlmostEqual(result[1], 3461979.19806774, places=3)
self.assertAlmostEqual(result[2], -3631404.11418915, places=3)
# Add more test cases as needed
# test case 2
result = Geometry.lla2ecef(0, 0, 0)
self.assertAlmostEqual(result[0], 6378137.0, places=3)
self.assertAlmostEqual(result[1], 0, places=3)
self.assertAlmostEqual(result[2], 0, places=3)
def test_ecef2lla(self):
# Test case 1
result = Geometry.ecef2lla(-3926830.77177051, 3461979.19806774, -3631404.11418915)
self.assertAlmostEqual(result[0], -34.9286, places=4)
self.assertAlmostEqual(result[1], 138.5999, places=4)
self.assertAlmostEqual(result[2], 50, places=3)
def test_ecef2lla(self):
# Test case 2
result = Geometry.ecef2lla(6378137.0, 0, 0)
self.assertAlmostEqual(result[0], 0, places=4)
self.assertAlmostEqual(result[1], 0, places=4)
self.assertAlmostEqual(result[2], 0, places=3)
# test case 1
result = Geometry.ecef2lla(-3926830.77177051, 3461979.19806774, -3631404.11418915)
self.assertAlmostEqual(result[0], -34.9286, places=4)
self.assertAlmostEqual(result[1], 138.5999, places=4)
self.assertAlmostEqual(result[2], 50, places=3)
def test_enu2ecef(self):
# test case 2
result = Geometry.ecef2lla(6378137.0, 0, 0)
self.assertAlmostEqual(result[0], 0, places=4)
self.assertAlmostEqual(result[1], 0, places=4)
self.assertAlmostEqual(result[2], 0, places=3)
result = Geometry.enu2ecef(0, 0, 0, -34.9286, 138.5999, 50)
self.assertAlmostEqual(result[0], -3926830.77177051, places=3)
self.assertAlmostEqual(result[1], 3461979.19806774, places=3)
self.assertAlmostEqual(result[2], -3631404.11418915, places=3)
def test_enu2ecef(self):
result = Geometry.enu2ecef(-1000, 2000, 3000, -34.9286, 138.5999, 50)
self.assertAlmostEqual(result[0], -3928873.3865007, places=3)
self.assertAlmostEqual(result[1], 3465113.14948365, places=3)
self.assertAlmostEqual(result[2], -3631482.0474089, places=3)
# test case 1
result = Geometry.enu2ecef(0, 0, 0, -34.9286, 138.5999, 50)
self.assertAlmostEqual(result[0], -3926830.77177051, places=3)
self.assertAlmostEqual(result[1], 3461979.19806774, places=3)
self.assertAlmostEqual(result[2], -3631404.11418915, places=3)
# test case 2
result = Geometry.enu2ecef(-1000, 2000, 3000, -34.9286, 138.5999, 50)
self.assertAlmostEqual(result[0], -3928873.3865007, places=3)
self.assertAlmostEqual(result[1], 3465113.14948365, places=3)
self.assertAlmostEqual(result[2], -3631482.0474089, places=3)
if __name__ == '__main__':
unittest.main()
unittest.main()