Skip to content
Snippets Groups Projects
Commit 27d7aefe authored by Thomas Juerges's avatar Thomas Juerges
Browse files

Merge branch 'master' into L2SS-183-Observation_device

parents 0480b022 9d6285dd
Branches
Tags
1 merge request: !40 L2SS-183: Observation device
Showing
with 440 additions and 74 deletions
......@@ -722,8 +722,17 @@
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Port": [
"SST_Client_Port": [
"5001"
],
"OPC_Server_Name": [
"dop36.astron.nl"
],
"OPC_Server_Port": [
"4840"
],
"OPC_Time_Out": [
"5.0"
]
}
}
......
......@@ -18,6 +18,28 @@
}
}
}
},
"SST": {
"LTS": {
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Client_Port": [
"5001"
],
"OPC_Server_Name": [
"sdptr-sim"
],
"OPC_Server_Port": [
"4840"
],
"OPC_Time_Out": [
"5.0"
]
}
}
}
}
}
}
}
......@@ -94,8 +94,17 @@
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Port": [
"SST_Client_Port": [
"5001"
],
"OPC_Server_Name": [
"dop36.astron.nl"
],
"OPC_Server_Port": [
"4840"
],
"OPC_Time_Out": [
"5.0"
]
}
}
......
......@@ -25,6 +25,19 @@
}
}
}
},
"SST": {
"LTS": {
"SST": {
"LTS/SST/1": {
"properties": {
"OPC_Server_Name": [
"okeanos"
]
}
}
}
}
}
}
}
......@@ -25,6 +25,19 @@
}
}
}
},
"SST": {
"LTS": {
"SST": {
"LTS/SST/1": {
"properties": {
"OPC_Server_Name": [
"okeanos"
]
}
}
}
}
}
}
}
......@@ -14,11 +14,12 @@ class attribute_wrapper(attribute):
Wraps all the attributes in a wrapper class to manage most of the redundant code behind the scenes
"""
def __init__(self, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs):
def __init__(self, comms_id=None, comms_annotation=None, datatype=None, dims=(1,), access=AttrWriteType.READ, init_value=None, **kwargs):
"""
wraps around the tango Attribute class. Provides an easier interface for 1d or 2d arrays. Also provides a way to abstract
managing the communications interface.
comms_id: user-supplied identifier that is attached to this object, to identify which communication class will need to be attached
comms_annotation: data passed along to the attribute. can be given any form of data. handling is up to client implementation
datatype: any numpy datatype
dims: dimensions of the attribute as a tuple, or (1,) for a scalar.
......@@ -31,6 +32,7 @@ class attribute_wrapper(attribute):
if "numpy" not in str(datatype) and datatype != str:
raise TypeError("Attribute needs to be a Tango-supported numpy or str type, but has type \"%s\"" % (datatype,))
self.comms_id = comms_id # store data that can be used to identify the comms interface to use. not used by the wrapper itself
self.comms_annotation = comms_annotation # store data that can be used by the comms interface. not used by the wrapper itself
self.numpy_type = datatype # tango changes our attribute to their representation (E.g numpy.int64 becomes "DevLong64")
......
......@@ -65,6 +65,9 @@ class CommClient(Thread):
# signal that we're disconnected
self.fault_func()
# don't enter a spam-connect loop if faults immediately occur
time.sleep(self.try_interval)
def ping(self):
    """ Connectivity check hook; this default implementation does nothing.
        Subclasses presumably override it to probe their transport — TODO confirm. """
    return
......
......@@ -225,7 +225,37 @@ class ProtocolAttribute:
# make sure it is a python array
value = value.tolist() if type(value) == numpy.ndarray else value

try:
    # Removed the stray unconditional `raise TypeError` that followed this call:
    # it forced every write into the mismatch handler below (debug leftover).
    self.node.set_data_value(opcua.ua.uatypes.Variant(value=value, varianttype=self.ua_type))
except (TypeError, opcua.ua.uaerrors.BadTypeMismatch) as e:
    # A type conversion went wrong or there is a type mismatch.
    #
    # This is either the conversion us -> opcua in our client, or client -> server.
    # Report all types involved to allow assessment of the location of the error.
    if type(value) == list:
        our_type = "list({dtype}) x ({dimensions})".format(
            dtype=(type(value[0]).__name__ if value else ""),
            dimensions=len(value))
    else:
        our_type = "{dtype}".format(
            dtype=type(value))

    # scalar attributes are modelled with dim_x + dim_y == 1
    is_scalar = (self.dim_x + self.dim_y) == 1

    if is_scalar:
        expected_server_type = "{dtype} (scalar)".format(
            dtype=self.ua_type)
    else:
        expected_server_type = "{dtype} x ({dim_x}, {dim_y})".format(
            dtype=self.ua_type,
            dim_x=self.dim_x,
            dim_y=self.dim_y)

    actual_server_type = "{dtype} {dimensions}".format(
        dtype=self.node.get_data_type_as_variant_type(),
        dimensions=(self.node.get_array_dimensions() or "???"))

    attribute_name = self.node.get_display_name().to_string()

    raise TypeError(f"Cannot write value to OPC-UA attribute '{attribute_name}': tried to convert data type {our_type} to expected server type {expected_server_type}, server reports type {actual_server_type}") from e
......@@ -2,7 +2,7 @@ import git # pip3 install gitpython
import os
from functools import lru_cache
def get_repo(starting_directory: str = os.path.dirname(__file__)) -> git.Repo:
def get_repo(starting_directory: str = os.path.dirname(os.path.abspath(__file__))) -> git.Repo:
""" Try finding the repository by traversing up the tree.
By default, the repository containing this module is returned.
......@@ -50,14 +50,20 @@ def get_version(repo: git.Repo = None) -> str:
if repo is None:
repo = get_repo()
branch = repo.active_branch
commit = repo.commit()
tags = {tag.commit: tag for tag in repo.tags}
if commit in tags:
# a tag = production ready
commit_str = "{}".format(tags[commit])
production_ready = True
elif repo.head.is_detached:
# no active branch
commit_str = "<detached HEAD> [{}]".format(commit)
production_ready = False
else:
# HEAD of a branch
branch = repo.active_branch
commit_str = "{} [{}]".format(branch, commit)
production_ready = False
......
import logging
from functools import wraps
from tango.server import Device
import sys
# Always also log the hostname because it makes the origin of the log clear.
import traceback
import socket
hostname = socket.gethostname()
def configure_logger(logger: logging.Logger, log_extra=None):
from .lofar_git import get_version
class TangoLoggingHandler(logging.Handler):
    """ Logging handler that forwards log records to the Tango device stream of the
        device the record was annotated with (see LogAnnotator). """

    # Maps python log levels onto the corresponding Tango Device stream methods.
    level_to_device_stream = {
        logging.DEBUG: Device.debug_stream,
        logging.INFO: Device.info_stream,
        logging.WARN: Device.warn_stream,
        logging.ERROR: Device.error_stream,
        logging.FATAL: Device.fatal_stream,
    }

    def emit(self, record):
        """ Send the record to the Tango log stream of its annotated device, if any. """
        device = getattr(record, "tango_device", None)
        if device is None:
            # log record is not annotated with (or related to) any device
            return

        # determine which log stream to use, then send the log message to Tango
        stream = self.level_to_device_stream[record.levelno]
        stream(device, record.msg, *record.args)

        self.flush()
class LogAnnotator(logging.Formatter):
    """ Annotates log records with:

        record.tango_device:     the Tango Device that is executing (or None).
        record.software_version: the current software version.

        NOTE: subclasses logging.Formatter but is installed via addFilter(). """

    @staticmethod
    def get_current_tango_device() -> Device:
        """ Return the Tango Device we're currently executing for, or None if it can't be detected.

            Derived by walking the stack looking for a Device bound as 'self'. In some cases
            this fails, for example if a separate Thread is started for a certain Device. """
        for frame, _lineno in traceback.walk_stack(f=None):
            candidate = frame.f_locals.get("self")
            if isinstance(candidate, Device):
                return candidate

        return None

    def filter(self, record):
        # annotate record with currently executing Tango device, if any
        record.tango_device = self.get_current_tango_device()

        # annotate record with the current software version
        record.software_version = get_version()

        # we just annotate, we don't filter
        return True
def configure_logger(logger: logging.Logger=None, log_extra=None):
"""
Configure the given logger (or root if None) to:
- send logs to the ELK stack
- send logs to Tango
- send logs to stdout
"""
# NOTE: We have to attach filters to handlers, instead to this logger,
# in order to have the filters be applied to descendent loggers.
if logger is None:
logger = logging.getLogger()
# By default we want to know everything
logger.setLevel(logging.DEBUG)
# remove spam from the OPC-UA client connection
logging.getLogger("opcua").setLevel(logging.WARN)
# Log to ELK stack
try:
from logstash_async.handler import AsynchronousLogstashHandler, LogstashFormatter
......@@ -18,32 +91,52 @@ def configure_logger(logger: logging.Logger, log_extra=None):
# configure log messages
formatter = LogstashFormatter(extra=log_extra, tags=["python", "lofar"])
handler.setFormatter(formatter)
handler.addFilter(LogAnnotator())
# install the handler
logger.addHandler(handler)
except ImportError:
logger.exception("Cannot forward logs to ELK: logstash_async module not found.")
except Exception:
logger.exception("Cannot forward logs to ELK.")
# Log to Tango
try:
handler = TangoLoggingHandler()
handler.addFilter(LogAnnotator())
logger.addHandler(handler)
except Exception:
logger.exception("Cannot forward logs to Tango.")
# for now, also log to stderr
# Set up logging in a way that it can be understood by a human reader, be
# easily grep'ed, be parsed with a couple of shell commands and
# easily fed into an Kibana/Elastic search system.
handler = logging.StreamHandler()
formatter = logging.Formatter(fmt = '%(asctime)s.%(msecs)d %(levelname)s - HOST="{}" PID="%(process)d" TNAME="%(threadName)s" TID="%(thread)d" FILE="%(pathname)s" LINE="%(lineno)d" FUNC="%(funcName)s" MSG="%(message)s"'.format(hostname), datefmt = '%Y-%m-%dT%H:%M:%S')
# Always also log the hostname because it makes the origin of the log clear.
hostname = socket.gethostname()
formatter = logging.Formatter(fmt = '%(asctime)s.%(msecs)d %(levelname)s - HOST="{}" DEVICE="%(tango_device)s" PID="%(process)d" TNAME="%(threadName)s" FILE="%(pathname)s" LINE="%(lineno)d" FUNC="%(funcName)s" MSG="%(message)s"'.format(hostname), datefmt = '%Y-%m-%dT%H:%M:%S')
handler.setFormatter(formatter)
handler.addFilter(LogAnnotator())
logger.addHandler(handler)
except Exception:
logger.exception("Cannot import or configure logstash_async module, not forwarding logs to ELK stack.")
return logger
def device_logging_to_python(log_extra: dict = None):
""" Call this on a Tango Device instance or class to have your Tango Device log to python instead. """
def device_logging_to_python():
""" Decorator. Call this on a Tango Device instance or class to have your Tango Device log to python instead. """
def inner(cls):
# Create a logger that logs to ELK, dedicated for this class
logger = logging.getLogger(cls.__name__)
configure_logger(logger, log_extra)
# we'll be doing very weird things if this class isnt
if not issubclass(cls, Device):
raise ValueError("device_logging_to_python decorator is to be used on Tango Device classes only.")
# Monkey patch the python logger to replace the tango logger
logger = logging.getLogger()
cls.debug_stream = logger.debug
cls.info_stream = logger.info
cls.warn_stream = logger.warning
......@@ -51,11 +144,12 @@ def device_logging_to_python(log_extra: dict = None):
cls.error_stream = logger.error
cls.fatal_stream = logger.fatal
cls.critical_stream = logger.critical
return cls
return inner
def log_exceptions():
def log_exceptions(logger: logging.Logger=None):
""" Decorator that logs all exceptions that the function raises. """
def wrapper(func):
......@@ -64,8 +158,10 @@ def log_exceptions():
try:
return func(self, *args, **kwargs)
except Exception as e:
self.error_stream("Unhandled exception: {}".format(e))
raise e
(logger or logging.getLogger()).exception("Unhandled exception: %s: %s", e.__class__.__name__, e)
# we can log but we cannot hide
raise
return inner
......
......@@ -34,7 +34,7 @@ import numpy
__all__ = ["APSCTL", "main"]
@device_logging_to_python({"device": "APSCTL"})
@device_logging_to_python()
class APSCTL(hardware_device):
"""
......@@ -195,8 +195,7 @@ def main(args=None, **kwargs):
"""Main function of the SDP module."""
from devices.common.lofar_logging import configure_logger
import logging
configure_logger(logging.getLogger())
configure_logger()
return run((APSCTL,), args=args, **kwargs)
......
......@@ -90,7 +90,7 @@ class hardware_device(Device):
self.set_state(DevState.STANDBY)
@command()
@only_in_states([DevState.STANDBY])
@only_in_states([DevState.STANDBY, DevState.ON])
@DebugIt()
@fault_on_error()
@log_exceptions()
......@@ -100,6 +100,11 @@ class hardware_device(Device):
:return:None
"""
if self.get_state() == DevState.ON:
# Already on. Don't complain.
logger.warning("Requested to go to ON state, but am already in ON state.")
return
self.configure_for_on()
self.set_state(DevState.ON)
......@@ -125,7 +130,7 @@ class hardware_device(Device):
self.set_state(DevState.OFF)
@command()
@only_in_states([DevState.ON, DevState.INIT, DevState.STANDBY])
@only_in_states([DevState.ON, DevState.INIT, DevState.STANDBY, DevState.FAULT])
@DebugIt()
@log_exceptions()
def Fault(self):
......@@ -138,6 +143,11 @@ class hardware_device(Device):
:return:None
"""
if self.get_state() == DevState.FAULT:
# Already faulting. Don't complain.
logger.warning("Requested to go to FAULT state, but am already in FAULT state.")
return
self.configure_for_fault()
self.set_state(DevState.FAULT)
......
......@@ -35,7 +35,7 @@ from common.lofar_git import get_version
__all__ = ["PCC", "main"]
@device_logging_to_python({"device": "PCC"})
@device_logging_to_python()
class PCC(hardware_device):
"""
......@@ -251,8 +251,7 @@ def main(args=None, **kwargs):
"""Main function of the PCC module."""
from common.lofar_logging import configure_logger
import logging
configure_logger(logging.getLogger())
configure_logger()
return run((PCC,), args=args, **kwargs)
......
......@@ -35,7 +35,7 @@ import numpy
__all__ = ["SDP", "main"]
@device_logging_to_python({"device": "SDP"})
@device_logging_to_python()
class SDP(hardware_device):
"""
......@@ -94,16 +94,6 @@ class SDP(hardware_device):
FPGA_sdp_info_observation_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_observation_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sdp_info_station_id_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_station_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_enable_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_enable_R"], datatype=numpy.bool_, dims=(16,))
FPGA_sst_offload_enable_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str_, dims=(16,))
FPGA_sst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,))
FPGA_sst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,))
FPGA_sst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_selector_R = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,))
FPGA_sst_offload_selector_RW = attribute_wrapper(comms_annotation=["2:FPGA_sst_offload_selector_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_status_R = attribute_wrapper(comms_annotation=["2:FPGA_status_R"], datatype=numpy.bool_, dims=(16,))
FPGA_temp_R = attribute_wrapper(comms_annotation=["2:FPGA_temp_R"], datatype=numpy.float_, dims=(16,))
FPGA_version_R = attribute_wrapper(comms_annotation=["2:FPGA_version_R"], datatype=numpy.str_, dims=(16,))
......@@ -184,8 +174,7 @@ def main(args=None, **kwargs):
"""Main function of the SDP module."""
from common.lofar_logging import configure_logger
import logging
configure_logger(logging.getLogger())
configure_logger()
return run((SDP,), args=args, **kwargs)
......
......@@ -25,6 +25,7 @@ from tango import AttrWriteType
# Additional import
from clients.sst_client import sst_client, SST_collector
from clients.opcua_connection import OPCUAConnection
from clients.attribute_wrapper import attribute_wrapper
from devices.hardware_device import hardware_device
......@@ -36,14 +37,39 @@ import numpy
__all__ = ["SST", "main"]
@device_logging_to_python({"device": "SST"})
@device_logging_to_python()
class SST(hardware_device):
# -----------------
# Device Properties
# -----------------
SST_Port = device_property(
OPC_Server_Name = device_property(
dtype='DevString',
mandatory=True
)
OPC_Server_Port = device_property(
dtype='DevULong',
mandatory=True
)
OPC_Time_Out = device_property(
dtype='DevDouble',
mandatory=True
)
SST_Client_Name = device_property(
dtype='DevString',
mandatory=True
)
SST_Client_MAC = device_property(
dtype='DevString',
mandatory=True
)
SST_Client_Port = device_property(
dtype='DevUShort',
mandatory=True
)
......@@ -54,34 +80,46 @@ class SST(hardware_device):
version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version())
# FPGA control points for SSTs
FPGA_sst_offload_enable_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_enable_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_enable_R"], datatype=numpy.bool_, dims=(16,))
FPGA_sst_offload_hdr_eth_destination_mac_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_hdr_eth_destination_mac_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_eth_destination_mac_R"], datatype=numpy.str_, dims=(16,))
FPGA_sst_offload_hdr_ip_destination_address_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_RW"], datatype=numpy.str_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,))
FPGA_sst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,))
FPGA_sst_offload_selector_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,))
# number of UDP packets that were received
nof_packets_received_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64)
nof_packets_received_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64)
# number of UDP packets that were dropped because we couldn't keep up with processing
nof_packets_dropped_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64)
nof_packets_dropped_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "nof_packets_dropped"}, datatype=numpy.uint64)
# last packet we processed
last_packet_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8)
last_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet"}, dims=(9000,), datatype=numpy.uint8)
# when last packet was received
last_packet_timestamp_R = attribute_wrapper(comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64)
last_packet_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "udp", "parameter": "last_packet_timestamp"}, datatype=numpy.uint64)
# number of UDP packets that were processed
nof_packets_processed_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64)
nof_packets_processed_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_packets"}, datatype=numpy.uint64)
# queue fill percentage, as reported by the consumer
queue_fill_percentage_R = attribute_wrapper(comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64)
queue_fill_percentage_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "queue", "parameter": "fill_percentage"}, datatype=numpy.uint64)
# number of invalid (non-SST) packets received
nof_invalid_packets_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
nof_invalid_packets_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_invalid_packets"}, datatype=numpy.uint64)
# last packet that could not be parsed
last_invalid_packet_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8)
last_invalid_packet_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "last_invalid_packet"}, dims=(9000,), datatype=numpy.uint8)
# number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
nof_valid_payloads_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_valid_payloads"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# number of packets with invalid payloads
nof_payload_errors_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
nof_payload_errors_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "nof_payload_errors"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# latest SSTs
sst_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64)
sst_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_values"}, dims=(SST_collector.MAX_SUBBANDS, SST_collector.MAX_INPUTS), datatype=numpy.uint64)
# reported timestamp for each row in the latest SSTs
sst_timestamp_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
sst_timestamp_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "sst_timestamps"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.uint64)
# integration interval for each row in the latest SSTs
integration_interval_R = attribute_wrapper(comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32)
integration_interval_R = attribute_wrapper(comms_id=sst_client, comms_annotation={"type": "sst", "parameter": "integration_intervals"}, dims=(SST_collector.MAX_INPUTS,), datatype=numpy.float32)
# --------
# overloaded functions
......@@ -94,17 +132,27 @@ class SST(hardware_device):
except Exception as e:
self.warn_stream("Exception while stopping sst_client in configure_for_off function: {}. Exception ignored".format(e))
try:
self.opcua_connection.stop()
except Exception as e:
self.warn_stream("Exception while stopping OPC UA connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
    """ Initialise the attributes and properties of the statistics device.

        Creates the SST UDP client and the OPC-UA client, then attaches each
        attribute to the communication client it declared through comms_id. """

    self.sst_client = sst_client("0.0.0.0", self.SST_Client_Port, self.Fault, self)
    self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)

    # map an access helper class
    for i in self.attr_list():
        try:
            if i.comms_id == sst_client:
                i.set_comm_client(self.sst_client)
            elif i.comms_id == OPCUAConnection:
                # BUGFIX: was self.OPCUA_client, which is never set; the attribute
                # created above is self.OPCua_client (AttributeError at runtime).
                i.set_comm_client(self.OPCua_client)
        except Exception as e:
            # use the pass function instead of setting read/write fails
            i.set_pass_func()
