From 28a713c32e1938d5d1391c127e0dfdd12743db38 Mon Sep 17 00:00:00 2001 From: uhi22 Date: Wed, 19 Oct 2022 18:52:43 +0200 Subject: [PATCH] added extended status, added nid to set_key, made slac complete. Connection to Ioniq works. --- pyPlc.py | 18 ++++- pyPlcHomeplug.py | 202 +++++++++++++++++++++++++++++++++++++++-------- pyPlcWorker.py | 11 ++- 3 files changed, 192 insertions(+), 39 deletions(-) diff --git a/pyPlc.py b/pyPlc.py index 5fde14a..843bf50 100644 --- a/pyPlc.py +++ b/pyPlc.py @@ -26,9 +26,16 @@ def inkey(): def cbAddToTrace(s): print(s) -def cbShowStatus(s): - print(s) - lblStatus['text']=s +def cbShowStatus(s, selection=""): + #print(s) + if (selection == "mode"): + lblMode['text']=s + s="" + if (selection == "pevmac"): + lblPevMac['text']=s + s="" + if (len(s)>0): + lblStatus['text']=s root.update() root = tk.Tk() @@ -39,6 +46,11 @@ lblHelp = tk.Label(root, text="x=exit, t=testframe") lblHelp.pack() lblStatus = tk.Label(root, text="(Status)") lblStatus.pack() +lblPevMac = tk.Label(root, text="(pev mac)") +lblPevMac.pack() +lblMode = tk.Label(root, text="(mode)") +lblMode.pack() + # Bind the keyboard handler to all relevant elements: display.bind('', storekeyname) root.bind('', storekeyname) diff --git a/pyPlcHomeplug.py b/pyPlcHomeplug.py index b889255..4f34782 100644 --- a/pyPlcHomeplug.py +++ b/pyPlcHomeplug.py @@ -46,10 +46,16 @@ def showAsHex(mybytearray): strHex = strHex + twoCharHex(mybytearray[i]) + " " print("len " + str(packetlength) + " data " + strHex) +def prettyMac(macByteArray): + s="" + for i in range(0, 5): + s = s + twoCharHex(macByteArray[i]) + ":" + s = s + twoCharHex(macByteArray[i]) + return s + MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ] MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ] MAC_RANDOM = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff ] -MAC_IONIQ = [0x04, 0x65, 0x65, 0x00, 0x64, 0xC3 ] MAC_ALPI = [0x0A, 0x19, 0x4A, 0x39, 0xD6, 0x98 ] # alpitronics MAC_TPLINK_E4 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE4 ] # TPlink PLC adaptor MAC_TPLINK_E6 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE6 ] # TPlink PLC adaptor @@ -117,7 +123,7 @@ class pyPlcHomeplug(): protocol=mybytearray[12]*256 + mybytearray[13] if (protocol == 0x88E1): blIsHomePlug=True - print("HomePlug protocol") + # print("HomePlug protocol") return blIsHomePlug def fillSourceMac(self, mac, offset=6): # at offset 6 in the ethernet frame, we have the source MAC @@ -141,7 +147,7 @@ class pyPlcHomeplug(): def setNidAt(self, index): # (b0f2e695666b03 was NID of TPlink) - # copies the network ID (NID) into the wished position in the transmit buffer + # copies the network ID (NID, 7 bytes) into the wished position in the transmit buffer for i in range(0, 7): self.mytransmitbuffer[index+i]=self.NID[i] @@ -206,10 +212,13 @@ class pyPlcHomeplug(): self.mytransmitbuffer[30]=0x00 # 11 self.mytransmitbuffer[31]=0x00 # 12 pmn self.mytransmitbuffer[32]=0x00 # 13 cco cap - #self.setNidAt(33) # 14-20 nid 7 bytes from 33 to 39 + self.setNidAt(33) # 14-20 nid 7 bytes from 33 to 39 + # Network ID to be associated with the key distributed herein. + # The 54 LSBs of this field contain the NID (refer to Section 3.4.3.1). The + # two MSBs shall be set to 0b00. self.mytransmitbuffer[40]=0x01 # 21 peks (payload encryption key select) Table 11-83. 01 is NMK. We had 02 here, why??? # with 0x0F we could choose "no key, payload is sent in the clear" - self.setNmkAt(41) + self.setNmkAt(41) self.mytransmitbuffer[41]+=variation # to try different NMKs # and three remaining zeros @@ -256,7 +265,7 @@ class pyPlcHomeplug(): # Destination MAC self.fillDestinationMac(MAC_BROADCAST) # Source MAC - self.fillSourceMac(MAC_IONIQ) + self.fillSourceMac(self.pevMac) # Protocol self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV self.mytransmitbuffer[13]=0xE1 @@ -281,7 +290,7 @@ class pyPlcHomeplug(): self.mytransmitbuffer = bytearray(60) self.cleanTransmitBuffer() # Destination MAC - self.fillDestinationMac(MAC_IONIQ) + self.fillDestinationMac(self.pevMac) # Source MAC self.fillSourceMac(MAC_RANDOM) # Protocol @@ -301,14 +310,76 @@ class pyPlcHomeplug(): self.mytransmitbuffer[25]=0x0A # sound count self.mytransmitbuffer[26]=0x06 # timeout self.mytransmitbuffer[27]=0x01 # resptype - self.fillDestinationMac(MAC_IONIQ, 28) # forwarding_sta, same as PEV MAC, plus 2 bytes 00 00 + self.fillDestinationMac(self.pevMac, 28) # forwarding_sta, same as PEV MAC, plus 2 bytes 00 00 self.mytransmitbuffer[34]=0x00 # self.mytransmitbuffer[35]=0x00 # - self.fillDestinationMac(MAC_IONIQ, 36) # runid, same as PEV MAC, plus 2 bytes 00 00 + self.fillDestinationMac(self.pevMac, 36) # runid, same as PEV MAC, plus 2 bytes 00 00 self.mytransmitbuffer[42]=0x0 # self.mytransmitbuffer[43]=0x0 # # rest is 00 - + + def composeAttenCharInd(self): + self.mytransmitbuffer = bytearray(129) + self.cleanTransmitBuffer() + # Destination MAC + self.fillDestinationMac(self.pevMac) + # Source MAC + self.fillSourceMac(MAC_RANDOM) + # Protocol + self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV + self.mytransmitbuffer[13]=0xE1 + self.mytransmitbuffer[14]=0x01 # version + self.mytransmitbuffer[15]=0x6E # ATTEN_CHAR.IND + self.mytransmitbuffer[16]=0x60 # + self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented. + self.mytransmitbuffer[18]=0x00 # + self.mytransmitbuffer[19]=0x00 # apptype + self.mytransmitbuffer[20]=0x00 # security + self.fillDestinationMac(self.pevMac, 21) # The wireshark calls it source_mac, but alpitronic fills it with PEV mac. We use the PEV MAC. + self.fillDestinationMac(self.pevMac, 27) # runid. The alpitronic fills it with the PEV mac, plus 00 00. + self.mytransmitbuffer[35]=0x00 # 35 - 51 source_id, 17 bytes. The alpitronic fills it with 00 + + self.mytransmitbuffer[52]=0x00 # 52 - 68 response_id, 17 bytes. The alpitronic fills it with 00. + self.mytransmitbuffer[69]=0x0A # Number of sounds. 10 in normal case. Should this be more flexible, e.g. using the counter from first MNBC_SOUND? + self.mytransmitbuffer[70]=0x3A # Number of groups = 58. Should this be more flexible? + for i in range(71, 129): # 71 to 128: The group attenuation for the 58 announced groups. + self.mytransmitbuffer[i]=9 # Typical values are between 1 and 0x19. Since we have no real measurements from the AR7020, + # we just simulate something. 0 seems to be interpreted as "defect", the IONIQ does not send + # a positive response in this case. + # higher attenuation for the higher frequencies, to be a little bit realistic (real data from alpitronic trace) + self.mytransmitbuffer[126]=0x0f + self.mytransmitbuffer[127]=0x13 + self.mytransmitbuffer[128]=0x19 + + def composeSlacMatchCnf(self): + self.mytransmitbuffer = bytearray(109) + self.cleanTransmitBuffer() + # Destination MAC + self.fillDestinationMac(self.pevMac) + # Source MAC + self.fillSourceMac(MAC_RANDOM) + # Protocol + self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV + self.mytransmitbuffer[13]=0xE1 + self.mytransmitbuffer[14]=0x01 # version + self.mytransmitbuffer[15]=0x7D # SLAC_MATCH.CNF + self.mytransmitbuffer[16]=0x60 # + self.mytransmitbuffer[17]=0x00 # 2 bytes fragmentation information. 0000 means: unfragmented. + self.mytransmitbuffer[18]=0x00 # + self.mytransmitbuffer[19]=0x00 # apptype + self.mytransmitbuffer[20]=0x00 # security + self.mytransmitbuffer[21]=0x56 # length 2 byte + self.mytransmitbuffer[22]=0x00 # + # 23 - 39: pev_id 17 bytes. All zero in alpi/Ioniq trace. + self.fillDestinationMac(self.pevMac, 40) # 40 - 45 pev_mac + # 46 - 62: evse_id 17 bytes. All zero in alpi/Ioniq trace. + self.fillSourceMac(MAC_RANDOM, 63) # 63 - 68 evse_mac + self.fillDestinationMac(self.pevMac, 69) # 69-76 run_id. Is the ioniq mac plus 00 00. + # 77 to 84 reserved 0 + self.setNidAt(85) # 85-91 NID. We can nearly freely choose this, but the upper two bits need to be zero + # 92 reserved 0 + self.setNmkAt(93) # 93 to 108 NMK. We can freely choose this. Normally we should use a random number. + def composeDHCP(self): # DHCP discover, to check whether this "normal" package arrives on the other side self.mytransmitbuffer = bytearray(379) @@ -363,8 +434,8 @@ class pyPlcHomeplug(): self.addToTrace("transmitting GetSwReq...") self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) if (selection=="s"): - self.composeSetKey(1) - self.addToTrace("transmitting SET_KEY.REQ (key 1)") + self.composeSetKey(0) + self.addToTrace("transmitting SET_KEY.REQ (key 0)") self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) if (selection=="t"): self.composeSetKey(2) @@ -383,47 +454,94 @@ class pyPlcHomeplug(): # The getkey response contains the Network ID (NID), even if the request was rejected. We store the NID, # to have it available for the next request. s = "" - for i in range(0, 7): + for i in range(0, 7): # NID has 7 bytes self.NID[i] = self.myreceivebuffer[29+i] s=s+hex(self.NID[i])+ " " print("From GetKeyCnf, got network ID (NID) " + s) + def evaluateSetKeyCnf(self): + # The Setkey confirmation + # In spec, the result 0 means "success". But in reality, the 0 means: did not work. When it works, + # then the LEDs are blinking (device is restarting), and the response is 1. + result = self.myreceivebuffer[19] + if (result == 0): + self.addToTrace("SetKeyCnf says 0, this is a bad sign") + else: + self.addToTrace("SetKeyCnf says " + str(result) + ", this is formally 'rejected', but indeed ok.") + def evaluateSlacParamReq(self): # We received a SLAC_PARAM request from the PEV. This is the initiation of a SLAC procedure. - # If we want to emulate an EVSE, we want to answer. - self.composeSlacParamCnf() - self.addToTrace("transmitting CM_SLAC_PARAM.CNF") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + # We extract the pev MAC from it. + for i in range(0, 6): + self.pevMac[i] = self.myreceivebuffer[6+i] + self.showStatus(prettyMac(self.pevMac), "pevmac") + # If we want to emulate an EVSE, we want to answer. + if (self.iAmEvse==1): + self.composeSlacParamCnf() + self.addToTrace("transmitting CM_SLAC_PARAM.CNF") + self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + + def evaluateMnbcSoundInd(self): + # We received MNBC_SOUND.IND from the PEV. Normally this happens 10times, with a countdown (remaining number of sounds) + # running from 9 to 0. If the countdown is 0, this is the last message. In case we are the EVSE, we need + # to answer with a ATTEN_CHAR.IND, which normally contains the attenuation for 10 sounds, 58 groups. + if (self.iAmEvse==1): + countdown = self.myreceivebuffer[38] + if (countdown == 0): + self.composeAttenCharInd() + self.addToTrace("transmitting ATTEN_CHAR.IND") + self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + + def evaluateSlacMatchReq(self): + # We received SLAC_MATCH.REQ from the PEV. + # If we are EVSE, we send the response. + if (self.iAmEvse==1): + self.composeSlacMatchCnf() + self.addToTrace("transmitting SLAC_MATCH.CNF") + self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + + def evaluateSlacMatchCnf(self): # The SLAC_MATCH.CNF contains the NMK and the NID. # We extract this information, so that we can use it for the CM_SET_KEY afterwards. # References: https://github.com/qca/open-plc-utils/blob/master/slac/evse_cm_slac_match.c # 2021-12-16_HPC_säule1_full_slac.pcapng - s = "" - for i in range(0, 7): - self.NID[i] = self.myreceivebuffer[85+i] - s=s+hex(self.NID[i])+ " " - print("From SlacMatchCnf, got network ID (NID) " + s) - s = "" - for i in range(0, 16): - self.NMK[i] = self.myreceivebuffer[93+i] - s=s+hex(self.NMK[i])+ " " - print("From SlacMatchCnf, got network membership key (NMK) " + s) - # use the extracted NMK and NID to set the key in the adaptor: - self.composeSetKey(0) - self.addToTrace("transmitting CM_SET_KEY.REQ") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + if (self.iAmEvse==1): + # If we are EVSE, nothing to do. We have sent the match.CNF by our own. + # The SET_KEY was already done at startup. + pass + else: + s = "" + for i in range(0, 7): # NID has 7 bytes + self.NID[i] = self.myreceivebuffer[85+i] + s=s+hex(self.NID[i])+ " " + print("From SlacMatchCnf, got network ID (NID) " + s) + s = "" + for i in range(0, 16): + self.NMK[i] = self.myreceivebuffer[93+i] + s=s+hex(self.NMK[i])+ " " + print("From SlacMatchCnf, got network membership key (NMK) " + s) + # use the extracted NMK and NID to set the key in the adaptor: + self.composeSetKey(0) + self.addToTrace("transmitting CM_SET_KEY.REQ") + self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) def evaluateReceivedHomeplugPacket(self): mmt = self.getManagementMessageType() print(hex(mmt)) if (mmt == CM_GET_KEY + MMTYPE_CNF): self.evaluateGetKeyCnf() + if (mmt == CM_SLAC_MATCH + MMTYPE_REQ): + self.evaluateSlacMatchReq() if (mmt == CM_SLAC_MATCH + MMTYPE_CNF): self.evaluateSlacMatchCnf() if (mmt == CM_SLAC_PARAM + MMTYPE_REQ): self.evaluateSlacParamReq() + if (mmt == CM_MNBC_SOUND + MMTYPE_IND): + self.evaluateMnbcSoundInd() + if (mmt == CM_SET_KEY + MMTYPE_CNF): + self.evaluateSetKeyCnf() def findEthernetAdaptor(self): @@ -435,6 +553,19 @@ class pyPlcHomeplug(): print("This is the wanted Ethernet adaptor.") self.strInterfaceName="eth"+str(i) print("eth"+ str(i) + " is " + strInterfaceName) + + def enterPevMode(self): + self.iAmEvse = 0 # not emulating a charging station + self.iAmPev = 1 # emulating a vehicle + self.showStatus("PEV mode", "mode") + def enterEvseMode(self): + self.iAmEvse = 1 # not emulating a charging station + self.iAmPev = 0 # emulating a vehicle + self.showStatus("EVSE mode", "mode") + def enterListenMode(self): + self.iAmEvse = 0 # not emulating a charging station + self.iAmPev = 0 # emulating a vehicle + self.showStatus("LISTEN mode", "mode") def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8') @@ -461,20 +592,23 @@ class pyPlcHomeplug(): self.sniffer.setnonblock(True) self.NMK = [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16 ] # a default network key self.NID = [ 1, 2, 3, 4, 5, 6, 7 ] # a default network ID + self.pevMac = [0x55, 0x56, 0x57, 0x58, 0x59, 0x5A ] # a default pev MAC self.runningCounter=0 + self.enterEvseMode() + self.showStatus(prettyMac(self.pevMac), "pevmac") print("sniffer created at " + self.strInterfaceName) def addToTrace(self, s): self.callbackAddToTrace(s) - def showStatus(self, s): - self.callbackShowStatus(s) + def showStatus(self, s, selection=""): + self.callbackShowStatus(s, selection) def mainfunction(self): # print("will evaluate self.sniffer") for ts, pkt in self.sniffer: # attention: for using this in non-blocking manner, we need the patch described above. self.nPacketsReceived+=1 - print('%d' % (ts)) + # print('%d' % (ts)) # the time stamp if (self.isHomeplug(pkt)): self.myreceivebuffer = pkt # self.showMacAddresses(pkt) diff --git a/pyPlcWorker.py b/pyPlcWorker.py index 65b6fd8..6baab08 100644 --- a/pyPlcWorker.py +++ b/pyPlcWorker.py @@ -18,8 +18,8 @@ class pyPlcWorker(): def addToTrace(self, s): self.callbackAddToTrace(s) - def showStatus(self, s): - self.callbackShowStatus(s) + def showStatus(self, s, selection = ""): + self.callbackShowStatus(s, selection) def mainfunction(self): self.nMainFunctionCalls+=1 @@ -28,6 +28,13 @@ class pyPlcWorker(): def handleUserAction(self, strAction): self.strUserAction = strAction + if (strAction == "P"): + self.hp.enterPevMode() + if (strAction == "E"): + self.hp.enterEvseMode() + if (strAction == "L"): + self.hp.enterListenMode() + self.addToTrace("UserAction " + strAction) self.hp.sendTestFrame(strAction)