Skip to content
Snippets Groups Projects
Select Git revision
  • ca31a2d1bef23aa4888171b5a57d40d543052688
  • master default protected
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

tcp_replicator.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    I2Cdevices.py 9.61 KiB
    from hwdev import *;
    import numpy as np
    
    
    def GetField(D,name,dev_number):
        X=D.get(name)
        return X[dev_number] if isinstance(X,list) else X;
    
    
    def bytes2int(bts):
       x=0;
       for b in bts:
         x=x*256+b;
       return x;
    def int2bytes(i):
       b=[];
       while i>255: 
            b=[i%256]+b;
            i>>=8;
       return [i]+b;
       
    
    def I2CName2pin(s):
        dev,reg,pin=s.split('.')
        return dev,reg,int(pin)
    
    
    def ApplyMask(value,width=8,bitoffset=0,previous=0):
        mask=(1<<width)-1
        if bitoffset>0:
          value<<=bitoffset;
          mask<<=bitoffset;
        return (value & mask) + (previous - (previous & mask));
    def UnMask(value,width=8,bitoffset=0):
        mask=(1<<width)-1
        if bitoffset>0:
          value>>=bitoffset;
        value=value&mask;
        return value;
    
    
    class I2Cdevices(hwdev):
          def GetVarValues(names,value,I2Ccallback):
            return True  
     
          def CallMethod(self,name,params,I2Ccallback):
    #        print("RCU1 call method",name)
            for Mth in self.D['Methods']:
              if Mth['method_name']==name:
                 print("RCU1 Run method",name)
                 for regs in Mth['registers']:
                    for key,value in regs.items():
                      self.SetVarValue(key,value,I2Ccallback)
                 return True
            return hwdev.CallMethod(self,name,params,I2Ccallback)
    
          def SetChildValue(self,D,child,name,value,I2Ccallback):
             Pins={}
             for p in D['child_GPIO']:
                for key,regvalue in p.items():
                  Pins[key]=I2CName2pin(regvalue)
             def SetGetPin(pin,value,Get=False):
               Pin=Pins[pin]
               if not(Get): return self.I2CSet(Pin[0],Pin[1],[value],I2Ccallback,1,Pin[2])
               a=[0]
               self.I2CGet(Pin[0],Pin[1],a,I2Ccallback,1,Pin[2])
               return a[0];
             return child.SetVarValue(name,value,SetGetPin)
          def GetChildValue(self,D,child,name,value,I2Ccallback):
             Pins={}
             for p in D['child_GPIO']:
                for key,regvalue in p.items():
                  Pins[key]=I2CName2pin(regvalue)
             def SetGetPin(pin,value,Get=False):
               Pin=Pins[pin]
               if not(Get): return self.I2CSet(Pin[0],Pin[1],[value],I2Ccallback,1,Pin[2])
               a=[0]
               self.I2CGet(Pin[0],Pin[1],a,I2Ccallback,1,Pin[2])
               return a[0];
             print("Set Child",child)
             return child.GetVarValue(name,value,SetGetPin)
    
          def SetVarValue(self,name,value,I2Ccallback):
                print("RCU1 Set ",name,value)
                D=self.D
                Var1=Find(D['Variables'],'var_name',name)
    
                if (Var1): 
                  dev_number=0
                  Dev=GetField(Var1,'var_dev',dev_number)
                  Dev=Dev.split('.')
    
                  vmax=GetField(Var1,'var_max',dev_number)
                  if vmax:
                      if value[0]>vmax: value[0]=vmax;
                  vmin=GetField(Var1,'var_min',dev_number)
                  if vmin:
                      if value[0]<vmin: value[0]=vmin;
                      value[0]=value-vmin;   
                  width=GetField(Var1,'var_width',dev_number)
                  bitoffset=GetField(Var1,'var_bitoffset',dev_number)
                  scale=GetField(Var1,'var_scale',dev_number)
                elif (Find(D['Methods'],'method_name',name)):
                   return self.CallMethod(name,value,I2Ccallback)
                elif len(name.split('.'))==2:
                       Dev=name.split('.');
                       width=False;
                       bitoffset=False;
                       scale=False;
                else:
                       childname=name.split('_')[0]
                       for c in self.D['dev_children']:
                             if childname==c['child_name']: 
                                 return self.SetChildValue(c,c['obj'],name[len(childname)+1:],value,I2Ccallback)
    #                   return hwdev.SetVarValue(self,name,value,I2Ccallback)
                       print("Unknown variable",name);
                       return False;
                print(Dev)
                if scale: value=int2bytes(int(value[0]/float(scale)))
                if Find(D['I2C_devices'],'dev_name',Dev[0]):
                  if not(width): width=8
                  if not(bitoffset): bitoffset=0
                  print('SetVar ',name,'I2C',Dev,value,'(width=',width,'bitoffset=',bitoffset,')')
                  return self.I2CSet(Dev[0],Dev[1],value,I2Ccallback,width,bitoffset)
    
                if Find(D['SPI_devices'],'dev_name',Dev[0]):
                  print('SetVar ',name,'SPI',Dev,value)
                  return SPIset(D,Dev[0],Dev[1],value,I2Ccallback,dev_number)
                hwdev.SetVarValue(self,name,value,I2Ccallback)
    #        return SetVar(self.D,name,value,I2Ccallback)
    
          def GetVarValue(self,name,value,I2Ccallback):
    #def GetVar(D,name,value,I2Ccallback,dev_number=0):
                dev_number=0;D=self.D
                print("RCU1 Get ",name,value)
                Var1=Find(D['Variables'],'var_name',name)
                if (Var1): 
                  Dev=GetField(Var1,'var_dev',dev_number)
                  Dev=Dev.split('.')
                  width=GetField(Var1,'var_width',dev_number)
                  bitoffset=GetField(Var1,'var_bitoffset',dev_number)
                  scale=GetField(Var1,'var_scale',dev_number)
                elif len(name.split('.'))==2:
                       Dev=name.split('.');
                       width=False;
                       bitoffset=False;
                       scale=False;
                else:
                       childname=name.split('_')[0]
                       for c in self.D['dev_children']:
                             if childname==c['child_name']: 
                                 return self.GetChildValue(c,c['obj'],name[len(childname)+1:],value,I2Ccallback)
    #                   return hwdev.SetVarValue(self,name,value,I2Ccallback)
                       print("Unknown variable",name);
                       return False
    
                print(Dev)
                if not(width): width=8
                if not(bitoffset): bitoffset=0
                if Find(D['I2C_devices'],'dev_name',Dev[0]):
                  print('GetVar ',name,'I2C',Dev,value,'(width=',width,'bitoffset=',bitoffset,')')
                  result=self.I2CGet(Dev[0],Dev[1],value,I2Ccallback,width,bitoffset)
                  if not(result): return False;
                  if scale: value[0]=float(value[0])*float(scale);
                  return True
                if Find(D['SPI_devices'],'dev_name',Dev[0]):
                  print('GetVar ',name,'SPI',Dev,value)
                  return SPIget(D,Dev[0],Dev[1],value,I2Ccallback,dev_number)
                if Find(D['I2C_bitbang'],'dev_name',Dev[0]):
                  print('GetVar ',name,'I2C bb',Dev,value)
                  if not(I2Cbbget(D,Dev[0],Dev[1],value,I2Ccallback,dev_number,width,bitoffset)): return False
                  if scale: value[0]=float(value[0])*float(scale);
                  return True;
                print("Unknown variable",name)
                return False;
    
    #        return GetVar(self.D,name,value,I2Ccallback)
    
          def GetVarNames(self,parentname,callback):
              for V in self.D['Variables']:
                dtype=(1 if V.get('var_scale') else 0)
                RWs={'RO' : 1,'WO' : 2,'RW' : 3}
                RW=(RWs[V.get('var_R/W')] if V.get('var_R/W') else 0)
                callback(parentname+V['var_name'],dtype,RW)
              return hwdev.GetVarNames(self,parentname,callback);
    
          def GetMethodNames(self,parentname,callback):
              for V in self.D['Methods']:
                if not(V.get('method_invisible')): callback(parentname+V['method_name'])
              return hwdev.GetMethodNames(self,parentname,callback);
    
          def I2CSet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
          #remember of values, add new ones with mask
          #  print('I2CSet:',dev,'=',value,'(mask=',mask,')')
            Dev=Find(self.D['I2C_devices'],'dev_name',dev)    
            Addr=Dev['dev_address']
            Reg=Find(Dev['dev_registers'],'reg_name',reg)    
          #  print(dev,reg,value)
            if (width<8) or (bitoffset>0):
              previous=Reg.get('previous')
              if not(previous):
                  previous=Reg.get('reg_default')
              if not(previous):
                  previous=0;
              value[0]=ApplyMask(value[0],width,bitoffset,previous);
          #    print('masked to',value)
          #  print("Set",dev,reg,"to",value)    
            result=I2Ccallback(Addr,value,reg=Reg['reg_addr'],)
            #if result:        
            Reg['previous']=value[0]
            return result 
    
          def I2CGet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
            Dev=Find(self.D['I2C_devices'],'dev_name',dev)    
            Addr=Dev['dev_address']
            Reg=Find(Dev['dev_registers'],'reg_name',reg)  
            l1=int(np.floor((width+bitoffset+7)/8))
    #        print(width,bitoffset,l1)
            makesinglevalue=((len(value)==1) and (l1>1));
            if makesinglevalue: value2=[0 for x in range(l1)]
            else: value2=value
            reg=Reg['reg_addr']
            if reg>255: #This is for the monitor ADC
              if not(I2Ccallback(Addr,int2bytes(reg),read=2)): return False;
              I2Ccallback(Addr,[250],read=3)
              if not(I2Ccallback(Addr,value2,read=1)): return False;
            else:
              if not(I2Ccallback(Addr,value2,reg=Reg['reg_addr'],read=1)): return False;
            if value2[0] is None:  return False
            if makesinglevalue: value[0]=bytes2int(value2)
            else: value[:]=value2[:];
            Reg['previous']=value[0]
            if (width!=l1*8) or (bitoffset>0): 
                value[0]=UnMask(value[0],width,bitoffset)      
            else: value[0]=value2[0]
            return True;
    
    #def I2CGetBuff(D,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
    #        Dev=Find(D['I2C_devices'],'dev_name',dev)    
    #        Addr=Dev['dev_address']
    #        Reg=Find(Dev['dev_registers'],'reg_name',reg)  
    #        value[0]=Reg.get('previous')
    #        print("Get Value=",value[0])
    #        if (width<8) or (bitoffset>0): 
    #            value[0]=UnMask(value[0],width,bitoffset)      
    #        return True;