diff --git a/infra/env/cs.yaml b/infra/env/cs.yaml index 6597eeacca3c3f7382543bbf1ef207707f6e2a72..121734c09b64d1976c0d25df1a7499d0e2293aad 100644 --- a/infra/env/cs.yaml +++ b/infra/env/cs.yaml @@ -1,13 +1,13 @@ stingray: lba: - sst: 5101 - xst: 5102 - bst: 5103 + sst: 10.99.76.1/16 + xst: 10.99.76.2/16 + bst: 10.99.76.3/16 hba0: - sst: 5111 - xst: 5112 - bst: 5113 + sst: 10.99.76.4/16 + xst: 10.99.76.5/16 + bst: 10.99.76.6/16 hba1: - sst: 5121 - xst: 5122 - bst: 5123 + sst: 10.99.76.7/16 + xst: 10.99.76.8/16 + bst: 10.99.76.9/16 diff --git a/infra/env/rs.yaml b/infra/env/rs.yaml index 335b01221c0351600ff8e77c2c42c15578e0e984..980872889bf26a4e20450e7adbe1d1762d6e39b9 100644 --- a/infra/env/rs.yaml +++ b/infra/env/rs.yaml @@ -1,9 +1,9 @@ stingray: lba: - sst: 5101 - xst: 5102 - bst: 5103 + sst: 10.99.76.1/16 + xst: 10.99.76.2/16 + bst: 10.99.76.3/16 hba: - sst: 5111 - xst: 5112 - bst: 5113 + sst: 10.99.76.4/16 + xst: 10.99.76.5/16 + bst: 10.99.76.6/16 diff --git a/infra/jobs/station/stingray.levant.nomad b/infra/jobs/station/stingray.levant.nomad index 1d1162cea4e4ee60b8b4ec0026c92e4c92b0fc1d..e69d8985c8bef5b415d54f34ef5ea44e700308a5 100644 --- a/infra/jobs/station/stingray.levant.nomad +++ b/infra/jobs/station/stingray.levant.nomad @@ -43,12 +43,19 @@ job "statistics" { } [[ range $af, $fields := $.stingray ]] - [[ range $st, $port := $fields ]] + [[ range $st, $ip := $fields ]] group "stingray-[[ $af ]]-[[ $st ]]" { count = 1 network { - mode = "cni/station" + mode = "cni/statistics" + + cni { + args { + IP = "[[ $ip ]]", + GATEWAY = "10.99.250.250" + } + } } service { @@ -66,7 +73,7 @@ job "statistics" { service { name = "stingray-[[ $af ]]-[[ $st ]]-udp" - port = [[ $port ]] + port = 5001 address_mode = "alloc" } @@ -95,7 +102,7 @@ job "statistics" { "[[ $.station ]]", "[[ $af ]]", "[[ $st ]]", - "udp://0.0.0.0:[[ $port ]]", + "udp://0.0.0.0:5001", "--port=6001" ] } diff --git a/lofar_stingray/forward.py b/lofar_stingray/forward.py index 54f06fc7caf285b567c483436390f85bea3b462b..4bd5e73c8a58f3c7c77b89e2194e52558c8bf28a 100644 --- a/lofar_stingray/forward.py +++ b/lofar_stingray/forward.py @@ -83,21 +83,24 @@ def main(argv=None): with streams.create(args.destination, True, minio_client) as writer: with streams.create(args.source, False, minio_client) as reader: - if args.datatype == "packet": - for packet in reader: - writer.put_packet(packet) - - metric_nr_packets_processed.inc() - metric_nr_bytes_read.set(reader.num_bytes_read) - metric_nr_bytes_written.set(writer.num_bytes_written) - - elif args.datatype == "json": - while data := reader.get_json(): - writer.put_json(data) - - metric_nr_packets_processed.inc() - metric_nr_bytes_read.set(reader.num_bytes_read) - metric_nr_bytes_written.set(writer.num_bytes_written) + try: + if args.datatype == "packet": + for packet in reader: + writer.put_packet(packet) + + metric_nr_packets_processed.inc() + metric_nr_bytes_read.set(reader.num_bytes_read) + metric_nr_bytes_written.set(writer.num_bytes_written) + + elif args.datatype == "json": + while data := reader.get_json(): + writer.put_json(data) + + metric_nr_packets_processed.inc() + metric_nr_bytes_read.set(reader.num_bytes_read) + metric_nr_bytes_written.set(writer.num_bytes_written) + except Exception: # pylint: disable=broad-exception-caught + logger.exception("Caught exception while forwarding packets") logger.info("End of packet stream. Shutting down.") diff --git a/lofar_stingray/publish.py b/lofar_stingray/publish.py index 645b8acd40888d7d895bb4823900c87b3ba1e19f..8d6809ec0073eb84d7accf70f9633a8127219d17 100644 --- a/lofar_stingray/publish.py +++ b/lofar_stingray/publish.py @@ -229,10 +229,12 @@ def main(argv=None): zmq_url = f"tcp://*:{args.port}" topic = f"{args.type}/{args.antenna_field}/{args.station}" logger.info("Publishing on %s with topic %s", zmq_url, topic) + with ZeroMQPublisher(zmq_url, [topic]) as publisher: logger.info("Waiting for publisher to start...") while not publisher.is_running: time.sleep(1) + logger.info("Publisher started") collector = CollectPacketsPerTimestamp() @@ -246,15 +248,18 @@ def main(argv=None): if send_message(publisher, message): metric_nr_messages_published.inc() - # process stream - with streams.create(args.source) as stream: - for packet in read_packets(stream, metric_labels): - for packets_of_same_timestamp in collector.put_packet(packet): - process_packets(packets_of_same_timestamp) - - # process remainder - for packets_of_same_timestamp in collector.done(): - process_packets(packets_of_same_timestamp) + try: + # process stream + with streams.create(args.source) as stream: + for packet in read_packets(stream, metric_labels): + for packets_of_same_timestamp in collector.put_packet(packet): + process_packets(packets_of_same_timestamp) + + # process remainder + for packets_of_same_timestamp in collector.done(): + process_packets(packets_of_same_timestamp) + except Exception: # pylint: disable=broad-exception-caught + logger.exception("Caught exception while processing packets") logger.info("End of packet stream. Shutting down.") diff --git a/lofar_stingray/writer.py b/lofar_stingray/writer.py index 80f85ffb777710891df8fd6f5adce5a48f2fb2c6..d00f55f865c65c3f8bb33a6cb5426f46bb181a90 100644 --- a/lofar_stingray/writer.py +++ b/lofar_stingray/writer.py @@ -3,7 +3,6 @@ """Implements a storage class to write text data to a S3 backend in blocks""" -import asyncio import io import logging from datetime import datetime, timezone, timedelta @@ -58,7 +57,7 @@ class Storage: def __exit__(self, *args): if self.current_block: block = self.current_block - asyncio.run(self._complete_current_block(block)) + self._complete_current_block(block) self.current_block = None def _init_bucket(self): @@ -79,7 +78,7 @@ class Storage: ), ) - async def _complete_current_block(self, block): + def _complete_current_block(self, block): block.seek(io.SEEK_SET, 0) timestamp = datetime.now(timezone.utc) size = len(block.getvalue()) @@ -107,7 +106,7 @@ class Storage: logger.debug("Current block is expired, complete block and start new") block = self.current_block self.current_block = None - asyncio.run(self._complete_current_block(block)) + self._complete_current_block(block) self.current_block = Block(self.duration) data = line.encode() + b"\n"