Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
testRCU.py 2.23 KiB
import logging
import argparse
from opcuaserv import opcuaserv
from opcuaserv import i2client
from opcuaserv import yamlreader
#from opcuaserv import pypcc2
from i2cserv import i2cthread
import threading
import time
import sys
import signal
from yamlconfig import Find;

logging.basicConfig(level="INFO",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')

RunTimer=True;
#def signal_handler(sig, frame):
#    logging.warn('Stop signal received!')
#    global RunTimer; 
#    RunTimer=False
#signal.signal(signal.SIGINT, signal_handler)

logging.info("Start I2C processes")   
threads=[]
I2Cclients=[]
name='RCU'
RCU_I2C=i2client.i2client(name=name)
thread1=i2cthread.start(*RCU_I2C.GetInfo())
threads.append(thread1)
I2Cclients.append(RCU_I2C)

#Load yaml so that we know the variable names
RCU_conf=yamlreader.yamlreader(RCU_I2C,yamlfile=name)
RCU_conf.CallInit()

var1=RCU_conf.getmethodid('RCU_update');
#var1=RCU_conf.getmethodid('RCU_on');
#N=32;
#mask=[i<2 for i in range(N)];
#RCU_I2C.callmethod(var1,[])

if False:
    var1=RCU_conf.getvarid('RCU_LED0');
    N=32;
    mask=[i<8 for i in range(N)];
    data=[0]*N;
elif True:
    var1=RCU_conf.getvarid('RCU_ADC_lock');
    N=32*3;
    mask=[i<5*3 for i in range(N)];
elif False:
    var1=RCU_conf.getvarid('RCU_attenuator');
    N=32*3;
    mask=[i<6 for i in range(N)];
    data=[10]*N;
    data[:6]=[0,1,2,3,4,5]
else:
    var1=RCU_conf.getvarid('HBA_element_beamformer_delays');
    N=32*3;
    mask=[i<4*3 for i in range(N)];
    data=[1]*(N*32);
#print(var1)
#print("mask=",mask);
#print("data=",data);
#RCU_I2C.setvar(var1,data,mask);
RCU_I2C.readvar(var1,mask);

var1=RCU_conf.getvarid('RCU_temperature');
N=32;
mask=[i<2 for i in range(N)];
#RCU_I2C.readvar(var1,mask);

#data=[0]*N;
#RCU_I2C.setvar(var1,data,mask);
#def setvar(self,id1,data=[],mask=[]):

#var1=RCU_conf.getmethodid('RCU_on');
var1=RCU_conf.getmethodid('RCU_HBAT_WAIT_PPS');
N=32;
mask=[i<7 for i in range(N)];
#RCU_I2C.callmethod(var1,mask)


time.sleep(5);

while RCU_I2C.data_waiting():
    varid,data,mask=RCU_I2C.readdata()
    print("Results:",RCU_conf.getvar1(varid)['name'],data[:12],mask[:12])


logging.info("Stop threads")
for i2c in I2Cclients:
    i2c.stop()
for thread1 in threads:
    thread1.join()