feature: first steps for ISO1

This commit is contained in:
uhi22 2024-05-16 09:36:28 +02:00
parent 4274e8b8a1
commit e4bff44e80

View file

@ -63,6 +63,7 @@ class fsmEvse():
self.addToTrace(strConverterResult) self.addToTrace(strConverterResult)
if (strConverterResult.find("supportedAppProtocolReq")>0): if (strConverterResult.find("supportedAppProtocolReq")>0):
nDinSchemaID = 255 # invalid default value nDinSchemaID = 255 # invalid default value
nIso1SchemaID = 255
try: try:
jsondict = json.loads(strConverterResult) jsondict = json.loads(strConverterResult)
nAppProtocol_ArrayLen = int(jsondict["AppProtocol_arrayLen"]) nAppProtocol_ArrayLen = int(jsondict["AppProtocol_arrayLen"])
@ -73,8 +74,16 @@ class fsmEvse():
self.addToTrace("The NameSpace " + strNameSpace + " has SchemaID " + str(nSchemaId)) self.addToTrace("The NameSpace " + strNameSpace + " has SchemaID " + str(nSchemaId))
if (strNameSpace.find(":din:70121:")>0): if (strNameSpace.find(":din:70121:")>0):
nDinSchemaID = nSchemaId nDinSchemaID = nSchemaId
if (strNameSpace.find(":iso:15118:2:2013")>0):
nIso1SchemaID = nSchemaId
except: except:
self.addToTrace("ERROR: Could not decode the supportedAppProtocolReq") self.addToTrace("ERROR: Could not decode the supportedAppProtocolReq")
# Strategy for schema selection: pyPLC preferes DIN. If the car does not announce DIN,
# then pyPLC looks for ISO1. If this is also not announced by the car, pyPLC will not
# send a handshake response.
# This means: pyPLC does NOT care for the priority sent by the car. It uses the own
# priority "DIN over ISO1". Reason: DIN is proven-in-use, the ISO implementation still
# work-in-progress.
if (nDinSchemaID<255): if (nDinSchemaID<255):
self.addToTrace("Detected DIN") self.addToTrace("Detected DIN")
# TESTSUITE: When the EVSE received the Handshake, it selects a new test case. # TESTSUITE: When the EVSE received the Handshake, it selects a new test case.
@ -84,9 +93,22 @@ class fsmEvse():
self.addToTrace("responding " + prettyHexMessage(msg)) self.addToTrace("responding " + prettyHexMessage(msg))
self.Tcp.transmit(msg) self.Tcp.transmit(msg)
self.publishStatus("Schema negotiated") self.publishStatus("Schema negotiated")
self.schemaSelection = "D" # D for DIN
self.enterState(stateWaitForSessionSetupRequest) self.enterState(stateWaitForSessionSetupRequest)
else: else:
self.addToTrace("Error: The connected car does not support DIN. At the moment, the pyPLC only supports DIN.") if (nIso1SchemaID<255):
self.addToTrace("Detected ISO1 (aka ISO 2013)")
# TESTSUITE: When the EVSE received the Handshake, it selects a new test case.
testsuite_choose_testcase()
# Eh for encode handshake, SupportedApplicationProtocolResponse, with SchemaID as parameter
msg = addV2GTPHeader(exiEncode("Eh__"+str(nIso1SchemaID)))
self.addToTrace("responding " + prettyHexMessage(msg))
self.Tcp.transmit(msg)
self.publishStatus("Schema negotiated")
self.schemaSelection = "1" # 1 for ISO1
self.enterState(stateWaitForSessionSetupRequest)
else:
self.addToTrace("Error: The connected car does not support DIN or ISO1. At the moment, the pyPLC only supports DIN and ISO1.")
def stateFunctionWaitForSessionSetupRequest(self): def stateFunctionWaitForSessionSetupRequest(self):
if (len(self.rxData)>0): if (len(self.rxData)>0):
@ -94,11 +116,11 @@ class fsmEvse():
self.addToTrace("In state WaitForSessionSetupRequest, received " + prettyHexMessage(self.rxData)) self.addToTrace("In state WaitForSessionSetupRequest, received " + prettyHexMessage(self.rxData))
exidata = removeV2GTPHeader(self.rxData) exidata = removeV2GTPHeader(self.rxData)
self.rxData = [] self.rxData = []
strConverterResult = exiDecode(exidata, "DD") strConverterResult = exiDecode(exidata, "D"+self.schemaSelection) # decodes DIN or ISO1
self.addToTrace(strConverterResult) self.addToTrace(strConverterResult)
if (strConverterResult.find("SessionSetupReq")>0): if (strConverterResult.find("SessionSetupReq")>0):
# todo: check the request content, and fill response parameters # todo: check the request content, and fill response parameters
msg = addV2GTPHeader(exiEncode("EDa")) # EDa for Encode, Din, SessionSetupResponse msg = addV2GTPHeader(exiEncode("E"+self.schemaSelection+"a")) # EDa for Encode, Din, SessionSetupResponse
if (testsuite_faultinjection_is_triggered(TC_EVSE_ResponseCode_SequenceError_for_SessionSetup)): if (testsuite_faultinjection_is_triggered(TC_EVSE_ResponseCode_SequenceError_for_SessionSetup)):
# send a SessionSetupResponse with Responsecode SequenceError # send a SessionSetupResponse with Responsecode SequenceError
msg = addV2GTPHeader("809a0232417b661514a4cb91e0A02d0691559529548c0841e0fc1af4507460c0") msg = addV2GTPHeader("809a0232417b661514a4cb91e0A02d0691559529548c0841e0fc1af4507460c0")
@ -118,7 +140,7 @@ class fsmEvse():
self.addToTrace("In state WaitForServiceDiscoveryRequest, received " + prettyHexMessage(self.rxData)) self.addToTrace("In state WaitForServiceDiscoveryRequest, received " + prettyHexMessage(self.rxData))
exidata = removeV2GTPHeader(self.rxData) exidata = removeV2GTPHeader(self.rxData)
self.rxData = [] self.rxData = []
strConverterResult = exiDecode(exidata, "DD") strConverterResult = exiDecode(exidata, "D"+self.schemaSelection) # decodes DIN or ISO1
self.addToTrace(strConverterResult) self.addToTrace(strConverterResult)
if (strConverterResult.find("ServiceDiscoveryReq")>0): if (strConverterResult.find("ServiceDiscoveryReq")>0):
# todo: check the request content, and fill response parameters # todo: check the request content, and fill response parameters
@ -139,7 +161,7 @@ class fsmEvse():
self.addToTrace("In state WaitForServicePaymentSelectionRequest, received " + prettyHexMessage(self.rxData)) self.addToTrace("In state WaitForServicePaymentSelectionRequest, received " + prettyHexMessage(self.rxData))
exidata = removeV2GTPHeader(self.rxData) exidata = removeV2GTPHeader(self.rxData)
self.rxData = [] self.rxData = []
strConverterResult = exiDecode(exidata, "DD") strConverterResult = exiDecode(exidata, "D"+self.schemaSelection) # decodes DIN or ISO1
self.addToTrace(strConverterResult) self.addToTrace(strConverterResult)
if (strConverterResult.find("ServicePaymentSelectionReq")>0): if (strConverterResult.find("ServicePaymentSelectionReq")>0):
# todo: check the request content, and fill response parameters # todo: check the request content, and fill response parameters
@ -160,7 +182,7 @@ class fsmEvse():
self.addToTrace("In state WaitForFlexibleRequest, received " + prettyHexMessage(self.rxData)) self.addToTrace("In state WaitForFlexibleRequest, received " + prettyHexMessage(self.rxData))
exidata = removeV2GTPHeader(self.rxData) exidata = removeV2GTPHeader(self.rxData)
self.rxData = [] self.rxData = []
strConverterResult = exiDecode(exidata, "DD") strConverterResult = exiDecode(exidata, "D"+self.schemaSelection) # decodes DIN or ISO1
self.addToTrace(strConverterResult) self.addToTrace(strConverterResult)
if (strConverterResult.find("PowerDeliveryReq")>0): if (strConverterResult.find("PowerDeliveryReq")>0):
# todo: check the request content, and fill response parameters # todo: check the request content, and fill response parameters