Skip to content
Snippets Groups Projects
Select Git revision
  • 1382e4b5970518d9b4d679f93f9ecf42f4ba5e41
  • master default protected
  • dither_on_off_disabled
  • yocto
  • pypcc2
  • pypcc3
  • 2020-12-07-the_only_working_copy
  • v2.0
  • v1.0
  • v0.9
  • Working-RCU_ADC,ID
  • 2020-12-11-Holiday_Season_release
12 results

testRCUL.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_websocket_service.py 12.76 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    import time
    import unittest
    import uuid
    
    import logging
    logger = logging.getLogger('lofar.' + __name__)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
    from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
    
    from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
    from lofar.sas.tmss.services.websocket_service import create_service, TMSSEventMessageHandlerForWebsocket, DEFAULT_WEBSOCKET_PORT
    from lofar.common.test_utils import integration_test
    from lofar.common.util import single_line_with_single_spaces, find_free_port
    
    from collections import deque
    from json import dumps as JSONdumps
    from json import loads as JSONloads
    import requests
    from threading import Thread, Event
    import websocket
    
    
    @integration_test
    class TestSubtaskSchedulingService(unittest.TestCase):
        '''
        Tests for the TMSS websocket service
        '''
    
        def __init__(self, methodName: str = 'runTest') -> None:
            '''
            Create the test case and the state shared with the websocket client callbacks.

            :param methodName: name of the test method to run. Defaults to 'runTest', which is
                               the default of unittest.TestCase itself (the former Ellipsis default
                               made instantiation without arguments fail).
            '''
            # JSON-decoded messages received by the websocket client (filled in start_ws_client's on_message)
            self.msg_queue = deque()
            # set by the websocket client each time a message was appended to msg_queue
            self.sync_event = Event()
            # short-hands for the action/type identifiers of the websocket message handler
            self.ObjActions = TMSSEventMessageHandlerForWebsocket.ObjActions
            self.ObjTypes = TMSSEventMessageHandlerForWebsocket.ObjTypes
            super().__init__(methodName)
    
        def start_ws_client(self, websocket_port: int=DEFAULT_WEBSOCKET_PORT):
            '''
            Set up a websocket client and run it in a background daemon thread.

            The client connects to ws://127.0.0.1:<websocket_port>/ and, once connected, requests an
            auth token for user 'paulus' and sends it as the first message. Every received message is
            JSON-decoded, appended to self.msg_queue and signalled via self.sync_event.
            The client app and its thread are stored in self.ws and self.t.

            :param websocket_port: port of the websocket service to connect to.
            '''

            def on_message(ws, message):
                json_doc = JSONloads(message)
                logger.info('Received msg from ws: %s', single_line_with_single_spaces(json_doc))
                # append first, then signal, so a woken-up waiter always finds the message in the queue
                self.msg_queue.append(json_doc)
                self.sync_event.set()

            def on_error(ws, error):
                logger.info(error)

            def on_open(ws):
                logger.info('Connected to ws')
                # Send auth token as first message after the WS handshake
                response = requests.post(self.test_data_creator.django_api_url + '/token-auth/',
                                         json={'username': 'paulus', 'password': 'pauluspass'})
                ws.send(JSONdumps(response.json()))

            def on_close(ws, *close_args):
                # websocket-client >= 1.0 also passes close_status_code and close_msg;
                # accept (and ignore) them so this callback works with old and new versions.
                logger.info('Closed ws')

            def thread_ws_starter():
                self.ws = websocket.WebSocketApp("ws://127.0.0.1:%d/"%(websocket_port,),
                                            on_open=on_open,
                                            on_message=on_message,
                                            on_error=on_error,
                                            on_close=on_close)
                # blocks until the connection is closed
                self.ws.run_forever()

            # daemon thread: a still-connected client must not keep the test process alive
            self.t = Thread(target=thread_ws_starter, daemon=True)
            self.t.start()
    
        @classmethod
        def setUpClass(cls) -> None:
            '''
            Start the shared fixtures for all tests in this class: a temporary exchange, a
            TMSSTestEnvironment on that exchange, a test data creator, and the user 'paulus'
            with view permissions on the models for which websocket messages are tested.

            :raises TimeoutError: if the view_subtask permission does not become visible for the test user in time.
            '''
            cls.TEST_UUID = uuid.uuid1()

            cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
            cls.tmp_exchange.open()

            cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False, start_websocket=False, start_postgres_listener=True, enable_viewflow=False)
            cls.tmss_test_env.start()

            cls.test_data_creator = cls.tmss_test_env.create_test_data_creator()

            # Create group and permissions and add to user 'paulus' to test auth rules for WS messages
            # (django can only be imported after the test environment has been started/configured)
            from django.contrib.auth.models import Group, Permission
            from django.contrib.auth import get_user_model
            User = get_user_model()

            ws_test_group, _ = Group.objects.get_or_create(name='ws_test')
            for model_name in ('schedulingunitdraft', 'taskdraft', 'schedulingunitblueprint', 'taskblueprint', 'subtask'):
                ws_test_group.permissions.add(Permission.objects.get(codename='view_%s' % model_name))
            # NOTE(review): the password is passed as a raw lookup/create value here, not via set_password;
            # presumably the actual authentication is handled by the test environment - confirm.
            ws_test_user, _ = User.objects.get_or_create(username='paulus', password='pauluspass')
            ws_test_user.groups.add(ws_test_group)

            # Wait until the new permission is visible. The user is fetched again on each attempt to get
            # an object without previously cached permissions. Bounded, so a missing permission fails the
            # test run instead of spinning forever.
            deadline = time.monotonic() + 30
            while not ws_test_user.has_perm('tmssapp.view_subtask'):
                if time.monotonic() > deadline:
                    raise TimeoutError("user 'paulus' did not obtain permission 'tmssapp.view_subtask' in time")
                time.sleep(0.1)
                ws_test_user = User.objects.get(username='paulus')
    
        @classmethod
        def tearDownClass(cls) -> None:
            '''
            Stop the TMSS test environment and close the temporary exchange created in setUpClass.
            '''
            try:
                cls.tmss_test_env.stop()
            finally:
                # always release the exchange, also when stopping the test environment fails
                cls.tmp_exchange.close()
    
        @unittest.skip('Skipped until TMSS-2601 is resolved and we do not send unexpected debug messages to the clients any more')
        def test_01(self):
            '''
            This test starts a websocket service and a websocket client connected to it (tmss itself is started in setUpClass).
            Creates objects via the REST API and checks if the json_blobs from the ws service are properly received by the client.
            The update and delete checks are currently disabled (see the ToDo's below).
            '''

            logger.info(' -- test_01_for_expected_behaviour -- ')

            websocket_port = find_free_port(DEFAULT_WEBSOCKET_PORT)

            # create and start the service (the object under test)
            service = create_service(websocket_port=websocket_port, exchange=self.tmp_exchange.address)
            with BusListenerJanitor(service):

                self.start_ws_client(websocket_port)  # Start ws client

                def close_ws_client():
                    # self.ws is assigned in the client thread, so it does not exist if that thread never got to run
                    ws_app = getattr(self, 'ws', None)
                    if ws_app is not None:
                        ws_app.close()
                    self.t.join(timeout=5)

                # make sure the client connection and its thread are cleaned up, also when an assert fails
                self.addCleanup(close_ws_client)

                def test_object(json_test, obj_type, action):   # Check if the correct/expected json_blobs arrive in the ws client
                    # Wait for incoming ws message.
                    # The event can still be set by an earlier (already flushed) message, so do not trust the
                    # event alone: keep waiting until there actually is a message in the queue, or until timeout.
                    deadline = time.monotonic() + 50
                    while not self.msg_queue:
                        remaining = deadline - time.monotonic()
                        if remaining <= 0 or not self.sync_event.wait(timeout=remaining):
                            raise TimeoutError()
                        self.sync_event.clear()

                    # Assert json_blobs
                    expected_json_blob = {'object_details': {'id': json_test['id']}, 'object_type': obj_type.value, 'action': action.value}
                    if action in (self.ObjActions.CREATE, self.ObjActions.UPDATE):
                        for key in ('process_start_time', 'process_stop_time', 'duration', 'status_value', 'state_value', 'placed'):
                            if key in json_test:
                                expected_json_blob['object_details'][key] = json_test[key]
                                # a duration of None in the REST response is expected as 0 in the ws message
                                if key == 'duration' and json_test[key] is None:
                                    expected_json_blob['object_details']['duration'] = 0

                    received_json_blob = self.msg_queue.popleft()
                    self.assertEqual(expected_json_blob, received_json_blob)

                    # ToDo: fix. this is flaky
                    # flush any lingering messages... some objects creations/updates cause a cascade of messages.
                    # we are only interested in the first one about the object itself.
                    time.sleep(1) # ugly.... but we have to wait for incoming lingering messages
                    while len(self.msg_queue):
                        self.msg_queue.popleft()
                        time.sleep(1) # ugly.... but we have to wait for (more) incoming lingering messages


                # Test creations
                # Test reservation create not authorised to receive messages
                reservation = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.Reservation(), '/reservation/')
                # No need to assert, the WS client just should not receive any messages. Otherwise, the subsequent asserts would fail.

                # Test scheduling_unit_draft create
                su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/')
                test_object(su_draft, self.ObjTypes.SCHED_UNIT_DRAFT, self.ObjActions.CREATE)

                # Test task_draft create
                task_draft = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.TaskDraft(scheduling_unit_draft_url=su_draft['url']), '/task_draft/')
                test_object(task_draft, self.ObjTypes.TASK_DRAFT, self.ObjActions.CREATE)

                # Test scheduling_unit_blueprint create
                su_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=su_draft['url']),
                    '/scheduling_unit_blueprint/')
                test_object(su_blueprint, self.ObjTypes.SCHED_UNIT_BLUEPRINT, self.ObjActions.CREATE)

                # Test task_blueprint create
                task_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.TaskBlueprint(scheduling_unit_blueprint_url=su_blueprint['url'],
                                                         draft_url=task_draft['url']), '/task_blueprint/')
                test_object(task_blueprint, self.ObjTypes.TASK_BLUEPRINT, self.ObjActions.CREATE)

                # Test subtask create
                subtask = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/')
                test_object(subtask, self.ObjTypes.SUBTASK, self.ObjActions.CREATE)

                # Test updates
                # with self.tmss_test_env.create_tmss_client() as client:

                    # Test subtask update
                    # TODO: fix.
                    # subtask = client.get_subtask(subtask['id'])
                    # client.set_subtask_status(subtask['id'], 'defined')
                    # subtask = client.get_subtask(subtask['id'])
                    # test_object(subtask, self.ObjTypes.SUBTASK, self.ObjActions.UPDATE)

                    # Test task_blueprint update
                    # TODO: fix. The update messages are received, but we flush the queue after asserting one object, so they are not found and the assert fails.
                    # Probably we want to only send the id and action, and let the client do a GET on the url, because the client can auth himself.
                    # task_blueprint = requests.get(task_blueprint['url'], auth=self.test_data_creator.auth).json()
                    # test_object(task_blueprint, self.ObjTypes.TASK_BLUEPRINT, self.ObjActions.UPDATE)

                    # Test scheduling_unit_blueprint update
                    # su_blueprint = requests.get(su_blueprint['url'], auth=self.test_data_creator.auth).json()
                    # test_object(su_blueprint, self.ObjTypes.SCHED_UNIT_BLUEPRINT, self.ObjActions.UPDATE)

                # ToDo: fix. this is flaky
                # # Test scheduling_unit_draft update
                # su_draft['description'] = 'This is an update test'
                # su_draft = requests.put(su_draft['url'], json=su_draft, auth=self.test_data_creator.auth).json()
                # test_object(su_draft, self.ObjTypes.SCHED_UNIT_DRAFT, self.ObjActions.UPDATE)
                #
                # # Test task_draft update
                # task_draft['description'] = 'This is an update test'
                # task_draft = requests.put(task_draft['url'], json=task_draft, auth=self.test_data_creator.auth).json()
                # test_object(task_draft, self.ObjTypes.TASK_DRAFT, self.ObjActions.UPDATE)
                #
                # # Test deletions
                # # Test reservation delete not authorised to receive messages
                # requests.delete(reservation['url'], auth=self.test_data_creator.auth)
                # # No need to assert, the WS client just should not receive any messages. Otherwise, the subsequent asserts would fail.
                #
                # # Test subtask delete
                # requests.delete(subtask['url'], auth=self.test_data_creator.auth)
                # test_object({'id': subtask['id']}, self.ObjTypes.SUBTASK, self.ObjActions.DELETE)
                #
                # # Test task_blueprint delete
                # requests.delete(task_blueprint['url'], auth=self.test_data_creator.auth)
                # test_object({'id': task_blueprint['id']}, self.ObjTypes.TASK_BLUEPRINT, self.ObjActions.DELETE)
                #
                # # Test scheduling_unit_blueprint delete
                # requests.delete(su_blueprint['url'], auth=self.test_data_creator.auth)
                # test_object({'id': su_blueprint['id']}, self.ObjTypes.SCHED_UNIT_BLUEPRINT, self.ObjActions.DELETE)
                #
                # # Test task_draft delete
                # requests.delete(task_draft['url'], auth=self.test_data_creator.auth)
                # test_object({'id': task_draft['id']}, self.ObjTypes.TASK_DRAFT, self.ObjActions.DELETE)
                #
                # # Test scheduling_unit_draft delete
                # requests.delete(su_draft['url'], auth=self.test_data_creator.auth)
                # test_object({'id': su_draft['id']}, self.ObjTypes.SCHED_UNIT_DRAFT, self.ObjActions.DELETE)
    
    
    if __name__ == '__main__':
        # run the unit tests in this module when the file is executed as a script
        unittest.main()