Skip to content
Snippets Groups Projects

Pypcc2

Merged Paulus Kruger requested to merge pypcc2 into master
Files
36
+ 79
18
@@ -49,7 +49,8 @@ class RCU1():
self.N=number;
self.I2Ccallback=I2Ccallback
self.SWcallback=Switchcallback
self.previous=np.zeros([number,Vars.RCU_storeReg],dtype='int')
self.previous =np.zeros([number,Vars.RCU_storeReg],dtype='int')
self.previousHBA=np.zeros([number,3,32],dtype='int')
def load(self):
Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
@@ -80,13 +81,13 @@ class RCU1():
if Instr.type==Vars.DevType.Instr:
#Execute instructions
Iset=Instr.dev;
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
for i in Iset.instr:
logging.debug(str(("Inst",i)))
self.SetVar(i,Mask=Mask)
return;
if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]:
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
mask=0;
RCU0=-1;
for RCUi in range(self.N):
@@ -112,7 +113,7 @@ class RCU1():
logging.error("Wrong size of value")
return False
if V1.Vars[0].type==Vars.DevType.Internal:
Instr.dev.OPCW.get_data_value().Value.Value=Instr.value
Instr.dev.OPCW.set_value(Instr.value)
logging.debug("Update internal variable")
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
@@ -120,10 +121,10 @@ class RCU1():
Step2=int(V1.size/V1.nVars)
if (self.N>1):
Mask=(Vars.RCU_mask if Step==1 else Vars.Ant_mask)
Mask=Mask.OPCW.get_data_value().Value.Value
value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_data_value().Value.Value)
Mask=Mask.OPCW.get_value()
value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_value())
if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
if (Instr.type==Vars.DevType.Var) or ((self.N==1) and (Instr.type==Vars.DevType.VarUpdate)):
if (Instr.type==Vars.DevType.Var):
logging.info(str(('** Set Var:',V1.name,value1)))
for RCUi in range(self.N):
for Vari in range(Step):
@@ -134,11 +135,22 @@ class RCU1():
value2=value1[i0:i1]
self.GetVarValue(RCUi,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
elif Instr.type==Vars.DevType.VarUpdate:
self.GetVarValueAll(V1,value1)
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
# V1.OPCR.get_data_value().Value.Value=value1
if self.N==1:
for Vari in range(Step):
i0=(Vari)*Step2
i1=(Vari+1)*Step2
value2=value1[i0:i1]
self.GetVarValue(0,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
else:
self.GetVarValueAll(V1,value1)
# if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
# V1.OPCR.get_data_value().Value.Value=value1
logging.info(str(('** Readback:',V1.name,value1)))
@@ -147,13 +159,14 @@ class RCU1():
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
GetSPIbb(self.SetI2C,self.GetI2C,RCUi,var.devreg,value)
# self.GetSPIBB(RCUi,var.devreg,var.width,var.bitoffset,value)
# logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
@@ -183,6 +196,22 @@ class RCU1():
elif V1.Vars[0].type==Vars.DevType.SPIbb:
self.GetBBValueAll(V1,value1,mask)
# logging.info("SPIbb all not implemented yet")
elif V1.Vars[0].type==Vars.DevType.HBA1:
for Vari in range(Step):
var=V1.Vars[Vari]
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
# print(Step,Step2,len(value1),V1.size)
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value2)
value1[i0:i1]=value2
# i0=(RCUi*Step+ Vari)*Step2+16
# i1=(RCUi*Step+(Vari+1))*Step2
# value2=value1[i0:i1]
# self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# value1[i0:i1]=value2
else:
logging.error("Type not implemented")
# print(value1)
@@ -217,6 +246,10 @@ class RCU1():
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetHBA1I2C(RCUi,var.devreg,var.width,var.bitoffset,value)
# print("RCU1 Set",RCUi,var,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
@@ -234,6 +267,20 @@ class RCU1():
if buffer: return True;
return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def SetHBA1I2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
# if dev.store>0:
previous=self.previousHBA[RCUi,dev.store-1];
L=len(value)
for i in range(L):
value[i]=ApplyMask(value[i],width,bitoffset,previous[i]);
self.previousHBA[RCUi,dev.store-1]=value
# if buffer: return True;
self.I2Ccallback(dev.Addr,value[:16],reg=dev.Register_W)
if L>16: self.I2Ccallback(dev.Addr,value[16:],reg=dev.Register_W+16)
self.I2Ccallback(dev.Addr,[500],reg=dev.Register_W,read=3) #Wait 500ms
return True
# return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def GetI2Cbuffer(self,RCUi,dev,width,bitoffset,value):
if not(dev.store>0): return False;
value[0]=self.previous[RCUi,dev.store-1];
@@ -305,6 +352,20 @@ class RCU1():
#if (len(value)>1) and (width<8): print value
return True;
def GetI2CHBA1(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
value[:]=value2[:];
if dev.store>0:
self.previousHBA[RCUi,dev.store-1]=value
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True;
def start(self,Q1):
def RCUthread(Q1):
while True:
@@ -318,13 +379,12 @@ class RCU1():
return RCUthread1
def Queue_Monitor(self,Q1):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,32,[0]*32)
# Q1.put(Inst1)
def Queue_Monitor(self,Q1,NRCU):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,NRCU,[0]*NRCU)
# Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
return
return
def AddVars(self,Q1,AddVarR,AddVarW):
for v in Vars.OPC_devvars:
dim1=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
@@ -333,6 +393,7 @@ class RCU1():
#print(v.name,dim)
varvalue2=0
if v.type==Vars.datatype.dInt: varvalue2=dim2*[0]
elif v.type==Vars.datatype.dbool: varvalue2=dim2*[False]
elif v.type==Vars.datatype.dfloat: varvalue2=dim2*[0.0]
elif v.type==Vars.datatype.dstring: varvalue2=dim1*[" "*dim3]
print(len(varvalue2),varvalue2)
Loading