Select Git revision
scintillation_utils.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
websocket_service.py 7.55 KiB
#!/usr/bin/env python3
# websocket_service.py
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import os
from optparse import OptionParser, OptionGroup
import logging
logger = logging.getLogger(__name__)
from lofar.common import dbcredentials
from lofar.sas.tmss.client.tmssbuslistener import *
import asyncio
import socketio
from aiohttp import web
from threading import Thread
class WebsocketServer():
    """
    Socket.IO websocket server for the TMSSEventMessageHandlerForWebsocket.

    Wraps an aiohttp ``web.Application`` with a ``socketio.AsyncServer``
    attached to it. Connected clients receive updates via the
    'broadcastNotify' event.
    """

    # The AsyncServer is a class attribute (shared by all instances) because the
    # event handlers below are registered on it with @sio.event while the class
    # body is being executed.
    sio = socketio.AsyncServer(async_mode='aiohttp')

    def __init__(self, host: str = '127.0.0.1', port: int = 5678):
        """
        Create the aiohttp application and attach the Socket.IO server to it.

        :param host: address to bind the server to (default '127.0.0.1').
        :param port: TCP port to listen on (default 5678).
        """
        self.host = host
        self.port = port
        self.app = web.Application()
        self.sio.attach(self.app)

    def start(self, loop):
        """
        Run the websocket server via ``web.run_app``.

        :param loop: the asyncio event loop to install as the current loop of
                     the calling thread before the server is run.
        """
        from threading import current_thread, main_thread

        asyncio.set_event_loop(loop)
        # Signal handlers can only be installed from the main thread. Letting
        # run_app install them from a worker thread fails with
        # "set_wakeup_fd only works in main thread", so only ask for signal
        # handling when we really are on the main thread.
        on_main_thread = current_thread() is main_thread()
        web.run_app(self.app, host=self.host, port=self.port,
                    handle_signals=on_main_thread)

    def stop(self):
        """Stop the server. Currently a no-op."""
        # TODO: Gracefully shutdown the server
        # self.app.shutdown()
        # self.app.cleanup()
        pass

    @staticmethod
    @sio.event
    def connect(sid, environ):
        """Socket.IO 'connect' handler: log the session id of the new client."""
        logger.info('New client connected: %s', sid)

    @staticmethod
    @sio.event
    async def message(sid, data):
        """Socket.IO 'message' handler (just for debugging): log the data and emit a fixed notification."""
        logger.info('Received: %s', data)
        await WebsocketServer.sio.emit('broadcastNotify', {'msg': 'Broadcast notify.'})

    @staticmethod
    @sio.event
    def disconnect(sid):
        """Socket.IO 'disconnect' handler: log the session id of the client that left."""
        logger.info('Client disconnected: %s', sid)

    async def broadcastNotify(self, data):
        """
        Emit ``data`` as a 'broadcastNotify' event.

        :param data: the payload handed to ``sio.emit``.
        """
        # The %s placeholder is required: without it the logging module fails to
        # format the record and the payload never shows up in the log.
        logger.info('Sending: %s', data)
        await self.sio.emit('broadcastNotify', data)

    async def disconnectClient(self, sid):
        """
        Disconnect a single client.

        :param sid: the Socket.IO session id of the client to disconnect.
        """
        await self.sio.disconnect(sid)
