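# NOTE: the following lines are residue from the repository web page this file
# was copied from; they are not part of the module and are kept only as comments.
#   Select Git revision
#   tcp_replicator.py
#   Code owners
#   Assign users and groups as approvers for specific file changes. Learn more.
#   I2Cdevices.py 9.61 KiB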
from hwdev import *;
import numpy as np
def GetField(D,name,dev_number):
    """Look up field *name* in dict *D*.

    If the stored field is a list, return its entry number *dev_number*;
    otherwise return the field itself (``None`` when the key is absent).
    """
    field = D.get(name)
    if isinstance(field, list):
        return field[dev_number]
    return field
def bytes2int(bts):
    """Combine a sequence of byte values, most significant first, into one number.

    An empty sequence yields 0.
    """
    total = 0
    for byte in bts:
        # Shift the accumulated value up one byte and append the next one.
        total = total * 256 + byte
    return total
def int2bytes(i):
    """Split integer *i* into a list of byte values, most significant first.

    Values of 255 or less (including negative ones) are returned unchanged as
    a single-element list.
    """
    low_first = []
    while i > 255:
        # Peel off the least significant byte, then drop it from i.
        low_first.append(i % 256)
        i >>= 8
    low_first.append(i)
    low_first.reverse()
    return low_first
def I2CName2pin(s):
    """Parse a ``"device.register.pin"`` string.

    Returns a ``(device, register, pin)`` tuple where *pin* is converted to
    ``int``. Raises ``ValueError`` when *s* does not have exactly three
    dot-separated parts or the pin part is not an integer.
    """
    device, register, pin_text = s.split('.')
    pin_number = int(pin_text)
    return device, register, pin_number
def ApplyMask(value,width=8,bitoffset=0,previous=0):
    """Insert a *width*-bit field into an existing register value.

    *value* is shifted left by *bitoffset* (when positive) and truncated to
    the field; every bit of *previous* outside the field is preserved.
    """
    field_mask = (1 << width) - 1
    if bitoffset > 0:
        field_mask <<= bitoffset
        value <<= bitoffset
    # Bits of `previous` outside the field are kept, bits inside are replaced.
    untouched = previous & ~field_mask
    return (value & field_mask) | untouched
def UnMask(value,width=8,bitoffset=0):
    """Extract a *width*-bit field located *bitoffset* bits up in *value*."""
    field_mask = (1 << width) - 1
    # Only shift for positive offsets; zero or negative offsets leave value as is.
    shifted = value >> bitoffset if bitoffset > 0 else value
    return shifted & field_mask
class I2Cdevices(hwdev):
    """Hardware device whose variables and methods are backed by device registers.

    The device description is read from ``self.D`` (not set in this class --
    presumably by ``hwdev``; TODO confirm). The keys used here are
    ``'Variables'``, ``'Methods'``, ``'I2C_devices'``, ``'SPI_devices'``,
    ``'I2C_bitbang'`` and ``'dev_children'``.

    Values are passed around as lists that are modified in place; most
    accessors read or write ``value[0]``.

    ``I2Ccallback`` is called as ``I2Ccallback(addr, data, reg=..., read=...)``
    and its truthiness is treated as success.
    """

    def GetVarValues(self, names, value, I2Ccallback):
        """Placeholder; always reports success without touching *value*."""
        return True

    def CallMethod(self, name, params, I2Ccallback):
        """Run the method called *name*.

        A method listed in ``self.D['Methods']`` is executed by writing every
        ``{variable: value}`` pair of its ``'registers'`` list through
        ``SetVarValue``; the individual write results are not checked and
        ``True`` is returned. Unknown names are delegated to
        ``hwdev.CallMethod``.
        """
        for Mth in self.D['Methods']:
            if Mth['method_name'] == name:
                print("RCU1 Run method", name)
                for regs in Mth['registers']:
                    for key, value in regs.items():
                        self.SetVarValue(key, value, I2Ccallback)
                return True
        return hwdev.CallMethod(self, name, params, I2Ccallback)

    def _MakePinAccessor(self, D, I2Ccallback):
        """Build the pin callback handed to a child device.

        *D* is the child's entry from ``dev_children``; its ``'child_GPIO'``
        list maps pin names to ``"device.register.bit"`` strings. The returned
        function sets (default) or gets (``Get=True``) that single bit through
        ``I2CSet`` / ``I2CGet``.
        """
        Pins = {}
        for p in D['child_GPIO']:
            for key, regvalue in p.items():
                Pins[key] = I2CName2pin(regvalue)

        def SetGetPin(pin, value, Get=False):
            dev, reg, bit = Pins[pin]
            if not Get:
                return self.I2CSet(dev, reg, [value], I2Ccallback, 1, bit)
            a = [0]
            # NOTE: the result of I2CGet is not checked; a failed read yields 0.
            self.I2CGet(dev, reg, a, I2Ccallback, 1, bit)
            return a[0]

        return SetGetPin

    def SetChildValue(self, D, child, name, value, I2Ccallback):
        """Forward a set request to *child*, giving it a pin-level callback."""
        return child.SetVarValue(name, value, self._MakePinAccessor(D, I2Ccallback))

    def GetChildValue(self, D, child, name, value, I2Ccallback):
        """Forward a get request to *child*, giving it a pin-level callback."""
        SetGetPin = self._MakePinAccessor(D, I2Ccallback)
        print("Get Child", child)
        return child.GetVarValue(name, value, SetGetPin)

    def SetVarValue(self, name, value, I2Ccallback):
        """Write *value* (a list, modified in place) to the variable *name*.

        *name* is resolved in this order: a configured variable, a configured
        method (then ``CallMethod`` is run), a raw ``"device.register"`` name,
        or ``"<child>_<variable>"`` for a child device. For configured
        variables ``value[0]`` is clamped to ``var_max`` / ``var_min`` and
        stored relative to ``var_min``; when ``var_scale`` is set the value is
        divided by it and split into bytes.

        Returns the result of the underlying write, or ``False`` for an
        unknown name.
        """
        print("RCU1 Set ", name, value)
        D = self.D
        # Assigned up front so the raw "device.register" path can also reach
        # SPIset below (it was previously undefined on that path).
        dev_number = 0
        Var1 = Find(D['Variables'], 'var_name', name)
        if Var1:
            Dev = GetField(Var1, 'var_dev', dev_number)
            Dev = Dev.split('.')
            vmax = GetField(Var1, 'var_max', dev_number)
            if vmax:
                if value[0] > vmax:
                    value[0] = vmax
            vmin = GetField(Var1, 'var_min', dev_number)
            if vmin:
                if value[0] < vmin:
                    value[0] = vmin
                # Store the offset from the minimum (was `value-vmin`, which
                # subtracted an int from the list itself and raised TypeError).
                value[0] = value[0] - vmin
            width = GetField(Var1, 'var_width', dev_number)
            bitoffset = GetField(Var1, 'var_bitoffset', dev_number)
            scale = GetField(Var1, 'var_scale', dev_number)
        elif Find(D['Methods'], 'method_name', name):
            return self.CallMethod(name, value, I2Ccallback)
        elif len(name.split('.')) == 2:
            Dev = name.split('.')
            width = False
            bitoffset = False
            scale = False
        else:
            childname = name.split('_')[0]
            for c in self.D['dev_children']:
                if childname == c['child_name']:
                    return self.SetChildValue(c, c['obj'], name[len(childname) + 1:], value, I2Ccallback)
            print("Unknown variable", name)
            return False
        print(Dev)
        if scale:
            value = int2bytes(int(value[0] / float(scale)))
        if Find(D['I2C_devices'], 'dev_name', Dev[0]):
            if not width:
                width = 8
            if not bitoffset:
                bitoffset = 0
            print('SetVar ', name, 'I2C', Dev, value, '(width=', width, 'bitoffset=', bitoffset, ')')
            return self.I2CSet(Dev[0], Dev[1], value, I2Ccallback, width, bitoffset)
        if Find(D['SPI_devices'], 'dev_name', Dev[0]):
            print('SetVar ', name, 'SPI', Dev, value)
            return SPIset(D, Dev[0], Dev[1], value, I2Ccallback, dev_number)
        # Propagate the parent's result instead of silently returning None.
        return hwdev.SetVarValue(self, name, value, I2Ccallback)

    def GetVarValue(self, name, value, I2Ccallback):
        """Read the variable *name* into *value* (a list, modified in place).

        *name* is resolved as a configured variable, a raw
        ``"device.register"`` name, or ``"<child>_<variable>"`` for a child
        device. The device part is then looked up among the I2C, SPI and
        bit-banged I2C devices, in that order. When ``var_scale`` is set, the
        I2C and bit-bang results are multiplied by it.

        Returns ``True`` on success (for SPI: whatever ``SPIget`` returns) and
        ``False`` on a failed read or unknown name.
        """
        dev_number = 0
        D = self.D
        print("RCU1 Get ", name, value)
        Var1 = Find(D['Variables'], 'var_name', name)
        if Var1:
            Dev = GetField(Var1, 'var_dev', dev_number)
            Dev = Dev.split('.')
            width = GetField(Var1, 'var_width', dev_number)
            bitoffset = GetField(Var1, 'var_bitoffset', dev_number)
            scale = GetField(Var1, 'var_scale', dev_number)
        elif len(name.split('.')) == 2:
            Dev = name.split('.')
            width = False
            bitoffset = False
            scale = False
        else:
            childname = name.split('_')[0]
            for c in self.D['dev_children']:
                if childname == c['child_name']:
                    return self.GetChildValue(c, c['obj'], name[len(childname) + 1:], value, I2Ccallback)
            print("Unknown variable", name)
            return False
        print(Dev)
        if not width:
            width = 8
        if not bitoffset:
            bitoffset = 0
        if Find(D['I2C_devices'], 'dev_name', Dev[0]):
            print('GetVar ', name, 'I2C', Dev, value, '(width=', width, 'bitoffset=', bitoffset, ')')
            result = self.I2CGet(Dev[0], Dev[1], value, I2Ccallback, width, bitoffset)
            if not result:
                return False
            if scale:
                value[0] = float(value[0]) * float(scale)
            return True
        if Find(D['SPI_devices'], 'dev_name', Dev[0]):
            print('GetVar ', name, 'SPI', Dev, value)
            return SPIget(D, Dev[0], Dev[1], value, I2Ccallback, dev_number)
        if Find(D['I2C_bitbang'], 'dev_name', Dev[0]):
            print('GetVar ', name, 'I2C bb', Dev, value)
            if not I2Cbbget(D, Dev[0], Dev[1], value, I2Ccallback, dev_number, width, bitoffset):
                return False
            if scale:
                value[0] = float(value[0]) * float(scale)
            return True
        print("Unknown variable", name)
        return False

    def GetVarNames(self, parentname, callback):
        """Report every configured variable through *callback*.

        ``callback(full_name, dtype, RW)`` is called with ``dtype`` 1 when the
        variable has a ``var_scale`` (else 0) and ``RW`` 1/2/3 for
        ``'RO'``/``'WO'``/``'RW'`` (0 when ``var_R/W`` is absent). An
        unrecognised ``var_R/W`` value raises ``KeyError``. Afterwards the
        call continues in ``hwdev.GetVarNames``.
        """
        RWs = {'RO': 1, 'WO': 2, 'RW': 3}
        for V in self.D['Variables']:
            dtype = 1 if V.get('var_scale') else 0
            access = V.get('var_R/W')
            RW = RWs[access] if access else 0
            callback(parentname + V['var_name'], dtype, RW)
        return hwdev.GetVarNames(self, parentname, callback)

    def GetMethodNames(self, parentname, callback):
        """Report every method not flagged ``method_invisible`` through *callback*.

        Afterwards the call continues in ``hwdev.GetMethodNames``.
        """
        for V in self.D['Methods']:
            if not V.get('method_invisible'):
                callback(parentname + V['method_name'])
        return hwdev.GetMethodNames(self, parentname, callback)

    def I2CSet(self, dev, reg, value, I2Ccallback, width=8, bitoffset=0):
        """Write *value* to register *reg* of I2C device *dev*.

        For a partial-register write (``width < 8`` or ``bitoffset > 0``),
        ``value[0]`` is merged in place into the last known register content:
        the cached ``'previous'`` value, else ``'reg_default'``, else 0. The
        cache is updated with ``value[0]`` whether or not the write succeeded.

        Returns the result of ``I2Ccallback``.
        """
        Dev = Find(self.D['I2C_devices'], 'dev_name', dev)
        Addr = Dev['dev_address']
        Reg = Find(Dev['dev_registers'], 'reg_name', reg)
        if (width < 8) or (bitoffset > 0):
            previous = Reg.get('previous') or Reg.get('reg_default') or 0
            value[0] = ApplyMask(value[0], width, bitoffset, previous)
        result = I2Ccallback(Addr, value, reg=Reg['reg_addr'])
        Reg['previous'] = value[0]
        return result

    def I2CGet(self, dev, reg, value, I2Ccallback, width=8, bitoffset=0):
        """Read register *reg* of I2C device *dev* into *value* (in place).

        When *value* has a single element but the field spans several bytes,
        the bytes are read into a scratch buffer and combined, most
        significant first, into ``value[0]``. The raw value is cached as the
        register's ``'previous'`` content before the *width* / *bitoffset*
        field is extracted.

        Returns ``True`` on success and ``False`` when a callback fails or the
        first byte read is ``None``.
        """
        Dev = Find(self.D['I2C_devices'], 'dev_name', dev)
        Addr = Dev['dev_address']
        Reg = Find(Dev['dev_registers'], 'reg_name', reg)
        # Number of bytes needed to cover the field (integer ceiling division).
        nbytes = (width + bitoffset + 7) // 8
        makesinglevalue = (len(value) == 1) and (nbytes > 1)
        value2 = [0] * nbytes if makesinglevalue else value
        regaddr = Reg['reg_addr']
        if regaddr > 255:  # This is for the monitor ADC
            # Multi-byte register address: a three-step exchange. The meaning
            # of read=2 / read=3 and of [250] is defined by the callback --
            # TODO confirm against its implementation.
            if not I2Ccallback(Addr, int2bytes(regaddr), read=2):
                return False
            I2Ccallback(Addr, [250], read=3)
            if not I2Ccallback(Addr, value2, read=1):
                return False
        else:
            if not I2Ccallback(Addr, value2, reg=regaddr, read=1):
                return False
        if value2[0] is None:
            return False
        if makesinglevalue:
            value[0] = bytes2int(value2)
        Reg['previous'] = value[0]
        if (width != nbytes * 8) or (bitoffset > 0):
            value[0] = UnMask(value[0], width, bitoffset)
        # For a byte-aligned field the combined value is already final. The
        # former `else: value[0]=value2[0]` replaced a combined multi-byte
        # result with its first byte only.
        return True
#def I2CGetBuff(D,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
# Dev=Find(D['I2C_devices'],'dev_name',dev)
# Addr=Dev['dev_address']
# Reg=Find(Dev['dev_registers'],'reg_name',reg)
# value[0]=Reg.get('previous')
# print("Get Value=",value[0])
# if (width<8) or (bitoffset>0):
# value[0]=UnMask(value[0],width,bitoffset)
# return True;