diff --git a/pyPlc.py b/pyPlc.py new file mode 100644 index 0000000..1dd9ed7 --- /dev/null +++ b/pyPlc.py @@ -0,0 +1,58 @@ + +# This is a simple Tkinter program, running a main loop and reacting on keys +# +# Tested on Windows10 with python 3.9 +# +# https://groups.google.com/g/comp.lang.python/c/dldnjWRX3lE/m/cL69gG3fCAAJ + +#------------------------------------------------------------ +import tkinter as tk +import time +import pyPlcWorker + +def storekeyname(event): + global nKeystrokes + global lastKey + nKeystrokes+=1 + lastKey = event.keysym + worker.handleUserAction(lastKey) + return 'break' # swallow the event + +def inkey(): + global lastKey + return lastKey + lastKey = '' + +def cbAddToTrace(s): + print(s) + +def cbShowStatus(s): + print(s) + lblStatus['text']=s + root.update() + +root = tk.Tk() +lastKey = '' +display = tk.Label(root, text='No Key', width=30) # A textual element in the graphical user interface +display.pack() +lblHelp = tk.Label(root, text="x=exit, t=testframe") +lblHelp.pack() +lblStatus = tk.Label(root, text="(Status)") +lblStatus.pack() +# Bind the keyboard handler to all relevant elements: +display.bind('', storekeyname) +root.bind('', storekeyname) +cbShowStatus("initialized") +root.update() +worker=pyPlcWorker.pyPlcWorker(cbAddToTrace, cbShowStatus) + +nMainloops=0 +nKeystrokes=0 +while lastKey!="x": + time.sleep(.3) # 'do some calculation' + nMainloops+=1 + # print(str(nMainloops) + " " + str(nKeystrokes)) # show something in the console window + root.update() + worker.mainfunction() + +#--------------------------------------------------------------- diff --git a/pyPlcHomeplug.py b/pyPlcHomeplug.py new file mode 100644 index 0000000..d56bced --- /dev/null +++ b/pyPlcHomeplug.py @@ -0,0 +1,160 @@ + +# Preconditions: +# Library pcap-ct (not libpcap, not pylibpcap, not pypcap) +# +# Version 2022-08-14: +# - Selection of interfaces ok +# - Sniffing of the SLAC-request ok +# - Transmission of a demo message ok +# +import pcap + +def twoCharHex(b): + strHex = "%0.2X" % b + return strHex + +def showAsHex(mybytearray): + packetlength = len(mybytearray) + strHex = "" + for i in range(0, packetlength): + strHex = strHex + twoCharHex(mybytearray[i]) + " " + print("len " + str(packetlength) + " data " + strHex) + + +class pyPlcHomeplug(): + def showIpAddresses(self, mybytearray): + addr = lambda pkt, offset: '.'.join(str(pkt[i]) for i in range(offset, offset + 4)) + print('SRC %-16s\tDST %-16s' % (addr(mybytearray, self.sniffer.dloff + 12), addr(mybytearray, self.sniffer.dloff + 16))) + + def showMacAddresses(self, mybytearray): + strDestMac = "" + for i in range(0, 6): + strDestMac = strDestMac + twoCharHex(mybytearray[i]) + ":" + strSourceMac = "" + for i in range(5, 12): + strSourceMac = strSourceMac + twoCharHex(mybytearray[i]) + ":" + lastThreeOfSource = mybytearray[6]*256*256 + mybytearray[7]*256 + mybytearray[8] + strSourceFriendlyName = "" + if (lastThreeOfSource == 0x0a663a): + strSourceFriendlyName="Fritzbox" + if (lastThreeOfSource == 0x0064c3): + strSourceFriendlyName="Ioniq" + + print("From " + strSourceMac + strSourceFriendlyName + " to " + strDestMac) + + def isHomeplug(self, mybytearray): + blIsHomePlug=False + if len(mybytearray)>(6+6+2): + protocol=mybytearray[12]*256 + mybytearray[13] + if (protocol == 0x88E1): + blIsHomePlug=True + print("HomePlug protocol") + return blIsHomePlug + + def composeTestFrame(self): + self.mytransmitbuffer = bytearray(60) + # Destination MAC + self.mytransmitbuffer[0]=0x04 # Ioniq MAC + self.mytransmitbuffer[1]=0x65 + self.mytransmitbuffer[2]=0x65 + self.mytransmitbuffer[3]=0x00 + self.mytransmitbuffer[4]=0x64 + self.mytransmitbuffer[5]=0xC3 + + # Source MAC + self.mytransmitbuffer[6]=0x0A # Alpi MAC + self.mytransmitbuffer[7]=0x19 + self.mytransmitbuffer[8]=0x4A + self.mytransmitbuffer[9]=0x39 + self.mytransmitbuffer[10]=0xD6 + self.mytransmitbuffer[11]=0x98 + + # Protocol + self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV + self.mytransmitbuffer[13]=0xE1 + + self.mytransmitbuffer[14]=0x01 # version + self.mytransmitbuffer[15]=0x65 # SLAC_PARAM.confirm + + self.mytransmitbuffer[16]=0x60 # + self.mytransmitbuffer[17]=0x00 # + self.mytransmitbuffer[18]=0x00 # + self.mytransmitbuffer[19]=0xff # + self.mytransmitbuffer[20]=0xff # + self.mytransmitbuffer[21]=0xff # + self.mytransmitbuffer[22]=0xff # + self.mytransmitbuffer[23]=0xff # + self.mytransmitbuffer[24]=0xff # + self.mytransmitbuffer[25]=0x0A # + self.mytransmitbuffer[26]=0x06 # + self.mytransmitbuffer[27]=0x01 # + self.mytransmitbuffer[28]=0x04 # + self.mytransmitbuffer[29]=0x65 # + self.mytransmitbuffer[30]=0x65 # + self.mytransmitbuffer[31]=0x00 # + + self.mytransmitbuffer[32]=0x64 # + self.mytransmitbuffer[33]=0xC3 # + self.mytransmitbuffer[34]=0x00 # + self.mytransmitbuffer[35]=0x00 # + self.mytransmitbuffer[36]=0x04 # + self.mytransmitbuffer[37]=0x65 # + self.mytransmitbuffer[38]=0x65 # + self.mytransmitbuffer[39]=0x00 # + self.mytransmitbuffer[40]=0x64 # + self.mytransmitbuffer[41]=0xC3 # + self.mytransmitbuffer[42]=0x0 # + self.mytransmitbuffer[43]=0x0 # + self.mytransmitbuffer[44]=0x0 # + self.mytransmitbuffer[45]=0x0 # + self.mytransmitbuffer[46]=0x0 # + self.mytransmitbuffer[47]=0x0 # + + self.mytransmitbuffer[48]=0x0 # + self.mytransmitbuffer[49]=0x0 # + self.mytransmitbuffer[50]=0x0 # + self.mytransmitbuffer[51]=0x0 # + self.mytransmitbuffer[52]=0x0 # + self.mytransmitbuffer[53]=0x0 # + self.mytransmitbuffer[54]=0x0 # + self.mytransmitbuffer[55]=0x0 # + self.mytransmitbuffer[56]=0x0 # + self.mytransmitbuffer[57]=0x0 # + self.mytransmitbuffer[58]=0x0 # + self.mytransmitbuffer[59]=0x0 # + + def sendTestFrame(self): + addToTrace("transmitting test frame...") + self.composeTestFrame() + self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + + def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): + self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8') + self.callbackAddToTrace = callbackAddToTrace + self.callbackShowStatus = callbackShowStatus #self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50) + # eth3 bedeutet: Dritter Eintrag von hinten, in der Liste der Interfaces, die von pcap.findalldevs geliefert wird. + # Verbesserungsbedarf: Interface namensbasiert auswählen. + self.sniffer = pcap.pcap(name="eth3", promisc=True, immediate=True, timeout_ms=50) + + def addToTrace(s): + self.callbackAddToTrace(s) + + def mainfunction(self): + + for ts, pkt in self.sniffer: + #print('%d' % (ts)) + if (self.isHomeplug(pkt)): + self.showMacAddresses(pkt) + showAsHex(pkt) + if (pkt[15]==0x64): #SLAC_Request + self.sendTestFrame() + + else: + addToTrace("(other)") + def close(self): + self.sniffer.close() + +#sn = pyPlcHomeplug() +#while (1): +# print("Press control-C to stop") +# sn.mainfunction() \ No newline at end of file diff --git a/pyPlcWorker.py b/pyPlcWorker.py new file mode 100644 index 0000000..f8e7dc6 --- /dev/null +++ b/pyPlcWorker.py @@ -0,0 +1,41 @@ +# Worker for the pyPLC +# +# Tested on Windows10 with python 3.9 +# + +#------------------------------------------------------------ +import pyPlcHomeplug + + + + +class pyPlcWorker(): + def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): + print("initializing pyPlcWorker") + self.something = "Hallo das ist ein Test" + self.nMainFunctionCalls=0 + self.strUserAction = "" + self.callbackAddToTrace = callbackAddToTrace + self.callbackShowStatus = callbackShowStatus + self.hp = pyPlcHomeplug.pyPlcHomeplug(self.callbackAddToTrace, self.callbackShowStatus) + + def addToTrace(self, s): + self.callbackAddToTrace(s) + + def showStatus(self, s): + self.callbackShowStatus(s) + + def mainfunction(self): + self.nMainFunctionCalls+=1 + self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls)) + #self.hp.mainfunction() + + + + def handleUserAction(self, strAction): + self.strUserAction = strAction + self.addToTrace("UserAction " + strAction) + if (strAction == "t"): + self.addToTrace("sending test frame") + self.hp.sendTestFrame() + diff --git a/test1.py b/test1.py deleted file mode 100644 index 8a29618..0000000 --- a/test1.py +++ /dev/null @@ -1,72 +0,0 @@ - -# Vorraussetzungen: -# Library pcap-ct (nicht libpcap, nicht pylibpcap, nicht pypcap) -# -# Stand 2022-08-14: -# - Auswahl des Interaces ok -# - Sniffen des SLAC-request ok -# - Aussenden eine Demo-Botschaft ok -# -import pcap - -def twoCharHex(b): - strHex = "%0.2X" % b - return strHex - -def showAsHex(mybytearray): - packetlength = len(mybytearray) - strHex = "" - for i in range(0, packetlength): - strHex = strHex + twoCharHex(mybytearray[i]) + " " - print("len " + str(packetlength) + " data " + strHex) - -def showIpAddresses(mybytearray): - addr = lambda pkt, offset: '.'.join(str(pkt[i]) for i in range(offset, offset + 4)) - print('SRC %-16s\tDST %-16s' % (addr(mybytearray, sniffer.dloff + 12), addr(mybytearray, sniffer.dloff + 16))) - -def showMacAddresses(mybytearray): - strDestMac = "" - for i in range(0, 6): - strDestMac = strDestMac + twoCharHex(mybytearray[i]) + ":" - strSourceMac = "" - for i in range(5, 12): - strSourceMac = strSourceMac + twoCharHex(mybytearray[i]) + ":" - lastThreeOfSource = mybytearray[6]*256*256 + mybytearray[7]*256 + mybytearray[8] - strSourceFriendlyName = "" - if (lastThreeOfSource == 0x0a663a): - strSourceFriendlyName="Fritzbox" - if (lastThreeOfSource == 0x0064c3): - strSourceFriendlyName="Ioniq" - - print("From " + strSourceMac + strSourceFriendlyName + " to " + strDestMac) - -def isHomeplug(mybytearray): - blIsHomePlug=False - if len(mybytearray)>(6+6+2): - protocol=mybytearray[12]*256 + mybytearray[13] - if (protocol == 0x88E1): - blIsHomePlug=True - print("HomePlug protocol") - return blIsHomePlug - -def sendTestFrame(): - print("todo...") - -mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8') - -#sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50) -# eth3 bedeutet: Dritter Eintrag von hinten, in der Liste der Interfaces, die von pcap.findalldevs geliefert wird. -# Verbesserungsbedarf: Interface namensbasiert auswählen. -sniffer = pcap.pcap(name="eth3", promisc=True, immediate=True, timeout_ms=50) -print("Press control-C to stop") -for ts, pkt in sniffer: - #print('%d' % (ts)) - if (isHomeplug(pkt)): - showMacAddresses(pkt) - showAsHex(pkt) - sendTestFrame() - sniffer.sendpacket(bytes(mytransmitbuffer)) - else: - print("(other)") - -sniffer.close() \ No newline at end of file