diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index 05b2540c788466ac7830eac16e3694d4c276df54..a130df80ed66e654fa6839805be9c786eda8d607 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -54,7 +54,7 @@ def makePostgresNotificationQueries(schema, table, action, column_name='id'): if column_name == 'id': select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;''' else: - select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || ', "''' + column_name + '''": "' || CAST(NEW.''' + column_name + ''' AS text) || '"}' INTO payload;''' + select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || ', "''' + column_name + '''": "' || REPLACE(CAST(NEW.''' + column_name + ''' AS text), '"', '\\"') || '"}' INTO payload;''' elif action == 'INSERT': select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;''' elif action == 'DELETE': @@ -85,7 +85,7 @@ def makePostgresNotificationQueries(schema, table, action, column_name='id'): action=action, old_or_new=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name, value='OLD' if action == 'DELETE' else 'NEW', - change_name=change_name.lower(), + change_name=change_name[:63].lower(), # postgres limits channel names to 63 chars begin_update_check=begin_update_check, select_payload=select_payload, end_update_check=end_update_check) diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index 81448e9a16c97e4cfb5f91213a218dde91f9edaf..72d8c2febdec2b085da08ec8c5010467c02d992f 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -93,6 +93,8 @@ class TMSSEventMessageHandler(AbstractMessageHandler): self.onSchedulingUnitDraftCreated(**msg.content) elif stripped_subject == 'SchedulingUnitDraft.Object.Updated': self.onSchedulingUnitDraftUpdated(**msg.content) + elif stripped_subject == 'SchedulingUnitDraft.Object.Constraints.Updated': + self.onSchedulingUnitDraftConstraintsUpdated(**msg.content) elif stripped_subject == 'SchedulingUnitDraft.Object.Deleted': self.onSchedulingUnitDraftDeleted(**msg.content) elif stripped_subject.startswith('SubTask.Status.'): @@ -192,6 +194,12 @@ class TMSSEventMessageHandler(AbstractMessageHandler): ''' pass + def onSchedulingUnitDraftConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict): + '''onSchedulingUnitDraftConstraintsUpdated is called upon receiving a SchedulingUnitDraft.Object.Constraints.Updated message, which is sent when a the constraints on a SchedulingUnitDrafts were updated. + :param id: the TMSS id of the SchedulingUnitDraft + ''' + pass + def onSchedulingUnitDraftDeleted(self, id: int): '''onSchedulingUnitDraftDeleted is called upon receiving a SchedulingUnitDraft.Object.Deleted message, which is sent when a SchedulingUnitDrafts was created. :param id: the TMSS id of the SchedulingUnitDraft diff --git a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py index 8b3e11939cf8321c2e13bbe8a10466fcccad7ef0..0c985bea504786317ae0d721bd5928d2c23b80b5 100644 --- a/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py +++ b/SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py @@ -249,6 +249,9 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag # This way we are sure that the latest units are always taken into account while scheduling, but we do not waste cpu cylces. self._do_schedule_event.set() + def onSchedulingUnitDraftConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict): + self._do_schedule_event.set() + def _scheduling_loop(self): while self._scheduling_thread_running: if self._do_schedule_event.wait(timeout=10): diff --git a/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py index 81b406f0b36ef37ccfe83af0744b184a31ab4c05..36119d341990e1189347220fb5d0a040598a62f8 100644 --- a/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py +++ b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py @@ -107,6 +107,9 @@ class TMSSPGListener(PostgresListener): self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'delete')) self.subscribe('tmssapp_schedulingunitdraft_delete', self.onSchedulingUnitDraftDeleted) + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'update', 'scheduling_constraints_doc')) + self.subscribe('tmssapp_schedulingunitdraft_update_column_scheduling_constraints_doc'[:63], self.onSchedulingUnitDraftConstraintsUpdated) + return super().start() def __exit__(self, exc_type, exc_val, exc_tb): @@ -191,6 +194,12 @@ class TMSSPGListener(PostgresListener): def onSchedulingUnitDraftDeleted(self, payload = None): self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + def onSchedulingUnitDraftConstraintsUpdated(self, payload = None): + # convert payload string to nested json doc + payload = json.loads(payload) + payload['scheduling_constraints_doc'] = json.loads(payload['scheduling_constraints_doc']) + self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Constraints.Updated', payload) + def create_service(dbcreds, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): '''create a TMSSPGListener instance'''