Skip to content
Snippets Groups Projects
Select Git revision
  • 9168a14dd139f4b2578b7862e7a332ba62e6e8d3
  • master default protected
  • dither_on_off_disabled
  • yocto
  • pypcc2
  • pypcc3
  • 2020-12-07-the_only_working_copy
  • v2.0
  • v1.0
  • v0.9
  • Working-RCU_ADC,ID
  • 2020-12-11-Holiday_Season_release
12 results

testHBA.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    testHBA.py 1.46 KiB
    import logging
    import argparse
    from opcuaserv import opcuaserv
    from opcuaserv import i2client
    from opcuaserv import yamlreader
    #from opcuaserv import pypcc2
    from i2cserv import i2cthread
    import threading
    import time
    import sys
    import signal
    from yamlconfig import Find;
    
    # Root logger at DEBUG; every record carries timestamp, level, source file
    # and line number ahead of the message.
    logging.basicConfig(
        level="DEBUG",
        format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s',
    )

    # Run flag. In the visible part of this file only the commented-out SIGINT
    # handler refers to it.
    RunTimer = True
    #def signal_handler(sig, frame):
    #    logging.warn('Stop signal received!')
    #    global RunTimer; 
    #    RunTimer=False
    #signal.signal(signal.SIGINT, signal_handler)
    
    # Hardware smoke test: start the RCU I2C worker, write the
    # 'HBA_element_beamformer_delays' variable under a mask, print whatever the
    # client has queued after a fixed wait, then stop the worker.
    logging.info("Start I2C processes")
    threads = []
    I2Cclients = []
    name = 'RCU'

    # Everything from worker start-up onwards runs under try/finally so that the
    # worker is stopped and joined even when a call raises or the sleep is
    # interrupted with Ctrl-C (no SIGINT handler is installed). Without this, a
    # failure would skip the shutdown and leave the worker running.
    try:
        RCU_I2C = i2client.i2client(name=name)
        thread1 = i2cthread.start(*RCU_I2C.GetInfo())
        threads.append(thread1)
        I2Cclients.append(RCU_I2C)

        # Load yaml so that we know the variable names
        RCU_conf = yamlreader.yamlreader(RCU_I2C, yamlfile=name)
        #RCU_conf.CallInit()

        # NOTE(review): this method id is overwritten below without being used;
        # the lookup is kept because its side effects (if any) are not visible
        # from this file.
        var1 = RCU_conf.getmethodid('RCU_update')
        #var1=RCU_conf.getmethodid('RCU_on');
        #N=32;
        #mask=[i<2 for i in range(N)];
        #RCU_I2C.callmethod(var1,[])

        var1 = RCU_conf.getvarid('HBA_element_beamformer_delays')
        N = 32 * 3
        # 0..31 repeated N times -> 32*N = 3072 values, while the mask has N = 96
        # entries. Presumably one mask entry selects one 32-value group -- confirm
        # against the RCU yaml definition.
        data = list(range(32)) * N
        # Only the first 1*3 mask entries are True.
        mask = [i < 1 * 3 for i in range(N)]
        RCU_I2C.setvar(var1, data, mask)
        #RCU_I2C.readvar(var1,mask);

        # Fixed wait before draining the client's result queue.
        time.sleep(5)

        # Print the variable name and the first 12 data/mask entries of every
        # queued result. Note that this rebinds `data` and `mask`.
        while RCU_I2C.data_waiting():
            varid, data, mask = RCU_I2C.readdata()
            print("Results:", RCU_conf.getvar1(varid)['name'], data[:12], mask[:12])
    finally:
        logging.info("Stop threads")
        for i2c in I2Cclients:
            i2c.stop()
        for thread1 in threads:
            thread1.join()