Skip to content
Snippets Groups Projects
Commit 0889d763 authored by Taya Snijder's avatar Taya Snijder
Browse files

added initial CCD code

parent 7437d05a
No related branches found
No related tags found
2 merge requests!415Draft: Resolve L2SS-912 "Add ccd device testing",!406Resolve L2SS-912 "Add ccd device"
# -*- coding: utf-8 -*-
#
# This file is part of the RECV project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" CCD Device Server for LOFAR2.0
"""
# PyTango imports
from tango import DebugIt
from tango.server import command, attribute, device_property
from tango import AttrWriteType
import numpy
# Additional import
from tangostationcontrol.clients.attribute_wrapper import attribute_wrapper
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
from tangostationcontrol.devices.device_decorators import only_in_states
from tangostationcontrol.devices.opcua_device import opcua_device
import logging
logger = logging.getLogger()
__all__ = ["CCD", "main"]
@device_logging_to_python()
class CCD(opcua_device):
# -----------------
# Device Properties
# -----------------
CCDTR_monitor_rate_RW_default = device_property(
dtype='DevLong64',
mandatory=False,
default_value=1
)
# ----- Timing values
CCD_On_Off_timeout = device_property(
doc='Maximum amount of time to wait after turning CCD on or off',
dtype='DevFloat',
mandatory=False,
default_value=10.0
)
# ----------
# Attributes
# ----------
CCDTR_I2C_error_R = attribute_wrapper(comms_annotation=["CCDTR_I2C_error_R" ], datatype=numpy.int64)
CCDTR_monitor_rate_RW = attribute_wrapper(comms_annotation=["CCDTR_monitor_rate_RW" ], datatype=numpy.int64, access=AttrWriteType.READ_WRITE)
CCDTR_translator_busy_R = attribute_wrapper(comms_annotation=["CCDTR_translator_busy_R" ], datatype=numpy.bool_)
CCD_clear_lock_R = attribute_wrapper(comms_annotation=["CCD_clear_lock_R" ], datatype=numpy.bool_)
CCD_clear_lock_RW = attribute_wrapper(comms_annotation=["CCD_clear_lock_RW" ], datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
CCD_FAN_RPM_R = attribute_wrapper(comms_annotation=["CCD_FAN_RPM_R" ], datatype=numpy.float64)
CCD_INPUT_10MHz_good_R = attribute_wrapper(comms_annotation=["CCD_INPUT_10MHz_good_R" ], datatype=numpy.bool_)
CCD_INPUT_PPS_good_R = attribute_wrapper(comms_annotation=["CCD_INPUT_PPS_good_R" ], datatype=numpy.bool_)
CCD_loss_lock_R = attribute_wrapper(comms_annotation=["CCD_loss_lock_R" ], datatype=numpy.bool_)
CCD_PCB_ID_R = attribute_wrapper(comms_annotation=["CCD_PCB_ID_R" ], datatype=numpy.int64)
CCD_PCB_number_R = attribute_wrapper(comms_annotation=["CCD_PCB_number_R" ], datatype=numpy.str)
CCD_PCB_version_R = attribute_wrapper(comms_annotation=["CCD_PCB_version_R" ], datatype=numpy.str)
CCD_PLL_locked_R = attribute_wrapper(comms_annotation=["CCD_PLL_locked_R" ], datatype=numpy.bool_)
CCD_PWR_CLK_DIST_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_CLK_DIST_3V3_R" ], datatype=numpy.float64)
CCD_PWR_CLK_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_CLK_INPUT_3V3_R" ], datatype=numpy.float64)
CCD_PWR_CTRL_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_CTRL_3V3_R" ], datatype=numpy.float64)
CCD_PWR_OCXO_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_OCXO_INPUT_3V3_R" ], datatype=numpy.float64)
CCD_PWR_on_R = attribute_wrapper(comms_annotation=["CCD_PWR_on_R" ], datatype=numpy.bool_)
CCD_PWR_PLL_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_PLL_INPUT_3V3_R" ], datatype=numpy.float64)
CCD_PWR_PPS_INPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_PPS_INPUT_3V3_R" ], datatype=numpy.float64)
CCD_PWR_PPS_OUTPUT_3V3_R = attribute_wrapper(comms_annotation=["CCD_PWR_PPS_OUTPUT_3V3_R" ], datatype=numpy.float64)
CCD_TEMP_R = attribute_wrapper(comms_annotation=["CCD_TEMP_R" ], datatype=numpy.float64)
# ----------
# Summarising Attributes
# ----------
CCD_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed")
def read_CCD_error_R(self):
errors = [self.read_attribute("CCDTR_I2C_error_R") > 0,
self.alarm_val("CCD_loss_lock_R"),
self.read_attribute("CCD_INPUT_10MHz_good_R"),
not self.read_attribute("CCD_INPUT_10MHz_good_R"),
not self.read_attribute("CCD_INPUT_PPS_good_R") and not self.read_attribute("CCD_clear_lock_R"),
not self.read_attribute("CCD_PLL_locked_R")]
return any(errors)
CCD_TEMP_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed", polling_period=1000)
CCD_VOUT_error_R = attribute(dtype=bool, fisallowed="is_attribute_access_allowed")
def read_CCD_TEMP_error_R(self):
return (self.alarm_val("CCD_TEMP_R"))
def read_CCD_VOUT_error_R(self):
return ( self.alarm_val("CCD_PWR_CLK_DIST_3V3_R")
or self.alarm_val("CCD_PWR_CLK_INPUT_3V3_R")
or self.alarm_val("CCD_PWR_CTRL_3V3_R")
or self.alarm_val("CCD_PWR_OCXO_INPUT_3V3_R")
or self.alarm_val("CCD_PWR_PLL_INPUT_3V3_R")
or self.alarm_val("CCD_PWR_PPS_INPUT_3V3_R")
or self.alarm_val("CCD_PWR_PPS_OUTPUT_3V3_R")
or (not self.read_attribute("CCD_PWR_on_R"))
)
# --------
# overloaded functions
# --------
def _initialise_hardware(self):
""" Initialise the CCD hardware. """
# Cycle clock
self.CCD_off()
self.wait_attribute("CCDTR_translator_busy_R", False, self.CCD_On_Off_timeout)
self.CCD_on()
self.wait_attribute("CCDTR_translator_busy_R", False, self.CCD_On_Off_timeout)
if not self.read_attribute("CCD_PLL_locked_R"):
if self.read_attribute("CCDTR_I2C_error_R"):
raise Exception("I2C is not working. Maybe power cycle subrack to restart CLK board and translator?")
else:
raise Exception("CCD clock is not locked")
def _disable_hardware(self):
""" Disable the CCD hardware. """
# Turn off the CCD
self.CCD_off()
self.wait_attribute("CCDTR_translator_busy_R", False, self.CCD_On_Off_timeout)
# --------
# Commands
# --------
@command()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
def CCD_off(self):
"""
:return:None
"""
self.opcua_connection.call_method(["CCD_off"])
@command()
@DebugIt()
@only_in_states(DEFAULT_COMMAND_STATES)
def CCD_on(self):
"""
:return:None
"""
self.opcua_connection.call_method(["CCD_on"])
# ----------
# Run server
# ----------
def main(**kwargs):
"""Main function of the ObservationControl module."""
return entry(CCD, **kwargs)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment