From 9e46b545d2478d3562f0031a59d99b9fb0496af3 Mon Sep 17 00:00:00 2001 From: donker <donker@astron.nl> Date: Mon, 11 Oct 2021 09:32:24 +0200 Subject: [PATCH] add functions --- test/py/Client.py | 98 +++++++++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 42 deletions(-) mode change 100644 => 100755 test/py/Client.py diff --git a/test/py/Client.py b/test/py/Client.py old mode 100644 new mode 100755 index 34a0c898..a368d4cc --- a/test/py/Client.py +++ b/test/py/Client.py @@ -32,6 +32,7 @@ import time import logging import traceback import argparse +import pprint as pp from functools import wraps from tools import * @@ -183,37 +184,58 @@ def check_get_all_R_time(obj): print(f"checked {len(info)} FPGA_*_R points") # print('\n'.join(info)) +def write_fpga_mask(obj, nodes=None, mask=None): + enable_mask = [False] * N_NODES + if mask is not None: + enable_mask = mask + elif nodes is not None: + for node in list(nodes): + enable_mask[node] = True + #print(enable_mask) + var = obj.get_child("2:TR_fpga_mask_RW") + var.set_value(ua.Variant(value=list(enable_mask), varianttype=ua.VariantType.Boolean)) + + +def get_fpga_mask(obj): + var = obj.get_child("2:TR_fpga_mask_R") + enable_mask = var.get_value() + #print(enable_mask) + return enable_mask + def plot_histogram(obj, nodes, inputs): import numpy as np import matplotlib # matplotlib.use('Agg') import matplotlib.pyplot as plt + + signed_data = True var = obj.get_child("2:FPGA_signal_input_histogram_R") data = var.get_value() np_data = np.array(data) N_HISTO = 512 - # for i in range(N_NODES): for i in list(nodes): for j in list(inputs): s1 = (i * S_PN * N_HISTO) + (j * N_HISTO) s2 = s1 + N_HISTO print("i={}, j={}, start={}, stop={}".format(i, j, s1, s2)) _data = np_data[s1:s2] - # _data = np.ma.masked_where(_data < 1, _data) - # _data = np.ma.masked_where(_data > 2**28-1, _data) + if signed_data: + _lo, _hi = np.split(_data, 2) + _data = np.concatenate((_hi, _lo)) print("used data : {}".format(_data)) print("min value : {}".format(_data.min())) print("max value : {}".format(_data.max())) print("sum values: {}".format(_data.sum())) - print("skip first and last value from array") - bin_range = range(-255, 255) - plt.bar(bin_range, height=_data[1:-1], width=1.0) - plt.xlim([-256, 255]) + # print("skip first and last value from array") + bin_range = range(-256, 256) + plt.bar(bin_range, height=_data, width=0.90) + plt.xlim([-259, 257]) # plt.ylim([0, 200e6]) plt.tight_layout() + plt.grid() plt.show() return True @@ -244,35 +266,14 @@ def plot_input_data(obj, nodes, inputs): return True - - - -def write_fpga_mask(obj, nodes=None, mask=None): - enable_mask = [False] * N_NODES - if mask is not None: - enable_mask = mask - elif nodes is not None: - for node in list(nodes): - enable_mask[node] = True - #print(enable_mask) - var = obj.get_child("2:TR_fpga_mask_RW") - var.set_value(ua.Variant(value=list(enable_mask), varianttype=ua.VariantType.Boolean)) - - -def get_fpga_mask(obj): - var = obj.get_child("2:TR_fpga_mask_R") - enable_mask = var.get_value() - #print(enable_mask) - return enable_mask - - def read_subband_weights(obj): var = obj.get_child("2:FPGA_subband_weights_R") vals = var.get_value() + pp.pprint(vals, compact=True) -def write_subband_weights(obj): - weights = [8192] * N_SUB * N_NODES * S_PN # set weights for subbands to default +def write_subband_weights(obj, weight): + weights = [weight] * N_SUB * N_NODES * S_PN # set weights for subbands to default var = obj.get_child("2:FPGA_subband_weights_RW") var.set_value(ua.Variant(value=list(weights), varianttype=ua.VariantType.UInt32)) @@ -312,18 +313,23 @@ def setup_wg_xst_mode(obj): ''' setup wg for xst stream ''' + print("wg off") + enable = [False for i in range(S_PN)] * N_NODES # enable wg + var = obj.get_child("2:FPGA_wg_enable_RW") + var.set_value(ua.Variant(value=list(enable), varianttype=ua.VariantType.Boolean)) + # Write WG configuration with phases from 0 - 360 and 1 to 1/12 amplitudes - wg_ampl = [0.01 * (1 - (i / S_PN)) for i in range(S_PN)] * N_NODES + _ampl = [wg_ampl * (1 - (i / S_PN)) for i in range(S_PN)] * N_NODES var = obj.get_child("2:FPGA_wg_amplitude_RW") - var.set_value(ua.Variant(value=list(wg_ampl), varianttype=ua.VariantType.Double)) + var.set_value(ua.Variant(value=list(_ampl), varianttype=ua.VariantType.Double)) - wg_phase = [i * (360 / S_PN) for i in range(S_PN)] * N_NODES + _phase = [i * (360 / S_PN) for i in range(S_PN)] * N_NODES var = obj.get_child("2:FPGA_wg_phase_RW") - var.set_value(ua.Variant(value=list(wg_phase), varianttype=ua.VariantType.Double)) + var.set_value(ua.Variant(value=list(_phase), varianttype=ua.VariantType.Double)) - wg_freq = [(102 / 1024) * 200e6 for i in range(S_PN)] * N_NODES + _freq = [(102 / 1024) * wg_freq for i in range(S_PN)] * N_NODES var = obj.get_child("2:FPGA_wg_frequency_RW") - var.set_value(ua.Variant(value=list(wg_freq), varianttype=ua.VariantType.Double)) + var.set_value(ua.Variant(value=list(_freq), varianttype=ua.VariantType.Double)) print("wg on") enable = [True for i in range(S_PN)] * N_NODES # enable wg @@ -458,6 +464,8 @@ if __name__ == "__main__": parser.add_argument('--info', dest='info', action='store_true', help="print point infor from server") parser.add_argument('--all', dest='all_r', action='store_true', help="recv all*_R points and show time") parser.add_argument('--wg', dest='wg_mode', type=str, choices=['OFF', 'XST', 'SIN'], help="turn wg off/on for xst") + parser.add_argument('--set-sb-weights', dest='set_sb_weights', type=int, help="set subband weights") + parser.add_argument('--get-sb-weights', dest='get_sb_weights', action='store_true', help="get subband weights") parser.add_argument('--freq', dest='wg_freq', type=float, default=1e6, help="set wg freq") parser.add_argument('--ampl', dest='wg_ampl', type=float, default=1.0, help="set wg ampl") parser.add_argument('--phase', dest='wg_phase', type=float, default=0.0, help="set wg phase") @@ -466,7 +474,7 @@ if __name__ == "__main__": parser.add_argument('--plot', dest='plot', type=str, choices=['INP', 'HIST'], help="plot selected type") parser.add_argument('--verbosity', default='INFO', help="stdout log level can be [ERROR | WARNING | INFO | DEBUG]") args = parser.parse_args() - # print(args) + print(args) node_list = arg_str_to_list(args.nodes) if args.nodes else range(16) input_list = arg_str_to_list(args.inputs) if args.inputs else range(12) @@ -529,6 +537,11 @@ if __name__ == "__main__": print("\t\tchild", i, ": ", children[i].get_data_type_as_variant_type()) print("\t\tchild", i, ": ", children[i].get_node_class()) + if args.set_sb_weights is not None: + write_subband_weights(Object, args.set_sb_weights) + if args.get_sb_weights is True: + read_subband_weights(Object) + if args.wg_mode is not None: if args.wg_mode == 'OFF': turn_wg_off(Object) @@ -570,16 +583,17 @@ if __name__ == "__main__": plot_input_data(Object, nodes=node_list, inputs=input_list) if args.plot == 'HIST': plot_histogram(Object, nodes=node_list, inputs=input_list) + - - # ############################## - # TEST AREA (for new functions) + # ############################## + # TEST AREA (for new functions) #write_subband_weights(Object) #read_subband_weights(Object) # END TEST AREA - # ############################## + # ############################## + write_fpga_mask(Object, mask=fpga_mask) # write back start mask -- GitLab