Skip to content
Snippets Groups Projects
PCC.py 6.88 KiB
Newer Older
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.

""" PCC Device Server for LOFAR2.0

"""

# PyTango imports
from tango import DebugIt
from tango.server import run, command
from tango.server import device_property
# Additional import

from clients.opcua_connection import OPCUAConnection
from src.attribute_wrapper import *
from src.hardware_device import *


__all__ = ["PCC", "main"]

class PCC(hardware_device):
	"""

	**Properties:**

	- Device Property
		OPC_Server_Name
			- Type:'DevString'
		OPC_Server_Port
			- Type:'DevULong'
		OPC_Time_Out
			- Type:'DevDouble'
	"""

	# -----------------
	# Device Properties
	# -----------------

	OPC_Server_Name = device_property(
		dtype='DevString',
		mandatory=True
	)

	OPC_Server_Port = device_property(
		dtype='DevULong',
		mandatory=True
	)

	OPC_Time_Out = device_property(
		dtype='DevDouble',
		mandatory=True
	)
	OPC_namespace = device_property(
		dtype='DevString',
		mandatory=False
	)

	# ----------
	# Attributes
	# ----------
	RCU_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_mask_RW"], datatype=numpy.bool_, dims=(32,), access=AttrWriteType.READ_WRITE)
	Ant_mask_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:Ant_mask_RW"], datatype=numpy.bool_, dims=(3, 32), access=AttrWriteType.READ_WRITE)
	RCU_attenuator_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_attenuator_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_attenuator_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
	RCU_band_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_band_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_band_RW"], datatype=numpy.int64, dims=(3, 32), access=AttrWriteType.READ_WRITE)
	RCU_temperature_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_temperature_R"], datatype=numpy.float64, dims=(32,))
	RCU_Pwr_dig_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_Pwr_dig_R"], datatype=numpy.int64, dims=(32,))
	RCU_LED0_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_R"], datatype=numpy.int64, dims=(32,))
	RCU_LED0_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_LED0_RW"], datatype=numpy.int64, dims=(32,), access=AttrWriteType.READ_WRITE)
	RCU_ADC_lock_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_lock_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_ADC_SYNC_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_SYNC_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_ADC_JESD_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_JESD_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_ADC_CML_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ADC_CML_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_OUT1_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT1_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_OUT2_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_OUT2_R"], datatype=numpy.int64, dims=(3, 32))
	RCU_ID_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_ID_R"], datatype=numpy.int64, dims=(32,))
	RCU_version_R = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_version_R"], datatype=numpy.str_, dims=(32,))

	HBA_element_beamformer_delays_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_R"], datatype=numpy.int64, dims=(32,96))
	HBA_element_beamformer_delays_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_beamformer_delays_RW"], datatype=numpy.int64, dims=(32,96), access=AttrWriteType.READ_WRITE)
	HBA_element_pwr_R = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_R"], datatype=numpy.int64, dims=(32,96))
	HBA_element_pwr_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:HBA_element_pwr_RW"], datatype=numpy.int64, dims=(32,96), access=AttrWriteType.READ_WRITE)

	RCU_monitor_rate_RW = attribute_wrapper(comms_annotation=["2:PCC", "2:RCU_monitor_rate_RW"], datatype=numpy.float64, access=AttrWriteType.READ_WRITE)

	def delete_device(self):
		"""Hook to delete resources allocated in init_device.

		This method allows for any memory or other resources allocated in the
		init_device method to be released.  This method is called by the device
		destructor and by the device Init command (a Tango built-in).
		"""
		self.debug_stream("Shutting down...")

		self.Off()
		self.debug_stream("Shut down.  Good bye.")

	# --------
	# overloaded functions
	# --------
	def off(self):
		""" user code here. is called when the state is set to OFF """

		# Stop keep-alive
		self.opcua_connection.stop()

	def initialise(self):
		""" user code here. is called when the state is set to INIT """

		# Init the dict that contains function to OPC-UA function mappings.
		self.function_mapping = {}
		self.function_mapping["RCU_on"] = {}
		self.function_mapping["RCU_off"] = {}
		self.function_mapping["ADC_on"] = {}
		self.function_mapping["RCU_update"] = {}
		self.function_mapping["CLK_on"] = {}
		self.function_mapping["CLK_off"] = {}
		self.function_mapping["CLK_PLL_setup"] = {}
		self.OPCua_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Standby, self.Fault, self)
		# map the attributes to the OPC ua comm client
	@command()
	@DebugIt()
	@only_when_on
	@fault_on_error
	def RCU_off(self):
		"""

		:return:None
		"""
		self.function_mapping["RCU_off"]()

	@command()
	@DebugIt()
	@only_when_on
	@fault_on_error
	def RCU_on(self):
		"""

		:return:None
		"""
		self.function_mapping["RCU_on"]()

	@command()
	@DebugIt()
	@only_when_on
	@fault_on_error
	def ADC_on(self):
		"""

		:return:None
		"""
		self.function_mapping["ADC_on"]()

	@command()
	@DebugIt()
	@only_when_on
	@fault_on_error
	def RCU_update(self):
		"""

		:return:None
		"""
		self.function_mapping["RCU_update"]()

	@command()
	@DebugIt()
	@only_when_on
	@fault_on_error
	def CLK_off(self):
		"""

		:return:None
		"""
		self.function_mapping["CLK_off"]()

	@command()
	@DebugIt()
	@only_when_on
	@fault_on_error
	def CLK_on(self):
		"""

		:return:None
		"""
		self.function_mapping["CLK_on"]()

	@command()
	@DebugIt()
	@only_when_on
	@fault_on_error
	def CLK_PLL_setup(self):
		"""

		:return:None
		"""
		self.function_mapping["CLK_PLL_setup"]()

# ----------
# Run server
# ----------
def main(args=None, **kwargs):
	"""Main function of the PCC module."""
	return run((PCC,), args=args, **kwargs)


if __name__ == '__main__':
	main()