From 837d54232933efc51f78e15c03c037777a190c58 Mon Sep 17 00:00:00 2001 From: uhi22 Date: Thu, 4 May 2023 08:53:01 +0200 Subject: [PATCH] feature: reaction on some EVSEStatusCodes. Feature: testsuite and fault injection --- fsmEvse.py | 7 +++++++ fsmPev.py | 21 +++++++++++++++++++++ mytestsuite.py | 40 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+) create mode 100644 mytestsuite.py diff --git a/fsmEvse.py b/fsmEvse.py index 91725ff..5d9605c 100644 --- a/fsmEvse.py +++ b/fsmEvse.py @@ -7,6 +7,7 @@ import pyPlcTcpSocket import time # for time.sleep() from helpers import prettyHexMessage, combineValueAndMultiplier +from mytestsuite import * from random import random from exiConnector import * # for EXI data handling/converting @@ -158,6 +159,9 @@ class fsmEvse(): strPresentVoltage = str(self.simulatedPresentVoltage) # "345" self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage") msg = addV2GTPHeader(exiEncode("EDg_"+strPresentVoltage)) # EDg for Encode, Din, PreChargeResponse + if (testsuite_faultinjection_is_triggered(TC_EVSE_Shutdown_during_PreCharge)): + # send a PreChargeResponse with StatusCode EVSE_Shutdown, to simulate a user-triggered session stop + msg = addV2GTPHeader("809a02180189551e24fc9e9160004100008182800000") self.addToTrace("responding " + prettyHexMessage(msg)) self.publishStatus("PreCharging " + strPresentVoltage) self.Tcp.transmit(msg) @@ -187,6 +191,9 @@ class fsmEvse(): self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage") strEVSEPresentCurrent = "1" # Just as a dummy current msg = addV2GTPHeader(exiEncode("EDi_"+strPresentVoltage + "_" + strEVSEPresentCurrent)) # EDi for Encode, Din, CurrentDemandRes + if (testsuite_faultinjection_is_triggered(TC_EVSE_Malfunction_during_CurrentDemand)): + # send a CurrentDemandResponse with StatusCode EVSE_Malfunction, to simulate e.g. a voltage overshoot + msg = addV2GTPHeader("809a02203fa9e71c31bc920100821b430b933b4b7339032b93937b908e08043000081828440201818000040060a11c06030306402038441380") self.addToTrace("responding " + prettyHexMessage(msg)) self.publishStatus("CurrentDemand") self.Tcp.transmit(msg) diff --git a/fsmPev.py b/fsmPev.py index 828630c..4ac53d2 100644 --- a/fsmPev.py +++ b/fsmPev.py @@ -95,6 +95,8 @@ class fsmPev(): s = "WaitForSessionStopResponse" if (statenumber == stateChargingFinished): s = "ChargingFinished" + if (statenumber == stateUnrecoverableError): + s = "UnrecoverableError" if (statenumber == stateSequenceTimeout): s = "SequenceTimeout" return s @@ -393,15 +395,22 @@ class fsmPev(): self.addToTrace(strConverterResult) if (strConverterResult.find("PreChargeRes")>0): u = 0 # a default voltage of 0V in case we cannot convert the actual value + strEVSEStatusCode = "0" # default in case the decoding does not work try: y = json.loads(strConverterResult) strEVSEPresentVoltageValue = y["EVSEPresentVoltage.Value"] strEVSEPresentVoltageMultiplier = y["EVSEPresentVoltage.Multiplier"] u = combineValueAndMultiplier(strEVSEPresentVoltageValue, strEVSEPresentVoltageMultiplier) self.callbackShowStatus(format(u,".1f"), "EVSEPresentVoltage") + strEVSEStatusCode = y["DC_EVSEStatus.EVSEStatusCode"] except: self.addToTrace("ERROR: Could not decode the PreChargeResponse") self.addToTrace("PreChargeResponse received.") + if (strEVSEStatusCode=="2"): + self.addToTrace("EVSE_Shutdown. Seems the user canceled the charging on the charger.") + self.publishStatus("EVSE_Shutdown") + self.enterState(stateUnrecoverableError) + return if (getConfigValueBool("use_evsepresentvoltage_for_precharge_end")): # We want to use the EVSEPresentVoltage, which was reported by the charger, as end-criteria for the precharging. s = "EVSEPresentVoltage " + str(u) + "V, " @@ -491,14 +500,26 @@ class fsmPev(): self.addToTrace(strConverterResult) if (strConverterResult.find("CurrentDemandRes")>0): u = 0 # a default voltage of 0V in case we cannot convert the actual value + strEVSEStatusCode = "0" # default in case the decoding does not work try: y = json.loads(strConverterResult) strEVSEPresentVoltageValue = y["EVSEPresentVoltage.Value"] strEVSEPresentVoltageMultiplier = y["EVSEPresentVoltage.Multiplier"] u = combineValueAndMultiplier(strEVSEPresentVoltageValue, strEVSEPresentVoltageMultiplier) self.callbackShowStatus(format(u,".1f"), "EVSEPresentVoltage") + strEVSEStatusCode = y["DC_EVSEStatus.EVSEStatusCode"] except: self.addToTrace("ERROR: Could not decode the PreChargeResponse") + if (strEVSEStatusCode=="2"): + self.addToTrace("EVSE_Shutdown. Seems the user canceled the charging on the charger.") + self.publishStatus("EVSE_Shutdown") + self.enterState(stateUnrecoverableError) + return + if (strEVSEStatusCode=="6"): + self.addToTrace("EVSE_Malfunction. Seems the charger detected a problem.") + self.publishStatus("EVSE_Malfunction") + self.enterState(stateUnrecoverableError) + return if (getConfigValueBool("use_physical_inlet_voltage_during_chargeloop")): # Instead of using the voltage which is reported by the charger, use the physically measured. u = self.hardwareInterface.getInletVoltage() diff --git a/mytestsuite.py b/mytestsuite.py new file mode 100644 index 0000000..796db50 --- /dev/null +++ b/mytestsuite.py @@ -0,0 +1,40 @@ + +# For testing. +# +# Concept: This module allows to trigger abnormal situations, to test the reaction of the software ("fault insertion testing"). +# In the place in the software, where the fault shall be injected, add a condition like +# if (testsuite_faultinjection_is_triggered(TC_MY_TESTCASE_FOR_SOMETHING)): +# DoSomethingStrange() +# In normal software run, this condition is never fulfilled and does not disturb. If the related test case is activated, +# by setting testsuite_testcase_number = TC_MY_TESTCASE_FOR_SOMETHING below, the condition will fire and the fault is injected. +# A number of delay cycles can be configured with testsuite_delayCycles below. + +# The list of test cases. Each must have a unique test case ID. +TC_NOTHING_TO_TEST = 0 +TC_EVSE_Shutdown_during_PreCharge = 1000 +TC_EVSE_Shutdown_during_CurrentDemand = 2000 +TC_EVSE_Malfunction_during_CurrentDemand = 2001 + +# Here we configure, which test case should fire, and after which number of calls: +testsuite_testcase_number = TC_EVSE_Malfunction_during_CurrentDemand +testsuite_delayCycles = 5 + + + +# Counter variable for delaying the trigger +testsuite_counter = 0 + +def testsuite_faultinjection_is_triggered(context): + global testsuite_counter, testsuite_testcase_number, testsuite_delayCycles + isTestcaseFired = False + if (context==testsuite_testcase_number): # if the call context is matching the intended test case + testsuite_counter += 1 # count the number of matching calls + isTestcaseFired = testsuite_counter>=testsuite_delayCycles # and fire the test case if the intended number is reached + if (isTestcaseFired): + print("[TESTSUITE] Fired test case " + str(context) + " TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT") + return isTestcaseFired + + +if __name__ == "__main__": + print("Testing the mytestsuite") + print("nothing to do")