Skip to content
Snippets Groups Projects
Commit 63d01f50 authored by Jorrit Schaap
Browse files

TMSS-320: added first crude form of t_ingesttmssadapter

parent d94910ac
No related branches found
No related tags found
2 merge requests: !308 Resolve TMSS-495, !306 Resolve TMSS-320
......@@ -39,13 +39,14 @@ from typing import Union
import logging
logger = logging.getLogger(__name__)
from lofar.common.util import single_line_with_single_spaces
class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEventMessageHandler):
'''The IngestEventMessageHandlerForIngestTMSSAdapter handles the Ingest EventMessages and adapts them into updates on the TMSS REST API'''
def __init__(self, tmss_creds: DBCredentials):
    '''Create the TMSS client session and initialise both base classes.

    :param tmss_creds: credentials passed to TMSSsession.create_from_dbcreds.
    '''
    self.tmss_client = TMSSsession.create_from_dbcreds(tmss_creds)

    # Initialise the base handler exactly once. The block used to call it twice
    # in a row (first with an explicit event list, then without arguments);
    # only the later, argument-less call is kept.
    # NOTE(review): the explicit list ['JobStarted', 'JobFinished', 'JobTransferFailed', 'JobRemoved']
    # is deliberately not passed any more -- confirm the handler's default covers these events.
    IngestEventMessageHandler.__init__(self)
    UsingToBusMixin.__init__(self)
def start_handling(self):
......@@ -71,11 +72,12 @@ class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEvent
self._update_tmss_status_if_applicable(job_dict, JobFailed)
def _update_tmss_status_if_applicable(self, job_dict, status):
    '''Log the new status of an ingest job, but only for jobs of type TMSS.

    :param job_dict: the job dictionary; its 'Type' entry is compared case-insensitively to 'tmss'.
    :param status: the new job status, only used in the log line for now.
    '''
    job_type = job_dict.get('Type','').lower()
    if job_type != 'tmss':
        # not a TMSS job, so nothing to do here.
        return

    logger.info('_update_tmss_status_if_applicable: %s status=%s', single_line_with_single_spaces(job_dict), status)
    # TODO: propagate the status to the TMSS subtask/dataproduct via self.tmss_client,
    #       and raise when that update fails.
def _checkTaskFullyIngested(self, job_dict):
pass
......@@ -149,35 +151,55 @@ class IngestEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, IngestEvent
logger.error(str(e))
class IngestBusListenerForIngestTMSSAdapter(IngestEventMesssageBusListener):
    '''Bus listener for Ingest EventMessages which delegates the handling of each message
    to an IngestEventMessageHandlerForIngestTMSSAdapter.'''

    def __init__(self, tmss_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
        '''
        :param tmss_creds: credentials forwarded to the handler as its 'tmss_creds' keyword argument.
        :param exchange: forwarded unchanged to the base listener.
        :param broker: forwarded unchanged to the base listener.
        '''
        listener_kwargs = {'handler_type': IngestEventMessageHandlerForIngestTMSSAdapter,
                           'handler_kwargs': {'tmss_creds': tmss_creds},
                           'exchange': exchange,
                           'broker': broker}
        super().__init__(**listener_kwargs)
class TMSSEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, TMSSEventMessageHandler):
    '''Handles TMSS subtask status events. When an ingest subtask reaches status 'scheduled',
    an ingest job xml is created for each of its input dataproducts and sent with self.send.'''

    def __init__(self, tmss_creds: DBCredentials):
        '''
        :param tmss_creds: credentials passed to TMSSsession.create_from_dbcreds.
        '''
        # Initialise each base explicitly and exactly once. An additional trailing
        # super().__init__() used to run the base initialisers a second time
        # (without log_event_messages=False), so it has been removed.
        UsingToBusMixin.__init__(self)
        TMSSEventMessageHandler.__init__(self, log_event_messages=False)
        self.tmss_client = TMSSsession.create_from_dbcreds(tmss_creds)

    def start_handling(self):
        '''Start both bases (once each) and then open the TMSS client session.'''
        UsingToBusMixin.start_handling(self)
        TMSSEventMessageHandler.start_handling(self)
        self.tmss_client.open()

    def stop_handling(self):
        '''Stop both bases (once each, in reverse order of starting) and then close the TMSS client session.'''
        TMSSEventMessageHandler.stop_handling(self)
        UsingToBusMixin.stop_handling(self)
        self.tmss_client.close()

    def onSubTaskStatusChanged(self, id: int, status: str):
        '''Create and submit ingest jobs when an ingest subtask becomes 'scheduled'.

        For every other status, and for subtasks whose specifications template is not
        of type 'ingest', only the base class handling is done.

        :param id: the id of the subtask whose status changed.
        :param status: the new status value of that subtask.
        '''
        super().onSubTaskStatusChanged(id, status)

        if status != 'scheduled':
            return

        subtask = self.tmss_client.get_subtask(id)
        logger.info(subtask)

        subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template'])
        if subtask_template['type_value'] != 'ingest':
            return

        logger.info("TMSS Ingest subtask id=%s was scheduled. Creating ingest jobs per dataproduct...", id)
        self.tmss_client.set_subtask_status(subtask['id'], 'queueing')

        # follow the urls from the subtask up to the project, which is needed for the project name in the job.
        task_blueprint = self.tmss_client.get_url_as_json_object(subtask['task_blueprint'])
        task_draft = self.tmss_client.get_url_as_json_object(task_blueprint['draft'])
        scheduling_unit_draft = self.tmss_client.get_url_as_json_object(task_draft['scheduling_unit_draft'])
        scheduling_set = self.tmss_client.get_url_as_json_object(scheduling_unit_draft['scheduling_set'])
        project = self.tmss_client.get_url_as_json_object(scheduling_set['project'])

        dataproducts = self.tmss_client.get_url_as_json_object(subtask['url'] + '/input_dataproducts')

        for dataproduct in dataproducts:
            dp_global_identifier = self.tmss_client.get_url_as_json_object(dataproduct['global_identifier'])
            producer = self.tmss_client.get_url_as_json_object(dataproduct['producer'])
            producing_subtask = self.tmss_client.get_url_as_json_object(producer['subtask'])

            job = createJobXml(project['name'], producing_subtask['id'], dataproduct['filename'],
                               dp_global_identifier['unique_identifier'], os.path.join(dataproduct['directory'], dataproduct['filename']),
                               tmss_ingest_subtask_id=subtask['id'])

            msg = CommandMessage(content=job, subject=DEFAULT_INGEST_INCOMING_JOB_SUBJECT)
            logger.info('submitting job %s to exchange %s with subject %s at broker %s',
                        parseJobXml(job)['JobId'], self._tobus.exchange, msg.subject, self._tobus.broker)
            self.send(msg)

        # only mark the subtask as 'queued' after a job was sent for each input dataproduct.
        self.tmss_client.set_subtask_status(subtask['id'], 'queued')
        logger.info("Created and enqueued ingest jobs for all dataproducts in TMSS Ingest subtask id=%s", id)
# def onXmlRPCJobReceived(self, fileName, fileContent):
......@@ -209,20 +231,16 @@ class TMSSEventMessageHandlerForIngestTMSSAdapter(UsingToBusMixin, TMSSEventMess
# return True
class TMSSBusListenerForIngestTMSSAdapter(TMSSBusListener):
    '''Bus listener for TMSS EventMessages which delegates the handling of each message
    to a TMSSEventMessageHandlerForIngestTMSSAdapter.'''

    def __init__(self, tmss_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
        '''
        :param tmss_creds: credentials forwarded to the handler as its 'tmss_creds' keyword argument.
        :param exchange: forwarded unchanged to the base listener.
        :param broker: forwarded unchanged to the base listener.
        '''
        # listen to everything below the subtask status event prefix.
        subtask_status_routing_key = TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.#'
        listener_kwargs = {'handler_type': TMSSEventMessageHandlerForIngestTMSSAdapter,
                           'handler_kwargs': {'tmss_creds': tmss_creds},
                           'routing_key': subtask_status_routing_key,
                           'exchange': exchange,
                           'broker': broker}
        super().__init__(**listener_kwargs)
class IngestTMSSAdapter:
def __init__(self, tmss_creds: DBCredentials, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
    '''Create the two bus listeners which together form the adapter.

    Each listener is constructed exactly once. Both attributes used to be assigned
    twice in a row, which created a first pair of listeners that was thrown away
    immediately; only the effective (last) assignments are kept.

    :param tmss_creds: credentials handed to both handlers as their 'tmss_creds' keyword argument.
    :param exchange: the exchange both listeners are created with.
    :param broker: the broker both listeners are created with.
    '''
    # ingest -> tmss: handle ingest event messages
    self.ingest2tmss_adapter = IngestEventMesssageBusListener(handler_type=IngestEventMessageHandlerForIngestTMSSAdapter,
                                                              handler_kwargs={'tmss_creds': tmss_creds},
                                                              exchange=exchange, broker=broker)

    # tmss -> ingest: handle tmss subtask status event messages
    self.tmss2ingest_adapter = TMSSBusListener(handler_type=TMSSEventMessageHandlerForIngestTMSSAdapter,
                                               handler_kwargs={'tmss_creds': tmss_creds},
                                               routing_key=TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.#',
                                               exchange=exchange, broker=broker)
def open(self):
self.ingest2tmss_adapter.start_listening()
......@@ -230,7 +248,7 @@ class IngestTMSSAdapter:
def close(self):
    '''Stop listening on both bus listeners.

    The tmss2ingest listener is only stopped here; it used to be (re)started
    right before being stopped, which does not belong in a close.
    '''
    self.ingest2tmss_adapter.stop_listening()
    self.tmss2ingest_adapter.stop_listening()
def __enter__(self):
self.open()
......
......@@ -2,5 +2,6 @@ include(LofarCTest)
lofar_add_test(t_ingestjobmanagementserver)
lofar_add_test(t_ingestmomadapter)
lofar_add_test(t_ingesttmssadapter)
#!/usr/bin/env python3
import unittest
from unittest import mock
from random import randint
from pysimplesoap.client import SoapClient
from time import sleep
import logging
logger = logging.getLogger(__name__)
from lofar.messaging.messagebus import TemporaryExchange, TemporaryQueue, BusListenerJanitor
from lofar.messaging.messages import EventMessage
from lofar.lta.ingest.common.job import parseJobXml
from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX
from lofar.common.util import single_line_with_single_spaces
from lofar.common.test_utils import integration_test
from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
from lofar.lta.ingest.server.ingesttmssadapter import *
from lofar.lta.ingest.common import config as ingest_config
from datetime import timedelta
@integration_test
class TestIngestTMSSAdapter(unittest.TestCase):
    '''Integration test for the IngestTMSSAdapter, running against a TMSSTestEnvironment
    and a temporary exchange.'''

    @classmethod
    def setUpClass(cls) -> None:
        '''Open the temporary exchange and start the TMSS test environment on it.'''
        cls.tmp_exchange = TemporaryExchange("TestIngestTMSSAdapter")
        cls.tmp_exchange.open()

        cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address,
                                                start_postgres_listener=True,
                                                populate_schemas=True)
        cls.tmss_test_env.start()

    @classmethod
    def tearDownClass(cls) -> None:
        '''Stop the TMSS test environment and close the temporary exchange.'''
        cls.tmss_test_env.stop()
        cls.tmp_exchange.close()

    @staticmethod
    def wait_for_subtask_to_get_status(subtask_id, expected_status, timeout=30):
        '''helper method to poll for a subtask's status.
        raises TimeoutError if expected_status is not met within timeout seconds.
        returns subtask when expected_status is met.'''
        # local imports: monotonic is not among the file's imports, and the models are imported lazily as in the test itself.
        from time import monotonic
        from lofar.sas.tmss.tmss.tmssapp import models

        # use a monotonic clock, so the timeout is not affected by wall-clock adjustments.
        deadline = monotonic() + timeout

        subtask = models.Subtask.objects.get(id=subtask_id)
        while subtask.state.value != expected_status:
            # the timeout is only checked while the status is still wrong,
            # so a status that was reached on the last poll is never reported as a timeout.
            if monotonic() > deadline:
                raise TimeoutError("timeout while waiting for subtask id=%s to get status '%s'. It currently has status '%s'" % (subtask_id, expected_status, subtask.state.value))

            sleep(0.1)
            logger.info("Waiting for subtask id=%s to get status '%s'. Current status='%s'. Polling...", subtask_id, expected_status, subtask.state.value)
            subtask.refresh_from_db()
        return subtask

    def test(self):
        '''Schedule an ingest subtask with two input dataproducts, and check that the adapter
        submits one job per dataproduct and sets the subtask status to 'queued'.'''
        from lofar.sas.tmss.test.tmss_test_data_django_models import SubtaskTemplate_test_data, Subtask_test_data, \
            TaskBlueprint_test_data, TaskTemplate_test_data, Dataproduct_test_data, SubtaskOutput_test_data, SubtaskInput_test_data
        from lofar.sas.tmss.tmss.tmssapp import models
        from lofar.common.json_utils import get_default_json_object_for_schema

        # an observation task/subtask producing two dataproducts...
        obs_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='observation'))
        obs_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(specifications_template=obs_task_template))
        obs_subtask_template = models.SubtaskTemplate.objects.get(name='observation control')
        obs_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=obs_subtask_template, task_blueprint=obs_task))
        obs_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=obs_subtask))

        feedback_template = models.DataproductFeedbackTemplate.objects.get(name='feedback')
        feedback_doc = get_default_json_object_for_schema(feedback_template.schema)
        feedback_doc['frequency']['subbands'] = [0]
        feedback_doc['frequency']['central_frequencies'] = [1]
        dataproducts = [models.Dataproduct.objects.create(**Dataproduct_test_data(producer=obs_subtask_output, feedback_template=feedback_template, feedback_doc=feedback_doc)) for _ in range(2)]

        # ... and an ingest task/subtask in the same scheduling unit, taking these dataproducts as input.
        ingest_task_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data(task_type_value='ingest'))
        ingest_task = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data(scheduling_unit_blueprint=obs_subtask.task_blueprint.scheduling_unit_blueprint,
                                                                                    specifications_template=ingest_task_template))
        ingest_subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data(subtask_type_value='ingest'))
        ingest_subtask = models.Subtask.objects.create(**Subtask_test_data(subtask_template=ingest_subtask_template, task_blueprint=ingest_task))
        ingest_subtask_input = models.SubtaskInput.objects.create(**SubtaskInput_test_data(subtask=ingest_subtask, producer=obs_subtask_output))
        ingest_subtask_input.dataproducts.set(models.Dataproduct.objects.filter(producer=obs_subtask_output).all())
        ingest_subtask_input.save()

        # one output dataproduct per input dataproduct, linked by an identity transform.
        ingest_subtask_output = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data(subtask=ingest_subtask))
        ingested_dataproducts = [models.Dataproduct.objects.create(**Dataproduct_test_data(producer=ingest_subtask_output)) for _ in dataproducts]
        transforms = [models.DataproductTransform(input=input_dp, output=output_dp, identity=True) for input_dp,output_dp in zip(dataproducts, ingested_dataproducts)]
        models.DataproductTransform.objects.bulk_create(transforms)

        with self.tmp_exchange.create_tobus() as test_notifier_tobus:
            def sendIngestNotification(event, job_id, export_id=None):
                '''send an ingest notification event message for the given job on the test exchange.'''
                content = {'job_id': job_id, 'Type': "TMSS"}
                if export_id:
                    content['export_id'] = export_id
                event_msg = EventMessage(subject="%s.%s" % (ingest_config.INGEST_NOTIFICATION_PREFIX, event), content=content)
                logger.info('sending test event message on %s subject=%s content=%s', test_notifier_tobus.exchange, event_msg.subject, event_msg.content)
                test_notifier_tobus.send(event_msg)

            # create a tmp job receiver queue
            with self.tmp_exchange.create_temporary_queue(routing_key=DEFAULT_INGEST_INCOMING_JOB_SUBJECT) as tmp_job_queue:
                with tmp_job_queue.create_frombus() as job_receiver:
                    with IngestTMSSAdapter(tmss_creds=self.tmss_test_env.client_credentials.dbcreds, exchange=self.tmp_exchange.address):
                        ingest_subtask.state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.SCHEDULED.value)
                        ingest_subtask.save()

                        # there should now be a job per dataproduct on the tmp_job_queue. receive and check it.
                        # and keep track of received jobs for later test usage
                        dp2jobs = {}
                        for dataproduct in dataproducts:
                            job_msg = job_receiver.receive(timeout=5)
                            self.assertIsNotNone(job_msg)
                            logger.info("received job xml: %s", single_line_with_single_spaces(job_msg.content))

                            job = parseJobXml(job_msg.content)
                            logger.info("extracted job: %s", job)
                            self.assertEqual("TMSS", job['Type'])
                            self.assertEqual(str(ingest_subtask.id), job['TMSSIngestSubtaskId'])
                            self.assertEqual(str(obs_subtask.id), job['ObservationId'])
                            self.assertEqual(dataproduct.filepath, job['Location'])
                            self.assertEqual(obs_task.scheduling_unit_blueprint.draft.scheduling_set.project.name, job['Project'])
                            dp2jobs[dataproduct] = job

                        # ingest subtask should now be 'queued'
                        self.assertEqual('queued', TestIngestTMSSAdapter.wait_for_subtask_to_get_status(ingest_subtask.id, 'queued').state.value)

                        for job in dp2jobs.values():
                            sendIngestNotification('JobStarted', job['JobId'], export_id=job['job_group_id'])
                            sendIngestNotification('JobFinished', job['JobId'], export_id=job['job_group_id'])

                        # NOTE: a blocking waitForInterrupt() call used to follow here. It made the
                        # automated test hang until interrupted manually, so it has been removed.
                        # TODO: assert on the effect of the notifications once the adapter acts on them.
# Configure the root logger at INFO level with timestamp, level name and message.
# This statement sits at module level, so it also runs when the module is imported.
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Run the tests with unittest's own runner when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()
#!/bin/bash
# Run the unit test
# Sources python-coverage.sh and then calls python_coverage_test with the
# "*LTAIngest*" pattern and the t_ingesttmssadapter.py test script.
# NOTE(review): python_coverage_test is presumably defined by python-coverage.sh -- confirm.
source python-coverage.sh
python_coverage_test "*LTAIngest*" t_ingesttmssadapter.py
#!/bin/sh
# Hand the test name t_ingesttmssadapter over to runctest.sh in the current directory.
./runctest.sh t_ingesttmssadapter
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment