From 8aa98b2d87cea93fe8bfce4aadb1214167718666 Mon Sep 17 00:00:00 2001 From: Mattia Mancini <mancini@astron.nl> Date: Fri, 16 Nov 2018 10:47:22 +0000 Subject: [PATCH] OSB-34: Setup of the preliminary implementation of the celery infrastructure, integration with the already existring django infrastructure and modification of the models to include and serve the RTSM plots --- .gitattributes | 2 + .../DBInterface/django_postgresql/__init__.py | 3 + .../django_postgresql/celery_settings.py | 22 ++ .../DBInterface/django_postgresql/settings.py | 5 +- .../DBInterface/monitoringdb/models/rtsm.py | 10 +- .../monitoringdb/serializers/rtsm.py | 22 ++ .../DBInterface/monitoringdb/tasks.py | 263 ++++++++++++++++++ .../DBInterface/monitoringdb/urls.py | 2 + .../DBInterface/monitoringdb/views/common.py | 2 + .../monitoringdb/views/controllers.py | 26 +- .../monitoringdb/views/rtsm_views.py | 22 +- 11 files changed, 374 insertions(+), 5 deletions(-) create mode 100644 LCU/Maintenance/DBInterface/django_postgresql/celery_settings.py create mode 100644 LCU/Maintenance/DBInterface/monitoringdb/tasks.py diff --git a/.gitattributes b/.gitattributes index eda3eb06521..f8dc862274b 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1790,6 +1790,7 @@ LCU/Maintenance/CMakeLists.txt -text LCU/Maintenance/DBInterface/CMakeLists.txt -text LCU/Maintenance/DBInterface/__init__.py -text LCU/Maintenance/DBInterface/django_postgresql/__init__.py -text +LCU/Maintenance/DBInterface/django_postgresql/celery_settings.py -text LCU/Maintenance/DBInterface/django_postgresql/create_db.sql -text LCU/Maintenance/DBInterface/django_postgresql/settings.py -text LCU/Maintenance/DBInterface/django_postgresql/urls.py -text @@ -1823,6 +1824,7 @@ LCU/Maintenance/DBInterface/monitoringdb/serializers/station.py -text LCU/Maintenance/DBInterface/monitoringdb/serializers/station_tests.py -text LCU/Maintenance/DBInterface/monitoringdb/serializers/utils.py -text LCU/Maintenance/DBInterface/monitoringdb/station_test_raw_parser.py -text +LCU/Maintenance/DBInterface/monitoringdb/tasks.py -text LCU/Maintenance/DBInterface/monitoringdb/tests/__init__.py -text LCU/Maintenance/DBInterface/monitoringdb/tests/common.py -text LCU/Maintenance/DBInterface/monitoringdb/tests/old_tests.py -text diff --git a/LCU/Maintenance/DBInterface/django_postgresql/__init__.py b/LCU/Maintenance/DBInterface/django_postgresql/__init__.py index e69de29bb2d..782bcb57226 100644 --- a/LCU/Maintenance/DBInterface/django_postgresql/__init__.py +++ b/LCU/Maintenance/DBInterface/django_postgresql/__init__.py @@ -0,0 +1,3 @@ +from .celery_settings import backend_tasks + +__all__ = ('backend_tasks') \ No newline at end of file diff --git a/LCU/Maintenance/DBInterface/django_postgresql/celery_settings.py b/LCU/Maintenance/DBInterface/django_postgresql/celery_settings.py new file mode 100644 index 00000000000..14a925101ee --- /dev/null +++ b/LCU/Maintenance/DBInterface/django_postgresql/celery_settings.py @@ -0,0 +1,22 @@ +from __future__ import absolute_import, unicode_literals +import os +from celery import Celery + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lofar.maintenance.django_postgresql.settings') + +backend_tasks = Celery('maintenancedb') + +# Using a string here means the worker doesn't have to serialize +# the configuration object to child processes. +# - namespace='CELERY' means all celery-related configuration keys +# should have a `CELERY_` prefix. +backend_tasks.config_from_object('django.conf:settings', namespace='CELERY') + +# Load task modules from all registered Django app configs. +backend_tasks.autodiscover_tasks() + + +@backend_tasks.task(bind=True) +def debug_task(self): + print('Request: {0!r}'.format(self.request)) \ No newline at end of file diff --git a/LCU/Maintenance/DBInterface/django_postgresql/settings.py b/LCU/Maintenance/DBInterface/django_postgresql/settings.py index 87ca345a17c..01e15ed33e9 100644 --- a/LCU/Maintenance/DBInterface/django_postgresql/settings.py +++ b/LCU/Maintenance/DBInterface/django_postgresql/settings.py @@ -177,6 +177,7 @@ REST_FRAMEWORK = { 'DEFAULT_FILTER_BACKENDS': ('django_filters.rest_framework.DjangoFilterBackend',) } - +CELERY_RESULT_BACKEND = 'amqp://guest@localhost//' # LOFAR SPECIFIC PARAMETERS -URL_TO_RTSM_PLOTS = 'https://proxy.lofar.eu/rtsm/obs_plots/' \ No newline at end of file +URL_TO_RTSM_PLOTS = 'https://proxy.lofar.eu/rtsm/obs_plots/' +URL_TO_STORE_RTSM_PLOTS = '/home/mmancini/svn-tree/MonitoringMaintenance-OSB-32/build/gnucxx11_debug/plots' \ No newline at end of file diff --git a/LCU/Maintenance/DBInterface/monitoringdb/models/rtsm.py b/LCU/Maintenance/DBInterface/monitoringdb/models/rtsm.py index b82a3132437..f684b81c2d4 100644 --- a/LCU/Maintenance/DBInterface/monitoringdb/models/rtsm.py +++ b/LCU/Maintenance/DBInterface/monitoringdb/models/rtsm.py @@ -17,7 +17,6 @@ MODE_TO_COMPONENT = { 7: 'HBA' } - class RTSMObservation(models.Model): observation_id = models.PositiveIntegerField(default=0) @@ -66,6 +65,15 @@ class RTSMErrorSummary(models.Model): stop_frequency = models.FloatField(default=None, null=True) +class RTSMSummaryPlot(models.Model): + error_summary = models.ForeignKey(RTSMErrorSummary, + related_name='summary_plot', + on_delete=models.SET_NULL, + null=True) + + uri = models.CharField(max_length=10000, default=None, null=True) + + class RTSMSpectrum(models.Model): bad_spectrum = ArrayField(models.FloatField()) average_spectrum = ArrayField(models.FloatField()) diff --git a/LCU/Maintenance/DBInterface/monitoringdb/serializers/rtsm.py b/LCU/Maintenance/DBInterface/monitoringdb/serializers/rtsm.py index 8cddf2b51b5..38640fdd33e 100644 --- a/LCU/Maintenance/DBInterface/monitoringdb/serializers/rtsm.py +++ b/LCU/Maintenance/DBInterface/monitoringdb/serializers/rtsm.py @@ -2,9 +2,11 @@ from ..models.rtsm import * from rest_framework import serializers import logging from django.db.transaction import atomic +from django.conf import settings from ..exceptions import ItemAlreadyExists from .log import ActionLogSerializer from .station import Station +import os logger = logging.getLogger('serializers') @@ -100,3 +102,23 @@ class RTSMObservationSerializer(serializers.ModelSerializer): self.compute_summary(RTSMObservation_instance) return RTSMObservation_instance + + +class FilePathSerializer(serializers.Field): + def __init__(self, path, *args, **kwargs): + self.path = path + super(FilePathSerializer, self).__init__(*args, **kwargs) + + def to_representation(self, value): + full_path = os.path.join(self.path, value) + return full_path + + def to_internal_value(self, data): + file_name = os.path.relpath(self.path, data) + return file_name + +class RTSMSummaryPlotSerializer(serializers.ModelSerializer): + uri = FilePathSerializer(settings.URL_TO_STORE_RTSM_PLOTS) + class Meta: + model = RTSMSummaryPlot + fields = '__all__' diff --git a/LCU/Maintenance/DBInterface/monitoringdb/tasks.py b/LCU/Maintenance/DBInterface/monitoringdb/tasks.py new file mode 100644 index 00000000000..7a2a0e4ac0c --- /dev/null +++ b/LCU/Maintenance/DBInterface/monitoringdb/tasks.py @@ -0,0 +1,263 @@ +import logging +import os + +import matplotlib.pyplot as plt +import numpy +from celery import shared_task +from django.conf import settings + +from .models.rtsm import RTSMObservation, MODE_TO_COMPONENT, RTSMErrorSummary, RTSMSummaryPlot + +logger = logging.getLogger(__name__) + + +@shared_task +def greetings(name): + logger.info('greetings from celerity mr/ms %s', name) + + +class Metadata: + def __init__(self, **kwargs): + for key, value in kwargs.items(): + self.__setattr__(key, value) + + +class ObservationMetadata(Metadata): + station_name = 'CSXXXC' + + start_datetime = '1212-12-12 12:12:12' + end_datetime = '1212-12-12 12:12:12' + observation_id = 'XXXXXX' + samples = 120 + + +class ErrorMetadata(Metadata): + error_type = '' + rcu_id = '' + mode = 0 + count = '' + start_frequency = 0 + stop_frequency = 0 + + +def set_style(): + plt.minorticks_on() + plt.ylim(55, 160) + plt.grid() + # plt.rcParams['svg.fonttype'] = 'none' + plt.rcParams['lines.linewidth'] = 1 + plt.rcParams['figure.autolayout'] = True + plt.rcParams['axes.labelsize'] = 'large' + plt.rcParams['path.simplify'] = True + plt.rcParams['savefig.transparent'] = False + plt.rcParams['savefig.bbox'] = 'tight' + + plt.rcParams['legend.loc'] = 'upper right' + plt.rcParams['legend.frameon'] = True + plt.rcParams['legend.framealpha'] = 1 + plt.rcParams['legend.edgecolor'] = 'black' + + +def rcu_mode_to_antenna(rcu, mode): + component_name = MODE_TO_COMPONENT[mode] + + antenna = rcu // 2 + if component_name in ['LBH', 'HBA']: + polarization = 'X' if rcu % 2 == 0 else 'Y' + elif component_name == 'LBL': + antenna = rcu // 2 + 48 + polarization = 'Y' if rcu % 2 == 0 else 'X' + else: + raise ValueError('Unknown component %s' % component_name) + + return dict(polarization=polarization, antenna=antenna) + + +def render_summary_text(error_metadata, observation_metadata): + """ + Render the summary text to include in the picture + :param error_metadata: the metadata information regarding the error + :param observation_metadata: the metadata information regarding the observation + :return: + """ + rendered_text = 'Station Name: {:<25s}'.format(observation_metadata.station_name) + rendered_text += 'Faults : {}\n'.format(error_metadata.error_type) + + rendered_text += 'ObsID : {:<25d}'.format(observation_metadata.observation_id) + rendered_text += 'Antenna : {antenna} {polarization}\n' \ + .format(**rcu_mode_to_antenna(error_metadata.rcu, error_metadata.mode)) + + rendered_text += 'Start : {:<25s}'.format(observation_metadata.start_datetime) + rendered_text += 'RCU : {}\n'.format(error_metadata.rcu) + + rendered_text += 'Stop : {:<25s}'.format(observation_metadata.end_datetime) + rendered_text += 'Samples : {count:d}/{samples:d}\n'.format(count=error_metadata.count, + samples=observation_metadata.samples) + + rendered_text += 'Mode : {:<25d}'.format(error_metadata.mode) + rendered_text += 'Badness : {:.2f} %\n'.format( + (100. * error_metadata.count) / observation_metadata.samples) + return rendered_text + + +def produce_plot(observation_metadata, + error_metadata, + list_bad_spectra, list_good_specta, path): + """ + Produces a plot given the observation metadata the error summary metadata and + the list of the bad spectra for the given (error, rcu) and the average spectrum + of the rest of the array + :param observation_metadata: the metadata information regarding the observation + :param error_metadata: the metadata information regarding the error + :param list_bad_spectra: a list containing the bad spectra for the given error, rcu couple + :param list_good_specta: the average spectrum of the rest of the array + :param path: the path where to store the file + """ + plt.switch_backend('agg') + + plt.figure(figsize=(12, 9)) + set_style() + + plt.xlabel('MHz') + plt.ylabel('dB') + + frequency_sampling = len(list_bad_spectra[0][0]) + n_spectra = len(list_bad_spectra) + frequency = numpy.linspace(error_metadata.start_frequency, + error_metadata.stop_frequency, + frequency_sampling) + bad_spectra_cube = numpy.zeros((n_spectra, frequency_sampling)) + average_spectra_cube = numpy.zeros((n_spectra, frequency_sampling)) + + for i, spectrum in enumerate(list_bad_spectra): + bad_spectra_cube[i, :] = spectrum[0] + for i, spectrum in enumerate(list_good_specta): + average_spectra_cube[i, :] = spectrum[0] + + min_bad_spectrum = numpy.min(bad_spectra_cube, axis=0) + max_bad_spectrum = numpy.max(bad_spectra_cube, axis=0) + average_bad_spectrum = numpy.median(bad_spectra_cube, axis=0) + + average_good_spectrum = numpy.median(average_spectra_cube, axis=0) + + plt.fill_between(frequency, min_bad_spectrum, max_bad_spectrum, color='red', + alpha=.3) + + plt.plot(frequency, average_bad_spectrum, color='red', label='median bad spectra') + + plt.plot(frequency, average_good_spectrum, color='blue', label='median all spectra') + + summary_text = render_summary_text(observation_metadata=observation_metadata, + error_metadata=error_metadata) + + plt.text(0.02, .98, summary_text, bbox=dict(facecolor='yellow', alpha=0.2), + family='monospace', verticalalignment='top', + horizontalalignment='left', + transform=plt.axes().transAxes) + plt.legend() + plt.savefig(path) + plt.close() + + +def generate_summary_plot_for_error(error_summary_id): + """ + Given a error summary id generates the plot in the specific location given by the settings + parameter URL_TO_STORE_RTSM_PLOTS and the file name derived in the following fashion + %(observation_id)d_%(station_name)s_rcu%(rcu)d_%(error_type)s.png + + Finally stores in the database the file name to directly access the file + + :param error_summary_id: database id of the error summary + :return: the database id of the RTSMSummaryPlot instance + :raise Exception: raise if there is an exception + """ + basePath = settings.URL_TO_STORE_RTSM_PLOTS + + error_summary = RTSMErrorSummary.objects.get(pk=error_summary_id) + summary_plot = error_summary.summary_plot.first() + if summary_plot is not None: + if summary_plot.uri == None: + return 'another worker is taken care of the task...' + + summary_plot = RTSMSummaryPlot(error_summary=error_summary, uri=None) + summary_plot.save() + + try: + observation_metadata = ObservationMetadata() + observation = error_summary.observation + observation_metadata.observation_id = observation.observation_id + observation_metadata.station_name = observation.station.name + observation_metadata.samples = observation.samples + errors = observation.errors.filter(mode=error_summary.mode, + rcu=error_summary.rcu, + error_type=error_summary.error_type) + + list_bad_spectra = errors.values_list('spectra__bad_spectrum') + list_good_spectra = errors.values_list('spectra__average_spectrum') + file_name = '%(observation_id)d_%(station_name)s_rcu%(rcu)d_%(error_type)s.png' % dict( + observation_id=observation_metadata.observation_id, + station_name=observation_metadata.station_name, + rcu=error_summary.rcu, + error_type=error_summary.error_type + ) + + full_path = os.path.join(basePath, file_name) + produce_plot(observation_metadata, + error_summary, + list_bad_spectra, list_good_spectra, + full_path) + summary_plot.uri = file_name + summary_plot.save() + return full_path + except Exception as e: + logger.exception('exception %s occurred skipping...', e) + summary_plot.delete() + raise e + + +@shared_task(bind=True, track_started=True) +def check_error_summary_plot(self, error_summary_id): + """ + Checks if the error summary plot is presents otherwise it generates one + :param self: shared_task + :type self: celery.shared_task() + :param error_summary_id: database id of the RTSMErrorSummary to check + :return: + """ + error_summary = RTSMErrorSummary.objects.get(pk=error_summary_id) + summary_plot = error_summary.summary_plot.first() + if summary_plot is None: + return generate_summary_plot_for_error(error_summary_id) + else: + base_path = settings.URL_TO_STORE_RTSM_PLOTS + if summary_plot.uri is None: + return 'another worker is taking care of the task...' + full_path = os.path.join(base_path, summary_plot.uri) + if not os.path.exists(full_path): + return generate_summary_plot_for_error(error_summary_id) + elif os.path.isdir(full_path): + raise Exception('%s is a directory' % full_path) + else: + return error_summary.summary_plot.uri + + +@shared_task(bind=True, track_started=True) +def check_observation_plots(self, database_id): + """ + Generate the summary plots from a given RTSM observation + :param self: shared_task + :type self: celery.shared_task() + :param database_id: database id of the RTSM observation + :return: + """ + + observation = RTSMObservation.objects.get(id=database_id) + observation_metadata = ObservationMetadata() + + observation_metadata.observation_id = observation.observation_id + observation_metadata.station_name = observation.station.name + observation_metadata.samples = observation.samples + list_of_generated_files = [] + for error in observation.errors_summary.all(): + list_of_generated_files += [check_error_summary_plot.delay(error.pk)] + return 'checking in progress' diff --git a/LCU/Maintenance/DBInterface/monitoringdb/urls.py b/LCU/Maintenance/DBInterface/monitoringdb/urls.py index 31bd3275b09..119c244538b 100644 --- a/LCU/Maintenance/DBInterface/monitoringdb/urls.py +++ b/LCU/Maintenance/DBInterface/monitoringdb/urls.py @@ -37,12 +37,14 @@ urlpatterns = [ url(r'^api/api-auth', include('rest_framework.urls', namespace='rest_framework')), url(r'^api/stationtests/raw/insert', insert_raw_station_test), url(r'^api/rtsm/raw/insert', insert_raw_rtsm_test), + url(r'^api/rtsm/error_summary_plot/(?P<pk>\d+)', get_summary_plot), url(r'^api/view/ctrl_stationoverview', ControllerStationOverview.as_view()), url(r'^api/view/ctrl_stationtestsummary', ControllerStationTestsSummary.as_view()), url(r'^api/view/ctrl_latest_observation', ControllerLatestObservations.as_view()), url(r'^api/view/ctrl_stationtest_statistics', ControllerStationTestStatistics.as_view()), url(r'^api/view/ctrl_list_component_error_types', ControllerAllComponentErrorTypes.as_view()), url(r'^api/view/ctrl_station_component_errors', ControllerStationComponentErrors.as_view()), + url(r'^api/view/ctrl_test_queue', ControllerTestCeleryQueue.as_view()), url(r'^api/docs', include_docs_urls(title='Monitoring DB API')) ] diff --git a/LCU/Maintenance/DBInterface/monitoringdb/views/common.py b/LCU/Maintenance/DBInterface/monitoringdb/views/common.py index 899d1642fcf..db345cc545f 100644 --- a/LCU/Maintenance/DBInterface/monitoringdb/views/common.py +++ b/LCU/Maintenance/DBInterface/monitoringdb/views/common.py @@ -1,8 +1,10 @@ from rest_framework import viewsets, status from rest_framework.response import Response +from django.http import HttpResponse from rest_framework.decorators import api_view import logging from ..exceptions import ItemAlreadyExists +from django.core.exceptions import ObjectDoesNotExist RESERVED_FILTER_NAME = ['limit', 'offset', 'format', 'page_size'] diff --git a/LCU/Maintenance/DBInterface/monitoringdb/views/controllers.py b/LCU/Maintenance/DBInterface/monitoringdb/views/controllers.py index dc4b0bd734f..47a2fa6440a 100644 --- a/LCU/Maintenance/DBInterface/monitoringdb/views/controllers.py +++ b/LCU/Maintenance/DBInterface/monitoringdb/views/controllers.py @@ -904,4 +904,28 @@ class ControllerStationComponentErrors(ValidableReadOnlyView): payload[component_type] = sorted(station_test_errors_per_type + rtsm_errors_per_type, key=lambda item: item['start_date'], reverse=True) - return Response(status=status.HTTP_200_OK, data=payload) \ No newline at end of file + return Response(status=status.HTTP_200_OK, data=payload) + +from ..tasks import greetings, check_observation_plots + +class ControllerTestCeleryQueue(ValidableReadOnlyView): + observation_database_id = None + fields = [ + coreapi.Field( + 'observation_database_id', + required=True, + type=int, + location='query', + + schema=coreschema.Integer(description='observation_id') + )] + def compute_response(self): + from celery.result import AsyncResult + task_id = 'print_last_observation_%s' % self.observation_database_id + + stuff = check_observation_plots.apply_async((self.observation_database_id,)) + stuff = check_observation_plots.apply_async((self.observation_database_id,)) + + #print(aresult.get(timeout=10)) + + return Response(status=status.HTTP_200_OK, data='command sent: %s' % stuff) \ No newline at end of file diff --git a/LCU/Maintenance/DBInterface/monitoringdb/views/rtsm_views.py b/LCU/Maintenance/DBInterface/monitoringdb/views/rtsm_views.py index 5e0a2560ff7..04cc5f151db 100644 --- a/LCU/Maintenance/DBInterface/monitoringdb/views/rtsm_views.py +++ b/LCU/Maintenance/DBInterface/monitoringdb/views/rtsm_views.py @@ -1,7 +1,7 @@ from .common import * from ..models.rtsm import RTSMObservation, RTSMError, RTSMSpectrum from ..serializers.rtsm import RTSMObservationSerializer, RTSMErrorSerializer, \ - RTSMSpectrumSerializer + RTSMSpectrumSerializer, RTSMSummaryPlotSerializer, RTSMSummaryPlot from ..rtsm_test_raw_parser import parse_rtsm_test logger = logging.getLogger('views') @@ -24,6 +24,26 @@ class RTSMObservationViewSet(viewsets.ReadOnlyModelViewSet): serializer_class = RTSMObservationSerializer filter_fields = '__all__' +import os +@api_view(['GET']) +def get_summary_plot(request, pk): + try: + entity = RTSMSummaryPlot.objects.get(pk=pk) + uri = RTSMSummaryPlotSerializer(entity).data['uri'] + + except ObjectDoesNotExist as e: + return HttpResponse('<h1>NOT FOUND</h1>', status=status.HTTP_404_NOT_FOUND) + + try: + if(os.path.exists(uri) and os.path.isfile(uri)): + with open(uri, 'rb') as f_stream: + image = f_stream.read() + return HttpResponse(image, status=status.HTTP_200_OK, content_type='image/gif') + else: + return HttpResponse('<h1>NOT FOUND</h1>', status=status.HTTP_404_NOT_FOUND) + except Exception as e: + return HttpResponse('exception %s occurred: %s' % (e.__class__.__name__, e), + status=status.HTTP_500_INTERNAL_SERVER_ERROR) @api_view(['POST']) def insert_raw_rtsm_test(request): -- GitLab