Skip to content
Snippets Groups Projects
Commit a74e0b3b authored by Paulus Kruger's avatar Paulus Kruger
Browse files

UNB2 OPC-UA variables

parent 0f8682af
No related branches found
No related tags found
1 merge request!11Pypcc2
...@@ -111,7 +111,7 @@ variables: ...@@ -111,7 +111,7 @@ variables:
driver: GPIO driver: GPIO
mask: UNB2_mask mask: UNB2_mask
width: 1 width: 1
rw: rw rw: ro
dtype: boolean dtype: boolean
dim: 2 dim: 2
...@@ -195,7 +195,7 @@ variables: ...@@ -195,7 +195,7 @@ variables:
scale: 0.0625 scale: 0.0625
dim: 16 dim: 16
- name: UNB2_FPGA_DDR4_SLOT0_PART_NUMBER - name: UNB2_FPGA_DDR4_SLOT_PART_NUMBER
driver: switch_DDR4 driver: switch_DDR4
mask: UNB2_I2C_bus_DDR4_OK mask: UNB2_I2C_bus_DDR4_OK
devreg: [0x18.0x149,0x19.0x149] devreg: [0x18.0x149,0x19.0x149]
...@@ -214,7 +214,7 @@ variables: ...@@ -214,7 +214,7 @@ variables:
scale: 1.2207e-4 #2^-13 scale: 1.2207e-4 #2^-13
dim: 8 dim: 8
- name: [UNB2_FPGA_POL_RXGXB_VOUT,UNB2_FPGA_POL_RXGXB_IOUT,UNB2_FPGA_POL_RXGXB_TEMP,UNB2_FPGA_POL_TXGXB_VOUT,UNB2_FPGA_POL_TXGXB_IOUT,PUNB2_OL_FPGA_TXGXB_TEMP] - name: [UNB2_FPGA_POL_RXGXB_VOUT,UNB2_FPGA_POL_RXGXB_IOUT,UNB2_FPGA_POL_RXGXB_TEMP,UNB2_FPGA_POL_TXGXB_VOUT,UNB2_FPGA_POL_TXGXB_IOUT,UNB2_POL_FPGA_TXGXB_TEMP]
driver: switch_FPGA_PS driver: switch_FPGA_PS
mask: UNB2_I2C_bus_FGPA_PS_OK mask: UNB2_I2C_bus_FGPA_PS_OK
devreg: [0xE.0x8B,0xE.0x8C,0xE.0x8D,0xF.0x8B,0xF.0x8C,0xF.0x8D] devreg: [0xE.0x8B,0xE.0x8C,0xE.0x8D,0xF.0x8B,0xF.0x8C,0xF.0x8D]
......
import yaml
import struct
import time
def Find(L,name,value):
for x in L:
if x[name]==value:
return x;
return False;
def GetField(D,name,dev_number,default=None):
X=D.get(name,default)
return X[dev_number] if isinstance(X,list) else X;
class yamlconfig():
def __init__(self,yamlfile='RCU'):
self.conf=yaml.load(open("config/"+yamlfile+'.yaml'))
var1=self.conf['variables']
N=len(var1)
# print([v['name'] for v in self.conf['variables']])
for i in range(N-1,-1,-1):
#print(var1[i])
if isinstance(var1[i]['name'],list):
for x,name in enumerate(var1[i]['name']):
var2=var1[i].copy()
var2['name']=name
var2['devreg']=GetField(var1[i],'devreg',x)
var2['scale']=GetField(var1[i],'scale',x,1)
var1.append(var2)
N+=1;
var1.remove(var1[i])
N-=1;
# print([[v['name'],v.get('devreg')] for v in var1])
# print(len(self.conf['variables']),N)
for i,v in enumerate(self.conf['variables']):
v['id']=i
for i,v in enumerate(self.conf['methods']):
v['id']=i
import yaml import yaml
import struct import struct
import time import time
def Find(L,name,value): from opcuaserv.yamlconfig import *
for x in L:
if x[name]==value:
return x;
return False;
def bytes2int(bts): def bytes2int(bts):
x=0; x=0;
...@@ -21,13 +17,9 @@ def int2bytes(i): ...@@ -21,13 +17,9 @@ def int2bytes(i):
return [i]+b; return [i]+b;
class yamlreader(): class yamlreader(yamlconfig):
def __init__(self,i2cserver,yamlfile='RCU'): def __init__(self,i2cserver,yamlfile='RCU'):
self.conf=yaml.load(open("config/"+yamlfile+'.yaml')) yamlconfig.__init__(self,yamlfile)
for i,v in enumerate(self.conf['variables']):
v['id']=i
for i,v in enumerate(self.conf['methods']):
v['id']=i
self.server=i2cserver; self.server=i2cserver;
......
...@@ -14,6 +14,7 @@ parser = argparse.ArgumentParser() ...@@ -14,6 +14,7 @@ parser = argparse.ArgumentParser()
parser.add_argument("-s", "--simulator", help="Do not connect to I2c, but simulate behaviour.", action="store_true") parser.add_argument("-s", "--simulator", help="Do not connect to I2c, but simulate behaviour.", action="store_true")
parser.add_argument("-p", "--port", help="Port number to listen on [%(default)s].", type=int, default=4842) parser.add_argument("-p", "--port", help="Port number to listen on [%(default)s].", type=int, default=4842)
parser.add_argument("-l", "--loglevel", help="Log level [%(default)s].", type=str, choices=["DEBUG","INFO","WARNING","ERROR"], default="INFO") parser.add_argument("-l", "--loglevel", help="Log level [%(default)s].", type=str, choices=["DEBUG","INFO","WARNING","ERROR"], default="INFO")
#parser.add_argument("-c", "--config", help="YAML config file.",type=str, action="store_true",default='RCU')
args = parser.parse_args() args = parser.parse_args()
# set log level # set log level
...@@ -33,16 +34,16 @@ signal.signal(signal.SIGINT, signal_handler) ...@@ -33,16 +34,16 @@ signal.signal(signal.SIGINT, signal_handler)
#Start i2c processes as soon as possible to have minimum duplication #Start i2c processes as soon as possible to have minimum duplication
logging.info("Start I2C processes") logging.info("Start I2C processes")
I2Cports=['RCU'] I2Cports=['UNB2']
#I2Cports=[] #I2Cports=[]
threads=[] threads=[]
I2Cclients=[] I2Cclients=[]
for name in I2Cports: for name in I2Cports:
RCU_I2C=i2client.i2client(name=name) RCU_I2C=i2client.i2client(name=name)
thread1=i2cthread.start(*RCU_I2C.GetInfo()) if not(args.simulator):
threads.append(thread1) thread1=i2cthread.start(*RCU_I2C.GetInfo())
threads.append(thread1)
I2Cclients.append(RCU_I2C) I2Cclients.append(RCU_I2C)
#Initialise OPCUA server #Initialise OPCUA server
logging.info("Initialised OPC-UA Server") logging.info("Initialised OPC-UA Server")
opcuaserv.InitServer(port=args.port) opcuaserv.InitServer(port=args.port)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment