diff --git a/README.md b/README.md index 297e9932b117749d3ff98a5f6b976f8b866a5a38..e02c5cc3686c97f27483317295951f93816a861f 100644 --- a/README.md +++ b/README.md @@ -82,18 +82,9 @@ Suppose you captured statistics or beamlets in a file called `packets.raw`. You and print a brief summary per packet using: ```python -from lofar_station_client.statistics.receiver import FileReceiver +from lofar_station_client.statistics import receivers -for packet in FileReceiver("packets.raw"): - print(packet) -``` - -You can also process them live from a station, for example: - -```python -from lofar_station_client.statistics.receiver import TCPReceiver - -for packet in TCPReceiver("cs001c.control.lofar", 5101): +for packet in receivers.create("file:///packets.raw"): print(packet) ``` @@ -131,6 +122,7 @@ tox -e debug tests.requests.test_prometheus ## Release notes +- 0.18.7 - Add support for various ZeroMQ package receivers - 0.18.6 - Compatability with new black versions - 0.18.5 - Compatability with python 3.10 and higher - 0.18.4 - Compatability with PyTango 9.5.0 diff --git a/VERSION b/VERSION index a1b6a99c6f00a858828bb374ef5d8e0d9f1f678d..fc67997161f3e5a2d0fa074b69f109dad05db210 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.18.6 +0.18.7 diff --git a/lofar_station_client/statistics/receivers/_zmq.py b/lofar_station_client/statistics/receivers/_zmq.py index f078fec6886f915434c2cb67c4bdc0cc737e0e03..180e9721f0cd5098a11afa12d1b8883127c14de7 100644 --- a/lofar_station_client/statistics/receivers/_zmq.py +++ b/lofar_station_client/statistics/receivers/_zmq.py @@ -4,7 +4,7 @@ """ZeroMQ Receiver""" import json import logging -from typing import Union +from typing import Union, Tuple from urllib.parse import urlsplit, parse_qs import zmq @@ -21,12 +21,15 @@ class ZeroMQReceiver: self._topic: bytes = o.path.strip("/").encode() self._uri = f"{o.scheme}://{o.netloc}" self._content_type = "application/octet-stream" + self._json_timestamp = "ts" self._ctx: zmq.Context = zmq.Context.instance() self._subscriber: zmq.Socket = self._ctx.socket(zmq.SUB) qs = parse_qs(o.query) if "content-type" in qs: self._content_type = qs["content-type"][0] + if "json-timestamp" in qs: + self._json_timestamp = qs["json-timestamp"][0] super().__init__() @@ -36,7 +39,7 @@ class ZeroMQReceiver: self._subscriber.setsockopt(zmq.SUBSCRIBE, self._topic) return self - def __next__(self) -> Union[str, dict, bytes]: + def __next__(self) -> Union[Tuple[str, str], dict, Tuple[str, bytes]]: try: while True: msg = self._subscriber.recv_multipart() @@ -45,12 +48,14 @@ class ZeroMQReceiver: continue if self._content_type == "application/json": - return json.loads(msg[1]) + data = json.loads(msg[2]) + data[self._json_timestamp] = msg[1].decode() + return data if self._content_type == "text/plain": - return msg[1].decode() + return msg[1].decode(), msg[2].decode() - return msg[1] + return msg[1].decode(), msg[2] except zmq.ZMQError as e: if e.errno == zmq.ETERM: raise StopIteration from e @@ -65,3 +70,8 @@ class ZeroMQReceiver: def content_type(self): """Returns the content type of the receiver""" return self._content_type + + @property + def json_timestamp(self): + """Returns the content type of the receiver""" + return self._json_timestamp diff --git a/tests/statistics/receivers/test_zmq.py b/tests/statistics/receivers/test_zmq.py index a26043a92ae6bfbf8ca6b8036c7232e99e24d29d..fe85e161a7e73aabdf4f27421ee75167d64adbe0 100644 --- a/tests/statistics/receivers/test_zmq.py +++ b/tests/statistics/receivers/test_zmq.py @@ -15,8 +15,8 @@ class TestZeroMQReceiver(base.TestCase): ctx.return_value = mock.Mock() ctx.return_value.socket.return_value = socket_mock socket_mock.recv_multipart.side_effect = [ - ["test".encode(), "data1".encode()], - ["test".encode(), "data1".encode()], + ["test".encode(), "2024-05-17T08:35:48Z".encode(), "data1".encode()], + ["test".encode(), "2024-05-17T08:36:48Z".encode(), "data2".encode()], zmq.ZMQError(errno=zmq.ETERM), ] @@ -26,39 +26,107 @@ class TestZeroMQReceiver(base.TestCase): self.assertEqual("test".encode(), receiver.topic) self.assertEqual("text/plain", receiver.content_type) + packages = [] for package in receiver: - self.assertIsInstance(package, str) + packages.append(package) + + self.assertListEqual( + [("2024-05-17T08:35:48Z", "data1"), ("2024-05-17T08:36:48Z", "data2")], + packages, + ) def test_json(self, ctx): socket_mock = mock.Mock() ctx.return_value = mock.Mock() ctx.return_value.socket.return_value = socket_mock socket_mock.recv_multipart.side_effect = [ - ["test2".encode(), "{}".encode()], - ["test2".encode(), "{}".encode()], + [ + "test2".encode(), + "2024-05-18T08:35:48Z".encode(), + '{"data": "one"}'.encode(), + ], + [ + "test2".encode(), + "2024-05-18T08:36:48Z".encode(), + '{"data": "two"}'.encode(), + ], + zmq.ZMQError(errno=zmq.ETERM), + ] + receiver = ZeroMQReceiver( + "tcp://localhost:9999/test2?content-type=application%2Fjson" + ) + self.assertEqual("test2".encode(), receiver.topic) + self.assertEqual("application/json", receiver.content_type) + + packages = [] + for package in receiver: + packages.append(package) + + self.assertListEqual( + [ + dict(ts="2024-05-18T08:35:48Z", data="one"), + dict(ts="2024-05-18T08:36:48Z", data="two"), + ], + packages, + ) + + def test_json_custom(self, ctx): + socket_mock = mock.Mock() + ctx.return_value = mock.Mock() + ctx.return_value.socket.return_value = socket_mock + socket_mock.recv_multipart.side_effect = [ + [ + "test2".encode(), + "2024-05-16T08:35:48Z".encode(), + '{"data": "one"}'.encode(), + ], + [ + "test2".encode(), + "2024-05-16T08:36:48Z".encode(), + '{"data": "two"}'.encode(), + ], zmq.ZMQError(errno=zmq.ETERM), ] receiver = ZeroMQReceiver( "tcp://localhost:9999/test2?content-type=application%2Fjson" + "&json-timestamp=timestamp" ) self.assertEqual("test2".encode(), receiver.topic) self.assertEqual("application/json", receiver.content_type) + packages = [] for package in receiver: - self.assertIsInstance(package, dict) + packages.append(package) + + self.assertListEqual( + [ + dict(timestamp="2024-05-16T08:35:48Z", data="one"), + dict(timestamp="2024-05-16T08:36:48Z", data="two"), + ], + packages, + ) def test_binary(self, ctx): socket_mock = mock.Mock() ctx.return_value = mock.Mock() ctx.return_value.socket.return_value = socket_mock socket_mock.recv_multipart.side_effect = [ - ["test2".encode(), "{}".encode()], - ["test2".encode(), "{}".encode()], + ["test2".encode(), "2024-05-19T08:35:48Z".encode(), b"{data1}"], + ["test2".encode(), "2024-05-19T08:36:48Z".encode(), b"{data2}"], zmq.ZMQError(errno=zmq.ETERM), ] receiver = ZeroMQReceiver("tcp://localhost:9999/test2") self.assertEqual("test2".encode(), receiver.topic) self.assertEqual("application/octet-stream", receiver.content_type) + packages = [] for package in receiver: - self.assertIsInstance(package, bytes) + packages.append(package) + + self.assertListEqual( + [ + ("2024-05-19T08:35:48Z", b"{data1}"), + ("2024-05-19T08:36:48Z", b"{data2}"), + ], + packages, + )