diff --git a/README.md b/README.md index 7794874fc30ad93b82461f5e419e1aab9324aa86..1c047091c54c2a3b81e225bb2b6097ae5488b02f 100644 --- a/README.md +++ b/README.md @@ -166,6 +166,7 @@ Next change the version in the following places: # Release Notes +* 0.28.0 Make `StationManager` device asynchronous * 0.27.2 Add new attributes in OPCUA devices * 0.27.1 Bugfixes / rollout fixes * 0.27.0 Replace device_attribute with a per-attribute metric diff --git a/sbin/run_integration_test.sh b/sbin/run_integration_test.sh index ba96f07395fe1ba1dab6778c6082728bf36f217c..b17f52a219aeaa0a5cf92dee89a3a781e12c9a4a 100755 --- a/sbin/run_integration_test.sh +++ b/sbin/run_integration_test.sh @@ -217,6 +217,7 @@ function await { exit 1 fi done + sleep 2 echo ". [ok]" done diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION index 3edc695dcea5c89ff59d72a47fd30323fcfaa016..697f087f376ad7f03d3702f9f0e0987315e73f73 100644 --- a/tangostationcontrol/VERSION +++ b/tangostationcontrol/VERSION @@ -1 +1 @@ -0.27.2 +0.28.0 diff --git a/tangostationcontrol/integration_test/device_proxy.py b/tangostationcontrol/integration_test/device_proxy.py index 87557e2194547434e1fb3307cd5ef1798b0ce607..9e20533ddf04ef5e7844e7270a5d11082e559e15 100644 --- a/tangostationcontrol/integration_test/device_proxy.py +++ b/tangostationcontrol/integration_test/device_proxy.py @@ -29,9 +29,8 @@ class TestDeviceProxy(DeviceProxy): d = TestDeviceProxy(endpoint) try: d.Off() - except Exception as e: - """Failing to turn Off devices should not raise errors here""" - logger.error(f"Failed to turn device off in teardown {e}") - - """Wait to prevent propagating reconnection errors""" + except Exception as exc: + # Failing to turn Off devices should not raise errors here + logger.error("Failed to turn device off in teardown {%s}", exc) + # Wait to prevent propagating reconnection errors time.sleep(sleep) diff --git a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py index eebafcce442e562c1b9327de250611fcd2ac1063..f710c2524b01444f27345dd7172671760dc8e732 100644 --- a/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py +++ b/tangostationcontrol/tangostationcontrol/devices/base_device_classes/power_hierarchy.py @@ -3,6 +3,7 @@ """Power Hierarchy for PyTango devices""" +import asyncio from typing import Dict, Optional import logging @@ -29,6 +30,10 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): POWER_CHILD_PROPERTY = "Power_Children" + HIBERNATE_TIMEOUT = 60.0 + STANDBY_TIMEOUT = 60.0 + ON_TIMEOUT = 60.0 + def init( self, device_name: str, @@ -42,27 +47,27 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): def _boot_device(self, device: DeviceProxy): """Default sequence of device booting operations""" - logger.info(f"Booting {device}: off()") + logger.info("Booting %s: off()", device) device.off() - logger.info(f"Booting {device}: initialise()") + logger.info("Booting %s: initialise()", device) device.initialise() - logger.info(f"Booting {device}: set_defaults()") + logger.info("Booting %s: set_defaults()", device) device.set_defaults() - logger.info(f"Booting {device}: on()") + logger.info("Booting %s: on()", device) device.on() - logger.info(f"Booting {device}: Succesful: state={device.state()}") + logger.info("Booting %s: Succesful: state=%s", device, device.state()) def _shutdown_device(self, device: DeviceProxy): """Default sequence of dervice turning-off operations""" if device.state() == DevState.OFF: - logger.info(f"Shutting down {device}: Succesful: It's already OFF?") + logger.info("Shutting down %s: Succesful: It's already OFF?", device) return - logger.info(f"Shutting down {device}: off()") + logger.info("Shutting down %s: off()", device) device.off() - logger.info(f"Shutting down {device}: Succesful: state={device.state()}") + logger.info("Shutting down %s: Succesful: state=%s", device, device.state()) - def off_to_hibernate(self): + def _off_to_hibernate(self): """Manage the device operations involved in the OFF -> HIBERNATE state transition. Only minimal hardware is powered.""" @@ -74,22 +79,34 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): # PSOC: Power on CCD if device_class_matches(device, "PSOC"): - logger.info(f"Powering on {device}: CCD") + logger.info("Powering on %s: CCD", device) device.power_hardware_on() - logger.info(f"Powering on {device}: Succesful: CCD") + logger.info("Powering on %s: Succesful: CCD", device) # CCD: Power on clock if device_class_matches(device, "CCD"): - logger.info(f"Powering on {device}: Clock") + logger.info("Powering on %s: Clock", device) device.power_hardware_on() - logger.info(f"Powering on {device}: Succesful: Clock") + logger.info("Powering on %s: Succesful: Clock", device) self.walk_down(boot_to_hibernate, -1) # Return the suppressed exceptions return boot_to_hibernate.exceptions - def hibernate_to_standby(self): + async def off_to_hibernate(self): + """Manage the device operations involved in the OFF -> HIBERNATE state transition. + Only minimal hardware is powered. ASYNC Version. + """ + try: + return await asyncio.wait_for( + asyncio.to_thread(self._off_to_hibernate), + timeout=self.HIBERNATE_TIMEOUT, + ) + except asyncio.TimeoutError as exc: + raise TimeoutError("OFF -> HIBERNATE state transition timed out.") from exc + + def _hibernate_to_standby(self): """Manage the device operations involved in the HIBERNATE -> STANDBY state transition. Powers hardware except antennas and firmware. """ @@ -101,16 +118,30 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): # UNB2: Power on the Uniboards if device_class_matches(device, "UNB2"): - logger.info(f"Powering on {device}: Uniboards") + logger.info("Powering on %s: Uniboards", device) device.power_hardware_on() - logger.info(f"Powering on {device}: Succesful: Uniboards") + logger.info("Powering on %s: Succesful: Uniboards", device) self.walk_down(boot_to_standby, -1) # Return the suppressed exceptions return boot_to_standby.exceptions - def standby_to_hibernate(self): + async def hibernate_to_standby(self): + """Manage the device operations involved in the HIBERNATE -> STANDBY state transition. + Powers hardware except antennas and firmware. ASYNC version. + """ + try: + return await asyncio.wait_for( + asyncio.to_thread(self._hibernate_to_standby), + timeout=self.STANDBY_TIMEOUT, + ) + except asyncio.TimeoutError as exc: + raise TimeoutError( + "HIBERNATE -> STANDBY state transition timed out." + ) from exc + + def _standby_to_hibernate(self): """Manage the device operations involved in the STANDBY -> HIBERNATE state transition.""" @suppress_exceptions(self.continue_on_failure) @@ -118,9 +149,9 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): def power_off_from_standby(device: DeviceProxy): # UNB2: Power off the Uniboards if device_class_matches(device, "UNB2"): - logger.info(f"Powering off {device}: Uniboards") + logger.info("Powering off %s: Uniboards", device) device.power_hardware_off() - logger.info(f"Powering off {device}: Succesful: Uniboards") + logger.info("Powering off %s: Succesful: Uniboards", device) self.walk_up(power_off_from_standby, -1) @@ -135,7 +166,20 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): # Return the suppressed exceptions return power_off_from_standby.exceptions + shutdown_to_hibernate.exceptions - def standby_to_on(self): + async def standby_to_hibernate(self): + """Manage the device operations involved in the STANDBY -> HIBERNATE state transition. + ASYNC version.""" + try: + return await asyncio.wait_for( + asyncio.to_thread(self._standby_to_hibernate), + timeout=self.HIBERNATE_TIMEOUT, + ) + except asyncio.TimeoutError as exc: + raise TimeoutError( + "STANDBY -> HIBERNATE state transition timed out." + ) from exc + + def _standby_to_on(self): """Manage the device operations involved in the STANDBY -> ON state transition. Powers power-hungry devices (SDP, antennas). """ @@ -145,21 +189,21 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): def power_on(device: DeviceProxy): # APSCT: Select 200 MHz clock if device_class_matches(device, "APSCT"): - logger.info(f"Powering on {device}: 200MHz clock") + logger.info("Powering on %s: 200MHz clock", device) device.power_hardware_on() - logger.info(f"Powering on {device}: Succesful: 200MHz clock") + logger.info("Powering on %s: Succesful: 200MHz clock", device) # RECV: Power on RCUs if device_class_matches(device, ["RECVH", "RECVL"]): - logger.info(f"Powering on {device}: RCUs") + logger.info("Powering on %s: RCUs", device) device.power_hardware_on() - logger.info(f"Powering on {device}: Succesful: RCUs") + logger.info("Powering on %s: Succesful: RCUs", device) # SDPFirmware: Flash user image if device_class_matches(device, "SDPFirmware"): - logger.info(f"Powering on {device}: User image") + logger.info("Powering on %s: User image", device) device.power_hardware_on() - logger.info(f"Powering on {device}: Succesful: User image") + logger.info("Powering on %s: Succesful: User image", device) self.walk_down(power_on, -1) @@ -176,10 +220,10 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): def power_antennas_on(device: DeviceProxy): # AntennaField: Power on used antennas if device_class_matches(device, "AntennaField"): - logger.info(f"Powering on {device}: Antennas") + logger.info("Powering on %s: Antennas", device) device.power_hardware_on() # TODO(JDM): Report which antennas - logger.info(f"Powering on {device}: Succesful: Antennas") + logger.info("Powering on %s: Succesful: Antennas", device) self.walk_down(power_antennas_on, -1) @@ -188,7 +232,19 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): power_on.exceptions + boot_to_on.exceptions + power_antennas_on.exceptions ) - def on_to_standby(self): + async def standby_to_on(self): + """Manage the device operations involved in the STANDBY -> ON state transition. + Powers power-hungry devices (SDP, antennas). ASYNC version. + """ + try: + return await asyncio.wait_for( + asyncio.to_thread(self._standby_to_on), + timeout=self.ON_TIMEOUT, + ) + except asyncio.TimeoutError as exc: + raise TimeoutError("STANDBY -> ON state transition timed out.") from exc + + def _on_to_standby(self): """Manage the device operations involved in the ON -> STANDBY state transition.""" # turn off power to hardware we also will turn off the software device for @@ -197,10 +253,10 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): def power_off_from_on(device: DeviceProxy): # AntennaField: Power off all antennas if device_class_matches(device, "AntennaField"): - logger.info(f"Powering off {device}: Antennas") + logger.info("Powering off %s: Antennas", device) device.power_hardware_off() # TODO(JDM): Report which antennas - logger.info(f"Powering off {device}: Succesful: Antennas") + logger.info("Powering off %s: Succesful: Antennas", device) self.walk_up(power_off_from_on, -1) @@ -217,21 +273,21 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): def power_off(device: DeviceProxy): # APSCT: Turn off clock if device_class_matches(device, "APSCT"): - logger.info(f"Powering off {device}: Clock") + logger.info("Powering off %s: Clock", device) device.power_hardware_off() - logger.info(f"Powering off {device}: Succesful: Clock") + logger.info("Powering off %s: Succesful: Clock", device) # RECV: Power off RCUs if device_class_matches(device, ["RECVH", "RECVL"]): - logger.info(f"Powering off {device}: RCUs") + logger.info("Powering off %s: RCUs", device) device.power_hardware_off() - logger.info(f"Powering off {device}: Succesful: RCUs") + logger.info("Powering off %s: Succesful: RCUs", device) # SDPFirmware: Flash factory image if device_class_matches(device, "SDPFirmware"): - logger.info(f"Powering off {device}: Factory image") + logger.info("Powering off %s: Factory image", device) device.power_hardware_off() - logger.info(f"Powering off {device}: Succesful: Factory image") + logger.info("Powering off %s: Succesful: Factory image", device) self.walk_up(power_off, -1) @@ -241,3 +297,14 @@ class PowerHierarchyDevice(AbstractHierarchyDevice): + shutdown_to_standby.exceptions + power_off.exceptions ) + + async def on_to_standby(self): + """Manage the device operations involved in the ON -> STANDBY state transition. + ASYNC version.""" + try: + return await asyncio.wait_for( + asyncio.to_thread(self._on_to_standby), + timeout=self.STANDBY_TIMEOUT, + ) + except asyncio.TimeoutError as exc: + raise TimeoutError("ON -> STANDBY state transition timed out.") from exc diff --git a/tangostationcontrol/tangostationcontrol/devices/station_manager.py b/tangostationcontrol/tangostationcontrol/devices/station_manager.py index 322fe0400b24ec14f2cc62a6c7cc663d082c762b..f7963354cab92f26f61e235134073003e1b80a6d 100644 --- a/tangostationcontrol/tangostationcontrol/devices/station_manager.py +++ b/tangostationcontrol/tangostationcontrol/devices/station_manager.py @@ -4,8 +4,9 @@ """ StationManager Device Server for LOFAR2.0 """ - +import asyncio import logging +from typing import Callable, Awaitable from tango import DebugIt, DevState, DevFailed, Except @@ -20,7 +21,7 @@ from tangostationcontrol.common.states import ( StationState, ALLOWED_STATION_STATE_TRANSITIONS, ) -from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice +from tangostationcontrol.devices.base_device_classes.async_device import AsyncDevice from tangostationcontrol.devices.base_device_classes.power_hierarchy import ( PowerHierarchyDevice, ) @@ -37,7 +38,7 @@ __all__ = ["StationManager"] "last_requested_transition_exceptions_R", ] ) -class StationManager(LOFARDevice): +class StationManager(AsyncDevice): """StationManager Device Server for LOFAR2.0""" # ----------------- @@ -99,24 +100,35 @@ class StationManager(LOFARDevice): self.stationmanager_ph = None self.last_requested_transition = None self.last_requested_transition_exceptions = [] + self.transition_lock = asyncio.Lock() # Super must be called after variable assignment due to executing init_device! super().__init__(cl, name) - def init_device(self): - super().init_device() - - # Set the station state to off - self.station_state = StationState.OFF - + def _init_device(self): + super()._init_device() # always turn on automatically, so the user doesn't have to boot the # StationManager device. - self.Initialise() - self.On() + + """Alternative method""" + # loop = asyncio.get_running_loop() + # loop.create_task(self.Initialise()) + # loop.create_task(self.On()) + + """Works""" + self._Initialise() + self._On() def configure_for_initialise(self): super().configure_for_initialise() + # Create the state transition lock + if self.transition_lock is None: + self.transition_lock = asyncio.Lock() + + # Set the station state to off + self.station_state = StationState.OFF + # Initialise power hierarchy based on current settings self._initialise_power_hierarchy() @@ -157,25 +169,32 @@ class StationManager(LOFARDevice): # the requested state transition is allowed return True - def _transition(self, transition_desc: str, transition_func): - """Transition to a station state using `transition_func`.""" + async def _transition( + self, + transition_desc: str, + transition_func: Callable[[], Awaitable[None]], + ): + """Transition to a station state using `transition_func`. + + :param transition_desc : string description of the transition to be performed + :param transition_func : function that implements the transition + """ # StationManager device must be in ON state - if self.proxy.state() != DevState.ON: + if self.get_state() != DevState.ON: raise Exception( - f"Station Manager must be in ON state. Current state is {self.proxy.state()}" + f"Station Manager must be in ON state. Current state is {self.get_state()}" ) logger.info( "Station %s requested to perform the %s Power Sequence", - self.proxy.station_name_R, + self.Station_Name, transition_desc, ) - # Power Sequence OFF -> HIBERNATE try: self.last_requested_transition = transition_desc - self.last_requested_transition_exceptions = transition_func() + self.last_requested_transition_exceptions = await transition_func() except Exception as ex: # unsuppressed exception self.last_requested_transition_exceptions = [(None, ex)] @@ -183,7 +202,7 @@ class StationManager(LOFARDevice): logger.info( "Station %s has correctly completed the %s Power Sequence", - self.proxy.station_name_R, + self.Station_Name, transition_desc, ) @@ -194,118 +213,121 @@ class StationManager(LOFARDevice): @command() @DebugIt() @log_exceptions() - def station_off(self): + async def station_off(self): """ Switch the station into OFF state. It can only be executed from state HIBERNATE. """ - if self.station_state == StationState.OFF: - return - - if not self._is_transition_allowed(StationState.OFF): - raise Exception(f"Station did not transition to {StationState.OFF.name}") - - # call the correct state transition function + await self.transition_lock.acquire() + try: + if not self._is_transition_allowed(StationState.OFF): + return - # not implemented - pass + # not implemented -> call the correct state transition function - # update the station_state variable when successful - self.station_state = StationState.OFF + # update the station_state variable when successful + self.station_state = StationState.OFF + finally: + self.transition_lock.release() @command() @DebugIt() @log_exceptions() - def station_hibernate(self): + async def station_hibernate(self): """ Switch the station into HIBERNATE state. It can only be executed from either state OFF or STANDBY. """ - if self.station_state == StationState.HIBERNATE: - return - - if not self._is_transition_allowed(StationState.HIBERNATE): - raise Exception( - f"Station did not transition to {StationState.HIBERNATE.name}" - ) - - # call the correct state transition function + await self.transition_lock.acquire() try: - if self.station_state == StationState.OFF: - self._transition( - "OFF -> HIBERNATE", self.stationmanager_ph.off_to_hibernate - ) - elif self.station_state == StationState.STANDBY: - self._transition( - "STANDBY -> HIBERNATE", self.stationmanager_ph.standby_to_hibernate - ) - except DevFailed as exc: - error_string = f"Station {self.proxy.station_name_R} \ - did not transition to {StationState.HIBERNATE.name} state. \ - Current state is {self.proxy.station_state_R}" - logger.exception(error_string) - Except.re_throw_exception(exc, "DevFailed", error_string, __name__) - - # update the station_state variable when successful - self.station_state = StationState.HIBERNATE + if not self._is_transition_allowed(StationState.HIBERNATE): + return + + # call the correct state transition function + try: + if self.station_state == StationState.OFF: + await self._transition( + "OFF -> HIBERNATE", self.stationmanager_ph.off_to_hibernate + ) + elif self.station_state == StationState.STANDBY: + await self._transition( + "STANDBY -> HIBERNATE", + self.stationmanager_ph.standby_to_hibernate, + ) + except DevFailed as exc: + error_string = f"Station {self.Station_Name} \ + can not transition to {StationState.HIBERNATE.name} state. \ + Current state is {self.station_state.name}" + logger.exception(error_string) + Except.re_throw_exception(exc, "DevFailed", error_string, __name__) + + # update the station_state variable when successful + self.station_state = StationState.HIBERNATE + finally: + self.transition_lock.release() @command() @DebugIt() @log_exceptions() - def station_standby(self): + async def station_standby(self): """ Switch the station into STANDBY state. It can only be executed from either state HIBERNATE or ON. """ - if self.station_state == StationState.STANDBY: - return - - if not self._is_transition_allowed(StationState.STANDBY): - raise Exception( - f"Station did not transition to {StationState.STANDBY.name}" - ) - - # call the correct state transition function + await self.transition_lock.acquire() try: - if self.station_state == StationState.HIBERNATE: - self._transition( - "HIBERNATE -> STANDBY", self.stationmanager_ph.hibernate_to_standby - ) - elif self.station_state == StationState.ON: - self._transition("ON -> STANDBY", self.stationmanager_ph.on_to_standby) - except DevFailed as exc: - error_string = f"Station {self.proxy.station_name_R} \ - did not transition to {StationState.STANDBY.name} state. \ - Current state is {self.proxy.station_state_R}" - logger.exception(error_string) - Except.re_throw_exception(exc, "DevFailed", error_string, __name__) - - # update the station_state variable when successful - self.station_state = StationState.STANDBY + if not self._is_transition_allowed(StationState.STANDBY): + return + + # call the correct state transition function + try: + if self.station_state == StationState.HIBERNATE: + await self._transition( + "HIBERNATE -> STANDBY", + self.stationmanager_ph.hibernate_to_standby, + ) + elif self.station_state == StationState.ON: + await self._transition( + "ON -> STANDBY", self.stationmanager_ph.on_to_standby + ) + except DevFailed as exc: + error_string = f"Station {self.Station_Name} \ + can not transition to {StationState.STANDBY.name} state. \ + Current state is {self.station_state.name}" + logger.exception(error_string) + Except.re_throw_exception(exc, "DevFailed", error_string, __name__) + + # update the station_state variable when successful + self.station_state = StationState.STANDBY + finally: + self.transition_lock.release() @command() @DebugIt() @log_exceptions() - def station_on(self): + async def station_on(self): """ Switch the station into ON state. It can only be executed from state STANDBY. """ - if self.station_state == StationState.ON: - return - - if not self._is_transition_allowed(StationState.ON): - raise Exception(f"Station did not transition to {StationState.ON.name}") - - # call the correct state transition function + await self.transition_lock.acquire() try: - self._transition("STANDBY -> ON", self.stationmanager_ph.standby_to_on) - except DevFailed as exc: - error_string = f"Station {self.proxy.station_name_R} \ - did not transition to {StationState.ON.name} state. \ - Current state is {self.proxy.station_state_R}" - logger.exception(error_string) - Except.re_throw_exception(exc, "DevFailed", error_string, __name__) - - # update the station_state variable when successful - self.station_state = StationState.ON + if not self._is_transition_allowed(StationState.ON): + return + + # call the correct state transition function + try: + await self._transition( + "STANDBY -> ON", self.stationmanager_ph.standby_to_on + ) + except DevFailed as exc: + error_string = f"Station {self.Station_Name} \ + can not transition to {StationState.ON.name} state. \ + Current state is {self.station_state.name}" + logger.exception(error_string) + Except.re_throw_exception(exc, "DevFailed", error_string, __name__) + + # update the station_state variable when successful + self.station_state = StationState.ON + finally: + self.transition_lock.release() diff --git a/tangostationcontrol/test/devices/test_station_manager_device.py b/tangostationcontrol/test/devices/test_station_manager_device.py new file mode 100644 index 0000000000000000000000000000000000000000..6f5f6df88feff3de02f22ce0a41fc415a935ba81 --- /dev/null +++ b/tangostationcontrol/test/devices/test_station_manager_device.py @@ -0,0 +1,78 @@ +# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy) +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +from test.devices import device_base + +from tango import DevState +from tango.test_context import DeviceTestContext + +from tangostationcontrol.devices import station_manager + + +class TestStationManagerDevice(device_base.DeviceTestCase): + """Unit test class for device Station Manager""" + + def setUp(self): + super(TestStationManagerDevice, self).setUp() + + def test_init_on(self): + """Test whether ON state is reached at device initialisation""" + with DeviceTestContext( + station_manager.StationManager, process=False, timeout=10 + ) as proxy: + proxy.init() + + self.assertEqual(proxy.state(), DevState.ON) + + async def test_transitions_lock(self): + """Test whether the lock mechanism ensure only one transition + at a time is executed""" + + async def dummy_station_hibernate(lock, shared_var): + """Dummy method to simulate station_hibernate lock""" + async with lock: + shared_var += 1 + await asyncio.sleep(0.1) # Simulate some asynchronous operation + shared_var = 99 + await asyncio.sleep(0.1) # Simulate some asynchronous operation + shared_var -= 1 + + async def dummy_station_standby(lock, shared_var): + """Dummy method to simulate station_standby lock""" + async with lock: + shared_var += 2 + await asyncio.sleep(0.1) # Simulate some asynchronous operation + shared_var = 99 + await asyncio.sleep(0.1) # Simulate some asynchronous operation + shared_var -= 2 + + async def test_concurrent_transitions(lock, shared_var): + """Execute the two mocked coroutines""" + await dummy_station_hibernate(lock, shared_var) + await dummy_station_standby(lock, shared_var) + + def test_lock_serialization(lock, shared_var): + """Test whether lock serialization is correctly working""" + # Execute concurrent transitions multiple times to test serialization + for _ in range(100): + shared_var = 0 # reset variable + asyncio.run(test_concurrent_transitions(lock, shared_var)) + self.assertEqual( + shared_var, + 0, + msg=f"Lock serialization failed. Shared variable={shared_var}", + ) + + with DeviceTestContext( + station_manager.StationManager, process=False, timeout=10 + ) as proxy: + proxy.init() + + shared_var = 0 # Initialize shared variable + lock = asyncio.Lock() # Create a lock + + test_lock_serialization(lock, shared_var) + + # Verify that the device is in the correct state after the transitions + self.assertEqual(proxy.state(), DevState.ON)