Skip to content
Snippets Groups Projects
Select Git revision
  • 8514077bc780113d16b1537fa645f6ebaca8dd6d
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_websocket_service.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_websocket_service.py 4.92 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    import unittest
    import uuid
    
    import logging
    # Configure root logging first (timestamp, level, message at INFO and above),
    # then create this module's logger under the 'lofar.' namespace.
    logging.basicConfig(
        format='%(asctime)s %(levelname)s %(message)s',
        level=logging.INFO,
    )
    logger = logging.getLogger('lofar.' + __name__)
    
    from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
    
    from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
    from lofar.sas.tmss.services.websocket_service import create_service
    from lofar.common.test_utils import integration_test
    from time import sleep
    from datetime import datetime, timedelta
    
    from collections import deque
    from time import sleep
    from threading import Thread, Event
    import websocket
    
    @integration_test
    class TestSubtaskSchedulingService(unittest.TestCase):
        '''
        Integration tests for the TMSS websocket service.

        NOTE: the class name is a leftover from the test module this file was
        derived from; it is kept unchanged so existing test selection keeps working.
        '''

        # Address the test client connects to.
        # NOTE(review): presumably the default listen address of the websocket service -- confirm.
        WS_URL = "ws://127.0.0.1:5678/"

        # Maximum number of seconds to wait for the connection / an incoming message.
        WS_TIMEOUT = 5

        def __init__(self, methodName: str = 'runTest') -> None:
            '''
            Create the per-test state shared between the test thread and the websocket client thread.

            :param methodName: name of the test method to run (same default as unittest.TestCase).
            '''
            self.msg_queue = deque()    # messages received by the websocket client, oldest first
            self.sync_event = Event()   # set by the client thread whenever a message was queued
            self.open_event = Event()   # set by the client thread once the connection is open
            self.ws = None              # the WebSocketApp, available after start_ws_client()
            self.t = None               # the thread running the WebSocketApp
            super().__init__(methodName)

        def start_ws_client(self):
            '''
            Set up the websocket client, run it in a daemon thread, and wait until it is connected.

            :raises TimeoutError: when the connection is not open within WS_TIMEOUT seconds.
            '''

            def on_message(ws, message):
                logger.info('### ws_client received: %s', message)
                # queue first, signal second, so a woken-up reader always finds the message
                self.msg_queue.append(message)
                self.sync_event.set()

            def on_error(ws, error):
                logger.error('### ws_client error: %s', error)

            def on_open(ws):
                logger.info("### open ###")
                self.open_event.set()

            def on_close(ws, *args):
                # newer websocket-client releases also pass (close_status_code, close_msg)
                logger.info("### closed ###")

            websocket.enableTrace(True)

            self.open_event.clear()
            # create the app in the calling thread, so self.ws exists as soon as this method returns
            self.ws = websocket.WebSocketApp(self.WS_URL,
                                             on_open=on_open,
                                             on_message=on_message,
                                             on_error=on_error,
                                             on_close=on_close)

            self.t = Thread(target=self.ws.run_forever, daemon=True)
            self.t.start()

            # without this wait, messages emitted before the client is connected would be missed
            if not self.open_event.wait(timeout=self.WS_TIMEOUT):
                raise TimeoutError("websocket client could not connect to %s within %s seconds"
                                   % (self.WS_URL, self.WS_TIMEOUT))

        def stop_ws_client(self):
            '''Close the websocket client (if any) and wait briefly for its thread to finish.'''
            if self.ws is not None:
                self.ws.close()
            if self.t is not None:
                self.t.join(timeout=self.WS_TIMEOUT)

        def _wait_for_message(self, timeout=None):
            '''
            Return the oldest received websocket message, waiting for one to arrive if needed.

            :param timeout: seconds to wait for a new message, defaults to WS_TIMEOUT.
            :return: the message as received by the websocket client.
            :raises TimeoutError: when no message arrived in time.
            '''
            if timeout is None:
                timeout = self.WS_TIMEOUT

            while True:
                # always check the queue before waiting: a message may have arrived
                # between a previous wait() and clear(), leaving the event unset.
                try:
                    return self.msg_queue.popleft()
                except IndexError:
                    pass

                if not self.sync_event.wait(timeout=timeout):
                    raise TimeoutError("no websocket message received within %s seconds" % (timeout,))
                self.sync_event.clear()

        @classmethod
        def setUpClass(cls) -> None:
            '''Open a temporary exchange and start a TMSS test environment on it (without its own websocket service).'''
            cls.TEST_UUID = uuid.uuid1()

            cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
            cls.tmp_exchange.open()

            try:
                cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address,
                                                        populate_test_data=False,
                                                        populate_schemas=False,
                                                        start_websocket=False,
                                                        start_postgres_listener=True,
                                                        enable_viewflow=False)
                cls.tmss_test_env.start()

                cls.test_data_creator = cls.tmss_test_env.create_test_data_creator()
            except Exception:
                # unittest does not call tearDownClass when setUpClass fails,
                # so release the exchange here to avoid leaking it.
                cls.tmp_exchange.close()
                raise

        @classmethod
        def tearDownClass(cls) -> None:
            '''Stop the TMSS test environment and always close the temporary exchange.'''
            try:
                cls.tmss_test_env.stop()
            finally:
                cls.tmp_exchange.close()

        def test_01(self):
            '''
            This test starts the websocket service, connects a websocket client to it,
            creates a scheduling set and a scheduling unit draft via the test data creator,
            and expects a message to arrive in the websocket client.
            '''

            logger.info(' -- test_01_for_expected_behaviour -- ')

            # create and start the service (the object under test)
            service = create_service(exchange=self.tmp_exchange.address)
            with BusListenerJanitor(service):

                self.start_ws_client()
                try:
                    json_schedulingset = self.test_data_creator.post_data_and_get_response_as_json_object(
                        self.test_data_creator.SchedulingSet(),
                        url_postfix='/scheduling_set')
                    json_schedulingunitdraft = self.test_data_creator.post_data_and_get_response_as_json_object(
                        self.test_data_creator.SchedulingUnitDraft(scheduling_set_url=json_schedulingset['url']),
                        url_postfix='/scheduling_unit_draft')

                    recv_msg = self._wait_for_message()
                    logger.info('Created %s', json_schedulingunitdraft)
                    logger.info('Received %s', recv_msg)

                    # the response is not inspected yet, see the TODOs below
                    self.test_data_creator.post_data_and_get_response_as_json_object(
                        self.test_data_creator.TaskDraft(scheduling_unit_draft_url=json_schedulingunitdraft['url']),
                        url_postfix='/task_draft')

                    # TODO: create/update/delete objects like SubTask, TaskBlueprint etc
                    # TODO: check if the correct/expected json_blobs arrive in the websocket client
                finally:
                    # close the client while the service is still running
                    self.stop_ws_client()
    
    
    if __name__ == '__main__':
        # executed as a script: discover and run all test cases in this module
        unittest.main()