Skip to content
Snippets Groups Projects
Commit 8575ed63 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #10057: handle ingest taskfinished events

parent 7b5fe405
Branches
Tags
No related merge requests found
......@@ -37,6 +37,10 @@ from lofar.sas.datamanagement.common.datamanagementbuslistener import DataManage
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTDB_NOTIFICATION_SUBJECT
from lofar.lta.ingest.client.ingestbuslistener import IngestBusListener
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS
from lofar.common.util import humanreadablesize
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.resourceassignmenteditor.mom import updateTaskMomDetails
......@@ -59,6 +63,7 @@ class ChangesHandler:
def __init__(self, radb_busname=RADB_NOTIFICATION_BUSNAME, radb_subjects=RADB_NOTIFICATION_SUBJECTS,
dm_busname=DEFAULT_DM_NOTIFICATION_BUSNAME, dm_subjects=DEFAULT_DM_NOTIFICATION_SUBJECTS,
otdb_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, otdb_subject=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
ingest_busname=DEFAULT_INGEST_NOTIFICATION_BUSNAME, ingest_subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS,
broker=None, momqueryrpc=None, radbrpc=None, sqrpc=None, **kwargs):
"""
ChangesHandler listens on the lofar notification message bus and keeps track of all the change notifications.
......@@ -88,6 +93,9 @@ class ChangesHandler:
self._otdb_listener = OTDBBusListener(busname=otdb_busname, subject=otdb_subject, broker=broker, **kwargs)
self._otdb_listener.onObservationStatusChanged = self.onObservationStatusChanged
self._ingest_listener = IngestBusListener(busname=ingest_busname, subjects=ingest_subjects, broker=broker, **kwargs)
self._ingest_listener.onTaskFinished = self.onIngestTaskFinished
self._changes = []
self._lock = Lock()
self._changedCondition = Condition()
......@@ -107,11 +115,13 @@ class ChangesHandler:
self._radb_listener.start_listening()
self._dm_listener.start_listening()
self._otdb_listener.start_listening()
self._ingest_listener.start_listening()
def stop_listening(self):
self._radb_listener.stop_listening()
self._dm_listener.stop_listening()
self._otdb_listener.stop_listening()
self._ingest_listener.stop_listening()
def _handleChange(self, change):
'''_handleChange appends a change in the changes list and calls the onChangedCallback.
......@@ -216,6 +226,26 @@ class ChangesHandler:
task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':updated_task}
self._handleChange(task_change)
def onIngestTaskFinished(self, ingest_task_dict):
self._ingest_listener._logJobNotification('task finised', ingest_task_dict);
if 'otdb_id' not in ingest_task_dict:
return
otdb_id = ingest_task_dict['otdb_id']
updated_task = self._radbrpc.getTask(otdb_id=otdb_id)
if not updated_task:
return
updated_task['ingest_status'] = 'ingested'
task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':updated_task}
self._handleChange(task_change)
task_type = updated_task.get('type', 'task')
message = 'Data for %s with otdb_id %s has been ingested to the LTA' % (task_type, otdb_id)
change = {'changeType':CHANGE_EVENT_TYPE, 'objectType':'logevent', 'value':message}
self._handleChange(change)
def onObservationStatusChanged(self, otdb_id, new_status, modificationTime):
task = self._radbrpc.getTask(otdb_id=otdb_id)
task_type = task.get('type', 'task') if task else 'task'
......
......@@ -64,6 +64,8 @@ from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFA
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_BUSNAME
from lofar.lta.ingest.common.config import DEFAULT_INGEST_NOTIFICATION_SUBJECTS
from lofar.common import isProductionEnvironment, isTestEnvironment
from lofar.common.util import humanreadablesize
......@@ -812,6 +814,8 @@ def main():
parser.add_option('--storagequery_servicename', dest='storagequery_servicename', type='string', default=DEFAULT_STORAGEQUERY_SERVICENAME, help='Name of the storagequeryservice, default: %default')
parser.add_option('--dm_notification_busname', dest='dm_notification_busname', type='string', default=DEFAULT_DM_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the data management notifications are published, default: %default')
parser.add_option('--dm_notification_subjects', dest='dm_notification_subjects', type='string', default=DEFAULT_DM_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the data management notification bus exchange on the qpid broker, default: %default')
parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string', default=DEFAULT_INGEST_NOTIFICATION_BUSNAME, help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
parser.add_option('--ingest_notification_subjects', dest='ingest_notification_subjects', type='string', default=DEFAULT_INGEST_NOTIFICATION_SUBJECTS, help='Subject(s) to listen for on the ingest notification bus exchange on the qpid broker, default: %default')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
(options, args) = parser.parse_args()
......@@ -834,6 +838,7 @@ def main():
changeshandler = ChangesHandler(radb_busname=options.radb_notification_busname, radb_subjects=options.radb_notification_subjects,
dm_busname=options.dm_notification_busname, dm_subjects=options.dm_notification_subjects,
otdb_busname=options.otdb_notification_busname, otdb_subject=options.otdb_notification_subject,
ingest_busname=options.ingest_notification_busname, ingest_subjects=options.ingest_notification_subjects,
broker=options.broker, momqueryrpc=momqueryrpc, radbrpc=rarpc, sqrpc=sqrpc)
with changeshandler, rarpc, otdbrpc, curpc, sqrpc, momrpc, momqueryrpc:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment