Skip to content
Snippets Groups Projects
Commit 446fbcf9 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

Merge branch 'pypcc2'

parents 91caf20b 604840c4
No related branches found
No related tags found
1 merge request!11Pypcc2
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# PyPCC # PyPCC
Python OPC-UA server to control the I2C devices in the LTS. Python OPC-UA server to control the I2C devices in the LTS.
+ opcuserv.py: OPC-UA server that expose (visible) variables and methods. + pypcc2.py: OPC-UA server that expose (visible) variables and methods.
# Prerequisites # Prerequisites
...@@ -34,7 +34,7 @@ python3 -m pip install -r requirements.txt ...@@ -34,7 +34,7 @@ python3 -m pip install -r requirements.txt
# Execute it # Execute it
The software can be simply executed with Python3: `python3 opcuaserv.py` The software can be simply executed with Python3: `python3 pypcc2.py -c RCY,CLK,UNB2`
Optional parameters are explained when the `-h` or `--help` parameter is supplied: Optional parameters are explained when the `-h` or `--help` parameter is supplied:
...@@ -47,7 +47,8 @@ usage: pypcc2.py [-h] [-s] [--no-lib-hack] [-p PORT] ...@@ -47,7 +47,8 @@ usage: pypcc2.py [-h] [-s] [--no-lib-hack] [-p PORT]
optional arguments: optional arguments:
-h, --help show this help message and exit -h, --help show this help message and exit
-s, --simulator Do not connect to I2c, but simulate behaviour. -s, --simulator Do not connect to I2c, but simulate behaviour.
--no-lib-hack Do not require a hacked opcua library. Breaks behaviour.
-p PORT, --port PORT Port number to listen on [4842]. -p PORT, --port PORT Port number to listen on [4842].
-c ..
-t ...
``` ```
from . import Vars
import numpy as np
import logging
from .spibitbang2 import *
import threading
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
if bitoffset>0:
value<<=bitoffset;
mask<<=bitoffset;
return (value & mask) + (previous - (previous & mask));
def UnMask(value,width=8,bitoffset=0):
mask=(1<<width)-1
if bitoffset>0:
value>>=bitoffset;
value=value&mask;
return value;
def bytes2int(bts):
x=0;
for b in bts:
x=x*256+b;
return x;
def int2bytes(i):
b=[];
while i>255:
b=[i%256]+b;
i>>=8;
return [i]+b;
def strs2bytes(var):
# print("str2bytes",var)
if len(var)==0: return var;
if isinstance(var[0],str): #make string a byte array
# return [ord(c.encode('utf-8')[0]) for s in var for c in s]
return [c.encode('utf-8')[0] for s in var for c in s]
return var
def bytes2strs(var,step,dtype):
if not(dtype==Vars.datatype.dstring): return var
cnt=int(len(var)/step)
print(var)
return [(bytearray(var[i*step:(i+1)*step]).decode("utf-8")) for i in range(cnt)]
class RCU1():
def __init__(self,number,I2Ccallback,Switchcallback):
self.N=number;
self.I2Ccallback=I2Ccallback
self.SWcallback=Switchcallback
self.previous =np.zeros([number,Vars.RCU_storeReg],dtype='int')
self.previousHBA=np.zeros([number,3,32],dtype='int')
def load(self):
Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
self.SetVar(Inst1)
#Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0]
#Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0]
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Var,Vars.RCU_att,12,[0,1,2,3,4,5,6,7,8,9,11])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_off,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_on,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.ADC1_on,0,[])
#RCU.SetVar(Inst1)
#print(Vars.RCU)
def SetVar(self,Instr,Mask=[]):
# Instr.value=strs2bytes(Instr.value) #Alwast be an array of bytes
if (self.N==1): Mask=[True]
if Instr.type==Vars.DevType.Instr:
#Execute instructions
Iset=Instr.dev;
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
for i in Iset.instr:
logging.debug(str(("Inst",i)))
self.SetVar(i,Mask=Mask)
return;
if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]:
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
mask=0;
RCU0=-1;
for RCUi in range(self.N):
if (Mask[RCUi]):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
if RCU0<0: RCU0=RCUi;
if RCU0<0: return; #Mask all zero
self.SWcallback(mask)
if Instr.type==Vars.DevType.I2C:
logging.info(str(('** Set I2C:',Instr.dev,Instr.value)))
self.SetI2C(RCU0,Instr.dev,8,0,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.SPIbb:
logging.debug(str(('** Set SPIbb:',Instr.dev,Instr.value)))
SetSPIbb(self.SetI2C,RCU0,Instr.dev,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.I2Cbb:
logging.info(str(('** Set I2Cbb:',Instr.dev,Instr.value)))
# self.SetI2C(RCUi,Instr.dev,8,0,strs2bytes(Instr.value))
return;
V1=Instr.dev
if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.size*self.N):
logging.error("Wrong size of value")
return False
if V1.Vars[0].type==Vars.DevType.Internal:
Instr.dev.OPCW.set_value(Instr.value)
logging.debug("Update internal variable")
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if (self.N>1):
Mask=(Vars.RCU_mask if Step==1 else Vars.Ant_mask)
Mask=Mask.OPCW.get_value()
value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_value())
if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
if (Instr.type==Vars.DevType.Var):
logging.info(str(('** Set Var:',V1.name,value1)))
for RCUi in range(self.N):
for Vari in range(Step):
if not(Mask[RCUi*Step+Vari]): continue
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[i0:i1])
value2=value1[i0:i1]
self.GetVarValue(RCUi,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
elif Instr.type==Vars.DevType.VarUpdate:
if self.N==1:
for Vari in range(Step):
i0=(Vari)*Step2
i1=(Vari+1)*Step2
value2=value1[i0:i1]
self.GetVarValue(0,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
else:
self.GetVarValueAll(V1,value1)
# if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
# V1.OPCR.get_data_value().Value.Value=value1
logging.info(str(('** Readback:',V1.name,value1)))
def GetVarValue(self,RCUi,var,value):
logging.info(str(("RCU1 Get ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
GetSPIbb(self.SetI2C,self.GetI2C,RCUi,var.devreg,value)
else:
logging.error("Not Implemented")
def GetVarValueAll(self,V1,value1):
mask=0;
for RCUi in range(self.N):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if V1.Vars[0].type==Vars.DevType.I2C:
for Vari in range(Step):
DevReg=V1.Vars[Vari].devreg
self.SWcallback(mask)
self.SetI2CAddr(self,DevReg)
if DevReg.Register_R>255: self.I2Ccallback(DevReg.Addr,[250],read=3) #Wait for ADC
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
var=V1.Vars[Vari]
# print(Step,Step2,i0,i1,value2,len(value1))
self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# print(value2)
if (var.Scale!=0): value2[0]*=var.Scale;
value1[i0:i1]=value2
elif V1.Vars[0].type==Vars.DevType.SPIbb:
self.GetBBValueAll(V1,value1,mask)
# logging.info("SPIbb all not implemented yet")
elif V1.Vars[0].type==Vars.DevType.HBA1:
for Vari in range(Step):
var=V1.Vars[Vari]
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
# print(Step,Step2,len(value1),V1.size)
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value2)
value1[i0:i1]=value2
# i0=(RCUi*Step+ Vari)*Step2+16
# i1=(RCUi*Step+(Vari+1))*Step2
# value2=value1[i0:i1]
# self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# value1[i0:i1]=value2
else:
logging.error("Type not implemented")
# print(value1)
def GetBBValueAll(self,V1,value1,mask):
def SetBit(RCUi,dev,width,bitoffset,value,buffer=False):
if not(buffer): self.SWcallback(mask)
self.SetI2C(RCUi,dev,width,bitoffset,value,buffer=buffer)
def GetBit(RCUixx,dev,width,bitoffset,buffer=False):
value=[0 for RCUi in range(self.N)]
value2=[0]
if buffer:
for RCUi in range(self.N):
self.GetI2Cbuffer(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value
self.SWcallback(mask)
self.SetI2CAddr(self,dev)
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
#for i in range(len(value2)): value2[i]*=0;
value2[0]=0
self.GetI2Cnoreg(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value;
devs=[V1.Vars[Vari].devreg for Vari in range(V1.nVars)]
GetSPIbb2(SetBit,GetBit,0,devs,value1)
def SetVarValue(self,RCUi,var,value):
if var.devreg.Register_W==-1: return True; #We can not set it, only read it e.g. temperature
logging.debug(str(("RCU1 Set ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetHBA1I2C(RCUi,var.devreg,var.width,var.bitoffset,value)
# print("RCU1 Set",RCUi,var,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def SetI2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
if dev.store>0:
previous=self.previous[RCUi,dev.store-1];
value[0]=ApplyMask(value[0],width,bitoffset,previous);
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(('masked to',value)))
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if buffer: return True;
return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def SetHBA1I2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
# if dev.store>0:
previous=self.previousHBA[RCUi,dev.store-1];
L=len(value)
for i in range(L):
value[i]=ApplyMask(value[i],width,bitoffset,previous[i]);
self.previousHBA[RCUi,dev.store-1]=value
# if buffer: return True;
self.I2Ccallback(dev.Addr,value[:16],reg=dev.Register_W)
if L>16: self.I2Ccallback(dev.Addr,value[16:],reg=dev.Register_W+16)
self.I2Ccallback(dev.Addr,[500],reg=dev.Register_W,read=3) #Wait 500ms
return True
# return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def GetI2Cbuffer(self,RCUi,dev,width,bitoffset,value):
if not(dev.store>0): return False;
value[0]=self.previous[RCUi,dev.store-1];
# logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value)))
l1=int(np.floor((width+bitoffset+7)/8))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True
def GetI2C(self,RCUi,dev,width,bitoffset,value):
# if dev.store>0:
# value[0]=self.previous[RCUi,dev.store-1]
# return True
l1=int(np.floor((width+bitoffset+7)/8))
# print(width,bitoffset,l1)
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if reg>255: #This is for the monitor ADC
if not(self.I2Ccallback(dev.Addr,int2bytes(reg),read=2)): return False;
I2Ccallback(Addr,[250],read=3)
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
else:
if not(self.I2Ccallback(dev.Addr,value2,reg=reg,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
else: value[0]=value2[0]
return True;
def GetI2Cbit(self,RCUi,dev,pin):
value2=[value]
self.I2CGet(RCUi,dev,1,1<<pin,value2)
return value2[0]
def SetI2CAddr(self,RCUi,dev):
return self.I2Ccallback(dev.Addr,int2bytes(dev.Register_R),read=2)
def GetI2Cnoreg(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
l1=int(np.floor((width+bitoffset+7)/8))
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
# if width<8:
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
# value[0]=UnMask(value[0],width,bitoffset)
#else: value[0]=value2[0]
#if (len(value)>1) and (width<8): print value
return True;
def GetI2CHBA1(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
value[:]=value2[:];
if dev.store>0:
self.previousHBA[RCUi,dev.store-1]=value
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True;
def start(self,Q1):
def RCUthread(Q1):
while True:
item = Q1.get()
if item is None: break;
self.SetVar(item)
logging.info("End RCU thread")
RCUthread1 = threading.Thread(target=RCUthread, args=(Q1,))
RCUthread1.start()
return RCUthread1
def Queue_Monitor(self,Q1,NRCU):
Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.CLK_Stat1,NRCU,[0]*NRCU)
Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
return
def AddVars(self,Q1,AddVarR,AddVarW):
for v in Vars.OPC_devvars:
dim1=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
dim2=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.size
dim3=int(v.size/v.nVars)
#print(v.name,dim)
varvalue2=0
if v.type==Vars.datatype.dInt: varvalue2=dim2*[0]
elif v.type==Vars.datatype.dbool: varvalue2=dim2*[False]
elif v.type==Vars.datatype.dfloat: varvalue2=dim2*[0.0]
elif v.type==Vars.datatype.dstring: varvalue2=dim1*[" "*dim3]
print(len(varvalue2),varvalue2)
if v.RW in [Vars.RW.ReadOnly,Vars.RW.ReadWrite]:
var1=AddVarR(v.name+"_R",varvalue2,v)
# print(len(varvalue1),len(varvalue2),v.size,dim2)
v.OPCR=var1
Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim2,varvalue2)
Q1.put(Inst)
if v.RW in [Vars.RW.WriteOnly,Vars.RW.ReadWrite]:
var1=AddVarW(v.name+"_RW",varvalue2,v,Q1)
v.OPCW=var1
def AddMethod(self,Q1,Addmethod):
for v in Vars.OPC_methods:
Inst1=Vars.Instr(Vars.DevType.Instr,v,0,[])
Addmethod(v.name,Inst1,Q1)
#from collections import namedtuple
#from enum import Enum
from pcctypes import *
#Mid plane address
RCU_MPaddr=MPaddr(1,[1],1,[7])
CLK_IO3_OUT1=DevReg(0x20,0,2,1)
CLK_IO3_OUT2=DevReg(0x20,1,3,2)
CLK_IO3_CONF1=DevReg(0x20,6,6,3)
CLK_IO3_CONF2=DevReg(0x20,7,7,4)
RCU_storeReg=4; #Number of stored registers
SPIBB_PLL=BBdev(4,[CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT1],[4,7,5,6],0) #CLK,SDI,SDO,CS
CLK_PLL_lock =DevReg(SPIBB_PLL,0X00,0X00,0) # PLL locked status
CLK_PLL_div =DevReg(SPIBB_PLL,0X0D,0X0D,0) # PLL divide
CLK_PLL_pwr =DevReg(SPIBB_PLL,0X03,0X03,0) # PLL divide
CLK_PLL_5 =DevReg(SPIBB_PLL,0X05,0X05,0) # PLL divide
CLK_PLL_6 =DevReg(SPIBB_PLL,0X06,0X06,0) # PLL divide
CLK_PLL_7 =DevReg(SPIBB_PLL,0X07,0X07,0) # PLL divide
CLK_PLL_8 =DevReg(SPIBB_PLL,0X08,0X08,0) # PLL divide
CLK_PLL_9 =DevReg(SPIBB_PLL,0X09,0X09,0) # PLL divide
CLK_PLL_A =DevReg(SPIBB_PLL,0X0A,0X0A,0) # PLL divide
CLK_PLL_D =DevReg(SPIBB_PLL,0X0D,0X0D,0) # PLL divide
CLK_PLL_F =DevReg(SPIBB_PLL,0X0F,0X0F,0) # PLL divide
CLK_PLL_11 =DevReg(SPIBB_PLL,0X11,0X11,0) # PLL divide
CLK_PLL_13 =DevReg(SPIBB_PLL,0X13,0X13,0) # PLL divide
from .HWconf import *
CLKmod=I2Cmodules.CLK
CLK_IGNORE_PPS= VarArray("CLK_Ignore_PPS",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Enable_PWR= VarArray("CLK_Enable_PWR",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,1,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Stat1 = VarArray("CLK_Stat" ,1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,2 ,2,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_lock1 = VarArray("CLK_Lock" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_lock,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_div1 = VarArray("CLK_Div" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_div,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_pwr1 = VarArray("CLK_Pwr" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_pwr,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_5 = VarArray("CLK_PLL_5" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_5,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_6 = VarArray("CLK_PLL_6" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_6,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_7 = VarArray("CLK_PLL_7" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_7,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_8 = VarArray("CLK_PLL_8" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_8,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_9 = VarArray("CLK_PLL_9" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_9,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_A = VarArray("CLK_PLL_A" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_A,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_D = VarArray("CLK_PLL_D" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_D,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_F = VarArray("CLK_PLL_F" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_F,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_11 = VarArray("CLK_PLL_11" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_11,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_13 = VarArray("CLK_PLL_13" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_13,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
#OPC_devvars=[CLK_Enable_PWR,CLK_Stat1,CLK_5,CLK_6,CLK_7,CLK_8,CLK_9,CLK_A,CLK_D,CLK_F,CLK_11,CLK_13] #,CLK_lock1,CLK_Stat1,CLK_div1,CLK_pwr1,CLK_IGNORE_PPS]
OPC_devvars=[CLK_Enable_PWR,CLK_Stat1] #,CLK_lock1,CLK_Stat1,CLK_div1,CLK_pwr1,CLK_IGNORE_PPS]
RCU_init=Instrs("CLK_update",2,[
# Instr(DevType.VarUpdate,CLK_IGNORE_PPS,1,[0]),
Instr(DevType.VarUpdate,CLK_Enable_PWR,1,[0]),
Instr(DevType.VarUpdate,CLK_Stat1,1,[0])
])
CLK_on=Instrs("CLK_on",3,[
Instr(DevType.I2C,CLK_IO3_CONF1,1,[0x2C]),#0010 1100 PPS/PWR output, SCLK,CS,SDI
Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x42]),#0100 0010 high:PWR enable, CS
# Instr(DevType.VarUpdate,CLK_lock1,1,[0]),
Instr(DevType.VarUpdate,CLK_Stat1,1,[0]),
Instr(DevType.VarUpdate,CLK_Enable_PWR,1,[0])
])
CLK_off=Instrs("CLK_off",3,[
Instr(DevType.I2C,CLK_IO3_CONF1,1,[0x2C]),
Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x40]),
Instr(DevType.VarUpdate,CLK_Enable_PWR,1,[0])
])
def SPIinst(reg,value):
return Instr(DevType.SPIbb,DevReg(SPIBB_PLL,0,reg,0),1,[value])
CLK_PLL_setup=Instrs("CLK_PLL_setup",15,[
# SPIinst(0x03,0x08),
# Instr(DevType.I2C,CLK_IO3_CONF1,1,[0x2C]),
SPIinst(0x05,0x17),#was 97
SPIinst(0x06,0x10),
SPIinst(0x07,0x04),
SPIinst(0x08,0x01),
SPIinst(0x07,0x00),
SPIinst(0x09,0x10),
SPIinst(0x0A,0x14),
SPIinst(0x09,0x00),
SPIinst(0x0D,0x01),#was 2
SPIinst(0x0F,0x01),
SPIinst(0x11,0x01),
SPIinst(0x13,0x01),
# Instr(DevType.VarUpdate,CLK_lock1,1,[0]),
# Instr(DevType.VarUpdate,CLK_div1,1,[0]),
Instr(DevType.VarUpdate,CLK_Stat1,1,[0])
])
OPC_methods=[CLK_on,CLK_off,CLK_PLL_setup,RCU_init]
from enum import Enum
import logging
import numpy as np
class SPIBB_pins(Enum):
CLK = 0
SDI = 1
SDO = 2
CS = 3
def SetSPIbb(SetI2C,RCUi,dev,value):
ADC_address=dev.Register_W
CSdev=dev.Addr.devs[SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value]
SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value]
SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value]
SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value]
CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value]
logging.info(str(("SPIbb set",ADC_address,value)))
# dev_rw = 0x00 # 0 for write, 1 for read
# data2 = ( reg_address << 9 ) + ( dev_rw << 8 ) + value[0]
data2 = ( ADC_address << 9 ) + value[0]
bit_array = "{0:{fill}16b}".format(data2, fill='0')
# print(bit_array)
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
for bit in bit_array:
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])#Why?
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
# SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished
return True;
def GetSPIbb(SetI2C,GetI2C,RCUi,dev,value):
ADC_reg_address=dev.Register_R
CSdev=dev.Addr.devs[SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value]
SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value]
SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value]
SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value]
CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value]
logging.info(str(("SPIbb get",ADC_reg_address)))
ADC_bytes = 0x00
# ADC_rw = 0x01 # 0 for write, 1 for read
data = (ADC_reg_address << 1) + 1 #was 7??
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
bit_array = "{0:{fill}8b}".format(data, fill='0')
for bit in bit_array:
SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
# SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished
# print("read byte")
a=[0]
N=1 #len(value)
ret_value=[0]
for i in range(N): value[i]=0
for cnt in range(8*(ADC_bytes+1)):
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,CLKdev,1,CLKpin,[1]) #read after rising
GetI2C(RCUi,SDOdev,1,SDOpin,ret_value)
for i in range(N): value[i]=(value[i]<<1)+ ret_value[i]
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
return True;
version: "1.0"
description: "1234"
drivers:
- name: I2C
type: i2c
parameters: [4] #I2C port number
- name: I2C_CLK
type: i2c_dev #I2C devices
parent: I2C
status: CLK_I2C_STATUS
- name: SPIbb1
type: spibitbang2 #SPI bitbang via GPIO expander: CLK, SDI,SDO,CS
parent: I2C_CLK
devreg: [IO1.GPIO1,IO1.GPIO1,IO1.GPIO1,IO1.GPIO1]
parameters: [4,7,5,6]
#This is the I2C devices in the RCU
device_registers:
- name: IO1
description: IO-Expander
address: 0x20
driver: I2C_CLK
registers:
- name: CONF1
description: Direction of port1
address: 6
store: True
- name: GPIO1
description: Input/Ouput port 1
address: [0,2] #Read / Write address different
store: True
- name: PLL
driver: SPIbb1
registers:
- name: PLL_stat
description: PLL locked status
address: 0x0
- {name: r3, address: 0x03}
- {name: r5, address: 0x05}
- {name: r6, address: 0x06}
variables:
- name: CLK_I2C_STATUS
driver: I2C_CLK
rw: ro #server RW variable, not linked to IO
dtype: uint8
- name: CLK_translator_busy
description: False when idle
rw: ro #server variable, not linked to IO
dtype: boolean
dim: 1
- name: CLK_Enable_PWR
description: Power enabled
rw: ro
dtype: boolean
driver: I2C_CLK
devreg: IO1.GPIO1
bitoffset: 1
width: 1
- name: CLK_PLL_locked
description: First status pin give lock status
rw: ro
dtype: boolean
monitor: true
driver: I2C_CLK
devreg: IO1.GPIO1
bitoffset: 2
width: 1
- name: CLK_PLL_error
description: Second status pin give error
rw: ro
dtype: boolean
monitor: true
driver: I2C_CLK
devreg: IO1.GPIO1
bitoffset: 3
width: 1
- name: CLK_PLL_locked_SPI
description: 0x81=locked
driver: I2C_CLK
devreg: PLL.PLL_stat
width: 8
rw: ro
dtype: uint8
debug: True
- name: [CLK_PLL_r3,CLK_PLL_r5,CLK_PLL_r6]
driver: I2C_CLK
devreg: [PLL.r3,PLL.r5,PLL.r6]
width: 8
rw: ro
dtype: uint8
debug: True
- name: RCU_IO1_GPIO1
driver: I2C_CLK
devreg: IO1.GPIO1
width: 8
rw: ro
dtype: uint8
mask: RCU_mask
debug: True
methods:
- name: CLK_Init #Called after startup to load. Should have all stored registers
driver: I2C_CLK
debug: True
instructions:
- CLK_I2C_STATUS : 0
- RCU_IO1_GPIO1 : Update
- IO1.CONF1: Update
- CLK_Enable_PWR: Update
- CLK_PLL_locked: Update
- CLK_PLL_error: Update
- name: CLK_on
driver: I2C_CLK
instructions:
- CLK_I2C_STATUS : 0
- IO1.CONF1: 0x2C #0010 1100 PPS/PWR output, SCLK,CS,SDI
- IO1.GPIO1: 0x42 #0100 0010 high:PWR enable, CS
- CLK_Enable_PWR: Update
- WAIT: 200 #ms to wait before checking lock
- CLK_PLL_setup: 0
- WAIT: 100 #ms to wait before checking lock
- CLK_PLL_locked: Update
- name: CLK_off
driver: I2C_CLK
instructions:
- CLK_I2C_STATUS : 0
- IO1.CONF1: 0x2C #0010 1100 PPS/PWR output, SCLK,CS,SDI
- IO1.GPIO1: 0x00 #all low
- CLK_Enable_PWR: Update
- CLK_PLL_locked: Update
- name: CLK_PLL_setup
driver: I2C_CLK
debug: true
instructions:
# - PLL.0x03: 0x08 #Set power, this is default
- PLL.0x05: 0x17 #was 97, set lock time
- PLL.0x06: 0x10
- PLL.0x07: 0x04 #Stop R divider
- PLL.0x08: 0x01 #Set R divider
- PLL.0x07: 0x00 #Start R divider
- PLL.0x09: 0x10 #Stop N divider
- PLL.0x0A: 0x14 #Set N divider=20, 200MHz/20=10MHz = input clock
- PLL.0x09: 0x00 #Start N divider
- PLL.0x0D: 0x01 #Divider output 1=1
- PLL.0x0F: 0x01 #Divider output 2=1
- PLL.0x11: 0x01 #Divider output 3=1
- PLL.0x13: 0x01 #Divider output 4=1
version: "1.0"
description: "1234"
drivers:
- name: I2C1 #TCA9548
type: i2c_switch
devreg: [0x70]
parameters: [1] #I2C port number
- name: I2C_RCU
type: i2c_array #An array of similar devices connected to an I2C switch
parent: I2C1
parameters: [0,31] #start,number of RCUs
status: RCU_I2C_STATUS
- name: I2C_HBAT
type: hba1 #Special driver to manage HBAT1s.
parent: I2C_RCU
- name: I2Cbb1
type: i2cbitbang1 #I2C bitbang via GPIO expander
devreg: [IO1.GPIO1,IO2.GPIO2,IO2.CONF2]
parameters: [6,3,3] #pins
parent: I2C_RCU
- name: I2Cbb2
type: i2cbitbang1
devreg: [IO1.GPIO2,IO2.GPIO1,IO1.CONF1]
parameters: [7,7,7]
parent: I2C_RCU
- name: I2Cbb3
type: i2cbitbang1
devreg: [IO1.GPIO2,IO2.GPIO1,IO1.CONF1]
parameters: [7,7,7]
parent: I2C_RCU
- name: SPIbb1
type: spibitbang1 #SPI bitbang via GPIO expander: CLK, SDIO, SDIOdir,CS
devreg: [IO3.GPIO1,IO3.GPIO1,IO3.CONF1,IO3.GPIO2]
parameters: [1,0,0,0]
parent: I2C_RCU
- name: SPIbb2
type: spibitbang1
devreg: [IO3.GPIO1,IO3.GPIO1,IO3.CONF1,IO3.GPIO2]
parameters: [3,2,2,1]
parent: I2C_RCU
- name: SPIbb3
type: spibitbang1
devreg: [IO3.GPIO1,IO3.GPIO1,IO3.CONF1,IO3.GPIO2]
parameters: [5,4,4,2]
parent: I2C_RCU
- name: HBA_trigger
type: gpio_hba_trigger
devreg: [0x40.0x10] #I2C broadcast register
parameters: [15] #PPS GPIO pin
parent: I2C_RCU
#This is the I2C devices in the RCU
device_registers:
- name: IO
dim: 3
description: [IO-Expander for filter selection,IO-Expander for ON/OFF, Band, BUFx2,IO-Expander for ADC control]
address: [0x75,0x76,0x20]
device: [TCA9539,TCA9539,TCA6416]
driver: I2C1
registers:
- name: CONF1
description: Direction of port1
address: 6
store: True
- name: CONF2
description: Direction of port2
address: 7
store: True
- name: GPIO1
description: Input/Ouput port 1
address: [0,2] #Read / Write address different
store: True
- name: GPIO2
description: Input/Ouput port 2
address: [1,3]
store: True
- name: ROM
description: IO-Expander for filter selection
address: 0x50
driver: I2C1
registers:
- name: ID
description: Random
address: 0xfc
- name: Version
description: Set in production
address: 0
- name: AN
description: Monitor ADC on RCU
address: 0x14
device: LTC2495
driver: I2C1
registers:
- name: V_1v8
address: 0xB080
- name: V_2v5
address: 0xB880
- name: V_3v3
address: 0xB180
- name: I_Ant0
address: 0xB980
- name: I_Ant1
address: 0xB280
- name: I_Ant2
address: 0xBA80
- name: V_Ant0
address: 0xB380
- name: V_Ant1
address: 0xBB80
- name: Temp
address: 0xA0C0
#This 'special' devices that uses I2C
- name: HB_UC
description: RCU microcontroller
address: 0x40
driver: I2C1
registers:
- name: ID
description: Device ID
address: 0
- name: HB_UC_update
description: RCU microcontroller
address: 0x40
driver: HBA_trigger
registers:
- name: wait_pps
address: 10
- name: HBAT
dim: 3
address: [0x41,0x42,0x43]
description: Virtual HBAT0 interface
driver: I2C_HBAT
registers:
- name: XY
address: 0x10
description: XY delay register
store: True
- name: Version
address: 127
description: HBAT server version
- name: ADC
dim: 3
description: ADC SPI control
device: AD9683
driver: [SPIbb1,SPIbb2,SPIbb3]
registers:
- name: PLL_stat
description: PLL locked status
address: 0x0A
- name: JESD_control1
description: JESD link control
address: 0x5F
- name: SYNC_control
address: 0x3A
- name: CML_level
description: CML output adjust
address: 0x15
- name: Update
description: Global device uptate
address: 0xFF
- name: DTH
dim: 3
description: CW dither source
device: SI4012
driver: [I2Cbb1,I2Cbb1,I2Cbb1]
address: 0x70
registers:
- name: Freq
description: Frequency
address: [0x1140,0x1141]
- name: Property
description: Properties
address: [0x11,0x11]
- name: Start
description: Start CW
address: [0x62,0x62]
- name: Stop
description: Stop CW
address: [0x67,0x67]
variables:
- name: Ant_mask
description: Only masked RF chains are updated
driver: I2C_RCU
rw: variable #server RW variable, not linked to IO
dtype: boolean
dim: 96
- name: RCU_mask
description: Only masked RCUs are updated
driver: I2C_RCU
rw: variable #server RW variable, not linked to IO
dtype: boolean
dim: 32
- name: RCU_I2C_STATUS
description: 0=Good, 1=No communication, 2=error
driver: I2C_RCU
rw: ro #server RW variable, not linked to IO
dtype: uint8
mask: RCU_mask
dim: 32
# - name: RCU_state
# description: State of RCUs 0=unknown, 1=ready, 2=busy, 3= wait PPS, 4=error
# driver: I2C_RCU
# rw: ro #server variable, not linked to IO
# dtype: uint8
# dim: 1
- name: RCU_translator_busy
description: False when idle
rw: ro #server variable, not linked to IO
dtype: boolean
dim: 1
- name: RCU_attenuator
description: Attenuator before ADC
driver: I2C_RCU
devreg: [IO1.GPIO1,IO1.GPIO2,IO2.GPIO1]
bitoffset: [0,0,0]
width: 5
rw: rw
dtype: uint8
dim: 96
mask: Ant_mask
- name: RCU_band
description: Band select switch 1=10MHz,2=30MHz
driver: I2C_RCU
devreg: [IO2.GPIO2,IO2.GPIO2,IO2.GPIO2]
bitoffset: [0,2,4]
width: 2
rw: rw
dtype: uint8
dim: 96
mask: Ant_mask
- name: [RCU_IO1_GPIO1,RCU_IO1_GPIO2,RCU_IO2_GPIO1,RCU_IO2_GPIO2,RCU_IO3_GPIO1,RCU_IO3_GPIO2]
driver: I2C_RCU
devreg: [IO1.GPIO1,IO1.GPIO2,IO2.GPIO1,IO2.GPIO2,IO3.GPIO1,IO3.GPIO2]
width: 8
rw: ro
dtype: uint8
dim: 32
mask: RCU_mask
debug: True
- name: RCU_LED0
driver: I2C_RCU
description: LED on RCU
devreg: IO2.GPIO2
bitoffset: 6
width: 1
rw: rw
dtype: boolean
dim: 32
mask: RCU_mask
- name: RCU_LED1
driver: I2C_RCU
description: LED on RCU
devreg: IO2.GPIO2
bitoffset: 7
width: 1
rw: rw
dtype: boolean
dim: 32
mask: RCU_mask
- name: RCU_temperature
description: Temperature sensor on RCU
driver: I2C_RCU
devreg: AN.Temp
width: 23
scale: 4.21e-3
rw: ro
dtype: double
dim: 32
monitor: true
mask: RCU_I2C_STATUS
- name: RCU_Pwr_dig
description: Enable LDOs
driver: I2C_RCU
devreg: IO2.GPIO1
width: 1
bitoffset: 6
rw: ro
dtype: boolean
dim: 32
mask: RCU_mask
- name: HBA_element_beamformer_delays
description: Delays of each frontend
driver: I2C_HBAT
devreg: [HBAT1.XY,HBAT2.XY,HBAT3.XY]
bitoffset: [2,2,2]
width: 5
rw: rw
dtype: uint8
dim: 3072
mask: Ant_mask
wait: 100 #ms
- name: [HBA_element_led,HBA_element_pwr,HBA_element_LNA_pwr]
description: LED, power, frontend power of each frontend
driver: I2C_HBAT
devreg: [HBAT1.XY,HBAT2.XY,HBAT3.XY]
bitoffset: [0,7,1]
width: 1
rw: rw
dtype: boolean
dim: 3072
mask: Ant_mask
wait: 100 #ms
- name: RCU_ID
description: Unique RCU ID
driver: I2C_RCU
devreg: ROM.ID
width: 32
rw: ro
dtype: uint32
dim: 32
mask: RCU_mask
- name: RCU_version
description: RCU version number
driver: I2C_RCU
devreg: ROM.Version
width: 80 #10 characters
rw: ro
dtype: string
dim: 32
mask: RCU_mask
- name: RCU_ADC_lock
description: 0x81=locked
driver: I2C_RCU
devreg: [ADC1.PLL_stat,ADC2.PLL_stat,ADC3.PLL_stat]
width: 8
rw: ro
dtype: uint8
dim: 96
monitor: true
- name: RCU_ADC_sync
description: 0x81=locked
driver: I2C_RCU
devreg: [ADC1.SYNC_control,ADC2.SYNC_control,ADC3.SYNC_control]
width: 8
rw: ro
dtype: uint8
dim: 96
debug: true
# - name: RCU_dth1_freq
# driver: I2C_RCU
# devreg: [DTH1.Freq,DTH2.Freq,DTH3.Freq]
# width: 32
# rw: rw
# dtype: uint32
# dim: 96
# mask: Ant_mask
methods:
- name: RCU_Init #Called after startup to load. Should have all stored registers
driver: I2C_RCU
debug: True
instructions:
- RCU_IO1_GPIO1: Update
- RCU_IO1_GPIO2: Update
- RCU_IO2_GPIO1: Update
- RCU_IO2_GPIO2: Update
- RCU_IO3_GPIO1: Update
- RCU_IO3_GPIO2: Update
# - IO1.GPIO2: Update
# - IO2.GPIO1: Update
# - IO2.GPIO2: Update
# - IO3.GPIO1: Update
# - IO3.GPIO2: Update
- IO3.CONF1: Update
- RCU_update: 0
- name: RCU_on
driver: I2C_RCU
mask: RCU_mask
instructions:
- IO2.CONF1: 0 #Set device register, can also specify a register adress direction e.g. OIO2.0: 0
- IO2.GPIO1: 0x4A
- IO2.GPIO2: 0x55
- IO3.GPIO1: 0x15
- IO3.GPIO2: 0x47
- IO1.GPIO1: 0xCA
- IO1.GPIO2: 0xCA
- IO2.CONF2: 0
- IO3.CONF1: 0
- IO3.CONF2: 0
- IO1.CONF1: 0
- IO1.CONF1: 0
# - RCU_GPIO1: Update
# - RCU_GPIO2: Update
# - RCU_attenuator: [10,10,10] #Set OPC-UA variable
- WAIT: 500 #ms to wait
- ADC1_on: 0 #call another opc-ua method
- ADC2_on: 0
- ADC3_on: 0
- WAIT: 500 #ms to wait
- RCU_update: 0
- name: RCU_update
driver: I2C_RCU
mask: RCU_mask
debug: True
instructions:
- RCU_Pwr_dig: Update #Read value and update the OPC-UA variable
- RCU_ID: Update
- RCU_version: Update
- RCU_LED0: Update
- RCU_attenuator: Update
- RCU_band: Update
- RCU_ADC_lock: Update
- RCU_ADC_sync: Update
- name: ADC1_on
driver: I2C_RCU
debug: True
# rw: hidden
instructions:
- ADC1.JESD_control1 : 0x14
- ADC1.SYNC_control: 1 #Setup ADCs
- ADC1.CML_level: 0x7
- ADC1.Update: 1 #Needed to update ADC registers
- name: ADC2_on
driver: I2C_RCU
debug: True
# rw: hidden
instructions:
- ADC2.JESD_control1 : 0x14
- ADC2.SYNC_control: 1 #Setup ADCs
- ADC2.CML_level: 0x7
- ADC2.Update: 1 #Needed to update ADC registers
- name: ADC3_on
driver: I2C_RCU
debug: True
# rw: hidden
instructions:
- ADC3.JESD_control1 : 0x14
- ADC3.SYNC_control: 1 #Setup ADCs
- ADC3.CML_level: 0x7
- ADC3.Update: 1 #Needed to update ADC registers
- name: RCU_off
driver: I2C_RCU
mask: RCU_mask
instructions:
- RCU_Pwr_dig: 0 #Switch power off
- IO2.GPIO1: 0
- IO2.GPIO2: 0
- IO3.GPIO1: 0
- IO3.GPIO2: 0
- IO1.GPIO1: 0
- IO1.GPIO2: 0
- RCU_update: 0
#todo, also make all GPIO pins (except power enables) inputs to remove all power from devices.
- name: RCU_HBAT_WAIT_PPS
driver: I2C_RCU
mask: RCU_mask
debug: True
instructions:
- RCU_state: 3
- HB_UC_update.wait_pps : 1
- RCU_state: 1
version: "0.0"
description: "UNB2 DTS first draft"
drivers:
- name: I2C1
type: i2c_switch2
devreg: [APSCT_SWITCH.MASK,UB2_SWITCH1.MASK,UB2_SWITCH2.MASK]
parameters: [3,0,0,0] #I2C port number, 3x switch reset pins
- name: switch_UNB2
type: i2c_array
parent: I2C1
parameters: [0,1]
status: UNB2_I2C_bus_STATUS
- name: switch_PS
type: i2c_array2
parent: I2C1
parameters: [0,1, 4,4]
status: UNB2_I2C_bus_PS_STATUS
- name: switch_FP
type: i2c_array2
parent: I2C1
parameters: [0,1, 5,5]
status: UNB2_I2C_bus_FP_STATUS
- name: switch_QSFP
type: i2c_array2 #An array of similar devices connected to an I2C switch
parent: I2C1
parameters: [0,1, 0,3, 0,3, 6,7] #Parameters: APSCT_Switch, main switch, 2nd switch, 2nd switch
status: UNB2_I2C_bus_QSFP_STATUS
- name: switch_DDR4
type: i2c_array2
parent: I2C1
parameters: [0,1, 0,3, 4,4]
status: UNB2_I2C_bus_DDR4_STATUS
- name: switch_FPGA_PS
type: i2c_array2
parent: I2C1
parameters: [0,1, 0,3, 5,5]
status: UNB2_I2C_bus_FPGA_PS_STATUS
- name: GPIO
type: gpio
#This is the I2C devices in the RCU
device_registers:
- name: APSCT_SWITCH #not in LTS
description: Uniboard select switch on APSCT
address: 0x70
device: TCA9548
driver: I2C1
registers:
- name: MASK
- name: UB2_SWITCH1
description: UNB2 primary switch
address: 0x71
device: TCA9548
driver: I2C1
registers:
- name: MASK
- name: UB2_SWITCH2
description: UNB2 secondary switch
address: 0x72
device: TCA9548
driver: I2C1
registers:
- name: MASK
- name: FP_IO
description: IO-Expander for front panel
address: 0x41
device: PCA9536
driver: d_front_panel
registers:
- name: CONF #default 0xff = all input
description: Direction of GPIO
address: 3
- name: GPIO
description: Input/Ouput port
address: [0,1] #Read / Write address different
store: True
variables:
#When I2C bus timeout, bus_STATUS set to False. Can we set to True again to retry.
- name: UNB2_I2C_bus_STATUS
driver: switch_UNB2
rw: ro
dtype: uint8
dim: 2
- name: UNB2_I2C_bus_FP_STATUS
driver: switch_UNB2
rw: hidden
dtype: uint8
dim: 2
- name: UNB2_I2C_bus_QSFP_STATUS
driver: switch_QSFP
rw: ro
dtype: uint8
dim: 48
- name: UNB2_I2C_bus_DDR4_STATUS
driver: switch_QSFP
rw: ro
dtype: uint8
dim: 8
- name: UNB2_I2C_bus_FPGA_PS_STATUS
driver: switch_FPGA_PS
rw: ro
dtype: uint8
dim: 8
- name: UNB2_I2C_bus_PS_STATUS
driver: switch_PS
rw: ro
dtype: uint8
dim: 2
- name: UNB2_translator_busy
description: False when idle
rw: ro #server variable, not linked to IO
dtype: boolean
dim: 1
##Central MP for whole Uniboard2
- name: UNB2_mask
rw: variable #translator variable
dtype: boolean
dim: 2
- name: UNB2_Power_ON_OFF
driver: GPIO
mask: UNB2_mask
width: 1
rw: ro
dtype: boolean
dim: 2
- name: UNB2_Front_Panel_LED
description: bit 0=Red, 1=Blue, 2=Green
mask: UNB2_mask
driver: switch_FP
devreg: FP_IO.GPIO
bitoffset: 4
width: 3
rw: rw
dtype: uint8
dim: 2
- name: UNB2_EEPROM_Serial_Number
driver: switch_UNB2
devreg: 0x50.0
width: 80 #10 characters
rw: ro
dtype: string
dim: 2
- name: UNB2_EEPROM_Unique_ID
driver: switch_UNB2
devreg: 0x50.0xFC
width: 32
rw: ro
dtype: uint32
dim: 2
- name: [UNB2_DC_DC_48V_12V_VIN,UNB2_DC_DC_48V_12V_VOUT,UNB2_DC_DC_48V_12V_IOUT,UNB2_DC_DC_48V_12V_TEMP]
driver: switch_PS
devreg: [0x2C.0x88,0x2C.0x8B,0x2C.0x8C,0x2C.0x8D]
width: 16
rw: ro
dtype: double
scale: 1.2207e-4 #2^-13
dim: 2
monitor: true
- name: [UNB2_POL_QSFP_N01_VOUT,UNB2_POL_QSFP_N01_IOUT,UNB2_POL_QSFP_N01_TEMP,UNB2_POL_QSFP_N23_VOUT,UNB2_POL_QSFP_N23_IOUT,UNB2_POL_QSFP_N23_TEMP]
driver: switch_PS
devreg: [0x2.0x8B,0x2.0x8C,0x2.0x8D,0x1.0x8B,0x1.0x8C,0x1.0x8D]
width: 16
rw: ro
dtype: double
scale: 1.2207e-4 #2^-13
dim: 2
monitor: true
- name: [UNB2_POL_SWITCH_1V2_VOUT,UNB2_POL_SWITCH_1V2_IOUT,UNB2_POL_SWITCH_1V2_TEMP,UNB2_POL_SWITCH_PHY_VOUT,UNB2_POL_SWITCH_PHY_IOUT,UNB2_POL_SWITCH_PHY_TEMP]
driver: switch_PS
devreg: [0xF.0x8B,0xF.0x8C,0xF.0x8D,0xE.0x8B,0xE.0x8C,0xE.0x8D]
width: 16
rw: ro
dtype: double
scale: 1.2207e-4 #2^-13
dim: 2
monitor: true
- name: [UNB2_POL_CLOCK_VOUT,UNB2_POL_CLOCK_IOUT,UNB2_POL_CLOCK_TEMP]
driver: switch_PS
devreg: [0xD.0x8B,0xD.0x8C,0xD.0x8D]
width: 16
rw: ro
dtype: double
scale: 1.2207e-4 #2^-13
dim: 2
monitor: true
##Local MP per FPGA node
- name: UNB2_FPGA_DDR4_SLOT_TEMP
description: Signed I2C!
driver: switch_DDR4
devreg: [0x18.0x5,0x19.0x5]
width: 13
rw: ro
dtype: double
scale: 0.0625
dim: 16
monitor: true
- name: UNB2_FPGA_DDR4_SLOT_PART_NUMBER
driver: switch_DDR4
devreg: [0x18.0x149,0x19.0x149]
width: 160
rw: ro
dtype: string
dim: 16
monitor: true
- name: [UNB2_POL_FPGA_CORE_VOUT,UNB2_FPGA_POL_CORE_IOUT,UNB2_FPGA_POL_CORE_TEMP,UNB2_FPGA_POL_ERAM_VOUT,UNB2_FPGA_POL_ERAM_IOUT,UNB2_FPGA_POL_ERAM_TEMP]
driver: switch_FPGA_PS
devreg: [0x1.0x8B,0x1.0x8C,0x1.0x8D,0xD.0x8B,0xD.0x8C,0xD.0x8D]
width: 16
rw: ro
dtype: double
scale: 1.2207e-4 #2^-13
dim: 8
monitor: true
- name: [UNB2_FPGA_POL_RXGXB_VOUT,UNB2_FPGA_POL_RXGXB_IOUT,UNB2_FPGA_POL_RXGXB_TEMP,UNB2_FPGA_POL_TXGXB_VOUT,UNB2_FPGA_POL_TXGXB_IOUT,UNB2_POL_FPGA_TXGXB_TEMP]
driver: switch_FPGA_PS
devreg: [0xE.0x8B,0xE.0x8C,0xE.0x8D,0xF.0x8B,0xF.0x8C,0xF.0x8D]
width: 16
rw: ro
dtype: double
scale: 1.2207e-4 #2^-13
dim: 8
monitor: true
- name: [UNB2_FPGA_POL_HGXB_VOUT,UNB2_FPGA_POL_HGXB_IOUT,UNB2_FPGA_POL_HGXB_TEMP,UNB2_FPGA_POL_PGM_VOUT,UNB2_FPGA_POL_PGM_IOUT,UNB2_FPGA_POL_PGM_TEMP]
driver: switch_FPGA_PS
devreg: [0x10.0x8B,0x10.0x8C,0x10.0x8D,0x11.0x8B,0x11.0x8C,0x11.0x8D]
width: 16
rw: ro
dtype: double
scale: 1.2207e-4 #2^-13
dim: 8
monitor: true
##Local MP per FPGA node, QSFP cage
- name: UNB2_FPGA_QSFP_CAGE_TEMP
description: Signed I2C!
driver: switch_QSFP
devreg: 0x50.0x16
width: 16
rw: ro
dtype: double
scale: 0.0625 #TBC
dim: 48
monitor: true
- name: UNB2_FPGA_QSFP_CAGE_LOS
description: Bits for 4 TX, 4 RX channels
driver: switch_QSFP
devreg: 0x50.0x03
width: 8
rw: ro
dtype: uint8
dim: 48
monitor: true
methods:
- name: UNB2_on
mask: UNB2_mask
instructions:
- FP_IO.CONF: 0xff #TODO: setup correctly
- UNB2_Front_Panel_LED: 1
- name: UNB2_off
mask: UNB2_mask
instructions:
- UNB2_Front_Panel_LED: 2
{
"$id": "http://git.astron.nl/lofar2.0/pypcc/apsctl.schema.json",
"$schema": "http://json-schema.org/draft-07/schema",
"title": "apsctl",
"description": "APSCTL yaml schema",
"type" : "object",
"properties": {
"version": {"type":"string"},
"description":{"type":"string"},
"drivers": { "type":"array", "items": { "$ref": "#/$defs/drivers"}},
"device_registers":{ "type":"array", "items": { "$ref": "#/$defs/dev_regs"}},
"variables":{ "type":"array", "items": { "$ref": "#/$defs/variables"}},
"methods":{ "type":"array", "items": { "$ref": "#/$defs/methods"}}
},
"additionalProperties" : false,
"$defs":{
"drivers" : {
"type" : "object",
"properties": {
"name" : {"type":"string"},
"type" : {"enum":["i2c","RCU_switch","UNB2_switch","i2c_devs","i2c_array","i2c_array2","hba1","i2cbitbang1","spibitbang1","spibitbang2"]},
"parent" : {"type":"string"},
"parameters" : {"type":"array","items":{"type":"integer"}} ,
"devreg" : {"type":"array","items":{"type":"string"}}
},
"additionalProperties" : false
},
"dev_regs" : {
"type" : "object",
"properties": {
"name" : {"type":"string"},
"dim" : {"type":"integer"},
"description" : {},
"address" : {"anyOf":[{"type":"integer"},{"type":"array","items":{"type":"integer"}}]},
"device" : {},
"driver" : {},
"registers":{ "type":"array", "items": { "$ref": "#/$defs/registers"}}
},
"additionalProperties" : false
},
"registers" : {
"type" : "object",
"properties": {
"name" : {"type":"string"},
"description" : {"type":"string"},
"address" : {},
"store" : {"type":"boolean"}
},
"additionalProperties" : false
},
"variables" : {
"type" : "object",
"properties": {
"name" : {"type":"string"},
"description" : {"type":"string"},
"dim" : {"type":"integer"},
"dtype" : {"enum":["uint64","uint32","uint16","uint8","double","boolean","string"]},
"driver" : {"type":"string"},
"rw" : {"enum":["variable","rw","ro","hidden"]},
"scale" : {"type":"number"},
"bitoffset" : {},
"width" : {"type":"integer"},
"devreg" : {},
"mask" : {"type":"string"}
},
"additionalProperties" : false
},
"methods" : {
"type" : "object",
"properties": {
"name" : {"type":"string"},
"mask" : {"type":"string"},
"instructions" : {"type":"array"},
"rw" : {"enum":["visible","hidden"]}
},
"additionalProperties" : false
}
}
}
#import os
#if os.sys.platform is 'linux':
import pylibi2c;
import time
import logging
#bus = pylibi2c.I2CDevice('/dev/i2c-1'
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
def I2C1server(addr,data,reg=None,read=0):
# print("I2C",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-1',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
# print("I2C read",addr,reg,data,read)
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0xff
return False;
def I2C4server(addr,data,reg=None,read=0):
# print("I2C4",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# print("I2C",addr,reg,data,read)
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-4',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
# print(data)
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0
return False;
import pylibi2c;
import time
import logging
#bus = pylibi2c.I2CDevice('/dev/i2c-1'
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
def I2C1server(addr,data,reg=None,read=0):
# print("I2C",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-1',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
# print("I2C read",addr,reg,data,read)
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0xff
return False;
def I2C4server(addr,data,reg=None,read=0):
# print("I2C4",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# print("I2C",addr,reg,data,read)
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-4',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
# print(data)
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
print("I2C4 error")
# data[0]=0xff
return False;
#import pylibi2c;
import time
import logging
#bus = pylibi2c.I2CDevice('/dev/i2c-1'
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
def I2C1server(addr,data,reg=None,read=0):
global I2Ccounter;
logging.debug(str(("I2C",addr,reg,data,read)))
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1:
data[0]=0
if reg: I2Ccounter+=1;
I2Ccounter+=len(data)
# logging.debug(str(("I2C",addr,reg,data,read)))
return True
def I2C4server(addr,data,reg=None,read=0):
global I2Ccounter;
logging.debug(str(("I2C4",addr,reg,data,read)))
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1:
data[0]=0
if reg: I2Ccounter+=1;
I2Ccounter+=len(data)
# logging.debug(str(("I2C",addr,reg,data,read)))
return True
import numpy as np
import logging
#import HWconf
SWaddr=0x70
class I2Cswitch1():
def __init__(self,callback1):
self.callback1=callback1;
self.CurrentChannel=0;
def SetChannel(self,channel):
if (channel)==self.CurrentChannel: return True;
logging.debug(str(("SetChannel",channel,self.CurrentChannel)))
self.CurrentChannel=channel
return self.callback1(SWaddr,[channel])
def I2Ccallback(self,RCU,addr,data,reg=None,read=0):
self.callback1(addr,data,reg,read)
class I2Cswitch0():
def __init__(self,callback1):
return
def SetChannel(self,channel):
return True
def I2Ccallback(self,RCU,addr,data,reg=None,read=0):
return
\ No newline at end of file
#import pylibi2c;
#import
from periphery import I2C;
import time
import logging
#bus = pylibi2c.I2CDevice('/dev/i2c-1'
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
i2c1=I2C("/dev/i2c-1")
def I2C1server(addr,data,reg=None,read=0):
# print("I2C",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
if read==1:
if not(reg is None): i2c1.transfer(addr,[I2C.Message([reg])])
msgs=[I2C.Message(data,read=True)]
i2c1.transfer(addr,msgs)
data[:]=msgs[0].data
else:
if reg is None:
msgs=[I2C.Message(data)]
else:
msgs=[I2C.Message([reg]+data)]
i2c1.transfer(addr,msgs)
return True;
except:
return False;
i2c4=I2C("/dev/i2c-4")
def I2C4server(addr,data,reg=None,read=0):
# print("I2C",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
if read==1:
if not(reg is None): i2c4.transfer(addr,[I2C.Message([reg])])
msgs=[I2C.Message(data,read=True)]
i2c4.transfer(addr,msgs)
data[:]=msgs[0].data
else:
if reg is None:
msgs=[I2C.Message(data)]
else:
msgs=[I2C.Message([reg]+data)]
i2c4.transfer(addr,msgs)
return True;
except:
return False;
from I2C import *
import time
RCU=1
Ver="RCU2H v0.2"
R1=0
ROM=0x50
#Set switch
print("Set switch")
if not(I2C1server(0x70,[1<<RCU],reg=None,read=0)): exit() #select RCU
#exit()
#Get ID
print("Get ID")
ID=[0]*4
if not(I2C1server(ROM,ID,reg=0xFC,read=1)): exit() #select RCU
print(ID)
exit()
#Upload version
Ver2=[(c.encode('utf-8')[0]) for c in Ver]
print(len(Ver),Ver,Ver2)
V2=[0]
for i,v in enumerate(Ver2):
time.sleep(0.1)
I2C1server(ROM,[v],reg=R1+i,read=0) #select RCU
# I2C1server(ROM,[v],reg=None,read=0) #select RCU
time.sleep(0.1)
I2C1server(ROM,V2,reg=R1+i,read=1) #select RCU
print(i,v,V2)
#Download version
Ver2=[0]*10
I2C1server(ROM,Ver2,reg=R1,read=1) #select RCU
print(Ver2)
from I2C import *
import time
RCU=3
Ver="RCU2H v0.2"
R1=0
ROM=0x50
#Set switch
print("Set switch")
if not(I2C1server(0x70,[1<<RCU],reg=None,read=0)):
print("Error setting switch!")
exit() #select RCU
exit()
#Get ID
print("Get ID")
ID=[0]*4
if not(I2C1server(ROM,ID,reg=0xFC,read=1)): exit() #select RCU
print(ID)
exit()
#Upload version
Ver2=[(c.encode('utf-8')[0]) for c in Ver]
print(len(Ver),Ver,Ver2)
V2=[0]
for i,v in enumerate(Ver2):
time.sleep(0.1)
I2C1server(ROM,[v],reg=R1+i,read=0) #select RCU
# I2C1server(ROM,[v],reg=None,read=0) #select RCU
time.sleep(0.1)
I2C1server(ROM,V2,reg=R1+i,read=1) #select RCU
print(i,v,V2)
#Download version
Ver2=[0]*10
I2C1server(ROM,Ver2,reg=R1,read=1) #select RCU
print(Ver2)
from I2C import *
import time
RCU=1
Ver="RCU2H v0.2"
R1=0
ROM=0x50
ID=[0]*7
I2C1server(0x20,ID,reg=2,read=1)
print(ID)
I2C1server(0x20,ID,reg=2,read=1)
print(ID)
exit()
#0=0
#1=0 #debug?
#2=21 #spd1
#3=7 #spd2
#4=175 #TXspeed
#5=0
#6=0
#Upload version
Ver2=[(c.encode('utf-8')[0]) for c in Ver]
print(len(Ver),Ver,Ver2)
V2=[0]
for i,v in enumerate(Ver2):
time.sleep(0.1)
I2C1server(ROM,[v],reg=R1+i,read=0) #select RCU
# I2C1server(ROM,[v],reg=None,read=0) #select RCU
time.sleep(0.1)
I2C1server(ROM,V2,reg=R1+i,read=1) #select RCU
print(i,v,V2)
#Download version
Ver2=[0]*10
I2C1server(ROM,Ver2,reg=R1,read=1) #select RCU
print(Ver2)
File moved
#import numpy as np
from .hwdev import hwdev;
import logging
class gpio(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
logging.info("gpio todo")
from .hwdev import hwdev;
import logging
import signal
try:
import RPi.GPIO as GPIO
except:
GPIO=False;
class gpio_hba_trigger(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
self.pin=config['parameters'][0];
logging.info("HBAT wait for PPS on pin %i",self.pin)
if GPIO:
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.pin,GPIO.IN)
else:
logging.warn("RPi GPIO module not found, PPS input disable!")
def i2csetget(self,addr,data,reg=None,read=0):
if (read==0) and GPIO:
channel=GPIO.wait_for_edge(self.pin,GPIO.RISING,timeout=1500)
if channel is None:
logging.info("PPS not received!");
return False;
self.conf['parentcls'].i2csetget(addr,data,reg,read)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment