Commit cd57994d authored by Jorrit Schaap's avatar Jorrit Schaap

TMSS-153: and TMSS-145: fixed populate.populate_lofar_json_schemas according...

TMSS-153: and TMSS-145: fixed populate.populate_lofar_json_schemas according to https://support.astron.nl/confluence/display/TMSS/UC1+JSON
There are now two Task schema and 1 Subtask schema.
Used the schema's, added dataproducts, convert everything to parset.
parent 84a2e99e
......@@ -17,12 +17,12 @@
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Subtask
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.parameterset import parameterset
from lofar.common.datetimeutils import formatDatetime
from lofar.common.json_utils import add_defaults_to_json_object_for_schema
def convert_to_parset(subtask: Subtask) -> parameterset:
def convert_to_parset(subtask: models.Subtask) -> parameterset:
# make sure the spec is complete (including all non-filled in properties with default)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, subtask.specifications_template.schema)
......@@ -77,11 +77,17 @@ def convert_to_parset(subtask: Subtask) -> parameterset:
parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block']
parset["Observation.DataProducts.Output_Correlated.enabled"] = True
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster or "CEP4"
# TODO: locations of dataproducts
parset["Observation.DataProducts.Output_Correlated.filenames"] = []
parset["Observation.DataProducts.Output_Correlated.locations"] = []
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster or "CEP4"
# TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work
subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
for subtask_output in subtask_outputs:
dataproducts = list(models.Dataproduct.objects.filter(producer_id=subtask_output.id))
for dataproduct in dataproducts:
parset["Observation.DataProducts.Output_Correlated.filenames"].append(dataproduct.filename)
parset["Observation.DataProducts.Output_Correlated.locations"].append(dataproduct.directory)
# convert dict to real parameterset, and return it
parset = parameterset(parset)
......
......@@ -15,10 +15,10 @@ class Migration(migrations.Migration):
"""
import json
from lofar.sas.tmss.tmss.tmssapp.models.specification import Role, Datatype, Dataformat, CopyReason
from lofar.sas.tmss.tmss.tmssapp.models.specification import Role, Datatype, Dataformat, CopyReason, TaskTemplate
from lofar.sas.tmss.tmss.tmssapp.models.scheduling import SubtaskState, SubtaskType, SubtaskTemplate, Subtask, \
StationType, Algorithm, ScheduleMethod, Cluster, Filesystem
from lofar.common.json_utils import add_defaults_to_json_object_for_schema
from lofar.common.json_utils import *
def populate_choices(apps, schema_editor):
'''
......@@ -32,7 +32,7 @@ def populate_choices(apps, schema_editor):
def populate_lofar_json_schemas(apps, schema_editor):
_populate_correlator_schema()
_populate_correlator_calibrator_schema()
_populate_obscontrol_schema()
_populate_stations_schema()
......@@ -43,12 +43,11 @@ def populate_misc(apps, schema_editor):
fs = Filesystem.objects.create(name="LustreFS", cluster=cluster, capacity=3.6e15)
def _populate_correlator_schema():
subtask_template_data = {"type": SubtaskType.objects.get(value='observation'),
"name": "correlator schema",
"description": 'first attempt at correlator schema',
"version": '0.1',
"schema": json.loads('''
def _populate_correlator_calibrator_schema():
task_template_data = {"name": "correlator schema",
"description": 'correlator schema for calibrator observations',
"version": '0.1',
"schema": json.loads('''
{
"$id": "http://example.com/example.json",
"type": "object",
......@@ -160,98 +159,55 @@ def _populate_correlator_schema():
]
}
}
}'''),
"realtime": True,
"queue": False,
"tags": []}
}'''), "tags": []}
SubtaskTemplate.objects.create(**subtask_template_data)
TaskTemplate.objects.create(**task_template_data)
def _populate_example_data():
from datetime import datetime
try:
from lofar.sas.tmss.tmss.tmssapp.models.specification import TaskDraft, TaskBlueprint
from lofar.sas.tmss.test.tmss_test_data_django_models import TaskDraft_test_data, TaskBlueprint_test_data
taskdraft_data = TaskDraft_test_data()
taskdraft = TaskDraft.objects.create(**taskdraft_data)
taskblueprint_data = TaskBlueprint_test_data(task_draft=taskdraft)
taskblueprint = TaskBlueprint.objects.create(**taskblueprint_data)
except ImportError:
taskblueprint = None
# subtask 1
subtask_template = SubtaskTemplate.objects.get(name='correlator schema')
specifications_doc = {
"duration": 60,
"calibrator": {"enabled": False,
"autoselect": False,
"pointing": {"direction_type": "J2000",
"angle1": 45,
"angle2": 20}},
"channels_per_subband": 64,
"integration_time": 1,
"storage_cluster": "CEP4"}
specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema)
subtask_data = {"start_time": datetime.utcnow().isoformat(),
"stop_time": datetime.utcnow().isoformat(),
"state": SubtaskState.objects.all()[0],
"specifications_doc": specifications_doc,
"task_blueprint": taskblueprint,
"specifications_template": subtask_template,
"tags": ["TMSS", "TESTING", "FAKE_DATA"],
"do_cancel": None,
"priority": 1,
"schedule_method": ScheduleMethod.objects.all()[0],
"cluster": None,
"scheduler_input_doc": ""}
Subtask.objects.create(**subtask_data)
# subtask 2
subtask_template = SubtaskTemplate.objects.get(name='obscontrol schema')
specifications_doc = {
"stations": {"station_list": ["CS001", "CS002"],
"antenna_set": "HBA_DUAL",
"filter": "HBA_110_190",
"analog_pointing": {"direction_type": "J2000",
"angle1": 45,
"angle2": 20},
"digital_pointings": [{"name": "beam01",
"pointing": {"direction_type": "J2000",
"angle1": 45,
"angle2": 20},
"subbands": list(range(0, 16))
}]
}
}
specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema)
subtask_data = {"start_time": datetime.utcnow().isoformat(),
"stop_time": datetime.utcnow().isoformat(),
"state": SubtaskState.objects.all()[0],
"specifications_doc": specifications_doc,
"task_blueprint": taskblueprint,
"specifications_template": subtask_template,
"tags": ["TMSS", "TESTING", "FAKE_DATA"],
"do_cancel": None,
"priority": 1,
"schedule_method": ScheduleMethod.objects.all()[0],
"cluster": None,
"scheduler_input_doc": ""}
from datetime import datetime
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.test.tmss_test_data_django_models import TaskDraft_test_data, TaskBlueprint_test_data, SubtaskOutput_test_data, Dataproduct_test_data, Subtask_test_data
task_template = models.TaskTemplate.objects.get(name='correlator schema')
task_draft_data = TaskDraft_test_data(name="my test obs", specifications_template=task_template)
task_draft = models.TaskDraft.objects.create(**task_draft_data)
task_blueprint_data = TaskBlueprint_test_data(task_draft=task_draft)
task_blueprint = models.TaskBlueprint.objects.create(**task_blueprint_data)
subtask_template = models.SubtaskTemplate.objects.get(name='observationcontrol schema')
specifications_doc = {
"stations": {"station_list": ["CS001", "CS002"],
"antenna_set": "HBA_DUAL",
"filter": "HBA_110_190",
"analog_pointing": {"direction_type": "J2000",
"angle1": 45,
"angle2": 20},
"digital_pointings": [{"name": "beam01",
"pointing": {"direction_type": "J2000",
"angle1": 45,
"angle2": 20},
"subbands": list(range(0, 16))
}]
}
}
Subtask.objects.create(**subtask_data)
specifications_doc = add_defaults_to_json_object_for_schema(specifications_doc, subtask_template.schema)
subtask_data = Subtask_test_data(task_blueprint=task_blueprint, subtask_template=subtask_template, specifications_doc=specifications_doc)
subtask = models.Subtask.objects.create(**subtask_data)
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
for sb_nr in specifications_doc['stations']['digital_pointings'][0]['subbands']:
dataproduct: models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output, filename="dataproduct_SB_%03d.h5"%sb_nr))
except ImportError:
pass
def _populate_obscontrol_schema():
subtask_template_data = {"type": SubtaskType.objects.get(value='observation'),
"name": "obscontrol schema",
"description": 'first attempt at obscontrol schema',
"name": "observationcontrol schema",
"description": 'observationcontrol schema for observation subtask',
"version": '0.1',
"schema": json.loads('''
{
......@@ -578,11 +534,10 @@ def _populate_obscontrol_schema():
def _populate_stations_schema():
subtask_template_data = {"type": SubtaskType.objects.get(value='observation'),
"name": "stations schema",
"description": 'first attempt at stations schema',
"version": '0.1',
"schema": json.loads('''
task_template_data = { "name": "stations schema",
"description": 'Generic station settings and selection',
"version": '0.1',
"schema": json.loads('''
{
"$id": "http://example.com/example.json",
"type": "object",
......@@ -821,8 +776,6 @@ def _populate_stations_schema():
}
}
}'''),
"realtime": True,
"queue": False,
"tags": []}
"tags": []}
SubtaskTemplate.objects.create(**subtask_template_data)
TaskTemplate.objects.create(**task_template_data)
from django.http import HttpResponse, JsonResponse
from django.shortcuts import get_object_or_404
from lofar.sas.tmss.tmss.tmssapp.models.scheduling import SubtaskTemplate, Subtask
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.common.json_utils import get_default_json_object_for_schema
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset
def subtask_template_default_specification(request, subtask_template_pk:int):
subtask_template = get_object_or_404(SubtaskTemplate, pk=subtask_template_pk)
subtask_template = get_object_or_404(models.SubtaskTemplate, pk=subtask_template_pk)
spec = get_default_json_object_for_schema(subtask_template.schema)
return JsonResponse(spec)
def task_template_default_specification(request, task_template_pk:int):
task_template = get_object_or_404(models.TaskTemplate, pk=task_template_pk)
spec = get_default_json_object_for_schema(task_template.schema)
return JsonResponse(spec)
def subtask_parset(request, subtask_pk:int):
subtask = get_object_or_404(Subtask, pk=subtask_pk)
subtask = get_object_or_404(models.Subtask, pk=subtask_pk)
parset = convert_to_parset(subtask)
return HttpResponse(str(parset), content_type='text/plain')
......@@ -122,6 +122,7 @@ router.register(r'default_dataproduct_specifications_template', viewsets.Default
router.register(r'subtask_input_selection_template', viewsets.SubtaskInputSelectionTemplateViewSet)
router.register(r'dataproduct_feedback_template', viewsets.DataproductFeedbackTemplateViewSet)
urlpatterns.append(re_path(r'task_template/(?P<task_template_pk>\d+)/default_specification', views.task_template_default_specification))
urlpatterns.append(re_path(r'subtask_template/(?P<subtask_template_pk>\d+)/default_specification', views.subtask_template_default_specification))
urlpatterns.append(re_path(r'subtask/(?P<subtask_pk>\d+)/parset', views.subtask_parset))
......
......@@ -49,7 +49,9 @@ class ParsetAdapterTest(unittest.TestCase):
for dp in specifications_doc['stations']['digital_pointings']:
dp['subbands'] = list(range(8))
subtask_data = Subtask_test_data(subtask_template, specifications_doc)
subtask = models.Subtask.objects.create(**subtask_data)
subtask:models.Subtask = models.Subtask.objects.create(**subtask_data)
subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=subtask))
dataproduct:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=subtask_output))
parset = convert_to_parset(subtask)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment