Skip to content
Snippets Groups Projects
Select Git revision
  • ca083c57c024bd7b9c300506026e7657aed1d27f
  • master default protected
  • releases/v0.5.20 protected
  • v0.5.x
  • releases/v0.7.2 protected
  • releases/v0.7.1 protected
  • releases/v0.5.19 protected
  • releases/v0.7.0 protected
  • releases/v0.5.17.tim_survey protected
  • compress_tim_survey_no_metadata_compression
  • juelich_0_5_18
  • releases/v0.6.0.tim_survey protected
  • compress_tim_survey
  • releases/v0.5.18 protected
  • expose_elevation_for_parset
  • releases/v0.5.17 protected
  • releases/v0.6.0 protected
  • releases/v0.5.16 protected
  • releases/v0.5.15 protected
  • nico_testing_juelich
  • nightly_build_test
  • v0.5.20
  • v0.7.2
  • v0.7.1
  • v0.5.19
  • v0.7.0
  • v0.5.17.tim_survey
  • v0.6.0.tim_survey
  • v0.5.18
  • v0.5.17
  • v0.6.0
  • v0.5.16
  • v0.5.15
  • v0.5.14
  • v0.5.13
  • v0.5.12
  • v0.5.11
  • v0.5.10
  • v0.5.9
  • v0.5.8
  • v0.5.7
41 results

extract_quality_metrics.cwl

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_websocket_service.py 3.64 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    import unittest
    import uuid
    
    import logging
    logger = logging.getLogger(__name__)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
    from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
    from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
    
    from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
    from lofar.sas.tmss.services.websocket_service import create_service
    from lofar.common.test_utils import integration_test
    from time import sleep
    from datetime import datetime, timedelta
    
    import socketio
    
    
    # TODO: socketio is no longer needed
    class WebsocketClient():
        '''
        Websocket Client for the TMSSEventMessageHandlerForWebsocket

        The event handlers are registered on the class-level ``sio`` client,
        so that single client (and its handlers) is shared by all instances.
        '''

        # Created at class-definition time because the @sio.event decorators
        # below need the client object to register the handlers on.
        sio = socketio.Client()

        def __init__(self):
            '''Connect to the websocket server at ws://localhost:5678 and wait on the connection.'''
            self.sio.connect('ws://localhost:5678')
            # NOTE(review): according to the python-socketio documentation wait()
            # blocks until the connection with the server ends, so this constructor
            # presumably does not return while connected -- confirm this is intended.
            self.sio.wait()

        @staticmethod
        @sio.event
        def connect():
            '''Log that the connection was established.'''
            logger.info('Connection established')

        @staticmethod
        @sio.event
        def broadcastNotify(data):
            '''Log the payload received with a broadcastNotify event.'''
            # Lazy logging arguments: formatting only happens when the record is
            # emitted, and a tuple payload cannot break the '%s' substitution.
            logger.info('Received: %s', data)

        @staticmethod
        @sio.event
        def disconnect():
            '''Log that the server connection was closed.'''
            logger.info('Disconnected from server')
    
    
    @integration_test
    class TestSubtaskSchedulingService(unittest.TestCase):
        '''
        Integration test for the service created by create_service (the websocket service).

        NOTE(review): the class name looks like a leftover from the subtask scheduling
        service test; it is kept unchanged so selecting the test by name keeps working.
        '''
        @classmethod
        def setUpClass(cls) -> None:
            '''Open a temporary exchange, start a TMSS test environment on it and create a REST test data creator.'''
            cls.TEST_UUID = uuid.uuid1()

            cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
            cls.tmp_exchange.open()

            try:
                cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False)
                cls.tmss_test_env.start()

                cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url,
                                                                (cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password))
            except Exception:
                # unittest does not call tearDownClass when setUpClass raises,
                # so close the already opened exchange here before re-raising.
                cls.tmp_exchange.close()
                raise

        @classmethod
        def tearDownClass(cls) -> None:
            '''Stop the TMSS test environment and close the temporary exchange.'''
            try:
                cls.tmss_test_env.stop()
            finally:
                # close the exchange even when stopping the test environment fails
                cls.tmp_exchange.close()

        def test_01(self):
            '''
            This test starts the service under test on the temporary exchange and connects a WebsocketClient to it.
            The actual checks on the received messages still have to be implemented (see the TODOs below).
            '''

            logger.info(' -- test_01_for_expected_behaviour -- ')

            # create and start the service (the object under test)
            service = create_service(exchange=self.tmp_exchange.address)
            with BusListenerJanitor(service):

                websocket = WebsocketClient()

                # TODO: setup http websocket client which connects to our TMSS websocket
                # TODO: create/update/delete objects like SubTask, TaskBlueprint etc
                # TODO: check if the correct/expected json_blobs arrive in the websocket client
    
    if __name__ == "__main__":
        # Executed as a script: hand over to the unittest command line runner.
        unittest.main()