Skip to content
Snippets Groups Projects

L2SS-1204 added station state stubs for the StationManager

Merged L2SS-1204 added station state stubs for the StationManager
All threads resolved!
Merged Taya Snijder requested to merge L2SS-1204_add_station_state_stubs_to_StationManager into master
All threads resolved!
@@ -7,20 +7,41 @@
import logging
from tango.server import attribute, device_property
# pytango imports
from tango.server import attribute, command, device_property
from tango import DebugIt
# Additional import
from tangostationcontrol.common.entrypoint import entry
from tangostationcontrol.common.lofar_logging import device_logging_to_python
from tangostationcontrol.devices.lofar_device import LOFARDevice
from tangostationcontrol.common.lofar_logging import log_exceptions
from enum import Enum
logger = logging.getLogger()
__all__ = ["StationManager", "main"]
class StationState(Enum):
# all station states
OFF = "OFF"
HIBERNATE = "HIBERNATE"
STANDBY = "STANDBY"
ON = "ON"
@device_logging_to_python()
class StationManager(LOFARDevice):
# Contains which transitions are allowed for a given states
ALLOWED_TRANSITIONS = {
StationState.OFF: [StationState.HIBERNATE],
StationState.HIBERNATE: [StationState.OFF, StationState.STANDBY],
StationState.STANDBY: [StationState.HIBERNATE, StationState.ON],
StationState.ON: [StationState.STANDBY],
}
# -----------------
# Device Properties
# -----------------
@@ -37,20 +58,146 @@ class StationManager(LOFARDevice):
def read_station_name_R(self):
return self.Station_Name
station_state_R = attribute(dtype=str, fisallowed="is_attribute_access_allowed")
def read_station_state_R(self):
return self.station_state.name
# --------
# overloaded functions
# --------
def init_device(self):
super().init_device()
# Set the station state to off
self.station_state = StationState.OFF
# always turn on automatically, so the user doesn't have to boot the StationManager device
self.Initialise()
self.On()
# --------
# internal functions
# --------
def _is_transition_allowed(self, to_state) -> bool:
# get allowed transitions for the current state
allowed_transitions = self.ALLOWED_TRANSITIONS[self.station_state]
# check if the station is already in state it wants to go to
if to_state == self.station_state:
logger.warning(
f"Requested to go to {to_state.name} state, "
f"but am already in {self.station_state.name} state."
)
return False
# check if it is allowed to transition to the desired state
elif to_state not in allowed_transitions:
raise Exception(
f"State transition to {to_state.name} state "
f"only allowed in {[i.name for i in allowed_transitions]} "
f"Current state: {self.station_state.name}"
)
else:
# the requested state transition is allowed
return True
def _off_to_hibernate(self):
# TODO: functionality
return
def _hibernate_to_off(self):
# TODO: functionality
return
def _hibernate_to_standby(self):
# TODO: functionality
return
def _standby_to_hibernate(self):
# TODO: functionality
return
def standby_to_on(self):
# TODO: functionality
return
def on_to_standby(self):
# TODO: functionality
return
# --------
# Commands
# --------
@command()
@DebugIt()
@log_exceptions()
def station_off(self):
if not self._is_transition_allowed(StationState.OFF):
logger.warning(f"Station did not transition to {StationState.OFF.name}")
return
# call the correct state transition function
else:
self._hibernate_to_off()
# update the station_state variable when successful
self.station_state = StationState.OFF
@command()
@DebugIt()
@log_exceptions()
def station_hibernate(self):
if not self._is_transition_allowed(StationState.HIBERNATE):
logger.warning(
f"Station did not transition to {StationState.HIBERNATE.name}"
)
return
# call the correct state transition function
elif self.station_state == StationState.OFF:
self._off_to_hibernate()
elif self.station_state == StationState.STANDBY:
self._standby_to_hibernate()
# update the station_state variable when successful
self.station_state = StationState.HIBERNATE
@command()
@DebugIt()
@log_exceptions()
def station_standby(self):
if not self._is_transition_allowed(StationState.STANDBY):
logger.warning(f"Station did not transition to {StationState.STANDBY.name}")
return
# call the correct state transition function
elif self.station_state == StationState.HIBERNATE:
self._hibernate_to_standby()
elif self.station_state == StationState.ON:
self.on_to_standby()
# update the station_state variable when successful
self.station_state = StationState.STANDBY
@command()
@DebugIt()
@log_exceptions()
def station_on(self):
if not self._is_transition_allowed(StationState.ON):
logger.warning(f"Station did not transition to {StationState.ON.name}")
return
# call the correct state transition function
else:
self.on_to_standby()
# update the station_state variable when successful
self.station_state = StationState.ON
# ----------
# Run server
Loading