From 9d06934ec7d657ba609b4ca5e60c9d2ba9e47aa3 Mon Sep 17 00:00:00 2001 From: uhi22 Date: Wed, 10 May 2023 20:56:48 +0200 Subject: [PATCH] feature: integrated celeron55s hardwareinterface. Improved pev timeouts. --- .gitignore | 2 +- doc/pyPlc.ini.template | 5 ++ fsmPev.py | 10 ++- hardwareInterface.py | 190 ++++++++++++++++++++++++++++++++++++++--- 4 files changed, 191 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 10e0d50..546fe56 100644 --- a/.gitignore +++ b/.gitignore @@ -3,5 +3,5 @@ /doc/*.jar *.bat PevExiLog*.txt -pyPlc.ini +*.ini /log diff --git a/doc/pyPlc.ini.template b/doc/pyPlc.ini.template index 7884ebe..1b92147 100644 --- a/doc/pyPlc.ini.template +++ b/doc/pyPlc.ini.template @@ -75,3 +75,8 @@ serial_baud = 115200 # 11 * 6 * 4.1V = 66 * 4.1V = 270V charge_target_voltage = 270 +# By using an external manager script, this is useful for splitting charging +# into separately logged sessions and avoiding logging when not connected to a +# charger +exit_on_session_end = False + diff --git a/fsmPev.py b/fsmPev.py index d316670..6794f77 100644 --- a/fsmPev.py +++ b/fsmPev.py @@ -180,7 +180,9 @@ class fsmPev(): def isTooLong(self): # The timeout handling function. - limit = 33 # number of call cycles until timeout. Default 33 cycles with 30ms, means approx. 1 second. + limit = 66 # number of call cycles until timeout. Default 66 cycles with 30ms, means approx. 2 seconds. + # This 2s is the specified timeout time for many messages, fitting to the + # performance time of 1.5s. Exceptions see below. if (self.state==stateWaitForChargeParameterDiscoveryResponse): limit = 5*33 # On some charger models, the chargeParameterDiscovery needs more than a second. Wait at least 5s. if (self.state==stateWaitForCableCheckResponse): @@ -188,7 +190,11 @@ class fsmPev(): if (self.state==stateWaitForPreChargeResponse): limit = 30*33 # PreCharge may need some time. Wait at least 30s. if (self.state==stateWaitForPowerDeliveryResponse): - limit = 5*33 # PowerDelivery may need some time. Wait at least 5s. On Compleo charger, observed more than 1s until response. + limit = 6*33 # PowerDelivery may need some time. Wait at least 6s. On Compleo charger, observed more than 1s until response. + # specified performance time is 4.5s (ISO) + if (self.state==stateWaitForCurrentDemandResponse): + limit = 5*33 # Test with 5s timeout. Just experimental. + # The specified performance time is 25ms (ISO), the specified timeout 250ms. return (self.cyclesInState > limit) def stateFunctionNotYetInitialized(self): diff --git a/hardwareInterface.py b/hardwareInterface.py index 525f83b..b3becdf 100644 --- a/hardwareInterface.py +++ b/hardwareInterface.py @@ -10,6 +10,7 @@ import serial # the pyserial from serial.tools.list_ports import comports from time import sleep from configmodule import getConfigValue, getConfigValueBool +import sys # For exit_on_session_end hack if (getConfigValue("digital_output_device")=="beaglebone"): # In case we run on beaglebone, we want to use GPIO ports. @@ -24,11 +25,31 @@ class hardwareInterface(): return True # a "dieter" output device is expected to be connected on serial port. if (getConfigValue("analog_input_device")=="dieter"): return True # a "dieter" input device is expected to be connected on serial port. + if (getConfigValue("digital_output_device")=="celeron55device"): + return True + if (getConfigValue("analog_input_device")=="celeron55device"): + return True return False # non of the functionalities need a serial port. def findSerialPort(self): + baud = int(getConfigValue("serial_baud")) + if (getConfigValue("serial_port")!="auto"): + port = getConfigValue("serial_port") + try: + self.addToTrace("Using serial port " + port) + self.ser = serial.Serial(port, baud, timeout=0) + self.isSerialInterfaceOk = True + except: + if (self.needsSerial()): + self.addToTrace("ERROR: Could not open serial port.") + else: + self.addToTrace("Could not open serial port, but also do not need it. Ok.") + self.ser = None + self.isSerialInterfaceOk = False + return + ports = [] - self.addToTrace('Available serial ports:') + self.addToTrace('Auto detection of serial ports. Available serial ports:') for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): if (port=="/dev/ttyAMA0"): self.addToTrace("ignoring /dev/ttyAMA0, because this is not an USB serial port") @@ -47,7 +68,7 @@ class hardwareInterface(): else: self.addToTrace("ok, we take the first port, " + ports[0]) try: - self.ser = serial.Serial(ports[0], 19200, timeout=0) + self.ser = serial.Serial(ports[0], baud, timeout=0) self.isSerialInterfaceOk = True except: self.addToTrace("ERROR: Could not open serial port.") @@ -61,24 +82,32 @@ class hardwareInterface(): self.addToTrace("Setting CP line into state B.") if (getConfigValue("digital_output_device")=="beaglebone"): GPIO.output("P8_18", GPIO.LOW) + if (getConfigValue("digital_output_device")=="celeron55device"): + self.ser.write(bytes("cp=0\n", "utf-8")) self.outvalue &= ~1 def setStateC(self): self.addToTrace("Setting CP line into state C.") if (getConfigValue("digital_output_device")=="beaglebone"): GPIO.output("P8_18", GPIO.HIGH) + if (getConfigValue("digital_output_device")=="celeron55device"): + self.ser.write(bytes("cp=1\n", "utf-8")) self.outvalue |= 1 def setPowerRelayOn(self): self.addToTrace("Switching PowerRelay ON.") if (getConfigValue("digital_output_device")=="beaglebone"): GPIO.output("P8_16", GPIO.HIGH) + if (getConfigValue("digital_output_device")=="celeron55device"): + self.ser.write(bytes("contactor=1\n", "utf-8")) self.outvalue |= 2 def setPowerRelayOff(self): self.addToTrace("Switching PowerRelay OFF.") if (getConfigValue("digital_output_device")=="beaglebone"): GPIO.output("P8_16", GPIO.LOW) + if (getConfigValue("digital_output_device")=="celeron55device"): + self.ser.write(bytes("contactor=0\n", "utf-8")) self.outvalue &= ~2 def setRelay2On(self): @@ -90,17 +119,26 @@ class hardwareInterface(): self.outvalue &= ~4 def getPowerRelayConfirmation(self): + if (getConfigValue("digital_output_device")=="celeron55device"): + return self.contactor_confirmed return 1 # todo: self.contactor_confirmed def triggerConnectorLocking(self): self.addToTrace("Locking the connector") + if (getConfigValue("digital_output_device")=="celeron55device"): + self.ser.write(bytes("lock\n", "utf-8")) # todo control the lock motor into lock direction until the end (time based or current based stopping?) def triggerConnectorUnlocking(self): self.addToTrace("Unocking the connector") + if (getConfigValue("digital_output_device")=="celeron55device"): + self.ser.write(bytes("unlock\n", "utf-8")) # todo control the lock motor into unlock direction until the end (time based or current based stopping?) def isConnectorLocked(self): + # TODO: Read the lock= value from the hardware so that this works + #if (getConfigValue("digital_output_device")=="celeron55device"): + # return self.lock_confirmed return 1 # todo: use the real connector lock feedback def getInletVoltage(self): @@ -109,26 +147,45 @@ class hardwareInterface(): return self.inletVoltage def getAccuVoltage(self): + if (getConfigValue("digital_output_device")=="celeron55device"): + return self.accuVoltage #todo: get real measured voltage from the accu self.accuVoltage = 230 return self.accuVoltage def getAccuMaxCurrent(self): + if (getConfigValue("digital_output_device")=="celeron55device"): + # The overall current limit is currently hardcoded in + # OpenV2Gx/src/test/main_commandlineinterface.c + EVMaximumCurrentLimit = 250 + if self.accuMaxCurrent >= EVMaximumCurrentLimit: + return EVMaximumCurrentLimit + return self.accuMaxCurrent #todo: get max charging current from the BMS self.accuMaxCurrent = 10 return self.accuMaxCurrent def getAccuMaxVoltage(self): - #todo: get max charging voltage from the BMS - self.accuMaxVoltage = 230 + if getConfigValue("charge_target_voltage"): + self.accuMaxVoltage = getConfigValue("charge_target_voltage") + else: + #todo: get max charging voltage from the BMS + self.accuMaxVoltage = 230 return self.accuMaxVoltage def getIsAccuFull(self): - #todo: get "full" indication from the BMS - self.IsAccuFull = (self.simulatedSoc >= 98) + if (getConfigValue("digital_output_device")=="celeron55device"): + self.IsAccuFull = (self.soc_percent >= 98) + else: + #todo: get "full" indication from the BMS + self.IsAccuFull = (self.simulatedSoc >= 98) return self.IsAccuFull def getSoc(self): + if self.callbackShowStatus: + self.callbackShowStatus(format(self.soc_percent,".1f"), "soc") + if (getConfigValue("digital_output_device")=="celeron55device"): + return self.soc_percent #todo: get SOC from the BMS self.callbackShowStatus(format(self.simulatedSoc,".1f"), "soc") return self.simulatedSoc @@ -142,14 +199,32 @@ class hardwareInterface(): def __init__(self, callbackAddToTrace=None, callbackShowStatus=None): self.callbackAddToTrace = callbackAddToTrace self.callbackShowStatus = callbackShowStatus + self.loopcounter = 0 self.outvalue = 0 self.simulatedSoc = 20.0 # percent + self.inletVoltage = 0.0 # volts + self.accuVoltage = 0.0 + self.lock_confirmed = False # Confirmation from hardware + self.cp_pwm = 0.0 + self.soc_percent = 0.0 + self.accuMaxCurrent = 0.0 + self.contactor_confirmed = False # Confirmation from hardware + self.plugged_in = None # None means "not known yet" + + self.logged_inlet_voltage = None + self.logged_dc_link_voltage = None + self.logged_cp_pwm = None + self.logged_max_charge_a = None + self.logged_soc_percent = None + self.logged_contactor_confirmed = None + self.logged_plugged_in = None + self.rxbuffer = "" + self.findSerialPort() self.initPorts() - def resetSimulation(self): self.simulatedInletVoltage = 0.0 # volts @@ -162,8 +237,8 @@ class hardwareInterface(): def close(self): if (self.isSerialInterfaceOk): self.ser.close() - - def evaluateReceivedData(self, s): + + def evaluateReceivedData_dieter(self, s): self.rxbuffer += s x=self.rxbuffer.find("A0=") if (x>=0): @@ -179,11 +254,76 @@ class hardwareInterface(): #self.addToTrace("RX data ok " + s) self.rxbuffer = self.rxbuffer[x+3:] # consume the receive buffer entry + def evaluateReceivedData_celeron55device(self, s): + self.rxbuffer += s + while True: + x = self.rxbuffer.find("\n") + if x < 0: + break + line = self.rxbuffer[0:x].strip() + self.rxbuffer = self.rxbuffer[x+1:] + #self.addToTrace("Received line: \""+line+"\"") + if line.startswith("inlet_v="): + self.inletVoltage = int(line[8:]) + if self.logged_inlet_voltage != self.inletVoltage: + self.logged_inlet_voltage = self.inletVoltage + self.addToTrace("<< inlet_voltage="+str(self.inletVoltage)) + if self.callbackShowStatus: + self.callbackShowStatus(format(self.inletVoltage,".1f"), "uInlet") + elif line.startswith("dc_link_v="): + self.accuVoltage = int(line[10:]) + if self.logged_dc_link_voltage != self.accuVoltage: + self.logged_dc_link_voltage = self.accuVoltage + self.addToTrace("<< dc_link_voltage="+str(self.accuVoltage)) + elif line.startswith("cp_pwm="): + self.cp_pwm = int(line[7:]) + if self.logged_cp_pwm != self.cp_pwm: + self.logged_cp_pwm = self.cp_pwm + self.addToTrace("<< cp_pwm="+str(self.cp_pwm)) + elif line.startswith("cp_output_state="): + state = int(line[len("cp_output_state="):]) + if bool(state) == ((self.outvalue & 1)!=0): + self.addToTrace("<< CP state confirmed") + else: + self.addToTrace("<< CP state MISMATCH") + elif line.startswith("ccs_contactor_wanted_closed="): + state = int(line[len("ccs_contactor_wanted_closed="):]) + if bool(state) == ((self.outvalue & 2)!=0): + self.addToTrace("<< Contactor request confirmed") + else: + self.addToTrace("<< Contactor request MISMATCH") + elif line.startswith("max_charge_a="): + self.accuMaxCurrent = int(line[13:]) + if self.logged_max_charge_a != self.accuMaxCurrent: + self.logged_max_charge_a = self.accuMaxCurrent + self.addToTrace("<< max_charge_a="+str(self.accuMaxCurrent)) + elif line.startswith("soc_percent="): + self.soc_percent = int(line[12:]) + if self.logged_soc_percent != self.soc_percent: + self.logged_soc_percent = self.soc_percent + self.addToTrace("<< soc_percent="+str(self.soc_percent)) + elif line.startswith("contactor_confirmed="): + self.contactor_confirmed = bool(int(line[20:])) + if self.logged_contactor_confirmed != self.contactor_confirmed: + self.logged_contactor_confirmed = self.contactor_confirmed + self.addToTrace("<< contactor_confirmed="+str(self.contactor_confirmed)) + elif line.startswith("plugged_in="): + self.plugged_in = bool(int(line[11:])) + if self.logged_plugged_in != self.plugged_in: + self.logged_plugged_in = self.plugged_in + self.addToTrace("<< plugged_in="+str(self.plugged_in)) + else: + self.addToTrace("Received unknown line: \""+line+"\"") + def showOnDisplay(self, s1, s2, s3): # show the given string s on the display which is connected to the serial port if (getConfigValueBool("display_via_serial") and self.isSerialInterfaceOk): - s = "lc" + s1 + "\n" + "lc" + s2 + "\n" + "lc" + s3 + "\n" - self.ser.write(bytes(s, "utf-8")) + if (getConfigValue("digital_output_device")=="celeron55device"): + s = "disp0=" + s1 + "\n" + "disp1=" + s2 + "\n" + "disp2=" + s3 + "\n" + self.ser.write(bytes(s, "utf-8")) + else: + s = "lc" + s1 + "\n" + "lc" + s2 + "\n" + "lc" + s3 + "\n" + self.ser.write(bytes(s, "utf-8")) def mainfunction(self): if (getConfigValueBool("soc_simulation")): @@ -192,7 +332,20 @@ class hardwareInterface(): # while the relay is closed, simulate increasing SOC self.simulatedSoc = self.simulatedSoc + 0.01 - + if (getConfigValue("digital_output_device")=="dieter"): + self.mainfunction_dieter() + + if (getConfigValue("digital_output_device")=="celeron55device"): + self.mainfunction_celeron55device() + + if getConfigValueBool("exit_on_session_end"): + # TODO: This is a hack. Do this in fsmPev instead and publish some + # of these values into there if needed. + if (self.plugged_in is not None and self.plugged_in == False and + self.inletVoltage < 50): + sys.exit(0) + + def mainfunction_dieter(self): self.loopcounter+=1 if (self.isSerialInterfaceOk): if (self.loopcounter>15): @@ -207,7 +360,18 @@ class hardwareInterface(): except: s = "" # for the case we received corrupted data (not convertable as utf-8) self.addToTrace(str(len(s)) + " bytes received: " + s) - self.evaluateReceivedData(s) + self.evaluateReceivedData_dieter(s) + + def mainfunction_celeron55device(self): + if (self.isSerialInterfaceOk): + s = self.ser.read(100) + if (len(s)>0): + try: + s = str(s, 'utf-8') + except: + s = "" # for the case we received corrupted data (not convertable as utf-8) + #self.addToTrace(str(len(s)) + " bytes received: " + s) + self.evaluateReceivedData_celeron55device(s) def myPrintfunction(s): print("myprint " + s)