diff --git a/SAS/TMSS/backend/services/websocket/CMakeLists.txt b/SAS/TMSS/backend/services/websocket/CMakeLists.txt index 7d5ea3a2e9bfb03f75528c83db74cdcb92025ce2..ba899270ef576cc4bff54cdfe1c3ffd4dc69b525 100644 --- a/SAS/TMSS/backend/services/websocket/CMakeLists.txt +++ b/SAS/TMSS/backend/services/websocket/CMakeLists.txt @@ -1,4 +1,4 @@ -lofar_package(TMSSWebSocketService 0.1 DEPENDS TMSSClient PyCommon pyparameterset PyMessaging) +lofar_package(TMSSWebSocketService 0.1 DEPENDS TMSSClient PyCommon pyparameterset PyMessaging) # also depends on TMSSBackend, but that dependency is added implicitely because this is a child package lofar_find_package(PythonInterp 3.6 REQUIRED) diff --git a/SAS/TMSS/backend/services/websocket/lib/websocket_service.py b/SAS/TMSS/backend/services/websocket/lib/websocket_service.py index e87029d4d684fc20f4f9f7d1e6f19c0a94f8ce59..64aa14b82dca4a2c5592e06f8191d2edaa08b6f2 100644 --- a/SAS/TMSS/backend/services/websocket/lib/websocket_service.py +++ b/SAS/TMSS/backend/services/websocket/lib/websocket_service.py @@ -29,13 +29,13 @@ logger = logging.getLogger(__name__) from lofar.common import dbcredentials from lofar.sas.tmss.client.tmssbuslistener import * -from lofar.sas.tmss.client.tmss_http_rest_client import TMSSsession from lofar.common.util import find_free_port from enum import Enum from json import dumps as JSONdumps from SimpleWebSocketServer import SimpleWebSocketServer, WebSocket from threading import Thread, Event +from django.apps import apps DEFAULT_WEBSOCKET_PORT = 5678 @@ -60,14 +60,12 @@ class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): TASK_BLUEPRINT = 'task_blueprint' TASK_DRAFT = 'task_draft' - def __init__(self, websocket_port: int=DEFAULT_WEBSOCKET_PORT, rest_client_creds_id: str="TMSSClient"): + def __init__(self, websocket_port: int=DEFAULT_WEBSOCKET_PORT): super().__init__(log_event_messages=True) self.websocket_port = websocket_port - self._tmss_client = TMSSsession.create_from_dbcreds_for_ldap(rest_client_creds_id) self._run_ws = True def start_handling(self): - self._tmss_client.open() # Open tmss_client session socket_started_event = Event() # Create and run a simple ws server @@ -87,7 +85,6 @@ class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): def stop_handling(self): super().stop_handling() - self._tmss_client.close() # Close tmss_client session self._run_ws = False # Stop the ws server self.t.join() @@ -98,10 +95,23 @@ class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): def _post_update_on_websocket(self, id, object_type, action): # Prepare the json_blob_template - json_blob = {'id': id, 'object_type': object_type.value, 'action': action.value} + json_blob = {'object_details': {'id': id}, 'object_type': object_type.value, 'action': action.value} if action == self.ObjActions.CREATE or action == self.ObjActions.UPDATE: - # Fetch the object from DB using Django model API and add it to json_blob - json_blob['object'] = self._tmss_client.get_path_as_json_object('/%s/%s' % (object_type.value, id)) + try: + model_class = apps.get_model("tmssapp", object_type.value.replace('_','')) + model_instance = model_class.objects.get(id=id) + if hasattr(model_instance, 'start_time') and model_instance.start_time is not None: + json_blob['object_details']['start_time'] = model_instance.start_time.isoformat() + if hasattr(model_instance, 'stop_time') and model_instance.stop_time is not None: + json_blob['object_details']['stop_time'] = model_instance.stop_time.isoformat() + if hasattr(model_instance, 'duration') and model_instance.duration is not None: + json_blob['object_details']['duration'] = model_instance.duration.total_seconds() + if hasattr(model_instance, 'status'): + json_blob['object_details']['status'] = model_instance.status + if hasattr(model_instance, 'state'): + json_blob['object_details']['state'] = model_instance.state.value + except Exception as e: + logger.error("Cannot get object details for %s: %s", json_blob, e) # Send the json_blob as a broadcast message to all connected ws clients self._broadcast_notify_websocket(json_blob) @@ -151,10 +161,9 @@ class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): def onSchedulingUnitBlueprintDeleted(self, id: int): self._post_update_on_websocket(id, self.ObjTypes.SCHED_UNIT_BLUEPRINT, self.ObjActions.DELETE) -def create_service(websocket_port: int=DEFAULT_WEBSOCKET_PORT, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER, rest_client_creds_id: str="TMSSClient"): +def create_service(websocket_port: int=DEFAULT_WEBSOCKET_PORT, exchange: str=DEFAULT_BUSNAME, broker: str=DEFAULT_BROKER): return TMSSBusListener(handler_type=TMSSEventMessageHandlerForWebsocket, - handler_kwargs={'websocket_port': websocket_port, - 'rest_client_creds_id': rest_client_creds_id}, + handler_kwargs={'websocket_port': websocket_port}, exchange=exchange, broker=broker) @@ -180,13 +189,14 @@ def main(): group = OptionGroup(parser, 'Django options') parser.add_option_group(group) - group.add_option('-R', '--rest_credentials', dest='rest_credentials', type='string', default='TMSSClient', help='django REST API credentials name, default: %default') + group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default') (options, args) = parser.parse_args() - TMSSsession.check_connection_and_exit_on_error(options.rest_credentials) + from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error + setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials) - with create_service(options.websocket_port, options.exchange, options.broker, rest_client_creds_id=options.rest_credentials): + with create_service(options.websocket_port, options.exchange, options.broker): waitForInterrupt() if __name__ == '__main__': diff --git a/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py b/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py index 4454c6eec265334f334dfef331eb06d7293e971c..f3f8388cb9b361665964ba3660f926b2653bbfc0 100755 --- a/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py +++ b/SAS/TMSS/backend/services/websocket/test/t_websocket_service.py @@ -107,7 +107,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): websocket_port = find_free_port(DEFAULT_WEBSOCKET_PORT) # create and start the service (the object under test) - service = create_service(websocket_port=websocket_port, exchange=self.tmp_exchange.address, rest_client_creds_id=self.tmss_test_env.client_credentials.dbcreds_id) + service = create_service(websocket_port=websocket_port, exchange=self.tmp_exchange.address) with BusListenerJanitor(service): self.start_ws_client(websocket_port) # Start ws client @@ -118,9 +118,13 @@ class TestSubtaskSchedulingService(unittest.TestCase): raise TimeoutError() self.sync_event.clear() # Assert json_blobs - json_blob = {'id': json_test['id'], 'object_type': obj_type.value, 'action': action.value} + json_blob = {'object_details': {'id': json_test['id']}, 'object_type': obj_type.value, 'action': action.value} if action == self.ObjActions.CREATE or action == self.ObjActions.UPDATE: - json_blob['object'] = json_test + for key in ('start_time', 'stop_time', 'duration', 'status'): + if json_test.get(key) is not None: + json_blob['object_details'][key] = json_test[key] + if json_test.get('state_value') is not None: + json_blob['object_details']['state'] = json_test['state_value'] self.assertEqual(json_blob, self.msg_queue.popleft()) # Test creations diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/common.py b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/common.py index 4758646f1d9a4c619bfe5dd87a2c0a06fc31bf3f..c12e879675249229317935fbcd6b883bd18239b0 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/common.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/common.py @@ -8,6 +8,13 @@ from django.core.exceptions import ImproperlyConfigured from .widgets import JSONEditorField from rest_flex_fields.serializers import FlexFieldsSerializerMixin +class FloatDurationField(serializers.FloatField): + + # Turn datetime to float representation in seconds. + # (Timedeltas are otherwise by default turned into a string representation) + def to_representation(self, value): + return value.total_seconds() + class RelationalHyperlinkedModelSerializer(serializers.HyperlinkedModelSerializer): _accepted_pk_names = ('id', 'name') diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/scheduling.py b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/scheduling.py index 717833448d6a408247f2006d04ab067ea9d8cf4b..7c8bd8c29ee090cf6af7f48d6431e03418830c61 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/scheduling.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/scheduling.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) from rest_framework import serializers from .. import models from .widgets import JSONEditorField -from .common import RelationalHyperlinkedModelSerializer, AbstractTemplateSerializer, DynamicRelationalHyperlinkedModelSerializer +from .common import FloatDurationField, RelationalHyperlinkedModelSerializer, AbstractTemplateSerializer, DynamicRelationalHyperlinkedModelSerializer class SubtaskStateSerializer(DynamicRelationalHyperlinkedModelSerializer): class Meta: @@ -75,11 +75,12 @@ class SubtaskSerializer(DynamicRelationalHyperlinkedModelSerializer): # If this is OK then we can extend API with NO url ('flat' values) on more places if required cluster_value = serializers.StringRelatedField(source='cluster', label='cluster_value', read_only=True) specifications_doc = JSONEditorField(schema_source='specifications_template.schema') + duration = FloatDurationField(read_only=True) class Meta: model = models.Subtask fields = '__all__' - extra_fields = ['cluster_value'] + extra_fields = ['cluster_value', 'duration'] class SubtaskInputSerializer(DynamicRelationalHyperlinkedModelSerializer): diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py index d281087554ecbd86d4dd6c60753d9c977fab084e..7ac1a29773ff0b569b11ddd7db01ca73eb160bc8 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/serializers/specification.py @@ -5,17 +5,10 @@ This file contains the serializers (for the elsewhere defined data models) from rest_framework import serializers from .. import models from .scheduling import SubtaskSerializer -from .common import RelationalHyperlinkedModelSerializer, AbstractTemplateSerializer, DynamicRelationalHyperlinkedModelSerializer +from .common import FloatDurationField, RelationalHyperlinkedModelSerializer, AbstractTemplateSerializer, DynamicRelationalHyperlinkedModelSerializer from .widgets import JSONEditorField from django.contrib.auth.models import User -class FloatDurationField(serializers.FloatField): - - # Turn datetime to float representation in seconds. - # (Timedeltas are otherwise by default turned into a string representation) - def to_representation(self, value): - return value.total_seconds() - # This is required for keeping a user reference as ForeignKey in other models # (I think so that the HyperlinkedModelSerializer can generate a URI) class UserSerializer(serializers.Serializer): diff --git a/SAS/TMSS/backend/test/test_utils.py b/SAS/TMSS/backend/test/test_utils.py index 0a100a41c23f8b74c884dac85d007dd12978c09c..c7d1aaa6823ff1c03e4488724965ec539faccfe3 100644 --- a/SAS/TMSS/backend/test/test_utils.py +++ b/SAS/TMSS/backend/test/test_utils.py @@ -396,7 +396,7 @@ class TMSSTestEnvironment: # this implies that _start_pg_listener should be true as well self._start_pg_listener = True from lofar.sas.tmss.services.websocket_service import create_service - self.websocket_service = create_service(exchange=self._exchange, broker=self._broker, rest_client_creds_id=self.client_credentials.dbcreds_id) + self.websocket_service = create_service(exchange=self._exchange, broker=self._broker) service_threads.append(threading.Thread(target=self.websocket_service.start_listening)) service_threads[-1].start()