Skip to content
Snippets Groups Projects
Commit 14019c73 authored by Mario Raciti's avatar Mario Raciti
Browse files

TMSS-413: Switch to simpler WS solution, PoC done

parent d414a68c
No related branches found
No related tags found
1 merge request!282Resolve TMSS-417
......@@ -21,91 +21,48 @@
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import logging
import os
from optparse import OptionParser, OptionGroup
import logging
logger = logging.getLogger(__name__)
from lofar.common import dbcredentials
from lofar.sas.tmss.client.tmssbuslistener import *
import asyncio
import socketio
from aiohttp import web
from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket
from threading import Thread
class WebsocketServer():
'''
Websocket Server for the TMSSEventMessageHandlerForWebsocket
'''
sio = socketio.AsyncServer(async_mode='aiohttp')
def __init__(self):
self.app = web.Application()
self.sio.attach(self.app)
def start(self, loop): # Start a websocket server
asyncio.set_event_loop(loop) # FIXME: set_wakeup_fd only works in main thread.
web.run_app(self.app, host='127.0.0.1', port=5678)
def stop(self): # TODO: Gracefully shutdown the server
# self.app.shutdown()
# self.app.cleanup()
pass
@staticmethod
@sio.event
def connect(sid, environ):
logger.info('New client connected: %s' % sid)
@staticmethod
@sio.event
async def message(sid, data): # Just for debugging
logger.info('Received: %s' % data)
await WebsocketServer.sio.emit('broadcastNotify', {'msg': 'Broadcast notify.'})
@staticmethod
@sio.event
def disconnect(sid):
logger.info('Client disconnected: %s' % sid)
async def broadcastNotify(self, data):
logger.info('Sending: ', data)
await self.sio.emit('broadcastNotify', data)
async def disconnectClient(self, sid):
await self.sio.disconnect(sid)
from time import sleep
class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler):
'''
'''
def __init__(self) -> None:
self._ws = WebsocketServer()
super().__init__()
def start_handling(self):
# self._ws.start() # TODO: Run the server in a separated thread
# FIXME:
# If include lines 94-95 -> ValueError: set_wakeup_fd only works in main thread.
# If comment lines 94-95 and run _ws.start() without loop -> RuntimeError: There is no current event loop in thread 'Thread-1'.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._thread = Thread(target=self._ws.start, args=(loop,), daemon=True)
self._thread.start()
# Create and run a simple ws server
def start_ws_server():
try:
self.ws_server = SimpleWebSocketServer('', 5678, WebSocket)
self.ws_server.serveforever()
except Exception as e:
logger.exception(e)
self.t = Thread(target=start_ws_server)
self.t.start()
super().start_handling()
def stop_handling(self):
super().stop_handling()
self._ws.stop()
# TODO: Stop the ws server
def _post_update_on_websocket(self, json_blob):
#TODO: post the jsob_blob on the socket
#TODO: do we want the json_blob as argument, or parameters like: object_type (subtask, task_blueprint, etc) id, action (create/update/delete)
#TODO: do we want to post just the id, object_type and action? Or the full object? If the latter, then fetch the object from the database, and post it as json.
pass
# Send the json_blob as a broadcast message to all connected ws clients
for ws in self.ws_server.connections.values():
logger.info('Sending %s to %s', json_blob, ws)
ws.sendMessage(str(json_blob))
def onSubTaskCreated(self, id: int):
self._post_update_on_websocket({'id': id, 'object': 'subtask', 'action': 'create'})
......@@ -201,6 +158,12 @@ def main():
django.setup()
with create_service(options.exchange, options.broker):
# TODO: Remove these lines as they were just for testing
# from lofar.messaging.messages import *
# from lofar.messaging.messagebus import *
# sleep(5)
# with ToBus(options.exchange) as tobus:
# tobus.send(EventMessage(subject="TMSS.Event.SubTask.Object.Created", content={'id': 42}))
waitForInterrupt()
......
......@@ -36,6 +36,7 @@ from datetime import datetime, timedelta
import socketio
# TODO: Socketio is no more needed
class WebsocketClient():
'''
Websocket Client for the TMSSEventMessageHandlerForWebsocket
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment