# -*- coding: utf-8 -*- # # This file is part of the SDP project # # # # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. """ SDP Device Server for LOFAR2.0 """ # PyTango imports from tango.server import run from tango.server import device_property # Additional import from clients.opcua_connection import OPCUAConnection from util.attribute_wrapper import * from util.hardware_device import * from util.lofar_logging import device_logging_to_python __all__ = ["SDP", "main"] @device_logging_to_python({"device": "SDP"}) class SDP(hardware_device): """ **Properties:** - Device Property OPC_Server_Name - Type:'DevString' OPC_Server_Port - Type:'DevULong' OPC_Time_Out - Type:'DevDouble' """ # ----------------- # Device Properties # ----------------- OPC_Server_Name = device_property( dtype='DevString', mandatory=True ) OPC_Server_Port = device_property( dtype='DevULong', mandatory=True ) OPC_Time_Out = device_property( dtype='DevDouble', mandatory=True ) # ---------- # Attributes # ---------- fpga_mask_RW = attribute_wrapper(comms_annotation=["1:fpga_mask_RW"], datatype=numpy.bool_, dims=(16,), access=AttrWriteType.READ_WRITE) fpga_scrap_R = attribute_wrapper(comms_annotation=["1:fpga_scrap_R"], datatype=numpy.int32, dims=(2048,)) fpga_scrap_RW = attribute_wrapper(comms_annotation=["1:fpga_scrap_RW"], datatype=numpy.int32, dims=(2048,), access=AttrWriteType.READ_WRITE) fpga_status_R = attribute_wrapper(comms_annotation=["1:fpga_status_R"], datatype=numpy.bool_, dims=(16,)) fpga_temp_R = attribute_wrapper(comms_annotation=["1:fpga_temp_R"], datatype=numpy.float_, dims=(16,)) fpga_version_R = attribute_wrapper(comms_annotation=["1:fpga_version_R"], datatype=numpy.str_, dims=(16,)) fpga_weights_R = attribute_wrapper(comms_annotation=["1:fpga_weights_R"], datatype=numpy.int16, dims=(16, 12 * 488 * 2)) fpga_weights_RW = attribute_wrapper(comms_annotation=["1:fpga_weights_RW"], datatype=numpy.int16, dims=(16, 12 * 488 * 2), access=AttrWriteType.READ_WRITE) tr_busy_R = attribute_wrapper(comms_annotation=["1:tr_busy_R"], datatype=numpy.bool_) # NOTE: typo in node name is 'tr_reload_W' should be 'tr_reload_RW' tr_reload_RW = attribute_wrapper(comms_annotation=["1:tr_reload_W"], datatype=numpy.bool_, access=AttrWriteType.READ_WRITE) tr_tod_R = attribute_wrapper(comms_annotation=["1:tr_tod_R"], datatype=numpy.uint64) tr_uptime_R = attribute_wrapper(comms_annotation=["1:tr_uptime_R"], datatype=numpy.uint64) def always_executed_hook(self): """Method always executed before any TANGO command is executed.""" pass def delete_device(self): """Hook to delete resources allocated in init_device. This method allows for any memory or other resources allocated in the init_device method to be released. This method is called by the device destructor and by the device Init command (a Tango built-in). """ self.debug_stream("Shutting down...") self.Off() self.debug_stream("Shut down. Good bye.") # -------- # overloaded functions # -------- def off(self): """ user code here. is called when the state is set to OFF """ # Stop keep-alive self.opcua_connection.stop() def initialise(self): """ user code here. is called when the sate is set to INIT """ """Initialises the attributes and properties of the PCC.""" # set up the OPC ua client self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Standby, self.Fault, self) # will contain all the values for this object self.setup_value_dict() # map an access helper class for i in self.attr_list(): i.set_comm_client(self.OPCua_client) self.OPCua_client.start() # -------- # Commands # -------- # ---------- # Run server # ---------- def main(args=None, **kwargs): """Main function of the SDP module.""" return run((SDP,), args=args, **kwargs) if __name__ == '__main__': main()