diff --git a/CDB/LOFAR_ConfigDb.json b/CDB/LOFAR_ConfigDb.json index 9ee0e3eec05f85251ea69d09fd071384cfc7611d..cbb02dd5ed8922c397eaeb9fefe2090e0b751f0b 100644 --- a/CDB/LOFAR_ConfigDb.json +++ b/CDB/LOFAR_ConfigDb.json @@ -722,8 +722,17 @@ "SST": { "LTS/SST/1": { "properties": { - "SST_Port": [ + "SST_Client_Port": [ "5001" + ], + "OPC_Server_Name": [ + "dop36.astron.nl" + ], + "OPC_Server_Port": [ + "4840" + ], + "OPC_Time_Out": [ + "5.0" ] } } diff --git a/CDB/sdp-sim-config.json b/CDB/sdp-sim-config.json index 8953c5a45e5a27d9134b1fbe9d6d77d187b3694a..75fb9998cbc210bdfee04adf41e6c854bcf13358 100644 --- a/CDB/sdp-sim-config.json +++ b/CDB/sdp-sim-config.json @@ -18,6 +18,28 @@ } } } + }, + "SST": { + "LTS": { + "SST": { + "LTS/SST/1": { + "properties": { + "SST_Client_Port": [ + "5001" + ], + "OPC_Server_Name": [ + "sdptr-sim" + ], + "OPC_Server_Port": [ + "4840" + ], + "OPC_Time_Out": [ + "5.0" + ] + } + } + } + } } } } diff --git a/CDB/thijs_ConfigDb.json b/CDB/thijs_ConfigDb.json index 6716bd3bc17828abb1135d6d78f6750dd9949329..e60ce20eacdf24bad708009c12259bdcdf8d1cbd 100644 --- a/CDB/thijs_ConfigDb.json +++ b/CDB/thijs_ConfigDb.json @@ -94,8 +94,17 @@ "SST": { "LTS/SST/1": { "properties": { - "SST_Port": [ + "SST_Client_Port": [ "5001" + ], + "OPC_Server_Name": [ + "dop36.astron.nl" + ], + "OPC_Server_Port": [ + "4840" + ], + "OPC_Time_Out": [ + "5.0" ] } } diff --git a/CDB/thomas_ConfigDb.json b/CDB/thomas_ConfigDb.json index 0ebff3734b678d3446fc5d3ecbcb955e34ecb267..33c19e162b8e15001759de58dfca22a82c2dd249 100644 --- a/CDB/thomas_ConfigDb.json +++ b/CDB/thomas_ConfigDb.json @@ -25,6 +25,19 @@ } } } + }, + "SST": { + "LTS": { + "SST": { + "LTS/SST/1": { + "properties": { + "OPC_Server_Name": [ + "okeanos" + ] + } + } + } + } } } } diff --git a/CDB/thomas_arm64_ConfigDb.json b/CDB/thomas_arm64_ConfigDb.json index 89673f8f96e2bc49ce63823a3cbaae1e299294f9..4d010b690433d631ddadc7c14babbb31ec71c6ac 100644 --- a/CDB/thomas_arm64_ConfigDb.json +++ b/CDB/thomas_arm64_ConfigDb.json @@ -25,6 +25,19 @@ } } } + }, + "SST": { + "LTS": { + "SST": { + "LTS/SST/1": { + "properties": { + "OPC_Server_Name": [ + "okeanos" + ] + } + } + } + } } } } diff --git a/devices/clients/attribute_wrapper.py b/devices/clients/attribute_wrapper.py index 73b8d203913362d3d6b4b079aaaaa285e15da23a..99312919c0631f85c64cd3aec097a00b316f12f4 100644 --- a/devices/clients/attribute_wrapper.py +++ b/devices/clients/attribute_wrapper.py @@ -14,11 +14,12 @@ class attribute_wrapper(attribute): Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes """ - def __init__(self, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs): + def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs): """ wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract managing the communications interface. + comms_id: user-supplied identifier that is attached to this object, to identify which communication class will need to be attached comms_annotation: data passed along to the attribute. can be given any form of data. handling is up to client implementation datatype: any numpy datatype dims: dimensions of the attribute as a tuple, or (1,) for a scalar. @@ -31,6 +32,7 @@ class attribute_wrapper(attribute): if "numpy" not in str(datatype) and datatype != str: raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,)) + self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64") diff --git a/devices/clients/comms_client.py b/devices/clients/comms_client.py index f189ce3fdec74bad5c40c2186bfc6b3f3c207cca..011e1e62180e85f6bc17d72a6ee31eb5871ecb50 100644 --- a/devices/clients/comms_client.py +++ b/devices/clients/comms_client.py @@ -65,6 +65,9 @@ class CommClient(Thread): # signal that we're disconnected self.fault_func() + # don't enter a spam-connect loop if faults immediately occur + time.sleep(self.try_interval) + def ping(self): return diff --git a/devices/clients/opcua_client.py b/devices/clients/opcua_client.py index de237d919e3ebe885f9a6fa41d9570758b885150..ac66f470613194101c41784adea6889003cc1cfb 100644 --- a/devices/clients/opcua_client.py +++ b/devices/clients/opcua_client.py @@ -225,7 +225,37 @@ class ProtocolAttribute: # make sure it is a python array value = value.tolist() if type(value) == numpy.ndarray else value - self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type)) - - - + try: + self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type)) + raise TypeError + except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e: + # A type conversion went wrong or there is a type mismatch. + # + # This is either the conversion us -> opcua in our client, or client -> server. + # Report all types involved to allow assessment of the location of the error. + if type(value) == list: + our_type = "list({dtype}) x ({dimensions})".format( + dtype=(type(value[0]).__name__ if value else ""), + dimensions=len(value)) + else: + our_type = "{dtype}".format( + dtype=type(value)) + + is_scalar = (self.dim_x + self.dim_y) == 1 + + if is_scalar: + expected_server_type = "{dtype} (scalar)".format( + dtype=self.ua_type) + else: + expected_server_type = "{dtype} x ({dim_x}, {dim_y})".format( + dtype=self.ua_type, + dim_x=self.dim_x, + dim_y=self.dim_y) + + actual_server_type = "{dtype} {dimensions}".format( + dtype=self.node.get_data_type_as_variant_type(), + dimensions=(self.node.get_array_dimensions() or "???")) + + attribute_name = self.node.get_display_name().to_string() + + raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e diff --git a/devices/common/lofar_git.py b/devices/common/lofar_git.py index 353748d985ccf72483792456cbcd8a0a5aa1056b..f4f6217280fe612fa2e9a7830c1f026c1e36a815 100644 --- a/devices/common/lofar_git.py +++ b/devices/common/lofar_git.py @@ -2,7 +2,7 @@ import git # pip3 install gitpython import os from functools import lru_cache -def get_repo(starting_directory: str = os.path.dirname(__file__)) -> git.Repo: +def get_repo(starting_directory: str = os.path.dirname(os.path.abspath(__file__))) -> git.Repo: """ Try finding the repository by traversing up the tree. By default, the repository containing this module is returned. @@ -50,14 +50,20 @@ def get_version(repo: git.Repo = None) -> str: if repo is None: repo = get_repo() - branch = repo.active_branch commit = repo.commit() tags = {tag.commit: tag for tag in repo.tags} if commit in tags: + # a tag = production ready commit_str = "{}".format(tags[commit]) production_ready = True + elif repo.head.is_detached: + # no active branch + commit_str = "<detached HEAD> [{}]".format(commit) + production_ready = False else: + # HEAD of a branch + branch = repo.active_branch commit_str = "{} [{}]".format(branch, commit) production_ready = False diff --git a/devices/common/lofar_logging.py b/devices/common/lofar_logging.py index 5f3696efa45c8c95d3a7cf8837a51339493a9fe6..aed0353461d75ae6ad46b4b10ad51289fb08b553 100644 --- a/devices/common/lofar_logging.py +++ b/devices/common/lofar_logging.py @@ -1,14 +1,87 @@ import logging from functools import wraps +from tango.server import Device import sys - -# Always also log the hostname because it makes the origin of the log clear. +import traceback import socket -hostname = socket.gethostname() -def configure_logger(logger: logging.Logger, log_extra=None): +from .lofar_git import get_version + +class TangoLoggingHandler(logging.Handler): + level_to_device_stream = { + logging.DEBUG: Device.debug_stream, + logging.INFO: Device.info_stream, + logging.WARN: Device.warn_stream, + logging.ERROR: Device.error_stream, + logging.FATAL: Device.fatal_stream, + } + + def emit(self, record): + try: + if record.tango_device is None: + # log record is not related to any device + return + except AttributeError: + # log record is not annotated with a tango_device + return + + # determine which log stream to use + stream = self.level_to_device_stream[record.levelno] + + # send the log message to Tango + stream(record.tango_device, record.msg, *record.args) + + self.flush() + +class LogAnnotator(logging.Formatter): + """ Annotates log records with: + + record.tango_device: the Tango Device that is executing. """ + + @staticmethod + def get_current_tango_device() -> Device: + """ Return the tango Device we're currently executing for, or None if it can't be detected. + + This is derived by traversing the stack and find a Device as 'self'. In some cases, + this fails, for example if a separate Thread is started for a certain Device. """ + + for frame,lineno in traceback.walk_stack(f=None): + if "self" in frame.f_locals and isinstance(frame.f_locals["self"], Device): + return frame.f_locals["self"] + + return None + + def filter(self, record): + # annotate record with currently executing Tango device, if any + record.tango_device = self.get_current_tango_device() + + # annotate record with the current software version + record.software_version = get_version() + + # we just annotate, we don't filter + return True + +def configure_logger(logger: logging.Logger=None, log_extra=None): + """ + Configure the given logger (or root if None) to: + - send logs to the ELK stack + - send logs to Tango + - send logs to stdout + """ + + # NOTE: We have to attach filters to handlers, instead to this logger, + # in order to have the filters be applied to descendent loggers. + + if logger is None: + logger = logging.getLogger() + + # By default we want to know everything logger.setLevel(logging.DEBUG) + # remove spam from the OPC-UA client connection + logging.getLogger("opcua").setLevel(logging.WARN) + + # Log to ELK stack try: from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter @@ -18,32 +91,52 @@ def configure_logger(logger: logging.Logger, log_extra=None): # configure log messages formatter = LogstashFormatter(extra=log_extra, tags=["python", "lofar"]) handler.setFormatter(formatter) + handler.addFilter(LogAnnotator()) # install the handler logger.addHandler(handler) + except ImportError: + logger.exception("Cannot forward logs to ELK: logstash_async module not found.") + except Exception: + logger.exception("Cannot forward logs to ELK.") - # for now, also log to stderr - # Set up logging in a way that it can be understood by a human reader, be - # easily grep'ed, be parsed with a couple of shell commands and - # easily fed into an Kibana/Elastic search system. - handler = logging.StreamHandler() - formatter = logging.Formatter(fmt = '%(asctime)s.%(msecs)d %(levelname)s - HOST="{}" PID="%(process)d" TNAME="%(threadName)s" TID="%(thread)d" FILE="%(pathname)s" LINE="%(lineno)d" FUNC="%(funcName)s" MSG="%(message)s"'.format(hostname), datefmt = '%Y-%m-%dT%H:%M:%S') - handler.setFormatter(formatter) + # Log to Tango + try: + handler = TangoLoggingHandler() + handler.addFilter(LogAnnotator()) logger.addHandler(handler) except Exception: - logger.exception("Cannot import or configure logstash_async module, not forwarding logs to ELK stack.") + logger.exception("Cannot forward logs to Tango.") + + + # for now, also log to stderr + # Set up logging in a way that it can be understood by a human reader, be + # easily grep'ed, be parsed with a couple of shell commands and + # easily fed into an Kibana/Elastic search system. + handler = logging.StreamHandler() + + # Always also log the hostname because it makes the origin of the log clear. + hostname = socket.gethostname() + + formatter = logging.Formatter(fmt = '%(asctime)s.%(msecs)d %(levelname)s - HOST="{}" DEVICE="%(tango_device)s" PID="%(process)d" TNAME="%(threadName)s" FILE="%(pathname)s" LINE="%(lineno)d" FUNC="%(funcName)s" MSG="%(message)s"'.format(hostname), datefmt = '%Y-%m-%dT%H:%M:%S') + handler.setFormatter(formatter) + handler.addFilter(LogAnnotator()) + + logger.addHandler(handler) return logger -def device_logging_to_python(log_extra: dict = None): - """ Call this on a Tango Device instance or class to have your Tango Device log to python instead. """ +def device_logging_to_python(): + """ Decorator. Call this on a Tango Device instance or class to have your Tango Device log to python instead. """ def inner(cls): - # Create a logger that logs to ELK, dedicated for this class - logger = logging.getLogger(cls.__name__) - configure_logger(logger, log_extra) + # we'll be doing very weird things if this class isnt + if not issubclass(cls, Device): + raise ValueError("device_logging_to_python decorator is to be used on Tango Device classes only.") # Monkey patch the python logger to replace the tango logger + logger = logging.getLogger() + cls.debug_stream = logger.debug cls.info_stream = logger.info cls.warn_stream = logger.warning @@ -51,11 +144,12 @@ def device_logging_to_python(log_extra: dict = None): cls.error_stream = logger.error cls.fatal_stream = logger.fatal cls.critical_stream = logger.critical + return cls return inner -def log_exceptions(): +def log_exceptions(logger: logging.Logger=None): """ Decorator that logs all exceptions that the function raises. """ def wrapper(func): @@ -64,8 +158,10 @@ def log_exceptions(): try: return func(self, *args, **kwargs) except Exception as e: - self.error_stream("Unhandled exception: {}".format(e)) - raise e + (logger or logging.getLogger()).exception("Unhandled exception: %s: %s", e.__class__.__name__, e) + + # we can log but we cannot hide + raise return inner diff --git a/devices/devices/apsctl.py b/devices/devices/apsctl.py index 25d8e1080e5ce71a05634fee392aa9eff57ef711..b555cb1b68ac5f76261c73d9723aa050316dc9d8 100644 --- a/devices/devices/apsctl.py +++ b/devices/devices/apsctl.py @@ -34,7 +34,7 @@ import numpy __all__ = ["APSCTL", "main"] -@device_logging_to_python({"device": "APSCTL"}) +@device_logging_to_python() class APSCTL(hardware_device): """ @@ -195,8 +195,7 @@ def main(args=None, **kwargs): """Main function of the SDP module.""" from devices.common.lofar_logging import configure_logger - import logging - configure_logger(logging.getLogger()) + configure_logger() return run((APSCTL,), args=args, **kwargs) diff --git a/devices/devices/hardware_device.py b/devices/devices/hardware_device.py index d244e79df8d7c90bd93307f826bba39a8f9c15c5..8c348868d691111eedb700906b9c1a4242111591 100644 --- a/devices/devices/hardware_device.py +++ b/devices/devices/hardware_device.py @@ -90,7 +90,7 @@ class hardware_device(Device): self.set_state(DevState.STANDBY) @command() - @only_in_states([DevState.STANDBY]) + @only_in_states([DevState.STANDBY, DevState.ON]) @DebugIt() @fault_on_error() @log_exceptions() @@ -100,6 +100,11 @@ class hardware_device(Device): :return:None """ + if self.get_state() == DevState.ON: + # Already on. Don't complain. + logger.warning("Requested to go to ON state, but am already in ON state.") + return + self.configure_for_on() self.set_state(DevState.ON) @@ -125,7 +130,7 @@ class hardware_device(Device): self.set_state(DevState.OFF) @command() - @only_in_states([DevState.ON, DevState.INIT, DevState.STANDBY]) + @only_in_states([DevState.ON, DevState.INIT, DevState.STANDBY, DevState.FAULT]) @DebugIt() @log_exceptions() def Fault(self): @@ -138,6 +143,11 @@ class hardware_device(Device): :return:None """ + if self.get_state() == DevState.FAULT: + # Already faulting. Don't complain. + logger.warning("Requested to go to FAULT state, but am already in FAULT state.") + return + self.configure_for_fault() self.set_state(DevState.FAULT) diff --git a/devices/devices/pcc.py b/devices/devices/pcc.py index f21bd39a26405e323a7842718f571626f498cd5e..0db21b41e7c609c934345e0b0dafdea9e9e08efb 100644 --- a/devices/devices/pcc.py +++ b/devices/devices/pcc.py @@ -35,7 +35,7 @@ from common.lofar_git import get_version __all__ = ["PCC", "main"] -@device_logging_to_python({"device": "PCC"}) +@device_logging_to_python() class PCC(hardware_device): """ @@ -251,8 +251,7 @@ def main(args=None, **kwargs): """Main function of the PCC module.""" from common.lofar_logging import configure_logger - import logging - configure_logger(logging.getLogger()) + configure_logger() return run((PCC,), args=args, **kwargs) diff --git a/devices/devices/sdp/sdp.py b/devices/devices/sdp/sdp.py index 05b4fe3162eca33318c91855fcc61281f27e9882..c78c1d042fae448adac4b9e24901a053f1683fe1 100644 --- a/devices/devices/sdp/sdp.py +++ b/devices/devices/sdp/sdp.py @@ -35,7 +35,7 @@ import numpy __all__ = ["SDP", "main"] -@device_logging_to_python({"device": "SDP"}) +@device_logging_to_python() class SDP(hardware_device): """ @@ -94,16 +94,6 @@ class SDP(hardware_device): FPGA_sdp_info_observation_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_observation_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sdp_info_station_id_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_station_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) - FPGA_sst_offload_enable_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_enable_R"], datatype=numpy.bool_, dims=(16,)) - FPGA_sst_offload_enable_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) - FPGA_sst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str_, dims=(16,)) - FPGA_sst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) - FPGA_sst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,)) - FPGA_sst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) - FPGA_sst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,)) - FPGA_sst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE) - FPGA_sst_offload_selector_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,)) - FPGA_sst_offload_selector_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_selector_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_status_R = attribute_wrapper(comms_annotation=["2:FPGA_status_R"], datatype=numpy.bool_, dims=(16,)) FPGA_temp_R = attribute_wrapper(comms_annotation=["2:FPGA_temp_R"], datatype=numpy.float_, dims=(16,)) FPGA_version_R = attribute_wrapper(comms_annotation=["2:FPGA_version_R"], datatype=numpy.str_, dims=(16,)) @@ -184,8 +174,7 @@ def main(args=None, **kwargs): """Main function of the SDP module.""" from common.lofar_logging import configure_logger - import logging - configure_logger(logging.getLogger()) + configure_logger() return run((SDP,), args=args, **kwargs) diff --git a/devices/devices/sdp/sst.py b/devices/devices/sdp/sst.py index d78841939d26c058e8c1d06a2b5157b95dd06784..14b9e4ba381fc195f2d3c9997e51530e83e7e048 100644 --- a/devices/devices/sdp/sst.py +++ b/devices/devices/sdp/sst.py @@ -25,6 +25,7 @@ from tango import AttrWriteType # Additional import from clients.sst_client import sst_client, SST_collector +from clients.opcua_connection import OPCUAConnection from clients.attribute_wrapper import attribute_wrapper from devices.hardware_device import hardware_device @@ -36,14 +37,39 @@ import numpy __all__ = ["SST", "main"] -@device_logging_to_python({"device": "SST"}) +@device_logging_to_python() class SST(hardware_device): # ----------------- # Device Properties # ----------------- - SST_Port = device_property( + OPC_Server_Name = device_property( + dtype='DevString', + mandatory=True + ) + + OPC_Server_Port = device_property( + dtype='DevULong', + mandatory=True + ) + + OPC_Time_Out = device_property( + dtype='DevDouble', + mandatory=True + ) + + SST_Client_Name = device_property( + dtype='DevString', + mandatory=True + ) + + SST_Client_MAC = device_property( + dtype='DevString', + mandatory=True + ) + + SST_Client_Port = device_property( dtype='DevUShort', mandatory=True ) @@ -54,34 +80,46 @@ class SST(hardware_device): version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) + # FPGA control points for SSTs + FPGA_sst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_sst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_R"], datatype=numpy.bool_, dims=(16,)) + FPGA_sst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_sst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str_, dims=(16,)) + FPGA_sst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_sst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,)) + FPGA_sst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_sst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,)) + FPGA_sst_offload_selector_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) + FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,)) + # number of UDP packets that were received - nof_packets_received_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64) + nof_packets_received_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64) # number of UDP packets that were dropped because we couldn't keep up with processing - nof_packets_dropped_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64) + nof_packets_dropped_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64) # last packet we processed - last_packet_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8) + last_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8) # when last packet was received - last_packet_timestamp_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64) + last_packet_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64) # number of UDP packets that were processed - nof_packets_processed_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64) + nof_packets_processed_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64) # queue fill percentage, as reported by the consumer - queue_fill_percentage_R = attribute_wrapper(comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64) + queue_fill_percentage_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64) # number of invalid (non-SST) packets received - nof_invalid_packets_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64) + nof_invalid_packets_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64) # last packet that could not be parsed - last_invalid_packet_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8) + last_invalid_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8) # number of packets with valid payloads - nof_valid_payloads_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + nof_valid_payloads_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) # number of packets with invalid payloads - nof_payload_errors_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + nof_payload_errors_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) # latest SSTs - sst_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) + sst_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64) # reported timestamp for each row in the latest SSTs - sst_timestamp_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) + sst_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64) # integration interval for each row in the latest SSTs - integration_interval_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) + integration_interval_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32) # -------- # overloaded functions @@ -94,17 +132,27 @@ class SST(hardware_device): except Exception as e: self.warn_stream("Exception while stopping sst_client in configure_for_off function: {}. Exception ignored".format(e)) + try: + self.opcua_connection.stop() + except Exception as e: + self.warn_stream("Exception while stopping OPC UA connection in configure_for_off function: {}. Exception ignored".format(e)) + @log_exceptions() def configure_for_initialise(self): """ user code here. is called when the sate is set to INIT """ """Initialises the attributes and properties of the statistics device.""" - self.sst_client = sst_client("0.0.0.0", self.SST_Port, self.Fault, self) + self.sst_client = sst_client("0.0.0.0", self.SST_Client_Port, self.Fault, self) + + self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) # map an access helper class for i in self.attr_list(): try: - i.set_comm_client(self.sst_client) + if i.comms_id == sst_client: + i.set_comm_client(self.sst_client) + if i.comms_id == OPCUAConnection: + i.set_comm_client(self.OPCUA_client) except Exception as e: # use the pass function instead of setting read/write fails i.set_pass_func() @@ -113,6 +161,8 @@ class SST(hardware_device): self.sst_client.start() + self.OPCua_client.start() + # -------- # Commands # -------- @@ -124,8 +174,7 @@ def main(args=None, **kwargs): """Main function of the SST Device module.""" from common.lofar_logging import configure_logger - import logging - configure_logger(logging.getLogger()) + configure_logger() return run((SST,), args=args, **kwargs) diff --git a/devices/test/util/__init__.py b/devices/test/common/__init__.py similarity index 100% rename from devices/test/util/__init__.py rename to devices/test/common/__init__.py diff --git a/devices/test/util/test_lofar_git.py b/devices/test/common/test_lofar_git.py similarity index 90% rename from devices/test/util/test_lofar_git.py rename to devices/test/common/test_lofar_git.py index 178eac535d52dfc6a2edf393a80e3c699e37a8b7..52a1c7d876fc2827757f082e0f44a0a64b1ffc78 100644 --- a/devices/test/util/test_lofar_git.py +++ b/devices/test/common/test_lofar_git.py @@ -35,9 +35,11 @@ class TestLofarGit(base.TestCase): m_is_dirty = mock.Mock() m_is_dirty.return_value = True + m_head = mock.Mock(is_detached=False) + m_get_repo.return_value = mock.Mock( active_branch="main", commit=m_commit, tags=[], - is_dirty=m_is_dirty) + is_dirty=m_is_dirty, head=m_head) # No need for special string equal in Python self.assertEqual("*main [123456]", lofar_git.get_version()) @@ -52,12 +54,14 @@ class TestLofarGit(base.TestCase): m_is_dirty = mock.Mock() m_is_dirty.return_value = False + m_head = mock.Mock(is_detached=False) + m_tag = mock.Mock(commit="123456") m_tag.__str__ = mock.Mock(return_value= "version-1.2") m_get_repo.return_value = mock.Mock( active_branch="main", commit=m_commit, - tags=[m_tag], is_dirty=m_is_dirty) + tags=[m_tag], is_dirty=m_is_dirty, head=m_head) self.assertEqual("version-1.2", lofar_git.get_version()) @@ -71,13 +75,15 @@ class TestLofarGit(base.TestCase): m_is_dirty = mock.Mock() m_is_dirty.return_value = False + m_head = mock.Mock(is_detached=False) + m_tag = mock.Mock(commit="123456") m_tag.__str__ = mock.Mock(return_value= "version-1.2") # Now m_get_repo is mocked using a decorator m_get_repo.return_value = mock.Mock( active_branch="main", commit=m_commit, - tags=[m_tag], is_dirty=m_is_dirty) + tags=[m_tag], is_dirty=m_is_dirty, head=m_head) self.assertEqual("version-1.2", lofar_git.get_version()) diff --git a/devices/test/common/test_lofar_logging.py b/devices/test/common/test_lofar_logging.py new file mode 100644 index 0000000000000000000000000000000000000000..534b2650c8d432ef7c7dae9e32448b01cc5913f3 --- /dev/null +++ b/devices/test/common/test_lofar_logging.py @@ -0,0 +1,105 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the LOFAR 2.0 Station Software +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +import git +from unittest import mock + +from common import lofar_logging +import logging +from tango.server import Device +from tango import device_server + +from test import base + + +class TestLofarLogging(base.TestCase): + + + def setUp(self): + super(TestLofarLogging, self).setUp() + + + # reset logging system + rootLogger = logging.getLogger() + rootLogger.filters = [] + rootLogger.handlers = [] + rootLogger.manager.loggerDict = {} + + # record everything we log in memory so we can inspect it + class MemoryHandler(logging.Handler): + """ Handler that provides access to the records emitted. """ + + def __init__(self): + super().__init__() + self.records = [] + + def emit(self, record): + self.records.append(record) + + self.memory_handler = MemoryHandler() + rootLogger.addHandler(self.memory_handler) + + + def test_configure_logging_basic_usage(self): + """ Test whether configure_logger indeed runs smoothly. """ + + logger = lofar_logging.configure_logger() + + logger.debug("test debug") + logger.info("test info") + logger.warning("test warning") + logger.error("test error") + logger.fatal("test fatal") + + + def test_configure_logging_log_annotation(self): + """ Test whether log records get annotated after using configure_logger(). """ + + logger = lofar_logging.configure_logger() + + logger.info("test") + self.assertIn("tango_device", self.memory_handler.records[0].__dict__) + self.assertIn("software_version", self.memory_handler.records[0].__dict__) + + + def test_configure_logging_uses_tango_device(self): + """ Test whether log records get annotated with the active Tango device after using configure_logger(), and whether logs get forwarded to it. """ + + logger = lofar_logging.configure_logger() + + # create a Tango Device that logs something + class MyDevice(Device): + def __init__(self): + self.log_deeper_in_stack() + + def log_deeper_in_stack(self): + logger.info("test") + + with mock.patch.object(device_server.DeviceImpl, '__info_stream') as m_info_stream: + # logs in the constructor already + mydevice = MyDevice() + + self.assertEqual(mydevice, self.memory_handler.records[0].tango_device, msg="configure_logging did not detect active Tango device") + self.assertEqual(1, m_info_stream.call_count, msg="configure_logger did not send logs to active Tango device") + + + def test_log_exceptions(self): + """ Test whether log_exceptions actually logs and reraises exceptions. """ + + class Foo: + @lofar_logging.log_exceptions() + def exceptionalFunction(self): + raise Exception("test") + + with self.assertRaises(Exception, msg="log_exceptions did not reraise exception"): + f = Foo() + f.exceptionalFunction() + + self.assertEqual(1, len(self.memory_handler.records), msg="log_exceptions did not log exception") + diff --git a/devices/tox.ini b/devices/tox.ini index 26738468dc9f5fdbfe9b25af86c682c86c831c68..18c6cda38751d7bc447e8fb23d92e63b64288ddb 100644 --- a/devices/tox.ini +++ b/devices/tox.ini @@ -24,9 +24,13 @@ commands = ; doc8 doc/source/ README.rst flake8 -[testenv:bandit] +[testenv:bandit]; +; B104: hardcoded_bind_all_interfaces +; - We disable this warning as Docker serves as our firewall. +; It thus matters what interfaces Docker will bind our +; containers to, not what our containers listen on. commands = - bandit -r devices/ clients/ common/ examples/ util/ -n5 -ll + bandit -r devices/ clients/ common/ examples/ util/ -n5 -ll -s B104 [flake8] filename = *.py,.stestr.conf,.txt diff --git a/docker-compose/jupyter/Dockerfile b/docker-compose/jupyter/Dockerfile index 7f043d4493e71fff9659dc5ee26cd0b716cef247..d6378937093435bd209e0ec0175999fe56e08563 100644 --- a/docker-compose/jupyter/Dockerfile +++ b/docker-compose/jupyter/Dockerfile @@ -8,6 +8,8 @@ ARG CONTAINER_EXECUTION_UID=1000 RUN sudo pip3 install jupyter RUN sudo pip3 install ipykernel RUN sudo pip3 install jupyter_bokeh +# Install matplotlib, jupyterplot +RUN sudo pip3 install matplotlib jupyterplot # Configure jupyter_bokeh RUN sudo mkdir -p /usr/share/jupyter /usr/etc