Skip to content
Snippets Groups Projects
Commit eafffe4a authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-391: Initialise devices in a different thread

parent 0c555040
Branches
Tags
1 merge request!144L2SS-391: Add boot device
...@@ -35,81 +35,18 @@ from common.lofar_git import get_version ...@@ -35,81 +35,18 @@ from common.lofar_git import get_version
import logging import logging
logger = logging.getLogger() logger = logging.getLogger()
from threading import Thread
__all__ = ["StationControl", "main"] __all__ = ["StationControl", "main"]
class InitialisationException(Exception): class InitialisationException(Exception):
pass pass
@device_logging_to_python() class StationInitialiser(Thread):
class StationControl(hardware_device): def __init__(self, ignore_unavailable_devices=True, proxy_timeout=10.0):
""" self.ignore_unavailable_devices = ignore_unavailable_devices
**Properties:**
- Device Property
OPC_Server_Name
- Type:'DevString'
OPC_Server_Port
- Type:'DevULong'
OPC_Time_Out
- Type:'DevDouble'
"""
# -----------------
# Device Properties
# -----------------
DeviceProxy_Time_Out = device_property(
dtype='DevDouble',
mandatory=False,
default_value=10.0,
)
# By default, we assume any device is not available
# because its docker container was not started, which
# is an explicit and thus intentional action.
# We ignore such devices when initialising the station.
Ignore_Unavailable_Devices = device_property(
dtype='DevBoolean',
mandatory=False,
default_value=True,
)
# ----------
# Attributes
# ----------
version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version())
initialising_station_R = attribute(dtype=numpy.bool_, access=AttrWriteType.READ, fget=lambda self: self.initialising_station)
initialisation_progress_R = attribute(dtype=numpy.int, access=AttrWriteType.READ, fget=lambda self: numpy.int(self.initialisation_progress))
@log_exceptions()
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
@log_exceptions()
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
pass
except Exception as e:
self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
# all devices we're controlling # all devices we're controlling
self.devices = { self.devices = {
"recv": DeviceProxy("LTS/RECV/1"), "recv": DeviceProxy("LTS/RECV/1"),
...@@ -130,33 +67,39 @@ class StationControl(hardware_device): ...@@ -130,33 +67,39 @@ class StationControl(hardware_device):
# set the timeout for all deviceproxies # set the timeout for all deviceproxies
for device in self.devices.values(): for device in self.devices.values():
device.set_timeout_millis(int(self.DeviceProxy_Time_Out * 1000)) device.set_timeout_millis(int(proxy_timeout * 1000))
# setup initial state # setup initial state
self.initialising_station = False self.progress = 0
self.initialisation_progress = 0 self.set_status("Idle")
super().__init__()
def run(self):
self.set_status("Starting initialisation")
self.initialise_devices()
self.set_status("Initialisation completed")
def is_running(self):
return self.is_alive()
def set_status(self, status):
self.status = status
logger.info(status)
@command()
@DebugIt()
@only_when_on()
@fault_on_error()
@log_exceptions()
def initialise_devices(self): def initialise_devices(self):
""" """
Initialise or re-initialise all devices on the station. Initialise or re-initialise all devices on the station.
This command will take a while to execute, so should be called asynchronously.
:return:None :return:None
""" """
try: try:
# mark us as busy
self.set_state(DevState.RUNNING)
# reset initialisation parameters # reset initialisation parameters
self.initialising_station = True self.progress = 0
self.initialisation_progress = 0
num_restarted_devices = 0 num_restarted_devices = 0
# determine initialisation order # determine initialisation order
...@@ -164,33 +107,27 @@ class StationControl(hardware_device): ...@@ -164,33 +107,27 @@ class StationControl(hardware_device):
# First, stop all devices, to get a well defined state # First, stop all devices, to get a well defined state
for device in devices_ordered: for device in devices_ordered:
if self.is_available(device) or not self.Ignore_Unavailable_Devices: if self.is_available(device) or not self.ignore_unavailable_devices:
self.stop_device(device) self.stop_device(device)
# restart devices in order # restart devices in order
for device in devices_ordered: for device in devices_ordered:
if self.is_available(device) or not self.Ignore_Unavailable_Devices: if self.is_available(device) or not self.ignore_unavailable_devices:
self.start_device(device) self.start_device(device)
num_restarted_devices += 1 num_restarted_devices += 1
self.initialisation_progress = 100.0 * num_restarted_devices / len(self.devices) self.progress = 100.0 * num_restarted_devices / len(self.devices)
# make sure we always finish at 100% in case of success # make sure we always finish at 100% in case of success
self.initialisation_progress = 100 self.progress = 100
except InitialisationException as e: except InitialisationException as e:
logger.exception("Error initialising station") logger.exception("Error initialising station")
# Just because they go to FAULT, doesn't mean we have to. # Just because they go to FAULT, doesn't mean we have to.
# Note that the user can query the state of the devices from the devices themselves. # Note that the user can query the state of the devices from the devices themselves.
# The condition (initialisation_progress < 100 and not initialising_station) # The condition (progress < 100 and not initialising_station)
# will be an indicator initialisation went wrong. # will be an indicator initialisation went wrong.
finally:
self.initialising_station = False
# revert to ON, which is the state we were called in. if an Exception happened by now,
# we'll go to FAULT. Both are guaranteed by our decorators.
self.set_state(DevState.ON)
def is_available(self, device_name: str): def is_available(self, device_name: str):
""" Return whether the device 'device_name' is actually available on this server. """ """ Return whether the device 'device_name' is actually available on this server. """
...@@ -212,20 +149,17 @@ class StationControl(hardware_device): ...@@ -212,20 +149,17 @@ class StationControl(hardware_device):
# already off # already off
return return
logger.info(f"Stopping device {device_name}") self.set_status(f"[stopping {device_name}] Stopping device.")
self.set_status(f"[restarting {device_name}] Turning off device.")
proxy.Off() proxy.Off()
if proxy.state() != DevState.OFF: if proxy.state() != DevState.OFF:
raise InitialisationException(f"Could not turn off device {device_name}") raise InitialisationException(f"Could not turn off device {device_name}")
logger.info(f"Stopped device {device_name}") self.set_status(f"[stopping {device_name}] Stopped device.")
def start_device(self, device_name: str): def start_device(self, device_name: str):
""" Run the startup sequence for device 'device_name'. """ """ Run the startup sequence for device 'device_name'. """
logger.info(f"Starting device {device_name}")
proxy = self.devices[device_name] proxy = self.devices[device_name]
# go to a well-defined state, which may be needed if the user calls # go to a well-defined state, which may be needed if the user calls
...@@ -254,9 +188,108 @@ class StationControl(hardware_device): ...@@ -254,9 +188,108 @@ class StationControl(hardware_device):
if proxy.state() != DevState.ON: if proxy.state() != DevState.ON:
raise InitialisationException(f"Could not turn on device {device_name}") raise InitialisationException(f"Could not turn on device {device_name}")
self.set_status(f"[restarting {device_name}] Succesfully restarted.") self.set_status(f"[restarting {device_name}] Succesfully started.")
@device_logging_to_python()
class StationControl(hardware_device):
"""
**Properties:**
- Device Property
OPC_Server_Name
- Type:'DevString'
OPC_Server_Port
- Type:'DevULong'
OPC_Time_Out
- Type:'DevDouble'
"""
# -----------------
# Device Properties
# -----------------
DeviceProxy_Time_Out = device_property(
dtype='DevDouble',
mandatory=False,
default_value=10.0,
)
# By default, we assume any device is not available
# because its docker container was not started, which
# is an explicit and thus intentional action.
# We ignore such devices when initialising the station.
Ignore_Unavailable_Devices = device_property(
dtype='DevBoolean',
mandatory=False,
default_value=True,
)
# ----------
# Attributes
# ----------
version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version())
initialising_station_R = attribute(dtype=numpy.bool_, access=AttrWriteType.READ, fget=lambda self: self.initialiser.is_alive())
initialisation_progress_R = attribute(dtype=numpy.int, access=AttrWriteType.READ, fget=lambda self: numpy.int(self.initialiser.progress))
initialisation_status_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: self.initialiser.status)
@log_exceptions()
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
@log_exceptions()
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
pass
except Exception as e:
self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
# create an initialiser object so we can query it even before starting the (first) initialisation
self.initialiser = StationInitialiser(self.Ignore_Unavailable_Devices, self.DeviceProxy_Time_Out)
@command()
@DebugIt()
@only_when_on()
@fault_on_error()
@log_exceptions()
def initialise_devices(self):
"""
Initialise or re-initialise all devices on the station.
This command will take a while to execute, so should be called asynchronously.
:return:None
"""
if self.initialiser.is_running():
# already initialising
return
# join any previous attempt, if any
try:
self.initialiser.join()
except RuntimeError:
pass
logger.info(f"Started device {device_name}") # start new initialisation attempt
self.initialiser = StationInitialiser(self.Ignore_Unavailable_Devices, self.DeviceProxy_Time_Out)
self.initialiser.start()
# ---------- # ----------
# Run server # Run server
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment