import datetime import logging from collections import OrderedDict from math import ceil from django.conf import settings import coreapi import coreschema from django.db.models import Count from lofar.maintenance.monitoringdb.models.component_error import ComponentError from lofar.maintenance.monitoringdb.models.rtsm import RTSMErrorSummary, MODE_TO_COMPONENT, COMPONENT_TO_MODE from lofar.maintenance.monitoringdb.models.rtsm import RTSMObservation from lofar.maintenance.monitoringdb.models.station import Station from lofar.maintenance.monitoringdb.models.station_test import StationTest from rest_framework import status from rest_framework.reverse import reverse from rest_framework.response import Response from rest_framework.schemas import ManualSchema from rest_framework.views import APIView import pytz logger = logging.getLogger(__name__) def parse_date(date): expected_format = '%Y-%m-%d' try: parsed_date = datetime.datetime.strptime(date, expected_format) return pytz.utc.localize(parsed_date) except Exception as e: raise ValueError('cannot parse %s with format %s - %s' % (date, expected_format, e)) def parse_bool(boolean_value_str): boolean_value_str = boolean_value_str.lower() if boolean_value_str in ['t', 'true', '1']: return True elif boolean_value_str in ['f', 'false', '0']: return False else: raise ValueError('%s is neither true or false' % boolean_value_str) def parse_array(array_str): parsed_array = list(array_str.strip().lstrip('[').rstrip(']').split(',')) if len(parsed_array) == 1 and parsed_array[0] == '': return [] else: return parsed_array def _get_unique_error_types(): """ List the unique error types found in the database :return: the list containing the unique error types :rtype: list """ return [item['type'] for item in ComponentError.objects.values('type').distinct()] class ValidableReadOnlyView(APIView): """ Convenience APIView class to have the validation of the query parameters on a get http request """ # Override this to make the schema validation work fields = [] description = '' def compute_response(self): raise NotImplementedError() @property def schema(self): return ManualSchema(fields=self.fields, description=self.description) def validate_query_parameters(self, request): """ Validated the request parameters and stores them as fields :param request: the http request to the api call :type request: rest_framework.request.Request :raises ValueError: if the parameter is not valid :raises KeyError: if the requested parameter is missing """ for field in self.fields: if field.required and field.name not in request.query_params: raise KeyError('%s parameter is missing' % field.name) elif field.name not in request.query_params: continue else: value = self.request.query_params.get(field.name) if field.type: self.__setattr__(field.name, field.type(value)) else: self.__setattr__(field.name, value) errors = field.schema.validate(self.__getattribute__(field.name)) for error in errors: raise ValueError(" ".join([field.name, error.text])) def get(self, request): # Store the request as attribute self.request = request try: self.validate_query_parameters(request) except ValueError as e: return Response(status=status.HTTP_406_NOT_ACCEPTABLE, data='Please specify the correct parameters: %s' % (e,)) except KeyError as e: return Response(status=status.HTTP_406_NOT_ACCEPTABLE, data='Please specify all the required parameters: %s' % (e,)) try: response = self.compute_response() except ValueError as e: return Response(status=status.HTTP_406_NOT_ACCEPTABLE, data='Please specify the correct parameters: %s' % (e,)) except Exception as e: logger.exception(e) return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR, data='exception occurred: %s' % e) return response class ControllerStationOverview(ValidableReadOnlyView): description = "Overview of the latest tests performed on the stations" station_group = 'A' errors_only = 'true' n_station_tests = 4 n_rtsm = 4 error_types = [] fields = [ coreapi.Field( "station_group", required=False, location='query', schema=coreschema.Enum(['C', 'R', 'I', 'A'], description= 'Station group to select for choices are [C|R|I|ALL]', ) ), coreapi.Field( "n_station_tests", required=False, location='query', type=int, schema=coreschema.Integer(description='number of station tests to select', minimum=1) ), coreapi.Field( "n_rtsm", required=False, location='query', type=int, schema=coreschema.Integer(description='number of station tests to select', minimum=1) ), coreapi.Field( "errors_only", required=False, location='query', type=parse_bool, schema=coreschema.Boolean( description='displays or not only the station with more than one error') ), coreapi.Field( "error_types", required=False, location='query', type=parse_array, schema=coreschema.Array(description='select the error types to filter for', items=coreschema.Enum(_get_unique_error_types()), unique_items=True) ) ] def compute_response(self): station_entities = Station.objects.all() for group in self.station_group: if group is not 'A': station_entities = station_entities.filter(type=group) # Since django preferes a ordered dict over a dict we make it happy... for now response_payload = list() for station_entity in station_entities: station_payload = OrderedDict() station_payload['station_name'] = station_entity.name station_test_list = StationTest.objects.filter( station__name=station_entity.name).order_by('-end_datetime')[:self.n_station_tests] rtsm_list = RTSMObservation.objects.filter( station__name=station_entity.name).order_by('-end_datetime')[:self.n_rtsm] station_payload['station_tests'] = list() for station_test in station_test_list: station_test_payload = OrderedDict() component_errors = station_test.component_errors selected_component_errors = component_errors if(self.error_types): selected_component_errors = component_errors.filter(type__in=self.error_types) station_test_payload[ 'total_component_errors'] = selected_component_errors.count() station_test_payload['start_datetime'] = station_test.start_datetime station_test_payload['end_datetime'] = station_test.end_datetime station_test_payload['checks'] = station_test.checks component_errors_summary = selected_component_errors. \ values('component__type', 'type').annotate( total=Count('type')).order_by('-total') component_errors_summary_dict = OrderedDict() for item in component_errors_summary: item_component_type = item['component__type'] item_error_type = item['type'] item_error_total = item['total'] if item_component_type not in component_errors_summary_dict: component_errors_summary_dict[item_component_type] = OrderedDict() component_errors_summary_dict[item_component_type][item_error_type] = \ item_error_total station_test_payload['component_error_summary'] = component_errors_summary_dict station_payload['station_tests'].append(station_test_payload) station_payload['rtsm'] = list() for rtsm in rtsm_list: rtsm_payload = OrderedDict() rtsm_payload['observation_id'] = rtsm.observation_id rtsm_payload['start_datetime'] = rtsm.start_datetime rtsm_payload['end_datetime'] = rtsm.end_datetime unique_modes = [item['mode'] for item in rtsm.errors_summary.values('mode').distinct()] rtsm_payload['mode'] = unique_modes selected_rtsm_errors = rtsm.errors_summary if(self.error_types): selected_rtsm_errors = rtsm.errors_summary.filter(error_type__in=self.error_types) rtsm_payload['total_component_errors'] = selected_rtsm_errors.count() errors_summary = OrderedDict() errors_summary_query = selected_rtsm_errors.values('error_type').annotate( total=Count('error_type')) for error_summary in errors_summary_query: errors_summary[error_summary['error_type']] = error_summary['total'] rtsm_payload['error_summary'] = errors_summary station_payload['rtsm'].append(rtsm_payload) response_payload.append(station_payload) if self.errors_only: response_payload = filter( lambda station_entry: len(station_entry['station_tests']) + len(station_entry['rtsm']) > 0, response_payload) response_payload = sorted(response_payload, key=lambda item: item['station_name']) return Response(status=status.HTTP_200_OK, data=response_payload) class ControllerStationTestsSummary(ValidableReadOnlyView): description = "Overview of the latest station tests performed on" \ " the stations a [loopback_time] days ago from now" station_group = 'A' errors_only = 'true' lookback_time = 7 error_types = [] fields = [ coreapi.Field( "station_group", required=False, location='query', schema=coreschema.Enum(['C', 'R', 'I', 'A'], description='Station group to select for choices are [C|R|I|ALL]') ), coreapi.Field( "errors_only", required=False, location='query', type=parse_bool, schema=coreschema.Boolean( description='displays or not only the station with more than one error') ), coreapi.Field( "lookback_time", required=False, type=int, location='query', schema=coreschema.Integer(description='number of days from now (default 7)', minimum=1) ), coreapi.Field( "error_types", required=False, location='query', type=parse_array, schema=coreschema.Array(description='select the error types to filter for', items=coreschema.Enum(_get_unique_error_types()), unique_items=True) ) ] def compute_response(self): self.lookback_time = datetime.timedelta(days=self.lookback_time) station_test_list = StationTest.objects \ .filter(start_datetime__gte=datetime.date.today() - self.lookback_time) \ .order_by('-start_datetime', 'station__name') for group in self.station_group: if group is not 'A': station_test_list = station_test_list.filter(station__type=group) # Since django preferes a ordered dict over a dict we make it happy... for now response_payload = list() for station_test in station_test_list: station_test_payload = OrderedDict() station_test_payload['station_name'] = station_test.station.name selected_component_errors = station_test.component_errors if(self.error_types): selected_component_errors = selected_component_errors.filter(type__in=self.error_types) station_test_payload[ 'total_component_errors'] = selected_component_errors.count() station_test_payload['date'] = station_test.start_datetime.strftime('%Y-%m-%d') station_test_payload['start_datetime'] = station_test.start_datetime station_test_payload['end_datetime'] = station_test.end_datetime station_test_payload['checks'] = station_test.checks component_errors_summary = selected_component_errors. \ values('component__type', 'type').annotate( total=Count('type')).order_by('-total') component_errors_summary_dict = OrderedDict() for item in component_errors_summary: item_component_type = item['component__type'] item_error_type = item['type'] item_error_total = item['total'] if item_component_type not in component_errors_summary_dict: component_errors_summary_dict[item_component_type] = OrderedDict() component_errors_summary_dict[item_component_type][item_error_type] = \ item_error_total station_test_payload['component_error_summary'] = component_errors_summary_dict response_payload.append(station_test_payload) if self.errors_only: response_payload = filter( lambda station_test_entry: station_test_entry['total_component_errors'] > 0, response_payload) return Response(status=status.HTTP_200_OK, data=response_payload) class ControllerLatestObservations(ValidableReadOnlyView): description = "Overview of the latest observations performed on the stations" station_group = 'A' errors_only = 'true' error_types = [] fields = [ coreapi.Field( "station_group", required=False, location='query', schema=coreschema.Enum(['C', 'R', 'I', 'A'], description= 'Station group to select for choices are [C|R|I|A]', ) ), coreapi.Field( "errors_only", required=False, location='query', type=parse_bool, schema=coreschema.Boolean( description='displays or not only the station with more than one error') ), coreapi.Field( "from_date", required=True, location='query', schema=coreschema.String( description='select rtsm from date (ex. YYYY-MM-DD)') ), coreapi.Field( "error_types", required=False, location='query', type=parse_array, schema=coreschema.Array(description='select the error types to filter for', items=coreschema.Enum(_get_unique_error_types()), unique_items=True) ) ] def compute_response(self): self.from_date = parse_date(self.from_date) filtered_entities = RTSMObservation.objects \ .filter(start_datetime__gte=self.from_date) if self.station_group != 'A': filtered_entities = filtered_entities \ .filter(station__type=self.station_group) if self.errors_only: filtered_entities = filtered_entities.exclude(errors_summary__isnull=True) errors_summary = filtered_entities \ .values('observation_id', 'station__name', 'start_datetime', 'end_datetime', 'errors_summary__error_type', 'errors_summary__mode') \ .annotate(total=Count('errors_summary__error_type')) \ .order_by('observation_id', 'station__name') if self.error_types: errors_summary = errors_summary.filter(errors_summary__error_type__in=self.error_types) response = dict() for error_summary in errors_summary: observation_id = error_summary['observation_id'] station_name = error_summary['station__name'] start_datetime = error_summary['start_datetime'] end_datetime = error_summary['end_datetime'] mode = error_summary['errors_summary__mode'] error_type = error_summary['errors_summary__error_type'] total = error_summary['total'] if observation_id not in response: response[observation_id] = OrderedDict() response[observation_id]['observation_id'] = observation_id response[observation_id]['start_datetime'] = start_datetime response[observation_id]['end_datetime'] = end_datetime response[observation_id]['total_component_errors'] = 0 response[observation_id]['mode'] = list() response[observation_id]['station_involved'] = dict() if total == 0: continue response[observation_id]['total_component_errors'] += total station_involved_summary = response[observation_id]['station_involved'] response[observation_id]['mode'] += [mode] \ if mode not in response[observation_id]['mode'] else [] if station_name not in station_involved_summary: station_involved_summary[station_name] = OrderedDict() station_involved_summary[station_name]['station_name'] = station_name station_involved_summary[station_name]['n_errors'] = 0 station_involved_summary[station_name]['component_error_summary'] = OrderedDict() station_involved_summary[station_name]['n_errors'] += total station_involved_summary[station_name]['component_error_summary'][error_type] = total response_payload = sorted(response.values(), key=lambda item: item['start_datetime'], reverse=True) return Response(status=status.HTTP_200_OK, data=response_payload) class ControllerStationTestStatistics(ValidableReadOnlyView): description = "Statistical summary of both or either the station test and RTSM" station_group = 'A' test_type = 'B' error_types = [] from_date = None to_date = None averaging_interval = None fields = [ coreapi.Field( "test_type", required=False, location='query', schema=coreschema.Enum(['R', 'S', 'B'], description='select the type of test possible values are (R, RTSM),' ' (S, Station test), (B, both)[DEFAULT=B]', ) ), coreapi.Field( "station_group", required=False, location='query', schema=coreschema.Enum(['C', 'R', 'I', 'A'], description= 'Station group to select for choices are [C|R|I|ALL]', ) ), coreapi.Field( "from_date", required=True, location='query', schema=coreschema.String( description='select tests from date (ex. YYYY-MM-DD)') ), coreapi.Field( "to_date", required=True, location='query', schema=coreschema.String( description='select tests to date (ex. YYYY-MM-DD)') ), coreapi.Field( "averaging_interval", required=True, location='query', type=int, schema=coreschema.Integer( description='averaging interval in days') ), coreapi.Field( "error_types", required=False, location='query', type=parse_array, schema=coreschema.Array(description='select the error types to filter for', items=coreschema.Enum(_get_unique_error_types()), unique_items=True) ) ] def compute_errors_per_station(self, from_date, to_date, central_time, station_group, test_type): component_errors = ComponentError.objects.all() rtsm_summary_errors = RTSMErrorSummary.objects.all() if station_group: component_errors = component_errors.filter(station_test__station__type=station_group) rtsm_summary_errors = rtsm_summary_errors.filter( observation__station__type=station_group) if self.error_types: component_errors = component_errors.filter(type__in=self.error_types) rtsm_summary_errors = rtsm_summary_errors.filter(error_type__in=self.error_types) station_test_results = [] rtsm_results = [] if test_type in ['S', 'B']: station_test_results = component_errors. \ filter(station_test__start_datetime__gt=from_date, station_test__start_datetime__lt=to_date). \ values('station_test__station__name'). \ annotate(n_errors=Count('station_test__station__name')) if test_type in ['R', 'B']: rtsm_results = rtsm_summary_errors. \ filter(observation__start_datetime__gt=from_date, observation__start_datetime__lt=to_date). \ values('observation__station__name'). \ annotate(n_errors=Count('observation__station__name')) errors_per_station_in_bin = dict() central_time_str = central_time.strftime('%Y-%m-%d') if test_type in ['S', 'B']: for result in station_test_results: station_name = result['station_test__station__name'] errors_per_station_in_bin[station_name] = dict(station_name=station_name, n_errors=result['n_errors'], time=central_time_str) if test_type in ['R', 'B']: for result in rtsm_results: station_name = result['observation__station__name'] if station_name not in errors_per_station_in_bin: errors_per_station_in_bin[station_name] = dict(station_name=station_name, n_errors=result['n_errors'], time=central_time_str) else: errors_per_station_in_bin[station_name]['n_errors'] += result['n_errors'] return errors_per_station_in_bin.values() def compute_errors_per_type(self, from_date, to_date, central_time, station_group, test_type): component_errors = ComponentError.objects.all() rtsm_summary_errors = RTSMErrorSummary.objects.all() station_test_results = [] rtsm_results = [] central_time_str = central_time.strftime('%Y-%m-%d') if station_group: component_errors = component_errors.filter(station_test__station__type=station_group) rtsm_summary_errors = rtsm_summary_errors.filter( observation__station__type=station_group) if self.error_types: component_errors = component_errors.filter(type__in=self.error_types) rtsm_summary_errors = rtsm_summary_errors.filter(error_type__in=self.error_types) if test_type in ['S', 'B']: station_test_results = component_errors. \ filter(station_test__start_datetime__gt=from_date, station_test__start_datetime__lt=to_date). \ values('type'). \ annotate(n_errors=Count('type')) if test_type in ['R', 'B']: rtsm_results = rtsm_summary_errors. \ filter(observation__start_datetime__gt=from_date, observation__start_datetime__lt=to_date). \ values('error_type'). \ annotate(n_errors=Count('error_type')) errors_per_error_type_in_bin = dict() if test_type in ['S', 'B']: for result in station_test_results: error_type = result['type'] errors_per_error_type_in_bin[error_type] = dict(error_type=error_type, n_errors=result['n_errors'], time=central_time_str) if test_type in ['R', 'B']: for result in rtsm_results: error_type = result['error_type'] if error_type not in errors_per_error_type_in_bin: errors_per_error_type_in_bin[error_type] = dict(error_type=error_type, n_errors=result['n_errors'], time=central_time_str) else: errors_per_error_type_in_bin[error_type]['n_errors'] += result['n_errors'] return errors_per_error_type_in_bin.values() def compute_response(self): from_date = parse_date(self.from_date) to_date = parse_date(self.to_date) averaging_interval = datetime.timedelta(days=self.averaging_interval) response_payload = OrderedDict() response_payload['start_date'] = from_date response_payload['end_date'] = to_date response_payload['averaging_interval'] = averaging_interval errors_per_station = [] errors_per_type = [] n_bins = int(ceil((to_date - from_date) / averaging_interval)) for i in range(n_bins): if self.station_group is 'A': station_group = None else: station_group = self.station_group errors_per_station += self.compute_errors_per_station( from_date=from_date + i * averaging_interval, to_date=from_date + (i + 1) * averaging_interval, central_time=from_date + (i + .5) * averaging_interval, station_group=station_group, test_type=self.test_type) errors_per_type += self.compute_errors_per_type( from_date=from_date + i * averaging_interval, to_date=from_date + (i + 1) * averaging_interval, central_time=from_date + (i + .5) * averaging_interval, station_group=station_group, test_type=self.test_type) response_payload['errors_per_station'] = errors_per_station response_payload['errors_per_type'] = errors_per_type return Response(status=status.HTTP_200_OK, data=response_payload) class ControllerAllComponentErrorTypes(ValidableReadOnlyView): description = "Lists all the presents component error types" def compute_response(self): data = [item['type'] for item in ComponentError.objects.values('type').distinct()] return Response(status=status.HTTP_200_OK, data=data) class ControllerStationComponentErrors(ValidableReadOnlyView): description = "Provides a summary per station of the component errors" # required parameters station_name = None from_date = None to_date = None # optional parameters test_type = 'B' error_types = [] fields = [ coreapi.Field( 'station_name', required=True, location='query', schema=coreschema.String(description='name of the station to select') ), coreapi.Field( 'from_date', required=True, location='query', schema=coreschema.String(description='select tests from date (ex. YYYY-MM-DD)') ), coreapi.Field( 'to_date', required=True, location='query', schema=coreschema.String(description='select tests from date (ex. YYYY-MM-DD)') ), coreapi.Field( 'test_type', required=False, location='query', schema=coreschema.Enum( ['R', 'S', 'B'], description='select the type of test possible values are (R, RTSM),' ' (S, Station test), (B, both)[DEFAULT=B]', ) ), coreapi.Field( "error_types", required=False, location='query', type=parse_array, schema=coreschema.Array(description='select the error types to filter for', items=coreschema.Enum(_get_unique_error_types()), unique_items=True) ) ] def collect_station_test_errors(self): station_entry = Station.objects.filter(name=self.station_name).first() response_payload = OrderedDict() station_tests = station_entry.stationtest_set \ .filter(start_datetime__gte=self.from_date, end_datetime__lte=self.to_date) failing_component_types = station_tests.distinct('component_errors__component__type').\ exclude(component_errors__component__type__isnull=True).\ values_list('component_errors__component__type') for failing_component_type in failing_component_types: failing_component_type = failing_component_type[0] component_type_errors_list = list() response_payload[failing_component_type] = component_type_errors_list for station_test in station_tests.order_by('-start_datetime'): test_summary = OrderedDict() test_summary['test_type'] = 'S' test_summary['start_date'] = station_test.start_datetime test_summary['end_date'] = station_test.end_datetime component_errors_dict = OrderedDict() test_summary['component_errors'] = component_errors_dict component_errors = station_test.component_errors\ .filter(component__type=failing_component_type) if self.error_types: component_errors = component_errors.filter(type__in=self.error_types) for component_error in component_errors: component_id = component_error.component.component_id error_type = component_error.type details = component_error.details if component_id not in component_errors: component_errors_dict[str(component_id)] = list() component_errors_dict[str(component_id)] += [dict(error_type=error_type, details=details)] component_type_errors_list.append(test_summary) return response_payload def collect_rtsm_errors(self): station_entry = Station.objects.filter(name=self.station_name).first() response_payload = OrderedDict() rtsm_observations = station_entry.rtsmobservation_set \ .filter(start_datetime__gte=self.from_date, end_datetime__lte=self.to_date) failing_component_modes = rtsm_observations.exclude(errors_summary__isnull=True).distinct( 'errors_summary__mode').values_list('errors_summary__mode') for observing_mode in failing_component_modes: observing_mode = observing_mode[0] rtsm_errors_per_component_type = list() for rtsm_observation in rtsm_observations.order_by('-start_datetime'): rtsm_summary = OrderedDict() rtsm_summary['test_type'] = 'R' rtsm_summary['start_date'] = rtsm_observation.start_datetime rtsm_summary['end_date'] = rtsm_observation.end_datetime rtsm_summary['observation_id'] = rtsm_observation.observation_id component_errors_dict = OrderedDict() rtsm_summary['component_errors'] = component_errors_dict component_errors = rtsm_observation.errors_summary\ .filter(mode=observing_mode)\ .values('error_type', 'start_frequency', 'stop_frequency', 'percentage', 'error_type', 'count', 'rcu', 'pk') if component_errors.count() == 0 and rtsm_observation.errors_summary.count() !=0: continue else: rtsm_errors_per_component_type.append(rtsm_summary) if self.error_types: component_errors = component_errors.filter(error_type__in=self.error_types) for component_error in component_errors: component_id = component_error['rcu'] details = dict(percentage = component_error['percentage'], start_frequency = component_error['start_frequency'], stop_frequency = component_error['stop_frequency'], count = component_error['count']) error_type = component_error['error_type'] # CHECKS IF THE ERROR IS PRESENT IN BOTH RCUS (hence, both polarizations of the antenna) url_to_plot = reverse('rtsm-summary-plot-detail', (component_error['pk'],), request=self.request) details['url'] = url_to_plot if component_id not in component_errors: component_errors_dict[str(component_id)] = list() component_errors_dict[str(component_id)] += [dict(error_type=error_type, details=details)] component_name = MODE_TO_COMPONENT[observing_mode] if component_name not in response_payload: response_payload[component_name] = rtsm_errors_per_component_type else: response_payload[component_name] += rtsm_errors_per_component_type return response_payload def compute_response(self): self.from_date = parse_date(self.from_date) self.to_date = parse_date(self.to_date) station_test_errors = {} rtsm_errors = {} if self.test_type in ['S', 'B']: station_test_errors = self.collect_station_test_errors() if self.test_type in ['R', 'B']: rtsm_errors = self.collect_rtsm_errors() payload = OrderedDict() for component_type in set(rtsm_errors.keys() | station_test_errors.keys()): station_test_errors_per_type = station_test_errors.get(component_type, []) rtsm_errors_per_type = rtsm_errors.get(component_type, []) payload[component_type] = sorted(station_test_errors_per_type + rtsm_errors_per_type, key=lambda item: item['start_date'], reverse=True) return Response(status=status.HTTP_200_OK, data=payload) class ControllerStationComponentElementErrors(ValidableReadOnlyView): station_name= None # required from_date= None # required to_date= None # required component_type= None # required antenna_id= None # required test_type= "A" fields = [ coreapi.Field( 'station_name', required=True, location='query', schema=coreschema.String(description='name of the station to select') ), coreapi.Field( 'from_date', required=True, location='query', schema=coreschema.String(description='select tests from date (ex. YYYY-MM-DD)') ), coreapi.Field( 'to_date', required=True, location='query', schema=coreschema.String(description='select tests from date (ex. YYYY-MM-DD)') ), coreapi.Field( 'component_type', required=True, location='query', schema=coreschema.Enum( ['HBA', 'LBH', 'LBL'], description='select the antenna type. Possible values are (HBA, LBH, LBL)' ) ), coreapi.Field( 'antenna_id', required=True, location='query', type=int, schema=coreschema.Integer(description='Select the antenna id') ), coreapi.Field( 'test_type', required=False, location='query', schema=coreschema.Enum( ['R', 'S', 'A'], description='select the type of test possible values are (R, RTSM),' ' (S, Station test), (A, all)[DEFAULT=A]', ) ) ] def rcu_from_antenna_type_polarization(self, antenna_id, type, polarization): """ Compute the rcu number for a given antenna number, type and polarization :param antenna_id: id of the antenna :param type: type of the antenna :param polarization: polarization either [X, Y] :return: the rcu id :rtype: int """ if polarization not in ['X', 'Y']: raise ValueError('Polarization has to be either X or Y: %s not recognized' % polarization) if type == 'LBH': rcu_id = antenna_id * 2 rcu_id += 0 if polarization == 'X' else 1 elif type == 'LBL': rcu_id = (antenna_id - 48) * 2 rcu_id += 1 if polarization == 'X' else 0 elif type == 'HBA': rcu_id = antenna_id * 2 rcu_id += 0 if polarization == 'X' else 1 else: rcu_id = -1 return rcu_id def rcus_from_antenna_and_type(self, antenna_id, type): rcu_x = self.rcu_from_antenna_type_polarization(antenna_id, type, 'X') rcu_y = self.rcu_from_antenna_type_polarization(antenna_id, type, 'Y') rcus={rcu_x:'X', rcu_y:'Y'} return rcus def compute_ok_rtsm_list(self): mode=COMPONENT_TO_MODE[self.component_type] rcus_per_polarization = self.rcus_from_antenna_and_type(self.antenna_id, self.component_type) good_observation_list = RTSMObservation.objects.filter(errors_summary__mode__in=mode, start_datetime__gt=self.from_date, end_datetime__lt=self.to_date, station__name=self.station_name).\ exclude(errors_summary__rcu__in=list(rcus_per_polarization.keys())).\ order_by('-start_datetime').values('pk', 'observation_id', 'start_datetime', 'end_datetime') result = [] for observation in good_observation_list: entry = dict(test_id=observation['observation_id'], db_id=observation['pk'], start_date=observation['start_datetime'], end_date=observation['end_datetime'], test_type='R', component_errors=dict()) result.append(entry) return result def compute_ok_station_test(self): good_station_test = StationTest.objects.filter(start_datetime__gt=self.from_date, end_datetime__lt=self.to_date, station__name=self.station_name).\ exclude(component_errors__component__component_id=self.antenna_id).\ values('pk', 'start_datetime', 'end_datetime').order_by('-start_datetime') result = [] for station_test in good_station_test: entry = dict(test_id=station_test['pk'], db_id=station_test['pk'], start_date=station_test['start_datetime'], end_date=station_test['end_datetime'], test_type='S', component_errors=dict()) result.append(entry) return result def compute_rtsm_errors_list(self): errors = dict() rcus_per_polarization = self.rcus_from_antenna_and_type(self.antenna_id, self.component_type) rtsm_errors = RTSMErrorSummary.objects.values('observation__pk', 'observation__start_datetime', 'observation__end_datetime', 'observation__observation_id', 'observation__station__name', 'pk', 'rcu', 'mode', 'error_type', 'percentage', 'count', 'observation__samples').filter( observation__start_datetime__gt=self.from_date, observation__end_datetime__lt=self.to_date, observation__station__name=self.station_name, mode__in=COMPONENT_TO_MODE[self.component_type], rcu__in=list(rcus_per_polarization.keys())).order_by('-observation__start_datetime') for item in rtsm_errors: observation_pk = item['observation__pk'] if observation_pk not in errors.keys(): errors[observation_pk] = dict(test_id=item['observation__observation_id'], db_id=item['observation__pk'], start_date=item[ 'observation__start_datetime'], end_date=item[ 'observation__end_datetime'], test_type='R', component_errors=dict()) rcu = item['rcu'] polarization = rcus_per_polarization[rcu] if polarization not in errors[observation_pk]: errors[observation_pk]['component_errors'][polarization] = dict( rcu=rcu, errors = dict() ) error_type = item['error_type'] percentage = item['percentage'] count = item['count'] mode = item['mode'] samples = item['observation__samples'] url_to_plot = reverse('rtsm-summary-plot-detail', (item['pk'],), request=self.request) errors[observation_pk]['component_errors'][polarization]['errors'][error_type] = dict(samples=samples, percentage=percentage, count=count, mode=mode, url=url_to_plot) return list(errors.values()) def compute_station_tests_error_list(self): errors = dict() rcus_per_polarization = self.rcus_from_antenna_and_type(self.antenna_id, self.component_type) component_errors = ComponentError.objects.filter( station_test__start_datetime__gt=self.from_date, station_test__end_datetime__lt=self.to_date, station_test__station__name=self.station_name, component__type=self.component_type, component__component_id=self.antenna_id).order_by('-station_test__start_datetime') for component_error in component_errors: station_test_pk = component_error.station_test.pk if station_test_pk not in errors.keys(): errors[station_test_pk] = dict(test_id=station_test_pk, db_id=station_test_pk, start_date=component_error.station_test.start_datetime, end_date=component_error.station_test.end_datetime, test_type='S', component_errors=dict()) for rcu_id, polarization in rcus_per_polarization.items(): if polarization not in errors[station_test_pk]: errors[station_test_pk]['component_errors'][polarization] = dict( rcu=rcu_id, errors=dict()) error_type = component_error.type details = component_error.details errors_per_error_polarization = errors[station_test_pk]['component_errors'][polarization]['errors'] errors_per_error_polarization[error_type] = dict(details=details, element_errors=dict()) for element in component_error.failing_elements.values('element__element_id', 'details'): element_id = element['element__element_id'] errors_per_error_polarization[error_type]['element_errors'][element_id] = element['details'] return list(errors.values()) def compute_response(self): self.from_date = parse_date(self.from_date) self.to_date = parse_date(self.to_date) rtsm_errors_list = [] station_test_list = [] if self.test_type == 'R' or self.test_type == 'A': rtsm_errors_list = self.compute_rtsm_errors_list() + self.compute_ok_rtsm_list() if self.test_type == 'S' or self.test_type == 'A': station_test_list = self.compute_station_tests_error_list() + self.compute_ok_station_test() combined = rtsm_errors_list + station_test_list combined_and_sorted = sorted(combined, key=lambda test: test['start_date'], reverse=True) return Response(status=status.HTTP_200_OK, data=dict(errors=combined_and_sorted))