Skip to content
Snippets Groups Projects
Commit e4bb61cc authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-190: auto trigger dynamic scheduling based on allow_scheduling_observations setting

parent 0456cd73
No related branches found
No related tags found
1 merge request!252Resolve TMSS-190
......@@ -40,28 +40,28 @@ from lofar.common.dbcredentials import DBCredentials
logger = logging.getLogger(__name__)
def makePostgresNotificationQueries(schema, table, action, column_name='id'):
def makePostgresNotificationQueries(schema, table, action, column_name=None, id_column_name='id'):
action = action.upper()
if action not in ('INSERT', 'UPDATE', 'DELETE'):
raise ValueError('''trigger_type '%s' not in ('INSERT', 'UPDATE', 'DELETE')''' % action)
change_name = '''{table}_{action}'''.format(table=table, action=action)
if column_name != 'id':
if column_name is not None and column_name != id_column_name:
change_name += '_column_' + column_name
function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name)
if action == 'UPDATE':
if column_name == 'id':
select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;'''
if column_name is None or column_name == id_column_name:
select_payload = '''SELECT '{"''' + id_column_name + '''": "' || CAST(NEW.''' + id_column_name + ''' AS text) || '"}' INTO payload;'''
else:
select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || ', "''' + column_name + '''": "' || REPLACE(CAST(NEW.''' + column_name + ''' AS text), '"', '\\"') || '"}' INTO payload;'''
select_payload = '''SELECT '{"''' + id_column_name + '''": "' || CAST(NEW.''' + id_column_name + ''' AS text) || '", "''' + column_name + '''": "' || REPLACE(CAST(NEW.''' + column_name + ''' AS text), '"', '\\"') || '"}' INTO payload;'''
elif action == 'INSERT':
select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;'''
select_payload = '''SELECT '{"''' + id_column_name + '''": "' || CAST(NEW.''' + id_column_name + ''' AS text) || '"}' INTO payload;'''
elif action == 'DELETE':
select_payload = '''SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload;'''
select_payload = '''SELECT '{"''' + id_column_name + '''": "' || CAST(OLD.''' + id_column_name + ''' AS text) || '"}' INTO payload;'''
if action == 'UPDATE':
begin_update_check = 'IF ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what}) THEN'.format(what='*' if column_name == 'id' else column_name)
begin_update_check = 'IF ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what}) THEN'.format(what='*' if column_name is None or column_name == id_column_name else column_name)
end_update_check = 'END IF;'
else:
begin_update_check = ''
......@@ -83,7 +83,6 @@ def makePostgresNotificationQueries(schema, table, action, column_name='id'):
function_name=function_name,
table=table,
action=action,
old_or_new=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name,
value='OLD' if action == 'DELETE' else 'NEW',
change_name=change_name[:63].lower(), # postgres limits channel names to 63 chars
begin_update_check=begin_update_check,
......
......@@ -45,6 +45,7 @@ TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE %
TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitBlueprint.Object'
TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitBlueprint.Status'
TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitDraft.Object'
TMSS_SETTING_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'Setting.Object'
TMSS_ALL_OBJECT_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '.*.Object.#'
TMSS_ALL_STATUS_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '.*.Status.#'
TMSS_ALL_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '#'
......@@ -103,6 +104,8 @@ class TMSSEventMessageHandler(AbstractMessageHandler):
self.onTaskBlueprintStatusChanged(**msg.content)
elif stripped_subject.startswith('SchedulingUnitBlueprint.Status.'):
self.onSchedulingUnitBlueprintStatusChanged(**msg.content)
elif stripped_subject == 'Setting.Object.Updated':
self.onSettingUpdated(**msg.content)
else:
raise MessageHandlerUnknownSubjectError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject)
......@@ -224,6 +227,13 @@ class TMSSEventMessageHandler(AbstractMessageHandler):
'''
pass
def onSettingUpdated(self, name: str, value):
'''onSettingUpdated is called upon receiving a Setting.Object.Updated message, which is sent when a Setting was updated.
:param name: the name of the Setting
'''
pass
class TMSSBusListener(BusListener):
def __init__(self,
......
......@@ -252,12 +252,20 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag
def onSchedulingUnitDraftConstraintsUpdated(self, id: int, scheduling_constraints_doc: dict):
self._do_schedule_event.set()
def onSettingUpdated(self, name: str, value: bool):
if name == models.Flag.Choices.AUTOSCHEDULE.value and value:
logger.info("%s was set to %s: triggering update of dynamic schedule...", name, value)
self._do_schedule_event.set()
def _scheduling_loop(self):
while self._scheduling_thread_running:
if self._do_schedule_event.wait(timeout=10):
self._do_schedule_event.clear()
try:
schedule_next_scheduling_unit_and_assign_start_stop_times_to_remaining_schedulable_scheduling_units()
if models.Setting.objects.get(name=models.Flag.Choices.AUTOSCHEDULE.value).value:
schedule_next_scheduling_unit_and_assign_start_stop_times_to_remaining_schedulable_scheduling_units()
else:
logger.warning("Skipping update of dynamic schedule because the setting %s=%s", models.Flag.Choices.AUTOSCHEDULE.value, models.Setting.objects.get(name=models.Flag.Choices.AUTOSCHEDULE.value).value)
except Exception as e:
logger.exception(str(e))
# just continue processing events. better luck next time...
......
......@@ -110,6 +110,10 @@ class TMSSPGListener(PostgresListener):
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'update', 'scheduling_constraints_doc'))
self.subscribe('tmssapp_schedulingunitdraft_update_column_scheduling_constraints_doc'[:63], self.onSchedulingUnitDraftConstraintsUpdated)
# Settings
self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_setting', 'update', id_column_name='name_id', column_name='value'))
self.subscribe('tmssapp_setting_update_column_value', self.onSettingUpdated)
return super().start()
def __exit__(self, exc_type, exc_val, exc_tb):
......@@ -200,6 +204,13 @@ class TMSSPGListener(PostgresListener):
payload['scheduling_constraints_doc'] = json.loads(payload['scheduling_constraints_doc'])
self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Constraints.Updated', payload)
def onSettingUpdated(self, payload = None):
payload = json.loads(payload)
payload['name'] = payload['name_id']
del payload['name_id']
payload['value'] = payload['value'] in ('true', 'True', 't')
self._sendNotification(TMSS_SETTING_OBJECT_EVENT_PREFIX+'.Updated', payload)
def create_service(dbcreds, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
'''create a TMSSPGListener instance'''
......
......@@ -259,7 +259,7 @@ class FlagSerializer(RelationalHyperlinkedModelSerializer):
fields = '__all__'
class SettingSerializer(RelationalHyperlinkedModelSerializer):
class SettingSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = models.Setting
fields = '__all__'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment