-
Mattia Mancini authoredMattia Mancini authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
station_test_raw_parser.py 18.31 KiB
"""
This module contains all the functions needed to parse the raw output of a station test into the models used to describe it.
"""
from datetime import datetime
import re
import logging
import pytz
# Module-level logger used by the parsing functions of this module.
logger = logging.getLogger('station_test.parser')
def parse_key_value_pairs(content):
    """
    Parse a single ``key=value`` token and return it as a one-item dict.

    Only the first '=' separates the key from the value, so a value that
    itself contains '=' is preserved as is.
    e.g. 'key=value' -> {'key': 'value'}
         'key=a=b'   -> {'key': 'a=b'}

    :param content: the string to parse
    :return: dict holding exactly one key/value item
    :raises ValueError: if content does not contain any '='
    """
    key, separator, value = content.partition('=')
    if not separator:
        # Explicit check rather than `assert`: assertions are removed when
        # Python runs with -O, which would let malformed input through.
        raise ValueError("expected a 'key=value' pair, got %r" % (content,))
    return {key: value}
def parse_datetime(date, date_time):
    """
    Build a UTC localized datetime out of a date and a time specification.

    If date_time contains a 'T' it is treated as a complete timestamp and
    date is ignored; otherwise date and date_time are joined with a 'T'.
    Two layouts are tried in turn for each case:
      * complete timestamp: '%Y-%m-%dT%H:%M:%S', then '%d-%m-%YT%H:%M:%S'
      * date + time:        '%Y%m%dT%H:%M:%S',   then '%d-%m-%YT%H:%M:%S'

    :param date: the date string, used only when date_time has no 'T'
    :param date_time: a complete timestamp or the time of the day
    :return: the datetime localized to UTC
    :raises ValueError: if neither layout matches
    """
    if 'T' in date_time:
        timestamp = date_time
        first_layout = '%Y-%m-%dT%H:%M:%S'
    else:
        timestamp = "T".join([date, date_time])
        first_layout = '%Y%m%dT%H:%M:%S'

    try:
        naive = datetime.strptime(timestamp, first_layout)
    except ValueError:
        # both cases share the same day-first fallback layout
        naive = datetime.strptime(timestamp, '%d-%m-%YT%H:%M:%S')
    return pytz.utc.localize(naive)
def parse_raw_station_test(content):
    """
    Split the raw station test output into single tests and parse each of them.

    A test whose station name is 'Unknown' is logged as an error and skipped.
    As soon as a test without a 'start_time' is found the parsing stops and
    None is returned instead of the list.

    :param content: string content with the station test output
    :return: a list of station test dicts, or None if a test has no start time
    """
    raw_lines = content.split('\n')
    logger.debug('content splitted %s', raw_lines)
    station_tests = split_history_into_tests(raw_lines)
    logger.debug('station_tests_are %s', station_tests)

    parsed_tests = []
    for raw_test in station_tests:
        parsed = dict_from_raw_station_test(raw_test)
        logger.debug('parsed test %s', parsed)
        if parsed['station_name'] == 'Unknown':
            logger.error('error in the station name for test %s', parsed)
            continue
        if 'start_time' not in parsed:
            return None
        parsed_tests.append(parsed)
    return parsed_tests
def split_history_into_tests(content):
    """
    Group the lines of a station test history into the single station tests.

    A new test starts at every line whose fourth comma separated column
    contains 'VERSIONS', or contains 'STATION' when the test collected so far
    does not begin with a line containing 'VERSIONS'.

    Blank lines (such as the one left by a trailing newline) are dropped.
    Lines with fewer than four columns never start a new test and are kept in
    the current one. The last line of the history is part of the last test.

    :param content: list with the lines of the station test history
    :return: list of tests, each being the list of its lines; an empty
             history yields a list holding one empty test
    """
    all_tests = []
    current_test = []
    for line in content:
        if not line.strip():
            continue
        columns = line.split(',')
        line_type = columns[3] if len(columns) > 3 else ''
        # the very first line can only open a test, never close one
        starts_new_test = bool(current_test) and (
            'VERSIONS' in line_type
            or ('STATION' in line_type and 'VERSIONS' not in current_test[0])
        )
        if starts_new_test:
            all_tests.append(current_test)
            current_test = []
        current_test.append(line)
    all_tests.append(current_test)
    return all_tests
def dict_from_spu_component_error(content):
    """
    Build the dict that describes an SPU component error.

    Only the 'VOLTAGE' error type carries extra data: every key=value argument
    is stored as a float under 'voltage_' followed by the first three
    characters of the key in lower case. The argument dicts are consumed
    (emptied) while they are read.

    :param content: the pre-parsed row of the component error
    :return: dict with the parsed values and the 'resourcetype'
    """
    kind = content[3]
    arguments = content[4:]
    parsed = {}
    if kind != 'VOLTAGE':
        parsed['resourcetype'] = 'SPUComponentError'
        return parsed

    for argument in arguments:
        name, reading = argument.popitem()
        parsed['voltage_' + name[:3].lower()] = float(reading)
    parsed['resourcetype'] = 'SPUVoltageError'
    return parsed
def dict_from_rsp_component_error(content):
    """
    Build the dict that describes an RSP component error.

    * 'VERSION': the key=value arguments are copied unchanged
    * 'VOLTAGE': each value is stored as a float under 'voltage_' + key, where
      the dots of the key become '_' and leading/trailing 'V' are removed
    * 'TEMPERATURE': each value is stored as a float under the lower cased key
    * any other error type: no argument is read

    The argument dicts of VOLTAGE and TEMPERATURE errors are consumed
    (emptied) while they are read.

    :param content: the pre-parsed row of the component error
    :return: dict with the parsed values and the 'resourcetype'
    """
    kind = content[3]
    arguments = content[4:]
    parsed = {}
    if kind == 'VERSION':
        for argument in arguments:
            parsed.update(argument)
        parsed['resourcetype'] = 'RSPVersionError'
    elif kind == 'VOLTAGE':
        for argument in arguments:
            name, reading = argument.popitem()
            parsed['voltage_' + name.replace('.', '_').strip('V')] = float(reading)
        parsed['resourcetype'] = 'RSPVoltageError'
    elif kind == 'TEMPERATURE':
        for argument in arguments:
            name, reading = argument.popitem()
            parsed[name.lower()] = float(reading)
        parsed['resourcetype'] = 'RSPTemperatureError'
    else:
        parsed['resourcetype'] = 'RSPComponentError'
    return parsed
def dict_from_tbb_component_error(content):
    """
    Build the dict that describes a TBB component error.

    * 'VERSION': the key=value arguments are copied unchanged
    * 'VOLTAGE': each value is stored as a float under 'voltage_' + key, where
      the dots of the key become '_' and leading/trailing 'V' are removed
    * 'TEMPERATURE': each value is stored as a float under the lower cased key
    * any other error type: no argument is read

    The argument dicts of VOLTAGE and TEMPERATURE errors are consumed
    (emptied) while they are read.

    :param content: the pre-parsed row of the component error
    :return: dict with the parsed values and the 'resourcetype'
    """
    kind = content[3]
    arguments = content[4:]
    parsed = {}
    if kind == 'VERSION':
        for argument in arguments:
            parsed.update(argument)
        parsed['resourcetype'] = 'TBBVersionError'
    elif kind == 'VOLTAGE':
        for argument in arguments:
            name, reading = argument.popitem()
            parsed['voltage_' + name.replace('.', '_').strip('V')] = float(reading)
        parsed['resourcetype'] = 'TBBVoltageError'
    elif kind == 'TEMPERATURE':
        for argument in arguments:
            name, reading = argument.popitem()
            parsed[name.lower()] = float(reading)
        parsed['resourcetype'] = 'TBBTemperatureError'
    else:
        parsed['resourcetype'] = 'TBBComponentError'
    return parsed
