Skip to content
Snippets Groups Projects
Commit 7c513d64 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Resolve L2SS-1531 "Async station manager"

parent c1f1c826
Branches
No related tags found
1 merge request!808Resolve L2SS-1531 "Async station manager"
......@@ -166,6 +166,7 @@ Next change the version in the following places:
# Release Notes
* 0.28.0 Make `StationManager` device asynchronous
* 0.27.2 Add new attributes in OPCUA devices
* 0.27.1 Bugfixes / rollout fixes
* 0.27.0 Replace device_attribute with a per-attribute metric
......
......@@ -217,6 +217,7 @@ function await {
exit 1
fi
done
sleep 2
echo ". [ok]"
done
......
0.27.2
0.28.0
......@@ -29,9 +29,8 @@ class TestDeviceProxy(DeviceProxy):
d = TestDeviceProxy(endpoint)
try:
d.Off()
except Exception as e:
"""Failing to turn Off devices should not raise errors here"""
logger.error(f"Failed to turn device off in teardown {e}")
"""Wait to prevent propagating reconnection errors"""
except Exception as exc:
# Failing to turn Off devices should not raise errors here
logger.error("Failed to turn device off in teardown {%s}", exc)
# Wait to prevent propagating reconnection errors
time.sleep(sleep)
......@@ -3,6 +3,7 @@
"""Power Hierarchy for PyTango devices"""
import asyncio
from typing import Dict, Optional
import logging
......@@ -29,6 +30,10 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
POWER_CHILD_PROPERTY = "Power_Children"
HIBERNATE_TIMEOUT = 60.0
STANDBY_TIMEOUT = 60.0
ON_TIMEOUT = 60.0
def init(
self,
device_name: str,
......@@ -42,27 +47,27 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
def _boot_device(self, device: DeviceProxy):
"""Default sequence of device booting operations"""
logger.info(f"Booting {device}: off()")
logger.info("Booting %s: off()", device)
device.off()
logger.info(f"Booting {device}: initialise()")
logger.info("Booting %s: initialise()", device)
device.initialise()
logger.info(f"Booting {device}: set_defaults()")
logger.info("Booting %s: set_defaults()", device)
device.set_defaults()
logger.info(f"Booting {device}: on()")
logger.info("Booting %s: on()", device)
device.on()
logger.info(f"Booting {device}: Succesful: state={device.state()}")
logger.info("Booting %s: Succesful: state=%s", device, device.state())
def _shutdown_device(self, device: DeviceProxy):
"""Default sequence of dervice turning-off operations"""
if device.state() == DevState.OFF:
logger.info(f"Shutting down {device}: Succesful: It's already OFF?")
logger.info("Shutting down %s: Succesful: It's already OFF?", device)
return
logger.info(f"Shutting down {device}: off()")
logger.info("Shutting down %s: off()", device)
device.off()
logger.info(f"Shutting down {device}: Succesful: state={device.state()}")
logger.info("Shutting down %s: Succesful: state=%s", device, device.state())
def off_to_hibernate(self):
def _off_to_hibernate(self):
"""Manage the device operations involved in the OFF -> HIBERNATE state transition.
Only minimal hardware is powered."""
......@@ -74,22 +79,34 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# PSOC: Power on CCD
if device_class_matches(device, "PSOC"):
logger.info(f"Powering on {device}: CCD")
logger.info("Powering on %s: CCD", device)
device.power_hardware_on()
logger.info(f"Powering on {device}: Succesful: CCD")
logger.info("Powering on %s: Succesful: CCD", device)
# CCD: Power on clock
if device_class_matches(device, "CCD"):
logger.info(f"Powering on {device}: Clock")
logger.info("Powering on %s: Clock", device)
device.power_hardware_on()
logger.info(f"Powering on {device}: Succesful: Clock")
logger.info("Powering on %s: Succesful: Clock", device)
self.walk_down(boot_to_hibernate, -1)
# Return the suppressed exceptions
return boot_to_hibernate.exceptions
def hibernate_to_standby(self):
async def off_to_hibernate(self):
"""Manage the device operations involved in the OFF -> HIBERNATE state transition.
Only minimal hardware is powered. ASYNC Version.
"""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._off_to_hibernate),
timeout=self.HIBERNATE_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError("OFF -> HIBERNATE state transition timed out.") from exc
def _hibernate_to_standby(self):
"""Manage the device operations involved in the HIBERNATE -> STANDBY state transition.
Powers hardware except antennas and firmware.
"""
......@@ -101,16 +118,30 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# UNB2: Power on the Uniboards
if device_class_matches(device, "UNB2"):
logger.info(f"Powering on {device}: Uniboards")
logger.info("Powering on %s: Uniboards", device)
device.power_hardware_on()
logger.info(f"Powering on {device}: Succesful: Uniboards")
logger.info("Powering on %s: Succesful: Uniboards", device)
self.walk_down(boot_to_standby, -1)
# Return the suppressed exceptions
return boot_to_standby.exceptions
def standby_to_hibernate(self):
async def hibernate_to_standby(self):
"""Manage the device operations involved in the HIBERNATE -> STANDBY state transition.
Powers hardware except antennas and firmware. ASYNC version.
"""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._hibernate_to_standby),
timeout=self.STANDBY_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError(
"HIBERNATE -> STANDBY state transition timed out."
) from exc
def _standby_to_hibernate(self):
"""Manage the device operations involved in the STANDBY -> HIBERNATE state transition."""
@suppress_exceptions(self.continue_on_failure)
......@@ -118,9 +149,9 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
def power_off_from_standby(device: DeviceProxy):
# UNB2: Power off the Uniboards
if device_class_matches(device, "UNB2"):
logger.info(f"Powering off {device}: Uniboards")
logger.info("Powering off %s: Uniboards", device)
device.power_hardware_off()
logger.info(f"Powering off {device}: Succesful: Uniboards")
logger.info("Powering off %s: Succesful: Uniboards", device)
self.walk_up(power_off_from_standby, -1)
......@@ -135,7 +166,20 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
# Return the suppressed exceptions
return power_off_from_standby.exceptions + shutdown_to_hibernate.exceptions
def standby_to_on(self):
async def standby_to_hibernate(self):
"""Manage the device operations involved in the STANDBY -> HIBERNATE state transition.
ASYNC version."""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._standby_to_hibernate),
timeout=self.HIBERNATE_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError(
"STANDBY -> HIBERNATE state transition timed out."
) from exc
def _standby_to_on(self):
"""Manage the device operations involved in the STANDBY -> ON state transition.
Powers power-hungry devices (SDP, antennas).
"""
......@@ -145,21 +189,21 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
def power_on(device: DeviceProxy):
# APSCT: Select 200 MHz clock
if device_class_matches(device, "APSCT"):
logger.info(f"Powering on {device}: 200MHz clock")
logger.info("Powering on %s: 200MHz clock", device)
device.power_hardware_on()
logger.info(f"Powering on {device}: Succesful: 200MHz clock")
logger.info("Powering on %s: Succesful: 200MHz clock", device)
# RECV: Power on RCUs
if device_class_matches(device, ["RECVH", "RECVL"]):
logger.info(f"Powering on {device}: RCUs")
logger.info("Powering on %s: RCUs", device)
device.power_hardware_on()
logger.info(f"Powering on {device}: Succesful: RCUs")
logger.info("Powering on %s: Succesful: RCUs", device)
# SDPFirmware: Flash user image
if device_class_matches(device, "SDPFirmware"):
logger.info(f"Powering on {device}: User image")
logger.info("Powering on %s: User image", device)
device.power_hardware_on()
logger.info(f"Powering on {device}: Succesful: User image")
logger.info("Powering on %s: Succesful: User image", device)
self.walk_down(power_on, -1)
......@@ -176,10 +220,10 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
def power_antennas_on(device: DeviceProxy):
# AntennaField: Power on used antennas
if device_class_matches(device, "AntennaField"):
logger.info(f"Powering on {device}: Antennas")
logger.info("Powering on %s: Antennas", device)
device.power_hardware_on()
# TODO(JDM): Report which antennas
logger.info(f"Powering on {device}: Succesful: Antennas")
logger.info("Powering on %s: Succesful: Antennas", device)
self.walk_down(power_antennas_on, -1)
......@@ -188,7 +232,19 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
power_on.exceptions + boot_to_on.exceptions + power_antennas_on.exceptions
)
def on_to_standby(self):
async def standby_to_on(self):
"""Manage the device operations involved in the STANDBY -> ON state transition.
Powers power-hungry devices (SDP, antennas). ASYNC version.
"""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._standby_to_on),
timeout=self.ON_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError("STANDBY -> ON state transition timed out.") from exc
def _on_to_standby(self):
"""Manage the device operations involved in the ON -> STANDBY state transition."""
# turn off power to hardware we also will turn off the software device for
......@@ -197,10 +253,10 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
def power_off_from_on(device: DeviceProxy):
# AntennaField: Power off all antennas
if device_class_matches(device, "AntennaField"):
logger.info(f"Powering off {device}: Antennas")
logger.info("Powering off %s: Antennas", device)
device.power_hardware_off()
# TODO(JDM): Report which antennas
logger.info(f"Powering off {device}: Succesful: Antennas")
logger.info("Powering off %s: Succesful: Antennas", device)
self.walk_up(power_off_from_on, -1)
......@@ -217,21 +273,21 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
def power_off(device: DeviceProxy):
# APSCT: Turn off clock
if device_class_matches(device, "APSCT"):
logger.info(f"Powering off {device}: Clock")
logger.info("Powering off %s: Clock", device)
device.power_hardware_off()
logger.info(f"Powering off {device}: Succesful: Clock")
logger.info("Powering off %s: Succesful: Clock", device)
# RECV: Power off RCUs
if device_class_matches(device, ["RECVH", "RECVL"]):
logger.info(f"Powering off {device}: RCUs")
logger.info("Powering off %s: RCUs", device)
device.power_hardware_off()
logger.info(f"Powering off {device}: Succesful: RCUs")
logger.info("Powering off %s: Succesful: RCUs", device)
# SDPFirmware: Flash factory image
if device_class_matches(device, "SDPFirmware"):
logger.info(f"Powering off {device}: Factory image")
logger.info("Powering off %s: Factory image", device)
device.power_hardware_off()
logger.info(f"Powering off {device}: Succesful: Factory image")
logger.info("Powering off %s: Succesful: Factory image", device)
self.walk_up(power_off, -1)
......@@ -241,3 +297,14 @@ class PowerHierarchyDevice(AbstractHierarchyDevice):
+ shutdown_to_standby.exceptions
+ power_off.exceptions
)
async def on_to_standby(self):
"""Manage the device operations involved in the ON -> STANDBY state transition.
ASYNC version."""
try:
return await asyncio.wait_for(
asyncio.to_thread(self._on_to_standby),
timeout=self.STANDBY_TIMEOUT,
)
except asyncio.TimeoutError as exc:
raise TimeoutError("ON -> STANDBY state transition timed out.") from exc
......@@ -4,8 +4,9 @@
""" StationManager Device Server for LOFAR2.0
"""
import asyncio
import logging
from typing import Callable, Awaitable
from tango import DebugIt, DevState, DevFailed, Except
......@@ -20,7 +21,7 @@ from tangostationcontrol.common.states import (
StationState,
ALLOWED_STATION_STATE_TRANSITIONS,
)
from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice
from tangostationcontrol.devices.base_device_classes.power_hierarchy import (
PowerHierarchyDevice,
)
......@@ -37,7 +38,7 @@ __all__ = ["StationManager"]
"last_requested_transition_exceptions_R",
]
)
class StationManager(LOFARDevice):
class StationManager(AsyncDevice):
"""StationManager Device Server for LOFAR2.0"""
# -----------------
......@@ -99,24 +100,35 @@ class StationManager(LOFARDevice):
self.stationmanager_ph = None
self.last_requested_transition = None
self.last_requested_transition_exceptions = []
self.transition_lock = asyncio.Lock()
# Super must be called after variable assignment due to executing init_device!
super().__init__(cl, name)
def init_device(self):
super().init_device()
# Set the station state to off
self.station_state = StationState.OFF
def _init_device(self):
super()._init_device()
# always turn on automatically, so the user doesn't have to boot the
# StationManager device.
self.Initialise()
self.On()
"""Alternative method"""
# loop = asyncio.get_running_loop()
# loop.create_task(self.Initialise())
# loop.create_task(self.On())
"""Works"""
self._Initialise()
self._On()
def configure_for_initialise(self):
super().configure_for_initialise()
# Create the state transition lock
if self.transition_lock is None:
self.transition_lock = asyncio.Lock()
# Set the station state to off
self.station_state = StationState.OFF
# Initialise power hierarchy based on current settings
self._initialise_power_hierarchy()
......@@ -157,25 +169,32 @@ class StationManager(LOFARDevice):
# the requested state transition is allowed
return True
def _transition(self, transition_desc: str, transition_func):
"""Transition to a station state using `transition_func`."""
async def _transition(
self,
transition_desc: str,
transition_func: Callable[[], Awaitable[None]],
):
"""Transition to a station state using `transition_func`.
:param transition_desc : string description of the transition to be performed
:param transition_func : function that implements the transition
"""
# StationManager device must be in ON state
if self.proxy.state() != DevState.ON:
if self.get_state() != DevState.ON:
raise Exception(
f"Station Manager must be in ON state. Current state is {self.proxy.state()}"
f"Station Manager must be in ON state. Current state is {self.get_state()}"
)
logger.info(
"Station %s requested to perform the %s Power Sequence",
self.proxy.station_name_R,
self.Station_Name,
transition_desc,
)
# Power Sequence OFF -> HIBERNATE
try:
self.last_requested_transition = transition_desc
self.last_requested_transition_exceptions = transition_func()
self.last_requested_transition_exceptions = await transition_func()
except Exception as ex:
# unsuppressed exception
self.last_requested_transition_exceptions = [(None, ex)]
......@@ -183,7 +202,7 @@ class StationManager(LOFARDevice):
logger.info(
"Station %s has correctly completed the %s Power Sequence",
self.proxy.station_name_R,
self.Station_Name,
transition_desc,
)
......@@ -194,118 +213,121 @@ class StationManager(LOFARDevice):
@command()
@DebugIt()
@log_exceptions()
def station_off(self):
async def station_off(self):
"""
Switch the station into OFF state.
It can only be executed from state HIBERNATE.
"""
if self.station_state == StationState.OFF:
return
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.OFF):
raise Exception(f"Station did not transition to {StationState.OFF.name}")
# call the correct state transition function
return
# not implemented
pass
# not implemented -> call the correct state transition function
# update the station_state variable when successful
self.station_state = StationState.OFF
finally:
self.transition_lock.release()
@command()
@DebugIt()
@log_exceptions()
def station_hibernate(self):
async def station_hibernate(self):
"""
Switch the station into HIBERNATE state.
It can only be executed from either state OFF or STANDBY.
"""
if self.station_state == StationState.HIBERNATE:
return
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.HIBERNATE):
raise Exception(
f"Station did not transition to {StationState.HIBERNATE.name}"
)
return
# call the correct state transition function
try:
if self.station_state == StationState.OFF:
self._transition(
await self._transition(
"OFF -> HIBERNATE", self.stationmanager_ph.off_to_hibernate
)
elif self.station_state == StationState.STANDBY:
self._transition(
"STANDBY -> HIBERNATE", self.stationmanager_ph.standby_to_hibernate
await self._transition(
"STANDBY -> HIBERNATE",
self.stationmanager_ph.standby_to_hibernate,
)
except DevFailed as exc:
error_string = f"Station {self.proxy.station_name_R} \
did not transition to {StationState.HIBERNATE.name} state. \
Current state is {self.proxy.station_state_R}"
error_string = f"Station {self.Station_Name} \
can not transition to {StationState.HIBERNATE.name} state. \
Current state is {self.station_state.name}"
logger.exception(error_string)
Except.re_throw_exception(exc, "DevFailed", error_string, __name__)
# update the station_state variable when successful
self.station_state = StationState.HIBERNATE
finally:
self.transition_lock.release()
@command()
@DebugIt()
@log_exceptions()
def station_standby(self):
async def station_standby(self):
"""
Switch the station into STANDBY state.
It can only be executed from either state HIBERNATE or ON.
"""
if self.station_state == StationState.STANDBY:
return
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.STANDBY):
raise Exception(
f"Station did not transition to {StationState.STANDBY.name}"
)
return
# call the correct state transition function
try:
if self.station_state == StationState.HIBERNATE:
self._transition(
"HIBERNATE -> STANDBY", self.stationmanager_ph.hibernate_to_standby
await self._transition(
"HIBERNATE -> STANDBY",
self.stationmanager_ph.hibernate_to_standby,
)
elif self.station_state == StationState.ON:
self._transition("ON -> STANDBY", self.stationmanager_ph.on_to_standby)
await self._transition(
"ON -> STANDBY", self.stationmanager_ph.on_to_standby
)
except DevFailed as exc:
error_string = f"Station {self.proxy.station_name_R} \
did not transition to {StationState.STANDBY.name} state. \
Current state is {self.proxy.station_state_R}"
error_string = f"Station {self.Station_Name} \
can not transition to {StationState.STANDBY.name} state. \
Current state is {self.station_state.name}"
logger.exception(error_string)
Except.re_throw_exception(exc, "DevFailed", error_string, __name__)
# update the station_state variable when successful
self.station_state = StationState.STANDBY
finally:
self.transition_lock.release()
@command()
@DebugIt()
@log_exceptions()
def station_on(self):
async def station_on(self):
"""
Switch the station into ON state.
It can only be executed from state STANDBY.
"""
if self.station_state == StationState.ON:
return
await self.transition_lock.acquire()
try:
if not self._is_transition_allowed(StationState.ON):
raise Exception(f"Station did not transition to {StationState.ON.name}")
return
# call the correct state transition function
try:
self._transition("STANDBY -> ON", self.stationmanager_ph.standby_to_on)
await self._transition(
"STANDBY -> ON", self.stationmanager_ph.standby_to_on
)
except DevFailed as exc:
error_string = f"Station {self.proxy.station_name_R} \
did not transition to {StationState.ON.name} state. \
Current state is {self.proxy.station_state_R}"
error_string = f"Station {self.Station_Name} \
can not transition to {StationState.ON.name} state. \
Current state is {self.station_state.name}"
logger.exception(error_string)
Except.re_throw_exception(exc, "DevFailed", error_string, __name__)
# update the station_state variable when successful
self.station_state = StationState.ON
finally:
self.transition_lock.release()
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
import asyncio
from test.devices import device_base
from tango import DevState
from tango.test_context import DeviceTestContext
from tangostationcontrol.devices import station_manager
class TestStationManagerDevice(device_base.DeviceTestCase):
"""Unit test class for device Station Manager"""
def setUp(self):
super(TestStationManagerDevice, self).setUp()
def test_init_on(self):
"""Test whether ON state is reached at device initialisation"""
with DeviceTestContext(
station_manager.StationManager, process=False, timeout=10
) as proxy:
proxy.init()
self.assertEqual(proxy.state(), DevState.ON)
async def test_transitions_lock(self):
"""Test whether the lock mechanism ensure only one transition
at a time is executed"""
async def dummy_station_hibernate(lock, shared_var):
"""Dummy method to simulate station_hibernate lock"""
async with lock:
shared_var += 1
await asyncio.sleep(0.1) # Simulate some asynchronous operation
shared_var = 99
await asyncio.sleep(0.1) # Simulate some asynchronous operation
shared_var -= 1
async def dummy_station_standby(lock, shared_var):
"""Dummy method to simulate station_standby lock"""
async with lock:
shared_var += 2
await asyncio.sleep(0.1) # Simulate some asynchronous operation
shared_var = 99
await asyncio.sleep(0.1) # Simulate some asynchronous operation
shared_var -= 2
async def test_concurrent_transitions(lock, shared_var):
"""Execute the two mocked coroutines"""
await dummy_station_hibernate(lock, shared_var)
await dummy_station_standby(lock, shared_var)
def test_lock_serialization(lock, shared_var):
"""Test whether lock serialization is correctly working"""
# Execute concurrent transitions multiple times to test serialization
for _ in range(100):
shared_var = 0 # reset variable
asyncio.run(test_concurrent_transitions(lock, shared_var))
self.assertEqual(
shared_var,
0,
msg=f"Lock serialization failed. Shared variable={shared_var}",
)
with DeviceTestContext(
station_manager.StationManager, process=False, timeout=10
) as proxy:
proxy.init()
shared_var = 0 # Initialize shared variable
lock = asyncio.Lock() # Create a lock
test_lock_serialization(lock, shared_var)
# Verify that the device is in the correct state after the transitions
self.assertEqual(proxy.state(), DevState.ON)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment