# pcap Converter
#
# This little helper tool reads a network trace (e.g. recorded with wireshark) and
# interprets the content of the EXI.V2GTP.TCP.IPv6 data.
#
# Preconditions:
# 1. You have a capture file which contains V2G traffic (pcap or pcapng file).
# 2. You installed the python library pyshark, according to https://github.com/KimiNewt/pyshark/
#      pip install pyshark
# 3. You cloned and compiled the OpenV2Gx EXI decoder from https://github.com/uhi22/OpenV2Gx
#
# Limitations:
# - Only DIN is supported at the moment.
# - The script treats all V2G EXI messages as DIN messages. This means, the ApplHandshake messages at
#   the beginning of the charging session will lead to wrong or not decoded data.
# - The path where the script looks for pcap files needs to be configured in the code.
#
# Possible improvements / Todos:
# - Show also the SLAC, NeighborDiscovery and SDP.
# - Add flexibility to also decode the ApplHandshake messages.
# - Add ISO support.
# - Configure the path where to look for pcap files via command line
#

import pyshark
import exiConnector
import os
from helpers import combineValueAndMultiplier
import json

# The path where the script will search for pcap files:
directory = '../temp'

# Hex prefix which marks a TCP payload as "V2GTP header with EXI content".
_V2GTP_EXI_PREFIX = "01fe8001"

# Examples of the decoded fields which are extracted into the values file:
#"EVSEPresentVoltage.Multiplier": "0",
#"EVSEPresentVoltage.Value": "318",
#"EVSEPresentVoltage.Unit": "V",
#"DC_EVStatus.EVRESSSOC": "53",


def _extractExiHex(packet):
    """Return the EXI hex string of a V2GTP packet, or None for any other packet.

    The TCP payload is delivered by pyshark as a string of hex values separated
    by ":", e.g. "01:fe:80:01". The result may be an empty string if the payload
    contains nothing after the V2GTP header.
    """
    if 'TCP' not in packet:
        return None
    if 'payload' not in packet.tcp.field_names:
        return None
    s = packet.tcp.payload.replace(":", "")  # remove colons
    if not s.startswith(_V2GTP_EXI_PREFIX):
        return None
    return s[16:]  # remove V2GTP header (8 bytes, means 16 hex characters)


def _writeValues(decoded, sniffTime, fileOutValues):
    """Write voltage, current and SOC found in the decoded JSON to fileOutValues.

    Extraction is best effort: output which is not JSON, and messages which do
    not contain the fields of interest, are skipped silently.
    """
    try:
        y = json.loads(decoded)
    except (TypeError, ValueError):
        # The decoder output was not valid JSON (json.JSONDecodeError is a ValueError).
        return
    stamp = "[" + str(sniffTime) + "] "
    # NOTE(review): combineValueAndMultiplier lives in helpers and is not visible
    # here. Presumably it converts the strings to numbers, hence TypeError and
    # ValueError are tolerated in addition to the KeyError of a missing field.
    try:
        u = combineValueAndMultiplier(y["EVSEPresentVoltage.Value"], y["EVSEPresentVoltage.Multiplier"])
        print(stamp + "EVSEPresentVoltage=" + str(u), file=fileOutValues)
        i = combineValueAndMultiplier(y["EVSEPresentCurrent.Value"], y["EVSEPresentCurrent.Multiplier"])
        print(stamp + "EVSEPresentCurrent=" + str(i), file=fileOutValues)
    except (KeyError, TypeError, ValueError):
        pass
    try:
        soc = y["DC_EVStatus.EVRESSSOC"]
        print(stamp + "EVRESSSOC=" + str(soc), file=fileOutValues)
    except (KeyError, TypeError):
        pass


def convertPcapToTxt(inputFileName):
    """Decode the V2G EXI messages of one capture file into two text files.

    Writes <inputFileName>.decoded.txt, which contains a header line and the
    decoder output for each V2GTP/EXI packet, and <inputFileName>.values.txt,
    which contains the extracted EVSEPresentVoltage, EVSEPresentCurrent and
    EVRESSSOC values with their timestamps. Progress is printed to stdout
    every 100 packets.

    inputFileName: path of the pcap or pcapng file.
    """
    cap = pyshark.FileCapture(inputFileName, display_filter="ipv6")
    # Example how to access the data:
    #print(cap)
    #print(cap[0])
    #print(cap[1])
    #print(dir(cap[1]))
    #print(cap[1].sniff_time) # readable time
    #print(cap[1].sniff_timestamp) # epoch time
    try:
        # The context managers make sure that both output files are flushed and
        # closed even if reading the capture or decoding raises an exception.
        with open(inputFileName + '.decoded.txt', 'w') as fileOut, \
                open(inputFileName + '.values.txt', 'w') as fileOutValues:
            print("# generated by pcapConverter.py", file=fileOut)
            print("# https://github.com/uhi22/pyPLC", file=fileOut)
            print("# generated by pcapConverter.py", file=fileOutValues)
            print("# https://github.com/uhi22/pyPLC", file=fileOutValues)
            for numberOfPackets, packet in enumerate(cap, start=1):
                strExi = _extractExiHex(packet)
                if strExi is not None:
                    sHeader = "Packet #" + str(numberOfPackets) + " [" + str(packet.sniff_time) + "] " + strExi + " means:"
                    pre = "DD"  # decode DIN
                    decoded = exiConnector.exiDecode(strExi, pre)
                    print(sHeader, file=fileOut)
                    print(decoded, file=fileOut)
                    _writeValues(decoded, packet.sniff_time, fileOutValues)
                if (numberOfPackets % 100) == 0:
                    print(str(numberOfPackets) + " packets")
                # To limit the run time for huge captures, stop early:
                #if (numberOfPackets>=1000):
                #    break
    finally:
        # Release the capture in any case.
        cap.close()


# iterate over files in the directory
for filename in os.listdir(directory):
    strFileNameWithPath = os.path.join(directory, filename)
    # checking if it is a file
    if os.path.isfile(strFileNameWithPath):
        print(strFileNameWithPath)
        # check the file extension:
        if strFileNameWithPath.endswith((".pcap", ".pcapng")):
            print("Will decode " + strFileNameWithPath)
            convertPcapToTxt(strFileNameWithPath)