Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
station_test_raw_parser.py 18.31 KiB
"""
This module contains all the functions needed to parse a raw station test output into the models used to describe it
"""
from datetime import datetime
import re
import logging
import pytz

# Module-level logger; every record of this parser is emitted under the 'station_test.parser' name.
logger = logging.getLogger('station_test.parser')


def parse_key_value_pairs(content):
    """
    Parse a single 'key=value' token returning a one-item dict.
    e.g. 'key=value' -> {'key': 'value'}

    Only the first '=' separates the key from the value, so the value itself
    may contain further '=' characters: 'key=a=b' -> {'key': 'a=b'}.
    :param content: the string to parse
    :return: dict holding exactly one key/value pair
    :raises AssertionError: if content does not contain '='
    """
    key, separator, value = content.partition('=')
    if not separator:
        # Raised explicitly instead of through an ``assert`` statement so the check
        # is not stripped by ``python -O``; the exception type is kept unchanged.
        raise AssertionError("expected a 'key=value' token, got %r" % (content,))
    return {key: value}


def parse_datetime(date, date_time):
    """
    Build a UTC-aware datetime out of a time stamp.

    When date_time already contains a 'T' it is parsed on its own, first as
    '%Y-%m-%dT%H:%M:%S' and then as '%d-%m-%YT%H:%M:%S'. Otherwise it is taken as
    a bare time: it is joined to date with a 'T' and parsed first as
    '%Y%m%dT%H:%M:%S' and then as '%d-%m-%YT%H:%M:%S'.
    :param date: the date string, used only when date_time has no 'T'
    :param date_time: either a full date-time string or only the time
    :return: the parsed datetime localized to UTC
    :raises ValueError: if neither of the two candidate formats matches
    """
    if 'T' in date_time:
        stamp = date_time
        preferred_format = '%Y-%m-%dT%H:%M:%S'
    else:
        stamp = "T".join([date, date_time])
        preferred_format = '%Y%m%dT%H:%M:%S'

    try:
        naive = datetime.strptime(stamp, preferred_format)
    except ValueError:
        # both layouts share the same day-first fallback
        naive = datetime.strptime(stamp, '%d-%m-%YT%H:%M:%S')
    return pytz.utc.localize(naive)


def parse_raw_station_test(content):
    """
    Split the raw station test output into the single tests and parse each one.

    A test whose station name is 'Unknown' is logged as an error and skipped.
    The first test found without a 'start_time' aborts the parsing and makes
    the function return None.
    :param content: string content with the station test output
    :return: a list with the dict of every parsed station test, or None
    """
    lines = content.split('\n')
    logger.debug('content splitted %s', lines)
    raw_tests = split_history_into_tests(lines)
    logger.debug('station_tests_are %s', raw_tests)

    parsed_tests = []
    for raw_test in raw_tests:
        parsed = dict_from_raw_station_test(raw_test)
        logger.debug('parsed test %s', parsed)
        if parsed['station_name'] == 'Unknown':
            logger.error('error in the station name for test %s', parsed)
        elif 'start_time' in parsed:
            parsed_tests.append(parsed)
        else:
            return None
    return parsed_tests


def split_history_into_tests(content):
    """
    Group the lines of a station test history into one list of lines per test.

    A new test starts at a line whose fourth column contains 'VERSIONS', or at a
    line whose fourth column contains 'STATION' when the test collected so far
    does not itself start with a 'VERSIONS' line.

    Blank lines (such as the one produced by a trailing newline) are ignored and
    the last line of the input is kept as part of the last test.
    :param content: list of the lines of the station test output
    :return: list of tests, each one being the list of its lines
    """
    all_tests = []
    current_test = []
    for line in content:
        if not line.strip():
            continue
        columns = line.split(',')
        # a line with fewer than four columns can never open a new test
        line_type = columns[3] if len(columns) > 3 else ''
        if current_test and ('VERSIONS' in line_type or
                             ('STATION' in line_type and 'VERSIONS' not in current_test[0])):
            all_tests.append(current_test)
            current_test = []
        current_test.append(line)
    if current_test:
        all_tests.append(current_test)
    return all_tests


def dict_from_spu_component_error(content):
    """
    Build the dict describing an SPU component error from a pre-parsed row.

    For a 'VOLTAGE' error every key/value pair is stored as a float under
    'voltage_' followed by the first three characters of the key in lower case.
    The pairs are only read, the row handed in by the caller is left untouched.
    :param content: the row as a list; item 3 is the error type and the items
                    from index 4 on are the one-item dicts of the key=value pairs
    :return: dict with the 'resourcetype' and the parsed voltages, if any
    """
    error_type = content[3]
    result = dict()
    resourcetype = 'SPUComponentError'
    if error_type == 'VOLTAGE':
        resourcetype = 'SPUVoltageError'
        for arg in content[4:]:
            # items() instead of popitem(): do not empty the caller's dicts
            for key, value in arg.items():
                result['voltage_' + key[0:3].lower()] = float(value)
    result.update(resourcetype=resourcetype)
    return result


def dict_from_rsp_component_error(content):
    """
    Build the dict describing an RSP component error from a pre-parsed row.

    - 'VERSION': the key/value pairs are copied as they are
    - 'VOLTAGE': each value is stored as a float under 'voltage_' + key, where
      the dots of the key are replaced by '_' and the surrounding 'V' removed
    - 'TEMPERATURE': each value is stored as a float under the lower-cased key
    Any other error type only yields the generic resourcetype.
    The pairs are only read, the row handed in by the caller is left untouched.
    :param content: the row as a list; item 3 is the error type and the items
                    from index 4 on are the one-item dicts of the key=value pairs
    :return: dict with the 'resourcetype' and the parsed values, if any
    """
    error_type = content[3]
    extra_args = content[4:]
    result = dict()
    resourcetype = 'RSPComponentError'
    if error_type == 'VERSION':
        resourcetype = 'RSPVersionError'
        for arg in extra_args:
            result.update(arg)

    elif error_type == 'VOLTAGE':
        resourcetype = 'RSPVoltageError'
        for arg in extra_args:
            # items() instead of popitem(): do not empty the caller's dicts
            for key, value in arg.items():
                result['voltage_' + key.replace('.', '_').strip('V')] = float(value)

    elif error_type == 'TEMPERATURE':
        resourcetype = 'RSPTemperatureError'
        for arg in extra_args:
            for key, value in arg.items():
                result[key.lower()] = float(value)
    result.update(resourcetype=resourcetype)
    return result


def dict_from_tbb_component_error(content):
    """
    Build the dict describing a TBB component error from a pre-parsed row.

    - 'VERSION': the key/value pairs are copied as they are
    - 'VOLTAGE': each value is stored as a float under 'voltage_' + key, where
      the dots of the key are replaced by '_' and the surrounding 'V' removed
    - 'TEMPERATURE': each value is stored as a float under the lower-cased key
    Any other error type only yields the generic resourcetype.
    The pairs are only read, the row handed in by the caller is left untouched.
    :param content: the row as a list; item 3 is the error type and the items
                    from index 4 on are the one-item dicts of the key=value pairs
    :return: dict with the 'resourcetype' and the parsed values, if any
    """
    error_type = content[3]
    pairs = content[4:]
    result = dict()
    resourcetype = 'TBBComponentError'
    if error_type == 'VERSION':
        resourcetype = 'TBBVersionError'
        for pair in pairs:
            result.update(pair)

    elif error_type == 'VOLTAGE':
        resourcetype = 'TBBVoltageError'
        for pair in pairs:
            # the pair is read through items() so that the caller's dict is not emptied
            for name, reading in pair.items():
                result['voltage_' + name.replace('.', '_').strip('V')] = float(reading)

    elif error_type == 'TEMPERATURE':
        resourcetype = 'TBBTemperatureError'
        for pair in pairs:
            for name, reading in pair.items():
                result[name.lower()] = float(reading)
    result.update(resourcetype=resourcetype)
    return result


def dict_from_hba_component_error(content):
    """
    Build the dict describing an HBA component error from a pre-parsed row.

    The result always carries an empty 'tile_errors' list and the 'resourcetype'
    matching the error type found in item 3 of the row:

    - 'C_SUMMATOR' / 'P_SUMMATOR': 'type' is set to the first letter of the error type
    - 'MODEM': no extra field
    - 'RF_FAIL': every pair holds six space separated numbers ('nan' is read as 999),
      stored under the six RF field names suffixed with the polarization
    - 'SUMMATOR_NOISE', 'OSCILLATION', 'SPURIOUS': 'polarization<KEY>' is set to True
    - '*NOISE*': 'type' is 'H' or 'L' and every pair is stored as a float under its
      translated name suffixed with the polarization ('value' becomes 'peak_value')
    - '*JITTER*': as for the noise, with 'ref' read as 'val' and 'value'
      becoming 'average_value'

    The pairs are only read, the row handed in by the caller is left untouched.
    :param content: the row as a list; item 3 is the error type and the items
                    from index 4 on are the one-item dicts of the key=value pairs
    :return: dict describing the HBA component error
    :raises NotImplementedError: if the error type is not one of the above
    :raises ValueError: if an RF_FAIL pair does not hold exactly six numbers
    :raises Exception: if a noise key cannot be translated
    """
    error_type = content[3]
    extra_args = content[4:]
    result = dict(tile_errors=[])
    translate = dict(x='signalX', y='signalY', xoff='signalX_offset', yoff='signalY_offset',
                     xmean='mean_signalX', ymean='mean_signalY',
                     proc='percentage', val='value', diff='fluctuation', ref='limit')
    # error types whose pairs only flag the affected polarization
    polarization_flag_types = dict(SUMMATOR_NOISE='HBASummatorNoise',
                                   OSCILLATION='HBAOscillationError',
                                   SPURIOUS='HBASpuriousError')
    rf_fields = ('measured_signal_nodelay', 'measured_signal_fulldelay',
                 'subband_used_nodelay', 'subband_used_fulldelay',
                 'reference_signal_nodelay', 'reference_signal_fulldelay')

    if error_type in ['C_SUMMATOR', 'P_SUMMATOR']:
        resourcetype = 'HBASummatorError'
        result.update(type=error_type[0].capitalize())
    elif error_type == 'MODEM':
        resourcetype = 'HBAModemError'
    elif error_type == 'RF_FAIL':
        resourcetype = 'HBARFFailError'
        for arg in extra_args:
            for key, values in arg.items():
                polarization = key.strip().capitalize()
                measurements = [float(item) for item in values.replace('nan', '999').split()]
                if len(measurements) != len(rf_fields):
                    raise ValueError('expected %d RF values, got %r' % (len(rf_fields), values))
                # every field receives its own measurement, in the order of the raw values
                for field, measurement in zip(rf_fields, measurements):
                    result[field + polarization] = measurement
    elif error_type in polarization_flag_types:
        # must be checked before the generic 'NOISE' branch because of SUMMATOR_NOISE
        resourcetype = polarization_flag_types[error_type]
        for arg in extra_args:
            for key in arg:
                result['polarization' + key.strip().capitalize()] = True
    elif 'NOISE' in error_type:
        resourcetype = 'HBANoiseError'
        result.update(type='H' if 'HIGH' in error_type else 'L')
        for arg in extra_args:
            for key, value in arg.items():
                polarization = 'X' if 'X' in key else 'Y'
                name = key.strip('X').strip('Y')
                if name not in translate:
                    logger.error('cannot translate Noise HBA error in %s ', content)
                    raise Exception('cannot translate Noise HBA error in %s' % (content,))
                name = translate[name] + polarization
                if 'value' in name:
                    name = 'peak_' + name
                result[name] = float(value)
    elif 'JITTER' in error_type:
        resourcetype = 'HBAJitterError'
        for arg in extra_args:
            for key, value in arg.items():
                polarization = 'X' if 'X' in key else 'Y'
                name = key.strip('X').strip('Y')
                # the jitter reference is stored as the (average) value
                name = 'val' if name == 'ref' else name
                name = translate[name] + polarization
                if 'value' in name:
                    name = 'average_' + name
                result[name] = float(value)
    else:
        raise NotImplementedError('Cannot parse HBA component error %s' % (content,))
    result.update(resourcetype=resourcetype)
    return result


def dict_from_lba_component_error(content):
    """
    Build the dict describing an LBA component error from a pre-parsed row.

    'lba_type' is the last letter of the component type (item 1) in lower case.
    Depending on the error type found in item 3 of the row:

    - 'DOWN', 'RF_FAIL', 'FLAT', 'SHORT': every pair is stored as a float under
      the translation of its lower-cased key
    - 'OSCILLATION', 'SPURIOUS': 'polarization<KEY>' is set to True
    - '*NOISE*': 'type' is 'H' or 'L' and every pair is stored as a float under its
      translated name suffixed with the polarization ('value' becomes 'peak_value')
    - '*JITTER*': as for the noise, with 'ref' read as 'val' and 'value'
      becoming 'average_value'
    Any other error type only yields the generic resourcetype.

    The pairs are only read, the row handed in by the caller is left untouched.
    :param content: the row as a list; item 1 is the component type, item 3 the
                    error type and the items from index 4 on are the one-item
                    dicts of the key=value pairs
    :return: dict describing the LBA component error
    :raises KeyError: if a key has no translation
    """
    component_type = content[1]
    error_type = content[3]
    extra_args = content[4:]
    result = dict()
    result['lba_type'] = component_type[-1].lower()
    translate = dict(x='signalX', y='signalY', xoff='signalX_offset', yoff='signalY_offset',
                     xmean='mean_signalX', ymean='mean_signalY',
                     proc='percentage', val='value', diff='fluctuation', ref='limit')
    # error types whose pairs are plain signal measurements
    signal_types = dict(DOWN='LBADownError', RF_FAIL='LBARFFailError',
                        FLAT='LBAFlatError', SHORT='LBAShortError')
    # error types whose pairs only flag the affected polarization
    polarization_flag_types = dict(OSCILLATION='LBAOscillationError', SPURIOUS='LBASpuriousError')

    resourcetype = 'LBAComponentError'
    if error_type in signal_types:
        resourcetype = signal_types[error_type]
        for arg in extra_args:
            # items() instead of popitem(): do not empty the caller's dicts
            for key, value in arg.items():
                result[translate[key.lower()]] = float(value)

    elif error_type in polarization_flag_types:
        resourcetype = polarization_flag_types[error_type]
        for arg in extra_args:
            for key in arg:
                result['polarization' + key.strip().capitalize()] = True

    elif 'NOISE' in error_type:
        resourcetype = "LBANoiseError"
        result.update(type='H' if 'HIGH' in error_type else 'L')
        for arg in extra_args:
            for key, value in arg.items():
                polarization = 'X' if 'X' in key else 'Y'
                name = translate[key.strip('X').strip('Y')] + polarization
                if 'value' in name:
                    name = 'peak_' + name
                result[name] = float(value)

    elif 'JITTER' in error_type:
        resourcetype = "LBAJitterError"
        for arg in extra_args:
            for key, value in arg.items():
                polarization = 'X' if 'X' in key else 'Y'
                name = key.strip('X').strip('Y')
                # the jitter reference is stored as the (average) value
                name = 'val' if name == 'ref' else name
                name = translate[name] + polarization
                if 'value' in name:
                    name = 'average_' + name
                result[name] = float(value)
    result.update(resourcetype=resourcetype)
    return result


def dict_from_component_error(content):
    """
    Turn the pre-parsed row of a component error into a dict.

    The common fields (component id, component type, error type) are read from
    the row and then completed with the fields produced by the parser dedicated
    to the component type; an RCU error only receives its resourcetype.
    :param content: the row as a list; item 1 is the component type, item 2 the
                    component id and item 3 the error type
    :return: dict describing the component error
    :raises NotImplementedError: if the component type is not supported
    """
    parsed = dict(component_id=int(content[2]),
                  component_type=content[1],
                  error_type=content[3])
    kind = parsed['component_type']

    if kind == 'RCU':
        specific = dict(resourcetype='RCUComponentError')
    elif kind == 'SPU':
        specific = dict_from_spu_component_error(content)
    elif kind == 'RSP':
        specific = dict_from_rsp_component_error(content)
    elif kind == 'TBB':
        specific = dict_from_tbb_component_error(content)
    elif 'LB' in kind:
        specific = dict_from_lba_component_error(content)
    elif 'HBA' in kind:
        specific = dict_from_hba_component_error(content)
    else:
        raise NotImplementedError('Unknown error type ' + str(content))

    parsed.update(specific)
    return parsed


def dict_from_raw_station_test(content):
    """
    Expects the lines of a single station test output
    and returns a dict that describes the content of the station test.

    Every line is split on ',' and, except for the CHECKS line, every column
    holding a '=' is turned into a one-item dict. The fourth column is the row type:

    - STATION: sets 'station_name' and 'station_type' ('C', 'R' or 'I' otherwise)
    - RUNTIME: sets 'start_time' and 'end_time'
    - CHECKS: sets 'performed_checks' to the space separated list of checks
    - E_FAIL: tile errors, attached to the HBA component error they belong to
      (which is created when it is not there yet)
    - VERSIONS, BADLIST, TOOLOW, TESTSIGNAL, NOSIGNAL, STATISTICS, DRIVER: ignored
    - anything else is parsed as a component error, unless the component id
      is '---' in which case the row is logged and skipped

    Lines with fewer than four columns are skipped (blank ones silently).
    :param content: list of the lines of one station test
    :return: a station test dict; 'components_error' is the list of the
             component error dicts
    """
    # resourcetype of the HBA component error each kind of tile error is attached to
    tile_to_component_resourcetype = dict(TileModemError='HBAModemError',
                                          TileOscillatingError='HBAOscillationError',
                                          TileSpuriousError='HBASpuriousError',
                                          TileNoiseError='HBANoiseError',
                                          TileJitterError='HBAJitterError',
                                          TileRFFailure='HBARFFailError')
    # error type given to the component error created for a tile error;
    # 'OSCILLATION' matches the error type used by the HBA component rows
    tile_to_component_error_type = dict(TileModemError='MODEM',
                                        TileOscillatingError='OSCILLATION',
                                        TileSpuriousError='SPURIOUS',
                                        TileNoiseError='NOISE',
                                        TileJitterError='JITTER',
                                        TileRFFailure='RF_FAIL')
    # a tuple and not a set: a row type may be an (unhashable) dict
    ignored_row_types = ('VERSIONS', 'BADLIST', 'TOOLOW', 'TESTSIGNAL',
                         'NOSIGNAL', 'STATISTICS', 'DRIVER')

    preparsed_content = []
    for line in content:
        if "CHECKS" in line:
            # the checks may contain '=' themselves and are kept as plain strings
            values = line.split(',')
        else:
            values = [parse_key_value_pairs(value) if ('=' in value) else value for value in line.split(',')]
        if len(values) < 4:
            if line.strip():
                logger.error('skipping malformed station test line: %s', line)
            continue
        preparsed_content.append(values)

    components_error = {}
    result = {'components_error': components_error}
    for row in preparsed_content:
        row_type = row[3]
        if row_type == "STATION":
            station_name = row[4]['NAME']
            station_type = station_name[0].capitalize()
            station_type = station_type if station_type in ['C', 'R'] else 'I'
            result.update(station_name=station_name, station_type=station_type)
        elif row_type in ignored_row_types:
            pass
        elif row_type == "RUNTIME":
            start_time = parse_datetime(row[0], row[4]['START'])
            end_time = parse_datetime(row[0], row[5]['STOP'])

            result.update(start_time=start_time)
            result.update(end_time=end_time)
        elif row_type == 'CHECKS':
            result.update(performed_checks=" ".join(row[4:]))
        elif row_type == "E_FAIL":
            for tile_error in dicts_from_tile_error(row[4:]):
                linked_type = tile_to_component_resourcetype[tile_error['resourcetype']]
                component_id = int(row[2])
                error_key = (component_id, linked_type)
                if error_key not in components_error:
                    error_type = tile_to_component_error_type[tile_error['resourcetype']]
                    if error_type == 'NOISE':
                        error_type = 'HIGH_' + error_type if tile_error['type'] == 'H' else 'LOW_' + error_type
                    components_error[error_key] = dict(resourcetype=linked_type,
                                                       component_id=component_id,
                                                       component_type='HBA',
                                                       error_type=error_type, tile_errors=[])

                components_error[error_key]['tile_errors'].append(tile_error)

        else:
            if row[2] == '---':
                logger.error('row type %s unrecognized: %s', row_type, row)
            else:
                components_error[(int(row[2]), row_type)] = dict_from_component_error(row)

    # a real list, not a dict view, so that it can be indexed and serialized
    result['components_error'] = list(components_error.values())
    return result


def _tile_rf_measurement(token):
    """
    Convert one raw RF measurement into a float, mapping 'nan' and the value -1
    to 999 (NOTE(review): -1 looks like a "not measured" marker, to be confirmed).
    """
    measurement = float(token.replace('nan', '999'))
    # compared as a number: a textual replace of '-1' would also mangle values
    # such as '-10.5' or '1e-1'
    return 999.0 if measurement == -1 else measurement


def dicts_from_tile_error(contents):
    """
    Parses the content into the dicts that represent the tile errors' data.

    The key of every pair is made of an optional error code, an optional
    polarization (X or Y) and the tile id; the error code selects the kind of
    tile error:

    - 'M': modem error (the polarization is dropped)
    - 'O': oscillating tile
    - 'SP': spurious signal
    - 'HN' / 'LN': noise error, the value holds '<value> <diff>'
    - a code containing 'J': jitter error, the value is the fluctuation
    - no code: RF failure, the value holds the six RF measurements

    The pairs are only read, the dicts handed in by the caller are left untouched.
    :param contents: list of one-item dicts, one per tile error
    :return: list of dicts, one per tile error; the tile id is kept as a string
    :raises ValueError: if the values of a pair are not the expected numbers
    """
    results = list()
    key_pattern = "([^XY0-9]{0,3})([XY]{0,1})([0-9]{1,3})"
    rf_fields = ('measured_signal_nodelay', 'measured_signal_fulldelay',
                 'subband_used_nodelay', 'subband_used_fulldelay',
                 'reference_signal_nodelay', 'reference_signal_fulldelay')
    for tile_error in contents:
        for key, args in tile_error.items():
            error_type, polarization, tile_id = re.search(key_pattern, key).groups()

            resourcetype = "TileError"
            item = dict(tile_id=tile_id, polarization=polarization)
            if error_type == 'M':
                item.pop('polarization')
                resourcetype = 'TileModemError'
            elif error_type == 'O':
                resourcetype = 'TileOscillatingError'
            elif error_type == 'SP':
                resourcetype = 'TileSpuriousError'
            elif error_type in ['HN', 'LN']:
                resourcetype = 'TileNoiseError'

                item['type'] = error_type[0]
                value, diff = map(float, args.split())
                item['value'] = value
                item['diff'] = diff

            elif 'J' in error_type:
                resourcetype = 'TileJitterError'
                item['fluctuation'] = float(args.strip())
            elif error_type == '':
                resourcetype = 'TileRFFailure'
                measurements = [_tile_rf_measurement(token) for token in args.split()]
                if len(measurements) != len(rf_fields):
                    raise ValueError('expected %d RF values, got %r' % (len(rf_fields), args))
                item.update(zip(rf_fields, measurements))
            item.update(resourcetype=resourcetype)
            results.append(item)
    return results