Skip to content
Snippets Groups Projects
Commit ec26aeeb authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-340: Merged master into branch, resolving conflicts due to the earlier...

L2SS-340: Merged master into branch, resolving conflicts due to the earlier split of StatisticsCollector into StatisticsCollector and StatisticsConsumer
parents 9e71a065 aad540dc
Branches
Tags
1 merge request!117create TCPReplicator for StatisticsClient
Showing
with 1351 additions and 138 deletions
...@@ -15,17 +15,30 @@ stages: ...@@ -15,17 +15,30 @@ stages:
- static-analysis - static-analysis
- unit-tests - unit-tests
- integration-tests - integration-tests
linting: newline_at_eof:
stage: linting
before_script:
- pip3 install -r devices/test-requirements.txt
script:
- flake8 --filename *.sh,*.conf,*.md,*.yml --select=W292 --exclude .tox,.egg-info,docker
python_linting:
stage: linting stage: linting
script: script:
- cd devices - cd devices
- tox -e pep8 - tox -e pep8
static-analysis: bandit:
stage: static-analysis stage: static-analysis
allow_failure: true
script: script:
- cd devices - cd devices
- tox -e bandit - tox -e bandit
shellcheck:
stage: static-analysis
allow_failure: true
before_script:
- sudo apt-get update
- sudo apt-get install -y shellcheck
script:
- shellcheck **/*.sh
unit_test: unit_test:
stage: unit-tests stage: unit-tests
before_script: before_script:
......
[submodule "docker-compose/tango-prometheus-exporter/ska-tango-grafana-exporter"]
path = docker-compose/tango-prometheus-exporter/ska-tango-grafana-exporter
url = https://git.astron.nl/lofar2.0/ska-tango-grafana-exporter.git
branch = station-control
...@@ -816,6 +816,25 @@ ...@@ -816,6 +816,25 @@
} }
} }
}, },
"UNB2": {
"LTS": {
"UNB2": {
"LTS/UNB2/1": {
"properties": {
"OPC_Server_Name": [
"despi.astron.nl"
],
"OPC_Server_Port": [
"4842"
],
"OPC_Time_Out": [
"5.0"
]
}
}
}
}
},
"StatsCrosslet": { "StatsCrosslet": {
"CS997": { "CS997": {
"StatsCrosslet": { "StatsCrosslet": {
......
This diff is collapsed.
...@@ -18,7 +18,6 @@ numpy_to_OPCua_dict = { ...@@ -18,7 +18,6 @@ numpy_to_OPCua_dict = {
numpy.uint32: opcua.ua.VariantType.UInt32, numpy.uint32: opcua.ua.VariantType.UInt32,
numpy.int64: opcua.ua.VariantType.Int64, numpy.int64: opcua.ua.VariantType.Int64,
numpy.uint64: opcua.ua.VariantType.UInt64, numpy.uint64: opcua.ua.VariantType.UInt64,
numpy.datetime_data: opcua.ua.VariantType.DateTime, # is this the right type, does it even matter?
numpy.float32: opcua.ua.VariantType.Float, numpy.float32: opcua.ua.VariantType.Float,
numpy.double: opcua.ua.VariantType.Double, numpy.double: opcua.ua.VariantType.Double,
numpy.float64: opcua.ua.VariantType.Double, numpy.float64: opcua.ua.VariantType.Double,
...@@ -59,9 +58,8 @@ class OPCUAConnection(CommClient): ...@@ -59,9 +58,8 @@ class OPCUAConnection(CommClient):
self.name_space_index = namespace self.name_space_index = namespace
except Exception as e: except Exception as e:
#TODO remove once SDP is fixed self.streams.error_stream("Could not determine namespace index from namespace: %s: %s", namespace, e)
self.streams.warn_stream("Cannot determine the OPC-UA name space index. Will try and use the default = 2.") raise Exception("Could not determine namespace index from namespace %s", namespace) from e
self.name_space_index = 2
self.obj = self.client.get_objects_node() self.obj = self.client.get_objects_node()
self.check_nodes() self.check_nodes()
......
...@@ -6,6 +6,8 @@ from .comms_client import CommClient ...@@ -6,6 +6,8 @@ from .comms_client import CommClient
from .tcp_replicator import TCPReplicator from .tcp_replicator import TCPReplicator
from .udp_receiver import UDPReceiver from .udp_receiver import UDPReceiver
from devices.sdp.statistics_collector import StatisticsConsumer
logger = logging.getLogger() logger = logging.getLogger()
...@@ -18,11 +20,11 @@ class StatisticsClient(CommClient): ...@@ -18,11 +20,11 @@ class StatisticsClient(CommClient):
def start(self): def start(self):
super().start() super().start()
def __init__(self, statistics_collector_class, udp_options, tcp_options, fault_func, streams, try_interval=2, queuesize=1024): def __init__(self, collector, udp_options, tcp_options, fault_func, streams, try_interval=2, queuesize=1024):
""" """
Create the statistics client and connect() to it and get the object node. Create the statistics client and connect() to it and get the object node.
statistics_collector_class: a subclass of StatisticsCollector that specialises in processing the received packets. collector: a subclass of StatisticsCollector that specialises in processing the received packets.
host: hostname to listen on host: hostname to listen on
port: port number to listen on port: port number to listen on
""" """
...@@ -30,7 +32,7 @@ class StatisticsClient(CommClient): ...@@ -30,7 +32,7 @@ class StatisticsClient(CommClient):
self.udp_options = udp_options self.udp_options = udp_options
self.tcp_options = tcp_options self.tcp_options = tcp_options
self.queuesize = queuesize self.queuesize = queuesize
self.statistics_collector_class = statistics_collector_class self.collector = collector
super().__init__(fault_func, streams, try_interval) super().__init__(fault_func, streams, try_interval)
...@@ -60,7 +62,7 @@ class StatisticsClient(CommClient): ...@@ -60,7 +62,7 @@ class StatisticsClient(CommClient):
self.udp_options) self.udp_options)
self.tcp = TCPReplicator(self.replicator_queue, self.tcp_options) self.tcp = TCPReplicator(self.replicator_queue, self.tcp_options)
self.statistics = self.statistics_collector_class(self.collector_queue) self.statistics = StatisticsConsumer(self.collector_queue, self.collector)
return super().connect() return super().connect()
...@@ -79,14 +81,13 @@ class StatisticsClient(CommClient): ...@@ -79,14 +81,13 @@ class StatisticsClient(CommClient):
try: try:
self.statistics.disconnect() self.statistics.disconnect()
except Exception: except Exception:
# nothing we can do, but we should continue cleaning up logger.exception("Could not disconnect statistics processing class")
logger.log_exception("Could not disconnect statistics processing class")
try: try:
self.udp.disconnect() self.udp.disconnect()
except Exception: except Exception:
# nothing we can do, but we should continue cleaning up # nothing we can do, but we should continue cleaning up
logger.log_exception("Could not disconnect UDP receiver class") logger.exception("Could not disconnect UDP receiver class")
try: try:
self.tcp.disconnect() self.tcp.disconnect()
...@@ -121,7 +122,7 @@ class StatisticsClient(CommClient): ...@@ -121,7 +122,7 @@ class StatisticsClient(CommClient):
# redirect to right object. this works as long as the parameter names are unique among them. # redirect to right object. this works as long as the parameter names are unique among them.
if annotation["type"] == "statistics": if annotation["type"] == "statistics":
def read_function(): def read_function():
return self.statistics.parameters[parameter] return self.collector.parameters[parameter]
elif annotation["type"] == "udp": elif annotation["type"] == "udp":
def read_function(): def read_function():
return self.udp.parameters[parameter] return self.udp.parameters[parameter]
......
...@@ -86,6 +86,12 @@ class SDP(hardware_device): ...@@ -86,6 +86,12 @@ class SDP(hardware_device):
mandatory=True mandatory=True
) )
FPGA_subband_weights_RW_default = device_property(
dtype='DevVarULongArray',
mandatory=False,
default_value=[[8192] * 12 * 512] * 16
)
# ---------- # ----------
# Attributes # Attributes
# ---------- # ----------
...@@ -102,15 +108,16 @@ class SDP(hardware_device): ...@@ -102,15 +108,16 @@ class SDP(hardware_device):
FPGA_sdp_info_antenna_band_index_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_antenna_band_index_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_antenna_band_index_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_antenna_band_index_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_block_period_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_block_period_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_block_period_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_block_period_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_f_adc_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_f_adc_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_f_adc_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_f_adc_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_f_sub_type_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_f_sub_type_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_fsub_type_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_fsub_type_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_nyquist_sampling_zone_index_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_nyquist_sampling_zone_index_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_nyquist_sampling_zone_index_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_nyquist_sampling_zone_index_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_nyquist_sampling_zone_index_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_nyquist_sampling_zone_index_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sdp_info_nyquist_sampling_zone_index_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_nyquist_sampling_zone_index_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sdp_info_observation_id_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_observation_id_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_observation_id_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_observation_id_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_observation_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_observation_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sdp_info_observation_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_observation_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sdp_info_station_id_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_R"], datatype=numpy.uint32, dims=(16,)) FPGA_sdp_info_station_id_R = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_R"], datatype=numpy.uint32, dims=(16,))
FPGA_sdp_info_station_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sdp_info_station_id_RW = attribute_wrapper(comms_annotation=["2:FPGA_sdp_info_station_id_RW"], datatype=numpy.uint32, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_subband_weights_R = attribute_wrapper(comms_annotation=["2:FPGA_subband_weights_R"], datatype=numpy.uint32, dims=(12 * 512, 16))
FPGA_subband_weights_RW = attribute_wrapper(comms_annotation=["2:FPGA_subband_weights_R"], datatype=numpy.uint32, dims=(12 * 512, 16))
FPGA_temp_R = attribute_wrapper(comms_annotation=["2:FPGA_temp_R"], datatype=numpy.float_, dims=(16,)) FPGA_temp_R = attribute_wrapper(comms_annotation=["2:FPGA_temp_R"], datatype=numpy.float_, dims=(16,))
FPGA_version_R = attribute_wrapper(comms_annotation=["2:FPGA_version_R"], datatype=numpy.str_, dims=(16,))
FPGA_weights_R = attribute_wrapper(comms_annotation=["2:FPGA_weights_R"], datatype=numpy.int16, dims=(12 * 488 * 2, 16)) FPGA_weights_R = attribute_wrapper(comms_annotation=["2:FPGA_weights_R"], datatype=numpy.int16, dims=(12 * 488 * 2, 16))
FPGA_weights_RW = attribute_wrapper(comms_annotation=["2:FPGA_weights_RW"], datatype=numpy.int16, dims=(12 * 488 * 2, 16), access=AttrWriteType.READ_WRITE) FPGA_weights_RW = attribute_wrapper(comms_annotation=["2:FPGA_weights_RW"], datatype=numpy.int16, dims=(12 * 488 * 2, 16), access=AttrWriteType.READ_WRITE)
FPGA_wg_amplitude_R = attribute_wrapper(comms_annotation=["2:FPGA_wg_amplitude_R"], datatype=numpy.float_, dims=(12, 16)) FPGA_wg_amplitude_R = attribute_wrapper(comms_annotation=["2:FPGA_wg_amplitude_R"], datatype=numpy.float_, dims=(12, 16))
......
...@@ -57,6 +57,12 @@ class SST(Statistics): ...@@ -57,6 +57,12 @@ class SST(Statistics):
mandatory=True mandatory=True
) )
FPGA_sst_offload_weighted_subbands_RW_default = device_property(
dtype='DevVarBooleanArray',
mandatory=False,
default_value=[True] * 16
)
# ---------- # ----------
# Attributes # Attributes
# ---------- # ----------
...@@ -70,8 +76,8 @@ class SST(Statistics): ...@@ -70,8 +76,8 @@ class SST(Statistics):
FPGA_sst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,)) FPGA_sst_offload_hdr_ip_destination_address_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_ip_destination_address_R"], datatype=numpy.str_, dims=(16,))
FPGA_sst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sst_offload_hdr_udp_destination_port_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_RW"], datatype=numpy.uint16, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,)) FPGA_sst_offload_hdr_udp_destination_port_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_hdr_udp_destination_port_R"], datatype=numpy.uint16, dims=(16,))
FPGA_sst_offload_selector_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) FPGA_sst_offload_weighted_subbands_RW = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_weighted_subbands_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE)
FPGA_sst_offload_selector_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_selector_R"], datatype=numpy.bool_, dims=(16,)) FPGA_sst_offload_weighted_subbands_R = attribute_wrapper(comms_id=OPCUAConnection, comms_annotation=["2:FPGA_sst_offload_weighted_subbands_R"], datatype=numpy.bool_, dims=(16,))
# number of packets with valid payloads # number of packets with valid payloads
nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64) nof_valid_payloads_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "statistics", "parameter": "nof_valid_payloads"}, dims=(SSTCollector.MAX_INPUTS,), datatype=numpy.uint64)
......
...@@ -136,7 +136,8 @@ class Statistics(hardware_device, metaclass=ABCMeta): ...@@ -136,7 +136,8 @@ class Statistics(hardware_device, metaclass=ABCMeta):
# tcp_host has default value # tcp_host has default value
} }
self.statistics_client = StatisticsClient(self.STATISTICS_COLLECTOR_CLASS, udp_options, tcp_options, self.Fault, self) self.statistics_collector = self.STATISTICS_COLLECTOR_CLASS()
self.statistics_client = StatisticsClient(self.statistics_collector, udp_options, tcp_options, self.Fault, self)
self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self)
......
...@@ -8,9 +8,8 @@ from clients.statistics_client_thread import StatisticsClientThread ...@@ -8,9 +8,8 @@ from clients.statistics_client_thread import StatisticsClientThread
logger = logging.getLogger() logger = logging.getLogger()
class StatisticsCollector:
class StatisticsCollector(Thread, StatisticsClientThread): """ Base class to process statistics packets into parameters matrices. """
""" Base class to process statistics packets from a queue, asynchronously. """
# Maximum number of antenna inputs we support (used to determine array sizes) # Maximum number of antenna inputs we support (used to determine array sizes)
MAX_INPUTS = 192 MAX_INPUTS = 192
...@@ -18,22 +17,9 @@ class StatisticsCollector(Thread, StatisticsClientThread): ...@@ -18,22 +17,9 @@ class StatisticsCollector(Thread, StatisticsClientThread):
# Maximum number of subbands we support (used to determine array sizes) # Maximum number of subbands we support (used to determine array sizes)
MAX_SUBBANDS = 512 MAX_SUBBANDS = 512
# No default options required, for now? def __init__(self):
_default_options = {}
def __init__(self, queue: Queue):
self.queue = queue
self.last_packet = None
self.parameters = self._default_parameters() self.parameters = self._default_parameters()
super().__init__()
self.start()
@property
def _options(self) -> dict:
return StatisticsCollector._default_options
def _default_parameters(self): def _default_parameters(self):
return { return {
"nof_packets": numpy.uint64(0), "nof_packets": numpy.uint64(0),
...@@ -45,50 +31,18 @@ class StatisticsCollector(Thread, StatisticsClientThread): ...@@ -45,50 +31,18 @@ class StatisticsCollector(Thread, StatisticsClientThread):
"last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8), "last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8),
} }
def run(self): def process_packet(self, packet):
logger.info("Starting statistics thread")
while True:
self.last_packet = self.queue.get()
# This is the exception/slow path, but python doesn't allow us to optimise that
if self.last_packet is None:
# None is the magic marker to stop processing
break
self.parameters["nof_packets"] += numpy.uint64(1) self.parameters["nof_packets"] += numpy.uint64(1)
try: try:
self.process_packet(self.last_packet) self.parse_packet(packet)
except Exception as e: except Exception as e:
logger.exception("Could not parse statistics UDP packet") self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8)
self.parameters["last_invalid_packet"] = numpy.frombuffer(self.last_packet, dtype=numpy.uint8)
self.parameters["nof_invalid_packets"] += numpy.uint64(1) self.parameters["nof_invalid_packets"] += numpy.uint64(1)
logger.info("Stopped statistics thread") raise ValueError("Could not parse statistics packet") from e
def join(self, timeout=0):
# insert magic marker
self.queue.put(None)
logger.info("Sent shutdown to statistics thread")
super().join(timeout) def parse_packet(self, packet):
def disconnect(self):
# TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver
# and StatisticsCollector.
if not self.is_alive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.DISCONNECT_TIMEOUT)
if self.is_alive():
# there is nothing we can do except wait (stall) longer, which could be indefinitely.
logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.")
def process_packet(self, packet):
""" Update any information based on this packet. """ """ Update any information based on this packet. """
raise NotImplementedError raise NotImplementedError
...@@ -121,7 +75,7 @@ class SSTCollector(StatisticsCollector): ...@@ -121,7 +75,7 @@ class SSTCollector(StatisticsCollector):
return defaults return defaults
def process_packet(self, packet): def parse_packet(self, packet):
fields = SSTPacket(packet) fields = SSTPacket(packet)
# determine which input this packet contains data for # determine which input this packet contains data for
...@@ -143,3 +97,66 @@ class SSTCollector(StatisticsCollector): ...@@ -143,3 +97,66 @@ class SSTCollector(StatisticsCollector):
self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload
self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp())
self.parameters["integration_intervals"][input_index] = fields.integration_interval() self.parameters["integration_intervals"][input_index] = fields.integration_interval()
class StatisticsConsumer(Thread, StatisticsClientThread):
""" Base class to process statistics packets from a queue, asynchronously. """
# Maximum time to wait for the Thread to get unstuck, if we want to stop
DISCONNECT_TIMEOUT = 10.0
# No default options required, for now?
_default_options = {}
def __init__(self, queue: Queue, collector: StatisticsCollector):
self.queue = queue
self.collector = collector
self.last_packet = None
super().__init__()
self.start()
@property
def _options(self) -> dict:
return StatisticsConsumer._default_options
def run(self):
logger.info("Starting statistics thread")
while True:
self.last_packet = self.queue.get()
# This is the exception/slow path, but python doesn't allow us to optimise that
if self.last_packet is None:
# None is the magic marker to stop processing
break
try:
self.collector.process_packet(self.last_packet)
except ValueError as e:
logger.exception("Could not parse statistics packet")
# continue processing
logger.info("Stopped statistics thread")
def join(self, timeout=0):
# insert magic marker
self.queue.put(None)
logger.info("Sent shutdown to statistics thread")
super().join(timeout)
def disconnect(self):
# TODO(Corne): Prevent duplicate code across TCPReplicator, UDPReceiver
# and StatisticsConsumer.
if not self.is_alive():
return
# try to get the thread shutdown, but don't stall forever
self.join(self.DISCONNECT_TIMEOUT)
if self.is_alive():
# there is nothing we can do except wait (stall) longer, which could be indefinitely.
logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.")
...@@ -32,10 +32,10 @@ from common.lofar_git import get_version ...@@ -32,10 +32,10 @@ from common.lofar_git import get_version
import numpy import numpy
__all__ = ["APSCTL", "main"] __all__ = ["UNB2", "main"]
@device_logging_to_python() @device_logging_to_python()
class APSCTL(hardware_device): class UNB2(hardware_device):
""" """
**Properties:** **Properties:**
...@@ -79,63 +79,90 @@ class APSCTL(hardware_device): ...@@ -79,63 +79,90 @@ class APSCTL(hardware_device):
N_ddr = 2 N_ddr = 2
N_qsfp = 6 N_qsfp = 6
# Central CP per Uniboard ### All CP/MP are in order of appearance in the ICD
UNB2_FPGA_DDR4_SLOT_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_DDR4_SLOT_TEMP_R"], datatype=numpy.double, dims=((N_unb * N_ddr), N_fpga)) ### Central CP per Uniboard
UNB2_I2C_bus_QSFP_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_QSFP_STATUS_R"], datatype=numpy.int64, dims=((N_unb * N_fpga), N_qsfp))
UNB2_I2C_bus_DDR4_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_DDR4_STATUS_R"], datatype=numpy.int64, dims=(N_ddr, N_fpga))
UNB2_I2C_bus_FPGA_PS_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_FPGA_PS_STATUS_R"], datatype=numpy.int64, dims=(N_unb * N_fpga,))
UNB2_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_translator_busy_R"], datatype=numpy.bool_)
### Some points are not working yet on the UNB2 or under discussion
#XXX means Not working yet, but they are working on it
##XXX Means Under discussion
# Special case for the on off switch: instead of UNB2_Power_ON_OFF_R we use UNB2_POL_FPGA_CORE_VOUT_R as the MP
UNB2_Power_ON_OFF_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Power_ON_OFF_RW"], datatype=numpy.bool_, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_Front_Panel_LED_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Front_Panel_LED_RW"], datatype=numpy.uint8, dims=(N_unb,), access=AttrWriteType.READ_WRITE) UNB2_Front_Panel_LED_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Front_Panel_LED_RW"], datatype=numpy.uint8, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_Front_Panel_LED_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Front_Panel_LED_R"], datatype=numpy.uint8, dims=(N_unb,)) UNB2_Front_Panel_LED_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Front_Panel_LED_R"], datatype=numpy.uint8, dims=(N_unb,))
UNB2_EEPROM_Serial_Number_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_EEPROM_Serial_Number_R"], datatype=numpy.str, dims=(N_unb,)) UNB2_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_mask_RW"], datatype=numpy.bool_, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
# Not yet deployed
#UNB2_mask_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_mask_R"], datatype=numpy.bool_, dims=(N_unb,))
### Central MP per Uniboard
# These three are only available in UNB2c
UNB2_I2C_bus_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_STATUS_R"], datatype=numpy.bool_, dims=(N_unb,))
##UNB2_I2C_bus_STATUS_R will probably be renamed to UNB2_I2C_bus_OK_R
##UNB2_I2C_bus_OK_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_OK_R"], datatype=numpy.bool_, dims=(N_unb,))
#UNB2_EEPROM_Serial_Number_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_EEPROM_Serial_Number_R"], datatype=numpy.str, dims=(N_unb,))
UNB2_EEPROM_Unique_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_EEPROM_Unique_ID_R"], datatype=numpy.uint32, dims=(N_unb,)) UNB2_EEPROM_Unique_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_EEPROM_Unique_ID_R"], datatype=numpy.uint32, dims=(N_unb,))
UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R"], datatype=numpy.str, dims=(N_unb * N_qsfp, N_fpga)) UNB2_DC_DC_48V_12V_VIN_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_VIN_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_monitor_rate_RW"], datatype=numpy.double, dims=(N_unb,), access=AttrWriteType.READ_WRITE) UNB2_DC_DC_48V_12V_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_I2C_bus_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_STATUS_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_DC_DC_48V_12V_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_I2C_bus_PS_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_PS_STATUS_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_DC_DC_48V_12V_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_mask_RW"], datatype=numpy.double, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_Power_ON_OFF_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_Power_ON_OFF_R"], datatype=numpy.double, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
UNB2_FPGA_QSFP_CAGE_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_TEMP_R"], datatype=numpy.double, dims=(N_unb * N_qsfp,N_fpga))
UNB2_FPGA_QSFP_CAGE_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_LOS_R"], datatype=numpy.uint8, dims=(N_unb * N_qsfp,N_fpga))
UNB2_FPGA_POL_HGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_HGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_HGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_PGM_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_PGM_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_PGM_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_RXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_RXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_RXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_TXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_TXGXB_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_TXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_TXGXB_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_POL_FPGA_TXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_FPGA_TXGXB_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_POL_FPGA_CORE_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_FPGA_CORE_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_CORE_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_CORE_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_CORE_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_CORE_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_ERAM_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_VOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_ERAM_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_ERAM_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_POL_CLOCK_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_CLOCK_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_CLOCK_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_1V2_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_1V2_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_1V2_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N01_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_QSFP_N01_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N01_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_QSFP_N01_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N01_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_QSFP_N01_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N01_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N23_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_QSFP_N23_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N23_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_QSFP_N23_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_QSFP_N23_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_QSFP_N23_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_QSFP_N23_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_VIN_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_VIN_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_SWITCH_1V2_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_VOUT_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_SWITCH_1V2_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_IOUT_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_SWITCH_1V2_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_1V2_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_DC_DC_48V_12V_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_DC_DC_48V_12V_TEMP_R"], datatype=numpy.double, dims=(N_unb,)) UNB2_POL_SWITCH_PHY_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_SWITCH_PHY_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_SWITCH_PHY_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_CLOCK_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_VOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_CLOCK_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_IOUT_R"], datatype=numpy.double, dims=(N_unb,))
UNB2_POL_CLOCK_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_CLOCK_TEMP_R"], datatype=numpy.double, dims=(N_unb,))
### Local MP per FPGA
UNB2_FPGA_DDR4_SLOT_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_DDR4_SLOT_TEMP_R"], datatype=numpy.double, dims=((N_fpga * N_ddr), N_unb))
#UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_DDR4_SLOT_PART_NUMBER_R"], datatype=numpy.str, dims=(N_fpga * N_ddr), N_unb))
#UNB2_FPGA_QSFP_CAGE_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_0_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_1_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_1_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_2_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_2_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_3_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_3_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_4_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_4_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_5_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_5_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_0_LOS_R"], datatype=numpy.uint8, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_1_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_1_LOS_R"], datatype=numpy.uint8, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_2_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_2_LOS_R"], datatype=numpy.uint8, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_3_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_3_LOS_R"], datatype=numpy.uint8, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_4_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_4_LOS_R"], datatype=numpy.uint8, dims=(N_fpga, N_unb))
#UNB2_FPGA_QSFP_CAGE_5_LOS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_QSFP_CAGE_5_LOS_R"], datatype=numpy.uint8, dims=(N_fpga, N_unb))
#UNB2_FPGA_POL_CORE_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_FPGA_CORE_VOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_CORE_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_CORE_IOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_CORE_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_CORE_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_ERAM_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_VOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_ERAM_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_IOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_ERAM_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_ERAM_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_RXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_VOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_RXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_IOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_RXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_RXGXB_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_TXGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_TXGXB_VOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_TXGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_TXGXB_IOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
#UNB2_FPGA_POL_TXGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_POL_FPGA_TXGXB_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_HGXB_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_VOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_HGXB_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_IOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_HGXB_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_HGXB_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_PGM_VOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_VOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_PGM_IOUT_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_IOUT_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
UNB2_FPGA_POL_PGM_TEMP_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_FPGA_POL_PGM_TEMP_R"], datatype=numpy.double, dims=(N_fpga, N_unb))
##UNB2_I2C_bus_QSFP_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_QSFP_STATUS_R"], datatype=numpy.int64, dims=((N_unb * N_fpga), N_qsfp))
##UNB2_I2C_bus_DDR4_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_DDR4_STATUS_R"], datatype=numpy.int64, dims=(N_ddr, N_fpga))
##UNB2_I2C_bus_FPGA_PS_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_FPGA_PS_STATUS_R"], datatype=numpy.int64, dims=(N_unb * N_fpga,))
##UNB2_I2C_bus_PS_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_I2C_bus_PS_STATUS_R"], datatype=numpy.double, dims=(N_unb,))
##UNB2_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_translator_busy_R"], datatype=numpy.bool_)
##UNB2_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:UNB2_monitor_rate_RW"], datatype=numpy.double, dims=(N_unb,), access=AttrWriteType.READ_WRITE)
# QualifiedName(2: UNB2_on) # QualifiedName(2: UNB2_on)
...@@ -180,7 +207,7 @@ class APSCTL(hardware_device): ...@@ -180,7 +207,7 @@ class APSCTL(hardware_device):
except Exception as e: except Exception as e:
# use the pass function instead of setting read/write fails # use the pass function instead of setting read/write fails
i.set_pass_func() i.set_pass_func()
self.warn_stream("error while setting the APSCTL attribute {} read/write function. {}".format(i, e)) self.warn_stream("error while setting the UNB2 attribute {} read/write function. {}".format(i, e))
self.OPCua_client.start() self.OPCua_client.start()
...@@ -192,12 +219,12 @@ class APSCTL(hardware_device): ...@@ -192,12 +219,12 @@ class APSCTL(hardware_device):
# Run server # Run server
# ---------- # ----------
def main(args=None, **kwargs): def main(args=None, **kwargs):
"""Main function of the SDP module.""" """Main function of the UNB2 module."""
from devices.common.lofar_logging import configure_logger from common.lofar_logging import configure_logger
configure_logger() configure_logger()
return run((APSCTL,), args=args, **kwargs) return run((UNB2,), args=args, **kwargs)
if __name__ == '__main__': if __name__ == '__main__':
......
[metadata] [metadata]
name = TangoStationControl name = TangoStationControl
summary = LOFAR 2.0 Station Control summary = LOFAR 2.0 Station Control
description-file = description_file =
README.md README.md
description-content-type = text/x-rst; charset=UTF-8 description_content_type = text/x-rst; charset=UTF-8
author = ASTRON author = ASTRON
home-page = https://astron.nl home_page = https://astron.nl
project_urls = project_urls =
Bug Tracker = https://support.astron.nl/jira/projects/L2SS/issues/ Bug Tracker = https://support.astron.nl/jira/projects/L2SS/issues/
Source Code = https://git.astron.nl/lofar2.0/tango Source Code = https://git.astron.nl/lofar2.0/tango
......
# TCP to HDF5 statistics writer
The TCP to HDF5 statistics writer can be started with `tcp_hdf5_writer.py` This script imports
`tcp_receiver.py` and `statistics_writer.py`. `tcp_receiver.py` only takes care of receiving packets.
`statistics_writer.py` takes the receive function from the tcp_receiver and uses it to obtain packets.
Any function that can deliver statistics packets can be used by this code.
`statistics_writer.py` takes care of processing the packets it receives, filling statistics matrices
and writing those matrices (as well as a bunch of metadata) to hdf5.
### TCP Statistics writer
The TCP statistics writer can be called with the `tcp_hdf5_writer.py` script.
This script can be called with the following arguments:
```
--address the address to connect to
--port the port to use
--interval The time between creating new files in hours
--location specifies the folder to write all the files
--mode sets the statistics type to be decoded options: "SST", "XST", "BST"
--debug takes no arguments, when used prints a lot of extra data to help with debugging
```
##HFD5 structure
Statistics packets are collected by the StatisticsCollector in to a matrix. Once the matrix is done or a newer
timestamp arrives this matrix along with the header of first packet header, nof_payload_errors and nof_valid_payloads.
The file will be named after the mode it is in and the timestamp of the statistics packets. For example: `SST_1970-01-01-00-00-00.h5`.
```
File
|
|------ {mode_timestamp} |- {statistics matrix}
| |- {first packet header}
| |- {nof_valid_payloads}
| |- {nof_payload_errors}
|
|------ {mode_timestamp} |- {statistics matrix}
| |- {first packet header}
| |- {nof_valid_payloads}
| |- {nof_payload_errors}
|
...
```
###explorer
There is an hdf5 explorer that will walk through specified hdf5 files.
Its called `hdf5_explorer.py` and can be called with a `--file` argument
ex: `python3 hdf5_explorer.py --file data/SST_1970-01-01-00-00-00.h5` This allows for easy manual checking
of the structure and content of hdf5 files. useful for testing and debugging.
Can also be used as example of how to read the HDF5 statistics data files.
Provides a number of example functions inside that go through the file in various ways.
###test server
There is a test server that will continuously send out the same statistics packet.
Its called `test_server.py`. Takes `--host`, `--port` and `--file` as optional input arguments.
Defaults to address `'127.0.0.1'`, port `65433` and file `devices_test_SDP_SST_statistics_packets.bin`
File suppressed by a .gitattributes entry, the file's encoding is unsupported, or the file size exceeds the limit.
File suppressed by a .gitattributes entry, the file's encoding is unsupported, or the file size exceeds the limit.
# imports for working with datetime objects
from datetime import datetime, timedelta
import pytz
# python hdf5
import h5py
import numpy
import json
import logging
# import statistics classes with workaround
import sys
sys.path.append("..")
from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket
import devices.sdp.statistics_collector as statistics_collector
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("statistics_writer")
__all__ = ["statistics_writer"]
class statistics_writer:
def __init__(self, new_file_time_interval, file_location, statistics_mode):
# all variables that deal with the SST matrix that's currently being decoded
self.current_matrix = None
self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)
# the header of the first packet of a new matrix is written as metadata.
# Assumes all subsequent headers of the same matrix are identical (minus index)
self.statistics_header = None
# file handing
self.file_location = file_location
self.new_file_time_interval = timedelta(hours=new_file_time_interval)
self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
self.file = None
# config the writer for the correct statistics type
self.collector = None
self.decoder = None
self.mode = statistics_mode.upper()
self.config_mode()
def next_packet(self, packet):
"""
All statistics packets come with a timestamp of the time they were measured. All the values will be spread across multiple packets.
As long as the timestamp is the same they belong in the same matrix. This code handles collecting the matrix from those multiple
packets as well as storing matrices and starting new ones
The code receives new packets and checks the statistics timestamp of them. If the timestamp is higher than the current timestamp
it will close the current matrix, store it and start a new one.
"""
# process the packet
statistics_packet = self.decoder(packet)
# grab the timestamp
statistics_timestamp = statistics_packet.timestamp()
# check if te statistics timestamp is unexpectedly older than the current one
if statistics_timestamp < self.current_timestamp:
logger.warning(f"Received statistics with earlier timestamp than is currently being processed ({statistics_timestamp}). Packet dropped.")
return
# if this statistics packet has a new timestamp it means we need to start a new matrix
if statistics_timestamp > self.current_timestamp:
self.statistics_header = statistics_packet.header()
self.start_new_matrix(statistics_timestamp)
self.current_timestamp = statistics_timestamp
self.process_packet(packet)
def start_new_matrix(self, timestamp):
logger.debug(f"starting new matrix with timestamp: {timestamp}")
"""
is called when a statistics packet with a newer timestamp is received.
Writes the matrix to the hdf5 file
Creates a new hdf5 file if needed
updates current timestamp and statistics matrix collector
"""
# write the finished (and checks if its the first matrix)
if self.current_matrix is not None:
try:
self.write_matrix()
except Exception as e:
time = str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
logger.error(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped: {e}")
# only start a new file if its time AND we are done with the previous matrix.
if timestamp >= self.new_file_time_interval + self.last_file_time:
self.start_new_hdf5(timestamp)
# create a new and empty current_matrix
self.current_matrix = self.collector()
def write_matrix(self):
logger.debug("writing matrix to file")
"""
Writes the finished matrix to the hdf5 file
"""
# create the new hdf5 group based on the timestamp of packets
current_group = self.file.create_group("{}_{}".format(self.mode, str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S"))))
# store the statistics values
current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"])
# might be optional, but they're easy to add.
current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"])
# get the statistics header
header = self.statistics_header
# can't store datetime objects in json, converted to string instead
header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")
# convert the header to JSON
json_header = json.dumps(header)
# Stores the header of the packet received for this matrix
current_group.create_dataset(name='first_packet_header', data=numpy.str(json_header))
def process_packet(self, packet):
logger.debug(f"Processing packet")
"""
Adds the newly received statistics packet to the statistics matrix
"""
self.current_matrix.process_packet(packet)
def start_new_hdf5(self, timestamp):
if self.file is not None:
try:
self.file.close()
except Exception as e:
logger.error(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity. \r\n Exception: {e}")
current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
logger.debug(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5")
try:
self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w')
except Exception as e:
logger.error(f"Error while creating new file: {e}")
raise e
self.last_file_time = timestamp
def config_mode(self):
logger.debug(f"attempting to configure {self.mode} mode")
"""
Configures the object for the correct statistics type to be used.
"""
if self.mode == 'SST':
self.decoder = SSTPacket
self.collector = statistics_collector.SSTCollector
elif self.mode == 'BST':
# self.decoder = XSTPacket
raise NotImplementedError("BST collector has not yet been implemented")
elif self.mode == 'XST':
# self.decoder = XSTPacket
raise NotImplementedError("BST collector has not yet been implemented")
else:
# make sure the mode is valid
raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST' ".format(self.mode))
def close_writer(self):
"""
Function that can be used to stop the writer without data loss.
"""
logger.debug("closing hdf5 file")
if self.file is not None:
if self.current_matrix is not None:
# Write matrix if one exists
# only creates file if there is a matrix to actually write
try:
self.write_matrix()
finally:
self.file.close()
logger.debug(f"{self.file} closed")
import argparse
from tcp_receiver import tcp_receiver
from statistics_writer import statistics_writer
import sys
import signal
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("statistics_writer")
parser = argparse.ArgumentParser(description='Arguments to configure the TCP connection and writer mode')
parser.add_argument('--address', type=str, help='the address to connect to')
parser.add_argument('--port', type=int, help='the port to use')
parser.add_argument('--interval', type=float, default=1, nargs="?", help='The time between creating new files in hours')
parser.add_argument('--location', type=str, default="data", nargs="?", help='specifies the folder to write all the files')
parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], help='sets the statistics type to be decoded options: "SST", "XST", "BST"')
parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='when used stores failed packets')
# create a data dumper that creates a new file every 10s (for testing)
if __name__ == "__main__":
def signal_handler(signum, frame):
writer.close_writer()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
args = parser.parse_args()
# argparse arguments
address = args.address
port = args.port
location = args.location
interval = args.interval
mode = args.mode
debug = args.debug
if debug:
logger.setLevel(logging.DEBUG)
logger.debug("Setting loglevel to DEBUG")
# creates the TCP receiver that is given to the writer
receiver = tcp_receiver(address, port)
# create the writer
writer = statistics_writer(new_file_time_interval=interval, file_location=location, statistics_mode=mode)
# start looping
try:
while True:
packet = receiver.get_packet()
writer.next_packet(packet)
except KeyboardInterrupt:
writer.close_writer()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment