diff --git a/bin/start-ds.sh b/bin/start-ds.sh index 9ae8ae900fdfe9bb57940afa24df6fe9776f8b10..8a5baba501cf7c48dd53a7d3b92196874020d3d6 100755 --- a/bin/start-ds.sh +++ b/bin/start-ds.sh @@ -32,7 +32,7 @@ if [[ $TANGOSTATIONCONTROL ]]; then else # Install the package, exit 1 if it fails cd tangostationcontrol || exit 1 - pip install ./ + pip install --upgrade --force-reinstall ./ fi # Return to the stored the directory, this preserves the working_dir argument in diff --git a/tangostationcontrol/tangostationcontrol/clients/README.md b/tangostationcontrol/tangostationcontrol/clients/README.md index c7085de860b20ade0e8d759f3bbd72d211e452df..9a0c749575c32ad6d582427f154562d97a7fc8a6 100644 --- a/tangostationcontrol/tangostationcontrol/clients/README.md +++ b/tangostationcontrol/tangostationcontrol/clients/README.md @@ -16,11 +16,8 @@ Inside lofar/tango/tangostationcontrol/tangostationcontrol/devices/lofar_device. Init_value: Initialisation value. If none is presents, fills the attribute with zero data of the correct type and dimension **kwargs: any other non attribute_wrapper arguments. NOTE: the `__init__` function contains wrappers for the unassigned read/write functions. In previous versions the read function of an RW attribute used to return the last value it had written *to* the client instead of the value from the client. This has since been changed. - -`initial_value`: - This function fills the attribute with a default value of all zero's with the proper dimensions and type if None is specified. -`Set_comm_client`: +`set_comm_client`: This function can be called to assign a read and write function to the attribute using the data accessor or client given to this function. The attribute wrapper assumes the client is running and has a function called ‘setup_attribute’ which will provide it with a valid read/write function. `async_set_comm_client`: @@ -29,14 +26,6 @@ This function can be called to assign a read and write function to the attribute `set_pass_func`: Can be called to assign a 'fake' read/write function. This is useful as a fallback option while development is still ongoing. -`_decorate_read_function`: -Wrap an attribute read function to annotate its exceptions with our comms_annotation to be able to identify which attribute triggered the error. - - - -`_decorate_write_function`: -Wrap an attribute write function to annotate its exceptions with our comms_annotation to be able to identify which attribute triggered the error. - diff --git a/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py b/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py index 5d9f5bdc8cd123e0ffd28c962548fc94bdf82572..9375b6d5d2b75c5ebfd66b3cab2318e586227b65 100644 --- a/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py +++ b/tangostationcontrol/tangostationcontrol/clients/attribute_wrapper.py @@ -1,6 +1,5 @@ from tango.server import attribute from tango import AttrWriteType -import numpy from tangostationcontrol.devices.device_decorators import fault_on_error import logging @@ -8,12 +7,46 @@ import logging logger = logging.getLogger() +class attribute_io(object): + """ Holds the I/O functionality for an attribute for a specific device. """ + + def __init__(self, device, attribute_wrapper): + # Link to the associated device + self.device = device + + # Link to the associated attribute wrapper + self.attribute_wrapper = attribute_wrapper + + # Link to last (written) value + self.cached_value = None + + # Specific read and write functions for this attribute on this device + self.read_function = lambda: None + self.write_function = lambda value: None + + def cached_read_function(self): + """ Return the last (written) value, if available. Otherwise, read + from the device. """ + + if self.cached_value is not None: + return self.cached_value + + self.cached_value = self.read_function() + return self.cached_value + + def cached_write_function(self, value): + """ Writes the given value to the device, and updates the cache. """ + + self.write_function(value) + self.cached_value = value + + class attribute_wrapper(attribute): """ Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes """ - def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs): + def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, **kwargs): """ wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract managing the communications interface. @@ -34,43 +67,43 @@ class attribute_wrapper(attribute): self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself - self.init_value = init_value - is_scalar = dims == (1,) - - self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64") + self.datatype = datatype - # check if not scalar - if is_scalar: - # scalar, just set the single dimension. + if dims == (1,): # Tango defines a scalar as having dimensions (1,0), see https://pytango.readthedocs.io/en/stable/server_api/attribute.html max_dim_x = 1 max_dim_y = 0 - else: - # get first dimension + dtype = datatype + elif len(dims) == 1: max_dim_x = dims[0] - - # single dimension/spectrum requires the datatype to be wrapped in a tuple - datatype = (datatype,) - - if len(dims) == 2: - # get second dimension - max_dim_y = dims[1] - # wrap the datatype tuple in another tuple for 2d arrays/images - datatype = (datatype,) - else: - max_dim_y = 0 + max_dim_y = 0 + dtype = (datatype,) + elif len(dims) == 2: + max_dim_x = dims[0] + max_dim_y = dims[1] + dtype = ((datatype,),) + else: + raise ValueError(f"Only up to 2D arrays are supported. Supplied dimensions: {dims}") if access == AttrWriteType.READ_WRITE: """ If the attribute is of READ_WRITE type, assign the write and read functions to it""" + # we return the last written value, as we are the only ones in control, + # and the hardware does not necessarily return what we've written + # (see L2SDP-725). + @fault_on_error() def write_func_wrapper(device, value): """ write_func_wrapper writes a value to this attribute """ - self.write_function(value) - device.value_dict[self] = value + try: + io = self.get_attribute_io(device) + + return io.cached_write_function(value) + except Exception as e: + raise e.__class__(f"Could not write attribute {comms_annotation}") from e @fault_on_error() def read_func_wrapper(device): @@ -78,16 +111,9 @@ class attribute_wrapper(attribute): read_func_wrapper reads the attribute value, stores it and returns it" """ try: - # we return the last written value, as we are the only ones in control, - # and the hardware does not necessarily return what we've written - # (see L2SDP-725). - if self in device.value_dict: - return device.value_dict[self] - - # value was never written, so obtain current one and cache that instead - value = self.read_function() - device.value_dict[self] = value - return value + io = self.get_attribute_io(device) + + return io.cached_read_function() except Exception as e: raise e.__class__(f"Could not read attribute {comms_annotation}") from e @@ -102,7 +128,9 @@ class attribute_wrapper(attribute): read_func_wrapper reads the attribute value, stores it and returns it" """ try: - return self.read_function() + io = self.get_attribute_io(device) + + return io.read_function() except Exception as e: raise e.__class__(f"Could not read attribute {comms_annotation}") from e @@ -114,60 +142,20 @@ class attribute_wrapper(attribute): # # NOTE: fisallowed=<callable> does not work: https://gitlab.com/tango-controls/pytango/-/issues/435 # So we have to use fisallowed=<str> here, which causes the function device.<str> to be called. - super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, fisallowed="is_attribute_access_allowed", **kwargs) + super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, fisallowed="is_attribute_access_allowed", **kwargs) - def initial_value(self): + def get_attribute_io(self, device): """ - returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value + returns the attribute I/O functions from a certain device, or registers it if not present """ - if self.init_value is not None: - return self.init_value - - if self.dim_y > 1: - dims = (self.dim_x, self.dim_y) - else: - dims = (self.dim_x,) - - # x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy - if len(dims) == 2: - numpy_dims = tuple((dims[1], dims[0])) - else: - numpy_dims = dims - - if self.dim_x == 1: - if self.numpy_type == str: - value = '' - else: - value = self.numpy_type(0) - else: - value = numpy.zeros(numpy_dims, dtype=self.numpy_type) - - return value - - def _decorate_read_function(self, read_attr_func): - """ Wrap an attribute read function to annotate its exceptions with our - comms_annotation to be able to identify which attribute triggered the error. """ - def wrapper(): - try: - return read_attr_func() - except Exception as e: - raise Exception(f"Failed to read attribute {self.comms_annotation}") from e - - return wrapper - - def _decorate_write_function(self, write_attr_func): - """ Wrap an attribute write function to annotate its exceptions with our - comms_annotation to be able to identify which attribute triggered the error. """ - def wrapper(value): - try: - write_attr_func(value) - except Exception as e: - raise Exception(f"Failed to write attribute {self.comms_annotation}") from e - - return wrapper + try: + return device._attribute_wrapper_io[self] + except KeyError: + device._attribute_wrapper_io[self] = attribute_io(device, self) + return device._attribute_wrapper_io[self] - def set_comm_client(self, client): + def set_comm_client(self, device, client): """ takes a communications client as input arguments This client should be of a class containing a "get_mapping" function and return a read and write function that the wrapper will use to get/set data. @@ -175,29 +163,28 @@ class attribute_wrapper(attribute): try: read_attr_func, write_attr_func = client.setup_attribute(self.comms_annotation, self) - self.read_function = self._decorate_read_function(read_attr_func) - self.write_function = self._decorate_write_function(write_attr_func) + io = self.get_attribute_io(device) + io.read_function = read_attr_func + io.write_function = write_attr_func except Exception as e: raise Exception(f"Exception while setting {client.__class__.__name__} attribute with annotation: '{self.comms_annotation}'") from e - async def async_set_comm_client(self, client): + async def async_set_comm_client(self, device, client): """ Asynchronous version of set_comm_client. """ try: read_attr_func, write_attr_func = await client.setup_attribute(self.comms_annotation, self) - self.read_function = self._decorate_read_function(read_attr_func) - self.write_function = self._decorate_write_function(write_attr_func) + io = self.get_attribute_io(device) + io.read_function = read_attr_func + io.write_function = write_attr_func except Exception as e: raise Exception(f"Exception while setting {client.__class__.__name__} attribute with annotation: '{self.comms_annotation}'") from e - - def set_pass_func(self): - def pass_func(value=None): - pass - + def set_pass_func(self, device): logger.debug("using pass function for attribute with annotation: {}".format(self.comms_annotation)) - self.read_function = pass_func - self.write_function = pass_func + io = self.get_attribute_io(device) + io.read_function = lambda: None + io.write_function = lambda value: None diff --git a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py index 448b4fc5f1754cb54273140aa240256f4d6ff58d..53211e52fea7472390d82c450543eb8351ca165f 100644 --- a/tangostationcontrol/tangostationcontrol/clients/opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/opcua_client.py @@ -175,7 +175,7 @@ class OPCUAConnection(AsyncCommClient): # get all the necessary data to set up the read/write functions from the attribute_wrapper dim_x = attribute.dim_x dim_y = attribute.dim_y - ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type + ua_type = numpy_to_OPCua_dict[attribute.datatype] # convert the numpy type to a corresponding UA type # configure and return the read/write functions prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type) diff --git a/tangostationcontrol/tangostationcontrol/clients/snmp_client.py b/tangostationcontrol/tangostationcontrol/clients/snmp_client.py index 4776bd15f922714739c05f8fbb316606e923877f..f75b6bcf4c4f8a7fd26fd85b6d0a4973a4d8d0dc 100644 --- a/tangostationcontrol/tangostationcontrol/clients/snmp_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/snmp_client.py @@ -94,7 +94,7 @@ class SNMP_client(CommClient): dim_x = attribute.dim_x dim_y = attribute.dim_y - dtype = attribute.numpy_type + dtype = attribute.datatype return dim_x, dim_y, dtype diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/__init__.py b/tangostationcontrol/tangostationcontrol/clients/statistics/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py b/tangostationcontrol/tangostationcontrol/clients/statistics/client.py similarity index 95% rename from tangostationcontrol/tangostationcontrol/clients/statistics_client.py rename to tangostationcontrol/tangostationcontrol/clients/statistics/client.py index 017ad5e70c34752a8ff815b42105d73c720d8dbf..869cb78dab4b013b7cb6634e517f4dad73f187c5 100644 --- a/tangostationcontrol/tangostationcontrol/clients/statistics_client.py +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/client.py @@ -2,11 +2,10 @@ from queue import Queue import logging import numpy -from .comms_client import AsyncCommClient -from .tcp_replicator import TCPReplicator -from .udp_receiver import UDPReceiver - -from tangostationcontrol.devices.sdp.statistics_collector import StatisticsConsumer +from tangostationcontrol.clients.comms_client import AsyncCommClient +from tangostationcontrol.clients.tcp_replicator import TCPReplicator +from tangostationcontrol.clients.udp_receiver import UDPReceiver +from tangostationcontrol.clients.statistics.consumer import StatisticsConsumer logger = logging.getLogger() diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics_client_thread.py b/tangostationcontrol/tangostationcontrol/clients/statistics/client_thread.py similarity index 100% rename from tangostationcontrol/tangostationcontrol/clients/statistics_client_thread.py rename to tangostationcontrol/tangostationcontrol/clients/statistics/client_thread.py diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py new file mode 100644 index 0000000000000000000000000000000000000000..066839a293f61aa65b0da741800b8cdac2e31f34 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py @@ -0,0 +1,70 @@ +import logging +from threading import Thread +from queue import Queue + +from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread +from tangostationcontrol.devices.sdp.statistics_collector import StatisticsCollector + +logger = logging.getLogger() + + +class StatisticsConsumer(Thread, StatisticsClientThread): + """ Base class to process statistics packets from a queue, asynchronously. """ + + # Maximum time to wait for the Thread to get unstuck, if we want to stop + DISCONNECT_TIMEOUT = 10.0 + + # No default options required, for now? + _DEFAULT_OPTIONS = {} + + def __init__(self, queue: Queue, collector: StatisticsCollector): + self.queue = queue + self.collector = collector + self.last_packet = None + + super().__init__() + self.start() + + @property + def _options(self) -> dict: + return StatisticsConsumer._DEFAULT_OPTIONS + + def run(self): + logger.info("Starting statistics thread") + + while True: + self.last_packet = self.queue.get() + + # This is the exception/slow path, but python doesn't allow us to optimise that + if self.last_packet is None: + # None is the magic marker to stop processing + break + + try: + self.collector.process_packet(self.last_packet) + except ValueError as e: + logger.exception("Could not parse statistics packet") + + # continue processing + + logger.info("Stopped statistics thread") + + def join(self, timeout=0): + # insert magic marker + self.queue.put(None) + logger.info("Sent shutdown to statistics thread") + + super().join(timeout) + + def disconnect(self): + # TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver + # and StatisticsConsumer. + if not self.is_alive(): + return + + # try to get the thread shutdown, but don't stall forever + self.join(self.DISCONNECT_TIMEOUT) + + if self.is_alive(): + # there is nothing we can do except wait (stall) longer, which could be indefinitely. + logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.") diff --git a/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py b/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py index 5b32466eabe8690936cae768814841a1f7f1f1b0..fd44bcc7247313a606fe20e8e105da4335780338 100644 --- a/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py +++ b/tangostationcontrol/tangostationcontrol/clients/tcp_replicator.py @@ -6,7 +6,7 @@ import asyncio import logging import sys -from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread +from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread logger = logging.getLogger() diff --git a/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py b/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py index 9f90782106602aab556061c2cfec19caef5d7c87..d07982f17cff4661dd441294614017781d5ce8a6 100644 --- a/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py +++ b/tangostationcontrol/tangostationcontrol/clients/udp_receiver.py @@ -7,7 +7,7 @@ import socket import time from typing import List # not needed for python3.9+, where we can use the type "list[Queue]" directly -from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread +from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread logger = logging.getLogger() diff --git a/tangostationcontrol/tangostationcontrol/devices/docker_device.py b/tangostationcontrol/tangostationcontrol/devices/docker_device.py index 5066e9305014bb6ce3e98ba69491c01e5e6e8821..b98bf53e95c65a12c5cdbc224f43d4acd674f4de 100644 --- a/tangostationcontrol/tangostationcontrol/devices/docker_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/docker_device.py @@ -116,7 +116,7 @@ class Docker(lofar_device): async def _connect_docker(self): # tie attributes to client for i in self.attr_list(): - await i.async_set_comm_client(self.docker_client) + await i.async_set_comm_client(self, self.docker_client) await self.docker_client.start() diff --git a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py index 5f58131c9fce2a41c3cfaa6a2aa0b3fbf1b826e1..f898a5c4f08075044d6d64f2c4cf32fe80cdc77f 100644 --- a/tangostationcontrol/tangostationcontrol/devices/lofar_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/lofar_device.py @@ -92,10 +92,10 @@ class lofar_device(Device, metaclass=DeviceMeta): """ Return a list of all the attribute_wrapper members of this class. """ return [v for k, v in cls.__dict__.items() if type(v) == attribute_wrapper] - def setup_value_dict(self): - """ set the initial value for all the attribute wrapper objects""" + def setup_attribute_wrapper(self): + """ prepare the caches for attribute wrapper objects""" - self.value_dict = {i: i.initial_value() for i in self.attr_list()} + self._attribute_wrapper_io = {} def is_attribute_access_allowed(self, req_type): """ Returns whether an attribute wrapped by the attribute_wrapper be accessed. """ @@ -157,7 +157,7 @@ class lofar_device(Device, metaclass=DeviceMeta): self.set_state(DevState.INIT) self.set_status("Device is in the INIT state.") - self.setup_value_dict() + self.setup_attribute_wrapper() # reload our class & device properties from the Tango database self.get_device_properties() @@ -235,7 +235,9 @@ class lofar_device(Device, metaclass=DeviceMeta): :return:None """ if self.get_state() == DevState.OFF: - raise Exception("IllegalCommand: Cannot go from FAULT -> OFF") + # Spurious FAULT + logger.warning("Requested to go to FAULT state, but am already in OFF state.") + return if self.get_state() == DevState.FAULT: # Already faulting. Don't complain. diff --git a/tangostationcontrol/tangostationcontrol/devices/opcua_device.py b/tangostationcontrol/tangostationcontrol/devices/opcua_device.py index eb2e508c35cf6923fb9d121f729465a4eca621e3..c5fa4d1a791bc2511ae93f9e30091c0c1beba2c8 100644 --- a/tangostationcontrol/tangostationcontrol/devices/opcua_device.py +++ b/tangostationcontrol/tangostationcontrol/devices/opcua_device.py @@ -105,10 +105,10 @@ class opcua_device(lofar_device): for i in self.attr_list(): try: if not i.comms_id or i.comms_id == OPCUAConnection: - await i.async_set_comm_client(self.opcua_connection) + await i.async_set_comm_client(self, self.opcua_connection) except Exception as e: # use the pass function instead of setting read/write fails - i.set_pass_func() + i.set_pass_func(self) self.opcua_missing_attributes.append(",".join(self.opcua_connection.get_node_path(i.comms_annotation))) logger.warning(f"Error while setting the attribute {i.comms_annotation} read/write function.", exc_info=True) diff --git a/tangostationcontrol/tangostationcontrol/devices/psoc.py b/tangostationcontrol/tangostationcontrol/devices/psoc.py index e599563fd4f1ad3120d86da937aac31f27dcc53e..643617cfba8853168cef0ac59911fc744571c2e9 100644 --- a/tangostationcontrol/tangostationcontrol/devices/psoc.py +++ b/tangostationcontrol/tangostationcontrol/devices/psoc.py @@ -100,10 +100,10 @@ class PSOC(lofar_device): # map an access helper class for i in self.attr_list(): try: - i.set_comm_client(self.snmp_manager) + i.set_comm_client(self, self.snmp_manager) except Exception as e: # use the pass function instead of setting read/write fails - i.set_pass_func() + i.set_pass_func(self) logger.warning("error while setting the SNMP attribute {} read/write function. {}".format(i, e)) self.snmp_manager.start() diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py index 9b72ce655249770b84b99a9b43ce2777f84f785e..6bdf4f3779dab651b0a0d2d89d359e9560353a8f 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/beamlet.py @@ -157,7 +157,21 @@ class Beamlet(opcua_device): subband_select_RW = attribute(dtype=(numpy.uint32,), max_dim_x=N_BEAMLETS_CTRL, access=AttrWriteType.READ_WRITE, fisallowed="is_attribute_access_allowed") def read_subband_select_RW(self): - return self._subband_select + # We can only return a single value, so we assume the FPGA is configured coherently. Which is something + # that is to be checked by an independent monitoring system anyway. + mask = self.sdp_proxy.TR_fpga_mask_RW + subbands = self.read_attribute("FPGA_beamlet_subband_select_RW") + subbands_in_mask = [s for idx,s in enumerate(subbands) if mask[idx]] + + # If there are no FPGAs selected at all, just return a sane default. + if not subbands_in_mask: + return self.subband_select_RW_default + + # We return the first setting within the mask. Convert into actual shape for a single FPGA + mask_for_all_inputs = subbands_in_mask[0].reshape(self.A_PN, self.N_POL, self.N_BEAMLETS_CTRL) + + # Return the first setting (antenna, pol) within this FPGA + return mask_for_all_inputs[0,0] def write_subband_select_RW(self, subbands): # Use the same subband for all inputs and polarisations of a beamlet @@ -165,9 +179,6 @@ class Beamlet(opcua_device): self.cache_clear() - # Store new value - self._subband_select = subbands - # ---------- # Summarising Attributes # ---------- @@ -179,8 +190,6 @@ class Beamlet(opcua_device): def configure_for_initialise(self): super().configure_for_initialise() - self._subband_select = numpy.zeros(self.N_BEAMLETS_CTRL, dtype=numpy.uint32) - self.sdp_proxy = DeviceProxy("STAT/SDP/1") self.sdp_proxy.set_source(DevSource.DEV) diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py index 3ff59ef6cdeb5ba5206c17a9414d46eee260b386..7f0816afcd5a94b06bbfb76f68f4af857daf1161 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py @@ -17,7 +17,7 @@ from tango import AttrWriteType from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.clients.opcua_client import OPCUAConnection -from tangostationcontrol.clients.statistics_client import StatisticsClient +from tangostationcontrol.clients.statistics.client import StatisticsClient from tangostationcontrol.devices.sdp.statistics import Statistics from tangostationcontrol.devices.sdp.statistics_collector import BSTCollector diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py index a1ae871efd63be5de397984584460de8a0012987..8a51d009fef7dd97f9ae821485ef0bb21114d903 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py @@ -19,7 +19,7 @@ from tango import AttrWriteType from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.clients.opcua_client import OPCUAConnection -from tangostationcontrol.clients.statistics_client import StatisticsClient +from tangostationcontrol.clients.statistics.client import StatisticsClient from tangostationcontrol.devices.sdp.statistics import Statistics from tangostationcontrol.devices.sdp.statistics_collector import SSTCollector diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics.py index 92ac1518c3b3f4b7d104f31bde253a2e2005a2b3..c9c3d76338d1b8b7da039a4571355c975dbc6f1e 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics.py @@ -18,7 +18,7 @@ from tango import DeviceProxy, DevSource # Additional import import asyncio -from tangostationcontrol.clients.statistics_client import StatisticsClient +from tangostationcontrol.clients.statistics.client import StatisticsClient from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.devices.opcua_device import opcua_device from tangostationcontrol.common.lofar_logging import log_exceptions @@ -133,10 +133,10 @@ class Statistics(opcua_device): for i in self.attr_list(): try: if i.comms_id == StatisticsClient: - await i.async_set_comm_client(self.statistics_client) + await i.async_set_comm_client(self, self.statistics_client) except Exception as e: # use the pass function instead of setting read/write fails - i.set_pass_func() + i.set_pass_func(self) logger.warning("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e)) await self.statistics_client.start() diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py index b611d6614b60775c6037da6d0371c9d3bd065e68..b330a120488de8acdab62e7b3ae8d88f4f341811 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/statistics_collector.py @@ -1,17 +1,15 @@ -from queue import Queue -from threading import Thread import logging import numpy import datetime from .packet import SSTPacket, XSTPacket, BSTPacket from tangostationcontrol.common.baselines import nr_baselines, baseline_index, baseline_from_index -from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread from tango import DeviceProxy, DevFailed, DevState logger = logging.getLogger() + class StatisticsCollector: """ Base class to process statistics packets into parameters matrices. """ @@ -355,65 +353,3 @@ class BSTCollector(StatisticsCollector): self.parameters["bst_values"][block_index][:self.MAX_BEAMLETS] = fields.payload self.parameters["bst_timestamps"][block_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["integration_intervals"][block_index] = fields.integration_interval() - - -class StatisticsConsumer(Thread, StatisticsClientThread): - """ Base class to process statistics packets from a queue, asynchronously. """ - - # Maximum time to wait for the Thread to get unstuck, if we want to stop - DISCONNECT_TIMEOUT = 10.0 - - # No default options required, for now? - _DEFAULT_OPTIONS = {} - - def __init__(self, queue: Queue, collector: StatisticsCollector): - self.queue = queue - self.collector = collector - self.last_packet = None - - super().__init__() - self.start() - - @property - def _options(self) -> dict: - return StatisticsConsumer._DEFAULT_OPTIONS - - def run(self): - logger.info("Starting statistics thread") - - while True: - self.last_packet = self.queue.get() - - # This is the exception/slow path, but python doesn't allow us to optimise that - if self.last_packet is None: - # None is the magic marker to stop processing - break - - try: - self.collector.process_packet(self.last_packet) - except ValueError as e: - logger.exception("Could not parse statistics packet") - - # continue processing - - logger.info("Stopped statistics thread") - - def join(self, timeout=0): - # insert magic marker - self.queue.put(None) - logger.info("Sent shutdown to statistics thread") - - super().join(timeout) - - def disconnect(self): - # TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver - # and StatisticsConsumer. - if not self.is_alive(): - return - - # try to get the thread shutdown, but don't stall forever - self.join(self.DISCONNECT_TIMEOUT) - - if self.is_alive(): - # there is nothing we can do except wait (stall) longer, which could be indefinitely. - logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.") diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py index 4fe5292b638fd346da9bf772c0ce2c7e529bddae..62134c8ba4eda6fd532a9a8b33d5d9f6cb37830a 100644 --- a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py +++ b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py @@ -19,7 +19,7 @@ from tango import AttrWriteType from tangostationcontrol.common.entrypoint import entry from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper from tangostationcontrol.clients.opcua_client import OPCUAConnection -from tangostationcontrol.clients.statistics_client import StatisticsClient +from tangostationcontrol.clients.statistics.client import StatisticsClient from tangostationcontrol.devices.sdp.statistics import Statistics from tangostationcontrol.devices.sdp.statistics_collector import XSTCollector diff --git a/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_opcua_client_against_server.py b/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_opcua_client_against_server.py index 3ce57a0ae8660ac040a3cff0f5ef8680c5909ebc..e16ac79e1f991737910d689b6fdc406dce2a0e3d 100644 --- a/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_opcua_client_against_server.py +++ b/tangostationcontrol/tangostationcontrol/integration_test/default/client/test_opcua_client_against_server.py @@ -88,7 +88,7 @@ class TestClientServer(base.IntegrationAsyncTestCase): class attribute(object): dim_x = 1 dim_y = 0 - numpy_type = numpy.double + datatype = numpy.double prot_attr = await test_client.setup_protocol_attribute(["double_R"], attribute()) @@ -108,7 +108,7 @@ class TestClientServer(base.IntegrationAsyncTestCase): class attribute(object): dim_x = 1 dim_y = 0 - numpy_type = numpy.double + datatype = numpy.double prot_attr = await test_client.setup_protocol_attribute(["double_RW"], attribute()) diff --git a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py index f2c537aafea96a8c28388bff40637bd8ca5eea73..a3b5779650217572dfeddb73eff324657acead2b 100644 --- a/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py +++ b/tangostationcontrol/tangostationcontrol/test/beam/test_delays.py @@ -159,12 +159,12 @@ class TestDelays(base.TestCase): # verify parallellisation along direction axis for i, single_dir in enumerate(directions): single_dir_result = d.convert_bulk([single_dir], positions) - numpy.testing.assert_equal(single_dir_result[:, 0], bulk_result[:, i]) + numpy.testing.assert_almost_equal(single_dir_result[:, 0], bulk_result[:, i], 4) # verify parallellisation along position axis for i, single_pos in enumerate(positions): single_pos_result = d.convert_bulk(directions, [single_pos]) - numpy.testing.assert_equal(single_pos_result[0, :], bulk_result[i, :]) + numpy.testing.assert_almost_equal(single_pos_result[0, :], bulk_result[i, :], 4) def test_convert_bulk_speed(self): d = delay_calculator([0, 0, 0]) diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py index fd45244f870ccf0da97f378891638a17447bb28b..485250131cc8221f4881cec6a158816265ad60a4 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_attr_wrapper.py @@ -36,7 +36,7 @@ def dev_init(device): device.set_state(DevState.INIT) device.test_client = test_client(device.Fault) for i in device.attr_list(): - asyncio.run(i.async_set_comm_client(device.test_client)) + asyncio.run(i.async_set_comm_client(device, device.test_client)) device.test_client.start() diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_client.py index 577bab69e469fb26af2252790b22f7f92d2c0ade..245e03eff42a5592fa3a36673a2d1e01405e680f 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_client.py @@ -65,7 +65,7 @@ class test_client(CommClient): else: dims = (attribute.dim_x,) - dtype = attribute.numpy_type + dtype = attribute.datatype return dims, dtype diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py b/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py index ab20b238297af55adef923c7417273dc07e57dc6..63daef88819ad97ebc7b0cb6ddf2c7bda6c86a75 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_opcua_client.py @@ -93,7 +93,7 @@ class TestOPCua(base.AsyncTestCase): for i in ATTR_TEST_TYPES: class mock_attr: def __init__(self, dtype, x, y): - self.numpy_type = dtype + self.datatype = dtype self.dim_x = x self.dim_y = y diff --git a/tangostationcontrol/tangostationcontrol/test/clients/test_statistics_client_thread.py b/tangostationcontrol/tangostationcontrol/test/clients/test_statistics_client_thread.py index 17f866871bd682b3f289364c16a55e5ee2010a7c..1513f605ec7ee937fe9cc51764488fe0fde4f44b 100644 --- a/tangostationcontrol/tangostationcontrol/test/clients/test_statistics_client_thread.py +++ b/tangostationcontrol/tangostationcontrol/test/clients/test_statistics_client_thread.py @@ -10,7 +10,7 @@ import logging from unittest import mock -from tangostationcontrol.clients.statistics_client_thread import \ +from tangostationcontrol.clients.statistics.client_thread import \ StatisticsClientThread from tangostationcontrol.test import base