Skip to content
Snippets Groups Projects
Commit db742014 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'L2SS-1533-improve-station-transition' into 'master'

Resolve L2SS-1533 "Improve station transition"

Closes L2SS-1533

See merge request !851
parents 77e11aec 56668e49
No related branches found
No related tags found
1 merge request!851Resolve L2SS-1533 "Improve station transition"
Showing
with 489 additions and 318 deletions
......@@ -166,6 +166,7 @@ Next change the version in the following places:
# Release Notes
* 0.30.0 Refactor station state transitions using the State pattern
* 0.29.2 Bump MinIO versions
* 0.29.1-3 Fix central logs service consul name
* 0.29.1-2 Fix vector tenant_id, must be string
......
0.29.2
0.30.0
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from enum import IntEnum
from typing import Dict, Optional
from functools import wraps
from tango import DevState, DeviceProxy
from .type_checking import device_class_matches
from tango import DevState
# -----------------------
# Device states
......@@ -29,77 +23,3 @@ POWER_ON_COMMAND_STATES = DEFAULT_COMMAND_STATES + [DevState.OFF, DevState.INIT]
# States in which the hardware can be powered off
POWER_OFF_COMMAND_STATES = OPERATIONAL_STATES + [DevState.STANDBY, DevState.DISABLE]
# -----------------------
# Station states
# -----------------------
class StationState(IntEnum):
"""Station states enumeration"""
OFF = 0
HIBERNATE = 1
STANDBY = 2
ON = 3
# Contains which transitions are allowed for a given states
ALLOWED_STATION_STATE_TRANSITIONS = {
StationState.OFF: [StationState.HIBERNATE],
StationState.HIBERNATE: [StationState.OFF, StationState.STANDBY],
StationState.STANDBY: [StationState.HIBERNATE, StationState.ON],
StationState.ON: [StationState.STANDBY],
}
DEVICES_ON_IN_STATION_STATE: Dict[str, Optional[StationState]] = {
"""In which StationState each device class should be switched ON."""
"StationManager": StationState.HIBERNATE,
"CCD": StationState.HIBERNATE,
"EC": StationState.HIBERNATE,
"PCON": StationState.HIBERNATE,
"PSOC": StationState.HIBERNATE,
"TemperatureManager": StationState.HIBERNATE,
"Docker": StationState.HIBERNATE,
"Configuration": StationState.HIBERNATE,
"SDPFirmware": StationState.STANDBY,
"APS": StationState.STANDBY,
"APSPU": StationState.STANDBY,
"APSCT": StationState.STANDBY,
"UNB2": StationState.STANDBY,
"RECVH": StationState.STANDBY,
"RECVL": StationState.STANDBY,
# TODO(JDM) Lingering observations (and debug observation 0) should not be booted as we do not persist their settings
"ObservationField": None,
# Unmentioned devices will go ON in this state
"_default": StationState.ON,
}
def run_if_device_on_in_station_state(
target_state: StationState,
state_table: Dict[str, StationState] = DEVICES_ON_IN_STATION_STATE,
):
"""Decorator for a function func(device). It only executes the decorated function if the device should be turned ON in the target_state. Returns None otherwise."""
def inner(func):
@wraps(func)
def wrapper(device: DeviceProxy, *args, **kwargs):
def should_execute():
for pattern, station_state in state_table.items():
if not device_class_matches(device, pattern):
continue
return target_state == station_state
# no matches, use default (if any)
return target_state == state_table.get("_default")
# execute function if we should in the configured state
return func(device, *args, **kwargs) if should_execute() else None
return wrapper
return inner
......@@ -3,7 +3,6 @@
"""Power Hierarchy for PyTango devices"""
import asyncio
from typing import Dict, Optional
import logging
......@@ -12,12 +11,9 @@ from tango import DeviceProxy, DevState
from tangostationcontrol.devices.base_device_classes.hierarchy_device import (
AbstractHierarchyDevice,
)
from tangostationcontrol.common.states import (
StationState,
run_if_device_on_in_station_state,
)
from tangostationcontrol.states.station_state import run_if_device_on_in_station_state
from tangostationcontrol.states.station_state_enum import StationStateEnum
from tangostationcontrol.common.type_checking import device_class_matches
from tangostationcontrol.common.device_decorators import suppress_exceptions
logger = logging.getLogger()
......@@ -30,10 +26,6 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
POWER_CHILD_PROPERTY = "Power_Children"
HIBERNATE_TIMEOUT = 60.0
STANDBY_TIMEOUT = 300.0
ON_TIMEOUT = 600.0
def init(
self,
device_name: str,
......@@ -67,12 +59,12 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
device.off()
logger.info("Shutting down %s: Succesful: state=%s", device, device.state())
def _off_to_hibernate(self):
def off_to_hibernate(self):
"""Manage the device operations involved in the OFF -> HIBERNATE state transition.
Only minimal hardware is powered."""
@suppress_exceptions(self.continue_on_failure)
@run_if_device_on_in_station_state(StationState.HIBERNATE)
@run_if_device_on_in_station_state(StationStateEnum.HIBERNATE)
def boot_to_hibernate(device: DeviceProxy):
# TODO(JDM): wait for CCDTR to be powered on before booting it?
self._boot_device(device)
......@@ -94,25 +86,13 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# Return the suppressed exceptions
return boot_to_hibernate.exceptions
async def off_to_hibernate(self):
"""Manage the device operations involved in the OFF -> HIBERNATE state transition.
Only minimal hardware is powered. ASYNC Version.
"""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._off_to_hibernate),
timeout=self.HIBERNATE_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError("OFF -> HIBERNATE state transition timed out.") from exc
def _hibernate_to_standby(self):
def hibernate_to_standby(self):
"""Manage the device operations involved in the HIBERNATE -> STANDBY state transition.
Powers hardware except antennas and firmware.
"""
@suppress_exceptions(self.continue_on_failure)
@run_if_device_on_in_station_state(StationState.STANDBY)
@run_if_device_on_in_station_state(StationStateEnum.STANDBY)
def boot_to_standby(device: DeviceProxy):
self._boot_device(device)
......@@ -127,25 +107,11 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# Return the suppressed exceptions
return boot_to_standby.exceptions
async def hibernate_to_standby(self):
"""Manage the device operations involved in the HIBERNATE -> STANDBY state transition.
Powers hardware except antennas and firmware. ASYNC version.
"""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._hibernate_to_standby),
timeout=self.STANDBY_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError(
"HIBERNATE -> STANDBY state transition timed out."
) from exc
def _standby_to_hibernate(self):
def standby_to_hibernate(self):
"""Manage the device operations involved in the STANDBY -> HIBERNATE state transition."""
@suppress_exceptions(self.continue_on_failure)
@run_if_device_on_in_station_state(StationState.STANDBY)
@run_if_device_on_in_station_state(StationStateEnum.STANDBY)
def power_off_from_standby(device: DeviceProxy):
# UNB2: Power off the Uniboards
if device_class_matches(device, "UNB2"):
......@@ -157,7 +123,7 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# now transition to hibernate
@suppress_exceptions(self.continue_on_failure)
@run_if_device_on_in_station_state(StationState.STANDBY)
@run_if_device_on_in_station_state(StationStateEnum.STANDBY)
def shutdown_to_hibernate(device: DeviceProxy):
self._shutdown_device(device)
......@@ -166,20 +132,7 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# Return the suppressed exceptions
return power_off_from_standby.exceptions + shutdown_to_hibernate.exceptions
async def standby_to_hibernate(self):
"""Manage the device operations involved in the STANDBY -> HIBERNATE state transition.
ASYNC version."""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._standby_to_hibernate),
timeout=self.HIBERNATE_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError(
"STANDBY -> HIBERNATE state transition timed out."
) from exc
def _standby_to_on(self):
def standby_to_on(self):
"""Manage the device operations involved in the STANDBY -> ON state transition.
Powers power-hungry devices (SDP, antennas).
"""
......@@ -209,7 +162,7 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# now transition to on
@suppress_exceptions(self.continue_on_failure)
@run_if_device_on_in_station_state(StationState.ON)
@run_if_device_on_in_station_state(StationStateEnum.ON)
def boot_to_on(device: DeviceProxy):
self._boot_device(device)
......@@ -232,24 +185,12 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
power_on.exceptions + boot_to_on.exceptions + power_antennas_on.exceptions
)
async def standby_to_on(self):
"""Manage the device operations involved in the STANDBY -> ON state transition.
Powers power-hungry devices (SDP, antennas). ASYNC version.
"""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._standby_to_on),
timeout=self.ON_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError("STANDBY -> ON state transition timed out.") from exc
def _on_to_standby(self):
def on_to_standby(self):
"""Manage the device operations involved in the ON -> STANDBY state transition."""
# turn off power to hardware we also will turn off the software device for
@suppress_exceptions(self.continue_on_failure)
@run_if_device_on_in_station_state(StationState.ON)
@run_if_device_on_in_station_state(StationStateEnum.ON)
def power_off_from_on(device: DeviceProxy):
# AntennaField: Power off all antennas
if device_class_matches(device, ("AFL", "AFH")):
......@@ -262,7 +203,7 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# turn off software devices
@suppress_exceptions(self.continue_on_failure)
@run_if_device_on_in_station_state(StationState.ON)
@run_if_device_on_in_station_state(StationStateEnum.ON)
def shutdown_to_standby(device: DeviceProxy):
self._shutdown_device(device)
......@@ -297,14 +238,3 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
+ shutdown_to_standby.exceptions
+ power_off.exceptions
)
async def on_to_standby(self):
"""Manage the device operations involved in the ON -> STANDBY state transition.
ASYNC version."""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._on_to_standby),
timeout=self.STANDBY_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError("ON -> STANDBY state transition timed out.") from exc
......@@ -6,9 +6,8 @@
"""
import asyncio
import logging
from typing import Callable, Awaitable
from tango import DebugIt, DevState, DevFailed, Except
from tango import DebugIt
# pytango imports
from tango.server import attribute, command, device_property
......@@ -17,15 +16,14 @@ from tango.server import attribute, command, device_property
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.common.lofar_logging import exception_to_str
from tangostationcontrol.common.lofar_logging import log_exceptions
from tangostationcontrol.common.states import (
StationState,
ALLOWED_STATION_STATE_TRANSITIONS,
)
from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice
from tangostationcontrol.states.station_state_enum import StationStateEnum
from tangostationcontrol.devices.base_device_classes.power_hierarchy import (
PowerHierarchyDevice,
)
from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice
from tangostationcontrol.metrics import device_metrics
from tangostationcontrol.states.off import OffState
logger = logging.getLogger()
......@@ -72,11 +70,11 @@ class StationManager(AsyncDevice):
def station_name_R(self):
return self.Station_Name
@attribute(dtype=StationState, fisallowed="is_attribute_access_allowed")
@attribute(dtype=StationStateEnum, fisallowed="is_attribute_access_allowed")
def station_state_R(self):
return self.station_state
return self.station_state_name
@attribute(dtype=StationState, fisallowed="is_attribute_access_allowed")
@attribute(dtype=StationStateEnum, fisallowed="is_attribute_access_allowed")
def requested_station_state_R(self):
return self.requested_station_state
......@@ -93,16 +91,16 @@ class StationManager(AsyncDevice):
@attribute(dtype=bool, fisallowed="is_attribute_access_allowed")
def last_requested_transition_ok_R(self):
return self.last_requested_transition_exceptions == []
return not self.last_requested_transition_exceptions
# --------
# overloaded functions
# --------
def __init__(self, cl, name):
self.station_state = StationState.OFF
self.stationmanager_ph = None
self.requested_station_state = StationState.OFF
self.init_station_state = OffState(self, self.stationmanager_ph)
self.set_station_state(self.init_station_state)
self.requested_station_state = self.init_station_state.state
self.last_requested_transition_exceptions = []
self.transition_lock = asyncio.Lock()
......@@ -130,16 +128,21 @@ class StationManager(AsyncDevice):
if self.transition_lock is None:
self.transition_lock = asyncio.Lock()
# Set the station state to off
self.station_state = StationState.OFF
# Initialise power hierarchy based on current settings
self._initialise_power_hierarchy()
# Set the station state to off
self.set_station_state(OffState(self, self.stationmanager_ph))
# --------
# internal functions
# --------
def set_station_state(self, station_state):
"""Change the Station state"""
self.station_state = station_state
self.station_state_name = StationStateEnum[station_state.state.name]
def _initialise_power_hierarchy(self):
"""Create and initialise the PowerHierarchy to manage the power sequence"""
# create a power hierarchy device instance
......@@ -148,70 +151,6 @@ class StationManager(AsyncDevice):
self.get_name(), continue_on_failure=self.Suppress_State_Transition_Failures
)
def _is_transition_allowed(self, to_state) -> bool:
# get allowed transitions for the current state
allowed_transitions = ALLOWED_STATION_STATE_TRANSITIONS[self.station_state]
# check if the station is already in state it wants to go to
if to_state == self.station_state:
logger.warning(
"Requested to go to %s state, but am already in %s state.",
to_state.name,
self.station_state.name,
)
return False
# check if it is allowed to transition to the desired state
elif to_state not in allowed_transitions:
raise Exception(
f"State transition to {to_state.name} state "
f"only allowed in {[i.name for i in allowed_transitions]} "
f"Current state: {self.station_state.name}"
)
else:
# the requested state transition is allowed
return True
async def _transition(
self,
target_state: StationState,
transition_func: Callable[[], Awaitable[None]],
):
"""Transition to a station state using `transition_func`.
:param transition_desc : string description of the transition to be performed
:param transition_func : function that implements the transition
"""
# StationManager device must be in ON state
if self.get_state() != DevState.ON:
raise Exception(
f"Station Manager must be in ON state. Current state is {self.get_state()}"
)
logger.info(
"Station %s requested to perform the %s -> %s Power Sequence",
self.Station_Name,
self.station_state.name,
target_state.name,
)
try:
self.requested_station_state = target_state
self.last_requested_transition_exceptions = await transition_func()
except Exception as ex:
# unsuppressed exception
self.last_requested_transition_exceptions = [(None, ex)]
raise
logger.info(
"Station %s has correctly completed the %s -> %s Power Sequence",
self.Station_Name,
self.station_state.name,
target_state.name,
)
# --------
# Commands
# --------
......@@ -226,14 +165,7 @@ class StationManager(AsyncDevice):
"""
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.OFF):
return
# not implemented -> call the correct state transition function
# update the station_state variable when successful
self.requested_station_state = StationState.OFF
self.station_state = StationState.OFF
await self.station_state.station_off()
finally:
self.transition_lock.release()
......@@ -247,29 +179,7 @@ class StationManager(AsyncDevice):
"""
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.HIBERNATE):
return
# call the correct state transition function
try:
if self.station_state == StationState.OFF:
await self._transition(
StationState.HIBERNATE, self.stationmanager_ph.off_to_hibernate
)
elif self.station_state == StationState.STANDBY:
await self._transition(
StationState.HIBERNATE,
self.stationmanager_ph.standby_to_hibernate,
)
except DevFailed as exc:
error_string = f"Station {self.Station_Name} \
can not transition to {StationState.HIBERNATE.name} state. \
Current state is {self.station_state.name}"
logger.exception(error_string)
Except.re_throw_exception(exc, "DevFailed", error_string, __name__)
# update the station_state variable when successful
self.station_state = StationState.HIBERNATE
await self.station_state.station_hibernate()
finally:
self.transition_lock.release()
......@@ -283,29 +193,7 @@ class StationManager(AsyncDevice):
"""
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.STANDBY):
return
# call the correct state transition function
try:
if self.station_state == StationState.HIBERNATE:
await self._transition(
StationState.STANDBY,
self.stationmanager_ph.hibernate_to_standby,
)
elif self.station_state == StationState.ON:
await self._transition(
StationState.STANDBY, self.stationmanager_ph.on_to_standby
)
except DevFailed as exc:
error_string = f"Station {self.Station_Name} \
can not transition to {StationState.STANDBY.name} state. \
Current state is {self.station_state.name}"
logger.exception(error_string)
Except.re_throw_exception(exc, "DevFailed", error_string, __name__)
# update the station_state variable when successful
self.station_state = StationState.STANDBY
await self.station_state.station_standby()
finally:
self.transition_lock.release()
......@@ -319,22 +207,6 @@ class StationManager(AsyncDevice):
"""
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.ON):
return
# call the correct state transition function
try:
await self._transition(
StationState.ON, self.stationmanager_ph.standby_to_on
)
except DevFailed as exc:
error_string = f"Station {self.Station_Name} \
can not transition to {StationState.ON.name} state. \
Current state is {self.station_state.name}"
logger.exception(error_string)
Except.re_throw_exception(exc, "DevFailed", error_string, __name__)
# update the station_state variable when successful
self.station_state = StationState.ON
await self.station_state.station_on()
finally:
self.transition_lock.release()
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import logging
from tango import DevFailed, Except
from tangostationcontrol.states.station_state import StationState
from tangostationcontrol.states.station_state_enum import StationStateEnum
__all__ = ["HibernateState"]
logger = logging.getLogger()
class HibernateState(StationState):
"""HIBERNATE station state"""
def __init__(self, station_manager, power_hierarchy) -> None:
self.timeout = self.get_timeout(StationStateEnum.HIBERNATE)
super().__init__(StationStateEnum.HIBERNATE, station_manager, power_hierarchy)
async def station_hibernate(self):
self.get_transition_current_state_error()
return
async def station_on(self):
return self.disallowed_transition_error(StationStateEnum.ON)
async def station_off(self):
"""Transition HIBERNATE -> OFF"""
from tangostationcontrol.states.off import OffState
# not implemented -> call the correct state transition function
self._station_manager.requested_station_state = OffState(
self._station_manager, self._power_hierarchy
).state.name
self._station_manager.set_station_state(
OffState(self._station_manager, self._power_hierarchy)
)
async def station_standby(self):
"""Transition HIBERNATE -> STANDBY"""
from tangostationcontrol.states.standby import StandbyState
target_state = StationStateEnum.STANDBY
try:
await self._transition(
target_state,
self._power_hierarchy.hibernate_to_standby,
)
except DevFailed as exc:
error_string = self.get_transition_error_string(target_state)
logger.exception(error_string)
Except.re_throw_exception(
exc, "DevFailed", error_string, self._station_manager.__name__
)
self._station_manager.set_station_state(
StandbyState(self._station_manager, self._power_hierarchy)
)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import logging
from tango import DevFailed, Except
from tangostationcontrol.states.station_state import StationState
from tangostationcontrol.states.station_state_enum import StationStateEnum
__all__ = ["OffState"]
logger = logging.getLogger()
class OffState(StationState):
"""OFF station state"""
def __init__(self, station_manager, power_hierarchy) -> None:
super().__init__(StationStateEnum.OFF, station_manager, power_hierarchy)
async def station_off(self):
self.get_transition_current_state_error()
return
async def station_standby(self):
return self.disallowed_transition_error(StationStateEnum.STANDBY)
async def station_on(self):
return self.disallowed_transition_error(StationStateEnum.ON)
async def station_hibernate(self) -> None:
"""Transition OFF -> HIBERNATE"""
from tangostationcontrol.states.hibernate import HibernateState
target_state = StationStateEnum.HIBERNATE
try:
await self._transition(
target_state,
self._power_hierarchy.off_to_hibernate,
)
except DevFailed as exc:
error_string = self.get_transition_error_string(target_state)
logger.exception(error_string)
Except.re_throw_exception(
exc, "DevFailed", error_string, self._station_manager.__name__
)
self._station_manager.set_station_state(
HibernateState(self._station_manager, self._power_hierarchy)
)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import logging
from tango import DevFailed, Except
from tangostationcontrol.states.station_state import StationState
from tangostationcontrol.states.station_state_enum import StationStateEnum
__all__ = ["OnState"]
logger = logging.getLogger()
class OnState(StationState):
"""ON station state"""
def __init__(self, station_manager, power_hierarchy) -> None:
self.timeout = self.get_timeout(StationStateEnum.ON)
super().__init__(StationStateEnum.ON, station_manager, power_hierarchy)
async def station_off(self):
return self.disallowed_transition_error(StationStateEnum.ON)
async def station_hibernate(self):
return self.disallowed_transition_error(StationStateEnum.HIBERNATE)
async def station_on(self):
self.get_transition_current_state_error()
return
async def station_standby(self):
"""Transition ON -> STANDBY"""
from tangostationcontrol.states.standby import StandbyState
target_state = StationStateEnum.STANDBY
try:
await self._transition(
target_state,
self._power_hierarchy.on_to_standby,
)
except DevFailed as exc:
error_string = self.get_transition_error_string(target_state)
logger.exception(error_string)
Except.re_throw_exception(
exc, "DevFailed", error_string, self._station_manager.__name__
)
self._station_manager.set_station_state(
StandbyState(self._station_manager, self._power_hierarchy)
)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import logging
from tango import DevFailed, Except
from tangostationcontrol.states.station_state import StationState
from tangostationcontrol.states.station_state_enum import StationStateEnum
__all__ = ["StandbyState"]
logger = logging.getLogger()
class StandbyState(StationState):
"""STANDBY station state"""
def __init__(self, station_manager, power_hierarchy) -> None:
self.timeout = self.get_timeout(StationStateEnum.STANDBY)
super().__init__(StationStateEnum.STANDBY, station_manager, power_hierarchy)
async def station_off(self):
return self.disallowed_transition_error(StationStateEnum.OFF)
async def station_standby(self):
self.get_transition_current_state_error()
return
async def station_hibernate(self):
"""Transition STANDBY -> HIBERNATE"""
from tangostationcontrol.states.hibernate import HibernateState
target_state = StationStateEnum.HIBERNATE
try:
await self._transition(
target_state,
self._power_hierarchy.standby_to_hibernate,
)
except DevFailed as exc:
error_string = self.get_transition_error_string(target_state)
logger.exception(error_string)
Except.re_throw_exception(
exc, "DevFailed", error_string, self._station_manager.__name__
)
self._station_manager.set_station_state(
HibernateState(self._station_manager, self._power_hierarchy)
)
async def station_on(self):
"""Transition STANDBY -> ON"""
from tangostationcontrol.states.on import OnState
target_state = StationStateEnum.ON
try:
await self._transition(
target_state,
self._power_hierarchy.standby_to_on,
)
except DevFailed as exc:
error_string = self.get_transition_error_string(target_state)
logger.exception(error_string)
Except.re_throw_exception(
exc, "DevFailed", error_string, self._station_manager.__name__
)
self._station_manager.set_station_state(
OnState(self._station_manager, self._power_hierarchy)
)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import asyncio
import logging
from abc import ABC, abstractmethod
from typing import Callable, Awaitable, Dict, Optional
from functools import wraps
from tango import DevState, DeviceProxy
from tangostationcontrol.states.station_state_enum import StationStateEnum
from tangostationcontrol.common.type_checking import device_class_matches
__all__ = ["StationState"]
logger = logging.getLogger()
DEVICES_ON_IN_STATION_STATE: Dict[str, Optional[StationStateEnum]] = {
"""In which StationState each device class should be switched ON."""
"StationManager": StationStateEnum.HIBERNATE,
"CCD": StationStateEnum.HIBERNATE,
"EC": StationStateEnum.HIBERNATE,
"PCON": StationStateEnum.HIBERNATE,
"PSOC": StationStateEnum.HIBERNATE,
"TemperatureManager": StationStateEnum.HIBERNATE,
"Docker": StationStateEnum.HIBERNATE,
"Configuration": StationStateEnum.HIBERNATE,
"SDPFirmware": StationStateEnum.STANDBY,
"APS": StationStateEnum.STANDBY,
"APSPU": StationStateEnum.STANDBY,
"APSCT": StationStateEnum.STANDBY,
"UNB2": StationStateEnum.STANDBY,
"RECVH": StationStateEnum.STANDBY,
"RECVL": StationStateEnum.STANDBY,
# TODO(JDM) Lingering observations (and debug observation 0)
# should not be booted as we do not persist their settings
"ObservationField": None,
# Unmentioned devices will go ON in this state
"_default": StationStateEnum.ON,
}
def run_if_device_on_in_station_state(
target_state: StationStateEnum,
state_table: Dict[str, StationStateEnum] = None,
):
"""Decorator for a function func(device).
It only executes the decorated function if the device should be turned ON in the target_state.
Returns None otherwise."""
if state_table is None:
state_table = DEVICES_ON_IN_STATION_STATE
def inner(func):
@wraps(func)
def wrapper(device: DeviceProxy, *args, **kwargs):
def should_execute():
for pattern, station_state in state_table.items():
if not device_class_matches(device, pattern):
continue
return target_state == station_state
# no matches, use default (if any)
return target_state == state_table.get("_default")
# execute function if we should in the configured state
return func(device, *args, **kwargs) if should_execute() else None
return wrapper
return inner
class StationState(ABC):
"""Abstract class for station state"""
TIMEOUT_DICT = {
StationStateEnum.HIBERNATE: 60.0,
StationStateEnum.STANDBY: 300.0,
StationStateEnum.ON: 600.0,
}
def __init__(
self,
state: StationStateEnum,
station_manager,
power_hierarchy,
) -> None:
self._state = state
self._station_manager = station_manager
self._power_hierarchy = power_hierarchy
super().__init__()
@property
def state(self) -> StationStateEnum:
"""Name of the current state"""
return self._state
@state.setter
def state(self, state: StationStateEnum) -> None:
self._state = state
@classmethod
def get_timeout(cls, state: StationStateEnum) -> float:
"""Return the timeout for a given station state"""
return cls.TIMEOUT_DICT[state]
def get_transition_error_string(self, target_state: StationStateEnum) -> str:
"""Print standard transition error string"""
return f"Station {self._station_manager.Station_Name} \
can not transition to {target_state.name} state. \
Current state is {self.state.name}"
def get_transition_current_state_error(self) -> None:
"""Log a warning message after performing a transition to
the already current state"""
logger.warning(
"Requested to go to %s state, but am already in %s state.",
self.state.name,
self.state.name,
)
def disallowed_transition_error(
self, target_state: StationStateEnum, raise_exception: bool = True
) -> None:
"""Raise an exception or log a warning if the requested transition
is not allowed"""
error_message = (
f"State transition to {target_state.name} state "
f"not allowed in current state: {self.state.name}"
)
if raise_exception:
raise Exception(error_message)
logger.warning(error_message)
async def _transition(
self,
target_state: StationStateEnum,
transition_func: Callable[[], Awaitable[None]],
):
"""Transition to a station state using `transition_func`.
:param transition_state : transition to be performed
:param transition_func : function that implements the transition
:param state_timeout: timeout of transition to be performed
"""
# StationManager device must be in ON state
if self._station_manager.get_state() != DevState.ON:
raise Exception(
f"Station Manager must be in ON state. "
f"Current state is {self._station_manager.get_state()}"
)
logger.info(
"Station %s requested to perform the %s -> %s Power Sequence",
self._station_manager.Station_Name,
self.state.name,
target_state.name,
)
try:
self._station_manager.requested_station_state = target_state
self._station_manager.last_requested_transition_exceptions = (
await self.power_transition(
target_state,
transition_func,
)
)
except Exception as ex:
# unsuppressed exception
self._station_manager.last_requested_transition_exceptions = [(None, ex)]
raise
logger.info(
"Station %s has correctly completed the %s -> %s Power Sequence",
self._station_manager.Station_Name,
self.state.name,
target_state.name,
)
@abstractmethod
async def station_off(self):
"""Abstract method for OFF transitions"""
raise NotImplementedError
@abstractmethod
async def station_hibernate(self):
"""Abstract method for HIBERNATE transitions"""
raise NotImplementedError
@abstractmethod
async def station_standby(self):
"""Abstract method for STANDBY transitions"""
raise NotImplementedError
@abstractmethod
async def station_on(self):
"""Abstract method for ON transitions"""
raise NotImplementedError
async def power_transition(self, target_state, pwr_func):
"""Trigger the Power Hierarchy device to perform the transition
:param target_state : transition to be performed
:param pwr_func : power hierarchy function that implements the transition
:param timeout: timeout of transition to be performed
"""
try:
return await asyncio.wait_for(
asyncio.to_thread(pwr_func),
timeout=self.get_timeout(target_state),
)
except asyncio.TimeoutError as exc:
raise TimeoutError(
f"Transition to {target_state.name} state timed out."
) from exc
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from enum import IntEnum
__all__ = ["StationStateEnum"]
class StationStateEnum(IntEnum):
"""Station states enumeration"""
OFF = 0
HIBERNATE = 1
STANDBY = 2
ON = 3
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment