first draft for IPv6 plus SECC Discovery Protocol

This commit is contained in:
uhi22 2022-10-20 21:29:02 +02:00
parent aecff0444e
commit ad08284824
4 changed files with 376 additions and 38 deletions

19
helpers.py Normal file
View file

@ -0,0 +1,19 @@
def twoCharHex(b):
strHex = "%0.2X" % b
return strHex
def showAsHex(mybytearray, description=""):
packetlength = len(mybytearray)
strHex = ""
for i in range(0, packetlength):
strHex = strHex + twoCharHex(mybytearray[i]) + " "
print(description + "(" + str(packetlength) + "bytes) = " + strHex)
def prettyMac(macByteArray):
s=""
for i in range(0, 5):
s = s + twoCharHex(macByteArray[i]) + ":"
s = s + twoCharHex(macByteArray[i])
return s

View file

@ -34,24 +34,8 @@
import pcap
def twoCharHex(b):
strHex = "%0.2X" % b
return strHex
def showAsHex(mybytearray):
packetlength = len(mybytearray)
strHex = ""
for i in range(0, packetlength):
strHex = strHex + twoCharHex(mybytearray[i]) + " "
print("len " + str(packetlength) + " data " + strHex)
def prettyMac(macByteArray):
s=""
for i in range(0, 5):
s = s + twoCharHex(macByteArray[i]) + ":"
s = s + twoCharHex(macByteArray[i])
return s
import pyPlcIpv6
from helpers import * # prettyMac etc
MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ]
MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ]
@ -117,14 +101,12 @@ class pyPlcHomeplug():
print("From " + strSourceMac + strSourceFriendlyName + " to " + strDestMac)
def isHomeplug(self, mybytearray):
blIsHomePlug=False
if len(mybytearray)>(6+6+2):
protocol=mybytearray[12]*256 + mybytearray[13]
if (protocol == 0x88E1):
blIsHomePlug=True
# print("HomePlug protocol")
return blIsHomePlug
def getEtherType(self, messagebufferbytearray):
etherType=0
if len(messagebufferbytearray)>(6+6+2):
etherType=messagebufferbytearray[12]*256 + messagebufferbytearray[13]
return etherType
def fillSourceMac(self, mac, offset=6): # at offset 6 in the ethernet frame, we have the source MAC
# we can give a different offset, to re-use the MAC also in the data area
@ -424,31 +406,34 @@ class pyPlcHomeplug():
if (selection=="1"):
self.composeSlacParamReq()
self.addToTrace("transmitting SLAC_PARAM.REQ...")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
self.transmit(self.mytransmitbuffer)
if (selection=="2"):
self.composeSlacParamCnf()
self.addToTrace("transmitting SLAC_PARAM.CNF...")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
self.transmit(self.mytransmitbuffer)
if (selection=="S"):
self.composeGetSwReq()
self.addToTrace("transmitting GetSwReq...")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
self.transmit(self.mytransmitbuffer)
if (selection=="s"):
self.composeSetKey(0)
self.addToTrace("transmitting SET_KEY.REQ (key 0)")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
self.transmit(self.mytransmitbuffer)
if (selection=="t"):
self.composeSetKey(2)
self.addToTrace("transmitting SET_KEY.REQ (key 2)")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
self.transmit(self.mytransmitbuffer)
if (selection=="D"):
self.composeDHCP()
self.addToTrace("transmitting broken DHCP")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
self.transmit(self.mytransmitbuffer)
if (selection=="G"):
self.composeGetKey()
self.addToTrace("transmitting GET_KEY")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
self.transmit(self.mytransmitbuffer)
def transmit(self, pkt):
self.sniffer.sendpacket(bytes(pkt))
def evaluateGetKeyCnf(self):
# The getkey response contains the Network ID (NID), even if the request was rejected. We store the NID,
@ -594,6 +579,8 @@ class pyPlcHomeplug():
self.NID = [ 1, 2, 3, 4, 5, 6, 7 ] # a default network ID
self.pevMac = [0x55, 0x56, 0x57, 0x58, 0x59, 0x5A ] # a default pev MAC
self.runningCounter=0
self.ipv6 = pyPlcIpv6.ipv6handler(self.transmit)
self.ipv6.ownMac = MAC_RANDOM
self.enterEvseMode()
self.showStatus(prettyMac(self.pevMac), "pevmac")
print("sniffer created at " + self.strInterfaceName)
@ -609,16 +596,16 @@ class pyPlcHomeplug():
for ts, pkt in self.sniffer: # attention: for using this in non-blocking manner, we need the patch described above.
self.nPacketsReceived+=1
# print('%d' % (ts)) # the time stamp
if (self.isHomeplug(pkt)):
etherType = self.getEtherType(pkt)
if (etherType == 0x88E1): # it is a HomePlug message
self.myreceivebuffer = pkt
# self.showMacAddresses(pkt)
self.evaluateReceivedHomeplugPacket()
if (etherType == 0x86dd): # it is an IPv6 frame
self.ipv6.evaluateReceivedPacket(pkt)
self.showStatus("nPacketsReceived=" + str(self.nPacketsReceived))
def close(self):
self.sniffer.close()
#sn = pyPlcHomeplug()
#while (1):
# print("Press control-C to stop")
# sn.mainfunction()

177
pyPlcIpv6.py Normal file
View file

@ -0,0 +1,177 @@
# This module handles the IPv6 related functionality of the communication between charging station and car.
#
# It has the following sub-functionalities:
# - IP.UDP.SDP: listen to requests from the car, and responding to them.
#
# Abbreviations:
# SECC: Supply Equipment Communication Controller. The "computer" of the charging station.
# EVCC: Electric Vehicle Communication Controller. The "computer" of the vehicle.
# SDP: SECC Discovery Protocol. The UDP based protocol to find out the IP address of the charging station.
# SLAAC: Stateless auto address configuration (not SLAC!). A method to automatically set IPv6 address, based
# on the 6 byte MAC address.
from helpers import showAsHex
class ipv6handler():
def fillMac(self, macbytearray, position=6): # position 6 is the source MAC
for i in range(0, 6):
self.EthResponse[6+i] = macbytearray[i]
def packResponseIntoEthernet(self, buffer):
# packs the IP packet into an ethernet packet
self.EthResponse = bytearray(len(buffer) + 6 + 6 + 2) # Ethernet header needs 14 bytes:
# 6 bytes MAC
# 6 bytes MAC
# 2 bytes EtherType
self.EthResponse[0] = 0x33 # destination MAC. We use multicast. Todo: Better use the PEV MAC.
self.EthResponse[1] = 0x33
self.EthResponse[2] = 0x00
self.EthResponse[3] = 0x00
self.EthResponse[4] = 0x00
self.EthResponse[5] = 0x01
self.fillMac(self.ownMac) # bytes 6 to 11 are the source MAC
self.EthResponse[12] = 0x86 # 86dd is IPv6
self.EthResponse[13] = 0xdd
for i in range(0, len(buffer)):
self.EthResponse[14+i] = buffer[i]
self.transmit(self.EthResponse)
def packResponseIntoIp(self, buffer):
# embeds the (SDP) response into the lower-layer-protocol: IP, Ethernet
self.IpResponse = bytearray(len(buffer) + 8 + 16 + 16) # IP6 needs 40 bytes:
# 4 bytes traffic class, flow
# 2 bytes destination port
# 2 bytes length (incl checksum)
# 2 bytes checksum
self.IpResponse[0] = 0x60 # traffic class, flow
self.IpResponse[1] = 0
self.IpResponse[2] = 0
self.IpResponse[3] = 0
plen = len(buffer) # length of the payload. Without headers.
self.IpResponse[4] = plen >> 8
self.IpResponse[5] = plen & 0xFF
self.IpResponse[6] = 0x11 # next level protocol, 0x11 = UDP in this case
self.IpResponse[7] = 0x0A # hop limit
for i in range(0, 16):
self.IpResponse[8+i] = self.SeccIp[i] # source IP address
for i in range(0, 16):
self.IpResponse[24+i] = self.EvccIp[i] # destination IP address
for i in range(0, len(buffer)):
self.IpResponse[40+i] = buffer[i]
showAsHex(self.IpResponse, "IP response ")
self.packResponseIntoEthernet(self.IpResponse)
def packResponseIntoUdp(self, buffer):
# embeds the (SDP) response into the lower-layer-protocol: UDP
self.UdpResponse = bytearray(len(buffer) + 8) # UDP needs 8 bytes:
# 2 bytes source port
# 2 bytes destination port
# 2 bytes length (incl checksum)
# 2 bytes checksum
self.UdpResponse[0] = 15118 >> 8
self.UdpResponse[1] = 15118 & 0xFF
self.UdpResponse[2] = self.evccPort >> 8
self.UdpResponse[3] = self.evccPort & 0xFF
lenInclChecksum = len(buffer) + 8
self.UdpResponse[4] = lenInclChecksum >> 8
self.UdpResponse[5] = lenInclChecksum & 0xFF
checksum = 0x1234 # todo: calculate this checksum
self.UdpResponse[6] = checksum >> 8
self.UdpResponse[7] = checksum & 0xFF
for i in range(0, len(buffer)):
self.UdpResponse[8+i] = buffer[i]
showAsHex(self.UdpResponse, "UDP response ")
self.packResponseIntoIp(self.UdpResponse)
def prepareSdpResponse(self):
# SECC Discovery Response.
# The response from the charger to the EV, which tells the chargers IPv6 address to the EV.
self.SdpPayload = bytearray(20)
for i in range(0, 16):
self.SdpPayload[i] = self.SeccIp[i] # 16 bytes IP address of the charger
self.SdpPayload[16] = 15118 >> 8 # SECC port high byte. Port is always 15118.
self.SdpPayload[17] = 15118 & 0xFF # SECC port low byte. Port is always 15118.
self.SdpPayload[18] = 0x10 # security. We only support "no transport layer security, 0x10".
self.SdpPayload[19] = 0x00 # transport protocol. We only support "TCP, 0x00".
showAsHex(self.SdpPayload, "SDP payload ")
# add the SDP header
lenSdp = len(self.SdpPayload)
self.V2Gframe = bytearray(lenSdp + 8) # V2GTP header needs 8 bytes:
# 1 byte protocol version
# 1 byte protocol version inverted
# 2 bytes payload type
# 4 byte payload length
self.V2Gframe[0] = 0x01 # version
self.V2Gframe[1] = 0xfe # version inverted
self.V2Gframe[2] = 0x90 # payload type. 0x9001 is the SDP response message
self.V2Gframe[3] = 0x01 #
self.V2Gframe[4] = (lenSdp >> 24) & 0xff # length 4 byte.
self.V2Gframe[5] = (lenSdp >> 16) & 0xff
self.V2Gframe[6] = (lenSdp >> 8) & 0xff
self.V2Gframe[7] = lenSdp & 0xff
for i in range(0, lenSdp):
self.V2Gframe[8+i] = self.SdpPayload[i]
showAsHex(self.V2Gframe, "V2Gframe ")
self.packResponseIntoUdp(self.V2Gframe)
def evaluateUdp(self):
if (self.destinationport == 15118): # port for the SECC
if ((self.udpPayload[0]==0x01) and (self.udpPayload[1]==0xFE)): # protocol version 1 and inverted
# it is a V2GTP message
self.evccPort = self.sourceport
v2gptPayloadType = self.udpPayload[2] * 256 + self.udpPayload[3]
# 0x8001 EXI encoded V2G message
# 0x9000 SDP request message (SECC Discovery)
# 0x9001 SDP response message (SECC response to the EVCC)
if (v2gptPayloadType == 0x9000):
v2gptPayloadLen = self.udpPayload[4] * 256 ** 3 + self.udpPayload[5] * 256 ** 2 + self.udpPayload[6] * 256 + self.udpPayload[7]
if (v2gptPayloadLen == 2):
# 2 is the only valid length for a SDP request.
seccDiscoveryReqSecurity = self.udpPayload[8] # normally 0x10 for "no transport layer security". Or 0x00 for "TLS".
seccDiscoveryReqTransportProtocol = self.udpPayload[9] # normally 0x00 for TCP
if (seccDiscoveryReqSecurity!=0x10):
print("seccDiscoveryReqSecurity " + str(seccDiscoveryReqSecurity) + " is not supported")
else:
if (seccDiscoveryReqTransportProtocol!=0x00):
print("seccDiscoveryReqTransportProtocol " + str(seccDiscoveryReqTransportProtocol) + " is not supported")
else:
# This was a valid SDP request. Let's respond.
showAsHex(self.udpPayload, "udp payload ")
print("ok, this was a valid SDP request. Will respond.")
self.prepareSdpResponse()
else:
print("v2gptPayloadLen on SDP request is " + str(v2gptPayloadLen) + " not supported")
else:
print("v2gptPayloadType " + hex(v2gptPayloadType) + " not supported")
def evaluateReceivedPacket(self, pkt):
if (len(pkt)>60):
self.myreceivebuffer = pkt
self.nextheader = self.myreceivebuffer[20]
if (self.nextheader == 0x11): # it is an UDP frame
self.sourceport = self.myreceivebuffer[54] * 256 + self.myreceivebuffer[55]
self.destinationport = self.myreceivebuffer[56] * 256 + self.myreceivebuffer[57]
self.udplen = self.myreceivebuffer[58] * 256 + self.myreceivebuffer[59]
self.udpsum = self.myreceivebuffer[60] * 256 + self.myreceivebuffer[61]
# udplen is including 8 bytes header at the begin
if (self.udplen>8):
self.udpPayload = bytearray(self.udplen-8)
print("self.udplen=" + str(self.udplen))
print("self.myreceivebuffer len=" + str(len(self.myreceivebuffer)))
for i in range(0, self.udplen-8):
#print("index " + str(i) + " " + hex(self.myreceivebuffer[62+i]))
self.udpPayload[i] = self.myreceivebuffer[62+i]
self.evaluateUdp()
def __init__(self, transmitCallback):
self.transmit = transmitCallback
self.SeccIp = [ 0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0x06, 0xaa, 0xaa, 0xff, 0xfe, 0, 0xaa, 0xaa ] # 16 bytes, a default IPv6 address for the charging station
self.EvccIp = [ 0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0x06, 0x65, 0x65, 0xff, 0xfe, 0, 0x64, 0xC3 ] # 16 bytes, a default IPv6 address for the vehicle
self.ownMac = [ 0x01, 0x02, 0x03, 0x04, 0x05, 0x06 ] # 6 bytes own MAC default. Should be overwritten before use.

155
tests/tester.py Normal file
View file

@ -0,0 +1,155 @@
# test
import pcap
import time
MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ]
MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ]
MAC_RANDOM = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff ]
MAC_ALPI = [0x0A, 0x19, 0x4A, 0x39, 0xD6, 0x98 ] # alpitronics
MAC_TPLINK_E4 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE4 ] # TPlink PLC adaptor
MAC_TPLINK_E6 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE6 ] # TPlink PLC adaptor
MAC_DEVOLO_26 = [0xBC, 0xF2, 0xAF, 0x0B, 0x8E, 0x26 ] # Devolo PLC adaptor
MAC_IPv6MCAST1 = [0x33, 0x33, 0x00, 0x00, 0x00, 0x01 ] # IPv6 multicast MAC
MAC_RANDCAR = [0x04, 0x65, 0x65, 0x00, 0xaf, 0xfe ] # random hyundai car
class tester():
def cleanTransmitBuffer(self): # fill the complete ethernet transmit buffer with 0x00
for i in range(0, len(self.mytransmitbuffer)):
self.mytransmitbuffer[i]=0
def fillSourceMac(self, mac, offset=6): # at offset 6 in the ethernet frame, we have the source MAC
# we can give a different offset, to re-use the MAC also in the data area
for i in range(0, 6):
self.mytransmitbuffer[offset+i]=mac[i]
def fillDestinationMac(self, mac, offset=0): # at offset 0 in the ethernet frame, we have the destination MAC
# we can give a different offset, to re-use the MAC also in the data area
for i in range(0, 6):
self.mytransmitbuffer[offset+i]=mac[i]
def sendTestFrame1(self):
self.mytransmitbuffer = bytearray(72)
self.cleanTransmitBuffer()
# Destination MAC
self.fillDestinationMac(MAC_IPv6MCAST1)
# Source MAC
self.fillSourceMac(MAC_RANDCAR)
# Protocol
self.mytransmitbuffer[12]=0x86 # IPv6
self.mytransmitbuffer[13]=0xdd
self.mytransmitbuffer[14]=0x60 #
self.mytransmitbuffer[15]=0x00 #
self.mytransmitbuffer[16]=0x00 #
self.mytransmitbuffer[17]=0x00 #
self.mytransmitbuffer[18]=0x00 # len 2 bytes
self.mytransmitbuffer[19]=0x12 #
self.mytransmitbuffer[20]=0x11 # next is UDP
self.mytransmitbuffer[21]=0x0A # hop limit
self.mytransmitbuffer[22]=0xfe # 22 to 37 ip source address
self.mytransmitbuffer[23]=0x80 # 22 to 37 ip source address
self.mytransmitbuffer[24]=0x00 # 22 to 37 ip source address
self.mytransmitbuffer[25]=0x00 # 22 to 37 ip source address
self.mytransmitbuffer[26]=0x00 # 22 to 37 ip source address
self.mytransmitbuffer[27]=0x00 # 22 to 37 ip source address
self.mytransmitbuffer[28]=0x00 # 22 to 37 ip source address
self.mytransmitbuffer[29]=0x00 # 22 to 37 ip source address
self.mytransmitbuffer[30]=0x06 # 22 to 37 ip source address
self.mytransmitbuffer[31]=0x65 # 22 to 37 ip source address
self.mytransmitbuffer[32]=0x65 # 22 to 37 ip source address
self.mytransmitbuffer[33]=0xff # 22 to 37 ip source address
self.mytransmitbuffer[34]=0xfe # 22 to 37 ip source address
self.mytransmitbuffer[35]=0x00 # 22 to 37 ip source address
self.mytransmitbuffer[36]=0x64 # 22 to 37 ip source address
self.mytransmitbuffer[37]=0xc3 # 22 to 37 ip source address
self.mytransmitbuffer[38]=0xff # 38 to 53 ip destination address
self.mytransmitbuffer[39]=0x02 # 38 to 53 ip destination address
self.mytransmitbuffer[53]=0x01 # 38 to 53 ip destination address
self.mytransmitbuffer[54]=0xcc # source port
self.mytransmitbuffer[55]=0xab
self.mytransmitbuffer[56]=0x3b # dest port
self.mytransmitbuffer[57]=0x0e
self.mytransmitbuffer[58]=0x00 # length
self.mytransmitbuffer[59]=0x12
self.mytransmitbuffer[60]= 0x89 # checksum
self.mytransmitbuffer[61]= 0x62
self.mytransmitbuffer[62]= 0x01
self.mytransmitbuffer[63]= 0xFE
self.mytransmitbuffer[64]= 0x90
self.mytransmitbuffer[65]= 0x00
self.mytransmitbuffer[66]= 0x00
self.mytransmitbuffer[67]= 0x00
self.mytransmitbuffer[68]= 0x00
self.mytransmitbuffer[69]= 0x02
self.mytransmitbuffer[70]= 0x10
self.mytransmitbuffer[71]= 0x00
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
print("transmitted test frame 1")
def findEthernetAdaptor(self):
self.strInterfaceName="eth0" # default, if the real is not found
print("Interfaces:\n" + '\n'.join(pcap.findalldevs()))
for i in range(0, 10):
strInterfaceName = pcap.ex_name("eth"+str(i))
if (strInterfaceName == '\\Device\\NPF_{E4B8176C-8516-4D48-88BC-85225ABCF259}'):
print("This is the wanted Ethernet adaptor.")
self.strInterfaceName="eth"+str(i)
print("eth"+ str(i) + " is " + strInterfaceName)
def __init__(self):
self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8')
self.nPacketsReceived = 0
#self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50)
# eth3 means: Third entry from back, in the list of interfaces, which is provided by pcap.findalldevs.
# Improvement necessary: select the interface based on the name.
# For debugging of the interface names, we can patch the file
# C:\Users\uwemi\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\pcap\_pcap_ex.py,
# in the function
# def name(name: bytes) -> bytes:
# in the place after
# if i == idx:
# print("index match at " + str(i) + " dev name=" + str(dev.name) + " dev.description=" + str(dev.description))
# This will print the description of the used interface.
#
# Patch for non-blocking read-iteration:
# in _pcap.py, function def __next__(self), in the case of timeout (if n==0), we need to "raise StopIteration" instead of "continue".
#
self.findEthernetAdaptor()
self.sniffer = pcap.pcap(name=self.strInterfaceName, promisc=True, immediate=True, timeout_ms=50)
self.sniffer.setnonblock(True)
print("sniffer created at " + self.strInterfaceName)
def mainfunction(self):
# print("will evaluate self.sniffer")
for ts, pkt in self.sniffer: # attention: for using this in non-blocking manner, we need the patch described above.
self.nPacketsReceived+=1
# print('%d' % (ts)) # the time stamp
def close(self):
self.sniffer.close()
t=tester()
print(256 ** 2)
for i in range(0, 100):
t.sendTestFrame1()
time.sleep(0.5)
t.close()