Skip to content
Snippets Groups Projects
Commit 3ca250c6 authored by goei's avatar goei
Browse files

TMSS-316 Create view to derive schedulingblueprint status from taskblueprint...

TMSS-316 Create view to derive schedulingblueprint status from taskblueprint status as described in design
parent a62493ce
No related branches found
No related tags found
1 merge request!225TMSS-334 Create view to derive taskblueprint status from subtask status as described in design
......@@ -16,11 +16,16 @@ class Migration(migrations.Migration):
# Start SubTask id with 2 000 000 to avoid overlap with 'old' (test/production) OTDB
operations = [ migrations.RunSQL('ALTER SEQUENCE tmssapp_SubTask_id_seq RESTART WITH 2000000;'),
migrations.RunSQL('DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; '
'CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS '
'SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type'
' FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id'
' LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;'),
migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_taskblueprintsummary; "
"CREATE OR REPLACE VIEW tmssapp_taskblueprintsummary AS "
"SELECT tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_subtask.id AS subtask_id, tmssapp_subtask.state_id AS substate, tmssapp_subtasktemplate.type_id AS subtask_type"
" FROM tmssapp_subtask LEFT JOIN tmssapp_taskblueprint ON tmssapp_taskblueprint.id = tmssapp_subtask.task_blueprint_id"
" LEFT JOIN tmssapp_subtasktemplate ON tmssapp_subtasktemplate.id = tmssapp_subtask.specifications_template_id;"),
migrations.RunSQL("DROP VIEW IF EXISTS tmssapp_schedulingunitblueprintsummary; "
"CREATE OR REPLACE VIEW tmssapp_schedulingunitblueprintsummary AS "
"SELECT tmssapp_schedulingunitblueprint.id AS sub_id, tmssapp_taskblueprint.id AS taskblueprint_id, tmssapp_tasktemplate.type_id AS task_type, 'unknown' AS derived_task_status"
" FROM tmssapp_taskblueprint LEFT JOIN tmssapp_schedulingunitblueprint ON tmssapp_schedulingunitblueprint.id = tmssapp_taskblueprint.scheduling_unit_blueprint_id"
" LEFT JOIN tmssapp_tasktemplate ON tmssapp_tasktemplate.id = tmssapp_taskblueprint.specifications_template_id;"),
migrations.RunPython(populate_choices),
migrations.RunPython(populate_settings),
migrations.RunPython(populate_misc),
......
......@@ -18,6 +18,9 @@ import datetime
import json
import jsonschema
from django.urls import reverse as revese_url
from collections import Counter
from django.db import connection
#
# Common
......@@ -378,6 +381,17 @@ class TaskBlueprintSummary(Model):
managed = False
db_table = 'tmssapp_taskblueprintsummary'
class SchedulingUnitBlueprintSummary(Model):
sub_id = IntegerField()
taskblueprint_id = IntegerField()
task_type = CharField(max_length=128)
derived_task_status = CharField(max_length=128)
class Meta:
managed = False
db_table = 'tmssapp_schedulingunitblueprintsummary'
#
# Instance Objects
#
......@@ -614,6 +628,105 @@ class SchedulingUnitBlueprint(NamedCommon):
else:
return None
@property
def status(self):
"""
Return the schedulingunit blueprint status which is derived from the taskblueprint status (which is derived
from the subtasks states)
See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-TaskBlueprints
The status is displayed as extra field in rest api of the schedulingunit
"""
logger.debug("Determine status of SUB with id %d" % self.id)
total_nbr_tasks = self.task_blueprints.all().count()
total_nbr_observation_tasks = SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id,
task_type='observation').count()
total_nbr_processing_tasks = SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id,
task_type='pipeline').count()
total_nbr_ingest_tasks = SchedulingUnitBlueprintSummary.objects.filter(sub_id=self.id,
task_type='ingest').count()
logger.debug("total_nbr_observation_tasks=%d, total_nbr_processing_tasks=%d, total_nbr_ingest_tasks=%d"
% (total_nbr_observation_tasks, total_nbr_processing_tasks, total_nbr_ingest_tasks) )
# Next code is implemented in raw SQL because retrieving objects like SchedulingUnitBlueprintSummary.objects.filter(...)
# crashed due to exceptions as "column tmssapp_schedulingunitblueprintsummary.id does not exist"
# Note that just doing a ....filter(...).count() does NOT give this exception
taskblueprints_per_type_dict = {"observation": [], "pipeline": [], "ingest": []}
for key in taskblueprints_per_type_dict:
with connection.cursor() as cursor:
cursor.execute("SELECT taskblueprint_id FROM tmssapp_schedulingunitblueprintsummary WHERE sub_id = '%s' AND task_type = %s", [self.id, key])
sql_result = cursor.fetchall()
if sql_result is not None:
taskblueprints_per_type_dict[key].extend([col[0] for col in sql_result])
logger.debug("TaskBlueprint ids per type", taskblueprints_per_type_dict)
# Determine status per task_type (unfortunately did not manage with updatable view)
status_overview = Counter()
status_overview_per_type = {"observation": Counter(), "pipeline": Counter(), "ingest": Counter() }
for tb in TaskBlueprint.objects.filter(scheduling_unit_blueprint_id=self.id):
status_overview[(tb.status)]+=1
for key in taskblueprints_per_type_dict:
if tb.id in taskblueprints_per_type_dict[key]:
status_overview_per_type[key][(tb.status)] += 1
logger.debug("Status overview", status_overview)
logger.debug("Status per overview per task type", status_overview_per_type)
def _task_graph_instantiated():
return self.task_blueprints.all().count() > 0
def _all_task_finished():
return status_overview["finished"] == total_nbr_tasks
def _any_task_cancelled():
return status_overview["cancelled"] > 0
def _any_task_error():
return status_overview["error"] > 0
def _any_task_started_observed_finished():
return (status_overview["started"] + status_overview["observed"] + status_overview["finished"]) > 0
def _any_task_scheduled():
return status_overview["scheduled"] > 0
def _all_observation_task_observed_finished():
return (status_overview_per_type["observation"]["observed"] +
status_overview_per_type["observation"]["finished"]) == total_nbr_tasks
def _any_processing_task_started():
return status_overview_per_type["pipeline"]["started"] > 0
def _all_processing_tasks_finished():
return status_overview_per_type["pipeline"]["finished"] == total_nbr_processing_tasks
def _any_ingest_task_started():
return status_overview_per_type["ingest"]["started"] > 0
# The actual determination of the SchedulingunitBlueprint status
if not _task_graph_instantiated():
status = "defined"
elif _all_task_finished():
status = "finished"
elif _any_task_cancelled():
status = "cancelled"
elif _any_task_error():
status = "error"
elif _any_task_started_observed_finished():
if not _all_observation_task_observed_finished():
status = "observing"
elif not _any_processing_task_started():
status = "observed"
elif not _all_processing_tasks_finished():
status = "processing"
elif not _any_ingest_task_started():
status = "processed"
else:
status = "ingesting"
elif _any_task_scheduled():
status = "scheduled"
else:
status = "schedulable"
return status
class TaskDraft(NamedCommon):
specifications_doc = JSONField(help_text='Specifications for this task.')
......
......@@ -301,7 +301,7 @@ class SchedulingUnitBlueprintSerializer(RelationalHyperlinkedModelSerializer):
class Meta:
model = models.SchedulingUnitBlueprint
fields = '__all__'
extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time']
extra_fields = ['task_blueprints', 'duration', 'start_time', 'stop_time', 'status']
class SchedulingUnitBlueprintCopyToSchedulingUnitDraftSerializer(SchedulingUnitBlueprintSerializer):
class Meta(SchedulingUnitDraftSerializer.Meta):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment