Skip to content
Snippets Groups Projects
Commit 5cf84939 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-2120: Use static IP range for statistics record jobs

parent f8d7e98d
No related branches found
No related tags found
1 merge request: !21 "L2SS-2120: Use static IP range for statistics record jobs"
stingray: stingray:
lba: lba:
sst: 5101 sst: 10.99.76.1/16
xst: 5102 xst: 10.99.76.2/16
bst: 5103 bst: 10.99.76.3/16
hba0: hba0:
sst: 5111 sst: 10.99.76.4/16
xst: 5112 xst: 10.99.76.5/16
bst: 5113 bst: 10.99.76.6/16
hba1: hba1:
sst: 5121 sst: 10.99.76.7/16
xst: 5122 xst: 10.99.76.8/16
bst: 5123 bst: 10.99.76.9/16
stingray: stingray:
lba: lba:
sst: 5101 sst: 10.99.76.1/16
xst: 5102 xst: 10.99.76.2/16
bst: 5103 bst: 10.99.76.3/16
hba: hba:
sst: 5111 sst: 10.99.76.4/16
xst: 5112 xst: 10.99.76.5/16
bst: 5113 bst: 10.99.76.6/16
...@@ -43,12 +43,19 @@ job "statistics" { ...@@ -43,12 +43,19 @@ job "statistics" {
} }
[[ range $af, $fields := $.stingray ]] [[ range $af, $fields := $.stingray ]]
[[ range $st, $port := $fields ]] [[ range $st, $ip := $fields ]]
group "stingray-[[ $af ]]-[[ $st ]]" { group "stingray-[[ $af ]]-[[ $st ]]" {
count = 1 count = 1
network { network {
mode = "cni/station" mode = "cni/statistics"
cni {
args {
IP = "[[ $ip ]]",
GATEWAY = "10.99.250.250"
}
}
} }
service { service {
...@@ -66,7 +73,7 @@ job "statistics" { ...@@ -66,7 +73,7 @@ job "statistics" {
service { service {
name = "stingray-[[ $af ]]-[[ $st ]]-udp" name = "stingray-[[ $af ]]-[[ $st ]]-udp"
port = [[ $port ]] port = 5001
address_mode = "alloc" address_mode = "alloc"
} }
...@@ -95,7 +102,7 @@ job "statistics" { ...@@ -95,7 +102,7 @@ job "statistics" {
"[[ $.station ]]", "[[ $.station ]]",
"[[ $af ]]", "[[ $af ]]",
"[[ $st ]]", "[[ $st ]]",
"udp://0.0.0.0:[[ $port ]]", "udp://0.0.0.0:5001",
"--port=6001" "--port=6001"
] ]
} }
......
...@@ -83,21 +83,24 @@ def main(argv=None): ...@@ -83,21 +83,24 @@ def main(argv=None):
with streams.create(args.destination, True, minio_client) as writer: with streams.create(args.destination, True, minio_client) as writer:
with streams.create(args.source, False, minio_client) as reader: with streams.create(args.source, False, minio_client) as reader:
if args.datatype == "packet": try:
for packet in reader: if args.datatype == "packet":
writer.put_packet(packet) for packet in reader:
writer.put_packet(packet)
metric_nr_packets_processed.inc()
metric_nr_bytes_read.set(reader.num_bytes_read) metric_nr_packets_processed.inc()
metric_nr_bytes_written.set(writer.num_bytes_written) metric_nr_bytes_read.set(reader.num_bytes_read)
metric_nr_bytes_written.set(writer.num_bytes_written)
elif args.datatype == "json":
while data := reader.get_json(): elif args.datatype == "json":
writer.put_json(data) while data := reader.get_json():
writer.put_json(data)
metric_nr_packets_processed.inc()
metric_nr_bytes_read.set(reader.num_bytes_read) metric_nr_packets_processed.inc()
metric_nr_bytes_written.set(writer.num_bytes_written) metric_nr_bytes_read.set(reader.num_bytes_read)
metric_nr_bytes_written.set(writer.num_bytes_written)
except Exception: # pylint: disable=broad-exception-caught
logger.exception("Caught exception while forwarding packets")
logger.info("End of packet stream. Shutting down.") logger.info("End of packet stream. Shutting down.")
......
...@@ -229,10 +229,12 @@ def main(argv=None): ...@@ -229,10 +229,12 @@ def main(argv=None):
zmq_url = f"tcp://*:{args.port}" zmq_url = f"tcp://*:{args.port}"
topic = f"{args.type}/{args.antenna_field}/{args.station}" topic = f"{args.type}/{args.antenna_field}/{args.station}"
logger.info("Publishing on %s with topic %s", zmq_url, topic) logger.info("Publishing on %s with topic %s", zmq_url, topic)
with ZeroMQPublisher(zmq_url, [topic]) as publisher: with ZeroMQPublisher(zmq_url, [topic]) as publisher:
logger.info("Waiting for publisher to start...") logger.info("Waiting for publisher to start...")
while not publisher.is_running: while not publisher.is_running:
time.sleep(1) time.sleep(1)
logger.info("Publisher started") logger.info("Publisher started")
collector = CollectPacketsPerTimestamp() collector = CollectPacketsPerTimestamp()
...@@ -246,15 +248,18 @@ def main(argv=None): ...@@ -246,15 +248,18 @@ def main(argv=None):
if send_message(publisher, message): if send_message(publisher, message):
metric_nr_messages_published.inc() metric_nr_messages_published.inc()
# process stream try:
with streams.create(args.source) as stream: # process stream
for packet in read_packets(stream, metric_labels): with streams.create(args.source) as stream:
for packets_of_same_timestamp in collector.put_packet(packet): for packet in read_packets(stream, metric_labels):
process_packets(packets_of_same_timestamp) for packets_of_same_timestamp in collector.put_packet(packet):
process_packets(packets_of_same_timestamp)
# process remainder
for packets_of_same_timestamp in collector.done(): # process remainder
process_packets(packets_of_same_timestamp) for packets_of_same_timestamp in collector.done():
process_packets(packets_of_same_timestamp)
except Exception: # pylint: disable=broad-exception-caught
logger.exception("Caught exception while processing packets")
logger.info("End of packet stream. Shutting down.") logger.info("End of packet stream. Shutting down.")
......
...@@ -3,7 +3,6 @@ ...@@ -3,7 +3,6 @@
"""Implements a storage class to write text data to a S3 backend in blocks""" """Implements a storage class to write text data to a S3 backend in blocks"""
import asyncio
import io import io
import logging import logging
from datetime import datetime, timezone, timedelta from datetime import datetime, timezone, timedelta
...@@ -58,7 +57,7 @@ class Storage: ...@@ -58,7 +57,7 @@ class Storage:
def __exit__(self, *args): def __exit__(self, *args):
if self.current_block: if self.current_block:
block = self.current_block block = self.current_block
asyncio.run(self._complete_current_block(block)) self._complete_current_block(block)
self.current_block = None self.current_block = None
def _init_bucket(self): def _init_bucket(self):
...@@ -79,7 +78,7 @@ class Storage: ...@@ -79,7 +78,7 @@ class Storage:
), ),
) )
async def _complete_current_block(self, block): def _complete_current_block(self, block):
block.seek(io.SEEK_SET, 0) block.seek(io.SEEK_SET, 0)
timestamp = datetime.now(timezone.utc) timestamp = datetime.now(timezone.utc)
size = len(block.getvalue()) size = len(block.getvalue())
...@@ -107,7 +106,7 @@ class Storage: ...@@ -107,7 +106,7 @@ class Storage:
logger.debug("Current block is expired, complete block and start new") logger.debug("Current block is expired, complete block and start new")
block = self.current_block block = self.current_block
self.current_block = None self.current_block = None
asyncio.run(self._complete_current_block(block)) self._complete_current_block(block)
self.current_block = Block(self.duration) self.current_block = Block(self.duration)
data = line.encode() + b"\n" data = line.encode() + b"\n"
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment