Skip to content
Snippets Groups Projects
Commit 73cdcd8d authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-413: added skeleton for tmss websocket service, which listen on the...

TMSS-413: added skeleton for tmss websocket service, which listen on the messagebus for tmss event messages and posts (a translation) on an http websocket.
parent d6f8b079
No related branches found
No related tags found
1 merge request!282Resolve TMSS-417
Showing
with 300 additions and 1 deletion
# - Create for each LOFAR package a variable containing the absolute path to # - Create for each LOFAR package a variable containing the absolute path to
# its source directory. # its source directory.
# #
# Generated by gen_LofarPackageList_cmake.sh at do 29 okt 2020 7:42:34 CET # Generated by gen_LofarPackageList_cmake.sh at ma 2 nov 2020 19:21:07 CET
# #
# ---- DO NOT EDIT ---- # ---- DO NOT EDIT ----
# #
...@@ -210,6 +210,7 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) ...@@ -210,6 +210,7 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED)
set(TMSSSubtaskSchedulingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/subtask_scheduling) set(TMSSSubtaskSchedulingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/subtask_scheduling)
set(TMSSFeedbackHandlingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/feedback_handling) set(TMSSFeedbackHandlingService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/feedback_handling)
set(TMSSPostgresListenerService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/tmsspglistener) set(TMSSPostgresListenerService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/tmsspglistener)
set(TMSSWebSocketService_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TMSS/services/websocket)
set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common) set(TriggerEmailServiceCommon_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Common)
set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server) set(TriggerEmailServiceServer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SAS/TriggerEmailService/Server)
set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC) set(CCU_MAC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/CCU_MAC)
......
lofar_add_package(TMSSSubtaskSchedulingService subtask_scheduling) lofar_add_package(TMSSSubtaskSchedulingService subtask_scheduling)
lofar_add_package(TMSSFeedbackHandlingService feedback_handling) lofar_add_package(TMSSFeedbackHandlingService feedback_handling)
lofar_add_package(TMSSPostgresListenerService tmsspglistener) lofar_add_package(TMSSPostgresListenerService tmsspglistener)
lofar_add_package(TMSSWebSocketService websocket)
lofar_package(TMSSSubtaskSchedulingService 0.1 DEPENDS TMSSClient PyCommon pyparameterset PyMessaging)
lofar_find_package(PythonInterp 3.4 REQUIRED)
add_subdirectory(lib)
add_subdirectory(bin)
add_subdirectory(test)
lofar_add_bin_scripts(tmss_websocket_service)
# supervisord config files
lofar_add_sysconf_files(tmss_websocket_service.ini DESTINATION supervisord.d)
#!/usr/bin/python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
from lofar.sas.tmss.services.websocket_service import main
if __name__ == "__main__":
main()
[program:tmss_websocket_service]
command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec tmss_websocket_service'
user=lofarsys
stopsignal=INT ; KeyboardInterrupt
stopasgroup=true ; bash does not propagate signals
stdout_logfile=%(program_name)s.log
redirect_stderr=true
stderr_logfile=NONE
stdout_logfile_maxbytes=0
lofar_find_package(PythonInterp 3.4 REQUIRED)
include(PythonInstall)
set(_py_files
websocket_service.py
)
python_install(${_py_files}
DESTINATION lofar/sas/tmss/services)
#!/usr/bin/env python3
# subtask_scheduling.py
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os
from optparse import OptionParser, OptionGroup
import logging
logger = logging.getLogger(__name__)
from lofar.common import dbcredentials
from lofar.sas.tmss.client.tmssbuslistener import *
class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler):
'''
'''
def __init__(self) -> None:
self._socket = None # TODO: create websocket
super().__init__()
def start_handling(self):
#TODO: "open" websocket if needed
super().start_handling()
def stop_handling(self):
#TODO: "close" websocket if needed
super().stop_handling()
def _post_update_on_websocket(self, json_blob):
#TODO: post the jsob_blob on the socket
#TODO: do we want the json_blob as argument, or parameters like: object_type (subtask, task_blueprint, etc) id, action (create/update/delete)
#TODO: do we want to post just the id, object_type and action? Or the full object? If the latter, then fetch the object from the database, and post it as json.
pass
def onSubTaskCreated(self, id: int):
self._post_update_on_websocket({'id': id, 'object': 'subtask', 'action': 'create'})
def onSubTaskUpdated(self, id: int):
self._post_update_on_websocket({'id': id, 'object': 'subtask', 'action': 'update'})
def onSubTaskDeleted(self, id: int):
self._post_update_on_websocket({'id': id, 'object': 'subtask', 'action': 'delete'})
def onTaskDraftCreated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onTaskDraftUpdated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onTaskDraftDeleted(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onTaskBlueprintCreated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onTaskBlueprintUpdated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onTaskBlueprintDeleted(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onSchedulingUnitDraftCreated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onSchedulingUnitDraftUpdated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onSchedulingUnitDraftDeleted(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onSchedulingUnitBlueprintCreated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onSchedulingUnitBlueprintUpdated(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def onSchedulingUnitBlueprintDeleted(self, id: int):
#TODO: call _post_update_on_websocket once the method signature is clear
pass
def create_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
return TMSSBusListener(handler_type=TMSSEventMessageHandlerForWebsocket,
exchange=exchange, broker=broker)
def main():
# make sure we run in UTC timezone
os.environ['TZ'] = 'UTC'
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='run the tmss_websocket_service which listens for TMSS event messages on the messagebus, and posts the updates on the websocket for htpp clients.')
group = OptionGroup(parser, 'Messaging options')
group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the message broker, default: %default')
group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
help="exchange where the TMSS event messages are published. [default: %default]")
parser.add_option_group(group)
parser.add_option_group(dbcredentials.options_group(parser))
parser.set_defaults(dbcredentials=os.environ.get('TMSS_CLIENT_DBCREDENTIALS', 'TMSS'))
(options, args) = parser.parse_args()
dbcreds = dbcredentials.parse_options(options)
logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())
# setup django
# TODO: this is only needed if we fetch objects from the database (see above). Otherwise, remove.
os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials
os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
import django
django.setup()
with create_service(options.exchange, options.broker):
waitForInterrupt()
if __name__ == '__main__':
main()
# $Id: CMakeLists.txt 32679 2015-10-26 09:31:56Z schaap $
if(BUILD_TESTING)
include(LofarCTest)
lofar_add_test(t_websocket_service)
endif()
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import unittest
import uuid
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
from lofar.sas.tmss.services.websocket_service import create_service
from lofar.common.test_utils import integration_test
from time import sleep
from datetime import datetime, timedelta
@integration_test
class TestSubtaskSchedulingService(unittest.TestCase):
'''
Tests for the SubtaskSchedulingService
'''
@classmethod
def setUpClass(cls) -> None:
cls.TEST_UUID = uuid.uuid1()
cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
cls.tmp_exchange.open()
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False)
cls.tmss_test_env.start()
cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url,
(cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password))
@classmethod
def tearDownClass(cls) -> None:
cls.tmss_test_env.stop()
cls.tmp_exchange.close()
def test_01(self):
'''
This test starts a scheduling service and tmss, creates a chain of subtasks, finishes the first, and checks if the successors are then scheduled.
'''
logger.info(' -- test_01_for_expected_behaviour -- ')
# create and start the service (the object under test)
service = create_service(exchange=self.tmp_exchange.address)
with BusListenerJanitor(service):
pass
# TODO: setup http websocket client which connects to our TMSS websocket
# TODO: create/update/delete objects like SubTask, TaskBlueprint etc
# TODO: check if the correct/expected json_blobs arrive in the websocket client
if __name__ == '__main__':
#run the unit tests
unittest.main()
#!/bin/bash
# Run the unit test
source python-coverage.sh
python_coverage_test "*tmss*" t_websocket_service.py
#!/bin/sh
./runctest.sh t_websocket_service
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment