# -*- coding: utf-8 -*-
#
# This file is part of the StatsCrosslet project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" OPC-UA client for LOFAR stations crosslet stats

"""

# PyTango imports
import tango
from tango import DebugIt
from tango.server import run
from tango.server import Device
from tango.server import attribute, command, pipe
from tango.server import device_property
from tango import AttrQuality, DispLevel, DevState
from tango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(StatsCrosslet.additionnal_import) ENABLED START #
import opcua
import numpy
import traceback
import threading
import time
# PROTECTED REGION END #    //  StatsCrosslet.additionnal_import

__all__ = ["StatsCrosslet", "main"]


class StatsCrosslet(Device):
    """
    Tango device that polls crosslet statistics from an OPC-UA server.

    A background thread (see ``read_data``) periodically calls the
    ``record_cross`` method on the OPC-UA server and caches the result;
    the Tango attributes below return the most recently cached values.

    **Properties:**

    - Device Property
        OPC_Server_Name
            - Type:'DevString'
        OPC_Server_Port
            - Type:'DevULong'
        OPC_time_out
            - Type:'DevULong'
        Default_pause_time
            - Type:'DevDouble'
        Default_subband
            - Type:'DevULong'
        Default_integration_time
            - Type:'DevDouble'
    """
    # PROTECTED REGION ID(StatsCrosslet.class_variable) ENABLED START #
    # Class-level placeholders. init_device() replaces all of them with
    # per-instance objects so that several devices running in the same
    # server process do not share a lock, events or a worker thread.
    client = None
    ns = 0
    obj = 0
    record_cross = 0
    data = []
    data_lock = threading.Lock()
    stop_data_read_loop = threading.Event()
    data_acquisition_is_active = threading.Event()
    data_read_loop = threading.Thread()

    def read_data(self):
        """Poll the OPC-UA server until ``stop_data_read_loop`` is set.

        This is the body of the worker thread started by ``init_device``.
        Every ``_pause_time`` seconds, and only while
        ``data_acquisition_is_active`` is set, it calls the
        ``record_cross`` method on the server and stores the returned
        time stamp, raw visibilities, complex visibilities and RCU modes
        under ``data_lock``.  Failures are printed and the loop carries on.
        """
        # Event.wait() doubles as an interruptible sleep: it returns True
        # as soon as the stop event is set, so delete_device() does not
        # have to wait for a full pause period before the thread exits.
        while not self.stop_data_read_loop.wait(self._pause_time):
            if not self.data_acquisition_is_active.is_set():
                continue
            try:
                timestamp, visibilities, rcu_modes = self.obj.call_method(
                    self.record_cross, self._subband,
                    self._integration_time)
                # Do the conversion before taking the lock so that a
                # malformed reply can neither leave the lock held nor
                # publish a half-updated set of values.
                raw = numpy.array(visibilities)
                complex_visibilities = raw[0] + 1j * raw[1]
                with self.data_lock:
                    self._time_stamp = timestamp
                    self._raw_visibilities = visibilities
                    self._visibilities = complex_visibilities
                    self._rcu_modes = rcu_modes
            except Exception:
                print("Cannot call the method %s in the OPC-UA server %s. Trace: %s" % (self.record_cross, self.OPC_Server_Name, traceback.format_exc()))
    # PROTECTED REGION END #    //  StatsCrosslet.class_variable

    # -----------------
    # Device Properties
    # -----------------

    OPC_Server_Name = device_property(
        dtype='DevString',
        default_value="okeanos"
    )

    OPC_Server_Port = device_property(
        dtype='DevULong',
        default_value=55556
    )

    OPC_time_out = device_property(
        dtype='DevULong',
        default_value=1000
    )

    Default_pause_time = device_property(
        dtype='DevDouble',
        default_value=60.0
    )

    Default_subband = device_property(
        dtype='DevULong',
        default_value=150
    )

    Default_integration_time = device_property(
        dtype='DevDouble',
        default_value=1.0
    )

    # ----------
    # Attributes
    # ----------

    subband = attribute(
        dtype='DevUShort',
        access=AttrWriteType.READ_WRITE,
    )

    integration_time = attribute(
        dtype='DevDouble',
    )

    time_stamp = attribute(
        dtype='DevDouble',
    )

    pause_time = attribute(
        dtype='DevDouble',
        access=AttrWriteType.READ_WRITE,
    )

    # NOTE(review): read_data() stores a complex-valued numpy array in
    # _visibilities while this attribute is declared as DevDouble --
    # confirm how the complex values are meant to be exposed.
    visibilities = attribute(
        dtype=('DevDouble',),
        max_dim_x=96,
    )

    rcu_modes = attribute(
        dtype=('DevString',),
        max_dim_x=96,
    )

    raw_visibilities = attribute(
        dtype=(('DevDouble',),),
        max_dim_x=96, max_dim_y=96,
    )

    # -----
    # Pipes
    # -----

    pipe_visibilities = pipe(
    )

    pipe_raw_visibilities = pipe(
    )

    # ---------------
    # General methods
    # ---------------

    def init_device(self):
        """Initialises the attributes and properties of the StatsCrosslet.

        Sets up per-instance synchronisation objects and default attribute
        values, then connects to the OPC-UA server and starts the polling
        thread.  A connection failure is printed and leaves the device
        without a running polling thread; the attributes then keep their
        initial values.
        """
        Device.init_device(self)
        # PROTECTED REGION ID(StatsCrosslet.init_device) ENABLED START #
        # Fresh synchronisation objects per device instance (and per Init
        # command) instead of the shared class-level placeholders.
        self.data_lock = threading.Lock()
        self.stop_data_read_loop = threading.Event()
        self.data_acquisition_is_active = threading.Event()
        self.data_read_loop = threading.Thread(target=self.read_data)
        self.client = None

        # Set default values in the read/write attributes.  This happens
        # before connecting so the attributes stay readable even when the
        # OPC-UA server is unreachable.
        self._pause_time = self.Default_pause_time
        self._integration_time = self.Default_integration_time
        self._subband = self.Default_subband

        # Placeholder data until the first successful acquisition.
        self._time_stamp = 0.0
        self._visibilities = (0.0,)
        self._raw_visibilities = ((0.0,),)
        self._rcu_modes = ("",)

        try:
            # NOTE(review): OPC_time_out is passed straight through as the
            # client's timeout argument -- confirm the expected unit.
            self.client = opcua.Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_time_out)
            self.client.connect()
            ns = self.client.get_namespace_index("http://lofar.eu")
            self.obj = self.client.get_root_node().get_child(["0:Objects", "{}:StationMetrics".format(ns), "{}:RCU".format(ns)])
            self.record_cross = "{}:record_cross".format(ns)

            self.data_acquisition_is_active.set()
            self.stop_data_read_loop.clear()
            self.data_read_loop.start()
        except Exception:
            print("Cannot connect to the OPC-UA server %s. Trace: %s" % (self.OPC_Server_Name, traceback.format_exc()))
        # PROTECTED REGION END #    //  StatsCrosslet.init_device

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(StatsCrosslet.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  StatsCrosslet.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.

        Stops the polling thread (if it was started) and disconnects the
        OPC-UA client (if one was created).
        """
        # PROTECTED REGION ID(StatsCrosslet.delete_device) ENABLED START #
        self.data_acquisition_is_active.clear()
        self.stop_data_read_loop.set()
        # join() raises RuntimeError on a thread that was never started,
        # which is the case when the connection failed in init_device().
        if self.data_read_loop.is_alive():
            self.data_read_loop.join()
        if self.client is not None:
            try:
                self.client.disconnect()
            except Exception:
                # Best effort: the connection may never have been fully
                # established, in which case there is nothing to tear down.
                print("Cannot disconnect from the OPC-UA server %s. Trace: %s" % (self.OPC_Server_Name, traceback.format_exc()))
            self.client = None
        # PROTECTED REGION END #    //  StatsCrosslet.delete_device

    # ------------------
    # Attributes methods
    # ------------------

    def read_subband(self):
        # PROTECTED REGION ID(StatsCrosslet.subband_read) ENABLED START #
        """Return the subband attribute."""
        return self._subband
        # PROTECTED REGION END #    //  StatsCrosslet.subband_read

    def write_subband(self, value):
        # PROTECTED REGION ID(StatsCrosslet.subband_write) ENABLED START #
        """Set the subband attribute."""
        self._subband = value
        # PROTECTED REGION END #    //  StatsCrosslet.subband_write

    def read_integration_time(self):
        # PROTECTED REGION ID(StatsCrosslet.integration_time_read) ENABLED START #
        """Return the integration_time attribute."""
        return self._integration_time
        # PROTECTED REGION END #    //  StatsCrosslet.integration_time_read

    def read_time_stamp(self):
        # PROTECTED REGION ID(StatsCrosslet.time_stamp_read) ENABLED START #
        """Return the time_stamp attribute."""
        with self.data_lock:
            return self._time_stamp
        # PROTECTED REGION END #    //  StatsCrosslet.time_stamp_read

    def read_pause_time(self):
        # PROTECTED REGION ID(StatsCrosslet.pause_time_read) ENABLED START #
        """Return the pause_time attribute."""
        return self._pause_time
        # PROTECTED REGION END #    //  StatsCrosslet.pause_time_read

    def write_pause_time(self, value):
        # PROTECTED REGION ID(StatsCrosslet.pause_time_write) ENABLED START #
        """Set the pause_time attribute."""
        self._pause_time = value
        # PROTECTED REGION END #    //  StatsCrosslet.pause_time_write

    def read_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.visibilities_read) ENABLED START #
        """Return the visibilities attribute."""
        with self.data_lock:
            return self._visibilities
        # PROTECTED REGION END #    //  StatsCrosslet.visibilities_read

    def read_rcu_modes(self):
        # PROTECTED REGION ID(StatsCrosslet.rcu_modes_read) ENABLED START #
        """Return the rcu_modes attribute."""
        with self.data_lock:
            return self._rcu_modes
        # PROTECTED REGION END #    //  StatsCrosslet.rcu_modes_read

    def read_raw_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.raw_visibilities_read) ENABLED START #
        """Return the raw_visibilities attribute."""
        with self.data_lock:
            return self._raw_visibilities
        # PROTECTED REGION END #    //  StatsCrosslet.raw_visibilities_read

    # -------------
    # Pipes methods
    # -------------

    def read_pipe_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.pipe_visibilities_read) ENABLED START #
        # Placeholder content; the pipe does not carry acquired data yet.
        return dict(x=0, y=0)
        # PROTECTED REGION END #    //  StatsCrosslet.pipe_visibilities_read

    def read_pipe_raw_visibilities(self):
        # PROTECTED REGION ID(StatsCrosslet.pipe_raw_visibilities_read) ENABLED START #
        # Placeholder content; the pipe does not carry acquired data yet.
        return dict(x=0, y=0)
        # PROTECTED REGION END #    //  StatsCrosslet.pipe_raw_visibilities_read

    # --------
    # Commands
    # --------

    @command(
    )
    @DebugIt()
    def start_acquisition(self):
        # PROTECTED REGION ID(StatsCrosslet.start_acquisition) ENABLED START #
        """
        Start the data acquisition of the station's crosslet stats.

        :return:None
        """
        self.data_acquisition_is_active.set()
        # PROTECTED REGION END #    //  StatsCrosslet.start_acquisition

    @command(
    )
    @DebugIt()
    def stop_acquisition(self):
        # PROTECTED REGION ID(StatsCrosslet.stop_acquisition) ENABLED START #
        """
        Stop the data acquisition.

        The polling thread keeps running but skips the server call until
        the acquisition is started again.

        :return:None
        """
        self.data_acquisition_is_active.clear()
        # PROTECTED REGION END #    //  StatsCrosslet.stop_acquisition

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    """Main function of the StatsCrosslet module."""
    # PROTECTED REGION ID(StatsCrosslet.main) ENABLED START #
    return run((StatsCrosslet,), args=args, **kwargs)
    # PROTECTED REGION END #    //  StatsCrosslet.main


if __name__ == '__main__':
    main()