-
Taya Snijder authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
hardware_device.py 5.20 KiB
# -*- coding: utf-8 -*-
#
# This file is part of the PCC project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" PCC Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import Device, command
from tango import DevState, DebugIt
# Additional import
from util.attribute_wrapper import attribute_wrapper
from util.lofar_logging import log_exceptions
__all__ = ["hardware_device"]
from util.wrappers import only_in_states, fault_on_error
#@log_exceptions()
class hardware_device(Device):
    """
    Base Tango device implementing a common state machine for hardware devices.

    **Properties:**

    States are as follows:
        INIT    = Device is initialising.
        STANDBY = Device is initialised, but awaits external configuration and an explicit turning on,
        ON      = Device is fully configured, functional, controls the hardware, and is possibly actively running,
        FAULT   = Device detected an unrecoverable error, and is thus malfunctional,
        OFF     = Device is turned off, drops connection to the hardware,

    The following state transitions are implemented:
        boot    -> OFF:     Triggered by tango.  Device will be instantiated,
        OFF     -> INIT:    Triggered by device. Device will initialise (connect to hardware, other devices),
        INIT    -> STANDBY: Triggered by device. Device is initialised, and is ready for additional configuration by the user,
        STANDBY -> ON:      Triggered by user.   Device reports to be functional,
        *       -> FAULT:   Triggered by device. Device has degraded to malfunctional, for example because the connection to the hardware is lost,
        *       -> FAULT:   Triggered by user.   Emulate a forced malfunction for integration testing purposes,
        *       -> OFF:     Triggered by user.   Device is turned off. Triggered by the Off() command,
        FAULT   -> INIT:    Triggered by user.   Device is reinitialised to recover from an error,

    The user triggers their transitions by the commands reflecting the target state
    (Initialise(), On(), Off(), Fault()).

    Subclasses customise each transition by overriding the ``configure_for_*`` hooks,
    which are no-ops in this base class.
    """

    @classmethod
    def attr_list(cls):
        """ Return a list of all the attribute_wrapper members of this class.

        Only members defined directly on ``cls`` (i.e. present in ``cls.__dict__``)
        are considered; members inherited from base classes are not included.
        """
        # isinstance() also picks up subclasses of attribute_wrapper, which an
        # exact type comparison would silently skip.
        return [member for member in cls.__dict__.values() if isinstance(member, attribute_wrapper)]

    def setup_value_dict(self):
        """ Set the initial value for all the attribute wrapper objects.

        Populates ``self.value_dict``, keyed by the attribute_wrapper object itself.
        """
        self.value_dict = {attr: attr.initial_value() for attr in self.attr_list()}

    def init_device(self):
        """ Instantiates the device in the OFF state. """

        # NOTE: Will delete_device first, if necessary
        Device.init_device(self)

        self.set_state(DevState.OFF)

    # --------
    # Commands
    # --------

    @command()
    @only_in_states([DevState.FAULT, DevState.OFF])
    @DebugIt()
    @fault_on_error()
    @log_exceptions()
    def Initialise(self):
        """
        Command to ask for initialisation of this device. Can only be called in FAULT or OFF state.

        Moves to INIT, resets the attribute values, runs configure_for_initialise(),
        and finally moves to STANDBY.

        :return:None
        """
        self.set_state(DevState.INIT)
        self.setup_value_dict()

        self.configure_for_initialise()

        self.set_state(DevState.STANDBY)

    @command()
    @only_in_states([DevState.STANDBY])
    @DebugIt()
    @fault_on_error()
    @log_exceptions()
    def On(self):
        """
        Command to ask for turning on this device. Can only be called in STANDBY state.

        Runs configure_for_on() and then moves to ON.

        :return:None
        """
        self.configure_for_on()
        self.set_state(DevState.ON)

    @command()
    @DebugIt()
    @log_exceptions()
    def Off(self):
        """
        Command to ask for shutdown of this device. Can be called in any state.

        :return:None
        """
        if self.get_state() == DevState.OFF:
            # Already off. Don't complain.
            return

        # Turn off
        self.set_state(DevState.OFF)

        self.configure_for_off()

        # Turn off again, in case of race conditions through reconnecting
        self.set_state(DevState.OFF)

    @command()
    @only_in_states([DevState.ON, DevState.INIT, DevState.STANDBY])
    @DebugIt()
    @log_exceptions()
    def Fault(self):
        """
        Command to force this device into the FAULT state. Can only be called in
        ON, INIT or STANDBY state.

        FAULT state is used to indicate that the device is malfunctional, for example
        because our connection with the OPC-UA server is down.

        Runs configure_for_fault() and then moves to FAULT. This base class does not
        attempt to reconnect; the user needs to call Initialise() to restart this device.

        :return:None
        """
        self.configure_for_fault()
        self.set_state(DevState.FAULT)

    # functions that can be overloaded
    def configure_for_fault(self):
        """ Hook called by Fault() before the state is set to FAULT. """
        pass

    def configure_for_off(self):
        """ Hook called by Off() after the state is first set to OFF. """
        pass

    def configure_for_on(self):
        """ Hook called by On() before the state is set to ON. """
        pass

    def configure_for_initialise(self):
        """ Hook called by Initialise() while the device is in the INIT state. """
        pass

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        pass

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command (a Tango built-in).
        """
        self.debug_stream("Shutting down...")

        self.Off()

        self.debug_stream("Shut down.  Good bye.")