# -*- coding: utf-8 -*- # # This file is part of the StatsCrosslet project # # # # Distributed under the terms of the APACHE license. # See LICENSE.txt for more info. """ OPC-UA client for LOFAR stations crosslet stats """ # PyTango imports import tango from tango import DebugIt from tango.server import run from tango.server import Device from tango.server import attribute, command from tango.server import device_property from tango import AttrQuality, DispLevel, DevState from tango import AttrWriteType, PipeWriteType # Additional import # PROTECTED REGION ID(StatsCrosslet.additionnal_import) ENABLED START # import opcua import numpy import traceback import threading from datetime import datetime # PROTECTED REGION END # // StatsCrosslet.additionnal_import __all__ = ["StatsCrosslet", "main"] class StatsCrosslet(Device): """ **Properties:** - Device Property OPC_Server_Name - Type:'DevString' OPC_Server_Port - Type:'DevULong' OPC_Time_out - Type:'DevDouble' Default_pause_time - Type:'DevDouble' Default_subband - Type:'DevULong' Default_integration_time - Type:'DevDouble' """ # PROTECTED REGION ID(StatsCrosslet.class_variable) ENABLED START # client = 0 ns = 0 obj = 0 record_cross = 0 stop_data_read_loop = False data_acquisition_is_active = False data_read_loop = threading.Thread() def read_data(self): # This is the the thread that continuously reads the crosslet # statistics from the station and feeds it into the Tango # property system. self.debug_stream("Entering read_data loop.") while self.stop_data_read_loop is False: self.debug_stream("read_data is running...") threading.Event().wait(self._pause_time) if self.data_acquisition_is_active is True: try: self.debug_stream("fetching data: subband = %d, integration_time = %d", self._subband, self._integration_time) t, visibilities_list, rcu_modes = self.obj.call_method(self.record_cross, self._subband, self._integration_time) self.debug_stream("fetching data done: t = %s, len(visibilities_list) = %d, len(rcu_modes) = %d", t, len(visibilities_list), len(rcu_modes)) self._time_stamp = t visibilities = numpy.array(visibilities_list)[0] + 1j * numpy.array(visibilities_list[1]) self._visibilities_real = visibilities.real self._visibilities_imag = visibilities.imag self._rcu_modes = rcu_modes self.debug_stream("timestamp = %s, visibilities_real [0][1] = %f", self._time_stamp, self._visibilities_real[0][1]) except KeyboardInterrupt: self.debug_stream("KBD interrupt caught, leaving data read loop.") self.stop_data_read_loop = True except Exception as e: self.error_stream("Exception: %s. Cannot call the method %s in the OPC-UA server %s. Trace: %s", e, self.record_cross, self.OPC_Server_Name, traceback.format_exc()) # PROTECTED REGION END # // StatsCrosslet.class_variable # ----------------- # Device Properties # ----------------- OPC_Server_Name = device_property( dtype='DevString', default_value="okeanos" ) OPC_Server_Port = device_property( dtype='DevULong', default_value=55556 ) OPC_Time_out = device_property( dtype='DevDouble', default_value=1.0 ) Default_pause_time = device_property( dtype='DevDouble', default_value=1.0 ) Default_subband = device_property( dtype='DevULong', default_value=150 ) Default_integration_time = device_property( dtype='DevDouble', default_value=1.0 ) # ---------- # Attributes # ---------- subband = attribute( dtype='DevUShort', access=AttrWriteType.READ_WRITE, ) integration_time = attribute( dtype='DevDouble', ) time_stamp = attribute( dtype='DevString', ) pause_time = attribute( dtype='DevDouble', access=AttrWriteType.READ_WRITE, ) rcu_modes = attribute( dtype=('DevString',), max_dim_x=96, ) visibilities_imag = attribute( dtype=(('DevDouble',),), max_dim_x=96, max_dim_y=96, ) visibilities_real = attribute( dtype=(('DevDouble',),), max_dim_x=96, max_dim_y=96, ) # --------------- # General methods # --------------- def init_device(self): """Initialises the attributes and properties of the StatsCrosslet.""" Device.init_device(self) # PROTECTED REGION ID(StatsCrosslet.init_device) ENABLED START # try: self.debug_stream("Connecting to OPC-UA server %s:%d...", self.OPC_Server_Name, self.OPC_Server_Port) self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_Time_out * 1000) self.client.connect() ns = self.client.get_namespace_index("http://lofar.eu") self.obj = self.client.get_root_node().get_child(["0:Objects", "{}:StationMetrics".format(ns), "{}:RCU".format(ns)]) self.record_cross = "{}:record_cross".format(ns) self.debug_stream("Connecting to OPC-UA server %s:%d done.", self.OPC_Server_Name, self.OPC_Server_Port) # Set default values in the read/write attributes. self._pause_time = self.Default_pause_time self._integration_time = self.Default_integration_time self._subband = self.Default_subband self.data_read_loop = threading.Thread(target = self.read_data) self.data_acquisition_is_active = False self.stop_data_read_loop = False self.data_read_loop.start() except Exception as e: self.error_stream("Cannot connect to the OPC-UA server %s. Trace: %s" % (self.OPC_Server_Name, traceback.format_exc())) raise e # PROTECTED REGION END # // StatsCrosslet.init_device def always_executed_hook(self): """Method always executed before any TANGO command is executed.""" # PROTECTED REGION ID(StatsCrosslet.always_executed_hook) ENABLED START # # PROTECTED REGION END # // StatsCrosslet.always_executed_hook def delete_device(self): """Hook to delete resources allocated in init_device. This method allows for any memory or other resources allocated in the init_device method to be released. This method is called by the device destructor and by the device Init command. """ # PROTECTED REGION ID(StatsCrosslet.delete_device) ENABLED START # self.debug_stream("Shutting down...") self.data_acquisition_is_active = False self.stop_data_read_loop = True self.debug_stream("Waiting for data acquisition thread to shut down...") self.data_read_loop.join() self.debug_stream("Waiting for data acquisition thread to shut down, done") if self.client is not None: self.client.close_session() self.client.close_secure_channel() self.debug_stream("Shutdown done.") # PROTECTED REGION END # // StatsCrosslet.delete_device # ------------------ # Attributes methods # ------------------ def read_subband(self): # PROTECTED REGION ID(StatsCrosslet.subband_read) ENABLED START # """Return the subband attribute.""" return self._subband # PROTECTED REGION END # // StatsCrosslet.subband_read def write_subband(self, value): # PROTECTED REGION ID(StatsCrosslet.subband_write) ENABLED START # """Set the subband attribute.""" self._subband = value # PROTECTED REGION END # // StatsCrosslet.subband_write def is_subband_allowed(self, attr): # PROTECTED REGION ID(StatsCrosslet.is_subband_allowed) ENABLED START # if attr==attr.READ_REQ: return self.get_state() not in [DevState.STANDBY] else: return self.get_state() not in [DevState.STANDBY] # PROTECTED REGION END # // StatsCrosslet.is_subband_allowed def read_integration_time(self): # PROTECTED REGION ID(StatsCrosslet.integration_time_read) ENABLED START # """Return the integration_time attribute.""" return self._integration_time # PROTECTED REGION END # // StatsCrosslet.integration_time_read def is_integration_time_allowed(self, attr): # PROTECTED REGION ID(StatsCrosslet.is_integration_time_allowed) ENABLED START # return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT] # PROTECTED REGION END # // StatsCrosslet.is_integration_time_allowed def read_time_stamp(self): # PROTECTED REGION ID(StatsCrosslet.time_stamp_read) ENABLED START # """Return the time_stamp attribute.""" return self._time_stamp # PROTECTED REGION END # // StatsCrosslet.time_stamp_read def is_time_stamp_allowed(self, attr): # PROTECTED REGION ID(StatsCrosslet.is_time_stamp_allowed) ENABLED START # return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT] # PROTECTED REGION END # // StatsCrosslet.is_time_stamp_allowed def read_pause_time(self): # PROTECTED REGION ID(StatsCrosslet.pause_time_read) ENABLED START # """Return the pause_time attribute.""" return self._pause_time # PROTECTED REGION END # // StatsCrosslet.pause_time_read def write_pause_time(self, value): # PROTECTED REGION ID(StatsCrosslet.pause_time_write) ENABLED START # """Set the pause_time attribute.""" self._pause_time = value # PROTECTED REGION END # // StatsCrosslet.pause_time_write def read_rcu_modes(self): # PROTECTED REGION ID(StatsCrosslet.rcu_modes_read) ENABLED START # """Return the rcu_modes attribute.""" return self._rcu_modes # PROTECTED REGION END # // StatsCrosslet.rcu_modes_read def is_rcu_modes_allowed(self, attr): # PROTECTED REGION ID(StatsCrosslet.is_rcu_modes_allowed) ENABLED START # return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT] # PROTECTED REGION END # // StatsCrosslet.is_rcu_modes_allowed def read_visibilities_imag(self): # PROTECTED REGION ID(StatsCrosslet.visibilities_imag_read) ENABLED START # """Return the visibilities_imag attribute.""" return self._visibilities_imag # PROTECTED REGION END # // StatsCrosslet.visibilities_imag_read def is_visibilities_imag_allowed(self, attr): # PROTECTED REGION ID(StatsCrosslet.is_visibilities_imag_allowed) ENABLED START # return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT] # PROTECTED REGION END # // StatsCrosslet.is_visibilities_imag_allowed def read_visibilities_real(self): # PROTECTED REGION ID(StatsCrosslet.visibilities_real_read) ENABLED START # """Return the visibilities_real attribute.""" return self._visibilities_real # PROTECTED REGION END # // StatsCrosslet.visibilities_real_read def is_visibilities_real_allowed(self, attr): # PROTECTED REGION ID(StatsCrosslet.is_visibilities_real_allowed) ENABLED START # return self.get_state() not in [DevState.ON,DevState.OFF,DevState.INIT] # PROTECTED REGION END # // StatsCrosslet.is_visibilities_real_allowed # -------- # Commands # -------- @command( ) @DebugIt() def start_acquisition(self): # PROTECTED REGION ID(StatsCrosslet.start_acquisition) ENABLED START # """ Start the data acquisition of the station`s crosslet stats. :param argin: 'DevULong' :return:None """ if is_start_acquisition_allowed() is True: self.data_acquisition_is_active = True # PROTECTED REGION END # // StatsCrosslet.start_acquisition def is_start_acquisition_allowed(self): # PROTECTED REGION ID(StatsCrosslet.is_start_acquisition_allowed) ENABLED START # return self.get_state() not in [DevState.ON,DevState.OFF,DevState.STANDBY,DevState.RUNNING] # PROTECTED REGION END # // StatsCrosslet.is_start_acquisition_allowed @command( ) @DebugIt() def stop_acquisition(self): # PROTECTED REGION ID(StatsCrosslet.stop_acquisition) ENABLED START # """ Stop the data acquisition. :return:None """ if is_stop_acquisition_allowed() is True: self.data_acquisition_is_active = False # PROTECTED REGION END # // StatsCrosslet.stop_acquisition def is_stop_acquisition_allowed(self): # PROTECTED REGION ID(StatsCrosslet.is_stop_acquisition_allowed) ENABLED START # return self.get_state() not in [DevState.ON,DevState.INIT,DevState.RUNNING] # PROTECTED REGION END # // StatsCrosslet.is_stop_acquisition_allowed # ---------- # Run server # ---------- def main(args=None, **kwargs): """Main function of the StatsCrosslet module.""" # PROTECTED REGION ID(StatsCrosslet.main) ENABLED START # return run((StatsCrosslet,), args=args, **kwargs) # PROTECTED REGION END # // StatsCrosslet.main if __name__ == '__main__': main()