Skip to content
Snippets Groups Projects
Select Git revision
  • master
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
40 results

CMakeLists.txt

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    test_events.py 9.50 KiB
    # Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
    # SPDX-License-Identifier: Apache-2.0
    
    from threading import Semaphore
    
    from tangostationcontrol.common.events import (
        EventSubscriptions,
        ChangeEvents,
    )
    
    from tango.server import Device, attribute, AttrWriteType, command
    from tango.test_context import DeviceTestContext
    
    from test import base
    
    
    class EventSubscriptionMixin:
        """Test-case mixin that records event callbacks and lets tests wait on them.

        The mixin is meant to be listed *before* a ``unittest.TestCase``-style
        base class: it uses the host's ``assertTrue``/``assertFalse`` and chains
        its ``setUp`` to the next class in the MRO.
        """

        def setUp(self):
            """Create a fresh subscription manager and empty callback bookkeeping."""
            # This mixin precedes the TestCase base in the MRO, so without this
            # call the base class' own setUp would be silently skipped.
            super().setUp()

            self.events = EventSubscriptions()

            # (attribute_name, value) of every callback invocation, in call order
            self.callback_called_values = []

            # tokens are inserted on each call. start with 0.
            self.callback_call_counter = Semaphore(0)

        def callback(self, device, attribute_name, value):
            """Record one callback invocation and wake up a waiting test.

            :param device: first callback argument; not used by this mixin.
            :param attribute_name: attribute name as handed to the callback.
            :param value: value as handed to the callback.
            """
            self.callback_called_values.append((attribute_name, value))

            # another token is added
            self.callback_call_counter.release()

        def wait_for_callback(self, timeout: float = 2.0):
            """Fail the test unless a (not yet consumed) callback arrives within ``timeout`` seconds."""
            self.assertTrue(
                self.callback_call_counter.acquire(timeout=timeout),
                f"no callback received within {timeout} s",
            )

        def wait_for_no_callback(self, timeout: float = 2.0):
            """Fail the test if a (not yet consumed) callback arrives within ``timeout`` seconds."""
            self.assertFalse(
                self.callback_call_counter.acquire(timeout=timeout),
                f"unexpected callback received within {timeout} s",
            )
    
    
    class TestEventSubscriptions(EventSubscriptionMixin, base.TestCase):
        """Exercise EventSubscriptions against a polled attribute of a test device."""

        class TestDevice(Device):
            """Device exposing ``A``: a read-write, polled attribute that change events are generated for."""

            A = attribute(
                dtype=int,
                access=AttrWriteType.READ_WRITE,
                polling_period=1000,
                abs_change="1",
            )

            def init_device(self):
                super().init_device()

                # backing value of attribute A
                self._A = 0

            def read_A(self):
                return self._A

            def write_A(self, value):
                self._A = value

            @command(dtype_in=str, dtype_out=bool)
            def is_attribute_polled_by_lofardevice(self, attr_name):
                # EventSubscriptions asks this; this device polls nothing itself
                return False

        def test_initial_event(self):
            """Subscribing by itself must deliver one event with the current value."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # the subscription triggers the first change event
                self.events.subscribe_change_event(device, "A", self.callback)

                # block until that event has been delivered
                self.wait_for_callback()

                # exactly the initial value must have been reported
                expected_calls = [("A", 0)]
                self.assertListEqual(expected_calls, self.callback_called_values)

        def test_is_subscribed(self):
            """is_subscribed must follow subscribe_change_event and unsubscribe_all."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # nothing subscribed yet
                self.assertFalse(self.events.is_subscribed(device.name(), "A"))

                self.events.subscribe_change_event(device, "A", self.callback)

                # subscription is now registered
                self.assertTrue(self.events.is_subscribed(device.name(), "A"))

                self.events.unsubscribe_all()

                # and gone again after unsubscribing
                self.assertFalse(self.events.is_subscribed(device.name(), "A"))

        def test_change_event(self):
            """A value change must produce a second event after the initial one."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # first event: emitted by subscribing
                self.events.subscribe_change_event(device, "A", self.callback)

                # second event: emitted by writing a new value
                device.A = 42

                # both events must arrive
                for _ in range(2):
                    self.wait_for_callback()

                # initial value first, then the written value
                expected_calls = [("A", 0), ("a", 42)]
                self.assertListEqual(expected_calls, self.callback_called_values)

        def test_multiple_callbacks(self):
            """Two subscriptions on one attribute must each receive every event."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # each subscription triggers its own initial change event
                self.events.subscribe_change_event(device, "A", self.callback)
                self.events.subscribe_change_event(device, "A", self.callback)

                # the write triggers one change event per subscription
                device.A = 42

                # 2 initial events + 2 change events
                for _ in range(4):
                    self.wait_for_callback()

                expected_calls = [("A", 0), ("A", 0), ("a", 42), ("a", 42)]
                self.assertListEqual(expected_calls, self.callback_called_values)

        def test_unsubscribe(self):
            """No further events may be delivered once we have unsubscribed."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # subscribing yields the initial change event
                self.events.subscribe_change_event(device, "A", self.callback)

                # drop the subscription again
                self.events.unsubscribe_all()

                # this change would trigger an event, but nobody is subscribed
                device.A = 42

                # only the initial event is delivered ...
                self.wait_for_callback()

                # ... and nothing follows it
                self.wait_for_no_callback()

                expected_calls = [("A", 0)]
                self.assertListEqual(expected_calls, self.callback_called_values)

        def test_event_errors(self):
            """Events that describe errors must be counted, not passed to the callback."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # subscribing yields the initial change event
                self.events.subscribe_change_event(device, "A", self.callback)
                self.wait_for_callback()

                # stop polling attribute! this emits an event error
                device.stop_poll_attribute("A")

                # writing a new value must not result in a change event now
                device.A = 42
                self.wait_for_no_callback()

                # the error must show up in the error counter metric
                collected = self.events.error_count_metric.metric.collect()[0]
                first_sample = collected.samples[0]
                expected_labels = {
                    "event_device": "test/nodb/testdevice",
                    "event_attribute": "None",
                }

                self.assertEqual(1.0, first_sample.value)
                self.assertDictEqual(expected_labels, first_sample.labels)
    
    
    class TestChangeEvents(EventSubscriptionMixin, base.TestCase):
        """Exercise ChangeEvents: change events sent explicitly by the device code."""

        class TestDevice(Device):
            """Device whose attribute writes are passed on to a ChangeEvents helper."""

            bool_attr = attribute(
                dtype=bool,
                access=AttrWriteType.READ_WRITE,
            )

            int_attr = attribute(
                dtype=int,
                access=AttrWriteType.READ_WRITE,
            )

            def init_device(self):
                super().init_device()

                # backing values of the two attributes
                self._bool = False
                self._int = 0

                # helper through which this device sends its own change events
                self.custom_change_events = ChangeEvents(self)

                for attr_name in ("bool_attr", "int_attr"):
                    self.custom_change_events.configure_attribute(attr_name)

            def read_bool_attr(self):
                return self._bool

            def write_bool_attr(self, value):
                # store first, then announce the written value
                self._bool = value
                self.custom_change_events.send_change_event("bool_attr", value)

            def read_int_attr(self):
                return self._int

            def write_int_attr(self, value):
                # store first, then announce the written value
                self._int = value
                self.custom_change_events.send_change_event("int_attr", value)

            @command(dtype_in=str, dtype_out=bool)
            def is_attribute_polled_by_lofardevice(self, attr_name):
                # EventSubscriptions asks this; this device claims to poll everything itself
                return True

        def test_bool_change_event(self):
            """Writing a new bool value must yield an event after the initial one."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # first event: emitted by subscribing
                self.events.subscribe_change_event(device, "bool_attr", self.callback)

                # second event: emitted by the write
                device.bool_attr = True

                # both events must arrive
                for _ in range(2):
                    self.wait_for_callback()

                expected_calls = [("bool_attr", False), ("bool_attr", True)]
                self.assertListEqual(expected_calls, self.callback_called_values)

        def test_int_change_event(self):
            """Writing a new int value must yield an event after the initial one."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # first event: emitted by subscribing
                self.events.subscribe_change_event(device, "int_attr", self.callback)

                # second event: emitted by the write
                device.int_attr = 42

                # both events must arrive
                for _ in range(2):
                    self.wait_for_callback()

                expected_calls = [("int_attr", 0), ("int_attr", 42)]
                self.assertListEqual(expected_calls, self.callback_called_values)

        def test_no_change_event(self):
            """Re-writing an unchanged value must not yield another event."""

            with DeviceTestContext(self.TestDevice, properties={}, process=True) as device:
                # first event: emitted by subscribing
                self.events.subscribe_change_event(device, "int_attr", self.callback)

                # second event: emitted by the first explicit write
                device.int_attr = 0

                # both events must arrive
                for _ in range(2):
                    self.wait_for_callback()

                # writing the very same value once more ...
                device.int_attr = 0

                # ... must stay silent
                self.wait_for_no_callback()