Skip to content
Snippets Groups Projects
Commit 81b8e252 authored by Taya Snijder's avatar Taya Snijder
Browse files

worked on polling thread for temperature_manager

parent 4d6335f0
No related branches found
No related tags found
1 merge request!322Resolve L2SS-697 "Notice temperature alarm across devices"
...@@ -15,28 +15,43 @@ from tangostationcontrol.common.lofar_logging import device_logging_to_python, l ...@@ -15,28 +15,43 @@ from tangostationcontrol.common.lofar_logging import device_logging_to_python, l
from tango._tango import DevState from tango._tango import DevState
from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy from tangostationcontrol.integration_test.device_proxy import TestDeviceProxy
from tango import Util, DeviceProxy, DevSource, DeviceAttribute, AttributeInfo, AttrDataFormat, attr_config from tango import Util, DeviceProxy, DevSource, DeviceAttribute, AttributeInfo, AttrDataFormat, DebugIt, DevState
from tango.server import attribute, device_property from tangostationcontrol.devices.device_decorators import only_in_states, fault_on_error
from tango.server import attribute, device_property, command
import time
import logging import logging
from threading import Thread
logger = logging.getLogger() logger = logging.getLogger()
__all__ = ["TemperatureManager", "main"] __all__ = ["TemperatureManager", "main"]
class temp_attr: class devAttr:
def __init__(self, dev, attr): def __init__(self, dev : DeviceProxy, attr : AttributeInfo):
self.dev = dev self.dev = dev
self.attr = attr self.attr = attr
# fetch attribute configuration # fetch attribute configuration
attr_config = self.dev.get_attribute_config(attr.name) attr_config = self.dev.get_attribute_config(self.attr.name)
self.is_scalar = attr_config.data_format == AttrDataFormat.SCALAR self.is_scalar = attr_config.data_format == AttrDataFormat.SCALAR
def take_action(device_attribute : devAttr):
device_attribute.dev.fault()
def read_alarm_state(device_attribute : devAttr):
val = device_attribute.dev.read_attribute(device_attribute.attr.name)
# The "_temp_error_r" is a value or list of bools. If any of the bools is true that means there is an alarm
if device_attribute.is_scalar:
is_in_alarm = val.value
else:
is_in_alarm = any(val.value)
if is_in_alarm:
logger.warning(f"Warning!! A device: {device_attribute.dev} has entered a temperature alarm state")
@device_logging_to_python() @device_logging_to_python()
class TemperatureManager(lofar_device): class TemperatureManager(lofar_device):
...@@ -64,26 +79,28 @@ class TemperatureManager(lofar_device): ...@@ -64,26 +79,28 @@ class TemperatureManager(lofar_device):
@log_exceptions() @log_exceptions()
def configure_for_initialise(self): def configure_for_initialise(self):
# Set a reference of RECV device that is correlated to this BEAM device self.poll = False
util = Util.instance() util = Util.instance()
instance_number = self.get_name().split('/')[2] instance_number = self.get_name().split('/')[2]
ds_inst = util.get_ds_inst_name() ds_inst = util.get_ds_inst_name()
# all hardware device proxies go here # all hardware device proxies go here
self.sdp_proxy = DeviceProxy(f"{ds_inst}/SDP/{instance_number}") sdp_proxy = DeviceProxy(f"{ds_inst}/SDP/{instance_number}")
self.pdu_proxy = DeviceProxy(f"{ds_inst}/PDU/{instance_number}") pdu_proxy = DeviceProxy(f"{ds_inst}/PDU/{instance_number}")
self.recv_proxy = DeviceProxy(f"{ds_inst}/RECV/{instance_number}") recv_proxy = DeviceProxy(f"{ds_inst}/RECV/{instance_number}")
self.unb2_proxy = DeviceProxy(f"{ds_inst}/UNB2/{instance_number}") unb2_proxy = DeviceProxy(f"{ds_inst}/UNB2/{instance_number}")
self.apsct_proxy = DeviceProxy(f"{ds_inst}/APSCT/{instance_number}") apsct_proxy = DeviceProxy(f"{ds_inst}/APSCT/{instance_number}")
self.apspu_proxy = DeviceProxy(f"{ds_inst}/APSPU/{instance_number}") apspu_proxy = DeviceProxy(f"{ds_inst}/APSPU/{instance_number}")
# for easy iteration # for easy iteration
dev_list = [self.sdp_proxy, self.pdu_proxy, self.recv_proxy, self.unb2_proxy, self.apsct_proxy, self.apspu_proxy] dev_list = [sdp_proxy, pdu_proxy, recv_proxy, unb2_proxy, apsct_proxy, apspu_proxy]
self.temp_attr_list = []
self.temp_error_attr_list = []
for dev in dev_list: for dev in dev_list:
logger.info(f"Looking for temperature alarm attributes in device {dev}")
dev.set_source(DevSource.DEV) dev.set_source(DevSource.DEV)
# get a list of AttributeInfo objects # get a list of AttributeInfo objects
...@@ -92,8 +109,10 @@ class TemperatureManager(lofar_device): ...@@ -92,8 +109,10 @@ class TemperatureManager(lofar_device):
# make a list of all temperature attributes # make a list of all temperature attributes
for attr in attribute_list: for attr in attribute_list:
if "_temp_error_r" in attr.name.lowercase(): if "_temp_error_r" in attr.name.lowercase():
self.temp_attr_list.append(temp_attr(dev, attr)) self.temp_error_attr_list.append(devAttr(dev, attr))
logger.info(f"Found temperature alarm attribute {attr.name} in device {dev}")
self.polling_thread = Thread(target=self.polling_loop)
super().configure_for_initialise() super().configure_for_initialise()
@log_exceptions() @log_exceptions()
...@@ -104,28 +123,33 @@ class TemperatureManager(lofar_device): ...@@ -104,28 +123,33 @@ class TemperatureManager(lofar_device):
def configure_for_off(self): def configure_for_off(self):
super().configure_for_off() super().configure_for_off()
def read_alarm_state(self, devAttr): @command()
val = devAttr.dev.read_attribute(devAttr.attr.name) @only_in_states([DevState.ON])
def enable_polling(self):
if devAttr.is_scalar: self.poll = True
if val.value: self.polling_thread.start()
# there is an alarm state logger.info(f"Temperature manager is now polling for alarms")
else:
if any(val.value):
# there is something in the alarm state
pass
def polling_loop(self):
for i in self.temp_attr_list:
self.read_alarm_state(i)
@command()
@only_in_states([DevState.ON])
def disable_polling(self):
self.poll = False
self.polling_thread.join()
logger.info(f"Temperature manager has stopped polling for alarms")
TEMP_MANAGER_polling_state_R = attribute(dtype=bool)
def read_TEMP_MANAGER_polling_state_R(self):
return self.poll
def polling_loop(self):
while self.poll:
# we'll see how to change this later
time.sleep(1)
# go through all the temperature alarm attributes and check if there is an alarm
for i in self.temp_error_attr_list:
read_alarm_state(i)
# ---------- # ----------
......
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from .base import AbstractTestBases
class TestTemperatureManager(AbstractTestBases.TestDeviceBase):
def setUp(self):
"""Intentionally recreate the device object in each test"""
super().setUp("STAT/TemperatureManager/1")
# -*- coding: utf-8 -*-
#
# This file is part of the LOFAR 2.0 Station Software
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
from tango.test_context import DeviceTestContext
from tangostationcontrol.devices import temperature_manager, lofar_device
import mock
from tangostationcontrol.test import base
class TestTemperatureManagerDevice(base.TestCase):
def setUp(self):
super(TestTemperatureManagerDevice, self).setUp()
# Patch DeviceProxy to allow making the proxies during initialisation
# that we otherwise avoid using
for device in [temperature_manager, lofar_device]:
proxy_patcher = mock.patch.object(
device, 'DeviceProxy')
proxy_patcher.start()
self.addCleanup(proxy_patcher.stop)
def test_init(self):
with DeviceTestContext(temperature_manager.TemperatureManager, process=True, timeout=10) as proxy:
proxy.initialise()
proxy.on()
proxy.enable_polling()
self.assertTrue(proxy.TEMP_MANAGER_polling_state_R)
proxy.disable_polling()
self.assertFalse(proxy.TEMP_MANAGER_polling_state_R)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment