-
Mattia Mancini authoredMattia Mancini authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
rtsm_test_raw_parser.py 4.10 KiB
import logging
from datetime import datetime
import pytz
logger = logging.getLogger('serializers')
rtsm_error_types_to_error_types = dict(HN='HIGH_NOISE',
SN='SUMMATOR_NOISE',
CR='CABLE_REFLECTION',
LN='LOW_NOISE',
J='JITTER',
OSC='OSCILLATION',
FLAT='FLAT',
DOWN='DOWN',
SHORT='SHORT')
def split_raw_rtsm_test(content):
"""
Split the raw rtsm test in row and filters out comments and empty lines
:param content: content of rtsm output file
"""
results = filter(lambda x: not x.startswith('#'),
filter(lambda x: len(x) > 0,
map(lambda x: x.strip(),
content.splitlines()
)
))
return results
def parse_data(date_time):
"""
Parses the utc timestamp to a date object aware of the time zone
:param date_time: utc timestamp
:return:
"""
return pytz.utc.localize(datetime.utcfromtimestamp(float(date_time)))
def preparse_rtsm_test_file(content):
"""
Parse the content of the RTSM output into a dict representation
:param content:
:return:
"""
results = []
result = dict()
observation = dict()
for line in content:
key, value = line.split('=')
if 'INFO' in key and 'OBS' not in key:
if 'rcu' in result:
results.append(result)
result = dict()
rcu, mode, observation_id, error_type, start_frequency, stop_frequency, time = value.split(
',')
result.update(rcu=int(rcu),
mode=int(mode),
error_type=rtsm_error_types_to_error_types[error_type],
start_frequency=float(start_frequency),
stop_frequency=float(stop_frequency),
time=parse_data(time))
elif 'INFO' in key:
observation_id, start_time, stop_time, samples = value.split(',')
observation.update(observation_id=int(observation_id),
start_datetime=parse_data(start_time),
end_datetime=parse_data(stop_time),
samples=int(samples))
elif 'BAD' in key:
values = [float(item) for item in value.lstrip('[').rstrip(']').strip().split(' ')]
result.update(bad_spectrum=values)
elif 'MEAN' in key:
values = [float(item) for item in value.lstrip('[').rstrip(']').strip().split(' ')]
result.update(average_spectrum=values)
results.append(result)
observation.update(errors=results)
return observation
def group_samples_by_error_type(content):
errors = content.pop('errors')
grouped_errors = dict()
for error in errors:
print(error)
key = (error['mode'], error['rcu'], error['error_type'])
time = error.pop('time')
average_spectrum = error.pop('average_spectrum')
bad_spectrum = error.pop('bad_spectrum')
sample = dict(time=time,
average_spectrum=average_spectrum,
bad_spectrum=bad_spectrum)
if key in grouped_errors:
grouped_errors[key]['count'] += 1
grouped_errors[key]['samples'] += [sample]
else:
grouped_errors[key] = dict(
count=1,
samples=[sample],
**error
)
content['errors'] = list(grouped_errors.values())
return content
def parse_rtsm_test(content):
"""
Expects a string content with the rtsm test output
and output a list of Django Models
:param content: string content with the station test output
:return: a list of Django models
"""
return group_samples_by_error_type(preparse_rtsm_test_file(split_raw_rtsm_test(content)))