Select Git revision
extract_sip_meta.cwl
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
test_events.py 9.50 KiB
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0
from threading import Semaphore
from tangostationcontrol.common.events import (
EventSubscriptions,
ChangeEvents,
)
from tango.server import Device, attribute, AttrWriteType, command
from tango.test_context import DeviceTestContext
from test import base
class EventSubscriptionMixin:
def setUp(self):
self.events = EventSubscriptions()
self.callback_called_values = []
# tokens are inserted on each call. start with 0.
self.callback_call_counter = Semaphore(0)
def callback(self, device, attribute_name, value):
self.callback_called_values.append((attribute_name, value))
# another token is added
self.callback_call_counter.release()
def wait_for_callback(self, timeout: float = 2.0):
self.assertTrue(self.callback_call_counter.acquire(timeout=timeout))
def wait_for_no_callback(self, timeout: float = 2.0):
self.assertFalse(self.callback_call_counter.acquire(timeout=timeout))
class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
class TestDevice(Device):
"""A device with a writable and polled attribute, needed for generating change events."""
A = attribute(
dtype=int,
access=AttrWriteType.READ_WRITE,
polling_period=1000,
abs_change="1",
)
def init_device(self):
super().init_device()
self._A = 0
def read_A(self):
return self._A
def write_A(self, value):
self._A = value
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name):
# used by EventSubscriptions. we poll nothing ourselves
return False
def test_initial_event(self):
"""Do we receive the initial event emitted when subscribing?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# wait for change events
self.wait_for_callback()
# check call sequence
self.assertListEqual([("A", 0)], self.callback_called_values)
def test_is_subscribed(self):
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
self.assertFalse(self.events.is_subscribed(proxy.name(), "A"))
self.events.subscribe_change_event(proxy, "A", self.callback)
self.assertTrue(self.events.is_subscribed(proxy.name(), "A"))
self.events.unsubscribe_all()
self.assertFalse(self.events.is_subscribed(proxy.name(), "A"))
def test_change_event(self):
"""Do we also receive an event emitted after the value changes (plus the initial)?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# change (triggering a change event)
proxy.A = 42
# wait for 2 change events
self.wait_for_callback()
self.wait_for_callback()
# check call sequence
self.assertListEqual([("A", 0), ("a", 42)], self.callback_called_values)
def test_multiple_callbacks(self):
"""Can we register multiple callbacks on the same attribute?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# subscribe again (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# change (triggering a change event)
proxy.A = 42
# wait for 4 change events
self.wait_for_callback()
self.wait_for_callback()
self.wait_for_callback()
self.wait_for_callback()
# check call sequence
self.assertListEqual(
[("A", 0), ("A", 0), ("a", 42), ("a", 42)], self.callback_called_values
)
def test_unsubscribe(self):
"""Do we stop receiving if we unsubscribe?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# unsubscribe
self.events.unsubscribe_all()
# change (triggering a change event, but aren't subscribed)
proxy.A = 42
# wait for 1 change event
self.wait_for_callback()
# fail to wait for another change event
self.wait_for_no_callback()
# check call sequence
self.assertListEqual([("A", 0)], self.callback_called_values)
def test_event_errors(self):
"""Are events describing errors handled?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "A", self.callback)
# wait for 1 change event
self.wait_for_callback()
# stop polling attribute! this emits an event error
proxy.stop_poll_attribute("A")
# change the value anyway, this should not result in a change event
proxy.A = 42
# fail to wait for another change event
self.wait_for_no_callback()
# check if error registered
metric = self.events.error_count_metric.metric.collect()[0]
self.assertEqual(1.0, metric.samples[0].value)
self.assertDictEqual(
{"event_device": "test/nodb/testdevice", "event_attribute": "None"},
metric.samples[0].labels,
)
class TestChangeEvents(EventSubscriptionMixin, base.TestCase):
class TestDevice(Device):
"""A device with a writable attribute, needed for generating change events."""
bool_attr = attribute(
dtype=bool,
access=AttrWriteType.READ_WRITE,
)
int_attr = attribute(
dtype=int,
access=AttrWriteType.READ_WRITE,
)
def init_device(self):
super().init_device()
self._bool = False
self._int = 0
self.custom_change_events = ChangeEvents(self)
self.custom_change_events.configure_attribute("bool_attr")
self.custom_change_events.configure_attribute("int_attr")
def read_bool_attr(self):
return self._bool
def write_bool_attr(self, value):
self._bool = value
self.custom_change_events.send_change_event("bool_attr", value)
def read_int_attr(self):
return self._int
def write_int_attr(self, value):
self._int = value
self.custom_change_events.send_change_event("int_attr", value)
@command(dtype_in=str, dtype_out=bool)
def is_attribute_polled_by_lofardevice(self, attr_name):
# used by EventSubscriptions. we poll everything ourselves
return True
def test_bool_change_event(self):
"""Do we also receive an event emitted after the value changes (plus the initial)?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "bool_attr", self.callback)
# change (triggering a change event)
proxy.bool_attr = True
# wait for 2 change events
self.wait_for_callback()
self.wait_for_callback()
# check call sequence
self.assertListEqual(
[("bool_attr", False), ("bool_attr", True)], self.callback_called_values
)
def test_int_change_event(self):
"""Do we also receive an event emitted after the value changes (plus the initial)?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "int_attr", self.callback)
# change (triggering a change event)
proxy.int_attr = 42
# wait for 2 change events
self.wait_for_callback()
self.wait_for_callback()
# check call sequence
self.assertListEqual(
[("int_attr", 0), ("int_attr", 42)], self.callback_called_values
)
def test_no_change_event(self):
"""Do we also receive no event emitted when the value does not change when set (except the initial)?"""
with DeviceTestContext(self.TestDevice, properties={}, process=True) as proxy:
# subscribe (triggering a change event)
self.events.subscribe_change_event(proxy, "int_attr", self.callback)
# initial set (triggering a change event)
proxy.int_attr = 0
# wait for 2 change events
self.wait_for_callback()
self.wait_for_callback()
# second set to the same value (triggering no change event)
proxy.int_attr = 0
# this should not result in a change event
self.wait_for_no_callback()