diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py index b6cf5859e5d69ae0e4d5d52d3ba8b2dbf67076b7..f2957a171174dc807382961800613a7865d2e90e 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingesttmssadapter.py @@ -39,13 +39,14 @@ from typing import Union import logging logger = logging.getLogger(__name__) +from lofar.common.util import single_line_with_single_spaces class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEventMessageHandler): '''The IngestBusListenerForIngestTMSSAdapter handles the Ingest EventMessages, adapts them handles them with updates on the TMSS REST API''' def __init__(self, tmss_creds: DBCredentials): self.tmss_client = TMSSsession.create_from_dbcreds(tmss_creds) - IngestEventMessageHandler.__init__(self, ['JobStarted', 'JobFinished', 'JobTransferFailed', 'JobRemoved']) + IngestEventMessageHandler.__init__(self)#, ['JobStarted', 'JobFinished', 'JobTransferFailed', 'JobRemoved']) UsingToBusMixin.__init__(self) def start_handling(self): @@ -71,11 +72,12 @@ class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEvent self._update_tmss_status_if_applicable(job_dict, JobFailed) def _update_tmss_status_if_applicable(self, job_dict, status): - pass # TODO: update the tmss subtask/dataproduct status if applicable - # if job_dict.get('type','').lower() == 'tmss': - # if not self._mom_client.setStatus(job_dict.get('job_id'), status, job_dict.get('message')): - # raise Exception('Could not update status in MoM to %s for %s' % (jobState2String(status), job_dict.get('job_id'))) + if job_dict.get('Type','').lower() == 'tmss': + logger.info('_update_tmss_status_if_applicable: %s status=%s', single_line_with_single_spaces(job_dict), status) + # self.tmss_client. 
+ # if not self._mom_client.setStatus(job_dict.get('job_id'), status, job_dict.get('message')): + # raise Exception('Could not update status in MoM to %s for %s' % (jobState2String(status), job_dict.get('job_id'))) def _checkTaskFullyIngested(self, job_dict): pass @@ -149,35 +151,55 @@ class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEvent logger.error(str(e)) - -class IngestBusListenerForIngestTMSSAdapter(IngestEventMesssageBusListener): - '''The IngestBusListenerForIngestTMSSAdapter listens for Ingest EventMessages and handles them with the IngestEventMessageHandlerForIngestTMSSAdapter''' - def __init__(self, tmss_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): - super().__init__(handler_type=IngestEventMessageHandlerForIngestTMSSAdapter, - handler_kwargs={'tmss_creds': tmss_creds}, - exchange=exchange, broker=broker) - - class TMSSEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, TMSSEventMessageHandler): def __init__(self, tmss_creds: DBCredentials): + UsingToBusMixin.__init__(self) + TMSSEventMessageHandler.__init__(self, log_event_messages=False) self.tmss_client = TMSSsession.create_from_dbcreds(tmss_creds) - super().__init__() def start_handling(self): - super().start_handling() + UsingToBusMixin.start_handling(self) + TMSSEventMessageHandler.start_handling(self) self.tmss_client.open() def stop_handling(self): + TMSSEventMessageHandler.stop_handling(self) + UsingToBusMixin.stop_handling(self) self.tmss_client.close() - super().stop_handling() def onSubTaskStatusChanged(self, id: int, status: str): super().onSubTaskStatusChanged(id, status) if status == 'scheduled': subtask = self.tmss_client.get_subtask(id) - logger.info(subtask) - # TODO: if this is an ingest subtask, create job xmls, and submit them to the ingest bus. 
+ subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template']) + if subtask_template['type_value'] == 'ingest': + logger.info("TMSS Ingest subtask id=%s was scheduled. Creating ingest jobs per dataproduct...", id) + self.tmss_client.set_subtask_status(subtask['id'], 'queueing') + + task_blueprint = self.tmss_client.get_url_as_json_object(subtask['task_blueprint']) + task_draft = self.tmss_client.get_url_as_json_object(task_blueprint['draft']) + scheduling_unit_draft = self.tmss_client.get_url_as_json_object(task_draft['scheduling_unit_draft']) + scheduling_set = self.tmss_client.get_url_as_json_object(scheduling_unit_draft['scheduling_set']) + project = self.tmss_client.get_url_as_json_object(scheduling_set['project']) + dataproducts = self.tmss_client.get_url_as_json_object(subtask['url'] + '/input_dataproducts') + + for dataproduct in dataproducts: + dp_global_identifier = self.tmss_client.get_url_as_json_object(dataproduct['global_identifier']) + producer = self.tmss_client.get_url_as_json_object(dataproduct['producer']) + producing_subtask = self.tmss_client.get_url_as_json_object(producer['subtask']) + + job = createJobXml(project['name'], producing_subtask['id'], dataproduct['filename'], + dp_global_identifier['unique_identifier'], os.path.join(dataproduct['directory'], dataproduct['filename']), + tmss_ingest_subtask_id=subtask['id']) + + msg = CommandMessage(content=job, subject=DEFAULT_INGEST_INCOMING_JOB_SUBJECT) + logger.info('submitting job %s to exchange %s with subject %s at broker %s', + parseJobXml(job)['JobId'], self._tobus.exchange, msg.subject, self._tobus.broker) + self.send(msg) + + self.tmss_client.set_subtask_status(subtask['id'], 'queued') + logger.info("Created and enqueued ingest jobs for all dataproducts in TMSS Ingest subtask id=%s", id) # def onXmlRPCJobReceived(self, fileName, fileContent): @@ -209,20 +231,16 @@ class TMSSEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, TMSSEventMess # return 
True -class TMSSBusListenerForIngestTMSSAdapter(TMSSBusListener): - '''The TMSSBusListenerForIngestTMSSAdapter listens for TMSS EventMessages and handles them with the TMSSEventMessageHandlerForIngestTMSSAdapter''' - def __init__(self, tmss_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): - super().__init__(handler_type=TMSSEventMessageHandlerForIngestTMSSAdapter, - handler_kwargs={'tmss_creds': tmss_creds}, - routing_key=TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.#', - exchange=exchange, broker=broker) - - class IngestTMSSAdapter: def __init__(self, tmss_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): - self.ingest2tmss_adapter = IngestBusListenerForIngestTMSSAdapter(tmss_creds, exchange, broker) - self.tmss2ingest_adapter = TMSSBusListenerForIngestTMSSAdapter(tmss_creds, exchange, broker) + self.ingest2tmss_adapter = IngestEventMesssageBusListener(handler_type=IngestEventMessageHandlerForIngestTMSSAdapter, + handler_kwargs={'tmss_creds': tmss_creds}, + exchange=exchange, broker=broker) + self.tmss2ingest_adapter = TMSSBusListener(handler_type=TMSSEventMessageHandlerForIngestTMSSAdapter, + handler_kwargs={'tmss_creds': tmss_creds}, + routing_key=TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.#', + exchange=exchange, broker=broker) def open(self): self.ingest2tmss_adapter.start_listening() @@ -230,7 +248,7 @@ class IngestTMSSAdapter: def close(self): self.ingest2tmss_adapter.stop_listening() - self.tmss2ingest_adapter.start_listening() + self.tmss2ingest_adapter.stop_listening() def __enter__(self): self.open() diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt index 045aa645ab7a9e6cae3fbd8b8a8066bae0f6c9b5..791494c7486a1b8790553112f9b8334402e6d658 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/CMakeLists.txt @@ -2,5 +2,6 @@ 
include(LofarCTest) lofar_add_test(t_ingestjobmanagementserver) lofar_add_test(t_ingestmomadapter) +lofar_add_test(t_ingesttmssadapter) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.py new file mode 100755 index 0000000000000000000000000000000000000000..cd8e5d9b703ec543ebcd1dabffca6d00136e41e7 --- /dev/null +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python3 + +import unittest +from unittest import mock +from random import randint +from pysimplesoap.client import SoapClient +from time import sleep + +import logging +logger = logging.getLogger(__name__) + +from lofar.messaging.messagebus import TemporaryExchange, TemporaryQueue, BusListenerJanitor +from lofar.messaging.messages import EventMessage +from lofar.lta.ingest.common.job import parseJobXml +from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX +from lofar.common.util import single_line_with_single_spaces +from lofar.common.test_utils import integration_test +from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment +from lofar.lta.ingest.server.ingesttmssadapter import * +from lofar.lta.ingest.common import config as ingest_config +from datetime import timedelta + +@integration_test +class TestIngestTMSSAdapter(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + cls.tmp_exchange = TemporaryExchange("TestIngestTMSSAdapter") + cls.tmp_exchange.open() + + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, + start_postgres_listener=True, + populate_schemas=True) + cls.tmss_test_env.start() + + @classmethod + def tearDownClass(cls) -> None: + cls.tmss_test_env.stop() + cls.tmp_exchange.close() + + @staticmethod + def wait_for_subtask_to_get_status(subtask_id, expected_status, timeout=30): + '''helper method to poll for a subtask's status. 
+ raises TimeoutError if expected_status is not met within timeout seconds. + returns subtask when expected_status is met.''' + from lofar.sas.tmss.tmss.tmssapp import models + start = datetime.utcnow() + subtask = models.Subtask.objects.get(id=subtask_id) + while subtask.state.value != expected_status: + sleep(0.1) + logger.info("Waiting for subtask id=%s to get status '%s'. Current status='%s'. Polling...", subtask_id, expected_status, subtask.state.value) + subtask.refresh_from_db() + if datetime.utcnow() - start > timedelta(seconds=timeout): + raise TimeoutError("timeout while waiting for subtask id=%s to get status '%s'. It currently has status '%s'" % (subtask_id, expected_status, subtask.state.value)) + + return subtask + + + def test(self): + from lofar.sas.tmss.test.tmss_test_data_django_models import SubtaskTemplate_test_data, Subtask_test_data, \ + TaskBlueprint_test_data, TaskTemplate_test_data, Dataproduct_test_data, SubtaskOutput_test_data, SubtaskInput_test_data + from lofar.sas.tmss.tmss.tmssapp import models + from lofar.common.json_utils import get_default_json_object_for_schema + + obs_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='observation')) + obs_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=obs_task_template)) + obs_subtask_template = models.SubtaskTemplate.objects.get(name='observation control') + obs_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=obs_subtask_template, task_blueprint=obs_task)) + obs_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_subtask)) + + feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback') + feedback_doc = get_default_json_object_for_schema(feedback_template.schema) + feedback_doc['frequency']['subbands'] = [0] + feedback_doc['frequency']['central_frequencies'] = [1] + dataproducts = 
[models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_subtask_output, feedback_template=feedback_template, feedback_doc=feedback_doc)) for _ in range(2)] + + ingest_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='ingest')) + ingest_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(scheduling_unit_blueprint=obs_subtask.task_blueprint.scheduling_unit_blueprint, + specifications_template=ingest_task_template)) + ingest_subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value='ingest')) + ingest_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=ingest_subtask_template, task_blueprint=ingest_task)) + ingest_subtask_input = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=ingest_subtask, producer=obs_subtask_output)) + ingest_subtask_input.dataproducts.set(models.Dataproduct.objects.filter(producer=obs_subtask_output).all()) + ingest_subtask_input.save() + + ingest_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=ingest_subtask)) + ingested_dataproducts = [models.Dataproduct.objects.create(**Dataproduct_test_data(producer=ingest_subtask_output)) for dp in dataproducts] + + transforms = [models.DataproductTransform(input=input_dp, output=output_dp, identity=True) for input_dp,output_dp in zip(dataproducts, ingested_dataproducts)] + models.DataproductTransform.objects.bulk_create(transforms) + + + with self.tmp_exchange.create_tobus() as test_notifier_tobus: + def sendIngestNotification(event, job_id, export_id=None): + content = {'job_id': job_id, 'Type': "TMSS"} + if export_id: + content['export_id'] = export_id + event_msg = EventMessage(subject="%s.%s" % (ingest_config.INGEST_NOTIFICATION_PREFIX, event), content=content) + logger.info('sending test event message on %s subject=%s content=%s', test_notifier_tobus.exchange, event_msg.subject, event_msg.content) + 
test_notifier_tobus.send(event_msg) + + # create a tmp job receiver queue + with self.tmp_exchange.create_temporary_queue(routing_key=DEFAULT_INGEST_INCOMING_JOB_SUBJECT) as tmp_job_queue: + with tmp_job_queue.create_frombus() as job_receiver: + + with IngestTMSSAdapter(tmss_creds=self.tmss_test_env.client_credentials.dbcreds, exchange=self.tmp_exchange.address): + ingest_subtask.state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.SCHEDULED.value) + ingest_subtask.save() + + # there should now be a job per dataproduct on the tmp_job_queue. receive and check it. + # and keep track of received jobs for later test usage + dp2jobs = {} + + for dataproduct in dataproducts: + job_msg = job_receiver.receive(timeout=5) + self.assertIsNotNone(job_msg) + logger.info("received job xml: %s", single_line_with_single_spaces(job_msg.content)) + job = parseJobXml(job_msg.content) + logger.info("extracted job: %s", job) + self.assertEqual("TMSS", job['Type']) + self.assertEqual(str(ingest_subtask.id), job['TMSSIngestSubtaskId']) + self.assertEqual(str(obs_subtask.id), job['ObservationId']) + self.assertEqual(dataproduct.filepath, job['Location']) + self.assertEqual(obs_task.scheduling_unit_blueprint.draft.scheduling_set.project.name, job['Project']) + dp2jobs[dataproduct] = job + + # ingest subtask should now be 'queued' + self.assertEqual('queued', TestIngestTMSSAdapter.wait_for_subtask_to_get_status(ingest_subtask.id, 'queued').state.value) + + for dataproduct, job in dp2jobs.items(): + sendIngestNotification('JobStarted', job['JobId'], export_id=job['job_group_id']) + sendIngestNotification('JobFinished', job['JobId'], export_id=job['job_group_id']) + + # self.assertEqual(job_xml, job_msg.content) + + waitForInterrupt() + +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + +if __name__ == '__main__': + unittest.main() diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.run 
b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.run new file mode 100755 index 0000000000000000000000000000000000000000..a03c3b2fa701127ab29f81a6bfe913097c392e13 --- /dev/null +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.run @@ -0,0 +1,6 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "*LTAIngest*" t_ingesttmssadapter.py + diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.sh b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.sh new file mode 100755 index 0000000000000000000000000000000000000000..d69b3a2a7b3429eec4c8ad0f54b4bb1b2b377dac --- /dev/null +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/test/t_ingesttmssadapter.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_ingesttmssadapter