Skip to content
Snippets Groups Projects
Commit 193b00e3 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-180: (and TMSS-60) implemented a TMSSSubTaskBusListener in a TMSSClient...

TMSS-180: (and TMSS-60) implemented a TMSSSubTaskBusListener in a TMSSClient package, and used it to send/receice TMSS Subtask status changes messages
parent 93041608
No related branches found
No related tags found
1 merge request!154Resolve TMSS-60 and TMSS-171 and TMSS-198
# - Create for each LOFAR package a variable containing the absolute path to
# its source directory.
#
# Generated by gen_LofarPackageList_cmake.sh at Fr 28. Feb 20:47:32 CET 2020
# Generated by gen_LofarPackageList_cmake.sh at ma 4 mei 2020 12:07:08 CEST
#
# ---- DO NOT EDIT ----
#
......@@ -142,7 +142,6 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED)
set(OTDB_Comps_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Deployment/data/OTDB)
set(StaticMetaData_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Deployment/data/StaticMetaData)
set(WinCCPublisher_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/WinCCPublisher)
set(WinCCREST_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/WinCCREST)
set(WinCCDBBridge_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/WinCCDBBridge)
set(TaskManagement_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/Services/TaskManagement)
set(TBB_SOURCE_DIR ${CMAKE_SOURCE_DIR}/MAC/TBB)
......@@ -207,6 +206,7 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED)
set(RAScripts_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/RAScripts)
set(TaskPrescheduler_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/TaskPrescheduler)
set(RACommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/ResourceAssignment/Common)
set(TMSSClient_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/client)
set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common)
set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server)
set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC)
......
......@@ -8,3 +8,5 @@ add_subdirectory(test)
add_subdirectory(frontend)
lofar_add_docker_files(docker-compose-tmss.yml)
lofar_add_package(TMSSClient client)
lofar_package(TMSSClient 0.1 DEPENDS PyCommon pyparameterset PyMessaging)
lofar_find_package(PythonInterp 3.4 REQUIRED)
include(PythonInstall)
set(_py_files
tmssbuslistener.py
)
python_install(${_py_files}
DESTINATION lofar/sas/tmss/client)
#!/usr/bin/env python3
# TMSSBusListener.py
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: TMSSBusListener.py 1580 2015-09-30 14:18:57Z loose $
"""
TMSSBusListener listens on the lofar notification message bus and calls (empty) on<SomeMessage> methods when such a message is received.
Typical usage is to derive your own subclass from TMSSBusListener and implement the specific on<SomeMessage> methods that you are interested in.
"""
from lofar.messaging.messagebus import BusListener, AbstractMessageHandler
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME, EventMessage
from lofar.common.util import waitForInterrupt, single_line_with_single_spaces
import logging
logger = logging.getLogger(__name__)
_DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE = 'TMSS.%s.notification'
DEFAULT_TMSS_TASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'Task'
DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE % 'SubTask'
DEFAULT_TMSS_ALL_NOTIFICATION_PREFIX = _DEFAULT_TMSS_NOTIFICATION_PREFIX_TEMPLATE + '#'
class TMSSSubTaskEventMessageHandler(AbstractMessageHandler):
'''
Base-type messagehandler for handling TMSS event messages.
Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in.
'''
def handle_message(self, msg: EventMessage):
if not isinstance(msg, EventMessage):
raise ValueError("%s: Ignoring non-EventMessage: %s" % (self.__class__.__name__, msg))
stripped_subject = msg.subject.replace("%s." % DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, '')
logger.info("%s.on%s: %s" % (self.__class__.__name__, stripped_subject, single_line_with_single_spaces(msg.content)))
if stripped_subject == 'Defining':
self.onSubTaskDefining(**msg.content)
elif stripped_subject == 'Defined':
self.onSubTaskDefined(**msg.content)
elif stripped_subject == 'Scheduling':
self.onSubTaskScheduling(**msg.content)
elif stripped_subject == 'Scheduled':
self.onSubTaskScheduled(**msg.content)
elif stripped_subject == 'Queueing':
self.onSubTaskQueueing(**msg.content)
elif stripped_subject == 'Queued':
self.onSubTaskQueued(**msg.content)
elif stripped_subject == 'Starting':
self.onSubTaskStarting(**msg.content)
elif stripped_subject == 'Started':
self.onSubTaskStarted(**msg.content)
elif stripped_subject == 'Finishing':
self.onSubTaskFinishing(**msg.content)
elif stripped_subject == 'Finishing':
self.onSubTaskDefined(**msg.content)
elif stripped_subject == 'Cancelling':
self.onSubTaskCancelling(**msg.content)
elif stripped_subject == 'Cancelled':
self.onSubTaskCancelled(**msg.content)
elif stripped_subject == 'Error':
self.onSubTaskError(**msg.content)
else:
raise ValueError("TMSSBusListener.handleMessage: unknown subject: %s" % msg.subject)
def onSubTaskDefining(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskDefining is called upon receiving a SubTaskDefining message, which is sent when a SubTasks changes state to "Defining".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskDefined(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskDefined is called upon received a SubTaskDefined message, which is sent when a SubTasks changes state to "Defined".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskScheduling(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskScheduling is called upon receiving a SubTaskScheduling message, which is sent when a SubTasks changes state to "Scheduling".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskScheduled(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskScheduled is called upon received a SubTaskScheduled message, which is sent when a SubTasks changes state to "Scheduled".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskQueueing(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskQueueing is called upon receiving a SubTaskQueueing message, which is sent when a SubTasks changes state to "Queueing".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskQueued(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskQueued is called upon received a SubTaskQueued message, which is sent when a SubTasks changes state to "Queued".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskStarting(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskStarting is called upon receiving a SubTaskStarting message, which is sent when a SubTasks changes state to "Starting".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskStarted(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskStarted is called upon received a SubTaskStarted message, which is sent when a SubTasks changes state to "Started".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskFinishing(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskFinishing is called upon receiving a SubTaskFinishing message, which is sent when a SubTasks changes state to "Finishing".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskFinished(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskFinished is called upon received a SubTaskFinished message, which is sent when a SubTasks changes state to "Finished".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskCancelling(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskCancelling is called upon receiving a SubTaskCancelling message, which is sent when a SubTasks changes state to "Cancelling".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskCancelled(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskCancelled is called upon received a SubTaskCancelled message, which is sent when a SubTasks changes state to "Cancelled".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
def onSubTaskError(self, subtask_id: int, old_state: str, new_state:str):
'''onSubTaskError is called upon receiving a SubTaskError message, which is sent when a SubTasks changes state to "Error".
:param subtask_id: the TMSS id of the SubTask
:param old_state: the previous state of the SubTask
:param new_state: the new state of the SubTask
'''
pass
class TMSSSubTaskBusListener(BusListener):
def __init__(self,
handler_type: TMSSSubTaskEventMessageHandler.__class__ = TMSSSubTaskEventMessageHandler,
handler_kwargs: dict = None,
exchange: str = DEFAULT_BUSNAME,
routing_key: str = DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX+".#",
num_threads: int = 1,
broker: str = DEFAULT_BROKER):
"""
TMSSSubTaskBusListener listens on the lofar notification message bus and calls on<SomeMessage> methods in the TMSSSubTaskEventMessageHandler when such a message is received.
Typical usage is to derive your own subclass from TMSSSubTaskEventMessageHandler and implement the specific on<SomeMessage> methods that you are interested in.
"""
if not issubclass(handler_type, TMSSSubTaskEventMessageHandler):
raise TypeError("handler_type should be a TMSSSubTaskEventMessageHandler subclass")
super().__init__(handler_type, handler_kwargs, exchange, routing_key, num_threads, broker)
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
class ExampleTMSSSubTaskEventMessageHandler(TMSSSubTaskEventMessageHandler):
def onSubTaskDefined(self, **kwargs):
logger.info("MyTMSSSubTaskEventMessageHandler.onSubTaskDefined(%s)", kwargs)
with TMSSSubTaskBusListener(handler_type=ExampleTMSSSubTaskEventMessageHandler):
waitForInterrupt()
......@@ -2,6 +2,9 @@
This file contains the database models
"""
import logging
logger = logging.getLogger(__name__)
from django.db.models import ForeignKey, CharField, DateTimeField, BooleanField, IntegerField, BigIntegerField, \
ManyToManyField, CASCADE, SET_NULL, PROTECT
from django.contrib.postgres.fields import ArrayField, JSONField
......@@ -12,6 +15,9 @@ from rest_framework.serializers import HyperlinkedRelatedField
from django.dispatch import receiver
from lofar.sas.tmss.tmss.tmssapp.validation import validate_json_against_schema
from lofar.messaging.messagebus import ToBus
from lofar.messaging.messages import EventMessage
from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX
#
# I/O
......@@ -148,6 +154,13 @@ class Subtask(BasicCommon):
# keep original state for logging
self.__original_state = self.state
@staticmethod
def _send_state_change_event_message(subtask_id:int, old_state: str, new_state: str):
with ToBus() as tobus: #TODO: do we want to connect to the bus for each new message, or have some global tobus?
msg = EventMessage(subject="%s.%s" % (DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, new_state.capitalize()),
content={'subtask_id': subtask_id, 'old_state': old_state, 'new_state': new_state})
tobus.send(msg)
def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
creating = self._state.adding # True on create, False on update
......@@ -157,7 +170,7 @@ class Subtask(BasicCommon):
super().save(force_insert, force_update, using, update_fields)
# log if either state update or new entry:
if self.state != self.__original_state or creating is True:
if self.state != self.__original_state or creating == True:
if self.created_or_updated_by_user is None:
identifier = None
else:
......@@ -166,6 +179,10 @@ class Subtask(BasicCommon):
user=self.created_or_updated_by_user, user_identifier=identifier)
log_entry.save()
try:
self._send_state_change_event_message(self.id, log_entry.old_state.value, log_entry.new_state.value)
except Exception as e:
logger.error("Could not send state change to messagebug: %s", e)
class SubtaskStateLog(BasicCommon):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment