mirror of
https://github.com/uhi22/pyPLC.git
synced 2024-11-10 01:05:42 +00:00
feature: EVSE stimulates a set of testcases for the PEV.
This commit is contained in:
parent
3620b7c4f9
commit
6c1703b1a9
5 changed files with 57 additions and 11 deletions
21
fsmEvse.py
21
fsmEvse.py
|
@ -50,6 +50,15 @@ class fsmEvse():
|
|||
if (strConverterResult.find("ProtocolNamespace=urn:din")>0):
|
||||
# todo: of course we should care for schemaID and prio also here
|
||||
self.addToTrace("Detected DIN")
|
||||
# TESTSUITE: When the EVSE received the Handshake, it selects a new test case.
|
||||
testsuite_choose_testcase()
|
||||
strTestcase = "TC" + str(testsuite_getTcNumber()) + " "
|
||||
msg = bytearray(20)
|
||||
for i in range(0, 20):
|
||||
msg[i] = ord(strTestcase[i])
|
||||
self.Tcp.transmit(msg) # Announce the test case number to the pev, so that we see it in the cars log.
|
||||
# It's not sure that this is a good idea, maybe the unexpected data in the TCP
|
||||
# confuses the car.
|
||||
# Eh for encode handshake, SupportedApplicationProtocolResponse
|
||||
msg = addV2GTPHeader(exiEncode("Eh"))
|
||||
self.addToTrace("responding " + prettyHexMessage(msg))
|
||||
|
@ -135,7 +144,8 @@ class fsmEvse():
|
|||
msg = addV2GTPHeader(exiEncode("EDf")) # EDf for Encode, Din, CableCheckResponse
|
||||
self.addToTrace("responding " + prettyHexMessage(msg))
|
||||
self.publishStatus("CableCheck")
|
||||
self.Tcp.transmit(msg)
|
||||
if (not testsuite_faultinjection_is_triggered(TC_EVSE_Timeout_during_CableCheck)):
|
||||
self.Tcp.transmit(msg)
|
||||
self.enterState(stateWaitForFlexibleRequest) # todo: not clear, what is specified in DIN
|
||||
if (strConverterResult.find("PreChargeReq")>0):
|
||||
# check the request content, and fill response parameters
|
||||
|
@ -164,7 +174,8 @@ class fsmEvse():
|
|||
msg = addV2GTPHeader("809a02180189551e24fc9e9160004100008182800000")
|
||||
self.addToTrace("responding " + prettyHexMessage(msg))
|
||||
self.publishStatus("PreCharging " + strPresentVoltage)
|
||||
self.Tcp.transmit(msg)
|
||||
if (not testsuite_faultinjection_is_triggered(TC_EVSE_Timeout_during_PreCharge)):
|
||||
self.Tcp.transmit(msg)
|
||||
self.enterState(stateWaitForFlexibleRequest) # todo: not clear, what is specified in DIN
|
||||
if (strConverterResult.find("ContractAuthenticationReq")>0):
|
||||
# todo: check the request content, and fill response parameters
|
||||
|
@ -194,9 +205,13 @@ class fsmEvse():
|
|||
if (testsuite_faultinjection_is_triggered(TC_EVSE_Malfunction_during_CurrentDemand)):
|
||||
# send a CurrentDemandResponse with StatusCode EVSE_Malfunction, to simulate e.g. a voltage overshoot
|
||||
msg = addV2GTPHeader("809a02203fa9e71c31bc920100821b430b933b4b7339032b93937b908e08043000081828440201818000040060a11c06030306402038441380")
|
||||
if (testsuite_faultinjection_is_triggered(TC_EVSE_Shutdown_during_CurrentDemand)):
|
||||
# send a CurrentDemandResponse with StatusCode EVSE_Shutdown, to simulate a user stop request
|
||||
msg = addV2GTPHeader("809a0125e15c2cd0e000410000018280001818000000040a1b648030300002038486580800")
|
||||
self.addToTrace("responding " + prettyHexMessage(msg))
|
||||
self.publishStatus("CurrentDemand")
|
||||
self.Tcp.transmit(msg)
|
||||
if (not testsuite_faultinjection_is_triggered(TC_EVSE_Timeout_during_CurrentDemand)):
|
||||
self.Tcp.transmit(msg)
|
||||
self.enterState(stateWaitForFlexibleRequest) # todo: not clear, what is specified in DIN
|
||||
if (strConverterResult.find("WeldingDetectionReq")>0):
|
||||
# todo: check the request content, and fill response parameters
|
||||
|
|
|
@ -233,6 +233,10 @@ class fsmPev():
|
|||
def stateFunctionWaitForSupportedApplicationProtocolResponse(self):
|
||||
if (len(self.rxData)>0):
|
||||
self.addToTrace("In state WaitForSupportedApplicationProtocolResponse, received " + prettyHexMessage(self.rxData))
|
||||
if ((self.rxData[0]!=0x01) or (self.rxData[1]!=0xFE)):
|
||||
# it is no EXI data. Print it to log, it could be a TESTSUITE notification.
|
||||
self.addToTrace("TESTSUITE notification. Seems we are running a test case. TTTTTTTTTTTTTTTTTTTTTTT")
|
||||
return
|
||||
exidata = removeV2GTPHeader(self.rxData)
|
||||
self.rxData = []
|
||||
strConverterResult = self.exiDecode(exidata, "Dh") # Decode Handshake-response
|
||||
|
|
|
@ -11,19 +11,25 @@
|
|||
|
||||
# The list of test cases. Each must have a unique test case ID.
|
||||
TC_NOTHING_TO_TEST = 0
|
||||
TC_EVSE_Shutdown_during_PreCharge = 1000
|
||||
TC_EVSE_Shutdown_during_CurrentDemand = 2000
|
||||
TC_EVSE_Malfunction_during_CurrentDemand = 2001
|
||||
|
||||
# Here we configure, which test case should fire, and after which number of calls:
|
||||
testsuite_testcase_number = TC_EVSE_Malfunction_during_CurrentDemand
|
||||
testsuite_delayCycles = 200
|
||||
TC_EVSE_Timeout_during_CableCheck = 1
|
||||
TC_EVSE_Timeout_during_PreCharge = 2
|
||||
TC_EVSE_Shutdown_during_PreCharge = 3
|
||||
TC_EVSE_Shutdown_during_CurrentDemand = 4
|
||||
TC_EVSE_Malfunction_during_CurrentDemand = 5
|
||||
TC_EVSE_Timeout_during_CurrentDemand = 6
|
||||
TC_EVSE_LastTest = 7
|
||||
|
||||
|
||||
# variables
|
||||
testsuite_testcase_number = 0
|
||||
testsuite_delayCycles = 0
|
||||
|
||||
# Counter variable for delaying the trigger
|
||||
testsuite_counter = 0
|
||||
|
||||
def testsuite_getTcNumber():
|
||||
return testsuite_testcase_number
|
||||
|
||||
def testsuite_faultinjection_is_triggered(context):
|
||||
global testsuite_counter, testsuite_testcase_number, testsuite_delayCycles
|
||||
isTestcaseFired = False
|
||||
|
@ -35,6 +41,17 @@ def testsuite_faultinjection_is_triggered(context):
|
|||
return isTestcaseFired
|
||||
|
||||
|
||||
def testsuite_choose_testcase():
|
||||
global testsuite_counter, testsuite_testcase_number, testsuite_delayCycles
|
||||
if (testsuite_testcase_number<TC_EVSE_LastTest):
|
||||
testsuite_testcase_number+=1
|
||||
print("[TESTSUITE] Setting up test case " + str(testsuite_testcase_number) + " TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
|
||||
testsuite_counter = 0
|
||||
testsuite_delayCycles = 5
|
||||
if (testsuite_testcase_number == TC_EVSE_Timeout_during_CableCheck):
|
||||
testsuite_delayCycles=0 # immediately timeout
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Testing the mytestsuite")
|
||||
print("nothing to do")
|
||||
|
|
9
pyPlc.py
9
pyPlc.py
|
@ -12,6 +12,7 @@ import pyPlcWorker
|
|||
from pyPlcModes import *
|
||||
import sys # for argv
|
||||
from configmodule import getConfigValue, getConfigValueBool
|
||||
from mytestsuite import *
|
||||
|
||||
startTime_ms = round(time.time()*1000)
|
||||
|
||||
|
@ -113,6 +114,11 @@ lblUInlet.pack()
|
|||
lblEVSEPresentVoltage = tk.Label(root, text="(EVSEPresentVoltage)")
|
||||
lblEVSEPresentVoltage.config(font=('Helvetica bold', 16))
|
||||
lblEVSEPresentVoltage.pack()
|
||||
|
||||
if (myMode == C_EVSE_MODE):
|
||||
lblTestcase = tk.Label(root, text="(test case)")
|
||||
lblTestcase.pack()
|
||||
|
||||
lblMode = tk.Label(root, text="(mode)")
|
||||
lblMode.pack()
|
||||
|
||||
|
@ -133,6 +139,9 @@ while lastKey!="x":
|
|||
# print(str(nMainloops) + " " + str(nKeystrokes)) # show something in the console window
|
||||
root.update()
|
||||
worker.mainfunction()
|
||||
if (myMode == C_EVSE_MODE):
|
||||
lblTestcase['text']= "Testcase " + str(testsuite_getTcNumber())
|
||||
|
||||
del(worker)
|
||||
|
||||
#---------------------------------------------------------------
|
||||
|
|
|
@ -79,7 +79,8 @@ class pyPlcWorker():
|
|||
self.hp.mainfunction() # call the lower-level workers
|
||||
self.hardwareInterface.mainfunction()
|
||||
if (self.mode == C_EVSE_MODE):
|
||||
self.evse.mainfunction() # call the evse state machine
|
||||
if (self.nMainFunctionCalls>8*33): # ugly. Wait with EVSE high level handling, until the modem restarted.
|
||||
self.evse.mainfunction() # call the evse state machine
|
||||
if (self.mode == C_PEV_MODE):
|
||||
self.pev.mainfunction() # call the pev state machine
|
||||
|
||||
|
|
Loading…
Reference in a new issue