diff --git a/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py b/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py index d9f079f35a4ae89db4c160bc9c75c4b554cd5dfc..1e9d43e9cab89a6c25cf8e4a903b16ea8dd53b49 100755 --- a/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py +++ b/SAS/TMSS/test/t_tmssapp_scheduling_django_API.py @@ -271,6 +271,54 @@ class SubtaskTest(unittest.TestCase): self.assertEqual(set(), set(subtask4.successors.all())) self.assertEqual(set((subtask6,)), set(subtask5.successors.all())) + def test_Subtask_transformed_dataproducts(self): + # setup + subtask1:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + output1:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask1) + output1_dp:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output1)) + + subtask2:models.Subtask = models.Subtask.objects.create(**Subtask_test_data()) + input2:models.SubtaskInput = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=subtask2, producer=output1)) + input2_dp = output1_dp + input2.dataproducts.set([input2_dp]) + input2.save() + output2:models.SubtaskOutput = models.SubtaskOutput.objects.create(subtask=subtask2) + output2_dp:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output2)) + + models.DataproductTransform.objects.create(input=input2_dp, output=output2_dp, identity=True) + + subtask1.refresh_from_db() + subtask2.refresh_from_db() + + # make sure the subtask and input/output dp's are setup correct... + self.assertEqual(1, subtask2.output_dataproducts.count()) + self.assertEqual(1, subtask2.input_dataproducts.count()) + self.assertEqual(output2_dp, subtask2.output_dataproducts.first()) + self.assertEqual(input2_dp, subtask2.input_dataproducts.first()) + + # now test the get_transformed_input_dataproduct/get_transformed_ouput_dataproduct methods + self.assertEqual(input2_dp, subtask2.get_transformed_input_dataproduct(output2_dp.id)) + self.assertEqual(output2_dp, subtask2.get_transformed_output_dataproduct(input2_dp.id)) + + # add some extra data + output1_dp2:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output1)) + input2_dp2 = output1_dp2 + input2.dataproducts.set([input2_dp, input2_dp2]) + input2.save() + + output2_dp2:models.Dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=output2)) + models.DataproductTransform.objects.create(input=input2_dp2, output=output2_dp2, identity=True) + + # make sure the subtask and input/output dp's are setup correct... + self.assertEqual(2, subtask2.output_dataproducts.count()) + self.assertEqual(2, subtask2.input_dataproducts.count()) + + # test the get_transformed_input_dataproduct/get_transformed_ouput_dataproduct methods again, wit multiple dp's + self.assertEqual(input2_dp, subtask2.get_transformed_input_dataproduct(output2_dp.id)) + self.assertEqual(input2_dp2, subtask2.get_transformed_input_dataproduct(output2_dp2.id)) + self.assertEqual(output2_dp, subtask2.get_transformed_output_dataproduct(input2_dp.id)) + self.assertEqual(output2_dp2, subtask2.get_transformed_output_dataproduct(input2_dp2.id)) + def test_Subtask_raises_ValidationError_on_duplicate_pointing_names(self): # setup diff --git a/SAS/TMSS/test/tmss_test_data_django_models.py b/SAS/TMSS/test/tmss_test_data_django_models.py index 0d24b4da2e1491ed802a53c3354c2a8e45ceeaa7..53f6cd54ab130c1af78727e2b3517ba3cdea71e8 100644 --- a/SAS/TMSS/test/tmss_test_data_django_models.py +++ b/SAS/TMSS/test/tmss_test_data_django_models.py @@ -330,21 +330,24 @@ def SubtaskOutput_test_data(subtask: models.Subtask=None) -> dict: return {"subtask": subtask, "tags":[]} -def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None) -> dict: +def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None, selection_template: models.TaskRelationSelectionTemplate=None) -> dict: if subtask is None: subtask = models.Subtask.objects.create(**Subtask_test_data()) if producer is None: producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data()) + if selection_template is None: + selection_template = models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()) + if selection_doc is None: - selection_doc = {} + selection_doc = get_default_json_object_for_schema(selection_template.schema) return {"subtask": subtask, "task_relation_blueprint": models.TaskRelationBlueprint.objects.create(**TaskRelationBlueprint_test_data()), "producer": producer, "selection_doc": selection_doc, - "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()), + "selection_template": selection_template, "tags":[]} def Subtask_test_data(task_blueprint: models.TaskBlueprint=None, subtask_template: models.SubtaskTemplate=None,