From 658e022e2e13b1dbd80a1a9fba08183a9d3a8bd2 Mon Sep 17 00:00:00 2001 From: uhi22 Date: Tue, 6 Dec 2022 18:35:17 +0100 Subject: [PATCH] hardwareInterface integrated into Pev --- fsmPev.py | 8 ++-- hardwareInterface.py | 87 +++++++++++++++++++++++++++++++++++--------- pyPlcWorker.py | 9 +++-- 3 files changed, 80 insertions(+), 24 deletions(-) diff --git a/fsmPev.py b/fsmPev.py index 0f39ab6..73b8041 100644 --- a/fsmPev.py +++ b/fsmPev.py @@ -204,8 +204,8 @@ class fsmPev(): # (B) The charger finished to tell the charge parameters. if (strConverterResult.find('"EVSEProcessing": "Finished"')>0): self.addToTrace("It is Finished. Will change to state C and send CableCheckReq.") - # todo: pull the CP line to state C here. - # self.hardwareInterface.changeToStateC() + # pull the CP line to state C here: + self.hardwareInterface.setStateC() msg = addV2GTPHeader(exiEncode("EDF_"+self.sessionId)) # EDF for Encode, Din, CableCheck self.addToTrace("responding " + prettyHexMessage(msg)) self.Tcp.transmit(msg) @@ -320,15 +320,17 @@ class fsmPev(): def reInit(self): self.addToTrace("re-initializing fsmPev") self.Tcp.disconnect() + self.hardwareInterface.setStateB() self.state = stateConnecting self.cyclesInState = 0 self.rxData = [] - def __init__(self, addressManager, callbackAddToTrace): + def __init__(self, addressManager, callbackAddToTrace, hardwareInterface): self.callbackAddToTrace = callbackAddToTrace self.addToTrace("initializing fsmPev") self.Tcp = pyPlcTcpSocket.pyPlcTcpClientSocket(self.callbackAddToTrace) self.addressManager = addressManager + self.hardwareInterface = hardwareInterface self.state = stateNotYetInitialized self.sessionId = "DEAD55AADEAD55AA" self.cyclesInState = 0 diff --git a/hardwareInterface.py b/hardwareInterface.py index 2abddec..6bce233 100644 --- a/hardwareInterface.py +++ b/hardwareInterface.py @@ -10,24 +10,75 @@ import serial # the pyserial from serial.tools.list_ports import comports from time import sleep +class hardwareInterface(): + def findSerialPort(self): + ports = [] + self.addToTrace('Available serial ports:') + for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): + self.addToTrace('{:2}: {:20} {!r}'.format(n, port, desc)) + ports.append(port) + if (len(ports)<1): + self.addToTrace("ERROR: No serial ports found. No hardware interaction possible.") + self.ser = None + self.isInterfaceOk = False + else: + self.addToTrace("ok, we take the first port, " + ports[0]) + try: + self.ser = serial.Serial(ports[0], 19200, timeout=0) + self.isInterfaceOk = True + except: + self.addToTrace("ERROR: Could not open serial port.") + self.ser = None + self.isInterfaceOk = False + + def addToTrace(self, s): + self.callbackAddToTrace("[HARDWAREINTERFACE] " + s) + + def setStateB(self): + self.addToTrace("Setting CP line into state B.") + self.outvalue = 0 + + def setStateC(self): + self.addToTrace("Setting CP line into state C.") + self.outvalue = 1 + + def __init__(self, callbackAddToTrace=None): + self.callbackAddToTrace = callbackAddToTrace + self.loopcounter = 0 + self.outvalue = 0 + self.findSerialPort() + + def close(self): + if (self.isInterfaceOk): + self.ser.close() + + def mainfunction(self): + self.loopcounter+=1 + if (self.isInterfaceOk): + if (self.loopcounter>15): + self.loopcounter=0 + # self.ser.write(b'hello world\n') + self.ser.write(bytes("out"+str(self.outvalue)+"\n", "utf-8")) + s = self.ser.read(100) + if (len(s)>0): + self.addToTrace(str(len(s)) + " bytes received: " + str(s, 'utf-8').strip()) + +def myPrintfunction(s): + print("myprint " + s) + if __name__ == "__main__": - nFail=0 print("Testing hardwareInterface...") - print('Available ports:') - ports = [] - for n, (port, desc, hwid) in enumerate(sorted(comports()), 1): - print('{:2}: {:20} {!r}'.format(n, port, desc)) - ports.append(port) - if (len(ports)<1): - print("no ports, we cannot test anything.") - exit() - print("ok, we take the first port, " + ports[0]) - ser = serial.Serial(ports[0], 19200, timeout=0) - for i in range(0, 5): - ser.write(b'hello world\n') - sleep(0.5) - s = ser.read(100) - if (len(s)>0): - print(str(len(s)) + " bytes received: " + str(s, 'utf-8')) - ser.close() + hw = hardwareInterface(myPrintfunction) + for i in range(0, 300): + hw.mainfunction() + if (i==100): + hw.setStateC() + if (i==200): + hw.setStateB() + if (i==250): + hw.setStateC() + if (i==280): + hw.setStateB() + sleep(0.03) + hw.close() print("finished.") \ No newline at end of file diff --git a/pyPlcWorker.py b/pyPlcWorker.py index e20dc11..601be26 100644 --- a/pyPlcWorker.py +++ b/pyPlcWorker.py @@ -13,6 +13,7 @@ from pyPlcModes import * import addressManager import time import subprocess +import hardwareInterface class pyPlcWorker(): @@ -28,6 +29,7 @@ class pyPlcWorker(): self.oldAvlnStatus = 0 self.isSimulationMode = isSimulationMode self.hp = pyPlcHomeplug.pyPlcHomeplug(self.workerAddToTrace, self.callbackShowStatus, self.mode, self.addressManager, self.callbackReadyForTcp, self.isSimulationMode) + self.hardwareInterface = hardwareInterface.hardwareInterface(self.workerAddToTrace) self.hp.printToUdp("pyPlcWorker init") # Find out the version number, using git. # see https://stackoverflow.com/questions/14989858/get-the-current-git-hash-in-a-python-script @@ -39,7 +41,7 @@ class pyPlcWorker(): if (self.mode == C_EVSE_MODE): self.evse = fsmEvse.fsmEvse(self.workerAddToTrace) if (self.mode == C_PEV_MODE): - self.pev = fsmPev.fsmPev(self.addressManager, self.workerAddToTrace) + self.pev = fsmPev.fsmPev(self.addressManager, self.workerAddToTrace, self.hardwareInterface) def workerAddToTrace(self, s): # The central logging function. All logging messages from the different parts of the project @@ -69,7 +71,8 @@ class pyPlcWorker(): def mainfunction(self): self.nMainFunctionCalls+=1 #self.showStatus("pyPlcWorker loop " + str(self.nMainFunctionCalls)) - self.hp.mainfunction() # call the lower-level worker + self.hp.mainfunction() # call the lower-level workers + self.hardwareInterface.mainfunction() if (self.mode == C_EVSE_MODE): self.evse.mainfunction() # call the evse state machine if (self.mode == C_PEV_MODE): @@ -86,7 +89,7 @@ class pyPlcWorker(): self.hp.enterPevMode() if (not hasattr(self, 'pev')): print("creating pev") - self.pev = fsmPev.fsmPev(self.addressManager, self.workerAddToTrace) + self.pev = fsmPev.fsmPev(self.addressManager, self.workerAddToTrace, self.hardwareInterface) self.pev.reInit() if (strAction == "E"): print("switching to EVSE mode")