Added fsmPev. Restructured mode handling. Fitted state machines to main program.

This commit is contained in:
uhi22 2022-11-07 09:21:25 +01:00
parent 0318d08d37
commit ff0470befb
7 changed files with 163 additions and 8 deletions

View file

@ -105,6 +105,12 @@ class fsmEvse():
stateWaitForPowerDeliveryRequest: stateFunctionWaitForPowerDeliveryRequest, stateWaitForPowerDeliveryRequest: stateFunctionWaitForPowerDeliveryRequest,
} }
def reInit(self):
print("re-initializing fsmEvse")
self.state = 0
self.cyclesInState = 0
self.rxData = []
def __init__(self): def __init__(self):
print("initializing fsmEvse") print("initializing fsmEvse")
self.Tcp = pyPlcTcpSocket.pyPlcTcpServerSocket() self.Tcp = pyPlcTcpSocket.pyPlcTcpServerSocket()

83
fsmPev.py Normal file
View file

@ -0,0 +1,83 @@
# State machine for the car
#
#
#------------------------------------------------------------
import pyPlcTcpSocket
import time # for time.sleep()
stateInitialized = 0
stateWaitForSupportedApplicationProtocolResponse = 1
stateWaitForSessionSetupResponse = 2
stateWaitForServiceDiscoveryResponse = 3
stateWaitForPaymentServiceSelectionResponse = 4
stateWaitForAuthorizationResponse = 5
stateWaitForChargeParameterResponse = 6
stateWaitForCableCheckResponse = 7
stateWaitForPreChargeResponse = 8
stateWaitForPowerDeliveryResponse = 9
class fsmPev():
def enterState(self, n):
print("from " + str(self.state) + " entering " + str(n))
self.state = n
self.cyclesInState = 0
def isTooLong(self):
# The timeout handling function.
return (self.cyclesInState > 20)
def stateFunctionInitialized(self):
if (self.Tcp.isConnected):
self.Tcp.transmit(bytes("TestFromPevInInitialized", "utf-8"))
self.enterState(stateWaitForSupportedApplicationProtocolResponse)
def stateFunctionWaitForSupportedApplicationProtocolResponse(self):
if (len(self.rxData)>0):
pass
stateFunctions = {
stateInitialized: stateFunctionInitialized,
stateWaitForSupportedApplicationProtocolResponse: stateFunctionWaitForSupportedApplicationProtocolResponse,
}
def reInit(self):
print("re-initializing fsmPev")
self.state = stateInitialized
self.cyclesInState = 0
self.rxData = []
if (not self.Tcp.isConnected):
self.Tcp.connect('fe80::e0ad:99ac:52eb:85d3', 15118)
if (not self.Tcp.isConnected):
print("connection failed")
else:
print("connected")
def __init__(self):
print("initializing fsmPev")
self.Tcp = pyPlcTcpSocket.pyPlcTcpClientSocket()
self.reInit()
def mainfunction(self):
#self.Tcp.mainfunction() # call the lower-level worker
if (self.Tcp.isRxDataAvailable()):
self.rxData = self.Tcp.getRxData()
print("received " + str(self.rxData))
#msg = "ok, you sent " + str(self.rxData)
#print("responding " + msg)
#self.Tcp.transmit(bytes(msg, "utf-8"))
# run the state machine:
self.cyclesInState += 1 # for timeout handling, count how long we are in a state
self.stateFunctions[self.state](self)
if __name__ == "__main__":
print("Testing the pev state machine")
pev = fsmPev()
print("Press Ctrl-Break for aborting")
while (True):
time.sleep(0.1)
pev.mainfunction()

View file

