Select Git revision
I2Cswitch.py

Jan David Mol authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
I2Cswitch.py 6.29 KiB
from hwdev import *;
import numpy as np
import logging
def I2C_dummy(addr,data,reg=None,read=0):
# logging.info("I2C dummy",addr,data)
return True;
class I2Cswitch(hwdev):
def SetChannel(self,channel,I2Ccallback):
if not(hasattr(self,'CurrentChannel')): self.CurrentChannel=0
logging.info("SetChannel",channel,self.CurrentChannel)
if (channel)==self.CurrentChannel: return True;
self.CurrentChannel=channel
return I2Ccallback(self.D['dev_address'],[channel])
# def GetVarNames(self,parentname,callback):
# for c in self.D['dev_children']:
# c['obj'].GetVarNames(parentname+c['name']+'_',callback)
# return True;
# def GetMethodNames(self,parentname,callback):
# for c in self.D['dev_children']:
# c['obj'].GetMethodNames(parentname+c['name']+'_',callback)
# return True;
def GetVarValue(self,name,value,I2Ccallback):
def I2Ccallback2(addr,data,reg=None,read=0):
# logging.info("Getvarcallback",x)
if (read==2):
if (cnt>0): return True;
if not(self.SetChannel(gchannel,I2Ccallback)): return False;
elif (read==3):
if (cnt>0): return True;
return I2Ccallback(addr,data,reg,read)
elif not(self.SetChannel(cchannel,I2Ccallback)): return False;
return I2Ccallback(addr,data,reg,read)
childname=name.split('_')[0]
child=Find(self.D['dev_children'],'child_name',childname)
if child:
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
return child['obj'].GetVarValue(name[len(childname)+1:],value,I2Ccallback)
group=Find(self.D['dev_groups'],'group_name',childname)
if group:
devcnt=len(group['group_members'])
stride=len(value)//devcnt;
gchannel=0;
for childname2 in group['group_members']:
gchannel+=1<<(Find(self.D['dev_children'],'child_name',childname2)['child_addr'])
for cnt,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
logging.info("Get:",childname2)
cchannel=1<<child['child_addr']
value2=value[cnt*stride:(cnt+1)*stride]
if not(child['obj'].GetVarValue(name[len(childname)+1:],value2,I2Ccallback2)): return False;
value[cnt*stride:(cnt+1)*stride]=value2
return True;
return False
def SetVarValue(self,name,value,I2Ccallback):
childname=name.split('_')[0]
child=Find(self.D['dev_children'],'child_name',childname)
if child:
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
return child['obj'].SetVarValue(name[len(childname)+1:],value,I2Ccallback)
group=Find(self.D['dev_groups'],'group_name',childname)
if group:
devcnt=len(group['group_members'])
stride=len(value)//devcnt;
AllSame=True;
for x in range(devcnt-1):
for y in range(stride):
if value[x*stride+y]!=value[(x+1)*stride+y]: AllSame=False;
if AllSame:
logging.info("Allsame");
channel=0;
for childname2 in group['group_members']:
channel+=1<<(Find(self.D['dev_children'],'child_name',childname2)['child_addr'])
logging.info(channel)
if not(self.SetChannel(channel,I2Ccallback)): return False;
for x,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
callback=(I2Ccallback if x==0 else I2C_dummy)
if not(child['obj'].SetVarValue(name[len(childname)+1:],value[:stride],callback)): return false;
return True;
else:
for x,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
logging.info("Set:",childname2)
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
if not(child['obj'].SetVarValue(name[len(childname)+1:],value[x*stride:(x+1)*stride],I2Ccallback)): return false;
return True;
return False
def CallMethod(self,name,params,I2Ccallback):
childname=name.split('_')[0]
child=Find(self.D['dev_children'],'child_name',childname)
if child:
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
return child['obj'].CallMethod(name[len(childname)+1:],params,I2Ccallback)
group=Find(self.D['dev_groups'],'group_name',childname)
if group:
channel=0;
for childname2 in group['group_members']:
channel+=1<<(Find(self.D['dev_children'],'child_name',childname2)['child_addr'])
if not(self.SetChannel(channel,I2Ccallback)): return False;
for x,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
callback=(I2Ccallback if x==0 else I2C_dummy)
if not(child['obj'].CallMethod(name[len(childname)+1:],params,callback)): return false;
return True;
return False;
def GetVarNames(self,parentname,callback):
if not(hwdev.GetVarNames(self,parentname,callback)): return False;
def AddVar(name,dtype=0,RW=3,cnt=1):
callback(name,dtype,RW,cnt*devcnt)
for group in self.D['dev_groups']:
childname=group['group_members'][0]
devcnt=len(group['group_members'])
child=Find(self.D['dev_children'],'child_name',childname)
if not(child): return False;
child['obj'].GetVarNames(parentname+group['group_name']+'_',AddVar)
return True;