evaluate json from converter. Lean debug output. State machine extended.

This commit is contained in:
uhi22 2022-11-08 12:01:54 +01:00
parent 99fe012d87
commit 73b2ec1a70
5 changed files with 76 additions and 23 deletions

View file

@ -42,6 +42,7 @@ from helpers import showAsHex, twoCharHex
import subprocess
import sys
import time
import json
# Example data:
# (1) From the Ioniq:
@ -89,11 +90,11 @@ def exiByteArrayToHex(b):
return s
def addV2GTPHeader(exidata):
print("type is " + str(type(exidata)))
#print("type is " + str(type(exidata)))
if (str(type(exidata)) == "<class 'str'>"):
print("changing type to bytearray")
#print("changing type to bytearray")
exidata = exiHexToByteArray(exidata)
print("type is " + str(type(exidata)))
#print("type is " + str(type(exidata)))
# takes the bytearray with exidata, and adds a header to it, according to the Vehicle-to-Grid-Transport-Protocol
exiLen = len(exidata)
header = bytearray(8) # V2GTP header has 8 bytes
@ -119,14 +120,14 @@ def exiDecode(exiHex, prefix="DH"):
# input: exi data. Either hexstring, or bytearray or bytes
# prefix to select the schema
# if the input is a byte array, we convert it into hex string. If it is already a hex string, we take it as it is.
print("type is " + str(type(exiHex)))
#print("type is " + str(type(exiHex)))
if (str(type(exiHex)) == "<class 'bytearray'>"):
print("changing type to hex string")
#print("changing type to hex string")
exiHex = exiByteArrayToHex(exiHex)
if (str(type(exiHex)) == "<class 'bytes'>"):
print("changing type to hex string")
#print("changing type to hex string")
exiHex = exiByteArrayToHex(exiHex)
print("type is " + str(type(exiHex)))
#print("type is " + str(type(exiHex)))
param1 = prefix + exiHex # DH for decode handshake
print("exiDecode: trying to decode " + exiHex + " with schema " + prefix)
result = subprocess.run(
@ -139,12 +140,28 @@ def exiDecode(exiHex, prefix="DH"):
def exiEncode(strMessageName, params=""):
# todo: handle the schema, the message name and the parameters
param1 = "EH1" # EH for encode handshake, SupportedApplicationProtocolResponse
# param1 = "Eh" # Eh for encode handshake, SupportedApplicationProtocolResponse
# param1 = "EDa" # EDa for Encode, Din, SessionSetupResponse
param1 = strMessageName
result = subprocess.run([pathToOpenV2GExe, param1], capture_output=True, text=True)
print("exiEncode stdout:", result.stdout)
if (len(result.stderr)>0):
print("exiEncode ERROR. stderr:" + result.stderr)
strConverterResult = result.stdout
strConverterResult = "exiEncode ERROR. stderr:" + result.stderr
print(strConverterResult)
else:
print("exiEncode stdout:", result.stdout)
# Now we have an encoder result in json form, something like:
# {
# "info": "",
# "error": "",
# "result": "8004440400"
# }
try:
y = json.loads(result.stdout)
strConverterResult = y["result"]
print("strConverterResult is " + str(strConverterResult))
except:
strConverterResult = "exiEncode failed to convert json to dict."
print(strConverterResult)
return strConverterResult

View file

@ -6,6 +6,7 @@
import pyPlcTcpSocket
import time # for time.sleep()
from helpers import prettyHexMessage
from exiConnector import * # for EXI data handling/converting
stateWaitForSupportedApplicationProtocolRequest = 0
@ -26,29 +27,39 @@ class fsmEvse():
def isTooLong(self):
# The timeout handling function.
return (self.cyclesInState > 20)
return (self.cyclesInState > 50)
def stateFunctionWaitForSupportedApplicationProtocolRequest(self):
if (len(self.rxData)>0):
print("received " + str(self.rxData))
print("In state WaitForSupportedApplicationProtocolRequest, received " + prettyHexMessage(self.rxData))
exidata = removeV2GTPHeader(self.rxData)
print("received exi " + str(exidata))
print("received exi" + prettyHexMessage(exidata))
self.rxData = []
strConverterResult = exiDecode(exidata)
print(strConverterResult)
if (strConverterResult.find("ProtocolNamespace=urn:din")>0):
# todo: of course we should care for schemaID and prio also here
print("Detected DIN")
msg = addV2GTPHeader(exiEncode("SupportedApplicationProtocolResponse"))
print("responding " + str(msg))
# Eh for encode handshake, SupportedApplicationProtocolResponse
msg = addV2GTPHeader(exiEncode("Eh"))
print("responding " + prettyHexMessage(msg))
self.Tcp.transmit(msg)
self.enterState(1)
def stateFunctionWaitForSessionSetupRequest(self):
if (len(self.rxData)>0):
print("In state stateFunctionWaitForSessionSetupRequest, received " + prettyHexMessage(self.rxData))
exidata = removeV2GTPHeader(self.rxData)
print("received exi" + prettyHexMessage(exidata))
self.rxData = []
strConverterResult = exiDecode(exidata)
print(strConverterResult)
if (True):
# todo: check the request content, and fill response parameters
msg = addV2GTPHeader(exiEncode("EDa")) # EDa for Encode, Din, SessionSetupResponse
print("responding " + prettyHexMessage(msg))
self.Tcp.transmit(msg)
self.enterState(2)
if (self.isTooLong()):
self.enterState(0)
@ -132,7 +143,7 @@ class fsmEvse():
self.Tcp.mainfunction() # call the lower-level worker
if (self.Tcp.isRxDataAvailable()):
self.rxData = self.Tcp.getRxData()
print("received " + str(self.rxData))
#print("received " + str(self.rxData))
#msg = "ok, you sent " + str(self.rxData)
#print("responding " + msg)
#self.Tcp.transmit(bytes(msg, "utf-8"))

View file

@ -6,6 +6,7 @@
import pyPlcTcpSocket
import time # for time.sleep()
from helpers import prettyHexMessage
from exiConnector import * # for EXI data handling/converting
stateInitialized = 0
@ -31,17 +32,34 @@ class fsmPev():
def stateFunctionInitialized(self):
if (self.Tcp.isConnected):
# self.Tcp.transmit(bytes("TestFromPevInInitialized", "utf-8"))
self.Tcp.transmit(addV2GTPHeader(exiHexToByteArray(exiHexDemoSupportedApplicationProtocolRequestIoniq)))
self.enterState(stateWaitForSupportedApplicationProtocolResponse)
def stateFunctionWaitForSupportedApplicationProtocolResponse(self):
if (len(self.rxData)>0):
print("In state stateFunctionWaitForSupportedApplicationProtocolResponse, received " + prettyHexMessage(self.rxData))
exidata = removeV2GTPHeader(self.rxData)
print("received exi" + prettyHexMessage(exidata))
self.rxData = []
strConverterResult = exiDecode(exidata)
print(strConverterResult)
# todo: evaluate the message
# EDA for encode DIN, SessionSetupRequest
msg = addV2GTPHeader(exiEncode("EDA"))
print("sending SessionSetupRequest" + prettyHexMessage(msg))
self.Tcp.transmit(msg)
self.enterState(stateWaitForSessionSetupResponse)
def stateFunctionWaitForSessionSetupResponse(self):
if (len(self.rxData)>0):
#self.enterState(stateWaitForServiceDiscoveryResponse)
pass
stateFunctions = {
stateInitialized: stateFunctionInitialized,
stateWaitForSupportedApplicationProtocolResponse: stateFunctionWaitForSupportedApplicationProtocolResponse,
stateWaitForSessionSetupResponse: stateFunctionWaitForSessionSetupResponse,
}
def reInit(self):
@ -65,7 +83,7 @@ class fsmPev():
#self.Tcp.mainfunction() # call the lower-level worker
if (self.Tcp.isRxDataAvailable()):
self.rxData = self.Tcp.getRxData()
print("received " + str(self.rxData))
print("received " + prettyHexMessage(self.rxData))
#msg = "ok, you sent " + str(self.rxData)
#print("responding " + msg)
#self.Tcp.transmit(bytes(msg, "utf-8"))

View file

@ -11,6 +11,13 @@ def showAsHex(mybytearray, description=""):
strHex = strHex + twoCharHex(mybytearray[i]) + " "
print(description + "(" + str(packetlength) + "bytes) = " + strHex)
def prettyHexMessage(mybytearray, description=""):
packetlength = len(mybytearray)
strHex = ""
for i in range(0, packetlength):
strHex = strHex + twoCharHex(mybytearray[i]) + " "
return description + "(" + str(packetlength) + "bytes) = " + strHex
def prettyMac(macByteArray):
s=""
for i in range(0, 5):

View file

@ -155,7 +155,7 @@ class pyPlcTcpServerSocket():
#print("The client closed the connection in the meanwhile.")
data = None
if data:
print("received data:", data)
# print("received data:", data)
self.rxData = data
else:
print("connection closed")