Commit 5963237f authored by Stefano Di Frischia

Merge branch 'L2SS-630-refactor-functions-graded-c' into 'master'

Resolve L2SS-630 "Refactor functions graded c"

Closes L2SS-630

See merge request !296
parents 2a3ef002 ac43def4
Showing 185 additions and 144 deletions
@@ -97,14 +97,7 @@ class StatisticsClient(AsyncCommClient):
# redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "statistics":
def read_function():
if annotation.get("reshape", False):
# force array into the shape of the attribute
if attribute.dim_y > 1:
return self.collector.parameters[parameter].reshape(attribute.dim_y, attribute.dim_x)
else:
return self.collector.parameters[parameter].reshape(attribute.dim_x)
else:
return self.collector.parameters[parameter]
return _process_statistics_annotation()
elif annotation["type"] == "udp":
def read_function():
return self.udp.parameters[parameter]
@@ -118,19 +111,27 @@ class StatisticsClient(AsyncCommClient):
else:
raise ValueError(f"Unknown queue parameter requested: {parameter}")
elif annotation["type"] == "replicator":
if parameter == "clients":
def read_function():
return numpy.array(self.tcp.clients(),dtype=numpy.str)
elif parameter == "nof_bytes_sent":
def read_function():
return numpy.uint64(self.tcp.nof_bytes_sent)
elif parameter == "nof_packets_sent":
def read_function():
return numpy.uint64(self.tcp.nof_packets_sent)
elif parameter == "nof_tasks_pending":
def read_function():
return numpy.uint64(self.tcp.nof_tasks_pending)
return _process_replicator_annotation()
def _process_statistics_annotation():
if annotation.get("reshape", False):
# force array into the shape of the attribute
if attribute.dim_y > 1:
return self.collector.parameters[parameter].reshape(attribute.dim_y, attribute.dim_x)
else:
return self.collector.parameters[parameter].reshape(attribute.dim_x)
else:
return self.collector.parameters[parameter]
def _process_replicator_annotation():
parameters_dict = {"clients": numpy.array(self.tcp.clients(),dtype=numpy.str),
"nof_bytes_sent": numpy.uint64(self.tcp.nof_bytes_sent),
"nof_packets_sent": numpy.uint64(self.tcp.nof_packets_sent),
"nof_tasks_pending": numpy.uint64(self.tcp.nof_tasks_pending)}
for k,v in parameters_dict.items():
if parameter == k:
return v
raise ValueError(f"Unknown replicator parameter requested: {parameter}")
def write_function(value):
@@ -140,3 +141,4 @@ class StatisticsClient(AsyncCommClient):
pass
return read_function, write_function
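
For reference, the replicator branch now resolves the requested parameter through a lookup table inside _process_replicator_annotation. A minimal sketch of the same dispatch idea, using callables so only the requested value is computed; the names are illustrative, the replicator attributes are taken from the diff, and plain str replaces the deprecated numpy.str alias:

import numpy

def make_replicator_reader(tcp, parameter):
    # One callable per parameter; nothing is evaluated until the reader is called.
    readers = {
        "clients": lambda: numpy.array(tcp.clients(), dtype=str),
        "nof_bytes_sent": lambda: numpy.uint64(tcp.nof_bytes_sent),
        "nof_packets_sent": lambda: numpy.uint64(tcp.nof_packets_sent),
        "nof_tasks_pending": lambda: numpy.uint64(tcp.nof_tasks_pending),
    }

    try:
        return readers[parameter]
    except KeyError:
        raise ValueError(f"Unknown replicator parameter requested: {parameter}")
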
@@ -72,14 +72,10 @@ def get_version(repo: git.Repo = None) -> str:
commit = repo.commit()
filtered_tags = [tag.name for tag in repo.tags if reg.search(tag.name)]
# Order tags from newest to oldest
tags = {tag.commit: tag for tag in reversed(repo.tags) if tag.name in filtered_tags}
tags = _order_tags(repo, filtered_tags)
# Find closest tag for commit
closest_tag = type('',(object,),{"name": 'v0.0.0'})()
for item in commit.iter_items(repo, commit):
if item.type == 'commit' and item in tags:
closest_tag = tags[item]
break
closest_tag = _find_closest_tag(commit, repo, tags)
if commit in tags:
# a tag = production ready
@@ -94,6 +90,19 @@ def get_version(repo: git.Repo = None) -> str:
return "{}{}".format(commit_str, ".dirty" if repo.is_dirty() else "")
def _order_tags(repo, filtered_tags):
""" Helper function to order tags from newest to oldest """
return {tag.commit: tag for tag in reversed(repo.tags) if tag.name in filtered_tags}
def _find_closest_tag(commit, repo, tags):
""" Helper function to find closest tag for commit """
closest_tag = type('',(object,),{"name": 'v0.0.0'})()
for item in commit.iter_items(repo, commit):
if item.type == 'commit' and item in tags:
closest_tag = tags[item]
break
return closest_tag
# at least cache the current repo version immediately
try:
_ = get_version()
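
_find_closest_tag walks the commit history until it reaches a tagged ancestor, falling back to a stand-in v0.0.0 tag. A minimal sketch of the same idea, assuming GitPython (git.Repo, repo.tags, repo.iter_commits):

import git

def closest_tag_name(repo_path=".", default="v0.0.0"):
    repo = git.Repo(repo_path, search_parent_directories=True)
    # Map each tagged commit to its tag.
    tags_by_commit = {tag.commit: tag for tag in repo.tags}

    # Walk from HEAD towards the root; the first tagged commit wins.
    for commit in repo.iter_commits("HEAD"):
        if commit in tags_by_commit:
            return tags_by_commit[commit].name

    return default
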
@@ -88,14 +88,14 @@ class APSCT(opcua_device):
APSCT_error_R = attribute(dtype=bool, fisallowed="is_attribute_wrapper_allowed")
def read_APSCT_error_R(self):
return ((self.read_attribute("APSCTTR_I2C_error_R") > 0)
or self.alarm_val("APSCT_PCB_ID_R")
or (not self.read_attribute("APSCT_INPUT_10MHz_good_R"))
or (not self.read_attribute("APSCT_INPUT_PPS_good_R") and not self.read_attribute("APSCT_PPS_ignore_R"))
or (not self.read_attribute("APSCT_PLL_160MHz_locked_R") and not self.read_attribute("APSCT_PLL_200MHz_locked_R"))
or (self.read_attribute("APSCT_PLL_200MHz_locked_R") and self.read_attribute("APSCT_PLL_200MHz_error_R"))
or (self.read_attribute("APSCT_PLL_160MHz_locked_R") and self.read_attribute("APSCT_PLL_160MHz_error_R"))
)
errors = [self.read_attribute("APSCTTR_I2C_error_R") > 0,
self.alarm_val("APSCT_PCB_ID_R"),
not self.read_attribute("APSCT_INPUT_10MHz_good_R"),
not self.read_attribute("APSCT_INPUT_PPS_good_R") and not self.read_attribute("APSCT_PPS_ignore_R"),
not self.read_attribute("APSCT_PLL_160MHz_locked_R") and not self.read_attribute("APSCT_PLL_200MHz_locked_R"),
self.read_attribute("APSCT_PLL_200MHz_locked_R") and self.read_attribute("APSCT_PLL_200MHz_error_R"),
self.read_attribute("APSCT_PLL_160MHz_locked_R") and self.read_attribute("APSCT_PLL_160MHz_error_R")]
return any(errors)
APSCT_TEMP_error_R = attribute(dtype=bool)
APSCT_VOUT_error_R = attribute(dtype=bool)
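
The APSCT_error_R read now collects the individual fault conditions in a list and reduces it with any(). A small sketch of the same pattern with named checks, which also makes it easy to report which condition tripped; the labels and attribute names here are hypothetical, not the device's actual attribute set:

def summarise_errors(read_attribute):
    # Each entry maps a label to a boolean fault condition.
    checks = {
        "i2c": read_attribute("I2C_error") > 0,
        "10mhz_input": not read_attribute("INPUT_10MHz_good"),
        "pps_input": not read_attribute("INPUT_PPS_good") and not read_attribute("PPS_ignore"),
    }

    failed = [name for name, tripped in checks.items() if tripped]
    return any(checks.values()), failed
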
@@ -210,12 +210,7 @@ class XSTCollector(StatisticsCollector):
assert 0 <= subband_slot < self.MAX_PARALLEL_SUBBANDS, f"Selected slot {subband_slot}, but only have room for {self.MAX_PARALLEL_SUBBANDS}. Existing slots are {self.parameters['xst_subbands']}, processing subband {fields.subband_index}."
# log if we're replacing a subband we were once recording
previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot]
if previous_subband_in_slot != fields.subband_index:
previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot])
if previous_subband_timestamp.timestamp() > 0:
logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.")
self._log_replacing_subband(subband_slot, fields)
# the payload contains complex values for the block of baselines of size BLOCK_LENGTH x BLOCK_LENGTH
# starting at baseline first_baseline.
@@ -236,6 +231,15 @@ class XSTCollector(StatisticsCollector):
self.parameters["xst_subbands"][subband_slot] = numpy.uint16(fields.subband_index)
self.parameters["xst_integration_intervals"][subband_slot] = fields.integration_interval()
def _log_replacing_subband(self, subband_slot, fields):
# log if we're replacing a subband we were once recording
previous_subband_in_slot = self.parameters["xst_subbands"][subband_slot]
if previous_subband_in_slot != fields.subband_index:
previous_subband_timestamp = datetime.datetime.fromtimestamp(self.parameters["xst_timestamps"][subband_slot])
if previous_subband_timestamp.timestamp() > 0:
logger.info(f"Stopped recording XSTs for subband {previous_subband_in_slot}. Last data for this subband was received at {previous_subband_timestamp}.")
def xst_values(self, subband_indices = None):
""" xst_blocks, but as a matrix[len(subband_indices)][MAX_INPUTS][MAX_INPUTS] of complex values.
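
_log_replacing_subband only reports a replacement when the slot held a different subband and had already received data (a zero timestamp means the slot was never used). The same guard in isolation, with illustrative names:

import datetime
import logging

logger = logging.getLogger(__name__)

def log_subband_replacement(previous_subband, new_subband, last_timestamp):
    # Nothing to report if the subband is unchanged or the slot was never filled.
    if previous_subband == new_subband or last_timestamp <= 0:
        return

    last_seen = datetime.datetime.fromtimestamp(last_timestamp)
    logger.info(f"Stopped recording XSTs for subband {previous_subband}. "
                f"Last data for this subband was received at {last_seen}.")
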
@@ -9,7 +9,8 @@ import logging
logging.basicConfig(level=logging.INFO, format = '%(asctime)s:%(levelname)s: %(message)s')
logger = logging.getLogger("statistics_writer")
def main():
def _create_parser():
"""Define the parser"""
parser = argparse.ArgumentParser(
description='Converts a stream of statistics packets into HDF5 files.')
parser.add_argument(
@@ -43,48 +44,24 @@ def main():
'-r', '--reconnect', dest='reconnect', action='store_true', default=False,
help='Set the writer to keep trying to reconnect whenever connection '
'is lost. (default: %(default)s)')
return parser
args = parser.parse_args()
# argparse arguments
host = args.host
port = args.port
filename = args.file
output_dir = args.output_dir
interval = args.interval
mode = args.mode
decimation = args.decimation
debug = args.debug
reconnect = args.reconnect
if not filename and not host:
raise ValueError("Supply either a filename (--file) or a hostname (--host)")
if decimation < 1:
raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ")
if port == 0:
default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 }
port = default_ports[mode]
if debug:
logger.setLevel(logging.DEBUG)
logger.debug("Setting loglevel to DEBUG")
# creates the TCP receiver that is given to the writer
def _create_receiver(filename, host, port):
""" creates the TCP receiver that is given to the writer """
if filename:
receiver = file_receiver(filename)
return file_receiver(filename)
elif host and port:
receiver = tcp_receiver(host, port)
return tcp_receiver(host, port)
else:
logger.fatal("Must provide either a host and port, or a file to receive input from")
sys.exit(1)
# create the writer
def _create_writer(mode, interval, output_dir, decimation):
"""Create the writer"""
if mode == "XST":
writer = parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
return parallel_xst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
elif mode == "SST":
writer = sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
return sst_hdf5_writer(new_file_time_interval=interval, file_location=output_dir, decimation_factor=decimation)
elif mode == "BST":
logger.fatal(f"BST mode not supported")
sys.exit(1)
@@ -92,7 +69,8 @@ def main():
logger.fatal(f"Invalid mode: {mode}")
sys.exit(1)
# start looping
def _start_loop(receiver, writer, reconnect, filename):
"""Main loop"""
try:
while True:
try:
@@ -119,3 +97,43 @@ def main():
logger.warning("Received keyboard interrupt. Stopping.")
finally:
writer.close_writer()
def main():
parser = _create_parser()
args = parser.parse_args()
# argparse arguments
host = args.host
port = args.port
filename = args.file
output_dir = args.output_dir
interval = args.interval
mode = args.mode
decimation = args.decimation
debug = args.debug
reconnect = args.reconnect
if not filename and not host:
raise ValueError("Supply either a filename (--file) or a hostname (--host)")
if decimation < 1:
raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ")
if port == 0:
default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 }
port = default_ports[mode]
if debug:
logger.setLevel(logging.DEBUG)
logger.debug("Setting loglevel to DEBUG")
# creates the TCP receiver that is given to the writer
receiver = _create_receiver(filename, host, port)
# create the writer
writer = _create_writer(mode, interval, output_dir, decimation)
# start looping
_start_loop(receiver, writer, reconnect, filename)
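
main() is now a thin composition of _create_parser, _create_receiver, _create_writer and _start_loop. A sketch of how the remaining mode-dependent choices could also be table-driven; the writer classes are the ones used in the diff, but the factory table itself is illustrative and assumes they are importable:

DEFAULT_PORTS = {"SST": 5101, "XST": 5102, "BST": 5103}

def resolve_port(mode, port):
    # Port 0 means "use the default port for this mode".
    return port or DEFAULT_PORTS[mode]

def create_writer(mode, interval, output_dir, decimation):
    factories = {
        "XST": parallel_xst_hdf5_writer,  # assumed importable from the writer package
        "SST": sst_hdf5_writer,           # assumed importable from the writer package
    }
    if mode not in factories:
        raise ValueError(f"Unsupported mode: {mode}")

    return factories[mode](new_file_time_interval=interval,
                           file_location=output_dir,
                           decimation_factor=decimation)
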
@@ -410,15 +410,7 @@ class TestAttributeTypes(base.TestCase):
info = "Test failure in {} {} read RW test. Expected: {}, got {}".format(test_type, dtype, expected, val)
raise Exception(info) from e
def readback_test(self, dev, dtype, test_type):
'''Test device'''
try:
with DeviceTestContext(dev, process=True) as proxy:
#initialise
proxy.initialise()
proxy.on()
def _get_result_type(self, dtype, test_type, proxy):
if test_type == "scalar":
if dtype is numpy.str:
val = str_scalar_val
@@ -457,6 +449,21 @@ class TestAttributeTypes(base.TestCase):
# if the test isn't scalar/spectrum or image its wrong
self.assertEqual(1,2, " {} is not a valid test_type. please use either scalar, spectrum or image".format(test_type))
return result_R, result_RW, val
def readback_test(self, dev, dtype, test_type):
'''Test device'''
try:
with DeviceTestContext(dev, process=True) as proxy:
#initialise
proxy.initialise()
proxy.on()
#get result and val
result_R, result_RW, val = self._get_result_type(dtype, test_type, proxy)
if test_type == "scalar":
comparison = result_RW == val
self.assertTrue(comparison, " Value could not be handled by the atrribute_wrappers internal RW storer. attempted to write: {}".format(val))
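
_get_result_type selects the value to write and the readback results per test layout. A compact sketch of the same selection as a lookup, raising explicitly for unknown layouts rather than via assertEqual(1, 2); the values are illustrative only:

import numpy

TEST_VALUES = {
    "scalar": 42,
    "spectrum": numpy.arange(4),
    "image": numpy.arange(4).reshape(2, 2),
}

def value_for(test_type):
    try:
        return TEST_VALUES[test_type]
    except KeyError:
        raise AssertionError(
            f"{test_type} is not a valid test_type; use scalar, spectrum or image")
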
@@ -64,24 +64,19 @@ class server_imitator:
"""
if dims == self.dim_list["scalar"]:
if snmp_type is hlapi.ObjectIdentity:
check_val = "1.3.6.1.2.1.1.1.0.1"
elif snmp_type is hlapi.IpAddress:
check_val = "1.1.1.1"
elif snmp_type is str:
check_val = "1"
else:
snmp_type_dict = {hlapi.ObjectIdentity:"1.3.6.1.2.1.1.1.0.1",
hlapi.IpAddress: "1.1.1.1",
str: "1"}
check_val = 1
for k,v in snmp_type_dict.items():
if snmp_type is k: check_val = v
elif dims == self.dim_list["spectrum"]:
if snmp_type is hlapi.ObjectIdentity:
check_val = ["1.3.6.1.2.1.1.1.0.1"] * dims[0]
elif snmp_type is hlapi.IpAddress:
check_val = ["1.1.1.1"] * dims[0]
elif snmp_type is str:
check_val = ["1"] * dims[0]
else:
check_val = [1] * dims[0]
snmp_type_dict = {hlapi.ObjectIdentity:["1.3.6.1.2.1.1.1.0.1"] * dims[0],
hlapi.IpAddress: ["1.1.1.1"] * dims[0],
str: ["1"] * dims[0]}
check_val = [1] * dims[0]
for k,v in snmp_type_dict.items():
if snmp_type is k: check_val = v
else:
raise Exception("Image not yet supported :(")
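
The rewritten branches build a type-to-value dict and then loop over it with "if snmp_type is k"; the same selection can be a single dict.get with a default. A sketch assuming pysnmp's hlapi types as used in the test (the scalar_dims default is illustrative):

from pysnmp import hlapi

def expected_value(snmp_type, dims, scalar_dims=(1, 1)):
    scalar_values = {
        hlapi.ObjectIdentity: "1.3.6.1.2.1.1.1.0.1",
        hlapi.IpAddress: "1.1.1.1",
        str: "1",
    }

    if dims == scalar_dims:
        # Unknown types fall back to the numeric default 1.
        return scalar_values.get(snmp_type, 1)

    # Spectrum: repeat the scalar value dims[0] times.
    return [scalar_values.get(snmp_type, 1)] * dims[0]
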
@@ -163,9 +163,11 @@ class TestXSTCollector(base.TestCase):
baseline_a_was_in_packet = (fields.first_baseline[0] <= baseline_a < fields.first_baseline[0] + fields.nof_signal_inputs)
baseline_b_was_in_packet = (fields.first_baseline[1] <= baseline_b < fields.first_baseline[1] + fields.nof_signal_inputs)
if baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 0:
baseline_in_pk = baseline_a_was_in_packet and baseline_b_was_in_packet
if baseline_in_pk and subband_idx == 0:
self.assertEqual(1+1j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.')
elif baseline_a_was_in_packet and baseline_b_was_in_packet and subband_idx == 1:
elif baseline_in_pk and subband_idx == 1:
self.assertEqual(2+2j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] did not end up in XST matrix.')
else:
self.assertEqual(0+0j, xst_values[subband_idx][baseline_a][baseline_b], msg=f'element [{baseline_a}][{baseline_b}] was not in packet, but was written to the XST matrix.')
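
Hoisting baseline_in_pk removes the duplicated condition; the expected value per subband can likewise come from a small lookup, collapsing the three assert branches into one. An illustrative sketch:

EXPECTED_PER_SUBBAND = {0: 1 + 1j, 1: 2 + 2j}

def expected_xst_value(baseline_in_packet, subband_idx):
    # Baselines outside the packet must stay zero.
    if not baseline_in_packet:
        return 0 + 0j
    return EXPECTED_PER_SUBBAND.get(subband_idx, 0 + 0j)
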
@@ -3,7 +3,7 @@
import logging
from tango import DeviceProxy, AttributeProxy, DevState, DevFailed
from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn, get_size_from_datatype
from tangostationcontrol.toolkit.archiver_util import get_db_config, device_fqdn, attribute_fqdn, get_size_from_datatype, filter_attribute_list
from tangostationcontrol.toolkit.archiver_configurator import get_parameters_from_attribute, get_include_attribute_list, get_exclude_attribute_list, get_global_env_parameters
import time
@@ -253,11 +253,7 @@ class Archiver():
persisting changes between them"""
exclude = []
d = DeviceProxy(device_fqdn(device_name))
device_attrs_list = d.get_attribute_list()
# Filter out the attributes in exclude-list
exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(device_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison
attrs_list = filter_attribute_list(device_name, exclude)
for a in attrs_list:
attr_fullname = attribute_fqdn(f"{device_name}/{a}")
attr_proxy = AttributeProxy(attr_fullname)
@@ -304,10 +300,7 @@ class Archiver():
persisting changes between them"""
exclude = []
d = DeviceProxy(device_fqdn(device_name))
dev_attrs_list = d.get_attribute_list()
exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(dev_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison
attrs_list = filter_attribute_list(device_name, exclude)
for a in attrs_list:
try:
attr_fullname = attribute_fqdn(f"{device_name}/{a}")
@@ -130,3 +130,14 @@ def get_size_from_datatype(datatype:int) -> int:
return DATATYPES_SIZE_DICT[datatype]
except IndexError:
return 1
def filter_attribute_list(device_name: str, exclude:list) -> list:
"""
Filter out the attributes in exclude-list
"""
device = DeviceProxy(device_name)
device_attrs_list = device.get_attribute_list()
# Filter out excluded attributes
exclude_list = [a.lower() for a in exclude]
attrs_list = [a.lower() for a in list(device_attrs_list) if a.lower() not in exclude_list] # transform list for string comparison
return attrs_list
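
filter_attribute_list centralises the case-insensitive exclusion that was previously duplicated in two Archiver methods. The filtering itself is a pure list operation, sketched here without the DeviceProxy dependency so it can be exercised directly; the names are illustrative:

def filter_names(all_names, exclude):
    # Lower-case both sides so the comparison is case-insensitive.
    excluded = {name.lower() for name in exclude}
    return [name.lower() for name in all_names if name.lower() not in excluded]

# Example: filter_names(["Version_R", "State", "Status"], ["state", "status"]) -> ["version_r"]
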