From 99fe012d87b51db1e14b080c900de9a37456df56 Mon Sep 17 00:00:00 2001 From: uhi22 Date: Mon, 7 Nov 2022 13:38:06 +0100 Subject: [PATCH] started connecting state machines to exi --- exiConnector.py | 160 ++++++++++++++++++++++++++++++++++++++---------- fsmEvse.py | 16 ++++- fsmPev.py | 4 +- 3 files changed, 143 insertions(+), 37 deletions(-) diff --git a/exiConnector.py b/exiConnector.py index 8a1d550..a88f925 100644 --- a/exiConnector.py +++ b/exiConnector.py @@ -32,14 +32,6 @@ # Work in progress: Fork in https://github.com/uhi22/OpenV2Gx, to add a command line interface (and maybe a socket interface). # # -# Example data: -# (1) From the Ioniq: -# In wireshark: Copy as hex stream -# 01fe8001000000228000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040 -# remove the 8 bytes V2GTP header -# 8000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040 -# (2) From OpenV2G main_example.appHandshake() -# 8000ebab9371d34b9b79d189a98989c1d191d191818981d26b9b3a232b30010000040001b75726e3a64696e3a37303132313a323031323a4d73674465660020000100880 # # # @@ -51,36 +43,138 @@ import subprocess import sys import time +# Example data: +# (1) From the Ioniq: +# In wireshark: Copy as hex stream +# 01fe8001000000228000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040 +# remove the 8 bytes V2GTP header +# 8000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040 +exiHexDemoSupportedApplicationProtocolRequestIoniq="8000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040" +# (2) From OpenV2G main_example.appHandshake() +# 8000ebab9371d34b9b79d189a98989c1d191d191818981d26b9b3a232b30010000040001b75726e3a64696e3a37303132313a323031323a4d73674465660020000100880 +exiHexDemoSupportedApplicationProtocolRequest2="8000ebab9371d34b9b79d189a98989c1d191d191818981d26b9b3a232b30010000040001b75726e3a64696e3a37303132313a323031323a4d73674465660020000100880" +# Configuration of the exi converter tool +pathToOpenV2GExe = "..\\OpenV2Gx\\Release\\OpenV2G.exe"; + +# Functions +def exiHexToByteArray(hexString): + # input: a string with the two-byte-hex representation + # output: a byte array with the same data. + # If the convertion fails, we return an empty byte array. + hexlen=len(hexString) + if ((hexlen % 2)!=0): + print("exiHexToByteArray: unplausible length of " + hexString) + return bytearray(0) + exiframe = bytearray(int(hexlen/2)) # two characters in the input string are one byte in the exiframe + for i in range(0, int(hexlen/2)): + x = hexString[2*i: 2*i+2] + #print("valuestring = " + x) + try: + v = int(x, 16) + #print("value " + hex(v)) + exiframe[i] = v + except: + print("exiHexToByteArray: unplausible data " + x) + return bytearray(0) + return exiframe + +def exiByteArrayToHex(b): + # input: Byte array + # output: a string with the two-byte-hex representation of the byte array + s = "" + for i in range(0, len(b)): + s = s + twoCharHex(b[i]) + return s + +def addV2GTPHeader(exidata): + print("type is " + str(type(exidata))) + if (str(type(exidata)) == ""): + print("changing type to bytearray") + exidata = exiHexToByteArray(exidata) + print("type is " + str(type(exidata))) + # takes the bytearray with exidata, and adds a header to it, according to the Vehicle-to-Grid-Transport-Protocol + exiLen = len(exidata) + header = bytearray(8) # V2GTP header has 8 bytes + # 1 byte protocol version + # 1 byte protocol version inverted + # 2 bytes payload type + # 4 byte payload length + header[0] = 0x01 # version + header[1] = 0xfe # version inverted + header[2] = 0x80 # payload type. 0x8001 means "EXI data" + header[3] = 0x01 # + header[4] = (exiLen >> 24) & 0xff # length 4 byte. + header[5] = (exiLen >> 16) & 0xff + header[6] = (exiLen >> 8) & 0xff + header[7] = exiLen & 0xff + return header + exidata + +def removeV2GTPHeader(v2gtpData): + #removeV2GTPHeader + return v2gtpData[8:] + +def exiDecode(exiHex, prefix="DH"): + # input: exi data. Either hexstring, or bytearray or bytes + # prefix to select the schema + # if the input is a byte array, we convert it into hex string. If it is already a hex string, we take it as it is. + print("type is " + str(type(exiHex))) + if (str(type(exiHex)) == ""): + print("changing type to hex string") + exiHex = exiByteArrayToHex(exiHex) + if (str(type(exiHex)) == ""): + print("changing type to hex string") + exiHex = exiByteArrayToHex(exiHex) + print("type is " + str(type(exiHex))) + param1 = prefix + exiHex # DH for decode handshake + print("exiDecode: trying to decode " + exiHex + " with schema " + prefix) + result = subprocess.run( + [pathToOpenV2GExe, param1], capture_output=True, text=True) + #print("stdout:", result.stdout) + if (len(result.stderr)>0): + print("exiDecode ERROR. stderr:" + result.stderr) + strConverterResult = result.stdout + return strConverterResult + +def exiEncode(strMessageName, params=""): + # todo: handle the schema, the message name and the parameters + param1 = "EH1" # EH for encode handshake, SupportedApplicationProtocolResponse + result = subprocess.run([pathToOpenV2GExe, param1], capture_output=True, text=True) + print("exiEncode stdout:", result.stdout) + if (len(result.stderr)>0): + print("exiEncode ERROR. stderr:" + result.stderr) + strConverterResult = result.stdout + return strConverterResult + + +def testByteArrayConversion(s): + print("Testing conversion of " + s) + x = exiHexToByteArray(s) + newHexString = exiByteArrayToHex(x) + print("exi as hex=" + newHexString) + exiWithHeader = addV2GTPHeader(x) + exiWithHeaderString = exiByteArrayToHex(exiWithHeader) + print("with V2GTP header=" + exiWithHeaderString) + if __name__ == "__main__": - - # Example: This is the first message from the EVCC. - # Means: It is an supportedAppProtocolReq message. It provides a list of charging protocols supported by the EVCC. - # - # - exiframe = [ - 0x80, 0x00, 0xdb, 0xab, 0x93, 0x71, 0xd3, 0x23, 0x4b, 0x71, 0xd1, 0xb9, 0x81, 0x89, - 0x91, 0x89, 0xd1, 0x91, 0x81, 0x89, 0x91, 0xd2, 0x6b, 0x9b, 0x3a, 0x23, 0x2b, 0x30, 0x02, 0x00, - 0x00, 0x04, 0x00, 0x40 ] - - print("Testing the exiConnector...") - pathToOpenV2GExe = "..\\OpenV2Gx\\Release\\OpenV2G.exe"; - s = "" - for i in range(0, len(exiframe)): - s = s + twoCharHex(exiframe[i]) - print("exi as hex=" + s) - param1 = "DH" + s # DH for decode handshake - start_time = time.time() - for i in range(0, 10): - result = subprocess.run( - [pathToOpenV2GExe, param1], capture_output=True, text=True) + print("Testing exiConnector...") + testByteArrayConversion("123456") + testByteArrayConversion("1234567") + testByteArrayConversion("ABCDEF") + testByteArrayConversion("00112233445566778899AABBCCDDEEFF") + testByteArrayConversion("TRASH!") - print("--- %s seconds ---" % (time.time() - start_time)) - print("stdout:", result.stdout) - print("stderr:", result.stderr) - strConverterResult = result.stdout + print("Testing exiDecode with exiHexDemoSupportedApplicationProtocolRequestIoniq") + print(exiDecode(exiHexDemoSupportedApplicationProtocolRequestIoniq)) + print("Testing exiDecode with exiHexDemoSupportedApplicationProtocolRequest2") + strConverterResult = exiDecode(exiHexDemoSupportedApplicationProtocolRequest2) + print(strConverterResult) + + strConverterResult = exiDecode(exiHexToByteArray(exiHexDemoSupportedApplicationProtocolRequest2)) + print(strConverterResult) + if (strConverterResult.find("ProtocolNamespace=urn:din")>0): print("Detected DIN") diff --git a/fsmEvse.py b/fsmEvse.py index bd921b0..1016739 100644 --- a/fsmEvse.py +++ b/fsmEvse.py @@ -6,6 +6,7 @@ import pyPlcTcpSocket import time # for time.sleep() +from exiConnector import * # for EXI data handling/converting stateWaitForSupportedApplicationProtocolRequest = 0 stateWaitForSessionSetupRequest = 1 @@ -30,10 +31,19 @@ class fsmEvse(): def stateFunctionWaitForSupportedApplicationProtocolRequest(self): if (len(self.rxData)>0): - msg = "ok, you sent " + str(self.rxData) + print("received " + str(self.rxData)) + exidata = removeV2GTPHeader(self.rxData) + print("received exi " + str(exidata)) self.rxData = [] - print("responding " + msg) - self.Tcp.transmit(bytes(msg, "utf-8")) + strConverterResult = exiDecode(exidata) + print(strConverterResult) + if (strConverterResult.find("ProtocolNamespace=urn:din")>0): + # todo: of course we should care for schemaID and prio also here + print("Detected DIN") + msg = addV2GTPHeader(exiEncode("SupportedApplicationProtocolResponse")) + print("responding " + str(msg)) + self.Tcp.transmit(msg) + self.enterState(1) def stateFunctionWaitForSessionSetupRequest(self): diff --git a/fsmPev.py b/fsmPev.py index 7f317bd..136b083 100644 --- a/fsmPev.py +++ b/fsmPev.py @@ -6,6 +6,7 @@ import pyPlcTcpSocket import time # for time.sleep() +from exiConnector import * # for EXI data handling/converting stateInitialized = 0 stateWaitForSupportedApplicationProtocolResponse = 1 @@ -30,7 +31,8 @@ class fsmPev(): def stateFunctionInitialized(self): if (self.Tcp.isConnected): - self.Tcp.transmit(bytes("TestFromPevInInitialized", "utf-8")) + # self.Tcp.transmit(bytes("TestFromPevInInitialized", "utf-8")) + self.Tcp.transmit(addV2GTPHeader(exiHexToByteArray(exiHexDemoSupportedApplicationProtocolRequestIoniq))) self.enterState(stateWaitForSupportedApplicationProtocolResponse) def stateFunctionWaitForSupportedApplicationProtocolResponse(self):