Skip to content
Snippets Groups Projects
Select Git revision
  • 5eed9933d0953f1615f65dd4e430b96cfdd31589
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_websocket_service.py

Blame
  • Mario Raciti's avatar
    TMSS-413: Add python-socketio[asyncio_client] module and initial test client
    Mario Raciti authored
    5eed9933
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_websocket_service.py 3.87 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    import unittest
    import uuid
    
    import logging
    logger = logging.getLogger(__name__)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
    from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
    from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
    
    from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
    from lofar.sas.tmss.services.websocket_service import create_service
    from lofar.common.test_utils import integration_test
    from time import sleep
    from datetime import datetime, timedelta
    
    import socketio
    
    
    class WebsocketClient():
        '''
        Websocket Client for the TMSSEventMessageHandlerForWebsocket
        '''
        sio = socketio.Client()
        # FIXME: Find out a solution for placing socketio inside a class
    
        def __init__(self):
            self.sio.on('connect', self.connect)
            self.sio.on('message', self.message)
            self.sio.on('disconnect', self.disconnect)
            self.sio.connect('ws://localhost:5678')
            self.sio.wait()
    
        # @sio.event
        def connect(self):
            print('>> Connection established')
    
        # @sio.event
        async def message(self, data):
            print('>> Received: ', data)
            await self.sio.emit('message', {'msg': 'Hello from client'})
    
        # @sio.event
        def disconnect(self):
            print('>> Disconnected from server')
    
    
    @integration_test
    class TestSubtaskSchedulingService(unittest.TestCase):
        '''
        Tests for the SubtaskSchedulingService
        '''
        websocket = WebsocketClient()   # TODO: find a better place
    
        @classmethod
        def setUpClass(cls) -> None:
            cls.TEST_UUID = uuid.uuid1()
    
            cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
            cls.tmp_exchange.open()
    
            cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False)
            cls.tmss_test_env.start()
    
            cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url,
                                                            (cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password))
    
        @classmethod
        def tearDownClass(cls) -> None:
            cls.tmss_test_env.stop()
            cls.tmp_exchange.close()
    
        def test_01(self):
            '''
            This test starts a scheduling service and tmss, creates a chain of subtasks, finishes the first, and checks if the successors are then scheduled.
            '''
    
            logger.info(' -- test_01_for_expected_behaviour -- ')
    
            # create and start the service (the object under test)
            service = create_service(exchange=self.tmp_exchange.address)
            with BusListenerJanitor(service):
                pass
    
                # TODO: setup http websocket client which connects to our TMSS websocket
                # TODO: create/update/delete objects like SubTask, TaskBlueprint etc
                # TODO: check if the correct/expected json_blobs arrive in the websocket client
    
    if __name__ == '__main__':
        #run the unit tests
        unittest.main()