diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index 740c52181b38c49d3b3a57866e5423582e3d096d..db28a087be704c2b09c85cd66fea45146d617029 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -1,7 +1,7 @@ # - Create for each LOFAR package a variable containing the absolute path to # its source directory. # -# Generated by gen_LofarPackageList_cmake.sh at do 28 mei 2020 11:22:44 CEST +# Generated by gen_LofarPackageList_cmake.sh at do 29 okt 2020 7:42:34 CET # # ---- DO NOT EDIT ---- # @@ -208,6 +208,8 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(RACommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/Common) set(TMSSClient_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/client) set(TMSSSubtaskSchedulingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/subtask_scheduling) + set(TMSSFeedbackHandlingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/feedback_handling) + set(TMSSPostgresListenerService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/tmss_postgres_listener) set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common) set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server) set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC) diff --git a/LCS/Messaging/python/messaging/exceptions.py b/LCS/Messaging/python/messaging/exceptions.py index 52e023d2145e5775f05296c9a1cef9b48b67654c..003324cc41f1c4ca58597ceb4a02cdc9daee0e96 100644 --- a/LCS/Messaging/python/messaging/exceptions.py +++ b/LCS/Messaging/python/messaging/exceptions.py @@ -65,3 +65,15 @@ class MessagingTimeoutError(MessagingError, TimeoutError): """ pass + +class MessageHandlerError(MessagingError): + """ + raised upon handling a message + """ + pass + +class MessageHandlerUnknownSubjectError(MessageHandlerError): + """ + raised upon handling a message with an unknown subject + """ + pass diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 734e5db18de0c723d4ba47c4b9542f827a5e6558..66feb5bad871544098d302401a75b8be9fb11691 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -1570,6 +1570,9 @@ class BusListener: if isinstance(e, TimeoutError): logger.error("Handling of %s timed out: %s", lofar_msg, e) receiver.reject(lofar_msg, requeue=True) + elif isinstance(e, MessageHandlerError): + logger.error("Could not handle message %s: %s", lofar_msg, e) + receiver.reject(lofar_msg, requeue=False) else: logger.exception("Handling of %s failed. Rejecting message. Error: %s", lofar_msg, e) receiver.reject(lofar_msg, requeue=False) diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index 13d6cee1db2a03e2b86863a3c08e174a2cc3ad01..9c6d36e6e4369f722c807b198ae07b34b0924d06 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -49,45 +49,66 @@ def makePostgresNotificationQueries(schema, table, action, column_name='id'): if column_name != 'id': change_name += '_column_' + column_name function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name) + + if action == 'UPDATE': + if column_name == 'id': + select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;''' + else: + select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || ', "''' + column_name + '''": "' || CAST(NEW.''' + column_name + ''' AS text) || '"}' INTO payload;''' + elif action == 'INSERT': + select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;''' + elif action == 'DELETE': + select_payload = '''SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload;''' + + if action == 'UPDATE': + begin_update_check = 'IF ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what}) THEN'.format(what='*' if column_name == 'id' else column_name) + end_update_check = 'END IF;' + else: + begin_update_check = '' + end_update_check = '' + function_sql = ''' - CREATE OR REPLACE FUNCTION {schema}.{function_name}() + CREATE OR REPLACE FUNCTION {schema}{function_name}() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN - {begin_update_check}SELECT CAST({column_value} AS text) INTO payload; - PERFORM pg_notify(CAST('{change_name}' AS text), payload);{end_update_check} + {begin_update_check} + {select_payload} + PERFORM pg_notify(CAST('{change_name}' AS text), payload); + {end_update_check} RETURN {value}; END; $$ LANGUAGE plpgsql; - '''.format(schema=schema, + '''.format(schema=schema+'.' if schema else '', function_name=function_name, table=table, action=action, - column_value=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name, + old_or_new=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name, value='OLD' if action == 'DELETE' else 'NEW', change_name=change_name.lower(), - begin_update_check='IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN\n' if action == 'UPDATE' else '', - end_update_check='\nEND IF;' if action == 'UPDATE' else '') + begin_update_check=begin_update_check, + select_payload=select_payload, + end_update_check=end_update_check) trigger_name = 'T_%s' % function_name trigger_sql = ''' CREATE TRIGGER {trigger_name} - AFTER {action} ON {schema}.{table} + AFTER {action} ON {schema}{table} FOR EACH ROW - EXECUTE PROCEDURE {schema}.{function_name}(); + EXECUTE PROCEDURE {schema}{function_name}(); '''.format(trigger_name=trigger_name, function_name=function_name, - schema=schema, + schema=schema+'.' if schema else '', table=table, action=action) drop_sql = ''' - DROP TRIGGER IF EXISTS {trigger_name} ON {schema}.{table} CASCADE; - DROP FUNCTION IF EXISTS {schema}.{function_name}(); + DROP TRIGGER IF EXISTS {trigger_name} ON {schema}{table} CASCADE; + DROP FUNCTION IF EXISTS {schema}{function_name}(); '''.format(trigger_name=trigger_name, function_name=function_name, - schema=schema, + schema=schema+'.' if schema else '', table=table) sql = drop_sql + '\n' + function_sql + '\n' + trigger_sql @@ -321,7 +342,7 @@ class PostgresDatabaseConnection: try: if self._connection.notices: for notice in self._connection.notices: - logger.info('database log message %s', notice.strip()) + logger.debug('database log message %s', notice.strip()) if isinstance(self._connection.notices, collections.deque): self._connection.notices.clear() else: @@ -331,19 +352,19 @@ class PostgresDatabaseConnection: def commit(self): if self.is_connected: - logger.info('commit') + logger.debug('commit') self._connection.commit() def rollback(self): if self.is_connected: - logger.info('rollback') + logger.debug('rollback') self._connection.rollback() class PostgresListener(PostgresDatabaseConnection): - ''' This class lets you listen to postgress notifications - It execute callbacks when a notifocation occurs. - Make your own subclass with your callbacks and subscribe them to the appriate channel. + ''' This class lets you listen to postgres notifications + It execute callbacks when a notification occurs. + Make your own subclass with your callbacks and subscribe them to the appropriate channel. Example: class MyListener(PostgresListener): @@ -385,18 +406,20 @@ class PostgresListener(PostgresDatabaseConnection): def subscribe(self, notification, callback): '''Subscribe to a certain postgres notification. Call callback method in case such a notification is received.''' - logger.info("Subscribed %sto %s" % ('and listening ' if self.isListening() else '', notification)) + logger.debug("Subscribing %sto %s" % ('and listening ' if self.isListening() else '', notification)) with self.__lock: self.executeQuery("LISTEN %s;", (psycopg2.extensions.AsIs(notification),)) self.__callbacks[notification] = callback + logger.info("Subscribed %sto %s" % ('and listening ' if self.isListening() else '', notification)) def unsubscribe(self, notification): '''Unubscribe from a certain postgres notification.''' - logger.info("Unsubscribed from %s" % notification) + logger.debug("Unsubscribing from %s" % notification) with self.__lock: self.executeQuery("UNLISTEN %s;", (psycopg2.extensions.AsIs(notification),)) if notification in self.__callbacks: del self.__callbacks[notification] + logger.info("Unsubscribed from %s" % notification) def isListening(self): '''Are we listening? Has the listener been started?''' @@ -459,12 +482,16 @@ class PostgresListener(PostgresDatabaseConnection): self.disconnect() def __enter__(self): - '''starts the listener upon contect enter''' - self.start() + '''starts the listener upon 'with' context enter''' + try: + self.start() + except Exception as e: + logger.exception(str(e)) + self.stop() return self def __exit__(self, exc_type, exc_val, exc_tb): - '''stops the listener upon contect enter''' + '''stops the listener upon 'with' context enter''' self.stop() def _callCallback(self, channel, payload = None): diff --git a/LCS/PyCommon/test/postgres.py b/LCS/PyCommon/test/postgres.py index f98092c3d7fd2a42f745c20ff59b2703bdb6cb43..51e3be001e05424dea7358c5aa4f239e02140faf 100755 --- a/LCS/PyCommon/test/postgres.py +++ b/LCS/PyCommon/test/postgres.py @@ -90,6 +90,8 @@ class PostgresTestDatabaseInstance(): # make the user known in the new test database self._create_superuser(dsn) + logger.info('Created test-database instance. It is available at: %s', self.dbcreds.stringWithHiddenPassword()) + logger.info('Applying test-database schema...') self.apply_database_schema() return @@ -106,9 +108,6 @@ class PostgresTestDatabaseInstance(): # create user role query = "CREATE USER %s WITH SUPERUSER PASSWORD '%s'" % (self.dbcreds.user, self.dbcreds.password) cursor.execute(query) - - logger.info('Created test-database instance. It is available at: %s', - self.dbcreds.stringWithHiddenPassword()) finally: cursor.close() conn.commit() diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 0f6febfe8ce68168d2f93003444dfb52d90cb8ce..8fb09299fd0b8051e9252de5f57f31f601ddb489 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -75,7 +75,7 @@ from lofar.common.subprocess_utils import communicate_returning_strings from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession -from lofar.sas.tmss.client.tmssbuslistener import TMSSSubTaskEventMessageHandler, TMSSSubTaskBusListener +from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener import subprocess import pipes @@ -342,7 +342,7 @@ class PipelineDependencies(object): return self.rarpc.getTasks(task_status=task_status, task_type=task_type) -class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): +class PipelineControlTMSSHandler(TMSSEventMessageHandler): def __init__(self): super(PipelineControlTMSSHandler, self).__init__() @@ -394,22 +394,24 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): except Exception as e: logger.error(e) - def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state: str): - try: - subtask = self.tmss_client.get_subtask(subtask_id) - subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template']) - if 'pipeline' not in subtask_template['type_value']: - logger.info("skipping scheduled subtask id=%s of non-pipeline type '%s'", subtask_id, subtask_template['type_value']) - return - - logger.info("getting parset for scheduled subtask id=%s of type '%s'", subtask_id, subtask_template['type_value']) - parset = self.tmss_client.get_subtask_parset(subtask_id) - parset = parameterset.fromString(parset) - parset = Parset(parset.dict()) - if parset and self._shouldHandle(parset): - self._startPipeline(subtask_id, parset) - except Exception as e: - logger.error(e) + def onSubTaskStatusChanged(self, id: int, status: str): + if status == "scheduled": + try: + subtask = self.tmss_client.get_subtask(id) + subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template']) + if 'pipeline' not in subtask_template['type_value']: + logger.info("skipping scheduled subtask id=%s of non-pipeline type '%s'", id, subtask_template['type_value']) + return + + logger.info("getting parset for scheduled subtask id=%s of type '%s'", id, subtask_template['type_value']) + parset = self.tmss_client.get_subtask_parset(id) + parset = parameterset.fromString(parset) + parset = Parset(parset.dict()) + if parset and self._shouldHandle(parset): + self._startPipeline(id, parset) + except Exception as e: + logger.error(e) + @staticmethod def _shouldHandle(parset): @@ -978,7 +980,7 @@ class PipelineControl(OTDBBusListener): -class PipelineControlTMSS(TMSSSubTaskBusListener): +class PipelineControlTMSS(TMSSBusListener): def __init__(self, handler_kwargs: dict = None, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index d189be1560b98aca99ce59488de94f107fc1f7b3..18bd13f9c1f44378b90bdd5e6f99919627123012 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -23,7 +23,7 @@ from subprocess import call from optparse import OptionParser, OptionGroup from lofar.common.util import waitForInterrupt from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler -from lofar.sas.tmss.client.tmssbuslistener import TMSSSubTaskEventMessageHandler, TMSSSubTaskBusListener +from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener from lofar.messaging import UsingToBusMixin, BusListener, ToBus, AbstractMessageHandler from lofar.messaging.messages import EventMessage, CommandMessage from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME @@ -67,8 +67,8 @@ class QAFilteringOTDBBusListener(OTDBBusListener): broker=broker) -class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener): - class QAFilteringTMSSSubTaskEventMessageHandler(UsingToBusMixin, TMSSSubTaskEventMessageHandler): +class QAFilteringTMSSSubTaskBusListener(TMSSBusListener): + class QAFilteringTMSSSubTaskEventMessageHandler(UsingToBusMixin, TMSSEventMessageHandler): def _send_qa_command_message(self, subtask_id: int, command_subject: str): with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: tmsssession.set_subtask_status(subtask_id, 'queueing') @@ -83,14 +83,15 @@ class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener): tmsssession.set_subtask_status(subtask_id, 'queued') - def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): - with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: - subtask = tmsssession.get_subtask(subtask_id) - spec = tmsssession.get_url_as_json_object(subtask['specifications_template']) - if 'qa_files' == spec['type_value']: - self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_SUBJECT) - elif 'qa_plots' == spec['type_value']: - self._send_qa_command_message(subtask_id, DEFAULT_DO_QAPLOTS_SUBJECT) + def onSubTaskStatusChanged(self, id: int, status:str): + if status == "scheduled": + with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: + subtask = tmsssession.get_subtask(id) + spec = tmsssession.get_url_as_json_object(subtask['specifications_template']) + if 'qa_files' == spec['type_value']: + self._send_qa_command_message(id, DEFAULT_DO_QAFILE_CONVERSION_SUBJECT) + elif 'qa_plots' == spec['type_value']: + self._send_qa_command_message(id, DEFAULT_DO_QAPLOTS_SUBJECT) def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): super().__init__(handler_type=QAFilteringTMSSSubTaskBusListener.QAFilteringTMSSSubTaskEventMessageHandler, diff --git a/QA/QA_Service/test/t_qa_service.py b/QA/QA_Service/test/t_qa_service.py index 477ba9dc491fb229786c354b14ec0fc6e8fcd1fe..fe5bfc908acd25b0225f6a6747277302564efa44 100755 --- a/QA/QA_Service/test/t_qa_service.py +++ b/QA/QA_Service/test/t_qa_service.py @@ -27,7 +27,6 @@ from datetime import datetime import logging -from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment logger = logging.getLogger(__name__) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql index 75b7008589043e296e1fd1cefd693497ef17c41f..5bbf683cac0016797b857704d6cc360daf271ecd 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql @@ -12,12 +12,18 @@ BEGIN; SET LOCAL client_min_messages=warning; +DROP TRIGGER IF EXISTS T_NOTIFY_task_INSERT ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_INSERT(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_INSERT() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_insert' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -29,12 +35,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_INSERT(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_UPDATE ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_UPDATE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_UPDATE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.id AS text) INTO payload; +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_update' AS text), payload); END IF; RETURN NEW; @@ -48,12 +58,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_UPDATE(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_DELETE ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_DELETE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_DELETE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_delete' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -65,12 +81,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_DELETE(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_INSERT_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.task_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_insert_column_task_id' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -82,12 +104,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_UPDATE_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.task_id AS text) INTO payload; +IF ROW(NEW.task_id) IS DISTINCT FROM ROW(OLD.task_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "task_id": "' || CAST(NEW.task_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_update_column_task_id' AS text), payload); END IF; RETURN NEW; @@ -101,12 +127,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_DELETE_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.task_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_delete_column_task_id' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -118,12 +150,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_INSERT_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.predecessor_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_insert_column_predecessor_id' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -135,12 +173,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_UPDATE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.predecessor_id AS text) INTO payload; +IF ROW(NEW.predecessor_id) IS DISTINCT FROM ROW(OLD.predecessor_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "predecessor_id": "' || CAST(NEW.predecessor_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_update_column_predecessor_id' AS text), payload); END IF; RETURN NEW; @@ -154,12 +196,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_DELETE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.predecessor_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_delete_column_predecessor_id' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -171,12 +219,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_specification_UPDATE ON resource_allocation.specification CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_specification_UPDATE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.id AS text) INTO payload; +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('specification_update' AS text), payload); END IF; RETURN NEW; @@ -190,12 +242,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_specification_UPDATE(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_claim_INSERT ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_INSERT(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_INSERT() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('resource_claim_insert' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -207,12 +265,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_INSERT(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_claim_UPDATE ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_UPDATE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_UPDATE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.id AS text) INTO payload; +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('resource_claim_update' AS text), payload); END IF; RETURN NEW; @@ -226,12 +288,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_UPDATE(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_claim_DELETE ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_DELETE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_DELETE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('resource_claim_delete' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -243,12 +311,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_DELETE(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_availability_UPDATE_column_resource_id ON resource_monitoring.resource_availability CASCADE; +DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id(); + + CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.resource_id AS text) INTO payload; +IF ROW(NEW.resource_id) IS DISTINCT FROM ROW(OLD.resource_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "resource_id": "' || CAST(NEW.resource_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('resource_availability_update_column_resource_id' AS text), payload); END IF; RETURN NEW; @@ -262,12 +334,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_capacity_UPDATE_column_resource_id ON resource_monitoring.resource_capacity CASCADE; +DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id(); + + CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.resource_id AS text) INTO payload; +IF ROW(NEW.resource_id) IS DISTINCT FROM ROW(OLD.resource_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "resource_id": "' || CAST(NEW.resource_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('resource_capacity_update_column_resource_id' AS text), payload); END IF; RETURN NEW; diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py index 81410073abe2002580597812c7adb9aba1260e25..fc0406c720075bd9b02eb07558ee8940950d11ed 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py @@ -29,6 +29,7 @@ from lofar.common.postgres import makePostgresNotificationQueries logger = logging.getLogger(__name__) if __name__ == '__main__': + print("ToDo: the new notifications send a json dict as payload instead on just the id. Adapt RADB code to handle that. For now it's ok, as we don't change the RADB schema or notifications.") with open('add_notifications.sql', 'wt') as f: f.write('--this file was generated by create_add_notifications.sql.py\n') f.write('--it creates triggers and functions which fire postgres notify events upon the given table actions\n') diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py index 89a69bac043124e497db465e8718f3ae08f761f9..6a1786252db82892e650c2e24899cc6836046570 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py @@ -74,7 +74,7 @@ class RADBPGListener(PostgresListener): def onTaskUpdated(self, payload = None): # Send notification for the given updated task - task_id = payload + task_id = json.loads(payload)['id'] task = self.radb.getTask(task_id) self._sendNotification('TaskUpdated', task) @@ -86,39 +86,39 @@ class RADBPGListener(PostgresListener): self._sendNotification('TaskUpdated', suc_sched_task) def onTaskInserted(self, payload = None): - self._sendNotification('TaskInserted', self.radb.getTask(payload)) + self._sendNotification('TaskInserted', self.radb.getTask(json.loads(payload)['id'])) def onTaskDeleted(self, payload = None): - self._sendNotification('TaskDeleted', payload) + self._sendNotification('TaskDeleted', json.loads(payload)['id']) - def onTaskPredecessorChanged(self, task_id): - logger.info('onTaskPredecessorChanged(task_id=%s)', task_id) - self._sendNotification('TaskUpdated', self.radb.getTask(task_id)) + def onTaskPredecessorChanged(self, payload): + logger.info('onTaskPredecessorChanged(task_id=%s)', json.loads(payload)['task_id']) + self._sendNotification('TaskUpdated', self.radb.getTask(json.loads(payload)['task_id'])) - def onTaskSuccessorChanged(self, task_id): - logger.info('onTaskSuccessorChanged(task_id=%s)', task_id) - self._sendNotification('TaskUpdated', self.radb.getTask(task_id)) + def onTaskSuccessorChanged(self, payload): + logger.info('onTaskSuccessorChanged(task_id=%s)', json.loads(payload)['task_id']) + self._sendNotification('TaskUpdated', self.radb.getTask(json.loads(payload)['task_id'])) def onSpecificationUpdated(self, payload = None): # when the specification starttime and endtime are updated, then that effects the task as well - self._sendNotification('TaskUpdated', self.radb.getTask(specification_id=payload)) + self._sendNotification('TaskUpdated', self.radb.getTask(specification_id=json.loads(payload)['id'])) def onResourceClaimUpdated(self, payload = None): - self._sendNotification('ResourceClaimUpdated', self.radb.getResourceClaim(payload)) + self._sendNotification('ResourceClaimUpdated', self.radb.getResourceClaim(json.loads(payload)['id'])) def onResourceClaimInserted(self, payload = None): - self._sendNotification('ResourceClaimInserted', self.radb.getResourceClaim(payload)) + self._sendNotification('ResourceClaimInserted', self.radb.getResourceClaim(json.loads(payload)['id'])) def onResourceClaimDeleted(self, payload = None): - self._sendNotification('ResourceClaimDeleted', payload) + self._sendNotification('ResourceClaimDeleted', json.loads(payload)['id']) def onResourceAvailabilityUpdated(self, payload = None): - r = self.radb.getResources(resource_ids=[payload], include_availability=True)[0] + r = self.radb.getResources(resource_ids=[json.loads(payload)['id']], include_availability=True)[0] r = {k:r[k] for k in ['id', 'active']} self._sendNotification('ResourceAvailabilityUpdated', r) def onResourceCapacityUpdated(self, payload = None): - r = self.radb.getResources(resource_ids=[payload], include_availability=True)[0] + r = self.radb.getResources(resource_ids=[json.loads(payload)['id']], include_availability=True)[0] r = {k:r[k] for k in ['id', 'total_capacity', 'available_capacity', 'used_capacity']} self._sendNotification('ResourceCapacityUpdated', r) diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py index 0fb13c71c657b88d82ad7bdd2dafdd6419cfdf99..48df33a1cab208a1892a4f5aa47ed11e73b066e0 100644 --- a/SAS/TMSS/client/lib/tmss_http_rest_client.py +++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py @@ -156,10 +156,13 @@ class TMSSsession(object): return self.get_path_as_json_object("subtask", clauses) + def get_full_url_for_path(self, path: str) -> str: + '''get the full URL for the given path''' + return '%s/%s' % (self.base_url, path.strip('/')) + def get_path_as_json_object(self, path: str, params={}) -> object: '''get resource at the given path, interpret it as json, and return it as as native object (usually a dict or a list of dicts)''' - full_url = '%s/%s' % (self.base_url, path.strip('/')) - return self.get_url_as_json_object(full_url, params=params) + return self.get_url_as_json_object(self.get_full_url_for_path(path=path), params=params) def get_url_as_json_object(self, full_url: str, params={}) -> object: '''get resource at the given full url (including http://<base_url>, interpret it as json, and return it as as native object (usually a dict or a list of dicts)''' diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index 30c49bb7ce9ae2bd70093821088dbd1a4667e607..81448e9a16c97e4cfb5f91213a218dde91f9edaf 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -28,180 +28,209 @@ Typical usage is to derive your own subclass from TMSSBusListener and implement """ from lofar.messaging.messagebus import BusListener, AbstractMessageHandler -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, EventMessage +from lofar.messaging import DEFAULT_BUSNAME, DEFAULT_BROKER, EventMessage +from lofar.messaging.exceptions import MessageHandlerUnknownSubjectError from lofar.common.util import waitForInterrupt, single_line_with_single_spaces import logging logger = logging.getLogger(__name__) -_DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE = 'TMSS.%s.notification' -DEFAULT_TMSS_TASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'Task' -DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'SubTask' -DEFAULT_TMSS_ALL_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE + '#' +_TMSS_EVENT_PREFIX_TEMPLATE = 'TMSS.Event.%s' +TMSS_SUBTASK_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SubTask.Object' +TMSS_SUBTASK_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SubTask.Status' +TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'TaskBlueprint.Object' +TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'TaskBlueprint.Status' +TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'TaskDraft.Object' +TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitBlueprint.Object' +TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitBlueprint.Status' +TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitDraft.Object' +TMSS_ALL_OBJECT_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '.*.Object.#' +TMSS_ALL_STATUS_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '.*.Status.#' +TMSS_ALL_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '#' -class TMSSSubTaskEventMessageHandler(AbstractMessageHandler): +class TMSSEventMessageHandler(AbstractMessageHandler): ''' - Base-type messagehandler for handling TMSS event messages. - Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. + Base-type messagehandler for handling all TMSS event messages. + Typical usage is to derive your own subclass from TMSSEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. ''' def handle_message(self, msg: EventMessage): if not isinstance(msg, EventMessage): raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg)) - stripped_subject = msg.subject.replace("%s." % DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, '') - - logger.info("%s.on%s: %s" % (self.__class__.__name__, stripped_subject, single_line_with_single_spaces(msg.content))) - - if stripped_subject == 'Defining': - self.onSubTaskDefining(**msg.content) - elif stripped_subject == 'Defined': - self.onSubTaskDefined(**msg.content) - elif stripped_subject == 'Scheduling': - self.onSubTaskScheduling(**msg.content) - elif stripped_subject == 'Scheduled': - self.onSubTaskScheduled(**msg.content) - elif stripped_subject == 'Queueing': - self.onSubTaskQueueing(**msg.content) - elif stripped_subject == 'Queued': - self.onSubTaskQueued(**msg.content) - elif stripped_subject == 'Starting': - self.onSubTaskStarting(**msg.content) - elif stripped_subject == 'Started': - self.onSubTaskStarted(**msg.content) - elif stripped_subject == 'Finishing': - self.onSubTaskFinishing(**msg.content) - elif stripped_subject == 'Finished': - self.onSubTaskFinished(**msg.content) - elif stripped_subject == 'Cancelling': - self.onSubTaskCancelling(**msg.content) - elif stripped_subject == 'Cancelled': - self.onSubTaskCancelled(**msg.content) - elif stripped_subject == 'Error': - self.onSubTaskError(**msg.content) + stripped_subject = msg.subject.replace(_TMSS_EVENT_PREFIX_TEMPLATE%('',), '') + + logger.info("%s %s: %s" % (self.__class__.__name__, stripped_subject, single_line_with_single_spaces(msg.content))) + + # sorry, very big if/elif/else tree. + # it just maps all possible event subjects for all possible objects and statuses onto handler methods. + if stripped_subject == 'SubTask.Object.Created': + self.onSubTaskCreated(**msg.content) + elif stripped_subject == 'SubTask.Object.Updated': + self.onSubTaskUpdated(**msg.content) + elif stripped_subject == 'SubTask.Object.Deleted': + self.onSubTaskDeleted(**msg.content) + elif stripped_subject == 'TaskBlueprint.Object.Created': + self.onTaskBlueprintCreated(**msg.content) + elif stripped_subject == 'TaskBlueprint.Object.Updated': + self.onTaskBlueprintUpdated(**msg.content) + elif stripped_subject == 'TaskBlueprint.Object.Deleted': + self.onTaskBlueprintDeleted(**msg.content) + elif stripped_subject == 'TaskDraft.Object.Created': + self.onTaskDraftCreated(**msg.content) + elif stripped_subject == 'TaskDraft.Object.Updated': + self.onTaskDraftUpdated(**msg.content) + elif stripped_subject == 'TaskDraft.Object.Deleted': + self.onTaskDraftDeleted(**msg.content) + elif stripped_subject == 'SchedulingUnitBlueprint.Object.Created': + self.onSchedulingUnitBlueprintCreated(**msg.content) + elif stripped_subject == 'SchedulingUnitBlueprint.Object.Updated': + self.onSchedulingUnitBlueprintUpdated(**msg.content) + elif stripped_subject == 'SchedulingUnitBlueprint.Object.Deleted': + self.onSchedulingUnitBlueprintDeleted(**msg.content) + elif stripped_subject == 'SchedulingUnitDraft.Object.Created': + self.onSchedulingUnitDraftCreated(**msg.content) + elif stripped_subject == 'SchedulingUnitDraft.Object.Updated': + self.onSchedulingUnitDraftUpdated(**msg.content) + elif stripped_subject == 'SchedulingUnitDraft.Object.Deleted': + self.onSchedulingUnitDraftDeleted(**msg.content) + elif stripped_subject.startswith('SubTask.Status.'): + self.onSubTaskStatusChanged(**msg.content) + elif stripped_subject.startswith('TaskBlueprint.Status.'): + self.onTaskBlueprintStatusChanged(**msg.content) + elif stripped_subject.startswith('SchedulingUnitBlueprint.Status.'): + self.onSchedulingUnitBlueprintStatusChanged(**msg.content) else: - raise ValueError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject) + raise MessageHandlerUnknownSubjectError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject) - def onSubTaskDefining(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskDefining is called upon receiving a SubTaskDefining message, which is sent when a SubTasks changes state to "Defining". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + + def onSubTaskStatusChanged(self, id: int, status:str): + '''onSubTaskStatusChanged is called upon receiving a SubTask.Status.* message, which is sent when a SubTasks changes status. + :param id: the TMSS id of the SubTask + :param status: the new status of the SubTask + ''' + pass + + def onTaskBlueprintStatusChanged(self, id: int, status:str): + '''onTaskBlueprintStatusChanged is called upon receiving a TaskBlueprint.Status.* message, which is sent when a TaskBlueprint changes status. + :param id: the TMSS id of the TaskBlueprint + :param status: the new status of the TaskBlueprint + ''' + pass + + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status:str): + '''onSchedulingUnitBlueprintStatusChanged is called upon receiving a SchedulingUnitBlueprint.Status.* message, which is sent when a SchedulingUnitBlueprints changes status. + :param id: the TMSS id of the SchedulingUnitBlueprint + :param status: the new status of the SchedulingUnitBlueprint + ''' + pass + + def onSubTaskCreated(self, id: int): + '''onSubTaskCreated is called upon receiving a SubTask.Object.Created message, which is sent when a SubTasks was created. + :param id: the TMSS id of the SubTask + ''' + pass + + def onSubTaskUpdated(self, id: int): + '''onSubTaskUpdated is called upon receiving a SubTask.Object.Updated message, which is sent when a SubTasks was created. + :param id: the TMSS id of the SubTask + ''' + pass + + def onSubTaskDeleted(self, id: int): + '''onSubTaskDeleted is called upon receiving a SubTask.Object.Deleted message, which is sent when a SubTasks was created. + :param id: the TMSS id of the SubTask ''' pass - def onSubTaskDefined(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskDefined is called upon received a SubTaskDefined message, which is sent when a SubTasks changes state to "Defined". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskDraftCreated(self, id: int): + '''onTaskDraftCreated is called upon receiving a TaskDraft.Object.Created message, which is sent when a TaskDrafts was created. + :param id: the TMSS id of the TaskDraft ''' pass - def onSubTaskScheduling(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskScheduling is called upon receiving a SubTaskScheduling message, which is sent when a SubTasks changes state to "Scheduling". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskDraftUpdated(self, id: int): + '''onTaskDraftUpdated is called upon receiving a TaskDraft.Object.Updated message, which is sent when a TaskDrafts was created. + :param id: the TMSS id of the TaskDraft ''' pass - def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskScheduled is called upon received a SubTaskScheduled message, which is sent when a SubTasks changes state to "Scheduled". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskDraftDeleted(self, id: int): + '''onTaskDraftDeleted is called upon receiving a TaskDraft.Object.Deleted message, which is sent when a TaskDrafts was created. + :param id: the TMSS id of the TaskDraft ''' pass - def onSubTaskQueueing(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskQueueing is called upon receiving a SubTaskQueueing message, which is sent when a SubTasks changes state to "Queueing". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskBlueprintCreated(self, id: int): + '''onTaskBlueprintCreated is called upon receiving a TaskBlueprint.Object.Created message, which is sent when a TaskBlueprints was created. + :param id: the TMSS id of the TaskBlueprint ''' pass - def onSubTaskQueued(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskQueued is called upon received a SubTaskQueued message, which is sent when a SubTasks changes state to "Queued". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskBlueprintUpdated(self, id: int): + '''onTaskBlueprintUpdated is called upon receiving a TaskBlueprint.Object.Updated message, which is sent when a TaskBlueprints was created. + :param id: the TMSS id of the TaskBlueprint ''' pass - def onSubTaskStarting(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskStarting is called upon receiving a SubTaskStarting message, which is sent when a SubTasks changes state to "Starting". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskBlueprintDeleted(self, id: int): + '''onTaskBlueprintDeleted is called upon receiving a TaskBlueprint.Object.Deleted message, which is sent when a TaskBlueprints was created. + :param id: the TMSS id of the TaskBlueprint ''' pass - def onSubTaskStarted(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskStarted is called upon received a SubTaskStarted message, which is sent when a SubTasks changes state to "Started". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitDraftCreated(self, id: int): + '''onSchedulingUnitDraftCreated is called upon receiving a SchedulingUnitDraft.Object.Created message, which is sent when a SchedulingUnitDrafts was created. + :param id: the TMSS id of the SchedulingUnitDraft ''' pass - def onSubTaskFinishing(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskFinishing is called upon receiving a SubTaskFinishing message, which is sent when a SubTasks changes state to "Finishing". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitDraftUpdated(self, id: int): + '''onSchedulingUnitDraftUpdated is called upon receiving a SchedulingUnitDraft.Object.Updated message, which is sent when a SchedulingUnitDrafts was created. + :param id: the TMSS id of the SchedulingUnitDraft ''' pass - def onSubTaskFinished(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskFinished is called upon received a SubTaskFinished message, which is sent when a SubTasks changes state to "Finished". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitDraftDeleted(self, id: int): + '''onSchedulingUnitDraftDeleted is called upon receiving a SchedulingUnitDraft.Object.Deleted message, which is sent when a SchedulingUnitDrafts was created. + :param id: the TMSS id of the SchedulingUnitDraft ''' pass - def onSubTaskCancelling(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskCancelling is called upon receiving a SubTaskCancelling message, which is sent when a SubTasks changes state to "Cancelling". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitBlueprintCreated(self, id: int): + '''onSchedulingUnitBlueprintCreated is called upon receiving a SchedulingUnitBlueprint.Object.Created message, which is sent when a SchedulingUnitBlueprints was created. + :param id: the TMSS id of the SchedulingUnitBlueprint ''' pass - def onSubTaskCancelled(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskCancelled is called upon received a SubTaskCancelled message, which is sent when a SubTasks changes state to "Cancelled". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitBlueprintUpdated(self, id: int): + '''onSchedulingUnitBlueprintUpdated is called upon receiving a SchedulingUnitBlueprint.Object.Updated message, which is sent when a SchedulingUnitBlueprints was created. + :param id: the TMSS id of the SchedulingUnitBlueprint ''' pass - def onSubTaskError(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskError is called upon receiving a SubTaskError message, which is sent when a SubTasks changes state to "Error". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitBlueprintDeleted(self, id: int): + '''onSchedulingUnitBlueprintDeleted is called upon receiving a SchedulingUnitBlueprint.Object.Deleted message, which is sent when a SchedulingUnitBlueprints was created. + :param id: the TMSS id of the SchedulingUnitBlueprint ''' pass -class TMSSSubTaskBusListener(BusListener): +class TMSSBusListener(BusListener): def __init__(self, - handler_type: TMSSSubTaskEventMessageHandler.__class__ = TMSSSubTaskEventMessageHandler, + handler_type: TMSSEventMessageHandler.__class__ = TMSSEventMessageHandler, handler_kwargs: dict = None, exchange: str = DEFAULT_BUSNAME, - routing_key: str = DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX+".#", + routing_key: str = TMSS_ALL_EVENTS_FILTER, num_threads: int = 1, broker: str = DEFAULT_BROKER): """ - TMSSSubTaskBusListener listens on the lofar notification message bus and calls on<SomeMessage> methods in the TMSSSubTaskEventMessageHandler when such a message is received. - Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. + TMSSSubTaskBusListener listens on the lofar notification message bus and calls on<SomeMessage> methods in the TMSSEventMessageHandler when such a message is received. + Typical usage is to derive your own subclass from TMSSEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. """ - if not issubclass(handler_type, TMSSSubTaskEventMessageHandler): - raise TypeError("handler_type should be a TMSSSubTaskEventMessageHandler subclass") + if not issubclass(handler_type, TMSSEventMessageHandler): + raise TypeError("handler_type should be a TMSSEventMessageHandler subclass") super().__init__(handler_type, handler_kwargs, exchange, routing_key, num_threads, broker) @@ -209,10 +238,11 @@ class TMSSSubTaskBusListener(BusListener): if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - class ExampleTMSSSubTaskEventMessageHandler(TMSSSubTaskEventMessageHandler): - def onSubTaskDefined(self, **kwargs): - logger.debug("MyTMSSSubTaskEventMessageHandler.onSubTaskDefined(%s)", kwargs) + class ExampleTMSSEventMessageHandler(TMSSEventMessageHandler): + def onSubTaskStatusChanged(self, id: int, status:str): + logger.debug("MyTMSSEventMessageHandler.onSubTaskStatusChanged(id=%s, status=%s)", id, status) - with TMSSSubTaskBusListener(handler_type=ExampleTMSSSubTaskEventMessageHandler): + from lofar.messaging.messagebus import BusListenerJanitor + with BusListenerJanitor(TMSSBusListener(handler_type=ExampleTMSSEventMessageHandler)): waitForInterrupt() diff --git a/SAS/TMSS/services/CMakeLists.txt b/SAS/TMSS/services/CMakeLists.txt index 5e89c9c4e37d3ae53fe078e2c3c367e03d438ba5..b1cdad1bc8906d3ba0302fe6c867a6eb8bff9df1 100644 --- a/SAS/TMSS/services/CMakeLists.txt +++ b/SAS/TMSS/services/CMakeLists.txt @@ -1,3 +1,4 @@ lofar_add_package(TMSSSubtaskSchedulingService subtask_scheduling) lofar_add_package(TMSSFeedbackHandlingService feedback_handling) +lofar_add_package(TMSSPostgresListenerService tmss_postgres_listener) diff --git a/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py b/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py index b3fe3a8361596c851e50b5a364d663532c2ef03b..4a414858756a8f417a77918681ab334609911369 100755 --- a/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py +++ b/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py @@ -25,7 +25,6 @@ import logging logger = logging.getLogger(__name__) from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment -from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import * from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.sas.tmss.services.feedback_handling import TMSSFeedbackListener diff --git a/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py index 09fb330ef2e33507df236374fea799e342ee72c2..524a616a86fa35fca2351278a1d69b1df46d882f 100644 --- a/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py +++ b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) from lofar.sas.tmss.client.tmssbuslistener import * from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession -class TMSSSubTaskSchedulingEventMessageHandler(TMSSSubTaskEventMessageHandler): +class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler): ''' ''' def __init__(self, tmss_client_credentials_id: str=None): @@ -51,39 +51,34 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSSubTaskEventMessageHandler): super().stop_handling() self.tmss_client.close() - def onSubTaskFinished(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskFinished is called upon received a SubTaskFinished message, which is sent when a SubTasks changes state to "Finished". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask - ''' - logger.info("subtask %s finished. Trying to schedule defined successor subtasks...", subtask_id) - - successors = self.tmss_client.get_subtask_successors(subtask_id, state="defined") - successor_ids = [s['id'] for s in successors] - - logger.info("subtask %s finished. trying to schedule defined successors: %s", - subtask_id, - ', '.join(str(id) for id in successor_ids) or 'None') - - for successor in successors: - try: - suc_subtask_id = successor['id'] - suc_subtask_state = successor['state_value'] - - if suc_subtask_state == "defined": - logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, subtask_id) - scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id) - suc_subtask_state = scheduled_successor['state_value'] - logger.info("successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, subtask_id, suc_subtask_state, scheduled_successor['url']) - else: - logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, subtask_id, suc_subtask_state) - - except Exception as e: - logger.error(e) + def onSubTaskStatusChanged(self, id: int, status: str): + super().onSubTaskStatusChanged(id, status) + + if status == "finished": + successors = self.tmss_client.get_subtask_successors(id, state="defined") + successor_ids = sorted([s['id'] for s in successors]) + + logger.info("subtask %s finished. trying to schedule defined successors: %s", + id, ', '.join(str(id) for id in successor_ids) or 'None') + + for successor in successors: + try: + suc_subtask_id = successor['id'] + suc_subtask_state = successor['state_value'] + + if suc_subtask_state == "defined": + logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, id) + scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id) + suc_subtask_state = scheduled_successor['state_value'] + logger.info("successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, id, suc_subtask_state, scheduled_successor['url']) + else: + logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, id, suc_subtask_state) + + except Exception as e: + logger.error(e) def create_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str=None): - return TMSSSubTaskBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, + return TMSSBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id}, exchange=exchange, broker=broker) diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py index 1027b12e91d8d22686a32da76d8c126136d6c8a3..84d85d879019b0a5d09832d7cf5815f53ef12a2b 100755 --- a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py @@ -22,9 +22,9 @@ import uuid import logging logger = logging.getLogger(__name__) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment -from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import * from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor @@ -49,7 +49,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): cls.tmss_test_env.start() cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url, - (tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)) + (cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password)) @classmethod def tearDownClass(cls) -> None: @@ -109,8 +109,6 @@ class TestSubtaskSchedulingService(unittest.TestCase): # subtask2 should now be scheduled self.assertEqual(subtask2['state_value'], 'scheduled') -logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - if __name__ == '__main__': #run the unit tests unittest.main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c500272b175b8952e8f6b1d3ff632fc642205b7a --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/CMakeLists.txt @@ -0,0 +1,8 @@ +lofar_package(TMSSPostgresListenerService 0.1 DEPENDS TMSSClient PyCommon PyMessaging) + +lofar_find_package(PythonInterp 3.4 REQUIRED) + +add_subdirectory(lib) +add_subdirectory(bin) +add_subdirectory(test) + diff --git a/SAS/TMSS/services/tmss_postgres_listener/bin/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/bin/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..5bd20e76ba68d1a537069fe468b92fc0763ae93b --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/bin/CMakeLists.txt @@ -0,0 +1,4 @@ +lofar_add_bin_scripts(tmss_postgres_listener_service) + +# supervisord config files +lofar_add_sysconf_files(tmss_postgres_listener_service.ini DESTINATION supervisord.d) diff --git a/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service new file mode 100755 index 0000000000000000000000000000000000000000..6f4ba61d8055d8e1170f3914bd766a16659b509d --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service @@ -0,0 +1,24 @@ +#!/usr/bin/python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + + +from lofar.sas.tmss.services.tmss_postgres_listener import main + +if __name__ == "__main__": + main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service.ini b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service.ini new file mode 100644 index 0000000000000000000000000000000000000000..3564a30c1f84cba26814a90a3dd1cc2366db65d3 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service.ini @@ -0,0 +1,9 @@ +[program:tmss_pglistener_service] +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec tmss_pglistener_service' +user=lofarsys +stopsignal=INT ; KeyboardInterrupt +stopasgroup=true ; bash does not propagate signals +stdout_logfile=%(program_name)s.log +redirect_stderr=true +stderr_logfile=NONE +stdout_logfile_maxbytes=0 diff --git a/SAS/TMSS/services/tmss_postgres_listener/lib/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/lib/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c438ae81c5d294ec3159cfa7b60d837678020c37 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/lib/CMakeLists.txt @@ -0,0 +1,10 @@ +lofar_find_package(PythonInterp 3.4 REQUIRED) +include(PythonInstall) + +set(_py_files + tmss_postgres_listener.py + ) + +python_install(${_py_files} + DESTINATION lofar/sas/tmss/services) + diff --git a/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..3cf20c24ec7ed26321f2c8acc85e09a14961b6eb --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import logging +logger = logging.getLogger(__name__) + +from optparse import OptionParser, OptionGroup +import json + +from lofar.common.postgres import PostgresListener, makePostgresNotificationQueries +from lofar.messaging.messagebus import ToBus +from lofar.sas.tmss.client.tmssbuslistener import * +from lofar.common import dbcredentials +from lofar.common.util import single_line_with_single_spaces + +class TMSSPGListener(PostgresListener): + '''This class subscribes to the Subtask, TaskDraft/Blueprint & SchedulingUnitDraft/Blueprint tables in the TMSS database + and send EventMessages upon each table row action, *Created, *Updated, *Deleted, and for each status update. + See lofar.sas.tmss.client.tmssbuslistener.TMSSBusListener for the receiving BusListener''' + def __init__(self, + dbcreds, + exchange=DEFAULT_BUSNAME, + broker=DEFAULT_BROKER): + super().__init__(dbcreds=dbcreds) + self.event_bus = ToBus(exchange=exchange, broker=broker) + + def start(self): + logger.info("Starting to listen for TMSS database changes and publishing EventMessages on %s db: %s", self.event_bus.exchange, self._dbcreds.stringWithHiddenPassword()) + self.event_bus.open() + + # SubTask + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'insert')) + self.subscribe('tmssapp_subtask_insert', self.onSubTaskInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update')) + self.subscribe('tmssapp_subtask_update', self.onSubTaskUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'delete')) + self.subscribe('tmssapp_subtask_delete', self.onSubTaskDeleted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update', 'state_id')) + self.subscribe('tmssapp_subtask_update_column_state_id', self.onSubTaskStateUpdated) + + + # TaskBlueprint + # please note that the status property in the TaskBlueprint model is derived from the subtasks + # hence we cannot create a postgres notification for this "column". + # But, see how onSubTaskStateUpdated does send a TaskBlueprint status changed event as well for its parent. + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'insert')) + self.subscribe('tmssapp_taskblueprint_insert', self.onTaskBlueprintInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update')) + self.subscribe('tmssapp_taskblueprint_update', self.onTaskBlueprintUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'delete')) + self.subscribe('tmssapp_taskblueprint_delete', self.onTaskBlueprintDeleted) + + + # TaskDraft + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskdraft', 'insert')) + self.subscribe('tmssapp_taskdraft_insert', self.onTaskDraftInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskdraft', 'update')) + self.subscribe('tmssapp_taskdraft_update', self.onTaskDraftUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskdraft', 'delete')) + self.subscribe('tmssapp_taskdraft_delete', self.onTaskDraftDeleted) + + + # SchedulingUnitBlueprint + # please note that the status property in the SchedulingUnitBlueprint model is derived from the tasks, and these are derived from the subtasks + # hence we cannot create a postgres notification for this "column". + # But, see how onSubTaskStateUpdated does send a SchedulingUnitBlueprint status changed event as well for its parent. + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'insert')) + self.subscribe('tmssapp_schedulingunitblueprint_insert', self.onSchedulingUnitBlueprintInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update')) + self.subscribe('tmssapp_schedulingunitblueprint_update', self.onSchedulingUnitBlueprintUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'delete')) + self.subscribe('tmssapp_schedulingunitblueprint_delete', self.onSchedulingUnitBlueprintDeleted) + + + # SchedulingUnitDraft + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'insert')) + self.subscribe('tmssapp_schedulingunitdraft_insert', self.onSchedulingUnitDraftInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'update')) + self.subscribe('tmssapp_schedulingunitdraft_update', self.onSchedulingUnitDraftUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'delete')) + self.subscribe('tmssapp_schedulingunitdraft_delete', self.onSchedulingUnitDraftDeleted) + + return super().start() + + def __exit__(self, exc_type, exc_val, exc_tb): + super().stop() + self.event_bus.close() + logger.info("Stopped listening for TMSS database changes and publishing EventMessages on %s broker=%s db: %s", + self.event_bus.exchange, self.event_bus.broker, self._dbcreds.stringWithHiddenPassword()) + + def _sendNotification(self, subject, contentDict): + try: + if isinstance(contentDict, str): + contentDict = json.loads(contentDict) + + msg = EventMessage(subject=subject, content=contentDict) + logger.info('Sending %s to %s: %s', + subject, self.event_bus.exchange, single_line_with_single_spaces(contentDict)) + self.event_bus.send(msg) + except Exception as e: + logger.error(str(e)) + + def onSubTaskInserted(self, payload = None): + self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onSubTaskUpdated(self, payload = None): + self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onSubTaskDeleted(self, payload = None): + self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onSubTaskStateUpdated(self, payload = None): + payload_dict = json.loads(payload) + # send notification for this subtask... + from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Subtask + subtask = Subtask.objects.get(id=payload_dict['id']) + self._sendNotification(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.'+subtask.state.value.capitalize(), + {'id': subtask.id, 'status': subtask.state.value}) + + # ... and also send status change and object update events for the parent task, and schedulingunit, + # because their status is implicitly derived from their subtask(s) + # send both object.updated and status change events + self.onTaskBlueprintUpdated( {'id': subtask.task_blueprint.id}) + self._sendNotification(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.'+subtask.task_blueprint.status.capitalize(), + {'id': subtask.task_blueprint.id, 'status': subtask.task_blueprint.status}) + + self.onSchedulingUnitBlueprintUpdated( {'id': subtask.task_blueprint.scheduling_unit_blueprint.id}) + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+subtask.task_blueprint.scheduling_unit_blueprint.status.capitalize(), + {'id': subtask.task_blueprint.scheduling_unit_blueprint.id, 'status': subtask.task_blueprint.scheduling_unit_blueprint.status}) + + def onTaskBlueprintInserted(self, payload = None): + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onTaskBlueprintUpdated(self, payload = None): + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onTaskBlueprintDeleted(self, payload = None): + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onTaskDraftInserted(self, payload = None): + self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onTaskDraftUpdated(self, payload = None): + self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onTaskDraftDeleted(self, payload = None): + self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onSchedulingUnitBlueprintInserted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onSchedulingUnitBlueprintUpdated(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onSchedulingUnitBlueprintDeleted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onSchedulingUnitDraftInserted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onSchedulingUnitDraftUpdated(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onSchedulingUnitDraftDeleted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + +def create_service(dbcreds, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + '''create a TMSSPGListener instance''' + return TMSSPGListener(dbcreds=dbcreds, exchange=exchange, broker=broker) + + +def main(): + # make sure we run in UTC timezone + import os + os.environ['TZ'] = 'UTC' + + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = OptionParser("%prog [options]", + description='runs the radb postgres listener which listens to changes on some tables in the radb and publishes the changes as notifications on the bus.') + + group = OptionGroup(parser, 'Messaging options') + group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, + help='Address of the message broker, default: %default') + group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, + help="Bus or queue where the TMSS messages are published. [default: %default]") + parser.add_option_group(group) + + parser.add_option_group(dbcredentials.options_group(parser)) + parser.set_defaults(dbcredentials=os.environ.get('TMSS_CLIENT_DBCREDENTIALS', 'TMSS')) + (options, args) = parser.parse_args() + + dbcreds = dbcredentials.parse_options(options) + logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) + + # setup django + os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials + os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings" + import django + django.setup() + + with create_service(dbcreds=dbcreds, + exchange=options.exchange, + broker=options.broker) as listener: + listener.waitWhileListening() + +if __name__ == '__main__': + main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..36dfcc36e0a541a4c83b491ff3bc577ad1801b32 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/CMakeLists.txt @@ -0,0 +1,7 @@ +# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $ + +if(BUILD_TESTING) + include(LofarCTest) + + lofar_add_test(t_tmss_postgres_listener_service) +endif() diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py new file mode 100755 index 0000000000000000000000000000000000000000..b0b847668bf48905701d8e48adbc0273bb416162 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import uuid + +import logging +logger = logging.getLogger(__name__) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + +from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment +from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator + +from lofar.messaging.messagebus import TemporaryExchange +from lofar.sas.tmss.services.tmss_postgres_listener import * +from lofar.common.test_utils import integration_test +from threading import Lock +import requests +import json +from collections import deque +from datetime import datetime, timedelta + +@integration_test +class TestSubtaskSchedulingService(unittest.TestCase): + ''' + Tests for the SubtaskSchedulingService + ''' + @classmethod + def setUpClass(cls) -> None: + cls.TEST_UUID = uuid.uuid1() + + cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID)) + cls.tmp_exchange.open() + + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, start_postgres_listener=False, populate_schemas=False, populate_test_data=False) + cls.tmss_test_env.start() + + cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url, + (cls.tmss_test_env.ldap_server.dbcreds.user, + cls.tmss_test_env.ldap_server.dbcreds.password)) + + @classmethod + def tearDownClass(cls) -> None: + cls.tmss_test_env.stop() + cls.tmp_exchange.close() + + def test_01_for_expected_behaviour(self): + ''' + This test starts a TMSSPGListener service and TMSS, creates/updates/deletes subtasks/tasks/schedulingunits, and checks if the correct events are sent. + ''' + logger.info(' -- test_01_for_expected_behaviour -- ') + + class TestTMSSPGListener(TMSSPGListener): + '''Helper TMSSPGListener for this test, storing intermediate results, and providing synchronization threading.Events''' + def __init__(self, dbcreds, exchange=self.tmp_exchange.address): + super().__init__(dbcreds, exchange) + self.subjects = deque() + self.contentDicts = deque() + self.lock = Lock() + + def _sendNotification(self, subject, contentDict): + # instead of sending a notification to the messagebus, record the subject and content in queues + # so we can check in the test if the correct subjects are recorded + with self.lock: + logger.info("detected db change: %s %s", subject, single_line_with_single_spaces(contentDict)) + self.subjects.append(subject) + self.contentDicts.append(json.loads(contentDict) if isinstance(contentDict, str) else contentDict) + + # create and start the service (the object under test) + with TestTMSSPGListener(exchange=self.tmp_exchange.address, dbcreds=self.tmss_test_env.database.dbcreds) as service: + # create a SchedulingUnitDraft + su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": su_draft['id']}, service.contentDicts.popleft()) + + + # create a TaskDraft + task_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.TaskDraft(scheduling_unit_draft_url=su_draft['url']), '/task_draft/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": task_draft['id']}, service.contentDicts.popleft()) + + + # create a SchedulingUnitBlueprint + su_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=su_draft['url']), '/scheduling_unit_blueprint/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": su_blueprint['id']}, service.contentDicts.popleft()) + + + # create a TaskBlueprint + task_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.TaskBlueprint(scheduling_unit_blueprint_url=su_blueprint['url'], + draft_url=task_draft['url']), '/task_blueprint/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": task_blueprint['id']}, service.contentDicts.popleft()) + + + # create a SubTask + subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": subtask['id']}, service.contentDicts.popleft()) + + # update subtask status, use a nice tmss_client and the rest api. + with self.tmss_test_env.create_tmss_client() as client: + client.set_subtask_status(subtask['id'], 'scheduled') + + # ugly, but functional. Wait for all status updates: 1 object, 1 status. both per each object (3 types) => total 6 events. + start_wait = datetime.utcnow() + while True: + with service.lock: + if len(service.subjects) == 6: + break + if datetime.utcnow() - start_wait > timedelta(seconds=5): + raise TimeoutError("timeout while waiting for status/object updates") + + # sync and check + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX + '.Updated', service.subjects.popleft()) + self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': subtask['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) + self.assertEqual({'id': task_blueprint['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': task_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) + self.assertEqual({'id': su_blueprint['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': su_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + # delete subtask, use direct http delete request on rest api + requests.delete(subtask['url'], auth=self.test_data_creator.auth) + + # sync and check subtask deleted + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', service.subjects.popleft()) + self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) + + +if __name__ == '__main__': + #run the unit tests + unittest.main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.run b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.run new file mode 100755 index 0000000000000000000000000000000000000000..b3b8a82585942bb7a1cfaa3f7ea5f7a9d355c484 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.run @@ -0,0 +1,6 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "*tmss*" t_tmss_postgres_listener_service.py + diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.sh b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.sh new file mode 100755 index 0000000000000000000000000000000000000000..600f72e660b1a47338192230ea4131140c7c7550 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_tmss_postgres_listener_service \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/settings.py b/SAS/TMSS/src/tmss/settings.py index 7f4fb7aaadf38a994f0f9eeb5212699bf9f31434..7f160668b40ac7164efdfaea77f44fb018e32d7d 100644 --- a/SAS/TMSS/src/tmss/settings.py +++ b/SAS/TMSS/src/tmss/settings.py @@ -354,9 +354,3 @@ SWAGGER_SETTINGS = { }, } - -# TODO Do I need distinguish more between Test and Production Environment?? -# maybe a local file in Development environment for test purposes -SCU = "http://scu199" if isDevelopmentEnvironment() or isTestEnvironment() else "http://scu001" -PIPELINE_SUBTASK_LOG_URL = SCU + ".control.lofar:7412/tasks/%s/log.html" -OBSERVATION_SUBTASK_LOG_URL = "https://proxy.lofar.eu/inspect/%s/rtcp-%s.errors" diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index a6257840944db83aedf0df4fc8667588ecd8a19f..6e58f28a9dcd373dc38be715dd609274e2e6deb1 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -18,10 +18,6 @@ from django.db.models.expressions import RawSQL from django.core.exceptions import ValidationError from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException -from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.messages import EventMessage -from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX -from lofar.common.util import single_line_with_single_spaces from django.conf import settings from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC import uuid @@ -170,17 +166,6 @@ class Subtask(BasicCommon): # keep original state for logging self.__original_state_id = self.state_id - @staticmethod - def _send_state_change_event_message(subtask_id:int, old_state: str, new_state: str): - with ToBus(exchange=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), - broker=os.environ.get("TMSS_BROKER", DEFAULT_BROKER)) as tobus: #TODO: do we want to connect to the bus for each new message, or have some global tobus? - msg = EventMessage(subject="%s.%s" % (DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, new_state.capitalize()), - content={'subtask_id': subtask_id, 'old_state': old_state, 'new_state': new_state}) - address = tobus.remote_address - logger.info("Sending message with subject '%s' to exchange='%s' on broker=%s:%s content: %s", - msg.subject, tobus.exchange, address[0], address[1], single_line_with_single_spaces(msg.content)) - tobus.send(msg) - @property def successors(self) -> QuerySet: '''return the connect successor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets) @@ -242,39 +227,9 @@ class Subtask(BasicCommon): user=self.created_or_updated_by_user, user_identifier=identifier) log_entry.save() - try: - self._send_state_change_event_message(self.id, log_entry.old_state.value, log_entry.new_state.value) - except Exception as e: - logger.error("Could not send state change to messagebus: %s", e) - # update the previous state value self.__original_state_id = self.state_id - @property - def log_url(self): - """ - Return the link to the pipeline log in case of pipeline or - link to COBALT error log in case of an observation - otherwise just an empty string - """ - if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - url = settings.OBSERVATION_SUBTASK_LOG_URL % (self.id, self.id) - elif self.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: - # Get RADBID, subtask must be at least 'scheduled' to exist in radb - # If RA is not started don't wait longer than 10 seconds - with RADBRPC.create(timeout=10) as radbrpc: - try: - radb_id = radbrpc.getTask(tmss_id=self.id) - except: - radb_id = None - if radb_id is None: - url = "not available (missing radbid)" - else: - url = settings.PIPELINE_SUBTASK_LOG_URL % radb_id['id'] - else: - url = "" - return url - class SubtaskStateLog(BasicCommon): """ diff --git a/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py index a2a10449daef88deacd7eadf8ae421e0e8891bf9..85d7bd21c54ca2ad78badd911131847c11fb3375 100644 --- a/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py @@ -85,7 +85,7 @@ class SubtaskSerializer(RelationalHyperlinkedModelSerializer): class Meta: model = models.Subtask fields = '__all__' - extra_fields = ['cluster_value', 'log_url'] + extra_fields = ['cluster_value'] class SubtaskInputSerializer(RelationalHyperlinkedModelSerializer): diff --git a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py index 54ed8f42527b6a902201b46b2c6e644b26ce38cc..2bc7b1814e5c667bcdd9fae7bea322e7696cdf82 100644 --- a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py @@ -16,9 +16,10 @@ from drf_yasg.utils import swagger_auto_schema from drf_yasg.inspectors import SwaggerAutoSchema from rest_framework.decorators import action -from django.http import HttpResponse, JsonResponse +from django.http import HttpResponse, JsonResponse, HttpResponseRedirect, HttpResponseNotFound from rest_framework.response import Response as RestResponse +from lofar.common import isProductionEnvironment, isTestEnvironment from lofar.sas.tmss.tmss.tmssapp.viewsets.lofar_viewset import LOFARViewSet from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp import serializers @@ -209,6 +210,49 @@ class SubtaskViewSet(LOFARViewSet): return RestResponse(serializer.data) + @swagger_auto_schema(responses={302: 'A redirect url to the task log for this Subtask.', + 403: 'forbidden'}, + operation_description="Get the task log for this Subtask.") + @action(methods=['get'], detail=True) + def task_log(self, request, pk=None): + """ + Return a redirect to the the link to the pipeline log in case of pipeline or + link to COBALT error log in case of an observation. + """ + subtask = get_object_or_404(models.Subtask, pk=pk) + + # redirect to cobalt log served at proxy.lofar.eu + if subtask.specifications_template.type.value == models.SubtaskType.Choices.OBSERVATION.value: + url = "https://proxy.lofar.eu/inspect/%s/rtcp-%s.errors" % (subtask.id, subtask.id) + return HttpResponseRedirect(redirect_to=url) + + # redirect to pipeline log served via webscheduler + if subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value: + # import here and not at top of module to "loosen" dependency on external packages, such as in this case the RADB RPC. + from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC + # Get RADBID, subtask must be at least 'scheduled' to exist in radb + try: + with RADBRPC.create(timeout=2) as radbrpc: + radb_id = radbrpc.getTask(tmss_id=subtask.id) + + if radb_id is None: + return HttpResponseNotFound( + content='No RADB task found for subtask id=%s type="%s status=%s". Cannot redirect to pipeline log.' % ( + subtask.id, subtask.specifications_template.type.value, subtask.state)) + + WEBSCHEDULER_URL = "http://scu001.control.lofar:7412" if isProductionEnvironment() else \ + "http://scu199.control.lofar:7412" if isTestEnvironment() else \ + "http://localhost:7412" + + url = "%s/tasks/%s/log.html" % (WEBSCHEDULER_URL, radb_id) + return HttpResponseRedirect(redirect_to=url) + except Exception as e: + return HttpResponseNotFound(content='No RADB task found for subtask id=%s type="%s". Cannot redirect to pipeline log.' % (subtask.id, subtask.specifications_template.type.value)) + + # unknown log + return HttpResponseNotFound(content='No log (url) available for subtask id=%s type="%s"' % (subtask.id, subtask.specifications_template.type.value) ) + + @swagger_auto_schema(responses={200: 'The input dataproducts of this subtask.', 403: 'forbidden'}, operation_description="Get the input dataproducts of this subtask.") diff --git a/SAS/TMSS/test/t_subtasks.py b/SAS/TMSS/test/t_subtasks.py index 4f40e9c53b8c0857430c032a9df3d08848f517b1..b9021a86f94d25f5fcccd620daf7705c07c8d88e 100755 --- a/SAS/TMSS/test/t_subtasks.py +++ b/SAS/TMSS/test/t_subtasks.py @@ -34,12 +34,7 @@ tmss_test_env.populate_schemas() from lofar.sas.tmss.test.tmss_test_data_django_models import * - -# import and setup rest test data creator -from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator - from lofar.sas.tmss.tmss.tmssapp import models - from lofar.sas.tmss.tmss.tmssapp.subtasks import * @@ -347,6 +342,31 @@ class SubtaskInputSelectionFilteringTest(unittest.TestCase): selection = {'sap': ['target0'], 'is_relevant': True} self.assertFalse(specifications_doc_meets_selection_doc(specs, selection)) + def test_links_to_log_files(self): + """ + Test redirect urls to subtask logfiles. + """ + + # the link to log files is a 'view' on the subtask, and NOT part of the subtask model. + # the link is served as an action on the REST API, redirecting to externally served log files. + # check/test the redirect urls. + with tmss_test_env.create_tmss_client() as client: + # observation + subtask_observation = create_subtask_object_for_testing("observation", "defined") + response = client.session.get(url=client.get_full_url_for_path('/subtask/%s/task_log' % (subtask_observation.id,)), allow_redirects=False) + self.assertTrue(response.is_redirect) + self.assertIn("proxy.lofar.eu", response.headers['Location']) + self.assertIn("rtcp-%s.errors" % subtask_observation.id, response.headers['Location']) + + # pipeline + subtask_pipeline = create_subtask_object_for_testing("pipeline", "defined") + response = client.session.get(url=client.get_full_url_for_path('/subtask/%s/task_log' % (subtask_pipeline.id,)), allow_redirects=False) + self.assertEqual(404, response.status_code) # no log (yet) for unscheduled pipeline + + # other (qa_plots) + subtask_qa_plots = create_subtask_object_for_testing("qa_plots", "defined") + self.assertEqual(404, response.status_code) # no log for other subtasktypes + class SettingTest(unittest.TestCase): @@ -359,22 +379,6 @@ class SettingTest(unittest.TestCase): with self.assertRaises(SubtaskSchedulingException): schedule_observation_subtask(obs_st) - def test_links_to_log_files(self): - """ - Test if the links to logging of a subtasks is correct: - For an observation the subtaskid is in the logging url - For a pipeline the radbid of the subtaskid is in the link, BUT because RA is not started is should - return "not available" - All other subtask types (like qa) should have an empty string (no logging) - """ - subtask_pipeline = create_subtask_object_for_testing("pipeline", "defined") - subtask_qa_plots = create_subtask_object_for_testing("qa_plots", "defined") - subtask_observation = create_subtask_object_for_testing("observation", "defined") - - self.assertIn("proxy.lofar.eu", subtask_observation.log_url) - self.assertIn("rtcp-%s.errors" % subtask_observation.id, subtask_observation.log_url) - self.assertIn("not available", subtask_pipeline.log_url) - self.assertEqual("", subtask_qa_plots.log_url) if __name__ == "__main__": diff --git a/SAS/TMSS/test/test_utils.py b/SAS/TMSS/test/test_utils.py index 2edeaae66b24887a9491527b23bbe6518f4456ae..7d559bb9800d4ad3112d49df59d3aa3094fec86a 100644 --- a/SAS/TMSS/test/test_utils.py +++ b/SAS/TMSS/test/test_utils.py @@ -142,8 +142,6 @@ class TMSSDjangoServerInstance(): self.port = port self.public_host = public_host or host self._server_process = None - self._exchange = exchange - self._broker = broker @property def host_address(self): @@ -190,8 +188,6 @@ class TMSSDjangoServerInstance(): # set these here, run django setup, and start the server os.environ["TMSS_LDAPCREDENTIALS"] = self.ldap_dbcreds_id os.environ["TMSS_DBCREDENTIALS"] = self.database_dbcreds_id - os.environ["TMSS_EXCHANGE"] = self._exchange - os.environ["TMSS_BROKER"] = self._broker os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings" django.setup() @@ -273,7 +269,10 @@ class TMSSTestEnvironment: '''Create and run a test django TMSS server against a newly created test database and a test ldap server (and cleanup automagically)''' def __init__(self, host: str='127.0.0.1', preferred_django_port: int=8000, public_host: str=None, exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER), - populate_schemas:bool=False, populate_test_data:bool=False): + populate_schemas:bool=False, populate_test_data:bool=False, + start_postgres_listener: bool=True): + self._exchange = exchange + self._broker = broker self._populate_schemas = populate_schemas self._populate_test_data = populate_test_data self.ldap_server = TestLDAPServer(user='test', password='test') @@ -282,11 +281,12 @@ class TMSSTestEnvironment: ldap_dbcreds_id=self.ldap_server.dbcreds_id, host=host, port=find_free_port(preferred_django_port), - public_host=public_host, - exchange=exchange, - broker=broker) + public_host=public_host) self.client_credentials = TemporaryCredentials(user=self.ldap_server.dbcreds.user, password=self.ldap_server.dbcreds.password) + self._start_postgres_listener = start_postgres_listener + self.postgres_listener = None + # Check for correct Django version, should be at least 3.0 if django.VERSION[0] < 3: print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" % @@ -318,13 +318,24 @@ class TMSSTestEnvironment: user.is_superuser = True user.save() + if self._start_postgres_listener: + # start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus + from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener + self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds) + self.postgres_listener.start() + if self._populate_schemas or self._populate_test_data: self.populate_schemas() if self._populate_test_data: self.populate_test_data() + def stop(self): + if self.postgres_listener is not None: + self.postgres_listener.stop() + self.postgres_listener = None + self.django_server.stop() self.ldap_server.stop() self.database.destroy() diff --git a/SAS/TMSS/test/tmss_test_environment_unittest_setup.py b/SAS/TMSS/test/tmss_test_environment_unittest_setup.py index 04b9882454838c474e06523d8017ebc9320aca07..45d148eb3754b3235d7e41909b691f1129d35031 100644 --- a/SAS/TMSS/test/tmss_test_environment_unittest_setup.py +++ b/SAS/TMSS/test/tmss_test_environment_unittest_setup.py @@ -32,7 +32,8 @@ from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment tmss_test_env = TMSSTestEnvironment() try: tmss_test_env.start() -except: +except Exception as e: + logger.exception(str(e)) tmss_test_env.stop() exit(1)