Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
i2c_dev.py 11.40 KiB
import numpy as np
import logging
from .spibitbang1 import spibitbang1
from queuetypes import *
from .hwdev import hwdev

def ApplyMask(value,width=8,bitoffset=0,previous=0):
    """Insert a bit field into a previously stored value.

    ``value`` is shifted left by ``bitoffset`` (only when the offset is
    positive) and truncated to a field of ``width`` bits at that position.
    All bits of ``previous`` that lie outside the field are carried over.

    Returns the combined integer.
    """
    shift = bitoffset if bitoffset > 0 else 0
    field = ((1 << width) - 1) << shift
    # Bits of `previous` outside the field stay as they are.
    untouched = previous - (previous & field)
    return ((value << shift) & field) + untouched
def UnMask(value,width=8,bitoffset=0):
    """Extract a ``width``-bit field that starts at bit ``bitoffset``.

    The value is shifted right by ``bitoffset`` (only when the offset is
    positive) and then limited to the lowest ``width`` bits.
    """
    shift = bitoffset if bitoffset > 0 else 0
    return (value >> shift) & ((1 << width) - 1)

def int2bytes(i):
    """Split an integer into a list of byte values, most significant first.

    Values of 255 or less (including negative numbers) are returned
    unchanged as a one-element list.
    """
    collected = []
    # Peel off the low byte until what remains fits in a single byte.
    while i > 255:
        collected.append(i % 256)
        i >>= 8
    collected.append(i)
    collected.reverse()
    return collected

def GetField(D,name,dev_number,default=None):
    """Look up ``name`` in ``D``; for list entries pick element ``dev_number``.

    A missing key yields ``default``. Non-list entries are returned as-is,
    whatever ``dev_number`` is.
    """
    entry = D.get(name, default)
    if isinstance(entry, list):
        return entry[dev_number]
    return entry
def Find(L,name,value):
  """Return the first element ``x`` of ``L`` with ``x[name] == value``.

  Returns ``False`` when no element matches.
  """
  return next((candidate for candidate in L if candidate[name] == value), False)

class AttrDict(dict):
  """Dictionary whose items can also be accessed as attributes."""

  def __init__(self,*args,**kwargs):
    """Accept the same arguments as ``dict``."""
    super().__init__(*args,**kwargs)
    # Using the mapping itself as the instance __dict__ makes
    # obj.key and obj['key'] refer to the same storage.
    self.__dict__=self

def DevRegList(D):
  """Build a flat table of device registers from a configuration object.

  Every driver dict in ``D.drivers`` receives a ``'drv_id'`` key holding its
  index in that list (the driver dicts are modified in place). Each entry
  of ``D.device_registers`` is then expanded ``dim`` times (default 1), and
  one table entry is created per register of every expanded device.

  Table keys are ``"<device>.<register>"``; when ``dim`` is greater than 1
  the device name gets a 1-based index appended. Values are ``AttrDict``
  objects with the keys ``Addr``, ``Register_R``, ``Register_W``, ``store``,
  ``devtype`` and ``devid``.

  Returns:
    ``(devreglist, store)``: the table and the number of registers that
    have a truthy ``'store'`` field. Such registers get consecutive store
    indices starting at 1; all other registers get store index 0.

  Raises:
    ValueError: a device names a driver that is not in ``D.drivers``.
  """
  #todo only count the drivers registers!!
  for i,drv in enumerate(D.drivers):
    drv['drv_id']=i
  devreglist={}
  store=0
  for dev in D.device_registers:
    N=dev.get('dim',1)
    name=dev['name']
    for n in range(N):
      addr=GetField(dev,'address',n,0)
      drvname=GetField(dev,'driver',n)
      devid=0
      devtype=0
      if drvname:
        # Single lookup; Find() returns False when nothing matches.
        driver=Find(D.drivers,'name',drvname)
        if driver is False:
          raise ValueError("Device %s uses unknown driver %s"%(name,drvname))
        devid=driver['drv_id']
        devtype=driver['type']
      name2=name+str(n+1) if N>1 else name
      for reg in dev['registers']:
         # 'address' may be a [read, write] pair or a single value for both.
         regR=GetField(reg,'address',0,0)
         regW=GetField(reg,'address',1,0)
         if reg.get('store',False):
            store+=1
            storex=store
         else:
            storex=0
         devregname=name2+'.'+reg['name']
         devreglist[devregname]=AttrDict({"Addr":addr,"Register_R":regR,"Register_W":regW,"store":storex,"devtype":devtype,"devid":devid})
  return devreglist,store

def list_different(A,B):
    """Return True when ``A`` and ``B`` differ in length or in any element."""
    if len(A) != len(B):
        return True
    return any(not (b == a) for a, b in zip(A, B))

def GetSteps(V1):
    """Compute how a variable's data is divided over its device registers.

    Returns ``(Step, Step2)``. ``Step`` is the number of entries in
    ``V1['devreg']`` when that is a list, otherwise 1. ``Step2`` is
    ``dim * ceil(width / 8)`` integer-divided by ``Step``, with ``dim``
    defaulting to 1 and ``width`` to 8.
    """
    devregs = V1['devreg']
    Step = len(devregs) if isinstance(devregs, list) else 1
    bytes_per_element = (V1.get('width', 8) + 7) // 8
    Step2 = V1.get('dim', 1) * bytes_per_element // Step
    logging.debug(str(("GetStep", Step, Step2)))
    return Step, Step2


class i2c_dev(hwdev):
    def __init__(self,config):
        """Initialise the device from its configuration mapping.

        ``config`` is passed on to ``hwdev.__init__``. Its optional
        ``'maskid'`` entry is stored as ``self.I2Cmaskid``; when it is
        missing a warning naming ``config['name']`` is logged.

        ``self.I2Cmask`` starts as ``[0]``. The other methods of this class
        use element 0 as a failure counter and skip bus accesses once it
        exceeds ``self.I2Ccut`` (3).
        """
        hwdev.__init__(self,config)
        self.I2Cmask=[0]
        self.I2Cmaskid=config.get('maskid',None)
        if self.I2Cmaskid is None:
            # logging.warn is a deprecated alias of logging.warning
            logging.warning(config['name']+" I2C mask not found!")
        self.I2Ccut=3

    def OPCUASetVariable(self,varid,var1,data,mask):
       logging.info(str(("Set Var",var1['name'],data[:32],mask)))
       if varid==self.I2Cmaskid:
              logging.info("Set I2Cmask");
              if (len(data)==1) and (len(mask)>1): data=[data[0]]*len(mask);
              if len(mask)==0: mask=[True]*len(data);
              if len(data)!=len(mask): 
                logging.warn("Trying to set I2C OK with wrong mask length! %i-%i"%(len(data),len(mask)))
                return []
              if len(self.I2Cmask)!=len(mask): 
                logging.warn("Trying to set I2C OK with wrong mask length! %i-%i"%(len(data),len(mask)))
                return []
              for x,m in enumerate(mask):
                if m: self.I2Cmask[x]=data[x];
              return [OPCUAset(self.I2Cmaskid,InstType.varSet,self.I2Cmask.copy(),[])]
       if (var1['rw']=='variable') or not(var1.get('devreg')):
          return [OPCUAset(varid,InstType.varSet,data,mask)];
       oldmask=self.I2Cmask.copy()
       data,mask2=self.SetGetVarValueMask(var1,data,mask);
#       if len(mask)==len(mask2): mask[:]=mask2[:];
#       elif len(mask)==0: (mask.append(x) for x in mask2);
       Data=OPCUAset(varid,InstType.varSet,data.copy(),mask2.copy())
       if list_different(self.I2Cmask,oldmask):
           Data=[Data,OPCUAset(self.I2Cmaskid,InstType.varSet,self.I2Cmask.copy(),[])]
       else: Data=[Data] 
       return Data

    def OPCUAReadVariable(self,varid,var1,mask):
      if len(mask)==0: mask=[True]; 
      logging.info(str(("Read Var",var1['name'],"Mask=",mask)))
      if not(var1.get('devreg')): return []
      #data=self.GetVarValueAll(var1)
      #else:             
      oldmask=self.I2Cmask.copy()
      data,mask2=self.GetVarValueMask(var1,mask);
      logging.info(str(("Read Var",var1['name'],"Data=",data)))
#      if len(mask)==len(mask2): mask[:]=mask2[:];
#      elif len(mask)==0: (mask.append(x) for x in mask2);
      Data=OPCUAset(varid,InstType.varSet,data.copy(),mask2.copy())
      if list_different(self.I2Cmask,oldmask):
           Data=[Data,OPCUAset(self.I2Cmaskid,InstType.varSet,self.I2Cmask.copy(),[])]
      else: Data=[Data] 
      return Data

#    def OPCUAcallMethod(self,var1,data,mask):
#       print("Call Method",var1)
    
    def i2csetget(self,*args,**kwargs):
       return self.conf['parentcls'].i2csetget(*args,**kwargs)
    
    def SetGetVarValueMask(self,var1,data,mask):
        """Write ``data`` to the registers of ``var1`` and read them back.

        The variable is split into ``Step`` registers of ``Step2`` values
        each (see ``GetSteps``). ``mask`` selects the registers to access.
        An empty mask selects all of them, and a one-element mask is
        repeated for every register.

        For each selected register:

        * it is skipped and masked out while ``self.I2Cmask[0]`` exceeds
          ``self.I2Ccut``;
        * a failed write sets ``self.I2Cmask[0]`` to 1 and masks it out;
        * a failed read-back increments ``self.I2Cmask[0]`` and masks the
          register out once the counter exceeds ``self.I2Ccut``;
        * a successful write and read-back resets ``self.I2Cmask[0]`` to 0
          and stores the values that were read.

        Note: a ``mask`` that already has ``Step`` entries is updated in
        place.

        Returns:
          ``(values, mask)``, with 0 in ``values`` for registers that were
          not read, or ``None`` when ``data`` or ``mask`` has the wrong
          length.
        """
        Step,Step2=GetSteps(var1)
        value1=[0]*Step*Step2
        if len(data)!=Step*Step2:
            logging.error("Check data length! (got %i, expected %i)",len(data),Step*Step2)
            return None
        if len(mask)==1:
            mask=[mask[0]]*Step
        elif len(mask)==0:
            mask=[True]*Step
        if len(mask)!=Step:
            logging.error("Check mask length! (got %i, expected %i)",len(mask),Step)
            return None
        # GetSteps counts a non-list 'devreg' as one register; wrap it so
        # it can be iterated like the list form.
        devregs=var1['devreg']
        if not isinstance(devregs,list):
            devregs=[devregs]
        width=var1.get('width',8)
        for Vari,devreg in enumerate(devregs):
            if not mask[Vari]:
                continue
            if self.I2Cmask[0]>self.I2Ccut:
                mask[Vari]=False
                continue
            i0=Vari*Step2
            i1=i0+Step2
            bitoffset=GetField(var1,'bitoffset',Vari,0)
            if not self.SetVarValue(devreg,width,bitoffset,data[i0:i1]):
                self.I2Cmask[0]=1
                mask[Vari]=False
                continue
            value2=value1[i0:i1]
            if not self.GetVarValue(devreg,width,bitoffset,value2):
                self.I2Cmask[0]+=1
                if self.I2Cmask[0]>self.I2Ccut:
                    mask[Vari]=False
                continue
            self.I2Cmask[0]=0
            value1[i0:i1]=value2
        return value1,mask


    def GetVarValueMask(self,var1,mask):
        """Read the registers of ``var1`` selected by ``mask``.

        The variable is split into ``Step`` registers of ``Step2`` values
        each (see ``GetSteps``). An empty mask selects all registers, and a
        one-element mask is repeated for every register.

        For each selected register:

        * it is skipped and masked out while ``self.I2Cmask[0]`` exceeds
          ``self.I2Ccut``;
        * a failed read increments ``self.I2Cmask[0]`` and masks the
          register out once the counter exceeds ``self.I2Ccut``;
        * a successful read resets ``self.I2Cmask[0]`` to 0 and stores the
          values that were read.

        Note: a ``mask`` that already has ``Step`` entries is updated in
        place.

        Returns:
          ``(values, mask)``, with 0 in ``values`` for registers that were
          not read, or ``None`` when ``mask`` has the wrong length.
        """
        Step,Step2=GetSteps(var1)
        value1=[0]*Step*Step2
        if len(mask)==1:
            mask=[mask[0]]*Step
        elif len(mask)==0:
            mask=[True]*Step
        if len(mask)!=Step:
            logging.error("Check mask length! (got %i, expected %i)",len(mask),Step)
            return None
        # GetSteps counts a non-list 'devreg' as one register; wrap it so
        # it can be iterated like the list form.
        devregs=var1['devreg']
        if not isinstance(devregs,list):
            devregs=[devregs]
        width=var1.get('width',8)
        for Vari,devreg in enumerate(devregs):
            if not mask[Vari]:
                continue
            if self.I2Cmask[0]>self.I2Ccut:
                mask[Vari]=False
                continue
            i0=Vari*Step2
            i1=i0+Step2
            bitoffset=GetField(var1,'bitoffset',Vari,0)
            value2=value1[i0:i1]
            if not self.GetVarValue(devreg,width,bitoffset,value2):
                self.I2Cmask[0]+=1
                if self.I2Cmask[0]>self.I2Ccut:
                    mask[Vari]=False
                continue
            self.I2Cmask[0]=0
            value1[i0:i1]=value2
        return value1,mask


    def getstoreval(self,devreg):
        """Return the value cached under ``devreg['storeval']``.

        A missing or falsy cache entry is (re)initialised to 0 first.
        """
        cached=devreg.get('storeval')
        if cached:
            return cached
        devreg['storeval']=0
        return devreg.get('storeval')

    def Setdevreg(self,devreg,value,mask=[]):
#        if devreg.get('store'): logging.debug("Stored")
#        print(devreg['store'])
        if not(self.I2Cmask[0]==0): return False;
        res=self.SetVarValue(devreg,8,0,value)
        if not(res):
          self.I2Cmask[0]=1;
          return False;
        if devreg.get('store'):
           devreg['storeval']=value[0];
        return True;

    def Getdevreg(self,devreg,mask=[]):
        value=[0];
        if not(self.I2Cmask[0]==0): return False;
        res=self.GetVarValue(devreg,8,0,value)
        if not(res):
          self.I2Cmask[0]=1;
          return False;
        if devreg.get('store'):
                devreg['storeval']=value[0];
                logging.debug("Stored value:"+str(value[0]))
        return True;        

    def SetVarValue(self,devreg,width,bitoffset,value):
        """Write ``value`` to the write register of ``devreg``.

        Registers whose ``'register_W'`` is -1 cannot be written (e.g. a
        temperature reading); the call then succeeds without any transfer.

        For registers with a truthy ``'store'`` entry, every element of
        ``value`` is merged in place with the cached register content via
        ``ApplyMask``, and the first merged element becomes the new cache
        value in ``devreg['storeval']``.

        Returns True for a read-only register, otherwise the result of the
        driver's ``i2csetget`` call.
        """
        write_reg=devreg['register_W']
        if write_reg==-1:
            return True
        logging.debug(str(("I2C Set ",devreg['addr'],value)))
        if devreg.get('store'):
            previous=self.getstoreval(devreg)
            for pos,item in enumerate(value):
                value[pos]=ApplyMask(item,width,bitoffset,previous)
            devreg['storeval']=value[0]
            logging.debug("Stored value:"+str(value[0]))
        driver=devreg['drivercls']
        return driver.i2csetget(devreg['addr'],value,reg=write_reg)

    def GetVarValue(self,devreg,width,bitoffset,value,mode=0):
        """Read a register of ``devreg`` into the list ``value`` (in place).

        All bus traffic goes through ``devreg['drivercls'].i2csetget``; the
        meaning of its ``read=`` codes is defined by that driver.

        mode=0 (default, and any value other than 1, 2, 3):
            If ``devreg['wait']`` > 0: wait (read=3), send the register
            number (read=2), wait again (read=3), then read (read=1).
            Otherwise a single combined call with ``reg=`` is made, using
            read=5 when ``devreg['crc']`` is truthy and read=1 otherwise.
        mode=1: only read (read=1).
        mode=2: only send the register number (read=2); the driver's result
            is returned directly.
        mode=3: only wait (read=3, when ``devreg['wait']`` > 0); returns
            True.

        After a read, ``value[0]`` is cached in ``devreg['storeval']`` when
        ``devreg['store']`` is truthy (before any bit-field extraction),
        and the field given by ``width``/``bitoffset`` is extracted with
        ``UnMask``.

        Returns False when a send or read call reports failure or when
        ``value[0]`` is None after the read; True otherwise.
        """
        logging.debug(str(("i2c_dev Get ",devreg['addr'],value)))
        callback=devreg['drivercls'].i2csetget;
        reg=devreg['register_R']
        if mode==2:
           return callback(devreg['addr'],int2bytes(reg),read=2)
        elif mode==3:
           if devreg.get('wait',0)>0: callback(0,[devreg['wait']],read=3)
           return True;
        # value2 is the same list object as value, so the driver fills the
        # caller's list directly.
        value2=value
        if mode==1:
           if not(callback(devreg['addr'],value2,read=1)): return False;
        else:
           if devreg.get('wait',0)>0:
              # wait, select the register, wait again, then read
              callback(0,[devreg['wait']],read=3)
              if not(callback(devreg['addr'],int2bytes(reg),read=2)): return False;
              callback(0,[devreg['wait']],read=3)
              if not(callback(devreg['addr'],value2,read=1)): return False;
           else:
              rd=(5 if devreg.get('crc') else 1)
              if not(callback(devreg['addr'],value2,reg=reg,read=rd)): return False;
        if value2[0] is None:  return False
        # No effective change: value and value2 are the same list.
        value[:]=value2[:];
        if devreg.get('store'):
             devreg['storeval']=value[0];
        # l1 = ceil((width+bitoffset)/8)
        l1=int(np.floor((width+bitoffset+7)/8))
        #print(value[0],width,bitoffset,l1)
        if (width!=l1*8) or (bitoffset>0):
            if (width<8):
              # sub-byte field: extract it from every element
              for i in range(len(value)):
                value[i]=UnMask(value[i],width,bitoffset)
            else:
                # wider field: only element 0 is reduced, to the
                # width-(l1-1)*8 bits left over after l1-1 whole bytes
                value[0]=UnMask(value[0],width-(l1-1)*8,bitoffset)
        else: value[0]=value2[0]
        return True;