From f7b14fb42e016ba1d04262fd412a018336d9a7ce Mon Sep 17 00:00:00 2001 From: uhi22 Date: Sun, 10 Dec 2023 23:22:20 +0100 Subject: [PATCH] feature: added claralogConverter --- claralogConverter.py | 173 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 claralogConverter.py diff --git a/claralogConverter.py b/claralogConverter.py new file mode 100644 index 0000000..5a4d077 --- /dev/null +++ b/claralogConverter.py @@ -0,0 +1,173 @@ + +# claralogConverter +# +# This little helper tool reads a text log file containing V2G messages (e.g. from https://github.com/uhi22/ccs32clara) and +# interprets the content of the EXI data. +# +# Preconditions: +# 1. You have a log file which contains V2G traffic (e.g. from https://openinverter.org/forum/viewtopic.php?p=64655#p64655) +# 2. You cloned and compiled the OpenV2Gx EXI decoder from https://github.com/uhi22/OpenV2Gx +# +# Limitations: +# - Only DIN is supported at the moment. +# - The script treats all V2G EXI messages as DIN messages. This means, the ApplHandshake messages at +# the begin of the charging session will lead to wrong or not decoded data. +# - The path where the script look for pcap files needs to be configured in the code. +# +# Possible improvements / Todos: +# - Show also the SLAC, NeigborDiscovery and SDP. +# - Add flexibility to also decode the ApplHandshake messages. +# - Add ISO support. +# - Configure the path where to look for pcap files via command line +# + +import exiConnector +import os +from helpers import combineValueAndMultiplier +import json + +# The path where the script will search for pcap files: +directory = 'local/pcaps_to_convert' + +# stop the evaluation after this number of packets. Set to zero to have no limit. +nLimitNumberOfPackets = -1 + + +def getManufacturerFromMAC(strMAC): + # Examples based on https://macvendors.com/, and https://www.ipchecktool.com/tool/macfinder and own experience + if (strMAC[0:5]=="ec:a2"): + return "Kempower" + if (strMAC[0:8]=="dc:44:27"): + return "Tesla" + if (strMAC[0:8]=="ce:25:1a"): + return "Alpitronic" + if (strMAC[0:8]=="1a:a9:8e"): + return "Alpitronic" + if (strMAC[0:8]=="e8:eb:1b"): + return "Microchip (maybe ABB)" + if (strMAC[0:8]=="68:27:19"): + return "Microchip (maybe ABB)" + if (strMAC[0:8]=="80:1f:12"): + return "Microchip (maybe Compleo)" + if (strMAC[0:5]=="18:d7"): + return "(maybe Siemens)" + return "(unknown vendor)" + +# Examples of the decoder result: +#"EVSEPresentVoltage.Multiplier": "0", +#"EVSEPresentVoltage.Value": "318", +#"EVSEPresentVoltage.Unit": "V", +#"DC_EVStatus.EVRESSSOC": "53", + +def convertClaralogToTxt(inputFileName): + global nLimitNumberOfPackets + global directory + fileIn = open(inputFileName, 'r') + fileOut = open(inputFileName + '.decoded.txt', 'w') + print("# generated by claralogConverter.py", file=fileOut) + print("# https://github.com/uhi22/pyPLC", file=fileOut) + fileOutValues = open(inputFileName + '.values.txt', 'w') + print("# generated by claralogConverter.py", file=fileOutValues) + print("# https://github.com/uhi22/pyPLC", file=fileOutValues) + fileOutStatistics = open(directory + '/pcap_statistics.txt', 'a') + print("# statistics for " + inputFileName, file=fileOutStatistics) + t1CableCheckBegin = 0 + t2PreChargeBegin = 0 + t3CurrentDemandBegin = 0 + numberOfPackets=0 + Lines = fileIn.readlines() + for line in Lines: + numberOfPackets+=1 + #print(packet) + print(line.strip(), file=fileOut) + posTcpPayload = line.find(": 01 fe 80 01 00") + if posTcpPayload>0: + # we found the V2GTP header. + tcppayload = line[posTcpPayload + 2:] # everything until the end of the line is the data + # this gives a string of hex values, separated by " ", e.g. "01 fe 80 01" + s = tcppayload.replace(" ", "") # remove spaces + if (s[0:8]=="01fe8001"): + # it is a V2GTP header with EXI content + strExi = s[16:] # remove V2GTP header (8 bytes, means 16 hex characters) + pre = "DD" # decode DIN + decoded=exiConnector.exiDecode(strExi, pre) + #print(decoded) + print(decoded, file=fileOut) + if (decoded.find("SessionSetupReq")>0): + if ((t1CableCheckBegin>0) and (t2PreChargeBegin>t1CableCheckBegin) and (t3CurrentDemandBegin>t2PreChargeBegin)): + print("charger MAC " + chargerMAC + " " + getManufacturerFromMAC(chargerMAC)) + timeForCableCheck = t2PreChargeBegin - t1CableCheckBegin + timeForPreCharge = t3CurrentDemandBegin - t2PreChargeBegin + print("timeForCableCheck= " + ("%.3f" % timeForCableCheck)) + print("timeForPreCharge= " + ("%.3f" % timeForPreCharge)) + print(chargerMAC + ";" + getManufacturerFromMAC(chargerMAC) + ";" + \ + "timeForCableCheck;" + ("%.3f" % timeForCableCheck) + ";" + \ + "timeForPreCharge; " + ("%.3f" % timeForPreCharge), file=fileOutStatistics) + t1CableCheckBegin = 0 + t2PreChargeBegin = 0 + t3CurrentDemandBegin = 0 + if (decoded.find("CableCheckReq")>0) and (t1CableCheckBegin==0): + t1CableCheckBegin = float(packet.sniff_timestamp) + chargerMAC = str(packet.eth.dst) + if (decoded.find("PreChargeReq")>0) and (t2PreChargeBegin==0): + t2PreChargeBegin = float(packet.sniff_timestamp) + if (decoded.find("CurrentDemandReq")>0) and (t3CurrentDemandBegin==0): + t3CurrentDemandBegin = float(packet.sniff_timestamp) + try: + jsondict = json.loads(decoded) + try: + u = combineValueAndMultiplier(jsondict["EVSEPresentVoltage.Value"], jsondict["EVSEPresentVoltage.Multiplier"]) + print("[" + str(packet.sniff_time) + "] EVSEPresentVoltage=" + str(u), file=fileOutValues) + i = combineValueAndMultiplier(jsondict["EVSEPresentCurrent.Value"], jsondict["EVSEPresentCurrent.Multiplier"]) + print("[" + str(packet.sniff_time) + "] EVSEPresentCurrent=" + str(i), file=fileOutValues) + except: + pass + try: + u = combineValueAndMultiplier(jsondict["EVTargetVoltage.Value"], jsondict["EVTargetVoltage.Multiplier"]) + print("[" + str(packet.sniff_time) + "] EVTargetVoltage=" + str(u), file=fileOutValues) + i = combineValueAndMultiplier(jsondict["EVTargetCurrent.Value"], jsondict["EVTargetCurrent.Multiplier"]) + print("[" + str(packet.sniff_time) + "] EVTargetCurrent=" + str(i), file=fileOutValues) + except: + pass + try: + soc = jsondict["DC_EVStatus.EVRESSSOC"] + print("[" + str(packet.sniff_time) + "] EVRESSSOC=" + str(soc), file=fileOutValues) + except: + pass + except: + pass + if ((numberOfPackets % 100)==0): + print(str(numberOfPackets) + " packets") + if ((nLimitNumberOfPackets>0) and (numberOfPackets>=nLimitNumberOfPackets)): + break + # Statistics of the timing: + #print("t1CableCheckBegin " + str(t1CableCheckBegin)) + #print("t2PreChargeBegin " + str(t2PreChargeBegin)) + #print("t3CurrentDemandBegin " + str(t3CurrentDemandBegin)) + if ((t1CableCheckBegin>0) and (t2PreChargeBegin>t1CableCheckBegin) and (t3CurrentDemandBegin>t2PreChargeBegin)): + print("charger MAC " + chargerMAC + " " + getManufacturerFromMAC(chargerMAC)) + timeForCableCheck = t2PreChargeBegin - t1CableCheckBegin + timeForPreCharge = t3CurrentDemandBegin - t2PreChargeBegin + print("timeForCableCheck= " + ("%.3f" % timeForCableCheck)) + print("timeForPreCharge= " + ("%.3f" % timeForPreCharge)) + + print(chargerMAC + ";" + getManufacturerFromMAC(chargerMAC) + ";" + \ + "timeForCableCheck;" + ("%.3f" % timeForCableCheck) + ";" + \ + "timeForPreCharge; " + ("%.3f" % timeForPreCharge), file=fileOutStatistics) + + fileOutStatistics.close() + fileOut.close() + fileOutValues.close() + fileIn.close() + + +# iterate over files in the directory +for filename in os.listdir(directory): + strFileNameWithPath = os.path.join(directory, filename) + # checking if it is a file + if os.path.isfile(strFileNameWithPath): + print(strFileNameWithPath) + # check the file extension: + if (strFileNameWithPath[-9:]==".claralog"): + print("Will decode " + strFileNameWithPath) + convertClaralogToTxt(strFileNameWithPath)