Skip to content
Snippets Groups Projects
Select Git revision
  • e29af57fa6a5d758b00a8834f95c64dd49ca1719
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_websocket_service.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_websocket_service.py 8.85 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    import unittest
    import uuid
    
    import logging
    logger = logging.getLogger('lofar.' + __name__)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
    from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
    
    from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
    from lofar.sas.tmss.services.websocket_service import create_service
    from lofar.common.test_utils import integration_test
    
    from collections import deque
    from json import loads as JSONLoads
    import requests
    from threading import Thread, Event
    import websocket
    
    
    @integration_test
    class TestSubtaskSchedulingService(unittest.TestCase):
        '''
        Integration tests for the TMSS websocket service.

        A websocket client is connected to the service under test, objects are
        created/updated/deleted via the TMSS REST API, and the test verifies that the
        matching notification arrives over the websocket.

        NOTE: the class name does not mention the websocket service; it is kept as-is
        so that existing test selections by name keep working.
        '''

        def __init__(self, methodName: str = 'runTest') -> None:
            # 'runTest' is the default used by unittest.TestCase itself. An Ellipsis default
            # would make TestCase.__init__ fail when the case is constructed without arguments.
            # Messages received by the websocket client thread, in order of arrival.
            self.msg_queue = deque()
            # Set by the websocket client thread whenever a new message has been queued.
            self.sync_event = Event()
            super().__init__(methodName)

        def start_ws_client(self):
            '''
            Create the websocket client, run it in a daemon thread and wait until it is connected.

            The client is stored in self.ws and its thread in self.t. A cleanup is registered
            which closes the client when the test has finished.

            :raises TimeoutError: if the client is not connected within 5 seconds.
            '''
            connected = Event()

            def on_message(ws, message):
                logger.info('Received msg from ws')
                # Queue first, signal second, so a woken consumer always finds the message.
                self.msg_queue.append(JSONLoads(message))
                self.sync_event.set()

            def on_error(ws, error):
                logger.info(error)

            def on_open(ws):
                logger.info('Connected to ws')
                connected.set()

            def on_close(ws, *args):
                # Newer websocket-client releases also pass close_status_code and close_msg.
                logger.info('Closed ws')

            websocket.enableTrace(True)

            # Create the app in the calling thread so that self.ws is available as soon as
            # this method returns, instead of at some later moment in the worker thread.
            # NOTE(review): the address is presumably the websocket service default - confirm.
            self.ws = websocket.WebSocketApp("ws://127.0.0.1:5678/",
                                             on_open=on_open,
                                             on_message=on_message,
                                             on_error=on_error,
                                             on_close=on_close)

            self.t = Thread(target=self.ws.run_forever, daemon=True)
            self.t.start()
            self.addCleanup(self._stop_ws_client)

            # Messages sent before the client is connected are never received, so do not
            # return (and let the test trigger events) until the connection is open.
            if not connected.wait(timeout=5):
                raise TimeoutError('websocket client could not connect within 5 seconds')

        def _stop_ws_client(self):
            '''Close the websocket client and give its thread a moment to finish.'''
            self.ws.close()
            self.t.join(timeout=5)

        def _wait_for_message(self, timeout=5):
            '''
            Return the oldest received websocket message which was not consumed yet.

            :param timeout: maximum number of seconds to wait for a new message to arrive.
            :raises TimeoutError: if no message is available within the timeout.
            '''
            # Always check the queue itself and use the event only as a wake-up signal.
            # Messages arriving in a burst may set the event just once (or have their signal
            # cleared below), but they are still in the queue and must not be waited for again.
            while not self.msg_queue:
                if not self.sync_event.wait(timeout=timeout):
                    raise TimeoutError('no websocket message received within %s seconds' % (timeout,))
                self.sync_event.clear()
            return self.msg_queue.popleft()

        @classmethod
        def setUpClass(cls) -> None:
            '''Open a temporary exchange and start a TMSS test environment using it.'''
            cls.TEST_UUID = uuid.uuid1()

            cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
            cls.tmp_exchange.open()

            # start_websocket=False: the websocket service is created by the test itself.
            cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False, start_websocket=False, start_postgres_listener=True, enable_viewflow=False)
            cls.tmss_test_env.start()

            cls.test_data_creator = cls.tmss_test_env.create_test_data_creator()

        @classmethod
        def tearDownClass(cls) -> None:
            '''Stop the TMSS test environment and close the temporary exchange.'''
            cls.tmss_test_env.stop()
            cls.tmp_exchange.close()

        def test_01(self):
            '''
            This test starts the websocket service and connects a websocket client to it.
            It then creates, updates and deletes scheduling units, tasks and a subtask via
            the REST API, and checks after each step that the expected message is received.
            '''

            logger.info(' -- test_01_for_expected_behaviour -- ')

            # create and start the service (the object under test)
            service = create_service(exchange=self.tmp_exchange.address)
            with BusListenerJanitor(service):

                self.start_ws_client()  # Start ws client

                def test_object(json_test, obj_type, action):   # Check if the correct/expected json_blobs arrive in the ws client
                    # Wait for the next incoming ws message
                    received = self._wait_for_message(timeout=5)
                    # Assert json_blobs
                    json_blob = {'id': json_test['id'], 'object_type': obj_type, 'action': action}
                    if action in ('create', 'update'):
                        # create/update messages carry the full object, delete messages only the id
                        json_blob['object'] = json_test
                    self.assertEqual(json_blob, received)

                # TODO: Add enums for obj_type and action like in ws service

                # Test scheduling_unit_draft create
                logger.info('\n\n\nCreate su_draft\n\n')
                su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/')
                test_object(su_draft, 'scheduling_unit_draft', 'create')

                # Test task_draft create
                logger.info('\n\n\nCreate task_draft\n\n')
                task_draft = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.TaskDraft(scheduling_unit_draft_url=su_draft['url']), '/task_draft/')
                test_object(task_draft, 'task_draft', 'create')

                # Test scheduling_unit_blueprint create
                logger.info('\n\n\nCreate su_blueprint\n\n')
                su_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=su_draft['url']),
                    '/scheduling_unit_blueprint/')
                test_object(su_blueprint, 'scheduling_unit_blueprint', 'create')

                # Test task_blueprint create
                logger.info('\n\n\nCreate task_blueprint\n\n')
                task_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.TaskBlueprint(scheduling_unit_blueprint_url=su_blueprint['url'],
                                                         draft_url=task_draft['url']), '/task_blueprint/')
                test_object(task_blueprint, 'task_blueprint', 'create')

                # Test subtask create
                logger.info('\n\n\nCreate subtask\n\n')
                subtask = self.test_data_creator.post_data_and_get_response_as_json_object(
                    self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/')
                test_object(subtask, 'subtask', 'create')

                # TODO: Add task_draft and su_draft updates
                # Test updates
                with self.tmss_test_env.create_tmss_client() as client:
                    # Test subtask update
                    logger.info('\n\n\nUpdate subtask\n\n')
                    client.set_subtask_status(subtask['id'], 'scheduled')
                    # Fetch the current state of the object to compare it with the received message
                    subtask = requests.get(subtask['url'], auth=self.test_data_creator.auth).json()
                    test_object(subtask, 'subtask', 'update')
                    # Test task_blueprint update
                    logger.info('\n\n\nUpdate task_blueprint\n\n')
                    task_blueprint = requests.get(task_blueprint['url'], auth=self.test_data_creator.auth).json()
                    test_object(task_blueprint, 'task_blueprint', 'update')
                    # Test scheduling_unit_blueprint update
                    logger.info('\n\n\nUpdate su_blueprint\n\n')
                    su_blueprint = requests.get(su_blueprint['url'], auth=self.test_data_creator.auth).json()
                    test_object(su_blueprint, 'scheduling_unit_blueprint', 'update')

                # Test deletions
                # Test subtask delete
                logger.info('\n\n\nDelete subtask\n\n')
                requests.delete(subtask['url'], auth=self.test_data_creator.auth)
                test_object({'id': subtask['id']}, 'subtask', 'delete')
                # Test task_blueprint delete
                logger.info('\n\n\nDelete task_blueprint\n\n')
                requests.delete(task_blueprint['url'], auth=self.test_data_creator.auth)
                test_object({'id': task_blueprint['id']}, 'task_blueprint', 'delete')
                # Test scheduling_unit_blueprint delete
                logger.info('\n\n\nDelete su_blueprint\n\n')
                requests.delete(su_blueprint['url'], auth=self.test_data_creator.auth)
                test_object({'id': su_blueprint['id']}, 'scheduling_unit_blueprint', 'delete')
                # Test task_draft delete
                logger.info('\n\n\nDelete task_draft\n\n')
                requests.delete(task_draft['url'], auth=self.test_data_creator.auth)
                test_object({'id': task_draft['id']}, 'task_draft', 'delete')
                # Test scheduling_unit_draft delete
                logger.info('\n\n\nDelete su_draft\n\n')
                requests.delete(su_draft['url'], auth=self.test_data_creator.auth)
                test_object({'id': su_draft['id']}, 'scheduling_unit_draft', 'delete')
    
    
    if __name__ == "__main__":
        # Executed as a script: hand control to the unittest command line runner.
        unittest.main()