diff --git a/helpers.py b/helpers.py new file mode 100644 index 0000000..5a459e4 --- /dev/null +++ b/helpers.py @@ -0,0 +1,19 @@ + + +def twoCharHex(b): + strHex = "%0.2X" % b + return strHex + +def showAsHex(mybytearray, description=""): + packetlength = len(mybytearray) + strHex = "" + for i in range(0, packetlength): + strHex = strHex + twoCharHex(mybytearray[i]) + " " + print(description + "(" + str(packetlength) + "bytes) = " + strHex) + +def prettyMac(macByteArray): + s="" + for i in range(0, 5): + s = s + twoCharHex(macByteArray[i]) + ":" + s = s + twoCharHex(macByteArray[i]) + return s \ No newline at end of file diff --git a/pyPlcHomeplug.py b/pyPlcHomeplug.py index 4f34782..6244f2f 100644 --- a/pyPlcHomeplug.py +++ b/pyPlcHomeplug.py @@ -34,24 +34,8 @@ import pcap - -def twoCharHex(b): - strHex = "%0.2X" % b - return strHex - -def showAsHex(mybytearray): - packetlength = len(mybytearray) - strHex = "" - for i in range(0, packetlength): - strHex = strHex + twoCharHex(mybytearray[i]) + " " - print("len " + str(packetlength) + " data " + strHex) - -def prettyMac(macByteArray): - s="" - for i in range(0, 5): - s = s + twoCharHex(macByteArray[i]) + ":" - s = s + twoCharHex(macByteArray[i]) - return s +import pyPlcIpv6 +from helpers import * # prettyMac etc MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ] MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ] @@ -117,14 +101,12 @@ class pyPlcHomeplug(): print("From " + strSourceMac + strSourceFriendlyName + " to " + strDestMac) - def isHomeplug(self, mybytearray): - blIsHomePlug=False - if len(mybytearray)>(6+6+2): - protocol=mybytearray[12]*256 + mybytearray[13] - if (protocol == 0x88E1): - blIsHomePlug=True - # print("HomePlug protocol") - return blIsHomePlug + def getEtherType(self, messagebufferbytearray): + etherType=0 + if len(messagebufferbytearray)>(6+6+2): + etherType=messagebufferbytearray[12]*256 + messagebufferbytearray[13] + return etherType + def fillSourceMac(self, mac, offset=6): # at offset 6 in the ethernet frame, we have the source MAC # we can give a different offset, to re-use the MAC also in the data area @@ -424,31 +406,34 @@ class pyPlcHomeplug(): if (selection=="1"): self.composeSlacParamReq() self.addToTrace("transmitting SLAC_PARAM.REQ...") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + self.transmit(self.mytransmitbuffer) if (selection=="2"): self.composeSlacParamCnf() self.addToTrace("transmitting SLAC_PARAM.CNF...") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + self.transmit(self.mytransmitbuffer) if (selection=="S"): self.composeGetSwReq() self.addToTrace("transmitting GetSwReq...") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + self.transmit(self.mytransmitbuffer) if (selection=="s"): self.composeSetKey(0) self.addToTrace("transmitting SET_KEY.REQ (key 0)") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + self.transmit(self.mytransmitbuffer) if (selection=="t"): self.composeSetKey(2) self.addToTrace("transmitting SET_KEY.REQ (key 2)") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + self.transmit(self.mytransmitbuffer) if (selection=="D"): self.composeDHCP() self.addToTrace("transmitting broken DHCP") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + self.transmit(self.mytransmitbuffer) if (selection=="G"): self.composeGetKey() self.addToTrace("transmitting GET_KEY") - self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + self.transmit(self.mytransmitbuffer) + + def transmit(self, pkt): + self.sniffer.sendpacket(bytes(pkt)) def evaluateGetKeyCnf(self): # The getkey response contains the Network ID (NID), even if the request was rejected. We store the NID, @@ -594,6 +579,8 @@ class pyPlcHomeplug(): self.NID = [ 1, 2, 3, 4, 5, 6, 7 ] # a default network ID self.pevMac = [0x55, 0x56, 0x57, 0x58, 0x59, 0x5A ] # a default pev MAC self.runningCounter=0 + self.ipv6 = pyPlcIpv6.ipv6handler(self.transmit) + self.ipv6.ownMac = MAC_RANDOM self.enterEvseMode() self.showStatus(prettyMac(self.pevMac), "pevmac") print("sniffer created at " + self.strInterfaceName) @@ -609,16 +596,16 @@ class pyPlcHomeplug(): for ts, pkt in self.sniffer: # attention: for using this in non-blocking manner, we need the patch described above. self.nPacketsReceived+=1 # print('%d' % (ts)) # the time stamp - if (self.isHomeplug(pkt)): + etherType = self.getEtherType(pkt) + if (etherType == 0x88E1): # it is a HomePlug message self.myreceivebuffer = pkt # self.showMacAddresses(pkt) self.evaluateReceivedHomeplugPacket() + if (etherType == 0x86dd): # it is an IPv6 frame + self.ipv6.evaluateReceivedPacket(pkt) + self.showStatus("nPacketsReceived=" + str(self.nPacketsReceived)) def close(self): self.sniffer.close() -#sn = pyPlcHomeplug() -#while (1): -# print("Press control-C to stop") -# sn.mainfunction() diff --git a/pyPlcIpv6.py b/pyPlcIpv6.py new file mode 100644 index 0000000..13d59ce --- /dev/null +++ b/pyPlcIpv6.py @@ -0,0 +1,177 @@ + +# This module handles the IPv6 related functionality of the communication between charging station and car. +# +# It has the following sub-functionalities: +# - IP.UDP.SDP: listen to requests from the car, and responding to them. +# +# Abbreviations: +# SECC: Supply Equipment Communication Controller. The "computer" of the charging station. +# EVCC: Electric Vehicle Communication Controller. The "computer" of the vehicle. +# SDP: SECC Discovery Protocol. The UDP based protocol to find out the IP address of the charging station. +# SLAAC: Stateless auto address configuration (not SLAC!). A method to automatically set IPv6 address, based +# on the 6 byte MAC address. + +from helpers import showAsHex + + +class ipv6handler(): + def fillMac(self, macbytearray, position=6): # position 6 is the source MAC + for i in range(0, 6): + self.EthResponse[6+i] = macbytearray[i] + + def packResponseIntoEthernet(self, buffer): + # packs the IP packet into an ethernet packet + self.EthResponse = bytearray(len(buffer) + 6 + 6 + 2) # Ethernet header needs 14 bytes: + # 6 bytes MAC + # 6 bytes MAC + # 2 bytes EtherType + self.EthResponse[0] = 0x33 # destination MAC. We use multicast. Todo: Better use the PEV MAC. + self.EthResponse[1] = 0x33 + self.EthResponse[2] = 0x00 + self.EthResponse[3] = 0x00 + self.EthResponse[4] = 0x00 + self.EthResponse[5] = 0x01 + self.fillMac(self.ownMac) # bytes 6 to 11 are the source MAC + self.EthResponse[12] = 0x86 # 86dd is IPv6 + self.EthResponse[13] = 0xdd + for i in range(0, len(buffer)): + self.EthResponse[14+i] = buffer[i] + self.transmit(self.EthResponse) + + + def packResponseIntoIp(self, buffer): + # embeds the (SDP) response into the lower-layer-protocol: IP, Ethernet + self.IpResponse = bytearray(len(buffer) + 8 + 16 + 16) # IP6 needs 40 bytes: + # 4 bytes traffic class, flow + # 2 bytes destination port + # 2 bytes length (incl checksum) + # 2 bytes checksum + self.IpResponse[0] = 0x60 # traffic class, flow + self.IpResponse[1] = 0 + self.IpResponse[2] = 0 + self.IpResponse[3] = 0 + plen = len(buffer) # length of the payload. Without headers. + self.IpResponse[4] = plen >> 8 + self.IpResponse[5] = plen & 0xFF + self.IpResponse[6] = 0x11 # next level protocol, 0x11 = UDP in this case + self.IpResponse[7] = 0x0A # hop limit + for i in range(0, 16): + self.IpResponse[8+i] = self.SeccIp[i] # source IP address + for i in range(0, 16): + self.IpResponse[24+i] = self.EvccIp[i] # destination IP address + for i in range(0, len(buffer)): + self.IpResponse[40+i] = buffer[i] + showAsHex(self.IpResponse, "IP response ") + self.packResponseIntoEthernet(self.IpResponse) + + + def packResponseIntoUdp(self, buffer): + # embeds the (SDP) response into the lower-layer-protocol: UDP + self.UdpResponse = bytearray(len(buffer) + 8) # UDP needs 8 bytes: + # 2 bytes source port + # 2 bytes destination port + # 2 bytes length (incl checksum) + # 2 bytes checksum + self.UdpResponse[0] = 15118 >> 8 + self.UdpResponse[1] = 15118 & 0xFF + self.UdpResponse[2] = self.evccPort >> 8 + self.UdpResponse[3] = self.evccPort & 0xFF + lenInclChecksum = len(buffer) + 8 + self.UdpResponse[4] = lenInclChecksum >> 8 + self.UdpResponse[5] = lenInclChecksum & 0xFF + checksum = 0x1234 # todo: calculate this checksum + self.UdpResponse[6] = checksum >> 8 + self.UdpResponse[7] = checksum & 0xFF + for i in range(0, len(buffer)): + self.UdpResponse[8+i] = buffer[i] + showAsHex(self.UdpResponse, "UDP response ") + self.packResponseIntoIp(self.UdpResponse) + + def prepareSdpResponse(self): + # SECC Discovery Response. + # The response from the charger to the EV, which tells the chargers IPv6 address to the EV. + self.SdpPayload = bytearray(20) + for i in range(0, 16): + self.SdpPayload[i] = self.SeccIp[i] # 16 bytes IP address of the charger + self.SdpPayload[16] = 15118 >> 8 # SECC port high byte. Port is always 15118. + self.SdpPayload[17] = 15118 & 0xFF # SECC port low byte. Port is always 15118. + self.SdpPayload[18] = 0x10 # security. We only support "no transport layer security, 0x10". + self.SdpPayload[19] = 0x00 # transport protocol. We only support "TCP, 0x00". + showAsHex(self.SdpPayload, "SDP payload ") + # add the SDP header + lenSdp = len(self.SdpPayload) + self.V2Gframe = bytearray(lenSdp + 8) # V2GTP header needs 8 bytes: + # 1 byte protocol version + # 1 byte protocol version inverted + # 2 bytes payload type + # 4 byte payload length + self.V2Gframe[0] = 0x01 # version + self.V2Gframe[1] = 0xfe # version inverted + self.V2Gframe[2] = 0x90 # payload type. 0x9001 is the SDP response message + self.V2Gframe[3] = 0x01 # + self.V2Gframe[4] = (lenSdp >> 24) & 0xff # length 4 byte. + self.V2Gframe[5] = (lenSdp >> 16) & 0xff + self.V2Gframe[6] = (lenSdp >> 8) & 0xff + self.V2Gframe[7] = lenSdp & 0xff + for i in range(0, lenSdp): + self.V2Gframe[8+i] = self.SdpPayload[i] + showAsHex(self.V2Gframe, "V2Gframe ") + self.packResponseIntoUdp(self.V2Gframe) + + def evaluateUdp(self): + if (self.destinationport == 15118): # port for the SECC + if ((self.udpPayload[0]==0x01) and (self.udpPayload[1]==0xFE)): # protocol version 1 and inverted + # it is a V2GTP message + self.evccPort = self.sourceport + v2gptPayloadType = self.udpPayload[2] * 256 + self.udpPayload[3] + # 0x8001 EXI encoded V2G message + # 0x9000 SDP request message (SECC Discovery) + # 0x9001 SDP response message (SECC response to the EVCC) + if (v2gptPayloadType == 0x9000): + v2gptPayloadLen = self.udpPayload[4] * 256 ** 3 + self.udpPayload[5] * 256 ** 2 + self.udpPayload[6] * 256 + self.udpPayload[7] + if (v2gptPayloadLen == 2): + # 2 is the only valid length for a SDP request. + seccDiscoveryReqSecurity = self.udpPayload[8] # normally 0x10 for "no transport layer security". Or 0x00 for "TLS". + seccDiscoveryReqTransportProtocol = self.udpPayload[9] # normally 0x00 for TCP + if (seccDiscoveryReqSecurity!=0x10): + print("seccDiscoveryReqSecurity " + str(seccDiscoveryReqSecurity) + " is not supported") + else: + if (seccDiscoveryReqTransportProtocol!=0x00): + print("seccDiscoveryReqTransportProtocol " + str(seccDiscoveryReqTransportProtocol) + " is not supported") + else: + # This was a valid SDP request. Let's respond. + + showAsHex(self.udpPayload, "udp payload ") + print("ok, this was a valid SDP request. Will respond.") + self.prepareSdpResponse() + else: + print("v2gptPayloadLen on SDP request is " + str(v2gptPayloadLen) + " not supported") + else: + print("v2gptPayloadType " + hex(v2gptPayloadType) + " not supported") + + + def evaluateReceivedPacket(self, pkt): + if (len(pkt)>60): + self.myreceivebuffer = pkt + self.nextheader = self.myreceivebuffer[20] + if (self.nextheader == 0x11): # it is an UDP frame + self.sourceport = self.myreceivebuffer[54] * 256 + self.myreceivebuffer[55] + self.destinationport = self.myreceivebuffer[56] * 256 + self.myreceivebuffer[57] + self.udplen = self.myreceivebuffer[58] * 256 + self.myreceivebuffer[59] + self.udpsum = self.myreceivebuffer[60] * 256 + self.myreceivebuffer[61] + # udplen is including 8 bytes header at the begin + if (self.udplen>8): + self.udpPayload = bytearray(self.udplen-8) + print("self.udplen=" + str(self.udplen)) + print("self.myreceivebuffer len=" + str(len(self.myreceivebuffer))) + for i in range(0, self.udplen-8): + #print("index " + str(i) + " " + hex(self.myreceivebuffer[62+i])) + self.udpPayload[i] = self.myreceivebuffer[62+i] + self.evaluateUdp() + + def __init__(self, transmitCallback): + self.transmit = transmitCallback + self.SeccIp = [ 0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0x06, 0xaa, 0xaa, 0xff, 0xfe, 0, 0xaa, 0xaa ] # 16 bytes, a default IPv6 address for the charging station + self.EvccIp = [ 0xfe, 0x80, 0, 0, 0, 0, 0, 0, 0x06, 0x65, 0x65, 0xff, 0xfe, 0, 0x64, 0xC3 ] # 16 bytes, a default IPv6 address for the vehicle + self.ownMac = [ 0x01, 0x02, 0x03, 0x04, 0x05, 0x06 ] # 6 bytes own MAC default. Should be overwritten before use. + \ No newline at end of file diff --git a/tests/tester.py b/tests/tester.py new file mode 100644 index 0000000..486dc36 --- /dev/null +++ b/tests/tester.py @@ -0,0 +1,155 @@ + +# test + +import pcap +import time + +MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ] +MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ] +MAC_RANDOM = [0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff ] +MAC_ALPI = [0x0A, 0x19, 0x4A, 0x39, 0xD6, 0x98 ] # alpitronics +MAC_TPLINK_E4 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE4 ] # TPlink PLC adaptor +MAC_TPLINK_E6 = [0x98, 0x48, 0x27, 0x5A, 0x3C, 0xE6 ] # TPlink PLC adaptor +MAC_DEVOLO_26 = [0xBC, 0xF2, 0xAF, 0x0B, 0x8E, 0x26 ] # Devolo PLC adaptor +MAC_IPv6MCAST1 = [0x33, 0x33, 0x00, 0x00, 0x00, 0x01 ] # IPv6 multicast MAC +MAC_RANDCAR = [0x04, 0x65, 0x65, 0x00, 0xaf, 0xfe ] # random hyundai car + + +class tester(): + + def cleanTransmitBuffer(self): # fill the complete ethernet transmit buffer with 0x00 + for i in range(0, len(self.mytransmitbuffer)): + self.mytransmitbuffer[i]=0 + + def fillSourceMac(self, mac, offset=6): # at offset 6 in the ethernet frame, we have the source MAC + # we can give a different offset, to re-use the MAC also in the data area + for i in range(0, 6): + self.mytransmitbuffer[offset+i]=mac[i] + + def fillDestinationMac(self, mac, offset=0): # at offset 0 in the ethernet frame, we have the destination MAC + # we can give a different offset, to re-use the MAC also in the data area + for i in range(0, 6): + self.mytransmitbuffer[offset+i]=mac[i] + + def sendTestFrame1(self): + self.mytransmitbuffer = bytearray(72) + self.cleanTransmitBuffer() + # Destination MAC + self.fillDestinationMac(MAC_IPv6MCAST1) + # Source MAC + self.fillSourceMac(MAC_RANDCAR) + # Protocol + self.mytransmitbuffer[12]=0x86 # IPv6 + self.mytransmitbuffer[13]=0xdd + self.mytransmitbuffer[14]=0x60 # + self.mytransmitbuffer[15]=0x00 # + self.mytransmitbuffer[16]=0x00 # + self.mytransmitbuffer[17]=0x00 # + + self.mytransmitbuffer[18]=0x00 # len 2 bytes + self.mytransmitbuffer[19]=0x12 # + + self.mytransmitbuffer[20]=0x11 # next is UDP + self.mytransmitbuffer[21]=0x0A # hop limit + + self.mytransmitbuffer[22]=0xfe # 22 to 37 ip source address + self.mytransmitbuffer[23]=0x80 # 22 to 37 ip source address + self.mytransmitbuffer[24]=0x00 # 22 to 37 ip source address + self.mytransmitbuffer[25]=0x00 # 22 to 37 ip source address + self.mytransmitbuffer[26]=0x00 # 22 to 37 ip source address + self.mytransmitbuffer[27]=0x00 # 22 to 37 ip source address + self.mytransmitbuffer[28]=0x00 # 22 to 37 ip source address + self.mytransmitbuffer[29]=0x00 # 22 to 37 ip source address + self.mytransmitbuffer[30]=0x06 # 22 to 37 ip source address + self.mytransmitbuffer[31]=0x65 # 22 to 37 ip source address + self.mytransmitbuffer[32]=0x65 # 22 to 37 ip source address + self.mytransmitbuffer[33]=0xff # 22 to 37 ip source address + self.mytransmitbuffer[34]=0xfe # 22 to 37 ip source address + self.mytransmitbuffer[35]=0x00 # 22 to 37 ip source address + self.mytransmitbuffer[36]=0x64 # 22 to 37 ip source address + self.mytransmitbuffer[37]=0xc3 # 22 to 37 ip source address + + self.mytransmitbuffer[38]=0xff # 38 to 53 ip destination address + self.mytransmitbuffer[39]=0x02 # 38 to 53 ip destination address + self.mytransmitbuffer[53]=0x01 # 38 to 53 ip destination address + + self.mytransmitbuffer[54]=0xcc # source port + self.mytransmitbuffer[55]=0xab + + self.mytransmitbuffer[56]=0x3b # dest port + self.mytransmitbuffer[57]=0x0e + + self.mytransmitbuffer[58]=0x00 # length + self.mytransmitbuffer[59]=0x12 + + self.mytransmitbuffer[60]= 0x89 # checksum + self.mytransmitbuffer[61]= 0x62 + + self.mytransmitbuffer[62]= 0x01 + self.mytransmitbuffer[63]= 0xFE + + self.mytransmitbuffer[64]= 0x90 + self.mytransmitbuffer[65]= 0x00 + + self.mytransmitbuffer[66]= 0x00 + self.mytransmitbuffer[67]= 0x00 + self.mytransmitbuffer[68]= 0x00 + self.mytransmitbuffer[69]= 0x02 + + self.mytransmitbuffer[70]= 0x10 + self.mytransmitbuffer[71]= 0x00 + + self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + print("transmitted test frame 1") + + + + def findEthernetAdaptor(self): + self.strInterfaceName="eth0" # default, if the real is not found + print("Interfaces:\n" + '\n'.join(pcap.findalldevs())) + for i in range(0, 10): + strInterfaceName = pcap.ex_name("eth"+str(i)) + if (strInterfaceName == '\\Device\\NPF_{E4B8176C-8516-4D48-88BC-85225ABCF259}'): + print("This is the wanted Ethernet adaptor.") + self.strInterfaceName="eth"+str(i) + print("eth"+ str(i) + " is " + strInterfaceName) + + + def __init__(self): + self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8') + self.nPacketsReceived = 0 + #self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50) + # eth3 means: Third entry from back, in the list of interfaces, which is provided by pcap.findalldevs. + # Improvement necessary: select the interface based on the name. + # For debugging of the interface names, we can patch the file + # C:\Users\uwemi\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\pcap\_pcap_ex.py, + # in the function + # def name(name: bytes) -> bytes: + # in the place after + # if i == idx: + # print("index match at " + str(i) + " dev name=" + str(dev.name) + " dev.description=" + str(dev.description)) + # This will print the description of the used interface. + # + # Patch for non-blocking read-iteration: + # in _pcap.py, function def __next__(self), in the case of timeout (if n==0), we need to "raise StopIteration" instead of "continue". + # + self.findEthernetAdaptor() + self.sniffer = pcap.pcap(name=self.strInterfaceName, promisc=True, immediate=True, timeout_ms=50) + self.sniffer.setnonblock(True) + print("sniffer created at " + self.strInterfaceName) + + def mainfunction(self): + # print("will evaluate self.sniffer") + for ts, pkt in self.sniffer: # attention: for using this in non-blocking manner, we need the patch described above. + self.nPacketsReceived+=1 + # print('%d' % (ts)) # the time stamp + + def close(self): + self.sniffer.close() + +t=tester() +print(256 ** 2) +for i in range(0, 100): + t.sendTestFrame1() + time.sleep(0.5) +t.close()