Skip to content
Snippets Groups Projects
Select Git revision
  • d414a68c98ae3272ed4c9a9c0742b9b56b3aff18
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

websocket_service.py

Blame
  • Mario Raciti's avatar
    TMSS-413: Add asyncio event loop to run websocket server as a thread
    Mario Raciti authored
    d414a68c
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    websocket_service.py 7.55 KiB
    #!/usr/bin/env python3
    
    # websocket_service.py
    #
    # Copyright (C) 2015
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    
    import os
    from optparse import OptionParser, OptionGroup
    import logging
    logger = logging.getLogger(__name__)
    
    from lofar.common import dbcredentials
    from lofar.sas.tmss.client.tmssbuslistener import *
    
    import asyncio
    import socketio
    from aiohttp import web
    from threading import Thread
    
    
    class WebsocketServer():
        '''
        Websocket Server for the TMSSEventMessageHandlerForWebsocket.

        Wraps a socket.io AsyncServer which is attached to an aiohttp web application.
        The socket.io server is a class attribute, so it and the event handlers
        registered on it below are shared by all instances of this class.
        '''

        sio = socketio.AsyncServer(async_mode='aiohttp')

        def __init__(self):
            self.app = web.Application()
            self.sio.attach(self.app)

        def start(self, loop):
            '''
            Start the websocket server on 127.0.0.1:5678. This call blocks while the server runs.

            :param loop: the asyncio event loop to install as current event loop for the calling thread.
            '''
            # function-scope import: the module itself only imports Thread from threading.
            from threading import current_thread, main_thread

            asyncio.set_event_loop(loop)

            # Signal handlers can only be installed from the main thread. When this method runs in a
            # worker thread, letting aiohttp install them raises
            # "ValueError: set_wakeup_fd only works in main thread", so only ask for them in the main thread.
            in_main_thread = current_thread() is main_thread()
            web.run_app(self.app, host='127.0.0.1', port=5678, handle_signals=in_main_thread)

        def stop(self):
            '''Stop the websocket server. Not implemented yet: currently a no-op.'''
            # TODO: Gracefully shutdown the server (candidates: self.app.shutdown() / self.app.cleanup())
            pass

        @staticmethod
        @sio.event
        def connect(sid, environ):
            '''socket.io event handler: log the session id of a newly connected client.'''
            logger.info('New client connected: %s', sid)

        @staticmethod
        @sio.event
        async def message(sid, data):   # Just for debugging
            '''socket.io event handler: log the received data and emit a fixed 'broadcastNotify' message.'''
            logger.info('Received: %s', data)
            await WebsocketServer.sio.emit('broadcastNotify', {'msg': 'Broadcast notify.'})

        @staticmethod
        @sio.event
        def disconnect(sid):
            '''socket.io event handler: log the session id of a disconnected client.'''
            logger.info('Client disconnected: %s', sid)

        async def broadcastNotify(self, data):
            '''
            Emit the given data as a 'broadcastNotify' event.

            :param data: the payload to emit.
            '''
            # the original format string had no placeholder for data, which made the logging call fail.
            logger.info('Sending: %s', data)
            await self.sio.emit('broadcastNotify', data)

        async def disconnectClient(self, sid):
            '''
            Disconnect the client with the given session id.

            :param sid: the socket.io session id of the client.
            '''
            await self.sio.disconnect(sid)
    
    
    class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler):
        '''
        TMSS event message handler which owns a WebsocketServer and runs it in a daemon thread while handling.
        Only the subtask events are passed on to _post_update_on_websocket, which is still a stub itself.
        All other event handlers are placeholders which do nothing yet.
        '''
        def __init__(self) -> None:
            # the websocket server is created before the base class is initialized
            self._ws = WebsocketServer()
            super().__init__()

        def start_handling(self):
            '''Launch the websocket server in a daemon thread, then start the base class handling.'''
            # A fresh event loop is created here and handed over to the server thread.
            # The original author noted that running the server without a loop results in:
            #   RuntimeError: There is no current event loop in thread 'Thread-1'.
            # FIXME (as reported by the original author, to be verified): setting the loop in the server thread gave
            #   ValueError: set_wakeup_fd only works in main thread.
            ws_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(ws_loop)

            server_thread = Thread(target=self._ws.start, args=(ws_loop,), daemon=True)
            self._thread = server_thread
            server_thread.start()

            super().start_handling()

        def stop_handling(self):
            '''Stop the base class handling first, then ask the websocket server to stop.'''
            super().stop_handling()
            self._ws.stop()

        def _post_update_on_websocket(self, json_blob):
            '''Placeholder for posting an update on the websocket. Does nothing yet.'''
            # Open questions before this can be implemented:
            #TODO: post the json_blob on the socket
            #TODO: do we want the json_blob as argument, or parameters like: object_type (subtask, task_blueprint, etc) id, action (create/update/delete)
            #TODO: do we want to post just the id, object_type and action? Or the full object? If the latter, then fetch the object from the database, and post it as json.
            return None

        # --- subtask events: passed on to _post_update_on_websocket ---

        def onSubTaskCreated(self, id: int):
            '''Post a 'create' update for the subtask with the given id.'''
            self._post_update_on_websocket(dict(id=id, object='subtask', action='create'))

        def onSubTaskUpdated(self, id: int):
            '''Post an 'update' update for the subtask with the given id.'''
            self._post_update_on_websocket(dict(id=id, object='subtask', action='update'))

        def onSubTaskDeleted(self, id: int):
            '''Post a 'delete' update for the subtask with the given id.'''
            self._post_update_on_websocket(dict(id=id, object='subtask', action='delete'))

        # --- all other events: placeholders which do nothing yet ---
        #TODO: call _post_update_on_websocket in each of them once the method signature is clear

        def onTaskDraftCreated(self, id: int):
            '''Placeholder: task draft creations are not posted on the websocket yet.'''
            return None

        def onTaskDraftUpdated(self, id: int):
            '''Placeholder: task draft updates are not posted on the websocket yet.'''
            return None

        def onTaskDraftDeleted(self, id: int):
            '''Placeholder: task draft deletions are not posted on the websocket yet.'''
            return None

        def onTaskBlueprintCreated(self, id: int):
            '''Placeholder: task blueprint creations are not posted on the websocket yet.'''
            return None

        def onTaskBlueprintUpdated(self, id: int):
            '''Placeholder: task blueprint updates are not posted on the websocket yet.'''
            return None

        def onTaskBlueprintDeleted(self, id: int):
            '''Placeholder: task blueprint deletions are not posted on the websocket yet.'''
            return None

        def onSchedulingUnitDraftCreated(self, id: int):
            '''Placeholder: scheduling unit draft creations are not posted on the websocket yet.'''
            return None

        def onSchedulingUnitDraftUpdated(self, id: int):
            '''Placeholder: scheduling unit draft updates are not posted on the websocket yet.'''
            return None

        def onSchedulingUnitDraftDeleted(self, id: int):
            '''Placeholder: scheduling unit draft deletions are not posted on the websocket yet.'''
            return None

        def onSchedulingUnitBlueprintCreated(self, id: int):
            '''Placeholder: scheduling unit blueprint creations are not posted on the websocket yet.'''
            return None

        def onSchedulingUnitBlueprintUpdated(self, id: int):
            '''Placeholder: scheduling unit blueprint updates are not posted on the websocket yet.'''
            return None

        def onSchedulingUnitBlueprintDeleted(self, id: int):
            '''Placeholder: scheduling unit blueprint deletions are not posted on the websocket yet.'''
            return None
    
    def create_service(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER):
        '''
        Create a TMSSBusListener which uses the TMSSEventMessageHandlerForWebsocket as handler type.

        :param exchange: name of the exchange, passed on to the TMSSBusListener.
        :param broker: address of the broker, passed on to the TMSSBusListener.
        :return: the newly created TMSSBusListener.
        '''
        listener_kwargs = {'handler_type': TMSSEventMessageHandlerForWebsocket,
                           'exchange': exchange,
                           'broker': broker}
        return TMSSBusListener(**listener_kwargs)
    
    
    def main():
        '''
        Command line entry point of the tmss_websocket_service.

        Parses the messaging and database-credentials options, sets up django,
        and then runs the service (see create_service) until interrupted.
        '''
        # make sure we run in UTC timezone
        os.environ['TZ'] = 'UTC'
        # Changing the TZ environment variable is only guaranteed to take effect after tzset() is called.
        # tzset() is not available on all platforms, hence the guard.
        import time
        if hasattr(time, 'tzset'):
            time.tzset()

        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

        # Check the invocation arguments
        parser = OptionParser('%prog [options]',
                              description='run the tmss_websocket_service which listens for TMSS event messages on the messagebus, and posts the updates on the websocket for http clients.')

        group = OptionGroup(parser, 'Messaging options')
        group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
                         help='Address of the message broker, default: %default')
        group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
                         help="exchange where the TMSS event messages are published. [default: %default]")
        parser.add_option_group(group)

        parser.add_option_group(dbcredentials.options_group(parser))
        parser.set_defaults(dbcredentials=os.environ.get('TMSS_CLIENT_DBCREDENTIALS', 'TMSS'))
        # positional arguments are not used by this service
        options, _ = parser.parse_args()

        dbcreds = dbcredentials.parse_options(options)
        logger.info("Using dbcreds: %s", dbcreds.stringWithHiddenPassword())

        # setup django
        # TODO: this is only needed if we fetch objects from the database (see the TODOs in _post_update_on_websocket). Otherwise, remove.
        # The environment variables are set before django is imported and set up.
        os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials
        os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
        import django
        django.setup()

        # the service runs for as long as we are inside the with-block
        with create_service(options.exchange, options.broker):
            waitForInterrupt()
    
    
    # Only run the service when this file is executed as a script, not when it is imported.
    if __name__ == "__main__":
        main()