Skip to content
Snippets Groups Projects
Select Git revision
  • d10296252e93269b5388b57333c6eb119f4399fa
  • master default protected
  • dither_on_off_disabled
  • yocto
  • pypcc2
  • pypcc3
  • 2020-12-07-the_only_working_copy
  • v2.1
  • v2.0
  • v1.0
  • v0.9
  • Working-RCU_ADC,ID
  • 2020-12-11-Holiday_Season_release
13 results

I2Cswitch.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    I2Cswitch.py 6.29 KiB
    from hwdev import *;
    import numpy as np
    import logging
    
    def I2C_dummy(addr, data, reg=None, read=0):
        """No-op I2C callback: ignore every argument and report success.

        It takes the same arguments as the real I2C callbacks used in this
        file, so it can be passed wherever a callback is expected but no bus
        transaction should actually take place.
        """
        return True
    
    class I2Cswitch(hwdev):
        """Device node that selects a switch channel before accessing children.

        The configuration dictionary ``self.D`` is read with these keys:

        * ``'dev_address'``  - address the channel byte is written to.
        * ``'dev_children'`` - entries with ``'child_name'``, ``'child_addr'``
          (bit position of the child's channel) and ``'obj'`` (child device).
        * ``'dev_groups'``   - entries with ``'group_name'`` and
          ``'group_members'`` (a list of child names).

        Variable and method names have the form ``<child-or-group>_<rest>``;
        the part before the first underscore selects the child or group and the
        remainder is forwarded to the child object(s).

        NOTE(review): ``Find`` comes from ``hwdev``; it is assumed to return the
        first entry of a list whose given key equals the given value, or a falsy
        value when there is none - confirm against hwdev.
        """

        def _GroupChildren(self, group):
            """Resolve the member names of ``group`` to child entries.

            Returns the list of child entries in member order, or None when at
            least one member name has no matching entry in ``dev_children``.
            """
            children = []
            for membername in group['group_members']:
                child = Find(self.D['dev_children'], 'child_name', membername)
                if not child:
                    return None
                children.append(child)
            return children

        @staticmethod
        def _ChannelMask(children):
            """Return the channel bit mask that enables all given children."""
            mask = 0
            for child in children:
                # OR rather than add, so a child listed twice cannot corrupt
                # the mask by carrying into a neighbouring bit.
                mask |= 1 << child['child_addr']
            return mask

        def SetChannel(self, channel, I2Ccallback):
            """Write the channel mask ``channel`` to the switch if it changed.

            The last successfully written mask is cached in
            ``self.CurrentChannel`` (initially 0) and the write is skipped when
            the requested mask equals the cached one.

            Returns True when no write was needed, otherwise the result of
            ``I2Ccallback(dev_address, [channel])``.
            """
            if not hasattr(self, 'CurrentChannel'):
                self.CurrentChannel = 0
            logging.info("SetChannel %s %s", channel, self.CurrentChannel)
            if channel == self.CurrentChannel:
                return True
            result = I2Ccallback(self.D['dev_address'], [channel])
            if result:
                # Only remember the channel once the write succeeded; otherwise
                # a retry of the same channel would be skipped as "unchanged".
                self.CurrentChannel = channel
            return result

        def GetVarValue(self, name, value, I2Ccallback):
            """Read variable ``name`` from a child or from every group member.

            For a single child its channel is selected and the call is
            forwarded. For a group, ``value`` is split into equal consecutive
            slices (``len(value) // number_of_members`` items each), one per
            member in member order, and each member fills its slice.

            Returns False when the name matches neither a child nor a group,
            when a group is empty or has an unknown member, or when any channel
            selection or child access fails; True (or the child's result for a
            single child) otherwise.
            """
            childname = name.split('_')[0]
            subname = name[len(childname) + 1:]

            child = Find(self.D['dev_children'], 'child_name', childname)
            if child:
                if not self.SetChannel(1 << child['child_addr'], I2Ccallback):
                    return False
                return child['obj'].GetVarValue(subname, value, I2Ccallback)

            group = Find(self.D['dev_groups'], 'group_name', childname)
            if not group:
                return False
            members = self._GroupChildren(group)
            if not members:
                # Unknown member, or an empty group (would divide by zero).
                return False
            stride = len(value) // len(members)
            gchannel = self._ChannelMask(members)

            def I2Ccallback2(addr, data, reg=None, read=0):
                # Wrapper handed to the group members. It reads cnt and
                # cchannel from the loop below at call time.
                # read == 2: issued once (first member only) with all member
                #            channels selected; later members report success
                #            without touching the bus.
                # read == 3: issued once (first member only) on whatever
                #            channel is currently selected.
                # otherwise: issued on the current member's own channel.
                # NOTE(review): the meaning of read modes 2 and 3 is defined
                # by the child devices / I2C backend - confirm there.
                if read == 2:
                    if cnt > 0:
                        return True
                    if not self.SetChannel(gchannel, I2Ccallback):
                        return False
                elif read == 3:
                    if cnt > 0:
                        return True
                elif not self.SetChannel(cchannel, I2Ccallback):
                    return False
                return I2Ccallback(addr, data, reg, read)

            for cnt, (membername, child) in enumerate(
                    zip(group['group_members'], members)):
                logging.info("Get: %s", membername)
                cchannel = 1 << child['child_addr']
                value2 = value[cnt * stride:(cnt + 1) * stride]
                if not child['obj'].GetVarValue(subname, value2, I2Ccallback2):
                    return False
                # Copy back: slicing a list yields a copy, not a view.
                value[cnt * stride:(cnt + 1) * stride] = value2
            return True

        def SetVarValue(self, name, value, I2Ccallback):
            """Write variable ``name`` to a child or to every group member.

            For a group, ``value`` holds one slice of
            ``len(value) // number_of_members`` items per member. When all
            slices are equal, all member channels are selected at once and only
            the first member uses the real callback; the remaining members are
            called with ``I2C_dummy`` (presumably so they can update their own
            state without repeating the bus write - confirm). Otherwise each
            member is written separately on its own channel.

            All group members are resolved before anything is written, so an
            unknown member makes the call fail without partial writes.

            Returns False when the name matches neither a child nor a group,
            when a group is empty or has an unknown member, or when any channel
            selection or child access fails; True (or the child's result for a
            single child) otherwise.
            """
            childname = name.split('_')[0]
            subname = name[len(childname) + 1:]

            child = Find(self.D['dev_children'], 'child_name', childname)
            if child:
                if not self.SetChannel(1 << child['child_addr'], I2Ccallback):
                    return False
                return child['obj'].SetVarValue(subname, value, I2Ccallback)

            group = Find(self.D['dev_groups'], 'group_name', childname)
            if not group:
                return False
            members = self._GroupChildren(group)
            if not members:
                # Unknown member, or an empty group (would divide by zero).
                return False
            devcnt = len(members)
            stride = len(value) // devcnt

            # Every slice equals its successor <=> every item equals the item
            # one stride further on. Compared item by item so that numpy
            # arrays work as well as lists.
            AllSame = not any(value[i] != value[i + stride]
                              for i in range((devcnt - 1) * stride))

            if AllSame:
                logging.info("Allsame")
                channel = self._ChannelMask(members)
                logging.info(channel)
                if not self.SetChannel(channel, I2Ccallback):
                    return False
                for x, child in enumerate(members):
                    callback = I2Ccallback if x == 0 else I2C_dummy
                    if not child['obj'].SetVarValue(subname, value[:stride],
                                                    callback):
                        return False
                return True

            for x, (membername, child) in enumerate(
                    zip(group['group_members'], members)):
                logging.info("Set: %s", membername)
                if not self.SetChannel(1 << child['child_addr'], I2Ccallback):
                    return False
                if not child['obj'].SetVarValue(
                        subname, value[x * stride:(x + 1) * stride],
                        I2Ccallback):
                    return False
            return True

        def CallMethod(self, name, params, I2Ccallback):
            """Call method ``name`` on a child or on every group member.

            For a group, all member channels are selected at once; the first
            member is called with the real callback and the remaining members
            with ``I2C_dummy``.

            Returns False when the name matches neither a child nor a group,
            when a group has an unknown member, or when any channel selection
            or child call fails; True (or the child's result for a single
            child) otherwise.
            """
            childname = name.split('_')[0]
            subname = name[len(childname) + 1:]

            child = Find(self.D['dev_children'], 'child_name', childname)
            if child:
                if not self.SetChannel(1 << child['child_addr'], I2Ccallback):
                    return False
                return child['obj'].CallMethod(subname, params, I2Ccallback)

            group = Find(self.D['dev_groups'], 'group_name', childname)
            if not group:
                return False
            members = self._GroupChildren(group)
            if members is None:
                return False
            if not self.SetChannel(self._ChannelMask(members), I2Ccallback):
                return False
            for x, child in enumerate(members):
                callback = I2Ccallback if x == 0 else I2C_dummy
                if not child['obj'].CallMethod(subname, params, callback):
                    return False
            return True

        def GetVarNames(self, parentname, callback):
            """Report the variable names of this device and of its groups.

            After the base-class enumeration, each group publishes the
            variables of its first member under the prefix
            ``parentname + group_name + '_'``, with the element count
            multiplied by the number of group members.

            Returns False when the base-class enumeration fails or the first
            member of a group is unknown, True otherwise.
            """
            if not hwdev.GetVarNames(self, parentname, callback):
                return False

            def AddVar(name, dtype=0, RW=3, cnt=1):
                # devcnt is read at call time: it is the size of the group
                # currently being enumerated in the loop below.
                callback(name, dtype, RW, cnt * devcnt)

            for group in self.D['dev_groups']:
                membernames = group['group_members']
                if not membernames:
                    # An empty group has no member to take the names from.
                    continue
                devcnt = len(membernames)
                child = Find(self.D['dev_children'], 'child_name',
                             membernames[0])
                if not child:
                    return False
                child['obj'].GetVarNames(
                    parentname + group['group_name'] + '_', AddVar)
            return True