diff --git a/SAS/TMSS/services/websocket/lib/websocket_service.py b/SAS/TMSS/services/websocket/lib/websocket_service.py index c2b1981ac6e4ea04da38c6f4419ef5a39a4cf5fc..fc7bc19ff1d2468e79f3c52641a8c5ca52d52b4c 100644 --- a/SAS/TMSS/services/websocket/lib/websocket_service.py +++ b/SAS/TMSS/services/websocket/lib/websocket_service.py @@ -29,44 +29,69 @@ logger = logging.getLogger(__name__) from lofar.common import dbcredentials from lofar.sas.tmss.client.tmssbuslistener import * +import asyncio import socketio +from aiohttp import web -# TODO: ATM test websockets with '$: daphne -p <port> websocket_service:TMSSEventMessageHandlerForWebsocket.app' -class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): +class WebsocketServer(): ''' + Websocket Server for the TMSSEventMessageHandlerForWebsocket ''' - # FIXME: Find out a solution for placing socketio inside a class - sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins='*') - app = socketio.ASGIApp(sio) - def __init__(self) -> None: - self.sio.on('connect', self.connect) - self.sio.on('message', self.message) - self.sio.on('disconnect', self.disconnect) - super().__init__() + sio = socketio.AsyncServer(async_mode='aiohttp') + + def __init__(self): + self.app = web.Application() + self.sio.attach(self.app) + + def start(self): + web.run_app(self.app, host = '127.0.0.1', port = 5678) + + def stop(self): + # TODO: + # self.app.shutdown() + # self.app.cleanup() + pass + + @staticmethod + @sio.event + def connect(sid, environ): + logger.info('New client connected: %s' % sid) - # Test connection - # @sio.event - def connect(self, sid, environ): - print('>> New client connected: ', sid) + @staticmethod + @sio.event + async def message(sid, data): + logger.info('Received: %s' % data) + await WebsocketServer.sio.emit('broadcastNotify', {'msg': 'Broadcast notify.'}) - # @sio.event - async def message(self, sid, data): - print('>> Received: ', data) - await sio.emit('message', {'msg': data['msg'][::-1]}) + @staticmethod + @sio.event + def disconnect(sid): + logger.info('Client disconnected: %s' % sid) - # @sio.event - def disconnect(self, sid): - print('>> Client disconnected: ', sid) + async def broadcastNotify(self, data): + logger.info('Sending: ', data) + await self.sio.emit('broadcastNotify', data) + + async def disconnectClient(self, sid): + await self.sio.disconnect(sid) + + +class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): + ''' + ''' + def __init__(self) -> None: + self._ws = WebsocketServer() # TODO: create websocket + super().__init__() def start_handling(self): - #TODO: "open" websocket if needed + self._ws.start() # TODO: Run the server in a separated thread super().start_handling() def stop_handling(self): - #TODO: "close" websocket if needed super().stop_handling() + self._ws.stop() def _post_update_on_websocket(self, json_blob): #TODO: post the jsob_blob on the socket diff --git a/SAS/TMSS/services/websocket/test/t_websocket_service.py b/SAS/TMSS/services/websocket/test/t_websocket_service.py index 83f0601064f96d29f93f65c4fa018b21093f7c11..233b081ad57b948813e4df4def0d8d56850154ce 100755 --- a/SAS/TMSS/services/websocket/test/t_websocket_service.py +++ b/SAS/TMSS/services/websocket/test/t_websocket_service.py @@ -40,28 +40,27 @@ class WebsocketClient(): ''' Websocket Client for the TMSSEventMessageHandlerForWebsocket ''' + sio = socketio.Client() - # FIXME: Find out a solution for placing socketio inside a class def __init__(self): - self.sio.on('connect', self.connect) - self.sio.on('message', self.message) - self.sio.on('disconnect', self.disconnect) self.sio.connect('ws://localhost:5678') self.sio.wait() - # @sio.event - def connect(self): - print('>> Connection established') + @staticmethod + @sio.event + def connect(): + logger.info('Connection established') - # @sio.event - async def message(self, data): - print('>> Received: ', data) - await self.sio.emit('message', {'msg': 'Hello from client'}) + @staticmethod + @sio.event + def broadcastNotify(data): + logger.info('Received: %s' % data) - # @sio.event - def disconnect(self): - print('>> Disconnected from server') + @staticmethod + @sio.event + def disconnect(): + logger.info('Disconnected from server') @integration_test @@ -69,8 +68,6 @@ class TestSubtaskSchedulingService(unittest.TestCase): ''' Tests for the SubtaskSchedulingService ''' - websocket = WebsocketClient() # TODO: find a better place - @classmethod def setUpClass(cls) -> None: cls.TEST_UUID = uuid.uuid1() @@ -99,7 +96,8 @@ class TestSubtaskSchedulingService(unittest.TestCase): # create and start the service (the object under test) service = create_service(exchange=self.tmp_exchange.address) with BusListenerJanitor(service): - pass + + websocket = WebsocketClient() # TODO: setup http websocket client which connects to our TMSS websocket # TODO: create/update/delete objects like SubTask, TaskBlueprint etc