# I2C device support: bit-field helpers, device-register table construction
# and the i2c_dev hardware-device class.
import numpy as np
import logging
from .spibitbang1 import spibitbang1
from queuetypes import *
from .hwdev import hwdev


def ApplyMask(value, width=8, bitoffset=0, previous=0):
    """Merge a bit field into a previously stored register value.

    `value` is shifted left by `bitoffset`, limited to `width` bits, and
    combined with `previous`, from which the bits covered by the field have
    been removed first.
    """
    mask = (1 << width) - 1
    if bitoffset > 0:
        value <<= bitoffset
        mask <<= bitoffset
    return (value & mask) + (previous - (previous & mask))


def UnMask(value, width=8, bitoffset=0):
    """Extract the `width`-bit field located at `bitoffset` from `value`."""
    mask = (1 << width) - 1
    if bitoffset > 0:
        value >>= bitoffset
    return value & mask


def int2bytes(i):
    """Split an integer into a list of byte values, most significant first.

    Values not greater than 255 (including negative ones) are returned
    unchanged as a single-element list.
    """
    b = []
    while i > 255:
        b = [i % 256] + b
        i >>= 8
    return [i] + b


def GetField(D, name, dev_number, default=None):
    """Return D[name] (or `default`); if it is a list, return item `dev_number`."""
    X = D.get(name, default)
    return X[dev_number] if isinstance(X, list) else X


def Find(L, name, value):
    """Return the first element x of L with x[name] == value, or False if none."""
    for x in L:
        if x[name] == value:
            return x
    return False


class AttrDict(dict):
    """Dictionary whose items are also accessible as attributes."""

    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        # Share storage between the mapping and the attribute namespace.
        self.__dict__ = self


def DevRegList(D):
    """Build the table of device registers described by configuration `D`.

    Every entry of `D.drivers` gets a 'drv_id' equal to its position in the
    list.  For each device in `D.device_registers` (replicated 'dim' times,
    with a 1-based numeric suffix on the name when dim > 1) and each of its
    registers, an AttrDict with keys Addr, Register_R, Register_W, store,
    devtype and devid is stored under the key "<device>.<register>".

    Returns:
        (devreglist, store): the table and the number of registers flagged
        with 'store'.  Stored registers are numbered from 1; others get 0.

    Raises:
        ValueError: if a device names a driver that is not in `D.drivers`.
    """
    # todo only count the drivers registers!!
    for i, dev in enumerate(D.drivers):
        dev['drv_id'] = i
    devreglist = {}
    store = 0
    for dev in D.device_registers:
        N = dev.get('dim', 1)
        name = dev['name']
        for n in range(N):
            addr = GetField(dev, 'address', n, 0)
            devtype = GetField(dev, 'driver', n)
            devid = 0
            if devtype:
                # Look the driver up once; Find() signals a miss with False.
                driver = Find(D.drivers, 'name', devtype)
                if driver is False:
                    raise ValueError(
                        "Driver %r of device %r not found" % (devtype, name))
                devid = driver['drv_id']
                devtype = driver['type']
            else:
                devtype = 0
            name2 = name + str(n + 1) if N > 1 else name
            for reg in dev['registers']:
                # 'address' may be a [read, write] pair or a single value.
                regR = GetField(reg, 'address', 0, 0)
                regW = GetField(reg, 'address', 1, 0)
                if reg.get('store', False):
                    store += 1
                    storex = store
                else:
                    storex = 0
                devregname = name2 + '.' + reg['name']
                devreglist[devregname] = AttrDict({
                    "Addr": addr,
                    "Register_R": regR,
                    "Register_W": regW,
                    "store": storex,
                    "devtype": devtype,
                    "devid": devid,
                })
    return devreglist, store


def list_different(A, B):
    """Return True if A and B differ in length or in any element."""
    if len(A) != len(B):
        return True
    return any(not (b == a) for a, b in zip(A, B))


class i2c_dev(hwdev):
    """Hardware device accessed over I2C.

    `I2Cmask` is a one-element list; element 0 is set to 1 after a failed
    transfer, and while it is non-zero further transfers are skipped.
    `I2Cmaskid` is the id of the variable used to publish/modify that mask
    (taken from config['maskid']).
    """

    def __init__(self, config):
        hwdev.__init__(self, config)
        self.I2Cmask = [0]
        self.I2Cmaskid = config.get('maskid', None)
        if self.I2Cmaskid is None:
            logging.warning(config['name'] + " I2C mask not found!")

    def _var_updates(self, varid, data, mask2, oldmask):
        """Return the update for `varid`, plus an I2C-mask update if it changed."""
        updates = [OPCUAset(varid, InstType.varSet, data.copy(), mask2.copy())]
        if list_different(self.I2Cmask, oldmask):
            updates.append(OPCUAset(self.I2Cmaskid, InstType.varSet,
                                    self.I2Cmask.copy(), []))
        return updates

    def OPCUASetVariable(self, varid, var1, data, mask):
        """Handle a request to set variable `var1` (id `varid`) to `data`.

        Variables without a device register (or with rw == 'variable') are
        echoed back; the I2C mask variable is updated locally.  Otherwise the
        value is written to and read back from the hardware.

        Returns a list of OPCUAset updates (empty on a length error).
        """
        logging.info(str(("Set Var", var1['name'], data, mask)))
        if (var1['rw'] == 'variable') or not var1.get('devreg'):
            if varid == self.I2Cmaskid:
                if len(mask) == 0:
                    mask = [True] * len(data)
                if len(data) != len(mask):
                    logging.warning("Trying to set I2C OK with wrong mask length!")
                    return []
                for x, m in enumerate(mask):
                    if m:
                        self.I2Cmask[x] = data[x]
                return [OPCUAset(self.I2Cmaskid, InstType.varSet,
                                 self.I2Cmask.copy(), [])]
            return [OPCUAset(varid, InstType.varSet, data, mask)]
        oldmask = self.I2Cmask.copy()
        result = self.SetGetVarValueMask(var1, data, mask)
        if result is None:
            # Length mismatch already logged; nothing to publish.
            return []
        data, mask2 = result
        return self._var_updates(varid, data, mask2, oldmask)

    def OPCUAReadVariable(self, varid, var1, mask):
        """Handle a request to read variable `var1` (id `varid`) from hardware.

        Returns a list of OPCUAset updates; empty if the variable has no
        device register or the mask length is wrong.
        """
        logging.info(str(("Read Var", var1['name'], mask)))
        if not var1.get('devreg'):
            return []
        oldmask = self.I2Cmask.copy()
        result = self.GetVarValueMask(var1, mask)
        if result is None:
            # Length mismatch already logged; nothing to publish.
            return []
        data, mask2 = result
        return self._var_updates(varid, data, mask2, oldmask)

    def i2csetget(self, *args, **kwargs):
        """Forward an I2C transfer to the parent class from the configuration."""
        return self.conf['parentcls'].i2csetget(*args, **kwargs)

    @staticmethod
    def _devreg_list(var1):
        """Return var1['devreg'] as a list (a single entry is wrapped)."""
        devregs = var1['devreg']
        return devregs if isinstance(devregs, list) else [devregs]

    @staticmethod
    def _expand_mask(mask, count):
        """Expand an empty or one-element mask to `count` entries.

        Returns None (after logging) if the resulting length is not `count`.
        A mask that already has `count` entries is returned as the same list
        object, so later modifications remain visible to the caller.
        """
        if len(mask) == 1:
            mask = [mask[0]] * count
        if len(mask) == 0:
            mask = [True] * count
        if len(mask) != count:
            logging.warning("Check mask length!")
            return None
        return mask

    def SetGetVarValueMask(self, var1, data, mask):
        """Write `data` to the device registers of `var1` and read them back.

        One element of `data` belongs to each device register.  Entries whose
        mask is false are skipped; entries that fail (or are skipped because
        the I2C mask is set) get their mask entry cleared.

        Returns:
            (values, mask) on success, or None if `data` or `mask` has the
            wrong length.
        """
        devregs = self._devreg_list(var1)
        Step = len(devregs)
        value1 = [0] * Step
        if len(data) != Step:
            logging.warning("Check data length!")
            return None
        mask = self._expand_mask(mask, Step)
        if mask is None:
            return None
        width = var1.get('width', 8)
        for Vari, devreg in enumerate(devregs):
            if not mask[Vari]:
                continue
            if self.I2Cmask[0] != 0:
                mask[Vari] = False
                continue
            bitoffset = GetField(var1, 'bitoffset', Vari, 0)
            # One element per device register (data and value1 both have
            # exactly Step entries).
            i0, i1 = Vari, Vari + 1
            if not self.SetVarValue(devreg, width, bitoffset, data[i0:i1]):
                self.I2Cmask[0] = 1
                mask[Vari] = False
                continue
            value2 = value1[i0:i1]
            if not self.GetVarValue(devreg, width, bitoffset, value2):
                self.I2Cmask[0] = 1
                mask[Vari] = False
                continue
            value1[i0:i1] = value2
        return value1, mask

    def GetVarValueMask(self, var1, mask):
        """Read the device registers of `var1`.

        Entries whose mask is false are skipped; entries that fail (or are
        skipped because the I2C mask is set) get their mask entry cleared.

        Returns:
            (values, mask) on success, or None if `mask` has the wrong length.
        """
        devregs = self._devreg_list(var1)
        Step = len(devregs)
        value1 = [0] * Step
        mask = self._expand_mask(mask, Step)
        if mask is None:
            return None
        width = var1.get('width', 8)
        for Vari, devreg in enumerate(devregs):
            if not mask[Vari]:
                continue
            if self.I2Cmask[0] != 0:
                mask[Vari] = False
                continue
            bitoffset = GetField(var1, 'bitoffset', Vari, 0)
            # One element per device register.
            i0, i1 = Vari, Vari + 1
            value2 = value1[i0:i1]
            if not self.GetVarValue(devreg, width, bitoffset, value2):
                self.I2Cmask[0] = 1
                mask[Vari] = False
                continue
            value1[i0:i1] = value2
        return value1, mask

    def getstoreval(self, devreg):
        """Return devreg['storeval'], initialising it to 0 if missing or falsy."""
        if not devreg.get('storeval'):
            devreg['storeval'] = 0
        return devreg['storeval']

    def Setdevreg(self, devreg, value, mask=None):
        """Write the list `value` to `devreg` as a full 8-bit register.

        `mask` is accepted for interface compatibility and is not used.
        Returns False (and sets the I2C mask on failure) if the write did not
        succeed or the I2C mask was already set, True otherwise.
        """
        if self.I2Cmask[0] != 0:
            return False
        if not self.SetVarValue(devreg, 8, 0, value):
            self.I2Cmask[0] = 1
            return False
        if devreg.get('store'):
            devreg['storeval'] = value[0]
        return True

    def Getdevreg(self, devreg, mask=None):
        """Read `devreg` as a full 8-bit register, refreshing its stored value.

        The value read is not returned; it is only kept in devreg['storeval']
        when the register is flagged 'store'.  `mask` is accepted for
        interface compatibility and is not used.
        Returns True on success, False otherwise.
        """
        value = [0]
        if self.I2Cmask[0] != 0:
            return False
        if not self.GetVarValue(devreg, 8, 0, value):
            self.I2Cmask[0] = 1
            return False
        if devreg.get('store'):
            devreg['storeval'] = value[0]
            logging.debug("Stored value:" + str(value[0]))
        return True

    def SetVarValue(self, devreg, width, bitoffset, value):
        """Write the bit field `value` (a list, modified in place) to `devreg`.

        For registers flagged 'store' the field is merged into the previously
        stored register value before writing.  Returns the result of the
        driver's i2csetget call, or True for registers without a write
        address (register_W == -1).
        """
        if devreg['register_W'] == -1:
            return True  # We can not set it, only read it e.g. temperature
        logging.debug(str(("I2C Set ", devreg['addr'], value)))
        if devreg.get('store'):
            previous = self.getstoreval(devreg)
            for x in range(len(value)):
                value[x] = ApplyMask(value[x], width, bitoffset, previous)
            devreg['storeval'] = value[0]
            logging.debug("Stored value:" + str(value[0]))
        return devreg['drivercls'].i2csetget(devreg['addr'], value,
                                             reg=devreg['register_W'])

    def GetVarValue(self, devreg, width, bitoffset, value):
        """Read `devreg` into the list `value` (in place) and extract the field.

        Returns False if the driver call fails or yields None as first
        element, True otherwise.
        """
        logging.debug(str(("RCU1 Get ", devreg['addr'], value)))
        callback = devreg['drivercls'].i2csetget
        # The driver fills `value` in place.
        if not callback(devreg['addr'], value, reg=devreg['register_R'], read=1):
            return False
        if value[0] is None:
            return False
        if devreg.get('store'):
            # Remember the raw register content before extracting the field.
            devreg['storeval'] = value[0]
        l1 = int(np.floor((width + bitoffset + 7) / 8))  # bytes spanned by the field
        if (width != l1 * 8) or (bitoffset > 0):
            if width < 8:
                for i in range(len(value)):
                    value[i] = UnMask(value[i], width, bitoffset)
            else:
                # Only the first byte carries a partial field.
                value[0] = UnMask(value[0], width - (l1 - 1) * 8, bitoffset)
        return True