Skip to content
Snippets Groups Projects
Commit a4459714 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-216: removed dataformat property when creating TaskRelations. Added...

TMSS-216: removed dataformat property when creating TaskRelations. Added TaskRelations to the cleanup convenience method, and added a big rationale explaining its usage
parent 8fa9f021
No related branches found
No related tags found
1 merge request!409Resolve TMSS-261
from lofar.sas.tmss.tmss.exceptions import *
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.subtasks import unschedule_subtasks_in_task_blueprint
from lofar.sas.tmss.tmss.tmssapp.models.specification import TaskBlueprint, SchedulingUnitBlueprint
from lofar.sas.tmss.tmss.tmssapp.models.specification import TaskBlueprint, SchedulingUnitBlueprint, IOType, TaskTemplate, TaskType, TaskRelationSelectionTemplate
from lofar.sas.tmss.tmss.tmssapp.subtasks import create_and_schedule_subtasks_from_task_blueprint, create_subtasks_from_task_blueprint, schedule_independent_subtasks_in_task_blueprint, update_subtasks_start_times_for_scheduling_unit
from lofar.common.datetimeutils import round_to_minute_precision
from functools import cmp_to_key
......@@ -179,9 +179,16 @@ def create_task_drafts_from_scheduling_unit_draft(scheduling_unit_draft: models.
try:
producer_task_draft = scheduling_unit_draft.task_drafts.get(name=task_relation_definition["producer"])
consumer_task_draft = scheduling_unit_draft.task_drafts.get(name=task_relation_definition["consumer"])
dataformat = models.Dataformat.objects.get(value=task_relation_definition["dataformat"])
input_role = models.TaskConnectorType.objects.get(task_template=consumer_task_draft.specifications_template, role=task_relation_definition["input"]["role"], datatype=task_relation_definition["input"]["datatype"], iotype=models.IOType.objects.get(value=models.IOType.Choices.INPUT.value))
output_role = models.TaskConnectorType.objects.get(task_template=producer_task_draft.specifications_template, role=task_relation_definition["output"]["role"], datatype=task_relation_definition["output"]["datatype"], iotype=models.IOType.objects.get(value=models.IOType.Choices.OUTPUT.value))
input_role = models.TaskConnectorType.objects.get(task_template=consumer_task_draft.specifications_template,
role=task_relation_definition["input"]["role"],
datatype=task_relation_definition["input"]["datatype"],
dataformat=task_relation_definition["dataformat"],
iotype=models.IOType.Choices.INPUT.value)
output_role = models.TaskConnectorType.objects.get(task_template=producer_task_draft.specifications_template,
role=task_relation_definition["output"]["role"],
datatype=task_relation_definition["output"]["datatype"],
dataformat=task_relation_definition["dataformat"],
iotype=models.IOType.Choices.OUTPUT.value)
selection_template = models.TaskRelationSelectionTemplate.objects.get(name=task_relation_definition["selection_template"])
except Exception as e:
logger.error("Could not determine Task Relations for %s. Error: %s", task_relation_definition, e)
......@@ -191,7 +198,6 @@ def create_task_drafts_from_scheduling_unit_draft(scheduling_unit_draft: models.
with transaction.atomic():
task_relation = models.TaskRelationDraft.objects.create(tags=task_relation_definition.get("tags",[]),
selection_doc=task_relation_definition["selection_doc"],
dataformat=dataformat,
producer=producer_task_draft,
consumer=consumer_task_draft,
input_role=input_role,
......@@ -287,8 +293,7 @@ def create_task_blueprint_from_task_draft(task_draft: models.TaskDraft) -> model
producer=producing_task_blueprint,
consumer=consuming_task_blueprint,
selection_doc=task_relation_draft.selection_doc,
selection_template=task_relation_draft.selection_template,
dataformat=task_relation_draft.dataformat)
selection_template=task_relation_draft.selection_template)
logger.info("created task_relation_blueprint id=%s which connects task_blueprints producer_id=%s and consumer_id=%s",
task_relation_blueprint.pk, producing_task_blueprint.pk, consuming_task_blueprint.pk)
except IntegrityError as e:
......@@ -424,7 +429,23 @@ def unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint:
def create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
'''create a cleanuptask for the given scheduling_unit which will cleanup all output dataproducts from tasks in this scheduling_unit which aren't already cleaned up'''
# Rationale:
# adding a cleanup task(blueprint) to a scheduling_unit_blueprint adds a task to the graph (which breaks the immutable blueprint concept),
# but it does not modify observation/pipeline behaviour, hence we allow it.
# Regard this as a convenience function to allow users to simplify cleaning up after themselves if they forgot to specify a cleanup task.
#
# Note: We do modify the graph (both in draft and blueprint),
# but we do NOT update the specifications_doc because that doc (blueprint) is immutable, and shows the user what was specified.
# The fact that the graph in the specifications_doc and in real instances are different (with an added cleanup task) shows the users that cleanup
# was apparently forgotten at specification time, and added later, which is explainable.
#
# Maybe we want to split this function in the future into "add cleanup to draft" and/or "add cleanup to blueprint" functions.
# For now, we present it as a friendly convenience function to cleanup after yourself once the blueprint is already running / already ran with experimental scheduling units.
# In practice we will instantiate most scheduling units from properly defined observation_strategy_templates which include cleanup.
with transaction.atomic():
# create a cleanup task draft and blueprint....
cleanup_template = models.TaskTemplate.objects.get(name="cleanup")
cleanup_spec_doc = get_default_json_object_for_schema(cleanup_template.schema)
......@@ -445,20 +466,51 @@ def create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint:
specifications_template=cleanup_task_draft.specifications_template,
output_pinned=False)
for task_blueprint in scheduling_unit_blueprint.task_blueprint.all():
logger.info(task_blueprint)
# dataformat = models.Dataformat.objects.get(value=task_relation_definition["dataformat"])
# input_role = models.TaskConnectorType.objects.get(task_template=consumer_task_draft.specifications_template, role=task_relation_definition["input"]["role"], datatype=task_relation_definition["input"]["datatype"], iotype=models.IOType.objects.get(value=models.IOType.Choices.INPUT.value))
# output_role = models.TaskConnectorType.objects.get(task_template=producer_task_draft.specifications_template, role=task_relation_definition["output"]["role"], datatype=task_relation_definition["output"]["datatype"], iotype=models.IOType.objects.get(value=models.IOType.Choices.OUTPUT.value))
# selection_template = models.TaskRelationSelectionTemplate.objects.get(name=task_relation_definition["selection_template"])
# task_relation = models.TaskRelationDraft.objects.create(selection_doc=task_relation_definition["selection_doc"],
# dataformat=dataformat,
# producer=task_draft,
# consumer=cleanup_task_draft,
# input_role=input_role,
# output_role=output_role,
# selection_template=selection_template)
logger.info("Created Cleanup Task id=%d for scheduling_unit id=%s, adding the outputs of all producing tasks in the scheduling unit to the cleanup...", cleanup_task_blueprint.id, scheduling_unit_blueprint.id)
# ... and connect the outputs of the producing tasks to the cleanup, so the cleanup task knows what to remove.
selection_template = TaskRelationSelectionTemplate.objects.get(name="all")
selection_doc = get_default_json_object_for_schema(selection_template.schema)
for producer_task_blueprint in scheduling_unit_blueprint.task_blueprints.exclude(specifications_template__type=TaskType.Choices.CLEANUP).exclude(specifications_template__type=TaskType.Choices.INGEST).all():
for connector_type in producer_task_blueprint.specifications_template.output_connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all():
# define what the producer_task_blueprint is producing
output_role = models.TaskConnectorType.objects.get(task_template=producer_task_blueprint.specifications_template,
role=connector_type.role,
datatype=connector_type.datatype,
iotype=models.IOType.objects.get(value=models.IOType.Choices.OUTPUT.value))
# define what the cleanup task accepts/consumes
input_role = models.TaskConnectorType.objects.filter(dataformat=connector_type.dataformat).get(task_template=cleanup_task_draft.specifications_template,
role=models.Role.objects.get(value=models.Role.Choices.ANY.value),
datatype=connector_type.datatype,
iotype=models.IOType.objects.get(value=models.IOType.Choices.INPUT.value))
# connect the two (in draft and blueprint)
task_relation_draft = models.TaskRelationDraft.objects.create(producer=producer_task_blueprint.draft,
consumer=cleanup_task_draft,
input_role=input_role,
output_role=output_role,
selection_doc=selection_doc,
selection_template=selection_template)
logger.info("created task_relation id=%s between task draft id=%s name='%s' and id=%s name='%s",
task_relation_draft.pk, task_relation_draft.producer.id, task_relation_draft.producer.name, task_relation_draft.consumer.id, task_relation_draft.consumer.name)
task_relation_blueprint = models.TaskRelationBlueprint.objects.create(draft=task_relation_draft,
producer=producer_task_blueprint,
consumer=cleanup_task_blueprint,
input_role=input_role,
output_role=output_role,
selection_doc=selection_doc,
selection_template=selection_template)
logger.info("created task_relation id=%s between task blueprint id=%s name='%s' and id=%s name='%s",
task_relation_blueprint.pk, task_relation_blueprint.producer.id, task_relation_blueprint.producer.name, task_relation_blueprint.consumer.id, task_relation_blueprint.consumer.name)
# and finally also create the executable subtask for the cleanup_task_blueprint, so it can actually run.
create_subtasks_from_task_blueprint(cleanup_task_blueprint)
create_subtasks_from_task_blueprint(cleanup_task_blueprint)
\ No newline at end of file
# return the modified scheduling_unit
scheduling_unit_blueprint.refresh_from_db()
return scheduling_unit_blueprint
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment