Skip to content
Snippets Groups Projects
Commit a2ba9eb2 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

Merge branch 'test_setup' into 'master'

Test setup

See merge request !18
parents 13b4a63d 439267ae
No related branches found
No related tags found
1 merge request!18Test setup
Pipeline #44171 passed
......@@ -584,7 +584,7 @@ variables:
read_parallel: all
- name: RCU_DTH_PWR
description: RCU Dither source power
description: RCU Dither source power (dBm). Range -25 to -4.
driver: I2C_RCU
devreg: [DTH1.PA_CONFIG,DTH2.PA_CONFIG,DTH3.PA_CONFIG]
width: 48
......@@ -613,13 +613,14 @@ variables:
#RCU2H points
- name: RCU_DAB_select
description: Band select ...
description: DAB filter enable 1=enabled, 2=disabled
driver: I2C_RCU
devreg: [IO4.GPIO1,IO4.GPIO1,IO4.GPIO1]
bitoffset: [0,2,4]
width: 2
rw: rw
dtype: uint8
dtype: boolean
convert_unit: int12_to_bool
dim: 96
dim2: [3,32]
mask: ANT_mask
......
version: "1.0"
description: "1234"
name: "RECVTR"
drivers:
- name: I2C1 #TCA9548
type: i2c_iss
parameters: [3] #COM port
- name: I2C_RCU
type: i2c_dev #I2C devices
parent: I2C1
status: CLK_I2C_STATUS
- name: I2Cbb1
type: i2cbitbang1 #I2C bitbang via GPIO expander
devreg: [IO3.GPIO2,IO3.GPIO2,IO3.CONF2]
parameters: [5,6,6] #pins
parent: I2C_RCU
- name: I2Cbb2
type: i2cbitbang1
devreg: [IO3.GPIO2,IO3.GPIO2,IO3.CONF2]
parameters: [4,6,6]
parent: I2C_RCU
- name: I2Cbb3
type: i2cbitbang1
devreg: [IO3.GPIO2,IO3.GPIO2,IO3.CONF2]
parameters: [3,6,6]
parent: I2C_RCU
- name: SPIbb1
type: spibitbang1 #SPI bitbang via GPIO expander: CLK, SDIO, SDIOdir,CS
devreg: [IO3.GPIO1,IO3.GPIO1,IO3.CONF1,IO3.GPIO2]
parameters: [1,0,0,0]
parent: I2C_RCU
- name: SPIbb2
type: spibitbang1
devreg: [IO3.GPIO1,IO3.GPIO1,IO3.CONF1,IO3.GPIO2]
parameters: [3,2,2,1]
parent: I2C_RCU
- name: SPIbb3
type: spibitbang1
devreg: [IO3.GPIO1,IO3.GPIO1,IO3.CONF1,IO3.GPIO2]
parameters: [5,4,4,2]
parent: I2C_RCU
#This is the I2C devices in the RCU
device_registers:
- name: IO
dim: 3
description: [IO-Expander for filter selection,IO-Expander for ON/OFF, Band, BUFx2,IO-Expander for ADC control]
address: [0x75,0x76,0x20]
device: [TCA9539,TCA9539,TCA6416]
driver: I2C_RCU
registers:
- name: CONF1
description: Direction of port1
address: 6
store: True
- name: CONF2
description: Direction of port2
address: 7
store: True
- name: GPIO1
description: Input/Ouput port 1
address: [0,2] #Read / Write address different
store: True
- name: GPIO2
description: Input/Ouput port 2
address: [1,3]
store: True
- name: DIR1
address: 4
- name: DIR2
address: 5
- name: ROM
description: IO-Expander for filter selection
address: 0x50
driver: I2C1
registers:
- name: ID
description: Random
address: 0xfc
- name: Version
description: Set in production
address: 0
- name: Serial
address: 0x20
- name: AN
description: Monitor ADC on RCU
address: 0x74 #was 0x14 on RCU2-DIG PCBs
device: LTC2495
driver: I2C1
registers:
- name: V_1v8
address: 0xB080
- name: V_2v5
address: 0xB880
- name: V_3v3
address: 0xB180
- name: I_Ant0
address: 0xB980
- name: I_Ant1
address: 0xB280
- name: I_Ant2
address: 0xBA80
- name: V_Ant_I0
address: 0xB380
- name: V_Ant_O0
address: 0xBB80
- name: V_Ant_I1
address: 0xB480
- name: V_Ant_O1
address: 0xBC80
- name: V_Ant_I2
address: 0xB580
- name: V_Ant_O2
address: 0xBD80
- name: Temp
address: 0xA0C0
#This 'special' devices that uses I2C
- name: ADC
dim: 3
description: ADC SPI control
device: AD9683
driver: [SPIbb1,SPIbb2,SPIbb3]
registers:
- name: PLL_stat
description: PLL locked status
address: 0x0A
- name: JESD_control1
description: JESD link control
address: 0x5F
- name: SYNC_control
address: 0x3A
- name: CML_level
description: CML output adjust
address: 0x15
- name: Update
description: Global device uptate
address: 0xFF
- name: testmode
address: 0x0D
- name: dither
address: 0x0C
- name: JESDscramble
address: 0x6E
- name: PDWNmodes
address: 0x08
- name: DCcorrect
address: 0x40
- name: DTH
dim: 3
description: CW dither source
device: SI4012
driver: [I2Cbb1,I2Cbb2,I2Cbb3]
address: 0x70
registers:
- name: Freq
description: Frequency
address: [0x1240,0x1140]
- name: Property
description: Properties
address: [0x12,0x11]
- name: State
address: [0x61,0x60]
- name: Start
description: Start CW
address: [0x62,0x62]
- name: Stop
description: Stop CW
address: [0x67,0x67]
- name: Rev
address: 0x10
- name: Tune
address: [0x1221,0x1121]
- name: CONF
address: [0x1210,0x1110]
- name: PA_CONFIG
address: [0x1260,0x1160]
store: True
variables:
- name: RCU_I2C_STATUS
driver: I2C_RCU
rw: ro #server RW variable, not linked to IO
dtype: uint8
- name: [CH1_attenuator_dB,CH2_attenuator_dB,CH3_attenuator_dB]
description: RCU Attenuation (dB)
driver: I2C_RCU
devreg: [IO1.GPIO1,IO1.GPIO2,IO2.GPIO1]
bitoffset: [0,0,0]
width: 6
rw: rw
dtype: uint8
dim: 3
- name: [CH1_band_select,CH2_band_select,CH3_band_select]
description: Band select 1=10MHz,2=30MHz
driver: I2C_RCU
devreg: [IO2.GPIO2,IO2.GPIO2,IO2.GPIO2]
bitoffset: [0,2,4]
width: 2
rw: rw
dtype: uint8
# dim: 3
- name: [RCU_IO1_GPIO1,RCU_IO1_GPIO2,RCU_IO2_GPIO1,RCU_IO2_GPIO2,RCU_IO3_GPIO1,RCU_IO3_GPIO2]
driver: I2C_RCU
devreg: [IO1.GPIO1,IO1.GPIO2,IO2.GPIO1,IO2.GPIO2,IO3.GPIO1,IO3.GPIO2]
width: 8
rw: ro
dtype: uint8
- name: RCU_LED_red_on
driver: I2C_RCU
description: Red LED on front panel
devreg: IO2.GPIO2
bitoffset: 6
width: 1
rw: rw
dtype: boolean
convert_unit: bool_invert
- name: RCU_LED_green_on
driver: I2C_RCU
description: Green LED on front panel
devreg: IO2.GPIO2
bitoffset: 7
width: 1
rw: rw
dtype: boolean
convert_unit: bool_invert
- name: RCU_TEMP
description: RCU PCB Temperature (Celsius)
driver: I2C_RCU
devreg: AN.Temp
width: 23
scale: 3.827e-3 #T=DATAOUT * Vref/12.25/32. Vreg=1.5 V
convert_unit: Kelvin2Celsius
rw: ro
dtype: double
- name: RCU_PWR_3V3
description: RCU 3.3V voltage (V)
driver: I2C_RCU
devreg: AN.V_3v3
width: 23
scale: 1.463e-6
rw: ro
dtype: double
- name: RCU_PWR_1V8
description: RCU 1.8V voltage (V)
driver: I2C_RCU
devreg: AN.V_1v8
width: 23
scale: 7.1526e-7
rw: ro
dtype: double
- name: RCU_PWR_2V5
description: RCU 2.5V voltage (V)
driver: I2C_RCU
devreg: AN.V_2v5
width: 23
scale: 7.1526e-7
rw: ro
dtype: double
- name: [CH1_PWR_ANT_VOUT,CH2_PWR_ANT_VOUT,CH3_PWR_ANT_VOUT]
description: Voltage on antenna output of RCU (V). Controlled by ANT_PWR_ON.
driver: I2C_RCU
devreg: [AN.V_Ant_O0,AN.V_Ant_O1,AN.V_Ant_O2]
width: 23
scale: 2.7895e-6
rw: ro
dtype: double
- name: [CH1_PWR_ANT_VIN,CH2_PWR_ANT_VIN,CH3_PWR_ANT_VIN]
description: RCU antenna voltage before switch (V)
driver: I2C_RCU
devreg: [AN.V_Ant_I0,AN.V_Ant_I1,AN.V_Ant_I2]
width: 23
scale: 2.7895e-6
rw: ro
dtype: double
- name: [CH1_PWR_ANT_IOUT,CH2_PWR_ANT_IOUT,CH3_PWR_ANT_IOUT]
description: Current drawn on antenna output of RCU (A). 15mA offset due to LED on RCU
driver: I2C_RCU
devreg: [AN.I_Ant0,AN.I_Ant1,AN.I_Ant2]
width: 23
scale: 7.1526e-8
rw: ro
dtype: double
- name: RCU_PWR_DIGITAL_on
description: RCU digital power enable. Controlled by calling RCU_on/RCU_off
driver: I2C_RCU
devreg: IO2.GPIO1
width: 1
bitoffset: 6
rw: ro #rw for testing
dtype: boolean
- name: RCU_PWR_good
description: Status of RCU power given by LDOs.
driver: I2C_RCU
devreg: IO2.GPIO1
width: 1
bitoffset: 7
rw: ro
dtype: boolean
- name: RCU_PWR_ANALOG_on
description: RCU analog power enable. Controlled by RCU_on/off
driver: I2C_RCU
devreg: IO1.GPIO2
width: 1
bitoffset: 7
rw: rw #rw for testing
dtype: boolean
- name: [CH1_DTH_shutdown,CH2_DTH_shutdown,CH3_DTH_shutdown]
description: False means dither source & ADC powered on.
driver: I2C_RCU
devreg: [IO3.GPIO1,IO3.GPIO1,IO3.GPIO2]
width: 1
bitoffset: [7,6,7]
rw: ro
dtype: boolean
- name: [CH1_PWR_ANT_on,CH2_PWR_ANT_on,CH3_PWR_ANT_on]
description: Antenna power output ON/OFF control. Monitored by Ant_VOUT.
driver: I2C_RCU
devreg: [IO1.GPIO1,IO1.GPIO1,IO1.GPIO2]
width: 1
bitoffset: [6,7,6]
rw: rw
dtype: boolean
- name: RCU_PCB_ID
description: Unique PCB ID
driver: I2C_RCU
devreg: ROM.ID
width: 32
rw: ro
dtype: uint32
- name: RCU_PCB_version
description: RCU version number
driver: I2C_RCU
devreg: ROM.Version
width: 0x80 #16 characters
rw: ro
dtype: string
- name: RCU_PCB_number
description: PCB number (astron.nl/webforms/IenS-Boarden/view.php?id=xxx)
driver: I2C_RCU
devreg: ROM.Serial
width: 0x80 #16 characters
rw: ro
dtype: string
- name: [CH1_ADC_locked,CH2_ADC_locked,CH3_ADC_locked]
description: RCU ADC lock status. May generated noise and polling disabled in future
driver: I2C_RCU
devreg: [ADC1.PLL_stat,ADC2.PLL_stat,ADC3.PLL_stat]
width: 1
rw: ro
dtype: boolean
# dim: 3
- name: [CH1_ADC_sync,CH2_ADC_sync,CH3_ADC_sync]
driver: I2C_RCU
devreg: [ADC1.SYNC_control,ADC2.SYNC_control,ADC3.SYNC_control]
width: 8
rw: ro
dtype: uint8
- name: [CH1_ADC_JESD,CH2_ADC_JESD,CH3_ADC_JESD]
driver: I2C_RCU
devreg: [ADC1.JESD_control1,ADC2.JESD_control1,ADC3.JESD_control1]
width: 8
rw: ro
dtype: uint8
- name: [CH1_ADC_CML_level,CH2_ADC_CML_level,CH3_ADC_CML_level]
driver: I2C_RCU
devreg: [ADC1.CML_level,ADC2.CML_level,ADC3.CML_level]
width: 8
rw: ro
dtype: uint8
- name: [CH1_DTH_freq,CH2_DTH_freq,CH3_DTH_freq]
description: RCU Dither source frequency (Hz). Should be around 102MHz.
driver: I2C_RCU
devreg: [DTH1.Freq,DTH2.Freq,DTH3.Freq]
width: 32
rw: rw
dtype: uint32
- name: [CH1_DTH_PWR,CH2_DTH_PWR,CH3_DTH_PWR]
description: RCU Dither source power
driver: I2C_RCU
devreg: [DTH1.PA_CONFIG,DTH2.PA_CONFIG,DTH3.PA_CONFIG]
width: 48
rw: rw
dtype: double
scale: si4012_6bytes_to_pwr
# - name: RCU_DTH_tune
# driver: I2C_RCU
# devreg: [DTH1.Tune,DTH2.Tune,DTH3.Tune]
# width: 16
# rw: rw
# dtype: uint32
# dim: 96
# dim2: [3,32]
# mask: ANT_mask
# debug: true
# read_parallel: all
# - name: RCU_DTH_config
# driver: I2C_RCU
# devreg: [DTH1.CONF,DTH2.CONF,DTH3.CONF]
# width: 8
# rw: rw
# dtype: uint32
# dim: 96
# dim2: [3,32]
# mask: ANT_mask
# debug: true
# read_parallel: all
- name: [CH1_DTH_on,CH2_DTH_on,CH3_DTH_on]
description: RCU Dither on. Controlled by RCU_DTH_on/off
driver: I2C_RCU
devreg: [DTH1.State,DTH2.State,DTH3.State]
width: 1 #read only first of 2 bytes
bitoffset: [1,1,1]
rw: ro
dtype: boolean
# - name: RCU_DTH_Rev
# driver: I2C_RCU
# devreg: [DTH1.Rev,DTH2.Rev,DTH3.Rev]
# width: 88
# rw: rw
# dtype: uint32
# dim: 96
# dim2: [3,32]
# mask: ANT_mask
# debug: true
# read_parallel: all
methods:
- name: RECVTR_Init #Called after startup to load. Should have all stored registers
driver: I2C_RCU
debug: True
instructions:
- RCU_IO1_GPIO1: Update
- RCU_IO1_GPIO2: Update
- RCU_IO2_GPIO1: Update
- RCU_IO2_GPIO2: Update
- RCU_IO3_GPIO1: Update
- RCU_IO3_GPIO2: Update
import os
#if os.sys.platform is 'linux':
from usb_iss import UsbIss, defs
import time
import logging
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
from .hwdev import hwdev;
class i2c_iss(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
self.I2Cdev='com'+str(config['parameters'][0]);
logging.info("iss i2c driver on "+self.I2Cdev)
try:
self.iss=UsbIss()
self.iss.open(self.I2Cdev)
self.iss.setup_i2c(100,True,None,None)
# print(self.iss.read_iss_mode())
except:
logging.warning("I2C ISS failed!")
self.iss=None
# print(self.iss.test(0x20))
self.I2Ccounter=0
def i2csetget(self,addr,data,reg=None,read=0):
logging.debug(str(("I2C",addr,reg,data,read)))
if self.iss is None: return True #Should be false, only for testing!!!
try:
# if True:
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1: #read some data
length=len(data)
if not(reg is None):
#bus.ioctl_write(0,bytes(bytearray([reg])))
# self.iss.i2c.write_ad0(addr,[reg])
if reg>255:
reg=[(reg//256)%256,reg%256]
time.sleep(0.3)
self.iss.i2c.write_ad0(addr,reg)
time.sleep(0.3)
data[:]=[int(x) for x in self.iss.i2c.read_ad0(addr,length)]
else:
data[:]=[int(x) for x in self.iss.i2c.read(addr,reg,length)]
# time.sleep(0.500)
# data[:]=[int(x) for x in bus.ioctl_read(0,length)]
else:
data[:]=[int(x) for x in self.iss.i2c.read_ad0(addr,length)]
logging.debug(str(("I2C get",addr,reg,data,read)))
# print("I2C read",addr,reg,data,read)
else:
logging.debug(str(("I2C set",addr,reg,data,read)))
if reg is None:
# bus.iaddr_bytes=0
# reg=0;
self.iss.i2c.write_ad0(addr,data)
else:
#TXlen=bus.ioctl_write(reg,bytes(bytearray(data)))
self.iss.i2c.write(addr,reg,data)
# if TXlen<0:
# bus.close()
# return False;
# if (TXlen!=len(data)): logging.warn("I2C write error %i/%i"%(TXlen,len(data)))
# bus.close()
return True;
except:
# else:
# if bus: bus.close()
logging.warning("I2C failed!")
if len(data)>0: data[0]=0
return False;
......@@ -11,3 +11,9 @@ def bool_invert(T):
def temp_check(T):
if (T<-100) or (T>200): return float('nan')
return T;
def int12_to_bool_inv(data):
return 1 if data else 2
def int12_to_bool(data):
return data==1
\ No newline at end of file
......@@ -46,3 +46,8 @@ def si4012_6bytes_to_pwr_inv(data):
pwr+=127 #fBetaSteps #default
return pwr
def bool_to_int12(data):
return 1 if data else 2
def bool_to_int12_inv(data):
return data==1
\ No newline at end of file
......@@ -18,6 +18,110 @@ def int2bytes(i):
i>>=8;
return [i]+b;
def byte2var(v,data):
# v=self.conf['variables'][id1];
dtype=v.get('dtype','integer');
width=(v.get('width',8)-1)//8+1
endian=v.get('endian','>');
logging.debug(str(("OPCset",width,data)))
notvalid=[(d is None) for d in data[::width]]
cnt=int(len(data)//width)
assert(cnt==len(notvalid))
# print(notvalid)
for x in range(len(data)):
if data[x] is None: data[x]=0;
# print(data)
if dtype=="boolean":
data2=[d==1 for d in data];
convert=v.get("convert_unit")
if convert:
data2=[eval("convert_unit."+convert)(d) for d in data2]
for x in range(cnt):
if notvalid[x]: data2[x]=False;
elif (dtype in ['uint8','uint16','uint32','uint64','double']):
data=bytearray(data)
if width<=1:
data2=[d for d in data]
elif width==2:
data2 = struct.unpack(endian+'%sH' % (len(data)//2), data)
elif width==3:
data2 = [struct.unpack(endian+'L' ,bytearray([0])+data[x*3:x*3+3])[0] for x in range(len(data)//3)]
elif width==4:
data2 = struct.unpack(endian+'%sL' % (len(data)//4), data)
elif width==6:
data2 = [struct.unpack(endian+'Q' ,bytearray([0,0])+data[x*6:x*6+6])[0] for x in range(len(data)//6)]
elif width==8:
data2 = struct.unpack(endian+'%sQ' % (len(data)//8), data)
else:
logging.warn("OPCset"+v['name']+" unsupported width!"+str(width))
return;
elif dtype=="string":
# data2=[(bytearray(data[i*width:(i+1)*width]).decode("utf-8")) for i in range(cnt)]
data2=[bytearray(data)[i*width:(i+1)*width].decode("utf-8",errors='ignore') for i in range(cnt)]
for x in range(cnt):
if notvalid[x]: data2[x]="";
else:
logging.warn("OPCset unsupported type");
return;
if dtype=="double":
scale=v.get('scale',1.)
if scale in ["smbus_2bytes_to_float","smbus_2bytes_to_float_exp12","smbus_2bytes_to_float_exp13","si4012_6bytes_to_pwr"]:
data2=[float(eval(scale)(d)) for d in data2]
else:
scale=float(scale)
data2=[(d*scale) for d in data2]
convert=v.get("convert_unit")
if convert: data2=[eval("convert_unit."+convert)(d) for d in data2]
for x in range(cnt):
if notvalid[x]: data2[x]=float('nan')
return data2
def var2byte(v,data):
endian=v.get('endian','>');
mask=v.get('maskOPC',None);
mask=mask.get_value() if (mask!=None) else [];
# print("M2:",mask)
dtype=v.get('dtype','integer');
width=(v.get('width',8)-1)//8+1
if not(isinstance(data,list)): data=[data];
if (dtype=="double"):
scale=v.get('scale',1.)
if scale in ["si4012_6bytes_to_pwr"]:
data=[int(eval(scale+"_inv")(d)) for d in data]
else:
scale=float(scale)
data=[int(d/scale) for d in data]
if (dtype=="boolean"):
convert=v.get("convert_unit")
if convert: data=[eval("convert_unit."+convert+"_inv")(d) for d in data]
data2=bytearray(data*1);
elif (dtype in ['uint8','uint16','uint32','uint64','double']):
if width<=1:
data2=bytearray(data)
elif width==2:
data2 = struct.pack(endian+'%sH' % len(data), *data)
elif width==3:
data2=bytearray()
for a in data: data2.extend(struct.pack('>L',a)[1:])
elif width==4:
data2 = struct.pack(endian+'%sL' % len(data), *data)
elif width==6:
data2=bytearray()
for a in data: data2.extend(struct.pack('>Q',a)[2:])
elif width==8:
data2 = struct.pack(endian+'%sQ' % len(data), *data)
else:
logging.warn("setvar"+v['name']+" unsupported width!"+str(width))
return None;
elif (dtype=="string"):
data2=bytearray()
for s in data:
data2.extend('{s:>{width}'.format(s=s[:width],width=width).encode('ascii'));
else:
logging.warn("setvar unsupported type");
return None;
data2=[d for d in data2]
return data2
class yamlreader(yamlconfig):
def __init__(self,i2cserver,yamlfile='RCU'):
......@@ -128,51 +232,9 @@ class yamlreader(yamlconfig):
logging.info("Update variable")
var1.set_value(data);
return;
endian=v.get('endian','>');
mask=v.get('maskOPC',None);
mask=mask.get_value() if (mask!=None) else [];
# print("M2:",mask)
dtype=v.get('dtype','integer');
width=(v.get('width',8)-1)//8+1
if not(isinstance(data,list)): data=[data];
if (dtype=="double"):
scale=v.get('scale',1.)
if scale in ["si4012_6bytes_to_pwr"]:
data=[int(eval(scale+"_inv")(d)) for d in data]
else:
scale=float(scale)
data=[int(d/scale) for d in data]
if (dtype=="boolean"):
convert=v.get("convert_unit")
if convert: data=[eval("convert_unit."+convert)(d) for d in data]
data2=bytearray(data*1);
elif (dtype in ['uint8','uint16','uint32','uint64','double']):
if width<=1:
data2=bytearray(data)
elif width==2:
data2 = struct.pack(endian+'%sH' % len(data), *data)
elif width==3:
data2=bytearray()
for a in data: data2.extend(struct.pack('>L',a)[1:])
elif width==4:
data2 = struct.pack(endian+'%sL' % len(data), *data)
elif width==6:
data2=bytearray()
for a in data: data2.extend(struct.pack('>Q',a)[2:])
elif width==8:
data2 = struct.pack(endian+'%sQ' % len(data), *data)
else:
logging.warn("setvar"+v['name']+" unsupported width!"+str(width))
return;
elif (dtype=="string"):
data2=bytearray()
for s in data:
data2.extend('{s:>{width}'.format(s=s[:width],width=width).encode('ascii'));
else:
logging.warn("setvar unsupported type");
return;
data2=[d for d in data2]
data2=var2byte(v,data)
logging.info(str(("setvar ",v['name'],data2,mask)));
if data2 is None: return
self.SetBusy()
self.server.setvar(id1,data2,mask)
......@@ -194,60 +256,7 @@ class yamlreader(yamlconfig):
def OPCset(self,id1,data,mask):
v=self.conf['variables'][id1];
dtype=v.get('dtype','integer');
width=(v.get('width',8)-1)//8+1
endian=v.get('endian','>');
logging.debug(str(("OPCset",width,data)))
notvalid=[(d is None) for d in data[::width]]
cnt=int(len(data)//width)
assert(cnt==len(notvalid))
# print(notvalid)
for x in range(len(data)):
if data[x] is None: data[x]=0;
# print(data)
if dtype=="boolean":
data2=[d==1 for d in data];
convert=v.get("convert_unit")
if convert:
data2=[eval("convert_unit."+convert)(d) for d in data2]
for x in range(cnt):
if notvalid[x]: data2[x]=False;
elif (dtype in ['uint8','uint16','uint32','uint64','double']):
data=bytearray(data)
if width<=1:
data2=[d for d in data]
elif width==2:
data2 = struct.unpack(endian+'%sH' % (len(data)//2), data)
elif width==3:
data2 = [struct.unpack(endian+'L' ,bytearray([0])+data[x*3:x*3+3])[0] for x in range(len(data)//3)]
elif width==4:
data2 = struct.unpack(endian+'%sL' % (len(data)//4), data)
elif width==6:
data2 = [struct.unpack(endian+'Q' ,bytearray([0,0])+data[x*6:x*6+6])[0] for x in range(len(data)//6)]
elif width==8:
data2 = struct.unpack(endian+'%sQ' % (len(data)//8), data)
else:
logging.warn("OPCset"+v['name']+" unsupported width!"+str(width))
return;
elif dtype=="string":
# data2=[(bytearray(data[i*width:(i+1)*width]).decode("utf-8")) for i in range(cnt)]
data2=[bytearray(data)[i*width:(i+1)*width].decode("utf-8",errors='ignore') for i in range(cnt)]
for x in range(cnt):
if notvalid[x]: data2[x]="";
else:
logging.warn("OPCset unsupported type");
return;
if dtype=="double":
scale=v.get('scale',1.)
if scale in ["smbus_2bytes_to_float","smbus_2bytes_to_float_exp12","smbus_2bytes_to_float_exp13","si4012_6bytes_to_pwr"]:
data2=[float(eval(scale)(d)) for d in data2]
else:
scale=float(scale)
data2=[(d*scale) for d in data2]
convert=v.get("convert_unit")
if convert: data2=[eval("convert_unit."+convert)(d) for d in data2]
for x in range(cnt):
if notvalid[x]: data2[x]=float('nan')
data2=byte2var(v,data)
var1=v.get('OPCR',None)
if var1 is None: var1=v.get('OPCW',None)
if var1 is None:
......
#from opcua import ua, Server
from asyncua.sync import ua, Server
from time import sleep
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
def datachange_notification(self, node, val, data):
print("Datachange callback",node,val,data)
# def datachange_notification(self, node, val, data):
# logging.warning(str(("Python: New data change event", node, val, data.monitored_item.Value.StatusCode)))
# logging.warning(str(("Python: Data came from client: ", (data.monitored_item.Value.StatusCode == ua.StatusCode(ua.StatusCodes.Good)))))
# if data came from client, mark the status to get a new notification if the client rewrites the same value
if data.monitored_item.Value.StatusCode == ua.StatusCode(ua.StatusCodes.Good):
print(str(("Python: New client data change event", node, val, data.monitored_item.Value.StatusCode)))
#node.set_value(ua.DataValue(val, ua.StatusCode(ua.StatusCodes.GoodCompletesAsynchronously)))
#data.monitored_item.Value.StatusCode=ua.StatusCodes.GoodCompletesAsynchronously
if True:
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:1234/")
idx = server.register_namespace("http://lofar.eu")
print("idx=",idx)
PCCobj = server.nodes.objects #server.get_objects_node()
handler = SubHandler()
sub = server.create_subscription(50, handler)
myvar2 = PCCobj.add_variable(idx, "test1", 1.5)
myvar2.set_writable()
handle = sub.subscribe_data_change(myvar2)
server.start()
sleep(0.3)
from asyncua.sync import Client
#from opcua import Client
#with Client(url='opc.tcp://localhost:1234/') as client:
# while True:
# Do something with client
# node = client.get_node('i=1')
# value = node.read_value()
# print(value)
client = Client("opc.tcp://localhost:1234/")
try:
client.connect()
#client.load_type_definitions() # load definition of server specific structures/extension objects
# Client has a few methods to get proxy to UA nodes that should always be in address space such as Root or Objects
#root = client.get_root_node()
#print("Root node is: ", root)
#objects = client.get_objects_node()
#print("Objects node is: ", objects)
# Node objects have methods to read and write node attributes as well as browse or populate address space
#print("Children of root are: ", root.get_children())
# get a specific node knowing its node id
#var = client.get_node(ua.NodeId(1002, 2))
var = client.get_node("ns=2;i=1")
# handler = SubHandler()
# sub = client.create_subscription(500, handler)
# handle = sub.subscribe_data_change(var)
#var = client.get_node("ns=2;g=1be5ba38-d004-46bd-aa3a-b5b87940c698")
#print(var)
#var.get_data_value() # get value of node as a DataValue object
#print(var.get_value()) # get value of node as a python builtin
#var.set_value(ua.Variant([23], ua.VariantType.Int64)) #set node value using explicit data type
var.set_value(2.5) # set node value using implicit data type
sleep(0.1)
#print(var.get_value()) # get value of node as a python builtin
var.set_value(2.5) # set node value using implicit data type
sleep(0.1)
#print(var.get_value()) # get value of node as a python builtin
sleep(0.3)
finally:
client.disconnect()
server.stop()
......@@ -31,7 +31,7 @@ class yamlconfig():
def __init__(self,yamlfile='RCU'):
pkg = importlib_resources.files("pypcc")
pkg_data_file = pkg / "config" / (yamlfile+'.yaml')
self.conf=yaml.load(pkg_data_file.open())
self.conf=yaml.load(pkg_data_file.open(), Loader=yaml.FullLoader)
self.expand_variables()
# print([[v['name'],v.get('devreg')] for v in var1])
# print(len(self.conf['variables']),N)
......
File moved
from i2cdirect import i2cdirect
from time import sleep
import logging
logging.basicConfig(level="ERROR",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')
#logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')
d1=i2cdirect('RECVTR_LB_TEST')
def RCU_off(d1):
logging.warning("Switch RCU Power off");
d1.SetRegister("IO1.GPIO1",[0])
d1.SetRegister("IO1.GPIO2",[0x0]) #Analog power off
d1.SetRegister("IO2.GPIO1",[0x0]) #Digital power off
d1.SetRegister("IO2.GPIO2",[0x0])
d1.SetRegister("IO3.GPIO1",[0x0])
d1.SetRegister("IO3.GPIO2",[0x0])
d1.SetRegister("IO1.CONF1",[0])
d1.SetRegister("IO1.CONF2",[0])
d1.SetRegister("IO2.CONF1",[0x80]) #Pgood on P07
d1.SetRegister("IO2.CONF2",[0])
def RCU_on(d1):
logging.warning("Switch RCU Power on");
d1.SetRegister("IO2.CONF1",[0x80]) #Pgood on P07
d1.SetRegister("IO2.GPIO1",[0x4A]) #Digital power on
d1.SetRegister("IO2.GPIO2",[0x55])
d1.SetRegister("IO3.GPIO1",[0x15]) #ADC_SDIO=high, clk=low, DTH_EN=low
d1.SetRegister("IO3.GPIO2",[0x47]) #ADC SC=high, DTH_SDA=high
d1.SetRegister("IO1.GPIO1",[0x0A])
d1.SetRegister("IO1.GPIO2",[0x8A]) #Analog power on
d1.SetRegister("IO2.CONF2",[0])
d1.SetRegister("IO3.CONF1",[0]) #All output
d1.SetRegister("IO3.CONF2",[0])
d1.SetRegister("IO3.DIR1",[0]) #All output
d1.SetRegister("IO3.DIR2",[0])
d1.SetRegister("IO1.CONF1",[0])
d1.SetRegister("IO1.CONF2",[0])
def ADC_config(d1):
logging.warning("Configure ADCs");
d1.SetRegister("ADC1.JESD_control1",[0x14])
d1.SetRegister("ADC1.SYNC_control",[1])
d1.SetRegister("ADC1.CML_level",[0x7])
d1.SetRegister("ADC1.dither",[0x0])
d1.SetRegister("ADC1.Update",[1])
d1.SetRegister("ADC2.JESD_control1",[0x14])
d1.SetRegister("ADC2.SYNC_control",[1])
d1.SetRegister("ADC2.CML_level",[0x7])
d1.SetRegister("ADC2.dither",[0x0])
d1.SetRegister("ADC2.Update",[1])
d1.SetRegister("ADC3.JESD_control1",[0x14])
d1.SetRegister("ADC3.SYNC_control",[1])
d1.SetRegister("ADC3.CML_level",[0x7])
d1.SetRegister("ADC3.dither",[0x0])
d1.SetRegister("ADC3.Update",[1])
def enable_ant_pwr(d1):
logging.warning("Switch antenna output power on");
d1.SetVal("CH1_PWR_ANT_on",[True])
d1.SetVal("CH2_PWR_ANT_on",[True])
d1.SetVal("CH3_PWR_ANT_on",[True])
def dither_config(d1): #Switch dither on
logging.warning("Configure Dither sources");
d1.SetVal("CH1_DTH_freq",[101000000])
d1.SetVal("CH1_DTH_PWR",[-10])
d1.SetRegister("DTH1.CONF",[0,0,0])
d1.SetRegister("DTH1.Tune",[0,0])
d1.SetRegister("DTH1.Start",[0,1,0,0,1])
d1.SetVal("CH2_DTH_freq",[102000000])
d1.SetVal("CH2_DTH_PWR",[-10])
d1.SetRegister("DTH2.CONF",[0,0,0])
d1.SetRegister("DTH2.Tune",[0,0])
d1.SetRegister("DTH2.Start",[0,1,0,0,1])
d1.SetVal("CH3_DTH_freq",[103000000])
d1.SetVal("CH3_DTH_PWR",[-10])
d1.SetRegister("DTH3.CONF",[0,0,0])
d1.SetRegister("DTH3.Tune",[0,0])
d1.SetRegister("DTH3.Start",[0,1,0,0,1])
#RCU_off(d1)
#sleep(1.0)
RCU_on(d1)
sleep(1.0)
ADC_config(d1)
#enable_ant_pwr(d1)
#dither_config(d1)
for varname in ['RCU_PCB_ID','RCU_PCB_version','RCU_PCB_number',
'RCU_PWR_good','RCU_PWR_DIGITAL_on','RCU_PWR_ANALOG_on',
'RCU_IO1_GPIO1','RCU_IO1_GPIO2','RCU_IO2_GPIO1','RCU_IO2_GPIO2',
"RCU_LED_red_on","RCU_LED_green_on"]:
data,var1=d1.GetVal(varname);
print(varname+"=",data[0])
for varname in ['RCU_TEMP','RCU_PWR_3V3','RCU_PWR_2V5','RCU_PWR_1V8']:
data,var1=d1.GetVal(varname);
print(varname+"=",data[0])
for ch in ['CH1','CH2','CH3']:
print("Channel ",ch)
for varname in ['band_select','attenuator_dB','DTH_shutdown',
'PWR_ANT_on','PWR_ANT_VOUT','PWR_ANT_VIN','PWR_ANT_IOUT',
'ADC_locked','ADC_sync','ADC_JESD','ADC_CML_level',
'DTH_freq','DTH_PWR','DTH_on',
]:
data,var1=d1.GetVal(ch+'_'+varname);
print(" ",varname+"=",data[0])
import yamlconfig as yc
import pypcc.yamlconfig as yc
import sys
if len(sys.argv)<2:
print("Usage: SCTable yamlfile")
exit()
RW={'ro':'_R','rw':'_R/_RW','variable':'_RW'}
DT={'uint8':'numpy.int64 ',
'uint32':'numpy.int64 ',
......
import yamlconfig as yc
import pypcc.yamlconfig as yc
import sys
if len(sys.argv)<2:
print("Usage: VarTable yamlfile")
exit()
RW={'ro':'R','rw':'RW','variable':'RW'}
Masked=True;
Y=yc.yamlconfig(sys.argv[1])
......
name='RECVTR_LB_TEST' #YAML config file with all register values etc
import logging
#import argparse
from pypcc.opcuaserv import opcuaserv
from pypcc.opcuaserv import i2client
from pypcc.opcuaserv import yamlreader
#from opcuaserv import pypcc2
from pypcc.i2cserv import i2cthread
#import threading
import time
import sys
import signal
from pypcc.yamlconfig import Find;
import pypcc.yamlconfig as yc
from datetime import datetime
from pypcc.opcuaserv.yamlreader import byte2var,var2byte
class i2cdirect():
def __init__(self,name):
self.conf=yc.yamlconfig(name)
self.conf.linkdevices()
self.conf.loaddrivers()
self.conf.linkdrivers()
def GetVal(self,name,N=1):
varid=self.conf.getvarid(name);
if varid is None:
logging.error("Variable "+name+" not found")
return None,None
var1=self.conf.getvars()[varid]
drv=var1.get('drivercls');
data=drv.OPCUAReadVariable(varid,var1,[])
data=data[0].data
return byte2var(var1,data),var1
def SetVal(self,name,data):
varid=self.conf.getvarid(name);
if varid is None:
logging.error("Variable "+name+" not found")
return None
var1=self.conf.getvars()[varid]
drv=var1.get('drivercls');
data2=var2byte(var1,data)
return drv.OPCUASetVariable(varid,var1,data2,[])
def SetRegister(self,regname,value):
methodid=self.conf.getmethodid("RECVTR_Init");
var1=self.conf.getmethod(methodid)
drv=var1.get('drivercls');
v1=self.conf.getdevreg(regname)
drv2=v1.get('drivercls')
mask=[]
if drv: drv.Setdevreg(v1,value,mask)
elif drv2: drv2.Setdevreg(v1,value,mask)
else: logging.warn("Driver not specified for instruction"+key)
File moved
File moved
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment