diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index 740c52181b38c49d3b3a57866e5423582e3d096d..db28a087be704c2b09c85cd66fea45146d617029 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -1,7 +1,7 @@ # - Create for each LOFAR package a variable containing the absolute path to # its source directory. # -# Generated by gen_LofarPackageList_cmake.sh at do 28 mei 2020 11:22:44 CEST +# Generated by gen_LofarPackageList_cmake.sh at do 29 okt 2020 7:42:34 CET # # ---- DO NOT EDIT ---- # @@ -208,6 +208,8 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(RACommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/Common) set(TMSSClient_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/client) set(TMSSSubtaskSchedulingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/subtask_scheduling) + set(TMSSFeedbackHandlingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/feedback_handling) + set(TMSSPostgresListenerService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/tmss_postgres_listener) set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common) set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server) set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC) diff --git a/Docker/lofar-ci/Dockerfile_ci_sas b/Docker/lofar-ci/Dockerfile_ci_sas index ffcfb4133ecabcbb5004bef01b945d8251ae9127..b515298af20f9d3a3bd01d36e1628ac2eec8c2c5 100644 --- a/Docker/lofar-ci/Dockerfile_ci_sas +++ b/Docker/lofar-ci/Dockerfile_ci_sas @@ -16,7 +16,7 @@ RUN yum erase -y postgresql postgresql-server postgresql-devel && \ cd /bin && ln -s /usr/pgsql-9.6/bin/initdb && ln -s /usr/pgsql-9.6/bin/postgres ENV PATH /usr/pgsql-9.6/bin:$PATH -RUN pip3 install cython kombu lxml requests pygcn xmljson mysql-connector-python python-dateutil Django==3.0.9 djangorestframework==3.11.1 djangorestframework-xml ldap==1.0.2 flask fabric coverage python-qpid-proton PyGreSQL numpy h5py psycopg2 testing.postgresql Flask-Testing scipy Markdown django-filter python-ldap python-ldap-test ldap3 django-jsonforms django-json-widget django-jsoneditor drf-yasg flex swagger-spec-validator django-auth-ldap mozilla-django-oidc jsonschema comet pyxb==1.2.5 graphviz isodate astropy packaging django-debug-toolbar pymysql +RUN pip3 install cython kombu lxml requests pygcn xmljson mysql-connector-python python-dateutil Django==3.0.9 djangorestframework==3.11.1 djangorestframework-xml ldap==1.0.2 flask fabric coverage python-qpid-proton PyGreSQL numpy h5py psycopg2 testing.postgresql Flask-Testing scipy Markdown django-filter python-ldap python-ldap-test ldap3 django-jsonforms django-json-widget django-jsoneditor drf-yasg flex swagger-spec-validator django-auth-ldap mozilla-django-oidc jsonschema comet pyxb==1.2.5 graphviz isodate astropy packaging django-debug-toolbar pymysql astroplan #Viewflow package RUN pip3 install django-material django-viewflow diff --git a/LCS/Messaging/python/messaging/exceptions.py b/LCS/Messaging/python/messaging/exceptions.py index 52e023d2145e5775f05296c9a1cef9b48b67654c..003324cc41f1c4ca58597ceb4a02cdc9daee0e96 100644 --- a/LCS/Messaging/python/messaging/exceptions.py +++ b/LCS/Messaging/python/messaging/exceptions.py @@ -65,3 +65,15 @@ class MessagingTimeoutError(MessagingError, TimeoutError): """ pass + +class MessageHandlerError(MessagingError): + """ + raised upon handling a message + """ + pass + +class MessageHandlerUnknownSubjectError(MessageHandlerError): + """ + raised upon handling a message with an unknown subject + """ + pass diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 734e5db18de0c723d4ba47c4b9542f827a5e6558..66feb5bad871544098d302401a75b8be9fb11691 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -1570,6 +1570,9 @@ class BusListener: if isinstance(e, TimeoutError): logger.error("Handling of %s timed out: %s", lofar_msg, e) receiver.reject(lofar_msg, requeue=True) + elif isinstance(e, MessageHandlerError): + logger.error("Could not handle message %s: %s", lofar_msg, e) + receiver.reject(lofar_msg, requeue=False) else: logger.exception("Handling of %s failed. Rejecting message. Error: %s", lofar_msg, e) receiver.reject(lofar_msg, requeue=False) diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index 13d6cee1db2a03e2b86863a3c08e174a2cc3ad01..9c6d36e6e4369f722c807b198ae07b34b0924d06 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -49,45 +49,66 @@ def makePostgresNotificationQueries(schema, table, action, column_name='id'): if column_name != 'id': change_name += '_column_' + column_name function_name = '''NOTIFY_{change_name}'''.format(change_name=change_name) + + if action == 'UPDATE': + if column_name == 'id': + select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;''' + else: + select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || ', "''' + column_name + '''": "' || CAST(NEW.''' + column_name + ''' AS text) || '"}' INTO payload;''' + elif action == 'INSERT': + select_payload = '''SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload;''' + elif action == 'DELETE': + select_payload = '''SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload;''' + + if action == 'UPDATE': + begin_update_check = 'IF ROW(NEW.{what}) IS DISTINCT FROM ROW(OLD.{what}) THEN'.format(what='*' if column_name == 'id' else column_name) + end_update_check = 'END IF;' + else: + begin_update_check = '' + end_update_check = '' + function_sql = ''' - CREATE OR REPLACE FUNCTION {schema}.{function_name}() + CREATE OR REPLACE FUNCTION {schema}{function_name}() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN - {begin_update_check}SELECT CAST({column_value} AS text) INTO payload; - PERFORM pg_notify(CAST('{change_name}' AS text), payload);{end_update_check} + {begin_update_check} + {select_payload} + PERFORM pg_notify(CAST('{change_name}' AS text), payload); + {end_update_check} RETURN {value}; END; $$ LANGUAGE plpgsql; - '''.format(schema=schema, + '''.format(schema=schema+'.' if schema else '', function_name=function_name, table=table, action=action, - column_value=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name, + old_or_new=('OLD' if action == 'DELETE' else 'NEW') + '.' + column_name, value='OLD' if action == 'DELETE' else 'NEW', change_name=change_name.lower(), - begin_update_check='IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN\n' if action == 'UPDATE' else '', - end_update_check='\nEND IF;' if action == 'UPDATE' else '') + begin_update_check=begin_update_check, + select_payload=select_payload, + end_update_check=end_update_check) trigger_name = 'T_%s' % function_name trigger_sql = ''' CREATE TRIGGER {trigger_name} - AFTER {action} ON {schema}.{table} + AFTER {action} ON {schema}{table} FOR EACH ROW - EXECUTE PROCEDURE {schema}.{function_name}(); + EXECUTE PROCEDURE {schema}{function_name}(); '''.format(trigger_name=trigger_name, function_name=function_name, - schema=schema, + schema=schema+'.' if schema else '', table=table, action=action) drop_sql = ''' - DROP TRIGGER IF EXISTS {trigger_name} ON {schema}.{table} CASCADE; - DROP FUNCTION IF EXISTS {schema}.{function_name}(); + DROP TRIGGER IF EXISTS {trigger_name} ON {schema}{table} CASCADE; + DROP FUNCTION IF EXISTS {schema}{function_name}(); '''.format(trigger_name=trigger_name, function_name=function_name, - schema=schema, + schema=schema+'.' if schema else '', table=table) sql = drop_sql + '\n' + function_sql + '\n' + trigger_sql @@ -321,7 +342,7 @@ class PostgresDatabaseConnection: try: if self._connection.notices: for notice in self._connection.notices: - logger.info('database log message %s', notice.strip()) + logger.debug('database log message %s', notice.strip()) if isinstance(self._connection.notices, collections.deque): self._connection.notices.clear() else: @@ -331,19 +352,19 @@ class PostgresDatabaseConnection: def commit(self): if self.is_connected: - logger.info('commit') + logger.debug('commit') self._connection.commit() def rollback(self): if self.is_connected: - logger.info('rollback') + logger.debug('rollback') self._connection.rollback() class PostgresListener(PostgresDatabaseConnection): - ''' This class lets you listen to postgress notifications - It execute callbacks when a notifocation occurs. - Make your own subclass with your callbacks and subscribe them to the appriate channel. + ''' This class lets you listen to postgres notifications + It execute callbacks when a notification occurs. + Make your own subclass with your callbacks and subscribe them to the appropriate channel. Example: class MyListener(PostgresListener): @@ -385,18 +406,20 @@ class PostgresListener(PostgresDatabaseConnection): def subscribe(self, notification, callback): '''Subscribe to a certain postgres notification. Call callback method in case such a notification is received.''' - logger.info("Subscribed %sto %s" % ('and listening ' if self.isListening() else '', notification)) + logger.debug("Subscribing %sto %s" % ('and listening ' if self.isListening() else '', notification)) with self.__lock: self.executeQuery("LISTEN %s;", (psycopg2.extensions.AsIs(notification),)) self.__callbacks[notification] = callback + logger.info("Subscribed %sto %s" % ('and listening ' if self.isListening() else '', notification)) def unsubscribe(self, notification): '''Unubscribe from a certain postgres notification.''' - logger.info("Unsubscribed from %s" % notification) + logger.debug("Unsubscribing from %s" % notification) with self.__lock: self.executeQuery("UNLISTEN %s;", (psycopg2.extensions.AsIs(notification),)) if notification in self.__callbacks: del self.__callbacks[notification] + logger.info("Unsubscribed from %s" % notification) def isListening(self): '''Are we listening? Has the listener been started?''' @@ -459,12 +482,16 @@ class PostgresListener(PostgresDatabaseConnection): self.disconnect() def __enter__(self): - '''starts the listener upon contect enter''' - self.start() + '''starts the listener upon 'with' context enter''' + try: + self.start() + except Exception as e: + logger.exception(str(e)) + self.stop() return self def __exit__(self, exc_type, exc_val, exc_tb): - '''stops the listener upon contect enter''' + '''stops the listener upon 'with' context enter''' self.stop() def _callCallback(self, channel, payload = None): diff --git a/LCS/PyCommon/test/postgres.py b/LCS/PyCommon/test/postgres.py index f98092c3d7fd2a42f745c20ff59b2703bdb6cb43..51e3be001e05424dea7358c5aa4f239e02140faf 100755 --- a/LCS/PyCommon/test/postgres.py +++ b/LCS/PyCommon/test/postgres.py @@ -90,6 +90,8 @@ class PostgresTestDatabaseInstance(): # make the user known in the new test database self._create_superuser(dsn) + logger.info('Created test-database instance. It is available at: %s', self.dbcreds.stringWithHiddenPassword()) + logger.info('Applying test-database schema...') self.apply_database_schema() return @@ -106,9 +108,6 @@ class PostgresTestDatabaseInstance(): # create user role query = "CREATE USER %s WITH SUPERUSER PASSWORD '%s'" % (self.dbcreds.user, self.dbcreds.password) cursor.execute(query) - - logger.info('Created test-database instance. It is available at: %s', - self.dbcreds.stringWithHiddenPassword()) finally: cursor.close() conn.commit() diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 0f6febfe8ce68168d2f93003444dfb52d90cb8ce..8fb09299fd0b8051e9252de5f57f31f601ddb489 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -75,7 +75,7 @@ from lofar.common.subprocess_utils import communicate_returning_strings from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession -from lofar.sas.tmss.client.tmssbuslistener import TMSSSubTaskEventMessageHandler, TMSSSubTaskBusListener +from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener import subprocess import pipes @@ -342,7 +342,7 @@ class PipelineDependencies(object): return self.rarpc.getTasks(task_status=task_status, task_type=task_type) -class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): +class PipelineControlTMSSHandler(TMSSEventMessageHandler): def __init__(self): super(PipelineControlTMSSHandler, self).__init__() @@ -394,22 +394,24 @@ class PipelineControlTMSSHandler(TMSSSubTaskEventMessageHandler): except Exception as e: logger.error(e) - def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state: str): - try: - subtask = self.tmss_client.get_subtask(subtask_id) - subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template']) - if 'pipeline' not in subtask_template['type_value']: - logger.info("skipping scheduled subtask id=%s of non-pipeline type '%s'", subtask_id, subtask_template['type_value']) - return - - logger.info("getting parset for scheduled subtask id=%s of type '%s'", subtask_id, subtask_template['type_value']) - parset = self.tmss_client.get_subtask_parset(subtask_id) - parset = parameterset.fromString(parset) - parset = Parset(parset.dict()) - if parset and self._shouldHandle(parset): - self._startPipeline(subtask_id, parset) - except Exception as e: - logger.error(e) + def onSubTaskStatusChanged(self, id: int, status: str): + if status == "scheduled": + try: + subtask = self.tmss_client.get_subtask(id) + subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template']) + if 'pipeline' not in subtask_template['type_value']: + logger.info("skipping scheduled subtask id=%s of non-pipeline type '%s'", id, subtask_template['type_value']) + return + + logger.info("getting parset for scheduled subtask id=%s of type '%s'", id, subtask_template['type_value']) + parset = self.tmss_client.get_subtask_parset(id) + parset = parameterset.fromString(parset) + parset = Parset(parset.dict()) + if parset and self._shouldHandle(parset): + self._startPipeline(id, parset) + except Exception as e: + logger.error(e) + @staticmethod def _shouldHandle(parset): @@ -978,7 +980,7 @@ class PipelineControl(OTDBBusListener): -class PipelineControlTMSS(TMSSSubTaskBusListener): +class PipelineControlTMSS(TMSSBusListener): def __init__(self, handler_kwargs: dict = None, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index d189be1560b98aca99ce59488de94f107fc1f7b3..18bd13f9c1f44378b90bdd5e6f99919627123012 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -23,7 +23,7 @@ from subprocess import call from optparse import OptionParser, OptionGroup from lofar.common.util import waitForInterrupt from lofar.sas.otdb.OTDBBusListener import OTDBBusListener, OTDBEventMessageHandler -from lofar.sas.tmss.client.tmssbuslistener import TMSSSubTaskEventMessageHandler, TMSSSubTaskBusListener +from lofar.sas.tmss.client.tmssbuslistener import TMSSEventMessageHandler, TMSSBusListener from lofar.messaging import UsingToBusMixin, BusListener, ToBus, AbstractMessageHandler from lofar.messaging.messages import EventMessage, CommandMessage from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME @@ -67,8 +67,8 @@ class QAFilteringOTDBBusListener(OTDBBusListener): broker=broker) -class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener): - class QAFilteringTMSSSubTaskEventMessageHandler(UsingToBusMixin, TMSSSubTaskEventMessageHandler): +class QAFilteringTMSSSubTaskBusListener(TMSSBusListener): + class QAFilteringTMSSSubTaskEventMessageHandler(UsingToBusMixin, TMSSEventMessageHandler): def _send_qa_command_message(self, subtask_id: int, command_subject: str): with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: tmsssession.set_subtask_status(subtask_id, 'queueing') @@ -83,14 +83,15 @@ class QAFilteringTMSSSubTaskBusListener(TMSSSubTaskBusListener): tmsssession.set_subtask_status(subtask_id, 'queued') - def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): - with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: - subtask = tmsssession.get_subtask(subtask_id) - spec = tmsssession.get_url_as_json_object(subtask['specifications_template']) - if 'qa_files' == spec['type_value']: - self._send_qa_command_message(subtask_id, DEFAULT_DO_QAFILE_CONVERSION_SUBJECT) - elif 'qa_plots' == spec['type_value']: - self._send_qa_command_message(subtask_id, DEFAULT_DO_QAPLOTS_SUBJECT) + def onSubTaskStatusChanged(self, id: int, status:str): + if status == "scheduled": + with TMSSsession.create_from_dbcreds_for_ldap() as tmsssession: + subtask = tmsssession.get_subtask(id) + spec = tmsssession.get_url_as_json_object(subtask['specifications_template']) + if 'qa_files' == spec['type_value']: + self._send_qa_command_message(id, DEFAULT_DO_QAFILE_CONVERSION_SUBJECT) + elif 'qa_plots' == spec['type_value']: + self._send_qa_command_message(id, DEFAULT_DO_QAPLOTS_SUBJECT) def __init__(self, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER): super().__init__(handler_type=QAFilteringTMSSSubTaskBusListener.QAFilteringTMSSSubTaskEventMessageHandler, diff --git a/QA/QA_Service/test/t_qa_service.py b/QA/QA_Service/test/t_qa_service.py index 477ba9dc491fb229786c354b14ec0fc6e8fcd1fe..fe5bfc908acd25b0225f6a6747277302564efa44 100755 --- a/QA/QA_Service/test/t_qa_service.py +++ b/QA/QA_Service/test/t_qa_service.py @@ -27,7 +27,6 @@ from datetime import datetime import logging -from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment logger = logging.getLogger(__name__) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql index 75b7008589043e296e1fd1cefd693497ef17c41f..5bbf683cac0016797b857704d6cc360daf271ecd 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_notifications.sql @@ -12,12 +12,18 @@ BEGIN; SET LOCAL client_min_messages=warning; +DROP TRIGGER IF EXISTS T_NOTIFY_task_INSERT ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_INSERT(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_INSERT() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_insert' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -29,12 +35,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_INSERT(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_UPDATE ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_UPDATE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_UPDATE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.id AS text) INTO payload; +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_update' AS text), payload); END IF; RETURN NEW; @@ -48,12 +58,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_UPDATE(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_DELETE ON resource_allocation.task CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_DELETE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_DELETE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_delete' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -65,12 +81,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_DELETE(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_INSERT_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.task_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_insert_column_task_id' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -82,12 +104,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_task_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_UPDATE_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.task_id AS text) INTO payload; +IF ROW(NEW.task_id) IS DISTINCT FROM ROW(OLD.task_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "task_id": "' || CAST(NEW.task_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_update_column_task_id' AS text), payload); END IF; RETURN NEW; @@ -101,12 +127,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_task_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_DELETE_column_task_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.task_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_delete_column_task_id' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -118,12 +150,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_task_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_INSERT_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.predecessor_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_insert_column_predecessor_id' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -135,12 +173,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_INSERT_column_predecessor_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_UPDATE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.predecessor_id AS text) INTO payload; +IF ROW(NEW.predecessor_id) IS DISTINCT FROM ROW(OLD.predecessor_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "predecessor_id": "' || CAST(NEW.predecessor_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_update_column_predecessor_id' AS text), payload); END IF; RETURN NEW; @@ -154,12 +196,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_UPDATE_column_predecessor_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_task_predecessor_DELETE_column_predecessor_id ON resource_allocation.task_predecessor CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.predecessor_id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('task_predecessor_delete_column_predecessor_id' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -171,12 +219,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_task_predecessor_DELETE_column_predecessor_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_specification_UPDATE ON resource_allocation.specification CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_specification_UPDATE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_specification_UPDATE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.id AS text) INTO payload; +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('specification_update' AS text), payload); END IF; RETURN NEW; @@ -190,12 +242,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_specification_UPDATE(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_claim_INSERT ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_INSERT(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_INSERT() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(NEW.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('resource_claim_insert' AS text), payload); + RETURN NEW; END; $$ LANGUAGE plpgsql; @@ -207,12 +265,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_INSERT(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_claim_UPDATE ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_UPDATE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_UPDATE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.id AS text) INTO payload; +SELECT '{"id": ' || CAST(NEW.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('resource_claim_update' AS text), payload); END IF; RETURN NEW; @@ -226,12 +288,18 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_UPDATE(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_claim_DELETE ON resource_allocation.resource_claim CASCADE; +DROP FUNCTION IF EXISTS resource_allocation.NOTIFY_resource_claim_DELETE(); + + CREATE OR REPLACE FUNCTION resource_allocation.NOTIFY_resource_claim_DELETE() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -SELECT CAST(OLD.id AS text) INTO payload; + +SELECT '{"id": ' || CAST(OLD.id AS text) || '}' INTO payload; PERFORM pg_notify(CAST('resource_claim_delete' AS text), payload); + RETURN OLD; END; $$ LANGUAGE plpgsql; @@ -243,12 +311,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_allocation.NOTIFY_resource_claim_DELETE(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_availability_UPDATE_column_resource_id ON resource_monitoring.resource_availability CASCADE; +DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id(); + + CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.resource_id AS text) INTO payload; +IF ROW(NEW.resource_id) IS DISTINCT FROM ROW(OLD.resource_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "resource_id": "' || CAST(NEW.resource_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('resource_availability_update_column_resource_id' AS text), payload); END IF; RETURN NEW; @@ -262,12 +334,16 @@ FOR EACH ROW EXECUTE PROCEDURE resource_monitoring.NOTIFY_resource_availability_UPDATE_column_resource_id(); +DROP TRIGGER IF EXISTS T_NOTIFY_resource_capacity_UPDATE_column_resource_id ON resource_monitoring.resource_capacity CASCADE; +DROP FUNCTION IF EXISTS resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id(); + + CREATE OR REPLACE FUNCTION resource_monitoring.NOTIFY_resource_capacity_UPDATE_column_resource_id() RETURNS TRIGGER AS $$ DECLARE payload text; BEGIN -IF ROW(NEW.*) IS DISTINCT FROM ROW(OLD.*) THEN -SELECT CAST(NEW.resource_id AS text) INTO payload; +IF ROW(NEW.resource_id) IS DISTINCT FROM ROW(OLD.resource_id) THEN +SELECT '{"id": ' || CAST(NEW.id AS text) || ', "resource_id": "' || CAST(NEW.resource_id AS text) || '"}' INTO payload; PERFORM pg_notify(CAST('resource_capacity_update_column_resource_id' AS text), payload); END IF; RETURN NEW; diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py index 81410073abe2002580597812c7adb9aba1260e25..fc0406c720075bd9b02eb07558ee8940950d11ed 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_add_notifications.sql.py @@ -29,6 +29,7 @@ from lofar.common.postgres import makePostgresNotificationQueries logger = logging.getLogger(__name__) if __name__ == '__main__': + print("ToDo: the new notifications send a json dict as payload instead on just the id. Adapt RADB code to handle that. For now it's ok, as we don't change the RADB schema or notifications.") with open('add_notifications.sql', 'wt') as f: f.write('--this file was generated by create_add_notifications.sql.py\n') f.write('--it creates triggers and functions which fire postgres notify events upon the given table actions\n') diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py index 89a69bac043124e497db465e8718f3ae08f761f9..6a1786252db82892e650c2e24899cc6836046570 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py @@ -74,7 +74,7 @@ class RADBPGListener(PostgresListener): def onTaskUpdated(self, payload = None): # Send notification for the given updated task - task_id = payload + task_id = json.loads(payload)['id'] task = self.radb.getTask(task_id) self._sendNotification('TaskUpdated', task) @@ -86,39 +86,39 @@ class RADBPGListener(PostgresListener): self._sendNotification('TaskUpdated', suc_sched_task) def onTaskInserted(self, payload = None): - self._sendNotification('TaskInserted', self.radb.getTask(payload)) + self._sendNotification('TaskInserted', self.radb.getTask(json.loads(payload)['id'])) def onTaskDeleted(self, payload = None): - self._sendNotification('TaskDeleted', payload) + self._sendNotification('TaskDeleted', json.loads(payload)['id']) - def onTaskPredecessorChanged(self, task_id): - logger.info('onTaskPredecessorChanged(task_id=%s)', task_id) - self._sendNotification('TaskUpdated', self.radb.getTask(task_id)) + def onTaskPredecessorChanged(self, payload): + logger.info('onTaskPredecessorChanged(task_id=%s)', json.loads(payload)['task_id']) + self._sendNotification('TaskUpdated', self.radb.getTask(json.loads(payload)['task_id'])) - def onTaskSuccessorChanged(self, task_id): - logger.info('onTaskSuccessorChanged(task_id=%s)', task_id) - self._sendNotification('TaskUpdated', self.radb.getTask(task_id)) + def onTaskSuccessorChanged(self, payload): + logger.info('onTaskSuccessorChanged(task_id=%s)', json.loads(payload)['task_id']) + self._sendNotification('TaskUpdated', self.radb.getTask(json.loads(payload)['task_id'])) def onSpecificationUpdated(self, payload = None): # when the specification starttime and endtime are updated, then that effects the task as well - self._sendNotification('TaskUpdated', self.radb.getTask(specification_id=payload)) + self._sendNotification('TaskUpdated', self.radb.getTask(specification_id=json.loads(payload)['id'])) def onResourceClaimUpdated(self, payload = None): - self._sendNotification('ResourceClaimUpdated', self.radb.getResourceClaim(payload)) + self._sendNotification('ResourceClaimUpdated', self.radb.getResourceClaim(json.loads(payload)['id'])) def onResourceClaimInserted(self, payload = None): - self._sendNotification('ResourceClaimInserted', self.radb.getResourceClaim(payload)) + self._sendNotification('ResourceClaimInserted', self.radb.getResourceClaim(json.loads(payload)['id'])) def onResourceClaimDeleted(self, payload = None): - self._sendNotification('ResourceClaimDeleted', payload) + self._sendNotification('ResourceClaimDeleted', json.loads(payload)['id']) def onResourceAvailabilityUpdated(self, payload = None): - r = self.radb.getResources(resource_ids=[payload], include_availability=True)[0] + r = self.radb.getResources(resource_ids=[json.loads(payload)['id']], include_availability=True)[0] r = {k:r[k] for k in ['id', 'active']} self._sendNotification('ResourceAvailabilityUpdated', r) def onResourceCapacityUpdated(self, payload = None): - r = self.radb.getResources(resource_ids=[payload], include_availability=True)[0] + r = self.radb.getResources(resource_ids=[json.loads(payload)['id']], include_availability=True)[0] r = {k:r[k] for k in ['id', 'total_capacity', 'available_capacity', 'used_capacity']} self._sendNotification('ResourceCapacityUpdated', r) diff --git a/SAS/TMSS/client/lib/tmss_http_rest_client.py b/SAS/TMSS/client/lib/tmss_http_rest_client.py index 0fb13c71c657b88d82ad7bdd2dafdd6419cfdf99..48df33a1cab208a1892a4f5aa47ed11e73b066e0 100644 --- a/SAS/TMSS/client/lib/tmss_http_rest_client.py +++ b/SAS/TMSS/client/lib/tmss_http_rest_client.py @@ -156,10 +156,13 @@ class TMSSsession(object): return self.get_path_as_json_object("subtask", clauses) + def get_full_url_for_path(self, path: str) -> str: + '''get the full URL for the given path''' + return '%s/%s' % (self.base_url, path.strip('/')) + def get_path_as_json_object(self, path: str, params={}) -> object: '''get resource at the given path, interpret it as json, and return it as as native object (usually a dict or a list of dicts)''' - full_url = '%s/%s' % (self.base_url, path.strip('/')) - return self.get_url_as_json_object(full_url, params=params) + return self.get_url_as_json_object(self.get_full_url_for_path(path=path), params=params) def get_url_as_json_object(self, full_url: str, params={}) -> object: '''get resource at the given full url (including http://<base_url>, interpret it as json, and return it as as native object (usually a dict or a list of dicts)''' diff --git a/SAS/TMSS/client/lib/tmssbuslistener.py b/SAS/TMSS/client/lib/tmssbuslistener.py index 30c49bb7ce9ae2bd70093821088dbd1a4667e607..81448e9a16c97e4cfb5f91213a218dde91f9edaf 100644 --- a/SAS/TMSS/client/lib/tmssbuslistener.py +++ b/SAS/TMSS/client/lib/tmssbuslistener.py @@ -28,180 +28,209 @@ Typical usage is to derive your own subclass from TMSSBusListener and implement """ from lofar.messaging.messagebus import BusListener, AbstractMessageHandler -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, EventMessage +from lofar.messaging import DEFAULT_BUSNAME, DEFAULT_BROKER, EventMessage +from lofar.messaging.exceptions import MessageHandlerUnknownSubjectError from lofar.common.util import waitForInterrupt, single_line_with_single_spaces import logging logger = logging.getLogger(__name__) -_DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE = 'TMSS.%s.notification' -DEFAULT_TMSS_TASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'Task' -DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'SubTask' -DEFAULT_TMSS_ALL_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE + '#' +_TMSS_EVENT_PREFIX_TEMPLATE = 'TMSS.Event.%s' +TMSS_SUBTASK_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SubTask.Object' +TMSS_SUBTASK_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SubTask.Status' +TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'TaskBlueprint.Object' +TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'TaskBlueprint.Status' +TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'TaskDraft.Object' +TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitBlueprint.Object' +TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitBlueprint.Status' +TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX = _TMSS_EVENT_PREFIX_TEMPLATE % 'SchedulingUnitDraft.Object' +TMSS_ALL_OBJECT_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '.*.Object.#' +TMSS_ALL_STATUS_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '.*.Status.#' +TMSS_ALL_EVENTS_FILTER = _TMSS_EVENT_PREFIX_TEMPLATE % '#' -class TMSSSubTaskEventMessageHandler(AbstractMessageHandler): +class TMSSEventMessageHandler(AbstractMessageHandler): ''' - Base-type messagehandler for handling TMSS event messages. - Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. + Base-type messagehandler for handling all TMSS event messages. + Typical usage is to derive your own subclass from TMSSEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. ''' def handle_message(self, msg: EventMessage): if not isinstance(msg, EventMessage): raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg)) - stripped_subject = msg.subject.replace("%s." % DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, '') - - logger.info("%s.on%s: %s" % (self.__class__.__name__, stripped_subject, single_line_with_single_spaces(msg.content))) - - if stripped_subject == 'Defining': - self.onSubTaskDefining(**msg.content) - elif stripped_subject == 'Defined': - self.onSubTaskDefined(**msg.content) - elif stripped_subject == 'Scheduling': - self.onSubTaskScheduling(**msg.content) - elif stripped_subject == 'Scheduled': - self.onSubTaskScheduled(**msg.content) - elif stripped_subject == 'Queueing': - self.onSubTaskQueueing(**msg.content) - elif stripped_subject == 'Queued': - self.onSubTaskQueued(**msg.content) - elif stripped_subject == 'Starting': - self.onSubTaskStarting(**msg.content) - elif stripped_subject == 'Started': - self.onSubTaskStarted(**msg.content) - elif stripped_subject == 'Finishing': - self.onSubTaskFinishing(**msg.content) - elif stripped_subject == 'Finished': - self.onSubTaskFinished(**msg.content) - elif stripped_subject == 'Cancelling': - self.onSubTaskCancelling(**msg.content) - elif stripped_subject == 'Cancelled': - self.onSubTaskCancelled(**msg.content) - elif stripped_subject == 'Error': - self.onSubTaskError(**msg.content) + stripped_subject = msg.subject.replace(_TMSS_EVENT_PREFIX_TEMPLATE%('',), '') + + logger.info("%s %s: %s" % (self.__class__.__name__, stripped_subject, single_line_with_single_spaces(msg.content))) + + # sorry, very big if/elif/else tree. + # it just maps all possible event subjects for all possible objects and statuses onto handler methods. + if stripped_subject == 'SubTask.Object.Created': + self.onSubTaskCreated(**msg.content) + elif stripped_subject == 'SubTask.Object.Updated': + self.onSubTaskUpdated(**msg.content) + elif stripped_subject == 'SubTask.Object.Deleted': + self.onSubTaskDeleted(**msg.content) + elif stripped_subject == 'TaskBlueprint.Object.Created': + self.onTaskBlueprintCreated(**msg.content) + elif stripped_subject == 'TaskBlueprint.Object.Updated': + self.onTaskBlueprintUpdated(**msg.content) + elif stripped_subject == 'TaskBlueprint.Object.Deleted': + self.onTaskBlueprintDeleted(**msg.content) + elif stripped_subject == 'TaskDraft.Object.Created': + self.onTaskDraftCreated(**msg.content) + elif stripped_subject == 'TaskDraft.Object.Updated': + self.onTaskDraftUpdated(**msg.content) + elif stripped_subject == 'TaskDraft.Object.Deleted': + self.onTaskDraftDeleted(**msg.content) + elif stripped_subject == 'SchedulingUnitBlueprint.Object.Created': + self.onSchedulingUnitBlueprintCreated(**msg.content) + elif stripped_subject == 'SchedulingUnitBlueprint.Object.Updated': + self.onSchedulingUnitBlueprintUpdated(**msg.content) + elif stripped_subject == 'SchedulingUnitBlueprint.Object.Deleted': + self.onSchedulingUnitBlueprintDeleted(**msg.content) + elif stripped_subject == 'SchedulingUnitDraft.Object.Created': + self.onSchedulingUnitDraftCreated(**msg.content) + elif stripped_subject == 'SchedulingUnitDraft.Object.Updated': + self.onSchedulingUnitDraftUpdated(**msg.content) + elif stripped_subject == 'SchedulingUnitDraft.Object.Deleted': + self.onSchedulingUnitDraftDeleted(**msg.content) + elif stripped_subject.startswith('SubTask.Status.'): + self.onSubTaskStatusChanged(**msg.content) + elif stripped_subject.startswith('TaskBlueprint.Status.'): + self.onTaskBlueprintStatusChanged(**msg.content) + elif stripped_subject.startswith('SchedulingUnitBlueprint.Status.'): + self.onSchedulingUnitBlueprintStatusChanged(**msg.content) else: - raise ValueError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject) + raise MessageHandlerUnknownSubjectError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject) - def onSubTaskDefining(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskDefining is called upon receiving a SubTaskDefining message, which is sent when a SubTasks changes state to "Defining". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + + def onSubTaskStatusChanged(self, id: int, status:str): + '''onSubTaskStatusChanged is called upon receiving a SubTask.Status.* message, which is sent when a SubTasks changes status. + :param id: the TMSS id of the SubTask + :param status: the new status of the SubTask + ''' + pass + + def onTaskBlueprintStatusChanged(self, id: int, status:str): + '''onTaskBlueprintStatusChanged is called upon receiving a TaskBlueprint.Status.* message, which is sent when a TaskBlueprint changes status. + :param id: the TMSS id of the TaskBlueprint + :param status: the new status of the TaskBlueprint + ''' + pass + + def onSchedulingUnitBlueprintStatusChanged(self, id: int, status:str): + '''onSchedulingUnitBlueprintStatusChanged is called upon receiving a SchedulingUnitBlueprint.Status.* message, which is sent when a SchedulingUnitBlueprints changes status. + :param id: the TMSS id of the SchedulingUnitBlueprint + :param status: the new status of the SchedulingUnitBlueprint + ''' + pass + + def onSubTaskCreated(self, id: int): + '''onSubTaskCreated is called upon receiving a SubTask.Object.Created message, which is sent when a SubTasks was created. + :param id: the TMSS id of the SubTask + ''' + pass + + def onSubTaskUpdated(self, id: int): + '''onSubTaskUpdated is called upon receiving a SubTask.Object.Updated message, which is sent when a SubTasks was created. + :param id: the TMSS id of the SubTask + ''' + pass + + def onSubTaskDeleted(self, id: int): + '''onSubTaskDeleted is called upon receiving a SubTask.Object.Deleted message, which is sent when a SubTasks was created. + :param id: the TMSS id of the SubTask ''' pass - def onSubTaskDefined(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskDefined is called upon received a SubTaskDefined message, which is sent when a SubTasks changes state to "Defined". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskDraftCreated(self, id: int): + '''onTaskDraftCreated is called upon receiving a TaskDraft.Object.Created message, which is sent when a TaskDrafts was created. + :param id: the TMSS id of the TaskDraft ''' pass - def onSubTaskScheduling(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskScheduling is called upon receiving a SubTaskScheduling message, which is sent when a SubTasks changes state to "Scheduling". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskDraftUpdated(self, id: int): + '''onTaskDraftUpdated is called upon receiving a TaskDraft.Object.Updated message, which is sent when a TaskDrafts was created. + :param id: the TMSS id of the TaskDraft ''' pass - def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskScheduled is called upon received a SubTaskScheduled message, which is sent when a SubTasks changes state to "Scheduled". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskDraftDeleted(self, id: int): + '''onTaskDraftDeleted is called upon receiving a TaskDraft.Object.Deleted message, which is sent when a TaskDrafts was created. + :param id: the TMSS id of the TaskDraft ''' pass - def onSubTaskQueueing(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskQueueing is called upon receiving a SubTaskQueueing message, which is sent when a SubTasks changes state to "Queueing". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskBlueprintCreated(self, id: int): + '''onTaskBlueprintCreated is called upon receiving a TaskBlueprint.Object.Created message, which is sent when a TaskBlueprints was created. + :param id: the TMSS id of the TaskBlueprint ''' pass - def onSubTaskQueued(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskQueued is called upon received a SubTaskQueued message, which is sent when a SubTasks changes state to "Queued". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskBlueprintUpdated(self, id: int): + '''onTaskBlueprintUpdated is called upon receiving a TaskBlueprint.Object.Updated message, which is sent when a TaskBlueprints was created. + :param id: the TMSS id of the TaskBlueprint ''' pass - def onSubTaskStarting(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskStarting is called upon receiving a SubTaskStarting message, which is sent when a SubTasks changes state to "Starting". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onTaskBlueprintDeleted(self, id: int): + '''onTaskBlueprintDeleted is called upon receiving a TaskBlueprint.Object.Deleted message, which is sent when a TaskBlueprints was created. + :param id: the TMSS id of the TaskBlueprint ''' pass - def onSubTaskStarted(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskStarted is called upon received a SubTaskStarted message, which is sent when a SubTasks changes state to "Started". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitDraftCreated(self, id: int): + '''onSchedulingUnitDraftCreated is called upon receiving a SchedulingUnitDraft.Object.Created message, which is sent when a SchedulingUnitDrafts was created. + :param id: the TMSS id of the SchedulingUnitDraft ''' pass - def onSubTaskFinishing(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskFinishing is called upon receiving a SubTaskFinishing message, which is sent when a SubTasks changes state to "Finishing". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitDraftUpdated(self, id: int): + '''onSchedulingUnitDraftUpdated is called upon receiving a SchedulingUnitDraft.Object.Updated message, which is sent when a SchedulingUnitDrafts was created. + :param id: the TMSS id of the SchedulingUnitDraft ''' pass - def onSubTaskFinished(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskFinished is called upon received a SubTaskFinished message, which is sent when a SubTasks changes state to "Finished". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitDraftDeleted(self, id: int): + '''onSchedulingUnitDraftDeleted is called upon receiving a SchedulingUnitDraft.Object.Deleted message, which is sent when a SchedulingUnitDrafts was created. + :param id: the TMSS id of the SchedulingUnitDraft ''' pass - def onSubTaskCancelling(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskCancelling is called upon receiving a SubTaskCancelling message, which is sent when a SubTasks changes state to "Cancelling". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitBlueprintCreated(self, id: int): + '''onSchedulingUnitBlueprintCreated is called upon receiving a SchedulingUnitBlueprint.Object.Created message, which is sent when a SchedulingUnitBlueprints was created. + :param id: the TMSS id of the SchedulingUnitBlueprint ''' pass - def onSubTaskCancelled(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskCancelled is called upon received a SubTaskCancelled message, which is sent when a SubTasks changes state to "Cancelled". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitBlueprintUpdated(self, id: int): + '''onSchedulingUnitBlueprintUpdated is called upon receiving a SchedulingUnitBlueprint.Object.Updated message, which is sent when a SchedulingUnitBlueprints was created. + :param id: the TMSS id of the SchedulingUnitBlueprint ''' pass - def onSubTaskError(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskError is called upon receiving a SubTaskError message, which is sent when a SubTasks changes state to "Error". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask + def onSchedulingUnitBlueprintDeleted(self, id: int): + '''onSchedulingUnitBlueprintDeleted is called upon receiving a SchedulingUnitBlueprint.Object.Deleted message, which is sent when a SchedulingUnitBlueprints was created. + :param id: the TMSS id of the SchedulingUnitBlueprint ''' pass -class TMSSSubTaskBusListener(BusListener): +class TMSSBusListener(BusListener): def __init__(self, - handler_type: TMSSSubTaskEventMessageHandler.__class__ = TMSSSubTaskEventMessageHandler, + handler_type: TMSSEventMessageHandler.__class__ = TMSSEventMessageHandler, handler_kwargs: dict = None, exchange: str = DEFAULT_BUSNAME, - routing_key: str = DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX+".#", + routing_key: str = TMSS_ALL_EVENTS_FILTER, num_threads: int = 1, broker: str = DEFAULT_BROKER): """ - TMSSSubTaskBusListener listens on the lofar notification message bus and calls on<SomeMessage> methods in the TMSSSubTaskEventMessageHandler when such a message is received. - Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. + TMSSSubTaskBusListener listens on the lofar notification message bus and calls on<SomeMessage> methods in the TMSSEventMessageHandler when such a message is received. + Typical usage is to derive your own subclass from TMSSEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. """ - if not issubclass(handler_type, TMSSSubTaskEventMessageHandler): - raise TypeError("handler_type should be a TMSSSubTaskEventMessageHandler subclass") + if not issubclass(handler_type, TMSSEventMessageHandler): + raise TypeError("handler_type should be a TMSSEventMessageHandler subclass") super().__init__(handler_type, handler_kwargs, exchange, routing_key, num_threads, broker) @@ -209,10 +238,11 @@ class TMSSSubTaskBusListener(BusListener): if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - class ExampleTMSSSubTaskEventMessageHandler(TMSSSubTaskEventMessageHandler): - def onSubTaskDefined(self, **kwargs): - logger.debug("MyTMSSSubTaskEventMessageHandler.onSubTaskDefined(%s)", kwargs) + class ExampleTMSSEventMessageHandler(TMSSEventMessageHandler): + def onSubTaskStatusChanged(self, id: int, status:str): + logger.debug("MyTMSSEventMessageHandler.onSubTaskStatusChanged(id=%s, status=%s)", id, status) - with TMSSSubTaskBusListener(handler_type=ExampleTMSSSubTaskEventMessageHandler): + from lofar.messaging.messagebus import BusListenerJanitor + with BusListenerJanitor(TMSSBusListener(handler_type=ExampleTMSSEventMessageHandler)): waitForInterrupt() diff --git a/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js b/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js index 4dfd1a1e96fe5e7a4fc0ceb9643941fccc990edb..d611b7857cf365ed32c8eee56b0936e8eb7e5b87 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js +++ b/SAS/TMSS/frontend/tmss_webapp/src/components/Timeline/CalendarTimeline.js @@ -5,7 +5,8 @@ import Timeline, { SidebarHeader, DateHeader, CustomMarker, - CursorMarker + CursorMarker, + CustomHeader } from 'react-calendar-timeline'; import containerResizeDetector from 'react-calendar-timeline/lib/resize-detector/container'; import moment from 'moment'; @@ -83,7 +84,7 @@ export class CalendarTimeline extends Component { prevZoomRange: null, lineHeight: props.rowHeight || 50, // Row line height sidebarWidth: props.sidebarWidth || 200, - timeSteps: props.timeSteps || {minute: 60}, + timeSteps: props.timeSteps || {minute: 1}, canMove: props.itemsMovable || false, canResize: props.itemsResizable || false, canchangeGroup: props.itemGroupChangeable || true, @@ -126,6 +127,7 @@ export class CalendarTimeline extends Component { this.renderLSTDateHeader = this.renderLSTDateHeader.bind(this); this.renderCursor = this.renderCursor.bind(this); this.renderItem = this.renderItem.bind(this); + this.renderNormalSuntimeHeader = this.renderNormalSuntimeHeader.bind(this); //<<<<<<< Custom Renderer Functions //>>>>>> Functions of this component @@ -257,6 +259,14 @@ export class CalendarTimeline extends Component { :`Week (${this.state.timelineStartDate.week()}) / Day`}</div> <div style={{height:'30px'}}>{this.state.dayHeaderVisible?`UTC(Hr)`:`UTC(Day)`}</div> <div style={{height:'30px'}}>{this.state.dayHeaderVisible?`LST(Hr)`:`LST(Day)`}</div> + {this.state.viewType === UIConstants.timeline.types.NORMAL && + <div className="p-grid" + style={{height:this.props.showSunTimings?'30px':'0px', paddingTop:'10px', paddingLeft:'10px'}}> + <div className="col-4" style={{marginTop:'2px', paddingLeft:'5px', backgroundColor:'yellow', color: '#212529'}}>Sunrise</div> + <div className="col-4" style={{marginTop:'2px', paddingLeft:'5px', backgroundColor:'orange', color: '#212529'}}>Sunset</div> + <div className="col-4" style={{marginTop:'2px', paddingLeft:'5px', backgroundColor:'blue'}}>Night</div> + </div> + } </div> ); } @@ -345,7 +355,7 @@ export class CalendarTimeline extends Component { return <div {...getIntervalProps()} className="rct-dateHeader" style={divStyle}> { (this.state.timeHeaderLabelVisibile)? (showBorder)? - <span> + <span key={`utchead-${displayValue}`}> {displayValue} </span>: <> @@ -420,6 +430,75 @@ export class CalendarTimeline extends Component { } } + /** Custom renderer to show sunrise, sunset and night times */ + renderNormalSuntimeHeader({ + headerContext: { intervals }, + getRootProps, + getIntervalProps, + showPeriod, + data, + }) { + const sunTimeMap = this.state.sunTimeMap; + return ( + <div {...getRootProps()}> + {intervals.map(interval => { + const dayStyle = { + lineHeight: '30px', + backgroundColor: 'white', + color: 'white' + } + const nightStyle = { + lineHeight: '30px', + backgroundColor: 'blue', + color: 'blue' + } + const sunriseStyle = { + lineHeight: '30px', + backgroundColor: 'yellow', + color: 'yellow' + } + const sunsetStyle = { + lineHeight: '30px', + backgroundColor: 'orange', + color: 'orange' + } + // Get the intervals UTC date format and time + const intervalDate = interval.startTime.clone().utc().format("YYYYMMDDT12:00:00"); + const intervalTime = interval.startTime.clone().utc(); + // Get the suntime for the UTC date + const intervalDateSunTime = sunTimeMap[intervalDate]; + let intervalStyle = dayStyle; + // If suntime is available display suntime blocks + if (intervalDateSunTime) { + // Set 15 minutes duration for sunrise and sunset and create blocks accordingly + if (intervalTime.isBefore(intervalDateSunTime.sunrise) || + intervalTime.isAfter(intervalDateSunTime.sunset.clone().add(14, 'minutes'))) { + intervalStyle = nightStyle; + } else if (intervalTime.isSame(intervalDateSunTime.sunrise) || + intervalTime.isBefore(intervalDateSunTime.sunrise.clone().add(15, 'minutes'))) { + intervalStyle = sunriseStyle; + } else if (intervalTime.isSame(intervalDateSunTime.sunset) || + (intervalTime.isAfter(intervalDateSunTime.sunset) && + intervalTime.isBefore(intervalDateSunTime.sunset.clone().add(15, 'minutes')))) { + intervalStyle = sunsetStyle; + } + return ( + <div + {...getIntervalProps({ + interval, + style: intervalStyle + })} + > + </div> + ) + } else { + return (""); + } + })} + </div> + ) + } + /** * Function to render sunrise timings on the timeline view in normal view. * @param {Array} sunRiseTimings @@ -636,18 +715,22 @@ export class CalendarTimeline extends Component { * @param {moment} endTime */ setNormalSuntimings(startTime, endTime) { - let sunRiseTimings = [], sunSetTimings = []; + let sunRiseTimings = [], sunSetTimings = [], sunTimeMap={}; const noOfDays = endTime.diff(startTime, 'days'); for (const number of _.range(noOfDays+1)) { const date = startTime.clone().add(number, 'days').hours(12).minutes(0).seconds(0); - UtilService.getSunTimings(date.format("YYYYMMDDTHH:mm:ss")+"Z").then(timings => { + const formattedDate = date.format("YYYYMMDDTHH:mm:ss"); + UtilService.getSunTimings(formattedDate+"Z").then(timings => { + const sunriseTime = moment.utc(timings.sun_rise.split('.')[0]); + const sunsetTime = moment.utc(timings.sun_set.split('.')[0]); if (moment.utc(timings.sun_rise).isAfter(startTime)) { - sunRiseTimings.push(moment.utc(timings.sun_rise)); + sunRiseTimings.push(sunriseTime); } if (moment.utc(timings.sun_set).isBefore(endTime)) { - sunSetTimings.push(moment.utc(timings.sun_set)); + sunSetTimings.push(sunsetTime); } - this.setState({sunRiseTimings: sunRiseTimings, sunSetTimings: sunSetTimings}); + sunTimeMap[formattedDate] = {sunrise: sunriseTime, sunset: sunsetTime}; + this.setState({sunRiseTimings: sunRiseTimings, sunSetTimings: sunSetTimings, sunTimeMap: sunTimeMap}); }); } } @@ -718,6 +801,7 @@ export class CalendarTimeline extends Component { async changeZoomLevel(zoomLevel, isTimelineZoom) { zoomLevel = zoomLevel?zoomLevel: DEFAULT_ZOOM_LEVEL; const newZoomLevel = _.find(ZOOM_LEVELS, {'name': zoomLevel}); + this.setState({isTimelineZoom: isTimelineZoom}); let startTime = this.state.defaultStartTime; let endTime = this.state.defaultEndTime; if (zoomLevel === 'Custom') { @@ -751,7 +835,7 @@ export class CalendarTimeline extends Component { let result = await this.changeDateRange(startTime, endTime); let group = DEFAULT_GROUP.concat(result.group); this.setState({zoomLevel: zoomLevel, defaultStartTime: startTime, defaultEndTime: endTime, - isTimelineZoom: isTimelineZoom, zoomRange: null, + isTimelineZoom: true, zoomRange: null, dayHeaderVisible: true, weekHeaderVisible: false, lstDateHeaderUnit: 'hour', group: group, items: result.items}); } @@ -1011,6 +1095,13 @@ export class CalendarTimeline extends Component { // This method will render once but will not update the values after fetching from server // <DateHeader unit={this.state.lstDateHeaderUnit} intervalRenderer={this.renderLSTDateHeader}></DateHeader> } + {/* Suntime Header in normal view with sunrise, sunset and night time */} + {this.props.showSunTimings && this.state.viewType === UIConstants.timeline.types.NORMAL && this.state.sunTimeMap && + <CustomHeader height={30} unit="minute" + children={({ headerContext: { intervals }, getRootProps, getIntervalProps, showPeriod, data})=> { + return this.renderNormalSuntimeHeader({ headerContext: { intervals }, getRootProps, getIntervalProps, showPeriod, data})}}> + </CustomHeader> + } </TimelineHeaders> <TimelineMarkers> diff --git a/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss b/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss index 953df5906117caa9ba42ad60fef84ea12934a515..25e0ca50ba4e4546260f615e0ccb3150dc01d50a 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss +++ b/SAS/TMSS/frontend/tmss_webapp/src/layout/sass/_timeline.scss @@ -1,3 +1,9 @@ +.sticky { + position: sticky; + top:49px; + z-index:1000; +} + .rct-sidebar-row { font-size: 14px; } diff --git a/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/create.scheduleset.js b/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/create.scheduleset.js index 885e56615b0bc9c712a7da26cae5a247482ab486..2e330e0fbc70c7708dd629f2a17e1229e8507b18 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/create.scheduleset.js +++ b/SAS/TMSS/frontend/tmss_webapp/src/routes/Scheduling/create.scheduleset.js @@ -2,8 +2,6 @@ import React, {Component} from 'react'; import { Link, Redirect } from 'react-router-dom'; import _ from 'lodash'; -import PageHeader from '../../layout/components/PageHeader'; - import {Dropdown} from 'primereact/dropdown'; import { Button } from 'primereact/button'; import {Dialog} from 'primereact/components/dialog/Dialog'; @@ -24,9 +22,9 @@ import 'ag-grid-community/dist/styles/ag-grid.css'; import 'ag-grid-community/dist/styles/ag-theme-alpine.css'; import UnitConverter from '../../utils/unit.converter' import Validator from '../../utils/validator'; - +import PageHeader from '../../layout/components/PageHeader'; /** - * Component to create / update Scheduling Unit Draft using Spreadsheet + * Component to create / update Scheduling Unit Drafts using Spreadsheet */ export class SchedulingSetCreate extends Component { constructor(props) { @@ -37,27 +35,22 @@ export class SchedulingSetCreate extends Component { this.tmpRowData = []; this.state = { - projectDisabled: (props.match?(props.match.params.project? true:false):false), isLoading: true, // Flag for loading spinner dialog: { header: '', detail: ''}, // Dialog properties redirect: null, // URL to redirect errors: [], // Form Validation errors clipboard: [], // Maintaining grid data while Ctrl+C/V - schedulingUnit: { project: (props.match?props.match.params.project:null) || null, }, - schedulingSets: [], schedulingUnitList: [], selectedSchedulingSetId: null, - // selectedStrategyId: null, - // selectedProjectId: null, observStrategy: {}, totalCount: 0, - validEditor: false, // For JSON editor validation - validFields: {}, // For Form Validation + validEditor: false, + validFields: {}, noOfSU: 10, //ag-grid columnMap: [], @@ -81,7 +74,7 @@ export class SchedulingSetCreate extends Component { editable: true, flex: 1, sortable: true, minWidth: 100, resizable: true, }, rowSelection: 'multiple', - // ag grid show row index + // ag grid to show row index components: { rowIdRenderer: function (params) { return 1 + params.rowIndex; @@ -103,7 +96,7 @@ export class SchedulingSetCreate extends Component { this.cancelCreate = this.cancelCreate.bind(this); this.clipboardEvent = this.clipboardEvent.bind(this); this.reset = this.reset.bind(this); - + this.projects = []; // All projects to load project dropdown this.schedulingSets = []; // All scheduling sets to be filtered for project this.observStrategies = []; // All Observing strategy templates @@ -204,6 +197,9 @@ export class SchedulingSetCreate extends Component { this.state.gridApi.redrawRows(); } + /** + * Resolve JSON Schema + */ async resolveSchema(schema){ let properties = schema.properties; schema.definitions = schema.definitions?schema.definitions:{}; @@ -247,76 +243,18 @@ export class SchedulingSetCreate extends Component { return schema; } + /** * Function to generate AG-Grid column definition. * @param {number} strategyId */ - async createGridColums(scheduleUnit){ - let strategyId = scheduleUnit.observation_strategy_template_id; - let tasksToUpdate = {}; - const observStrategy = _.find(this.observStrategies, {'id': strategyId}); - const tasks = observStrategy.template.tasks; - let paramsOutput = {}; - let schema = { type: 'object', additionalProperties: false, - properties: {}, definitions:{} - }; - - let taskDrafts= []; - await ScheduleService.getTasksDraftBySchedulingUnitId(scheduleUnit.id).then(response =>{ - taskDrafts= response.data.results; - }) - - for (const taskName in tasks) { - const task = tasks[taskName]; - const taskDraft = taskDrafts.find(taskD => taskD.name === taskName); - if (taskDraft) { - task.specifications_doc = taskDraft.specifications_doc; - } - //Resolve task from the strategy template - const $taskRefs = await $RefParser.resolve(task); - - // Identify the task specification template of every task in the strategy template - const taskTemplate = _.find(this.taskTemplates, {'name': task['specifications_template']}); - schema['$id'] = taskTemplate.schema['$id']; - schema['$schema'] = taskTemplate.schema['$schema']; - let index = 0; - for (const param of observStrategy.template.parameters) { - if (param.refs[0].indexOf(`/tasks/${taskName}`) > 0) { - tasksToUpdate[taskName] = taskName; - // Resolve the identified template - const $templateRefs = await $RefParser.resolve(taskTemplate); - let property = { }; - let tempProperty = null; - const taskPaths = param.refs[0].split("/"); - // Get the property type from the template and create new property in the schema for the parameters - try { - const parameterRef = param.refs[0]; - tempProperty = $templateRefs.get(parameterRef); - } catch(error) { - tempProperty = _.cloneDeep(taskTemplate.schema.properties[taskPaths[4]]); - if (tempProperty.type === 'array') { - tempProperty = tempProperty.items.properties[taskPaths[6]]; - } - property = tempProperty; - } - property.title = param.name; - property.default = $taskRefs.get(param.refs[0].replace(`#/tasks/${taskName}`, '#')); - paramsOutput[`param_${index}`] = property.default; - schema.properties[`param_${index}`] = property; - // Set property defintions taken from the task template in new schema - for (const definitionName in taskTemplate.schema.definitions) { - schema.definitions[definitionName] = taskTemplate.schema.definitions[definitionName]; - } - } - index++; - } - } - + async createGridColumns(scheduleUnit){ + let schema = await this.getTaskSchema(scheduleUnit); schema = await this.resolveSchema(schema); // AG Grid Cell Specific Properties const cellProps =[]; - cellProps['angle1'] = {type:'numberValueColumn', cellRenderer: 'timeInputMask',cellEditor: 'timeInputMask' }; - cellProps['angle2'] = {type:'numberValueColumn', cellRenderer: 'degreeInputMask',cellEditor: 'degreeInputMask' }; + cellProps['angle1'] = {type:'numberValueColumn', cellRenderer: 'timeInputMask',cellEditor: 'timeInputMask', valueSetter: 'valueSetter' }; + cellProps['angle2'] = {type:'numberValueColumn', cellRenderer: 'degreeInputMask',cellEditor: 'degreeInputMask', valueSetter: 'valueSetter' }; cellProps['angle3'] = {cellEditor: 'numericEditor',}; cellProps['direction_type'] = {cellEditor: 'agSelectCellEditor',default: schema.definitions.pointing.properties.direction_type.default, cellEditorParams: { @@ -324,6 +262,8 @@ export class SchedulingSetCreate extends Component { }, }; //Ag-grid Colums definition + + let colKeyOrder = []; let columnMap = []; let colProperty = {}; let columnDefs = [ @@ -344,6 +284,8 @@ export class SchedulingSetCreate extends Component { ], } ]; + colKeyOrder.push("suname"); + colKeyOrder.push("sudesc"); colProperty ={'ID':'id', 'Name':'suname', 'Description':'sudesc'}; columnMap['Scheduling Unit'] = colProperty; @@ -356,15 +298,15 @@ export class SchedulingSetCreate extends Component { let childern = []; colProperty = {}; - let childalais = property.title; - childalais = _.lowerCase(childalais).split(' ').map(x => x[0]).join(''); + let childalias = property.title; + childalias = _.lowerCase(childalias).split(' ').map(x => x[0]).join(''); const paramKeys = Object.keys(property.default); paramKeys.forEach(key =>{ - colProperty[key] = childalais+key; + colProperty[key] = childalias+key; let cellAttr = {}; cellAttr['headerName'] = definitions[key].title; - cellAttr['field'] = childalais+key; - + cellAttr['field'] = childalias+key; + colKeyOrder.push(childalias+key); let cellKeys = Object.keys(cellProps[key]); for(const cellKey of cellKeys){ cellAttr[cellKey] = cellProps[key][cellKey]; @@ -377,12 +319,79 @@ export class SchedulingSetCreate extends Component { }) columnMap[property.title] = colProperty; } + colProperty ={'From':'bfrom', 'Until':'buntil'}; + columnMap['Between'] = colProperty; this.setState({ columnDefs:columnDefs, columnMap:columnMap, + colKeyOrder:colKeyOrder }) + } + async getTaskSchema(scheduleUnit){ + let strategyId = scheduleUnit.observation_strategy_template_id; + let tasksToUpdate = {}; + const observStrategy = _.find(this.observStrategies, {'id': strategyId}); + const tasks = observStrategy.template.tasks; + let paramsOutput = {}; + let schema = { type: 'object', additionalProperties: false, + properties: {}, definitions:{} + }; + + let taskDrafts= []; + await ScheduleService.getTasksDraftBySchedulingUnitId(scheduleUnit.id).then(response =>{ + taskDrafts= response.data.results; + }) + + for (const taskName in tasks) { + const task = tasks[taskName]; + const taskDraft = taskDrafts.find(taskD => taskD.name === taskName); + if (taskDraft) { + task.specifications_doc = taskDraft.specifications_doc; + } + //Resolve task from the strategy template + const $taskRefs = await $RefParser.resolve(task); + + // Identify the task specification template of every task in the strategy template + const taskTemplate = _.find(this.taskTemplates, {'name': task['specifications_template']}); + schema['$id'] = taskTemplate.schema['$id']; + schema['$schema'] = taskTemplate.schema['$schema']; + let index = 0; + for (const param of observStrategy.template.parameters) { + if (param.refs[0].indexOf(`/tasks/${taskName}`) > 0) { + tasksToUpdate[taskName] = taskName; + // Resolve the identified template + const $templateRefs = await $RefParser.resolve(taskTemplate); + let property = { }; + let tempProperty = null; + const taskPaths = param.refs[0].split("/"); + // Get the property type from the template and create new property in the schema for the parameters + try { + const parameterRef = param.refs[0]; + tempProperty = $templateRefs.get(parameterRef); + } catch(error) { + tempProperty = _.cloneDeep(taskTemplate.schema.properties[taskPaths[4]]); + if (tempProperty.type === 'array') { + tempProperty = tempProperty.items.properties[taskPaths[6]]; + } + property = tempProperty; + } + property.title = param.name; + property.default = $taskRefs.get(param.refs[0].replace(`#/tasks/${taskName}`, '#')); + paramsOutput[`param_${index}`] = property.default; + schema.properties[`param_${index}`] = property; + // Set property defintions taken from the task template in new schema + for (const definitionName in taskTemplate.schema.definitions) { + schema.definitions[definitionName] = taskTemplate.schema.definitions[definitionName]; + } + } + index++; + } + } + return schema; + } + /** * Function to prepare ag-grid row data. @@ -391,15 +400,11 @@ export class SchedulingSetCreate extends Component { if(this.state.schedulingUnitList.length===0){ return; } - // const observStrategy = _.find(this.observStrategies, {'id': this.state.schedulingUnitList[0].observation_strategy_template_id}); - // this.setState({observStrategy: observStrategy}); - this.tmpRowData = []; let totalSU = this.state.noOfSU; let paramsOutput = {}; //refresh column header - await this.createGridColums(this.state.schedulingUnitList[0]); - + await this.createGridColumns(this.state.schedulingUnitList[0]); let observationPropsList = []; for(const scheduleunit of this.state.schedulingUnitList){ let observationProps = { @@ -467,7 +472,7 @@ export class SchedulingSetCreate extends Component { * @param {Stirng} cell -> contains Row ID, Column Name, Value, isDegree */ async updateAngle(rowIndex, field, value, isDegree, isValid){ - let row = this.state.rowData[rowIndex] + let row = this.state.rowData[rowIndex]; row[field] = value; row['isValid'] = isValid; //Convertverted value for Angle 1 & 2, set in SU Row @@ -477,10 +482,37 @@ export class SchedulingSetCreate extends Component { await this.setState({ rowData: tmpRowData }); - // console.log('rowdata', this.state.rowData) + } - + /** + * Read Data from clipboard + */ + async readClipBoard(){ + try{ + const queryOpts = { name: 'clipboard-read', allowWithoutGesture: true }; + const permissionStatus = await navigator.permissions.query(queryOpts); + let data = await navigator.clipboard.readText(); + return data; + }catch(err){ + console.log("Error",err); + } + } + + /** + * Check the content is JSON format + * @param {*} jsonData + */ + async isJsonData(jsonData){ + try{ + let jsonObj = JSON.parse(jsonData); + return true; + }catch(err){ + console.log("error :",err) + return false; + } + } + /** * Copy data to/from clipboard * @param {*} e @@ -490,42 +522,77 @@ export class SchedulingSetCreate extends Component { var ctrl = e.ctrlKey ? e.ctrlKey : ((key === 17) ? true : false); // ctrl detection if ( key == 86 && ctrl ) { // Ctrl+V - let emptyRow = this.state.emptyRow; this.tmpRowData = this.state.rowData; let dataRowCount = this.state.totalCount; - for(const row of this.state.clipboard){ - let copyRow = _.cloneDeep(row); - copyRow['id'] = 0; - this.tmpRowData[dataRowCount] = copyRow; - dataRowCount++; - } - - let tmpNoOfSU= this.state.noOfSU; - if(dataRowCount >= tmpNoOfSU){ - tmpNoOfSU = dataRowCount+10; - //Create additional empty row at the end - for(let i= this.tmpRowData.length; i<=tmpNoOfSU; i++){ - this.tmpRowData.push(emptyRow); + try { + let clipboardData = ''; + try{ + //Read Clipboard Data + clipboardData = await this.readClipBoard(); + }catch(err){ + console.log("error :",err); + } + if(clipboardData){ + let suGridRowData= this.state.emptyRow; + clipboardData = _.trim(clipboardData); + let suRows = clipboardData.split("\n"); + suRows.forEach(line =>{ + let colCount = 0; + suGridRowData ={}; + let suRow = line.split("\t"); + suGridRowData['id']= 0; + suGridRowData['isValid']= true; + for(const key of this.state.colKeyOrder){ + suGridRowData[key]= suRow[colCount]; + colCount++; + } + this.tmpRowData[dataRowCount]= (suGridRowData); + dataRowCount++ + }) + } + let emptyRow = this.state.emptyRow; + let tmpNoOfSU= this.state.noOfSU; + if(dataRowCount >= tmpNoOfSU){ + tmpNoOfSU = dataRowCount+10; + //Create additional empty row at the end + for(let i= this.tmpRowData.length; i<=tmpNoOfSU; i++){ + this.tmpRowData.push(emptyRow); + } } - } - await this.setState({ - rowData: this.tmpRowData, - noOfSU: this.tmpRowData.length, - totalCount: dataRowCount, - }) + await this.setState({ + rowData: this.tmpRowData, + noOfSU: this.tmpRowData.length, + totalCount: dataRowCount, + }) + + this.state.gridApi.setRowData(this.state.rowData); + this.state.gridApi.redrawRows(); - this.state.gridApi.setRowData(this.state.rowData) - this.state.gridApi.redrawRows(); + }catch (err) { + console.error('Error: ', err); + } } else if ( key == 67 && ctrl ) { - //Ctrl+C = Store the data into local state + //Ctrl+C var selectedRows = this.state.gridApi.getSelectedRows(); - this.setState({ - clipboard : selectedRows - }) + let clipboardData = ''; + for(const rowData of selectedRows){ + var line = ''; + for(const key of this.state.colKeyOrder){ + line += rowData[key] + '\t'; + } + line = _.trim(line); + clipboardData += line + '\r\n'; + } + clipboardData = _.trim(clipboardData); + const queryOpts = { name: 'clipboard-write', allowWithoutGesture: true }; + await navigator.permissions.query(queryOpts); + await navigator.clipboard.writeText(clipboardData); } } + + /** * Function to create Scheduling unit */ @@ -571,12 +638,11 @@ export class SchedulingSetCreate extends Component { index++; } if(!validRow){ - continue + continue; } observStrategy.template.parameters.forEach(async(param, index) => { $refs.set(observStrategy.template.parameters[index]['refs'][0], paramsOutput['param_' + index]); }); - if(suRow.id >0 && suRow.suname.length>0 && suRow.sudesc.length>0){ newSU = _.find(this.state.schedulingUnitList, {'id': suRow.id}); newSU['name'] = suRow.suname; @@ -597,14 +663,8 @@ export class SchedulingSetCreate extends Component { if((newSUCount+existingSUCount)>0){ const dialog = {header: 'Success', detail: '['+newSUCount+'] Scheduling Units are created & ['+existingSUCount+'] Scheduling Units are updated successfully.'}; this.setState({ dialogVisible: true, dialog: dialog}); - /* let schedulingUnitList= await ScheduleService.getSchedulingBySet(this.state.selectedSchedulingSetId); - schedulingUnitList = _.filter(schedulingUnitList,{'observation_strategy_template_id': this.state.observStrategy.id}) ; - this.setState({ - schedulingUnitList: schedulingUnitList - }) - this.prepareScheduleUnitListForGrid();*/ }else{ - this.growl.show({severity: 'error', summary: 'Warring', detail: 'Scheduling Units are not create/update '}); + this.growl.show({severity: 'error', summary: 'Warning', detail: 'No Scheduling Units create/update '}); } }catch(err){ this.growl.show({severity: 'error', summary: 'Error Occured', detail: 'Unable to create/update Scheduling Units'}); @@ -612,19 +672,18 @@ export class SchedulingSetCreate extends Component { } /** - * Refresh the grid with updated data, it helps to make next update to make immediatly for the same filter + * Refresh the grid with updated data */ async reset() { - //this.setState({dialogVisible: false}); let schedulingUnitList= await ScheduleService.getSchedulingBySet(this.state.selectedSchedulingSetId); - schedulingUnitList = _.filter(schedulingUnitList,{'observation_strategy_template_id': this.state.observStrategy.id}) ; - this.setState({ - schedulingUnitList: schedulingUnitList, - dialogVisible: false - }) - await this.prepareScheduleUnitListForGrid(); - this.state.gridApi.setRowData(this.state.rowData) - this.state.gridApi.redrawRows(); + schedulingUnitList = _.filter(schedulingUnitList,{'observation_strategy_template_id': this.state.observStrategy.id}) ; + this.setState({ + schedulingUnitList: schedulingUnitList, + dialogVisible: false + }) + await this.prepareScheduleUnitListForGrid(); + this.state.gridApi.setRowData(this.state.rowData); + this.state.gridApi.redrawRows(); } /** @@ -634,15 +693,16 @@ export class SchedulingSetCreate extends Component { this.setState({redirect: '/schedulingunit'}); } - onGridReady (params) { - this.setState({ + async onGridReady (params) { + await this.setState({ gridApi:params.api, gridColumnApi:params.columnApi, }) + this.state.gridApi.hideOverlay(); } async setNoOfSUint(value){ - if(value>=0 && value<501){ + if(value >= 0 && value < 501){ await this.setState({ noOfSU: value }) @@ -701,7 +761,8 @@ export class SchedulingSetCreate extends Component { validateEditor() { return this.validEditor?true:false; } - + + render() { if (this.state.redirect) { return <Redirect to={ {pathname: this.state.redirect} }></Redirect> @@ -772,27 +833,27 @@ export class SchedulingSetCreate extends Component { </div> </div> <> - <div className="ag-theme-alpine" style={ { height: '500px', marginBottom: '10px' } } onKeyDown={this.clipboardEvent}> - <AgGridReact - suppressClipboardPaste={false} - columnDefs={this.state.columnDefs} - columnTypes={this.state.columnTypes} - defaultColDef={this.state.defaultColDef} - rowSelection={this.state.rowSelection} - onGridReady={this.onGridReady} - rowData={this.state.rowData} - frameworkComponents={this.state.frameworkComponents} - context={this.state.context} - components={this.state.components} - modules={this.state.modules} - enableRangeSelection={true} - rowSelection={this.state.rowSelection} - - //onSelectionChanged={this.onSelectionChanged.bind(this)} - > - - </AgGridReact> - </div> + {this.state.observStrategy.id && + <div className="ag-theme-alpine" style={ { height: '500px', marginBottom: '10px' } } onKeyDown={this.clipboardEvent}> + <AgGridReact + suppressClipboardPaste={false} + columnDefs={this.state.columnDefs} + columnTypes={this.state.columnTypes} + defaultColDef={this.state.defaultColDef} + rowSelection={this.state.rowSelection} + onGridReady={this.onGridReady} + rowData={this.state.rowData} + frameworkComponents={this.state.frameworkComponents} + context={this.state.context} + components={this.state.components} + modules={this.state.modules} + enableRangeSelection={true} + rowSelection={this.state.rowSelection} + > + + </AgGridReact> + </div> + } </> <div className="p-grid p-justify-start"> <div className="p-col-1"> diff --git a/SAS/TMSS/frontend/tmss_webapp/src/services/schedule.service.js b/SAS/TMSS/frontend/tmss_webapp/src/services/schedule.service.js index 53b26626b6667fb8d0b526c0f6d28a6f63ab5768..dbd8f51805d1772f2234904d34787d1ed23cf5ca 100644 --- a/SAS/TMSS/frontend/tmss_webapp/src/services/schedule.service.js +++ b/SAS/TMSS/frontend/tmss_webapp/src/services/schedule.service.js @@ -237,9 +237,12 @@ const ScheduleService = { if (schedulingUnit && schedulingUnit.id) { // Update the newly created SU draft requirement_doc with captured parameter values schedulingUnit.requirements_doc = observStrategy.template; - schedulingUnit.scheduling_constraints_doc = constraint.scheduling_constraints_doc; - schedulingUnit.scheduling_constraints_template_id = constraint.id; - schedulingUnit.scheduling_constraints_template = constraint.constraint.url; + if(constraint){ + schedulingUnit.scheduling_constraints_doc = constraint.scheduling_constraints_doc; + schedulingUnit.scheduling_constraints_template_id = constraint.id; + schedulingUnit.scheduling_constraints_template = constraint.constraint.url; + } + delete schedulingUnit['duration']; schedulingUnit = await this.updateSchedulingUnitDraft(schedulingUnit); if (!schedulingUnit || !schedulingUnit.id) { diff --git a/SAS/TMSS/services/CMakeLists.txt b/SAS/TMSS/services/CMakeLists.txt index 5e89c9c4e37d3ae53fe078e2c3c367e03d438ba5..b1cdad1bc8906d3ba0302fe6c867a6eb8bff9df1 100644 --- a/SAS/TMSS/services/CMakeLists.txt +++ b/SAS/TMSS/services/CMakeLists.txt @@ -1,3 +1,4 @@ lofar_add_package(TMSSSubtaskSchedulingService subtask_scheduling) lofar_add_package(TMSSFeedbackHandlingService feedback_handling) +lofar_add_package(TMSSPostgresListenerService tmss_postgres_listener) diff --git a/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py b/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py index b3fe3a8361596c851e50b5a364d663532c2ef03b..4a414858756a8f417a77918681ab334609911369 100755 --- a/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py +++ b/SAS/TMSS/services/feedback_handling/test/t_feedback_handling_service.py @@ -25,7 +25,6 @@ import logging logger = logging.getLogger(__name__) from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment -from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import * from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.sas.tmss.services.feedback_handling import TMSSFeedbackListener diff --git a/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py index 09fb330ef2e33507df236374fea799e342ee72c2..524a616a86fa35fca2351278a1d69b1df46d882f 100644 --- a/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py +++ b/SAS/TMSS/services/subtask_scheduling/lib/subtask_scheduling.py @@ -36,7 +36,7 @@ logger = logging.getLogger(__name__) from lofar.sas.tmss.client.tmssbuslistener import * from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession -class TMSSSubTaskSchedulingEventMessageHandler(TMSSSubTaskEventMessageHandler): +class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler): ''' ''' def __init__(self, tmss_client_credentials_id: str=None): @@ -51,39 +51,34 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSSubTaskEventMessageHandler): super().stop_handling() self.tmss_client.close() - def onSubTaskFinished(self, subtask_id: int, old_state: str, new_state:str): - '''onSubTaskFinished is called upon received a SubTaskFinished message, which is sent when a SubTasks changes state to "Finished". - :param subtask_id: the TMSS id of the SubTask - :param old_state: the previous state of the SubTask - :param new_state: the new state of the SubTask - ''' - logger.info("subtask %s finished. Trying to schedule defined successor subtasks...", subtask_id) - - successors = self.tmss_client.get_subtask_successors(subtask_id, state="defined") - successor_ids = [s['id'] for s in successors] - - logger.info("subtask %s finished. trying to schedule defined successors: %s", - subtask_id, - ', '.join(str(id) for id in successor_ids) or 'None') - - for successor in successors: - try: - suc_subtask_id = successor['id'] - suc_subtask_state = successor['state_value'] - - if suc_subtask_state == "defined": - logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, subtask_id) - scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id) - suc_subtask_state = scheduled_successor['state_value'] - logger.info("successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, subtask_id, suc_subtask_state, scheduled_successor['url']) - else: - logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, subtask_id, suc_subtask_state) - - except Exception as e: - logger.error(e) + def onSubTaskStatusChanged(self, id: int, status: str): + super().onSubTaskStatusChanged(id, status) + + if status == "finished": + successors = self.tmss_client.get_subtask_successors(id, state="defined") + successor_ids = sorted([s['id'] for s in successors]) + + logger.info("subtask %s finished. trying to schedule defined successors: %s", + id, ', '.join(str(id) for id in successor_ids) or 'None') + + for successor in successors: + try: + suc_subtask_id = successor['id'] + suc_subtask_state = successor['state_value'] + + if suc_subtask_state == "defined": + logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, id) + scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id) + suc_subtask_state = scheduled_successor['state_value'] + logger.info("successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, id, suc_subtask_state, scheduled_successor['url']) + else: + logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, id, suc_subtask_state) + + except Exception as e: + logger.error(e) def create_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, tmss_client_credentials_id: str=None): - return TMSSSubTaskBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, + return TMSSBusListener(handler_type=TMSSSubTaskSchedulingEventMessageHandler, handler_kwargs={'tmss_client_credentials_id': tmss_client_credentials_id}, exchange=exchange, broker=broker) diff --git a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py index 1027b12e91d8d22686a32da76d8c126136d6c8a3..84d85d879019b0a5d09832d7cf5815f53ef12a2b 100755 --- a/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py +++ b/SAS/TMSS/services/subtask_scheduling/test/t_subtask_scheduling_service.py @@ -22,9 +22,9 @@ import uuid import logging logger = logging.getLogger(__name__) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment -from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import * from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor @@ -49,7 +49,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): cls.tmss_test_env.start() cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url, - (tmss_test_env.ldap_server.dbcreds.user, tmss_test_env.ldap_server.dbcreds.password)) + (cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password)) @classmethod def tearDownClass(cls) -> None: @@ -109,8 +109,6 @@ class TestSubtaskSchedulingService(unittest.TestCase): # subtask2 should now be scheduled self.assertEqual(subtask2['state_value'], 'scheduled') -logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - if __name__ == '__main__': #run the unit tests unittest.main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c500272b175b8952e8f6b1d3ff632fc642205b7a --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/CMakeLists.txt @@ -0,0 +1,8 @@ +lofar_package(TMSSPostgresListenerService 0.1 DEPENDS TMSSClient PyCommon PyMessaging) + +lofar_find_package(PythonInterp 3.4 REQUIRED) + +add_subdirectory(lib) +add_subdirectory(bin) +add_subdirectory(test) + diff --git a/SAS/TMSS/services/tmss_postgres_listener/bin/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/bin/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..5bd20e76ba68d1a537069fe468b92fc0763ae93b --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/bin/CMakeLists.txt @@ -0,0 +1,4 @@ +lofar_add_bin_scripts(tmss_postgres_listener_service) + +# supervisord config files +lofar_add_sysconf_files(tmss_postgres_listener_service.ini DESTINATION supervisord.d) diff --git a/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service new file mode 100755 index 0000000000000000000000000000000000000000..6f4ba61d8055d8e1170f3914bd766a16659b509d --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service @@ -0,0 +1,24 @@ +#!/usr/bin/python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + + +from lofar.sas.tmss.services.tmss_postgres_listener import main + +if __name__ == "__main__": + main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service.ini b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service.ini new file mode 100644 index 0000000000000000000000000000000000000000..3564a30c1f84cba26814a90a3dd1cc2366db65d3 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/bin/tmss_postgres_listener_service.ini @@ -0,0 +1,9 @@ +[program:tmss_pglistener_service] +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec tmss_pglistener_service' +user=lofarsys +stopsignal=INT ; KeyboardInterrupt +stopasgroup=true ; bash does not propagate signals +stdout_logfile=%(program_name)s.log +redirect_stderr=true +stderr_logfile=NONE +stdout_logfile_maxbytes=0 diff --git a/SAS/TMSS/services/tmss_postgres_listener/lib/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/lib/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..c438ae81c5d294ec3159cfa7b60d837678020c37 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/lib/CMakeLists.txt @@ -0,0 +1,10 @@ +lofar_find_package(PythonInterp 3.4 REQUIRED) +include(PythonInstall) + +set(_py_files + tmss_postgres_listener.py + ) + +python_install(${_py_files} + DESTINATION lofar/sas/tmss/services) + diff --git a/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py new file mode 100644 index 0000000000000000000000000000000000000000..3cf20c24ec7ed26321f2c8acc85e09a14961b6eb --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/lib/tmss_postgres_listener.py @@ -0,0 +1,237 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import logging +logger = logging.getLogger(__name__) + +from optparse import OptionParser, OptionGroup +import json + +from lofar.common.postgres import PostgresListener, makePostgresNotificationQueries +from lofar.messaging.messagebus import ToBus +from lofar.sas.tmss.client.tmssbuslistener import * +from lofar.common import dbcredentials +from lofar.common.util import single_line_with_single_spaces + +class TMSSPGListener(PostgresListener): + '''This class subscribes to the Subtask, TaskDraft/Blueprint & SchedulingUnitDraft/Blueprint tables in the TMSS database + and send EventMessages upon each table row action, *Created, *Updated, *Deleted, and for each status update. + See lofar.sas.tmss.client.tmssbuslistener.TMSSBusListener for the receiving BusListener''' + def __init__(self, + dbcreds, + exchange=DEFAULT_BUSNAME, + broker=DEFAULT_BROKER): + super().__init__(dbcreds=dbcreds) + self.event_bus = ToBus(exchange=exchange, broker=broker) + + def start(self): + logger.info("Starting to listen for TMSS database changes and publishing EventMessages on %s db: %s", self.event_bus.exchange, self._dbcreds.stringWithHiddenPassword()) + self.event_bus.open() + + # SubTask + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'insert')) + self.subscribe('tmssapp_subtask_insert', self.onSubTaskInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update')) + self.subscribe('tmssapp_subtask_update', self.onSubTaskUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'delete')) + self.subscribe('tmssapp_subtask_delete', self.onSubTaskDeleted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_subtask', 'update', 'state_id')) + self.subscribe('tmssapp_subtask_update_column_state_id', self.onSubTaskStateUpdated) + + + # TaskBlueprint + # please note that the status property in the TaskBlueprint model is derived from the subtasks + # hence we cannot create a postgres notification for this "column". + # But, see how onSubTaskStateUpdated does send a TaskBlueprint status changed event as well for its parent. + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'insert')) + self.subscribe('tmssapp_taskblueprint_insert', self.onTaskBlueprintInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'update')) + self.subscribe('tmssapp_taskblueprint_update', self.onTaskBlueprintUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskblueprint', 'delete')) + self.subscribe('tmssapp_taskblueprint_delete', self.onTaskBlueprintDeleted) + + + # TaskDraft + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskdraft', 'insert')) + self.subscribe('tmssapp_taskdraft_insert', self.onTaskDraftInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskdraft', 'update')) + self.subscribe('tmssapp_taskdraft_update', self.onTaskDraftUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_taskdraft', 'delete')) + self.subscribe('tmssapp_taskdraft_delete', self.onTaskDraftDeleted) + + + # SchedulingUnitBlueprint + # please note that the status property in the SchedulingUnitBlueprint model is derived from the tasks, and these are derived from the subtasks + # hence we cannot create a postgres notification for this "column". + # But, see how onSubTaskStateUpdated does send a SchedulingUnitBlueprint status changed event as well for its parent. + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'insert')) + self.subscribe('tmssapp_schedulingunitblueprint_insert', self.onSchedulingUnitBlueprintInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'update')) + self.subscribe('tmssapp_schedulingunitblueprint_update', self.onSchedulingUnitBlueprintUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitblueprint', 'delete')) + self.subscribe('tmssapp_schedulingunitblueprint_delete', self.onSchedulingUnitBlueprintDeleted) + + + # SchedulingUnitDraft + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'insert')) + self.subscribe('tmssapp_schedulingunitdraft_insert', self.onSchedulingUnitDraftInserted) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'update')) + self.subscribe('tmssapp_schedulingunitdraft_update', self.onSchedulingUnitDraftUpdated) + + self.executeQuery(makePostgresNotificationQueries('', 'tmssapp_schedulingunitdraft', 'delete')) + self.subscribe('tmssapp_schedulingunitdraft_delete', self.onSchedulingUnitDraftDeleted) + + return super().start() + + def __exit__(self, exc_type, exc_val, exc_tb): + super().stop() + self.event_bus.close() + logger.info("Stopped listening for TMSS database changes and publishing EventMessages on %s broker=%s db: %s", + self.event_bus.exchange, self.event_bus.broker, self._dbcreds.stringWithHiddenPassword()) + + def _sendNotification(self, subject, contentDict): + try: + if isinstance(contentDict, str): + contentDict = json.loads(contentDict) + + msg = EventMessage(subject=subject, content=contentDict) + logger.info('Sending %s to %s: %s', + subject, self.event_bus.exchange, single_line_with_single_spaces(contentDict)) + self.event_bus.send(msg) + except Exception as e: + logger.error(str(e)) + + def onSubTaskInserted(self, payload = None): + self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onSubTaskUpdated(self, payload = None): + self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onSubTaskDeleted(self, payload = None): + self._sendNotification(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onSubTaskStateUpdated(self, payload = None): + payload_dict = json.loads(payload) + # send notification for this subtask... + from lofar.sas.tmss.tmss.tmssapp.models.scheduling import Subtask + subtask = Subtask.objects.get(id=payload_dict['id']) + self._sendNotification(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.'+subtask.state.value.capitalize(), + {'id': subtask.id, 'status': subtask.state.value}) + + # ... and also send status change and object update events for the parent task, and schedulingunit, + # because their status is implicitly derived from their subtask(s) + # send both object.updated and status change events + self.onTaskBlueprintUpdated( {'id': subtask.task_blueprint.id}) + self._sendNotification(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.'+subtask.task_blueprint.status.capitalize(), + {'id': subtask.task_blueprint.id, 'status': subtask.task_blueprint.status}) + + self.onSchedulingUnitBlueprintUpdated( {'id': subtask.task_blueprint.scheduling_unit_blueprint.id}) + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.'+subtask.task_blueprint.scheduling_unit_blueprint.status.capitalize(), + {'id': subtask.task_blueprint.scheduling_unit_blueprint.id, 'status': subtask.task_blueprint.scheduling_unit_blueprint.status}) + + def onTaskBlueprintInserted(self, payload = None): + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onTaskBlueprintUpdated(self, payload = None): + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onTaskBlueprintDeleted(self, payload = None): + self._sendNotification(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onTaskDraftInserted(self, payload = None): + self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onTaskDraftUpdated(self, payload = None): + self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onTaskDraftDeleted(self, payload = None): + self._sendNotification(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onSchedulingUnitBlueprintInserted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onSchedulingUnitBlueprintUpdated(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onSchedulingUnitBlueprintDeleted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + def onSchedulingUnitDraftInserted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Created', payload) + + def onSchedulingUnitDraftUpdated(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Updated', payload) + + def onSchedulingUnitDraftDeleted(self, payload = None): + self._sendNotification(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Deleted', payload) + + +def create_service(dbcreds, exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): + '''create a TMSSPGListener instance''' + return TMSSPGListener(dbcreds=dbcreds, exchange=exchange, broker=broker) + + +def main(): + # make sure we run in UTC timezone + import os + os.environ['TZ'] = 'UTC' + + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + # Check the invocation arguments + parser = OptionParser("%prog [options]", + description='runs the radb postgres listener which listens to changes on some tables in the radb and publishes the changes as notifications on the bus.') + + group = OptionGroup(parser, 'Messaging options') + group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, + help='Address of the message broker, default: %default') + group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, + help="Bus or queue where the TMSS messages are published. [default: %default]") + parser.add_option_group(group) + + parser.add_option_group(dbcredentials.options_group(parser)) + parser.set_defaults(dbcredentials=os.environ.get('TMSS_CLIENT_DBCREDENTIALS', 'TMSS')) + (options, args) = parser.parse_args() + + dbcreds = dbcredentials.parse_options(options) + logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) + + # setup django + os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials + os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings" + import django + django.setup() + + with create_service(dbcreds=dbcreds, + exchange=options.exchange, + broker=options.broker) as listener: + listener.waitWhileListening() + +if __name__ == '__main__': + main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/CMakeLists.txt b/SAS/TMSS/services/tmss_postgres_listener/test/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..36dfcc36e0a541a4c83b491ff3bc577ad1801b32 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/CMakeLists.txt @@ -0,0 +1,7 @@ +# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $ + +if(BUILD_TESTING) + include(LofarCTest) + + lofar_add_test(t_tmss_postgres_listener_service) +endif() diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py new file mode 100755 index 0000000000000000000000000000000000000000..b0b847668bf48905701d8e48adbc0273bb416162 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.py @@ -0,0 +1,176 @@ +#!/usr/bin/env python3 + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import unittest +import uuid + +import logging +logger = logging.getLogger(__name__) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + +from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment +from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator + +from lofar.messaging.messagebus import TemporaryExchange +from lofar.sas.tmss.services.tmss_postgres_listener import * +from lofar.common.test_utils import integration_test +from threading import Lock +import requests +import json +from collections import deque +from datetime import datetime, timedelta + +@integration_test +class TestSubtaskSchedulingService(unittest.TestCase): + ''' + Tests for the SubtaskSchedulingService + ''' + @classmethod + def setUpClass(cls) -> None: + cls.TEST_UUID = uuid.uuid1() + + cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID)) + cls.tmp_exchange.open() + + cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, start_postgres_listener=False, populate_schemas=False, populate_test_data=False) + cls.tmss_test_env.start() + + cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url, + (cls.tmss_test_env.ldap_server.dbcreds.user, + cls.tmss_test_env.ldap_server.dbcreds.password)) + + @classmethod + def tearDownClass(cls) -> None: + cls.tmss_test_env.stop() + cls.tmp_exchange.close() + + def test_01_for_expected_behaviour(self): + ''' + This test starts a TMSSPGListener service and TMSS, creates/updates/deletes subtasks/tasks/schedulingunits, and checks if the correct events are sent. + ''' + logger.info(' -- test_01_for_expected_behaviour -- ') + + class TestTMSSPGListener(TMSSPGListener): + '''Helper TMSSPGListener for this test, storing intermediate results, and providing synchronization threading.Events''' + def __init__(self, dbcreds, exchange=self.tmp_exchange.address): + super().__init__(dbcreds, exchange) + self.subjects = deque() + self.contentDicts = deque() + self.lock = Lock() + + def _sendNotification(self, subject, contentDict): + # instead of sending a notification to the messagebus, record the subject and content in queues + # so we can check in the test if the correct subjects are recorded + with self.lock: + logger.info("detected db change: %s %s", subject, single_line_with_single_spaces(contentDict)) + self.subjects.append(subject) + self.contentDicts.append(json.loads(contentDict) if isinstance(contentDict, str) else contentDict) + + # create and start the service (the object under test) + with TestTMSSPGListener(exchange=self.tmp_exchange.address, dbcreds=self.tmss_test_env.database.dbcreds) as service: + # create a SchedulingUnitDraft + su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": su_draft['id']}, service.contentDicts.popleft()) + + + # create a TaskDraft + task_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.TaskDraft(scheduling_unit_draft_url=su_draft['url']), '/task_draft/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": task_draft['id']}, service.contentDicts.popleft()) + + + # create a SchedulingUnitBlueprint + su_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=su_draft['url']), '/scheduling_unit_blueprint/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": su_blueprint['id']}, service.contentDicts.popleft()) + + + # create a TaskBlueprint + task_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.TaskBlueprint(scheduling_unit_blueprint_url=su_blueprint['url'], + draft_url=task_draft['url']), '/task_blueprint/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": task_blueprint['id']}, service.contentDicts.popleft()) + + + # create a SubTask + subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/') + + # sync and check + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": subtask['id']}, service.contentDicts.popleft()) + + # update subtask status, use a nice tmss_client and the rest api. + with self.tmss_test_env.create_tmss_client() as client: + client.set_subtask_status(subtask['id'], 'scheduled') + + # ugly, but functional. Wait for all status updates: 1 object, 1 status. both per each object (3 types) => total 6 events. + start_wait = datetime.utcnow() + while True: + with service.lock: + if len(service.subjects) == 6: + break + if datetime.utcnow() - start_wait > timedelta(seconds=5): + raise TimeoutError("timeout while waiting for status/object updates") + + # sync and check + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX + '.Updated', service.subjects.popleft()) + self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': subtask['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) + self.assertEqual({'id': task_blueprint['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': task_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) + self.assertEqual({'id': su_blueprint['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': su_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + # delete subtask, use direct http delete request on rest api + requests.delete(subtask['url'], auth=self.test_data_creator.auth) + + # sync and check subtask deleted + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', service.subjects.popleft()) + self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) + + +if __name__ == '__main__': + #run the unit tests + unittest.main() diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.run b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.run new file mode 100755 index 0000000000000000000000000000000000000000..b3b8a82585942bb7a1cfaa3f7ea5f7a9d355c484 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.run @@ -0,0 +1,6 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "*tmss*" t_tmss_postgres_listener_service.py + diff --git a/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.sh b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.sh new file mode 100755 index 0000000000000000000000000000000000000000..600f72e660b1a47338192230ea4131140c7c7550 --- /dev/null +++ b/SAS/TMSS/services/tmss_postgres_listener/test/t_tmss_postgres_listener_service.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_tmss_postgres_listener_service \ No newline at end of file diff --git a/SAS/TMSS/src/tmss/settings.py b/SAS/TMSS/src/tmss/settings.py index 909caac96ed6807ff88769ebf39379506f2ec40c..7f160668b40ac7164efdfaea77f44fb018e32d7d 100644 --- a/SAS/TMSS/src/tmss/settings.py +++ b/SAS/TMSS/src/tmss/settings.py @@ -137,7 +137,6 @@ DEBUG_TOOLBAR_CONFIG = { MIDDLEWARE = [ 'django.middleware.gzip.GZipMiddleware', - 'debug_toolbar.middleware.DebugToolbarMiddleware', 'django.middleware.security.SecurityMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.middleware.common.CommonMiddleware', @@ -355,9 +354,3 @@ SWAGGER_SETTINGS = { }, } - -# TODO Do I need distinguish more between Test and Production Environment?? -# maybe a local file in Development environment for test purposes -SCU = "http://scu199" if isDevelopmentEnvironment() or isTestEnvironment() else "http://scu001" -PIPELINE_SUBTASK_LOG_URL = SCU + ".control.lofar:7412/tasks/%s/log.html" -OBSERVATION_SUBTASK_LOG_URL = "https://proxy.lofar.eu/inspect/%s/rtcp-%s.errors" diff --git a/SAS/TMSS/src/tmss/tmssapp/conversions.py b/SAS/TMSS/src/tmss/tmssapp/conversions.py index ee8f35b3770cfba1683ad3a2822949a6f6dabe60..ce112f7b30b8f697baf91d4da9202899703715ba 100644 --- a/SAS/TMSS/src/tmss/tmssapp/conversions.py +++ b/SAS/TMSS/src/tmss/tmssapp/conversions.py @@ -3,6 +3,60 @@ import astropy.units from lofar.lta.sip import station_coordinates from datetime import datetime from astropy.coordinates.earth import EarthLocation +from astropy.coordinates import Angle +from astroplan.observer import Observer + + +def create_astroplan_observer_for_station(station: str) -> Observer: + ''' + returns an astroplan observer for object for a given station, located in the LBA center of the given station + :param station: a station name, e.g. "CS002" + :return: astroplan.observer.Observer object + ''' + + coords = station_coordinates.parse_station_coordinates()["%s_LBA" % station.upper()] + location = EarthLocation.from_geocentric(x=coords['x'], y=coords['y'], z=coords['z'], unit=astropy.units.m) + observer = Observer(location, name="LOFAR", timezone="UTC") + return observer + +# default angle to the horizon at which the sunset/sunrise starts and ends, as per LOFAR definition. +SUN_SET_RISE_ANGLE_TO_HORIZON = Angle(10, unit=astropy.units.deg) + +def timestamps_and_stations_to_sun_rise_and_set(timestamps: [datetime], stations: [str], angle_to_horizon: Angle=SUN_SET_RISE_ANGLE_TO_HORIZON) -> dict: + """ + compute sunrise, sunset, day and night of the given stations at the given timestamps + :param timestamps: list of datetimes, e.g. [datetime(2020, 1, 1), datetime(2020, 1, 2)] + :param stations: list of station names, e.g. ["CS001"] + :return A dict that maps station names to a nested dict that contains lists of start and end times for sunrise, sunset, etc, on each requested date. + E.g. + {"CS001": + { "sunrise": [{"start": (2020, 1, 1, 6, 0, 0)), "end": (2020, 1, 1, 6, 30, 0)}, + {"start": (2020, 1, 2, 6, 0, 0)), "end": (2020, 1, 2, 6, 30, 0)}], + "sunset": [{"start": (2020, 1, 1, 18, 0, 0)), "end": (2020, 1, 1, 18, 30, 0)}, + {"start": (2020, 1, 2, 18, 0, 0)), "end": (2020, 1, 2, 18, 30, 0)}], + "day": [{"start": (2020, 1, 1, 6, 30, 0)), "end": (2020, 1, 1, 18, 00, 0)}, + {"start": (2020, 1, 2, 6, 30, 0)), "end": (2020, 1, 2, 18, 00, 0)}], + "night": [{"start": (2020, 1, 1, 18, 30, 0)), "end": (2020, 1, 2, 6, 0, 0)}, + {"start": (2020, 1, 2, 18,3 0, 0)), "end": (2020, 1, 3, 6, 0, 0)}], + } + } + """ + return_dict = {} + for station in stations: + for timestamp in timestamps: + observer = create_astroplan_observer_for_station(station) + sunrise_start = observer.sun_rise_time(time=Time(timestamp), which='previous') + if sunrise_start.to_datetime().date() < timestamp.date(): + sunrise_start = observer.sun_rise_time(time=Time(timestamp), horizon=-angle_to_horizon, which='next') + sunrise_end = observer.sun_rise_time(time=Time(timestamp), horizon=angle_to_horizon, which='next') + sunset_start = observer.sun_set_time(time=sunrise_end, horizon=angle_to_horizon, which='next') + sunset_end = observer.sun_set_time(time=sunrise_end, horizon=-angle_to_horizon, which='next') + sunrise_next_start = observer.sun_rise_time(time=sunset_end, horizon=-angle_to_horizon, which='next') + return_dict.setdefault(station, {}).setdefault("sunrise", []).append({"start": sunrise_start.to_datetime(), "end": sunrise_end.to_datetime()}) + return_dict[station].setdefault("sunset", []).append({"start": sunset_start.to_datetime(), "end": sunset_end.to_datetime()}) + return_dict[station].setdefault("day", []).append({"start": sunrise_end.to_datetime(), "end": sunset_start.to_datetime()}) + return_dict[station].setdefault("night", []).append({"start": sunset_end.to_datetime(), "end": sunrise_next_start.to_datetime()}) + return return_dict def local_sidereal_time_for_utc_and_station(timestamp: datetime = None, diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index a6257840944db83aedf0df4fc8667588ecd8a19f..6e58f28a9dcd373dc38be715dd609274e2e6deb1 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -18,10 +18,6 @@ from django.db.models.expressions import RawSQL from django.core.exceptions import ValidationError from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException -from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging.messages import EventMessage -from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX -from lofar.common.util import single_line_with_single_spaces from django.conf import settings from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC import uuid @@ -170,17 +166,6 @@ class Subtask(BasicCommon): # keep original state for logging self.__original_state_id = self.state_id - @staticmethod - def _send_state_change_event_message(subtask_id:int, old_state: str, new_state: str): - with ToBus(exchange=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), - broker=os.environ.get("TMSS_BROKER", DEFAULT_BROKER)) as tobus: #TODO: do we want to connect to the bus for each new message, or have some global tobus? - msg = EventMessage(subject="%s.%s" % (DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, new_state.capitalize()), - content={'subtask_id': subtask_id, 'old_state': old_state, 'new_state': new_state}) - address = tobus.remote_address - logger.info("Sending message with subject '%s' to exchange='%s' on broker=%s:%s content: %s", - msg.subject, tobus.exchange, address[0], address[1], single_line_with_single_spaces(msg.content)) - tobus.send(msg) - @property def successors(self) -> QuerySet: '''return the connect successor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets) @@ -242,39 +227,9 @@ class Subtask(BasicCommon): user=self.created_or_updated_by_user, user_identifier=identifier) log_entry.save() - try: - self._send_state_change_event_message(self.id, log_entry.old_state.value, log_entry.new_state.value) - except Exception as e: - logger.error("Could not send state change to messagebus: %s", e) - # update the previous state value self.__original_state_id = self.state_id - @property - def log_url(self): - """ - Return the link to the pipeline log in case of pipeline or - link to COBALT error log in case of an observation - otherwise just an empty string - """ - if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value: - url = settings.OBSERVATION_SUBTASK_LOG_URL % (self.id, self.id) - elif self.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value: - # Get RADBID, subtask must be at least 'scheduled' to exist in radb - # If RA is not started don't wait longer than 10 seconds - with RADBRPC.create(timeout=10) as radbrpc: - try: - radb_id = radbrpc.getTask(tmss_id=self.id) - except: - radb_id = None - if radb_id is None: - url = "not available (missing radbid)" - else: - url = settings.PIPELINE_SUBTASK_LOG_URL % radb_id['id'] - else: - url = "" - return url - class SubtaskStateLog(BasicCommon): """ diff --git a/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py index a2a10449daef88deacd7eadf8ae421e0e8891bf9..85d7bd21c54ca2ad78badd911131847c11fb3375 100644 --- a/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/serializers/scheduling.py @@ -85,7 +85,7 @@ class SubtaskSerializer(RelationalHyperlinkedModelSerializer): class Meta: model = models.Subtask fields = '__all__' - extra_fields = ['cluster_value', 'log_url'] + extra_fields = ['cluster_value'] class SubtaskInputSerializer(RelationalHyperlinkedModelSerializer): diff --git a/SAS/TMSS/src/tmss/tmssapp/views.py b/SAS/TMSS/src/tmss/tmssapp/views.py index cf57dc6832f0f7340d7483d8789b2f0b2b2e12b8..851a625197765c401e1cc54db50c4b33d986b2e7 100644 --- a/SAS/TMSS/src/tmss/tmssapp/views.py +++ b/SAS/TMSS/src/tmss/tmssapp/views.py @@ -4,15 +4,17 @@ from django.http import HttpResponse, JsonResponse, Http404 from django.shortcuts import get_object_or_404, render from lofar.sas.tmss.tmss.tmssapp import models from lofar.common.json_utils import get_default_json_object_for_schema +from lofar.common.datetimeutils import formatDatetime from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset from drf_yasg.utils import swagger_auto_schema +from drf_yasg.openapi import Parameter from rest_framework.permissions import AllowAny from rest_framework.decorators import authentication_classes, permission_classes from django.apps import apps from datetime import datetime import dateutil.parser -from lofar.sas.tmss.tmss.tmssapp.conversions import local_sidereal_time_for_utc_and_station, local_sidereal_time_for_utc_and_longitude +from lofar.sas.tmss.tmss.tmssapp.conversions import local_sidereal_time_for_utc_and_station, local_sidereal_time_for_utc_and_longitude, timestamps_and_stations_to_sun_rise_and_set def subtask_template_default_specification(request, subtask_template_pk:int): subtask_template = get_object_or_404(models.SubtaskTemplate, pk=subtask_template_pk) @@ -84,10 +86,24 @@ def get_stations_in_group(request, template_name:str, template_version:str, stat 'stations': stations}) +@permission_classes([AllowAny]) +@authentication_classes([AllowAny]) +@swagger_auto_schema(responses={200: 'An isoformat timestamp of the current UTC clock of the system'}, + operation_description="Get the current system time in UTC") def utc(request): return HttpResponse(datetime.utcnow().isoformat(), content_type='text/plain') - +@permission_classes([AllowAny]) +@authentication_classes([AllowAny]) +@swagger_auto_schema(responses={200: 'The LST time in hms format at the given UTC time and station or longitude'}, + operation_description="Get LST time for UTC time and station or longitude", + manual_parameters=[Parameter(name='station', required=False, type='string', in_='query', + description="A station names (defaults to CS002)"), + Parameter(name='timestamp', required=False, type='string', in_='query', + description="A timestamp in isoformat (defaults to utcnow)"), + Parameter(name='longitude', required=False, type='float', in_='query', + description="A longitude") + ]) def lst(request): # Handling optional parameters via django paths in urls.py is a pain, we access them on the request directly instead. timestamp = request.GET.get('timestamp', None) @@ -109,4 +125,33 @@ def lst(request): lst_lon = local_sidereal_time_for_utc_and_station(timestamp) # todo: do we want to return a dict, so users can make sure their parameters were parsed correctly instead? - return HttpResponse(str(lst_lon), content_type='text/plain') \ No newline at end of file + return HttpResponse(str(lst_lon), content_type='text/plain') + + +@permission_classes([AllowAny]) +@authentication_classes([AllowAny]) +@swagger_auto_schema(responses={200: 'A JSON object with sunrise, sunset, day and night of the given stations at the given timestamps'}, + operation_description="Get sunrise, sunset, day and night for stations and timestamps", + manual_parameters=[Parameter(name='stations', required=False, type='string', in_='query', + description="comma-separated list of station names"), + Parameter(name='timestamps', required=False, type='string', in_='query', + description="comma-separated list of isoformat timestamps")]) +def get_sun_rise_and_set(request): + """ + returns sunrise and sunset at the given stations and timestamps, or today at LOFAR core if none specified. + example request: /api/util/sun_rise_and_set?stations=CS002,CS005×tamps=2020-05-01,2020-09-09T11-11-00 + """ + timestamps = request.GET.get('timestamps', None) + stations = request.GET.get('stations', None) + if timestamps is None: + timestamps = [datetime.utcnow()] + else: + timestamps = timestamps.split(',') + timestamps = [dateutil.parser.parse(timestamp) for timestamp in timestamps] # isot to datetime + if stations is None: + stations = ['CS002'] + else: + stations = stations.split(',') + + return JsonResponse(timestamps_and_stations_to_sun_rise_and_set(timestamps, stations)) + diff --git a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py index 54ed8f42527b6a902201b46b2c6e644b26ce38cc..2bc7b1814e5c667bcdd9fae7bea322e7696cdf82 100644 --- a/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/viewsets/scheduling.py @@ -16,9 +16,10 @@ from drf_yasg.utils import swagger_auto_schema from drf_yasg.inspectors import SwaggerAutoSchema from rest_framework.decorators import action -from django.http import HttpResponse, JsonResponse +from django.http import HttpResponse, JsonResponse, HttpResponseRedirect, HttpResponseNotFound from rest_framework.response import Response as RestResponse +from lofar.common import isProductionEnvironment, isTestEnvironment from lofar.sas.tmss.tmss.tmssapp.viewsets.lofar_viewset import LOFARViewSet from lofar.sas.tmss.tmss.tmssapp import models from lofar.sas.tmss.tmss.tmssapp import serializers @@ -209,6 +210,49 @@ class SubtaskViewSet(LOFARViewSet): return RestResponse(serializer.data) + @swagger_auto_schema(responses={302: 'A redirect url to the task log for this Subtask.', + 403: 'forbidden'}, + operation_description="Get the task log for this Subtask.") + @action(methods=['get'], detail=True) + def task_log(self, request, pk=None): + """ + Return a redirect to the the link to the pipeline log in case of pipeline or + link to COBALT error log in case of an observation. + """ + subtask = get_object_or_404(models.Subtask, pk=pk) + + # redirect to cobalt log served at proxy.lofar.eu + if subtask.specifications_template.type.value == models.SubtaskType.Choices.OBSERVATION.value: + url = "https://proxy.lofar.eu/inspect/%s/rtcp-%s.errors" % (subtask.id, subtask.id) + return HttpResponseRedirect(redirect_to=url) + + # redirect to pipeline log served via webscheduler + if subtask.specifications_template.type.value == models.SubtaskType.Choices.PIPELINE.value: + # import here and not at top of module to "loosen" dependency on external packages, such as in this case the RADB RPC. + from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC + # Get RADBID, subtask must be at least 'scheduled' to exist in radb + try: + with RADBRPC.create(timeout=2) as radbrpc: + radb_id = radbrpc.getTask(tmss_id=subtask.id) + + if radb_id is None: + return HttpResponseNotFound( + content='No RADB task found for subtask id=%s type="%s status=%s". Cannot redirect to pipeline log.' % ( + subtask.id, subtask.specifications_template.type.value, subtask.state)) + + WEBSCHEDULER_URL = "http://scu001.control.lofar:7412" if isProductionEnvironment() else \ + "http://scu199.control.lofar:7412" if isTestEnvironment() else \ + "http://localhost:7412" + + url = "%s/tasks/%s/log.html" % (WEBSCHEDULER_URL, radb_id) + return HttpResponseRedirect(redirect_to=url) + except Exception as e: + return HttpResponseNotFound(content='No RADB task found for subtask id=%s type="%s". Cannot redirect to pipeline log.' % (subtask.id, subtask.specifications_template.type.value)) + + # unknown log + return HttpResponseNotFound(content='No log (url) available for subtask id=%s type="%s"' % (subtask.id, subtask.specifications_template.type.value) ) + + @swagger_auto_schema(responses={200: 'The input dataproducts of this subtask.', 403: 'forbidden'}, operation_description="Get the input dataproducts of this subtask.") diff --git a/SAS/TMSS/src/tmss/urls.py b/SAS/TMSS/src/tmss/urls.py index 376c1c83ba457c81f98901a262b286e8db94c52b..781e6af696a5bc3f0827c84b8c60286fa898112f 100644 --- a/SAS/TMSS/src/tmss/urls.py +++ b/SAS/TMSS/src/tmss/urls.py @@ -65,6 +65,7 @@ urlpatterns = [ path('schemas/<str:template>/<str:name>/<str:version>/', views.get_template_json_schema, name='get_template_json_schema'), path('station_groups/<str:template_name>/<str:template_version>/<str:station_group>', views.get_stations_in_group, name='get_stations_in_group'), #TODO: how to make trailing slash optional? path('station_groups/<str:template_name>/<str:template_version>/<str:station_group>/', views.get_stations_in_group, name='get_stations_in_group'), + path('util/sun_rise_and_set', views.get_sun_rise_and_set, name='get_sun_rise_and_set'), path(r'util/utc', views.utc, name="system-utc"), path(r'util/lst', views.lst, name="conversion-lst"), ] diff --git a/SAS/TMSS/test/t_conversions.py b/SAS/TMSS/test/t_conversions.py index ccd4025f6c4c21a43d63f5ccb6a55c3b764f0963..14231c4f091c04b1f3c53b971bbf069555e6000f 100755 --- a/SAS/TMSS/test/t_conversions.py +++ b/SAS/TMSS/test/t_conversions.py @@ -26,6 +26,7 @@ import logging import requests import dateutil.parser import astropy.coordinates +import json logger = logging.getLogger(__name__) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) @@ -127,6 +128,43 @@ class UtilREST(unittest.TestCase): lon_str2 = r2.content.decode('utf8') self.assertNotEqual(lon_str1, lon_str2) + def test_util_sun_rise_and_set_returns_json_structure_with_defaults(self): + r = requests.get(BASE_URL + '/util/sun_rise_and_set', auth=AUTH) + self.assertEqual(r.status_code, 200) + r_dict = json.loads(r.content.decode('utf-8')) + + # assert defaults to core and today + self.assertIn('CS002', r_dict.keys()) + sunrise_start = dateutil.parser.parse(r_dict['CS002']['sunrise'][0]['start']) + self.assertEqual(datetime.date.today(), sunrise_start.date()) + + def test_util_sun_rise_and_set_considers_stations(self): + stations = ['CS005', 'RS305', 'DE609'] + r = requests.get(BASE_URL + '/util/sun_rise_and_set?stations=%s' % ','.join(stations), auth=AUTH) + self.assertEqual(r.status_code, 200) + r_dict = json.loads(r.content.decode('utf-8')) + + # assert station is included in response and timestamps differ + sunset_start_last = None + for station in stations: + self.assertIn(station, r_dict.keys()) + sunset_start = dateutil.parser.parse(r_dict[station]['sunset'][0]['start']) + if sunset_start_last: + self.assertNotEqual(sunset_start, sunset_start_last) + sunset_start_last = sunset_start + + def test_util_sun_rise_and_set_considers_timestamps(self): + timestamps = ['2020-01-01', '2020-02-22T16-00-00', '2020-3-11', '2020-01-01'] + r = requests.get(BASE_URL + '/util/sun_rise_and_set?timestamps=%s' % ','.join(timestamps), auth=AUTH) + self.assertEqual(r.status_code, 200) + r_dict = json.loads(r.content.decode('utf-8')) + + # assert all requested timestamps are included in response (sunrise on same day) + for i in range(len(timestamps)): + expected_date = dateutil.parser.parse(timestamps[i]).date() + response_date = dateutil.parser.parse(r_dict['CS002']['sunrise'][i]['start']).date() + self.assertEqual(expected_date, response_date) + if __name__ == "__main__": os.environ['TZ'] = 'UTC' diff --git a/SAS/TMSS/test/t_subtasks.py b/SAS/TMSS/test/t_subtasks.py index 4f40e9c53b8c0857430c032a9df3d08848f517b1..b9021a86f94d25f5fcccd620daf7705c07c8d88e 100755 --- a/SAS/TMSS/test/t_subtasks.py +++ b/SAS/TMSS/test/t_subtasks.py @@ -34,12 +34,7 @@ tmss_test_env.populate_schemas() from lofar.sas.tmss.test.tmss_test_data_django_models import * - -# import and setup rest test data creator -from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator - from lofar.sas.tmss.tmss.tmssapp import models - from lofar.sas.tmss.tmss.tmssapp.subtasks import * @@ -347,6 +342,31 @@ class SubtaskInputSelectionFilteringTest(unittest.TestCase): selection = {'sap': ['target0'], 'is_relevant': True} self.assertFalse(specifications_doc_meets_selection_doc(specs, selection)) + def test_links_to_log_files(self): + """ + Test redirect urls to subtask logfiles. + """ + + # the link to log files is a 'view' on the subtask, and NOT part of the subtask model. + # the link is served as an action on the REST API, redirecting to externally served log files. + # check/test the redirect urls. + with tmss_test_env.create_tmss_client() as client: + # observation + subtask_observation = create_subtask_object_for_testing("observation", "defined") + response = client.session.get(url=client.get_full_url_for_path('/subtask/%s/task_log' % (subtask_observation.id,)), allow_redirects=False) + self.assertTrue(response.is_redirect) + self.assertIn("proxy.lofar.eu", response.headers['Location']) + self.assertIn("rtcp-%s.errors" % subtask_observation.id, response.headers['Location']) + + # pipeline + subtask_pipeline = create_subtask_object_for_testing("pipeline", "defined") + response = client.session.get(url=client.get_full_url_for_path('/subtask/%s/task_log' % (subtask_pipeline.id,)), allow_redirects=False) + self.assertEqual(404, response.status_code) # no log (yet) for unscheduled pipeline + + # other (qa_plots) + subtask_qa_plots = create_subtask_object_for_testing("qa_plots", "defined") + self.assertEqual(404, response.status_code) # no log for other subtasktypes + class SettingTest(unittest.TestCase): @@ -359,22 +379,6 @@ class SettingTest(unittest.TestCase): with self.assertRaises(SubtaskSchedulingException): schedule_observation_subtask(obs_st) - def test_links_to_log_files(self): - """ - Test if the links to logging of a subtasks is correct: - For an observation the subtaskid is in the logging url - For a pipeline the radbid of the subtaskid is in the link, BUT because RA is not started is should - return "not available" - All other subtask types (like qa) should have an empty string (no logging) - """ - subtask_pipeline = create_subtask_object_for_testing("pipeline", "defined") - subtask_qa_plots = create_subtask_object_for_testing("qa_plots", "defined") - subtask_observation = create_subtask_object_for_testing("observation", "defined") - - self.assertIn("proxy.lofar.eu", subtask_observation.log_url) - self.assertIn("rtcp-%s.errors" % subtask_observation.id, subtask_observation.log_url) - self.assertIn("not available", subtask_pipeline.log_url) - self.assertEqual("", subtask_qa_plots.log_url) if __name__ == "__main__": diff --git a/SAS/TMSS/test/test_utils.py b/SAS/TMSS/test/test_utils.py index 2edeaae66b24887a9491527b23bbe6518f4456ae..7d559bb9800d4ad3112d49df59d3aa3094fec86a 100644 --- a/SAS/TMSS/test/test_utils.py +++ b/SAS/TMSS/test/test_utils.py @@ -142,8 +142,6 @@ class TMSSDjangoServerInstance(): self.port = port self.public_host = public_host or host self._server_process = None - self._exchange = exchange - self._broker = broker @property def host_address(self): @@ -190,8 +188,6 @@ class TMSSDjangoServerInstance(): # set these here, run django setup, and start the server os.environ["TMSS_LDAPCREDENTIALS"] = self.ldap_dbcreds_id os.environ["TMSS_DBCREDENTIALS"] = self.database_dbcreds_id - os.environ["TMSS_EXCHANGE"] = self._exchange - os.environ["TMSS_BROKER"] = self._broker os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings" django.setup() @@ -273,7 +269,10 @@ class TMSSTestEnvironment: '''Create and run a test django TMSS server against a newly created test database and a test ldap server (and cleanup automagically)''' def __init__(self, host: str='127.0.0.1', preferred_django_port: int=8000, public_host: str=None, exchange: str=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME), broker: str=os.environ.get("TMSS_BROKER", DEFAULT_BROKER), - populate_schemas:bool=False, populate_test_data:bool=False): + populate_schemas:bool=False, populate_test_data:bool=False, + start_postgres_listener: bool=True): + self._exchange = exchange + self._broker = broker self._populate_schemas = populate_schemas self._populate_test_data = populate_test_data self.ldap_server = TestLDAPServer(user='test', password='test') @@ -282,11 +281,12 @@ class TMSSTestEnvironment: ldap_dbcreds_id=self.ldap_server.dbcreds_id, host=host, port=find_free_port(preferred_django_port), - public_host=public_host, - exchange=exchange, - broker=broker) + public_host=public_host) self.client_credentials = TemporaryCredentials(user=self.ldap_server.dbcreds.user, password=self.ldap_server.dbcreds.password) + self._start_postgres_listener = start_postgres_listener + self.postgres_listener = None + # Check for correct Django version, should be at least 3.0 if django.VERSION[0] < 3: print("\nWARNING: YOU ARE USING DJANGO VERSION '%s', WHICH WILL NOT SUPPORT ALL FEATURES IN TMSS!\n" % @@ -318,13 +318,24 @@ class TMSSTestEnvironment: user.is_superuser = True user.save() + if self._start_postgres_listener: + # start the TMSSPGListener, so the changes in the database are posted as EventMessages on the bus + from lofar.sas.tmss.services.tmss_postgres_listener import TMSSPGListener + self.postgres_listener = TMSSPGListener(exchange=self._exchange, broker=self._broker, dbcreds=self.database.dbcreds) + self.postgres_listener.start() + if self._populate_schemas or self._populate_test_data: self.populate_schemas() if self._populate_test_data: self.populate_test_data() + def stop(self): + if self.postgres_listener is not None: + self.postgres_listener.stop() + self.postgres_listener = None + self.django_server.stop() self.ldap_server.stop() self.database.destroy() diff --git a/SAS/TMSS/test/tmss_test_environment_unittest_setup.py b/SAS/TMSS/test/tmss_test_environment_unittest_setup.py index 04b9882454838c474e06523d8017ebc9320aca07..45d148eb3754b3235d7e41909b691f1129d35031 100644 --- a/SAS/TMSS/test/tmss_test_environment_unittest_setup.py +++ b/SAS/TMSS/test/tmss_test_environment_unittest_setup.py @@ -32,7 +32,8 @@ from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment tmss_test_env = TMSSTestEnvironment() try: tmss_test_env.start() -except: +except Exception as e: + logger.exception(str(e)) tmss_test_env.stop() exit(1)