diff --git a/pypcc/i2cdirect.py b/pypcc/i2cdirect.py new file mode 100644 index 0000000000000000000000000000000000000000..df7f7dfddef279f0fe2976154f637e51f9ea37f0 --- /dev/null +++ b/pypcc/i2cdirect.py @@ -0,0 +1,99 @@ +name='RECVTR_LB_TEST' #YAML config file with all register values etc + +import logging +#import argparse +from pypcc.opcuaserv import opcuaserv +from pypcc.opcuaserv import i2client +from pypcc.opcuaserv import yamlreader +#from opcuaserv import pypcc2 +from pypcc.i2cserv import i2cthread +#import threading +import time +import sys +import signal +from pypcc.yamlconfig import Find; +import pypcc.yamlconfig as yc +from datetime import datetime +from pypcc.opcuaserv.yamlreader import byte2var,var2byte + +class i2cdirect(): + def __init__(self,name): + self.conf=yc.yamlconfig(name) + self.conf.linkdevices() + self.conf.loaddrivers() + self.conf.linkdrivers() + self.name=self.conf.conf['name'] + self.runmethod(self.name+"_Init") + + def GetVal(self,name): + varid=self.conf.getvarid(name); + if varid is None: + logging.error("Variable "+name+" not found") + return None,None + var1=self.conf.getvars()[varid] + drv=var1.get('drivercls'); + data=drv.OPCUAReadVariable(varid,var1,[]) + data=data[0].data + return byte2var(var1,data),var1 + + + def SetVal(self,name,data): + varid=self.conf.getvarid(name); + if varid is None: + logging.error("Variable "+name+" not found") + return None + var1=self.conf.getvars()[varid] + drv=var1.get('drivercls'); + data2,mask=var2byte(var1,data) + return drv.OPCUASetVariable(varid,var1,data2,[]) + + def SetRegister(self,regname,value): + methodid=self.conf.getmethodid(self.name+"_Init"); + var1=self.conf.getmethod(methodid) + drv=var1.get('drivercls'); + v1=self.conf.getdevreg(regname) + drv2=v1.get('drivercls') + mask=[] + if drv: drv.Setdevreg(v1,value,mask) + elif drv2: drv2.Setdevreg(v1,value,mask) + else: logging.warn("Driver not specified for instruction"+key) + + def runmethod(self,methodname): + logging.info("Run method:"+methodname) + methodid=self.conf.getmethodid("methodname"); + var1=self.conf.getmethod(methodid) + drv=var1.get('drivercls'); + for inst in var1['instructions']: + for key,value in inst.items(): + if not(isinstance(value,list)): value=[value] + logging.info(str(("Run instruction",key,value))); + if (key=='WAIT'): + time.sleep(value[0]/1000.) + continue; + v1=self.conf.getvarid(key) + if not(v1 is None): + if value[0]=='Update': + self.GetVal(key) + else: + self.SetVal(key,value) + continue; + v1=self.conf.getmethodid(key) + if v1: + self.runmethod(key) + continue; + v1=self.conf.getdevreg(key) + if v1: + mask=[] + drv2=v1.get('drivercls') + if value[0]=='Update': + if drv: drv.Getdevreg(v1,mask) + elif drv2: drv2.Getdevreg(v1,mask) + else: logging.warn("Driver not specified for instruction"+key) + else: + if drv: drv.Setdevreg(v1,value,mask) + elif drv2: drv2.Setdevreg(v1,value,mask) + else: logging.warn("Driver not specified for instruction"+key) + + continue; + logging.warn("Unknown instruction "+key) + return \ No newline at end of file diff --git a/pypcc/yamlconfig.py b/pypcc/yamlconfig.py index 9323a2ddba9c127b1c607cbabf11957e658df81c..fad2c8e470dc356c3aac2e35a52d0ffa60db0475 100644 --- a/pypcc/yamlconfig.py +++ b/pypcc/yamlconfig.py @@ -29,9 +29,12 @@ def str2int(x): class yamlconfig(): def __init__(self,yamlfile='RCU'): - pkg = importlib_resources.files("pypcc") - pkg_data_file = pkg / "config" / (yamlfile+'.yaml') - self.conf=yaml.load(pkg_data_file.open(), Loader=yaml.FullLoader) + if yamlfile.split('.')[-1]=='yaml': + self.conf=yaml.load(open(yamlfile), Loader=yaml.FullLoader) + else: + pkg = importlib_resources.files("pypcc") + pkg_data_file = pkg / "config" / (yamlfile+'.yaml') + self.conf=yaml.load(pkg_data_file.open(), Loader=yaml.FullLoader) self.expand_variables() # print([[v['name'],v.get('devreg')] for v in var1]) # print(len(self.conf['variables']),N) diff --git a/python_scripts/RCU_test1.py b/python_scripts/RCU_test1.py index e467a08df1bf5770ebf80134ae2e021e002cd3a9..5ba14022d39e8eee6b02a79bee75001b0a980ec1 100644 --- a/python_scripts/RCU_test1.py +++ b/python_scripts/RCU_test1.py @@ -1,11 +1,11 @@ -from i2cdirect import i2cdirect +from pypcc.i2cdirect import i2cdirect from time import sleep import logging logging.basicConfig(level="ERROR",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s') #logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s') -d1=i2cdirect('RECVTR_LB_TEST') +d1=i2cdirect('RECVTR_LB_TEST.yaml') def RCU_off(d1): logging.warning("Switch RCU Power off"); diff --git a/python_scripts/i2cdirect.py b/python_scripts/i2cdirect.py deleted file mode 100644 index 01bc7a41123ffc1df2f581c754f1301c4f97a11b..0000000000000000000000000000000000000000 --- a/python_scripts/i2cdirect.py +++ /dev/null @@ -1,57 +0,0 @@ -name='RECVTR_LB_TEST' #YAML config file with all register values etc - -import logging -#import argparse -from pypcc.opcuaserv import opcuaserv -from pypcc.opcuaserv import i2client -from pypcc.opcuaserv import yamlreader -#from opcuaserv import pypcc2 -from pypcc.i2cserv import i2cthread -#import threading -import time -import sys -import signal -from pypcc.yamlconfig import Find; -import pypcc.yamlconfig as yc -from datetime import datetime -from pypcc.opcuaserv.yamlreader import byte2var,var2byte - -class i2cdirect(): - def __init__(self,name): - self.conf=yc.yamlconfig(name) - self.conf.linkdevices() - self.conf.loaddrivers() - self.conf.linkdrivers() - - def GetVal(self,name,N=1): - varid=self.conf.getvarid(name); - if varid is None: - logging.error("Variable "+name+" not found") - return None,None - var1=self.conf.getvars()[varid] - drv=var1.get('drivercls'); - data=drv.OPCUAReadVariable(varid,var1,[]) - data=data[0].data - return byte2var(var1,data),var1 - - def SetVal(self,name,data): - varid=self.conf.getvarid(name); - if varid is None: - logging.error("Variable "+name+" not found") - return None - var1=self.conf.getvars()[varid] - drv=var1.get('drivercls'); - data2,mask=var2byte(var1,data) - return drv.OPCUASetVariable(varid,var1,data2,[]) - - def SetRegister(self,regname,value): - methodid=self.conf.getmethodid("RECVTR_Init"); - var1=self.conf.getmethod(methodid) - drv=var1.get('drivercls'); - v1=self.conf.getdevreg(regname) - drv2=v1.get('drivercls') - mask=[] - if drv: drv.Setdevreg(v1,value,mask) - elif drv2: drv2.Setdevreg(v1,value,mask) - else: logging.warn("Driver not specified for instruction"+key) -