diff --git a/devices/devices/station_control.py b/devices/devices/station_control.py index dd026383e22829d5f61605c2b39a7a502ef58546..8ff0e5571232c24d7fe3b602b17768e9e29f556f 100644 --- a/devices/devices/station_control.py +++ b/devices/devices/station_control.py @@ -35,81 +35,18 @@ from common.lofar_git import get_version import logging logger = logging.getLogger() +from threading import Thread + __all__ = ["StationControl", "main"] class InitialisationException(Exception): pass -@device_logging_to_python() -class StationControl(hardware_device): - """ - - **Properties:** - - - Device Property - OPC_Server_Name - - Type:'DevString' - OPC_Server_Port - - Type:'DevULong' - OPC_Time_Out - - Type:'DevDouble' - """ - - # ----------------- - # Device Properties - # ----------------- - - DeviceProxy_Time_Out = device_property( - dtype='DevDouble', - mandatory=False, - default_value=10.0, - ) - - # By default, we assume any device is not available - # because its docker container was not started, which - # is an explicit and thus intentional action. - # We ignore such devices when initialising the station. - Ignore_Unavailable_Devices = device_property( - dtype='DevBoolean', - mandatory=False, - default_value=True, - ) - - # ---------- - # Attributes - # ---------- - version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) - initialising_station_R = attribute(dtype=numpy.bool_, access=AttrWriteType.READ, fget=lambda self: self.initialising_station) - initialisation_progress_R = attribute(dtype=numpy.int, access=AttrWriteType.READ, fget=lambda self: numpy.int(self.initialisation_progress)) - - @log_exceptions() - def delete_device(self): - """Hook to delete resources allocated in init_device. - - This method allows for any memory or other resources allocated in the - init_device method to be released. This method is called by the device - destructor and by the device Init command (a Tango built-in). - """ - self.debug_stream("Shutting down...") - - self.Off() - self.debug_stream("Shut down. Good bye.") - - # -------- - # overloaded functions - # -------- - @log_exceptions() - def configure_for_off(self): - """ user code here. is called when the state is set to OFF """ - # Stop keep-alive - try: - pass - except Exception as e: - self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) +class StationInitialiser(Thread): + def __init__(self, ignore_unavailable_devices=True, proxy_timeout=10.0): + self.ignore_unavailable_devices = ignore_unavailable_devices - @log_exceptions() - def configure_for_initialise(self): # all devices we're controlling self.devices = { "recv": DeviceProxy("LTS/RECV/1"), @@ -130,33 +67,39 @@ class StationControl(hardware_device): # set the timeout for all deviceproxies for device in self.devices.values(): - device.set_timeout_millis(int(self.DeviceProxy_Time_Out * 1000)) + device.set_timeout_millis(int(proxy_timeout * 1000)) # setup initial state - self.initialising_station = False - self.initialisation_progress = 0 + self.progress = 0 + self.set_status("Idle") + + super().__init__() + + def run(self): + self.set_status("Starting initialisation") + + self.initialise_devices() + + self.set_status("Initialisation completed") + + def is_running(self): + return self.is_alive() + + def set_status(self, status): + self.status = status + + logger.info(status) - @command() - @DebugIt() - @only_when_on() - @fault_on_error() - @log_exceptions() def initialise_devices(self): """ Initialise or re-initialise all devices on the station. - This command will take a while to execute, so should be called asynchronously. - :return:None """ try: - # mark us as busy - self.set_state(DevState.RUNNING) - # reset initialisation parameters - self.initialising_station = True - self.initialisation_progress = 0 + self.progress = 0 num_restarted_devices = 0 # determine initialisation order @@ -164,33 +107,27 @@ class StationControl(hardware_device): # First, stop all devices, to get a well defined state for device in devices_ordered: - if self.is_available(device) or not self.Ignore_Unavailable_Devices: + if self.is_available(device) or not self.ignore_unavailable_devices: self.stop_device(device) # restart devices in order for device in devices_ordered: - if self.is_available(device) or not self.Ignore_Unavailable_Devices: + if self.is_available(device) or not self.ignore_unavailable_devices: self.start_device(device) num_restarted_devices += 1 - self.initialisation_progress = 100.0 * num_restarted_devices / len(self.devices) + self.progress = 100.0 * num_restarted_devices / len(self.devices) # make sure we always finish at 100% in case of success - self.initialisation_progress = 100 + self.progress = 100 except InitialisationException as e: logger.exception("Error initialising station") # Just because they go to FAULT, doesn't mean we have to. # Note that the user can query the state of the devices from the devices themselves. - # The condition (initialisation_progress < 100 and not initialising_station) + # The condition (progress < 100 and not initialising_station) # will be an indicator initialisation went wrong. - finally: - self.initialising_station = False - - # revert to ON, which is the state we were called in. if an Exception happened by now, - # we'll go to FAULT. Both are guaranteed by our decorators. - self.set_state(DevState.ON) def is_available(self, device_name: str): """ Return whether the device 'device_name' is actually available on this server. """ @@ -212,20 +149,17 @@ class StationControl(hardware_device): # already off return - logger.info(f"Stopping device {device_name}") + self.set_status(f"[stopping {device_name}] Stopping device.") - self.set_status(f"[restarting {device_name}] Turning off device.") proxy.Off() if proxy.state() != DevState.OFF: raise InitialisationException(f"Could not turn off device {device_name}") - logger.info(f"Stopped device {device_name}") + self.set_status(f"[stopping {device_name}] Stopped device.") def start_device(self, device_name: str): """ Run the startup sequence for device 'device_name'. """ - logger.info(f"Starting device {device_name}") - proxy = self.devices[device_name] # go to a well-defined state, which may be needed if the user calls @@ -254,9 +188,108 @@ class StationControl(hardware_device): if proxy.state() != DevState.ON: raise InitialisationException(f"Could not turn on device {device_name}") - self.set_status(f"[restarting {device_name}] Succesfully restarted.") + self.set_status(f"[restarting {device_name}] Succesfully started.") + +@device_logging_to_python() +class StationControl(hardware_device): + """ + + **Properties:** + + - Device Property + OPC_Server_Name + - Type:'DevString' + OPC_Server_Port + - Type:'DevULong' + OPC_Time_Out + - Type:'DevDouble' + """ + + # ----------------- + # Device Properties + # ----------------- + + DeviceProxy_Time_Out = device_property( + dtype='DevDouble', + mandatory=False, + default_value=10.0, + ) + + # By default, we assume any device is not available + # because its docker container was not started, which + # is an explicit and thus intentional action. + # We ignore such devices when initialising the station. + Ignore_Unavailable_Devices = device_property( + dtype='DevBoolean', + mandatory=False, + default_value=True, + ) + + # ---------- + # Attributes + # ---------- + version_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: get_version()) + initialising_station_R = attribute(dtype=numpy.bool_, access=AttrWriteType.READ, fget=lambda self: self.initialiser.is_alive()) + initialisation_progress_R = attribute(dtype=numpy.int, access=AttrWriteType.READ, fget=lambda self: numpy.int(self.initialiser.progress)) + initialisation_status_R = attribute(dtype=str, access=AttrWriteType.READ, fget=lambda self: self.initialiser.status) + + @log_exceptions() + def delete_device(self): + """Hook to delete resources allocated in init_device. + + This method allows for any memory or other resources allocated in the + init_device method to be released. This method is called by the device + destructor and by the device Init command (a Tango built-in). + """ + self.debug_stream("Shutting down...") + + self.Off() + self.debug_stream("Shut down. Good bye.") + + # -------- + # overloaded functions + # -------- + @log_exceptions() + def configure_for_off(self): + """ user code here. is called when the state is set to OFF """ + # Stop keep-alive + try: + pass + except Exception as e: + self.warn_stream("Exception while stopping OPC ua connection in configure_for_off function: {}. Exception ignored".format(e)) + + @log_exceptions() + def configure_for_initialise(self): + # create an initialiser object so we can query it even before starting the (first) initialisation + self.initialiser = StationInitialiser(self.Ignore_Unavailable_Devices, self.DeviceProxy_Time_Out) + + @command() + @DebugIt() + @only_when_on() + @fault_on_error() + @log_exceptions() + def initialise_devices(self): + """ + Initialise or re-initialise all devices on the station. + + This command will take a while to execute, so should be called asynchronously. + + :return:None + """ + + if self.initialiser.is_running(): + # already initialising + return + + # join any previous attempt, if any + try: + self.initialiser.join() + except RuntimeError: + pass - logger.info(f"Started device {device_name}") + # start new initialisation attempt + self.initialiser = StationInitialiser(self.Ignore_Unavailable_Devices, self.DeviceProxy_Time_Out) + self.initialiser.start() # ---------- # Run server