def dict_from_hba_component_error(content):
    """
    Build the dict that describes an HBA component error.

    The result always holds an empty 'tile_errors' list and the 'resourcetype'
    that matches the error type found in content[3]. The key=value arguments
    in content[4:] are read depending on the error type:

    * 'C_SUMMATOR' / 'P_SUMMATOR': 'type' is the first letter of the error type
    * 'MODEM': no argument is read
    * 'RF_FAIL': each argument holds six space separated numbers for one
      polarization (measured signal, used subband and reference signal, each
      without and with full delay); 'nan' is stored as 999
    * 'SUMMATOR_NOISE', 'OSCILLATION', 'SPURIOUS': a True flag for each
      polarization listed
    * '*NOISE*': 'type' is 'H' if the error type contains 'HIGH' else 'L',
      plus the translated values of each polarization
    * '*JITTER*': the translated values of each polarization

    The argument dicts are consumed (emptied) while they are read.

    :param content: the pre-parsed row of the component error
    :return: dict with the parsed values and the 'resourcetype'
    :raises NotImplementedError: if the error type is not a known HBA error
    :raises Exception: if the key of a noise argument cannot be translated
    """
    error_type = content[3]
    extra_args = content[4:]
    result = dict(tile_errors=[])
    translate = dict(x='signalX', y='signalY', xoff='signalX_offset', yoff='signalY_offset',
                     xmean='mean_signalX', ymean='mean_signalY',
                     proc='percentage', val='value', diff='fluctuation', ref='limit')

    if error_type in ['C_SUMMATOR', 'P_SUMMATOR']:
        resourcetype = 'HBASummatorError'
        result.update(type=error_type[0].capitalize())
    elif error_type == 'MODEM':
        resourcetype = 'HBAModemError'
    elif error_type == 'RF_FAIL':
        resourcetype = 'HBARFFailError'
        for arg in extra_args:
            key, values = arg.popitem()
            polarization = key.strip().capitalize()
            measured_signal_nodelay, \
                measured_signal_fulldelay, \
                subband_used_nodelay, \
                subband_used_fulldelay, \
                reference_signal_nodelay, \
                reference_signal_fulldelay = map(float, values.replace('nan', '999').split(' '))
            # every field is stored from its own reading
            result['measured_signal_nodelay' + polarization] = measured_signal_nodelay
            result['measured_signal_fulldelay' + polarization] = measured_signal_fulldelay
            result['subband_used_nodelay' + polarization] = subband_used_nodelay
            result['subband_used_fulldelay' + polarization] = subband_used_fulldelay
            result['reference_signal_nodelay' + polarization] = reference_signal_nodelay
            result['reference_signal_fulldelay' + polarization] = reference_signal_fulldelay
    elif error_type == 'SUMMATOR_NOISE':
        # must be tested before the generic 'NOISE' substring match below
        resourcetype = 'HBASummatorNoise'
        for arg in extra_args:
            key, _ = arg.popitem()
            result['polarization' + key.strip().capitalize()] = True
    elif 'NOISE' in error_type:
        resourcetype = 'HBANoiseError'
        noise_type = 'H' if 'HIGH' in error_type else 'L'
        result.update(type=noise_type)
        for arg in extra_args:
            key, value = arg.popitem()
            polarization = 'X' if 'X' in key else 'Y'
            key = key.strip('X').strip('Y')
            if key not in translate:
                logger.error('cannot translate Noise HBA error in %s ', content)
                raise Exception('cannot translate Noise HBA error in %s' % (content,))
            key = translate[key] + polarization
            if 'value' in key:
                key = 'peak_' + key
            result[key] = float(value)
    elif 'JITTER' in error_type:
        resourcetype = 'HBAJitterError'
        for arg in extra_args:
            key, value = arg.popitem()
            polarization = 'X' if 'X' in key else 'Y'
            key = key.strip('X').strip('Y')
            # for jitter errors 'ref' is stored as the (average) value
            key = 'val' if key == 'ref' else key
            key = translate[key] + polarization
            if 'value' in key:
                key = 'average_' + key
            result[key] = float(value)
    elif error_type == 'OSCILLATION':
        resourcetype = 'HBAOscillationError'
        for arg in extra_args:
            key, _ = arg.popitem()
            result['polarization' + key.strip().capitalize()] = True
    elif error_type == 'SPURIOUS':
        resourcetype = 'HBASpuriousError'
        for arg in extra_args:
            key, _ = arg.popitem()
            result['polarization' + key.strip().capitalize()] = True
    else:
        raise NotImplementedError('Cannot parse HBA component error %s' % (content,))
    result.update(resourcetype=resourcetype)
    return result
def dict_from_lba_component_error(content):
    """
    Build the dict that describes an LBA component error.

    'lba_type' is the last character of the component type in lower case.
    The key=value arguments in content[4:] are read depending on the error
    type found in content[3]:

    * 'DOWN', 'RF_FAIL', 'FLAT', 'SHORT': each value is stored as a float
      under the translation of its lower cased key
    * 'OSCILLATION', 'SPURIOUS': a True flag for each polarization listed
    * '*NOISE*': 'type' is 'H' if the error type contains 'HIGH' else 'L',
      plus the translated values of each polarization
    * '*JITTER*': the translated values of each polarization
    * anything else: no argument is read ('LBAComponentError')

    The argument dicts are consumed (emptied) while they are read.

    :param content: the pre-parsed row of the component error
    :return: dict with the parsed values and the 'resourcetype'
    """
    component_type = content[1]
    error_type = content[3]
    extra_args = content[4:]
    parsed = {'lba_type': component_type[-1].lower()}
    translate = {
        'x': 'signalX', 'y': 'signalY',
        'xoff': 'signalX_offset', 'yoff': 'signalY_offset',
        'xmean': 'mean_signalX', 'ymean': 'mean_signalY',
        'proc': 'percentage', 'val': 'value', 'diff': 'fluctuation', 'ref': 'limit',
    }

    resourcetype = 'LBAComponentError'
    if error_type in ('DOWN', 'RF_FAIL', 'FLAT', 'SHORT'):
        # these four error types share the same argument layout
        resourcetype = {'DOWN': 'LBADownError',
                        'RF_FAIL': 'LBARFFailError',
                        'FLAT': 'LBAFlatError',
                        'SHORT': 'LBAShortError'}[error_type]
        for argument in extra_args:
            name, reading = argument.popitem()
            parsed[translate[name.lower()]] = float(reading)
    elif error_type in ('OSCILLATION', 'SPURIOUS'):
        resourcetype = {'OSCILLATION': 'LBAOscillationError',
                        'SPURIOUS': 'LBASpuriousError'}[error_type]
        for argument in extra_args:
            name, _ = argument.popitem()
            parsed['polarization' + name.strip().capitalize()] = True
    elif 'NOISE' in error_type:
        resourcetype = "LBANoiseError"
        parsed['type'] = 'H' if 'HIGH' in error_type else 'L'
        for argument in extra_args:
            name, reading = argument.popitem()
            polarization = 'X' if 'X' in name else 'Y'
            field = translate[name.strip('X').strip('Y')] + polarization
            if 'value' in field:
                field = 'peak_' + field
            parsed[field] = float(reading)
    elif 'JITTER' in error_type:
        resourcetype = "LBAJitterError"
        for argument in extra_args:
            name, reading = argument.popitem()
            polarization = 'X' if 'X' in name else 'Y'
            name = name.strip('X').strip('Y')
            if name == 'ref':
                # for jitter errors 'ref' is stored as the (average) value
                name = 'val'
            field = translate[name] + polarization
            if 'value' in field:
                field = 'average_' + field
            parsed[field] = float(reading)
    parsed['resourcetype'] = resourcetype
    return parsed
def dict_from_component_error(content):
    """
    Parse the row describing a component error and return it as a dict.

    The common fields (component id, component type and error type) are read
    from the row, then the fields that depend on the component type are added
    by the dedicated parser. RCU errors only get their 'resourcetype'.

    :param content: the pre-parsed row of the component error
    :return: dict describing the component error
    :raises NotImplementedError: if the component type is not supported
    """
    component_id = int(content[2])
    component_type = content[1]
    parsed = {
        'component_id': component_id,
        'component_type': component_type,
        'error_type': content[3],
    }

    if component_type == 'SPU':
        specifics = dict_from_spu_component_error(content)
    elif component_type == 'RSP':
        specifics = dict_from_rsp_component_error(content)
    elif component_type == 'RCU':
        specifics = {'resourcetype': 'RCUComponentError'}
    elif component_type == 'TBB':
        specifics = dict_from_tbb_component_error(content)
    elif 'LB' in component_type:
        specifics = dict_from_lba_component_error(content)
    elif 'HBA' in component_type:
        specifics = dict_from_hba_component_error(content)
    else:
        raise NotImplementedError('Unknown error type ' + str(content))

    parsed.update(specifics)
    return parsed
# Row types that are recognised but add nothing to the station test dict.
_IGNORED_ROW_TYPES = ('VERSIONS', 'BADLIST', 'TOOLOW', 'TESTSIGNAL',
                      'NOSIGNAL', 'STATISTICS', 'DRIVER')

# Tile error resourcetype -> resourcetype of the HBA component error holding it.
_TILE_ERROR_TO_COMPONENT_RESOURCETYPE = dict(TileModemError='HBAModemError',
                                             TileOscillatingError='HBAOscillationError',
                                             TileSpuriousError='HBASpuriousError',
                                             TileNoiseError='HBANoiseError',
                                             TileJitterError='HBAJitterError',
                                             TileRFFailure='HBARFFailError')

# Tile error resourcetype -> error type of the HBA component error holding it.
_TILE_ERROR_TO_COMPONENT_ERROR_TYPE = dict(TileModemError='MODEM',
                                           TileOscillatingError='OSCILLATION',
                                           TileSpuriousError='SPURIOUS',
                                           TileNoiseError='NOISE',
                                           TileJitterError='JITTER',
                                           TileRFFailure='RF_FAIL')


def _add_tile_errors(components_error, row):
    """Attach the tile errors of an E_FAIL row to their HBA component error, creating it if needed."""
    for tile_error in dicts_from_tile_error(row[4:]):
        tile_resourcetype = tile_error['resourcetype']
        linked_type = _TILE_ERROR_TO_COMPONENT_RESOURCETYPE[tile_resourcetype]
        component_id = int(row[2])
        error_key = (component_id, linked_type)
        if error_key not in components_error:
            error_type = _TILE_ERROR_TO_COMPONENT_ERROR_TYPE[tile_resourcetype]
            if error_type == 'NOISE':
                error_type = ('HIGH_' if tile_error['type'] == 'H' else 'LOW_') + error_type
            components_error[error_key] = dict(resourcetype=linked_type,
                                               component_id=component_id,
                                               component_type='HBA',
                                               error_type=error_type, tile_errors=[])
        components_error[error_key]['tile_errors'].append(tile_error)


def dict_from_raw_station_test(content):
    """
    Parse the lines of a single station test into the dict that describes it.

    Every line is split on ','. Unless the line contains 'CHECKS', each column
    holding a '=' is turned into a one-item dict. Blank lines are skipped.
    The fourth column is the row type and selects how the row is handled:

    * 'STATION': sets 'station_name' and 'station_type' ('C', 'R' or 'I')
    * 'RUNTIME': sets 'start_time' and 'end_time'
    * 'CHECKS': sets 'performed_checks' (the remaining columns joined by ' ')
    * 'E_FAIL': tile errors, grouped in one HBA component error for each
      (component id, error resourcetype) pair
    * 'VERSIONS', 'BADLIST', 'TOOLOW', 'TESTSIGNAL', 'NOSIGNAL', 'STATISTICS',
      'DRIVER': ignored
    * any other type: a component error, unless the third column is '---',
      in which case the row is logged as unrecognized

    :param content: iterable with the lines of one station test
    :return: a station test dict; 'components_error' is a list of dicts
    """
    preparsed_content = []
    for line in content:
        if not line.strip():
            # e.g. the empty string left by a trailing newline: nothing to parse
            continue
        if "CHECKS" in line:
            values = line.split(',')
        else:
            values = [parse_key_value_pairs(value) if ('=' in value) else value
                      for value in line.split(',')]
        preparsed_content.append(values)

    # keyed by (component id, type) so that the same error is stored only once
    components_error = {}
    result = {'components_error': components_error}
    for row in preparsed_content:
        row_type = row[3]
        if row_type == "STATION":
            station_name = row[4]['NAME']
            station_type = station_name[0].capitalize()
            station_type = station_type if station_type in ['C', 'R'] else 'I'
            result.update(station_name=station_name, station_type=station_type)
        elif row_type in _IGNORED_ROW_TYPES:
            pass
        elif row_type == "RUNTIME":
            start_time = parse_datetime(row[0], row[4]['START'])
            end_time = parse_datetime(row[0], row[5]['STOP'])
            result.update(start_time=start_time)
            result.update(end_time=end_time)
        elif row_type == 'CHECKS':
            result.update(performed_checks=" ".join(row[4:]))
        elif row_type == "E_FAIL":
            _add_tile_errors(components_error, row)
        elif row[2] == '---':
            logger.error('row type %s unrecognized: %s', row_type, row)
        else:
            components_error[(int(row[2]), row_type)] = dict_from_component_error(row)

    # a real list rather than a dict view, so callers can index and serialize it
    result['components_error'] = list(components_error.values())
    return result
def dicts_from_tile_error(contents):
    """
    Parse the tile errors of a row into a list of dicts, one for each error.

    Each item of contents is a one-item dict. Its key is made of an optional
    error code, an optional polarization and the tile id:

    * 'M': modem error (the polarization is not stored)
    * 'O': oscillating tile
    * 'SP': spurious signal
    * 'HN' / 'LN': high / low noise, the value holds 'value diff'
    * a code containing 'J': jitter, the value holds the fluctuation
    * no code: RF failure, the value holds six space separated numbers
    * any other code: left as a generic 'TileError'

    The dicts in contents are consumed (emptied) while they are read and the
    tile id is kept as the string found in the key.

    NOTE(review): for RF failures 'nan' and '-1' are rewritten to '999' with a
    plain substring replacement, so a '-1' inside a longer token (e.g. '-10')
    is rewritten as well -- confirm this is intended.

    :param contents: list of one-item dicts holding the raw tile errors
    :return: list of dicts describing the tile errors
    """
    key_pattern = "([^XY0-9]{0,3})([X,Y]{0,1})([0-9]{1,3})"
    parsed_errors = []
    for raw_error in contents:
        name, payload = raw_error.popitem()
        kind, polarization, tile_id = re.search(key_pattern, name).groups()
        entry = {'tile_id': tile_id, 'polarization': polarization}
        resourcetype = "TileError"

        if kind == 'M':
            del entry['polarization']
            resourcetype = 'TileModemError'
        elif kind == 'O':
            resourcetype = 'TileOscillatingError'
        elif kind == 'SP':
            resourcetype = 'TileSpuriousError'
        elif kind in ['HN', 'LN']:
            resourcetype = 'TileNoiseError'
            entry['type'] = kind[0]
            entry['value'], entry['diff'] = map(float, payload.strip().split(' '))
        elif 'J' in kind:
            resourcetype = 'TileJitterError'
            entry['fluctuation'] = float(payload.strip())
        elif kind == '':
            resourcetype = 'TileRFFailure'
            cleaned = payload.replace('nan', '999').replace('-1', '999')
            (signal_nodelay, signal_fulldelay,
             subband_nodelay, subband_fulldelay,
             reference_nodelay, reference_fulldelay) = map(float, cleaned.split(' '))
            entry.update(measured_signal_fulldelay=signal_fulldelay,
                         measured_signal_nodelay=signal_nodelay,
                         subband_used_fulldelay=subband_fulldelay,
                         subband_used_nodelay=subband_nodelay,
                         reference_signal_fulldelay=reference_fulldelay,
                         reference_signal_nodelay=reference_nodelay)

        entry['resourcetype'] = resourcetype
        parsed_errors.append(entry)
    return parsed_errors