class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler):
    """
    TMSSEventMessageHandler which owns a WebsocketServer and is meant to
    forward TMSS object events (create/update/delete) to websocket clients.

    Only the subtask events are wired to _post_update_on_websocket so far, and
    that method itself is still a placeholder; all other handlers are no-ops.
    """

    def __init__(self) -> None:
        """Create the websocket server first, then initialise the base handler."""
        self._ws = WebsocketServer()
        super().__init__()

    def start_handling(self):
        """Start the websocket server in a daemon thread, then start handling messages."""
        # TODO: Run the server in a separated thread (instead of a plain self._ws.start()).
        # FIXME:
        #   With the two event-loop lines below included:
        #       ValueError: set_wakeup_fd only works in main thread.
        #   With those lines left out and _ws.start() run without a loop:
        #       RuntimeError: There is no current event loop in thread 'Thread-1'.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)

        server_thread = Thread(target=self._ws.start, args=(event_loop,), daemon=True)
        self._thread = server_thread
        server_thread.start()

        super().start_handling()

    def stop_handling(self):
        """Stop handling messages, then stop the websocket server."""
        super().stop_handling()
        self._ws.stop()

    def _post_update_on_websocket(self, json_blob):
        """Placeholder for posting an update on the websocket; does nothing yet."""
        # TODO: post the json_blob on the socket
        # TODO: do we want the json_blob as argument, or parameters like:
        #       object_type (subtask, task_blueprint, etc), id, action (create/update/delete)
        # TODO: do we want to post just the id, object_type and action? Or the full object?
        #       If the latter, then fetch the object from the database, and post it as json.
        return None

    # --- subtask events -------------------------------------------------------

    def onSubTaskCreated(self, id: int):
        """Post a subtask 'create' update for the given id."""
        update = {'id': id, 'object': 'subtask', 'action': 'create'}
        self._post_update_on_websocket(update)

    def onSubTaskUpdated(self, id: int):
        """Post a subtask 'update' update for the given id."""
        update = {'id': id, 'object': 'subtask', 'action': 'update'}
        self._post_update_on_websocket(update)

    def onSubTaskDeleted(self, id: int):
        """Post a subtask 'delete' update for the given id."""
        update = {'id': id, 'object': 'subtask', 'action': 'delete'}
        self._post_update_on_websocket(update)

    # --- task draft events ----------------------------------------------------

    def onTaskDraftCreated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onTaskDraftUpdated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onTaskDraftDeleted(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    # --- task blueprint events ------------------------------------------------

    def onTaskBlueprintCreated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onTaskBlueprintUpdated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onTaskBlueprintDeleted(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    # --- scheduling unit draft events -----------------------------------------

    def onSchedulingUnitDraftCreated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onSchedulingUnitDraftUpdated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onSchedulingUnitDraftDeleted(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    # --- scheduling unit blueprint events -------------------------------------

    def onSchedulingUnitBlueprintCreated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onSchedulingUnitBlueprintUpdated(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear

    def onSchedulingUnitBlueprintDeleted(self, id: int):
        """No-op placeholder."""
        # TODO: call _post_update_on_websocket once the method signature is clear
def create_service(exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER):
    """
    Build the bus listener for the websocket service.

    :param exchange: passed on to TMSSBusListener as ``exchange``.
    :param broker: passed on to TMSSBusListener as ``broker``.
    :return: a TMSSBusListener using TMSSEventMessageHandlerForWebsocket as handler type.
    """
    listener = TMSSBusListener(
        handler_type=TMSSEventMessageHandlerForWebsocket,
        exchange=exchange,
        broker=broker,
    )
    return listener
def main():
    """
    Command line entry point of the TMSS websocket service.

    Forces the process into the UTC timezone, configures logging, parses the
    messaging and database-credential options, sets up django, and then runs
    the service created by create_service() until waitForInterrupt() returns.
    """
    import time

    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'
    # Only setting the environment variable is not guaranteed to change the
    # timezone of the running process; tzset() makes the C library re-read TZ.
    # tzset() is not available on every platform, hence the guard.
    if hasattr(time, 'tzset'):
        time.tzset()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Check the invocation arguments
    parser = OptionParser('%prog [options]',
                          description='run the tmss_websocket_service which listens for TMSS event messages on the messagebus, and posts the updates on the websocket for http clients.')

    group = OptionGroup(parser, 'Messaging options')
    group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
                     help='Address of the message broker, default: %default')
    group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME,
                     help="exchange where the TMSS event messages are published. [default: %default]")
    parser.add_option_group(group)

    parser.add_option_group(dbcredentials.options_group(parser))
    parser.set_defaults(dbcredentials=os.environ.get('TMSS_CLIENT_DBCREDENTIALS', 'TMSS'))

    # positional arguments are not used by this service
    options, _ = parser.parse_args()

    dbcreds = dbcredentials.parse_options(options)
    logger.info("Using dbcreds: %s", dbcreds.stringWithHiddenPassword())

    # setup django
    # TODO: this is only needed if we fetch objects from the database
    #       (see TMSSEventMessageHandlerForWebsocket). Otherwise, remove.
    os.environ["TMSS_DBCREDENTIALS"] = options.dbcredentials
    os.environ["DJANGO_SETTINGS_MODULE"] = "lofar.sas.tmss.tmss.settings"
    import django
    django.setup()

    with create_service(options.exchange, options.broker):
        waitForInterrupt()
# Run the service only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()