Skip to content
Snippets Groups Projects
Select Git revision
  • c24173a9721977de97b7736699ea6c30f27e34f4
  • master default protected
  • L2SS-2199-apply-dab-to-xy
  • L2SS-2417-more-vector-memory
  • test-pytango-10.0.3
  • revert-cs032-ccd-ip
  • deploy-components-parallel
  • fix-chrony-exporter
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • v0.55.5-r2 protected
  • v0.52.8-rc1 protected
  • v0.55.5 protected
  • v0.55.4 protected
  • 0.55.2.dev0
  • 0.55.1.dev0
  • 0.55.0.dev0
  • v0.54.0 protected
  • 0.53.2.dev0
  • 0.53.1.dev0
  • v0.52.3-r2 protected
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
41 results

Dockerfile

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_websocket_service.py 3.64 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2012-2015  ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    import unittest
    import uuid
    
    import logging
    # Module-level logger, named after this module.
    logger = logging.getLogger(__name__)
    # Configure the root logger: INFO and above, each line prefixed with timestamp and level name.
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
    from lofar.sas.tmss.test.test_utils import TMSSTestEnvironment
    from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
    
    from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
    from lofar.sas.tmss.services.websocket_service import create_service
    from lofar.common.test_utils import integration_test
    from time import sleep
    from datetime import datetime, timedelta
    
    import socketio
    
    
    # TODO: socketio is no longer needed
    class WebsocketClient():
        '''
        Websocket Client for the TMSSEventMessageHandlerForWebsocket

        Creating an instance connects the shared socketio client to the given url
        and then calls wait() on it. The event handlers only log what happened.
        '''

        # One client object shared by all instances. It has to exist while the class
        # body is executed, because the @sio.event decorators below are applied to it.
        sio = socketio.Client()

        def __init__(self, url='ws://localhost:5678'):
            '''
            Connect to the websocket server and wait on the connection.

            :param url: address of the websocket server to connect to.
            '''
            self.sio.connect(url)
            # NOTE(review): python-socketio documents Client.wait() as blocking until
            # the connection ends, so this constructor presumably does not return while
            # the client is connected - confirm that this is intended.
            self.sio.wait()

        @staticmethod
        @sio.event
        def connect():
            '''Log that the connection was established.'''
            logger.info('Connection established')

        @staticmethod
        @sio.event
        def broadcastNotify(data):
            '''
            Log the received payload.

            :param data: the payload as handed over by the socketio client.
            '''
            # Pass data as a logging argument instead of using the % operator:
            # '...%s' % data raises a TypeError when data is a tuple with a length other than 1.
            logger.info('Received: %s', data)

        @staticmethod
        @sio.event
        def disconnect():
            '''Log that the client was disconnected.'''
            logger.info('Disconnected from server')
    
    
    @integration_test
    class TestSubtaskSchedulingService(unittest.TestCase):
        '''
        Integration test for the TMSS websocket service (see create_service).

        NOTE(review): the class name looks like a leftover from the subtask scheduling
        service test. It is kept as is, because it is part of the temporary exchange
        name and of the test id.
        '''
        @classmethod
        def setUpClass(cls) -> None:
            '''
            Open a uniquely named temporary exchange, start a TMSS test environment
            on it, and create a REST test data creator for that environment.
            '''
            cls.TEST_UUID = uuid.uuid1()

            cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
            cls.tmp_exchange.open()

            try:
                cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_test_data=False, populate_schemas=False)
                cls.tmss_test_env.start()
            except Exception:
                # unittest does not call tearDownClass when setUpClass raises,
                # so close the already opened exchange here before re-raising.
                cls.tmp_exchange.close()
                raise

            cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url,
                                                            (cls.tmss_test_env.ldap_server.dbcreds.user, cls.tmss_test_env.ldap_server.dbcreds.password))

        @classmethod
        def tearDownClass(cls) -> None:
            '''
            Stop the TMSS test environment and close the temporary exchange.
            '''
            try:
                cls.tmss_test_env.stop()
            finally:
                # close the exchange even if stopping the test environment failed
                cls.tmp_exchange.close()

        def test_01(self):
            '''
            This test creates the websocket service on the temporary exchange, runs it
            inside a BusListenerJanitor and connects a WebsocketClient.
            It does not assert anything yet, see the TODOs below.
            '''

            logger.info(' -- test_01_for_expected_behaviour -- ')

            # create and start the service (the object under test)
            service = create_service(exchange=self.tmp_exchange.address)
            with BusListenerJanitor(service):

                # not used any further yet; the TODOs below are meant to build on it
                websocket = WebsocketClient()

                # TODO: setup http websocket client which connects to our TMSS websocket
                # TODO: create/update/delete objects like SubTask, TaskBlueprint etc
                # TODO: check if the correct/expected json_blobs arrive in the websocket client
    
    if __name__ == '__main__':
        # Executed as a script: hand over to the unittest runner,
        # which collects and runs the test cases of this module.
        unittest.main()