diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index 2db1316e19116db9630de7d6e86256a892a02fd0..1251ede07ad56b68ac27a017f19c81f47b0bea78 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -33,60 +33,34 @@ import psycopg2.extensions logger = logging.getLogger(__name__) -def makePostgresNotificationQueries(schema, table, action, view_for_row=None, view_selection_id=None): +def makePostgresNotificationQueries(schema, table, action, column_name='id'): action = action.upper() if action not in ('INSERT', 'UPDATE', 'DELETE'): raise ValueError('''trigger_type '%s' not in ('INSERT', 'UPDATE', 'DELETE')''' % action) - if view_for_row and action == 'DELETE': - raise ValueError('You cannot use a view for results on action DELETE') - - if view_for_row: - change_name = '''{table}_{action}_with_{view_for_row}'''.format(schema=schema, - table=table, - action=action, - view_for_row=view_for_row) - function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name) - function_sql = ''' - CREATE OR REPLACE FUNCTION {schema}.{function_name}() - RETURNS TRIGGER AS $$ - DECLARE - new_row_from_view {schema}.{view_for_row}%ROWTYPE; - BEGIN - select * into new_row_from_view from {schema}.{view_for_row} where {view_selection_id} = NEW.id LIMIT 1; - PERFORM pg_notify(CAST('{change_name}' AS text), - '{{"old":' || {old} || ',"new":' || row_to_json(new_row_from_view)::text || '}}'); - RETURN NEW; - END; - $$ LANGUAGE plpgsql; - '''.format(schema=schema, - function_name=function_name, - table=table, - action=action, - old='row_to_json(OLD)::text' if action == 'UPDATE' or action == 'DELETE' else '\'null\'', - view_for_row=view_for_row, - view_selection_id=view_selection_id if view_selection_id else 'id', - change_name=change_name.lower()) - else: - change_name = '''{table}_{action}'''.format(table=table, action=action) - function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name) - function_sql = ''' - CREATE OR REPLACE FUNCTION {schema}.{function_name}() - RETURNS TRIGGER AS $$ - BEGIN - PERFORM pg_notify(CAST('{change_name}' AS text), - '{{"old":' || {old} || ',"new":' || {new} || '}}'); - RETURN {value}; - END; - $$ LANGUAGE plpgsql; - '''.format(schema=schema, - function_name=function_name, - table=table, - action=action, - old='row_to_json(OLD)::text' if action == 'UPDATE' or action == 'DELETE' else '\'null\'', - new='row_to_json(NEW)::text' if action == 'UPDATE' or action == 'INSERT' else '\'null\'', - value='OLD' if action == 'DELETE' else 'NEW', - change_name=change_name.lower()) + change_name = '''{table}_{action}'''.format(table=table, action=action) + if column_name != 'id': + change_name += '_column_' + column_name + function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name) + function_sql = ''' + CREATE OR REPLACE FUNCTION {schema}.{function_name}() + RETURNS TRIGGER AS $$ + DECLARE payload text; + BEGIN + {begin_update_check}SELECT CAST({column_value} AS text) INTO payload; + PERFORM pg_notify(CAST('{change_name}' AS text), payload);{end_update_check} + RETURN {value}; + END; + $$ LANGUAGE plpgsql; + '''.format(schema=schema, + function_name=function_name, + table=table, + action=action, + column_value=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name, + value='OLD' if action == 'DELETE' else 'NEW', + change_name=change_name.lower(), + begin_update_check='IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN\n' if action == 'UPDATE' else '', + end_update_check='\nEND IF;' if action == 'UPDATE' else '') trigger_name = 'TRIGGER_NOTIFY_%s' % function_name diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index f78e06beb3e3de7a3a0201ccdcaf06102fa11c5f..f2117648b52df96c957ea6b84bd6263c50d3f481 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -135,8 +135,8 @@ class ResourceAssigner(): clusterIsCEP4 = self.checkClusterIsCEP4(mainParset) clusterName = 'CEP4' if clusterIsCEP4 else 'CEP2' - if clusterIsCEP4: - def applySaneStartEndTime(): + def applySaneStartEndTime(): + if clusterIsCEP4: startTime = datetime.utcnow() + timedelta(minutes=1) maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree) @@ -160,26 +160,26 @@ class ResourceAssigner(): return startTime, endTime - try: - startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S') - endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') - if startTime < datetime.utcnow(): - startTime, endTime = applySaneStartEndTime() - except ValueError: - logger.warning('cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', otdb_id) + try: + startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S') + endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S') + if startTime < datetime.utcnow(): startTime, endTime = applySaneStartEndTime() + except ValueError: + logger.warning('cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...', otdb_id) + startTime, endTime = applySaneStartEndTime() - try: - # fix for MoM bug introduced before NV's holiday - # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 - # so, override it here if needed, and update to otdb - processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '') - if processingClusterName != clusterName: - logger.info('overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s', - processingClusterName, clusterName, otdb_id) - self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName }) - except Exception as e: - logger.error(e) + try: + # fix for MoM bug introduced before NV's holiday + # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4 + # so, override it here if needed, and update to otdb + processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '') + if processingClusterName != clusterName: + logger.info('overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s', + processingClusterName, clusterName, otdb_id) + self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName }) + except Exception as e: + logger.error(e) # insert new task and specification in the radb # any existing specification and task with same otdb_id will be deleted automatically diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py index d309b2a229cc213b9e588c557e2a458890bacafa..7e6d3371ede7cc8cb6dde0a9432b3b6de5e76c06 100644 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/schedulechecker.py @@ -53,12 +53,32 @@ def movePipelineAfterItsPredecessors(task, radbrpc, min_start_timestamp=None): if (task['starttime'] < max_pred_endtime) or (min_start_timestamp and task['starttime'] > min_start_timestamp): shift = max_pred_endtime - task['starttime'] - logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], max_pred_endtime) - radbrpc.updateTaskAndResourceClaims(task['id'], starttime=task['starttime']+shift, endtime=task['endtime']+shift) - updated_task = radbrpc.getTask(task['id']) - if updated_task['status'] != task['status']: - logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status']) - #TODO: automatically resolve conflict status by moved pipeline in first free time slot. + newStartTime = task['starttime']+shift + newEndTime = task['endtime']+shift + + # move pipeline even further ahead in case there are more than 2 overlapping scheduled/queued pipelines + while True: + overlapping_pipelines = radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='scheduled', cluster='CEP4') + overlapping_pipelines += radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='queued', cluster='CEP4') + overlapping_pipelines += radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='active', cluster='CEP4') + overlapping_pipelines += radbrpc.getTasks(lower_bound=newStartTime, upper_bound=newEndTime, task_type='pipeline', task_status='completing', cluster='CEP4') + overlapping_pipelines = [pl for pl in overlapping_pipelines if pl['id'] != task['id']] + + if len(overlapping_pipelines) >= 2: + max_overlapping_pipeline_endtime = max([t['endtime'] for t in overlapping_pipelines]) + shift = max_overlapping_pipeline_endtime + timedelta(minutes=1) - task['starttime'] + newStartTime = task['starttime']+shift + newEndTime = task['endtime']+shift + else: + break + + if shift != timedelta(seconds=0): + logger.info("Moving %s pipeline radb_id=%s otdb_id=%s by %s from \'%s\' to \'%s\'", task['status'], task['id'], task['otdb_id'], shift, task['starttime'], newStartTime) + radbrpc.updateTaskAndResourceClaims(task['id'], starttime=newStartTime, endtime=newEndTime) + updated_task = radbrpc.getTask(task['id']) + if updated_task['status'] != task['status']: + logger.warn("Moving of pipeline radb_id=%s otdb_id=%s caused the status to change from %s to %s", updated_task['id'], updated_task['otdb_id'], task['status'], updated_task['status']) + #TODO: automatically resolve conflict status by moved pipeline in first free time slot. except Exception as e: logger.error("Error while checking pipeline starttime: %s", e) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index 79126379d7efe35293a61af493a412f55c0239f3..7f69d9723fad8348c830c5cb068cb082fbf48c38 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -231,21 +231,24 @@ class RADatabase: return tasks - def getTask(self, id=None, mom_id=None, otdb_id=None): - '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id''' - ids = [id, mom_id, otdb_id] + def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None): + '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id, or for the given specification_id''' + ids = [id, mom_id, otdb_id, specification_id] validIds = [x for x in ids if x != None] if len(validIds) != 1: - raise KeyError("Provide one and only one id: id=%s, mom_id=%s, otdb_id=%s" % (id, mom_id, otdb_id)) + raise KeyError("Provide one and only one id: id=%s, mom_id=%s, otdb_id=%s, specification_id=%s" % (id, mom_id, otdb_id, specification_id)) query = '''SELECT * from resource_allocation.task_view tv ''' - if id: + if id is not None: query += '''where tv.id = (%s);''' - elif mom_id: + elif mom_id is not None: query += '''where tv.mom_id = (%s);''' - elif otdb_id: + elif otdb_id is not None: query += '''where tv.otdb_id = (%s);''' + elif specification_id is not None: + query += '''where tv.specification_id = (%s);''' + result = self._executeQuery(query, validIds, fetch=_FETCH_ONE) task = dict(result) if result else None @@ -272,10 +275,10 @@ class RADatabase: return task_status, task_type def insertTask(self, mom_id, otdb_id, task_status, task_type, specification_id, commit=True): - if isinstance(mom_id, int) and mom_id <= 0: + if isinstance(mom_id, int) and mom_id < 0: mom_id = None - if isinstance(otdb_id, int) and otdb_id <= 0: + if isinstance(otdb_id, int) and otdb_id < 0: otdb_id = None logger.info('insertTask mom_id=%s, otdb_id=%s, task_status=%s, task_type=%s, specification_id=%s' % @@ -358,9 +361,14 @@ class RADatabase: return self.cursor.rowcount > 0 - def getTaskPredecessorIds(self): - query = '''SELECT * from resource_allocation.task_predecessor tp;''' - items = list(self._executeQuery(query, fetch=_FETCH_ALL)) + def getTaskPredecessorIds(self, id=None): + query = '''SELECT * from resource_allocation.task_predecessor tp''' + + if id is not None : + query += ' WHERE id=%s' + + items = list(self._executeQuery(query, [id] if id is not None else None, fetch=_FETCH_ALL)) + predIdDict = {} for item in items: taskId = item['task_id'] @@ -369,9 +377,14 @@ class RADatabase: predIdDict[taskId].append(item['predecessor_id']) return predIdDict - def getTaskSuccessorIds(self): + def getTaskSuccessorIds(self, id=None): query = '''SELECT * from resource_allocation.task_predecessor tp;''' - items = list(self._executeQuery(query, fetch=_FETCH_ALL)) + + if id is not None : + query += ' WHERE id=%s' + + items = list(self._executeQuery(query, [id] if id is not None else None, fetch=_FETCH_ALL)) + succIdDict = {} for item in items: predId = item['predecessor_id'] @@ -387,11 +400,11 @@ class RADatabase: items = list(self._executeQuery(query, [task_id], fetch=_FETCH_ALL)) return [x['predecessor_id'] for x in items] - def getTaskSuccessorIdsForTask(self, task_): + def getTaskSuccessorIdsForTask(self, task_id): query = '''SELECT * from resource_allocation.task_predecessor tp WHERE tp.predecessor_id = %s;''' - items = list(self._executeQuery(query, [task_], fetch=_FETCH_ALL)) + items = list(self._executeQuery(query, [task_id], fetch=_FETCH_ALL)) return [x['task_id'] for x in items] def insertTaskPredecessor(self, task_id, predecessor_id, commit=True): @@ -1325,7 +1338,8 @@ class RADatabase: if commit: self.commit() return {'inserted': True, 'specification_id': specId, 'task_id': taskId} - except: + except Exception as e: + logger.error(e) self.rollback() return {'inserted': False, 'specification_id': None, 'task_id': None} @@ -1526,7 +1540,11 @@ if __name__ == '__main__': logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) db = RADatabase(dbcreds=dbcreds, log_queries=True) + st1 = db.insertSpecificationAndTask(0, 0, 'approved', 'observation', datetime.utcnow(), datetime.utcnow() + timedelta(hours=1), 'foo', 'CEP4') + st2 = db.insertSpecificationAndTask(1, 1, 'approved', 'pipeline', datetime.utcnow(), datetime.utcnow() + timedelta(hours=1), 'foo', 'CEP4') + db.insertTaskPredecessor(st2['task_id'], st1['task_id']) + exit() def resultPrint(method): print '\n-- ' + str(method.__name__) + ' --' diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py index fcf3ff0cf918f61af87224ede09814caea4d2eeb..7f108839fbad36a1ad9a859cfc941ca7b59227c0 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbbuslistener.py @@ -61,28 +61,27 @@ class RADBBusListener(AbstractBusListener): logger.info("on%s: %s" % (msg.subject.replace(self.subject_prefix, ''), str(msg.content).replace('\n', ' '))) if msg.subject == '%sTaskUpdated' % self.subject_prefix: - self.onTaskUpdated(msg.content.get('old'), msg.content.get('new')) + self.onTaskUpdated(msg.content) elif msg.subject == '%sTaskInserted' % self.subject_prefix: - self.onTaskInserted(msg.content.get('new')) + self.onTaskInserted(msg.content) elif msg.subject == '%sTaskDeleted' % self.subject_prefix: - self.onTaskDeleted(msg.content.get('old')) + self.onTaskDeleted(msg.content) elif msg.subject == '%sResourceClaimUpdated' % self.subject_prefix: - self.onResourceClaimUpdated(msg.content.get('old'), msg.content.get('new')) + self.onResourceClaimUpdated(msg.content) elif msg.subject == '%sResourceClaimInserted' % self.subject_prefix: - self.onResourceClaimInserted(msg.content.get('new')) + self.onResourceClaimInserted(msg.content) elif msg.subject == '%sResourceClaimDeleted' % self.subject_prefix: - self.onResourceClaimDeleted(msg.content.get('old')) + self.onResourceClaimDeleted(msg.content) elif msg.subject == '%sResourceAvailabilityUpdated' % self.subject_prefix: - self.onResourceAvailabilityUpdated(msg.content.get('old'), msg.content.get('new')) + self.onResourceAvailabilityUpdated(msg.content) elif msg.subject == '%sResourceCapacityUpdated' % self.subject_prefix: - self.onResourceCapacityUpdated(msg.content.get('old'), msg.content.get('new')) + self.onResourceCapacityUpdated(msg.content) else: logger.error("RADBBusListener.handleMessage: unknown subject: %s" %str(msg.subject)) - def onTaskUpdated(self, old_task, new_task): + def onTaskUpdated(self, updated_task): '''onTaskUpdated is called upon receiving a TaskUpdated message. - :param old_task: dictionary with the task before the update - :param new_task: dictionary with the updated task''' + :param updated_task: dictionary with the updated task''' pass def onTaskInserted(self, new_task): @@ -90,15 +89,14 @@ class RADBBusListener(AbstractBusListener): :param new_task: dictionary with the inserted task''' pass - def onTaskDeleted(self, old_task): + def onTaskDeleted(self, old_task_id): '''onTaskDeleted is called upon receiving a TaskDeleted message. - :param old_task: dictionary with the deleted task''' + :param old_task_id: id of the deleted task''' pass - def onResourceClaimUpdated(self, old_claim, new_claim): + def onResourceClaimUpdated(self, updated_claim): '''onResourceClaimUpdated is called upon receiving a ResourceClaimUpdated message. - :param old_claim: dictionary with the claim before the update - :param new_claim: dictionary with the updated claim''' + :param updated_claim: dictionary with the updated claim''' pass def onResourceClaimInserted(self, new_claim): @@ -106,21 +104,19 @@ class RADBBusListener(AbstractBusListener): :param new_claim: dictionary with the inserted claim''' pass - def onResourceClaimDeleted(self, old_claim): + def onResourceClaimDeleted(self, old_claim_id): '''onResourceClaimDeleted is called upon receiving a ResourceClaimDeleted message. - :param old_claim: dictionary with the deleted claim''' + :param old_claim_id: id of the deleted claim''' pass - def onResourceAvailabilityUpdated(self, old_availability, new_availability): + def onResourceAvailabilityUpdated(self, updated_availability): '''onResourceAvailabilityUpdated is called upon receiving a ResourceAvailabilityUpdated message. - :param old_availability: dictionary with the resource availability before the update - :param new_availability: dictionary with the updated availability''' + :param updated_availability: dictionary with the updated availability''' pass - def onResourceCapacityUpdated(self, old_capacity, new_capacity): + def onResourceCapacityUpdated(self, updated_capacity): '''onResourceCapacityUpdated is called upon receiving a ResourceCapacityUpdated message. - :param old_capacity: dictionary with the resource capacity before the update - :param new_capacity: dictionary with the updated capacity''' + :param updated_capacity: dictionary with the updated capacity''' pass diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py index 03065ae1c7211fcb8ac0eab101967dc50a6bb8cc..499821a7e8c5cdbc2d1e9b41a05f7a65bbf0ecac 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py @@ -56,85 +56,67 @@ class RADBPGListener(PostgresListener): self.rarpc = RARPC(busname=radb_busname, servicename=radb_servicename, broker=broker) - self.subscribe('task_update_with_task_view', self.onTaskUpdated) - self.subscribe('task_insert_with_task_view', self.onTaskInserted) + self.subscribe('task_update', self.onTaskUpdated) + self.subscribe('task_insert', self.onTaskInserted) self.subscribe('task_delete', self.onTaskDeleted) - self.subscribe('task_predecessor_insert', self.onTaskPredecessorInserted) - self.subscribe('task_predecessor_update', self.onTaskPredecessorUpdated) - self.subscribe('task_predecessor_delete', self.onTaskPredecessorDeleted) + self.subscribe('task_predecessor_insert_column_task_id', self.onTaskPredecessorChanged) + self.subscribe('task_predecessor_update_column_task_id', self.onTaskPredecessorChanged) + self.subscribe('task_predecessor_delete_column_task_id', self.onTaskPredecessorChanged) + + self.subscribe('task_predecessor_insert_column_predecessor_id', self.onTaskSuccessorChanged) + self.subscribe('task_predecessor_update_column_predecessor_id', self.onTaskSuccessorChanged) + self.subscribe('task_predecessor_delete_column_predecessor_id', self.onTaskSuccessorChanged) # when the specification starttime and endtime are updated, then that effects the task as well - # so subscribe to specification_update, and use task_view as view_for_row - self.subscribe('specification_update_with_task_view', self.onSpecificationUpdated) + self.subscribe('specification_update', self.onSpecificationUpdated) - self.subscribe('resource_claim_update_with_resource_claim_view', self.onResourceClaimUpdated) - self.subscribe('resource_claim_insert_with_resource_claim_view', self.onResourceClaimInserted) + self.subscribe('resource_claim_update', self.onResourceClaimUpdated) + self.subscribe('resource_claim_insert', self.onResourceClaimInserted) self.subscribe('resource_claim_delete', self.onResourceClaimDeleted) self.subscribe('resource_availability_update', self.onResourceAvailabilityUpdated) self.subscribe('resource_capacity_update', self.onResourceCapacityUpdated) def onTaskUpdated(self, payload = None): - self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime']) + self._sendNotification('TaskUpdated', self.rarpc.getTask(payload)) def onTaskInserted(self, payload = None): - self._convertPayloadAndSendNotification('TaskInserted', payload, ['starttime', 'endtime']) + self._sendNotification('TaskInserted', self.rarpc.getTask(payload)) def onTaskDeleted(self, payload = None): - self._convertPayloadAndSendNotification('TaskDeleted', payload) - - def _onTaskPredecessorChanged(self, change_type, payload = None): - logger.info(payload) - try: - content = json.loads(payload) - except Exception as e: - logger.error('Could not parse payload: %s\n%s' % (payload, e)) - return - - try: - task_content = {} - - if change_type == 'Inserted' or change_type == 'Updated': - task_id = content['new']['task_id'] - task = self.rarpc.getTask(task_id) - task_content = {'new': task } - elif change_type == 'Deleted': - task_id = content['old']['task_id'] - task_content = {'old': {'id':task_id} } - - self._sendNotification('Task%s' % change_type, task_content) - except Exception as e: - logger.error('Could not parse payload: %s\n%s' % (payload, e)) - - def onTaskPredecessorUpdated(self, payload = None): - self._onTaskPredecessorChanged('Updated', payload) + self._sendNotification('TaskDeleted', {'id': payload }) - def onTaskPredecessorInserted(self, payload = None): - self._onTaskPredecessorChanged('Inserted', payload) + def onTaskPredecessorChanged(self, task_id): + logger.info('onTaskPredecessorChanged(task_id=%s)', task_id) + self._sendNotification('TaskUpdated', self.rarpc.getTask(task_id)) - def onTaskPredecessorDeleted(self, payload = None): - self._onTaskPredecessorChanged('Deleted', payload) + def onTaskSuccessorChanged(self, task_id): + logger.info('onTaskSuccessorChanged(task_id=%s)', task_id) + self._sendNotification('TaskUpdated', self.rarpc.getTask(task_id)) def onSpecificationUpdated(self, payload = None): # when the specification starttime and endtime are updated, then that effects the task as well - # so send a TaskUpdated notification - self._convertPayloadAndSendNotification('TaskUpdated', payload, ['starttime', 'endtime']) + self._sendNotification('TaskUpdated', self.rarpc.getTask(specification_id=payload)) def onResourceClaimUpdated(self, payload = None): - self._convertPayloadAndSendNotification('ResourceClaimUpdated', payload, ['starttime', 'endtime']) + self._sendNotification('ResourceClaimUpdated', self.rarpc.getResourceClaim(payload)) def onResourceClaimInserted(self, payload = None): - self._convertPayloadAndSendNotification('ResourceClaimInserted', payload, ['starttime', 'endtime']) + self._sendNotification('ResourceClaimInserted', self.rarpc.getResourceClaim(payload)) def onResourceClaimDeleted(self, payload = None): - self._convertPayloadAndSendNotification('ResourceClaimDeleted', payload) + self._sendNotification('ResourceClaimDeleted', {'id': payload }) def onResourceAvailabilityUpdated(self, payload = None): - self._convertPayloadAndSendNotification('ResourceAvailabilityUpdated', payload) + r = self.rarpc.getResources(resource_ids=[payload], include_availability=True)[0] + r = {k:r[k] for k in ['id', 'active']} + self._sendNotification('ResourceAvailabilityUpdated', r) def onResourceCapacityUpdated(self, payload = None): - self._convertPayloadAndSendNotification('ResourceCapacityUpdated', payload) + r = self.rarpc.getResources(resource_ids=[payload], include_availability=True)[0] + r = {k:r[k] for k in ['id', 'total_capacity', 'available_capacity', 'used_capacity']} + self._sendNotification('ResourceCapacityUpdated', r) def __enter__(self): super(RADBPGListener, self).__enter__() @@ -155,58 +137,28 @@ class RADBPGListener(PostgresListener): So, parse the requested fields, and return them as datetime. ''' try: - for state in ('old', 'new'): - if state in contentDict: - for field in fields: - try: - if contentDict[state] and field in contentDict[state]: - timestampStr = contentDict[state][field] - formatStr = '%Y-%m-%dT%H:%M:%S' if 'T' in timestampStr else '%Y-%m-%d %H:%M:%S' - if timestampStr.rfind('.') > -1: - formatStr += '.%f' - - timestamp = datetime.strptime(timestampStr, formatStr) - - contentDict[state][field] = timestamp - except Exception as e: - logger.error('Could not convert field \'%s\' to datetime: %s' % (field, e)) + for field in fields: + try: + if field in contentDict: + timestampStr = contentDict[field] + formatStr = '%Y-%m-%dT%H:%M:%S' if 'T' in timestampStr else '%Y-%m-%d %H:%M:%S' + if timestampStr.rfind('.') > -1: + formatStr += '.%f' - return contentDict - except Exception as e: - logger.error('Error while convering timestamp fields \'%s\'in %s\n%s' % (fields, contentDict, e)) - - - def _convertPayloadAndSendNotification(self, subject, payload, timestampFields = None): - try: - content = json.loads(payload) - except Exception as e: - logger.error('Could not parse payload: %s\n%s' % (payload, e)) - content=None + timestamp = datetime.strptime(timestampStr, formatStr) - return self._sendNotification(subject, content, timestampFields) + contentDict[field] = timestamp + except Exception as e: + logger.error('Could not convert field \'%s\' to datetime: %s' % (field, e)) - def _sendNotification(self, subject, content, timestampFields = None): - try: - if 'new' in content and content['new'] and 'old' in content and content['old']: - # check if new and old are equal. - # however, new and old can be based on different views, - # so, only check the values for the keys they have in common - new_keys = set(content['new'].keys()) - old_keys = set(content['old'].keys()) - common_keys = new_keys & old_keys - equal_valued_keys = [k for k in common_keys if content['new'][k] == content['old'][k]] - if len(equal_valued_keys) == len(common_keys): - logger.info('new and old values are equal, not sending notification. %s' % (content['new'])) - return - - if timestampFields: - content = self._formatTimestampsAsIso(timestampFields, content) + return contentDict except Exception as e: - logger.error('Exception while anayzing content: %s\n%s' % (content, e)) + logger.error('Error while convering timestamp fields \'%s\'in %s\n%s' % (fields, contentDict, e)) + def _sendNotification(self, subject, contentDict): try: - msg = EventMessage(context=self.notification_prefix + subject, content=content) - logger.info('Sending notification %s: %s' % (subject, str(content).replace('\n', ' '))) + msg = EventMessage(context=self.notification_prefix + subject, content=contentDict) + logger.info('Sending notification %s: %s' % (subject, str(contentDict).replace('\n', ' '))) self.event_bus.send(msg) except Exception as e: logger.error(str(e)) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql index 05fec7a48a4bcd9e0dbd43065ba21fa0b2d84720..05906465401a4c48d29c0edb8b0471d672015140 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_notifications.sql @@ -6,50 +6,48 @@ --this RADBPGListener then broadcasts the event on the lofar bus. -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_INSERT_with_task_view ON resource_allocation.task CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_INSERT_with_task_view(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_INSERT ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_INSERT(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_INSERT_with_task_view() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_INSERT() RETURNS TRIGGER AS $$ -DECLARE -new_row_from_view resource_allocation.task_view%ROWTYPE; +DECLARE payload text; BEGIN -select * into new_row_from_view from resource_allocation.task_view where id = NEW.id LIMIT 1; -PERFORM pg_notify(CAST('task_insert_with_task_view' AS text), -'{"old":' || 'null' || ',"new":' || row_to_json(new_row_from_view)::text || '}'); +SELECT CAST(NEW.id AS text) INTO payload; +PERFORM pg_notify(CAST('task_insert' AS text), payload); RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_INSERT_with_task_view +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_INSERT AFTER INSERT ON resource_allocation.task FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_task_INSERT_with_task_view(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_INSERT(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_UPDATE_with_task_view ON resource_allocation.task CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_UPDATE_with_task_view(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_UPDATE ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_UPDATE(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_UPDATE_with_task_view() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_UPDATE() RETURNS TRIGGER AS $$ -DECLARE -new_row_from_view resource_allocation.task_view%ROWTYPE; +DECLARE payload text; BEGIN -select * into new_row_from_view from resource_allocation.task_view where id = NEW.id LIMIT 1; -PERFORM pg_notify(CAST('task_update_with_task_view' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(new_row_from_view)::text || '}'); +IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN +SELECT CAST(NEW.id AS text) INTO payload; +PERFORM pg_notify(CAST('task_update' AS text), payload); +END IF; RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_UPDATE_with_task_view +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_UPDATE AFTER UPDATE ON resource_allocation.task FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_task_UPDATE_with_task_view(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_UPDATE(); DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_DELETE ON resource_allocation.task CASCADE; @@ -58,9 +56,10 @@ DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_DELETE(); CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_DELETE() RETURNS TRIGGER AS $$ +DECLARE payload text; BEGIN -PERFORM pg_notify(CAST('task_delete' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}'); +SELECT CAST(OLD.id AS text) INTO payload; +PERFORM pg_notify(CAST('task_delete' AS text), payload); RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -72,133 +71,201 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_DELETE(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT ON resource_allocation.task_predecessor CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id() RETURNS TRIGGER AS $$ +DECLARE payload text; BEGIN -PERFORM pg_notify(CAST('task_predecessor_insert' AS text), -'{"old":' || 'null' || ',"new":' || row_to_json(NEW)::text || '}'); +SELECT CAST(NEW.task_id AS text) INTO payload; +PERFORM pg_notify(CAST('task_predecessor_insert_column_task_id' AS text), payload); RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_task_id AFTER INSERT ON resource_allocation.task_predecessor FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE ON resource_allocation.task_predecessor CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id() RETURNS TRIGGER AS $$ +DECLARE payload text; BEGIN -PERFORM pg_notify(CAST('task_predecessor_update' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}'); +IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN +SELECT CAST(NEW.task_id AS text) INTO payload; +PERFORM pg_notify(CAST('task_predecessor_update_column_task_id' AS text), payload); +END IF; RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_task_id AFTER UPDATE ON resource_allocation.task_predecessor FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE ON resource_allocation.task_predecessor CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id() RETURNS TRIGGER AS $$ +DECLARE payload text; BEGIN -PERFORM pg_notify(CAST('task_predecessor_delete' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}'); +SELECT CAST(OLD.task_id AS text) INTO payload; +PERFORM pg_notify(CAST('task_predecessor_delete_column_task_id' AS text), payload); RETURN OLD; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_task_id AFTER DELETE ON resource_allocation.task_predecessor FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_specification_UPDATE_with_task_view ON resource_allocation.specification CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE_with_task_view(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_specification_UPDATE_with_task_view() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id() RETURNS TRIGGER AS $$ -DECLARE -new_row_from_view resource_allocation.task_view%ROWTYPE; +DECLARE payload text; BEGIN -select * into new_row_from_view from resource_allocation.task_view where specification_id = NEW.id LIMIT 1; -PERFORM pg_notify(CAST('specification_update_with_task_view' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(new_row_from_view)::text || '}'); +SELECT CAST(NEW.predecessor_id AS text) INTO payload; +PERFORM pg_notify(CAST('task_predecessor_insert_column_predecessor_id' AS text), payload); RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_specification_UPDATE_with_task_view +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_INSERT_column_predecessor_id +AFTER INSERT ON resource_allocation.task_predecessor +FOR EACH ROW +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id(); + + +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id(); + + +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id() +RETURNS TRIGGER AS $$ +DECLARE payload text; +BEGIN +IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN +SELECT CAST(NEW.predecessor_id AS text) INTO payload; +PERFORM pg_notify(CAST('task_predecessor_update_column_predecessor_id' AS text), payload); +END IF; +RETURN NEW; +END; +$$ LANGUAGE plpgsql; + + +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_UPDATE_column_predecessor_id +AFTER UPDATE ON resource_allocation.task_predecessor +FOR EACH ROW +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id(); + + +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id(); + + +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id() +RETURNS TRIGGER AS $$ +DECLARE payload text; +BEGIN +SELECT CAST(OLD.predecessor_id AS text) INTO payload; +PERFORM pg_notify(CAST('task_predecessor_delete_column_predecessor_id' AS text), payload); +RETURN OLD; +END; +$$ LANGUAGE plpgsql; + + +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_task_predecessor_DELETE_column_predecessor_id +AFTER DELETE ON resource_allocation.task_predecessor +FOR EACH ROW +EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id(); + + +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_specification_UPDATE ON resource_allocation.specification CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE(); + + +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_specification_UPDATE() +RETURNS TRIGGER AS $$ +DECLARE payload text; +BEGIN +IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN +SELECT CAST(NEW.id AS text) INTO payload; +PERFORM pg_notify(CAST('specification_update' AS text), payload); +END IF; +RETURN NEW; +END; +$$ LANGUAGE plpgsql; + + +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_specification_UPDATE AFTER UPDATE ON resource_allocation.specification FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_specification_UPDATE_with_task_view(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_specification_UPDATE(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT_with_resource_claim_view ON resource_allocation.resource_claim CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_INSERT_with_resource_claim_view(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_INSERT(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_INSERT_with_resource_claim_view() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_INSERT() RETURNS TRIGGER AS $$ -DECLARE -new_row_from_view resource_allocation.resource_claim_view%ROWTYPE; +DECLARE payload text; BEGIN -select * into new_row_from_view from resource_allocation.resource_claim_view where id = NEW.id LIMIT 1; -PERFORM pg_notify(CAST('resource_claim_insert_with_resource_claim_view' AS text), -'{"old":' || 'null' || ',"new":' || row_to_json(new_row_from_view)::text || '}'); +SELECT CAST(NEW.id AS text) INTO payload; +PERFORM pg_notify(CAST('resource_claim_insert' AS text), payload); RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT_with_resource_claim_view +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_INSERT AFTER INSERT ON resource_allocation.resource_claim FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_INSERT_with_resource_claim_view(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_INSERT(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE_with_resource_claim_view ON resource_allocation.resource_claim CASCADE; -DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_UPDATE_with_resource_claim_view(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_UPDATE(); -CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_UPDATE_with_resource_claim_view() +CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_UPDATE() RETURNS TRIGGER AS $$ -DECLARE -new_row_from_view resource_allocation.resource_claim_view%ROWTYPE; +DECLARE payload text; BEGIN -select * into new_row_from_view from resource_allocation.resource_claim_view where id = NEW.id LIMIT 1; -PERFORM pg_notify(CAST('resource_claim_update_with_resource_claim_view' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(new_row_from_view)::text || '}'); +IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN +SELECT CAST(NEW.id AS text) INTO payload; +PERFORM pg_notify(CAST('resource_claim_update' AS text), payload); +END IF; RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE_with_resource_claim_view +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_claim_UPDATE AFTER UPDATE ON resource_allocation.resource_claim FOR EACH ROW -EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_UPDATE_with_resource_claim_view(); +EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_UPDATE(); DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_claim_DELETE ON resource_allocation.resource_claim CASCADE; @@ -207,9 +274,10 @@ DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_DELETE(); CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_DELETE() RETURNS TRIGGER AS $$ +DECLARE payload text; BEGIN -PERFORM pg_notify(CAST('resource_claim_delete' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || 'null' || '}'); +SELECT CAST(OLD.id AS text) INTO payload; +PERFORM pg_notify(CAST('resource_claim_delete' AS text), payload); RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -221,42 +289,48 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_DELETE(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE ON resource_monitoring.resource_availability CASCADE; -DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_availability_UPDATE(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE_column_resource_id ON resource_monitoring.resource_availability CASCADE; +DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id(); -CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_availability_UPDATE() +CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id() RETURNS TRIGGER AS $$ +DECLARE payload text; BEGIN -PERFORM pg_notify(CAST('resource_availability_update' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}'); +IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN +SELECT CAST(NEW.resource_id AS text) INTO payload; +PERFORM pg_notify(CAST('resource_availability_update_column_resource_id' AS text), payload); +END IF; RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_availability_UPDATE_column_resource_id AFTER UPDATE ON resource_monitoring.resource_availability FOR EACH ROW -EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_availability_UPDATE(); +EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id(); -DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE ON resource_monitoring.resource_capacity CASCADE; -DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_capacity_UPDATE(); +DROP TRIGGER IF EXISTS TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE_column_resource_id ON resource_monitoring.resource_capacity CASCADE; +DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id(); -CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_capacity_UPDATE() +CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id() RETURNS TRIGGER AS $$ +DECLARE payload text; BEGIN -PERFORM pg_notify(CAST('resource_capacity_update' AS text), -'{"old":' || row_to_json(OLD)::text || ',"new":' || row_to_json(NEW)::text || '}'); +IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN +SELECT CAST(NEW.resource_id AS text) INTO payload; +PERFORM pg_notify(CAST('resource_capacity_update_column_resource_id' AS text), payload); +END IF; RETURN NEW; END; $$ LANGUAGE plpgsql; -CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE +CREATE TRIGGER TRIGGER_NOTIFY_NOTIFY_resource_capacity_UPDATE_column_resource_id AFTER UPDATE ON resource_monitoring.resource_capacity FOR EACH ROW -EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_capacity_UPDATE(); +EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id(); diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py index 750b26c4871f7880424bd98460337702b501af9f..07860f3b37ce3ac55a2ebad1f4d38e2f736a8b0e 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/create_add_notifications.sql.py @@ -40,15 +40,18 @@ if __name__ == '__main__': f.write('--this RADBPGListener then broadcasts the event on the lofar bus.\n') f.write('\n') - f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT', view_for_row='task_view')) - f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE', view_for_row='task_view')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'INSERT')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'UPDATE')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'task', 'DELETE')) - f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT')) - f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE')) - f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE')) - f.writelines(makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE', view_for_row='task_view', view_selection_id='specification_id')) - f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT', view_for_row='resource_claim_view')) - f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE', view_for_row='resource_claim_view')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT', column_name='task_id')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE', column_name='task_id')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE', column_name='task_id')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'INSERT', column_name='predecessor_id')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'UPDATE', column_name='predecessor_id')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'task_predecessor', 'DELETE', column_name='predecessor_id')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'specification', 'UPDATE')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'INSERT')) + f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'UPDATE')) f.writelines(makePostgresNotificationQueries('resource_allocation', 'resource_claim', 'DELETE')) - f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_availability', 'UPDATE')) - f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_capacity', 'UPDATE')) + f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_availability', 'UPDATE', column_name='resource_id')) + f.writelines(makePostgresNotificationQueries('resource_monitoring', 'resource_capacity', 'UPDATE', column_name='resource_id')) diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py index 0df9611f2f8ce275f8ad1e98085c294f5f888d91..16109932eeba0e2611f15df6a0626fab8c5dd335 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/radbchangeshandler.py @@ -77,58 +77,56 @@ class RADBChangesHandler(RADBBusListener): with self._changedCondition: self._changedCondition.notifyAll() - def onTaskUpdated(self, old_task, new_task): + def onTaskUpdated(self, updated_task): '''onTaskUpdated is called upon receiving a TaskUpdated message.''' - #ignore old_task and new_task, which miss some properties via this update mechanism - #get task with all expected properties via radbrpc - task = self._radbrpc.getTask(new_task['id']) - task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':task} + updated_task['starttime'] = updated_task['starttime'].datetime() + updated_task['endtime'] = updated_task['endtime'].datetime() + task_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'task', 'value':updated_task} self._handleChange(task_change) - def onTaskInserted(self, task): + def onTaskInserted(self, new_task): '''onTaskInserted is called upon receiving a TaskInserted message. - :param task: dictionary with the inserted task''' - #ignore old_task and new_task, which miss some properties via this update mechanism - #get task with all expected properties via radbrpc - task = self._radbrpc.getTask(task['id']) - updateTaskMomDetails(task, self._momqueryrpc) - task_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'task', 'value':task} + :param new_task: dictionary with the inserted task''' + new_task['starttime'] = new_task['starttime'].datetime() + new_task['endtime'] = new_task['endtime'].datetime() + updateTaskMomDetails(new_task, self._momqueryrpc) + task_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'task', 'value':new_task} self._handleChange(task_change) - def onTaskDeleted(self, task): + def onTaskDeleted(self, old_task_id): '''onTaskDeleted is called upon receiving a TaskDeleted message. - :param task: dictionary with the deleted task''' - task_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'task', 'value':task} + :param old_task_id: id of the deleted task''' + task_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'task', 'value':{'id':old_task_id}} self._handleChange(task_change) - def onResourceClaimUpdated(self, old_claim, new_claim): + def onResourceClaimUpdated(self, updated_claim): '''onResourceClaimUpdated is called upon receiving a ResourceClaimUpdated message. - :param task: dictionary with the updated claim''' - new_claim['starttime'] = new_claim['starttime'].datetime() - new_claim['endtime'] = new_claim['endtime'].datetime() - claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceClaim', 'value':new_claim} + :param updated_claim: dictionary with the updated claim''' + updated_claim['starttime'] = updated_claim['starttime'].datetime() + updated_claim['endtime'] = updated_claim['endtime'].datetime() + claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceClaim', 'value':updated_claim} self._handleChange(claim_change) - def onResourceClaimInserted(self, claim): + def onResourceClaimInserted(self, new_claim): '''onResourceClaimInserted is called upon receiving a ResourceClaimInserted message. - :param claim: dictionary with the inserted claim''' - claim['starttime'] = claim['starttime'].datetime() - claim['endtime'] = claim['endtime'].datetime() - claim_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'resourceClaim', 'value':claim} + :param new_claim: dictionary with the inserted claim''' + new_claim['starttime'] = new_claim['starttime'].datetime() + new_claim['endtime'] = new_claim['endtime'].datetime() + claim_change = {'changeType':CHANGE_INSERT_TYPE, 'objectType':'resourceClaim', 'value':new_claim} self._handleChange(claim_change) - def onResourceClaimDeleted(self, claim): + def onResourceClaimDeleted(self, old_claim_id): '''onResourceClaimDeleted is called upon receiving a ResourceClaimDeleted message. - :param claim: dictionary with the deleted claim''' - claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':claim} + :param old_claim_id: id of the deleted claim''' + claim_change = {'changeType':CHANGE_DELETE_TYPE, 'objectType':'resourceClaim', 'value':{'id': old_claim_id}} self._handleChange(claim_change) - def onResourceAvailabilityUpdated(self, old_availability, new_availability): - claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceAvailability', 'value':new_availability} + def onResourceAvailabilityUpdated(self, old_availability, updated_availability): + claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceAvailability', 'value':updated_availability} self._handleChange(claim_change) - def onResourceCapacityUpdated(self, old_capacity, new_capacity): - claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceCapacity', 'value':new_capacity} + def onResourceCapacityUpdated(self, old_capacity, updated_capacity): + claim_change = {'changeType':CHANGE_UPDATE_TYPE, 'objectType':'resourceCapacity', 'value':updated_capacity} self._handleChange(claim_change) def getMostRecentChangeNumber(self): diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py index 7b4496abd62899a6b7100e1e05fb5f446a26a5e2..29733fc1c6d9a8d8bc6a6b6627a6af3436d82be1 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py @@ -147,9 +147,9 @@ class RARPC(RPCWrapper): available_capacity=available_capacity, total_capacity=total_capacity) - def getTask(self, id=None, mom_id=None, otdb_id=None): - '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id''' - task = self.rpc('GetTask', id=id, mom_id=mom_id, otdb_id=otdb_id) + def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None): + '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id, or for the given specification_id''' + task = self.rpc('GetTask', id=id, mom_id=mom_id, otdb_id=otdb_id, specification_id=specification_id) if task: task['starttime'] = task['starttime'].datetime() task['endtime'] = task['endtime'].datetime() @@ -186,6 +186,12 @@ class RARPC(RPCWrapper): task['endtime'] = task['endtime'].datetime() return tasks + def getTaskPredecessorIds(self, id=None): + return self.rpc('GetTaskPredecessorIds', id=id) + + def getTaskSuccessorIds(self, **kwargs): + return self.rpc('GetTaskSuccessorIds', id=id) + def insertTaskPredecessor(self, task_id, predecessor_id): return self.rpc('InsertTaskPredecessor', task_id=task_id, predecessor_id=predecessor_id) diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/service.py b/SAS/ResourceAssignment/ResourceAssignmentService/service.py index b205c500b869ccb74ff4e47645c0a5608964203b..10afe86da26f5983e0a7a0dbdaedaf75489ecc3c 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/service.py @@ -62,8 +62,10 @@ class RADBHandler(MessageHandlerInterface): 'DeleteTask': self._deleteTask, 'UpdateTask': self._updateTask, 'UpdateTaskStatusForOtdbId': self._updateTaskStatusForOtdbId, + 'GetTaskPredecessorIds': self._getTaskPredecessorIds, + 'GetTaskSuccessorIds': self._getTaskSuccessorIds, 'InsertTaskPredecessor': self._insertTaskPredecessor, - 'insertTaskPredecessors': self._insertTaskPredecessors, + 'InsertTaskPredecessors': self._insertTaskPredecessors, 'GetTaskStatuses': self._getTaskStatuses, 'GetTaskTypes': self._getTaskTypes, 'GetSpecifications': self._getSpecifications, @@ -222,7 +224,7 @@ class RADBHandler(MessageHandlerInterface): def _getTask(self, **kwargs): logger.info('GetTask: %s' % dict({k:v for k,v in kwargs.items() if v != None})) - task = self.radb.getTask(id=kwargs.get('id'), mom_id=kwargs.get('mom_id'), otdb_id=kwargs.get('otdb_id')) + task = self.radb.getTask(id=kwargs.get('id'), mom_id=kwargs.get('mom_id'), otdb_id=kwargs.get('otdb_id'), specification_id=kwargs.get('specification_id')) return task def _insertTask(self, **kwargs): @@ -258,6 +260,14 @@ class RADBHandler(MessageHandlerInterface): specification_id=kwargs.get('specification_id')) return {'id': id, 'updated': updated} + def _getTaskPredecessorIds(self, **kwargs): + logger.info('GetTaskPredecessorIds: %s' % dict({k:v for k,v in kwargs.items() if v != None})) + return convertIntKeysToString(self.radb.getTaskPredecessorIds(kwargs.get('id'))) + + def _getTaskSuccessorIds(self, **kwargs): + logger.info('GetTaskSuccessorIds: %s' % dict({k:v for k,v in kwargs.items() if v != None})) + return convertIntKeysToString(self.radb.getTaskSuccessorIds(kwargs.get('id'))) + def _insertTaskPredecessor(self, **kwargs): id = self.radb.insertTaskPredecessor(kwargs['task_id'], kwargs['predecessor_id'])