mirror of
https://github.com/uhi22/pyPLC.git
synced 2024-11-10 01:05:42 +00:00
added extended status, added nid to set_key, made slac complete. Connection to Ioniq works.
This commit is contained in:
parent
12764110d5
commit
28a713c32e
3 changed files with 192 additions and 39 deletions
16
pyPlc.py
16
pyPlc.py
|
@ -26,8 +26,15 @@ def inkey():
|
|||
def cbAddToTrace(s):
|
||||
print(s)
|
||||
|
||||
def cbShowStatus(s):
|
||||
print(s)
|
||||
def cbShowStatus(s, selection=""):
|
||||
#print(s)
|
||||
if (selection == "mode"):
|
||||
lblMode['text']=s
|
||||
s=""
|
||||
if (selection == "pevmac"):
|
||||
lblPevMac['text']=s
|
||||
s=""
|
||||
if (len(s)>0):
|
||||
lblStatus['text']=s
|
||||
root.update()
|
||||
|
||||
|
@ -39,6 +46,11 @@ lblHelp = tk.Label(root, text="x=exit, t=testframe")
|
|||
lblHelp.pack()
|
||||
lblStatus = tk.Label(root, text="(Status)")
|
||||
lblStatus.pack()
|
||||
lblPevMac = tk.Label(root, text="(pev mac)")
|
||||
lblPevMac.pack()
|
||||
lblMode = tk.Label(root, text="(mode)")
|
||||
lblMode.pack()
|
||||
|
||||
# Bind the keyboard handler to all relevant elements:
|
||||
display.bind('<Key>', storekeyname)
|
||||
root.bind('<Key>', storekeyname)
|
||||
|
|
164
pyPlcHomeplug.py
164
pyPlcHomeplug.py
|
@ -46,10 +46,16 @@ def showAsHex(mybytearray):
|
|||
strHex = strHex + twoCharHex(mybytearray[i]) + " "
|
||||
print("len " + str(packetlength) + " data " + strHex)
|
||||
|
||||
def prettyMac(macByteArray):
|
||||
s=""
|
||||
for i in range(0, 5):
|
||||
s = s + twoCharHex(macByteArray[i]) + ":"
|
||||
s = s + twoCharHex(macByteArray[i])
|
||||
return s
|
||||
|
||||
MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ]
|
||||
MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ]
|
||||
MAC_RANDOM = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff ]
|
||||
MAC_IONIQ = [0x04, 0x65, 0x65, 0x00, 0x64, 0xC3 ]
|
||||
MAC_ALPI = [0x0A, 0x19, 0x4A, 0x39, 0xD6, 0x98 ] # alpitronics
|
||||
MAC_TPLINK_E4 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE4 ] # TPlink PLC adaptor
|
||||
MAC_TPLINK_E6 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE6 ] # TPlink PLC adaptor
|
||||
|
@ -117,7 +123,7 @@ class pyPlcHomeplug():
|
|||
protocol=mybytearray[12]*256 + mybytearray[13]
|
||||
if (protocol == 0x88E1):
|
||||
blIsHomePlug=True
|
||||
print("HomePlug protocol")
|
||||
# print("HomePlug protocol")
|
||||
return blIsHomePlug
|
||||
|
||||
def fillSourceMac(self, mac, offset=6): # at offset 6 in the ethernet frame, we have the source MAC
|
||||
|
@ -141,7 +147,7 @@ class pyPlcHomeplug():
|
|||
|
||||
def setNidAt(self, index):
|
||||
# (b0f2e695666b03 was NID of TPlink)
|
||||
# copies the network ID (NID) into the wished position in the transmit buffer
|
||||
# copies the network ID (NID, 7 bytes) into the wished position in the transmit buffer
|
||||
for i in range(0, 7):
|
||||
self.mytransmitbuffer[index+i]=self.NID[i]
|
||||
|
||||
|
@ -206,7 +212,10 @@ class pyPlcHomeplug():
|
|||
self.mytransmitbuffer[30]=0x00 # 11
|
||||
self.mytransmitbuffer[31]=0x00 # 12 pmn
|
||||
self.mytransmitbuffer[32]=0x00 # 13 cco cap
|
||||
#self.setNidAt(33) # 14-20 nid 7 bytes from 33 to 39
|
||||
self.setNidAt(33) # 14-20 nid 7 bytes from 33 to 39
|
||||
# Network ID to be associated with the key distributed herein.
|
||||
# The 54 LSBs of this field contain the NID (refer to Section 3.4.3.1). The
|
||||
# two MSBs shall be set to 0b00.
|
||||
self.mytransmitbuffer[40]=0x01 # 21 peks (payload encryption key select) Table 11-83. 01 is NMK. We had 02 here, why???
|
||||
# with 0x0F we could choose "no key, payload is sent in the clear"
|
||||
self.setNmkAt(41)
|
||||
|
@ -256,7 +265,7 @@ class pyPlcHomeplug():
|
|||
# Destination MAC
|
||||
self.fillDestinationMac(MAC_BROADCAST)
|
||||
# Source MAC
|
||||
self.fillSourceMac(MAC_IONIQ)
|
||||
self.fillSourceMac(self.pevMac)
|
||||
# Protocol
|
||||
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
|
||||
self.mytransmitbuffer[13]=0xE1
|
||||
|
@ -281,7 +290,7 @@ class pyPlcHomeplug():
|
|||
self.mytransmitbuffer = bytearray(60)
|
||||
self.cleanTransmitBuffer()
|
||||
# Destination MAC
|
||||
self.fillDestinationMac(MAC_IONIQ)
|
||||
self.fillDestinationMac(self.pevMac)
|
||||
# Source MAC
|
||||
self.fillSourceMac(MAC_RANDOM)
|
||||
# Protocol
|
||||
|
@ -301,14 +310,76 @@ class pyPlcHomeplug():
|
|||
self.mytransmitbuffer[25]=0x0A # sound count
|
||||
self.mytransmitbuffer[26]=0x06 # timeout
|
||||
self.mytransmitbuffer[27]=0x01 # resptype
|
||||
self.fillDestinationMac(MAC_IONIQ, 28) # forwarding_sta, same as PEV MAC, plus 2 bytes 00 00
|
||||
self.fillDestinationMac(self.pevMac, 28) # forwarding_sta, same as PEV MAC, plus 2 bytes 00 00
|
||||
self.mytransmitbuffer[34]=0x00 #
|
||||
self.mytransmitbuffer[35]=0x00 #
|
||||
self.fillDestinationMac(MAC_IONIQ, 36) # runid, same as PEV MAC, plus 2 bytes 00 00
|
||||
self.fillDestinationMac(self.pevMac, 36) # runid, same as PEV MAC, plus 2 bytes 00 00
|
||||
self.mytransmitbuffer[42]=0x0 #
|
||||
self.mytransmitbuffer[43]=0x0 #
|
||||
# rest is 00
|
||||
|
||||
def composeAttenCharInd(self):
|
||||
self.mytransmitbuffer = bytearray(129)
|
||||
self.cleanTransmitBuffer()
|
||||
# Destination MAC
|
||||
self.fillDestinationMac(self.pevMac)
|
||||
# Source MAC
|
||||
self.fillSourceMac(MAC_RANDOM)
|
||||
# Protocol
|
||||
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
|
||||
self.mytransmitbuffer[13]=0xE1
|
||||
self.mytransmitbuffer[14]=0x01 # version
|
||||
self.mytransmitbuffer[15]=0x6E # ATTEN_CHAR.IND
|
||||
self.mytransmitbuffer[16]=0x60 #
|
||||
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
|
||||
self.mytransmitbuffer[18]=0x00 #
|
||||
self.mytransmitbuffer[19]=0x00 # apptype
|
||||
self.mytransmitbuffer[20]=0x00 # security
|
||||
self.fillDestinationMac(self.pevMac, 21) # The wireshark calls it source_mac, but alpitronic fills it with PEV mac. We use the PEV MAC.
|
||||
self.fillDestinationMac(self.pevMac, 27) # runid. The alpitronic fills it with the PEV mac, plus 00 00.
|
||||
self.mytransmitbuffer[35]=0x00 # 35 - 51 source_id, 17 bytes. The alpitronic fills it with 00
|
||||
|
||||
self.mytransmitbuffer[52]=0x00 # 52 - 68 response_id, 17 bytes. The alpitronic fills it with 00.
|
||||
self.mytransmitbuffer[69]=0x0A # Number of sounds. 10 in normal case. Should this be more flexible, e.g. using the counter from first MNBC_SOUND?
|
||||
self.mytransmitbuffer[70]=0x3A # Number of groups = 58. Should this be more flexible?
|
||||
for i in range(71, 129): # 71 to 128: The group attenuation for the 58 announced groups.
|
||||
self.mytransmitbuffer[i]=9 # Typical values are between 1 and 0x19. Since we have no real measurements from the AR7020,
|
||||
# we just simulate something. 0 seems to be interpreted as "defect", the IONIQ does not send
|
||||
# a positive response in this case.
|
||||
# higher attenuation for the higher frequencies, to be a little bit realistic (real data from alpitronic trace)
|
||||
self.mytransmitbuffer[126]=0x0f
|
||||
self.mytransmitbuffer[127]=0x13
|
||||
self.mytransmitbuffer[128]=0x19
|
||||
|
||||
def composeSlacMatchCnf(self):
|
||||
self.mytransmitbuffer = bytearray(109)
|
||||
self.cleanTransmitBuffer()
|
||||
# Destination MAC
|
||||
self.fillDestinationMac(self.pevMac)
|
||||
# Source MAC
|
||||
self.fillSourceMac(MAC_RANDOM)
|
||||
# Protocol
|
||||
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
|
||||
self.mytransmitbuffer[13]=0xE1
|
||||
self.mytransmitbuffer[14]=0x01 # version
|
||||
self.mytransmitbuffer[15]=0x7D # SLAC_MATCH.CNF
|
||||
self.mytransmitbuffer[16]=0x60 #
|
||||
self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented.
|
||||
self.mytransmitbuffer[18]=0x00 #
|
||||
self.mytransmitbuffer[19]=0x00 # apptype
|
||||
self.mytransmitbuffer[20]=0x00 # security
|
||||
self.mytransmitbuffer[21]=0x56 # length 2 byte
|
||||
self.mytransmitbuffer[22]=0x00 #
|
||||
# 23 - 39: pev_id 17 bytes. All zero in alpi/Ioniq trace.
|
||||
self.fillDestinationMac(self.pevMac, 40) # 40 - 45 pev_mac
|
||||
# 46 - 62: evse_id 17 bytes. All zero in alpi/Ioniq trace.
|
||||
self.fillSourceMac(MAC_RANDOM, 63) # 63 - 68 evse_mac
|
||||
self.fillDestinationMac(self.pevMac, 69) # 69-76 run_id. Is the ioniq mac plus 00 00.
|
||||
# 77 to 84 reserved 0
|
||||
self.setNidAt(85) # 85-91 NID. We can nearly freely choose this, but the upper two bits need to be zero
|
||||
# 92 reserved 0
|
||||
self.setNmkAt(93) # 93 to 108 NMK. We can freely choose this. Normally we should use a random number.
|
||||
|
||||
def composeDHCP(self):
|
||||
# DHCP discover, to check whether this "normal" package arrives on the other side
|
||||
self.mytransmitbuffer = bytearray(379)
|
||||
|
@ -363,8 +434,8 @@ class pyPlcHomeplug():
|
|||
self.addToTrace("transmitting GetSwReq...")
|
||||
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
|
||||
if (selection=="s"):
|
||||
self.composeSetKey(1)
|
||||
self.addToTrace("transmitting SET_KEY.REQ (key 1)")
|
||||
self.composeSetKey(0)
|
||||
self.addToTrace("transmitting SET_KEY.REQ (key 0)")
|
||||
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
|
||||
if (selection=="t"):
|
||||
self.composeSetKey(2)
|
||||
|
@ -383,25 +454,66 @@ class pyPlcHomeplug():
|
|||
# The getkey response contains the Network ID (NID), even if the request was rejected. We store the NID,
|
||||
# to have it available for the next request.
|
||||
s = ""
|
||||
for i in range(0, 7):
|
||||
for i in range(0, 7): # NID has 7 bytes
|
||||
self.NID[i] = self.myreceivebuffer[29+i]
|
||||
s=s+hex(self.NID[i])+ " "
|
||||
print("From GetKeyCnf, got network ID (NID) " + s)
|
||||
|
||||
def evaluateSetKeyCnf(self):
|
||||
# The Setkey confirmation
|
||||
# In spec, the result 0 means "success". But in reality, the 0 means: did not work. When it works,
|
||||
# then the LEDs are blinking (device is restarting), and the response is 1.
|
||||
result = self.myreceivebuffer[19]
|
||||
if (result == 0):
|
||||
self.addToTrace("SetKeyCnf says 0, this is a bad sign")
|
||||
else:
|
||||
self.addToTrace("SetKeyCnf says " + str(result) + ", this is formally 'rejected', but indeed ok.")
|
||||
|
||||
def evaluateSlacParamReq(self):
|
||||
# We received a SLAC_PARAM request from the PEV. This is the initiation of a SLAC procedure.
|
||||
# We extract the pev MAC from it.
|
||||
|
||||
for i in range(0, 6):
|
||||
self.pevMac[i] = self.myreceivebuffer[6+i]
|
||||
self.showStatus(prettyMac(self.pevMac), "pevmac")
|
||||
# If we want to emulate an EVSE, we want to answer.
|
||||
if (self.iAmEvse==1):
|
||||
self.composeSlacParamCnf()
|
||||
self.addToTrace("transmitting CM_SLAC_PARAM.CNF")
|
||||
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
|
||||
|
||||
def evaluateMnbcSoundInd(self):
|
||||
# We received MNBC_SOUND.IND from the PEV. Normally this happens 10times, with a countdown (remaining number of sounds)
|
||||
# running from 9 to 0. If the countdown is 0, this is the last message. In case we are the EVSE, we need
|
||||
# to answer with a ATTEN_CHAR.IND, which normally contains the attenuation for 10 sounds, 58 groups.
|
||||
if (self.iAmEvse==1):
|
||||
countdown = self.myreceivebuffer[38]
|
||||
if (countdown == 0):
|
||||
self.composeAttenCharInd()
|
||||
self.addToTrace("transmitting ATTEN_CHAR.IND")
|
||||
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
|
||||
|
||||
def evaluateSlacMatchReq(self):
|
||||
# We received SLAC_MATCH.REQ from the PEV.
|
||||
# If we are EVSE, we send the response.
|
||||
if (self.iAmEvse==1):
|
||||
self.composeSlacMatchCnf()
|
||||
self.addToTrace("transmitting SLAC_MATCH.CNF")
|
||||
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
|
||||
|
||||
|
||||
def evaluateSlacMatchCnf(self):
|
||||
# The SLAC_MATCH.CNF contains the NMK and the NID.
|
||||
# We extract this information, so that we can use it for the CM_SET_KEY afterwards.
|
||||
# References: https://github.com/qca/open-plc-utils/blob/master/slac/evse_cm_slac_match.c
|
||||
# 2021-12-16_HPC_säule1_full_slac.pcapng
|
||||
if (self.iAmEvse==1):
|
||||
# If we are EVSE, nothing to do. We have sent the match.CNF by our own.
|
||||
# The SET_KEY was already done at startup.
|
||||
pass
|
||||
else:
|
||||
s = ""
|
||||
for i in range(0, 7):
|
||||
for i in range(0, 7): # NID has 7 bytes
|
||||
self.NID[i] = self.myreceivebuffer[85+i]
|
||||
s=s+hex(self.NID[i])+ " "
|
||||
print("From SlacMatchCnf, got network ID (NID) " + s)
|
||||
|
@ -420,10 +532,16 @@ class pyPlcHomeplug():
|
|||
print(hex(mmt))
|
||||
if (mmt == CM_GET_KEY + MMTYPE_CNF):
|
||||
self.evaluateGetKeyCnf()
|
||||
if (mmt == CM_SLAC_MATCH + MMTYPE_REQ):
|
||||
self.evaluateSlacMatchReq()
|
||||
if (mmt == CM_SLAC_MATCH + MMTYPE_CNF):
|
||||
self.evaluateSlacMatchCnf()
|
||||
if (mmt == CM_SLAC_PARAM + MMTYPE_REQ):
|
||||
self.evaluateSlacParamReq()
|
||||
if (mmt == CM_MNBC_SOUND + MMTYPE_IND):
|
||||
self.evaluateMnbcSoundInd()
|
||||
if (mmt == CM_SET_KEY + MMTYPE_CNF):
|
||||
self.evaluateSetKeyCnf()
|
||||
|
||||
|
||||
def findEthernetAdaptor(self):
|
||||
|
@ -436,6 +554,19 @@ class pyPlcHomeplug():
|
|||
self.strInterfaceName="eth"+str(i)
|
||||
print("eth"+ str(i) + " is " + strInterfaceName)
|
||||
|
||||
def enterPevMode(self):
|
||||
self.iAmEvse = 0 # not emulating a charging station
|
||||
self.iAmPev = 1 # emulating a vehicle
|
||||
self.showStatus("PEV mode", "mode")
|
||||
def enterEvseMode(self):
|
||||
self.iAmEvse = 1 # not emulating a charging station
|
||||
self.iAmPev = 0 # emulating a vehicle
|
||||
self.showStatus("EVSE mode", "mode")
|
||||
def enterListenMode(self):
|
||||
self.iAmEvse = 0 # not emulating a charging station
|
||||
self.iAmPev = 0 # emulating a vehicle
|
||||
self.showStatus("LISTEN mode", "mode")
|
||||
|
||||
def __init__(self, callbackAddToTrace=None, callbackShowStatus=None):
|
||||
self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8')
|
||||
self.nPacketsReceived = 0
|
||||
|
@ -461,20 +592,23 @@ class pyPlcHomeplug():
|
|||
self.sniffer.setnonblock(True)
|
||||
self.NMK = [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 ] # a default network key
|
||||
self.NID = [ 1, 2, 3, 4, 5, 6, 7 ] # a default network ID
|
||||
self.pevMac = [0x55, 0x56, 0x57, 0x58, 0x59, 0x5A ] # a default pev MAC
|
||||
self.runningCounter=0
|
||||
self.enterEvseMode()
|
||||
self.showStatus(prettyMac(self.pevMac), "pevmac")
|
||||
print("sniffer created at " + self.strInterfaceName)
|
||||
|
||||
def addToTrace(self, s):
|
||||
self.callbackAddToTrace(s)
|
||||
|
||||
def showStatus(self, s):
|
||||
self.callbackShowStatus(s)
|
||||
def showStatus(self, s, selection=""):
|
||||
self.callbackShowStatus(s, selection)
|
||||
|
||||
def mainfunction(self):
|
||||
# print("will evaluate self.sniffer")
|
||||
for ts, pkt in self.sniffer: # attention: for using this in non-blocking manner, we need the patch described above.
|
||||
self.nPacketsReceived+=1
|
||||
print('%d' % (ts))
|
||||
# print('%d' % (ts)) # the time stamp
|
||||
if (self.isHomeplug(pkt)):
|
||||
self.myreceivebuffer = pkt
|
||||
# self.showMacAddresses(pkt)
|
||||
|
|
|
@ -18,8 +18,8 @@ class pyPlcWorker():
|
|||
def addToTrace(self, s):
|
||||
self.callbackAddToTrace(s)
|
||||
|
||||
def showStatus(self, s):
|
||||
self.callbackShowStatus(s)
|
||||
def showStatus(self, s, selection = ""):
|
||||
self.callbackShowStatus(s, selection)
|
||||
|
||||
def mainfunction(self):
|
||||
self.nMainFunctionCalls+=1
|
||||
|
@ -28,6 +28,13 @@ class pyPlcWorker():
|
|||
|
||||
def handleUserAction(self, strAction):
|
||||
self.strUserAction = strAction
|
||||
if (strAction == "P"):
|
||||
self.hp.enterPevMode()
|
||||
if (strAction == "E"):
|
||||
self.hp.enterEvseMode()
|
||||
if (strAction == "L"):
|
||||
self.hp.enterListenMode()
|
||||
|
||||
self.addToTrace("UserAction " + strAction)
|
||||
self.hp.sendTestFrame(strAction)
|
||||
|
||||
|
|
Loading…
Reference in a new issue