diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py index 70d070e24025acdacd7ae25ee42b15450ea5f5d0..8b2222ac49cb062a0468328a3d24b72befd04b7e 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py @@ -58,7 +58,7 @@ class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEvent @staticmethod def is_tmss_job(job_dict) -> bool: - return job_dict.get('type', '').lower() == 'tmss' + return job_dict.get('type', job_dict.get('Type','')).lower() == 'tmss' def onJobStarted(self, job_dict): if self.is_tmss_job(job_dict): diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.py index 7c95734fec9875d163c5bb694ce21e47b357ed3c..cc0c971bdfc972f3bec76b2e12f24415ae6884e1 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.py @@ -127,7 +127,7 @@ class TestIngestTMSSAdapter(unittest.TestCase): self.assertEqual("TMSS", job['Type']) self.assertEqual(str(ingest_subtask.id), job['TMSSIngestSubtaskId']) self.assertEqual(str(obs_subtask.id), job['ObservationId']) - self.assertEqual(dataproduct.filepath, job['Location']) + self.assertTrue(job['Location'].endswith(dataproduct.filepath)) self.assertEqual(obs_task.scheduling_unit_blueprint.draft.scheduling_set.project.name, job['Project']) input_pd2jobs[dataproduct] = job diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py index 5e7a88d32948ff53f8bc6352e4abbe3155cf3b51..7165dc848fa5505099ade7426c388905ec0fd7ea 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py @@ -299,83 +299,85 @@ with patch('lofar.lta.ingest.server.ltaclient.LTAClient', autospec=True) as Mock # create TMSSTestEnvironment with a running ingest_tmss_adapter # assume the ingest_tmss_adapter works correctly. It is tested in t_ingesttmssadapter. - with TMSSTestEnvironment(exchange=self.tmp_exchange.address, populate_schemas=True, start_ingest_tmss_adapter=True) as tmss_test_env: - from lofar.sas.tmss.test.tmss_test_data_django_models import SubtaskTemplate_test_data, Subtask_test_data, \ + with TMSSTestEnvironment(exchange=self.tmp_exchange.address, populate_schemas=True) as tmss_test_env: + from lofar.lta.ingest.server.ingesttmssadapter import IngestTMSSAdapter + with IngestTMSSAdapter(tmss_test_env.client_credentials.dbcreds, self.tmp_exchange.address): + from lofar.sas.tmss.test.tmss_test_data_django_models import SubtaskTemplate_test_data, Subtask_test_data, \ TaskBlueprint_test_data, TaskTemplate_test_data, Dataproduct_test_data, \ SubtaskOutput_test_data, SubtaskInput_test_data - from lofar.sas.tmss.tmss.tmssapp import models - from lofar.common.json_utils import get_default_json_object_for_schema - - #################################################### - # setup: create observation and link an ingest to it. - #################################################### - - obs_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='observation')) - obs_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=obs_task_template)) - obs_subtask_template = models.SubtaskTemplate.objects.get(name='observation control') - obs_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=obs_subtask_template, task_blueprint=obs_task)) - obs_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_subtask)) - - feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback') - feedback_doc = get_default_json_object_for_schema(feedback_template.schema) - feedback_doc['frequency']['subbands'] = [0] - feedback_doc['frequency']['central_frequencies'] = [1] - obs_dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_subtask_output, feedback_template=feedback_template, feedback_doc=feedback_doc)) - - ingest_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='ingest')) - ingest_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(scheduling_unit_blueprint=obs_subtask.task_blueprint.scheduling_unit_blueprint, specifications_template=ingest_task_template)) - ingest_subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value='ingest')) - ingest_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=ingest_subtask_template, task_blueprint=ingest_task)) - ingest_subtask_input = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=ingest_subtask, producer=obs_subtask_output)) - ingest_subtask_input.dataproducts.set([obs_dataproduct]) - ingest_subtask_input.save() - - ingest_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=ingest_subtask)) - ingest_output_dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=ingest_subtask_output)) - transforms = models.DataproductTransform.objects.create(input=obs_dataproduct, output=ingest_output_dataproduct, identity=True) - - #################################################### - # end of object setup - #################################################### - - project_name = ingest_task.draft.scheduling_unit_draft.scheduling_set.project.name - obs_id = obs_subtask.id - dpname = 'L%s_SAP000_SB000_uv.MS' % obs_id - self.test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1(), dpname) - - def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='TMSS'): - return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s.tar' % (project, obs_id, dpname), - 'result': 'ok', - 'error': '', - 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} - ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket - - os.makedirs(self.test_dir_path) - test_file_paths = [] - for i in range(10): - test_file_path = os.path.join(self.test_dir_path, 'testfile_%s.txt' % i) - test_file_paths.append(test_file_path) - with open(test_file_path, 'w') as file: - file.write(1000*'a') - - job_xml = createJobXml(testname, obs_id, dpname, obs_dataproduct.global_identifier.unique_identifier, - 'localhost:%s' % self.test_dir_path, - tmss_ingest_subtask_id=ingest_subtask.id, tmss_input_dataproduct_id=obs_dataproduct.id) - logger.info('job xml: %s', job_xml) - job = parseJobXml(job_xml) - - with tmss_test_env.create_tmss_client() as tmss_client: - pl = IngestPipeline(job, momClient=None, ltaClient=self.ltaclient, tmss_client=tmss_client, - exchange=self.tmp_exchange.address) - pl.run() - - # check SIP - SIP = tmss_client.get_dataproduct_SIP(ingest_output_dataproduct.id) - self.assertTrue("<storageTicket>3E0A47ED860D6339E053B316A9C3BEE2</storageTicket>" in SIP) - - # check archive info - ingest_output_dataproduct.refresh_from_db() - self.assertEqual("3E0A47ED860D6339E053B316A9C3BEE2", ingest_output_dataproduct.archive_info.storage_ticket) + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.common.json_utils import get_default_json_object_for_schema + + #################################################### + # setup: create observation and link an ingest to it. + #################################################### + + obs_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='observation')) + obs_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=obs_task_template)) + obs_subtask_template = models.SubtaskTemplate.objects.get(name='observation control') + obs_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=obs_subtask_template, task_blueprint=obs_task)) + obs_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_subtask)) + + feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback') + feedback_doc = get_default_json_object_for_schema(feedback_template.schema) + feedback_doc['frequency']['subbands'] = [0] + feedback_doc['frequency']['central_frequencies'] = [1] + obs_dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_subtask_output, feedback_template=feedback_template, feedback_doc=feedback_doc)) + + ingest_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='ingest')) + ingest_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(scheduling_unit_blueprint=obs_subtask.task_blueprint.scheduling_unit_blueprint, specifications_template=ingest_task_template)) + ingest_subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value='ingest')) + ingest_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=ingest_subtask_template, task_blueprint=ingest_task)) + ingest_subtask_input = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=ingest_subtask, producer=obs_subtask_output)) + ingest_subtask_input.dataproducts.set([obs_dataproduct]) + ingest_subtask_input.save() + + ingest_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=ingest_subtask)) + ingest_output_dataproduct = models.Dataproduct.objects.create(**Dataproduct_test_data(producer=ingest_subtask_output)) + transforms = models.DataproductTransform.objects.create(input=obs_dataproduct, output=ingest_output_dataproduct, identity=True) + + #################################################### + # end of object setup + #################################################### + + project_name = ingest_task.draft.scheduling_unit_draft.scheduling_set.project.name + obs_id = obs_subtask.id + dpname = 'L%s_SAP000_SB000_uv.MS' % obs_id + self.test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1(), dpname) + + def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='TMSS'): + return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s.tar' % (project, obs_id, dpname), + 'result': 'ok', + 'error': '', + 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} + ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket + + os.makedirs(self.test_dir_path) + test_file_paths = [] + for i in range(10): + test_file_path = os.path.join(self.test_dir_path, 'testfile_%s.txt' % i) + test_file_paths.append(test_file_path) + with open(test_file_path, 'w') as file: + file.write(1000*'a') + + job_xml = createJobXml(testname, obs_id, dpname, obs_dataproduct.global_identifier.unique_identifier, + 'localhost:%s' % self.test_dir_path, + tmss_ingest_subtask_id=ingest_subtask.id, tmss_input_dataproduct_id=obs_dataproduct.id) + logger.info('job xml: %s', job_xml) + job = parseJobXml(job_xml) + + with tmss_test_env.create_tmss_client() as tmss_client: + pl = IngestPipeline(job, momClient=None, ltaClient=self.ltaclient, tmss_client=tmss_client, + exchange=self.tmp_exchange.address) + pl.run() + + # check SIP + SIP = tmss_client.get_dataproduct_SIP(ingest_output_dataproduct.id) + self.assertTrue("<storageTicket>3E0A47ED860D6339E053B316A9C3BEE2</storageTicket>" in SIP) + + # check archive info + ingest_output_dataproduct.refresh_from_db() + self.assertEqual("3E0A47ED860D6339E053B316A9C3BEE2", ingest_output_dataproduct.archive_info.storage_ticket) except Exception as e: self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) finally: diff --git a/SAS/TMSS/test/tmss_test_data_django_models.py b/SAS/TMSS/test/tmss_test_data_django_models.py index a19c65bfcd6fdcafab3784542b9086c04383e1db..360a4a90b3f8bb2c9b0d3b7916c95b8f582c2973 100644 --- a/SAS/TMSS/test/tmss_test_data_django_models.py +++ b/SAS/TMSS/test/tmss_test_data_django_models.py @@ -240,7 +240,10 @@ def SchedulingUnitBlueprint_test_data(name=None, requirements_template: models.S "do_cancel": False, "draft": models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data()) } -def TaskBlueprint_test_data(name='my_task_blueprint', task_draft: models.TaskDraft = None, scheduling_unit_blueprint: models.SchedulingUnitBlueprint = None, specifications_template: models.TaskTemplate=None, specifications_doc: dict=None) -> dict: +def TaskBlueprint_test_data(name: str=None, task_draft: models.TaskDraft = None, scheduling_unit_blueprint: models.SchedulingUnitBlueprint = None, specifications_template: models.TaskTemplate=None, specifications_doc: dict=None) -> dict: + if name is None: + name = str(uuid.uuid4()) + if scheduling_unit_blueprint is None: scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data())