From b5371b923263719a33a95b0a24924f2c611596a7 Mon Sep 17 00:00:00 2001 From: kruger <kruger@astron.nl> Date: Thu, 18 Feb 2021 16:11:45 +0100 Subject: [PATCH] CLK SPI bitbang added,not tested --- clk/CLK.py | 110 ++++++++++++++++++++++++++++++----------- clk/HWconf.py | 6 +++ clk/Vars.py | 33 +++++++++++-- clk/spibitbang2.py | 79 +++++++++++++++++++++++++++++ i2c/I2C.py | 29 +++++++++++ i2c/I2C_dummy.py | 13 +++++ i2c/I2Cswitch1.py | 11 +++++ pypcc2.py | 11 +++-- rcu/Vars.py | 2 +- scripts/ADCreset.py | 13 +++++ scripts/Att.py | 26 ++++++++++ scripts/LED.py | 19 +++++++ scripts/RCUupdate.py | 8 +++ scripts/SetHBA_BF.py | 27 ++++++++++ scripts/SetHBA_LED.py | 26 ++++++++++ scripts/SetMonitor.py | 10 ++++ scripts/test1.py | 23 +++++++++ scripts/test_common.py | 68 +++++++++++++++++++++++++ 18 files changed, 473 insertions(+), 41 deletions(-) create mode 100644 clk/spibitbang2.py create mode 100644 scripts/ADCreset.py create mode 100644 scripts/Att.py create mode 100644 scripts/LED.py create mode 100644 scripts/RCUupdate.py create mode 100644 scripts/SetHBA_BF.py create mode 100644 scripts/SetHBA_LED.py create mode 100644 scripts/SetMonitor.py create mode 100644 scripts/test1.py create mode 100644 scripts/test_common.py diff --git a/clk/CLK.py b/clk/CLK.py index a7dee31..f5a70f1 100644 --- a/clk/CLK.py +++ b/clk/CLK.py @@ -1,7 +1,7 @@ from . import Vars import numpy as np import logging -#from .spibitbang1 import * +from .spibitbang2 import * import threading @@ -30,6 +30,20 @@ def int2bytes(i): i>>=8; return [i]+b; +def strs2bytes(var): +# print("str2bytes",var) + if len(var)==0: return var; + if isinstance(var[0],str): #make string a byte array +# return [ord(c.encode('utf-8')[0]) for s in var for c in s] + return [c.encode('utf-8')[0] for s in var for c in s] + return var + +def bytes2strs(var,step,dtype): + if not(dtype==Vars.datatype.dstring): return var + cnt=int(len(var)/step) + print(var) + return [(bytearray(var[i*step:(i+1)*step]).decode("utf-8")) for i in range(cnt)] + class RCU1(): def __init__(self,number,I2Ccallback,Switchcallback): self.N=number; @@ -40,7 +54,6 @@ class RCU1(): def load(self): Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs self.SetVar(Inst1) - #Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0] #Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0] @@ -61,36 +74,41 @@ class RCU1(): #print(Vars.RCU) - def SetVar(self,Instr): + def SetVar(self,Instr,Mask=[]): +# Instr.value=strs2bytes(Instr.value) #Alwast be an array of bytes + if (self.N==1): Mask=[True] if Instr.type==Vars.DevType.Instr: #Execute instructions Iset=Instr.dev; + if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value for i in Iset.instr: logging.debug(str(("Inst",i))) - self.SetVar(i) + self.SetVar(i,Mask=Mask) return; if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]: + if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value + mask=0; RCU0=-1; - mask=0 for RCUi in range(self.N): + if (Mask[RCUi]): mask|=1<<Vars.RCU_MPaddr.Switch[RCUi] if RCU0<0: RCU0=RCUi; if RCU0<0: return; #Mask all zero self.SWcallback(mask) if Instr.type==Vars.DevType.I2C: logging.info(str(('** Set I2C:',Instr.dev,Instr.value))) - self.SetI2C(RCU0,Instr.dev,8,0,Instr.value) + self.SetI2C(RCU0,Instr.dev,8,0,strs2bytes(Instr.value)) return; elif Instr.type==Vars.DevType.SPIbb: logging.debug(str(('** Set SPIbb:',Instr.dev,Instr.value))) - SetSPIbb(self.SetI2C,RCU0,Instr.dev,Instr.value) + SetSPIbb(self.SetI2C,RCU0,Instr.dev,strs2bytes(Instr.value)) return; elif Instr.type==Vars.DevType.I2Cbb: logging.info(str(('** Set I2Cbb:',Instr.dev,Instr.value))) -# self.SetI2C(RCUi,Instr.dev,8,0,Instr.value) +# self.SetI2C(RCUi,Instr.dev,8,0,strs2bytes(Instr.value)) return; V1=Instr.dev - if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.nVars*self.N): + if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.size*self.N): logging.error("Wrong size of value") return False if V1.Vars[0].type==Vars.DevType.Internal: @@ -99,21 +117,27 @@ class RCU1(): return # if V1.Vars[0].type==Vars.DevType.Internal: return; Step=V1.nVars - value1=Instr.value if V1.OPCR is None else V1.OPCR.get_data_value().Value.Value + Step2=int(V1.size/V1.nVars) + if (self.N>1): + Mask=(Vars.RCU_mask if Step==1 else Vars.Ant_mask) + Mask=Mask.OPCW.get_data_value().Value.Value + value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_data_value().Value.Value) if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N); - if Instr.type==Vars.DevType.Var: + if (Instr.type==Vars.DevType.Var) or ((self.N==1) and (Instr.type==Vars.DevType.VarUpdate)): logging.info(str(('** Set Var:',V1.name,value1))) for RCUi in range(self.N): for Vari in range(Step): - self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[RCUi*Step+Vari:RCUi*Step+Vari+1]) - value2=value1[RCUi*Step+Vari:RCUi*Step+Vari+1] + if not(Mask[RCUi*Step+Vari]): continue + i0=(RCUi*Step+ Vari)*Step2 + i1=(RCUi*Step+(Vari+1))*Step2 + self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[i0:i1]) + value2=value1[i0:i1] self.GetVarValue(RCUi,V1.Vars[Vari],value2) - value1[RCUi*Step+Vari:RCUi*Step+Vari+1]=value2 - if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=value1 - + value1[i0:i1]=value2 + if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type) elif Instr.type==Vars.DevType.VarUpdate: - self.GetVarValueAll(V1,value1) - if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=value1 + self.GetVarValueAll(V1,value1) + if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type) # V1.OPCR.get_data_value().Value.Value=value1 logging.info(str(('** Readback:',V1.name,value1))) @@ -126,7 +150,10 @@ class RCU1(): elif var.type==Vars.DevType.I2Cbb: logging.error("I2Cbb Implemented") elif var.type==Vars.DevType.SPIbb: - logging.error("SPIbb Implemented") + self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi]) + GetSPIbb(self.SetI2C,self.GetI2C,RCUi,var.devreg,value) +# self.GetSPIBB(RCUi,var.devreg,var.width,var.bitoffset,value) +# logging.error("SPIbb Implemented") else: logging.error("Not Implemented") @@ -135,6 +162,7 @@ class RCU1(): for RCUi in range(self.N): mask|=1<<Vars.RCU_MPaddr.Switch[RCUi] Step=V1.nVars + Step2=int(V1.size/V1.nVars) if V1.Vars[0].type==Vars.DevType.I2C: for Vari in range(Step): DevReg=V1.Vars[Vari].devreg @@ -143,16 +171,21 @@ class RCU1(): if DevReg.Register_R>255: self.I2Ccallback(DevReg.Addr,[250],read=3) #Wait for ADC for RCUi in range(self.N): self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi]) - value2=value1[RCUi*Step+Vari:RCUi*Step+Vari+1] + i0=(RCUi*Step+ Vari)*Step2 + i1=(RCUi*Step+(Vari+1))*Step2 + value2=value1[i0:i1] var=V1.Vars[Vari] +# print(Step,Step2,i0,i1,value2,len(value1)) self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2) +# print(value2) if (var.Scale!=0): value2[0]*=var.Scale; - value1[RCUi*Step+Vari:RCUi*Step+Vari+1]=value2 + value1[i0:i1]=value2 elif V1.Vars[0].type==Vars.DevType.SPIbb: self.GetBBValueAll(V1,value1,mask) # logging.info("SPIbb all not implemented yet") else: logging.error("Type not implemented") +# print(value1) def GetBBValueAll(self,V1,value1,mask): def SetBit(RCUi,dev,width,bitoffset,value,buffer=False): if not(buffer): self.SWcallback(mask) @@ -169,6 +202,8 @@ class RCU1(): self.SetI2CAddr(self,dev) for RCUi in range(self.N): self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi]) + #for i in range(len(value2)): value2[i]*=0; + value2[0]=0 self.GetI2Cnoreg(RCUi,dev,width,bitoffset,value2) value[RCUi]=value2[0] return value; @@ -205,7 +240,8 @@ class RCU1(): # logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value))) l1=int(np.floor((width+bitoffset+7)/8)) if (width!=l1*8) or (bitoffset>0): - value[0]=UnMask(value[0],width,bitoffset) + for i in range(len(value)): + value[i]=UnMask(value[i],width,bitoffset) return True def GetI2C(self,RCUi,dev,width,bitoffset,value): @@ -231,7 +267,8 @@ class RCU1(): self.previous[RCUi,dev.store-1]=value[0] # logging.debug(str(("Store buffer",RCUi,dev.store,value[0]))) if (width!=l1*8) or (bitoffset>0): - value[0]=UnMask(value[0],width,bitoffset) + for i in range(len(value)): + value[i]=UnMask(value[i],width,bitoffset) else: value[0]=value2[0] return True; def GetI2Cbit(self,RCUi,dev,pin): @@ -246,6 +283,7 @@ class RCU1(): def GetI2Cnoreg(self,RCUi,dev,width,bitoffset,value): + #print(width,len(value)) l1=int(np.floor((width+bitoffset+7)/8)) makesinglevalue=((len(value)==1) and (l1>1)); if makesinglevalue: value2=[0 for x in range(l1)] @@ -258,9 +296,13 @@ class RCU1(): if dev.store>0: self.previous[RCUi,dev.store-1]=value[0] # logging.debug(str(("Store buffer",RCUi,dev.store,value[0]))) - if (width!=l1*8) or (bitoffset>0): - value[0]=UnMask(value[0],width,bitoffset) +# if width<8: + if (width!=l1*8) or (bitoffset>0): + for i in range(len(value)): + value[i]=UnMask(value[i],width,bitoffset) +# value[0]=UnMask(value[0],width,bitoffset) #else: value[0]=value2[0] + #if (len(value)>1) and (width<8): print value return True; def start(self,Q1): @@ -276,21 +318,29 @@ class RCU1(): return RCUthread1 -# def Queue_Monitor(self,Q1): + def Queue_Monitor(self,Q1): # Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,32,[0]*32) # Q1.put(Inst1) # Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96) # Q1.put(Inst1) - + return def AddVars(self,Q1,AddVarR,AddVarW): for v in Vars.OPC_devvars: - dim=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars - varvalue2=(dim*[0.0] if v.type==Vars.datatype.dfloat else dim*[0]) + dim1=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars + dim2=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.size + dim3=int(v.size/v.nVars) + #print(v.name,dim) + varvalue2=0 + if v.type==Vars.datatype.dInt: varvalue2=dim2*[0] + elif v.type==Vars.datatype.dfloat: varvalue2=dim2*[0.0] + elif v.type==Vars.datatype.dstring: varvalue2=dim1*[" "*dim3] + print(len(varvalue2),varvalue2) if v.RW in [Vars.RW.ReadOnly,Vars.RW.ReadWrite]: var1=AddVarR(v.name+"_R",varvalue2,v) +# print(len(varvalue1),len(varvalue2),v.size,dim2) v.OPCR=var1 - Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim,varvalue2) + Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim2,varvalue2) Q1.put(Inst) if v.RW in [Vars.RW.WriteOnly,Vars.RW.ReadWrite]: diff --git a/clk/HWconf.py b/clk/HWconf.py index 95060df..36f1437 100644 --- a/clk/HWconf.py +++ b/clk/HWconf.py @@ -11,3 +11,9 @@ CLK_IO3_CONF1=DevReg(0x20,6,6,3) CLK_IO3_CONF2=DevReg(0x20,7,7,4) RCU_storeReg=4; #Number of stored registers + + + +SPIBB_PLL=BBdev(4,[CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT2],[4,7,5,6],0) #CLK,SDI,SDO,CS + +CLK_PLL_lock =DevReg(SPIBB_PLL,0X00,0X00,0) # PLL locked status diff --git a/clk/Vars.py b/clk/Vars.py index 4127783..20c67ab 100644 --- a/clk/Vars.py +++ b/clk/Vars.py @@ -1,10 +1,15 @@ from .HWconf import * -CLK_IGNORE_PPS= VarArray("CLK_Ignore_PPS",1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,6,1)],RW.ReadOnly,datatype.dInt,1,None,None) -CLK_Enable_PWR= VarArray("CLK_Enable_PWR",1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,7,1)],RW.ReadOnly,datatype.dInt,1,None,None) -CLK_Stat1 = VarArray("CLK_Stat" ,1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,4,1)],RW.ReadOnly,datatype.dInt,1,None,None) +CLKmod=I2Cmodules.CLK -OPC_devvars=[CLK_IGNORE_PPS,CLK_Enable_PWR,CLK_Stat1] + +CLK_IGNORE_PPS= VarArray("CLK_Ignore_PPS",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,6,1)],RW.ReadOnly,datatype.dInt,1,None,None) +CLK_Enable_PWR= VarArray("CLK_Enable_PWR",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,7,1)],RW.ReadOnly,datatype.dInt,1,None,None) +CLK_Stat1 = VarArray("CLK_Stat" ,1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,4,1)],RW.ReadOnly,datatype.dInt,1,None,None) +CLK_lock1 = VarArray("CLK_Lock" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_lock,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None) + + +OPC_devvars=[CLK_IGNORE_PPS,CLK_Enable_PWR,CLK_Stat1,CLK_lock1] RCU_init=Instrs("ReadRegisters",2,[ Instr(DevType.VarUpdate,CLK_IGNORE_PPS,1,[0]), @@ -21,4 +26,22 @@ CLK_off=Instrs("CLK_off",2,[ Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x40]), ]) -OPC_methods=[CLK_on] +def SPIinst(reg,value): + return Instr(DevType.SPIbb,DevReg(SPIBB_PLL,0,reg,0),1,[value]), +CLK_PLL_setup=Instrs("CLK_PLL_setup",2,[ + SPIinst(0x03,0x08), + SPIinst(0x05,0x97), + SPIinst(0x06,0x10), + SPIinst(0x07,0x04), + SPIinst(0x08,0x01), + SPIinst(0x07,0x00), + SPIinst(0x09,0x10), + SPIinst(0x0A,0x14), + SPIinst(0x09,0x00), + SPIinst(0x0D,0x01),#was 2 + SPIinst(0x0F,0x01), + SPIinst(0x11,0x01), + SPIinst(0x13,0x01) +]) + +OPC_methods=[CLK_on,CLK_off,CLK_PLL_setup] diff --git a/clk/spibitbang2.py b/clk/spibitbang2.py new file mode 100644 index 0000000..d7d6be0 --- /dev/null +++ b/clk/spibitbang2.py @@ -0,0 +1,79 @@ +from enum import Enum +import logging +import numpy as np + +class SPIBB_pins(Enum): + CLK = 0 + SDI = 1 + SDO = 2 + CS = 3 + +def SetSPIbb(SetI2C,RCUi,dev,value): + ADC_address=dev.Register_W + CSdev=dev.Addr.devs[SPIBB_pins.CS.value] + CSpin=dev.Addr.pins[SPIBB_pins.CS.value] + SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value] + SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value] + SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value] + SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value] + CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value] + CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value] + + logging.info(str(("SPIbb set",ADC_address,value))) + +# dev_rw = 0x00 # 0 for write, 1 for read +# data2 = ( reg_address << 9 ) + ( dev_rw << 8 ) + value[0] + data2 = ( ADC_address << 9 ) + value[0] + + bit_array = "{0:{fill}16b}".format(data2, fill='0') + # print(bit_array) + SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable + for bit in bit_array: + SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)]) + SetI2C(RCUi,CLKdev,1,CLKpin,[1]) + SetI2C(RCUi,CLKdev,1,CLKpin,[0]) + SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable + # SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished + return True; + +def GetSPIbb(SetI2C,GetI2C,RCUi,dev,value): + ADC_reg_address=dev.Register_R + CSdev=dev.Addr.devs[SPIBB_pins.CS.value] + CSpin=dev.Addr.pins[SPIBB_pins.CS.value] + SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value] + SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value] + SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value] + SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value] + CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value] + CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value] + + logging.info(str(("SPIbb get",ADC_reg_address))) + ADC_bytes = 0x00 + # ADC_rw = 0x01 # 0 for write, 1 for read + data = (ADC_reg_address << 1) + 1 #was 7?? + + SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable + + + bit_array = "{0:{fill}8b}".format(data, fill='0') + for bit in bit_array: + SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)]) + SetI2C(RCUi,CLKdev,1,CLKpin,[1]) + SetI2C(RCUi,CLKdev,1,CLKpin,[0]) + + # SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished + + # print("read byte") + a=[0] + N=1 #len(value) + ret_value=[0] + for i in range(N): value[i]=0 + for cnt in range(8*(ADC_bytes+1)): + GetI2C(RCUi,SDOdev,1,SDOpin,ret_value) + for i in range(N): value[i]=(value[i]<<1)+ ret_value[i] + SetI2C(RCUi,CLKdev,1,CLKpin,[1]) + SetI2C(RCUi,CLKdev,1,CLKpin,[0]) #read after falling edge + SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable + return True; + + diff --git a/i2c/I2C.py b/i2c/I2C.py index 1abf950..ceb43d6 100644 --- a/i2c/I2C.py +++ b/i2c/I2C.py @@ -36,3 +36,32 @@ def I2C1server(addr,data,reg=None,read=0): # data[0]=0 return False; + +def I2C2server(addr,data,reg=None,read=0): + try: + if read==3: + time.sleep(data[0]/1000.) + return True + logging.debug(str(("I2C2",addr,reg,data,read))) + +# print("I2C",addr,reg,data,read) +# return True; + bus=pylibi2c.I2CDevice('/dev/i2c-2',addr) + if read==1: + length=len(data) + bus.iaddr_bytes=0 + if not(reg is None): + bus.ioctl_write(0,str(bytearray([reg]))) + data[:]=[int(x) for x in bus.ioctl_read(0,length)] + else: + if reg is None: + bus.iaddr_bytes=0 + reg=0; + bus.ioctl_write(reg,str(bytearray(data))) + bus.close() + return True; + except: + if bus: bus.close() +# data[0]=0 + return False; + diff --git a/i2c/I2C_dummy.py b/i2c/I2C_dummy.py index 63000d0..d6351b0 100644 --- a/i2c/I2C_dummy.py +++ b/i2c/I2C_dummy.py @@ -19,3 +19,16 @@ def I2C1server(addr,data,reg=None,read=0): I2Ccounter+=len(data) # logging.debug(str(("I2C",addr,reg,data,read))) return True + +def I2C2server(addr,data,reg=None,read=0): + global I2Ccounter; + logging.debug(str(("I2C2",addr,reg,data,read))) + if read==3: + time.sleep(data[0]/1000.) + return True + if read==1: + data[0]=0 + if reg: I2Ccounter+=1; + I2Ccounter+=len(data) +# logging.debug(str(("I2C",addr,reg,data,read))) + return True diff --git a/i2c/I2Cswitch1.py b/i2c/I2Cswitch1.py index 87382d0..50a72c6 100644 --- a/i2c/I2Cswitch1.py +++ b/i2c/I2Cswitch1.py @@ -16,3 +16,14 @@ class I2Cswitch1(): def I2Ccallback(self,RCU,addr,data,reg=None,read=0): self.callback1(addr,data,reg,read) + + +class I2Cswitch0(): + def __init__(self,callback1): + return + + def SetChannel(self,channel): + return True + + def I2Ccallback(self,RCU,addr,data,reg=None,read=0): + return \ No newline at end of file diff --git a/pypcc2.py b/pypcc2.py index 7d7b53e..9907c49 100644 --- a/pypcc2.py +++ b/pypcc2.py @@ -41,16 +41,17 @@ opcuaserv.InitServer(port=args.port) logging.info("OPC-UA Server started") SW1=I2Cswitch1.I2Cswitch1(I2C.I2C1server) +SW0=I2Cswitch1.I2Cswitch0(I2C.I2C2server) #Dummy switch as their is no switch on LTS RCU=RCU.RCU1(32,I2C.I2C1server,SW1.SetChannel) RCU.AddVars(Q1,opcuaserv.AddVarR,opcuaserv.AddVarW) RCU.AddMethod(Q1,opcuaserv.Addmethod) RCU.load() #Load current register values from HW -#CLK=CLK.RCU1(1,I2C.I2C1server,SW1.SetChannel) -#CLK.AddVars(Q2,opcuaserv.AddVarR,opcuaserv.AddVarW) -#CLK.AddMethod(Q2,opcuaserv.Addmethod) -#CLK.load() #Load current register values from HW +CLK=CLK.RCU1(1,I2C.I2C2server,SW0.SetChannel) +CLK.AddVars(Q2,opcuaserv.AddVarR,opcuaserv.AddVarW) +CLK.AddMethod(Q2,opcuaserv.Addmethod) +CLK.load() #Load current register values from HW #logging.debug(str(("I2C bytes=",I2C.I2Ccounter))) @@ -59,7 +60,7 @@ if False: exit() RCUthread1=RCU.start(Q1) -#CLKthread1=CLK.start(Q2) +CLKthread1=CLK.start(Q2) RunTimer=True; def TimerThread(Q1,RCU): diff --git a/rcu/Vars.py b/rcu/Vars.py index 32ad234..b8b3896 100644 --- a/rcu/Vars.py +++ b/rcu/Vars.py @@ -97,7 +97,7 @@ RCU_OUT2=VarArray("RCU_OUT2",3,[RCU_IO1_2,RCU_IO2_2,RCU_IO3_2],RW.ReadOnly,datat -OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_ADC_lock,RCU_ADC_SYNC,RCU_ADC_JESD,RCU_ADC_CML,RCU_OUT1,RCU_OUT2,RCU_ID,RCU_VER]#,HBA1_Delay,HBA1_Pwr]#,RCU_CNF1,RCU_CNF2] +OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_ADC_lock,RCU_ADC_SYNC,RCU_ADC_JESD,RCU_ADC_CML,RCU_OUT1,RCU_OUT2,RCU_ID,RCU_VER,HBA1_Delay,HBA1_Pwr]#,RCU_CNF1,RCU_CNF2] diff --git a/scripts/ADCreset.py b/scripts/ADCreset.py new file mode 100644 index 0000000..b0f208a --- /dev/null +++ b/scripts/ADCreset.py @@ -0,0 +1,13 @@ +from test_common import * + +RCUs=[0]; +setRCUmask(RCUs) + +callmethod("RCU_off") +time.sleep(1) +callmethod("RCU_on") +callmethod("RCU_on") +time.sleep(1) +callmethod("ADC_on") + +disconnect(); diff --git a/scripts/Att.py b/scripts/Att.py new file mode 100644 index 0000000..6e5d7b1 --- /dev/null +++ b/scripts/Att.py @@ -0,0 +1,26 @@ +from test_common import * + +name="RCU_attenuator" +RCU=0; +Att=[5,5,5] + + +#setAntmask([RCU]) + +att=get_value(name+"_R") +val=att.Value.Value +print("Att old:",val) +#print("Att old:",att[3*RCU:3*RCU+3]) + +val[3*RCU:3*RCU+3]=Att +att.Value.Value=val +print("Att set:",val) +#att[3*RCU]+=1 +set_value(name+"_RW",att) + +time.sleep(0.5) +att=get_value(name+"_R") +print("Att new:",att.Value.Value) +#print("Att new:",att[3*RCU:3*RCU+3]) + +disconnect() \ No newline at end of file diff --git a/scripts/LED.py b/scripts/LED.py new file mode 100644 index 0000000..d1857ae --- /dev/null +++ b/scripts/LED.py @@ -0,0 +1,19 @@ +from test_common import * + +name="RCU_LED0" +RCU=1; +LEDvalue="on"; + +setRCUmask([RCU]) +#exit() +led=get_value(name+"_R") +print("LED old:",led) +led[RCU]=LEDvalue +print("LED changed:",led) + +set_value(name+"_RW",led) +time.sleep(0.1) + +print("LED new:",get_value(name+"_R")) + +disconnect() diff --git a/scripts/RCUupdate.py b/scripts/RCUupdate.py new file mode 100644 index 0000000..42dde84 --- /dev/null +++ b/scripts/RCUupdate.py @@ -0,0 +1,8 @@ +from test_common import * + +RCUs=[0]; +setRCUmask(RCUs) + +callmethod("RCU_update") + +disconnect(); \ No newline at end of file diff --git a/scripts/SetHBA_BF.py b/scripts/SetHBA_BF.py new file mode 100644 index 0000000..160cf79 --- /dev/null +++ b/scripts/SetHBA_BF.py @@ -0,0 +1,27 @@ +RCU=0 +HBAT=1 #HBAT on RCU 0..2 +HBA=5; #HBA Element in HBAT +BFX=1 #delay in 0.5ns +BFY=BFX+1 +name="HBA_element_beamformer_delays" + +from test_common import * +import numpy as np + +AntMask=[(x==HBAT) for x in range(3)] +setAntmask([RCU],AntMask) + +i=(RCU*3+HBAT)*32+HBA*2 + +val=get_value(name+"_R") +print("old:",val[i:i+2]) + +val[i]=BFX +val[i+1]=BFY + +set_value(name+"_RW",val) +time.sleep(1) +val=get_value(name+"_R") +print("new:",val[i:i+2]) + +disconnect() diff --git a/scripts/SetHBA_LED.py b/scripts/SetHBA_LED.py new file mode 100644 index 0000000..b334ed0 --- /dev/null +++ b/scripts/SetHBA_LED.py @@ -0,0 +1,26 @@ +RCU=0 +HBAT=1 #HBAT on RCU 0..2 +HBA=5; #HBA Element in HBAT +LED=0 #on +name="HBA_element_led" + +from test_common import * +import numpy as np + +AntMask=[(x==HBAT) for x in range(3)] +setAntmask([RCU],AntMask) + +i=(RCU*3+HBAT)*32+HBA*2 + +val=get_value(name+"_R") +print("old:",val[i:i+2]) + +val[i]=LED #Not needed for LED +val[i+1]=LED + +set_value(name+"_RW",val) +time.sleep(1) +val=get_value(name+"_R") +print("new:",val[i:i+2]) + +disconnect() diff --git a/scripts/SetMonitor.py b/scripts/SetMonitor.py new file mode 100644 index 0000000..fa97710 --- /dev/null +++ b/scripts/SetMonitor.py @@ -0,0 +1,10 @@ +from test_common import * + +rate= 60 #seconds +name="RCU_monitor_rate_RW" + +print("old:",get_value(name)) +set_value(name,rate) +print("new:",get_value(name)) + +disconnect() diff --git a/scripts/test1.py b/scripts/test1.py new file mode 100644 index 0000000..a691a74 --- /dev/null +++ b/scripts/test1.py @@ -0,0 +1,23 @@ +from test_common import * + +name="RCU_LED0_RW" +#RCU=0; +#LEDvalue=0; + +#setRCUmask([RCU]) + +A=get_value(name) +print(A) +#A[0]=1.; +A[0]='on'; +print(A) +set_value(name,A) +#print("LED old:",led) +#led[RCU]=LEDvalue + +#set_value(name+"_RW",led) +#time.sleep(0.1) + +#print("LED new:",get_value(name+"_R")) + +disconnect() diff --git a/scripts/test_common.py b/scripts/test_common.py new file mode 100644 index 0000000..f9e5b42 --- /dev/null +++ b/scripts/test_common.py @@ -0,0 +1,68 @@ +#Address="opc.tcp://odroidRCU2:4842/" +Address="opc.tcp://dop444:4840/" +#Address="opc.tcp://ltspi:4842/" +import sys +sys.path.insert(0, "..") +import logging +import time + +from opcua import Client +from opcua import ua +#import numpy as np + +def connect(): + global client,root +# logging.basicConfig(level=logging.INFO) + logging.basicConfig(level=logging.WARN) +# client = Client("opc.tcp://localhost:4840/freeopcua/server/") + client = Client(Address) + client.connect() + client.load_type_definitions() # load definition of server specific structures/extension objects + root = client.get_root_node() + return root + +root=connect() + +def disconnect(): + client.disconnect() + +def get_value(name): + var1 = root.get_child(["0:Objects", "2:PCC", "2:"+name]) +# return var1.get_value() + return var1.get_data_value() + +def set_value(name,value): + var1 = root.get_child(["0:Objects", "2:PCC", "2:"+name]) + var1.set_value(value) +# var1.set_data_value(value,7) + +def setRCUmask(rcu=[]): + name="RCU_mask_RW" + M=get_value(name) + print(name," old:",M) + M=[False for m in M] + for r in rcu: +# M[r]=1 + M[r]=True + set_value(name,M) + print(name," new:",get_value(name)) + +def setAntmask(rcu=[],ant=[True,True,True]): + name="Ant_mask_RW" + M=get_value(name) + print(name," old:",M) + for i,j in enumerate(M): + M[i]=False + for r in rcu: + for i in range(3): + M[r*3+i]=ant[i] + set_value(name,M) + print(name," new:",get_value(name)) + +def callmethod(name): + try: + obj = root.get_child(["0:Objects", "2:PCC"])# + return obj.call_method("2:"+name) + except: +# print("error") + return None -- GitLab