diff --git a/README.md b/README.md index 77494b978592259dd7fc12fda55fc4fc1f53d6bf..adc9173203f63e4a70b2cc246f0ba7f8d4018973 100644 --- a/README.md +++ b/README.md @@ -64,12 +64,37 @@ pip install ./ For more thorough usage explanation please consult the documentation +### Obtain archived monitoring points + +To retrieve old values for a monitoring point from the Prometheus database, use: + ```python from lofar_station_client import get_attribute_history # Metrics from the last hour -get_attribute_history("Pointing_direction_R", "stat/digitalbeam/1") +get_attribute_history("Pointing_direction_R", "stat/digitalbeam/lba") # Specify range in epoch -get_attribute_history("Pointing_direction_R", "stat/digitalbeam/1", start=1655725200.0, end=1655815200.0) +get_attribute_history("Pointing_direction_R", "stat/digitalbeam/lba", start=1655725200.0, end=1655815200.0) +``` + +### Parse raw UDP packet streams from a station + +Suppose you captured statistics or beamlets in a file called `packets.raw`. You can then parse these +and print a brief summary per packet using: + +```python +from lofar_station_client.statistics.receiver import FileReceiver + +for packet in FileReceiver("packets.raw"): + print(packet) +``` + +You can also process them live from a station, for example: + +```python +from lofar_station_client.statistics.receiver import TCPReceiver + +for packet in TCPReceiver("cs001c.control.lofar", 5101): + print(packet) ``` ## Development @@ -105,6 +130,8 @@ tox -e debug tests.requests.test_prometheus ``` ## Releasenotes +- 0.16.1 - Added iterator to Receiver classes + - Added antenna_field_index to packet header - 0.16 - HDF5 attributes: Moved frequency_band and rcu_\* from header to matrix - HDF5 attributes: Replaced subband_frequencies by the smaller subband_frequency_range - HDF5 attributes: Removed superfluous nyquist_zones and spectral_inversion attributes diff --git a/VERSION b/VERSION index 8eac30c383c3451e129b736d33e696ce149c0332..2a0970ca757cde4a70d2b420f2d491aff77b74ad 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.16 +0.16.1 diff --git a/lofar_station_client/statistics/packet.py b/lofar_station_client/statistics/packet.py index da523676045121e542b0a265c8f8c956d8135664..17163b383a4eabe4705090b1f68c09d8923c1fae 100644 --- a/lofar_station_client/statistics/packet.py +++ b/lofar_station_client/statistics/packet.py @@ -59,10 +59,18 @@ class SDPPacket: observation identifier. + station_info + + bit field with station information, encoding several other properties. + station_id station identifier. + antenna_field_index + + antenna field id. 0 = LBA/HBA/HBA0, 1 = HBA1 + source_info bit field with input information, encoding several other properties. @@ -125,6 +133,8 @@ class SDPPacket: self.fsub_type = None self.payload_error = None self.beam_repositioning_flag = None + self.station_id = None + self.antenna_field_index = None # self.source_info 5-8 are reserved self.gn_index = None self.nof_statistics_per_packet = None @@ -141,6 +151,15 @@ class SDPPacket: f"(first byte) is {self.marker}, not one of {self.valid_markers()}." ) + def __str__(self): + return ( + f"SDPPacket(marker={self.marker}, " + f"station={self.station_id}, field={self.antenna_field_name()}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp().strftime('%FT%T')}, " + f"payload_error={self.payload_error})" + ) + # format string for the header, see unpack below HEADER_FORMAT = ">cBL HH xxxxx xxxxxxx HQ" HEADER_SIZE = struct.calcsize(HEADER_FORMAT) @@ -162,6 +181,12 @@ class SDPPacket: if issubclass(klass, cls) ] + def upcast(self): + """Return a more specialised object for this packet.""" + + klass = PACKET_CLASS_FOR_MARKER.get(self.marker_raw, SDPPacket) + return klass(self.packet) + def unpack(self): """Unpack the packet into properties of this object.""" @@ -171,7 +196,7 @@ class SDPPacket: self.marker_raw, self.version_id, self.observation_id, - self.station_id, + self.station_info, self.source_info, self.block_period_raw, self.block_serial_number, @@ -180,8 +205,15 @@ class SDPPacket: raise ValueError("Error parsing statistics packet") from ex # unpack the fields we just updated + self.unpack_station_info() self.unpack_source_info() + def unpack_station_info(self): + """Unpack the station_info field into properties of this object.""" + + self.station_id = get_bit_value(self.station_info, 0, 9) + self.antenna_field_index = get_bit_value(self.station_info, 10, 15) + def unpack_source_info(self): """Unpack the source_info field into properties of this object.""" @@ -225,6 +257,14 @@ class SDPPacket: # statistics packets, which the constructor will refuse to accept. return self.marker_raw + def antenna_field_name(self) -> str: + """Returns the name of the antenna field as one of LBA-#0, HBA-#0, HBA-#1.""" + + antenna_band = ["LBA", "HBA"][self.antenna_band_index] + + # NB: This returns HBA-#0 for both HBA0 and HBA + return f"{antenna_band}-#{self.antenna_field_index}" + def block_period(self) -> float: """Return the block period, in seconds.""" @@ -253,7 +293,11 @@ class SDPPacket: "marker": self.marker, "version_id": self.version_id, "observation_id": self.observation_id, - "station_id": self.station_id, + "station_info": { + "_raw": self.station_info, + "station_id": self.station_id, + "antenna_field_index": self.antenna_field_index, + }, "source_info": { "_raw": self.source_info, "antenna_band_index": self.antenna_band_index, @@ -338,6 +382,15 @@ class BeamletPacket(SDPPacket): super().__init__(packet) + def __str__(self): + return ( + f"BeamletPacket(" + f"station={self.station_id}, field={self.antenna_field_name()}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp().strftime('%FT%T')}, " + f"payload_error={self.payload_error})" + ) + def unpack(self): """Unpack the packet into properties of this object.""" @@ -347,7 +400,7 @@ class BeamletPacket(SDPPacket): self.marker_raw, self.version_id, self.observation_id, - self.station_id, + self.station_info, self.source_info_h, self.source_info_l, self.beamlet_scale, @@ -363,6 +416,7 @@ class BeamletPacket(SDPPacket): self.source_info = (self.source_info_h << 16) + self.source_info_l # unpack the fields we just updated + self.unpack_station_info() self.unpack_source_info() # set computed fields in base class @@ -497,6 +551,15 @@ class StatisticsPacket(SDPPacket): super().__init__(packet) + def __str__(self): + return ( + f"StatisticsPacket(marker={self.marker}, " + f"station={self.station_id}, field={self.antenna_field_name()}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp().strftime('%FT%T')}, " + f"payload_error={self.payload_error})" + ) + def unpack(self): """Unpack the packet into properties of this object.""" @@ -506,7 +569,7 @@ class StatisticsPacket(SDPPacket): self.marker_raw, self.version_id, self.observation_id, - self.station_id, + self.station_info, self.source_info, # reserved byte _, @@ -529,6 +592,7 @@ class StatisticsPacket(SDPPacket): raise ValueError("Error parsing statistics packet") from ex # unpack the fields we just updated + self.unpack_station_info() self.unpack_source_info() self.unpack_data_id() @@ -596,6 +660,16 @@ class SSTPacket(StatisticsPacket): super().__init__(packet) + def __str__(self): + return ( + f"SSTPacket(" + f"station={self.station_id}, field={self.antenna_field_name()}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp().strftime('%FT%T')}, " + f"payload_error={self.payload_error}, " + f"signal_input={self.signal_input_index})" + ) + def unpack_data_id(self): super().unpack_data_id() @@ -632,6 +706,21 @@ class XSTPacket(StatisticsPacket): super().__init__(packet) + def __str__(self): + last_baseline = ( + self.first_baseline[0] + self.nof_signal_inputs - 1, + self.first_baseline[1] + self.nof_signal_inputs - 1, + ) + return ( + f"XSTPacket(" + f"station={self.station_id}, field={self.antenna_field_name()}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp().strftime('%FT%T')}, " + f"payload_error={self.payload_error}, " + f"subband={self.subband_index}, " + f"baselines={self.first_baseline} - {last_baseline})" + ) + def unpack_data_id(self): super().unpack_data_id() @@ -667,6 +756,18 @@ class BSTPacket(StatisticsPacket): super().__init__(packet) + def __str__(self): + first_beamlet = self.beamlet_index + last_beamlet = first_beamlet + (self.nof_statistics_per_packet // N_POL) - 1 + return ( + f"BSTPacket(" + f"station={self.station_id}, field={self.antenna_field_name()}, " + f"gn_index={self.gn_index}, " + f"timestamp={self.timestamp().strftime('%FT%T')}, " + f"payload_error={self.payload_error}, " + f"beamlets={first_beamlet} - {last_beamlet})" + ) + def unpack_data_id(self): super().unpack_data_id() @@ -682,7 +783,7 @@ class BSTPacket(StatisticsPacket): def payload(self, signed=True): # We have signed values, per beamlet in pairs # for each polarisation. - return super().payload(signed=True).reshape(-1, 2) + return super().payload(signed=True).reshape(-1, N_POL) # Which class to use for which marker. diff --git a/lofar_station_client/statistics/receiver.py b/lofar_station_client/statistics/receiver.py index 09f08322fc75ab253b7504cdbf123b461e7a86f2..898cd7b72c96ba61ff28f5e7e7fd5137ba83984f 100644 --- a/lofar_station_client/statistics/receiver.py +++ b/lofar_station_client/statistics/receiver.py @@ -17,7 +17,18 @@ class Receiver: def __init__(self, fdesc): self.fdesc = fdesc - def get_packet(self) -> bytes: + def __iter__(self): + """Iterates over all packets in the stream.""" + return self + + def __next__(self) -> bytes: + """Return next packet.""" + try: + return self.get_packet() + except EOFError as exc: + raise StopIteration from exc + + def get_packet(self) -> StatisticsPacket: """Read exactly one statistics packet from the TCP connection.""" # read only the header, to compute the size of the packet @@ -28,8 +39,13 @@ class Receiver: payload_length = packet.expected_size() - len(header) payload = self.read_data(payload_length) - # add payload to the header, and return the full packet - return header + payload + # add payload to the header, and construct the full packet + packet = StatisticsPacket(header + payload) + + # return a more specialised class based on its type + packet = packet.upcast() + + return packet def _read(self, length: int) -> bytes: """Low-level read function to fetch at most "length" (>1) bytes. Returns diff --git a/lofar_station_client/statistics/statistics_data.py b/lofar_station_client/statistics/statistics_data.py index bee6857a535c4c820d33d84da07ab7adc6ec2a7f..82c303561b9478268c1812117dd35e8768bb65af 100644 --- a/lofar_station_client/statistics/statistics_data.py +++ b/lofar_station_client/statistics/statistics_data.py @@ -26,9 +26,15 @@ class StatisticsData(ndarray): timestamp: str = attribute() """ timestamp of the data """ - station_id: int = attribute() + station_info_raw: int = attribute(name="station_info__raw", optional=True) + """ Bit field with station information, encoding several other properties. """ + + station_info_station_id: int = attribute() """ Station identifier """ + station_info_antenna_field_index: int = attribute() + """ Antenna field number """ + source_info_t_adc: int = attribute() """" Sampling clock. 0 = 160 MHz, 1 = 200 MHz. """ diff --git a/lofar_station_client/statistics/writer/VERSION b/lofar_station_client/statistics/writer/VERSION index 2eb3c4fe4eebcdea3da0790cc0ba74cb286ec4f4..5a2a5806df6e909afe3609b5706cb1012913ca0e 100644 --- a/lofar_station_client/statistics/writer/VERSION +++ b/lofar_station_client/statistics/writer/VERSION @@ -1 +1 @@ -0.5 +0.6 diff --git a/lofar_station_client/statistics/writer/entry.py b/lofar_station_client/statistics/writer/entry.py index 34fb5db1bbf1e7c7b0e955575c93e5128cf7b9ac..5e36b13a9a90e338a21eaede29dddb4f9a48bbf9 100644 --- a/lofar_station_client/statistics/writer/entry.py +++ b/lofar_station_client/statistics/writer/entry.py @@ -197,20 +197,12 @@ def _start_loop(receiver, writer, reconnect, filename): """Main loop""" try: while True: - _receive_packets(receiver, writer, reconnect, filename) - except KeyboardInterrupt: - # user abort, don't complain - logger.warning("Received keyboard interrupt. Stopping.") - finally: - writer.close_writer() + for packet in receiver: + writer.next_packet(packet.packet) + if filename or not reconnect: + break -def _receive_packets(receiver, writer, reconnect, filename): - try: - packet = receiver.get_packet() - writer.next_packet(packet) - except EOFError: - if reconnect and not filename: logger.warning("Connection lost, attempting to reconnect") while True: try: @@ -222,10 +214,14 @@ def _receive_packets(receiver, writer, reconnect, filename): time.sleep(10) else: break - logger.warning("Reconnected! Resuming operations") - else: - logger.info("End of input.") - raise SystemExit + logger.info("Reconnected! Resuming operations") + + logger.info("End of input.") + except KeyboardInterrupt: + # user abort, don't complain + logger.warning("Received keyboard interrupt. Stopping.") + finally: + writer.close_writer() def _get_tango_device(tango_disabled, host, device_name): diff --git a/tests/statistics/test_writer.py b/tests/statistics/test_writer.py index 42e0e08fdebf5c50aa61d12c694566c762e5a617..2eab4a216cc4b0b20c2da868ba7fd72a96a05e25 100644 --- a/tests/statistics/test_writer.py +++ b/tests/statistics/test_writer.py @@ -93,8 +93,7 @@ class TestStatisticsReaderWriterSST(TestStatisticsReaderWriter): with mock.patch.object( entry.sys, "argv", default_writer_sys_argv + writer_argv ): - with self.assertRaises(SystemExit): - entry.main() + entry.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/SST_2021-09-20-12-17-40_{antennafield}.h5")) @@ -203,8 +202,7 @@ class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter): with mock.patch.object( entry.sys, "argv", default_writer_sys_argv + writer_argv ): - with self.assertRaises(SystemExit): - entry.main() + entry.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5")) @@ -263,8 +261,7 @@ class TestStatisticsReaderWriterBST(TestStatisticsReaderWriter): tmpdir, ] with mock.patch.object(entry.sys, "argv", new_sys_argv): - with self.assertRaises(SystemExit): - entry.main() + entry.main() # check if file was written self.assertTrue(isfile(f"{tmpdir}/BST_2022-05-20-11-08-44_HBA.h5")) @@ -288,8 +285,7 @@ class TestStatisticsWriterXST(TestStatisticsReaderWriter): tmpdir, ] with mock.patch.object(entry.sys, "argv", new_sys_argv): - with self.assertRaises(SystemExit): - entry.main() + entry.main() # check if file was written self.assertTrue( @@ -341,8 +337,7 @@ class TestStatisticsWriterXST(TestStatisticsReaderWriter): tmpdir, ] with mock.patch.object(entry.sys, "argv", new_sys_argv): - with self.assertRaises(SystemExit): - entry.main() + entry.main() # check if files were written self.assertTrue( @@ -369,8 +364,7 @@ class TestStatisticsWriterXST(TestStatisticsReaderWriter): tmpdir, ] with mock.patch.object(entry.sys, "argv", new_sys_argv): - with self.assertRaises(SystemExit): - entry.main() + entry.main() # check if file was written self.assertTrue(