Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
testHBA2_pico.py 1.99 KiB
import logging
import argparse
from opcuaserv import opcuaserv
from opcuaserv import i2client
from opcuaserv import yamlreader
#from opcuaserv import pypcc2
from i2cserv import i2cthread
import threading
import time
import sys
import signal
from yamlconfig import Find;

logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')

if __name__ == '__main__':
 RunTimer=True;
#def signal_handler(sig, frame):
#    logging.warn('Stop signal received!')
#    global RunTimer; 
#    RunTimer=False
#signal.signal(signal.SIGINT, signal_handler)

 logging.info("Start I2C processes")   
 threads=[]
 I2Cclients=[]
 name='HBAT2'
 RCU_I2C=i2client.i2client(name=name)
 thread1=i2cthread.start(*RCU_I2C.GetInfo())
 threads.append(thread1)
 I2Cclients.append(RCU_I2C)

#Load yaml so that we know the variable names
 RCU_conf=yamlreader.yamlreader(RCU_I2C,yamlfile=name)
#RCU_conf.CallInit()

 var1=RCU_conf.getmethodid('LOAD_EEPROM');
#var1=RCU_conf.getmethodid('RCU_on');
#N=32;
#mask=[i<2 for i in range(N)];
#RCU_I2C.callmethod(var1,[])
 BFaddr=[0,3];

# var1=RCU_conf.getvarid('HBAT2_DELAYS_X_G1');
 var1=RCU_conf.getvarid('HBAT2_PWR_FE');
 N=16;
 data=[1,2,3,4]+[0]*8+[1,2,3,4];
 mask=[(i//4) in BFaddr for i in range(N)];
 print(mask,data)
# RCU_I2C.setvar(var1,data,mask);
 RCU_I2C.readvar(var1,mask);

# var1=RCU_conf.getvarid('HBAT2_FE_I');
# RCU_I2C.readvar(var1,mask);

 var1=RCU_conf.getvarid('HBAT2_DELAYS_X_G2');
 N=4;
 data=[2,0,0,3];
 mask=[i in BFaddr for i in range(N)];
 print(mask,data)
# RCU_I2C.setvar(var1,data,mask);
# RCU_I2C.readvar(var1,mask);

# var1=RCU_conf.getvarid('HBAT2_BF_V');
# RCU_I2C.readvar(var1,mask);

# var1=RCU_conf.getvarid('HBAT2_BF_I1');
# RCU_I2C.readvar(var1,mask);

 time.sleep(5);

 while RCU_I2C.data_waiting():
    varid,data,mask=RCU_I2C.readdata()
    print("Results:",RCU_conf.getvar1(varid)['name'],data[:16],mask[:16])


 logging.info("Stop threads")
 for i2c in I2Cclients:
     i2c.stop()
 for thread1 in threads:
     thread1.join()