diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py b/tangostationcontrol/tangostationcontrol/clients/statistics_client.py index 08d2889d0b61c5bd9c9ce5468e35ac6bf82fdffb..6981c31a5437b96af04171981f5bf3d6562963d5 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics_client.py @@ -97,14 +97,7 @@ class StatisticsClient(AsyncCommClient): # redirect to right object. this works as long as the parameter names are unique among them. if annotation["type"] == "statistics": def read_function(): - if annotation.get("reshape", False): - # force array into the shape of the attribute - if attribute.dim_y > 1: - return self.collector.parameters[parameter].reshape(attribute.dim_y, attribute.dim_x) - else: - return self.collector.parameters[parameter].reshape(attribute.dim_x) - else: - return self.collector.parameters[parameter] + return _process_statistics_annotation() elif annotation["type"] == "udp": def read_function(): return self.udp.parameters[parameter] @@ -118,25 +111,34 @@ class StatisticsClient(AsyncCommClient): else: raise ValueError(f"Unknown queue parameter requested: {parameter}") elif annotation["type"] == "replicator": - if parameter == "clients": - def read_function(): - return numpy.array(self.tcp.clients(),dtype=numpy.str) - elif parameter == "nof_bytes_sent": - def read_function(): - return numpy.uint64(self.tcp.nof_bytes_sent) - elif parameter == "nof_packets_sent": - def read_function(): - return numpy.uint64(self.tcp.nof_packets_sent) - elif parameter == "nof_tasks_pending": - def read_function(): - return numpy.uint64(self.tcp.nof_tasks_pending) - else: - raise ValueError(f"Unknown replicator parameter requested: {parameter}") + def read_function(): + return _process_replicator_annotation() + def _process_statistics_annotation(): + if annotation.get("reshape", False): + # force array into the shape of the attribute + if attribute.dim_y > 1: + return self.collector.parameters[parameter].reshape(attribute.dim_y, attribute.dim_x) + else: + return self.collector.parameters[parameter].reshape(attribute.dim_x) + else: + return self.collector.parameters[parameter] + + def _process_replicator_annotation(): + parameters_dict = {"clients": numpy.array(self.tcp.clients(),dtype=numpy.str), + "nof_bytes_sent": numpy.uint64(self.tcp.nof_bytes_sent), + "nof_packets_sent": numpy.uint64(self.tcp.nof_packets_sent), + "nof_tasks_pending": numpy.uint64(self.tcp.nof_tasks_pending)} + for k,v in parameters_dict.items(): + if parameter == k: + return v + raise ValueError(f"Unknown replicator parameter requested: {parameter}") + def write_function(value): """ Not used here """ pass - return read_function, write_function + return read_function, write_function + diff --git a/tangostationcontrol/tangostationcontrol/common/lofar_version.py b/tangostationcontrol/tangostationcontrol/common/lofar_version.py index be923cf9f1958555e4a397bfa5bffa33daaf3580..0a70002ed6de1fffdfc6059c6cfe2fff0beb630c 100644 --- a/tangostationcontrol/tangostationcontrol/common/lofar_version.py +++ b/tangostationcontrol/tangostationcontrol/common/lofar_version.py @@ -72,14 +72,10 @@ def get_version(repo: git.Repo = None) -> str: commit = repo.commit() filtered_tags = [tag.name for tag in repo.tags if reg.search(tag.name)] # Order tags from newest to oldest - tags = {tag.commit: tag for tag in reversed(repo.tags) if tag.name in filtered_tags} + tags = _order_tags(repo, filtered_tags) # Find closest tag for commit - closest_tag = type('',(object,),{"name": 'v0.0.0'})() - for item in commit.iter_items(repo, commit): - if item.type == 'commit' and item in tags: - closest_tag = tags[item] - break + closest_tag = _find_closest_tag(commit, repo, tags) if commit in tags: # a tag = production ready @@ -94,6 +90,19 @@ def get_version(repo: git.Repo = None) -> str: return "{}{}".format(commit_str, ".dirty" if repo.is_dirty() else "") +def _order_tags(repo, filtered_tags): + """ Helper function to order tags from newest to oldest """ + return {tag.commit: tag for tag in reversed(repo.tags) if tag.name in filtered_tags} + +def _find_closest_tag(commit, repo, tags): + """ Helper function to find closest tag for commit """ + closest_tag = type('',(object,),{"name": 'v0.0.0'})() + for item in commit.iter_items(repo, commit): + if item.type == 'commit' and item in tags: + closest_tag = tags[item] + break + return closest_tag + # at least cache the current repo version immediately try: _ = get_version() diff --git a/tangostationcontrol/tangostationcontrol/devices/apsct.py b/tangostationcontrol/tangostationcontrol/devices/apsct.py index 75ef8f612b3bfd44735444fba95d8927226fe55a..58c91bc5036e4de799f2b42c4cc1cbd5cd04bb22 100644 --- a/tangostationcontrol/tangostationcontrol/devices/apsct.py +++ b/tangostationcontrol/tangostationcontrol/devices/apsct.py @@ -88,14 +88,14 @@ class APSCT(opcua_device): APSCT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed") def read_APSCT_error_R(self): - return ((self.read_attribute("APSCTTR_I2C_error_R") > 0) - or self.alarm_val("APSCT_PCB_ID_R") - or (not self.read_attribute("APSCT_INPUT_10MHz_good_R")) - or (not self.read_attribute("APSCT_INPUT_PPS_good_R") and not self.read_attribute("APSCT_PPS_ignore_R")) - or (not self.read_attribute("APSCT_PLL_160MHz_locked_R") and not self.read_attribute("APSCT_PLL_200MHz_locked_R")) - or (self.read_attribute("APSCT_PLL_200MHz_locked_R") and self.read_attribute("APSCT_PLL_200MHz_error_R")) - or (self.read_attribute("APSCT_PLL_160MHz_locked_R") and self.read_attribute("APSCT_PLL_160MHz_error_R")) - ) + errors = [self.read_attribute("APSCTTR_I2C_error_R") > 0, + self.alarm_val("APSCT_PCB_ID_R"), + not self.read_attribute("APSCT_INPUT_10MHz_good_R"), + not self.read_attribute("APSCT_INPUT_PPS_good_R") and not self.read_attribute("APSCT_PPS_ignore_R"), + not self.read_attribute("APSCT_PLL_160MHz_locked_R") and not self.read_attribute("APSCT_PLL_200MHz_locked_R"), + self.read_attribute("APSCT_PLL_200MHz_locked_R") and self.read_attribute("APSCT_PLL_200MHz_error_R"), + self.read_attribute("APSCT_PLL_160MHz_locked_R") and self.read_attribute("APSCT_PLL_160MHz_error_R")] + return any(errors) APSCT_TEMP_error_R = attribute(dtype=bool) APSCT_VOUT_error_R = attribute(dtype=bool) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py index ca8fde9127b48f673bba745a74c989d260edb0d9..784750cd0b01a2d7e144858af9f4eca39c6aee14 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py @@ -210,12 +210,7 @@ class XSTCollector(StatisticsCollector): assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}." # log if we're replacing a subband we were once recording - previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot] - if previous_subband_in_slot != fields.subband_index: - previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot]) - - if previous_subband_timestamp.timestamp() > 0: - logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.") + self._log_replacing_subband(subband_slot, fields) # the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH # starting at baseline first_baseline. @@ -236,6 +231,15 @@ class XSTCollector(StatisticsCollector): self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index) self.parameters["xst_integration_intervals"][subband_slot] = fields.integration_interval() + def _log_replacing_subband(self, subband_slot, fields): + # log if we're replacing a subband we were once recording + previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot] + if previous_subband_in_slot != fields.subband_index: + previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot]) + + if previous_subband_timestamp.timestamp() > 0: + logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.") + def xst_values(self, subband_indices = None): """ xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values. diff --git a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py b/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py index 52747fff71d436d62bcbe40844aa0a3a45a2ab25..7247e685d9a86db742a4b7e104c21cbc93a0ec7d 100644 --- a/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py +++ b/tangostationcontrol/tangostationcontrol/statistics_writer/statistics_writer.py @@ -9,7 +9,8 @@ import logging logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s') logger = logging.getLogger("statistics_writer") -def main(): +def _create_parser(): + """Define the parser""" parser = argparse.ArgumentParser( description='Converts a stream of statistics packets into HDF5 files.') parser.add_argument( @@ -43,48 +44,24 @@ def main(): '-r', '--reconnect', dest='reconnect', action='store_true', default=False, help='Set the writer to keep trying to reconnect whenever connection ' 'is lost. (default: %(default)s)') + return parser - args = parser.parse_args() - - # argparse arguments - host = args.host - port = args.port - filename = args.file - output_dir = args.output_dir - interval = args.interval - mode = args.mode - decimation = args.decimation - debug = args.debug - reconnect = args.reconnect - - if not filename and not host: - raise ValueError("Supply either a filename (--file) or a hostname (--host)") - - if decimation < 1: - raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ") - - if port == 0: - default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 } - port = default_ports[mode] - - if debug: - logger.setLevel(logging.DEBUG) - logger.debug("Setting loglevel to DEBUG") - - # creates the TCP receiver that is given to the writer +def _create_receiver(filename, host, port): + """ creates the TCP receiver that is given to the writer """ if filename: - receiver = file_receiver(filename) + return file_receiver(filename) elif host and port: - receiver = tcp_receiver(host, port) + return tcp_receiver(host, port) else: logger.fatal("Must provide either a host and port, or a file to receive input from") sys.exit(1) - # create the writer +def _create_writer(mode, interval, output_dir, decimation): + """Create the writer""" if mode == "XST": - writer = parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + return parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) elif mode == "SST": - writer = sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) + return sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation) elif mode == "BST": logger.fatal(f"BST mode not supported") sys.exit(1) @@ -92,7 +69,8 @@ def main(): logger.fatal(f"Invalid mode: {mode}") sys.exit(1) - # start looping +def _start_loop(receiver, writer, reconnect, filename): + """Main loop""" try: while True: try: @@ -119,3 +97,43 @@ def main(): logger.warning("Received keyboard interrupt. Stopping.") finally: writer.close_writer() + +def main(): + + parser = _create_parser() + + args = parser.parse_args() + + # argparse arguments + host = args.host + port = args.port + filename = args.file + output_dir = args.output_dir + interval = args.interval + mode = args.mode + decimation = args.decimation + debug = args.debug + reconnect = args.reconnect + + if not filename and not host: + raise ValueError("Supply either a filename (--file) or a hostname (--host)") + + if decimation < 1: + raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ") + + if port == 0: + default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 } + port = default_ports[mode] + + if debug: + logger.setLevel(logging.DEBUG) + logger.debug("Setting loglevel to DEBUG") + + # creates the TCP receiver that is given to the writer + receiver = _create_receiver(filename, host, port) + + # create the writer + writer = _create_writer(mode, interval, output_dir, decimation) + + # start looping + _start_loop(receiver, writer, reconnect, filename) diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py index bffc23aaa24b7ed0269dffa4fec3afb5363b7ba9..c55e9d1c09fcf4d78d52b520d0d5b8b7ee96e60e 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py @@ -410,6 +410,48 @@ class TestAttributeTypes(base.TestCase): info = "Test failure in {} {} read RW test. Expected: {}, got {}".format(test_type, dtype, expected, val) raise Exception(info) from e + def _get_result_type(self, dtype, test_type, proxy): + if test_type == "scalar": + if dtype is numpy.str: + val = str_scalar_val + else: + val = dtype(1) + proxy.scalar_RW = val + result_R = proxy.scalar_R + result_RW = proxy.scalar_RW + elif test_type == "spectrum": + if dtype is numpy.str: + val = str_spectrum_val + else: + val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1) + proxy.spectrum_RW = val + result_R = proxy.spectrum_R + result_RW = proxy.spectrum_RW + elif test_type == "image": + if dtype is numpy.str: + val = str_image_val + else: + val = numpy.full(image_dims, dtype=dtype, fill_value=1) + + # info += " write value: {}".format(val) + proxy.image_RW = val + result_R = proxy.image_R + result_RW = proxy.image_RW + + if dtype != numpy.str: + self.assertEqual(result_R.shape, image_dims, "not the correct dimensions") + + result_R = result_R.reshape(-1) + result_RW = result_RW.reshape(-1) + val = val.reshape(-1) + + else: + # if the test isn't scalar/spectrum or image its wrong + self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type)) + + return result_R, result_RW, val + + def readback_test(self, dev, dtype, test_type): '''Test device''' try: @@ -419,43 +461,8 @@ class TestAttributeTypes(base.TestCase): proxy.initialise() proxy.on() - if test_type == "scalar": - if dtype is numpy.str: - val = str_scalar_val - else: - val = dtype(1) - proxy.scalar_RW = val - result_R = proxy.scalar_R - result_RW = proxy.scalar_RW - elif test_type == "spectrum": - if dtype is numpy.str: - val = str_spectrum_val - else: - val = numpy.full(spectrum_dims, dtype=dtype, fill_value=1) - proxy.spectrum_RW = val - result_R = proxy.spectrum_R - result_RW = proxy.spectrum_RW - elif test_type == "image": - if dtype is numpy.str: - val = str_image_val - else: - val = numpy.full(image_dims, dtype=dtype, fill_value=1) - - # info += " write value: {}".format(val) - proxy.image_RW = val - result_R = proxy.image_R - result_RW = proxy.image_RW - - if dtype != numpy.str: - self.assertEqual(result_R.shape, image_dims, "not the correct dimensions") - - result_R = result_R.reshape(-1) - result_RW = result_RW.reshape(-1) - val = val.reshape(-1) - - else: - # if the test isn't scalar/spectrum or image its wrong - self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type)) + #get result and val + result_R, result_RW, val = self._get_result_type(dtype, test_type, proxy) if test_type == "scalar": comparison = result_RW == val diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py index f061e38cedc7cefefeb72976454edecd7b647259..4d3a5c22ab3b7ac61ccbdfd78671f0c9ed4cf56a 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_snmp_client.py @@ -64,24 +64,19 @@ class server_imitator: """ if dims == self.dim_list["scalar"]: - if snmp_type is hlapi.ObjectIdentity: - check_val = "1.3.6.1.2.1.1.1.0.1" - elif snmp_type is hlapi.IpAddress: - check_val = "1.1.1.1" - elif snmp_type is str: - check_val = "1" - else: - check_val = 1 + snmp_type_dict = {hlapi.ObjectIdentity:"1.3.6.1.2.1.1.1.0.1", + hlapi.IpAddress: "1.1.1.1", + str: "1"} + check_val = 1 + for k,v in snmp_type_dict.items(): + if snmp_type is k: check_val = v elif dims == self.dim_list["spectrum"]: - if snmp_type is hlapi.ObjectIdentity: - check_val = ["1.3.6.1.2.1.1.1.0.1"] * dims[0] - - elif snmp_type is hlapi.IpAddress: - check_val = ["1.1.1.1"] * dims[0] - elif snmp_type is str: - check_val = ["1"] * dims[0] - else: - check_val = [1] * dims[0] + snmp_type_dict = {hlapi.ObjectIdentity:["1.3.6.1.2.1.1.1.0.1"] * dims[0], + hlapi.IpAddress: ["1.1.1.1"] * dims[0], + str: ["1"] * dims[0]} + check_val = check_val = [1] * dims[0] + for k,v in snmp_type_dict.items(): + if snmp_type is k: check_val = v else: raise Exception("Image not yet supported :(") diff --git a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py index 4ddeab05e63baa29c791a61ec428effa8e2f47a2..852d8bdc5bc92a940ab6395da97f81f1592dd1a0 100644 --- a/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/test/devices/test_statistics_collector.py @@ -163,9 +163,11 @@ class TestXSTCollector(base.TestCase): baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs) baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs) - if baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 0: + baseline_in_pk = baseline_a_was_in_packet and baseline_b_was_in_packet + + if baseline_in_pk and subband_idx == 0: self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') - elif baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 1: + elif baseline_in_pk and subband_idx == 1: self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.') else: self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.') diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py index 2e178083d2428d1b0706e283ded68e7ee0c5ea46..ba56f228c3df54c25d7897e05bc01f42ef89e173 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver.py @@ -3,7 +3,7 @@ import logging from tango import DeviceProxy, AttributeProxy, DevState, DevFailed -from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn, get_size_from_datatype +from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn, get_size_from_datatype, filter_attribute_list from tangostationcontrol.toolkit.archiver_configurator import get_parameters_from_attribute, get_include_attribute_list, get_exclude_attribute_list, get_global_env_parameters import time @@ -253,11 +253,7 @@ class Archiver(): persisting changes between them""" exclude = [] - d = DeviceProxy(device_fqdn(device_name)) - device_attrs_list = d.get_attribute_list() - # Filter out the attributes in exclude-list - exclude_list = [a.lower() for a in exclude] - attrs_list = [a.lower() for a in list(device_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison + attrs_list = filter_attribute_list(device_name, exclude) for a in attrs_list: attr_fullname = attribute_fqdn(f"{device_name}/{a}") attr_proxy = AttributeProxy(attr_fullname) @@ -304,10 +300,7 @@ class Archiver(): persisting changes between them""" exclude = [] - d = DeviceProxy(device_fqdn(device_name)) - dev_attrs_list = d.get_attribute_list() - exclude_list = [a.lower() for a in exclude] - attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison + attrs_list = filter_attribute_list(device_name, exclude) for a in attrs_list: try: attr_fullname = attribute_fqdn(f"{device_name}/{a}") diff --git a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py index c797c5267a517dbe00c566297b94cc176f9a9574..f991dccb5f9e0765c3059a7e3903d5d7c5bcb7c6 100644 --- a/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py +++ b/tangostationcontrol/tangostationcontrol/toolkit/archiver_util.py @@ -130,3 +130,14 @@ def get_size_from_datatype(datatype:int) -> int: return DATATYPES_SIZE_DICT[datatype] except IndexError: return 1 + +def filter_attribute_list(device_name: str, exclude:list) -> list: + """ + Filter out the attributes in exclude-list + """ + device = DeviceProxy(device_name) + device_attrs_list = device.get_attribute_list() + # Filter out excluded attributes + exclude_list = [a.lower() for a in exclude] + attrs_list = [a.lower() for a in list(device_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison + return attrs_list