diff --git a/CDB/thijs_ConfigDb.json b/CDB/thijs_ConfigDb.json index b7e508732bf03b8919683b3cf2e34a52765a5ca3..85d7df9a69d730f621b99e777b39f95670bb9637 100644 --- a/CDB/thijs_ConfigDb.json +++ b/CDB/thijs_ConfigDb.json @@ -36,13 +36,6 @@ "1": { "example_device": { "LTS/example_device/1": { - "attribute_properties": { - "Ant_mask_RW": { - "archive_period": [ - "600000" - ] - } - }, "properties": { "OPC_Server_Name": [ "host.docker.internal" @@ -62,13 +55,6 @@ "1": { "ini_device": { "LTS/ini_device/1": { - "attribute_properties": { - "Ant_mask_RW": { - "archive_period": [ - "600000" - ] - } - }, "properties": { "OPC_Server_Name": [ "host.docker.internal" @@ -88,13 +74,6 @@ "1": { "APSCTL": { "LTS/APSCTL/1": { - "attribute_properties": { - "Ant_mask_RW": { - "archive_period": [ - "600000" - ] - } - }, "properties": { "OPC_Server_Name": [ "ltspi.astron.nl" @@ -110,17 +89,23 @@ } } }, + "Statistics": { + "1": { + "SST": { + "LTS/SST/1": { + "properties": { + "SST_Port": [ + "5001" + ] + } + } + } + } + }, "SNMP": { "1": { "SNMP": { "LTS/SNMP/1": { - "attribute_properties": { - "Ant_mask_RW": { - "archive_period": [ - "600000" - ] - } - }, "properties": { "SNMP_community": [ "public" diff --git a/devices/Statistics.py b/devices/Statistics.py new file mode 100644 index 0000000000000000000000000000000000000000..b397898e5b76c026dcd82ae18b014374062800aa --- /dev/null +++ b/devices/Statistics.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the Statistics project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" Statistics Device Server for LOFAR2.0 + +""" + +# PyTango imports +from tango.server import run +from tango.server import device_property +from tango import AttrWriteType +# Additional import + +from clients.sst_client import sst_client + +from util.attribute_wrapper import attribute_wrapper +from util.hardware_device import hardware_device + +from util.lofar_logging import device_logging_to_python, log_exceptions + +import numpy + +__all__ = ["SST", "main"] + +@device_logging_to_python({"device": "SST"}) +class SST(hardware_device): + + # ----------------- + # Device Properties + # ----------------- + + SST_Port = device_property( + dtype='DevUShort', + mandatory=True + ) + + # ---------- + # Attributes + # ---------- + # -------- + + # SST client annotation consists of a dict that contains the parameter name that needs to be read. + # Example: comms_annotation={"parameter": "this_value_R"} + packet_count_R = attribute_wrapper(comms_annotation={"parameter": "packet_count_R"}, datatype=numpy.int64) + last_packet_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "last_packet_timestamp_R"}, datatype=numpy.int64) + queue_percentage_used_R = attribute_wrapper(comms_annotation={"parameter": "queue_percentage_used_R"}, datatype=numpy.double) + + # -------- + # overloaded functions + def configure_for_off(self): + """ user code here. is called when the state is set to OFF """ + + # Stop keep-alive + try: + self.sst_client.stop() + except Exception as e: + self.warn_stream("Exception while stopping sst_client in configure_for_off function: {}. Exception ignored".format(e)) + + @log_exceptions() + def configure_for_initialise(self): + """ user code here. is called when the sate is set to INIT """ + """Initialises the attributes and properties of the statistics device.""" + + self.sst_client = sst_client("0.0.0.0", self.SST_Port, self.Fault, self) + + # map an access helper class + for i in self.attr_list(): + try: + i.set_comm_client(self.sst_client) + except Exception as e: + # use the pass function instead of setting read/write fails + i.set_pass_func() + self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e)) + pass + + self.sst_client.start() + + # -------- + # Commands + # -------- + +# ---------- +# Run server +# ---------- +def main(args=None, **kwargs): + """Main function of the Statistics Device module.""" + return run((SST,), args=args, **kwargs) + + +if __name__ == '__main__': + main() diff --git a/devices/clients/sst_client.py b/devices/clients/sst_client.py new file mode 100644 index 0000000000000000000000000000000000000000..ab79fe125f66d0d0cf14bc0add00df8f515b9e32 --- /dev/null +++ b/devices/clients/sst_client.py @@ -0,0 +1,182 @@ +import queue +from threading import Thread +import socket +from util.comms_client import CommClient + +from queue import Queue + +import numpy +import logging +import socket +from datetime import datetime +from multiprocessing import Value, Array +import time + + +__all__ = ["sst_client"] + +class sst_client(CommClient): + """ + Connects to OPC-UA in the foreground or background, and sends HELLO + messages to keep a check on the connection. On connection failure, reconnects once. + """ + + def start(self): + super().start() + + def __init__(self, host, port, fault_func, streams, try_interval=2, queuesize=1024, buffersize=9000): + """ + Create the sst client and connect() to it and get the object node + """ + self.host = host + self.port = port + self.timeout = 0.1 + self.buffersize = buffersize + self.queuesize = queuesize + + super().__init__(fault_func, streams, try_interval) + + # Explicitly connect + if not self.connect(): + # hardware or infra is down -- needs fixing first + fault_func() + return + + def connect(self): + """ + Function used to connect to the client. + """ + if not self.connected: + self.queue = Queue(maxsize=self.queuesize) + self.udp = UDP_Receiver(self.host, self.port, self.queue, self.streams, self.buffersize, self.timeout) + self.sst = SST(self.queue, self.streams) + return super().connect() + + def disconnect(self): + del self.udp + del self.sst + del self.queue + + return super().disconnect() + + def _setup_annotation(self, annotation): + """ + This class's Implementation of the get_mapping function. returns the read and write functions + """ + parameter = annotation.get('parameter', None) + + if parameter is None: + raise Exception("No SST parameter was given in the annotation: %s", annotation) + + return parameter + + def setup_value_conversion(self, attribute): + """ + gives the client access to the attribute_wrapper object in order to access all data it could potentially need. + the OPC ua read/write functions require the dimensionality and the type to be known + """ + return + + def setup_attribute(self, annotation, attribute): + """ + MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions + """ + + # process the annotation + SST_param = self._setup_annotation(annotation) + + # get all the necessary data to set up the read/write functions from the attribute_wrapper + self.setup_value_conversion(attribute) + + + def read_function(): + return [self.sst.parameters[SST_param]] + + def write_function(value): + """ + Not used here + """ + pass + + return read_function, write_function + +class UDP_Receiver(Thread): + """ + This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code + """ + + def __init__(self, host, port, queue, streams, buffersize=9000, timeout=0.1): + + self.streams = streams + self.queue = queue + self.host = host + self.port = port + self.buffersize = buffersize + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + self.sock.bind((self.host, self.port)) + + + self.sock.settimeout(timeout) + self.stream_on = True + super().__init__() + + self.start() + + def run(self): + # all variables are manually defined and are updated each time + self.streams.debug_stream("starting UDP thread with port {} and host {}".format(self.port, self.host)) + + while self.stream_on: + try: + packet = [bytearray(self.buffersize)] + self.sock.recvmsg_into(packet[0:self.buffersize]) + self.queue.put(packet) + + except socket.timeout: + pass + except queue.Full: + pass + + def __del__(self): + self.stream_on = False + self.join() + + +class SST(Thread): + def __init__(self, queue, streams): + + self.streams = streams + self.queue = queue + self.last_packet = None + + self.parameters = { + "packet_count_R": numpy.int64(0), + "last_packet_timestamp_R": numpy.int64(0), + "queue_percentage_used_R": numpy.double(100 * self.queue.qsize() / self.queue.maxsize) + } + + super().__init__() + self.start() + + def run(self): + self.streams.debug_stream("starting SST thread") + + while True: + packet = self.queue.get() + + if packet is None: + self.queue.clear() + break + + self.process_packet(packet) + + def __del__(self): + self.queue.put(None) + self.join() + + def process_packet(self, packet): + self.parameters["packet_count_R"] += 1 + self.parameters["last_packet_timestamp_R"] = numpy.int64(int(time.time())) + self.parameters["queue_percentage_used_R"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize) \ No newline at end of file diff --git a/devices/udp_simulator.py b/devices/udp_simulator.py new file mode 100644 index 0000000000000000000000000000000000000000..9720cff969f4db0a1f406359512036357b224076 --- /dev/null +++ b/devices/udp_simulator.py @@ -0,0 +1,22 @@ +import socket +import time + +i = 1 + +UDP_IP = "127.0.0.1" +UDP_PORT = 5001 +MESSAGE = "{}".format(i) + +print("UDP target IP: %s" % UDP_IP) +print("UDP target port: %s" % UDP_PORT) + +sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # create UDP socket + +while True: + sock.sendto(bytes(MESSAGE, "utf-8"), (UDP_IP, UDP_PORT)) + i += 1 + MESSAGE = "{}".format(i) + + #sleep for an arbitrary amount of time. Currently 0.2 settings for visual testing. + time.sleep(0.2) + diff --git a/docker-compose/device-statistics.yml b/docker-compose/device-statistics.yml new file mode 100644 index 0000000000000000000000000000000000000000..52c08ab21fb6c45240022199ac98ede3874d870a --- /dev/null +++ b/docker-compose/device-statistics.yml @@ -0,0 +1,35 @@ +# +# Docker compose file that launches an interactive iTango session. +# +# Connect to the interactive session with 'docker attach itango'. +# Disconnect with the Docker deattach sequence: <CTRL>+<P> <CTRL>+<Q> +# +# Defines: +# - itango: iTango interactive session +# +# Requires: +# - lofar-device-base.yml +# +version: '2' + +services: + device-statistics: + image: device-statistics + # build explicitly, as docker-compose does not understand a local image + # being shared among services. + build: + context: lofar-device-base + container_name: ${CONTAINER_NAME_PREFIX}device-statistics + network_mode: ${NETWORK_MODE} + volumes: + - ${TANGO_LOFAR_CONTAINER_MOUNT} + environment: + - TANGO_HOST=${TANGO_HOST} + entrypoint: + - /usr/local/bin/wait-for-it.sh + - ${TANGO_HOST} + - --timeout=30 + - --strict + - -- + - python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/Statistics.py LTS -v + restart: on-failure diff --git a/docker-compose/itango.yml b/docker-compose/itango.yml index 1a8547baf830bfd893177049017f9aece28afa88..4af318480763a80170aa7926d477ddda597c5125 100644 --- a/docker-compose/itango.yml +++ b/docker-compose/itango.yml @@ -23,6 +23,10 @@ services: volumes: - ${TANGO_LOFAR_CONTAINER_MOUNT} - ${HOME}:/hosthome + ports: + - "5001:5001/udp" + expose: + - "5001/udp" environment: - TANGO_HOST=${TANGO_HOST} - XAUTHORITY=${XAUTHORITY} diff --git a/jupyter-notebooks/test_device.ipynb b/jupyter-notebooks/test_device.ipynb index 66d3a5f1057cb9051dce52325fb4fc73fb9d7005..7701f520937bcaf88b09f624c8b9e3a4ee752c85 100644 --- a/jupyter-notebooks/test_device.ipynb +++ b/jupyter-notebooks/test_device.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 26, + "execution_count": 61, "id": "waiting-chance", "metadata": {}, "outputs": [], @@ -13,17 +13,17 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 62, "id": "moving-alexandria", "metadata": {}, "outputs": [], "source": [ - "d=DeviceProxy(\"LTS/test_device/1\")" + "d=DeviceProxy(\"LTS/SST/1\")" ] }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 63, "id": "ranking-aluminum", "metadata": {}, "outputs": [ @@ -51,7 +51,7 @@ }, { "cell_type": "code", - "execution_count": 31, + "execution_count": 68, "id": "beneficial-evidence", "metadata": {}, "outputs": [ @@ -59,36 +59,11 @@ "name": "stdout", "output_type": "stream", "text": [ - "bool_scalar_R [False]\n", - "bool_scalar_RW [False]\n", - "int64_spectrum_R [0 0 0 0 0 0 0 0]\n", - "str_spectrum_RW ('', '', '', '', '', '', '', '')\n", - "double_image_R [[0. 0.]\n", - " [0. 0.]\n", - " [0. 0.]\n", - " [0. 0.]\n", - " [0. 0.]\n", - " [0. 0.]\n", - " [0. 0.]\n", - " [0. 0.]]\n", - "double_image_RW [[0. 0. 0. 0. 0. 0. 0. 0.]\n", - " [0. 0. 0. 0. 0. 0. 0. 0.]]\n", - "int32_scalar_R [0]\n", - "uint16_spectrum_RW [0 0 0 0 0 0 0 0]\n", - "float32_image_R [[0. 0. 0. 0. 0. 0. 0. 0.]\n", - " [0. 0. 0. 0. 0. 0. 0. 0.]]\n", - "uint8_image_RW [[0 0]\n", - " [0 0]\n", - " [0 0]\n", - " [0 0]\n", - " [0 0]\n", - " [0 0]\n", - " [0 0]\n", - " [0 0]]\n", - "tr_tod_R [0]\n", - "tr_uptime_R [0]\n", - "State <function __get_command_func.<locals>.f at 0x7f1c88a29e18>\n", - "Status <function __get_command_func.<locals>.f at 0x7f1c88a5abf8>\n" + "packet_count_R [55]\n", + "last_packet_timestamp_R [1623249385]\n", + "queue_percentage_used_R [0.]\n", + "State <function __get_command_func.<locals>.f at 0x7fcb205fd0d0>\n", + "Status <function __get_command_func.<locals>.f at 0x7fcb205fd0d0>\n" ] } ],