-
Paulus Kruger authoredPaulus Kruger authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
i2c_dev.py 11.40 KiB
import numpy as np
import logging
from .spibitbang1 import spibitbang1
from queuetypes import *
from .hwdev import hwdev
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
if bitoffset>0:
value<<=bitoffset;
mask<<=bitoffset;
return (value & mask) + (previous - (previous & mask));
def UnMask(value,width=8,bitoffset=0):
mask=(1<<width)-1
if bitoffset>0:
value>>=bitoffset;
value=value&mask;
return value;
def int2bytes(i):
b=[];
while i>255:
b=[i%256]+b;
i>>=8;
return [i]+b;
def GetField(D,name,dev_number,default=None):
X=D.get(name,default)
return X[dev_number] if isinstance(X,list) else X;
def Find(L,name,value):
for x in L:
if x[name]==value:
return x;
return False;
class AttrDict(dict):
def __init__(self,*args,**kwargs):
super(AttrDict,self).__init__(*args,**kwargs)
self.__dict__=self
def DevRegList(D):
#todo only count the drivers registers!!
for i,dev in enumerate(D.drivers):
dev['drv_id']=i;
devreglist={}
store=0;
for dev in D.device_registers:
N=dev.get('dim',1)
name=dev['name']
for n in range(N):
addr=GetField(dev,'address',n,0)
devtype=GetField(dev,'driver',n)
# print(addr,devtype)
devid=0;
if devtype:
devid=Find(D.drivers,'name',devtype)['drv_id']
devtype=Find(D.drivers,'name',devtype)['type']
else: devtype=0;
if N>1: name2=name+str(n+1)
else: name2=name;
for reg in dev['registers']:
regR=GetField(reg,'address',0,0)
regW=GetField(reg,'address',1,0)
if reg.get('store',False):
store+=1;
storex=store
else:
storex=0
# hf.write("const devreg %s {%i,%i,%i,%i,%i};\n" % (name2+'_'+reg['name'],addr,regR,regW,storex,devtype) )
devregname=name2+'.'+reg['name'];
devreglist[devregname]=AttrDict({"Addr":addr,"Register_R":regR,"Register_W":regW,"store":storex,"devtype":devtype,"devid":devid});
# print(devregname,devreglist[devregname]);
# hf.write("inline const t_devreg %s {.address=%i,.register_R=%i,.register_W=%i,.store=%i,.driver=%i};\n" % (devregname,addr,regR,regW,storex,devtype) )
#hf.write("#define NumberStoreReg %i"%store)
# print(devreglist)
return devreglist,store
def list_different(A,B):
if not(len(A)==len(B)): return True;
for i,m in enumerate(B):
if not(m==A[i]): return True
return False
def GetSteps(V1):
if isinstance(V1['devreg'],list): Step=len(V1['devreg'])
else: Step=1; #V1.nVars
Step2=V1.get('dim',1)*((V1.get('width',8)+7)//8)//Step #int(V1.size/V1.nVars)
logging.debug(str(("GetStep",Step,Step2)));
return Step,Step2
class i2c_dev(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
self.I2Cmask=[0]
self.I2Cmaskid=config.get('maskid',None)
if self.I2Cmaskid is None: logging.warn(config['name']+" I2C mask not found!")
self.I2Ccut=3;
def OPCUASetVariable(self,varid,var1,data,mask):
logging.info(str(("Set Var",var1['name'],data[:32],mask)))
if varid==self.I2Cmaskid:
logging.info("Set I2Cmask");
if (len(data)==1) and (len(mask)>1): data=[data[0]]*len(mask);
if len(mask)==0: mask=[True]*len(data);
if len(data)!=len(mask):
logging.warn("Trying to set I2C OK with wrong mask length! %i-%i"%(len(data),len(mask)))
return []
if len(self.I2Cmask)!=len(mask):
logging.warn("Trying to set I2C OK with wrong mask length! %i-%i"%(len(data),len(mask)))
return []
for x,m in enumerate(mask):
if m: self.I2Cmask[x]=data[x];
return [OPCUAset(self.I2Cmaskid,InstType.varSet,self.I2Cmask.copy(),[])]
if (var1['rw']=='variable') or not(var1.get('devreg')):
return [OPCUAset(varid,InstType.varSet,data,mask)];
oldmask=self.I2Cmask.copy()
data,mask2=self.SetGetVarValueMask(var1,data,mask);
# if len(mask)==len(mask2): mask[:]=mask2[:];
# elif len(mask)==0: (mask.append(x) for x in mask2);
Data=OPCUAset(varid,InstType.varSet,data.copy(),mask2.copy())
if list_different(self.I2Cmask,oldmask):
Data=[Data,OPCUAset(self.I2Cmaskid,InstType.varSet,self.I2Cmask.copy(),[])]
else: Data=[Data]
return Data
def OPCUAReadVariable(self,varid,var1,mask):
if len(mask)==0: mask=[True];
logging.info(str(("Read Var",var1['name'],"Mask=",mask)))
if not(var1.get('devreg')): return []
#data=self.GetVarValueAll(var1)
#else:
oldmask=self.I2Cmask.copy()
data,mask2=self.GetVarValueMask(var1,mask);
logging.info(str(("Read Var",var1['name'],"Data=",data)))
# if len(mask)==len(mask2): mask[:]=mask2[:];
# elif len(mask)==0: (mask.append(x) for x in mask2);
Data=OPCUAset(varid,InstType.varSet,data.copy(),mask2.copy())
if list_different(self.I2Cmask,oldmask):
Data=[Data,OPCUAset(self.I2Cmaskid,InstType.varSet,self.I2Cmask.copy(),[])]
else: Data=[Data]
return Data
# def OPCUAcallMethod(self,var1,data,mask):
# print("Call Method",var1)
def i2csetget(self,*args,**kwargs):
return self.conf['parentcls'].i2csetget(*args,**kwargs)
def SetGetVarValueMask(self,var1,data,mask):
Step,Step2=GetSteps(var1);
value1=[0]*Step*Step2;
# Step=(len(var1['devreg']) if isinstance(var1['devreg'],list) else 1)
# value1=[0]*Step;
if not(len(data)==Step*Step2):
# if not(len(data)==Step):
print("Check data length!");
return;
if (len(mask)==1):
mask=[mask[0] for x in range(Step)]
if (len(mask)==0):
mask=[True for x in range(Step)]
if not(len(mask)==Step):
print("Check mask length!");
return;
for Vari in range(Step):
if not(mask[Vari]): continue
if not(self.I2Cmask[0]<=self.I2Ccut):
mask[Vari]=False;
continue;
i0=(Vari)*Step2
i1=(Vari+1)*Step2
devreg=var1['devreg'][Vari];
width=var1.get('width',8)
bitoffset=GetField(var1,'bitoffset',Vari,0)
res=self.SetVarValue(devreg,width,bitoffset,data[i0:i1])
if not(res):
self.I2Cmask[0]=1;
mask[Vari]=False;
continue;
value2=value1[i0:i1]
res=self.GetVarValue(devreg,width,bitoffset,value2)
if not(res):
self.I2Cmask[0]+=1;
if self.I2Cmask[0]>self.I2Ccut: mask[Vari]=False;
continue;
self.I2Cmask[0]=0;
value1[i0:i1]=value2
return value1,mask
def GetVarValueMask(self,var1,mask):
Step,Step2=GetSteps(var1);
value1=[0]*Step*Step2;
# Step=(len(var1['devreg']) if isinstance(var1['devreg'],list) else 1)
# value1=[0]*Step;
if (len(mask)==1):
mask=[mask[0] for x in range(Step)]
if (len(mask)==0):
mask=[True for x in range(Step)]
if not(len(mask)==Step):
print("Check mask length!");
return;
for Vari in range(Step):
if not(mask[Vari]): continue
if not(self.I2Cmask[0]<=self.I2Ccut):
mask[Vari]=False;
continue;
i0=( Vari)*Step2
i1=(Vari+1)*Step2
devreg=var1['devreg'][Vari];
width=var1.get('width',8)
bitoffset=GetField(var1,'bitoffset',Vari,0)
value2=value1[i0:i1]
res=self.GetVarValue(devreg,width,bitoffset,value2)
if not(res):
self.I2Cmask[0]+=1;
if self.I2Cmask[0]>self.I2Ccut: mask[Vari]=False;
continue;
self.I2Cmask[0]=0;
value1[i0:i1]=value2
return value1,mask
def getstoreval(self,devreg):
storeval=devreg.get('storeval')
if not(storeval):
devreg['storeval']=0;
storeval=devreg.get('storeval');
return storeval;
def Setdevreg(self,devreg,value,mask=[]):
# if devreg.get('store'): logging.debug("Stored")
# print(devreg['store'])
if not(self.I2Cmask[0]==0): return False;
res=self.SetVarValue(devreg,8,0,value)
if not(res):
self.I2Cmask[0]=1;
return False;
if devreg.get('store'):
devreg['storeval']=value[0];
return True;
def Getdevreg(self,devreg,mask=[]):
value=[0];
if not(self.I2Cmask[0]==0): return False;
res=self.GetVarValue(devreg,8,0,value)
if not(res):
self.I2Cmask[0]=1;
return False;
if devreg.get('store'):
devreg['storeval']=value[0];
logging.debug("Stored value:"+str(value[0]))
return True;
def SetVarValue(self,devreg,width,bitoffset,value):
if devreg['register_W']==-1: return True; #We can not set it, only read it e.g. temperature
logging.debug(str(("I2C Set ",devreg['addr'],value)))
#self.parentcls.SetChannel(1<<RCU_Switch[RCUi]);
if devreg.get('store'):
previous=self.getstoreval(devreg);
for x in range(len(value)):
value[x]=ApplyMask(value[x],width,bitoffset,previous);
devreg['storeval']=value[0];
logging.debug("Stored value:"+str(value[0]))
# devreg['drivercls'].i2csetget
return devreg['drivercls'].i2csetget(devreg['addr'],value,reg=devreg['register_W'])
def GetVarValue(self,devreg,width,bitoffset,value,mode=0):
#mode=0: Set register, wait, read
#mode=1: only read
#mode=2: only set register
#mode=3: only wait
logging.debug(str(("i2c_dev Get ",devreg['addr'],value)))
callback=devreg['drivercls'].i2csetget;
reg=devreg['register_R']
if mode==2:
return callback(devreg['addr'],int2bytes(reg),read=2)
elif mode==3:
if devreg.get('wait',0)>0: callback(0,[devreg['wait']],read=3)
return True;
value2=value
if mode==1:
if not(callback(devreg['addr'],value2,read=1)): return False;
else:
if devreg.get('wait',0)>0:
callback(0,[devreg['wait']],read=3)
if not(callback(devreg['addr'],int2bytes(reg),read=2)): return False;
callback(0,[devreg['wait']],read=3)
if not(callback(devreg['addr'],value2,read=1)): return False;
else:
rd=(5 if devreg.get('crc') else 1)
if not(callback(devreg['addr'],value2,reg=reg,read=rd)): return False;
if value2[0] is None: return False
value[:]=value2[:];
if devreg.get('store'):
devreg['storeval']=value[0];
l1=int(np.floor((width+bitoffset+7)/8))
#print(value[0],width,bitoffset,l1)
if (width!=l1*8) or (bitoffset>0):
if (width<8):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
else:
value[0]=UnMask(value[0],width-(l1-1)*8,bitoffset)
else: value[0]=value2[0]
return True;