-
Paulus Kruger authoredPaulus Kruger authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
i2cthread.py 5.36 KiB
#Start the correct I2C server connect to the pipes on a new process
import logging
from multiprocessing import Process
from queuetypes import *
from time import sleep
#from i2cserv import I2Cswitch1
#from i2cserv import I2C_dummy as I2C
#from i2cserv import RCU
#from i2cserv import CLK
import yamlconfig as yc
import yaml
class AttrDict(object):
def __init__(self, dct):
self.dict = dct
def __repr__(self):
return repr(self.dict)
def __getattr__(self, attr):
try:
val = self.dict[attr]
if isinstance(val, dict):
val = AttrDict(val)
return val
except KeyError:
raise AttributeError
def runmethod(conf,Qout,methodid,mask):
# print("Run method",methodid,mask)
var1=conf.getmethod(methodid)
drv=var1.get('drivercls');
for inst in var1['instructions']:
for key,value in inst.items():
if not(isinstance(value,list)): value=[value]
logging.info(str(("Run instruction",key,value,mask)));
if mask is None: mask=[]
if (key=='WAIT'):
sleep(value[0]/1000.)
continue;
v1=conf.getvarid(key)
if not(v1 is None):
if value[0]=='Update':
#mask=
getvar(conf,Qout,v1,mask)
else:
#mask=
setvar(conf,Qout,v1,value,mask)
continue;
v1=conf.getmethodid(key)
if v1:
#mask=
runmethod(conf,Qout,v1,mask)
continue;
v1=conf.getdevreg(key)
if v1:
drv2=v1.get('drivercls')
if value[0]=='Update':
if drv: drv.Getdevreg(v1,mask)
elif drv2: drv2.Getdevreg(v1,mask)
else: logging.warn("Driver not specified for instruction"+key)
else:
if drv: drv.Setdevreg(v1,value,mask)
elif drv2: drv2.Setdevreg(v1,value,mask)
else: logging.warn("Driver not specified for instruction"+key)
continue;
# def Setdevreg(self,devreg,value,mask=[],width=8,bitoffset=0):
logging.warn("Unknown instruction "+key)
# drv=var1.get('drivercls');
# if not(drv):
# logging.warn(var1['name']+" driver not found!")
# continue;
# for data in drv.OPCUAcallMethod(item.id,var1,item.data,item.mask):
# Qout.put(data)
return mask
def setvar(conf,Qout,varid,data,mask):
var1=conf.getvars()[varid]
drv=var1.get('drivercls');
if not(drv):
logging.warn(var1['name']+" driver not found!")
return;
for data2 in drv.OPCUASetVariable(varid,var1,data,mask):
if len(mask)==len(data2.mask): mask[:]=data2.mask[:];
elif len(mask)==0: mask=data2.mask.copy();
# print(data2.data[:64]);
Qout.put(data2)
return mask;
def getvar(conf,Qout,varid,mask):
var1=conf.getvars()[varid]
drv=var1.get('drivercls');
if not(drv):
logging.warn(var1['name']+" driver not found!")
return;
for data in drv.OPCUAReadVariable(varid,var1,mask):
if len(mask)==len(data.mask): mask[:]=data.mask[:];
elif len(mask)==0: mask=data.mask.copy();
Qout.put(data)
return mask;
def I2Cserver(Qin,Qout,name):
logging.info("New i2c process "+name)
conf=yc.yamlconfig(name)
conf.linkdevices()
conf.loaddrivers()
for c in conf.conf['drivers']:
c['Qout']=Qout;
conf.linkdrivers()
statusid=conf.getvarid(name+"_translator_busy");
while True:
item = Qin.get()
if item is None:
for c in conf.conf['drivers']:
c['Qout']=None;
break;
# print("TODO: Set busy")
# self.statevar.set_value("busy");
#print("SetVar",item)#,self.conf.variables[item.id])
if (item.type==InstType.varSet):
setvar(conf,Qout,item.id,item.data,item.mask)
# self.OPCUASetVariable(item.id,var1,item.data,item.mask)
elif (item.type==InstType.varRead):
getvar(conf,Qout,item.id,item.mask)
# var1=self.conf.variables[item.id]
# self.OPCUAReadVariable(item.id,var1,item.data,item.mask)
elif (item.type==InstType.method): runmethod(conf,Qout,item.id,item.mask)
else: print("OPCUA call not implemented!");
# print("TODO: Set ready")
if (Qin.qsize()==0) and not(statusid is None): Qout.put(OPCUAset(statusid,InstType.varSet,[0],[]))
logging.info("End i2c process "+name)
# if name=='RCU':
# SW1=I2Cswitch1.I2Cswitch1(I2C.I2C1server)
# RCU1=RCU.RCU1(32,I2C.I2C1server,SW1.SetChannel,settings,Qin,Qout)
# elif name=='CLK':
# SW0=I2Cswitch1.I2Cswitch0(None) #Dummy switch as their is no switch on LTS
# RCU1=CLK.RCU1(1,I2C.I2C4server,SW0.SetChannel)
# else:
# logging.warn(str(("Unknown name",name," thread stopping")))
# return
# RCU1.load() #Load current register values from HW
# RCU1.RCUthread(Qin)
def start(Qout,Qin,name):
thread1 = Process(target=I2Cserver, args=(Qin,Qout,name))
thread1.start()
return thread1