diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
index 80dfcee16ec563ea2417916c6eb06069f2ebe7e7..92298e833d01864410d81ebf91203e72f6f834b7 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/adapters/parset.py
@@ -64,7 +64,7 @@ def _dataproduct_parset_subkeys(subtask: models.Subtask, dataproducts: list) ->
     parset["enabled"] = len(dataproducts) > 0
     parset["filenames"] = [dp.filename if dp else "null:" for dp in dataproducts]
     parset["skip"] = [0 if dp else 1 for dp in dataproducts]
-    parset["locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory if dp else "") for dp in dataproducts]
+    parset["locations"] = ["%s:%s" % (subtask.cluster.name, dp.directory if dp else "") for dp in dataproducts]  # todo: use storage cluster instead of processing cluster? dp.producer.filesystem.cluster.name?

     return parset

@@ -112,8 +112,8 @@ def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: d
         correlator_dataproducts.append(dataproduct[0] if dataproduct else None)

     parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, correlator_dataproducts), "Observation.DataProducts.Output_Correlated."))
-    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name  # TODO: This must be the storage cluster, not the processing cluster
-    parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = "/data/projects" if isProductionEnvironment() else "/data/test-projects"
+    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.outputs.first().filesystem.cluster.name  # todo: filter for specific output?
+    parset["Observation.DataProducts.Output_Correlated.storageClusterPartition"] = subtask.outputs.first().filesystem.directory  # todo: filter for specific output?

     # mimic MoM placeholder thingy (the resource estimator parses this)
     parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]  # we need one per SAP, and that needs to include the string "SAPxxx" with xxx being the SAP number, just to satisfy the ResourceEstimator.
@@ -288,12 +288,12 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
     coherent_dataproducts, incoherent_dataproducts = _order_beamformer_dataproducts(dataproducts, spec)

     parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, coherent_dataproducts), "Observation.DataProducts.Output_CoherentStokes."))
-    parset["Observation.DataProducts.Output_CoherentStokes.storageClusterName"] = subtask.cluster.name  # TODO: This must be the storage cluster, not the processing cluster
-    parset["Observation.DataProducts.Output_CoherentStokes.storageClusterPartition"] = "/data/projects" if isProductionEnvironment() else "/data/test-projects"
+    parset["Observation.DataProducts.Output_CoherentStokes.storageClusterName"] = subtask.outputs.first().filesystem.cluster.name  # todo: filter for specific output?
+    parset["Observation.DataProducts.Output_CoherentStokes.storageClusterPartition"] = subtask.outputs.first().filesystem.directory  # todo: filter for specific output?

     parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, incoherent_dataproducts), "Observation.DataProducts.Output_IncoherentStokes."))
-    parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterName"] = subtask.cluster.name  # TODO: This must be the storage cluster, not the processing cluster
-    parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterPartition"] = "/data/projects" if isProductionEnvironment() else "/data/test-projects"
+    parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterName"] = subtask.outputs.first().filesystem.cluster.name  # todo: filter for specific output?
+    parset["Observation.DataProducts.Output_IncoherentStokes.storageClusterPartition"] = subtask.outputs.first().filesystem.directory  # todo: filter for specific output?

     # mimic MoM placeholder thingy (the resource estimator parses this)
     parset["Observation.DataProducts.Output_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]  # this needs to be unique per SAP only, not dataproduct
@@ -609,7 +609,7 @@ def _convert_to_parset_dict_for_preprocessing_pipeline_schema(subtask: models.Su
     parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Correlated."))
     parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]
-    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.cluster.name
+    parset["Observation.DataProducts.Output_Correlated.storageClusterName"] = subtask.outputs.first().filesystem.cluster.name  # todo: should we filter for an output with role=Role.Choices.CORRELATOR?

     # Other
     parset["Observation.ObservationControl.PythonControl.PreProcessing.SkyModel"] = "Ateam_LBA_CC"
@@ -700,7 +700,7 @@ def _convert_to_parset_dict_for_pulsarpipeline_schema(subtask: models.Subtask) -
     parset.update(_add_prefix(_dataproduct_parset_subkeys(subtask, out_dataproducts), "Observation.DataProducts.Output_Pulsar."))
     parset["Observation.DataProducts.Output_Pulsar.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, 0)]  # todo: find correct SAP id (although this is probably fine since the pulsar pipeline does not use this?)
-    parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = subtask.cluster.name
+    parset["Observation.DataProducts.Output_Pulsar.storageClusterName"] = subtask.outputs.first().filesystem.cluster.name  # todo: should we filter for an output with role=Role.Choices.BEAMFORMER?

     # pragmatic solution to deal with the various parset using subsystems...
     # some want the keys as "Observation.<subkey>" and some as "ObsSW.Observation.<subkey>"
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0023_subtaskoutput_filesystem.py b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0023_subtaskoutput_filesystem.py
new file mode 100644
index 0000000000000000000000000000000000000000..156fc9cb0e1aef3e64ef7c74cac69e6e7da884fb
--- /dev/null
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/migrations/0023_subtaskoutput_filesystem.py
@@ -0,0 +1,32 @@
+# Generated by Django 3.0.9 on 2022-01-25 10:18
+
+from django.db import migrations, models
+import django.db.models.deletion
+from lofar.sas.tmss.tmss.tmssapp.populate import populate_misc2
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('tmssapp', '0022_sip'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='subtaskoutput',
+            name='filesystem',
+            field=models.ForeignKey(default=None, help_text='The filesystem that all dataproducts that this output produces are written to', on_delete=django.db.models.deletion.CASCADE, to='tmssapp.Filesystem'),
+            preserve_default=False,
+        ),
+        migrations.AlterField(
+            model_name='schedulingunitdraft',
+            name='priority_rank',
+            field=models.FloatField(default=1.0, help_text='Priority of this scheduling unit w.r.t. other scheduling units within the same queue and project.'),
+        ),
+        migrations.AlterField(
+            model_name='subtask',
+            name='global_parset_identifier',
+            field=models.OneToOneField(default=1, editable=False, help_text="The global unique identifier of this Subtask's parset for LTA SIP.", on_delete=django.db.models.deletion.PROTECT, related_name='related_subtask', to='tmssapp.SIPidentifier'),
+            preserve_default=False,
+        ),
+        migrations.RunPython(populate_misc2)
+    ]
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py
index c1c48af4c87aa8a95a791bc3d0a79d2335382795..5a25891924aa90f2db169722d2c503bb9e44f415 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/models/scheduling.py
@@ -485,6 +485,7 @@ class SubtaskInput(BasicCommon, TemplateSchemaMixin):
 class SubtaskOutput(BasicCommon):
     subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='outputs', help_text='Subtask to which this output specification refers.')
     output_role = ForeignKey('TaskConnectorType', null=True, related_name='subtask_outputs', on_delete=CASCADE, help_text='Output connector type (what kind of data is taken from the producer).')
+    filesystem = ForeignKey('Filesystem', null=False, on_delete=PROTECT, help_text='The filesystem that all dataproducts that this output produces are written to')


 class SAP(BasicCommon, TemplateSchemaMixin):
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
index a44659eecd9aa959f14fbeccb385c2c7c38973cb..728450422c1a7472899f37115c57c6a99de0b3a8 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/populate.py
@@ -531,6 +531,14 @@ def populate_misc(apps, schema_editor):
                               directory="srm://lta-head.lofar.psnc.pl:8443/lofar/ops/projects/")


+def populate_misc2(apps, schema_editor):
+    cluster, _ = Cluster.objects.get_or_create(name="CEP4", location="CIT", archive_site=False)
+    cep4_test = Filesystem.objects.create(name="Test Storage (CIT)", cluster=cluster, capacity=3.6e15,
+                                          directory="/data/test-projects")
+    cep4_production = Filesystem.objects.create(name="Production Storage (CIT)", cluster=cluster, capacity=3.6e15,
+                                                directory="/data/projects")
+
+
 def populate_connectors():
     # the TaskConnectorType's define how the Task[Draft/Blueprint] *can* be connected.
diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
index 1e06ed2893d358dfd3f6fb1f962eb3b33234e415..6c608716b84c61fc84d4b8285c8cf969c63932fa 100644
--- a/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
+++ b/SAS/TMSS/backend/src/tmss/tmssapp/subtasks.py
@@ -653,7 +653,7 @@ def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskB
     # an observation has no input, it just produces output data
     # create a subtask_output per task_connector_type of the task, but only with role Correlator or Beamformer because that is what observations produce.
     for task_connector_type in task_blueprint.specifications_template.connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).filter(Q(role__value=Role.Choices.CORRELATOR.value)|Q(role__value=Role.Choices.BEAMFORMER.value)).all():
-        subtask_output = SubtaskOutput.objects.create(subtask=subtask, output_role=task_connector_type)
+        subtask_output = SubtaskOutput.objects.create(subtask=subtask, output_role=task_connector_type, filesystem=_output_filesystem(subtask))

     # step 3: set state to DEFINED
     subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
@@ -720,7 +720,7 @@ def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask)
                                                      selection_template=selection_template)

     # create an internal SubtaskOutput (output_role=None), because we do not expose qa data yet at task level
-    qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, output_role=None)
+    qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask, output_role=None, filesystem=_output_filesystem(qafile_subtask))

     # step 3: set state to DEFINED
     qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
@@ -798,7 +798,7 @@ def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subta
                                                       selection_template=selection_template)

     # create an internal SubtaskOutput (output_role=None), because we do not expose qa data yet at task level
-    qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask, output_role=None)
+    qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask, output_role=None, filesystem=_output_filesystem(qaplots_subtask))

     # step 3: set state to DEFINED
     qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
@@ -867,7 +867,7 @@ def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, s
     # create a subtask_output per task_connector_type of the task
     for task_connector_type in task_blueprint.specifications_template.connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all():
-        subtask_output = SubtaskOutput.objects.create(subtask=subtask, output_role=task_connector_type)
+        subtask_output = SubtaskOutput.objects.create(subtask=subtask, output_role=task_connector_type, filesystem=_output_filesystem(subtask))

     # step 3: set state to DEFINED
     subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
@@ -1380,12 +1380,17 @@ def _output_root_directory(subtask: Subtask) -> str:
     """ Return the directory under which output needs to be stored.
""" # Support for several projects will be added in TMSS-689 - directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects", - subtask.project.name, - subtask.id) + directory = "%s/%s/L%s" % (_output_filesystem(subtask).directory, subtask.project.name, subtask.id) return directory + +def _output_filesystem(subtask: Subtask) -> Filesystem: + """ Return the filesystem that output dataproducts should be written to.""" + + return Filesystem.objects.get(name='%s Storage (CIT)' % ('Production' if isProductionEnvironment() else 'Test')) + + def schedule_observation_subtask(observation_subtask: Subtask): ''' Schedule the given observation_subtask For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler. @@ -1983,7 +1988,7 @@ def schedule_ingest_subtask(ingest_subtask: Subtask): # Create one internal SubtaskOutput (output_role=None), all dataproducts end up in here. # We do not expose ingested data as a task output connector. The LTA is an endpoint. - ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask, output_role=None) + ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask, output_role=None, filesystem=_output_filesystem(ingest_subtask)) # gather all dataproducts from all inputs for ingest_subtask_input in ingest_subtask.inputs.all(): diff --git a/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py b/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py index 3a49d448f65b1ecbb44b74039012a7e9d8a3b278..980a475c6662bb4e96c2815f23c694691c007f60 100755 --- a/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py +++ b/SAS/TMSS/backend/test/t_tmssapp_scheduling_django_API.py @@ -194,6 +194,7 @@ class SubtaskTest(unittest.TestCase): @classmethod def setUpClass(cls) -> None: cls.task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) + cls.filesystem = models.Filesystem.objects.create(**Filesystem_test_data()) def test_Subtask_gets_created_with_correct_creation_timestamp(self): @@ -281,7 +282,7 @@ class SubtaskTest(unittest.TestCase): subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint, primary=True)) subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data(task_blueprint=self.task_blueprint, primary=False)) - output1 = models.SubtaskOutput.objects.create(subtask=subtask1) + output1 = models.SubtaskOutput.objects.create(subtask=subtask1, filesystem=self.filesystem) models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask2, producer=output1)) self.assertEqual(subtask1, subtask2.predecessors.all()[0]) @@ -299,12 +300,12 @@ class SubtaskTest(unittest.TestCase): # | | # ST2 - -> ST5 ---> ST6 - output1 = models.SubtaskOutput.objects.create(subtask=subtask1) - output2 = models.SubtaskOutput.objects.create(subtask=subtask2) - output3 = models.SubtaskOutput.objects.create(subtask=subtask3) - output4 = models.SubtaskOutput.objects.create(subtask=subtask4) - output5 = models.SubtaskOutput.objects.create(subtask=subtask5) - output6 = models.SubtaskOutput.objects.create(subtask=subtask6) + output1 = models.SubtaskOutput.objects.create(subtask=subtask1, filesystem=self.filesystem) + output2 = models.SubtaskOutput.objects.create(subtask=subtask2, filesystem=self.filesystem) + output3 = models.SubtaskOutput.objects.create(subtask=subtask3, filesystem=self.filesystem) + output4 = models.SubtaskOutput.objects.create(subtask=subtask4, 
+        output5 = models.SubtaskOutput.objects.create(subtask=subtask5, filesystem=self.filesystem)
+        output6 = models.SubtaskOutput.objects.create(subtask=subtask6, filesystem=self.filesystem)

         models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask3, producer=output1))
         models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask3, producer=output2))
@@ -326,7 +327,7 @@
     def test_Subtask_transformed_dataproducts(self):
         # setup
         subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data())
-        output1:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask1)
+        output1:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask1, filesystem=self.filesystem)
         output1_dp:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output1))

         subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data())
@@ -334,7 +335,7 @@
         input2_dp = output1_dp
         input2.dataproducts.set([input2_dp])
         input2.save()
-        output2:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask2)
+        output2:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask2, filesystem=self.filesystem)
         output2_dp:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output2))

         models.DataproductTransform.objects.create(input=input2_dp, output=output2_dp, identity=True)
diff --git a/SAS/TMSS/backend/test/tmss_test_data_django_models.py b/SAS/TMSS/backend/test/tmss_test_data_django_models.py
index d0bf350595efa1e860afb9c995eec85cb7b09bc8..a5ce8233401eceee38183665dda69f9f7033e6bd 100644
--- a/SAS/TMSS/backend/test/tmss_test_data_django_models.py
+++ b/SAS/TMSS/backend/test/tmss_test_data_django_models.py
@@ -362,7 +362,8 @@ def SubtaskOutput_test_data(subtask: models.Subtask=None) -> dict:
         subtask = models.Subtask.objects.create(**Subtask_test_data())

     return {"subtask": subtask,
-            "tags":[]}
+            "tags":[],
+            "filesystem": models.Filesystem.objects.create(**Filesystem_test_data())}

 def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None, selection_template: models.TaskRelationSelectionTemplate=None) -> dict:
     if subtask is None:
diff --git a/SAS/TMSS/backend/test/tmss_test_data_rest.py b/SAS/TMSS/backend/test/tmss_test_data_rest.py
index 8dbf787d06ae1f7c454a9a3c6bf2e7c55a8500a1..7592d0fdd05e64a21cda275db9fcc3902c36c5b3 100644
--- a/SAS/TMSS/backend/test/tmss_test_data_rest.py
+++ b/SAS/TMSS/backend/test/tmss_test_data_rest.py
@@ -659,13 +659,22 @@ class TMSSRESTTestDataCreator():
             self._subtask_url = self.post_data_and_get_url(self.Subtask(task_blueprint_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')), '/subtask/')
             return self._subtask_url

+    @property
+    def cached_filesystem_url(self):
+        try:
+            return self._filesystem_url
+        except AttributeError:
+            self._filesystem_url = self.post_data_and_get_url(self.Filesystem(), '/filesystem/')
+            return self._filesystem_url
+
     def SubtaskOutput(self, subtask_url=None):
         if subtask_url is None:
             subtask_url = self.cached_subtask_url

         return {"subtask": subtask_url,
-                "tags": []}
+                "tags": [],
+                "filesystem": self.cached_filesystem_url}

     @property
     def cached_subtask_output_url(self):
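
Reviewer note, not part of the patch: a minimal sketch of how the new storage lookup chain is intended to resolve once populate_misc2() has created the CEP4 Filesystem rows. The helper name storage_keys_for and the example values in the comments are hypothetical; the attribute chain mirrors the patched code in parset.py and subtasks.py.

```python
# Sketch, assuming the TMSS Django models from this patch are importable.
from lofar.sas.tmss.tmss.tmssapp import models

def storage_keys_for(subtask: models.Subtask) -> dict:
    """Illustrates where storageClusterName/Partition now come from.

    Before this patch: subtask.cluster.name plus a hardcoded
    "/data/projects" vs "/data/test-projects" partition.
    After: both values come from the SubtaskOutput's Filesystem FK.
    """
    fs = subtask.outputs.first().filesystem  # todo (as in the patch): filter for a specific output?
    return {
        "storageClusterName": fs.cluster.name,       # e.g. "CEP4"
        "storageClusterPartition": fs.directory,     # e.g. "/data/projects" or "/data/test-projects"
        # output root, as built by _output_root_directory() in subtasks.py:
        "output_root": "%s/%s/L%s" % (fs.directory, subtask.project.name, subtask.id),
    }
```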