# -*- coding: utf-8 -*-
#
# This file is part of the RCUSCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" RCU-SCC Device Server for LOFAR2.0

"""

# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(RCUSCC.additionnal_import) ENABLED START #
import opcua
import traceback
# PROTECTED REGION END #    //  RCUSCC.additionnal_import

__all__ = ["RCUSCC", "main"]


class RCUSCC(Device):
    """
    Tango device that exposes the ``time_offset`` monitoring/control points
    of an OPC-UA server as the Tango attributes ``time_offset_rw`` and
    ``time_offset_r``.

    **Properties:**

    - Device Property
        OPC_Server_Name
            - Type:'DevString'
        OPC_Server_Port
            - Type:'DevULong'
        OPC_Time_Out
            - Type:'DevDouble'
    """
    # PROTECTED REGION ID(RCUSCC.class_variable) ENABLED START #
    # OPC-UA client; stays None until init_device has created one, so that
    # delete_device can tell whether there is anything to disconnect.
    client = None
    # Index of the OPC-UA name space that holds the mapped nodes.
    name_space_index = 0
    # Not referenced by the methods of this class; kept for compatibility.
    obj = 0
    # PROTECTED REGION END #    //  RCUSCC.class_variable

    # -----------------
    # Device Properties
    # -----------------

    OPC_Server_Name = device_property(
        dtype='DevString',
        mandatory=True
    )

    OPC_Server_Port = device_property(
        dtype='DevULong',
        mandatory=True
    )

    OPC_Time_Out = device_property(
        dtype='DevDouble',
        mandatory=True
    )

    # ----------
    # Attributes
    # ----------

    time_offset_rw = attribute(
        dtype='DevLong64',
        access=AttrWriteType.READ_WRITE,
    )

    time_offset_r = attribute(
        dtype='DevLong64',
    )

    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialises the attributes and properties of the RCUSCC.

        Connects to the OPC-UA server named by the device properties,
        resolves the name space index and maps the ``time_offset_RW`` and
        ``time_offset_R`` nodes into ``self.attribute_mapping``.

        The device state is INIT while this runs, ON when the mapping
        succeeded and FAULT when anything in the connection or mapping
        sequence failed.

        :raises Exception: whatever the connection or mapping step raised;
            it is logged to the error stream and then re-raised unchanged.
        """
        Device.init_device(self)
        # PROTECTED REGION ID(RCUSCC.init_device) ENABLED START #
        self.set_state(DevState.INIT)

        # Init the dict that contains attribute to OPC-UA MP/CP mappings.
        self.attribute_mapping = {}

        # Set default values in the RW/R attributes and add them to
        # the mapping.  The empty dicts are placeholders that get replaced
        # by OPC-UA nodes once the connection is up.
        self._time_offset_rw = 0
        self.attribute_mapping["_time_offset_rw"] = {}
        self._time_offset_r = 0
        self.attribute_mapping["_time_offset_r"] = {}

        # Set defaults to property values.

        try:
            self.debug_stream("Connecting to OPC-UA server %s:%d...", self.OPC_Server_Name, self.OPC_Server_Port)

            # NOTE(review): the time-out is multiplied by 1000 before it is
            # handed to opcua.Client, whose timeout argument is presumably
            # expressed in seconds -- confirm the unit of OPC_Time_Out.
            self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_Out * 1000)
            self.client.connect()
            self.debug_stream("Connecting to OPC-UA server %s:%d done.", self.OPC_Server_Name, self.OPC_Server_Port)

            try:
                self.name_space_index = self.client.get_namespace_index("http://lofar.eu")
            except Exception:
                # Deliberate best effort: fall back to a fixed index.
                self.warn_stream("Cannot determine the OPC-UA name space index. Will try and use the default = 2.")
                self.name_space_index = 2

            self.obj_node = self.client.get_objects_node()

            self.debug_stream("Mapping OPC-UA MP/CP to attributes...")
            self.attribute_mapping["_time_offset_rw"] = self.obj_node.get_child([
                "{}:time_offset".format(self.name_space_index),
                "{}:time_offset_RW".format(self.name_space_index)])
            self.attribute_mapping["_time_offset_r"] = self.obj_node.get_child([
                "{}:time_offset".format(self.name_space_index),
                "{}:time_offset_R".format(self.name_space_index)])
            self.debug_stream("Mapping OPC-UA MP/CP to attributes done.")
            self.set_state(DevState.ON)
        except Exception:
            self.set_state(DevState.FAULT)
            # Hand the values over as stream arguments instead of formatting
            # the message up front, so that a '%' inside the traceback text
            # cannot be mistaken for a format directive.
            self.error_stream("Connection init to the OPC-UA server %s:%d failed. Trace: %s", self.OPC_Server_Name, self.OPC_Server_Port, traceback.format_exc())
            # Bare raise keeps the original exception and traceback intact.
            raise
        # PROTECTED REGION END #    //  RCUSCC.init_device

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(RCUSCC.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  RCUSCC.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.

        Sets the device state to OFF and, if an OPC-UA client exists,
        disconnects it.  A failing disconnect is logged as a warning and
        does not abort the shut-down.
        """
        # PROTECTED REGION ID(RCUSCC.delete_device) ENABLED START #
        self.debug_stream("Shutting down...")

        self.set_state(DevState.OFF)
        if self.client is not None:
            try:
                # disconnect() closes the session and the secure channel
                # and additionally releases the socket.
                self.client.disconnect()
            except Exception:
                # E.g. the client was created but never got connected.
                self.warn_stream("Disconnecting from the OPC-UA server failed. Trace: %s", traceback.format_exc())
            finally:
                # Forget the client so a second call does not disconnect twice.
                self.client = None

        self.debug_stream("Shut down. Good bye.")
        # PROTECTED REGION END #    //  RCUSCC.delete_device

    # ------------------
    # Attributes methods
    # ------------------

    def read_time_offset_rw(self):
        # PROTECTED REGION ID(RCUSCC.time_offset_rw_read) ENABLED START #
        """Return the time_offset_rw attribute.

        Returns the locally cached value (the initial 0 or the value of the
        last successful write); the OPC-UA server is not queried here.
        """
        return self._time_offset_rw
        # PROTECTED REGION END #    //  RCUSCC.time_offset_rw_read

    def write_time_offset_rw(self, value):
        # PROTECTED REGION ID(RCUSCC.time_offset_rw_write) ENABLED START #
        """Set the time_offset_rw attribute.

        Writes ``value`` to the mapped OPC-UA node first and updates the
        local cache only after that write has returned.

        :param value: the new time offset.
        """
        self.attribute_mapping["_time_offset_rw"].set_value(value)
        self._time_offset_rw = value
        # PROTECTED REGION END #    //  RCUSCC.time_offset_rw_write

    def read_time_offset_r(self):
        # PROTECTED REGION ID(RCUSCC.time_offset_r_read) ENABLED START #
        """Return the time_offset_r attribute.

        Fetches the current value from the mapped OPC-UA node, caches it
        and returns it.
        """
        self._time_offset_r = self.attribute_mapping["_time_offset_r"].get_value()
        return self._time_offset_r
        # PROTECTED REGION END #    //  RCUSCC.time_offset_r_read

    # --------
    # Commands
    # --------

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    """Main function of the RCUSCC module.

    :param args: command line arguments handed on to ``tango.server.run``.
    :param kwargs: further keyword arguments handed on to ``run``.
    :return: whatever ``tango.server.run`` returns.
    """
    # PROTECTED REGION ID(RCUSCC.main) ENABLED START #
    return run((RCUSCC,), args=args, **kwargs)
    # PROTECTED REGION END #    //  RCUSCC.main


if __name__ == '__main__':
    main()