Skip to content
Snippets Groups Projects
Commit a9e90a22 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-190: merged master into this feature branch

parents d2811d4f db26428f
No related branches found
No related tags found
1 merge request!252Resolve TMSS-190
Showing
with 652 additions and 66 deletions
File deleted
......@@ -61,4 +61,4 @@ for port in possible_ports:
DEFAULT_BROKER, port, DEFAULT_USER, e)
# default exchange to use for publishing messages
DEFAULT_BUSNAME = os.environ.get('LOFAR_DEFAULT_BUSNAME', adaptNameToEnvironment("lofar"))
DEFAULT_BUSNAME = adaptNameToEnvironment(os.environ.get('LOFAR_DEFAULT_EXCHANGE', 'lofar'))
......@@ -75,6 +75,16 @@ class Migration(migrations.Migration):
# Start SubTask id with 2 000 000 to avoid overlap with 'old' (test/production) OTDB
operations = [ migrations.RunSQL('ALTER SEQUENCE tmssapp_SubTask_id_seq RESTART WITH 2000000;'),
migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; "
"CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS "
"SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type"
" FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id"
" LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;"),
migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_schedulingunitblueprintsummary; "
"CREATE OR REPLACE VIEW tmssapp_schedulingunitblueprintsummary AS "
"SELECT row_number() OVER () AS id, tmssapp_schedulingunitblueprint.id AS sub_id, tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_tasktemplate.type_id AS task_type, 'unknown' AS derived_task_status"
" FROM tmssapp_taskblueprint LEFT JOIN tmssapp_schedulingunitblueprint ON tmssapp_schedulingunitblueprint.id = tmssapp_taskblueprint.scheduling_unit_blueprint_id"
" LEFT JOIN tmssapp_tasktemplate ON tmssapp_tasktemplate.id = tmssapp_taskblueprint.specifications_template_id;"),
migrations.RunPython(populate_choices),
migrations.RunPython(populate_settings),
migrations.RunPython(populate_misc),
......
......@@ -16,6 +16,16 @@ class Migration(migrations.Migration):
# Start SubTask id with 2 000 000 to avoid overlap with 'old' (test/production) OTDB
operations = [ migrations.RunSQL('ALTER SEQUENCE tmssapp_SubTask_id_seq RESTART WITH 2000000;'),
migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; "
"CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS "
"SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type"
" FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id"
" LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;"),
migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_schedulingunitblueprintsummary; "
"CREATE OR REPLACE VIEW tmssapp_schedulingunitblueprintsummary AS "
"SELECT row_number() OVER () AS id, tmssapp_schedulingunitblueprint.id AS sub_id, tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_tasktemplate.type_id AS task_type, 'unknown' AS derived_task_status"
" FROM tmssapp_taskblueprint LEFT JOIN tmssapp_schedulingunitblueprint ON tmssapp_schedulingunitblueprint.id = tmssapp_taskblueprint.scheduling_unit_blueprint_id"
" LEFT JOIN tmssapp_tasktemplate ON tmssapp_tasktemplate.id = tmssapp_taskblueprint.specifications_template_id;"),
migrations.RunPython(populate_choices),
migrations.RunPython(populate_settings),
migrations.RunPython(populate_misc),
......
......@@ -20,6 +20,8 @@ import datetime
import json
import jsonschema
from django.urls import reverse as revese_url
from collections import Counter
from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.messaging.messages import EventMessage
from lofar.common.util import single_line_with_single_spaces
......@@ -374,11 +376,40 @@ class DefaultTaskTemplate(BasicCommon):
class TaskRelationSelectionTemplate(Template):
pass
class DefaultTaskRelationSelectionTemplate(BasicCommon):
name = CharField(max_length=128, unique=True)
template = ForeignKey("TaskRelationSelectionTemplate", on_delete=PROTECT)
#
# DatabaseView objects
#
class TaskBlueprintSummary(Model):
taskblueprint_id = IntegerField()
subtask_id = IntegerField()
substate = CharField(max_length=128)
subtask_type = CharField(max_length=128)
class Meta:
managed = False
db_table = 'tmssapp_taskblueprintsummary'
class SchedulingUnitBlueprintSummary(Model):
# Using in an id and ForeignKey is not common for a view BUT the id is a 'dummy' to be able to use in Django
# https://resources.rescale.com/using-database-views-in-django-orm/
# otherwise an exception will be thrown
id = IntegerField(primary_key=True)
sub_id = IntegerField()
taskblueprint_id = IntegerField()
task_type = CharField(max_length=128)
derived_task_status = CharField(max_length=128)
class Meta:
managed = False
db_table = 'tmssapp_schedulingunitblueprintsummary'
#
# Instance Objects
#
......@@ -634,6 +665,106 @@ class SchedulingUnitBlueprint(NamedCommon):
else:
return None
@property
def status(self):
"""
Return the schedulingunit blueprint status which is derived from the taskblueprint status (which is derived
from the subtasks states)
See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-TaskBlueprints
The status is displayed as extra field in rest api of the schedulingunit
"""
logger.debug("Status of SUB with id %d" % self.id)
logger.debug("total_nbr_observation_tasks=%d, total_nbr_processing_tasks=%d, total_nbr_ingest_tasks=%d"
% (self._get_total_nbr_observation_tasks(), self._get_total_nbr_processing_tasks(), self._get_total_nbr_observation_tasks()))
# Get the the taskblueprint_ids per task_type
taskblueprints_per_type_dict = {"observation": [], "pipeline": [], "ingest": []}
for task_type in taskblueprints_per_type_dict:
queryset = SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type=task_type)
taskblueprints_per_type_dict[task_type].extend([item.taskblueprint_id for item in queryset])
# Determine status per task_type (unfortunately did not manage with updatable view)
status_overview_counter = Counter()
status_overview_counter_per_type = {"observation": Counter(), "pipeline": Counter(), "ingest": Counter() }
for tb in TaskBlueprint.objects.filter(scheduling_unit_blueprint_id=self.id):
status_overview_counter[(tb.status)]+=1
for task_type in taskblueprints_per_type_dict:
if tb.id in taskblueprints_per_type_dict[task_type]:
status_overview_counter_per_type[task_type][(tb.status)] += 1
# The actual determination of the SchedulingunitBlueprint status
if not self._task_graph_instantiated():
status = "defined"
elif self._all_task_finished(status_overview_counter):
status = "finished"
elif self._any_task_cancelled(status_overview_counter):
status = "cancelled"
elif self._any_task_error(status_overview_counter):
status = "error"
elif self._any_task_started_observed_finished(status_overview_counter):
if not self._all_observation_task_observed_finished(status_overview_counter_per_type):
status = "observing"
elif not self._any_processing_task_started_or_finished(status_overview_counter_per_type):
status = "observed"
elif not self._all_processing_tasks_and_observation_tasks_finished(status_overview_counter_per_type):
status = "processing"
elif not self._any_ingest_task_started(status_overview_counter_per_type):
status = "processed"
else:
status = "ingesting"
elif self._any_task_scheduled(status_overview_counter):
status = "scheduled"
else:
status = "schedulable"
return status
def _task_graph_instantiated(self):
return self._get_total_nbr_tasks() > 0
def _all_task_finished(self, status_overview_counter):
return status_overview_counter["finished"] == self._get_total_nbr_tasks()
def _any_task_cancelled(self, status_overview_counter):
return status_overview_counter["cancelled"] > 0
def _any_task_error(self, status_overview_counter):
return status_overview_counter["error"] > 0
def _any_task_started_observed_finished(self, status_overview_counter):
return (status_overview_counter["started"] + status_overview_counter["observed"] + status_overview_counter["finished"]) > 0
def _any_task_scheduled(self, status_overview_counter):
return status_overview_counter["scheduled"] > 0
def _all_observation_task_observed_finished(self, status_overview_counter_per_type):
total_nbr_observation_tasks = self._get_total_nbr_observation_tasks()
return (status_overview_counter_per_type["observation"]["observed"] +
status_overview_counter_per_type["observation"]["finished"]) == total_nbr_observation_tasks
def _any_processing_task_started_or_finished(self, status_overview_counter_per_type):
return status_overview_counter_per_type["pipeline"]["started"] + status_overview_counter_per_type["pipeline"]["finished"] > 0
def _all_processing_tasks_and_observation_tasks_finished(self, status_overview_counter_per_type):
total_nbr_observation_tasks = self._get_total_nbr_observation_tasks()
total_nbr_processing_tasks = self._get_total_nbr_processing_tasks()
return (status_overview_counter_per_type["pipeline"]["finished"] == total_nbr_processing_tasks and
status_overview_counter_per_type["observation"]["finished"] == total_nbr_observation_tasks)
def _any_ingest_task_started(self, status_overview_counter_per_type):
return status_overview_counter_per_type["ingest"]["started"] > 0
def _get_total_nbr_tasks(self):
return self.task_blueprints.all().count()
def _get_total_nbr_observation_tasks(self):
return SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type='observation').count()
def _get_total_nbr_processing_tasks(self):
return SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type='pipeline').count()
def _get_total_nbr_ingest_tasks(self):
return SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id, task_type='ingest').count()
class TaskDraft(NamedCommon):
specifications_doc = JSONField(help_text='Specifications for this task.')
......@@ -845,6 +976,58 @@ class TaskBlueprint(NamedCommon):
else:
return None
@property
def status(self):
"""
Return the taskblueprint status which is derived from the subtasks status
See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-TaskBlueprints
The status is displayed as extra field in rest api of the taskblueprint
"""
if self._subtask_graph_not_instantiated():
status = "defined"
elif self._all_subtask_finished():
status = "finished"
elif self._any_subtask_cancelled():
status = "cancelled"
elif self._any_subtask_error():
status = "error"
elif self._all_observation_subtasks_finishing_finished():
status = "observed"
elif self._any_subtask_between_started_finished():
status = "started"
elif self._any_subtask_scheduled():
status = "scheduled"
else:
status = "schedulable"
return status
def _subtask_graph_not_instantiated(self):
total_nbr_subtasks = self.subtasks.all().count()
return (total_nbr_subtasks == 0 or
TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='defining').count() > 0)
def _all_subtask_finished(self):
total_nbr_subtasks = self.subtasks.all().count()
return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='finished').count() == total_nbr_subtasks)
def _any_subtask_cancelled(self):
return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate__in=('cancelling', 'cancelled')).count() > 0)
def _any_subtask_error(self):
return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='error').count() > 0)
def _all_observation_subtasks_finishing_finished(self):
total_nbr_observation_subtasks = TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id,
subtask_type='observation').count()
return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate__in=('finishing','finished'), subtask_type='observation').count()
== total_nbr_observation_subtasks and total_nbr_observation_subtasks > 0)
def _any_subtask_between_started_finished(self):
return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate__in=('starting','started','queueing','queued','finishing','finished')).count() > 0)
def _any_subtask_scheduled(self):
return (TaskBlueprintSummary.objects.filter(taskblueprint_id=self.id, substate='scheduled').count() > 0)
class TaskRelationDraft(BasicCommon):
selection_doc = JSONField(help_text='Filter for selecting dataproducts from the output role.')
......
......@@ -69,7 +69,7 @@ def populate_test_data():
logger.info('created test scheduling_set: %s', scheduling_set.name)
for unit_nr in range(2 if 'normal' in project_prio_name else 1):
strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 observation strategy template")
strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines")
# the 'template' in the strategy_template is a predefined json-data blob which validates against the given scheduling_unit_template
# a user might 'upload' a partial json-data blob, so add all the known defaults
......
......@@ -65,18 +65,18 @@
"stations": ["CS001"],
"tile_beam": {
"direction_type": "J2000",
"angle1": 42,
"angle2": 42,
"angle3": 42
"angle1": 0.42,
"angle2": 0.43,
"angle3": 0.44
},
"SAPs": [
{
"name": "target0",
"digital_pointing": {
"direction_type": "J2000",
"angle1": 24,
"angle2": 24,
"angle3": 24
"angle1": 0.24,
"angle2": 0.25,
"angle3": 0.26
},
"subbands": [
349,
......@@ -87,9 +87,9 @@
"name": "target1",
"digital_pointing": {
"direction_type": "J2000",
"angle1": 24,
"angle2": 24,
"angle3": 24
"angle1": 0.27,
"angle2": 0.28,
"angle3": 0.29
},
"subbands": [
349,
......
......@@ -85,16 +85,11 @@
"title":"dynamic station set",
"type":"object",
"default":{},
"additionalItems":false,
"items":{
"type":"object",
"title":"Station set",
"headerTemplate":"{{ self.group }}",
"additionalProperties":false,
"properties":{
"group":{
"type":"string",
"title":"Group/station",
"title":"StationGroup",
"description":"Which (group of) station(s) to select from",
"default":"ALL",
"enum":[
......@@ -118,7 +113,6 @@
"group",
"min_stations"
]
}
},
"stations": {
"title":"stations",
......
......@@ -7,6 +7,7 @@
"type": "object",
"properties": {
"duration": {
"$id": "#duration",
"type": "number",
"title": "Duration (seconds)",
"description": "Duration of this observation",
......
......@@ -46,6 +46,7 @@
"default": ""
},
"digital_pointing": {
"$id": "#target_pointing",
"title": "Digital pointing",
"default": {},
"$ref": "http://tmss.lofar.org/api/schemas/commonschematemplate/pointing/1/#/definitions/pointing"
......@@ -70,6 +71,7 @@
}
},
"duration": {
"$id": "#duration",
"type": "number",
"title": "Duration (seconds)",
"description": "Duration of this observation",
......
......@@ -288,7 +288,7 @@ class SchedulingSetSerializer(RelationalHyperlinkedModelSerializer):
class SchedulingUnitDraftSerializer(RelationalHyperlinkedModelSerializer):
requirements_doc = JSONEditorField(schema_source="requirements_template.schema")
duration = FloatDurationField(required=False)
duration = FloatDurationField(read_only=True)
class Meta:
model = models.SchedulingUnitDraft
......@@ -308,12 +308,12 @@ class SchedulingUnitDraftCopyFromSchedulingSetSerializer(SchedulingUnitDraftSeri
class SchedulingUnitBlueprintSerializer(RelationalHyperlinkedModelSerializer):
requirements_doc = JSONEditorField(schema_source="requirements_template.schema")
duration = FloatDurationField(required=False)
duration = FloatDurationField(read_only=True)
class Meta:
model = models.SchedulingUnitBlueprint
fields = '__all__'
extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time']
extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time', 'status']
class SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer(SchedulingUnitBlueprintSerializer):
class Meta(SchedulingUnitDraftSerializer.Meta):
......@@ -325,9 +325,9 @@ class SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer(SchedulingUnitB
class TaskDraftSerializer(RelationalHyperlinkedModelSerializer):
duration = FloatDurationField(required=False)
relative_start_time = FloatDurationField(required=False)
relative_stop_time = FloatDurationField(required=False)
duration = FloatDurationField(read_only=True)
relative_start_time = FloatDurationField(read_only=True)
relative_stop_time = FloatDurationField(read_only=True)
specifications_doc = JSONEditorField(schema_source='specifications_template.schema')
class Meta:
......@@ -338,15 +338,16 @@ class TaskDraftSerializer(RelationalHyperlinkedModelSerializer):
class TaskBlueprintSerializer(RelationalHyperlinkedModelSerializer):
duration = FloatDurationField(required=False)
relative_start_time = FloatDurationField(required=False)
relative_stop_time = FloatDurationField(required=False)
duration = FloatDurationField(read_only=True)
relative_start_time = FloatDurationField(read_only=True)
relative_stop_time = FloatDurationField(read_only=True)
specifications_doc = JSONEditorField(schema_source='specifications_template.schema')
class Meta:
model = models.TaskBlueprint
fields = '__all__'
extra_fields = ['subtasks', 'produced_by', 'consumed_by', 'first_scheduling_relation', 'second_scheduling_relation', 'duration', 'start_time', 'stop_time', 'relative_start_time', 'relative_stop_time']
extra_fields = ['subtasks', 'produced_by', 'consumed_by', 'first_scheduling_relation', 'second_scheduling_relation', 'duration',
'start_time', 'stop_time', 'relative_start_time', 'relative_stop_time', 'status']
class TaskRelationDraftSerializer(RelationalHyperlinkedModelSerializer):
......
......@@ -115,6 +115,8 @@ def create_scheduling_relation_task_blueprint_for_testing(first_task_blueprint,
placement=models.SchedulingRelationPlacement.objects.get(value='before'),
time_offset=60)
return task_scheduling_rel_obj
class SubTasksCreationFromSubTask(unittest.TestCase):
def test_create_qafile_subtask_from_observation_subtask_failed(self):
......@@ -255,15 +257,15 @@ class SubTasksCreationFromTaskBluePrintCalibrator(unittest.TestCase):
create_observation_control_subtask_from_task_blueprint(cal_task_blueprint)
cal_task_blueprint.specifications_doc['autoselect'] = False
cal_task_blueprint.specifications_doc['pointing']['angle1'] = 11.11
cal_task_blueprint.specifications_doc['pointing']['angle2'] = 22.22
cal_task_blueprint.specifications_doc['pointing']['angle1'] = 1.111
cal_task_blueprint.specifications_doc['pointing']['angle2'] = 2.222
subtask = create_observation_control_subtask_from_task_blueprint(cal_task_blueprint)
self.assertEqual("defined", str(subtask.state))
self.assertEqual("observation control", str(subtask.specifications_template.name))
self.assertEqual("observation", str(subtask.specifications_template.type))
self.assertEqual('J2000', subtask.specifications_doc['stations']['analog_pointing']['direction_type'])
self.assertEqual(11.11, subtask.specifications_doc['stations']['analog_pointing']['angle1'])
self.assertEqual(22.22, subtask.specifications_doc['stations']['analog_pointing']['angle2'])
self.assertEqual(1.111, subtask.specifications_doc['stations']['analog_pointing']['angle1'])
self.assertEqual(2.222, subtask.specifications_doc['stations']['analog_pointing']['angle2'])
class SubtaskInputSelectionFilteringTest(unittest.TestCase):
......
This diff is collapsed.
......@@ -69,16 +69,16 @@
"stations": ["CS001","CS002","CS003"],
"tile_beam": {
"direction_type": "J2000",
"angle1": 42,
"angle2": 42
"angle1": 0.42,
"angle2": 0.43
},
"SAPs": [
{
"name": "target0",
"digital_pointing": {
"direction_type": "J2000",
"angle1": 24,
"angle2": 24
"angle1": 0.24,
"angle2": 0.25
},
"subbands": [
349,
......
......@@ -84,11 +84,14 @@ def SchedulingUnitObservingStrategyTemplate_test_data(name="my_SchedulingUnitObs
"scheduling_unit_template": scheduling_unit_template,
"tags": ["TMSS", "TESTING"]}
def TaskTemplate_test_data(name="my TaskTemplate", description:str=None, schema:dict=None) -> dict:
def TaskTemplate_test_data(name="my TaskTemplate", description:str=None, schema:dict=None, task_type_value:str=None) -> dict:
if schema is None:
schema = minimal_json_schema(properties={"mykey": {}})
return {"type": models.TaskType.objects.get(value='observation'),
if task_type_value is None:
task_type_value = 'observation'
return {"type": models.TaskType.objects.get(value=task_type_value),
"validation_code_js":"",
"name": name,
"description": description or "<no description>",
......
......@@ -298,8 +298,8 @@ class TMSSRESTTestDataCreator():
'task_blueprints': [],
'produced_by': [],
'consumed_by': [],
'first_to_connect': [],
'second_to_connect': []}
'first_scheduling_relation': [],
'second_scheduling_relation': []}
def TaskRelationDraft(self, producer_url=None, consumer_url=None, template_url=None, input_role_url=None, output_role_url=None, selection_doc=None):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment