feature: test result evaluation, test communication via UDP Syslog

This commit is contained in:
uhi22 2023-05-16 09:29:42 +02:00
parent 5eb34c63ee
commit 537387ff80
6 changed files with 104 additions and 14 deletions

View file

@ -52,13 +52,6 @@ class fsmEvse():
self.addToTrace("Detected DIN") self.addToTrace("Detected DIN")
# TESTSUITE: When the EVSE received the Handshake, it selects a new test case. # TESTSUITE: When the EVSE received the Handshake, it selects a new test case.
testsuite_choose_testcase() testsuite_choose_testcase()
strTestcase = "TC" + str(testsuite_getTcNumber()) + " "
msg = bytearray(20)
for i in range(0, 20):
msg[i] = ord(strTestcase[i])
self.Tcp.transmit(msg) # Announce the test case number to the pev, so that we see it in the cars log.
# It's not sure that this is a good idea, maybe the unexpected data in the TCP
# confuses the car.
# Eh for encode handshake, SupportedApplicationProtocolResponse # Eh for encode handshake, SupportedApplicationProtocolResponse
msg = addV2GTPHeader(exiEncode("Eh")) msg = addV2GTPHeader(exiEncode("Eh"))
self.addToTrace("responding " + prettyHexMessage(msg)) self.addToTrace("responding " + prettyHexMessage(msg))

View file

@ -8,6 +8,7 @@ import pyPlcTcpSocket
import time # for time.sleep() import time # for time.sleep()
from datetime import datetime from datetime import datetime
from helpers import prettyHexMessage, compactHexMessage, combineValueAndMultiplier from helpers import prettyHexMessage, compactHexMessage, combineValueAndMultiplier
from mytestsuite import *
from exiConnector import * # for EXI data handling/converting from exiConnector import * # for EXI data handling/converting
import json import json
from configmodule import getConfigValue, getConfigValueBool from configmodule import getConfigValue, getConfigValueBool
@ -731,6 +732,7 @@ class fsmPev():
# Finally unlock the connector # Finally unlock the connector
self.addToTrace("Charging successfully finished. Unlocking the connector") self.addToTrace("Charging successfully finished. Unlocking the connector")
self.hardwareInterface.triggerConnectorUnlocking() self.hardwareInterface.triggerConnectorUnlocking()
testsuite_reportstatus("TSRS_ChargingFinished")
self.enterState(stateEnd) self.enterState(stateEnd)
def stateFunctionSequenceTimeout(self): def stateFunctionSequenceTimeout(self):
@ -773,6 +775,7 @@ class fsmPev():
# Finally, when we have no current and no voltage, unlock the connector # Finally, when we have no current and no voltage, unlock the connector
self.addToTrace("Safe-shutdown-sequence: unlocking the connector") self.addToTrace("Safe-shutdown-sequence: unlocking the connector")
self.hardwareInterface.triggerConnectorUnlocking() self.hardwareInterface.triggerConnectorUnlocking()
testsuite_reportstatus("TSRS_SafeShutdownFinished")
# This is the end of the safe-shutdown-sequence. # This is the end of the safe-shutdown-sequence.
self.enterState(stateEnd) self.enterState(stateEnd)

View file

@ -330,7 +330,11 @@ class hardwareInterface():
if (self.simulatedSoc<100): if (self.simulatedSoc<100):
if ((self.outvalue & 2)!=0): if ((self.outvalue & 2)!=0):
# while the relay is closed, simulate increasing SOC # while the relay is closed, simulate increasing SOC
self.simulatedSoc = self.simulatedSoc + 0.01 deltaSoc = 0.5 # how fast the simulated SOC shall rise.
# Examples:
# 0.01 charging needs some minutes, good for light bulb tests
# 0.5 charging needs ~8s, good for automatic test case runs.
self.simulatedSoc = self.simulatedSoc + deltaSoc
if (getConfigValue("digital_output_device")=="dieter"): if (getConfigValue("digital_output_device")=="dieter"):
self.mainfunction_dieter() self.mainfunction_dieter()

View file

@ -9,6 +9,9 @@
# by setting testsuite_testcase_number = TC_MY_TESTCASE_FOR_SOMETHING below, the condition will fire and the fault is injected. # by setting testsuite_testcase_number = TC_MY_TESTCASE_FOR_SOMETHING below, the condition will fire and the fault is injected.
# A number of delay cycles can be configured with testsuite_delayCycles below. # A number of delay cycles can be configured with testsuite_delayCycles below.
from udplog import udplog_log
# The list of test cases. Each must have a unique test case ID. # The list of test cases. Each must have a unique test case ID.
TC_NOTHING_TO_TEST = 0 TC_NOTHING_TO_TEST = 0
TC_EVSE_ResponseCode_SequenceError_for_SessionSetup = 1 TC_EVSE_ResponseCode_SequenceError_for_SessionSetup = 1
@ -47,20 +50,94 @@ def testsuite_faultinjection_is_triggered(context):
isTestcaseFired = testsuite_counter>=testsuite_delayCycles # and fire the test case if the intended number is reached isTestcaseFired = testsuite_counter>=testsuite_delayCycles # and fire the test case if the intended number is reached
if (isTestcaseFired): if (isTestcaseFired):
print("[TESTSUITE] Fired test case " + str(context) + " TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT") print("[TESTSUITE] Fired test case " + str(context) + " TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
s = "[TESTSUITE] Fired test case " + str(context)
udplog_log(s, "testsuite")
return isTestcaseFired return isTestcaseFired
def testsuite_choose_testcase(): def testsuite_choose_testcase():
global testsuite_counter, testsuite_testcase_number, testsuite_delayCycles global testsuite_counter, testsuite_testcase_number, testsuite_delayCycles
global testsuite_observedResult
global testsuite_expectedResult
try:
if (testsuite_expectedResult is None):
testsuite_expectedResult = ""
except:
testsuite_expectedResult = ""
# as first step, before choosing the next test case, check the result of the ongoing test case
if (testsuite_expectedResult!=""):
s = "ExpectedResult: " + testsuite_expectedResult
s = s + ", ObservedResult: " + testsuite_observedResult
if (testsuite_expectedResult!=testsuite_observedResult):
s = "FAIL " + s
else:
s = "PASS " + s
print(s)
udplog_log(s, "testsuite")
if (testsuite_testcase_number<TC_EVSE_LastTest): if (testsuite_testcase_number<TC_EVSE_LastTest):
testsuite_testcase_number+=1 testsuite_testcase_number+=1
print("[TESTSUITE] Setting up test case " + str(testsuite_testcase_number) + " TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT") print("[TESTSUITE] Setting up test case " + str(testsuite_testcase_number) + " TTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTTT")
s = "[TESTSUITE] Setting up test case " + str(testsuite_testcase_number)
udplog_log(s, "testsuite")
testsuite_counter = 0 testsuite_counter = 0
testsuite_delayCycles = 5 testsuite_delayCycles = 5 # just a default
testsuite_expectedResult = "" # just a default
testsuite_observedResult = "" # just a default
# For each test case, configure the test parameters and the expected result
if (testsuite_testcase_number == TC_EVSE_Timeout_during_CableCheck): if (testsuite_testcase_number == TC_EVSE_Timeout_during_CableCheck):
testsuite_delayCycles=0 # immediately timeout testsuite_delayCycles=0 # immediately timeout
testsuite_expectedResult = "TSRS_SafeShutdownFinished"
if (testsuite_testcase_number == TC_EVSE_ResponseCode_SequenceError_for_SessionSetup):
testsuite_delayCycles=0 # immediately
testsuite_expectedResult = "TSRS_SafeShutdownFinished"
if (testsuite_testcase_number == TC_EVSE_ResponseCode_SequenceError_for_ServiceDiscoveryRes):
testsuite_delayCycles=0 # immediately
testsuite_expectedResult = "TSRS_SafeShutdownFinished"
if (testsuite_testcase_number == TC_EVSE_ResponseCode_SequenceError_for_ServicePaymentSelectionRes):
testsuite_delayCycles=0 # immediately
testsuite_expectedResult = "TSRS_SafeShutdownFinished"
if (testsuite_testcase_number == TC_EVSE_ResponseCode_SequenceError_for_ContractAuthenticationRes):
testsuite_delayCycles=0 # immediately
testsuite_expectedResult = "TSRS_SafeShutdownFinished"
if (testsuite_testcase_number == TC_EVSE_ResponseCode_ServiceSelectionInvalid_for_ChargeParameterDiscovery):
testsuite_delayCycles=0 # immediately
testsuite_expectedResult = "TSRS_SafeShutdownFinished"
if (testsuite_testcase_number == TC_EVSE_ResponseCode_Failed_for_PowerDeliveryRes):
testsuite_delayCycles=0 # immediately
testsuite_expectedResult = "TSRS_SafeShutdownFinished"
def testsuite_reportstatus(s):
# give the test status to the UDP, to inform the other side and to have it in the network log.
udplog_log(s, "testsuite")
pass
def testsuite_evaluateIpv4Packet(pkt):
# The testsuite listens to syslog messages which are coming from the other side,
# to know what is going on.
global testsuite_observedResult
if (len(pkt)>50):
protocol = pkt[23]
destinationport = pkt[36]*256 + pkt[37]
if ((protocol == 0x11) and (destinationport==0x0202)): # it is an UDP packet to the syslog port
baSyslog = pkt[46:]
strSyslog = ""
syslogLen = len(baSyslog)
if (syslogLen>100):
syslogLen=100
for i in range(0, syslogLen-1): # one less, remove the trailing 0x00
x = baSyslog[i]
if (x<0x20):
x=0x20 # make unprintable character to space.
strSyslog+=chr(x) # convert ASCII code to string
print("[Testsuite] received syslog packet: " + strSyslog)
if (strSyslog[0:5]=="TSRS_"):
# it is a TestSuiteReportStatus message.
testsuite_observedResult = strSyslog
if __name__ == "__main__": if __name__ == "__main__":
print("Testing the mytestsuite") print("Testing the mytestsuite")
print("nothing to do") print("nothing to do")

View file

@ -39,6 +39,7 @@ import udplog
import time import time
from helpers import * # prettyMac etc from helpers import * # prettyMac etc
from pyPlcModes import * from pyPlcModes import *
from mytestsuite import *
from random import random from random import random
MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ] MAC_BROADCAST = [0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF ]
@ -1041,7 +1042,7 @@ class pyPlcHomeplug():
self.showStatus("LISTEN mode", "mode") self.showStatus("LISTEN mode", "mode")
def printToUdp(self, s): def printToUdp(self, s):
self.udplog.log(s) udplog.udplog_log(s)
def __init__(self, callbackAddToTrace=None, callbackShowStatus=None, mode=C_LISTEN_MODE, addrMan=None, connMgr=None, isSimulationMode=0): def __init__(self, callbackAddToTrace=None, callbackShowStatus=None, mode=C_LISTEN_MODE, addrMan=None, connMgr=None, isSimulationMode=0):
self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8') self.mytransmitbuffer = bytearray("Hallo das ist ein Test", 'UTF-8')
@ -1087,8 +1088,8 @@ class pyPlcHomeplug():
self.runningCounter=0 self.runningCounter=0
self.ipv6 = pyPlcIpv6.ipv6handler(self.transmit, self.addressManager, self.connMgr, self.callbackShowStatus) self.ipv6 = pyPlcIpv6.ipv6handler(self.transmit, self.addressManager, self.connMgr, self.callbackShowStatus)
self.ipv6.ownMac = self.myMAC self.ipv6.ownMac = self.myMAC
self.udplog = udplog.udplog(self.transmit, self.addressManager) udplog.udplog_init(self.transmit, self.addressManager)
self.udplog.log("Test message to verify the syslog. pyPlcHomeplug.py is in the init function.") udplog.udplog_log("Test message to verify the syslog. pyPlcHomeplug.py is in the init function.", "initalive")
if (mode == C_LISTEN_MODE): if (mode == C_LISTEN_MODE):
self.enterListenMode() self.enterListenMode()
if (mode == C_EVSE_MODE): if (mode == C_EVSE_MODE):
@ -1115,6 +1116,8 @@ class pyPlcHomeplug():
self.evaluateReceivedHomeplugPacket() self.evaluateReceivedHomeplugPacket()
if (etherType == 0x86dd): # it is an IPv6 frame if (etherType == 0x86dd): # it is an IPv6 frame
self.ipv6.evaluateReceivedPacket(pkt) self.ipv6.evaluateReceivedPacket(pkt)
if (etherType == 0x0800): # it is an IPv4 frame
testsuite_evaluateIpv4Packet(pkt)
def mainfunction(self): def mainfunction(self):
# https://stackoverflow.com/questions/31305712/how-do-i-make-libpcap-pcap-loop-non-blocking # https://stackoverflow.com/questions/31305712/how-do-i-make-libpcap-pcap-loop-non-blocking

View file

@ -9,14 +9,16 @@ class udplog():
self.EthTxFrame[6+i] = macbytearray[i] self.EthTxFrame[6+i] = macbytearray[i]
def log(self, s): def log(self, s, purpose=""):
# return # activate this line to disable the logging completely # return # activate this line to disable the logging completely
# #
# The frame format follows the Syslog protocol, https://en.wikipedia.org/wiki/Syslog # The frame format follows the Syslog protocol, https://en.wikipedia.org/wiki/Syslog
# Level consists of # Level consists of
# Facility = 1 = "user" # Facility = 1 = "user"
# Severity = 7 = "debug" # Severity = 7 = "debug"
return # print("[UDPLOG] Logging " + s)
if (purpose==""):
return # here we filter, which kind of infos we want to provide to the UDP logging.
strLevel="<15>" strLevel="<15>"
# The String to be logged. Terminated by 00. # The String to be logged. Terminated by 00.
strMessage=s+"\0" strMessage=s+"\0"
@ -96,3 +98,11 @@ class udplog():
self.addressManager = addressManager self.addressManager = addressManager
self.ownMac = self.addressManager.getLocalMacAddress() self.ownMac = self.addressManager.getLocalMacAddress()
print("udplog started with ownMac " + prettyMac(self.ownMac)) print("udplog started with ownMac " + prettyMac(self.ownMac))
def udplog_init(transmitCallback, addressManager):
global udplogger
udplogger = udplog(transmitCallback, addressManager) # create a global logger object
def udplog_log(s, purpose=""):
global udplogger
udplogger.log(s, purpose)