added TCP sniffer for EXI

This commit is contained in:
uhi22 2022-10-24 23:21:17 +02:00
parent 6c1264015c
commit 386994d7e2

View file

@ -131,7 +131,7 @@ class ipv6handler():
showAsHex(self.udpPayload, "V2GTP ") showAsHex(self.udpPayload, "V2GTP ")
self.evccPort = self.sourceport self.evccPort = self.sourceport
v2gptPayloadType = self.udpPayload[2] * 256 + self.udpPayload[3] v2gptPayloadType = self.udpPayload[2] * 256 + self.udpPayload[3]
# 0x8001 EXI encoded V2G message # 0x8001 EXI encoded V2G message (Will NOT come with UDP. Will come with TCP.)
# 0x9000 SDP request message (SECC Discovery) # 0x9000 SDP request message (SECC Discovery)
# 0x9001 SDP response message (SECC response to the EVCC) # 0x9001 SDP response message (SECC response to the EVCC)
if (v2gptPayloadType == 0x9000): if (v2gptPayloadType == 0x9000):
@ -166,7 +166,33 @@ class ipv6handler():
self.iAmEvse = 0 # not emulating a charging station self.iAmEvse = 0 # not emulating a charging station
self.iAmPev = 0 # not emulating a vehicle self.iAmPev = 0 # not emulating a vehicle
def evaluateV2GTP(self):
# We sniffed a V2GTP frame via TCP. This should contain an EXI encoded payload.
v2gptPayloadType = self.v2gframe[2] * 256 + self.v2gframe[3]
# 0x8001 EXI encoded V2G message
if (v2gptPayloadType == 0x8001):
self.ExiPacket = self.v2gframe[8:] # the exi payload, without the 8 bytes V2GTP header
print("EXI from " + str(self.tcpsourceport) + " to " + str(self.tcpdestinationport))
showAsHex(self.ExiPacket)
# Todo: further process the EXI packet. E.g. write it into file for offline analysis.
# And send it to decoder.
def evaluateTcpPacket(self):
# We received a TCP packet. We do NOT want to make real TCP here (the OS will do it much better). We
# just want to listen to the conversation of two others, and extract what we hear.
self.tcpsourceport = self.myreceivebuffer[54] * 256 + self.myreceivebuffer[55]
self.tcpdestinationport = self.myreceivebuffer[56] * 256 + self.myreceivebuffer[57]
if ((self.tcpsourceport == 15118) or (self.tcpdestinationport == 15118)):
if (len(self.myreceivebuffer)>=74+9): # 74 is the TCP without any data. A V2GTP has 8 bytes header, plus at least 1 payload byte.
startOfV2gtp = 74 # the index of the first V2GTP byte in the ethernet buffer
if ((self.myreceivebuffer[startOfV2gtp] == 0x01) and (self.myreceivebuffer[startOfV2gtp+1] == 0xFE)):
# version and inverted version of the V2GTP are fine -> it is a V2G TP frame.
self.v2gframe = self.myreceivebuffer[startOfV2gtp:]
self.evaluateV2GTP()
def evaluateReceivedPacket(self, pkt): def evaluateReceivedPacket(self, pkt):
# The evaluation function for received ipv6 packages.
if (len(pkt)>60): if (len(pkt)>60):
self.myreceivebuffer = pkt self.myreceivebuffer = pkt
self.nextheader = self.myreceivebuffer[20] self.nextheader = self.myreceivebuffer[20]
@ -184,6 +210,8 @@ class ipv6handler():
#print("index " + str(i) + " " + hex(self.myreceivebuffer[62+i])) #print("index " + str(i) + " " + hex(self.myreceivebuffer[62+i]))
self.udpPayload[i] = self.myreceivebuffer[62+i] self.udpPayload[i] = self.myreceivebuffer[62+i]
self.evaluateUdpPayload() self.evaluateUdpPayload()
if (self.nextheader == 0x06): # it is an TCP frame
self.evaluateTcpPacket()
def __init__(self, transmitCallback): def __init__(self, transmitCallback):
self.enterEvseMode() self.enterEvseMode()