diff --git a/lofar_stingray/aggregator/message.py b/lofar_stingray/aggregator/message.py index d7199b565157ef8c70bd62478dc5348adb16e58d..3fde14d780a8fad4e3b116ec2a70365ae43ef812 100644 --- a/lofar_stingray/aggregator/message.py +++ b/lofar_stingray/aggregator/message.py @@ -26,22 +26,24 @@ class Message: class StatisticsHeader: """Generic fields for metadata regarding statistics.""" + # timing information timestamp: datetime = datetime.min + integration_interval: float = 0.0 + + # frequency information + source_info: dict = field(default_factory=dict) + f_adc: int = 0 # source of statistics station: str = "" antenna_field: str = "" type: str = "" - - # packet header metadata, - # for testing and validation - packet_version: int = 0 station_id: int = 0 station_info: dict = field(default_factory=dict) - source_info: dict = field(default_factory=dict) + + # packet header metadata + packet_version: int = 0 observation_id: int = 0 - clock_mhz: int = 0 - integration_interval: float = 0.0 @dataclass diff --git a/lofar_stingray/publish.py b/lofar_stingray/publish.py index 8d6809ec0073eb84d7accf70f9633a8127219d17..fb947d346fa81f458ba10e11601db0e688378b32 100644 --- a/lofar_stingray/publish.py +++ b/lofar_stingray/publish.py @@ -88,13 +88,16 @@ def packets_to_messages( m.antenna_field = antenna_field m.type = mode - m.packet_version = packet.header.version_id - m.station_id = packet.header.station_id - m.station_info = packet.header.station_info - m.source_info = packet.header.source_info - m.observation_id = packet.header.observation_id - m.clock_mhz = packet.header.t_adc - m.integration_interval = packet.header.integration_interval + # convert to dict to get all subfields as well + header_fields = {field[0]: field[1] for field in packet.header} + + m.packet_version = header_fields["version_id"] + m.station_id = header_fields["station_id"] + m.station_info = header_fields["station_info"] + m.source_info = header_fields["source_info"] + m.f_adc = header_fields["f_adc"] + m.observation_id = header_fields["observation_id"] + m.integration_interval = header_fields["integration_interval"] return messages diff --git a/tests/publish/test_publish.py b/tests/publish/test_publish.py index 99a356c689aa9e70aa16798d7283dcba53cd5f18..033a29e9bb8b5e83e0d4840ead28d397a2a83acd 100644 --- a/tests/publish/test_publish.py +++ b/tests/publish/test_publish.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """Testing of the publish program""" +from datetime import timezone import json import numpy from os.path import dirname @@ -9,6 +10,7 @@ from unittest import TestCase from unittest.mock import patch from lofar_stingray import publish +from lofar_stingray.streams import FileStream from prometheus_client.registry import REGISTRY @@ -28,6 +30,37 @@ class TestPublish(TestCase): for collector in list(REGISTRY._collector_to_names.keys()): REGISTRY.unregister(collector) + def test_packets_to_messages(self): + """Test messages created by packets_to_messages.""" + + with FileStream(f"{dirname(__file__)}/SDP_BST_statistics_packets.bin") as f: + bst_packets = [packet for packet in f] + + messages = publish.packets_to_messages("cs001", "lba", "bst", bst_packets) + + self.assertTrue(len(messages) > 0) + self.assertEqual(timezone.utc, messages[0].timestamp.tzinfo) + self.assertEqual(5, messages[0].packet_version) + + # verify BST data dimensionality + self.assertEqual(488, len(messages[0].bst_data)) + self.assertEqual(2, len(messages[0].bst_data[0])) + + # verify station info + self.assertEqual("cs001", messages[0].station) + self.assertEqual("lba", messages[0].antenna_field) + self.assertEqual("bst", messages[0].type) + self.assertEqual(dict, type(messages[0].station_info)) + self.assertEqual(903, messages[0].station_info["station_id"]) + self.assertEqual(0, messages[0].station_info["antenna_field_index"]) + + # verify frequency information + self.assertEqual(200, messages[0].f_adc) + self.assertEqual(dict, type(messages[0].source_info)) + self.assertEqual(0, messages[0].source_info["antenna_band_index"]) + self.assertEqual(0, messages[0].source_info["nyquist_zone_index"]) + self.assertEqual(1, messages[0].source_info["t_adc"]) + @patch("lofar_stingray.publish.start_http_server", autospec=True) @patch("lofar_stingray.publish.ZeroMQPublisher", autospec=True) def test_bst(self, m_publisher, m_prometheus_server):