-
Mattia Mancini authoredMattia Mancini authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
station_test_raw_parser.py 11.96 KiB
"""
This modules contains all the function needed to parse a raw station test output into the models used to describe it
"""
import logging
from datetime import datetime
import pytz
import re
from collections import defaultdict
logger = logging.getLogger('station_test.parser')
def parse_key_value_pairs(content):
"""
Parse a key value pair returning a dict.
es 'key=value' -> {'key': 'value'}
:param content: the string to parse
:return: dict
"""
assert '=' in content
key, value = content.split('=')
if re.match('\A\d*\.\d*', key):
key = 'v' + key.strip('V')
if re.match('\A\w*-\d*.\d*', key):
key = key.replace('-', '')
key = re.sub('([.-])', '_', key).lower()
return {key: value}
def parse_datetime(date, date_time):
"""
If the datetime is only the time of the current date it used the data
to generate the date time
:param date:
:param date_time:
:return:
"""
if 'T' in date_time:
try:
return pytz.utc.localize(datetime.strptime(date_time, '%Y-%m-%dT%H:%M:%S'))
except ValueError:
return pytz.utc.localize(datetime.strptime(date_time, '%d-%m-%YT%H:%M:%S'))
else:
try:
return pytz.utc.localize(
datetime.strptime("T".join([date, date_time]), '%Y%m%dT%H:%M:%S'))
except ValueError:
return pytz.utc.localize(
datetime.strptime("T".join([date, date_time]), '%d-%m-%YT%H:%M:%S'))
def parse_raw_station_test(content):
"""
Expects a string content with the station test output
and output a list of Django Models
:param content: string content with the station test output
:return: a list of Django models
"""
logger.debug('content splitted %s', content.split('\n'))
station_tests = split_history_into_tests(content.split('\n'))
logger.debug('station_tests_are %s', station_tests)
results = []
for stest in station_tests:
dict_stest = dict_from_raw_station_test(stest)
logger.debug('parsed test %s', dict_stest)
if dict_stest['station']['name'] == 'Unknown':
logger.error('error in the station name for test %s', dict_stest)
continue
if 'start_datetime' in dict_stest:
results.append(dict_stest)
else:
return None
return results
def split_history_into_tests(content):
all_tests = []
current_test = []
for i, line in enumerate(content[:-1]):
next_line_columns = content[i + 1].split(',')
line_type = next_line_columns[3]
current_test.append(line)
if 'VERSIONS' in line_type or (
'STATION' in line_type and ('VERSIONS' not in current_test[0])):
all_tests.append(current_test)
current_test = []
all_tests.append(current_test)
return all_tests
def parse_component_id(component_id_str):
"""
It parses the component_id string and returns a integer with the component
if the string is != --- or None otherwise
:param component_id_str: string representation of the component id as in the stationtest output
:return: the component id integer
"""
return int(component_id_str) if component_id_str != '---' else -1
def collect_error_details(content):
"""
Merge the error_details list of dicts into a unique dict
:param content: row describing the component error in a stationtest output
:return: a dict with the merged content of the list of dict
"""
error_details = dict()
for k in content[4:]:
if isinstance(k, dict):
error_details.update(k)
else:
error_details.update({k: True})
# Parse RF_FAIL string
if 'x' in error_details:
rf_fail = parse_rffail_string('x', error_details.pop('x'))
error_details.update(rf_fail)
if 'y' in error_details:
rf_fail = parse_rffail_string('y', error_details.pop('y'))
error_details.update(rf_fail)
element_errors = []
for error_name, error_value in dict(error_details).items():
modem_comunication_error_pattern = 'e([0-9]{2})'
matched_key = re.search(modem_comunication_error_pattern, error_name)
if matched_key:
error_details.pop(error_name)
element_id = int(matched_key.group(1))
element = dict(element_id=element_id)
element_error_details = dict(error_code=error_value)
element_error = dict(element=element,
type='MODEM',
details=element_error_details)
element_errors.append(element_error)
nosignal_error_pattern = 'E([0-9]{2})([XY])'
matched_key = re.search(nosignal_error_pattern, error_name)
if matched_key:
error_details.pop(error_name)
element_id = int(matched_key.group(1))
polarization = matched_key.group(2).lower()
element = dict(element_id=element_id)
element_error_details = {polarization: error_value}
element_error = dict(element=element,
type='NOSIGNAL',
details=element_error_details)
element_errors.append(element_error)
if error_name in ['X', 'Y']:
error_details.pop(error_name)
error_name = error_name.lower()
error_details[error_name] = error_value
error_details.update(element_errors=element_errors)
return error_details
def dict_from_component_error(content):
"""
Parse the component error into a dict
:param content: row representing the component error in the station test output
:return:
"""
component_type, component_id, error_type = content[1:4]
error_details = collect_error_details(content)
component = dict(component_id=parse_component_id(component_id), type=component_type.strip())
element_errors = error_details.pop('element_errors')
component_error = dict(component=component,
details=error_details,
type=error_type.strip())
for element_error in element_errors:
element_error['component_error'] = dict(component_error)
component_error['element_errors'] = element_errors
return component_error
def dict_from_raw_station_test(content):
"""
Expects a string content with the station test output
and output a dict that describes the content of the station test
:param content: string content with the station test output
:return: a station test dict
"""
preparsed_content = []
for line in content:
if "CHECKS" in line:
values = [value for value in line.split(',')]
else:
values = [parse_key_value_pairs(value) if ('=' in value) else value for value in
line.split(',')]
preparsed_content.append(values)
result = {'component_errors': [], 'element_errors': []}
for row in preparsed_content:
row_type = row[3]
if row_type == "STATION":
station_name = row[4]['name']
station_type = station_name[0].capitalize()
station_type = station_type if station_type in ['C', 'R'] else 'I'
result.update(station=dict(name=station_name, type=station_type))
elif row_type == 'VERSIONS':
pass
elif row_type == 'BADLIST':
pass
elif row_type == 'TOOLOW':
pass
elif row_type == "RUNTIME":
start_time = parse_datetime(row[0], row[4]['start'])
end_time = parse_datetime(row[0], row[5]['stop'])
result.update(start_datetime=start_time)
result.update(end_datetime=end_time)
elif row_type == 'CHECKS':
result.update(checks=" ".join(row[4:]))
elif row_type == 'TESTSIGNAL':
pass
elif row_type == 'STATISTICS':
pass
elif row_type == 'DRIVER':
pass
elif row_type == 'E_FAIL':
result['element_errors'].extend(dicts_from_element_error(row))
else:
component_error_dict = dict_from_component_error(row)
element_errors = component_error_dict.pop('element_errors')
result['element_errors'].extend(element_errors)
result['component_errors'].append(component_error_dict)
return result
element_error_name_mapping = {
'm': 'MODEM',
'o': 'OSCILLATION',
'sp': 'SPURIOUS',
'hn': 'HIGH_NOISE',
'ln': 'LOW_NOISE',
'j': 'JITTER',
'': 'RF_FAIL'
}
def parse_rffail_string(polarization, rf_string):
"""
Parse the string for the rffail test into a dict
:param polarization: polarization to which the test (either ['x'|'y'])
:param rf_string: content
:return: a dict that represent the rf fail test outcome
"""
parameters = list(map(float, rf_string.replace('nan', '999').
replace('-1', '999').
split(' ')))
HBA_RF_FAIL_NPARAMETERS = 6
LBA_RF_FAIL_NPARAMETERS = 1
if HBA_RF_FAIL_NPARAMETERS == len(parameters):
measured_signal_nodelay, measured_signal_fulldelay, subband_used_nodelay,\
subband_used_fulldelay,\
reference_signal_nodelay, reference_signal_fulldelay = map(float,
rf_string.replace('nan', '999').
replace('-1', '999').
split(' '))
return {
polarization + 'val_full_delay': measured_signal_fulldelay,
polarization + 'val_no_delay': measured_signal_nodelay,
polarization + 'sb_full_delay': subband_used_fulldelay,
polarization + 'sb_no_delay': subband_used_nodelay,
polarization + 'ref_full_delay': reference_signal_fulldelay,
polarization + 'ref_no_delay': reference_signal_nodelay}
elif LBA_RF_FAIL_NPARAMETERS:
return {polarization + 'val': parameters[0]}
else:
raise ValueError('String %s is not a RF_FAIL', rf_string)
def dicts_from_element_error(contents):
"""
Parses the content into a dict that represents the tile error's data
:param contents:
:return:
"""
results = defaultdict(dict)
key_pattern = "([^xy0-9]{0,3})([x,y]{0,1})([0-9]{1,3})"
component_type, component_id, error_type = contents[1:4]
for tile_error in contents[4:]:
key, args = tile_error.popitem()
element_error_type, polarization, element_id = re.search(key_pattern, key).groups()
component_error_type = element_error_name_mapping[element_error_type]
component = dict(type=component_type,
component_id=int(component_id))
component_error = dict(component=component,
type=component_error_type)
element = dict(element_id=int(element_id))
element_error = dict(component_error=component_error,
element=element,
type=component_error_type)
element_error_details = dict()
if element_error_type in ['hn', 'ln']:
value, diff = map(float, args.strip().split(' '))
element_error_details[polarization + 'val'] = value
element_error_details[polarization + 'diff'] = diff
elif 'j' in element_error_type:
element_error_details[polarization + 'diff'] = float(args.strip())
elif element_error_type == '':
element_error_details.update(parse_rffail_string(polarization, args))
elif element_error_type in ['o', 'sp']:
element_error_details[polarization] = True
element_error['details'] = element_error_details
element_error_key = (component_id, element_id, element_error_type)
if element_error_key in results:
results[element_error_key]['details'].update(element_error_details)
else:
results[element_error_key] = dict(element_error)
return list(results.values())