From 193b00e3f329c4b97f688f633f5842042a92d1db Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 4 May 2020 12:47:57 +0200 Subject: [PATCH] TMSS-180: (and TMSS-60) implemented a TMSSSubTaskBusListener in a TMSSClient package, and used it to send/receice TMSS Subtask status changes messages --- CMake/LofarPackageList.cmake | 4 +- SAS/TMSS/CMakeLists.txt | 2 + SAS/TMSS/client/CMakeLists.txt | 13 ++ SAS/TMSS/client/tmssbuslistener.py | 218 ++++++++++++++++++ .../src/tmss/tmssapp/models/scheduling.py | 19 +- 5 files changed, 253 insertions(+), 3 deletions(-) create mode 100644 SAS/TMSS/client/CMakeLists.txt create mode 100644 SAS/TMSS/client/tmssbuslistener.py diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index 3e4b94eb330..10ed0938ba2 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -1,7 +1,7 @@ # - Create for each LOFAR package a variable containing the absolute path to # its source directory. # -# Generated by gen_LofarPackageList_cmake.sh at Fr 28. Feb 20:47:32 CET 2020 +# Generated by gen_LofarPackageList_cmake.sh at ma 4 mei 2020 12:07:08 CEST # # ---- DO NOT EDIT ---- # @@ -142,7 +142,6 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(OTDB_Comps_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Deployment/data/OTDB) set(StaticMetaData_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Deployment/data/StaticMetaData) set(WinCCPublisher_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/WinCCPublisher) - set(WinCCREST_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/WinCCREST) set(WinCCDBBridge_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/WinCCDBBridge) set(TaskManagement_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TaskManagement) set(TBB_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/TBB) @@ -207,6 +206,7 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(RAScripts_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/RAScripts) set(TaskPrescheduler_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/TaskPrescheduler) set(RACommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/Common) + set(TMSSClient_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/client) set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common) set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server) set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC) diff --git a/SAS/TMSS/CMakeLists.txt b/SAS/TMSS/CMakeLists.txt index bfadf620e6b..afc70e4bdb9 100644 --- a/SAS/TMSS/CMakeLists.txt +++ b/SAS/TMSS/CMakeLists.txt @@ -8,3 +8,5 @@ add_subdirectory(test) add_subdirectory(frontend) lofar_add_docker_files(docker-compose-tmss.yml) + +lofar_add_package(TMSSClient client) diff --git a/SAS/TMSS/client/CMakeLists.txt b/SAS/TMSS/client/CMakeLists.txt new file mode 100644 index 00000000000..362c98b257a --- /dev/null +++ b/SAS/TMSS/client/CMakeLists.txt @@ -0,0 +1,13 @@ +lofar_package(TMSSClient 0.1 DEPENDS PyCommon pyparameterset PyMessaging) + +lofar_find_package(PythonInterp 3.4 REQUIRED) + +include(PythonInstall) + +set(_py_files + tmssbuslistener.py + ) + +python_install(${_py_files} + DESTINATION lofar/sas/tmss/client) + diff --git a/SAS/TMSS/client/tmssbuslistener.py b/SAS/TMSS/client/tmssbuslistener.py new file mode 100644 index 00000000000..456fda76828 --- /dev/null +++ b/SAS/TMSS/client/tmssbuslistener.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 + +# TMSSBusListener.py +# +# Copyright (C) 2015 +# ASTRON (Netherlands Institute for Radio Astronomy) +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it +# and/or modify it under the terms of the GNU General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# +# $Id: TMSSBusListener.py 1580 2015-09-30 14:18:57Z loose $ + +""" +TMSSBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received. +Typical usage is to derive your own subclass from TMSSBusListener and implement the specific on<SomeMessage> methods that you are interested in. +""" + +from lofar.messaging.messagebus import BusListener, AbstractMessageHandler +from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, EventMessage +from lofar.common.util import waitForInterrupt, single_line_with_single_spaces + +import logging +logger = logging.getLogger(__name__) + + +_DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE = 'TMSS.%s.notification' +DEFAULT_TMSS_TASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'Task' +DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'SubTask' +DEFAULT_TMSS_ALL_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE + '#' + + +class TMSSSubTaskEventMessageHandler(AbstractMessageHandler): + ''' + Base-type messagehandler for handling TMSS event messages. + Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. + ''' + def handle_message(self, msg: EventMessage): + if not isinstance(msg, EventMessage): + raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg)) + + stripped_subject = msg.subject.replace("%s." % DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, '') + + logger.info("%s.on%s: %s" % (self.__class__.__name__, stripped_subject, single_line_with_single_spaces(msg.content))) + + if stripped_subject == 'Defining': + self.onSubTaskDefining(**msg.content) + elif stripped_subject == 'Defined': + self.onSubTaskDefined(**msg.content) + elif stripped_subject == 'Scheduling': + self.onSubTaskScheduling(**msg.content) + elif stripped_subject == 'Scheduled': + self.onSubTaskScheduled(**msg.content) + elif stripped_subject == 'Queueing': + self.onSubTaskQueueing(**msg.content) + elif stripped_subject == 'Queued': + self.onSubTaskQueued(**msg.content) + elif stripped_subject == 'Starting': + self.onSubTaskStarting(**msg.content) + elif stripped_subject == 'Started': + self.onSubTaskStarted(**msg.content) + elif stripped_subject == 'Finishing': + self.onSubTaskFinishing(**msg.content) + elif stripped_subject == 'Finishing': + self.onSubTaskDefined(**msg.content) + elif stripped_subject == 'Cancelling': + self.onSubTaskCancelling(**msg.content) + elif stripped_subject == 'Cancelled': + self.onSubTaskCancelled(**msg.content) + elif stripped_subject == 'Error': + self.onSubTaskError(**msg.content) + else: + raise ValueError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject) + + def onSubTaskDefining(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskDefining is called upon receiving a SubTaskDefining message, which is sent when a SubTasks changes state to "Defining". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskDefined(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskDefined is called upon received a SubTaskDefined message, which is sent when a SubTasks changes state to "Defined". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskScheduling(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskScheduling is called upon receiving a SubTaskScheduling message, which is sent when a SubTasks changes state to "Scheduling". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskScheduled is called upon received a SubTaskScheduled message, which is sent when a SubTasks changes state to "Scheduled". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskQueueing(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskQueueing is called upon receiving a SubTaskQueueing message, which is sent when a SubTasks changes state to "Queueing". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskQueued(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskQueued is called upon received a SubTaskQueued message, which is sent when a SubTasks changes state to "Queued". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskStarting(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskStarting is called upon receiving a SubTaskStarting message, which is sent when a SubTasks changes state to "Starting". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskStarted(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskStarted is called upon received a SubTaskStarted message, which is sent when a SubTasks changes state to "Started". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskFinishing(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskFinishing is called upon receiving a SubTaskFinishing message, which is sent when a SubTasks changes state to "Finishing". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskFinished(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskFinished is called upon received a SubTaskFinished message, which is sent when a SubTasks changes state to "Finished". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskCancelling(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskCancelling is called upon receiving a SubTaskCancelling message, which is sent when a SubTasks changes state to "Cancelling". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskCancelled(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskCancelled is called upon received a SubTaskCancelled message, which is sent when a SubTasks changes state to "Cancelled". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + def onSubTaskError(self, subtask_id: int, old_state: str, new_state:str): + '''onSubTaskError is called upon receiving a SubTaskError message, which is sent when a SubTasks changes state to "Error". + :param subtask_id: the TMSS id of the SubTask + :param old_state: the previous state of the SubTask + :param new_state: the new state of the SubTask + ''' + pass + + +class TMSSSubTaskBusListener(BusListener): + def __init__(self, + handler_type: TMSSSubTaskEventMessageHandler.__class__ = TMSSSubTaskEventMessageHandler, + handler_kwargs: dict = None, + exchange: str = DEFAULT_BUSNAME, + routing_key: str = DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX+".#", + num_threads: int = 1, + broker: str = DEFAULT_BROKER): + """ + TMSSSubTaskBusListener listens on the lofar notification message bus and calls on<SomeMessage> methods in the TMSSSubTaskEventMessageHandler when such a message is received. + Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in. + """ + if not issubclass(handler_type, TMSSSubTaskEventMessageHandler): + raise TypeError("handler_type should be a TMSSSubTaskEventMessageHandler subclass") + + super().__init__(handler_type, handler_kwargs, exchange, routing_key, num_threads, broker) + + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + + class ExampleTMSSSubTaskEventMessageHandler(TMSSSubTaskEventMessageHandler): + def onSubTaskDefined(self, **kwargs): + logger.info("MyTMSSSubTaskEventMessageHandler.onSubTaskDefined(%s)", kwargs) + + with TMSSSubTaskBusListener(handler_type=ExampleTMSSSubTaskEventMessageHandler): + waitForInterrupt() + diff --git a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py index 2208e73633e..e33dd8b1c65 100644 --- a/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py +++ b/SAS/TMSS/src/tmss/tmssapp/models/scheduling.py @@ -2,6 +2,9 @@ This file contains the database models """ +import logging +logger = logging.getLogger(__name__) + from django.db.models import ForeignKey, CharField, DateTimeField, BooleanField, IntegerField, BigIntegerField, \ ManyToManyField, CASCADE, SET_NULL, PROTECT from django.contrib.postgres.fields import ArrayField, JSONField @@ -12,6 +15,9 @@ from rest_framework.serializers import HyperlinkedRelatedField from django.dispatch import receiver from lofar.sas.tmss.tmss.tmssapp.validation import validate_json_against_schema +from lofar.messaging.messagebus import ToBus +from lofar.messaging.messages import EventMessage +from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX # # I/O @@ -148,6 +154,13 @@ class Subtask(BasicCommon): # keep original state for logging self.__original_state = self.state + @staticmethod + def _send_state_change_event_message(subtask_id:int, old_state: str, new_state: str): + with ToBus() as tobus: #TODO: do we want to connect to the bus for each new message, or have some global tobus? + msg = EventMessage(subject="%s.%s" % (DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, new_state.capitalize()), + content={'subtask_id': subtask_id, 'old_state': old_state, 'new_state': new_state}) + tobus.send(msg) + def save(self, force_insert=False, force_update=False, using=None, update_fields=None): creating = self._state.adding # True on create, False on update @@ -157,7 +170,7 @@ class Subtask(BasicCommon): super().save(force_insert, force_update, using, update_fields) # log if either state update or new entry: - if self.state != self.__original_state or creating is True: + if self.state != self.__original_state or creating == True: if self.created_or_updated_by_user is None: identifier = None else: @@ -166,6 +179,10 @@ class Subtask(BasicCommon): user=self.created_or_updated_by_user, user_identifier=identifier) log_entry.save() + try: + self._send_state_change_event_message(self.id, log_entry.old_state.value, log_entry.new_state.value) + except Exception as e: + logger.error("Could not send state change to messagebug: %s", e) class SubtaskStateLog(BasicCommon): -- GitLab