mirror of
https://github.com/uhi22/pyPLC.git
synced 2024-11-10 01:05:42 +00:00
added tkinter top level module and submodules
This commit is contained in:
parent
9b8734d4e7
commit
d29ca27a78
4 changed files with 259 additions and 72 deletions
58
pyPlc.py
Normal file
58
pyPlc.py
Normal file
|
@ -0,0 +1,58 @@
|
||||||
|
|
||||||
|
# This is a simple Tkinter program, running a main loop and reacting on keys
|
||||||
|
#
|
||||||
|
# Tested on Windows10 with python 3.9
|
||||||
|
#
|
||||||
|
# https://groups.google.com/g/comp.lang.python/c/dldnjWRX3lE/m/cL69gG3fCAAJ
|
||||||
|
|
||||||
|
#------------------------------------------------------------
|
||||||
|
import tkinter as tk
|
||||||
|
import time
|
||||||
|
import pyPlcWorker
|
||||||
|
|
||||||
|
def storekeyname(event):
|
||||||
|
global nKeystrokes
|
||||||
|
global lastKey
|
||||||
|
nKeystrokes+=1
|
||||||
|
lastKey = event.keysym
|
||||||
|
worker.handleUserAction(lastKey)
|
||||||
|
return 'break' # swallow the event
|
||||||
|
|
||||||
|
def inkey():
|
||||||
|
global lastKey
|
||||||
|
return lastKey
|
||||||
|
lastKey = ''
|
||||||
|
|
||||||
|
def cbAddToTrace(s):
|
||||||
|
print(s)
|
||||||
|
|
||||||
|
def cbShowStatus(s):
|
||||||
|
print(s)
|
||||||
|
lblStatus['text']=s
|
||||||
|
root.update()
|
||||||
|
|
||||||
|
root = tk.Tk()
|
||||||
|
lastKey = ''
|
||||||
|
display = tk.Label(root, text='No Key', width=30) # A textual element in the graphical user interface
|
||||||
|
display.pack()
|
||||||
|
lblHelp = tk.Label(root, text="x=exit, t=testframe")
|
||||||
|
lblHelp.pack()
|
||||||
|
lblStatus = tk.Label(root, text="(Status)")
|
||||||
|
lblStatus.pack()
|
||||||
|
# Bind the keyboard handler to all relevant elements:
|
||||||
|
display.bind('<Key>', storekeyname)
|
||||||
|
root.bind('<Key>', storekeyname)
|
||||||
|
cbShowStatus("initialized")
|
||||||
|
root.update()
|
||||||
|
worker=pyPlcWorker.pyPlcWorker(cbAddToTrace, cbShowStatus)
|
||||||
|
|
||||||
|
nMainloops=0
|
||||||
|
nKeystrokes=0
|
||||||
|
while lastKey!="x":
|
||||||
|
time.sleep(.3) # 'do some calculation'
|
||||||
|
nMainloops+=1
|
||||||
|
# print(str(nMainloops) + " " + str(nKeystrokes)) # show something in the console window
|
||||||
|
root.update()
|
||||||
|
worker.mainfunction()
|
||||||
|
|
||||||
|
#---------------------------------------------------------------
|
160
pyPlcHomeplug.py
Normal file
160
pyPlcHomeplug.py
Normal file
|
@ -0,0 +1,160 @@
|
||||||
|
|
||||||
|
# Preconditions:
|
||||||
|
# Library pcap-ct (not libpcap, not pylibpcap, not pypcap)
|
||||||
|
#
|
||||||
|
# Version 2022-08-14:
|
||||||
|
# - Selection of interfaces ok
|
||||||
|
# - Sniffing of the SLAC-request ok
|
||||||
|
# - Transmission of a demo message ok
|
||||||
|
#
|
||||||
|
import pcap
|
||||||
|
|
||||||
|
def twoCharHex(b):
|
||||||
|
strHex = "%0.2X" % b
|
||||||
|
return strHex
|
||||||
|
|
||||||
|
def showAsHex(mybytearray):
|
||||||
|
packetlength = len(mybytearray)
|
||||||
|
strHex = ""
|
||||||
|
for i in range(0, packetlength):
|
||||||
|
strHex = strHex + twoCharHex(mybytearray[i]) + " "
|
||||||
|
print("len " + str(packetlength) + " data " + strHex)
|
||||||
|
|
||||||
|
|
||||||
|
class pyPlcHomeplug():
|
||||||
|
def showIpAddresses(self, mybytearray):
|
||||||
|
addr = lambda pkt, offset: '.'.join(str(pkt[i]) for i in range(offset, offset + 4))
|
||||||
|
print('SRC %-16s\tDST %-16s' % (addr(mybytearray, self.sniffer.dloff + 12), addr(mybytearray, self.sniffer.dloff + 16)))
|
||||||
|
|
||||||
|
def showMacAddresses(self, mybytearray):
|
||||||
|
strDestMac = ""
|
||||||
|
for i in range(0, 6):
|
||||||
|
strDestMac = strDestMac + twoCharHex(mybytearray[i]) + ":"
|
||||||
|
strSourceMac = ""
|
||||||
|
for i in range(5, 12):
|
||||||
|
strSourceMac = strSourceMac + twoCharHex(mybytearray[i]) + ":"
|
||||||
|
lastThreeOfSource = mybytearray[6]*256*256 + mybytearray[7]*256 + mybytearray[8]
|
||||||
|
strSourceFriendlyName = ""
|
||||||
|
if (lastThreeOfSource == 0x0a663a):
|
||||||
|
strSourceFriendlyName="Fritzbox"
|
||||||
|
if (lastThreeOfSource == 0x0064c3):
|
||||||
|
strSourceFriendlyName="Ioniq"
|
||||||
|
|
||||||
|
print("From " + strSourceMac + strSourceFriendlyName + " to " + strDestMac)
|
||||||
|
|
||||||
|
def isHomeplug(self, mybytearray):
|
||||||
|
blIsHomePlug=False
|
||||||
|
if len(mybytearray)>(6+6+2):
|
||||||
|
protocol=mybytearray[12]*256 + mybytearray[13]
|
||||||
|
if (protocol == 0x88E1):
|
||||||
|
blIsHomePlug=True
|
||||||
|
print("HomePlug protocol")
|
||||||
|
return blIsHomePlug
|
||||||
|
|
||||||
|
def composeTestFrame(self):
|
||||||
|
self.mytransmitbuffer = bytearray(60)
|
||||||
|
# Destination MAC
|
||||||
|
self.mytransmitbuffer[0]=0x04 # Ioniq MAC
|
||||||
|
self.mytransmitbuffer[1]=0x65
|
||||||
|
self.mytransmitbuffer[2]=0x65
|
||||||
|
self.mytransmitbuffer[3]=0x00
|
||||||
|
self.mytransmitbuffer[4]=0x64
|
||||||
|
self.mytransmitbuffer[5]=0xC3
|
||||||
|
|
||||||
|
# Source MAC
|
||||||
|
self.mytransmitbuffer[6]=0x0A # Alpi MAC
|
||||||
|
self.mytransmitbuffer[7]=0x19
|
||||||
|
self.mytransmitbuffer[8]=0x4A
|
||||||
|
self.mytransmitbuffer[9]=0x39
|
||||||
|
self.mytransmitbuffer[10]=0xD6
|
||||||
|
self.mytransmitbuffer[11]=0x98
|
||||||
|
|
||||||
|
# Protocol
|
||||||
|
self.mytransmitbuffer[12]=0x88 # Protocol HomeplugAV
|
||||||
|
self.mytransmitbuffer[13]=0xE1
|
||||||
|
|
||||||
|
self.mytransmitbuffer[14]=0x01 # version
|
||||||
|
self.mytransmitbuffer[15]=0x65 # SLAC_PARAM.confirm
|
||||||
|
|
||||||
|
self.mytransmitbuffer[16]=0x60 #
|
||||||
|
self.mytransmitbuffer[17]=0x00 #
|
||||||
|
self.mytransmitbuffer[18]=0x00 #
|
||||||
|
self.mytransmitbuffer[19]=0xff #
|
||||||
|
self.mytransmitbuffer[20]=0xff #
|
||||||
|
self.mytransmitbuffer[21]=0xff #
|
||||||
|
self.mytransmitbuffer[22]=0xff #
|
||||||
|
self.mytransmitbuffer[23]=0xff #
|
||||||
|
self.mytransmitbuffer[24]=0xff #
|
||||||
|
self.mytransmitbuffer[25]=0x0A #
|
||||||
|
self.mytransmitbuffer[26]=0x06 #
|
||||||
|
self.mytransmitbuffer[27]=0x01 #
|
||||||
|
self.mytransmitbuffer[28]=0x04 #
|
||||||
|
self.mytransmitbuffer[29]=0x65 #
|
||||||
|
self.mytransmitbuffer[30]=0x65 #
|
||||||
|
self.mytransmitbuffer[31]=0x00 #
|
||||||
|
|
||||||
|
self.mytransmitbuffer[32]=0x64 #
|
||||||
|
self.mytransmitbuffer[33]=0xC3 #
|
||||||
|
self.mytransmitbuffer[34]=0x00 #
|
||||||
|
self.mytransmitbuffer[35]=0x00 #
|
||||||
|
self.mytransmitbuffer[36]=0x04 #
|
||||||
|
self.mytransmitbuffer[37]=0x65 #
|
||||||
|
self.mytransmitbuffer[38]=0x65 #
|
||||||
|
self.mytransmitbuffer[39]=0x00 #
|
||||||
|
self.mytransmitbuffer[40]=0x64 #
|
||||||
|
self.mytransmitbuffer[41]=0xC3 #
|
||||||
|
self.mytransmitbuffer[42]=0x0 #
|
||||||
|
self.mytransmitbuffer[43]=0x0 #
|
||||||
|
self.mytransmitbuffer[44]=0x0 #
|
||||||
|
self.mytransmitbuffer[45]=0x0 #
|
||||||
|
self.mytransmitbuffer[46]=0x0 #
|
||||||
|
self.mytransmitbuffer[47]=0x0 #
|
||||||
|
|
||||||
|
self.mytransmitbuffer[48]=0x0 #
|
||||||
|
self.mytransmitbuffer[49]=0x0 #
|
||||||
|
self.mytransmitbuffer[50]=0x0 #
|
||||||
|
self.mytransmitbuffer[51]=0x0 #
|
||||||
|
self.mytransmitbuffer[52]=0x0 #
|
||||||
|
self.mytransmitbuffer[53]=0x0 #
|
||||||
|
self.mytransmitbuffer[54]=0x0 #
|
||||||
|
self.mytransmitbuffer[55]=0x0 #
|
||||||
|
self.mytransmitbuffer[56]=0x0 #
|
||||||
|
self.mytransmitbuffer[57]=0x0 #
|
||||||
|
self.mytransmitbuffer[58]=0x0 #
|
||||||
|
self.mytransmitbuffer[59]=0x0 #
|
||||||
|
|
||||||
|
def sendTestFrame(self):
|
||||||
|
addToTrace("transmitting test frame...")
|
||||||
|
self.composeTestFrame()
|
||||||
|
self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
|
||||||
|
|
||||||
|
def __init__(self, callbackAddToTrace=None, callbackShowStatus=None):
|
||||||
|
self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8')
|
||||||
|
self.callbackAddToTrace = callbackAddToTrace
|
||||||
|
self.callbackShowStatus = callbackShowStatus #self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50)
|
||||||
|
# eth3 bedeutet: Dritter Eintrag von hinten, in der Liste der Interfaces, die von pcap.findalldevs geliefert wird.
|
||||||
|
# Verbesserungsbedarf: Interface namensbasiert auswählen.
|
||||||
|
self.sniffer = pcap.pcap(name="eth3", promisc=True, immediate=True, timeout_ms=50)
|
||||||
|
|
||||||
|
def addToTrace(s):
|
||||||
|
self.callbackAddToTrace(s)
|
||||||
|
|
||||||
|
def mainfunction(self):
|
||||||
|
|
||||||
|
for ts, pkt in self.sniffer:
|
||||||
|
#print('%d' % (ts))
|
||||||
|
if (self.isHomeplug(pkt)):
|
||||||
|
self.showMacAddresses(pkt)
|
||||||
|
showAsHex(pkt)
|
||||||
|
if (pkt[15]==0x64): #SLAC_Request
|
||||||
|
self.sendTestFrame()
|
||||||
|
|
||||||
|
else:
|
||||||
|
addToTrace("(other)")
|
||||||
|
def close(self):
|
||||||
|
self.sniffer.close()
|
||||||
|
|
||||||
|
#sn = pyPlcHomeplug()
|
||||||
|
#while (1):
|
||||||
|
# print("Press control-C to stop")
|
||||||
|
# sn.mainfunction()
|
41
pyPlcWorker.py
Normal file
41
pyPlcWorker.py
Normal file
|
@ -0,0 +1,41 @@
|
||||||
|
# Worker for the pyPLC
|
||||||
|
#
|
||||||
|
# Tested on Windows10 with python 3.9
|
||||||
|
#
|
||||||
|
|
||||||
|
#------------------------------------------------------------
|
||||||
|
import pyPlcHomeplug
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
class pyPlcWorker():
|
||||||
|
def __init__(self, callbackAddToTrace=None, callbackShowStatus=None):
|
||||||
|
print("initializing pyPlcWorker")
|
||||||
|
self.something = "Hallo das ist ein Test"
|
||||||
|
self.nMainFunctionCalls=0
|
||||||
|
self.strUserAction = ""
|
||||||
|
self.callbackAddToTrace = callbackAddToTrace
|
||||||
|
self.callbackShowStatus = callbackShowStatus
|
||||||
|
self.hp = pyPlcHomeplug.pyPlcHomeplug(self.callbackAddToTrace, self.callbackShowStatus)
|
||||||
|
|
||||||
|
def addToTrace(self, s):
|
||||||
|
self.callbackAddToTrace(s)
|
||||||
|
|
||||||
|
def showStatus(self, s):
|
||||||
|
self.callbackShowStatus(s)
|
||||||
|
|
||||||
|
def mainfunction(self):
|
||||||
|
self.nMainFunctionCalls+=1
|
||||||
|
self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls))
|
||||||
|
#self.hp.mainfunction()
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def handleUserAction(self, strAction):
|
||||||
|
self.strUserAction = strAction
|
||||||
|
self.addToTrace("UserAction " + strAction)
|
||||||
|
if (strAction == "t"):
|
||||||
|
self.addToTrace("sending test frame")
|
||||||
|
self.hp.sendTestFrame()
|
||||||
|
|
72
test1.py
72
test1.py
|
@ -1,72 +0,0 @@
|
||||||
|
|
||||||
# Vorraussetzungen:
|
|
||||||
# Library pcap-ct (nicht libpcap, nicht pylibpcap, nicht pypcap)
|
|
||||||
#
|
|
||||||
# Stand 2022-08-14:
|
|
||||||
# - Auswahl des Interaces ok
|
|
||||||
# - Sniffen des SLAC-request ok
|
|
||||||
# - Aussenden eine Demo-Botschaft ok
|
|
||||||
#
|
|
||||||
import pcap
|
|
||||||
|
|
||||||
def twoCharHex(b):
|
|
||||||
strHex = "%0.2X" % b
|
|
||||||
return strHex
|
|
||||||
|
|
||||||
def showAsHex(mybytearray):
|
|
||||||
packetlength = len(mybytearray)
|
|
||||||
strHex = ""
|
|
||||||
for i in range(0, packetlength):
|
|
||||||
strHex = strHex + twoCharHex(mybytearray[i]) + " "
|
|
||||||
print("len " + str(packetlength) + " data " + strHex)
|
|
||||||
|
|
||||||
def showIpAddresses(mybytearray):
|
|
||||||
addr = lambda pkt, offset: '.'.join(str(pkt[i]) for i in range(offset, offset + 4))
|
|
||||||
print('SRC %-16s\tDST %-16s' % (addr(mybytearray, sniffer.dloff + 12), addr(mybytearray, sniffer.dloff + 16)))
|
|
||||||
|
|
||||||
def showMacAddresses(mybytearray):
|
|
||||||
strDestMac = ""
|
|
||||||
for i in range(0, 6):
|
|
||||||
strDestMac = strDestMac + twoCharHex(mybytearray[i]) + ":"
|
|
||||||
strSourceMac = ""
|
|
||||||
for i in range(5, 12):
|
|
||||||
strSourceMac = strSourceMac + twoCharHex(mybytearray[i]) + ":"
|
|
||||||
lastThreeOfSource = mybytearray[6]*256*256 + mybytearray[7]*256 + mybytearray[8]
|
|
||||||
strSourceFriendlyName = ""
|
|
||||||
if (lastThreeOfSource == 0x0a663a):
|
|
||||||
strSourceFriendlyName="Fritzbox"
|
|
||||||
if (lastThreeOfSource == 0x0064c3):
|
|
||||||
strSourceFriendlyName="Ioniq"
|
|
||||||
|
|
||||||
print("From " + strSourceMac + strSourceFriendlyName + " to " + strDestMac)
|
|
||||||
|
|
||||||
def isHomeplug(mybytearray):
|
|
||||||
blIsHomePlug=False
|
|
||||||
if len(mybytearray)>(6+6+2):
|
|
||||||
protocol=mybytearray[12]*256 + mybytearray[13]
|
|
||||||
if (protocol == 0x88E1):
|
|
||||||
blIsHomePlug=True
|
|
||||||
print("HomePlug protocol")
|
|
||||||
return blIsHomePlug
|
|
||||||
|
|
||||||
def sendTestFrame():
|
|
||||||
print("todo...")
|
|
||||||
|
|
||||||
mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8')
|
|
||||||
|
|
||||||
#sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50)
|
|
||||||
# eth3 bedeutet: Dritter Eintrag von hinten, in der Liste der Interfaces, die von pcap.findalldevs geliefert wird.
|
|
||||||
# Verbesserungsbedarf: Interface namensbasiert auswählen.
|
|
||||||
sniffer = pcap.pcap(name="eth3", promisc=True, immediate=True, timeout_ms=50)
|
|
||||||
print("Press control-C to stop")
|
|
||||||
for ts, pkt in sniffer:
|
|
||||||
#print('%d' % (ts))
|
|
||||||
if (isHomeplug(pkt)):
|
|
||||||
showMacAddresses(pkt)
|
|
||||||
showAsHex(pkt)
|
|
||||||
sendTestFrame()
|
|
||||||
sniffer.sendpacket(bytes(mytransmitbuffer))
|
|
||||||
else:
|
|
||||||
print("(other)")
|
|
||||||
|
|
||||||
sniffer.close()
|
|
Loading…
Reference in a new issue