Skip to content
Snippets Groups Projects
Commit fbee6611 authored by Jan David Mol's avatar Jan David Mol
Browse files

L2SS-1910: Sync dataproduct filenames with TMSS

parent 4dcf4797
No related branches found
No related tags found
1 merge request!11L2SS-1910: Sync dataproduct filenames with TMSS
......@@ -173,8 +173,10 @@ deploy_nomad_station:
matrix:
- STATION:
- cs001
- cs032
COMPONENT:
- stingray
- stingray-bucket-replication
environment:
name: $STATION
script:
......
job "statistics-aggregate" {
datacenters = ["nl-north"]
type = "batch"
namespace = "statistics"
parameterized {
payload = "forbidden"
meta_required = ["station", "antenna_field", "begin", "end", "source", "destination"]
meta_required = ["observation_id", "station", "antenna_field", "begin", "end", "source", "destination"]
}
[[ range $type := "bst xst sst" | split " " ]]
......@@ -30,7 +31,7 @@ job "statistics-aggregate" {
"${NOMAD_META_begin}",
"${NOMAD_META_end}",
"${NOMAD_META_source}",
"${NOMAD_META_destination}/[[ $type ]]/${NOMAD_META_station}-${NOMAD_META_antenna_field}.h5"
"${NOMAD_META_destination}/L${NOMAD_META_observation_id}_${NOMAD_META_station}_${NOMAD_META_antenna_field}_[[ $type ]].h5"
]
}
......
job "statistics-bucket-replication" {
datacenters = ["stat"]
type = "batch"
periodic {
cron = "*/5 * * * * *"
prohibit_overlap = true
}
group "batch" {
count = 1
network {
mode = "bridge"
}
task "mc" {
driver = "docker"
config {
image = "minio/mc:[[.object_storage.mc.version]]"
entrypoint = ["mc", "batch", "start", "local", "/local/statistics.yaml" ]
mount {
type = "bind"
source = "local/mc"
target = "/root/.mc"
}
}
env {
MINIO_ROOT_USER = "[[.object_storage.user.name]]"
MINIO_ROOT_PASSWORD = "[[.object_storage.user.pass]]"
}
resources {
cpu = 10
memory = 512
}
template {
destination = "local/mc/config.json"
change_mode = "noop"
data = <<EOF
{
"aliases": {
"local": {
"url": "http://s3.service.consul:9000",
"accessKey": "[[.object_storage.user.name]]",
"secretKey": "[[.object_storage.user.name]]",
"api": "s3v4",
"path": "on"
}
}
}
EOF
}
template {
destination = "local/statistics.yaml"
change_mode = "noop"
data = <<EOF
replicate:
apiVersion: v1
source:
type: minio
bucket: statistics
path: "on"
target:
type: minio
bucket: central-statistics
endpoint: "https://s3.lofar.net"
path: "on"
credentials:
accessKey: [[.object_storage.user.name]]
secretKey: [[.object_storage.user.name]]
flags:
filter:
newerThan: "10m"
EOF
}
}
}
}
......@@ -4,68 +4,10 @@ job "statistics" {
reschedule {
unlimited = true
delay = "10s"
delay = "30s"
delay_function = "constant"
}
group "replicate" {
count = 1
network {
mode = "bridge"
}
task "mc" {
driver = "docker"
config {
image = "minio/mc:[[$.object_storage.mc.version]]"
entrypoint = ["mc", "mirror", "--watch", "local/statistics", "central/central-statistics"]
mount {
type = "bind"
source = "local/mc"
target = "/root/.mc"
}
}
env {
MINIO_ROOT_USER = "[[$.object_storage.user.name]]"
MINIO_ROOT_PASSWORD = "[[$.object_storage.user.pass]]"
}
resources {
cpu = 10
memory = 512
memory_max = 4096
}
template {
destination = "local/mc/config.json"
change_mode = "noop"
data = <<EOF
{
"aliases": {
"local": {
"url": "http://s3.service.consul:9000",
"accessKey": "[[$.object_storage.user.name]]",
"secretKey": "[[$.object_storage.user.name]]",
"api": "s3v4",
"path": "on"
},
"central": {
"url": "https://s3.lofar.net",
"accessKey": "[[$.object_storage.user.name]]",
"secretKey": "[[$.object_storage.user.name]]",
"api": "s3v4",
"path": "on"
}
}
}
EOF
}
}
}
group "stingray-metadata" {
count = 1
......
......@@ -7,7 +7,7 @@ import argparse
import logging
import os
import tempfile
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from urllib.parse import urlparse
from lofar_station_client.file_access import create_hdf5
......@@ -23,6 +23,18 @@ logger.setLevel(logging.DEBUG)
logger.addHandler(setup_logging_handler())
def _datetime_fromisoformat_with_tz(
datetime_str: str, default_tz: timezone = timezone.utc
) -> datetime:
"""Parse the given string as an ISO datetime format. Add a default timezone
if none is provided."""
dt = datetime.fromisoformat(datetime_str)
# add timezone if none is provided
dt.replace(tzinfo=dt.tzinfo or default_tz)
return dt
def _create_parser():
"""Define the parser"""
parser = argparse.ArgumentParser(
......@@ -41,8 +53,8 @@ def _create_parser():
choices=["xst", "sst", "bst"],
help="the type of the statistics",
)
parser.add_argument("begin", type=datetime.fromisoformat)
parser.add_argument("end", type=datetime.fromisoformat)
parser.add_argument("begin", type=_datetime_fromisoformat_with_tz)
parser.add_argument("end", type=_datetime_fromisoformat_with_tz)
parser.add_argument(
"source", type=urlparse, help="the source bucket location of the data"
)
......@@ -86,7 +98,7 @@ def main():
statistics_storage.load_packets(args.begin, args.end),
)
nr_signal_inputs = 96 # SstAggregator.MAX_INPUTS
nr_signal_inputs = 192 # default LBA
first_signal_input_index = 0
match args.antennafield:
case "hba":
......
......@@ -78,15 +78,21 @@ class Storage:
)
async def _complete_current_block(self, block):
logger.info("Write block %s", block.start)
block.seek(io.SEEK_SET, 0)
timestamp = datetime.now(timezone.utc)
size = len(block.getvalue())
if size == 0:
logger.info("Discarding empty block %s", block.start)
return
logger.info("Write block %s", block.start)
self._minio_client.put_object(
self.bucket,
f"{self.prefix}/{timestamp.year}/{timestamp.month:02d}/{timestamp.day:02d}/"
f"{timestamp.isoformat()}.json",
block,
len(block.getvalue()),
size,
content_type="application/json",
)
......
......@@ -81,6 +81,33 @@ class TestStorage(TestCase):
content_type="application/json",
)
@patch("lofar_stingray.writer.datetime", autospec=True)
def test_discard_empty(self, mock_datetime):
"""Test if write line is sending chunked data to storage backend"""
mock_datetime.now.return_value = datetime(
year=2023,
month=11,
day=24,
hour=12,
minute=12,
second=12,
tzinfo=timezone.utc,
)
storage = Storage("bucket", "prefix", self.mock_minio)
with storage:
mock_datetime.now.return_value = datetime(
year=2023,
month=11,
day=25,
hour=12,
minute=12,
second=12,
tzinfo=timezone.utc,
)
self.assertFalse(self.mock_minio.put_object.called)
class TestBlock(TestCase):
"""Test cases of the block class"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment