feature: simulation mode clean up

This commit is contained in:
uhi22 2023-04-19 09:05:21 +02:00
parent 9a65441442
commit 7e587336b6
3 changed files with 69 additions and 22 deletions

View file

@ -6,7 +6,8 @@
import pyPlcTcpSocket import pyPlcTcpSocket
import time # for time.sleep() import time # for time.sleep()
from helpers import prettyHexMessage from helpers import prettyHexMessage, combineValueAndMultiplier
from random import random
from exiConnector import * # for EXI data handling/converting from exiConnector import * # for EXI data handling/converting
stateWaitForSupportedApplicationProtocolRequest = 0 stateWaitForSupportedApplicationProtocolRequest = 0
@ -136,15 +137,26 @@ class fsmEvse():
self.Tcp.transmit(msg) self.Tcp.transmit(msg)
self.enterState(stateWaitForFlexibleRequest) # todo: not clear, what is specified in DIN self.enterState(stateWaitForFlexibleRequest) # todo: not clear, what is specified in DIN
if (strConverterResult.find("PreChargeReq")>0): if (strConverterResult.find("PreChargeReq")>0):
# todo: check the request content, and fill response parameters # check the request content, and fill response parameters
uTarget = 220 # default in case we cannot decode the requested voltage
try:
y = json.loads(strConverterResult)
strEVTargetVoltageValue = y["EVTargetVoltage.Value"]
strEVTargetVoltageMultiplier = y["EVTargetVoltage.Multiplier"]
uTarget = combineValueAndMultiplier(strEVTargetVoltageValue, strEVTargetVoltageMultiplier)
self.addToTrace("EV wants EVTargetVoltage " + str(uTarget))
except:
self.addToTrace("ERROR: Could not decode the PreChargeReq")
# simulating preCharge # simulating preCharge
if (self.simulatedPresentVoltage<200): if (self.simulatedPresentVoltage<uTarget/2):
self.simulatedPresentVoltage = 200 self.simulatedPresentVoltage = uTarget/2
if (self.simulatedPresentVoltage<230): if (self.simulatedPresentVoltage<uTarget-30):
self.simulatedPresentVoltage += 10 self.simulatedPresentVoltage += 20
if (self.simulatedPresentVoltage<400): if (self.simulatedPresentVoltage<uTarget):
self.simulatedPresentVoltage += 5 self.simulatedPresentVoltage += 5
strPresentVoltage = str(self.simulatedPresentVoltage) # "345" strPresentVoltage = str(self.simulatedPresentVoltage) # "345"
self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage")
msg = addV2GTPHeader(exiEncode("EDg_"+strPresentVoltage)) # EDg for Encode, Din, PreChargeResponse msg = addV2GTPHeader(exiEncode("EDg_"+strPresentVoltage)) # EDg for Encode, Din, PreChargeResponse
self.addToTrace("responding " + prettyHexMessage(msg)) self.addToTrace("responding " + prettyHexMessage(msg))
self.publishStatus("PreCharging " + strPresentVoltage) self.publishStatus("PreCharging " + strPresentVoltage)
@ -158,10 +170,21 @@ class fsmEvse():
self.Tcp.transmit(msg) self.Tcp.transmit(msg)
self.enterState(stateWaitForFlexibleRequest) # todo: not clear, what is specified in DIN self.enterState(stateWaitForFlexibleRequest) # todo: not clear, what is specified in DIN
if (strConverterResult.find("CurrentDemandReq")>0): if (strConverterResult.find("CurrentDemandReq")>0):
# todo: check the request content, and fill response parameters # check the request content, and fill response parameters
if (self.simulatedPresentVoltage<300): uTarget = 220 # default in case we cannot decode the requested voltage
self.simulatedPresentVoltage += 1; try:
y = json.loads(strConverterResult)
strEVTargetVoltageValue = y["EVTargetVoltage.Value"]
strEVTargetVoltageMultiplier = y["EVTargetVoltage.Multiplier"]
uTarget = combineValueAndMultiplier(strEVTargetVoltageValue, strEVTargetVoltageMultiplier)
self.addToTrace("EV wants EVTargetVoltage " + str(uTarget))
strSoc = y["DC_EVStatus.EVRESSSOC"]
self.callbackShowStatus(strSoc, "soc")
except:
self.addToTrace("ERROR: Could not decode the CurrentDemandReq")
self.simulatedPresentVoltage = uTarget + 3*random() # The charger provides the voltage which is demanded by the car.
strPresentVoltage = str(self.simulatedPresentVoltage) strPresentVoltage = str(self.simulatedPresentVoltage)
self.callbackShowStatus(strPresentVoltage, "EVSEPresentVoltage")
strEVSEPresentCurrent = "10" # Just as a dummy current strEVSEPresentCurrent = "10" # Just as a dummy current
msg = addV2GTPHeader(exiEncode("EDi_"+strPresentVoltage + "_" + strEVSEPresentCurrent)) # EDi for Encode, Din, CurrentDemandRes msg = addV2GTPHeader(exiEncode("EDi_"+strPresentVoltage + "_" + strEVSEPresentCurrent)) # EDi for Encode, Din, CurrentDemandRes
self.addToTrace("responding " + prettyHexMessage(msg)) self.addToTrace("responding " + prettyHexMessage(msg))

View file

@ -65,6 +65,7 @@ if (getConfigValue("mode")=="PevMode"):
myMode = C_PEV_MODE myMode = C_PEV_MODE
if (getConfigValue("mode")=="EvseMode"): if (getConfigValue("mode")=="EvseMode"):
myMode = C_EVSE_MODE myMode = C_EVSE_MODE
# The command line arguments overwrite the config file setting for PevMode/EvseMode.
if (len(sys.argv) > 1): if (len(sys.argv) > 1):
if (sys.argv[1] == "P"): if (sys.argv[1] == "P"):
myMode = C_PEV_MODE myMode = C_PEV_MODE
@ -72,6 +73,7 @@ if (len(sys.argv) > 1):
if (sys.argv[1] == "E"): if (sys.argv[1] == "E"):
myMode = C_EVSE_MODE myMode = C_EVSE_MODE
# The simulation mode can be set by command line in addition in both, PevMode and EvseMode.
isSimulationMode=0 isSimulationMode=0
if (len(sys.argv) > 2): if (len(sys.argv) > 2):
if (sys.argv[2] == "S"): if (sys.argv[2] == "S"):
@ -80,9 +82,15 @@ if (len(sys.argv) > 2):
if (myMode == C_LISTEN_MODE): if (myMode == C_LISTEN_MODE):
print("starting in LISTEN_MODE") print("starting in LISTEN_MODE")
if (myMode == C_PEV_MODE): if (myMode == C_PEV_MODE):
print("starting in PEV_MODE") if (isSimulationMode!=0):
print("starting in PevMode, simulated environment")
else:
print("starting in PevMode")
if (myMode == C_EVSE_MODE): if (myMode == C_EVSE_MODE):
print("starting in EVSE_MODE") if (isSimulationMode!=0):
print("starting in EvseMode, simulated environment")
else:
print("starting in EvseMode")
root = tk.Tk() root = tk.Tk()
root.geometry("400x350") root.geometry("400x350")

View file

@ -719,6 +719,9 @@ class pyPlcHomeplug():
self.addToTrace("Checkpoint170: transmitting CM_SET_KEY.REQ") self.addToTrace("Checkpoint170: transmitting CM_SET_KEY.REQ")
self.sniffer.sendpacket(bytes(self.mytransmitbuffer)) self.sniffer.sendpacket(bytes(self.mytransmitbuffer))
if (self.pevSequenceState==STATE_WAITING_FOR_SLAC_MATCH_CNF): # we were waiting for finishing the SLAC_MATCH.CNF and SET_KEY.REQ if (self.pevSequenceState==STATE_WAITING_FOR_SLAC_MATCH_CNF): # we were waiting for finishing the SLAC_MATCH.CNF and SET_KEY.REQ
if (self.isSimulationMode!=0):
# In simulation mode, we pretend a successful SetKey response:
self.connMgr.SlacOk()
self.enterState(STATE_WAITING_FOR_RESTART2) self.enterState(STATE_WAITING_FOR_RESTART2)
def evaluateReceivedHomeplugPacket(self): def evaluateReceivedHomeplugPacket(self):
@ -774,6 +777,14 @@ class pyPlcHomeplug():
def modemFinder_Mainfunction(self): def modemFinder_Mainfunction(self):
if ((self.connMgr.getConnectionLevel()==5) and (self.mofi_state==0)): if ((self.connMgr.getConnectionLevel()==5) and (self.mofi_state==0)):
# We want the modem search only, if no connection is present at all. # We want the modem search only, if no connection is present at all.
if (self.isSimulationMode!=0):
self.addToTrace("[ModemFinder] We are in SimulationMode. Pretending that one modem is present.")
self.composeGetSwReq() # Send a GetSoftwareVersionRequest never the less. Just to have it in the trace.
self.transmit(self.mytransmitbuffer)
self.numberOfSoftwareVersionResponses = 1 # One pretended modem
self.connMgr.ModemFinderOk(self.numberOfSoftwareVersionResponses) # report "success" to the connection manager
self.mofi_state=2
return
self.addToTrace("[ModemFinder] Starting modem search") self.addToTrace("[ModemFinder] Starting modem search")
self.publishStatus("Modem search") self.publishStatus("Modem search")
self.composeGetSwReq() self.composeGetSwReq()
@ -817,14 +828,16 @@ class pyPlcHomeplug():
return return
if (self.pevSequenceState==STATE_INITIAL): # Initial state. if (self.pevSequenceState==STATE_INITIAL): # Initial state.
# In real life we would check whether we see 5% PWM on the pilot line. We skip this check. # In real life we would check whether we see 5% PWM on the pilot line. We skip this check.
self.isSimulationMode = self.isForcedSimulationMode # from command line, we can force the simulation mode
self.isSDPDone = 0 self.isSDPDone = 0
self.isDeveloperLocalKey = 0 self.isDeveloperLocalKey = 0
self.nEvseModemMissingCounter = 0 self.nEvseModemMissingCounter = 0
self.enterState(STATE_READY_FOR_SLAC) self.enterState(STATE_READY_FOR_SLAC)
return return
if (self.pevSequenceState==STATE_READY_FOR_SLAC): if (self.pevSequenceState==STATE_READY_FOR_SLAC):
self.showStatus("Starting SLAC", "pevState") if (self.isSimulationMode!=0):
self.showStatus("Simu SLAC", "pevState")
else:
self.showStatus("Starting SLAC", "pevState")
self.addToTrace("[PEVSLAC] Checkpoint100: Sending SLAC_PARAM.REQ...") self.addToTrace("[PEVSLAC] Checkpoint100: Sending SLAC_PARAM.REQ...")
self.composeSlacParamReq() self.composeSlacParamReq()
self.transmit(self.mytransmitbuffer) self.transmit(self.mytransmitbuffer)
@ -927,17 +940,22 @@ class pyPlcHomeplug():
if (self.pevSequenceState==STATE_FIND_MODEMS2): # Waiting for the modems to answer. if (self.pevSequenceState==STATE_FIND_MODEMS2): # Waiting for the modems to answer.
if (self.pevSequenceCyclesInState>=10): if (self.pevSequenceCyclesInState>=10):
# It was sufficient time to get the answers from the modems. # It was sufficient time to get the answers from the modems.
if (self.isSimulationMode!=0):
self.addToTrace("[PEVSLAC] Simulating that both modems are present now.")
self.nEvseModemMissingCounter=0
self.connMgr.ModemFinderOk(2) # Two modems were found.
# This is the end of the SLAC.
# The simulated AVLN is established, we have at least two modems in the network.
self.enterState(STATE_INITIAL)
return
self.addToTrace("[PEVSLAC] It was sufficient time to get the answers from the modems.") self.addToTrace("[PEVSLAC] It was sufficient time to get the answers from the modems.")
# Let's see what we received. # Let's see what we received.
if ((not self.isEvseModemFound()) and (not self.isSimulationMode)): if (not self.isEvseModemFound()):
self.nEvseModemMissingCounter+=1 self.nEvseModemMissingCounter+=1
self.addToTrace("[PEVSLAC] No EVSE seen (yet). Still waiting for it.") self.addToTrace("[PEVSLAC] No EVSE seen (yet). Still waiting for it.")
# At the Alpitronic we measured, that it takes 7s between the SlacMatchResponse and # At the Alpitronic we measured, that it takes 7s between the SlacMatchResponse and
# the chargers modem reacts to GetKeyRequest. So we should wait here at least 10s. # the chargers modem reacts to GetKeyRequest. So we should wait here at least 10s.
if (self.nEvseModemMissingCounter>10): if (self.nEvseModemMissingCounter>10):
if (self.isSimulationMode):
self.addToTrace("[PEVSLAC] No EVSE modem. But this is fine, we are in SimulationMode.")
else:
# We lost the connection to the EVSE modem. Back to the beginning. # We lost the connection to the EVSE modem. Back to the beginning.
self.addToTrace("[PEVSLAC] We lost the connection to the EVSE modem. Back to the beginning.") self.addToTrace("[PEVSLAC] We lost the connection to the EVSE modem. Back to the beginning.")
self.enterState(STATE_INITIAL) self.enterState(STATE_INITIAL)
@ -946,10 +964,8 @@ class pyPlcHomeplug():
self.pevSequenceDelayCycles=30 self.pevSequenceDelayCycles=30
self.enterState(STATE_WAITING_FOR_RESTART2) self.enterState(STATE_WAITING_FOR_RESTART2)
return return
# The EVSE modem is present (or we are simulating) # The EVSE modem is present
self.addToTrace("[PEVSLAC] EVSE is up, pairing successful.") self.addToTrace("[PEVSLAC] EVSE is up, pairing successful.")
if (self.isSimulationMode):
self.addToTrace("[PEVSLAC] But this is only simulated.")
self.nEvseModemMissingCounter=0 self.nEvseModemMissingCounter=0
self.connMgr.ModemFinderOk(2) # Two modems were found. self.connMgr.ModemFinderOk(2) # Two modems were found.
# This is the end of the SLAC. # This is the end of the SLAC.
@ -1042,7 +1058,7 @@ class pyPlcHomeplug():
self.numberOfFoundModems = 0 self.numberOfFoundModems = 0
self.mofi_state = 0 self.mofi_state = 0
self.mofi_stateDelay = 0 self.mofi_stateDelay = 0
self.isForcedSimulationMode = isSimulationMode # simulation without homeplug modem self.isSimulationMode = isSimulationMode # simulation without homeplug modem
#self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50) #self.sniffer = pcap.pcap(name=None, promisc=True, immediate=True, timeout_ms=50)
# eth3 means: Third entry from back, in the list of interfaces, which is provided by pcap.findalldevs. # eth3 means: Third entry from back, in the list of interfaces, which is provided by pcap.findalldevs.
# Improvement necessary: select the interface based on the name. # Improvement necessary: select the interface based on the name.