......@@ -113,6 +161,8 @@ class SST(hardware_device):
self.sst_client.start()
self.OPCua_client.start()
# --------
# Commands
# --------
......@@ -124,8 +174,7 @@ def main(args=None, **kwargs):
"""Main function of the SST Device module."""
from common.lofar_logging import configure_logger
import logging
configure_logger(logging.getLogger())
configure_logger()
return run((SST,), args=args, **kwargs)
......
File moved
......@@ -35,9 +35,11 @@ class TestLofarGit(base.TestCase):
m_is_dirty = mock.Mock()
m_is_dirty.return_value = True
m_head = mock.Mock(is_detached=False)
m_get_repo.return_value = mock.Mock(
active_branch="main", commit=m_commit, tags=[],
is_dirty=m_is_dirty)
is_dirty=m_is_dirty, head=m_head)
# No need for special string equal in Python
self.assertEqual("*main [123456]", lofar_git.get_version())
......@@ -52,12 +54,14 @@ class TestLofarGit(base.TestCase):
m_is_dirty = mock.Mock()
m_is_dirty.return_value = False
m_head = mock.Mock(is_detached=False)
m_tag = mock.Mock(commit="123456")
m_tag.__str__ = mock.Mock(return_value= "version-1.2")
m_get_repo.return_value = mock.Mock(
active_branch="main", commit=m_commit,
tags=[m_tag], is_dirty=m_is_dirty)
tags=[m_tag], is_dirty=m_is_dirty, head=m_head)
self.assertEqual("version-1.2", lofar_git.get_version())
......@@ -71,13 +75,15 @@ class TestLofarGit(base.TestCase):
m_is_dirty = mock.Mock()
m_is_dirty.return_value = False
m_head = mock.Mock(is_detached=False)
m_tag = mock.Mock(commit="123456")
m_tag.__str__ = mock.Mock(return_value= "version-1.2")
# Now m_get_repo is mocked using a decorator
m_get_repo.return_value = mock.Mock(
active_branch="main", commit=m_commit,
tags=[m_tag], is_dirty=m_is_dirty)
tags=[m_tag], is_dirty=m_is_dirty, head=m_head)
self.assertEqual("version-1.2", lofar_git.get_version())
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
import git
from unittest import mock
from common import lofar_logging
import logging
from tango.server import Device
from tango import device_server
from test import base
class TestLofarLogging(base.TestCase):
    """ Tests for lofar_logging: configure_logger(), record annotation, and log_exceptions(). """

    def setUp(self):
        super(TestLofarLogging, self).setUp()

        # reset logging system
        rootLogger = logging.getLogger()
        rootLogger.filters = []
        rootLogger.handlers = []
        rootLogger.manager.loggerDict = {}

        # record everything we log in memory so we can inspect it
        class MemoryHandler(logging.Handler):
            """ Handler that provides access to the records emitted. """

            def __init__(self):
                super().__init__()
                # all LogRecords emitted through this handler, in order
                self.records = []

            def emit(self, record):
                self.records.append(record)

        self.memory_handler = MemoryHandler()
        rootLogger.addHandler(self.memory_handler)

    def test_configure_logging_basic_usage(self):
        """ Test whether configure_logger indeed runs smoothly. """
        logger = lofar_logging.configure_logger()

        # exercise every level; the test passes if none of these raise
        logger.debug("test debug")
        logger.info("test info")
        logger.warning("test warning")
        logger.error("test error")
        logger.fatal("test fatal")

    def test_configure_logging_log_annotation(self):
        """ Test whether log records get annotated after using configure_logger(). """
        logger = lofar_logging.configure_logger()

        logger.info("test")

        # the LogAnnotator filter should have added both annotations to the record
        self.assertIn("tango_device", self.memory_handler.records[0].__dict__)
        self.assertIn("software_version", self.memory_handler.records[0].__dict__)

    def test_configure_logging_uses_tango_device(self):
        """ Test whether log records get annotated with the active Tango device after using configure_logger(), and whether logs get forwarded to it. """
        logger = lofar_logging.configure_logger()

        # create a Tango Device that logs something
        class MyDevice(Device):
            def __init__(self):
                # deliberately skip Device.__init__; we only need the stack to contain us
                self.log_deeper_in_stack()

            def log_deeper_in_stack(self):
                # log from a nested call so the stack walk has to look past one frame
                logger.info("test")

        # patch the (name-mangled) info stream so we can count forwarded logs
        with mock.patch.object(device_server.DeviceImpl, '__info_stream') as m_info_stream:
            # logs in the constructor already
            mydevice = MyDevice()

        self.assertEqual(mydevice, self.memory_handler.records[0].tango_device, msg="configure_logging did not detect active Tango device")
        self.assertEqual(1, m_info_stream.call_count, msg="configure_logger did not send logs to active Tango device")

    def test_log_exceptions(self):
        """ Test whether log_exceptions actually logs and reraises exceptions. """
        class Foo:
            @lofar_logging.log_exceptions()
            def exceptionalFunction(self):
                raise Exception("test")

        with self.assertRaises(Exception, msg="log_exceptions did not reraise exception"):
            f = Foo()
            f.exceptionalFunction()

        self.assertEqual(1, len(self.memory_handler.records), msg="log_exceptions did not log exception")
......@@ -24,9 +24,13 @@ commands =
; doc8 doc/source/ README.rst
flake8
[testenv:bandit]
[testenv:bandit];
; B104: hardcoded_bind_all_interfaces
; - We disable this warning as Docker serves as our firewall.
; It thus matters what interfaces Docker will bind our
; containers to, not what our containers listen on.
commands =
bandit -r devices/ clients/ common/ examples/ util/ -n5 -ll
bandit -r devices/ clients/ common/ examples/ util/ -n5 -ll -s B104
[flake8]
filename = *.py,.stestr.conf,.txt
......
......@@ -8,6 +8,8 @@ ARG CONTAINER_EXECUTION_UID=1000
RUN sudo pip3 install jupyter
RUN sudo pip3 install ipykernel
RUN sudo pip3 install jupyter_bokeh
# Install matplotlib, jupyterplot
RUN sudo pip3 install matplotlib jupyterplot
# Configure jupyter_bokeh
RUN sudo mkdir -p /usr/share/jupyter /usr/etc
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment