diff --git a/devices/clients/comms_client.py b/devices/clients/comms_client.py index 011e1e62180e85f6bc17d72a6ee31eb5871ecb50..0475704b060970f4726a8665c297a82690af1e7f 100644 --- a/devices/clients/comms_client.py +++ b/devices/clients/comms_client.py @@ -1,9 +1,62 @@ from threading import Thread import time +import asyncio +from abc import ABC, abstractmethod -class CommClient(Thread): +class AbstractCommClient(ABC): + @abstractmethod + def start(self): + """ Start communication with the client. """ + + @abstractmethod + def stop(self): + """ Start communication with the client. """ + + def ping(self): + """ Check whether the connection is still alive. Raises an exception if not. """ + pass + + @abstractmethod + def setup_attribute(self, annotation, attribute): + """ + This function is responsible for providing the attribute_wrapper with a read/write function + How this is done is implementation specific. + The setup-attribute has access to the comms_annotation provided to the attribute wrapper to pass along to the comms client + as well as a reference to the attribute itself. + + It should do this by first calling: _setup_annotation and setup_value_conversion to get all data necceacry to configure the read/write functions. + It should then return the read and write functions to the attribute. + + MANDATORY: + annotation_outputs = _setup_annotation(annotation) + attribute_outputs = _setup_annotation(attribute) + (note: outputs are up to the user) + + REQUIRED: provide read and write functions to return, there are no restrictions on how these should be provided, + except that the read function takes a single input value and the write function returns a single value + + MANDATORY: + return read_function, write_function + + Examples: + - File system: get_mapping returns functions that read/write a fixed + number of bytes at a fixed location in a file. (SEEK) + - OPC-UA: traverse the OPC-UA tree until the node is found. + Then return the read/write functions for that node which automatically + convert values between Python and OPC-UA. + """ + + def _setup_annotation(self, annotation): + """ + This function is responsible for handling the annotation data provided by the attribute to configure the read/write function the client must provide. + This function should be called by setup_attribute + """ + pass + +class CommClient(AbstractCommClient, Thread): """ - The ProtocolHandler class is the generic interface class between the tango attribute_wrapper and the outside world + Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping() + primitives. """ def __init__(self, fault_func, streams, try_interval=2): @@ -85,47 +138,98 @@ class CommClient(Thread): self.disconnect() - def setup_attribute(self, annotation, attribute): - """ - This function is responsible for providing the attribute_wrapper with a read/write function - How this is done is implementation specific. - The setup-attribute has access to the comms_annotation provided to the attribute wrapper to pass along to the comms client - as well as a reference to the attribute itself. - It should do this by first calling: _setup_annotation and setup_value_conversion to get all data necceacry to configure the read/write functions. - It should then return the read and write functions to the attribute. - - MANDATORY: - annotation_outputs = _setup_annotation(annotation) - attribute_outputs = _setup_annotation(attribute) - (note: outputs are up to the user) +class AsyncCommClient(object): + """ + Abstracts communication with a client, for instance, over the network, by handling connect(), disconnect(), and ping() + primitives. - REQUIRED: provide read and write functions to return, there are no restrictions on how these should be provided, - except that the read function takes a single input value and the write function returns a single value + asyncio version of the CommClient. Also does not reconnect if the connection is lost. + """ - MANDATORY: - return read_function, write_function + def __init__(self, fault_func): + """ - Examples: - - File system: get_mapping returns functions that read/write a fixed - number of bytes at a fixed location in a file. (SEEK) - - OPC-UA: traverse the OPC-UA tree until the node is found. - Then return the read/write functions for that node which automatically - convert values between Python and OPC-UA. """ - raise NotImplementedError("the setup_attribute must be implemented and provide return a valid read/write function for the attribute") + self.fault_func = fault_func + self.running = False - def _setup_annotation(self, annotation): + @abstractmethod + async def connect(self): """ - This function is responsible for handling the annotation data provided by the attribute to configure the read/write function the client must provide. - This function should be called by setup_attribute + Function used to connect to the client, and any + post init. """ - raise NotImplementedError("the _setup_annotation must be implemented, content and outputs are up to the user") - def setup_value_conversion(self, attribute): + @abstractmethod + async def disconnect(self): """ - this function is responsible for setting up the value conversion between the client and the attribute. - This function should be called by setup_attribute + Function used to disconnect from the client. """ - raise NotImplementedError("the setup_value_conversion must be implemented, content and outputs are up to the user") + + async def watch_connection(self): + """ Notice when the connection goes down. """ + + try: + logger.info(f"Start watching OPC-UA connection to {self._servername()}") + + while self.running: + # ping will throw in case of connection issues + try: + await self.ping() + except Exception as e: + logger.exception("Ping failed: connection considered lost") + + # connection error, go to fault + self.fault_func() + + # disconnect will cancel us + await self.disconnect() + + # always have a backup plan + return + + # don't spin, sleep for a while + await asyncio.sleep(2) + except asyncio.CancelledError as e: + pass + except Exception as e: + # log immediately, or the exception will only be printed once this task is awaited + logger.exception(f"Exception raised while watching OPC-UA connection to {self._servername()}") + + raise + finally: + logger.info(f"Stop watching OPC-UA connection to {self._servername()}") + + async def ping(self): + return + + async def start(self): + if self.running: + # already running + return + + await self.connect() + self.running = True + + # watch connection + self.watch_connection_task = asyncio.create_task(self.watch_connection()) + + async def stop(self): + if not self.running: + # already stopped + return + + self.running = False + + # cancel & reap watcher + self.watch_connection_task.cancel() + try: + await self.watch_connection_task + except Exception as e: + logger.exception(f"Watcher thread raised exception") + + # the task stopped eithr way, so no need to bother our caller with this + + await self.disconnect() diff --git a/devices/clients/opcua_client.py b/devices/clients/opcua_client.py index 634fc62ad41e136d92e2a84ce8ee8d518b3b9cfe..e7634caedf9f26c6484370bf010bb6ec152e9e56 100644 --- a/devices/clients/opcua_client.py +++ b/devices/clients/opcua_client.py @@ -5,7 +5,7 @@ import asyncua import asyncio from asyncua import Client -from clients.comms_client import CommClient +from clients.comms_client import AsyncCommClient import logging logger = logging.getLogger() @@ -41,22 +41,22 @@ event_loop = asyncio.new_event_loop() event_loop_thread = Thread(target=run_loop, args=(event_loop,), name="OPC-UA asyncio event loop", daemon=True) event_loop_thread.start() -class OPCUAConnection(object): +class OPCUAConnection(AsyncCommClient): """ Connects to OPC-UA in the foreground or background, and sends HELLO messages to keep a check on the connection. On connection failure, reconnects once. """ - def __init__(self, address, namespace, timeout, fault_func, streams, try_interval=2): + def __init__(self, address, namespace, timeout, fault_func): """ Create the OPC ua client and connect() to it and get the object node """ self.client = Client(address, int(timeout)) - self.streams = streams - self.fault_func = fault_func self.namespace = namespace + super().__init__(fault_func) + async def start(self): # connect await self.connect() @@ -79,10 +79,6 @@ class OPCUAConnection(object): Try to connect to the client """ - if self.connected: - logger.debug(f"Already connected to server {self._servername()}") - return - logger.debug(f"Connecting to server {self._servername()}") try: @@ -93,65 +89,26 @@ class OPCUAConnection(object): logger.debug(f"Connected to OPC-UA server {self._servername()}") - # watch connection - self.watch_connection_task = asyncio.create_task(self.watch_connection()) + # determine namespace used + try: + if type(self.namespace) is str: + self.name_space_index = await self.client.get_namespace_index(self.namespace) + elif type(self.namespace) is int: + self.name_space_index = self.namespace + else: + raise TypeError(f"Namespace is not of type int or str, but of type {type(self.namespace).__name__}") + except Exception as e: + self.streams.error_stream("Could not determine namespace index from namespace: %s: %s", namespace, e) + raise Exception("Could not determine namespace index from namespace %s", namespace) from e + + self.obj = self.client.get_objects_node() async def disconnect(self): """ disconnect from the client """ - if not self.connected: - logger.debug(f"Already disconnected from server {self._servername()}") - return - - self.connected = False - - # cancel & reap watcher - self.watch_connection_task.cancel() - try: - await self.watch_connection_task - except Exception as e: - logger.exception(f"Watcher thread for {self._servername()} raised exception") - - # disconnect client explictly (will throw if the other side already disconnected) - try: - await self.client.disconnect() - except Exception as e: - logger.exception(f"Could not disconnect from OPC-UA server {self._servername()}") from e - - async def watch_connection(self): - """ Notice when the connection goes down. """ - - try: - logger.info(f"Start watching OPC-UA connection to {self._servername()}") - - while self.connected: - # ping will throw in case of connection issues - try: - await self.ping() - except Exception as e: - logger.exception(f"OPC-UA connection to {self._servername()} lost") - - # connection error, go to fault - self.fault_func() - - # disconnect will cancel us - await self.disconnect() - # always have a backup plan - return - - # don't spin, sleep for a while - await asyncio.sleep(1) - except asyncio.CancelledError as e: - pass - except Exception as e: - # log immediately, or the exception will only be printed once this task is awaited - logger.exception(f"Exception raised while watching OPC-UA connection to {self._servername()}") - - raise - finally: - logger.info(f"Stop watching OPC-UA connection to {self._servername()}") + await self.client.disconnect() async def ping(self): """ @@ -163,10 +120,6 @@ class OPCUAConnection(object): raise IOError("Lost connection to server %s: %s", self._servername(), e) async def _setup_annotation(self, annotation): - """ - This class's Implementation of the get_mapping function. returns the read and write functions - """ - if isinstance(annotation, dict): # check if required path inarg is present if annotation.get('path') is None: @@ -184,33 +137,19 @@ class OPCUAConnection(object): try: node = await self.obj.get_child(path) except Exception as e: - self.streams.error_stream("Could not get node: %s on server %s: %s", path, self._servername(), e) + logger.exception("Could not get node: %s on server %s", path, self._servername()) raise Exception("Could not get node: %s on server %s", path, self._servername()) from e return node - def setup_value_conversion(self, attribute): - """ - gives the client access to the attribute_wrapper object in order to access all data it could potentially need. - the OPC ua read/write functions require the dimensionality and the type to be known - """ - - dim_x = attribute.dim_x - dim_y = attribute.dim_y - ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type - - return dim_x, dim_y, ua_type - async def setup_attribute(self, annotation, attribute): - """ - MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions - """ - # process the annotation node = await self._setup_annotation(annotation) # get all the necessary data to set up the read/write functions from the attribute_wrapper - dim_x, dim_y, ua_type = self.setup_value_conversion(attribute) + dim_x = attribute.dim_x + dim_y = attribute.dim_y + ua_type = numpy_to_OPCua_dict[attribute.numpy_type] # convert the numpy type to a corresponding UA type # configure and return the read/write functions prot_attr = ProtocolAttribute(node, dim_x, dim_y, ua_type) diff --git a/devices/devices/docker_device.py b/devices/devices/docker_device.py index 5ff0ec366c436a2dfc75d4cd479219a04c6938d3..01d36236d8d04678035748a34cae213e6b7b18ae 100644 --- a/devices/devices/docker_device.py +++ b/devices/devices/docker_device.py @@ -31,7 +31,6 @@ from clients.docker_client import DockerClient from clients.attribute_wrapper import attribute_wrapper from devices.hardware_device import hardware_device from common.lofar_logging import device_logging_to_python, log_exceptions -from common.lofar_git import get_version __all__ = ["Docker", "main"] @@ -63,7 +62,6 @@ class Docker(hardware_device): # ---------- # Attributes # ---------- - version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) archiver_maria_db_R = attribute_wrapper(comms_annotation={"container": "archiver-maria-db"}, datatype=numpy.bool_) archiver_maria_db_RW = attribute_wrapper(comms_annotation={"container": "archiver-maria-db"}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE) databaseds_R = attribute_wrapper(comms_annotation={"container": "databaseds"}, datatype=numpy.bool_) @@ -124,7 +122,7 @@ class Docker(hardware_device): """ user code here. is called when the state is set to OFF """ # Stop keep-alive try: - self.opcua_connection.stop() + self.docker_client.stop() except Exception as e: self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) diff --git a/devices/devices/hardware_device.py b/devices/devices/hardware_device.py index f8f6ca50d7e02f5a8694c2ec4f9135dd874cd516..8a75387081ec2760f22e5c4a41aa6957357f1721 100644 --- a/devices/devices/hardware_device.py +++ b/devices/devices/hardware_device.py @@ -15,18 +15,18 @@ from abc import ABCMeta, abstractmethod # PyTango imports from tango.server import Device, command, DeviceMeta, attribute -from tango import DevState, DebugIt, Attribute, DeviceProxy +from tango import DevState, DebugIt, Attribute, DeviceProxy, AttrWriteType # Additional import from clients.attribute_wrapper import attribute_wrapper from common.lofar_logging import log_exceptions +from common.lofar_git import get_version from devices.abstract_device import AbstractDeviceMetas from devices.device_decorators import only_in_states, fault_on_error -import logging - __all__ = ["hardware_device"] +import logging logger = logging.getLogger() @@ -56,6 +56,12 @@ class hardware_device(Device, metaclass=AbstractDeviceMetas): The user triggers their transitions by the commands reflecting the target state (Initialise(), On(), Fault()). """ + # ---------- + # Attributes + # ---------- + + version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) + @classmethod def attr_list(cls): """ Return a list of all the attribute_wrapper members of this class. """ @@ -74,6 +80,19 @@ class hardware_device(Device, metaclass=AbstractDeviceMetas): self.set_state(DevState.OFF) + @log_exceptions() + def delete_device(self): + """Hook to delete resources allocated in init_device. + + This method allows for any memory or other resources allocated in the + init_device method to be released. This method is called by the device + destructor and by the device Init command (a Tango built-in). + """ + self.debug_stream("Shutting down...") + + self.Off() + self.debug_stream("Shut down. Good bye.") + # -------- # Commands # -------- diff --git a/devices/devices/recv.py b/devices/devices/recv.py index c656c7862805a204a536fcbc051efb0067ed564a..18f92e6be4b3ec0c1ae616057b131e6c425e20ca 100644 --- a/devices/devices/recv.py +++ b/devices/devices/recv.py @@ -28,16 +28,14 @@ import asyncio from device_decorators import * -from clients.opcua_client import OPCUAConnection, event_loop as opcua_event_loop from clients.attribute_wrapper import attribute_wrapper -from devices.hardware_device import hardware_device +from devices.opcua_device import opcua_device from common.lofar_logging import device_logging_to_python, log_exceptions -from common.lofar_git import get_version __all__ = ["RECV", "main"] @device_logging_to_python() -class RECV(hardware_device): +class RECV(opcua_device): """ **Properties:** @@ -51,33 +49,9 @@ class RECV(hardware_device): - Type:'DevDouble' """ - # ----------------- - # Device Properties - # ----------------- - - OPC_Server_Name = device_property( - dtype='DevString', - mandatory=True - ) - - OPC_Server_Port = device_property( - dtype='DevULong', - mandatory=True - ) - - OPC_Time_Out = device_property( - dtype='DevDouble', - mandatory=True - ) - OPC_namespace = device_property( - dtype='DevString', - mandatory=False - ) - # ---------- # Attributes # ---------- - version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) Ant_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:Ant_mask_RW"], datatype=numpy.bool_, dims=(3, 32), access=AttrWriteType.READ_WRITE) CLK_Enable_PWR_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_Enable_PWR_R"], datatype=numpy.bool_) CLK_I2C_STATUS_R = attribute_wrapper(comms_annotation=["2:PCC", "2:CLK_I2C_STATUS_R"], datatype=numpy.int64) @@ -111,61 +85,6 @@ class RECV(hardware_device): RCU_translator_busy_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_translator_busy_R"], datatype=numpy.bool_) RCU_version_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_version_R"], datatype=numpy.str, dims=(32,)) - @log_exceptions() - def delete_device(self): - """Hook to delete resources allocated in init_device. - - This method allows for any memory or other resources allocated in the - init_device method to be released. This method is called by the device - destructor and by the device Init command (a Tango built-in). - """ - self.debug_stream("Shutting down...") - - self.Off() - self.debug_stream("Shut down. Good bye.") - - # -------- - # overloaded functions - # -------- - @log_exceptions() - def configure_for_off(self): - """ user code here. is called when the state is set to OFF """ - # Stop keep-alive - try: - self.OPCua_client.stop() - except Exception as e: - self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) - - - @log_exceptions() - def configure_for_initialise(self): - """ user code here. is called when the state is set to INIT """ - - # Init the dict that contains function to OPC-UA function mappings. - self.function_mapping = {} - self.function_mapping["RCU_on"] = {} - self.function_mapping["RCU_off"] = {} - self.function_mapping["CLK_on"] = {} - self.function_mapping["CLK_off"] = {} - - future = asyncio.run_coroutine_threadsafe(self._initialise_opcua(), opcua_event_loop) - _ = future.result() - - async def _initialise_opcua(self): - # set up the OPC ua client - self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) - - await self.OPCua_client.start() - - # map an access helper class - for i in self.attr_list(): - try: - await i.set_comm_client(self.OPCua_client) - except Exception as e: - # use the pass function instead of setting read/write fails - i.set_pass_func() - self.warn_stream("error while setting the RECV attribute {} read/write function. {}".format(i, e)) - # -------- # Commands # -------- diff --git a/devices/devices/sdp/sdp.py b/devices/devices/sdp/sdp.py index 75e027b571cefe0bdfa68621b37f45dd26d98aae..f93abbb469fd919fb07cf6e46d83fc75f65afcff 100644 --- a/devices/devices/sdp/sdp.py +++ b/devices/devices/sdp/sdp.py @@ -24,19 +24,17 @@ from tango.server import device_property, attribute from tango import AttrWriteType # Additional import -from clients.opcua_client import OPCUAConnection from clients.attribute_wrapper import attribute_wrapper -from devices.hardware_device import hardware_device +from devices.opcua_device import opcua_device from common.lofar_logging import device_logging_to_python, log_exceptions -from common.lofar_git import get_version import numpy __all__ = ["SDP", "main"] @device_logging_to_python() -class SDP(hardware_device): +class SDP(opcua_device): """ **Properties:** @@ -54,21 +52,6 @@ class SDP(hardware_device): # Device Properties # ----------------- - OPC_Server_Name = device_property( - dtype='DevString', - mandatory=True - ) - - OPC_Server_Port = device_property( - dtype='DevULong', - mandatory=True - ) - - OPC_Time_Out = device_property( - dtype='DevDouble', - mandatory=True - ) - FPGA_processing_enable_RW_default = device_property( dtype='DevVarBooleanArray', mandatory=False, @@ -96,8 +79,6 @@ class SDP(hardware_device): # Attributes # ---------- - version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) - # SDP will switch from FPGA_mask_RW to TR_FPGA_mask_RW, offer both for now as its a critical flag FPGA_firmware_version_R = attribute_wrapper(comms_annotation=["2:FPGA_firmware_version_R"], datatype=numpy.str, dims=(16,)) FPGA_global_node_index_R = attribute_wrapper(comms_annotation=["2:FPGA_global_node_index_R"], datatype=numpy.uint32, dims=(16,)) @@ -142,55 +123,9 @@ class SDP(hardware_device): TR_tod_R = attribute_wrapper(comms_annotation=["2:TR_tod_R"], datatype=numpy.int64, dims=(2,)) TR_tod_pps_delta_R = attribute_wrapper(comms_annotation=["2:TR_tod_pps_delta_R"], datatype=numpy.double) - def always_executed_hook(self): - """Method always executed before any TANGO command is executed.""" - pass - - @log_exceptions() - def delete_device(self): - """Hook to delete resources allocated in init_device. - - This method allows for any memory or other resources allocated in the - init_device method to be released. This method is called by the device - destructor and by the device Init command (a Tango built-in). - """ - self.debug_stream("Shutting down...") - - self.Off() - self.debug_stream("Shut down. Good bye.") - # -------- # overloaded functions # -------- - @log_exceptions() - def configure_for_off(self): - """ user code here. is called when the state is set to OFF """ - - # Stop keep-alive - try: - self.OPCua_client.stop() - except Exception as e: - self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) - - @log_exceptions() - def configure_for_initialise(self): - """ user code here. is called when the sate is set to INIT """ - """Initialises the attributes and properties of the SDP.""" - - # set up the OPC ua client - self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) - - # map an access helper class - for i in self.attr_list(): - try: - i.set_comm_client(self.OPCua_client) - except Exception as e: - # use the pass function instead of setting read/write fails - i.set_pass_func() - self.warn_stream("error while setting the SDP attribute {} read/write function. {}".format(i, e)) - pass - - self.OPCua_client.start() # -------- # Commands diff --git a/devices/devices/sdp/sst.py b/devices/devices/sdp/sst.py index 3b2f36236a841adb0511b284cbeb4a0fbc6ee296..85d4f043de15ca074b03f754584fec655576f119 100644 --- a/devices/devices/sdp/sst.py +++ b/devices/devices/sdp/sst.py @@ -25,8 +25,8 @@ from tango import AttrWriteType # Additional import from clients.attribute_wrapper import attribute_wrapper -from clients.opcua_client import OPCUAConnection from clients.statistics_client import StatisticsClient +from clients.opcua_client import OPCUAConnection from devices.sdp.statistics import Statistics from devices.sdp.statistics_collector import SSTCollector diff --git a/devices/devices/sdp/statistics.py b/devices/devices/sdp/statistics.py index 7d0b970b089ff29931bfc088f8b4b208d347402c..24019d623b90a846316e2b457694368dc533bf6e 100644 --- a/devices/devices/sdp/statistics.py +++ b/devices/devices/sdp/statistics.py @@ -26,12 +26,10 @@ from tango import AttrWriteType # Additional import from clients.statistics_client import StatisticsClient -from clients.opcua_client import OPCUAConnection from clients.attribute_wrapper import attribute_wrapper -from devices.hardware_device import hardware_device +from devices.opcua_device import opcua_device -from common.lofar_git import get_version from common.lofar_logging import device_logging_to_python, log_exceptions import logging @@ -41,7 +39,7 @@ import numpy __all__ = ["Statistics"] -class Statistics(hardware_device, metaclass=ABCMeta): +class Statistics(opcua_device, metaclass=ABCMeta): # In derived classes, set this to a subclass of StatisticsCollector @property @@ -53,21 +51,6 @@ class Statistics(hardware_device, metaclass=ABCMeta): # Device Properties # ----------------- - OPC_Server_Name = device_property( - dtype='DevString', - mandatory=True - ) - - OPC_Server_Port = device_property( - dtype='DevULong', - mandatory=True - ) - - OPC_Time_Out = device_property( - dtype='DevDouble', - mandatory=True - ) - Statistics_Client_UDP_Port = device_property( dtype='DevUShort', mandatory=True @@ -82,8 +65,6 @@ class Statistics(hardware_device, metaclass=ABCMeta): # Attributes # ---------- - version_R = attribute(dtype = str, access = AttrWriteType.READ, fget = lambda self: get_version()) - # number of UDP packets and bytes that were received nof_packets_received_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "nof_packets_received"}, datatype=numpy.uint64) nof_bytes_received_R = attribute_wrapper(comms_id=StatisticsClient, comms_annotation={"type": "udp", "parameter": "nof_bytes_received"}, datatype=numpy.uint64) @@ -118,17 +99,13 @@ class Statistics(hardware_device, metaclass=ABCMeta): def configure_for_off(self): """ user code here. is called when the state is set to OFF """ - # Stop keep-alive + super().configure_for_off() + try: self.statistics_client.stop() except Exception as e: logger.exception("Exception while stopping statistics_client in configure_for_off. Exception ignored") - try: - self.OPCUA_client.stop() - except Exception as e: - logger.exception("Exception while stopping OPC UA connection in configure_for_off. Exception ignored") - @log_exceptions() def configure_for_initialise(self): """ user code here. is called when the sate is set to INIT """ @@ -149,26 +126,19 @@ class Statistics(hardware_device, metaclass=ABCMeta): self.statistics_collector = self.STATISTICS_COLLECTOR_CLASS() self.statistics_client = StatisticsClient(self.statistics_collector, udp_options, tcp_options, self.Fault, self) - self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) - # map an access helper class for i in self.attr_list(): try: if i.comms_id == StatisticsClient: i.set_comm_client(self.statistics_client) - elif i.comms_id == OPCUAConnection: - i.set_comm_client(self.OPCUA_client) - else: - raise ValueError("Cannot set comm client for attribute {}: Unknown comms_id {}".format(i, i.comms_id)) except Exception as e: # use the pass function instead of setting read/write fails i.set_pass_func() self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e)) - pass self.statistics_client.start() - self.OPCUA_client.start() + super().configure_for_initialise() # -------- # Commands diff --git a/devices/devices/sdp/xst.py b/devices/devices/sdp/xst.py index caeeb5d3488369ecaf17208d1b33c2b7e6c76511..d4fdc99142275b9736983668996ba6659a4eedf5 100644 --- a/devices/devices/sdp/xst.py +++ b/devices/devices/sdp/xst.py @@ -25,12 +25,9 @@ from tango import AttrWriteType # Additional import from clients.attribute_wrapper import attribute_wrapper -from clients.opcua_client import OPCUAConnection from clients.statistics_client import StatisticsClient +from clients.opcua_client import OPCUAConnection -from devices.hardware_device import hardware_device - -from common.lofar_git import get_version from common.lofar_logging import device_logging_to_python, log_exceptions from devices.sdp.statistics import Statistics diff --git a/devices/devices/unb2.py b/devices/devices/unb2.py index 7c2575991605354de5bba608906fb9ea248f021b..cd6c48904a257c01ff8ed26407f13c09b71111a5 100644 --- a/devices/devices/unb2.py +++ b/devices/devices/unb2.py @@ -23,19 +23,17 @@ from tango.server import device_property, attribute from tango import AttrWriteType # Additional import -from clients.opcua_client import OPCUAConnection from clients.attribute_wrapper import attribute_wrapper -from devices.hardware_device import hardware_device +from devices.opcua_device import opcua_device from common.lofar_logging import device_logging_to_python, log_exceptions -from common.lofar_git import get_version import numpy __all__ = ["UNB2", "main"] @device_logging_to_python() -class UNB2(hardware_device): +class UNB2(opcua_device): """ **Properties:** @@ -53,27 +51,10 @@ class UNB2(hardware_device): # Device Properties # ----------------- - OPC_Server_Name = device_property( - dtype='DevString', - mandatory=True - ) - - OPC_Server_Port = device_property( - dtype='DevULong', - mandatory=True - ) - - OPC_Time_Out = device_property( - dtype='DevDouble', - mandatory=True - ) - # ---------- # Attributes # ---------- - version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) - N_unb = 2 N_fpga = 4 N_ddr = 2 @@ -167,49 +148,10 @@ class UNB2(hardware_device): # QualifiedName(2: UNB2_on) # QualifiedName(2: UNB2_off) - @log_exceptions() - def delete_device(self): - """Hook to delete resources allocated in init_device. - - This method allows for any memory or other resources allocated in the - init_device method to be released. This method is called by the device - destructor and by the device Init command (a Tango built-in). - """ - self.debug_stream("Shutting down...") - - self.Off() - self.debug_stream("Shut down. Good bye.") # -------- # overloaded functions # -------- - @log_exceptions() - def configure_for_off(self): - """ user code here. is called when the state is set to OFF """ - # Stop keep-alive - try: - self.opcua_connection.stop() - except Exception as e: - self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) - - @log_exceptions() - def configure_for_initialise(self): - """ user code here. is called when the sate is set to INIT """ - """Initialises the attributes and properties of theRECV.""" - - # set up the OPC ua client - self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) - - # map an access helper class - for i in self.attr_list(): - try: - i.set_comm_client(self.OPCua_client) - except Exception as e: - # use the pass function instead of setting read/write fails - i.set_pass_func() - self.warn_stream("error while setting the UNB2 attribute {} read/write function. {}".format(i, e)) - - self.OPCua_client.start() # -------- # Commands