Skip to content
Snippets Groups Projects
Commit 8aa98b2d authored by Mattia Mancini's avatar Mattia Mancini
Browse files

OSB-34: Setup of the preliminary implementation of the celery infrastructure,...

OSB-34: Setup of the preliminary implementation of the celery infrastructure, integration with the already existring django infrastructure and modification of the models to include and serve the RTSM plots
parent b6fe7b1a
No related branches found
No related tags found
2 merge requests!89Monitoring maintenance Epic branch merge,!1Resolve OSB-13 "Monitoringmaintenance "
Showing
with 374 additions and 5 deletions
...@@ -1790,6 +1790,7 @@ LCU/Maintenance/CMakeLists.txt -text ...@@ -1790,6 +1790,7 @@ LCU/Maintenance/CMakeLists.txt -text
LCU/Maintenance/DBInterface/CMakeLists.txt -text LCU/Maintenance/DBInterface/CMakeLists.txt -text
LCU/Maintenance/DBInterface/__init__.py -text LCU/Maintenance/DBInterface/__init__.py -text
LCU/Maintenance/DBInterface/django_postgresql/__init__.py -text LCU/Maintenance/DBInterface/django_postgresql/__init__.py -text
LCU/Maintenance/DBInterface/django_postgresql/celery_settings.py -text
LCU/Maintenance/DBInterface/django_postgresql/create_db.sql -text LCU/Maintenance/DBInterface/django_postgresql/create_db.sql -text
LCU/Maintenance/DBInterface/django_postgresql/settings.py -text LCU/Maintenance/DBInterface/django_postgresql/settings.py -text
LCU/Maintenance/DBInterface/django_postgresql/urls.py -text LCU/Maintenance/DBInterface/django_postgresql/urls.py -text
...@@ -1823,6 +1824,7 @@ LCU/Maintenance/DBInterface/monitoringdb/serializers/station.py -text ...@@ -1823,6 +1824,7 @@ LCU/Maintenance/DBInterface/monitoringdb/serializers/station.py -text
LCU/Maintenance/DBInterface/monitoringdb/serializers/station_tests.py -text LCU/Maintenance/DBInterface/monitoringdb/serializers/station_tests.py -text
LCU/Maintenance/DBInterface/monitoringdb/serializers/utils.py -text LCU/Maintenance/DBInterface/monitoringdb/serializers/utils.py -text
LCU/Maintenance/DBInterface/monitoringdb/station_test_raw_parser.py -text LCU/Maintenance/DBInterface/monitoringdb/station_test_raw_parser.py -text
LCU/Maintenance/DBInterface/monitoringdb/tasks.py -text
LCU/Maintenance/DBInterface/monitoringdb/tests/__init__.py -text LCU/Maintenance/DBInterface/monitoringdb/tests/__init__.py -text
LCU/Maintenance/DBInterface/monitoringdb/tests/common.py -text LCU/Maintenance/DBInterface/monitoringdb/tests/common.py -text
LCU/Maintenance/DBInterface/monitoringdb/tests/old_tests.py -text LCU/Maintenance/DBInterface/monitoringdb/tests/old_tests.py -text
......
from .celery_settings import backend_tasks
__all__ = ('backend_tasks')
\ No newline at end of file
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'lofar.maintenance.django_postgresql.settings')
backend_tasks = Celery('maintenancedb')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
backend_tasks.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
backend_tasks.autodiscover_tasks()
@backend_tasks.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
\ No newline at end of file
...@@ -177,6 +177,7 @@ REST_FRAMEWORK = { ...@@ -177,6 +177,7 @@ REST_FRAMEWORK = {
'DEFAULT_FILTER_BACKENDS': ('django_filters.rest_framework.DjangoFilterBackend',) 'DEFAULT_FILTER_BACKENDS': ('django_filters.rest_framework.DjangoFilterBackend',)
} }
CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'
# LOFAR SPECIFIC PARAMETERS # LOFAR SPECIFIC PARAMETERS
URL_TO_RTSM_PLOTS = 'https://proxy.lofar.eu/rtsm/obs_plots/' URL_TO_RTSM_PLOTS = 'https://proxy.lofar.eu/rtsm/obs_plots/'
\ No newline at end of file URL_TO_STORE_RTSM_PLOTS = '/home/mmancini/svn-tree/MonitoringMaintenance-OSB-32/build/gnucxx11_debug/plots'
\ No newline at end of file
...@@ -17,7 +17,6 @@ MODE_TO_COMPONENT = { ...@@ -17,7 +17,6 @@ MODE_TO_COMPONENT = {
7: 'HBA' 7: 'HBA'
} }
class RTSMObservation(models.Model): class RTSMObservation(models.Model):
observation_id = models.PositiveIntegerField(default=0) observation_id = models.PositiveIntegerField(default=0)
...@@ -66,6 +65,15 @@ class RTSMErrorSummary(models.Model): ...@@ -66,6 +65,15 @@ class RTSMErrorSummary(models.Model):
stop_frequency = models.FloatField(default=None, null=True) stop_frequency = models.FloatField(default=None, null=True)
class RTSMSummaryPlot(models.Model):
error_summary = models.ForeignKey(RTSMErrorSummary,
related_name='summary_plot',
on_delete=models.SET_NULL,
null=True)
uri = models.CharField(max_length=10000, default=None, null=True)
class RTSMSpectrum(models.Model): class RTSMSpectrum(models.Model):
bad_spectrum = ArrayField(models.FloatField()) bad_spectrum = ArrayField(models.FloatField())
average_spectrum = ArrayField(models.FloatField()) average_spectrum = ArrayField(models.FloatField())
......
...@@ -2,9 +2,11 @@ from ..models.rtsm import * ...@@ -2,9 +2,11 @@ from ..models.rtsm import *
from rest_framework import serializers from rest_framework import serializers
import logging import logging
from django.db.transaction import atomic from django.db.transaction import atomic
from django.conf import settings
from ..exceptions import ItemAlreadyExists from ..exceptions import ItemAlreadyExists
from .log import ActionLogSerializer from .log import ActionLogSerializer
from .station import Station from .station import Station
import os
logger = logging.getLogger('serializers') logger = logging.getLogger('serializers')
...@@ -100,3 +102,23 @@ class RTSMObservationSerializer(serializers.ModelSerializer): ...@@ -100,3 +102,23 @@ class RTSMObservationSerializer(serializers.ModelSerializer):
self.compute_summary(RTSMObservation_instance) self.compute_summary(RTSMObservation_instance)
return RTSMObservation_instance return RTSMObservation_instance
class FilePathSerializer(serializers.Field):
def __init__(self, path, *args, **kwargs):
self.path = path
super(FilePathSerializer, self).__init__(*args, **kwargs)
def to_representation(self, value):
full_path = os.path.join(self.path, value)
return full_path
def to_internal_value(self, data):
file_name = os.path.relpath(self.path, data)
return file_name
class RTSMSummaryPlotSerializer(serializers.ModelSerializer):
uri = FilePathSerializer(settings.URL_TO_STORE_RTSM_PLOTS)
class Meta:
model = RTSMSummaryPlot
fields = '__all__'
import logging
import os
import matplotlib.pyplot as plt
import numpy
from celery import shared_task
from django.conf import settings
from .models.rtsm import RTSMObservation, MODE_TO_COMPONENT, RTSMErrorSummary, RTSMSummaryPlot
logger = logging.getLogger(__name__)
@shared_task
def greetings(name):
logger.info('greetings from celerity mr/ms %s', name)
class Metadata:
def __init__(self, **kwargs):
for key, value in kwargs.items():
self.__setattr__(key, value)
class ObservationMetadata(Metadata):
station_name = 'CSXXXC'
start_datetime = '1212-12-12 12:12:12'
end_datetime = '1212-12-12 12:12:12'
observation_id = 'XXXXXX'
samples = 120
class ErrorMetadata(Metadata):
error_type = ''
rcu_id = ''
mode = 0
count = ''
start_frequency = 0
stop_frequency = 0
def set_style():
plt.minorticks_on()
plt.ylim(55, 160)
plt.grid()
# plt.rcParams['svg.fonttype'] = 'none'
plt.rcParams['lines.linewidth'] = 1
plt.rcParams['figure.autolayout'] = True
plt.rcParams['axes.labelsize'] = 'large'
plt.rcParams['path.simplify'] = True
plt.rcParams['savefig.transparent'] = False
plt.rcParams['savefig.bbox'] = 'tight'
plt.rcParams['legend.loc'] = 'upper right'
plt.rcParams['legend.frameon'] = True
plt.rcParams['legend.framealpha'] = 1
plt.rcParams['legend.edgecolor'] = 'black'
def rcu_mode_to_antenna(rcu, mode):
component_name = MODE_TO_COMPONENT[mode]
antenna = rcu // 2
if component_name in ['LBH', 'HBA']:
polarization = 'X' if rcu % 2 == 0 else 'Y'
elif component_name == 'LBL':
antenna = rcu // 2 + 48
polarization = 'Y' if rcu % 2 == 0 else 'X'
else:
raise ValueError('Unknown component %s' % component_name)
return dict(polarization=polarization, antenna=antenna)
def render_summary_text(error_metadata, observation_metadata):
"""
Render the summary text to include in the picture
:param error_metadata: the metadata information regarding the error
:param observation_metadata: the metadata information regarding the observation
:return:
"""
rendered_text = 'Station Name: {:<25s}'.format(observation_metadata.station_name)
rendered_text += 'Faults : {}\n'.format(error_metadata.error_type)
rendered_text += 'ObsID : {:<25d}'.format(observation_metadata.observation_id)
rendered_text += 'Antenna : {antenna} {polarization}\n' \
.format(**rcu_mode_to_antenna(error_metadata.rcu, error_metadata.mode))
rendered_text += 'Start : {:<25s}'.format(observation_metadata.start_datetime)
rendered_text += 'RCU : {}\n'.format(error_metadata.rcu)
rendered_text += 'Stop : {:<25s}'.format(observation_metadata.end_datetime)
rendered_text += 'Samples : {count:d}/{samples:d}\n'.format(count=error_metadata.count,
samples=observation_metadata.samples)
rendered_text += 'Mode : {:<25d}'.format(error_metadata.mode)
rendered_text += 'Badness : {:.2f} %\n'.format(
(100. * error_metadata.count) / observation_metadata.samples)
return rendered_text
def produce_plot(observation_metadata,
error_metadata,
list_bad_spectra, list_good_specta, path):
"""
Produces a plot given the observation metadata the error summary metadata and
the list of the bad spectra for the given (error, rcu) and the average spectrum
of the rest of the array
:param observation_metadata: the metadata information regarding the observation
:param error_metadata: the metadata information regarding the error
:param list_bad_spectra: a list containing the bad spectra for the given error, rcu couple
:param list_good_specta: the average spectrum of the rest of the array
:param path: the path where to store the file
"""
plt.switch_backend('agg')
plt.figure(figsize=(12, 9))
set_style()
plt.xlabel('MHz')
plt.ylabel('dB')
frequency_sampling = len(list_bad_spectra[0][0])
n_spectra = len(list_bad_spectra)
frequency = numpy.linspace(error_metadata.start_frequency,
error_metadata.stop_frequency,
frequency_sampling)
bad_spectra_cube = numpy.zeros((n_spectra, frequency_sampling))
average_spectra_cube = numpy.zeros((n_spectra, frequency_sampling))
for i, spectrum in enumerate(list_bad_spectra):
bad_spectra_cube[i, :] = spectrum[0]
for i, spectrum in enumerate(list_good_specta):
average_spectra_cube[i, :] = spectrum[0]
min_bad_spectrum = numpy.min(bad_spectra_cube, axis=0)
max_bad_spectrum = numpy.max(bad_spectra_cube, axis=0)
average_bad_spectrum = numpy.median(bad_spectra_cube, axis=0)
average_good_spectrum = numpy.median(average_spectra_cube, axis=0)
plt.fill_between(frequency, min_bad_spectrum, max_bad_spectrum, color='red',
alpha=.3)
plt.plot(frequency, average_bad_spectrum, color='red', label='median bad spectra')
plt.plot(frequency, average_good_spectrum, color='blue', label='median all spectra')
summary_text = render_summary_text(observation_metadata=observation_metadata,
error_metadata=error_metadata)
plt.text(0.02, .98, summary_text, bbox=dict(facecolor='yellow', alpha=0.2),
family='monospace', verticalalignment='top',
horizontalalignment='left',
transform=plt.axes().transAxes)
plt.legend()
plt.savefig(path)
plt.close()
def generate_summary_plot_for_error(error_summary_id):
"""
Given a error summary id generates the plot in the specific location given by the settings
parameter URL_TO_STORE_RTSM_PLOTS and the file name derived in the following fashion
%(observation_id)d_%(station_name)s_rcu%(rcu)d_%(error_type)s.png
Finally stores in the database the file name to directly access the file
:param error_summary_id: database id of the error summary
:return: the database id of the RTSMSummaryPlot instance
:raise Exception: raise if there is an exception
"""
basePath = settings.URL_TO_STORE_RTSM_PLOTS
error_summary = RTSMErrorSummary.objects.get(pk=error_summary_id)
summary_plot = error_summary.summary_plot.first()
if summary_plot is not None:
if summary_plot.uri == None:
return 'another worker is taken care of the task...'
summary_plot = RTSMSummaryPlot(error_summary=error_summary, uri=None)
summary_plot.save()
try:
observation_metadata = ObservationMetadata()
observation = error_summary.observation
observation_metadata.observation_id = observation.observation_id
observation_metadata.station_name = observation.station.name
observation_metadata.samples = observation.samples
errors = observation.errors.filter(mode=error_summary.mode,
rcu=error_summary.rcu,
error_type=error_summary.error_type)
list_bad_spectra = errors.values_list('spectra__bad_spectrum')
list_good_spectra = errors.values_list('spectra__average_spectrum')
file_name = '%(observation_id)d_%(station_name)s_rcu%(rcu)d_%(error_type)s.png' % dict(
observation_id=observation_metadata.observation_id,
station_name=observation_metadata.station_name,
rcu=error_summary.rcu,
error_type=error_summary.error_type
)
full_path = os.path.join(basePath, file_name)
produce_plot(observation_metadata,
error_summary,
list_bad_spectra, list_good_spectra,
full_path)
summary_plot.uri = file_name
summary_plot.save()
return full_path
except Exception as e:
logger.exception('exception %s occurred skipping...', e)
summary_plot.delete()
raise e
@shared_task(bind=True, track_started=True)
def check_error_summary_plot(self, error_summary_id):
"""
Checks if the error summary plot is presents otherwise it generates one
:param self: shared_task
:type self: celery.shared_task()
:param error_summary_id: database id of the RTSMErrorSummary to check
:return:
"""
error_summary = RTSMErrorSummary.objects.get(pk=error_summary_id)
summary_plot = error_summary.summary_plot.first()
if summary_plot is None:
return generate_summary_plot_for_error(error_summary_id)
else:
base_path = settings.URL_TO_STORE_RTSM_PLOTS
if summary_plot.uri is None:
return 'another worker is taking care of the task...'
full_path = os.path.join(base_path, summary_plot.uri)
if not os.path.exists(full_path):
return generate_summary_plot_for_error(error_summary_id)
elif os.path.isdir(full_path):
raise Exception('%s is a directory' % full_path)
else:
return error_summary.summary_plot.uri
@shared_task(bind=True, track_started=True)
def check_observation_plots(self, database_id):
"""
Generate the summary plots from a given RTSM observation
:param self: shared_task
:type self: celery.shared_task()
:param database_id: database id of the RTSM observation
:return:
"""
observation = RTSMObservation.objects.get(id=database_id)
observation_metadata = ObservationMetadata()
observation_metadata.observation_id = observation.observation_id
observation_metadata.station_name = observation.station.name
observation_metadata.samples = observation.samples
list_of_generated_files = []
for error in observation.errors_summary.all():
list_of_generated_files += [check_error_summary_plot.delay(error.pk)]
return 'checking in progress'
...@@ -37,12 +37,14 @@ urlpatterns = [ ...@@ -37,12 +37,14 @@ urlpatterns = [
url(r'^api/api-auth', include('rest_framework.urls', namespace='rest_framework')), url(r'^api/api-auth', include('rest_framework.urls', namespace='rest_framework')),
url(r'^api/stationtests/raw/insert', insert_raw_station_test), url(r'^api/stationtests/raw/insert', insert_raw_station_test),
url(r'^api/rtsm/raw/insert', insert_raw_rtsm_test), url(r'^api/rtsm/raw/insert', insert_raw_rtsm_test),
url(r'^api/rtsm/error_summary_plot/(?P<pk>\d+)', get_summary_plot),
url(r'^api/view/ctrl_stationoverview', ControllerStationOverview.as_view()), url(r'^api/view/ctrl_stationoverview', ControllerStationOverview.as_view()),
url(r'^api/view/ctrl_stationtestsummary', ControllerStationTestsSummary.as_view()), url(r'^api/view/ctrl_stationtestsummary', ControllerStationTestsSummary.as_view()),
url(r'^api/view/ctrl_latest_observation', ControllerLatestObservations.as_view()), url(r'^api/view/ctrl_latest_observation', ControllerLatestObservations.as_view()),
url(r'^api/view/ctrl_stationtest_statistics', ControllerStationTestStatistics.as_view()), url(r'^api/view/ctrl_stationtest_statistics', ControllerStationTestStatistics.as_view()),
url(r'^api/view/ctrl_list_component_error_types', ControllerAllComponentErrorTypes.as_view()), url(r'^api/view/ctrl_list_component_error_types', ControllerAllComponentErrorTypes.as_view()),
url(r'^api/view/ctrl_station_component_errors', ControllerStationComponentErrors.as_view()), url(r'^api/view/ctrl_station_component_errors', ControllerStationComponentErrors.as_view()),
url(r'^api/view/ctrl_test_queue', ControllerTestCeleryQueue.as_view()),
url(r'^api/docs', include_docs_urls(title='Monitoring DB API')) url(r'^api/docs', include_docs_urls(title='Monitoring DB API'))
] ]
from rest_framework import viewsets, status from rest_framework import viewsets, status
from rest_framework.response import Response from rest_framework.response import Response
from django.http import HttpResponse
from rest_framework.decorators import api_view from rest_framework.decorators import api_view
import logging import logging
from ..exceptions import ItemAlreadyExists from ..exceptions import ItemAlreadyExists
from django.core.exceptions import ObjectDoesNotExist
RESERVED_FILTER_NAME = ['limit', 'offset', 'format', 'page_size'] RESERVED_FILTER_NAME = ['limit', 'offset', 'format', 'page_size']
......
...@@ -904,4 +904,28 @@ class ControllerStationComponentErrors(ValidableReadOnlyView): ...@@ -904,4 +904,28 @@ class ControllerStationComponentErrors(ValidableReadOnlyView):
payload[component_type] = sorted(station_test_errors_per_type + rtsm_errors_per_type, key=lambda item: item['start_date'], reverse=True) payload[component_type] = sorted(station_test_errors_per_type + rtsm_errors_per_type, key=lambda item: item['start_date'], reverse=True)
return Response(status=status.HTTP_200_OK, data=payload) return Response(status=status.HTTP_200_OK, data=payload)
\ No newline at end of file
from ..tasks import greetings, check_observation_plots
class ControllerTestCeleryQueue(ValidableReadOnlyView):
observation_database_id = None
fields = [
coreapi.Field(
'observation_database_id',
required=True,
type=int,
location='query',
schema=coreschema.Integer(description='observation_id')
)]
def compute_response(self):
from celery.result import AsyncResult
task_id = 'print_last_observation_%s' % self.observation_database_id
stuff = check_observation_plots.apply_async((self.observation_database_id,))
stuff = check_observation_plots.apply_async((self.observation_database_id,))
#print(aresult.get(timeout=10))
return Response(status=status.HTTP_200_OK, data='command sent: %s' % stuff)
\ No newline at end of file
from .common import * from .common import *
from ..models.rtsm import RTSMObservation, RTSMError, RTSMSpectrum from ..models.rtsm import RTSMObservation, RTSMError, RTSMSpectrum
from ..serializers.rtsm import RTSMObservationSerializer, RTSMErrorSerializer, \ from ..serializers.rtsm import RTSMObservationSerializer, RTSMErrorSerializer, \
RTSMSpectrumSerializer RTSMSpectrumSerializer, RTSMSummaryPlotSerializer, RTSMSummaryPlot
from ..rtsm_test_raw_parser import parse_rtsm_test from ..rtsm_test_raw_parser import parse_rtsm_test
logger = logging.getLogger('views') logger = logging.getLogger('views')
...@@ -24,6 +24,26 @@ class RTSMObservationViewSet(viewsets.ReadOnlyModelViewSet): ...@@ -24,6 +24,26 @@ class RTSMObservationViewSet(viewsets.ReadOnlyModelViewSet):
serializer_class = RTSMObservationSerializer serializer_class = RTSMObservationSerializer
filter_fields = '__all__' filter_fields = '__all__'
import os
@api_view(['GET'])
def get_summary_plot(request, pk):
try:
entity = RTSMSummaryPlot.objects.get(pk=pk)
uri = RTSMSummaryPlotSerializer(entity).data['uri']
except ObjectDoesNotExist as e:
return HttpResponse('<h1>NOT FOUND</h1>', status=status.HTTP_404_NOT_FOUND)
try:
if(os.path.exists(uri) and os.path.isfile(uri)):
with open(uri, 'rb') as f_stream:
image = f_stream.read()
return HttpResponse(image, status=status.HTTP_200_OK, content_type='image/gif')
else:
return HttpResponse('<h1>NOT FOUND</h1>', status=status.HTTP_404_NOT_FOUND)
except Exception as e:
return HttpResponse('exception %s occurred: %s' % (e.__class__.__name__, e),
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
@api_view(['POST']) @api_view(['POST'])
def insert_raw_rtsm_test(request): def insert_raw_rtsm_test(request):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment