From 4793f907e07e37e1b252f5eef61e0dc1d112da5f Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Thu, 26 Nov 2020 17:12:23 +0100
Subject: [PATCH] TMSS-320: changed SIPidentifier to make it lighter in
 storage and (de)serialization. Changed ForeignKey into OneToOneField,
 ensuring each SIPidentifier is used only once. New SIPidentifier instances
 are now created where/when needed by default. Adapted scheduling of
 observation/pipeline subtasks for bulk creation and usage of
 global_identifiers, and added a schedule_ingest_subtask method.

---
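Notes for reviewers (this section sits below the --- fold, so git am drops it when applying):

The storage and (de)serialization savings come from deriving SIPidentifier from plain Model instead of BasicCommon, which drops the tags/created_at/updated_at columns and the GIN index from every identifier row. The ForeignKey -> OneToOneField switch additionally lets the database itself enforce that each identifier is referenced at most once. A minimal sketch of the resulting pattern, with a hypothetical Artefact model standing in for Subtask/SAP/Dataproduct (illustration only, not the TMSS code):

    from django.db import models

    class SIPidentifier(models.Model):
        # plain models.Model: no tags/created_at/updated_at columns and no GIN
        # index, so each row stays small even though one identifier is created
        # per dataproduct
        source = models.CharField(max_length=128, help_text='Source name')
        unique_identifier = models.BigAutoField(primary_key=True, help_text='Unique global identifier.')

    class Artefact(models.Model):
        # OneToOneField puts a UNIQUE constraint on the column, so two rows can
        # never share an identifier; PROTECT refuses to delete an identifier
        # that is still referenced
        global_identifier = models.OneToOneField(SIPidentifier, editable=False, on_delete=models.PROTECT)

        def save(self, *args, **kwargs):
            # same idea as SIPidentifier.assign_new_id_to_model in this patch:
            # only a brand-new instance (_state.adding) gets a fresh identifier
            if self._state.adding:
                self.global_identifier = SIPidentifier.objects.create(source="TMSS")
            super().save(*args, **kwargs)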
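The "created where/when needed by default" behaviour can be sanity-checked along these lines (a sketch: it assumes the existing Subtask_test_data helper from the TMSS Django test suite and a populated test database):

    from lofar.sas.tmss.tmss.tmssapp import models
    from lofar.sas.tmss.test.tmss_test_data_django_models import Subtask_test_data

    subtask = models.Subtask.objects.create(**Subtask_test_data())

    # save() ran while _state.adding was True, so assign_new_id_to_model
    # attached a freshly created identifier
    assert subtask.global_identifier is not None
    assert subtask.global_identifier.source == "TMSS"

    # a second save() is an update, not a create, so the identifier is kept
    gid = subtask.global_identifier.unique_identifier
    subtask.save()
    subtask.refresh_from_db()
    assert subtask.global_identifier.unique_identifier == gid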
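The subtasks.py hunks below repeat one idiom: pre-create all identifiers in a single bulk_create, then zip them onto the new dataproducts, so n dataproducts cost two INSERT statements instead of 2n. Condensed from the schedule_pipeline_subtask hunk (field list shortened to the essentials; note that bulk_create only returns objects with primary keys set on backends like the PostgreSQL one TMSS uses):

    # one INSERT for all identifiers...
    dp_global_identifiers = SIPidentifier.objects.bulk_create(
        [SIPidentifier(source="TMSS") for _ in input_dps])

    # ...then one INSERT for all dataproducts; zip() keeps identifier i
    # paired with input dataproduct i
    output_dps = Dataproduct.objects.bulk_create(
        [Dataproduct(filename="L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1]),
                     sap=input_dp.sap,
                     global_identifier=dp_global_identifier)
         for input_dp, dp_global_identifier in zip(input_dps, dp_global_identifiers)])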
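The new schedule_ingest_subtask clones each selected input dataproduct into an LTA-side output dataproduct and records provenance with an identity transform. Its per-input skeleton, reduced to the essentials (the real hunk below also fills the directory/feedback/size placeholders that the LTA overwrites at transfer time):

    for ingest_subtask_input in ingest_subtask.inputs.all():
        # inputs: the producer's dataproducts that match this input's selection_doc
        input_dataproducts = [dp for dp in ingest_subtask_input.producer.dataproducts.all()
                              if specifications_doc_meets_selection_doc(dp.specifications_doc,
                                                                        ingest_subtask_input.selection_doc)]
        ingest_subtask_input.dataproducts.set(input_dataproducts)

        # one output dataproduct per input, each with its own pre-created
        # global identifier (see the bulk_create idiom above)
        output_dataproducts = Dataproduct.objects.bulk_create(...)  # elided, see the hunk below

        # identity=True: ingest copies data to the LTA rather than deriving a
        # new product, so each output is "the same" dataproduct as its input
        DataproductTransform.objects.bulk_create(
            [DataproductTransform(input=input_dp, output=output_dp, identity=True)
             for input_dp, output_dp in zip(input_dataproducts, output_dataproducts)])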
 .../tmss/tmssapp/migrations/0001_initial.py   |  18 +---
 .../src/tmss/tmssapp/models/scheduling.py     |  42 ++++----
 .../tmss/tmssapp/serializers/scheduling.py    |   5 +-
 SAS/TMSS/src/tmss/tmssapp/subtasks.py         | 102 +++++++++++++++++-
 4 files changed, 126 insertions(+), 41 deletions(-)

diff --git a/SAS/TMSS/src/tmss/tmssapp/migrations/0001_initial.py b/SAS/TMSS/src/tmss/tmssapp/migrations/0001_initial.py
index f375f739ae5..77f0c63dc26 100644
--- a/SAS/TMSS/src/tmss/tmssapp/migrations/0001_initial.py
+++ b/SAS/TMSS/src/tmss/tmssapp/migrations/0001_initial.py
@@ -1,4 +1,4 @@
-# Generated by Django 3.0.9 on 2020-11-17 13:44
+# Generated by Django 3.0.9 on 2020-11-26 15:29
 
 from django.conf import settings
 import django.contrib.postgres.fields
@@ -563,15 +563,9 @@ class Migration(migrations.Migration):
         migrations.CreateModel(
             name='SIPidentifier',
             fields=[
-                ('tags', django.contrib.postgres.fields.ArrayField(base_field=models.CharField(max_length=128), blank=True, default=list, help_text='User-defined search keywords for object.', size=8)),
-                ('created_at', models.DateTimeField(auto_now_add=True, help_text='Moment of object creation.')),
-                ('updated_at', models.DateTimeField(auto_now=True, help_text='Moment of last object update.')),
                 ('source', models.CharField(help_text='Source name', max_length=128)),
                 ('unique_identifier', models.BigAutoField(help_text='Unique global identifier.', primary_key=True, serialize=False)),
             ],
-            options={
-                'abstract': False,
-            },
         ),
         migrations.CreateModel(
             name='StationType',
@@ -1035,7 +1029,7 @@ class Migration(migrations.Migration):
         migrations.AddField(
             model_name='subtask',
             name='global_identifier',
-            field=models.ForeignKey(editable=False, help_text='The global unique identifier for LTA SIP.', null=True, on_delete=django.db.models.deletion.PROTECT, to='tmssapp.SIPidentifier'),
+            field=models.OneToOneField(editable=False, help_text='The global unique identifier for LTA SIP.', on_delete=django.db.models.deletion.PROTECT, to='tmssapp.SIPidentifier'),
         ),
         migrations.AddField(
             model_name='subtask',
@@ -1052,10 +1046,6 @@ class Migration(migrations.Migration):
             name='task_blueprint',
             field=models.ForeignKey(help_text='Task Blueprint to which this Subtask belongs.', null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='subtasks', to='tmssapp.TaskBlueprint'),
         ),
-        migrations.AddIndex(
-            model_name='sipidentifier',
-            index=django.contrib.postgres.indexes.GinIndex(fields=['tags'], name='tmssapp_sip_tags_bbce92_gin'),
-        ),
         migrations.AddConstraint(
             model_name='schedulingunittemplate',
             constraint=models.UniqueConstraint(fields=('name', 'version'), name='schedulingunittemplate_unique_name_version'),
@@ -1131,7 +1121,7 @@ class Migration(migrations.Migration):
         migrations.AddField(
             model_name='sap',
             name='global_identifier',
-            field=models.ForeignKey(editable=False, help_text='The global unique identifier for LTA SIP.', null=True, on_delete=django.db.models.deletion.PROTECT, to='tmssapp.SIPidentifier'),
+            field=models.OneToOneField(editable=False, help_text='The global unique identifier for LTA SIP.', on_delete=django.db.models.deletion.PROTECT, to='tmssapp.SIPidentifier'),
         ),
         migrations.AddField(
             model_name='sap',
@@ -1268,7 +1258,7 @@ class Migration(migrations.Migration):
         migrations.AddField(
             model_name='dataproduct',
             name='global_identifier',
-            field=models.ForeignKey(editable=False, help_text='The global unique identifier for LTA SIP.', null=True, on_delete=django.db.models.deletion.PROTECT, to='tmssapp.SIPidentifier'),
+            field=models.OneToOneField(editable=False, help_text='The global unique identifier for LTA SIP.', on_delete=django.db.models.deletion.PROTECT, to='tmssapp.SIPidentifier'),
         ),
         migrations.AddField(
             model_name='dataproduct',
diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py
index ee6b7d2e752..c59d3080844 100644
--- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py
+++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py
@@ -8,7 +8,7 @@ logger = logging.getLogger(__name__)
 
 from datetime import datetime, timedelta
 
-from django.db.models import ForeignKey, CharField, DateTimeField, BooleanField, IntegerField, BigIntegerField, \
+from django.db.models import Model, ForeignKey, OneToOneField, CharField, DateTimeField, BooleanField, IntegerField, BigIntegerField, \
     ManyToManyField, CASCADE, SET_NULL, PROTECT, QuerySet, BigAutoField
 from django.contrib.postgres.fields import ArrayField, JSONField
 from django.contrib.auth.models import User
@@ -29,16 +29,6 @@ import uuid
 # Choices
 #
 
-
-def generate_unique_identifier_for_SIP_when_needed(model):
-    """
-    Create an Unique Identifier for given model class if not exist (None)
-    We just use an Auto Increment ID which is 64 bit
-    """
-    if model.id is not None and model.global_identifier is None:
-        model.global_identifier = SIPidentifier.objects.create(source="TMSS")
-
-
 class SubtaskState(AbstractChoice):
     """Defines the model and predefined list of possible SubtaskStatusChoice's for Subtask.
     The items in the Choices class below are automagically populated into the database via a data migration."""
@@ -128,6 +118,20 @@ class SAPTemplate(Template):
 
     # todo: do we need to specify a default?
 
+class SIPidentifier(Model):
+    '''A SIPidentifier is a globally unique id used to build provenance chains in the SIP for the LTA.
+    It is derived from Model and not from BasicCommon to keep a small footprint.'''
+    source = CharField(null=False, max_length=128, help_text='Source name')
+    unique_identifier = BigAutoField(primary_key=True, help_text='Unique global identifier.')
+
+    @staticmethod
+    def assign_new_id_to_model(model):
+        """
+        Create a Unique Identifier for the given model instance if it is being created.
+        """
+        if model._state.adding:
+            model.global_identifier = SIPidentifier.objects.create(source="TMSS")
+
 #
 # Instance Objects
 #
@@ -148,7 +152,7 @@ class Subtask(BasicCommon):
 
     # resource_claim = ForeignKey("ResourceClaim", null=False, on_delete=PROTECT) # todo <-- how is this external reference supposed to work?
created_or_updated_by_user = ForeignKey(User, null=True, editable=False, on_delete=PROTECT, help_text='The user who created / updated the subtask.') raw_feedback = CharField(null=True, max_length=1048576, help_text='The raw feedback for this Subtask') - global_identifier = ForeignKey('SIPidentifier', null=True, editable=False, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.') + global_identifier = OneToOneField('SIPidentifier', null=False, editable=False, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.') def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -224,7 +228,7 @@ class Subtask(BasicCommon): creating = self._state.adding # True on create, False on update annotate_validate_add_defaults_to_doc_using_template(self, 'specifications_doc', 'specifications_template') - generate_unique_identifier_for_SIP_when_needed(self) + SIPidentifier.assign_new_id_to_model(self) # check for uniqueness of SAP names: # todo: this is a very specific check, that depends on the template. On the task level, we have a javascript @@ -294,11 +298,11 @@ class SubtaskOutput(BasicCommon): class SAP(BasicCommon): specifications_doc = JSONField(help_text='SAP properties.') specifications_template = ForeignKey('SAPTemplate', null=False, on_delete=CASCADE, help_text='Schema used for specifications_doc.') - global_identifier = ForeignKey('SIPidentifier', null=True, editable=False, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.') + global_identifier = OneToOneField('SIPidentifier', null=False, editable=False, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.') def save(self, force_insert=False, force_update=False, using=None, update_fields=None): annotate_validate_add_defaults_to_doc_using_template(self, 'specifications_doc', 'specifications_template') - generate_unique_identifier_for_SIP_when_needed(self) + SIPidentifier.assign_new_id_to_model(self) super().save(force_insert, force_update, using, update_fields) @@ -325,12 +329,12 @@ class Dataproduct(BasicCommon): feedback_doc = JSONField(help_text='Dataproduct properties, as reported by the producing process.') feedback_template = ForeignKey('DataproductFeedbackTemplate', on_delete=PROTECT, help_text='Schema used for feedback_doc.') sap = ForeignKey('SAP', on_delete=PROTECT, null=True, related_name="dataproducts", help_text='SAP this dataproduct was generated out of (NULLable).') - global_identifier = ForeignKey('SIPidentifier', editable=False, null=True, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.') + global_identifier = OneToOneField('SIPidentifier', editable=False, null=False, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.') def save(self, force_insert=False, force_update=False, using=None, update_fields=None): annotate_validate_add_defaults_to_doc_using_template(self, 'specifications_doc', 'specifications_template') annotate_validate_add_defaults_to_doc_using_template(self, 'feedback_doc', 'feedback_template') - generate_unique_identifier_for_SIP_when_needed(self) + SIPidentifier.assign_new_id_to_model(self) super().save(force_insert, force_update, using, update_fields) @@ -384,7 +388,3 @@ class DataproductHash(BasicCommon): algorithm = ForeignKey('Algorithm', null=False, on_delete=PROTECT, help_text='Algorithm used (MD5, AES256).') hash = CharField(max_length=128, help_text='Hash value.') - -class SIPidentifier(BasicCommon): - source = CharField(max_length=128, help_text='Source name') - 
unique_identifier = BigAutoField(primary_key=True, help_text='Unique global identifier.') diff --git a/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py index e70f7585074..5b94b60f586 100644 --- a/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py @@ -164,7 +164,8 @@ class SAPSerializer(RelationalHyperlinkedModelSerializer): fields = '__all__' -class SIPidentifierSerializer(RelationalHyperlinkedModelSerializer): +class SIPidentifierSerializer(serializers.HyperlinkedModelSerializer): class Meta: model = models.SIPidentifier - fields = '__all__' + fields = ['unique_identifier', 'source', 'url'] + diff --git a/SAS/TMSS/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/src/tmss/tmssapp/subtasks.py index 2c0a5a50dbc..01899071172 100644 --- a/SAS/TMSS/src/tmss/tmssapp/subtasks.py +++ b/SAS/TMSS/src/tmss/tmssapp/subtasks.py @@ -468,7 +468,7 @@ def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> for task_relation_blueprint in task_blueprint.produced_by.all(): producing_task_blueprint = task_relation_blueprint.producer - predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.order_by('id').all()] + predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()] for predecessor_subtask in predecessor_subtasks: for predecessor_subtask_output in predecessor_subtask.outputs.all(): SubtaskInput.objects.create(subtask=subtask, @@ -503,6 +503,9 @@ def schedule_subtask(subtask: Subtask) -> Subtask: if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value: return schedule_qaplots_subtask(subtask) + if subtask.specifications_template.type.value == SubtaskType.Choices.INGEST.value: + return schedule_ingest_subtask(subtask) + if subtask.specifications_template.type.value == SubtaskType.Choices.COPY.value: return schedule_copy_subtask(subtask) @@ -510,6 +513,8 @@ def schedule_subtask(subtask: Subtask) -> Subtask: (subtask.pk, subtask.specifications_template.type.value)) except Exception as e: try: + logger.exception(e) + # set the subtask to state 'ERROR'... 
            subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
            subtask.save()
 
@@ -840,6 +845,8 @@ def schedule_observation_subtask(observation_subtask: Subtask):
                                        },
                                        specifications_template=SAPTemplate.objects.get(name="SAP"))
 
+        # create dataproducts in bulk, and assign each dp its own unique global identifier
+        dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in pointing['subbands']])
         Dataproduct.objects.bulk_create([Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
                                                      directory=directory,
                                                      dataformat=Dataformat.objects.get(value="MeasurementSet"),
@@ -851,7 +858,9 @@ def schedule_observation_subtask(observation_subtask: Subtask):
                                                      feedback_template=dataproduct_feedback_template,
                                                      size=0 if sb_nr%10==0 else 1024*1024*1024*sb_nr,
                                                      expected_size=1024*1024*1024*sb_nr,
-                                                     sap=sap) for sb_nr in pointing['subbands']])
+                                                     sap=sap,
+                                                     global_identifier=dp_global_identifier)
+                                         for sb_nr, dp_global_identifier in zip(pointing['subbands'], dp_global_identifiers)])
 
     # step 4: resource assigner (if possible)
     _assign_or_unassign_resources(observation_subtask)
@@ -922,8 +931,9 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
             # TODO: create them from the spec, instead of "copying" the input filename
             dataformat = Dataformat.objects.get(value="MeasurementSet")
             input_dps = list(pipeline_subtask_input.dataproducts.all())
+            dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in input_dps])
             output_dp_objects = []
-            for input_dp in pipeline_subtask_input.dataproducts.all():
+            for input_dp, dp_global_identifier in zip(input_dps, dp_global_identifiers):
                 if '_' in input_dp.filename and input_dp.filename.startswith('L'):
                     filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
                 else:
@@ -938,7 +948,8 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
                                         specifications_template=dataproduct_specifications_template,
                                         feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                         feedback_template=dataproduct_feedback_template,
-                                        sap=input_dp.sap)
+                                        sap=input_dp.sap,
+                                        global_identifier=dp_global_identifier)
                 output_dp_objects.append(output_dp)
 
         output_dps = Dataproduct.objects.bulk_create(output_dp_objects)
@@ -956,6 +967,89 @@ def schedule_pipeline_subtask(pipeline_subtask: Subtask):
 
     return pipeline_subtask
 
+
+def schedule_ingest_subtask(ingest_subtask: Subtask):
+    ''' Schedule the given ingest_subtask
+    This method should typically be called upon the event of a predecessor (pipeline or observation) subtask being finished.
+    This method implements "Scheduling subtasks" step from the "Specification Flow"
+    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
+    '''
+    # step 0: check pre-requisites
+    check_prerequities_for_scheduling(ingest_subtask)
+
+    if ingest_subtask.specifications_template.type.value != SubtaskType.Choices.INGEST.value:
+        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (ingest_subtask.pk,
+                                                                                                          ingest_subtask.specifications_template.type,
+                                                                                                          SubtaskType.Choices.INGEST.value))
+
+    # step 1: set state to SCHEDULING
+    ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
+    ingest_subtask.save()
+
+    # step 1a: set start/stop times
+    # not very relevant for ingest subtasks, but it's nice for the user to see when the ingest task was scheduled.
+    # please note that an ingest subtask may idle for some time while it is in the ingest queue.
+    # the actual start/stop times are set by the IngestTMSSAdapter when the subtask starts and stops.
+    ingest_subtask.start_time = max([pred.stop_time for pred in ingest_subtask.predecessors] + [datetime.utcnow()])
+    ingest_subtask.stop_time = ingest_subtask.start_time + timedelta(hours=6)
+
+    # step 2: link input dataproducts
+    if ingest_subtask.inputs.count() == 0:
+        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (ingest_subtask.pk,
+                                                                                                               ingest_subtask.specifications_template.type))
+
+    if sum(ingest_subtask_input.producer.dataproducts.count() for ingest_subtask_input in ingest_subtask.inputs.all()) == 0:
+        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input dataproduct(s)" % (ingest_subtask.pk,
+                                                                                                                            ingest_subtask.specifications_template.type))
+
+    # iterate over all inputs
+    for ingest_subtask_input in ingest_subtask.inputs.all():
+
+        # select and set input dataproducts that meet the filter defined in selection_doc
+        input_dataproducts = [dataproduct for dataproduct in ingest_subtask_input.producer.dataproducts.all()
+                              if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, ingest_subtask_input.selection_doc)]
+        ingest_subtask_input.dataproducts.set(input_dataproducts)
+
+        # define output and create output dataproducts.
+        ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask)
+
+        # prepare identifiers in bulk for each output_dataproduct
+        dp_gids = [SIPidentifier(source="TMSS") for _ in input_dataproducts]
+        SIPidentifier.objects.bulk_create(dp_gids)
+
+        # overwritten later by ingest 'feedback'. Is determined at transfer time by the LTA.
+        feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
+        feedback_doc = get_default_json_object_for_schema(feedback_template.schema)
+        output_dataproducts = [Dataproduct(filename=input_dp.filename, # overwritten later by ingest 'feedback'. Is determined at transfer time by the LTA.
+                                           directory="LTA", # filled in later by ingest 'feedback'. Is determined at transfer time by the LTA.
+                                           dataformat=input_dp.dataformat,
+                                           datatype=input_dp.datatype,
+                                           specifications_doc=input_dp.specifications_doc,
+                                           specifications_template=input_dp.specifications_template,
+                                           producer=ingest_subtask_output,
+                                           size=None, # filled in later by ingest 'feedback'. Is determined at transfer time by the LTA.
+                                           feedback_doc=feedback_doc,
+                                           feedback_template=feedback_template,
+                                           sap=input_dp.sap,
+                                           global_identifier=dp_gid) for input_dp, dp_gid in zip(input_dataproducts, dp_gids)]
+        Dataproduct.objects.bulk_create(output_dataproducts)
+        # note: each output dataproduct is already linked to ingest_subtask_output via its producer foreign key
+
+        # link each input to each corresponding output dataproduct. identity=True because this is "just a copy".
+        dataproduct_transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=True)
+                                  for input_dp, output_dp in zip(input_dataproducts, output_dataproducts)]
+        DataproductTransform.objects.bulk_create(dataproduct_transforms)
+
+
+    # skip step 4: ingest does not need to have resources assigned
+
+    # step 5: set state to SCHEDULED (so the ingest service can pick up this subtask and run it)
+    ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
+    ingest_subtask.save()
+
+    return ingest_subtask
+
+
 def schedule_copy_subtask(copy_subtask: Subtask):
     ''' Schedule the given copy_subtask
     This method should typically be called upon the event of an predecessor (pipeline or observation) subtask being finished.
-- 
GitLab