diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py index de420f125aefb258ffad0e5e0c4989502ae162de..52b90879f630964626099346eb25224ad5120717 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py @@ -37,6 +37,10 @@ from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManage from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT +from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener +from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME +from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS + from lofar.common.util import humanreadablesize from lofar.common.util import waitForInterrupt from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails @@ -59,6 +63,7 @@ class ChangesHandler: def __init__(self, radb_busname=RADB_NOTIFICATION_BUSNAME, radb_subjects=RADB_NOTIFICATION_SUBJECTS, dm_busname=DEFAULT_DM_NOTIFICATION_BUSNAME, dm_subjects=DEFAULT_DM_NOTIFICATION_SUBJECTS, otdb_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, otdb_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT, + ingest_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME, ingest_subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, broker=None, momqueryrpc=None, radbrpc=None, sqrpc=None, **kwargs): """ ChangesHandler listens on the lofar notification message bus and keeps track of all the change notifications. @@ -88,6 +93,9 @@ class ChangesHandler: self._otdb_listener = OTDBBusListener(busname=otdb_busname, subject=otdb_subject, broker=broker, **kwargs) self._otdb_listener.onObservationStatusChanged = self.onObservationStatusChanged + self._ingest_listener = IngestBusListener(busname=ingest_busname, subjects=ingest_subjects, broker=broker, **kwargs) + self._ingest_listener.onTaskFinished = self.onIngestTaskFinished + self._changes = [] self._lock = Lock() self._changedCondition = Condition() @@ -107,11 +115,13 @@ class ChangesHandler: self._radb_listener.start_listening() self._dm_listener.start_listening() self._otdb_listener.start_listening() + self._ingest_listener.start_listening() def stop_listening(self): self._radb_listener.stop_listening() self._dm_listener.stop_listening() self._otdb_listener.stop_listening() + self._ingest_listener.stop_listening() def _handleChange(self, change): '''_handleChange appends a change in the changes list and calls the onChangedCallback. @@ -216,6 +226,26 @@ class ChangesHandler: task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':updated_task} self._handleChange(task_change) + def onIngestTaskFinished(self, ingest_task_dict): + self._ingest_listener._logJobNotification('task finised', ingest_task_dict); + + if 'otdb_id' not in ingest_task_dict: + return + + otdb_id = ingest_task_dict['otdb_id'] + updated_task = self._radbrpc.getTask(otdb_id=otdb_id) + if not updated_task: + return + + updated_task['ingest_status'] = 'ingested' + task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':updated_task} + self._handleChange(task_change) + + task_type = updated_task.get('type', 'task') + message = 'Data for %s with otdb_id %s has been ingested to the LTA' % (task_type, otdb_id) + change = {'changeType':CHANGE_EVENT_TYPE, 'objectType':'logevent', 'value':message} + self._handleChange(change) + def onObservationStatusChanged(self, otdb_id, new_status, modificationTime): task = self._radbrpc.getTask(otdb_id=otdb_id) task_type = task.get('type', 'task') if task else 'task' diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py index 0a7fd97f4c581f2b76a1ad3f613ac3ceb6d68476..5dd75c1b5e3897e79d46a25670e22c32fe66580a 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py @@ -64,6 +64,8 @@ from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFA from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME +from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME +from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS from lofar.common import isProductionEnvironment, isTestEnvironment from lofar.common.util import humanreadablesize @@ -812,6 +814,8 @@ def main(): parser.add_option('--storagequery_servicename', dest='storagequery_servicename', type='string', default=DEFAULT_STORAGEQUERY_SERVICENAME, help='Name of the storagequeryservice, default: %default') parser.add_option('--dm_notification_busname', dest='dm_notification_busname', type='string', default=DEFAULT_DM_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the data management notifications are published, default: %default') parser.add_option('--dm_notification_subjects', dest='dm_notification_subjects', type='string', default=DEFAULT_DM_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the data management notification bus exchange on the qpid broker, default: %default') + parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string', default=DEFAULT_INGEST_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default') + parser.add_option('--ingest_notification_subjects', dest='ingest_notification_subjects', type='string', default=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the ingest notification bus exchange on the qpid broker, default: %default') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') (options, args) = parser.parse_args() @@ -834,6 +838,7 @@ def main(): changeshandler = ChangesHandler(radb_busname=options.radb_notification_busname, radb_subjects=options.radb_notification_subjects, dm_busname=options.dm_notification_busname, dm_subjects=options.dm_notification_subjects, otdb_busname=options.otdb_notification_busname, otdb_subject=options.otdb_notification_subject, + ingest_busname=options.ingest_notification_busname, ingest_subjects=options.ingest_notification_subjects, broker=options.broker, momqueryrpc=momqueryrpc, radbrpc=rarpc, sqrpc=sqrpc) with changeshandler, rarpc, otdbrpc, curpc, sqrpc, momrpc, momqueryrpc: