# testHBA2_pico.py (1.99 KiB)
# Authored by Paulus Kruger.
# Code owners: assign users and groups as approvers for specific file changes.
import logging
import argparse
from opcuaserv import opcuaserv
from opcuaserv import i2client
from opcuaserv import yamlreader
#from opcuaserv import pypcc2
from i2cserv import i2cthread
import threading
import time
import sys
import signal
from yamlconfig import Find;
logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')


def element_mask(count, selected, group_size=1):
    """Return ``count`` booleans marking the entries whose group is selected.

    Entry ``i`` is True when ``i // group_size`` is contained in
    ``selected``.  With the default ``group_size`` of 1 this reduces to
    ``i in selected``.
    """
    return [(i // group_size) in selected for i in range(count)]


def main():
    """Start the HBAT2 I2C worker, queue a read request and print the replies.

    The worker is always stopped and joined on the way out, even when one
    of the requests raises, so a failing run does not leave it running.
    """
    logging.info("Start I2C processes")
    threads = []
    i2c_clients = []
    name = 'HBAT2'
    rcu_i2c = i2client.i2client(name=name)
    threads.append(i2cthread.start(*rcu_i2c.GetInfo()))
    i2c_clients.append(rcu_i2c)

    try:
        # Load yaml so that we know the variable names
        rcu_conf = yamlreader.yamlreader(rcu_i2c, yamlfile=name)
        # rcu_conf.CallInit()

        # The method id is looked up, but the call itself is disabled.
        load_eeprom_id = rcu_conf.getmethodid('LOAD_EEPROM')
        # rcu_i2c.callmethod(load_eeprom_id, [])

        # Indices selected by the masks below
        # (presumably beamformer addresses -- TODO confirm).
        bf_addr = [0, 3]

        # 16 entries handled in groups of 4: a group is selected when its
        # group index is listed in bf_addr.
        var_id = rcu_conf.getvarid('HBAT2_PWR_FE')
        data = [1, 2, 3, 4] + [0] * 8 + [1, 2, 3, 4]
        mask = element_mask(16, bf_addr, group_size=4)
        print(mask, data)
        # rcu_i2c.setvar(var_id, data, mask)
        rcu_i2c.readvar(var_id, mask)

        # 4 entries, one per index; only printed because both the write and
        # the read-back are currently disabled.
        var_id = rcu_conf.getvarid('HBAT2_DELAYS_X_G2')
        data = [2, 0, 0, 3]
        mask = element_mask(4, bf_addr)
        print(mask, data)
        # rcu_i2c.setvar(var_id, data, mask)
        # rcu_i2c.readvar(var_id, mask)

        # Wait before draining the replies (presumably to give the I2C
        # worker time to answer -- TODO confirm the required delay).
        time.sleep(5)
        while rcu_i2c.data_waiting():
            # Separate names so the request data/mask above are not shadowed.
            reply_id, reply_data, reply_mask = rcu_i2c.readdata()
            print("Results:", rcu_conf.getvar1(reply_id)['name'], reply_data[:16], reply_mask[:16])
    finally:
        logging.info("Stop threads")
        for client in i2c_clients:
            client.stop()
        for worker in threads:
            worker.join()


if __name__ == '__main__':
    main()