Select Git revision
yamlconfig.py
Paulus Kruger authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
yamlconfig.py 8.35 KiB
import yaml
import struct
import time
import logging
def Find(L,name,value):
for x in L:
if x[name]==value:
return x;
return False;
def GetField(D,name,dev_number,default=None):
X=D.get(name,default)
return X[dev_number] if isinstance(X,list) else X;
#class AttrDict(dict):
# def __init__(self,*args,**kwargs):
# super(AttrDict,self).__init__(*args,**kwargs)
# self.__dict__=self
def str2int(x):
return (int(x,16) if x.find('x')>0 else int(x));
class yamlconfig():
def __init__(self,yamlfile='RCU'):
self.conf=yaml.load(open("config/"+yamlfile+'.yaml'))
self.expand_variables()
# print([[v['name'],v.get('devreg')] for v in var1])
# print(len(self.conf['variables']),N)
for i,v in enumerate(self.conf['variables']):
v['id']=i
for i,v in enumerate(self.conf['methods']):
v['id']=i
def getvars(self):
return self.conf['variables'];
def getvar1(self,varid):
return self.conf['variables'][varid];
def getvarid(self,name):
var1=Find(self.conf['variables'],'name',name);
return (var1['id'] if var1 else var1);
def getmethod(self,methodid):
return self.conf['methods'][methodid];
# return Find(self.conf['methods'],'id',methodid);
def getmethodid(self,name):
var1=Find(self.conf['methods'],'name',name);
return (var1['id'] if var1 else var1);
def expand_variables(self):
#expand variables that are combined in single array
var1=self.conf['variables']
N=len(var1)
# print([v['name'] for v in self.conf['variables']])
for i in range(N-1,-1,-1):
#print(var1[i])
if isinstance(var1[i]['name'],list):
for x,name in enumerate(var1[i]['name']):
var2=var1[i].copy()
var2['name']=name
var2['devreg']=GetField(var1[i],'devreg',x)
var2['scale']=GetField(var1[i],'scale',x,1)
var1.append(var2)
N+=1;
var1.remove(var1[i])
N-=1;
def getdevreg(self,name):
# try:
dr=self.devreglist.get(name);
if dr: return dr;
if isinstance(name,int): return {"addr":name}
name2=name.split('.')
# print(name2,str2int(name2[0]),str2int(name2[1]))
logging.debug("New devreg! %s",str((name2)))
if len(name2)==2:
dev1=Find(self.conf['device_registers'],'name',name2[0])
if dev1:
store=dev1.get('store',False)
devcls=dev1.get('driver');
if not(devcls): logging.error("Can not find driver for register "+name2[0]);
else:
devcls=Find(self.conf['drivers'],'name',devcls)
if not(devcls): logging.error("Can not find device for register "+name2[0]);
else:
devcls=devcls.get('obj')
# print(dev1)
return {"addr":dev1.get('address',0),"register_R":str2int(name2[1]),"register_W":str2int(name2[1]),"store":store,"drivercls":devcls};
else: return {"addr":str2int(name2[0]),"register_R":str2int(name2[1]),"register_W":str2int(name2[1])}
logging.error("Can not find device register "+str(name));
# except:
# logging.error("Can not find device register "+str(name));
# return None;
def linkdevices(self):
#replace devreg reference with (pointer to) devreg dict
devreglist={}
drivers=self.conf['drivers']
for dev in self.conf['device_registers']:
N=dev.get('dim',1)
name=dev['name']
for n in range(N):
addr=GetField(dev,'address',n,0)
devtype=GetField(dev,'driver',n)
# print(addr)
if devtype:
devid=Find(drivers,'name',devtype)
#devtype=Find(D.drivers,'name',devtype)['type']
else: devtype=None;
if N>1: name2=name+str(n+1)
else: name2=name;
for reg in dev['registers']:
regR=GetField(reg,'address',0,0)
regW=GetField(reg,'address',1,0)
store=reg.get('store',False)
devregname=name2+'.'+reg['name'];
devreglist[devregname]={"addr":addr,"register_R":regR,"register_W":regW,"store":store,"driver":devid};
# print(devregname,devreglist[devregname]['addr'],devreglist[devregname]['register_R'],devreglist[devregname]['register_W']);
self.devreglist=devreglist;
for D in drivers:
devreg=D.get('devreg');
if not(devreg): continue;
for i,dr in enumerate(devreg):
#print(dr,self.getdevreg(dr))
devreg[i]=self.getdevreg(dr)
for D in self.conf['variables']:
devreg=D.get('devreg');
if not(devreg): continue;
if isinstance(devreg,list):
for i,dr in enumerate(devreg):
#print("linkdev",dr,self.getdevreg(dr))
devreg[i]=self.getdevreg(dr)
else:
D['devreg']=[self.getdevreg(devreg)]
#print(D['name'],devreg)
# hf.write("inline const t_devreg %s {.address=%i,.register_R=%i,.register_W=%i,.store=%i,.driver=%i};\n" % (devregname,addr,regR,regW,storex,devtype) )
#hf.write("#define NumberStoreReg %i"%store)
# print(devreglist)
# return devreglist,store
def loaddrivers(self):
import importlib
import sys, inspect
for c in self.conf['drivers']:
# print("loading",c['name'],c['type']);
# try:
i = importlib.import_module("i2cserv."+c['type'])
# except:
# logging.warn("No driver for "+c['type'])
# c['obj']=None;
# continue;
for name, obj in inspect.getmembers(i,inspect.isclass):
if name==c['type']:
# print(" Load child:",name)
c['obj']=obj(c)
if not(c.get('obj')):
logging.warn((("Driver "+c['type']+" not found!")))
def linkdrivers(self):
drivers=self.conf['drivers']
for c in drivers:
name=c.get('parent');
if not(name): continue;
p=Find(drivers,'name',name);
if not(p):
logging.error("Can not find driver "+str(name))
continue;
c['parentcls']=p['obj'];
# print("link ",c['name'],name," to ",p['obj'])
#devregs=c.get('devreg')
#if not(devregs): continue;
#for dr in devregs:
# if not(dr): continue;
# name=dr.get('driver')
# if not(name): #Give it the parents driver
# dr["drivercls"]=p['obj']
# drv=dr.get("drivercls")
# if drv is None: logging.warn("variable "+c['name']+" register driver not found")
for name,c in self.devreglist.items():
drv=c['driver'];
if not(drv): continue;
# print(drv)
c['drivercls']=drv.get('obj');
# print("link ",name,drv['name']," to ",c.driverobj)
for c in self.conf['variables']:
name=c.get('driver');
if not(name): continue;
p=Find(drivers,'name',name);
if not(p):
logging.error("Can not find driver "+str(name))
continue;
c['drivercls']=p['obj'];
# print("link ",c['name'],name," to ",p['obj'])
devregs=c.get('devreg')
if not(devregs): continue;
for dr in devregs:
if not(dr): continue;
name=dr.get('driver')
if not(name): #Give it the parents driver
dr["drivercls"]=p['obj']
drv=dr.get("drivercls")
if drv is None: logging.warn("variable "+c['name']+" register driver not found")
for c in self.conf['methods']:
name=c.get('driver');
if not(name): continue;
p=Find(drivers,'name',name);
if not(p):
logging.error("Can not find driver "+str(name))
continue;
c['drivercls']=p['obj'];