mirror of
https://github.com/uhi22/pyPLC.git
synced 2024-11-20 01:13:58 +00:00
started connecting state machines to exi
This commit is contained in:
parent
ff0470befb
commit
99fe012d87
3 changed files with 143 additions and 37 deletions
156
exiConnector.py
156
exiConnector.py
|
@ -32,14 +32,6 @@
|
|||
# Work in progress: Fork in https://github.com/uhi22/OpenV2Gx, to add a command line interface (and maybe a socket interface).
|
||||
#
|
||||
#
|
||||
# Example data:
|
||||
# (1) From the Ioniq:
|
||||
# In wireshark: Copy as hex stream
|
||||
# 01fe8001000000228000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040
|
||||
# remove the 8 bytes V2GTP header
|
||||
# 8000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040
|
||||
# (2) From OpenV2G main_example.appHandshake()
|
||||
# 8000ebab9371d34b9b79d189a98989c1d191d191818981d26b9b3a232b30010000040001b75726e3a64696e3a37303132313a323031323a4d73674465660020000100880
|
||||
#
|
||||
#
|
||||
#
|
||||
|
@ -51,36 +43,138 @@ import subprocess
|
|||
import sys
|
||||
import time
|
||||
|
||||
# Example data:
|
||||
# (1) From the Ioniq:
|
||||
# In wireshark: Copy as hex stream
|
||||
# 01fe8001000000228000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040
|
||||
# remove the 8 bytes V2GTP header
|
||||
# 8000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040
|
||||
exiHexDemoSupportedApplicationProtocolRequestIoniq="8000dbab9371d3234b71d1b981899189d191818991d26b9b3a232b30020000040040"
|
||||
# (2) From OpenV2G main_example.appHandshake()
|
||||
# 8000ebab9371d34b9b79d189a98989c1d191d191818981d26b9b3a232b30010000040001b75726e3a64696e3a37303132313a323031323a4d73674465660020000100880
|
||||
exiHexDemoSupportedApplicationProtocolRequest2="8000ebab9371d34b9b79d189a98989c1d191d191818981d26b9b3a232b30010000040001b75726e3a64696e3a37303132313a323031323a4d73674465660020000100880"
|
||||
|
||||
|
||||
# Configuration of the exi converter tool
|
||||
pathToOpenV2GExe = "..\\OpenV2Gx\\Release\\OpenV2G.exe";
|
||||
|
||||
# Functions
|
||||
def exiHexToByteArray(hexString):
|
||||
# input: a string with the two-byte-hex representation
|
||||
# output: a byte array with the same data.
|
||||
# If the convertion fails, we return an empty byte array.
|
||||
hexlen=len(hexString)
|
||||
if ((hexlen % 2)!=0):
|
||||
print("exiHexToByteArray: unplausible length of " + hexString)
|
||||
return bytearray(0)
|
||||
exiframe = bytearray(int(hexlen/2)) # two characters in the input string are one byte in the exiframe
|
||||
for i in range(0, int(hexlen/2)):
|
||||
x = hexString[2*i: 2*i+2]
|
||||
#print("valuestring = " + x)
|
||||
try:
|
||||
v = int(x, 16)
|
||||
#print("value " + hex(v))
|
||||
exiframe[i] = v
|
||||
except:
|
||||
print("exiHexToByteArray: unplausible data " + x)
|
||||
return bytearray(0)
|
||||
return exiframe
|
||||
|
||||
def exiByteArrayToHex(b):
|
||||
# input: Byte array
|
||||
# output: a string with the two-byte-hex representation of the byte array
|
||||
s = ""
|
||||
for i in range(0, len(b)):
|
||||
s = s + twoCharHex(b[i])
|
||||
return s
|
||||
|
||||
def addV2GTPHeader(exidata):
|
||||
print("type is " + str(type(exidata)))
|
||||
if (str(type(exidata)) == "<class 'str'>"):
|
||||
print("changing type to bytearray")
|
||||
exidata = exiHexToByteArray(exidata)
|
||||
print("type is " + str(type(exidata)))
|
||||
# takes the bytearray with exidata, and adds a header to it, according to the Vehicle-to-Grid-Transport-Protocol
|
||||
exiLen = len(exidata)
|
||||
header = bytearray(8) # V2GTP header has 8 bytes
|
||||
# 1 byte protocol version
|
||||
# 1 byte protocol version inverted
|
||||
# 2 bytes payload type
|
||||
# 4 byte payload length
|
||||
header[0] = 0x01 # version
|
||||
header[1] = 0xfe # version inverted
|
||||
header[2] = 0x80 # payload type. 0x8001 means "EXI data"
|
||||
header[3] = 0x01 #
|
||||
header[4] = (exiLen >> 24) & 0xff # length 4 byte.
|
||||
header[5] = (exiLen >> 16) & 0xff
|
||||
header[6] = (exiLen >> 8) & 0xff
|
||||
header[7] = exiLen & 0xff
|
||||
return header + exidata
|
||||
|
||||
def removeV2GTPHeader(v2gtpData):
|
||||
#removeV2GTPHeader
|
||||
return v2gtpData[8:]
|
||||
|
||||
def exiDecode(exiHex, prefix="DH"):
|
||||
# input: exi data. Either hexstring, or bytearray or bytes
|
||||
# prefix to select the schema
|
||||
# if the input is a byte array, we convert it into hex string. If it is already a hex string, we take it as it is.
|
||||
print("type is " + str(type(exiHex)))
|
||||
if (str(type(exiHex)) == "<class 'bytearray'>"):
|
||||
print("changing type to hex string")
|
||||
exiHex = exiByteArrayToHex(exiHex)
|
||||
if (str(type(exiHex)) == "<class 'bytes'>"):
|
||||
print("changing type to hex string")
|
||||
exiHex = exiByteArrayToHex(exiHex)
|
||||
print("type is " + str(type(exiHex)))
|
||||
param1 = prefix + exiHex # DH for decode handshake
|
||||
print("exiDecode: trying to decode " + exiHex + " with schema " + prefix)
|
||||
result = subprocess.run(
|
||||
[pathToOpenV2GExe, param1], capture_output=True, text=True)
|
||||
#print("stdout:", result.stdout)
|
||||
if (len(result.stderr)>0):
|
||||
print("exiDecode ERROR. stderr:" + result.stderr)
|
||||
strConverterResult = result.stdout
|
||||
return strConverterResult
|
||||
|
||||
def exiEncode(strMessageName, params=""):
|
||||
# todo: handle the schema, the message name and the parameters
|
||||
param1 = "EH1" # EH for encode handshake, SupportedApplicationProtocolResponse
|
||||
result = subprocess.run([pathToOpenV2GExe, param1], capture_output=True, text=True)
|
||||
print("exiEncode stdout:", result.stdout)
|
||||
if (len(result.stderr)>0):
|
||||
print("exiEncode ERROR. stderr:" + result.stderr)
|
||||
strConverterResult = result.stdout
|
||||
return strConverterResult
|
||||
|
||||
|
||||
def testByteArrayConversion(s):
|
||||
print("Testing conversion of " + s)
|
||||
x = exiHexToByteArray(s)
|
||||
newHexString = exiByteArrayToHex(x)
|
||||
print("exi as hex=" + newHexString)
|
||||
exiWithHeader = addV2GTPHeader(x)
|
||||
exiWithHeaderString = exiByteArrayToHex(exiWithHeader)
|
||||
print("with V2GTP header=" + exiWithHeaderString)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Testing exiConnector...")
|
||||
testByteArrayConversion("123456")
|
||||
testByteArrayConversion("1234567")
|
||||
testByteArrayConversion("ABCDEF")
|
||||
testByteArrayConversion("00112233445566778899AABBCCDDEEFF")
|
||||
testByteArrayConversion("TRASH!")
|
||||
|
||||
# Example: This is the first message from the EVCC.
|
||||
# Means: It is an supportedAppProtocolReq message. It provides a list of charging protocols supported by the EVCC.
|
||||
#
|
||||
#
|
||||
exiframe = [
|
||||
0x80, 0x00, 0xdb, 0xab, 0x93, 0x71, 0xd3, 0x23, 0x4b, 0x71, 0xd1, 0xb9, 0x81, 0x89,
|
||||
0x91, 0x89, 0xd1, 0x91, 0x81, 0x89, 0x91, 0xd2, 0x6b, 0x9b, 0x3a, 0x23, 0x2b, 0x30, 0x02, 0x00,
|
||||
0x00, 0x04, 0x00, 0x40 ]
|
||||
print("Testing exiDecode with exiHexDemoSupportedApplicationProtocolRequestIoniq")
|
||||
print(exiDecode(exiHexDemoSupportedApplicationProtocolRequestIoniq))
|
||||
print("Testing exiDecode with exiHexDemoSupportedApplicationProtocolRequest2")
|
||||
strConverterResult = exiDecode(exiHexDemoSupportedApplicationProtocolRequest2)
|
||||
print(strConverterResult)
|
||||
|
||||
print("Testing the exiConnector...")
|
||||
pathToOpenV2GExe = "..\\OpenV2Gx\\Release\\OpenV2G.exe";
|
||||
s = ""
|
||||
for i in range(0, len(exiframe)):
|
||||
s = s + twoCharHex(exiframe[i])
|
||||
print("exi as hex=" + s)
|
||||
param1 = "DH" + s # DH for decode handshake
|
||||
start_time = time.time()
|
||||
for i in range(0, 10):
|
||||
result = subprocess.run(
|
||||
[pathToOpenV2GExe, param1], capture_output=True, text=True)
|
||||
strConverterResult = exiDecode(exiHexToByteArray(exiHexDemoSupportedApplicationProtocolRequest2))
|
||||
print(strConverterResult)
|
||||
|
||||
print("--- %s seconds ---" % (time.time() - start_time))
|
||||
print("stdout:", result.stdout)
|
||||
print("stderr:", result.stderr)
|
||||
strConverterResult = result.stdout
|
||||
if (strConverterResult.find("ProtocolNamespace=urn:din")>0):
|
||||
print("Detected DIN")
|
||||
|
||||
|
|
16
fsmEvse.py
16
fsmEvse.py
|
@ -6,6 +6,7 @@
|
|||
|
||||
import pyPlcTcpSocket
|
||||
import time # for time.sleep()
|
||||
from exiConnector import * # for EXI data handling/converting
|
||||
|
||||
stateWaitForSupportedApplicationProtocolRequest = 0
|
||||
stateWaitForSessionSetupRequest = 1
|
||||
|
@ -30,10 +31,19 @@ class fsmEvse():
|
|||
|
||||
def stateFunctionWaitForSupportedApplicationProtocolRequest(self):
|
||||
if (len(self.rxData)>0):
|
||||
msg = "ok, you sent " + str(self.rxData)
|
||||
print("received " + str(self.rxData))
|
||||
exidata = removeV2GTPHeader(self.rxData)
|
||||
print("received exi " + str(exidata))
|
||||
self.rxData = []
|
||||
print("responding " + msg)
|
||||
self.Tcp.transmit(bytes(msg, "utf-8"))
|
||||
strConverterResult = exiDecode(exidata)
|
||||
print(strConverterResult)
|
||||
if (strConverterResult.find("ProtocolNamespace=urn:din")>0):
|
||||
# todo: of course we should care for schemaID and prio also here
|
||||
print("Detected DIN")
|
||||
msg = addV2GTPHeader(exiEncode("SupportedApplicationProtocolResponse"))
|
||||
print("responding " + str(msg))
|
||||
self.Tcp.transmit(msg)
|
||||
|
||||
self.enterState(1)
|
||||
|
||||
def stateFunctionWaitForSessionSetupRequest(self):
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
import pyPlcTcpSocket
|
||||
import time # for time.sleep()
|
||||
from exiConnector import * # for EXI data handling/converting
|
||||
|
||||
stateInitialized = 0
|
||||
stateWaitForSupportedApplicationProtocolResponse = 1
|
||||
|
@ -30,7 +31,8 @@ class fsmPev():
|
|||
|
||||
def stateFunctionInitialized(self):
|
||||
if (self.Tcp.isConnected):
|
||||
self.Tcp.transmit(bytes("TestFromPevInInitialized", "utf-8"))
|
||||
# self.Tcp.transmit(bytes("TestFromPevInInitialized", "utf-8"))
|
||||
self.Tcp.transmit(addV2GTPHeader(exiHexToByteArray(exiHexDemoSupportedApplicationProtocolRequestIoniq)))
|
||||
self.enterState(stateWaitForSupportedApplicationProtocolResponse)
|
||||
|
||||
def stateFunctionWaitForSupportedApplicationProtocolResponse(self):
|
||||
|
|
Loading…
Reference in a new issue