Select Git revision
testRCUL.py
-
Paulus Kruger authoredPaulus Kruger authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_websocket_service.py 12.76 KiB
#!/usr/bin/env python3
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import time
import unittest
import uuid
import logging
logger = logging.getLogger('lofar.' + __name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
from lofar.sas.tmss.services.websocket_service import create_service, TMSSEventMessageHandlerForWebsocket, DEFAULT_WEBSOCKET_PORT
from lofar.common.test_utils import integration_test
from lofar.common.util import single_line_with_single_spaces, find_free_port
from collections import deque
from json import dumps as JSONdumps
from json import loads as JSONloads
import requests
from threading import Thread, Event
import websocket
@integration_test
class TestSubtaskSchedulingService(unittest.TestCase):
    '''
    Integration tests for the TMSS websocket service.

    A websocket service is started next to a TMSS test environment, a websocket client connects to it,
    and the JSON messages arriving in the client are compared with the objects that were posted via
    the test data creator.
    NOTE: the class name still refers to the SubtaskSchedulingService; it is kept unchanged so that
    existing references to this test case keep working.
    '''

    # Maximum time in seconds to wait for the test user's permissions to become effective.
    PERMISSION_WAIT_TIMEOUT = 30

    def __init__(self, methodName: str = 'runTest') -> None:
        '''
        Prepare the client-side state and hand over to unittest.TestCase.

        :param methodName: name of the test method to run. Defaults to 'runTest', like unittest.TestCase.
        '''
        # messages received by the websocket client, in order of arrival
        self.msg_queue = deque()
        # set by the client thread each time a message has been appended to msg_queue
        self.sync_event = Event()
        # websocket client and the thread running it; both are created in start_ws_client
        self.ws = None
        self.t = None
        self.ObjActions = TMSSEventMessageHandlerForWebsocket.ObjActions
        self.ObjTypes = TMSSEventMessageHandlerForWebsocket.ObjTypes
        super().__init__(methodName)

    def start_ws_client(self, websocket_port: int=DEFAULT_WEBSOCKET_PORT):
        '''
        Set up a websocket client for ws://127.0.0.1:<websocket_port>/ and run it in a daemon thread.

        Each received message is parsed as JSON, appended to self.msg_queue and signalled via self.sync_event.

        :param websocket_port: port on localhost at which the websocket service listens.
        '''
        def on_message(ws, message):
            json_doc = JSONloads(message)
            logger.info('Received msg from ws: %s', single_line_with_single_spaces(json_doc))
            # append first, then signal, so a woken-up consumer always finds the message in the queue
            self.msg_queue.append(json_doc)
            self.sync_event.set()

        def on_error(ws, error):
            logger.error(error)

        def on_open(ws):
            logger.info('Connected to ws')
            # Send auth token as first message after the WS handshake
            response = requests.post(self.test_data_creator.django_api_url + '/token-auth/',
                                     json={'username': 'paulus', 'password': 'pauluspass'})
            ws.send(JSONdumps(response.json()))

        def on_close(ws, *args):
            # recent websocket-client versions also pass close_status_code and close_msg; older ones pass only ws
            logger.info('Closed ws')

        # Create the client here instead of in the thread, so self.ws is available (e.g. for closing)
        # as soon as this method returns. The connection itself is only made in run_forever().
        self.ws = websocket.WebSocketApp("ws://127.0.0.1:%d/"%(websocket_port,),
                                         on_open=on_open,
                                         on_message=on_message,
                                         on_error=on_error,
                                         on_close=on_close)
        self.t = Thread(target=self.ws.run_forever, daemon=True)
        self.t.start()

    def stop_ws_client(self, timeout: float = 5.0) -> None:
        '''
        Close the websocket client started by start_ws_client (if any) and wait for its thread to end.

        :param timeout: maximum time in seconds to wait for the client thread to finish.
        '''
        if self.ws is not None:
            self.ws.close()
        if self.t is not None:
            self.t.join(timeout)

    def _pop_ws_message(self, timeout: float):
        '''
        Return the oldest message received by the websocket client, waiting for one if needed.

        :param timeout: maximum time in seconds to wait for a message.
        :raises TimeoutError: when no message arrived within the timeout.
        '''
        deadline = time.monotonic() + timeout
        while True:
            try:
                return self.msg_queue.popleft()
            except IndexError:
                # Queue is empty: wait for the client thread to signal a new message.
                # The event may still be set by messages that were already flushed from the queue,
                # hence the loop instead of a single wait followed by a blind popleft.
                remaining = deadline - time.monotonic()
                if remaining <= 0 or not self.sync_event.wait(timeout=remaining):
                    raise TimeoutError()
                self.sync_event.clear()

    @classmethod
    def setUpClass(cls) -> None:
        '''
        Start a TMSS test environment on a temporary exchange and prepare the user 'paulus'
        with view permissions on the object types for which websocket messages are expected.

        :raises TimeoutError: when the permissions of the test user do not become effective in time.
        '''
        cls.TEST_UUID = uuid.uuid1()

        cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
        cls.tmp_exchange.open()

        cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False, start_websocket=False, start_postgres_listener=True, enable_viewflow=False)
        cls.tmss_test_env.start()

        cls.test_data_creator = cls.tmss_test_env.create_test_data_creator()

        # Create group and permissions and add to user 'paulus' to test auth rules for WS messages
        # (django can only be imported after the test environment has been started)
        from django.contrib.auth.models import Group, Permission
        from django.contrib.auth import get_user_model
        User = get_user_model()

        ws_test_group, _ = Group.objects.get_or_create(name='ws_test')
        for model_name in ('schedulingunitdraft', 'taskdraft', 'schedulingunitblueprint', 'taskblueprint', 'subtask'):
            ws_test_group.permissions.add(Permission.objects.get(codename='view_%s' % model_name))

        # Look the user up by username only. With the password as lookup key, an already existing
        # 'paulus' with a different password value is not found, and the subsequent create fails.
        # NOTE(review): the password is stored as given (unhashed), as before; presumably the actual
        # authentication is handled by another backend in the test environment - TODO confirm.
        ws_test_user, _ = User.objects.get_or_create(username='paulus', defaults={'password': 'pauluspass'})
        ws_test_user.groups.add(ws_test_group)

        # Wait until the permissions are effective. The user is fetched again for each check,
        # presumably because permissions are cached on the user instance - TODO confirm.
        deadline = time.monotonic() + cls.PERMISSION_WAIT_TIMEOUT
        while not ws_test_user.has_perm('tmssapp.view_subtask'):
            if time.monotonic() > deadline:
                raise TimeoutError("user 'paulus' did not get permission 'tmssapp.view_subtask' within %s seconds" % (cls.PERMISSION_WAIT_TIMEOUT,))
            time.sleep(0.1)  # do not hammer the database while waiting
            ws_test_user = User.objects.get(username='paulus')

    @classmethod
    def tearDownClass(cls) -> None:
        '''
        Stop the TMSS test environment and close the temporary exchange.
        '''
        cls.tmss_test_env.stop()
        cls.tmp_exchange.close()

    @unittest.skip('Skipped until TMSS-2601 is resolved and we do not send unexpected debug messages to the clients any more')
    def test_01(self):
        '''
        This test starts a websocket service and tmss. Creates, updates and deletes objects to check if json_blobs from the ws service are properly received.
        '''
        logger.info(' -- test_01_for_expected_behaviour -- ')

        websocket_port = find_free_port(DEFAULT_WEBSOCKET_PORT)

        # create and start the service (the object under test)
        service = create_service(websocket_port=websocket_port, exchange=self.tmp_exchange.address)
        with BusListenerJanitor(service):
            self.start_ws_client(websocket_port)  # Start ws client
            try:
                def test_object(json_test, obj_type, action):  # Check if the correct/expected json_blobs arrive in the ws client
                    # Wait for incoming ws message
                    received_json_blob = self._pop_ws_message(timeout=50)

                    # Assert json_blobs
                    expected_json_blob = {'object_details': {'id': json_test['id']}, 'object_type': obj_type.value, 'action': action.value}
                    if action == self.ObjActions.CREATE or action == self.ObjActions.UPDATE:
                        for key in ('process_start_time', 'process_stop_time', 'duration', 'status_value', 'state_value', 'placed'):
                            if key in json_test:
                                expected_json_blob['object_details'][key] = json_test[key]
                                # a missing duration is expected to be reported as 0
                                if key == 'duration' and json_test[key] is None:
                                    expected_json_blob['object_details']['duration'] = 0
                    self.assertEqual(expected_json_blob, received_json_blob)

                    # ToDo: fix. this is flaky
                    # flush any lingering messages... some objects creations/updates cause a cascade of messages.
                    # we are only interested in the first one about the object itself.
                    time.sleep(1)  # ugly.... but we have to wait for incoming lingering messages
                    while len(self.msg_queue):
                        self.msg_queue.popleft()
                        time.sleep(1)  # ugly.... but we have to wait for (more) incoming lingering messages

                # Test creations
                # Test reservation create not authorised to receive messages
                reservation = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.Reservation(), '/reservation/')
                # No need to assert, the WS client just should not receive any messages. Otherwise, the subsequent asserts would fail.

                # Test scheduling_unit_draft create
                su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/')
                test_object(su_draft, self.ObjTypes.SCHED_UNIT_DRAFT, self.ObjActions.CREATE)

                # Test task_draft create
                task_draft = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.TaskDraft(scheduling_unit_draft_url=su_draft['url']), '/task_draft/')
                test_object(task_draft, self.ObjTypes.TASK_DRAFT, self.ObjActions.CREATE)

                # Test scheduling_unit_blueprint create
                su_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=su_draft['url']),
                    '/scheduling_unit_blueprint/')
                test_object(su_blueprint, self.ObjTypes.SCHED_UNIT_BLUEPRINT, self.ObjActions.CREATE)

                # Test task_blueprint create
                task_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.TaskBlueprint(scheduling_unit_blueprint_url=su_blueprint['url'],
                                                         draft_url=task_draft['url']), '/task_blueprint/')
                test_object(task_blueprint, self.ObjTypes.TASK_BLUEPRINT, self.ObjActions.CREATE)

                # Test subtask create
                subtask = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/')
                test_object(subtask, self.ObjTypes.SUBTASK, self.ObjActions.CREATE)

                # Test updates
                # with self.tmss_test_env.create_tmss_client() as client:
                    # Test subtask update
                    # TODO: fix.
                    # subtask = client.get_subtask(subtask['id'])
                    # client.set_subtask_status(subtask['id'], 'defined')
                    # subtask = client.get_subtask(subtask['id'])
                    # test_object(subtask, self.ObjTypes.SUBTASK, self.ObjActions.UPDATE)

                    # Test task_blueprint update
                    # TODO: fix. The update messages are received, but we flush the queue after asserting one object, so they are not found and the assert fails.
                    # Probably we want to only send the id and action, and let the client do a GET on the url, because the client can auth himself.
                    # task_blueprint = requests.get(task_blueprint['url'], auth=self.test_data_creator.auth).json()
                    # test_object(task_blueprint, self.ObjTypes.TASK_BLUEPRINT, self.ObjActions.UPDATE)

                    # Test scheduling_unit_blueprint update
                    # su_blueprint = requests.get(su_blueprint['url'], auth=self.test_data_creator.auth).json()
                    # test_object(su_blueprint, self.ObjTypes.SCHED_UNIT_BLUEPRINT, self.ObjActions.UPDATE)

                # ToDo: fix. this is flaky
                # # Test scheduling_unit_draft update
                # su_draft['description'] = 'This is an update test'
                # su_draft = requests.put(su_draft['url'], json=su_draft, auth=self.test_data_creator.auth).json()
                # test_object(su_draft, self.ObjTypes.SCHED_UNIT_DRAFT, self.ObjActions.UPDATE)
                #
                # # Test task_draft update
                # task_draft['description'] = 'This is an update test'
                # task_draft = requests.put(task_draft['url'], json=task_draft, auth=self.test_data_creator.auth).json()
                # test_object(task_draft, self.ObjTypes.TASK_DRAFT, self.ObjActions.UPDATE)
                #
                # # Test deletions
                # # Test reservation delete not authorised to receive messages
                # requests.delete(reservation['url'], auth=self.test_data_creator.auth)
                # # No need to assert, the WS client just should not receive any messages. Otherwise, the subsequent asserts would fail.
                #
                # # Test subtask delete
                # requests.delete(subtask['url'], auth=self.test_data_creator.auth)
                # test_object({'id': subtask['id']}, self.ObjTypes.SUBTASK, self.ObjActions.DELETE)
                #
                # # Test task_blueprint delete
                # requests.delete(task_blueprint['url'], auth=self.test_data_creator.auth)
                # test_object({'id': task_blueprint['id']}, self.ObjTypes.TASK_BLUEPRINT, self.ObjActions.DELETE)
                #
                # # Test scheduling_unit_blueprint delete
                # requests.delete(su_blueprint['url'], auth=self.test_data_creator.auth)
                # test_object({'id': su_blueprint['id']}, self.ObjTypes.SCHED_UNIT_BLUEPRINT, self.ObjActions.DELETE)
                #
                # # Test task_draft delete
                # requests.delete(task_draft['url'], auth=self.test_data_creator.auth)
                # test_object({'id': task_draft['id']}, self.ObjTypes.TASK_DRAFT, self.ObjActions.DELETE)
                #
                # # Test scheduling_unit_draft delete
                # requests.delete(su_draft['url'], auth=self.test_data_creator.auth)
                # test_object({'id': su_draft['id']}, self.ObjTypes.SCHED_UNIT_DRAFT, self.ObjActions.DELETE)
            finally:
                # always close the client connection, also when an assert failed
                self.stop_ws_client()
if __name__ == '__main__':
    # run the unit tests when this file is executed as a script
    unittest.main()