Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
station_test_raw_parser.py 11.96 KiB
"""
This modules contains all the function needed to parse a raw station test output into the models used to describe it
"""
import logging
from datetime import datetime

import pytz
import re
from collections import defaultdict

logger = logging.getLogger('station_test.parser')


def parse_key_value_pairs(content):
    """
    Parse a key value pair returning a dict.
    es 'key=value' -> {'key': 'value'}
    :param content: the string to parse
    :return: dict
    """
    assert '=' in content
    key, value = content.split('=')
    if re.match('\A\d*\.\d*', key):
        key = 'v' + key.strip('V')

    if re.match('\A\w*-\d*.\d*', key):
        key = key.replace('-', '')

    key = re.sub('([.-])', '_', key).lower()

    return {key: value}


def parse_datetime(date, date_time):
    """
    If the datetime is only the time of the current date it used the data
    to generate the date time
    :param date:
    :param date_time:
    :return:
    """
    if 'T' in date_time:
        try:
            return pytz.utc.localize(datetime.strptime(date_time, '%Y-%m-%dT%H:%M:%S'))
        except ValueError:
            return pytz.utc.localize(datetime.strptime(date_time, '%d-%m-%YT%H:%M:%S'))
    else:
        try:
            return pytz.utc.localize(
                datetime.strptime("T".join([date, date_time]), '%Y%m%dT%H:%M:%S'))
        except ValueError:
            return pytz.utc.localize(
                datetime.strptime("T".join([date, date_time]), '%d-%m-%YT%H:%M:%S'))


def parse_raw_station_test(content):
    """
    Expects a string content with the station test output
    and output a list of Django Models
    :param content: string content with the station test output
    :return: a list of Django models
    """
    logger.debug('content splitted %s', content.split('\n'))
    station_tests = split_history_into_tests(content.split('\n'))
    logger.debug('station_tests_are %s', station_tests)
    results = []
    for stest in station_tests:
        dict_stest = dict_from_raw_station_test(stest)
        logger.debug('parsed test %s', dict_stest)
        if dict_stest['station']['name'] == 'Unknown':
            logger.error('error in the station name for test %s', dict_stest)
            continue
        if 'start_datetime' in dict_stest:
            results.append(dict_stest)
        else:
            return None
    return results


def split_history_into_tests(content):
    all_tests = []
    current_test = []
    for i, line in enumerate(content[:-1]):
        next_line_columns = content[i + 1].split(',')
        line_type = next_line_columns[3]
        current_test.append(line)
        if 'VERSIONS' in line_type or (
                'STATION' in line_type and ('VERSIONS' not in current_test[0])):
            all_tests.append(current_test)
            current_test = []
    all_tests.append(current_test)
    return all_tests


def parse_component_id(component_id_str):
    """
    It parses the component_id string and returns a integer with the component
    if the string is != --- or None otherwise
    :param component_id_str: string representation of the component id as in the stationtest output
    :return: the component id integer
    """
    return int(component_id_str) if component_id_str != '---' else -1


def collect_error_details(content):
    """
    Merge the error_details list of dicts into a unique dict
    :param content: row describing the component error in a stationtest output
    :return: a dict with the merged content of the list of dict
    """
    error_details = dict()
    for k in content[4:]:
        if isinstance(k, dict):
            error_details.update(k)
        else:
            error_details.update({k: True})

    # Parse RF_FAIL string
    if 'x' in error_details:
        rf_fail = parse_rffail_string('x', error_details.pop('x'))
        error_details.update(rf_fail)

    if 'y' in error_details:
        rf_fail = parse_rffail_string('y', error_details.pop('y'))
        error_details.update(rf_fail)

    element_errors = []
    for error_name, error_value in dict(error_details).items():
        modem_comunication_error_pattern = 'e([0-9]{2})'
        matched_key = re.search(modem_comunication_error_pattern, error_name)

        if matched_key:
            error_details.pop(error_name)
            element_id = int(matched_key.group(1))
            element = dict(element_id=element_id)

            element_error_details = dict(error_code=error_value)
            element_error = dict(element=element,
                                 type='MODEM',
                                 details=element_error_details)
            element_errors.append(element_error)

        nosignal_error_pattern = 'E([0-9]{2})([XY])'
        matched_key = re.search(nosignal_error_pattern, error_name)

        if matched_key:
            error_details.pop(error_name)
            element_id = int(matched_key.group(1))
            polarization = matched_key.group(2).lower()
            element = dict(element_id=element_id)

            element_error_details = {polarization: error_value}
            element_error = dict(element=element,
                                 type='NOSIGNAL',
                                 details=element_error_details)
            element_errors.append(element_error)

        if error_name in ['X', 'Y']:
            error_details.pop(error_name)
            error_name = error_name.lower()
            error_details[error_name] = error_value
    error_details.update(element_errors=element_errors)

    return error_details


def dict_from_component_error(content):
    """
    Parse the component error into a dict
    :param content: row representing the component error in the station test output
    :return:
    """
    component_type, component_id, error_type = content[1:4]
    error_details = collect_error_details(content)

    component = dict(component_id=parse_component_id(component_id), type=component_type.strip())

    element_errors = error_details.pop('element_errors')

    component_error = dict(component=component,
                           details=error_details,
                           type=error_type.strip())

    for element_error in element_errors:
        element_error['component_error'] = dict(component_error)

    component_error['element_errors'] = element_errors
    return component_error


def dict_from_raw_station_test(content):
    """
    Expects a string content with the station test output
    and output a dict that describes the content of the station test
    :param content: string content with the station test output
    :return: a station test dict
    """

    preparsed_content = []
    for line in content:
        if "CHECKS" in line:
            values = [value for value in line.split(',')]
        else:
            values = [parse_key_value_pairs(value) if ('=' in value) else value for value in
                      line.split(',')]

        preparsed_content.append(values)
    result = {'component_errors': [], 'element_errors': []}
    for row in preparsed_content:
        row_type = row[3]
        if row_type == "STATION":
            station_name = row[4]['name']
            station_type = station_name[0].capitalize()
            station_type = station_type if station_type in ['C', 'R'] else 'I'
            result.update(station=dict(name=station_name, type=station_type))
        elif row_type == 'VERSIONS':
            pass
        elif row_type == 'BADLIST':
            pass
        elif row_type == 'TOOLOW':
            pass
        elif row_type == "RUNTIME":
            start_time = parse_datetime(row[0], row[4]['start'])
            end_time = parse_datetime(row[0], row[5]['stop'])

            result.update(start_datetime=start_time)
            result.update(end_datetime=end_time)
        elif row_type == 'CHECKS':
            result.update(checks=" ".join(row[4:]))
        elif row_type == 'TESTSIGNAL':
            pass
        elif row_type == 'STATISTICS':
            pass
        elif row_type == 'DRIVER':
            pass
        elif row_type == 'E_FAIL':
            result['element_errors'].extend(dicts_from_element_error(row))
        else:
            component_error_dict = dict_from_component_error(row)
            element_errors = component_error_dict.pop('element_errors')
            result['element_errors'].extend(element_errors)

            result['component_errors'].append(component_error_dict)

    return result


element_error_name_mapping = {
    'm': 'MODEM',
    'o': 'OSCILLATION',
    'sp': 'SPURIOUS',
    'hn': 'HIGH_NOISE',
    'ln': 'LOW_NOISE',
    'j': 'JITTER',
    '': 'RF_FAIL'
}


def parse_rffail_string(polarization, rf_string):
    """
    Parse the string for the rffail test into a dict
    :param polarization: polarization to which the test (either ['x'|'y'])
    :param rf_string: content
    :return: a dict that represent the rf fail test outcome
    """
    parameters = list(map(float, rf_string.replace('nan', '999').
                          replace('-1', '999').
                          split(' ')))
    HBA_RF_FAIL_NPARAMETERS = 6

    LBA_RF_FAIL_NPARAMETERS = 1

    if HBA_RF_FAIL_NPARAMETERS == len(parameters):
        measured_signal_nodelay, measured_signal_fulldelay, subband_used_nodelay,\
        subband_used_fulldelay,\
        reference_signal_nodelay, reference_signal_fulldelay = map(float,
                                                                   rf_string.replace('nan', '999').
                                                                   replace('-1', '999').
                                                                   split(' '))
        return {
            polarization + 'val_full_delay': measured_signal_fulldelay,
            polarization + 'val_no_delay': measured_signal_nodelay,
            polarization + 'sb_full_delay': subband_used_fulldelay,
            polarization + 'sb_no_delay': subband_used_nodelay,
            polarization + 'ref_full_delay': reference_signal_fulldelay,
            polarization + 'ref_no_delay': reference_signal_nodelay}
    elif LBA_RF_FAIL_NPARAMETERS:
        return {polarization + 'val': parameters[0]}
    else:
        raise ValueError('String %s is not a RF_FAIL', rf_string)


def dicts_from_element_error(contents):
    """
    Parses the content into a dict that represents the tile error's data
    :param contents:
    :return:
    """
    results = defaultdict(dict)
    key_pattern = "([^xy0-9]{0,3})([x,y]{0,1})([0-9]{1,3})"
    component_type, component_id, error_type = contents[1:4]
    for tile_error in contents[4:]:
        key, args = tile_error.popitem()

        element_error_type, polarization, element_id = re.search(key_pattern, key).groups()
        component_error_type = element_error_name_mapping[element_error_type]
        component = dict(type=component_type,
                         component_id=int(component_id))
        component_error = dict(component=component,
                               type=component_error_type)
        element = dict(element_id=int(element_id))
        element_error = dict(component_error=component_error,
                             element=element,
                             type=component_error_type)
        element_error_details = dict()
        if element_error_type in ['hn', 'ln']:
            value, diff = map(float, args.strip().split(' '))
            element_error_details[polarization + 'val'] = value
            element_error_details[polarization + 'diff'] = diff
        elif 'j' in element_error_type:
            element_error_details[polarization + 'diff'] = float(args.strip())
        elif element_error_type == '':
            element_error_details.update(parse_rffail_string(polarization, args))
        elif element_error_type in ['o', 'sp']:
            element_error_details[polarization] = True
        element_error['details'] = element_error_details
        element_error_key = (component_id, element_id, element_error_type)
        if element_error_key in results:
            results[element_error_key]['details'].update(element_error_details)
        else:
            results[element_error_key] = dict(element_error)
    return list(results.values())