# -*- coding: utf-8 -*-
#
# This file is part of the Crossecho project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" LOFAR2.0 Station Software

Implementation of a Tango device on top of an existing OPC-UA server in
Python3.
"""

# PyTango imports
import PyTango
from PyTango import DebugIt
from PyTango.server import run
from PyTango.server import Device, DeviceMeta
from PyTango.server import attribute, command, pipe
from PyTango.server import device_property
from PyTango import AttrQuality, DispLevel, DevState
from PyTango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(Crossecho.additionnal_import) ENABLED START #
import traceback

from opcua import Client
import numpy

# NOTE(review): the original code referenced DEFAULT_URI without defining or
# importing it. The value below is an assumption -- confirm it against the
# namespace URI actually registered by the OPC-UA server.
DEFAULT_URI = "http://lofar.eu"
# PROTECTED REGION END #    //  Crossecho.additionnal_import

__all__ = ["Crossecho", "main"]


class Crossecho(Device):
    """
    Implementation of a Tango device on top of an existing OPC-UA server in
    Python3.

    The device connects to the OPC-UA server in ``init_device`` and exposes
    the result of the server-side ``record_cross`` method through two
    attributes (``RCU_modes``, ``crosslet_stat``) and one pipe (``xlt_stat``).
    """
    __metaclass__ = DeviceMeta
    # PROTECTED REGION ID(Crossecho.class_variable) ENABLED START #
    # PROTECTED REGION END #    //  Crossecho.class_variable

    # -----------------
    # Device Properties
    # -----------------

    OPC_Server_Name = device_property(
        dtype='str',
        default_value="localhost"
    )

    OPC_Server_Port = device_property(
        dtype='str',
        default_value="55555"
    )

    OPC_time_out = device_property(
        dtype='uint',
        default_value=1000
    )

    # ----------
    # Attributes
    # ----------

    RCU_modes = attribute(
        dtype=('str',),
        max_dim_x=1024,
    )

    crosslet_stat = attribute(
        dtype=('double',),
        max_dim_x=10240,
    )

    # -----
    # Pipes
    # -----

    xlt_stat = pipe(
        label="xlt",
    )

    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialise the device and connect to the OPC-UA server.

        On success the state is set to ON; on any failure the error is
        logged and the state is set to FAULT, so that ``record_cross`` is
        refused by ``is_record_cross_allowed``.
        """
        Device.init_device(self)
        # PROTECTED REGION ID(Crossecho.init_device) ENABLED START #
        self.set_state(DevState.INIT)

        # Instance-side storage for the last recorded values. These are kept
        # under private names so they do not shadow the class-level Tango
        # ``attribute`` objects of the same public name.
        self._rcu_modes = []
        self._crosslet_stat = []

        # NOTE(review): record_cross passes these two values to the OPC-UA
        # method, but nothing in this file ever assigns them. They must be
        # set before record_cross can succeed -- confirm where they are
        # supposed to come from.
        self.subBand = None
        self.integrationTime = None

        self.client = None
        self.opc_obj = None
        self._ns_idx = None

        try:
            # NOTE(review): OPC_time_out is handed to the client unchanged;
            # confirm that its unit matches what opcua.Client expects.
            self.client = Client(
                "opc.tcp://{}:{}/".format(self.OPC_Server_Name,
                                          self.OPC_Server_Port),
                self.OPC_time_out)
            self.client.connect()
            self.client.load_type_definitions()
            # Kept on the instance because record_cross needs it as well.
            self._ns_idx = self.client.get_namespace_index(DEFAULT_URI)
            # Now getting a variable node using its browse path
            self.opc_obj = self.client.get_root_node().get_child(
                ["0:Objects",
                 "{}:StationMetrics".format(self._ns_idx),
                 "{}:RCU".format(self._ns_idx)])
        except Exception:
            self.error_stream(
                "Failed to connect to the OPC-UA server %s. Traceback: %s",
                self.OPC_Server_Name, traceback.format_exc())
            self.set_state(DevState.FAULT)
        else:
            self.info_stream("Connected to the OPC-UA server %s",
                             self.OPC_Server_Name)
            self.set_state(DevState.ON)
        # PROTECTED REGION END #    //  Crossecho.init_device

    def always_executed_hook(self):
        """Hook run by Tango before every request; nothing to do here."""
        # PROTECTED REGION ID(Crossecho.always_executed_hook) ENABLED START #
        pass
        # PROTECTED REGION END #    //  Crossecho.always_executed_hook

    def delete_device(self):
        """Release the OPC-UA connection, if one was ever created."""
        # PROTECTED REGION ID(Crossecho.delete_device) ENABLED START #
        client = getattr(self, "client", None)
        if client is None:
            return
        try:
            # disconnect() closes the session and the secure channel and
            # also releases the underlying socket.
            client.disconnect()
        except Exception:
            # Best effort: the connection may never have been established.
            self.error_stream(
                "Failed to disconnect from the OPC-UA server %s. "
                "Traceback: %s",
                self.OPC_Server_Name, traceback.format_exc())
        finally:
            self.client = None
        # PROTECTED REGION END #    //  Crossecho.delete_device

    # ------------------
    # Attributes methods
    # ------------------

    def read_RCU_modes(self):
        """Return the RCU modes stored by the last ``record_cross`` call."""
        # PROTECTED REGION ID(Crossecho.RCU_modes_read) ENABLED START #
        return self._rcu_modes
        # PROTECTED REGION END #    //  Crossecho.RCU_modes_read

    def read_crosslet_stat(self):
        """Return the crosslet statistics stored by the last
        ``record_cross`` call."""
        # PROTECTED REGION ID(Crossecho.crosslet_stat_read) ENABLED START #
        return self._crosslet_stat
        # PROTECTED REGION END #    //  Crossecho.crosslet_stat_read

    # -------------
    # Pipes methods
    # -------------

    def read_xlt_stat(self):
        """Return the pipe blob as ``(root_blob_name, data)``.

        ``x`` carries the crosslet statistics and ``y`` the RCU modes.
        """
        # PROTECTED REGION ID(Crossecho.xlt_stat_read) ENABLED START #
        return "xlt_stat", dict(x=self._crosslet_stat, y=self._rcu_modes)
        # PROTECTED REGION END #    //  Crossecho.xlt_stat_read

    # --------
    # Commands
    # --------

    @command(
    )
    @DebugIt()
    def record_cross(self):
        """Call the OPC-UA ``record_cross`` method and store its results.

        The call returns a timestamp, the crosslet statistics and the RCU
        modes; the latter two are kept for the attribute and pipe readers.

        Raises:
            RuntimeError: if ``subBand`` or ``integrationTime`` has not been
                set on the device.
        """
        # PROTECTED REGION ID(Crossecho.record_cross) ENABLED START #
        # Guard kept for direct Python callers.
        if not self.is_record_cross_allowed():
            return

        if self.subBand is None or self.integrationTime is None:
            raise RuntimeError(
                "record_cross requires subBand and integrationTime to be set")

        time_stamp, self._crosslet_stat, self._rcu_modes = \
            self.opc_obj.call_method(
                "{}:record_cross".format(self._ns_idx),
                self.subBand, self.integrationTime)

        self.info_stream("Timestamp is %s", time_stamp)
        self.info_stream("Crosscorrelations are %s", self._crosslet_stat)
        self.info_stream("RCU modes are %s", self._rcu_modes)
        # PROTECTED REGION END #    //  Crossecho.record_cross

    def is_record_cross_allowed(self):
        """Return True unless the device is in OFF, INIT or FAULT state."""
        # PROTECTED REGION ID(Crossecho.is_record_cross_allowed) ENABLED START #
        return self.get_state() not in [DevState.OFF, DevState.INIT,
                                        DevState.FAULT]
        # PROTECTED REGION END #    //  Crossecho.is_record_cross_allowed


# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    """Run the Crossecho device server and return its exit value."""
    # PROTECTED REGION ID(Crossecho.main) ENABLED START #
    return run((Crossecho,), args=args, **kwargs)
    # PROTECTED REGION END #    //  Crossecho.main


if __name__ == '__main__':
    main()