# Select Git revision
# t_websocket_service.py

# TMSS-413: Add python-socketio[asyncio_client] module and initial test client
# Mario Raciti authored
# Code owners
# Assign users and groups as approvers for specific file changes. Learn more.
# t_websocket_service.py 3.87 KiB
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import unittest
import uuid
import logging
# Configure the root logger (INFO level, timestamped records) and create this module's logger.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
from lofar.sas.tmss.services.websocket_service import create_service
from lofar.common.test_utils import integration_test
from time import sleep
from datetime import datetime, timedelta
import socketio
class WebsocketClient():
    '''
    Websocket Client for the TMSSEventMessageHandlerForWebsocket

    Creates a synchronous socketio.Client, registers the connect/message/disconnect
    handlers of this object on it, and connects to the given url.

    :param url: address of the websocket server to connect to.
    :param wait: when True (the default) the constructor blocks in sio.wait() after
                 connecting; pass False to get control back right after connecting.
    '''

    def __init__(self, url='ws://localhost:5678', wait=True):
        # One client per instance: a class-level client would be created as soon as
        # the class body is executed and would be shared by all instances.
        self.sio = socketio.Client()

        # Register the bound methods explicitly instead of using @sio.event
        # decorators, so the handlers receive this instance as 'self'.
        self.sio.on('connect', self.connect)
        self.sio.on('message', self.message)
        self.sio.on('disconnect', self.disconnect)

        self.sio.connect(url)
        if wait:
            self.sio.wait()

    def connect(self):
        '''Handler for the 'connect' event.'''
        print('>> Connection established')

    def message(self, data):
        '''
        Handler for the 'message' event: print the received data and reply to the server.

        This is a plain function on purpose: the handlers are registered on the
        synchronous socketio.Client, which does not await coroutine handlers.
        '''
        print('>> Received: ', data)
        self.sio.emit('message', {'msg': 'Hello from client'})

    def disconnect(self):
        '''Handler for the 'disconnect' event.'''
        print('>> Disconnected from server')
@integration_test
class TestSubtaskSchedulingService(unittest.TestCase):
    '''
    Integration tests for the TMSS websocket service.

    NOTE(review): the class name appears to be copied from the subtask scheduling
    service test. It is kept unchanged because it is the public name of this test
    case and it is part of the temporary exchange name.
    '''

    # Deliberately NOT instantiated in the class body: WebsocketClient() connects
    # (and blocks) in its constructor, which would then happen when this module is
    # imported, i.e. before setUpClass has started anything to connect to.
    websocket = None  # TODO: create the client inside a test, once the service is running

    @classmethod
    def setUpClass(cls) -> None:
        '''Open a temporary exchange and start a TMSS test environment on it.'''
        cls.TEST_UUID = uuid.uuid1()

        cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
        cls.tmp_exchange.open()

        try:
            cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False)
            cls.tmss_test_env.start()
        except Exception:
            # unittest does not call tearDownClass when setUpClass raises,
            # so close the already opened exchange here.
            cls.tmp_exchange.close()
            raise

        cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url,
                                                        (cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password))

    @classmethod
    def tearDownClass(cls) -> None:
        '''Stop the TMSS test environment and close the temporary exchange.'''
        try:
            cls.tmss_test_env.stop()
        finally:
            # close the exchange even when stopping the environment fails
            cls.tmp_exchange.close()

    def test_01(self):
        '''
        Create the websocket service on the temporary exchange and run it inside a BusListenerJanitor.
        No events are sent or checked yet, see the TODOs below.
        '''
        logger.info(' -- test_01_for_expected_behaviour -- ')

        # create and start the service (the object under test)
        service = create_service(exchange=self.tmp_exchange.address)

        with BusListenerJanitor(service):
            # TODO: setup http websocket client which connects to our TMSS websocket
            # TODO: create/update/delete objects like SubTask, TaskBlueprint etc
            # TODO: check if the correct/expected json_blobs arrive in the websocket client
            pass
if __name__ == "__main__":
    # Executed as a script: hand over to the unittest runner.
    unittest.main()