From 2a46a483461c54095ffc97e56b9985ab2b5a3159 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Corn=C3=A9=20Lukken?= <lukken@astron.nl>
Date: Tue, 21 May 2024 19:05:04 +0000
Subject: [PATCH] Resolve L2SS-1740: Statistics metadata

---
 CDB/stations/common.json                      |   19 +
 README.md                                     |    1 +
 .../startup/01-devices.py                     |    3 +-
 infra/env.yaml                                |    1 +
 sbin/install-hooks/pre-commit.sh              |    1 -
 sbin/run_integration_test.sh                  |    2 +-
 setup.sh                                      |    1 +
 tangostationcontrol/VERSION                   |    2 +-
 .../default/devices/test_device_metadata.py   |   21 +
 .../test_device_observation_control.py        |   40 +-
 tangostationcontrol/requirements.txt          |    3 +-
 .../clients/statistics/consumer.py            |    2 +-
 .../tangostationcontrol/common/constants.py   |    3 +
 .../common/events/subscriptions.py            |   56 +-
 .../tangostationcontrol/devices/README.md     |   17 +-
 .../tangostationcontrol/devices/__init__.py   |    2 +
 .../tangostationcontrol/devices/metadata.py   |  252 +++++
 .../devices/observation_control.py            |   46 +-
 .../tangostationcontrol/devices/sdp/bst.py    |    2 +-
 .../tangostationcontrol/devices/sdp/sst.py    |    2 +-
 .../tangostationcontrol/devices/sdp/xst.py    |    2 +-
 .../tangostationcontrol/devices/types.py      |    1 +
 .../tangostationcontrol/metadata/__init__.py  |    0
 .../metadata/metadata_organizer.py            |  122 ++
 .../metadata/metadata_types.py                |   14 +
 .../observation/observation.py                |   22 +-
 .../observation/observation_callback.py       |   10 +
 .../observation/observation_controller.py     |   31 +-
 .../tangostationcontrol/zeromq/__init__.py    |    0
 .../tangostationcontrol/zeromq/pipe.py        |   22 +
 .../tangostationcontrol/zeromq/publisher.py   |  135 +++
 .../tangostationcontrol/zeromq/subscriber.py  |   49 +
 .../test/devices/test_metadata_device.py      |   94 ++
 .../test/devices/test_observation_control.py  |   51 +
 .../test_observation_control_device.py        |   11 -
 .../test/devices/test_observation_device.py   |   12 -
 ....py => test_temperature_manager_device.py} |    0
 tangostationcontrol/test/metadata/__init__.py |    0
 .../test/metadata/test_metadata_organizer.py  |  277 +++++
 tangostationcontrol/test/metrics/__init__.py  |    0
 .../test/observation/test_observation.py      |   71 +-
 .../test_observation_controller.py            |   36 +-
 tangostationcontrol/test/zeromq/__init__.py   |    0
 .../test/zeromq/test_configdb.json            | 1004 +++++++++++++++++
 .../test/zeromq/test_publisher.py             |  249 ++++
 tangostationcontrol/tox.ini                   |    3 +
 46 files changed, 2592 insertions(+), 100 deletions(-)
 create mode 100644 tangostationcontrol/integration_test/default/devices/test_device_metadata.py
 create mode 100644 tangostationcontrol/tangostationcontrol/devices/metadata.py
 create mode 100644 tangostationcontrol/tangostationcontrol/metadata/__init__.py
 create mode 100644 tangostationcontrol/tangostationcontrol/metadata/metadata_organizer.py
 create mode 100644 tangostationcontrol/tangostationcontrol/metadata/metadata_types.py
 create mode 100644 tangostationcontrol/tangostationcontrol/observation/observation_callback.py
 create mode 100644 tangostationcontrol/tangostationcontrol/zeromq/__init__.py
 create mode 100644 tangostationcontrol/tangostationcontrol/zeromq/pipe.py
 create mode 100644 tangostationcontrol/tangostationcontrol/zeromq/publisher.py
 create mode 100644 tangostationcontrol/tangostationcontrol/zeromq/subscriber.py
 create mode 100644 tangostationcontrol/test/devices/test_metadata_device.py
 create mode 100644 tangostationcontrol/test/devices/test_observation_control.py
 delete mode 100644 tangostationcontrol/test/devices/test_observation_control_device.py
 delete mode 100644 tangostationcontrol/test/devices/test_observation_device.py
 rename tangostationcontrol/test/devices/{test_device_temperature_manager.py => test_temperature_manager_device.py} (100%)
 create mode 100644 tangostationcontrol/test/metadata/__init__.py
 create mode 100644 tangostationcontrol/test/metadata/test_metadata_organizer.py
 create mode 100644 tangostationcontrol/test/metrics/__init__.py
 create mode 100644 tangostationcontrol/test/zeromq/__init__.py
 create mode 100644 tangostationcontrol/test/zeromq/test_configdb.json
 create mode 100644 tangostationcontrol/test/zeromq/test_publisher.py

diff --git a/CDB/stations/common.json b/CDB/stations/common.json
index c161eecf1..fec943802 100644
--- a/CDB/stations/common.json
+++ b/CDB/stations/common.json
@@ -146,8 +146,27 @@
           "STAT/ObservationControl/1": {
             "properties": {
               "Power_Children": [
+                "STAT/Metadata/1"
               ],
               "Control_Children": [
+                "STAT/Metadata/1"
+              ]
+            }
+          }
+        }
+      }
+    },
+    "Metadata": {
+      "STAT": {
+        "Metadata": {
+          "STAT/Metadata/1": {
+            "properties": {
+              "Power_Children": [
+              ],
+              "Control_Children": [
+              ],
+              "Metadata_Port": [
+                "6001"
               ]
             }
           }
diff --git a/README.md b/README.md
index a30b29f86..3e4469100 100644
--- a/README.md
+++ b/README.md
@@ -161,6 +161,7 @@ Next change the version in the following places:
 
 # Release Notes
 
+* 0.38.0 Add metadata device publishing zmq events
 * 0.37.2 Improved event-subscription interface, avoid overlap between polling loops.
 * 0.37.1 Improved asyncio resource teardown when devices go Off.
 * 0.37.0-1 Fix for deploying on DTS Lab
diff --git a/docker/jupyter-lab/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py b/docker/jupyter-lab/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py
index 182cf5245..ed361a2bc 100644
--- a/docker/jupyter-lab/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py
+++ b/docker/jupyter-lab/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py
@@ -78,11 +78,11 @@ ccd = OptionalDeviceProxy("STAT/CCD/1")
 ec = OptionalDeviceProxy("STAT/EC/1")
 pcon = OptionalDeviceProxy("STAT/PCON/1")
 psoc = OptionalDeviceProxy("STAT/PSOC/1")
-docker = OptionalDeviceProxy("STAT/Docker/1")
 temperaturemanager = OptionalDeviceProxy("STAT/TemperatureManager/1")
 configuration = OptionalDeviceProxy("STAT/Configuration/1")
 calibration = OptionalDeviceProxy("STAT/Calibration/1")
 observationcontrol = OptionalDeviceProxy("STAT/ObservationControl/1")
+metadata = OptionalDeviceProxy("STAT/Metadata/1")
 
 # Put them in a list in case one wants to iterate
 devices = (
@@ -94,7 +94,6 @@ devices = (
         ec,
         pcon,
         psoc,
-        docker,
         temperaturemanager,
         configuration,
         sdpfirmware_l,
diff --git a/infra/env.yaml b/infra/env.yaml
index 1b0d5e7f3..875c03013 100644
--- a/infra/env.yaml
+++ b/infra/env.yaml
@@ -78,6 +78,7 @@ devices:
   - DigitalBeam
   - EC
   - ObservationControl
+  - Metadata
   - PCON
   - PSOC
   - RECVH
diff --git a/sbin/install-hooks/pre-commit.sh b/sbin/install-hooks/pre-commit.sh
index 622697e52..603fc8615 100755
--- a/sbin/install-hooks/pre-commit.sh
+++ b/sbin/install-hooks/pre-commit.sh
@@ -5,7 +5,6 @@ if [ ! -f "setup.sh" ]; then
   exit 1
 fi
 
-pip install pre-commit
 pre-commit install --hook-type pre-push
 
 # --allow-missing-config: Allows the installation to proceed
diff --git a/sbin/run_integration_test.sh b/sbin/run_integration_test.sh
index 0505478d9..91ddecf67 100755
--- a/sbin/run_integration_test.sh
+++ b/sbin/run_integration_test.sh
@@ -282,7 +282,7 @@ echo "Using tango host $TANGO_HOST"
 
 # Devices list is used to explitly word split when supplied to commands, must
 # disable shellcheck SC2086 for each case.
-DEVICES=(device-stationmanager device-aps device-apsct device-ccd device-ec device-apspu device-sdpfirmware device-sdp device-recvh device-recvl device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-afh device-afl device-temperaturemanager device-observationcontrol device-configuration device-calibration)
+DEVICES=(device-stationmanager device-aps device-apsct device-ccd device-ec device-apspu device-sdpfirmware device-sdp device-recvh device-recvl device-bst device-sst device-unb2 device-xst device-beamlet device-digitalbeam device-tilebeam device-psoc device-pcon device-afh device-afl device-temperaturemanager device-observationcontrol device-configuration device-calibration device-metadata)
 
 # Wait for devices to restart
 
diff --git a/setup.sh b/setup.sh
index c3c8d8986..6fad1cbc5 100755
--- a/setup.sh
+++ b/setup.sh
@@ -22,6 +22,7 @@ fi
 
 # Activate the virtual environment
 source "$VENV_DIR/bin/activate"
+pip install pre-commit
 
 # Install git hooks
 if [ ! -f "${LOFAR20_DIR}/.git/hooks/post-checkout" ]; then
diff --git a/tangostationcontrol/VERSION b/tangostationcontrol/VERSION
index 8570a3aeb..ca75280b0 100644
--- a/tangostationcontrol/VERSION
+++ b/tangostationcontrol/VERSION
@@ -1 +1 @@
-0.37.2
+0.38.0
diff --git a/tangostationcontrol/integration_test/default/devices/test_device_metadata.py b/tangostationcontrol/integration_test/default/devices/test_device_metadata.py
new file mode 100644
index 000000000..7a929098a
--- /dev/null
+++ b/tangostationcontrol/integration_test/default/devices/test_device_metadata.py
@@ -0,0 +1,21 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+
+from integration_test.default.devices.base import AbstractTestBases
+
+logger = logging.getLogger()
+
+
+class TestMetadataDevice(AbstractTestBases.TestDeviceBase):
+
+    def setUp(self):
+        super().setUp("STAT/Metadata/1")
+        self.psoc_proxy = self.setup_proxy("STAT/PSOC/1")
+
+    def test_send_metadata(self):
+        """Turn on the device and emit metadata"""
+
+        self.proxy.boot()
+        self.proxy.send_metadata()
diff --git a/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py b/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py
index 32326d096..0c50be803 100644
--- a/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py
+++ b/tangostationcontrol/integration_test/default/devices/test_device_observation_control.py
@@ -3,6 +3,7 @@
 
 import json
 import logging
+import time
 from datetime import datetime
 from datetime import timedelta
 
@@ -14,7 +15,7 @@ from tangostationcontrol.common.constants import CS001_TILES
 from tangostationcontrol.test.dummy_observation_settings import (
     get_observation_settings_hba_immediate,
 )
-
+from timeout_decorator import timeout_decorator
 
 logger = logging.getLogger()
 
@@ -83,6 +84,7 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase):
         self.recv_proxy = self.setup_proxy("STAT/RECVH/H0", defaults=True)
         self.sdpfirmware_proxy = self.setup_proxy("STAT/SDPFirmware/HBA0")
         self.sdp_proxy = self.setup_proxy("STAT/SDP/HBA0")
+        self.metadata_proxy = self.setup_proxy("STAT/Metadata/1")
 
         control_mapping = [[1, i] for i in range(CS001_TILES)]
         self.antennafield_proxy = self.setup_proxy(
@@ -284,3 +286,39 @@ class TestObservationControlDevice(AbstractTestBases.TestDeviceBase):
 
         self.on_device_assert(self.proxy)
         self.assertRaises(DevFailed, self.proxy.add_observation, json.dumps(parameters))
+
+    @timeout_decorator.timeout(60)
+    def test_add_observation_callbacks(self):
+        """Test adding an observation and checking start / stop callbacks work"""
+
+        self.on_device_assert(self.proxy)
+
+        parameters = json.loads(self.VALID_JSON)
+        for antenna_field in parameters["antenna_fields"]:
+            antenna_field["start_time"] = (
+                datetime.now() + timedelta(seconds=5)
+            ).isoformat()
+            antenna_field["stop_time"] = (
+                datetime.now() + timedelta(seconds=20)
+            ).isoformat()
+
+        self.proxy.add_observation(json.dumps(parameters))
+
+        self.assertFalse(self.proxy.is_observation_running(self.EXPECTED_OBS_ID))
+
+        time.sleep(5)
+
+        while not self.proxy.is_observation_running(self.EXPECTED_OBS_ID):
+            logging.info("Waiting for observation to start...")
+            time.sleep(0.1)
+
+        self.assertTrue(self.proxy.is_observation_running(self.EXPECTED_OBS_ID))
+        self.assertGreater(self.metadata_proxy.messages_published_R, 0)
+
+        time.sleep(20)
+
+        while self.proxy.is_observation_running(self.EXPECTED_OBS_ID):
+            logging.info("Waiting for observation to stop...")
+            time.sleep(0.1)
+
+        self.assertFalse(self.proxy.is_observation_running(self.EXPECTED_OBS_ID))
diff --git a/tangostationcontrol/requirements.txt b/tangostationcontrol/requirements.txt
index 6b628f3e0..11ac961a1 100644
--- a/tangostationcontrol/requirements.txt
+++ b/tangostationcontrol/requirements.txt
@@ -3,7 +3,8 @@
 # integration process, which may cause wedges in the gate later.
 
 lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client  # Apache 2
-PyTango>=9.5.1rc1 # LGPL v3
+PyTango>=9.5.1 # LGPL v3
+pyzmq>=24 # LGPL + BSD
 numpy>=1.21.6 # BSD3
 asyncua >= 0.9.90 # LGPLv3
 psycopg2-binary >= 2.9.2 # LGPL
diff --git a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py
index 5372ea1ba..758a88ba2 100644
--- a/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py
+++ b/tangostationcontrol/tangostationcontrol/clients/statistics/consumer.py
@@ -6,7 +6,7 @@ import time
 from queue import Queue
 from threading import Thread
 
-from lofar_station_client.statistics.collector import StatisticsCollector
+from lofar_station_client.statistics.collectors import StatisticsCollector
 from lofar_station_client.statistics.packets import StatisticsPacket
 from tangostationcontrol.clients.statistics.client_thread import StatisticsClientThread
 
diff --git a/tangostationcontrol/tangostationcontrol/common/constants.py b/tangostationcontrol/tangostationcontrol/common/constants.py
index 766c66da9..cb283fc3b 100644
--- a/tangostationcontrol/tangostationcontrol/common/constants.py
+++ b/tangostationcontrol/tangostationcontrol/common/constants.py
@@ -106,6 +106,9 @@ MAX_ETH_FRAME_SIZE = 9000
 # The default polling period for polled attributes (in milliseconds)
 DEFAULT_POLLING_PERIOD_MS = 1000
 
+# The metadata polling period for polled attributes (in milliseconds)
+DEFAULT_POLLING_PERIOD_METADATA_MS = 60000
+
 # The default polling period for attributes polled to update metrics in Prometheus (in milliseconds)
 DEFAULT_METRICS_POLLING_PERIOD_MS = 2500
 
diff --git a/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py b/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py
index e48003150..832293b3b 100644
--- a/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py
+++ b/tangostationcontrol/tangostationcontrol/common/events/subscriptions.py
@@ -4,13 +4,16 @@
 from dataclasses import dataclass
 from functools import partial
 import logging
-from typing import List, Dict, Callable
+from typing import List, Dict, Callable, Optional
 
 from tango import DeviceProxy, EventType, DevFailed
 from prometheus_client import Counter
 
 from tangostationcontrol.common.lofar_logging import exception_to_str
-from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_MS
+from tangostationcontrol.common.constants import (
+    DEFAULT_POLLING_PERIOD_MS,
+    DEFAULT_METRICS_POLLING_PERIOD_MS,
+)
 from tangostationcontrol.common.case_insensitive_string import CaseInsensitiveString
 from tangostationcontrol.metrics import AttributeMetric
 
@@ -43,7 +46,9 @@ class EventSubscriptions:
             dynamic_labels=["event_device", "event_attribute"],
         )
 
-    def _get_subscription(self, dev_name: str, attr_name: str) -> bool:
+    def _get_subscription(
+        self, dev_name: str, attr_name: str
+    ) -> Optional[Subscription]:
         for sub in self.subscriptions:
             if (
                 CaseInsensitiveString(sub.proxy.name()) == dev_name
@@ -100,11 +105,31 @@ class EventSubscriptions:
                     f"Callback {callback} for device {device} attribute {attribute_name} threw an exception: {exception_to_str(ex)}"
                 )
 
+    def poll_period(self, proxy: DeviceProxy, attr_name: str) -> int | None:
+        """Return the polling period for the given attribute, in milliseconds,
+        or None if the attribute is not polled at all."""
+
+        # LofarDevice polls a set of attributes not covered by Tango's poll loop
+        try:
+            if proxy.is_attribute_polled_by_lofardevice(attr_name):
+                return DEFAULT_METRICS_POLLING_PERIOD_MS
+        except AttributeError:
+            # Not a LofarDevice
+            pass
+
+        # Check if polled by Tango
+        if proxy.is_attribute_polled(attr_name):
+            return proxy.get_attribute_poll_period(attr_name)
+
+        # No polling detected
+        return None
+
     def subscribe_change_event(
         self,
         proxy: DeviceProxy,
         attr_name: str,
         callback: EventCallbackType,
+        period: int = DEFAULT_POLLING_PERIOD_MS,
     ):
         """Subscribe to changes to an attribute of another device.
         Immediately and on change, the provided callback will be called as
@@ -117,6 +142,24 @@ class EventSubscriptions:
         the callback.
         """
 
+        # make sure th attribute is polled often enough
+        current_period = self.poll_period(proxy, attr_name)
+
+        if current_period is None or current_period > period:
+            if current_period is None:
+                logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.")
+            else:
+                logger.info(
+                    "Adjusting tango controlled polling period from %d to %d",
+                    current_period,
+                    period,
+                )
+
+            # NB: This requires either abs_change or rel_change to be set
+            #     for numerical attributes.
+            proxy.poll_attribute(attr_name, period)
+
+        # add or extend subscription to CHANGE_EVENT
         if sub := self._get_subscription(proxy.name(), attr_name):
             # extend existing subscription
             logger.debug(
@@ -143,13 +186,6 @@ class EventSubscriptions:
                 callbacks=[callback],
             )
 
-            # make sure the attribute is polled, otherwise we wont receive event
-            if not proxy.is_attribute_polled_by_lofardevice(attr_name):
-                # LOFARDevice does not poll this attribute, but maybe Tango does
-                if not proxy.is_attribute_polled(attr_name):
-                    logger.info(f"Enabling polling for {proxy.name()}/{attr_name}.")
-                    proxy.poll_attribute(attr_name, DEFAULT_POLLING_PERIOD_MS)
-
             # subscribe
             logger.debug(
                 f"Subscribing callback {callback} to CHANGE_EVENT on device {proxy} attribute {attr_name}"
diff --git a/tangostationcontrol/tangostationcontrol/devices/README.md b/tangostationcontrol/tangostationcontrol/devices/README.md
index 2e20c416f..80fbfe703 100644
--- a/tangostationcontrol/tangostationcontrol/devices/README.md
+++ b/tangostationcontrol/tangostationcontrol/devices/README.md
@@ -6,17 +6,12 @@ This directory contains the sources for our custom Tango devices.
 
 If a new device is added, it will (likely) need to be referenced in several places. Adjust or add the following files (referenced from the repository root), following the pattern shown by the devices already there:
 
-- Adjust `CDB/LOFAR_ConfigDb.json` to create the device in the Tango device database,
-- Add the device hierarchies in `CDB/hierarchies` to define the power, control and clock configuration,
+- Adjust `CDB/stations/common.json` to create the device in the Tango device database,
+- Add the device hierarchies to define the power and control configuration in `common.json`
 - Adjust `docker/jupyter-lab/ipython-profiles/stationcontrol-jupyter/startup/01-devices.py` to make an alias for it available in Jupyter-Lab,
-- Adjust `tangostationcontrol/tangostationcontrol/devices/boot.py` to add the device to the station initialisation sequence,
-- Add to `docker-compose/` to create a YaML file to start the device in a docker container. NOTE: it needs a unique 57xx port assigned (current _unused_ port value: 5731), a unique 58xx port for ZMQ events, and a unique 59xx port for ZMQ heartbeat
-- Adjust `tangostationcontrol/setup.cfg` to add an entry point for the device in the package installation,
-- Add to `tangostationcontrol/tangostationcontrol/integration_test/default/devices/` to add an integration test,
-- Adjust `sbin/run_integration_test.sh` to have the device started when running the integration tests,
-- Adjust `.gitlab-ci.yml` to add the device to the `docker_build_image_all` step and to create a `docker_build_image_device_XXX` step (N.B. not necessary if device is based on `lofar-device-base`),
-- Add to `sbin/tag_and_push_docker_image.sh` the LOCAL_IMAGES device name, imagine name and build for integration boolean triple (N.B. not necessary if device is based on `lofar-device-base`),
+- Add to `infra/env.yaml` the device class.
+- Adjust `tangostationcontrol/devices/__init__.py` to include the new device class and add to `__all__`
+- Add the device class to `/sbin/run_integration_test.sh` list of devices
+- Create `tangostationcontrol/tangostationcontrol/integration_test/default/devices/test_device_xxx.py` to add an integration test,
 - Add to `tangostationcontrol/docs/source/devices/` to mention the device in the end-user documentation.
 - Adjust `tangostationcontrol/docs/source/index.rst` to include the newly created file in `docs/source/devices/`.
-- Adjust `docker-compose/tango-prometheus-exporter/lofar2-policy.json` (in all lowercase) to include this device in the prometheus exporter
-- Adjust `docker-compose/tango-prometheus-exporter/lofar2-fast-policy.json` (in all lowercase) to include this device in the prometheus exporter.
diff --git a/tangostationcontrol/tangostationcontrol/devices/__init__.py b/tangostationcontrol/tangostationcontrol/devices/__init__.py
index 72b5e95a6..cd096388f 100644
--- a/tangostationcontrol/tangostationcontrol/devices/__init__.py
+++ b/tangostationcontrol/tangostationcontrol/devices/__init__.py
@@ -12,6 +12,7 @@ from .configuration import Configuration
 from .ec import EC
 from .observation_field import ObservationField
 from .observation_control import ObservationControl
+from .metadata import Metadata
 from .pcon import PCON
 from .psoc import PSOC
 from .recv.recvh import RECVH
@@ -41,6 +42,7 @@ __all__ = [
     "EC",
     "ObservationField",
     "ObservationControl",
+    "Metadata",
     "PCON",
     "PSOC",
     "RECVL",
diff --git a/tangostationcontrol/tangostationcontrol/devices/metadata.py b/tangostationcontrol/tangostationcontrol/devices/metadata.py
new file mode 100644
index 000000000..d149910f3
--- /dev/null
+++ b/tangostationcontrol/tangostationcontrol/devices/metadata.py
@@ -0,0 +1,252 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+"""Metadata device to emit various data"""
+
+# Additional import
+import logging
+import time
+from typing import Any
+
+import numpy
+from tango import DebugIt, DeviceProxy
+from tango.server import device_property, attribute, command
+
+# PyTango imports
+from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
+from tangostationcontrol.common.constants import DEFAULT_POLLING_PERIOD_METADATA_MS
+from tangostationcontrol.common.device_decorators import only_in_states
+from tangostationcontrol.common.lofar_logging import log_exceptions
+from tangostationcontrol.common.states import DEFAULT_COMMAND_STATES
+from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
+from tangostationcontrol.metadata.metadata_organizer import MetadataOrganizer
+from tangostationcontrol.metadata.metadata_types import metadata_config_type
+from tangostationcontrol.metadata.metadata_types import metadata_proxy_type
+from tangostationcontrol.metrics import device_metrics
+from tangostationcontrol.zeromq.publisher import ZeroMQPublisher
+
+
+logger = logging.getLogger()
+
+__all__ = ["Metadata"]
+
+
+@device_metrics()
+class Metadata(LOFARDevice):
+    """Metadata device to emit data about relevant to statistics and observations"""
+
+    # -----------------
+    # Device Properties
+    # -----------------
+
+    metadata_protocol = device_property(dtype="DevString", default_value="tcp")
+
+    metadata_bind = device_property(dtype="DevString", default_value="*")
+
+    metadata_port = device_property(dtype="DevUShort", mandatory=True)
+
+    metadata_topic = device_property(dtype="DevString", default_value="metadata")
+
+    # ----------
+    # Attributes
+    # ----------
+
+    @attribute(
+        doc="Is the metadata publisher running",
+        dtype=bool,
+    )
+    def is_running_R(self):
+        if self._publisher:
+            return self._publisher.is_running
+        return False
+
+    @attribute(
+        doc="Is the metadata publisher stopping",
+        dtype=bool,
+    )
+    def is_stopping_R(self):
+        if self._publisher:
+            return self._publisher.is_stopping
+        return False
+
+    @attribute(
+        doc="Queue fill percentage",
+        dtype=numpy.float64,
+    )
+    def queue_fill_percentage_R(self):
+        if self._publisher:
+            return self._publisher.queue_fill / self._publisher.queue_size
+        return 0.0
+
+    @attribute(
+        doc="Number of messages published since device was started", dtype=numpy.int64
+    )
+    def messages_published_R(self):
+        return self._num_published
+
+    # --------
+    # Overloaded functions
+    # --------
+
+    STOP_TIME_LIMIT = 10
+
+    METADATA_CONFIG: metadata_config_type = CaseInsensitiveDict(
+        {
+            "AFH": [
+                "Antenna_to_SDP_Mapping_R",
+                "Antenna_Names_R",
+                "RCU_PCB_ID_R",
+                "RCU_PCB_version_R",
+                "Antenna_Usage_Mask_R",
+                "Antenna_Reference_ITRF_R",
+                "Frequency_Band_RW",
+                "RCU_attenuator_dB_R",
+                "RCU_DTH_on_R",
+                "RCU_DTH_freq_R",
+                "HBAT_PWR_on_R",
+            ],
+            "AFL": [
+                "Antenna_to_SDP_Mapping_R",
+                "Antenna_Names_R",
+                "RCU_PCB_ID_R",
+                "RCU_PCB_version_R",
+                "Antenna_Usage_Mask_R",
+                "Antenna_Reference_ITRF_R",
+                "Frequency_Band_RW",
+                "RCU_attenuator_dB_R",
+                "RCU_DTH_on_R",
+                "RCU_DTH_freq_R",
+            ],
+            "SDP": ["subband_frequency_R"],
+            "TileBeam": ["Pointing_direction_str_R", "Tracking_enabled_R"],
+            "DigitalBeam": [
+                "Pointing_direction_str_R",
+                "Tracking_enabled_R",
+                "subband_select_RW",
+            ],
+            "SDPFirmware": [
+                "FPGA_firmware_version_R",
+                "FPGA_hardware_version_R",
+                "nr_signal_inputs_R",
+                "first_signal_input_index_R",
+            ],
+        }
+    )
+
+    def __init__(self, cl, name):
+        self._num_published = 0
+        self._publisher = None
+        self._organizer = None
+
+        # Super must be called after variable assignment due to executing init_device!
+        super().__init__(cl, name)
+
+    def _wait_for_done(self):
+        """Spin until publisher is done"""
+        accumulate = 0
+        while not self._publisher.is_done and accumulate < self.STOP_TIME_LIMIT:
+            logger.debug("Waiting for publisher thread to stop..")
+            time.sleep(0.1)
+            accumulate += 0.1
+
+        if accumulate > self.STOP_TIME_LIMIT:
+            raise TimeoutError("Failed to stop publisher thread!")
+
+    def configure_for_off(self):
+        """user code here. is called when the state is set to OFF"""
+        super().configure_for_off()
+
+        # Delete so that event subscriptions are recreated
+        if self._organizer:
+            del self._organizer
+            self._organizer = None
+
+        if self._publisher:
+            self._publisher.shutdown()
+            try:
+                self._wait_for_done()
+            finally:
+                self._publisher = None
+
+    @log_exceptions()
+    def configure_for_initialise(self):
+        """Initialises the attributes and properties of the statistics device."""
+        super().configure_for_initialise()
+
+        if not self._publisher:
+            self._publisher = ZeroMQPublisher(
+                ZeroMQPublisher.contstruct_bind_uri(
+                    self.metadata_protocol, self.metadata_bind, self.metadata_port
+                ),
+                [self.metadata_topic],
+            )
+
+        if not self._organizer:
+            self._organizer = MetadataOrganizer(config=self.METADATA_CONFIG)
+
+        proxies = self._organizer.create_proxies()
+        self.register_change_event_subscriptions(proxies)
+
+    def register_change_event_subscriptions(self, proxies: metadata_proxy_type):
+        """Register change events for working device proxies"""
+
+        for device_name, proxy in proxies.items():
+            if not isinstance(proxy, DeviceProxy):
+                logger.exception(
+                    "Failed to subscribe to change events for device %s due to %s",
+                    device_name,
+                    proxy,
+                )
+                continue
+
+            dev_info = proxy.info()
+            attributes = self.METADATA_CONFIG.get(dev_info.dev_class, None)
+            if not attributes:
+                logger.warning(
+                    "register_change_event_subscriptions called for device (%s) not "
+                    "present in METADATA_CONFIG, aborting subscription",
+                    device_name,
+                )
+                continue
+
+            for attribute_name in attributes:
+                try:
+                    self.events.subscribe_change_event(
+                        proxy,
+                        attribute_name,
+                        self.change_event_handler,
+                        DEFAULT_POLLING_PERIOD_METADATA_MS,
+                    )
+                except Exception as e:
+                    logger.exception(
+                        "Subscription for attribute %s on device %s failed due to %s",
+                        attribute_name,
+                        device_name,
+                        e,
+                    )
+
+    def change_event_handler(
+        self, device: DeviceProxy, attribute_name: str, value: Any
+    ):
+        logger.debug(
+            "Attribute %s for device %s changed",
+            device,
+            attribute_name,
+        )
+        self._organizer.partial_update(device, attribute_name, value)
+        self._send_metadata()
+
+    def _send_metadata(self):
+        """"""
+        self._publisher.send(self._organizer.get_json())
+        self._num_published += 1
+
+    @command()
+    @DebugIt()
+    @only_in_states(DEFAULT_COMMAND_STATES)
+    def send_metadata(self):
+        """Gather and publish metadata attempt to connect failed proxies"""
+        new_proxies = self._organizer.create_proxies()
+        self.register_change_event_subscriptions(new_proxies)
+        self._organizer.gather_metadata()
+        self._send_metadata()
diff --git a/tangostationcontrol/tangostationcontrol/devices/observation_control.py b/tangostationcontrol/tangostationcontrol/devices/observation_control.py
index 51f2fdbd9..e7e16f025 100644
--- a/tangostationcontrol/tangostationcontrol/devices/observation_control.py
+++ b/tangostationcontrol/tangostationcontrol/devices/observation_control.py
@@ -19,6 +19,7 @@ from tangostationcontrol.common.lofar_logging import (
 from tangostationcontrol.configuration import ObservationSettings
 from tangostationcontrol.devices.base_device_classes.lofar_device import LOFARDevice
 from tangostationcontrol.devices.observation_field import ObservationField
+from tangostationcontrol.devices.types import DeviceTypes
 from tangostationcontrol.common.device_decorators import only_when_on, debugit
 from tangostationcontrol.metrics import device_metrics
 
@@ -108,14 +109,27 @@ class ObservationControl(LOFARDevice):
 
         # The top level tango domain is the left-most part of a
         # device's name.
+        self.metadata = None
         self.myTangoDomain: str = self.get_name().split("/")[0]
 
         self._observation_controller: ObservationController = ObservationController(
-            self.myTangoDomain
+            tango_domain=self.myTangoDomain,
+            start_callback=self._start_observation_callback,
         )
 
         self._stop_all_observations_now()
 
+    def _start_observation_callback(self, observation_id: int):
+        """Callback for when an observation is started, use to emit metadata"""
+
+        try:
+            self.metadata.send_metadata()
+        except Exception as e:
+            logger.exception(
+                f"Observation start callback for observation {observation_id} "
+                f"failed due to {e}"
+            )
+
     # Core functions
     @log_exceptions()
     @DebugIt()
@@ -128,6 +142,12 @@ class ObservationControl(LOFARDevice):
         #     devices.
         Util.instance().set_polling_threads_pool_size(10)
 
+    @log_exceptions()
+    def configure_for_initialise(self):
+        super().configure_for_initialise()
+
+        self.metadata = self.control.child(DeviceTypes.Metadata)
+
     # Lifecycle functions
     @log_exceptions()
     def configure_for_off(self):
@@ -135,30 +155,6 @@ class ObservationControl(LOFARDevice):
 
         self._stop_all_observations_now()
 
-    # API
-    @command(dtype_in=DevString)
-    @only_when_on()
-    @log_exceptions()
-    def start_observation(self, parameters: DevString = None):
-        """Deprecated. For backward compatibility with old lofar-station-clients."""
-        logger.warning(
-            "Deprecated start_observation command used, please use add_observation "
-            "or start_observation_now instead"
-        )
-        self.add_observation(parameters)
-
-    @command(dtype_in=numpy.int64)
-    @only_when_on()
-    @log_exceptions()
-    def stop_observation(self, obs_id: numpy.int64):
-        """Deprecated. For backward compatibility with old lofar-station-clients."""
-
-        logger.warning(
-            "Deprecated stop_observation command used, please use stop_observation_now "
-            "instead"
-        )
-        self.stop_observation_now(obs_id)
-
     @command(dtype_in=DevString)
     @only_when_on()
     @debugit()
diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py
index f9032ffa4..8810dacfd 100644
--- a/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py
+++ b/tangostationcontrol/tangostationcontrol/devices/sdp/bst.py
@@ -9,7 +9,7 @@ import numpy
 from typing import Dict, List, Tuple
 
 from attribute_wrapper.attribute_wrapper import AttributeWrapper
-from lofar_station_client.statistics.collector import BSTCollector
+from lofar_station_client.statistics.collectors import BSTCollector
 from tango import AttrWriteType
 from tango.server import device_property, attribute
 from tangostationcontrol.clients.opcua_client import OPCUAConnection
diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py
index 9dc46dea2..55f7f4b27 100644
--- a/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py
+++ b/tangostationcontrol/tangostationcontrol/devices/sdp/sst.py
@@ -9,7 +9,7 @@ import numpy
 from typing import Dict, List, Tuple
 
 from attribute_wrapper.attribute_wrapper import AttributeWrapper
-from lofar_station_client.statistics.collector import SSTCollector
+from lofar_station_client.statistics.collectors import SSTCollector
 
 # PyTango imports
 from tango import AttrWriteType
diff --git a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py
index 8be377e78..50a7e7cd3 100644
--- a/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py
+++ b/tangostationcontrol/tangostationcontrol/devices/sdp/xst.py
@@ -9,7 +9,7 @@ import numpy
 from typing import Dict, List, Tuple
 
 from attribute_wrapper.attribute_wrapper import AttributeWrapper
-from lofar_station_client.statistics.collector import XSTCollector
+from lofar_station_client.statistics.collectors import XSTCollector
 from tango import AttrWriteType
 
 # PyTango imports
diff --git a/tangostationcontrol/tangostationcontrol/devices/types.py b/tangostationcontrol/tangostationcontrol/devices/types.py
index 4dfafb6e8..85c9f11a3 100644
--- a/tangostationcontrol/tangostationcontrol/devices/types.py
+++ b/tangostationcontrol/tangostationcontrol/devices/types.py
@@ -25,6 +25,7 @@ class DeviceTypes(str, Enum):
     DigitalBeam = "DigitalBeam"
     Observation = "Observation"
     ObservationControl = "ObservationControl"
+    Metadata = "Metadata"
     PCON = "PCON"
     PSOC = "PSOC"
     RECV = "RECV"
diff --git a/tangostationcontrol/tangostationcontrol/metadata/__init__.py b/tangostationcontrol/tangostationcontrol/metadata/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tangostationcontrol/tangostationcontrol/metadata/metadata_organizer.py b/tangostationcontrol/tangostationcontrol/metadata/metadata_organizer.py
new file mode 100644
index 000000000..423cdd8ae
--- /dev/null
+++ b/tangostationcontrol/tangostationcontrol/metadata/metadata_organizer.py
@@ -0,0 +1,122 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+"""Metadata class organize gathering and managing metadata"""
+import json
+from typing import Any
+
+import numpy
+import tango
+from tango import DeviceProxy
+
+from tangostationcontrol.common.proxy import create_device_proxy
+from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
+from tangostationcontrol.metadata.metadata_types import metadata_config_type
+from tangostationcontrol.metadata.metadata_types import metadata_proxy_type
+
+
+class MetadataOrganizer:
+    """Organize maintaining of metadata"""
+
+    def __init__(self, config: metadata_config_type):
+        self._data = CaseInsensitiveDict()
+        self._proxies: metadata_proxy_type = CaseInsensitiveDict()
+        self._config: metadata_config_type = config
+        self._class_device_map = CaseInsensitiveDict()
+
+        # self.create_proxies()
+
+    def create_proxies(self) -> metadata_proxy_type:
+        """Create proxies or store creation exception can be called anytime to retry
+
+        The return of this function can be used to register event subscriptions as soon
+        as the creation of a proxy becomes successful. Assuming the return of each call
+        is used this will prevent missing subscriptions but also prevent duplicate
+        subscriptions.
+
+        :return: A dictionary of newly created devices by name or creation exceptions
+        """
+
+        db = tango.Database()
+        new_proxies = CaseInsensitiveDict()
+
+        for device_class in self._config.keys():
+            devices = db.get_device_exported_for_class(device_class)
+            self._class_device_map[device_class] = devices
+            for device in devices:
+                if device in self._proxies and not isinstance(
+                    self._proxies[device], Exception
+                ):
+                    continue
+
+                try:
+                    proxy = create_device_proxy(device)
+                    self._proxies[device] = proxy
+                    new_proxies[device] = proxy
+                except Exception as e:
+                    self._proxies[device] = e
+                    new_proxies[device] = e
+
+        return new_proxies
+
+    @staticmethod
+    def _is_serializable(obj: Any) -> bool:
+        """Test if any object is serializable"""
+
+        try:
+            json.dumps(obj)
+        except TypeError:
+            return False
+
+        return True
+
+    @staticmethod
+    def _convert_object(obj: Any) -> Any:
+        """"""
+
+        if isinstance(obj, numpy.ndarray):
+            return obj.tolist()
+        raise TypeError("Failed to convert object of type %s", type(obj))
+
+    def _convert_if_necessary(self, obj: Any) -> Any:
+        """"""
+
+        if not self._is_serializable(obj):
+            return self._convert_object(obj)
+        return obj
+
+    def gather_metadata(self):
+        """Scrape proxies and ensure all metadata is uptodate"""
+
+        for device_class, attributes in self._config.items():
+            for device_str in self._class_device_map[device_class]:
+                device_proxy = self._proxies[device_str]
+                if isinstance(device_proxy, Exception):
+                    self._data[device_str] = str(device_proxy)
+                    continue
+
+                self._data[device_str] = CaseInsensitiveDict()
+
+                for attribute in attributes:
+                    try:
+                        self._data[device_str][attribute] = self._convert_if_necessary(
+                            getattr(device_proxy, attribute)
+                        )
+                    except Exception as e:
+                        self._data[device_str][attribute] = str(e)
+
+    def partial_update(self, device: DeviceProxy, attribute_name: str, value: Any):
+        """Partially update internally retained data"""
+
+        device_str = device.name()
+        self._data.setdefault(device_str, CaseInsensitiveDict())
+        self._data[device_str][attribute_name] = self._convert_if_necessary(value)
+
+    def get_json(self) -> str:
+        """Convert current data to json"""
+        return json.dumps(
+            {
+                key: dict(value) if isinstance(value, CaseInsensitiveDict) else value
+                for key, value in self._data.items()
+            }
+        )
diff --git a/tangostationcontrol/tangostationcontrol/metadata/metadata_types.py b/tangostationcontrol/tangostationcontrol/metadata/metadata_types.py
new file mode 100644
index 000000000..58e77fcaf
--- /dev/null
+++ b/tangostationcontrol/tangostationcontrol/metadata/metadata_types.py
@@ -0,0 +1,14 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+"""Metadata config type"""
+
+from typing import List, Union
+
+from tango import DeviceProxy
+
+from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
+
+metadata_config_type = CaseInsensitiveDict[str, List[str]]
+
+metadata_proxy_type = CaseInsensitiveDict[str, Union[DeviceProxy, Exception]]
diff --git a/tangostationcontrol/tangostationcontrol/observation/observation.py b/tangostationcontrol/tangostationcontrol/observation/observation.py
index 48d277434..f0fe738ff 100644
--- a/tangostationcontrol/tangostationcontrol/observation/observation.py
+++ b/tangostationcontrol/tangostationcontrol/observation/observation.py
@@ -2,7 +2,7 @@
 # SPDX-License-Identifier: Apache-2.0
 
 import logging
-from typing import List, Callable
+from typing import List
 import time
 
 from tango import DeviceProxy, Except, EventData
@@ -12,6 +12,8 @@ from tangostationcontrol.configuration import ObservationSettings
 from tangostationcontrol.configuration import ObservationFieldSettings
 from tangostationcontrol.common.lofar_logging import log_exceptions
 from tangostationcontrol.observation.observation_field import ObservationField
+from tangostationcontrol.observation.observation_callback import callback_type
+from tangostationcontrol.observation.observation_callback import default_callback
 
 logger = logging.getLogger()
 
@@ -45,17 +47,20 @@ class Observation(object):
         self,
         tango_domain,
         parameters: ObservationSettings,
-        stop_callback: Callable[[int], None] = None,
+        start_callback: callback_type = None,
+        stop_callback: callback_type = None,
     ):
         self._tango_domain: str = tango_domain
         self._parameters: ObservationSettings = parameters
-        self._stop_callback = lambda obs_id: {}
+
+        self._started_antenna_fields = 0
+        self._start_callback = start_callback or default_callback
+
         self._stopped_antenna_fields = 0
-        if stop_callback:
-            self._stop_callback = stop_callback
+        self._stop_callback = stop_callback or default_callback
 
         for antenna_field in self._parameters.antenna_fields:
-            if not antenna_field.observation_id == self.observation_id:
+            if antenna_field.observation_id != self.observation_id:
                 raise RuntimeError(
                     "Observation configured for different observation IDs across "
                     "antenna fields"
@@ -130,6 +135,10 @@ class Observation(object):
             # Start observation
             self._start_antenna_field(antenna_field)
 
+            if self._started_antenna_fields is len(self.antenna_fields):
+                logger.debug("All antenna fields started, triggering start callback")
+                self._start_callback(self.observation_id)
+
     def create_devices(self):
         """Call create_observation_field_device per ObservationField
 
@@ -202,6 +211,7 @@ class Observation(object):
         """
         for observation_field in self._observation_fields:
             if observation_field.antenna_field == antenna_field:
+                self._started_antenna_fields += 1
                 observation_field.start()
 
     def _stop_antenna_field(self, antenna_field: str):
diff --git a/tangostationcontrol/tangostationcontrol/observation/observation_callback.py b/tangostationcontrol/tangostationcontrol/observation/observation_callback.py
new file mode 100644
index 000000000..043f620ce
--- /dev/null
+++ b/tangostationcontrol/tangostationcontrol/observation/observation_callback.py
@@ -0,0 +1,10 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+from typing import Callable
+
+callback_type = Callable[[int], None]
+
+
+def default_callback(observation_id: int):
+    pass
diff --git a/tangostationcontrol/tangostationcontrol/observation/observation_controller.py b/tangostationcontrol/tangostationcontrol/observation/observation_controller.py
index a062cf0b0..2e1b697a2 100644
--- a/tangostationcontrol/tangostationcontrol/observation/observation_controller.py
+++ b/tangostationcontrol/tangostationcontrol/observation/observation_controller.py
@@ -8,6 +8,9 @@ from typing import Callable, Type
 from tango import DevFailed, Util, Database
 from tangostationcontrol.configuration import ObservationSettings
 from tangostationcontrol.observation.observation import Observation
+from tangostationcontrol.observation.observation_callback import callback_type
+from tangostationcontrol.observation.observation_callback import default_callback
+
 
 logger = logging.getLogger()
 
@@ -16,11 +19,15 @@ class ObservationController(dict[int, Observation]):
     """A dictionary of observations. Actively manages the observation state transitions
     (start, stop)."""
 
-    def __init__(self, tango_domain: str):
+    def __init__(self, tango_domain: str, start_callback: callback_type = None):
         super().__init__()
         self._tango_util = Util.instance()
         self._tango_domain = tango_domain
 
+        self._start_callback = default_callback
+        if start_callback:
+            self._start_callback = start_callback
+
     def _catch(self, func: Callable, exception: Type[Exception]):
         try:
             return func()
@@ -28,6 +35,17 @@ class ObservationController(dict[int, Observation]):
             logger.warning("Executing %s failed due to %s", func, e)
             return None
 
+    def _stop_callback(self, observation_id: int):
+        try:
+            obs = self[observation_id]
+            obs.destroy_devices()
+            self.pop(observation_id)
+        except Exception as e:
+            logger.error(
+                "Cleanup callback failed for observation %d due to %s",
+                (observation_id, e),
+            )
+
     @property
     def running_observations(self) -> list[int]:
         return [
@@ -69,7 +87,12 @@ class ObservationController(dict[int, Observation]):
                     f"past its stop time {observation_field_settings.stop_time}"
                 )
 
-        obs = Observation(self._tango_domain, settings, self.pop)
+        obs = Observation(
+            tango_domain=self._tango_domain,
+            parameters=settings,
+            start_callback=self._start_callback,
+            stop_callback=self._stop_callback,
+        )
 
         try:
             obs.create_devices()
@@ -139,10 +162,6 @@ class ObservationController(dict[int, Observation]):
         db = Database()
         devices = db.get_device_exported_for_class(ObservationField.__name__)
         for device in devices:
-            # if CaseInsensitiveString(device) == CaseInsensitiveString(
-            #     "Stat/ObservationField/1"
-            # ):
-            #     continue
             try:
                 db.delete_device(device)
                 logger.warning("Destroyed lingering device: %s", device)
diff --git a/tangostationcontrol/tangostationcontrol/zeromq/__init__.py b/tangostationcontrol/tangostationcontrol/zeromq/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tangostationcontrol/tangostationcontrol/zeromq/pipe.py b/tangostationcontrol/tangostationcontrol/zeromq/pipe.py
new file mode 100644
index 000000000..149946597
--- /dev/null
+++ b/tangostationcontrol/tangostationcontrol/zeromq/pipe.py
@@ -0,0 +1,22 @@
+import binascii
+import os
+from typing import Tuple
+
+import zmq
+
+
+def zpipe(ctx) -> Tuple[zmq.Socket, zmq.Socket]:
+    """build inproc pipe for talking to threads
+
+    mimic pipe used in czmq zthread_fork.
+
+    Returns a pair of PAIRs connected via inproc
+    """
+    a = ctx.socket(zmq.PAIR)
+    b = ctx.socket(zmq.PAIR)
+    a.linger = b.linger = 0
+    a.hwm = b.hwm = 1
+    iface = "inproc://%s" % binascii.hexlify(os.urandom(8))
+    a.bind(iface)
+    b.connect(iface)
+    return a, b
diff --git a/tangostationcontrol/tangostationcontrol/zeromq/publisher.py b/tangostationcontrol/tangostationcontrol/zeromq/publisher.py
new file mode 100644
index 000000000..fbfcb60f4
--- /dev/null
+++ b/tangostationcontrol/tangostationcontrol/zeromq/publisher.py
@@ -0,0 +1,135 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+"""Base class for ZMQ publishers"""
+
+from datetime import datetime
+from datetime import timezone
+import logging
+import queue
+from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import Future
+from typing import List, Callable, Union
+
+import zmq
+
+logger = logging.getLogger()
+
+__all__ = ["ZeroMQPublisher"]
+
+
+class ZeroMQPublisher:
+    """Base class for ZMQ publishers"""
+
+    def __init__(
+        self,
+        bind_uri: str,
+        topics: Union[List[bytearray], List[str]],
+        queue_size: int = 100,
+    ):
+        """
+        param bind_uri: uri to bind of pattern protocol://ip:port
+        param topics: List of topics to publish to, for bytearray use str.encode()
+        """
+        self._queue = queue.Queue(maxsize=queue_size)
+        self._ctx = zmq.Context.instance()
+        self._publisher = self._ctx.socket(zmq.PUB)
+
+        if len(topics) > 0 and isinstance(topics[0], str):
+            self._topics = [topic.encode() for topic in topics]
+        else:
+            self._topics = topics
+
+        self._publisher.bind(bind_uri)
+        self._is_running = False
+        self._is_stopping = False
+        self._thread = ThreadPoolExecutor(max_workers=1)
+        self._future = self._thread.submit(self._run)
+
+    def __del__(self):
+        self.shutdown()
+
+    @staticmethod
+    def contstruct_bind_uri(protocol: str, bind: str, port: Union[str, int]) -> str:
+        """Combine parameters into a full bind uri for ZeroMQ"""
+        if isinstance(port, int):
+            port = str(port)
+        return f"{protocol}://{bind}:{port}"
+
+    @property
+    def is_stopping(self):
+        """If the request has been made to stop the publisher
+
+        Remains true even after fully stopping
+        """
+        return self._is_stopping
+
+    @property
+    def is_running(self):
+        """If the publisher has started"""
+        # don't use self._future.is_running, returns false if thread sleeps ;)
+        return self._is_running
+
+    @property
+    def is_done(self) -> bool:
+        """If the publisher has fully stopped"""
+        return self._future.done()
+
+    def get_result(self, timeout=None):
+        self._future.result(timeout=timeout)
+
+    def get_exception(self, timeout=None):
+        self._future.exception(timeout=timeout)
+
+    def register_callback(self, fn: Callable[[Future], None]):
+        self._future.add_done_callback(fn)
+
+    @property
+    def queue_fill(self) -> int:
+        return self._queue.unfinished_tasks
+
+    @property
+    def queue_size(self) -> int:
+        return self._queue.maxsize
+
+    def _run(self):
+        self._is_running = True
+        logger.info("Publisher thread: %s starting", self)
+        while not self._is_stopping:
+            try:
+                msg = self._queue.get(timeout=1)
+                now = datetime.now().astimezone(tz=timezone.utc).isoformat()
+                for topic in self._topics:
+                    msg = [topic, now.encode("utf-8"), f"{msg}".encode("utf-8")]
+                    logger.debug("Publisher send: %s", msg)
+                    self._publisher.send_multipart(msg)
+            except queue.Empty:
+                logger.debug("Get empty")
+                continue
+            except zmq.ZMQError as e:
+                if e.errno != zmq.ETERM:
+                    self._stop()
+                    raise e
+            except KeyboardInterrupt as e:
+                self._stop()
+                raise e
+        self._stop()
+
+    def _stop(self):
+        """Internal function to handle stopping"""
+        self._publisher.close()
+        logger.info("Terminated thread of %s", self)
+        self._is_running = False
+
+    def shutdown(self):
+        """External function to request stopping / shutdown"""
+        logger.debug("Request to stop thread of %s", self)
+        self._is_stopping = True
+        self._thread.shutdown()
+
+    def send(self, msg):
+        """
+        param msg: The message to enqueue for transmission
+        raises FullException: If the message could not be enqueued
+        """
+        self._queue.put_nowait(msg)
diff --git a/tangostationcontrol/tangostationcontrol/zeromq/subscriber.py b/tangostationcontrol/tangostationcontrol/zeromq/subscriber.py
new file mode 100644
index 000000000..97edd0967
--- /dev/null
+++ b/tangostationcontrol/tangostationcontrol/zeromq/subscriber.py
@@ -0,0 +1,49 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+"""Base class for ZMQ subscribers"""
+
+import abc
+import logging
+from typing import List
+
+import zmq
+
+logger = logging.getLogger()
+
+__all__ = ["ZeroMQSubscriber"]
+
+
+class ZeroMQSubscriber(abc.ABC):
+    """Base class for ZMQ subscribers"""
+
+    def __init__(self, connect_uri: str, topics: List[bytearray]):
+        """
+
+        param connect_uri: uri of pattern protocol://fqdn:port
+        param topics: List of topics to subscribe to, must be bytearray use str.encode()
+        """
+        self._ctx = zmq.Context.instance()
+        self._subscriber = self._ctx.socket(zmq.SUB)
+        self._topics = topics
+
+        self._subscriber.connect(connect_uri)
+        for topic in self._topics:
+            self._subscriber.setsockopt(zmq.SUBSCRIBE, topic)
+
+    def run(self):
+        while True:
+            try:
+                msg = self._subscriber.recv_multipart()
+                self.handle_message(msg)
+            except zmq.ZMQError as e:
+                if e.errno == zmq.ETERM:
+                    break  # Interrupted
+                else:
+                    raise
+
+    @abc.abstractmethod
+    def handle_message(self, msg):
+        raise NotImplementedError(
+            f"Method: {self.handle_message} of {self} not implemented!"
+        )
diff --git a/tangostationcontrol/test/devices/test_metadata_device.py b/tangostationcontrol/test/devices/test_metadata_device.py
new file mode 100644
index 000000000..0410097b5
--- /dev/null
+++ b/tangostationcontrol/test/devices/test_metadata_device.py
@@ -0,0 +1,94 @@
+# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+from unittest.mock import ANY, patch
+from unittest import mock
+from unittest.mock import call
+
+from tango import DevFailed, DeviceProxy
+from tango.test_context import DeviceTestContext
+
+from tangostationcontrol.devices.base_device_classes import lofar_device
+from tangostationcontrol.devices import metadata
+
+from test.devices import device_base
+
+
+@patch("tango.Database")
+class TestMetadataDevice(device_base.DeviceTestCase):
+
+    TEST_PROPERTIES = {
+        "metadata_port": "6001",
+    }
+
+    def setUp(self):
+        super(TestMetadataDevice, self).setUp()
+
+    def test_start_stop(self, tango_db):
+        with DeviceTestContext(
+            metadata.Metadata,
+            properties=self.TEST_PROPERTIES,
+            process=False,
+            timeout=10,
+        ) as proxy:
+            proxy.initialise()
+            proxy.on()
+
+            self.assertTrue(proxy.is_running_R)
+            self.assertFalse(proxy.is_stopping_R)
+            self.assertAlmostEqual(0.0, proxy.queue_fill_percentage_R)
+
+            proxy.off()
+
+            self.assertFalse(proxy.is_running_R)
+            self.assertFalse(proxy.is_stopping_R)
+            self.assertAlmostEqual(0.0, proxy.queue_fill_percentage_R)
+
+    @mock.patch.object(metadata, "ZeroMQPublisher", autospec=True)
+    def test_stop_timeout_exception(self, m_publisher, tango_db):
+        """Test that publisher threads failing to stop don't block forever"""
+
+        m_publisher.return_value.is_done = False
+
+        with DeviceTestContext(
+            metadata.Metadata,
+            properties=self.TEST_PROPERTIES,
+            process=False,
+            timeout=10,
+        ) as proxy:
+            proxy.initialise()
+            proxy.on()
+
+            self.assertRaises(DevFailed, proxy.off)
+
+    @mock.patch.object(lofar_device, "EventSubscriptions", autospec=True)
+    def test_register_change_events(self, m_events, tango_db):
+        """Test that  for each device attribute pair change events are registered"""
+
+        tango_db.return_value.get_device_exported_for_class.side_effect = lambda cls: [
+            f"stat/{cls.casefold()}/1"
+        ]
+
+        m_devices = []
+        t_calls = []
+        for device_cls, attributes in metadata.Metadata.METADATA_CONFIG.items():
+            m_device = mock.Mock(spec=DeviceProxy)
+            m_device.name.return_value = f"stat/{device_cls.casefold()}/1"
+            m_device.info.return_value.dev_class = device_cls
+            m_devices.append(m_device)
+            for attribute_name in attributes:
+                t_calls.append(call(m_device, attribute_name, ANY, ANY))
+
+        with DeviceTestContext(
+            metadata.Metadata,
+            properties=self.TEST_PROPERTIES,
+            process=False,
+            timeout=10,
+        ) as proxy:
+            self.device_proxy_mock["object"].side_effect = m_devices
+
+            proxy.boot()
+
+            m_events.return_value.subscribe_change_event.assert_has_calls(t_calls)
+
+            proxy.send_metadata()
diff --git a/tangostationcontrol/test/devices/test_observation_control.py b/tangostationcontrol/test/devices/test_observation_control.py
new file mode 100644
index 000000000..878d2ebe1
--- /dev/null
+++ b/tangostationcontrol/test/devices/test_observation_control.py
@@ -0,0 +1,51 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+from unittest import mock
+
+from numpy import testing
+
+from tango import DevState
+from tango.test_context import DeviceTestContext
+
+from tangostationcontrol.devices import observation_control
+from tangostationcontrol.devices.base_device_classes import lofar_device
+from tangostationcontrol.observation import observation_controller
+
+from test.devices import device_base
+
+
+class TestObservationControlDevice(device_base.DeviceTestCase):
+    """Unit test class for device ObservationControl"""
+
+    def setUp(self):
+        super(TestObservationControlDevice, self).setUp()
+
+        proxy_patcher = mock.patch.object(
+            lofar_device, "ControlHierarchyDevice", autospec=True
+        )
+        self.m_control = proxy_patcher.start()
+        self.addCleanup(proxy_patcher.stop)
+
+        self.m_db = self.database_proxy_patch(observation_controller)
+
+    def test_init_on(self):
+        """Test device initialization and defaults"""
+
+        with DeviceTestContext(
+            observation_control.ObservationControl,
+            process=False,
+            timeout=10,
+        ) as proxy:
+            proxy.boot()
+
+            self.assertEqual(proxy.state(), DevState.ON)
+
+            testing.assert_equal([], proxy.observations_R)
+            testing.assert_equal([], proxy.running_observations_R)
+            testing.assert_equal([], proxy.active_antenna_fields_R)
+
+            self.assertFalse(proxy.is_any_observation_running())
+            self.assertFalse(proxy.is_antenna_field_active("HBA"))
+            self.assertFalse(proxy.is_antenna_field_active("HBA0"))
+            self.assertFalse(proxy.is_antenna_field_active("HBA1"))
+            self.assertFalse(proxy.is_antenna_field_active("LBA"))
diff --git a/tangostationcontrol/test/devices/test_observation_control_device.py b/tangostationcontrol/test/devices/test_observation_control_device.py
deleted file mode 100644
index 535ebe62d..000000000
--- a/tangostationcontrol/test/devices/test_observation_control_device.py
+++ /dev/null
@@ -1,11 +0,0 @@
-# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
-# SPDX-License-Identifier: Apache-2.0
-
-from test import base
-
-
-class TestObservationControlDevice(base.TestCase):
-    def setUp(self):
-        super(TestObservationControlDevice, self).setUp()
-
-    # TODO(Corne): Either make this do something or remove this file, this makes 0 sense
diff --git a/tangostationcontrol/test/devices/test_observation_device.py b/tangostationcontrol/test/devices/test_observation_device.py
deleted file mode 100644
index c70e7f8fc..000000000
--- a/tangostationcontrol/test/devices/test_observation_device.py
+++ /dev/null
@@ -1,12 +0,0 @@
-# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
-# SPDX-License-Identifier: Apache-2.0
-
-from test.devices import device_base
-
-
-class TestObservationDevice(device_base.DeviceTestCase):
-    def setUp(self):
-        # DeviceTestCase setUp patches lofar_device DeviceProxy
-        super(TestObservationDevice, self).setUp()
-
-    # TODO(Corne): Either make this do something or remove this file, this makes 0 sense
diff --git a/tangostationcontrol/test/devices/test_device_temperature_manager.py b/tangostationcontrol/test/devices/test_temperature_manager_device.py
similarity index 100%
rename from tangostationcontrol/test/devices/test_device_temperature_manager.py
rename to tangostationcontrol/test/devices/test_temperature_manager_device.py
diff --git a/tangostationcontrol/test/metadata/__init__.py b/tangostationcontrol/test/metadata/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tangostationcontrol/test/metadata/test_metadata_organizer.py b/tangostationcontrol/test/metadata/test_metadata_organizer.py
new file mode 100644
index 000000000..c6856e456
--- /dev/null
+++ b/tangostationcontrol/test/metadata/test_metadata_organizer.py
@@ -0,0 +1,277 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+from unittest import mock
+from unittest.mock import patch
+
+import numpy as np
+from tango import DevFailed
+
+from tangostationcontrol.common.case_insensitive_dict import CaseInsensitiveDict
+from tangostationcontrol.metadata import metadata_organizer
+
+from test import base
+
+logger = logging.getLogger()
+
+
+@patch("tango.Database")
+class TestMetadataOrganizer(base.TestCase):
+
+    def setUp(self):
+        super(TestMetadataOrganizer, self).setUp()
+
+        proxy_patcher = mock.patch.object(
+            metadata_organizer, "create_device_proxy", autospec=True
+        )
+        self.m_proxy = proxy_patcher.start()
+        self.addCleanup(proxy_patcher.stop)
+
+    def test_create_proxies(self, tango_db):
+        """Test proxy creation and storage access"""
+
+        tango_db.return_value.get_device_exported_for_class.return_value = [
+            "STAT/device/1",
+            "STAT/Device/2",
+        ]
+
+        t_organizer = metadata_organizer.MetadataOrganizer(
+            CaseInsensitiveDict({"Device": []})
+        )
+        t_organizer.create_proxies()
+
+        self.assertEqual(2, self.m_proxy.call_count)
+        self.assertIn("STAT/DEVICE/1", t_organizer._proxies.keys())
+        self.assertIn("STAT/DEVICE/2", t_organizer._proxies.keys())
+
+    def test_create_proxy_exception(self, tango_db):
+        """Test creating the proxies but some fail"""
+
+        tango_db.return_value.get_device_exported_for_class.return_value = [
+            "STAT/device/1",
+            "STAT/Device/2",
+        ]
+
+        self.m_proxy.side_effect = [mock.Mock(), DevFailed("arggg")]
+
+        t_organizer = metadata_organizer.MetadataOrganizer(
+            CaseInsensitiveDict({"Device": []})
+        )
+        t_organizer.create_proxies()
+
+        self.assertEqual(2, self.m_proxy.call_count)
+        self.assertIn("STAT/DEVICE/1", t_organizer._proxies.keys())
+        self.assertTrue(isinstance(t_organizer._proxies["STAT/DEVICE/2"], DevFailed))
+
+    def test_gather_metadata(self, tango_db):
+        """Test gathering different types of data"""
+        tango_db.return_value.get_device_exported_for_class.side_effect = lambda dc: (
+            ["stat/DEVICE/1"] if dc.casefold() == "device1" else ["STAT/device/2"]
+        )
+
+        device_1_attributes = {
+            "antenna_names_R": ["antenna_1", "antenna_2"],
+            "observation_id_R": 125434612735345,
+        }
+        device_2_attributes = {
+            "subband_weights_R": np.ndarray(shape=(2, 2), dtype=float)
+        }
+
+        m_proxy_1 = mock.Mock(**device_1_attributes)
+
+        m_proxy_2 = mock.Mock(**device_2_attributes)
+
+        self.m_proxy.side_effect = [m_proxy_1, m_proxy_2]
+
+        t_organizer = metadata_organizer.MetadataOrganizer(
+            CaseInsensitiveDict(
+                {
+                    "device1": ["antenna_names_R", "observation_id_R"],
+                    "Device2": ["subband_weights_R"],
+                }
+            )
+        )
+        t_organizer.create_proxies()
+
+        t_organizer.gather_metadata()
+
+        self.assertIn("stat/device/1", t_organizer._data.keys())
+        self.assertIn("stat/DEVICE/2", t_organizer._data.keys())
+
+        self.assertEqual(
+            t_organizer._data["stat/device/1"]["antenna_names_R"],
+            device_1_attributes["antenna_names_R"],
+        )
+        self.assertEqual(
+            t_organizer._data["stat/device/1"]["observation_id_R"],
+            device_1_attributes["observation_id_R"],
+        )
+
+        np.testing.assert_almost_equal(
+            t_organizer._data["stat/device/2"]["subband_weights_R"],
+            device_2_attributes["subband_weights_R"].tolist(),
+        )
+
+    def test_gather_metadata_exceptions(self, tango_db):
+        """Failed proxies and conversion errors should put exceptions into data"""
+
+        tango_db.return_value.get_device_exported_for_class.return_value = [
+            "STAT/device/1",
+            "STAT/device/2",
+        ]
+        m_except = DevFailed("arggg")
+
+        device_1_attributes = {"antenna_names_R": CaseInsensitiveDict()}
+
+        self.m_proxy.side_effect = [mock.Mock(**device_1_attributes), m_except]
+
+        t_organizer = metadata_organizer.MetadataOrganizer(
+            CaseInsensitiveDict({"Device1": ["antenna_names_R"]})
+        )
+        t_organizer.create_proxies()
+
+        t_organizer.gather_metadata()
+
+        self.assertIn("stat/device/1", t_organizer._data.keys())
+        self.assertIn("stat/DEVICE/2", t_organizer._data.keys())
+        self.assertEqual(t_organizer._data["stat/device/2"], str(m_except))
+        self.assertIn(
+            "Failed to convert object",
+            t_organizer._data["stat/device/1"]["antenna_names_R"],
+        )
+
+    def test_partial_update(self, tango_db):
+        """Update several attributes in isolation"""
+
+        t_organizer = metadata_organizer.MetadataOrganizer(
+            CaseInsensitiveDict({"Device": []})
+        )
+
+        m_proxy = mock.Mock()
+        m_proxy.name.return_value = "stat/DEVICE/1"
+
+        t_organizer.partial_update(
+            m_proxy, "antenna_names_R", ["antenna_1", "antenna_2"]
+        )
+
+        self.assertEqual(
+            CaseInsensitiveDict({"antenna_names_R": ["antenna_1", "antenna_2"]}),
+            t_organizer._data["stat/device/1"],
+        )
+
+        t_organizer.partial_update(
+            m_proxy, "antenna_names_R", ["antenna_3", "antenna_4"]
+        )
+
+        self.assertEqual(
+            CaseInsensitiveDict({"antenna_names_R": ["antenna_3", "antenna_4"]}),
+            t_organizer._data["stat/device/1"],
+        )
+
+    def test_get_json(self, _):
+        """Test converting variety of data products to json"""
+
+        t_organizer = metadata_organizer.MetadataOrganizer(
+            config=CaseInsensitiveDict({})
+        )
+        t_organizer._data["stat/device/1"] = {}
+        t_organizer._data["stat/device/1"]["antenna_names_R"] = [
+            "antenna_1",
+            "antenna_2",
+        ]
+        t_organizer._data["stat/device/1"]["subband_weights_R"] = np.ndarray(
+            shape=(2, 2), dtype=float
+        ).tolist()
+        t_organizer._data["stat/device/1"]["observation_id_R"] = 125434612735345
+
+        t_result = t_organizer.get_json()
+
+        self.assertIn("stat/device/1", t_result)
+        self.assertIn("antenna_names_R", t_result)
+        self.assertIn("subband_weights_R", t_result)
+        self.assertIn("observation_id_R", t_result)
+        self.assertIn("antenna_1", t_result)
+        self.assertIn("125434612735345", t_result)
+
+    def test_end_to_end(self, tango_db):
+        """Test the complete chain of functionality"""
+
+        tango_db.return_value.get_device_exported_for_class.side_effect = lambda dc: (
+            ["stat/DEVICE/1"]
+            if dc.casefold() == "device1"
+            else (
+                ["STAT/device/2"] if dc.casefold() == "device2" else ["STAT/device/3"]
+            )
+        )
+
+        m_except = DevFailed("arggg")
+
+        device_1_attributes = {
+            "antenna_names_R": ["antenna_1", "antenna_2"],
+            "observation_id_R": 125434612735345,
+        }
+        device_2_attributes = {
+            "subband_weights_R": np.ndarray(shape=(2, 2), dtype=float),
+            "unconvertable_type_R": CaseInsensitiveDict(),
+        }
+
+        m_proxy_1 = mock.Mock(**device_1_attributes)
+
+        m_proxy_2 = mock.Mock(**device_2_attributes)
+
+        self.m_proxy.side_effect = [m_proxy_1, m_proxy_2, m_except]
+
+        t_organizer = metadata_organizer.MetadataOrganizer(
+            CaseInsensitiveDict(
+                {
+                    "device1": ["antenna_names_R", "observation_id_R"],
+                    "device2": ["subband_weights_R", "unconvertable_type_R"],
+                    "device3": [],
+                }
+            )
+        )
+        t_organizer.create_proxies()
+
+        t_organizer.gather_metadata()
+
+        self.assertEqual(
+            t_organizer._data["stat/device/1"]["antenna_names_R"],
+            device_1_attributes["antenna_names_R"],
+        )
+
+        np.testing.assert_almost_equal(
+            t_organizer._data["stat/device/2"]["subband_weights_R"],
+            device_2_attributes["subband_weights_R"].tolist(),
+        )
+
+        self.assertEqual(t_organizer._data["stat/device/3"], str(m_except))
+        self.assertIn(
+            "Failed to convert object",
+            t_organizer._data["stat/device/2"]["unconvertable_type_R"],
+        )
+
+        m_proxy = mock.Mock()
+        m_proxy.name.return_value = "stat/DEVICE/1"
+        t_organizer.partial_update(
+            m_proxy, "antenna_names_R", ["antenna_3", "antenna_4"]
+        )
+
+        self.assertEqual(
+            CaseInsensitiveDict(
+                {
+                    "antenna_names_R": ["antenna_3", "antenna_4"],
+                    "observation_id_R": 125434612735345,
+                }
+            ),
+            t_organizer._data["stat/device/1"],
+        )
+
+        t_result = t_organizer.get_json()
+
+        self.assertIn("stat/device/1", t_result)
+        self.assertIn("antenna_names_r", t_result)
+        self.assertIn("subband_weights_r", t_result)
+        self.assertIn("observation_id_r", t_result)
+        self.assertIn("antenna_3", t_result)
+        self.assertIn("125434612735345", t_result)
diff --git a/tangostationcontrol/test/metrics/__init__.py b/tangostationcontrol/test/metrics/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tangostationcontrol/test/observation/test_observation.py b/tangostationcontrol/test/observation/test_observation.py
index fae464f01..241326069 100644
--- a/tangostationcontrol/test/observation/test_observation.py
+++ b/tangostationcontrol/test/observation/test_observation.py
@@ -10,6 +10,7 @@ from tangostationcontrol.observation.observation import Observation
 from tangostationcontrol.observation import observation_field
 from tangostationcontrol.test.dummy_observation_settings import (
     get_observation_settings_two_fields,
+    get_observation_settings_hba_immediate,
 )
 
 from test import base
@@ -110,13 +111,13 @@ class TestObservation(base.TestCase):
             obs_field.subscribe.assert_called_once_with(sut.observation_callback)
 
     def test_update_observation_state(self, tu_mock):
-        sut = Observation("DMR", get_observation_settings_two_fields())
+        sut = Observation("DMR", get_observation_settings_hba_immediate())
 
         sut._stop_antenna_field = mock.Mock()
         sut._start_antenna_field = mock.Mock()
 
         m_device_proxy = mock.Mock(
-            antenna_field_R="HBA",
+            antenna_field_R="HBA0",
             start_time_R=datetime.datetime.now().timestamp(),
             stop_time_R=(
                 datetime.datetime.now() + datetime.timedelta(hours=1)
@@ -125,13 +126,79 @@ class TestObservation(base.TestCase):
             state=mock.Mock(return_value=DevState.STANDBY),
         )
 
+        sut._started_antenna_fields = 1  # test default lambda callback
         sut._update_observation_state(m_device_proxy)
         sut._start_antenna_field.assert_called_once_with(m_device_proxy.antenna_field_R)
 
+        sut._stopped_antenna_fields = 1  # test default lambda callback
         m_device_proxy.stop_time_R = datetime.datetime.now().timestamp()
         sut._update_observation_state(m_device_proxy)
         sut._stop_antenna_field.assert_called_once_with(m_device_proxy.antenna_field_R)
 
+    def test_observation_callback(self, tu_mock):
+        """Test the start / stop observation callbacks"""
+        t_params = get_observation_settings_hba_immediate()
+
+        t_callback = mock.Mock()
+
+        sut = Observation(
+            tango_domain="DMR",
+            parameters=t_params,
+            start_callback=t_callback,
+            stop_callback=t_callback,
+        )
+        sut._stop_antenna_field = mock.Mock()
+        sut._start_antenna_field = mock.Mock()
+
+        m_device_proxy = mock.Mock(
+            antenna_field_R="HBA",
+            start_time_R=datetime.datetime.now().timestamp(),
+            stop_time_R=(
+                datetime.datetime.now() + datetime.timedelta(hours=1)
+            ).timestamp(),
+            lead_time_R=5,
+            state=mock.Mock(return_value=DevState.STANDBY),
+        )
+
+        sut._started_antenna_fields = 1
+        sut._update_observation_state(m_device_proxy)
+        t_callback.assert_called_once_with(t_params.antenna_fields[0].observation_id)
+
+        sut._stopped_antenna_fields = 1
+        m_device_proxy.stop_time_R = datetime.datetime.now().timestamp()
+        sut._update_observation_state(m_device_proxy)
+        self.assertEqual(2, t_callback.call_count)
+
+    def bound_callback(self, obs_id: int):
+        """Test passing bound (self) callback"""
+        self.assertEqual(
+            get_observation_settings_hba_immediate().antenna_fields[0].observation_id,
+            obs_id,
+        )
+
+    def test_bound_observation_callback(self, tu_mock):
+        """Test calling bound callback"""
+        sut = Observation(
+            tango_domain="DMR",
+            parameters=get_observation_settings_hba_immediate(),
+            start_callback=self.bound_callback,
+        )
+        sut._stop_antenna_field = mock.Mock()
+        sut._start_antenna_field = mock.Mock()
+
+        m_device_proxy = mock.Mock(
+            antenna_field_R="HBA",
+            start_time_R=datetime.datetime.now().timestamp(),
+            stop_time_R=(
+                datetime.datetime.now() + datetime.timedelta(hours=1)
+            ).timestamp(),
+            lead_time_R=5,
+            state=mock.Mock(return_value=DevState.STANDBY),
+        )
+
+        sut._started_antenna_fields = 1
+        sut._update_observation_state(m_device_proxy)
+
     @mock.patch.object(observation_field, "ObservationField", autospec=True)
     def test_create_subscriptions_failed(self, m_obs_field, tu_mock):
         sut = self.mocked_observation_field_base(m_obs_field)
diff --git a/tangostationcontrol/test/observation/test_observation_controller.py b/tangostationcontrol/test/observation/test_observation_controller.py
index 3be2bb257..a83ee9a39 100644
--- a/tangostationcontrol/test/observation/test_observation_controller.py
+++ b/tangostationcontrol/test/observation/test_observation_controller.py
@@ -133,11 +133,16 @@ class TestObservationController(base.TestCase):
 
         sut = obs_module.ObservationController("DMR")
 
-        self.m_observation.return_value.observation_id = "5"
+        self.m_observation.return_value.observation_id = 5
 
         sut.add_observation(settings)
 
-        self.m_observation.assert_called_once_with("DMR", settings, sut.pop)
+        self.m_observation.assert_called_once_with(
+            tango_domain="DMR",
+            parameters=settings,
+            start_callback=sut._start_callback,
+            stop_callback=sut._stop_callback,
+        )
         self.m_observation.return_value.create_devices.assert_called_once()
         self.m_observation.return_value.initialise_observation.assert_called_once()
         self.m_observation.return_value.create_subscriptions.assert_called_once()
@@ -148,6 +153,25 @@ class TestObservationController(base.TestCase):
             self.m_observation.return_value,
         )
 
+    def test_stop_callback(self, _m_tango_util):
+        """Test that the _stop_callback correctly cleans up observations"""
+
+        settings = get_observation_settings_two_fields()
+        for antenna_field in settings.antenna_fields:
+            antenna_field.stop_time = (datetime.now() + timedelta(days=1)).isoformat()
+
+        self.m_observation.return_value.observation_id = 5
+
+        sut = obs_module.ObservationController("DMR")
+        sut.add_observation(settings)
+
+        self.assertEqual(1, len(sut))
+
+        sut._stop_callback(settings.antenna_fields[0].observation_id)
+
+        self.m_observation.return_value.destroy_devices.assert_called_once()
+        self.assertEqual(0, len(sut))
+
     def test_add_observation_failed(self, _m_tango_util):
         settings = get_observation_settings_two_fields()
         for antenna_field in settings.antenna_fields:
@@ -155,7 +179,7 @@ class TestObservationController(base.TestCase):
 
         sut = obs_module.ObservationController("DMR")
 
-        self.m_observation.return_value.observation_id = "5"
+        self.m_observation.return_value.observation_id = 5
         self.m_observation.return_value.create_devices.side_effect = [DevFailed]
 
         self.assertRaises(RuntimeError, sut.add_observation, settings)
@@ -165,11 +189,11 @@ class TestObservationController(base.TestCase):
     def test_start_observation(self, _m_tango_util):
         sut = obs_module.ObservationController("DMR")
 
-        sut["5"] = mock.Mock()
+        sut[5] = mock.Mock()
 
-        sut.start_observation("5")
+        sut.start_observation(5)
 
-        sut["5"].start.assert_called_once()
+        sut[5].start.assert_called_once()
 
     def test_start_observation_key_error(self, _m_tango_util):
         sut = obs_module.ObservationController("DMR")
diff --git a/tangostationcontrol/test/zeromq/__init__.py b/tangostationcontrol/test/zeromq/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tangostationcontrol/test/zeromq/test_configdb.json b/tangostationcontrol/test/zeromq/test_configdb.json
new file mode 100644
index 000000000..9b83cefad
--- /dev/null
+++ b/tangostationcontrol/test/zeromq/test_configdb.json
@@ -0,0 +1,1004 @@
+{
+    "servers": {
+        "Boot": {
+            "STAT": {
+                "Boot": {
+                    "STAT/Boot/1": {
+                        "properties": {
+                            "Initialise_Hardware": [
+                                "False"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "APSCT": {
+            "STAT": {
+                "APSCT": {
+                    "STAT/APSCT/L0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "apsct-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4843"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/APSCT/L1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "apsct-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4843"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/APSCT/H0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "apsct-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4843"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "CCD": {
+            "STAT": {
+                "CCD": {
+                    "STAT/CCD/1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "ccd-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4843"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "EC": {
+            "STAT": {
+                "EC": {
+                    "STAT/EC/1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "ec-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "OPC_Node_Path_Prefix": [
+                              "3:ServerInterfaces",
+                              "4:Environmental_Control"
+                            ],
+                            "OPC_namespace": [
+                              "http://Environmental_Control"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "APSPU": {
+            "STAT": {
+                "APSPU": {
+                    "STAT/APSPU/L0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "apspu-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4842"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/APSPU/L1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "apspu-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4842"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/APSPU/H0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "apspu-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4842"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "Beamlet": {
+            "STAT": {
+                "Beamlet": {
+                    "STAT/Beamlet/LBA": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_beamlet_output_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_beamlet_output_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/Beamlet/HBA0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_beamlet_output_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_beamlet_output_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/Beamlet/HBA1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_beamlet_output_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_beamlet_output_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "DigitalBeam": {
+            "STAT": {
+                "DigitalBeam": {
+                    "STAT/DigitalBeam/LBA": {
+                        "properties": {
+                        }
+                    },
+                    "STAT/DigitalBeam/HBA0": {
+                        "properties": {
+                        }
+                    },
+                    "STAT/DigitalBeam/HBA1": {
+                        "properties": {
+                        }
+                    }
+                }
+            }
+        },
+        "TemperatureManager": {
+            "STAT": {
+                "TemperatureManager": {
+                    "STAT/TemperatureManager/1": {
+                        "properties": {
+                        }
+                    }
+                }
+            }
+        },
+        "PCON": {
+            "STAT": {
+                "PCON": {
+                    "STAT/PCON/1": {
+                        "properties": {
+                            "SNMP_use_simulators": [
+                                "True"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "PSOC": {
+            "STAT": {
+                "PSOC": {
+                    "STAT/PSOC/1": {
+                        "properties": {
+                            "SNMP_use_simulators": [
+                                "True"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "RECVH": {
+            "STAT": {
+                "RECVH": {
+                    "STAT/RECVH/H0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "recvh-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4844"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "RECVL": {
+            "STAT": {
+                "RECVL": {
+                    "STAT/RECVL/L1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "recvl-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4845"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/RECVL/L0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "recvl-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4845"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "SDPFirmware": {
+            "STAT": {
+                "SDPFirmware": {
+                    "STAT/SDPFirmware/LBA": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/SDPFirmware/HBA0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/SDPFirmware/HBA1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "SDP": {
+            "STAT": {
+                "SDP": {
+                    "STAT/SDP/LBA": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/SDP/HBA0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/SDP/HBA1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "BST": {
+            "STAT": {
+                "BST": {
+                    "STAT/BST/LBA": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_bst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_bst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/BST/HBA0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_bst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_bst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/BST/HBA1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_bst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_bst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "SST": {
+            "STAT": {
+                "SST": {
+                    "STAT/SST/LBA": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_sst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_sst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/SST/HBA0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_sst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_sst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/SST/HBA1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_sst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_sst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "XST": {
+            "STAT": {
+                "XST": {
+                    "STAT/XST/LBA": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_xst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_xst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/XST/HBA0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_xst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_xst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    },
+                    "STAT/XST/HBA1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "sdptr-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4840"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ],
+                            "FPGA_xst_offload_hdr_eth_destination_mac_RW_default": [
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB",
+                                "01:23:45:67:89:AB"
+                            ],
+                            "FPGA_xst_offload_hdr_ip_destination_address_RW_default": [
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1",
+                                "127.0.0.1"
+                            ]
+                        }
+                    }
+                }
+            }
+        },
+        "UNB2": {
+            "STAT": {
+                "UNB2": {
+                    "STAT/UNB2/L0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "unb2-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4841"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/UNB2/L1": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "unb2-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4841"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    },
+                    "STAT/UNB2/H0": {
+                        "properties": {
+                            "OPC_Server_Name": [
+                                "unb2-sim"
+                            ],
+                            "OPC_Server_Port": [
+                                "4841"
+                            ],
+                            "OPC_Time_Out": [
+                                "5.0"
+                            ]
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/tangostationcontrol/test/zeromq/test_publisher.py b/tangostationcontrol/test/zeromq/test_publisher.py
new file mode 100644
index 000000000..283891f60
--- /dev/null
+++ b/tangostationcontrol/test/zeromq/test_publisher.py
@@ -0,0 +1,249 @@
+# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
+# SPDX-License-Identifier: Apache-2.0
+
+from datetime import datetime
+import json
+import logging
+import queue
+import time
+from importlib.resources import files
+from multiprocessing.sharedctypes import Value
+from ctypes import c_int8
+from threading import Thread
+from typing import Any, Dict
+
+import zmq
+from zmq.utils.monitor import recv_monitor_message
+from timeout_decorator import timeout_decorator
+
+from tangostationcontrol.zeromq.publisher import ZeroMQPublisher
+
+from unittest import mock
+from test import base
+
+logger = logging.getLogger()
+
+
+class TestPublisher(base.TestCase):
+
+    DEFAULT_PUBLISH_ADDRESS = "tcp://*:6001"
+    DEFAULT_SUBSCRIBE_ADDRESS = "tcp://127.0.0.1:6001"
+
+    @staticmethod
+    def event_monitor_loop(monitor: zmq.Socket, trigger: Value):
+        """Loop on monitor socket to count number of subscribers
+
+        :param monitor: zmq monit socket, use `get_monitor_socket()`
+        :param trigger: multiprocessing shared value, must be incrementable / number
+        """
+        while monitor.poll():
+            evt: Dict[str, Any] = {}
+            mon_evt = recv_monitor_message(monitor)
+            evt.update(mon_evt)
+            evt["description"] = evt["event"]
+            logger.warning(f"Event: {evt}")
+            if evt["event"] == zmq.EVENT_HANDSHAKE_SUCCEEDED:
+                logger.info("Setting connected to true")
+                trigger.value += 1
+            elif evt["event"] == zmq.EVENT_DISCONNECTED:
+                logger.info("Dropping connection")
+                trigger.value -= 1
+            elif evt["event"] == zmq.EVENT_MONITOR_STOPPED:
+                break
+        monitor.close()
+
+    @staticmethod
+    def create_event_monitor(monitor: zmq.Socket, trigger: Value):
+        """Create a thread that uses an event monitor socket"""
+        t_monitor = Thread(
+            target=TestPublisher.event_monitor_loop, args=(monitor, trigger)
+        )
+        t_monitor.start()
+        return t_monitor
+
+    @staticmethod
+    def wait_for_start(publisher: ZeroMQPublisher):
+        """Spin until publisher is running"""
+        while not publisher.is_running:
+            logger.info("Waiting for publisher thread to start..")
+            time.sleep(0.1)
+
+    @staticmethod
+    def load_test_json():
+        """Load test_configdb into memory from disc"""
+        file_path = files(__package__).joinpath(f"test_configdb.json")
+        with file_path.open() as _file:
+            return json.dumps(json.load(_file))
+
+    def test_contstruct_bind_uri(self):
+        """Test that helper function creates proper strings"""
+
+        self.assertEqual(
+            "tcp://0.0.0.0:1624",
+            ZeroMQPublisher.contstruct_bind_uri("tcp", "0.0.0.0", 1624),
+        )
+
+        self.assertEqual(
+            "udp://0.0.0.0:1624",
+            ZeroMQPublisher.contstruct_bind_uri("udp", "0.0.0.0", "1624"),
+        )
+
+    @timeout_decorator.timeout(5)
+    def test_topic_bytearray(self):
+        """Pass a list of topics as bytearray"""
+        t_topics = [b"A", b"B"]
+
+        t_publisher = ZeroMQPublisher(self.DEFAULT_PUBLISH_ADDRESS, t_topics)
+        self.assertListEqual(t_topics, t_publisher._topics)
+
+    @timeout_decorator.timeout(5)
+    def test_topic_str(self):
+        """Pass a list of topics as str"""
+        t_topics = ["A", "B"]
+
+        t_publisher = ZeroMQPublisher(self.DEFAULT_PUBLISH_ADDRESS, t_topics)
+        self.assertNotEqual(t_topics, t_publisher._topics)
+
+        t_topics = [b"A", b"B"]
+
+        self.assertListEqual(t_topics, t_publisher._topics)
+
+    @timeout_decorator.timeout(5)
+    def test_start_stop(self):
+        """Test the startup and shutdown sequence"""
+        t_topic = b"A"
+        t_publisher = ZeroMQPublisher(self.DEFAULT_PUBLISH_ADDRESS, [t_topic])
+
+        self.wait_for_start(t_publisher)
+
+        self.assertTrue(t_publisher.is_running)
+        self.assertFalse(t_publisher.is_stopping)
+        self.assertFalse(t_publisher.is_done)
+
+        t_publisher.shutdown()
+
+        self.assertTrue(t_publisher.is_stopping)
+
+        while not t_publisher.is_stopping or not t_publisher.is_done:
+            logger.info("Waiting for publisher thread to stop..")
+            time.sleep(0.1)
+
+        self.assertIsNone(t_publisher.get_exception())
+        self.assertFalse(t_publisher.is_running)
+        self.assertTrue(t_publisher.is_done)
+
+    @timeout_decorator.timeout(5)
+    def test_publish(self):
+        """Test publishing a message and having a subscriber receive it"""
+        t_msg = "test"
+        t_topic = b"A"
+        t_publisher = ZeroMQPublisher(self.DEFAULT_PUBLISH_ADDRESS, [t_topic])
+
+        self.wait_for_start(t_publisher)
+
+        ctx = zmq.Context.instance()
+
+        t_connected = Value(c_int8, 0, lock=False)
+
+        self.create_event_monitor(
+            t_publisher._publisher.get_monitor_socket(), t_connected
+        )
+
+        subscribe = ctx.socket(zmq.SUB)
+        subscribe.connect(self.DEFAULT_SUBSCRIBE_ADDRESS)
+        subscribe.setsockopt(zmq.SUBSCRIBE, t_topic)
+
+        while not t_connected.value >= 1:
+            logger.info("Waiting for topic subscription..")
+            time.sleep(0.1)
+
+        for _ in range(0, 5):  # check against accidental shutdown after first message
+            t_publisher.send(t_msg)
+            msg = subscribe.recv_multipart()
+            self.assertIsInstance(datetime.fromisoformat(msg[1].decode()), datetime)
+            self.assertEqual(t_msg.encode(), msg[2])
+
+        subscribe.close()
+
+        while not t_connected.value == 0:
+            logger.info("Waiting for subscriber to disconnect")
+            time.sleep(0.1)
+
+        self.assertTrue(t_publisher.is_running)
+        self.assertFalse(t_publisher.is_stopping)
+        self.assertFalse(t_publisher.is_done)
+
+    def test_publish_huge_message_multi_subscriber(self):
+        test_data = self.load_test_json()
+        t_topic = b"A"
+        t_publisher = ZeroMQPublisher(self.DEFAULT_PUBLISH_ADDRESS, [t_topic])
+
+        self.wait_for_start(t_publisher)
+
+        ctx = zmq.Context.instance()
+
+        t_connected = Value(c_int8, 0, lock=False)
+
+        self.create_event_monitor(
+            t_publisher._publisher.get_monitor_socket(), t_connected
+        )
+
+        subscribers = []
+        for _ in range(0, 2):
+            subscribe = ctx.socket(zmq.SUB)
+            subscribe.connect(self.DEFAULT_SUBSCRIBE_ADDRESS)
+            subscribe.setsockopt(zmq.SUBSCRIBE, t_topic)
+            subscribers.append(subscribe)
+
+        while not t_connected.value >= 2:
+            logger.info("Waiting for topic subscriptions..")
+            time.sleep(0.1)
+
+        t_publisher.send(test_data)
+        for i in range(0, 2):
+            msg = subscribers[i].recv_multipart()
+            self.assertIsInstance(datetime.fromisoformat(msg[1].decode()), datetime)
+            self.assertEqual(test_data.encode(), msg[2])
+
+    def test_callback(self):
+        """Test that triggering done callbacks works"""
+
+        t_topic = b"A"
+        t_publisher = ZeroMQPublisher(self.DEFAULT_PUBLISH_ADDRESS, [t_topic])
+
+        self.wait_for_start(t_publisher)
+
+        t_cb = mock.Mock()
+
+        t_publisher.register_callback(t_cb)
+
+        t_publisher.shutdown()
+
+        while not t_publisher.is_stopping or not t_publisher.is_done:
+            logger.info("Waiting for publisher thread to stop..")
+            time.sleep(0.1)
+
+        t_cb.assert_called_once()
+
+    def test_queue(self):
+        """Test queuing of messages and full exception"""
+        t_queue_size = 10
+        t_topic = b"A"
+        t_publisher = ZeroMQPublisher(
+            bind_uri=self.DEFAULT_PUBLISH_ADDRESS,
+            topics=[t_topic],
+            queue_size=t_queue_size,
+        )
+        t_publisher.shutdown()
+
+        while not t_publisher.is_stopping or not t_publisher.is_done:
+            logger.info("Waiting for publisher thread to stop..")
+            time.sleep(0.1)
+
+        self.assertEqual(t_queue_size, t_publisher.queue_size)
+
+        for i in range(1, t_queue_size + 1):
+            t_publisher.send("hello")
+            self.assertEqual(i, t_publisher.queue_fill)
+
+        self.assertRaises(queue.Full, t_publisher.send, "hello")
diff --git a/tangostationcontrol/tox.ini b/tangostationcontrol/tox.ini
index 79aa5947d..9100ab352 100644
--- a/tangostationcontrol/tox.ini
+++ b/tangostationcontrol/tox.ini
@@ -48,6 +48,7 @@ commands =
     {envpython} -m coverage report --omit='*test*'
 
 [testenv:{cover,coverage}]
+base_python = python3.10
 runner = ignore_env_name_mismatch
 envdir = {toxworkdir}/coverage
 setenv =
@@ -62,6 +63,7 @@ commands =
 # Use generative name and command prefixes to reuse the same virtualenv
 # for all linting jobs.
 [testenv:{pep8,black,pylint,format}]
+base_python = python3.10
 runner = ignore_env_name_mismatch
 usedevelop = False
 envdir = {toxworkdir}/linting
@@ -93,6 +95,7 @@ commands =
     {envpython} -m xenon tangostationcontrol -b B -m A -a A -i libhdbpp-python
 
 [testenv:build]
+base_python = python3.10
 usedevelop = False
 commands =
     {envpython} -m grpc_tools.protoc -Itangostationcontrol/rpc/_proto=proto --python_out=. --pyi_out=. --grpc_python_out=. proto/observation.proto
-- 
GitLab