@ -9,6 +9,8 @@
import tkinter as tk import tkinter as tk
import time import time
import pyPlcWorker import pyPlcWorker
from pyPlcModes import *
import sys # for argv
def storekeyname(event): def storekeyname(event):
global nKeystrokes global nKeystrokes
@ -38,6 +40,21 @@ def cbShowStatus(s, selection=""):
lblStatus['text']=s lblStatus['text']=s
root.update() root.update()
myMode = C_LISTEN_MODE
if (len(sys.argv) > 1):
if (sys.argv[1] == "P"):
myMode = C_PEV_MODE
else:
if (sys.argv[1] == "E"):
myMode = C_EVSE_MODE
if (myMode == C_LISTEN_MODE):
print("starting in LISTEN_MODE")
if (myMode == C_PEV_MODE):
print("starting in PEV_MODE")
if (myMode == C_EVSE_MODE):
print("starting in EVSE_MODE")
root = tk.Tk() root = tk.Tk()
lastKey = '' lastKey = ''
lblHelp = tk.Label(root, justify= "left") lblHelp = tk.Label(root, justify= "left")
@ -54,7 +71,7 @@ lblMode.pack()
root.bind('<Key>', storekeyname) root.bind('<Key>', storekeyname)
cbShowStatus("initialized") cbShowStatus("initialized")
root.update() root.update()
worker=pyPlcWorker.pyPlcWorker(cbAddToTrace, cbShowStatus) worker=pyPlcWorker.pyPlcWorker(cbAddToTrace, cbShowStatus, myMode)
nMainloops=0 nMainloops=0
nKeystrokes=0 nKeystrokes=0

View file

@ -36,6 +36,7 @@
import pcap import pcap
import pyPlcIpv6 import pyPlcIpv6
from helpers import * # prettyMac etc from helpers import * # prettyMac etc
from pyPlcModes import *
MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ] MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ]
MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ] # Win10 laptop MAC_LAPTOP = [0xdc, 0x0e, 0xa1, 0x11, 0x67, 0x08 ] # Win10 laptop
@ -532,7 +533,7 @@ class pyPlcHomeplug():
if (strInterfaceName == '\\Device\\NPF_{E4B8176C-8516-4D48-88BC-85225ABCF259}'): if (strInterfaceName == '\\Device\\NPF_{E4B8176C-8516-4D48-88BC-85225ABCF259}'):
print("This is the wanted Ethernet adaptor.") print("This is the wanted Ethernet adaptor.")
self.strInterfaceName="eth"+str(i) self.strInterfaceName="eth"+str(i)
print("eth"+ str(i) + " is " + strInterfaceName) #print("eth"+ str(i) + " is " + strInterfaceName)
def enterPevMode(self): def enterPevMode(self):
self.iAmEvse = 0 # not emulating a charging station self.iAmEvse = 0 # not emulating a charging station
@ -550,7 +551,7 @@ class pyPlcHomeplug():
self.ipv6.enterListenMode() self.ipv6.enterListenMode()
self.showStatus("LISTEN mode", "mode") self.showStatus("LISTEN mode", "mode")
def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): def __init__(self, callbackAddToTrace=None, callbackShowStatus=None, mode=C_LISTEN_MODE):
self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8') self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8')
self.nPacketsReceived = 0 self.nPacketsReceived = 0
self.callbackAddToTrace = callbackAddToTrace self.callbackAddToTrace = callbackAddToTrace
@ -580,7 +581,12 @@ class pyPlcHomeplug():
self.runningCounter=0 self.runningCounter=0
self.ipv6 = pyPlcIpv6.ipv6handler(self.transmit) self.ipv6 = pyPlcIpv6.ipv6handler(self.transmit)
self.ipv6.ownMac = self.myMAC self.ipv6.ownMac = self.myMAC
if (mode == C_LISTEN_MODE):
self.enterListenMode()
if (mode == C_EVSE_MODE):
self.enterEvseMode() self.enterEvseMode()
if (mode == C_PEV_MODE):
self.enterPevMode()
self.showStatus(prettyMac(self.pevMac), "pevmac") self.showStatus(prettyMac(self.pevMac), "pevmac")
print("sniffer created at " + self.strInterfaceName) print("sniffer created at " + self.strInterfaceName)

5
pyPlcModes.py Normal file
View file

@ -0,0 +1,5 @@
C_LISTEN_MODE = 0 # listen to the communication. No active participation.
C_EVSE_MODE = 1 # play the role of the charger
C_PEV_MODE = 2 # play the role of the car

View file

@ -14,7 +14,7 @@ import sys # for argv
import time # for time.sleep() import time # for time.sleep()
import errno import errno
class pyPlcClientSocket(): class pyPlcTcpClientSocket():
def __init__(self): def __init__(self):
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) self.sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
self.isConnected = False self.isConnected = False
@ -189,7 +189,7 @@ def testServerSocket():
def testClientSocket(): def testClientSocket():
print("Testing the pyPlcTcpClientSocket...") print("Testing the pyPlcTcpClientSocket...")
c = pyPlcClientSocket() c = pyPlcTcpClientSocket()
c.connect('fe80::e0ad:99ac:52eb:85d3', 15118) c.connect('fe80::e0ad:99ac:52eb:85d3', 15118)
print("connected="+str(c.isConnected)) print("connected="+str(c.isConnected))
print("sending something to the server") print("sending something to the server")

View file

@ -5,15 +5,23 @@
#------------------------------------------------------------ #------------------------------------------------------------
import pyPlcHomeplug import pyPlcHomeplug
import fsmEvse
import fsmPev
from pyPlcModes import *
class pyPlcWorker(): class pyPlcWorker():
def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): def __init__(self, callbackAddToTrace=None, callbackShowStatus=None, mode=C_EVSE_MODE):
print("initializing pyPlcWorker") print("initializing pyPlcWorker")
self.nMainFunctionCalls=0 self.nMainFunctionCalls=0
self.mode = mode
self.strUserAction = "" self.strUserAction = ""
self.callbackAddToTrace = callbackAddToTrace self.callbackAddToTrace = callbackAddToTrace
self.callbackShowStatus = callbackShowStatus self.callbackShowStatus = callbackShowStatus
self.hp = pyPlcHomeplug.pyPlcHomeplug(self.callbackAddToTrace, self.callbackShowStatus) self.hp = pyPlcHomeplug.pyPlcHomeplug(self.callbackAddToTrace, self.callbackShowStatus, self.mode)
if (self.mode == C_EVSE_MODE):
self.evse = fsmEvse.fsmEvse()
if (self.mode == C_PEV_MODE):
self.pev = fsmPev.fsmPev()
def addToTrace(self, s): def addToTrace(self, s):
self.callbackAddToTrace(s) self.callbackAddToTrace(s)
@ -25,15 +33,45 @@ class pyPlcWorker():
self.nMainFunctionCalls+=1 self.nMainFunctionCalls+=1
#self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls)) #self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls))
self.hp.mainfunction() # call the lower-level worker self.hp.mainfunction() # call the lower-level worker
if (self.mode == C_EVSE_MODE):
self.evse.mainfunction() # call the evse state machine
if (self.mode == C_PEV_MODE):
self.pev.mainfunction() # call the pev state machine
def handleUserAction(self, strAction): def handleUserAction(self, strAction):
self.strUserAction = strAction self.strUserAction = strAction
if (strAction == "P"): if (strAction == "P"):
print("switching to PEV mode")
self.mode = C_PEV_MODE
if (hasattr(self, 'evse')):
print("deleting evse")
del self.evse
self.hp.enterPevMode() self.hp.enterPevMode()
if (not hasattr(self, 'pev')):
print("creating pev")
self.pev = fsmPev.fsmPev()
self.pev.reInit()
if (strAction == "E"): if (strAction == "E"):
print("switching to EVSE mode")
self.mode = C_EVSE_MODE
if (hasattr(self, 'pev')):
print("deleting pev")
del self.pev
self.hp.enterEvseMode() self.hp.enterEvseMode()
if (not hasattr(self, 'evse')):
print("creating fsmEvse")
self.evse = fsmEvse.fsmEvse()
self.evse.reInit()
if (strAction == "L"): if (strAction == "L"):
print("switching to LISTEN mode")
self.mode = C_LISTEN_MODE
self.hp.enterListenMode() self.hp.enterListenMode()
if (hasattr(self, 'evse')):
print("deleting evse")
del self.evse
if (hasattr(self, 'pev')):
print("deleting pev")
del self.pev
# self.addToTrace("UserAction " + strAction) # self.addToTrace("UserAction " + strAction)
self.hp.sendTestFrame(strAction) self.hp.sendTestFrame(strAction)