Skip to content
Snippets Groups Projects
Commit ce0081a4 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-777

parents a116d933 4789800b
Branches
Tags
1 merge request!387Resolve L2SS-777 "Add observations to prometheus exporter"
Showing
with 219 additions and 142 deletions
......@@ -32,7 +32,7 @@ if [[ $TANGOSTATIONCONTROL ]]; then
else
# Install the package, exit 1 if it fails
cd tangostationcontrol || exit 1
pip install ./
pip install --upgrade --force-reinstall ./
fi
# Return to the stored the directory, this preserves the working_dir argument in
......
......@@ -9,9 +9,11 @@ The ``recv == DeviceProxy("STAT/RECV/1")`` device controls the RCUs, the LBA ant
:Ant_mask_RW: Controls which antennas will actually be configured when attributes referring to antennas are written.
:type: ``bool[N_RCUs][N_antennas_per_RCU]``
:type: ``bool[N_antennas]``
Typically, ``N_RCUs == 32``, and ``N_antennas_per_RCU == 3``.
Typically, ``N_RCUs == 32``, and ``N_antennas == 96``.
.. note:: The antennas are hooked up to the RCUs in sets of 3, in order.
Error information
`````````````````````
......@@ -24,7 +26,7 @@ These attributes summarise the basic state of the device. Any elements which are
:ANT_error_R: Whether the antennas appear usable.
:type: ``bool[N_RCUs][N_antennas_per_RCU]``
:type: ``bool[N_antennas]``
:RCU_IOUT_error_R: Whether there are alarms on any of the amplitudes in the measured currents.
......
......@@ -17,10 +17,7 @@ Inside lofar/tango/tangostationcontrol/tangostationcontrol/devices/lofar_device.
**kwargs: any other non attribute_wrapper arguments.
NOTE: the `__init__` function contains wrappers for the unassigned read/write functions. In previous versions the read function of an RW attribute used to return the last value it had written *to* the client instead of the value from the client. This has since been changed.
`initial_value`:
This function fills the attribute with a default value of all zero's with the proper dimensions and type if None is specified.
`Set_comm_client`:
`set_comm_client`:
This function can be called to assign a read and write function to the attribute using the data accessor or client given to this function. The attribute wrapper assumes the client is running and has a function called ‘setup_attribute’ which will provide it with a valid read/write function.
`async_set_comm_client`:
......@@ -29,14 +26,6 @@ This function can be called to assign a read and write function to the attribute
`set_pass_func`:
Can be called to assign a 'fake' read/write function. This is useful as a fallback option while development is still ongoing.
`_decorate_read_function`:
Wrap an attribute read function to annotate its exceptions with our comms_annotation to be able to identify which attribute triggered the error.
`_decorate_write_function`:
Wrap an attribute write function to annotate its exceptions with our comms_annotation to be able to identify which attribute triggered the error.
......
from operator import mul
from functools import reduce
from tango.server import attribute
from tango import AttrWriteType
import numpy
from tango import AttrWriteType, AttReqType
from tangostationcontrol.devices.device_decorators import fault_on_error
import logging
......@@ -8,12 +10,46 @@ import logging
logger = logging.getLogger()
class attribute_io(object):
""" Holds the I/O functionality for an attribute for a specific device. """
def __init__(self, device, attribute_wrapper):
# Link to the associated device
self.device = device
# Link to the associated attribute wrapper
self.attribute_wrapper = attribute_wrapper
# Link to last (written) value
self.cached_value = None
# Specific read and write functions for this attribute on this device
self.read_function = lambda: None
self.write_function = lambda value: None
def cached_read_function(self):
""" Return the last (written) value, if available. Otherwise, read
from the device. """
if self.cached_value is not None:
return self.cached_value
self.cached_value = self.read_function()
return self.cached_value
def cached_write_function(self, value):
""" Writes the given value to the device, and updates the cache. """
self.write_function(value)
self.cached_value = value
class attribute_wrapper(attribute):
"""
Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes
"""
def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs):
def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, **kwargs):
"""
wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract
managing the communications interface.
......@@ -34,60 +70,62 @@ class attribute_wrapper(attribute):
self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself
self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself
self.init_value = init_value
is_scalar = dims == (1,)
self.datatype = datatype
self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64")
# check if not scalar
if is_scalar:
# scalar, just set the single dimension.
if dims == (1,):
# Tango defines a scalar as having dimensions (1,0), see https://pytango.readthedocs.io/en/stable/server_api/attribute.html
max_dim_x = 1
max_dim_y = 0
else:
# get first dimension
dtype = datatype
elif len(dims) == 1:
max_dim_x = dims[0]
# single dimension/spectrum requires the datatype to be wrapped in a tuple
datatype = (datatype,)
if len(dims) == 2:
# get second dimension
max_dim_y = dims[1]
# wrap the datatype tuple in another tuple for 2d arrays/images
datatype = (datatype,)
else:
max_dim_y = 0
dtype = (datatype,)
elif len(dims) == 2:
max_dim_x = dims[1]
max_dim_y = dims[0]
dtype = ((datatype,),)
else:
# >2D arrays collapse into the X and Y dimensions. The Y (major) dimension mirrors the first dimension given, the
# rest collapses into the X (minor) dimension.
max_dim_x = reduce(mul, dims[1:])
max_dim_y = dims[0]
dtype = ((datatype,),)
if access == AttrWriteType.READ_WRITE:
""" If the attribute is of READ_WRITE type, assign the write and read functions to it"""
# we return the last written value, as we are the only ones in control,
# and the hardware does not necessarily return what we've written
# (see L2SDP-725).
@fault_on_error()
def write_func_wrapper(device, value):
"""
write_func_wrapper writes a value to this attribute
"""
self.write_function(value)
device.value_dict[self] = value
try:
io = self.get_attribute_io(device)
return io.cached_write_function(value)
except Exception as e:
raise e.__class__(f"Could not write attribute {comms_annotation}") from e
@fault_on_error()
def read_func_wrapper(device):
"""
read_func_wrapper reads the attribute value, stores it and returns it"
"""
# lofar.read_attribute ignores fisallowed. So check again if we're allowed to read.
if not device.is_attribute_access_allowed(AttReqType.READ_REQ):
return None
try:
# we return the last written value, as we are the only ones in control,
# and the hardware does not necessarily return what we've written
# (see L2SDP-725).
if self in device.value_dict:
return device.value_dict[self]
io = self.get_attribute_io(device)
# value was never written, so obtain current one and cache that instead
value = self.read_function()
device.value_dict[self] = value
return value
return io.cached_read_function()
except Exception as e:
raise e.__class__(f"Could not read attribute {comms_annotation}") from e
......@@ -101,8 +139,15 @@ class attribute_wrapper(attribute):
"""
read_func_wrapper reads the attribute value, stores it and returns it"
"""
# lofar.read_attribute ignores fisallowed. So check again if we're allowed to read.
if not device.is_attribute_access_allowed(AttReqType.READ_REQ):
return None
try:
return self.read_function()
io = self.get_attribute_io(device)
return io.read_function()
except Exception as e:
raise e.__class__(f"Could not read attribute {comms_annotation}") from e
......@@ -114,60 +159,20 @@ class attribute_wrapper(attribute):
#
# NOTE: fisallowed=<callable> does not work: https://gitlab.com/tango-controls/pytango/-/issues/435
# So we have to use fisallowed=<str> here, which causes the function device.<str> to be called.
super().__init__(dtype=datatype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, fisallowed="is_attribute_access_allowed", **kwargs)
super().__init__(dtype=dtype, max_dim_y=max_dim_y, max_dim_x=max_dim_x, access=access, fisallowed="is_attribute_access_allowed", format=str(dims), **kwargs)
def initial_value(self):
def get_attribute_io(self, device):
"""
returns a numpy array filled with zeroes fit to the size of the attribute. Or if init_value is not the default None, return that value
returns the attribute I/O functions from a certain device, or registers it if not present
"""
if self.init_value is not None:
return self.init_value
if self.dim_y > 1:
dims = (self.dim_x, self.dim_y)
else:
dims = (self.dim_x,)
# x and y are swapped for numpy and Tango. to maintain tango conventions, x and y are swapped for numpy
if len(dims) == 2:
numpy_dims = tuple((dims[1], dims[0]))
else:
numpy_dims = dims
if self.dim_x == 1:
if self.numpy_type == str:
value = ''
else:
value = self.numpy_type(0)
else:
value = numpy.zeros(numpy_dims, dtype=self.numpy_type)
return value
def _decorate_read_function(self, read_attr_func):
""" Wrap an attribute read function to annotate its exceptions with our
comms_annotation to be able to identify which attribute triggered the error. """
def wrapper():
try:
return read_attr_func()
except Exception as e:
raise Exception(f"Failed to read attribute {self.comms_annotation}") from e
return wrapper
return device._attribute_wrapper_io[self]
except KeyError:
device._attribute_wrapper_io[self] = attribute_io(device, self)
return device._attribute_wrapper_io[self]
def _decorate_write_function(self, write_attr_func):
""" Wrap an attribute write function to annotate its exceptions with our
comms_annotation to be able to identify which attribute triggered the error. """
def wrapper(value):
try:
write_attr_func(value)
except Exception as e:
raise Exception(f"Failed to write attribute {self.comms_annotation}") from e
return wrapper
def set_comm_client(self, client):
def set_comm_client(self, device, client):
"""
takes a communications client as input arguments This client should be of a class containing a "get_mapping" function
and return a read and write function that the wrapper will use to get/set data.
......@@ -175,29 +180,28 @@ class attribute_wrapper(attribute):
try:
read_attr_func, write_attr_func = client.setup_attribute(self.comms_annotation, self)
self.read_function = self._decorate_read_function(read_attr_func)
self.write_function = self._decorate_write_function(write_attr_func)
io = self.get_attribute_io(device)
io.read_function = read_attr_func
io.write_function = write_attr_func
except Exception as e:
raise Exception(f"Exception while setting {client.__class__.__name__} attribute with annotation: '{self.comms_annotation}'") from e
async def async_set_comm_client(self, client):
async def async_set_comm_client(self, device, client):
"""
Asynchronous version of set_comm_client.
"""
try:
read_attr_func, write_attr_func = await client.setup_attribute(self.comms_annotation, self)
self.read_function = self._decorate_read_function(read_attr_func)
self.write_function = self._decorate_write_function(write_attr_func)
io = self.get_attribute_io(device)
io.read_function = read_attr_func
io.write_function = write_attr_func
except Exception as e:
raise Exception(f"Exception while setting {client.__class__.__name__} attribute with annotation: '{self.comms_annotation}'") from e
def set_pass_func(self):
def pass_func(value=None):
pass
def set_pass_func(self, device):
logger.debug("using pass function for attribute with annotation: {}".format(self.comms_annotation))
self.read_function = pass_func
self.write_function = pass_func
io = self.get_attribute_io(device)
io.read_function = lambda: None
io.write_function = lambda value: None
......@@ -175,7 +175,7 @@ class OPCUAConnection(AsyncCommClient):
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x = attribute.dim_x
dim_y = attribute.dim_y
ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type
ua_type = numpy_to_OPCua_dict[attribute.datatype] # convert the numpy type to a corresponding UA type
# configure and return the read/write functions
prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type)
......
......@@ -94,7 +94,7 @@ class SNMP_client(CommClient):
dim_x = attribute.dim_x
dim_y = attribute.dim_y
dtype = attribute.numpy_type
dtype = attribute.datatype
return dim_x, dim_y, dtype
......
......@@ -2,11 +2,10 @@ from queue import Queue
import logging
import numpy
from .comms_client import AsyncCommClient
from .tcp_replicator import TCPReplicator
from .udp_receiver import UDPReceiver
from tangostationcontrol.devices.sdp.statistics_collector import StatisticsConsumer
from tangostationcontrol.clients.comms_client import AsyncCommClient
from tangostationcontrol.clients.tcp_replicator import TCPReplicator
from tangostationcontrol.clients.udp_receiver import UDPReceiver
from tangostationcontrol.clients.statistics.consumer import StatisticsConsumer
logger = logging.getLogger()
......
import logging
from threading import Thread
from queue import Queue
from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
from tangostationcontrol.devices.sdp.statistics_collector import StatisticsCollector
logger = logging.getLogger()
class StatisticsConsumer(Thread, StatisticsClientThread):
""" Base class to process statistics packets from a queue, asynchronously. """
# Maximum time to wait for the Thread to get unstuck, if we want to stop
DISCONNECT_TIMEOUT = 10.0
# No default options required, for now?
_DEFAULT_OPTIONS = {}
def __init__(self, queue: Queue, collector: StatisticsCollector):
self.queue = queue
self.collector = collector
self.last_packet = None
super().__init__()
self.start()
@property
def _options(self) -> dict:
return StatisticsConsumer._DEFAULT_OPTIONS
def run(self):
logger.info("Starting statistics thread")
while True:
self.last_packet = self.queue.get()
# This is the exception/slow path, but python doesn't allow us to optimise that
if self.last_packet is None:
# None is the magic marker to stop processing
break
try:
self.collector.process_packet(self.last_packet)
except ValueError as e:
logger.exception("Could not parse statistics packet")
# continue processing
logger.info("Stopped statistics thread")
def join(self, timeout=0):
# insert magic marker
self.queue.put(None)
logger.info("Sent shutdown to statistics thread")
super().join(timeout)
def disconnect(self):
# TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver
# and StatisticsConsumer.
if not self.is_alive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.DISCONNECT_TIMEOUT)
if self.is_alive():
# there is nothing we can do except wait (stall) longer, which could be indefinitely.
logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.")
......@@ -6,7 +6,7 @@ import asyncio
import logging
import sys
from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread
from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
logger = logging.getLogger()
......
......@@ -7,7 +7,7 @@ import socket
import time
from typing import List # not needed for python3.9+, where we can use the type "list[Queue]" directly
from tangostationcontrol.clients.statistics_client_thread import StatisticsClientThread
from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
logger = logging.getLogger()
......
......@@ -140,7 +140,7 @@ def configure_logger(logger: logging.Logger=None, log_extra=None, debug=False):
# Always also log the hostname because it makes the origin of the log clear.
hostname = socket.gethostname()
formatter = logging.Formatter(fmt = '%(asctime)s.%(msecs)d %(levelname)s - HOST="{}" DEVICE="%(tango_device)s" PID="%(process)d" TNAME="%(threadName)s" FILE="%(pathname)s" LINE="%(lineno)d" FUNC="%(funcName)s" MSG="%(message)s"'.format(hostname), datefmt = '%Y-%m-%dT%H:%M:%S')
formatter = logging.Formatter(fmt = '%(asctime)s.%(msecs)d %(levelname)s - %(tango_device)s: %(message)s [%(funcName)s in %(filename)s:%(lineno)d]'.format(hostname), datefmt = '%Y-%m-%dT%H:%M:%S')
handler.setFormatter(formatter)
handler.addFilter(LogSuppressErrorSpam())
handler.addFilter(LogAnnotator())
......@@ -171,6 +171,8 @@ def configure_logger(logger: logging.Logger=None, log_extra=None, debug=False):
except Exception:
logger.exception("Cannot forward logs to ELK.")
# Don't log to Tango to reduce log spam
"""
# Log to Tango
try:
handler = TangoLoggingHandler()
......@@ -179,6 +181,7 @@ def configure_logger(logger: logging.Logger=None, log_extra=None, debug=False):
logger.addHandler(handler)
except Exception:
logger.exception("Cannot forward logs to Tango.")
"""
return logger
......
from tango import DevState
# The Device states in which we consider our device operational,
# and thus allow interaction.
OPERATIONAL_STATES = [DevState.ON, DevState.ALARM]
# States in which Initialise() has happened, and the hardware
# can thus be configured or otherwise interacted with.
INITIALISED_STATES = OPERATIONAL_STATES + [DevState.STANDBY]
# States in which most commands are allowed
DEFAULT_COMMAND_STATES = INITIALISED_STATES
......@@ -375,14 +375,10 @@ class HBATToRecvMapper(object):
"RCU_band_select_RW": numpy.zeros(96, dtype=numpy.int64)
}
self.__reshape_attributes_in = {
"ANT_mask_RW": (96,),
"HBAT_BF_delay_steps_RW": (96, 32),
"RCU_band_select_RW": (96,),
}
self.__reshape_attributes_out = {
"ANT_mask_RW": (32, 3),
"HBAT_BF_delay_steps_RW": (96, 32),
"RCU_band_select_RW": (32, 3)
}
def map_read(self, mapped_attribute, recv_results):
......
......@@ -21,9 +21,9 @@ import numpy
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.opcua_device import opcua_device
from tangostationcontrol.devices.lofar_device import lofar_device
import logging
logger = logging.getLogger()
......@@ -97,8 +97,8 @@ class APSCT(opcua_device):
self.read_attribute("APSCT_PLL_160MHz_locked_R") and self.read_attribute("APSCT_PLL_160MHz_error_R")]
return any(errors)
APSCT_TEMP_error_R = attribute(dtype=bool, polling_period=1000)
APSCT_VOUT_error_R = attribute(dtype=bool)
APSCT_TEMP_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed", polling_period=1000)
APSCT_VOUT_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed")
def read_APSCT_TEMP_error_R(self):
return (self.alarm_val("APSCT_TEMP_R"))
......@@ -139,7 +139,7 @@ class APSCT(opcua_device):
@command()
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
@only_in_states(DEFAULT_COMMAND_STATES)
def APSCT_off(self):
"""
......@@ -149,7 +149,7 @@ class APSCT(opcua_device):
@command()
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
@only_in_states(DEFAULT_COMMAND_STATES)
def APSCT_200MHz_on(self):
"""
......@@ -159,7 +159,7 @@ class APSCT(opcua_device):
@command()
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
@only_in_states(DEFAULT_COMMAND_STATES)
def APSCT_160MHz_on(self):
"""
......
......@@ -63,7 +63,7 @@ class APSPU(opcua_device):
# ----------
# Summarising Attributes
# ----------
APSPU_error_R = attribute(dtype=bool)
APSPU_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed")
def read_APSPU_error_R(self):
return ((self.read_attribute("APSPUTR_I2C_error_R") > 0)
......
......@@ -20,6 +20,7 @@ from tango import AttrWriteType, DebugIt, DevVarStringArray, DevVarDoubleArray,
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.measures import get_measures_directory, get_available_measures_directories, download_measures, use_measures_directory, restart_python
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.device_decorators import TimeIt, only_in_states, fault_on_error
from tangostationcontrol.beam.delays import delay_calculator
from tangostationcontrol.devices.lofar_device import lofar_device
......@@ -206,7 +207,7 @@ class beam_device(lofar_device):
@command(dtype_in=DevVarStringArray)
@DebugIt()
@log_exceptions()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
@only_in_states(DEFAULT_COMMAND_STATES)
def set_pointing(self, pointing_direction: list):
"""
Compute and uploads the hardware-specific delays based on a given pointing direction
......@@ -220,7 +221,7 @@ class beam_device(lofar_device):
@command(dtype_in = DevString)
@DebugIt()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
@only_in_states(DEFAULT_COMMAND_STATES)
def set_pointing_for_specific_time(self, parameters: DevString = None):
"""
Compute and uploads the hardware-specific delays based on a given pointing direction
......@@ -245,7 +246,7 @@ class beam_device(lofar_device):
@command(dtype_in=DevVarStringArray, dtype_out=DevVarDoubleArray)
@DebugIt()
@log_exceptions()
@only_in_states(lofar_device.DEFAULT_COMMAND_STATES)
@only_in_states(DEFAULT_COMMAND_STATES)
def delays(self, pointing_direction: numpy.array):
"""
Calculate the delays based on the pointing list and the timestamp
......
......@@ -26,6 +26,7 @@ from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.devices.lofar_device import lofar_device
from tangostationcontrol.common.lofar_logging import device_logging_to_python, log_exceptions
from tangostationcontrol.common.states import OPERATIONAL_STATES
import logging
logger = logging.getLogger()
......@@ -153,7 +154,7 @@ class DevicesInitialiser(object):
continue
if self.is_available(device):
if self.reboot or self.devices[device].state() not in lofar_device.OPERATIONAL_STATES:
if self.reboot or self.devices[device].state() not in OPERATIONAL_STATES:
self.stop_device(device)
self.boot_device(device)
......@@ -204,7 +205,7 @@ class DevicesInitialiser(object):
else:
proxy.warm_boot()
if proxy.state() not in lofar_device.OPERATIONAL_STATES:
if proxy.state() not in OPERATIONAL_STATES:
raise InitialisationException(f"Could not boot device {device_name}. It reports status: {proxy.status()}")
self.set_status(f"[restarting {device_name}] Succesfully booted.")
......@@ -317,14 +318,14 @@ class Boot(lofar_device):
@command()
@DebugIt()
@only_in_states(lofar_device.OPERATIONAL_STATES)
@only_in_states(OPERATIONAL_STATES)
@log_exceptions()
def boot(self):
self._boot(reboot=False, initialise_hardware=self.Initialise_Hardware)
@command()
@DebugIt()
@only_in_states(lofar_device.OPERATIONAL_STATES)
@only_in_states(OPERATIONAL_STATES)
@log_exceptions()
def reboot(self):
self._boot(reboot=True, initialise_hardware=self.Initialise_Hardware)
......
......@@ -116,7 +116,7 @@ class Docker(lofar_device):
async def _connect_docker(self):
# tie attributes to client
for i in self.attr_list():
await i.async_set_comm_client(self.docker_client)
await i.async_set_comm_client(self, self.docker_client)
await self.docker_client.start()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment