diff --git a/demo_pcap.py b/demo_pcap.py new file mode 100644 index 0000000..c9e637b --- /dev/null +++ b/demo_pcap.py @@ -0,0 +1,15 @@ +import pcap + + +print("Interfaces:\n" + '\n'.join(pcap.findalldevs())) +print("ex_name") +for i in range(0, 10): + strInterfaceName = pcap.ex_name("eth"+str(i)) + if (strInterfaceName == '\\Device\\NPF_{E4B8176C-8516-4D48-88BC-85225ABCF259}'): + print("This is the wanted Ethernet adaptor.") + print("eth"+ str(i) + " is " + strInterfaceName) + +sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50) +addr = lambda pkt, offset: '.'.join(str(pkt[i]) for i in range(offset, offset + 4)) +for ts, pkt in sniffer: + print('%d\tSRC %-16s\tDST %-16s' % (ts, addr(pkt, sniffer.dloff + 12), addr(pkt, sniffer.dloff + 16))) diff --git a/pyPlcHomeplug.py b/pyPlcHomeplug.py index d56bced..879303b 100644 --- a/pyPlcHomeplug.py +++ b/pyPlcHomeplug.py @@ -127,30 +127,63 @@ class pyPlcHomeplug(): addToTrace("transmitting test frame...") self.composeTestFrame() self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) + + def findEthernetAdaptor(self): + self.strInterfaceName="eth0" # default, if the real is not found + print("Interfaces:\n" + '\n'.join(pcap.findalldevs())) + for i in range(0, 10): + strInterfaceName = pcap.ex_name("eth"+str(i)) + if (strInterfaceName == '\\Device\\NPF_{E4B8176C-8516-4D48-88BC-85225ABCF259}'): + print("This is the wanted Ethernet adaptor.") + self.strInterfaceName="eth"+str(i) + print("eth"+ str(i) + " is " + strInterfaceName) def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8') + self.nPacketsReceived = 0 self.callbackAddToTrace = callbackAddToTrace - self.callbackShowStatus = callbackShowStatus #self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50) - # eth3 bedeutet: Dritter Eintrag von hinten, in der Liste der Interfaces, die von pcap.findalldevs geliefert wird. - # Verbesserungsbedarf: Interface namensbasiert auswählen. - self.sniffer = pcap.pcap(name="eth3", promisc=True, immediate=True, timeout_ms=50) + self.callbackShowStatus = callbackShowStatus + #self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50) + # eth3 means: Third entry from back, in the list of interfaces, which is provided by pcap.findalldevs. + # Improvement necessary: select the interface based on the name. + # For debugging of the interface names, we can patch the file + # C:\Users\uwemi\AppData\Local\Packages\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\LocalCache\local-packages\Python310\site-packages\pcap\_pcap_ex.py, + # in the function + # def name(name: bytes) -> bytes: + # in the place after + # if i == idx: + # print("index match at " + str(i) + " dev name=" + str(dev.name) + " dev.description=" + str(dev.description)) + # This will print the description of the used interface. + # + # Patch for non-blocking read-iteration: + # in _pcap.py, function def __next__(self), in the case of timeout (if n==0), we need to "raise StopIteration" instead of "continue". + # + self.findEthernetAdaptor() + self.sniffer = pcap.pcap(name=self.strInterfaceName, promisc=True, immediate=True, timeout_ms=50) + self.sniffer.setnonblock(True) + print("sniffer created at " + self.strInterfaceName) def addToTrace(s): self.callbackAddToTrace(s) + + def showStatus(self, s): + self.callbackShowStatus(s) - def mainfunction(self): + def mainfunction(self): + # print("will evaluate self.sniffer") + for ts, pkt in self.sniffer: # attention: for using this in non-blocking manner, we need the patch described above. + self.nPacketsReceived+=1 + print('%d' % (ts)) + #if (self.isHomeplug(pkt)): + # self.showMacAddresses(pkt) + # showAsHex(pkt) + # if (pkt[15]==0x64): #SLAC_Request + # self.sendTestFrame() + # + #else: + # addToTrace("(other)") + self.showStatus("nPacketsReceived=" + str(self.nPacketsReceived)) - for ts, pkt in self.sniffer: - #print('%d' % (ts)) - if (self.isHomeplug(pkt)): - self.showMacAddresses(pkt) - showAsHex(pkt) - if (pkt[15]==0x64): #SLAC_Request - self.sendTestFrame() - - else: - addToTrace("(other)") def close(self): self.sniffer.close() diff --git a/pyPlcWorker.py b/pyPlcWorker.py index f8e7dc6..7a5e944 100644 --- a/pyPlcWorker.py +++ b/pyPlcWorker.py @@ -6,9 +6,6 @@ #------------------------------------------------------------ import pyPlcHomeplug - - - class pyPlcWorker(): def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): print("initializing pyPlcWorker") @@ -27,11 +24,9 @@ class pyPlcWorker(): def mainfunction(self): self.nMainFunctionCalls+=1 - self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls)) - #self.hp.mainfunction() + #self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls)) + self.hp.mainfunction() # call the lower-level worker - - def handleUserAction(self, strAction): self.strUserAction = strAction self.addToTrace("UserAction " + strAction)