Skip to content
Snippets Groups Projects
Select Git revision
  • c085b31f701f97d47e779b4e0f9fe15e444bb532
  • MCCS-163 default
  • main
  • sar-277-update-docs-with-examples-for-lrc
  • st-946-automate
  • sar_302-log-fix
  • sar-287_subarray_commands_to_lrc
  • sar_302-POC_await_sub_device_state
  • sat_302_fix_pipelines
  • sar-286_lrc_one_subarry_command
  • sar-286_lrc_improvements
  • sar-288-async-controller
  • sar-276-combine-tango-queue
  • sar-255_remove_nexus_reference
  • sar-275-add-LRC
  • sar-273-add-lrc-attributes
  • sar-272
  • sp-1106-marvin-1230525148-ska-tango-base
  • sp-1106-marvin-813091765-ska-tango-base
  • sar-255/Publish-package-to-CAR
  • mccs-661-device-under-test-fixture
  • mccs-659-pep257-docstring-linting
  • 0.11.3
  • 0.11.2
  • 0.11.1
  • 0.11.0
  • 0.10.1
  • 0.10.0
  • 0.9.1
  • 0.9.0
  • 0.8.1
  • 0.8.0
  • 0.7.2
  • 0.7.1
  • 0.7.0
  • 0.6.6
  • 0.6.5
  • 0.6.4
  • 0.6.3
  • 0.6.2
  • 0.6.1
  • 0.6.0
42 results

index.rst

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    websocket_service.py 7.55 KiB
    #!/usr/bin/env python3
    
    # subtask_scheduling.py
    #
    # Copyright (C) 2015
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    
    import os
    from optparse import OptionParser, OptionGroup
    import logging
    logger = logging.getLogger(__name__)
    
    from lofar.common import dbcredentials
    from lofar.sas.tmss.client.tmssbuslistener import *
    
    import asyncio
    import socketio
    from aiohttp import web
    from threading import Thread
    
    
    class WebsocketServer():
        '''
        Websocket Server for the TMSSEventMessageHandlerForWebsocket
        '''
    
        sio = socketio.AsyncServer(async_mode='aiohttp')
    
        def __init__(self):
            self.app = web.Application()
            self.sio.attach(self.app)
    
        def start(self, loop):    # Start a websocket server
            asyncio.set_event_loop(loop)  # FIXME: set_wakeup_fd only works in main thread.
            web.run_app(self.app, host='127.0.0.1', port=5678)
    
        def stop(self):     # TODO: Gracefully shutdown the server
            # self.app.shutdown()
            # self.app.cleanup()
            pass
    
        @staticmethod
        @sio.event
        def connect(sid, environ):
            logger.info('New client connected: %s' % sid)
    
        @staticmethod
        @sio.event
        async def message(sid, data):   # Just for debugging
            logger.info('Received: %s' % data)
            await WebsocketServer.sio.emit('broadcastNotify', {'msg': 'Broadcast notify.'})
    
        @staticmethod
        @sio.event
        def disconnect(sid):
            logger.info('Client disconnected: %s' % sid)
    
        async def broadcastNotify(self, data):
            logger.info('Sending: ', data)
            await self.sio.emit('broadcastNotify', data)
    
        async def disconnectClient(self, sid):
            await self.sio.disconnect(sid)
    
    
    class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler):
        '''
        '''
        def __init__(self) -> None:
            self._ws = WebsocketServer()
            super().__init__()
    
        def start_handling(self):
            # self._ws.start()    # TODO: Run the server in a separated thread
            # FIXME:
            # If include lines 94-95 -> ValueError: set_wakeup_fd only works in main thread.
            # If comment lines 94-95 and run _ws.start() without loop -> RuntimeError: There is no current event loop in thread 'Thread-1'.
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self._thread = Thread(target=self._ws.start, args=(loop,), daemon=True)
            self._thread.start()
            super().start_handling()
    
        def stop_handling(self):
            super().stop_handling()
            self._ws.stop()
    
        def _post_update_on_websocket(self, json_blob):
            #TODO: post the jsob_blob on the socket
            #TODO: do we want the json_blob as argument, or parameters like: object_type (subtask, task_blueprint, etc) id, action (create/update/delete)
            #TODO: do we want to post just the id, object_type and action? Or the full object? If the latter, then fetch the object from the database, and post it as json.
            pass
    
        def onSubTaskCreated(self, id: int):
            self._post_update_on_websocket({'id': id, 'object': 'subtask', 'action': 'create'})
    
        def onSubTaskUpdated(self, id: int):
            self._post_update_on_websocket({'id': id, 'object': 'subtask', 'action': 'update'})
    
        def onSubTaskDeleted(self, id: int):
            self._post_update_on_websocket({'id': id, 'object': 'subtask', 'action': 'delete'})
    
        def onTaskDraftCreated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onTaskDraftUpdated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onTaskDraftDeleted(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onTaskBlueprintCreated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onTaskBlueprintUpdated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onTaskBlueprintDeleted(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onSchedulingUnitDraftCreated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onSchedulingUnitDraftUpdated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onSchedulingUnitDraftDeleted(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onSchedulingUnitBlueprintCreated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onSchedulingUnitBlueprintUpdated(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
        def onSchedulingUnitBlueprintDeleted(self, id: int):
            #TODO: call _post_update_on_websocket once the method signature is clear
            pass
    
    def create_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
        return TMSSBusListener(handler_type=TMSSEventMessageHandlerForWebsocket,
                               exchange=exchange, broker=broker)
    
    
    def main():
        # make sure we run in UTC timezone
        os.environ['TZ'] = 'UTC'
    
        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    
        # Check the invocation arguments
        parser = OptionParser('%prog [options]',
                              description='run the tmss_websocket_service which listens for TMSS event messages on the messagebus, and posts the updates on the websocket for htpp clients.')
    
        group = OptionGroup(parser, 'Messaging options')
        group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
                         help='Address of the message broker, default: %default')
        group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
                         help="exchange where the TMSS event messages are published. [default: %default]")
        parser.add_option_group(group)
    
        parser.add_option_group(dbcredentials.options_group(parser))
        parser.set_defaults(dbcredentials=os.environ.get('TMSS_CLIENT_DBCREDENTIALS', 'TMSS'))
        (options, args) = parser.parse_args()
    
        dbcreds = dbcredentials.parse_options(options)
        logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())
    
        # setup django
        # TODO: this is only needed if we fetch objects from the database (see above). Otherwise, remove.
        os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials
        os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
        import django
        django.setup()
    
        with create_service(options.exchange, options.broker):
            waitForInterrupt()
    
    
    if __name__ == '__main__':
        main()