import logging
from datetime import datetime

import pytz

logger = logging.getLogger('serializers')

# Maps the short error codes found in the RTSM output to the long error type names
rtsm_error_types_to_error_types = dict(HN='HIGH_NOISE',
                                       SN='SUMMATOR_NOISE',
                                       CR='CABLE_REFLECTION',
                                       LN='LOW_NOISE',
                                       J='JITTER',
                                       OSC='OSCILLATION',
                                       FLAT='FLAT',
                                       DOWN='DOWN',
                                       SHORT='SHORT')

# Per-sample fields of an error record; every other field is shared by the whole group
_SAMPLE_FIELDS = ('time', 'average_spectrum', 'bad_spectrum')


def split_raw_rtsm_test(content):
    """
    Split the raw rtsm test in rows and filter out comments and empty lines

    :param content: content of rtsm output file
    :return: list of the stripped, non empty, non comment lines in file order
    """
    stripped_lines = (line.strip() for line in content.splitlines())
    # A real list (not a lazy filter object) so the rows can be iterated more than once
    return [line for line in stripped_lines if line and not line.startswith('#')]


def parse_data(date_time):
    """
    Parses the utc timestamp to a date object aware of the time zone

    :param date_time: utc timestamp (seconds since the epoch, number or numeric string)
    :return: timezone aware datetime in UTC
    """
    # datetime.utcfromtimestamp is deprecated since Python 3.12; building the
    # aware datetime directly gives the same instant with tzinfo=pytz.utc
    return datetime.fromtimestamp(float(date_time), tz=pytz.utc)


def _parse_spectrum(value):
    """
    Convert a bracketed, whitespace separated list of numbers into a list of floats

    :param value: string such as '[1.0 2.0  3.0]'
    :return: list of floats (empty if there are no numbers between the brackets)
    """
    # split() without arguments tolerates repeated spaces and padding around the brackets
    return [float(item) for item in value.strip().strip('[]').split()]


def preparse_rtsm_test_file(content):
    """
    Parse the content of the RTSM output into a dict representation

    :param content: iterable of 'key=value' lines (see split_raw_rtsm_test)
    :return: dict with the observation fields (observation_id, start_datetime,
             end_datetime, samples) when an observation line is present, plus an
             'errors' list holding one dict per error found in the file
    :raises ValueError: if a line has no '=' separator or a wrong number of fields
    :raises KeyError: if an error line carries an unknown error type code
    """
    results = []
    result = dict()
    observation = dict()

    for line in content:
        key, separator, value = line.partition('=')
        if not separator:
            raise ValueError('Malformed RTSM line (missing "="): %r' % (line,))

        if 'INFO' in key and 'OBS' not in key:
            # A new error record starts here: store the previous one if there was any
            if 'rcu' in result:
                results.append(result)
                result = dict()

            (rcu, mode, observation_id, error_type,
             start_frequency, stop_frequency, timestamp) = value.split(',')

            result.update(rcu=int(rcu),
                          mode=int(mode),
                          error_type=rtsm_error_types_to_error_types[error_type.strip()],
                          start_frequency=float(start_frequency),
                          stop_frequency=float(stop_frequency),
                          time=parse_data(timestamp))
        elif 'INFO' in key:
            # Observation level information
            observation_id, start_time, stop_time, samples = value.split(',')

            observation.update(observation_id=int(observation_id),
                               start_datetime=parse_data(start_time),
                               end_datetime=parse_data(stop_time),
                               samples=int(samples))
        elif 'BAD' in key:
            result.update(bad_spectrum=_parse_spectrum(value))
        elif 'MEAN' in key:
            result.update(average_spectrum=_parse_spectrum(value))

    # Only store the last record if an error was actually parsed: a file without
    # errors must yield an empty error list and not a list with an empty dict
    if 'rcu' in result:
        results.append(result)

    observation.update(errors=results)
    return observation


def group_samples_by_error_type(content):
    """
    Group the errors sharing the same (mode, rcu, error_type) into a single entry

    Each grouped entry keeps the shared fields of the first error of the group and
    gains a 'count' field and a 'samples' list with the time, average_spectrum and
    bad_spectrum of every error of the group.

    :param content: dict as returned by preparse_rtsm_test_file; its 'errors' item
                    is replaced in place by the grouped list
    :return: the same content dict with the grouped errors
    :raises KeyError: if 'errors' is missing or an error lacks one of the needed fields
    """
    errors = content.pop('errors')
    grouped_errors = dict()

    for error in errors:
        logger.debug('grouping rtsm error: %s', error)
        key = (error['mode'], error['rcu'], error['error_type'])
        sample = dict(time=error['time'],
                      average_spectrum=error['average_spectrum'],
                      bad_spectrum=error['bad_spectrum'])

        if key in grouped_errors:
            group = grouped_errors[key]
            group['count'] += 1
            group['samples'].append(sample)
        else:
            # Copy the shared fields instead of popping them from the input record
            shared_fields = dict((name, field_value)
                                 for name, field_value in error.items()
                                 if name not in _SAMPLE_FIELDS)
            grouped_errors[key] = dict(count=1, samples=[sample], **shared_fields)

    content['errors'] = list(grouped_errors.values())
    return content


def parse_rtsm_test(content):
    """
    Expects a string content with the rtsm test output and outputs a dict
    describing the observation and its errors grouped by (mode, rcu, error type)

    :param content: string content with the rtsm test output
    :return: dict with the observation fields and the grouped 'errors' list
    """
    return group_samples_by_error_type(preparse_rtsm_test_file(split_raw_rtsm_test(content)))