feature: reaction on some EVSEStatusCodes. Feature: testsuite and fault injection

This commit is contained in:
uhi22 2023-05-04 08:53:01 +02:00
parent de13253342
commit 837d542329
3 changed files with 68 additions and 0 deletions

View file

@ -7,6 +7,7 @@
import pyPlcTcpSocket import pyPlcTcpSocket
import time # for time.sleep() import time # for time.sleep()
from helpers import prettyHexMessage, combineValueAndMultiplier from helpers import prettyHexMessage, combineValueAndMultiplier
from mytestsuite import *
from random import random from random import random
from exiConnector import * # for EXI data handling/converting from exiConnector import * # for EXI data handling/converting
@ -158,6 +159,9 @@ class fsmEvse():
strPresentVoltage = str(self.simulatedPresentVoltage) # "345" strPresentVoltage = str(self.simulatedPresentVoltage) # "345"
self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage") self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage")
msg = addV2GTPHeader(exiEncode("EDg_"+strPresentVoltage)) # EDg for Encode, Din, PreChargeResponse msg = addV2GTPHeader(exiEncode("EDg_"+strPresentVoltage)) # EDg for Encode, Din, PreChargeResponse
if (testsuite_faultinjection_is_triggered(TC_EVSE_Shutdown_during_PreCharge)):
# send a PreChargeResponse with StatusCode EVSE_Shutdown, to simulate a user-triggered session stop
msg = addV2GTPHeader("809a02180189551e24fc9e9160004100008182800000")
self.addToTrace("responding " + prettyHexMessage(msg)) self.addToTrace("responding " + prettyHexMessage(msg))
self.publishStatus("PreCharging " + strPresentVoltage) self.publishStatus("PreCharging " + strPresentVoltage)
self.Tcp.transmit(msg) self.Tcp.transmit(msg)
@ -187,6 +191,9 @@ class fsmEvse():
self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage") self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage")
strEVSEPresentCurrent = "1" # Just as a dummy current strEVSEPresentCurrent = "1" # Just as a dummy current
msg = addV2GTPHeader(exiEncode("EDi_"+strPresentVoltage + "_" + strEVSEPresentCurrent)) # EDi for Encode, Din, CurrentDemandRes msg = addV2GTPHeader(exiEncode("EDi_"+strPresentVoltage + "_" + strEVSEPresentCurrent)) # EDi for Encode, Din, CurrentDemandRes
if (testsuite_faultinjection_is_triggered(TC_EVSE_Malfunction_during_CurrentDemand)):
# send a CurrentDemandResponse with StatusCode EVSE_Malfunction, to simulate e.g. a voltage overshoot
msg = addV2GTPHeader("809a02203fa9e71c31bc920100821b430b933b4b7339032b93937b908e08043000081828440201818000040060a11c06030306402038441380")
self.addToTrace("responding " + prettyHexMessage(msg)) self.addToTrace("responding " + prettyHexMessage(msg))
self.publishStatus("CurrentDemand") self.publishStatus("CurrentDemand")
self.Tcp.transmit(msg) self.Tcp.transmit(msg)

View file

@ -95,6 +95,8 @@ class fsmPev():
s = "WaitForSessionStopResponse" s = "WaitForSessionStopResponse"
if (statenumber == stateChargingFinished): if (statenumber == stateChargingFinished):
s = "ChargingFinished" s = "ChargingFinished"
if (statenumber == stateUnrecoverableError):
s = "UnrecoverableError"
if (statenumber == stateSequenceTimeout): if (statenumber == stateSequenceTimeout):
s = "SequenceTimeout" s = "SequenceTimeout"
return s return s
@ -393,15 +395,22 @@ class fsmPev():
self.addToTrace(strConverterResult) self.addToTrace(strConverterResult)
if (strConverterResult.find("PreChargeRes")>0): if (strConverterResult.find("PreChargeRes")>0):
u = 0 # a default voltage of 0V in case we cannot convert the actual value u = 0 # a default voltage of 0V in case we cannot convert the actual value
strEVSEStatusCode = "0" # default in case the decoding does not work
try: try:
y = json.loads(strConverterResult) y = json.loads(strConverterResult)
strEVSEPresentVoltageValue = y["EVSEPresentVoltage.Value"] strEVSEPresentVoltageValue = y["EVSEPresentVoltage.Value"]
strEVSEPresentVoltageMultiplier = y["EVSEPresentVoltage.Multiplier"] strEVSEPresentVoltageMultiplier = y["EVSEPresentVoltage.Multiplier"]
u = combineValueAndMultiplier(strEVSEPresentVoltageValue, strEVSEPresentVoltageMultiplier) u = combineValueAndMultiplier(strEVSEPresentVoltageValue, strEVSEPresentVoltageMultiplier)
self.callbackShowStatus(format(u,".1f"), "EVSEPresentVoltage") self.callbackShowStatus(format(u,".1f"), "EVSEPresentVoltage")
strEVSEStatusCode = y["DC_EVSEStatus.EVSEStatusCode"]
except: except:
self.addToTrace("ERROR: Could not decode the PreChargeResponse") self.addToTrace("ERROR: Could not decode the PreChargeResponse")
self.addToTrace("PreChargeResponse received.") self.addToTrace("PreChargeResponse received.")
if (strEVSEStatusCode=="2"):
self.addToTrace("EVSE_Shutdown. Seems the user canceled the charging on the charger.")
self.publishStatus("EVSE_Shutdown")
self.enterState(stateUnrecoverableError)
return
if (getConfigValueBool("use_evsepresentvoltage_for_precharge_end")): if (getConfigValueBool("use_evsepresentvoltage_for_precharge_end")):
# We want to use the EVSEPresentVoltage, which was reported by the charger, as end-criteria for the precharging. # We want to use the EVSEPresentVoltage, which was reported by the charger, as end-criteria for the precharging.
s = "EVSEPresentVoltage " + str(u) + "V, " s = "EVSEPresentVoltage " + str(u) + "V, "
@ -491,14 +500,26 @@ class fsmPev():
self.addToTrace(strConverterResult) self.addToTrace(strConverterResult)
if (strConverterResult.find("CurrentDemandRes")>0): if (strConverterResult.find("CurrentDemandRes")>0):
u = 0 # a default voltage of 0V in case we cannot convert the actual value u = 0 # a default voltage of 0V in case we cannot convert the actual value
strEVSEStatusCode = "0" # default in case the decoding does not work
try: try:
y = json.loads(strConverterResult) y = json.loads(strConverterResult)
strEVSEPresentVoltageValue = y["EVSEPresentVoltage.Value"] strEVSEPresentVoltageValue = y["EVSEPresentVoltage.Value"]
strEVSEPresentVoltageMultiplier = y["EVSEPresentVoltage.Multiplier"] strEVSEPresentVoltageMultiplier = y["EVSEPresentVoltage.Multiplier"]
u = combineValueAndMultiplier(strEVSEPresentVoltageValue, strEVSEPresentVoltageMultiplier) u = combineValueAndMultiplier(strEVSEPresentVoltageValue, strEVSEPresentVoltageMultiplier)
self.callbackShowStatus(format(u,".1f"), "EVSEPresentVoltage") self.callbackShowStatus(format(u,".1f"), "EVSEPresentVoltage")
strEVSEStatusCode = y["DC_EVSEStatus.EVSEStatusCode"]
except: except:
self.addToTrace("ERROR: Could not decode the PreChargeResponse") self.addToTrace("ERROR: Could not decode the PreChargeResponse")
if (strEVSEStatusCode=="2"):
self.addToTrace("EVSE_Shutdown. Seems the user canceled the charging on the charger.")
self.publishStatus("EVSE_Shutdown")
self.enterState(stateUnrecoverableError)
return
if (strEVSEStatusCode=="6"):
self.addToTrace("EVSE_Malfunction. Seems the charger detected a problem.")
self.publishStatus("EVSE_Malfunction")
self.enterState(stateUnrecoverableError)
return
if (getConfigValueBool("use_physical_inlet_voltage_during_chargeloop")): if (getConfigValueBool("use_physical_inlet_voltage_during_chargeloop")):
# Instead of using the voltage which is reported by the charger, use the physically measured. # Instead of using the voltage which is reported by the charger, use the physically measured.
u = self.hardwareInterface.getInletVoltage() u = self.hardwareInterface.getInletVoltage()

40
mytestsuite.py Normal file
View file

@ -0,0 +1,40 @@
# For testing.
#
# Concept: This module allows to trigger abnormal situations, to test the reaction of the software ("fault insertion testing").
# In the place in the software, where the fault shall be injected, add a condition like
# if (testsuite_faultinjection_is_triggered(TC_MY_TESTCASE_FOR_SOMETHING)):
# DoSomethingStrange()
# In normal software run, this condition is never fulfilled and does not disturb. If the related test case is activated,
# by setting testsuite_testcase_number = TC_MY_TESTCASE_FOR_SOMETHING below, the condition will fire and the fault is injected.
# A number of delay cycles can be configured with testsuite_delayCycles below.
# The list of test cases. Each must have a unique test case ID.
TC_NOTHING_TO_TEST = 0
TC_EVSE_Shutdown_during_PreCharge = 1000
TC_EVSE_Shutdown_during_CurrentDemand = 2000
TC_EVSE_Malfunction_during_CurrentDemand = 2001
# Here we configure, which test case should fire, and after which number of calls:
testsuite_testcase_number = TC_EVSE_Malfunction_during_CurrentDemand
testsuite_delayCycles = 5
# Counter variable for delaying the trigger
testsuite_counter = 0
def testsuite_faultinjection_is_triggered(context):
global testsuite_counter, testsuite_testcase_number, testsuite_delayCycles
isTestcaseFired = False
if (context==testsuite_testcase_number): # if the call context is matching the intended test case
testsuite_counter += 1 # count the number of matching calls
isTestcaseFired = testsuite_counter>=testsuite_delayCycles # and fire the test case if the intended number is reached
if (isTestcaseFired):
print("[TESTSUITE] Fired test case " + str(context) + " TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
return isTestcaseFired
if __name__ == "__main__":
print("Testing the mytestsuite")
print("nothing to do")