Skip to content
Snippets Groups Projects
Commit 65118a42 authored by Hannes Feldt's avatar Hannes Feldt
Browse files

add timestamps to zmq

parent 9e9728d2
No related branches found
1 merge request!89L2SS-1740: Move modules to prepare for statistics metadata
Pipeline #82196 passed
...@@ -82,18 +82,9 @@ Suppose you captured statistics or beamlets in a file called `packets.raw`. You ...@@ -82,18 +82,9 @@ Suppose you captured statistics or beamlets in a file called `packets.raw`. You
and print a brief summary per packet using: and print a brief summary per packet using:
```python ```python
from lofar_station_client.statistics.receiver import FileReceiver from lofar_station_client.statistics import receivers
for packet in FileReceiver("packets.raw"): for packet in receivers.create("file:///packets.raw"):
print(packet)
```
You can also process them live from a station, for example:
```python
from lofar_station_client.statistics.receiver import TCPReceiver
for packet in TCPReceiver("cs001c.control.lofar", 5101):
print(packet) print(packet)
``` ```
...@@ -131,6 +122,7 @@ tox -e debug tests.requests.test_prometheus ...@@ -131,6 +122,7 @@ tox -e debug tests.requests.test_prometheus
## Release notes ## Release notes
- 0.18.7 - Add support for various ZeroMQ package receivers
- 0.18.6 - Compatability with new black versions - 0.18.6 - Compatability with new black versions
- 0.18.5 - Compatability with python 3.10 and higher - 0.18.5 - Compatability with python 3.10 and higher
- 0.18.4 - Compatability with PyTango 9.5.0 - 0.18.4 - Compatability with PyTango 9.5.0
......
0.18.6 0.18.7
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
"""ZeroMQ Receiver""" """ZeroMQ Receiver"""
import json import json
import logging import logging
from typing import Union from typing import Union, Tuple
from urllib.parse import urlsplit, parse_qs from urllib.parse import urlsplit, parse_qs
import zmq import zmq
...@@ -21,12 +21,15 @@ class ZeroMQReceiver: ...@@ -21,12 +21,15 @@ class ZeroMQReceiver:
self._topic: bytes = o.path.strip("/").encode() self._topic: bytes = o.path.strip("/").encode()
self._uri = f"{o.scheme}://{o.netloc}" self._uri = f"{o.scheme}://{o.netloc}"
self._content_type = "application/octet-stream" self._content_type = "application/octet-stream"
self._json_timestamp = "ts"
self._ctx: zmq.Context = zmq.Context.instance() self._ctx: zmq.Context = zmq.Context.instance()
self._subscriber: zmq.Socket = self._ctx.socket(zmq.SUB) self._subscriber: zmq.Socket = self._ctx.socket(zmq.SUB)
qs = parse_qs(o.query) qs = parse_qs(o.query)
if "content-type" in qs: if "content-type" in qs:
self._content_type = qs["content-type"][0] self._content_type = qs["content-type"][0]
if "json-timestamp" in qs:
self._json_timestamp = qs["json-timestamp"][0]
super().__init__() super().__init__()
...@@ -36,7 +39,7 @@ class ZeroMQReceiver: ...@@ -36,7 +39,7 @@ class ZeroMQReceiver:
self._subscriber.setsockopt(zmq.SUBSCRIBE, self._topic) self._subscriber.setsockopt(zmq.SUBSCRIBE, self._topic)
return self return self
def __next__(self) -> Union[str, dict, bytes]: def __next__(self) -> Union[Tuple[str, str], dict, Tuple[str, bytes]]:
try: try:
while True: while True:
msg = self._subscriber.recv_multipart() msg = self._subscriber.recv_multipart()
...@@ -45,12 +48,14 @@ class ZeroMQReceiver: ...@@ -45,12 +48,14 @@ class ZeroMQReceiver:
continue continue
if self._content_type == "application/json": if self._content_type == "application/json":
return json.loads(msg[1]) data = json.loads(msg[2])
data[self._json_timestamp] = msg[1].decode()
return data
if self._content_type == "text/plain": if self._content_type == "text/plain":
return msg[1].decode() return msg[1].decode(), msg[2].decode()
return msg[1] return msg[1].decode(), msg[2]
except zmq.ZMQError as e: except zmq.ZMQError as e:
if e.errno == zmq.ETERM: if e.errno == zmq.ETERM:
raise StopIteration from e raise StopIteration from e
...@@ -65,3 +70,8 @@ class ZeroMQReceiver: ...@@ -65,3 +70,8 @@ class ZeroMQReceiver:
def content_type(self): def content_type(self):
"""Returns the content type of the receiver""" """Returns the content type of the receiver"""
return self._content_type return self._content_type
@property
def json_timestamp(self):
"""Returns the content type of the receiver"""
return self._json_timestamp
...@@ -15,8 +15,8 @@ class TestZeroMQReceiver(base.TestCase): ...@@ -15,8 +15,8 @@ class TestZeroMQReceiver(base.TestCase):
ctx.return_value = mock.Mock() ctx.return_value = mock.Mock()
ctx.return_value.socket.return_value = socket_mock ctx.return_value.socket.return_value = socket_mock
socket_mock.recv_multipart.side_effect = [ socket_mock.recv_multipart.side_effect = [
["test".encode(), "data1".encode()], ["test".encode(), "2024-05-17T08:35:48Z".encode(), "data1".encode()],
["test".encode(), "data1".encode()], ["test".encode(), "2024-05-17T08:36:48Z".encode(), "data2".encode()],
zmq.ZMQError(errno=zmq.ETERM), zmq.ZMQError(errno=zmq.ETERM),
] ]
...@@ -26,39 +26,107 @@ class TestZeroMQReceiver(base.TestCase): ...@@ -26,39 +26,107 @@ class TestZeroMQReceiver(base.TestCase):
self.assertEqual("test".encode(), receiver.topic) self.assertEqual("test".encode(), receiver.topic)
self.assertEqual("text/plain", receiver.content_type) self.assertEqual("text/plain", receiver.content_type)
packages = []
for package in receiver: for package in receiver:
self.assertIsInstance(package, str) packages.append(package)
self.assertListEqual(
[("2024-05-17T08:35:48Z", "data1"), ("2024-05-17T08:36:48Z", "data2")],
packages,
)
def test_json(self, ctx): def test_json(self, ctx):
socket_mock = mock.Mock() socket_mock = mock.Mock()
ctx.return_value = mock.Mock() ctx.return_value = mock.Mock()
ctx.return_value.socket.return_value = socket_mock ctx.return_value.socket.return_value = socket_mock
socket_mock.recv_multipart.side_effect = [ socket_mock.recv_multipart.side_effect = [
["test2".encode(), "{}".encode()], [
["test2".encode(), "{}".encode()], "test2".encode(),
"2024-05-18T08:35:48Z".encode(),
'{"data": "one"}'.encode(),
],
[
"test2".encode(),
"2024-05-18T08:36:48Z".encode(),
'{"data": "two"}'.encode(),
],
zmq.ZMQError(errno=zmq.ETERM),
]
receiver = ZeroMQReceiver(
"tcp://localhost:9999/test2?content-type=application%2Fjson"
)
self.assertEqual("test2".encode(), receiver.topic)
self.assertEqual("application/json", receiver.content_type)
packages = []
for package in receiver:
packages.append(package)
self.assertListEqual(
[
dict(ts="2024-05-18T08:35:48Z", data="one"),
dict(ts="2024-05-18T08:36:48Z", data="two"),
],
packages,
)
def test_json_custom(self, ctx):
socket_mock = mock.Mock()
ctx.return_value = mock.Mock()
ctx.return_value.socket.return_value = socket_mock
socket_mock.recv_multipart.side_effect = [
[
"test2".encode(),
"2024-05-16T08:35:48Z".encode(),
'{"data": "one"}'.encode(),
],
[
"test2".encode(),
"2024-05-16T08:36:48Z".encode(),
'{"data": "two"}'.encode(),
],
zmq.ZMQError(errno=zmq.ETERM), zmq.ZMQError(errno=zmq.ETERM),
] ]
receiver = ZeroMQReceiver( receiver = ZeroMQReceiver(
"tcp://localhost:9999/test2?content-type=application%2Fjson" "tcp://localhost:9999/test2?content-type=application%2Fjson"
"&json-timestamp=timestamp"
) )
self.assertEqual("test2".encode(), receiver.topic) self.assertEqual("test2".encode(), receiver.topic)
self.assertEqual("application/json", receiver.content_type) self.assertEqual("application/json", receiver.content_type)
packages = []
for package in receiver: for package in receiver:
self.assertIsInstance(package, dict) packages.append(package)
self.assertListEqual(
[
dict(timestamp="2024-05-16T08:35:48Z", data="one"),
dict(timestamp="2024-05-16T08:36:48Z", data="two"),
],
packages,
)
def test_binary(self, ctx): def test_binary(self, ctx):
socket_mock = mock.Mock() socket_mock = mock.Mock()
ctx.return_value = mock.Mock() ctx.return_value = mock.Mock()
ctx.return_value.socket.return_value = socket_mock ctx.return_value.socket.return_value = socket_mock
socket_mock.recv_multipart.side_effect = [ socket_mock.recv_multipart.side_effect = [
["test2".encode(), "{}".encode()], ["test2".encode(), "2024-05-19T08:35:48Z".encode(), b"{data1}"],
["test2".encode(), "{}".encode()], ["test2".encode(), "2024-05-19T08:36:48Z".encode(), b"{data2}"],
zmq.ZMQError(errno=zmq.ETERM), zmq.ZMQError(errno=zmq.ETERM),
] ]
receiver = ZeroMQReceiver("tcp://localhost:9999/test2") receiver = ZeroMQReceiver("tcp://localhost:9999/test2")
self.assertEqual("test2".encode(), receiver.topic) self.assertEqual("test2".encode(), receiver.topic)
self.assertEqual("application/octet-stream", receiver.content_type) self.assertEqual("application/octet-stream", receiver.content_type)
packages = []
for package in receiver: for package in receiver:
self.assertIsInstance(package, bytes) packages.append(package)
self.assertListEqual(
[
("2024-05-19T08:35:48Z", b"{data1}"),
("2024-05-19T08:36:48Z", b"{data2}"),
],
packages,
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment