Skip to content
Snippets Groups Projects

Pypcc2

Merged Paulus Kruger requested to merge pypcc2 into master
52 files
+ 3123
1910
Compare changes
  • Side-by-side
  • Inline
Files
52
+ 0
414
from . import Vars
import numpy as np
import logging
from .spibitbang2 import *
import threading
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
if bitoffset>0:
value<<=bitoffset;
mask<<=bitoffset;
return (value & mask) + (previous - (previous & mask));
def UnMask(value,width=8,bitoffset=0):
mask=(1<<width)-1
if bitoffset>0:
value>>=bitoffset;
value=value&mask;
return value;
def bytes2int(bts):
x=0;
for b in bts:
x=x*256+b;
return x;
def int2bytes(i):
b=[];
while i>255:
b=[i%256]+b;
i>>=8;
return [i]+b;
def strs2bytes(var):
# print("str2bytes",var)
if len(var)==0: return var;
if isinstance(var[0],str): #make string a byte array
# return [ord(c.encode('utf-8')[0]) for s in var for c in s]
return [c.encode('utf-8')[0] for s in var for c in s]
return var
def bytes2strs(var,step,dtype):
if not(dtype==Vars.datatype.dstring): return var
cnt=int(len(var)/step)
print(var)
return [(bytearray(var[i*step:(i+1)*step]).decode("utf-8")) for i in range(cnt)]
class RCU1():
def __init__(self,number,I2Ccallback,Switchcallback):
self.N=number;
self.I2Ccallback=I2Ccallback
self.SWcallback=Switchcallback
self.previous =np.zeros([number,Vars.RCU_storeReg],dtype='int')
self.previousHBA=np.zeros([number,3,32],dtype='int')
def load(self):
Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
self.SetVar(Inst1)
#Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0]
#Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0]
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Var,Vars.RCU_att,12,[0,1,2,3,4,5,6,7,8,9,11])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_off,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_on,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.ADC1_on,0,[])
#RCU.SetVar(Inst1)
#print(Vars.RCU)
def SetVar(self,Instr,Mask=[]):
# Instr.value=strs2bytes(Instr.value) #Alwast be an array of bytes
if (self.N==1): Mask=[True]
if Instr.type==Vars.DevType.Instr:
#Execute instructions
Iset=Instr.dev;
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
for i in Iset.instr:
logging.debug(str(("Inst",i)))
self.SetVar(i,Mask=Mask)
return;
if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]:
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
mask=0;
RCU0=-1;
for RCUi in range(self.N):
if (Mask[RCUi]):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
if RCU0<0: RCU0=RCUi;
if RCU0<0: return; #Mask all zero
self.SWcallback(mask)
if Instr.type==Vars.DevType.I2C:
logging.info(str(('** Set I2C:',Instr.dev,Instr.value)))
self.SetI2C(RCU0,Instr.dev,8,0,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.SPIbb:
logging.debug(str(('** Set SPIbb:',Instr.dev,Instr.value)))
SetSPIbb(self.SetI2C,RCU0,Instr.dev,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.I2Cbb:
logging.info(str(('** Set I2Cbb:',Instr.dev,Instr.value)))
# self.SetI2C(RCUi,Instr.dev,8,0,strs2bytes(Instr.value))
return;
V1=Instr.dev
if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.size*self.N):
logging.error("Wrong size of value")
return False
if V1.Vars[0].type==Vars.DevType.Internal:
Instr.dev.OPCW.set_value(Instr.value)
logging.debug("Update internal variable")
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if (self.N>1):
Mask=(Vars.RCU_mask if Step==1 else Vars.Ant_mask)
Mask=Mask.OPCW.get_value()
value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_value())
if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
if (Instr.type==Vars.DevType.Var):
logging.info(str(('** Set Var:',V1.name,value1)))
for RCUi in range(self.N):
for Vari in range(Step):
if not(Mask[RCUi*Step+Vari]): continue
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[i0:i1])
value2=value1[i0:i1]
self.GetVarValue(RCUi,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
elif Instr.type==Vars.DevType.VarUpdate:
if self.N==1:
for Vari in range(Step):
i0=(Vari)*Step2
i1=(Vari+1)*Step2
value2=value1[i0:i1]
self.GetVarValue(0,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
else:
self.GetVarValueAll(V1,value1)
# if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
# V1.OPCR.get_data_value().Value.Value=value1
logging.info(str(('** Readback:',V1.name,value1)))
def GetVarValue(self,RCUi,var,value):
logging.info(str(("RCU1 Get ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
GetSPIbb(self.SetI2C,self.GetI2C,RCUi,var.devreg,value)
else:
logging.error("Not Implemented")
def GetVarValueAll(self,V1,value1):
mask=0;
for RCUi in range(self.N):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if V1.Vars[0].type==Vars.DevType.I2C:
for Vari in range(Step):
DevReg=V1.Vars[Vari].devreg
self.SWcallback(mask)
self.SetI2CAddr(self,DevReg)
if DevReg.Register_R>255: self.I2Ccallback(DevReg.Addr,[250],read=3) #Wait for ADC
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
var=V1.Vars[Vari]
# print(Step,Step2,i0,i1,value2,len(value1))
self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# print(value2)
if (var.Scale!=0): value2[0]*=var.Scale;
value1[i0:i1]=value2
elif V1.Vars[0].type==Vars.DevType.SPIbb:
self.GetBBValueAll(V1,value1,mask)
# logging.info("SPIbb all not implemented yet")
elif V1.Vars[0].type==Vars.DevType.HBA1:
for Vari in range(Step):
var=V1.Vars[Vari]
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
# print(Step,Step2,len(value1),V1.size)
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value2)
value1[i0:i1]=value2
# i0=(RCUi*Step+ Vari)*Step2+16
# i1=(RCUi*Step+(Vari+1))*Step2
# value2=value1[i0:i1]
# self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# value1[i0:i1]=value2
else:
logging.error("Type not implemented")
# print(value1)
def GetBBValueAll(self,V1,value1,mask):
def SetBit(RCUi,dev,width,bitoffset,value,buffer=False):
if not(buffer): self.SWcallback(mask)
self.SetI2C(RCUi,dev,width,bitoffset,value,buffer=buffer)
def GetBit(RCUixx,dev,width,bitoffset,buffer=False):
value=[0 for RCUi in range(self.N)]
value2=[0]
if buffer:
for RCUi in range(self.N):
self.GetI2Cbuffer(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value
self.SWcallback(mask)
self.SetI2CAddr(self,dev)
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
#for i in range(len(value2)): value2[i]*=0;
value2[0]=0
self.GetI2Cnoreg(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value;
devs=[V1.Vars[Vari].devreg for Vari in range(V1.nVars)]
GetSPIbb2(SetBit,GetBit,0,devs,value1)
def SetVarValue(self,RCUi,var,value):
if var.devreg.Register_W==-1: return True; #We can not set it, only read it e.g. temperature
logging.debug(str(("RCU1 Set ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetHBA1I2C(RCUi,var.devreg,var.width,var.bitoffset,value)
# print("RCU1 Set",RCUi,var,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def SetI2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
if dev.store>0:
previous=self.previous[RCUi,dev.store-1];
value[0]=ApplyMask(value[0],width,bitoffset,previous);
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(('masked to',value)))
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if buffer: return True;
return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def SetHBA1I2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
# if dev.store>0:
previous=self.previousHBA[RCUi,dev.store-1];
L=len(value)
for i in range(L):
value[i]=ApplyMask(value[i],width,bitoffset,previous[i]);
self.previousHBA[RCUi,dev.store-1]=value
# if buffer: return True;
self.I2Ccallback(dev.Addr,value[:16],reg=dev.Register_W)
if L>16: self.I2Ccallback(dev.Addr,value[16:],reg=dev.Register_W+16)
self.I2Ccallback(dev.Addr,[500],reg=dev.Register_W,read=3) #Wait 500ms
return True
# return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def GetI2Cbuffer(self,RCUi,dev,width,bitoffset,value):
if not(dev.store>0): return False;
value[0]=self.previous[RCUi,dev.store-1];
# logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value)))
l1=int(np.floor((width+bitoffset+7)/8))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True
def GetI2C(self,RCUi,dev,width,bitoffset,value):
# if dev.store>0:
# value[0]=self.previous[RCUi,dev.store-1]
# return True
l1=int(np.floor((width+bitoffset+7)/8))
# print(width,bitoffset,l1)
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if reg>255: #This is for the monitor ADC
if not(self.I2Ccallback(dev.Addr,int2bytes(reg),read=2)): return False;
I2Ccallback(Addr,[250],read=3)
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
else:
if not(self.I2Ccallback(dev.Addr,value2,reg=reg,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
else: value[0]=value2[0]
return True;
def GetI2Cbit(self,RCUi,dev,pin):
value2=[value]
self.I2CGet(RCUi,dev,1,1<<pin,value2)
return value2[0]
def SetI2CAddr(self,RCUi,dev):
return self.I2Ccallback(dev.Addr,int2bytes(dev.Register_R),read=2)
def GetI2Cnoreg(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
l1=int(np.floor((width+bitoffset+7)/8))
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
# if width<8:
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
# value[0]=UnMask(value[0],width,bitoffset)
#else: value[0]=value2[0]
#if (len(value)>1) and (width<8): print value
return True;
def GetI2CHBA1(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
value[:]=value2[:];
if dev.store>0:
self.previousHBA[RCUi,dev.store-1]=value
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True;
def start(self,Q1):
def RCUthread(Q1):
while True:
item = Q1.get()
if item is None: break;
self.SetVar(item)
logging.info("End RCU thread")
RCUthread1 = threading.Thread(target=RCUthread, args=(Q1,))
RCUthread1.start()
return RCUthread1
def Queue_Monitor(self,Q1,NRCU):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,NRCU,[0]*NRCU)
# Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
return
def AddVars(self,Q1,AddVarR,AddVarW):
for v in Vars.OPC_devvars:
dim1=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
dim2=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.size
dim3=int(v.size/v.nVars)
#print(v.name,dim)
varvalue2=0
if v.type==Vars.datatype.dInt: varvalue2=dim2*[0]
elif v.type==Vars.datatype.dbool: varvalue2=dim2*[False]
elif v.type==Vars.datatype.dfloat: varvalue2=dim2*[0.0]
elif v.type==Vars.datatype.dstring: varvalue2=dim1*[" "*dim3]
print(len(varvalue2),varvalue2)
if v.RW in [Vars.RW.ReadOnly,Vars.RW.ReadWrite]:
var1=AddVarR(v.name+"_R",varvalue2,v)
# print(len(varvalue1),len(varvalue2),v.size,dim2)
v.OPCR=var1
Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim2,varvalue2)
Q1.put(Inst)
if v.RW in [Vars.RW.WriteOnly,Vars.RW.ReadWrite]:
var1=AddVarW(v.name+"_RW",varvalue2,v,Q1)
v.OPCW=var1
def AddMethod(self,Q1,Addmethod):
for v in Vars.OPC_methods:
Inst1=Vars.Instr(Vars.DevType.Instr,v,0,[])
Addmethod(v.name,Inst1,Q1)
Loading