mirror of
https://github.com/uhi22/pyPLC.git
synced 2024-11-10 01:05:42 +00:00
feature: added claralogConverter
This commit is contained in:
parent
c18be1f852
commit
f7b14fb42e
1 changed files with 173 additions and 0 deletions
173
claralogConverter.py
Normal file
173
claralogConverter.py
Normal file
|
@ -0,0 +1,173 @@
|
||||||
|
|
||||||
|
# claralogConverter
|
||||||
|
#
|
||||||
|
# This little helper tool reads a text log file containing V2G messages (e.g. from https://github.com/uhi22/ccs32clara) and
|
||||||
|
# interprets the content of the EXI data.
|
||||||
|
#
|
||||||
|
# Preconditions:
|
||||||
|
# 1. You have a log file which contains V2G traffic (e.g. from https://openinverter.org/forum/viewtopic.php?p=64655#p64655)
|
||||||
|
# 2. You cloned and compiled the OpenV2Gx EXI decoder from https://github.com/uhi22/OpenV2Gx
|
||||||
|
#
|
||||||
|
# Limitations:
|
||||||
|
# - Only DIN is supported at the moment.
|
||||||
|
# - The script treats all V2G EXI messages as DIN messages. This means, the ApplHandshake messages at
|
||||||
|
# the begin of the charging session will lead to wrong or not decoded data.
|
||||||
|
# - The path where the script look for pcap files needs to be configured in the code.
|
||||||
|
#
|
||||||
|
# Possible improvements / Todos:
|
||||||
|
# - Show also the SLAC, NeigborDiscovery and SDP.
|
||||||
|
# - Add flexibility to also decode the ApplHandshake messages.
|
||||||
|
# - Add ISO support.
|
||||||
|
# - Configure the path where to look for pcap files via command line
|
||||||
|
#
|
||||||
|
|
||||||
|
import exiConnector
|
||||||
|
import os
|
||||||
|
from helpers import combineValueAndMultiplier
|
||||||
|
import json
|
||||||
|
|
||||||
|
# The path where the script will search for pcap files:
|
||||||
|
directory = 'local/pcaps_to_convert'
|
||||||
|
|
||||||
|
# stop the evaluation after this number of packets. Set to zero to have no limit.
|
||||||
|
nLimitNumberOfPackets = -1
|
||||||
|
|
||||||
|
|
||||||
|
def getManufacturerFromMAC(strMAC):
|
||||||
|
# Examples based on https://macvendors.com/, and https://www.ipchecktool.com/tool/macfinder and own experience
|
||||||
|
if (strMAC[0:5]=="ec:a2"):
|
||||||
|
return "Kempower"
|
||||||
|
if (strMAC[0:8]=="dc:44:27"):
|
||||||
|
return "Tesla"
|
||||||
|
if (strMAC[0:8]=="ce:25:1a"):
|
||||||
|
return "Alpitronic"
|
||||||
|
if (strMAC[0:8]=="1a:a9:8e"):
|
||||||
|
return "Alpitronic"
|
||||||
|
if (strMAC[0:8]=="e8:eb:1b"):
|
||||||
|
return "Microchip (maybe ABB)"
|
||||||
|
if (strMAC[0:8]=="68:27:19"):
|
||||||
|
return "Microchip (maybe ABB)"
|
||||||
|
if (strMAC[0:8]=="80:1f:12"):
|
||||||
|
return "Microchip (maybe Compleo)"
|
||||||
|
if (strMAC[0:5]=="18:d7"):
|
||||||
|
return "(maybe Siemens)"
|
||||||
|
return "(unknown vendor)"
|
||||||
|
|
||||||
|
# Examples of the decoder result:
|
||||||
|
#"EVSEPresentVoltage.Multiplier": "0",
|
||||||
|
#"EVSEPresentVoltage.Value": "318",
|
||||||
|
#"EVSEPresentVoltage.Unit": "V",
|
||||||
|
#"DC_EVStatus.EVRESSSOC": "53",
|
||||||
|
|
||||||
|
def convertClaralogToTxt(inputFileName):
|
||||||
|
global nLimitNumberOfPackets
|
||||||
|
global directory
|
||||||
|
fileIn = open(inputFileName, 'r')
|
||||||
|
fileOut = open(inputFileName + '.decoded.txt', 'w')
|
||||||
|
print("# generated by claralogConverter.py", file=fileOut)
|
||||||
|
print("# https://github.com/uhi22/pyPLC", file=fileOut)
|
||||||
|
fileOutValues = open(inputFileName + '.values.txt', 'w')
|
||||||
|
print("# generated by claralogConverter.py", file=fileOutValues)
|
||||||
|
print("# https://github.com/uhi22/pyPLC", file=fileOutValues)
|
||||||
|
fileOutStatistics = open(directory + '/pcap_statistics.txt', 'a')
|
||||||
|
print("# statistics for " + inputFileName, file=fileOutStatistics)
|
||||||
|
t1CableCheckBegin = 0
|
||||||
|
t2PreChargeBegin = 0
|
||||||
|
t3CurrentDemandBegin = 0
|
||||||
|
numberOfPackets=0
|
||||||
|
Lines = fileIn.readlines()
|
||||||
|
for line in Lines:
|
||||||
|
numberOfPackets+=1
|
||||||
|
#print(packet)
|
||||||
|
print(line.strip(), file=fileOut)
|
||||||
|
posTcpPayload = line.find(": 01 fe 80 01 00")
|
||||||
|
if posTcpPayload>0:
|
||||||
|
# we found the V2GTP header.
|
||||||
|
tcppayload = line[posTcpPayload + 2:] # everything until the end of the line is the data
|
||||||
|
# this gives a string of hex values, separated by " ", e.g. "01 fe 80 01"
|
||||||
|
s = tcppayload.replace(" ", "") # remove spaces
|
||||||
|
if (s[0:8]=="01fe8001"):
|
||||||
|
# it is a V2GTP header with EXI content
|
||||||
|
strExi = s[16:] # remove V2GTP header (8 bytes, means 16 hex characters)
|
||||||
|
pre = "DD" # decode DIN
|
||||||
|
decoded=exiConnector.exiDecode(strExi, pre)
|
||||||
|
#print(decoded)
|
||||||
|
print(decoded, file=fileOut)
|
||||||
|
if (decoded.find("SessionSetupReq")>0):
|
||||||
|
if ((t1CableCheckBegin>0) and (t2PreChargeBegin>t1CableCheckBegin) and (t3CurrentDemandBegin>t2PreChargeBegin)):
|
||||||
|
print("charger MAC " + chargerMAC + " " + getManufacturerFromMAC(chargerMAC))
|
||||||
|
timeForCableCheck = t2PreChargeBegin - t1CableCheckBegin
|
||||||
|
timeForPreCharge = t3CurrentDemandBegin - t2PreChargeBegin
|
||||||
|
print("timeForCableCheck= " + ("%.3f" % timeForCableCheck))
|
||||||
|
print("timeForPreCharge= " + ("%.3f" % timeForPreCharge))
|
||||||
|
print(chargerMAC + ";" + getManufacturerFromMAC(chargerMAC) + ";" + \
|
||||||
|
"timeForCableCheck;" + ("%.3f" % timeForCableCheck) + ";" + \
|
||||||
|
"timeForPreCharge; " + ("%.3f" % timeForPreCharge), file=fileOutStatistics)
|
||||||
|
t1CableCheckBegin = 0
|
||||||
|
t2PreChargeBegin = 0
|
||||||
|
t3CurrentDemandBegin = 0
|
||||||
|
if (decoded.find("CableCheckReq")>0) and (t1CableCheckBegin==0):
|
||||||
|
t1CableCheckBegin = float(packet.sniff_timestamp)
|
||||||
|
chargerMAC = str(packet.eth.dst)
|
||||||
|
if (decoded.find("PreChargeReq")>0) and (t2PreChargeBegin==0):
|
||||||
|
t2PreChargeBegin = float(packet.sniff_timestamp)
|
||||||
|
if (decoded.find("CurrentDemandReq")>0) and (t3CurrentDemandBegin==0):
|
||||||
|
t3CurrentDemandBegin = float(packet.sniff_timestamp)
|
||||||
|
try:
|
||||||
|
jsondict = json.loads(decoded)
|
||||||
|
try:
|
||||||
|
u = combineValueAndMultiplier(jsondict["EVSEPresentVoltage.Value"], jsondict["EVSEPresentVoltage.Multiplier"])
|
||||||
|
print("[" + str(packet.sniff_time) + "] EVSEPresentVoltage=" + str(u), file=fileOutValues)
|
||||||
|
i = combineValueAndMultiplier(jsondict["EVSEPresentCurrent.Value"], jsondict["EVSEPresentCurrent.Multiplier"])
|
||||||
|
print("[" + str(packet.sniff_time) + "] EVSEPresentCurrent=" + str(i), file=fileOutValues)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
u = combineValueAndMultiplier(jsondict["EVTargetVoltage.Value"], jsondict["EVTargetVoltage.Multiplier"])
|
||||||
|
print("[" + str(packet.sniff_time) + "] EVTargetVoltage=" + str(u), file=fileOutValues)
|
||||||
|
i = combineValueAndMultiplier(jsondict["EVTargetCurrent.Value"], jsondict["EVTargetCurrent.Multiplier"])
|
||||||
|
print("[" + str(packet.sniff_time) + "] EVTargetCurrent=" + str(i), file=fileOutValues)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
try:
|
||||||
|
soc = jsondict["DC_EVStatus.EVRESSSOC"]
|
||||||
|
print("[" + str(packet.sniff_time) + "] EVRESSSOC=" + str(soc), file=fileOutValues)
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
if ((numberOfPackets % 100)==0):
|
||||||
|
print(str(numberOfPackets) + " packets")
|
||||||
|
if ((nLimitNumberOfPackets>0) and (numberOfPackets>=nLimitNumberOfPackets)):
|
||||||
|
break
|
||||||
|
# Statistics of the timing:
|
||||||
|
#print("t1CableCheckBegin " + str(t1CableCheckBegin))
|
||||||
|
#print("t2PreChargeBegin " + str(t2PreChargeBegin))
|
||||||
|
#print("t3CurrentDemandBegin " + str(t3CurrentDemandBegin))
|
||||||
|
if ((t1CableCheckBegin>0) and (t2PreChargeBegin>t1CableCheckBegin) and (t3CurrentDemandBegin>t2PreChargeBegin)):
|
||||||
|
print("charger MAC " + chargerMAC + " " + getManufacturerFromMAC(chargerMAC))
|
||||||
|
timeForCableCheck = t2PreChargeBegin - t1CableCheckBegin
|
||||||
|
timeForPreCharge = t3CurrentDemandBegin - t2PreChargeBegin
|
||||||
|
print("timeForCableCheck= " + ("%.3f" % timeForCableCheck))
|
||||||
|
print("timeForPreCharge= " + ("%.3f" % timeForPreCharge))
|
||||||
|
|
||||||
|
print(chargerMAC + ";" + getManufacturerFromMAC(chargerMAC) + ";" + \
|
||||||
|
"timeForCableCheck;" + ("%.3f" % timeForCableCheck) + ";" + \
|
||||||
|
"timeForPreCharge; " + ("%.3f" % timeForPreCharge), file=fileOutStatistics)
|
||||||
|
|
||||||
|
fileOutStatistics.close()
|
||||||
|
fileOut.close()
|
||||||
|
fileOutValues.close()
|
||||||
|
fileIn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# iterate over files in the directory
|
||||||
|
for filename in os.listdir(directory):
|
||||||
|
strFileNameWithPath = os.path.join(directory, filename)
|
||||||
|
# checking if it is a file
|
||||||
|
if os.path.isfile(strFileNameWithPath):
|
||||||
|
print(strFileNameWithPath)
|
||||||
|
# check the file extension:
|
||||||
|
if (strFileNameWithPath[-9:]==".claralog"):
|
||||||
|
print("Will decode " + strFileNameWithPath)
|
||||||
|
convertClaralogToTxt(strFileNameWithPath)
|
Loading…
Reference in a new issue