From 0889d763570a89f0adf2d03dbf0b5616d1100488 Mon Sep 17 00:00:00 2001 From: thijs snijder <snijder@astron.nl> Date: Mon, 5 Sep 2022 16:24:20 +0200 Subject: [PATCH] added initial CCD code --- .../tangostationcontrol/devices/ccd.py | 168 ++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 tangostationcontrol/tangostationcontrol/devices/ccd.py diff --git a/tangostationcontrol/tangostationcontrol/devices/ccd.py b/tangostationcontrol/tangostationcontrol/devices/ccd.py new file mode 100644 index 000000000..fe7fb4fe5 --- /dev/null +++ b/tangostationcontrol/tangostationcontrol/devices/ccd.py @@ -0,0 +1,168 @@ +# -*- coding: utf-8 -*- +# +# This file is part of the RECV project +# +# +# +# Distributed under the terms of the APACHE license. +# See LICENSE.txt for more info. + +""" CCD Device Server for LOFAR2.0 + +""" + +# PyTango imports +from tango import DebugIt +from tango.server import command, attribute, device_property +from tango import AttrWriteType +import numpy +# Additional import + +from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper +from tangostationcontrol.common.entrypoint import entry +from tangostationcontrol.common.lofar_logging import device_logging_to_python +from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES +from tangostationcontrol.devices.device_decorators import only_in_states +from tangostationcontrol.devices.opcua_device import opcua_device + +import logging +logger = logging.getLogger() + +__all__ = ["CCD", "main"] + + +@device_logging_to_python() +class CCD(opcua_device): + # ----------------- + # Device Properties + # ----------------- + + CCDTR_monitor_rate_RW_default = device_property( + dtype='DevLong64', + mandatory=False, + default_value=1 + ) + + # ----- Timing values + + CCD_On_Off_timeout = device_property( + doc='Maximum amount of time to wait after turning CCD on or off', + dtype='DevFloat', + mandatory=False, + default_value=10.0 + ) + + # ---------- + # Attributes + # ---------- + CCDTR_I2C_error_R = attribute_wrapper(comms_annotation=["CCDTR_I2C_error_R" ], datatype=numpy.int64) + CCDTR_monitor_rate_RW = attribute_wrapper(comms_annotation=["CCDTR_monitor_rate_RW" ], datatype=numpy.int64, access=AttrWriteType.READ_WRITE) + CCDTR_translator_busy_R = attribute_wrapper(comms_annotation=["CCDTR_translator_busy_R" ], datatype=numpy.bool_) + CCD_clear_lock_R = attribute_wrapper(comms_annotation=["CCD_clear_lock_R" ], datatype=numpy.bool_) + CCD_clear_lock_RW = attribute_wrapper(comms_annotation=["CCD_clear_lock_RW" ], datatype=numpy.bool_, access=AttrWriteType.READ_WRITE) + CCD_FAN_RPM_R = attribute_wrapper(comms_annotation=["CCD_FAN_RPM_R" ], datatype=numpy.float64) + CCD_INPUT_10MHz_good_R = attribute_wrapper(comms_annotation=["CCD_INPUT_10MHz_good_R" ], datatype=numpy.bool_) + CCD_INPUT_PPS_good_R = attribute_wrapper(comms_annotation=["CCD_INPUT_PPS_good_R" ], datatype=numpy.bool_) + CCD_loss_lock_R = attribute_wrapper(comms_annotation=["CCD_loss_lock_R" ], datatype=numpy.bool_) + CCD_PCB_ID_R = attribute_wrapper(comms_annotation=["CCD_PCB_ID_R" ], datatype=numpy.int64) + CCD_PCB_number_R = attribute_wrapper(comms_annotation=["CCD_PCB_number_R" ], datatype=numpy.str) + CCD_PCB_version_R = attribute_wrapper(comms_annotation=["CCD_PCB_version_R" ], datatype=numpy.str) + CCD_PLL_locked_R = attribute_wrapper(comms_annotation=["CCD_PLL_locked_R" ], datatype=numpy.bool_) + CCD_PWR_CLK_DIST_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_CLK_DIST_3V3_R" ], datatype=numpy.float64) + CCD_PWR_CLK_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_CLK_INPUT_3V3_R" ], datatype=numpy.float64) + CCD_PWR_CTRL_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_CTRL_3V3_R" ], datatype=numpy.float64) + CCD_PWR_OCXO_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_OCXO_INPUT_3V3_R" ], datatype=numpy.float64) + CCD_PWR_on_R = attribute_wrapper(comms_annotation=["CCD_PWR_on_R" ], datatype=numpy.bool_) + CCD_PWR_PLL_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_PLL_INPUT_3V3_R" ], datatype=numpy.float64) + CCD_PWR_PPS_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_PPS_INPUT_3V3_R" ], datatype=numpy.float64) + CCD_PWR_PPS_OUTPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_PPS_OUTPUT_3V3_R" ], datatype=numpy.float64) + CCD_TEMP_R = attribute_wrapper(comms_annotation=["CCD_TEMP_R" ], datatype=numpy.float64) + # ---------- + # Summarising Attributes + # ---------- + CCD_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed") + + def read_CCD_error_R(self): + errors = [self.read_attribute("CCDTR_I2C_error_R") > 0, + self.alarm_val("CCD_loss_lock_R"), + self.read_attribute("CCD_INPUT_10MHz_good_R"), + not self.read_attribute("CCD_INPUT_10MHz_good_R"), + not self.read_attribute("CCD_INPUT_PPS_good_R") and not self.read_attribute("CCD_clear_lock_R"), + not self.read_attribute("CCD_PLL_locked_R")] + return any(errors) + + CCD_TEMP_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed", polling_period=1000) + CCD_VOUT_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed") + + def read_CCD_TEMP_error_R(self): + return (self.alarm_val("CCD_TEMP_R")) + + def read_CCD_VOUT_error_R(self): + return ( self.alarm_val("CCD_PWR_CLK_DIST_3V3_R") + or self.alarm_val("CCD_PWR_CLK_INPUT_3V3_R") + or self.alarm_val("CCD_PWR_CTRL_3V3_R") + or self.alarm_val("CCD_PWR_OCXO_INPUT_3V3_R") + or self.alarm_val("CCD_PWR_PLL_INPUT_3V3_R") + or self.alarm_val("CCD_PWR_PPS_INPUT_3V3_R") + or self.alarm_val("CCD_PWR_PPS_OUTPUT_3V3_R") + or (not self.read_attribute("CCD_PWR_on_R")) + ) + + # -------- + # overloaded functions + # -------- + + def _initialise_hardware(self): + """ Initialise the CCD hardware. """ + + # Cycle clock + self.CCD_off() + self.wait_attribute("CCDTR_translator_busy_R", False, self.CCD_On_Off_timeout) + self.CCD_on() + self.wait_attribute("CCDTR_translator_busy_R", False, self.CCD_On_Off_timeout) + + if not self.read_attribute("CCD_PLL_locked_R"): + if self.read_attribute("CCDTR_I2C_error_R"): + raise Exception("I2C is not working. Maybe power cycle subrack to restart CLK board and translator?") + else: + raise Exception("CCD clock is not locked") + + def _disable_hardware(self): + """ Disable the CCD hardware. """ + + # Turn off the CCD + self.CCD_off() + self.wait_attribute("CCDTR_translator_busy_R", False, self.CCD_On_Off_timeout) + + # -------- + # Commands + # -------- + + @command() + @DebugIt() + @only_in_states(DEFAULT_COMMAND_STATES) + def CCD_off(self): + """ + + :return:None + """ + self.opcua_connection.call_method(["CCD_off"]) + + @command() + @DebugIt() + @only_in_states(DEFAULT_COMMAND_STATES) + def CCD_on(self): + """ + + :return:None + """ + self.opcua_connection.call_method(["CCD_on"]) + + + +# ---------- +# Run server +# ---------- +def main(**kwargs): + """Main function of the ObservationControl module.""" + return entry(CCD, **kwargs) -- GitLab