# holography_specification.py (8.93 KiB)
# Authored by Mattia Mancini.
# Code owners: assign users and groups as approvers for specific file changes.
import datetime
from collections import defaultdict
import re
import os
from .measurementset import MeasurementSet
class HolographySpecification(object):
    """
    In-memory representation of one holography specification text file.

    The file name encodes a date, a free-text comment and a three digit id
    (see ``hs_name_pattern``). The first line of the file is a header holding
    the start and end date-times, the RCU mode and the beam switch delay;
    every following line describes one station entry.
    Call :meth:`read_file` to load the file content into the instance.
    """

    # The dot in front of the extension is escaped and the pattern is anchored
    # at the end so that only names really ending in ".txt" are accepted.
    hs_name_pattern = r'Holog-(?P<date>\d{8})-(?P<comment>.*)-(?P<id>\d{3})\.txt$'

    def __init__(self, name, path):
        """
        Create a specification from its file name without reading the file.

        :param name: file name, must follow ``hs_name_pattern``
        :param path: directory containing the file
        :raises ValueError: if ``name`` does not follow ``hs_name_pattern``
        """
        self.path = os.path.join(path, name)
        self.name = name
        self.id, self.date, self.comment = \
            HolographySpecification.extract_id_date_comment_from_name(name)
        # station name -> list of parsed lines referring to that station
        self.station_specification_map = defaultdict(list)
        self.start_datetime = None
        self.end_datetime = None
        self.rcu_mode = None
        self.beam_set_interval = None
        # Filled in by read_file(); defined here so that they can be read
        # safely before the file has been loaded.
        self.station_names = []
        self.reference_station_names = []
        self.target_station_names = []

    def __repr__(self):
        return 'HolographySpecification(%s, %s, %s, %s, %s)' % (
            self.id,
            self.date,
            self.comment,
            self.name,
            self.path,
        )

    def _read_lines(self):
        """
        Read the specification file.

        :return: the lines of the file without line terminators
        """
        with open(self.path, 'r') as fstream_in:
            return fstream_in.read().splitlines()

    @staticmethod
    def _split_header(line):
        """
        Split the header line into its textual fields.

        :param line: first line of the specification file
        :return: a dict with the keys start_date, end_date, rcu_mode and
                 beam_switch_delay, all still strings
        :raises ValueError: if the line does not look like a header
        """
        date_regex = r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}'
        # The decimal point of the delay is matched literally (and is
        # optional), so it cannot swallow a digit belonging to the rcu mode.
        header_regex = (r'(?P<start_date>{date_regex})\s*'
                        r'(?P<end_date>{date_regex})\s*'
                        r'(?P<rcu_mode>\d*)\s*'
                        r'(?P<beam_switch_delay>\d*\.?\d*)').format(date_regex=date_regex)
        match = re.match(header_regex, line)
        if match is None:
            raise ValueError('Cannot parse header {}'.format(line))
        return match.groupdict()

    def _parse_header(self, header):
        """
        Parse the header line and store its content on the instance.

        Sets start_datetime and end_datetime (as datetime objects) and
        rcu_mode and beam_set_interval (kept as the strings found in the file).

        :param header: first line of the specification file
        :raises ValueError: if the header cannot be parsed
        """
        date_time_format = '%Y-%m-%d %H:%M:%S'
        splitted_header = HolographySpecification._split_header(header)
        self.start_datetime = datetime.datetime.strptime(splitted_header['start_date'],
                                                         date_time_format)
        self.end_datetime = datetime.datetime.strptime(splitted_header['end_date'],
                                                       date_time_format)
        self.rcu_mode = splitted_header['rcu_mode']
        self.beam_set_interval = splitted_header['beam_switch_delay']

    @staticmethod
    def _split_line(line):
        """
        Split one station line into its textual fields.

        :param line: a line of the specification file following the header
        :return: a dict with the keys station_name, mode_description, sub_band,
                 beamlets, rcus, rcus_mode, virtual_pointing and
                 station_pointing, all still strings
        :raises ValueError: if the line does not match the expected layout
        """
        # either a "first:last" range or a single number
        range_regex = r'(\d*:\d*)|(\d*)'
        # "<ra>,<dec>,<coordinate system>", only dec may be negative
        ra_dec_regex = r'\d*\.\d*,-?\d*\.\d*,\w*'
        regex = (r'^(?P<station_name>\w*)\s*'
                 r'(?P<mode_description>\w*)\s*'
                 r'(?P<sub_band>[\d,]*)\s*'
                 r'(?P<beamlets>{range_regex})\s*'
                 r'(?P<rcus>{range_regex})\s*'
                 r'(?P<rcus_mode>(\d*))\s*'
                 r'(?P<virtual_pointing>{ra_dec_regex})\s*'
                 r'(?P<station_pointing>{ra_dec_regex})').format(range_regex=range_regex,
                                                                 ra_dec_regex=ra_dec_regex)
        match = re.match(regex, line)
        if match is None:
            raise ValueError('Cannot parse line {}'.format(line))
        return match.groupdict()

    @staticmethod
    def _split_lines(lines):
        """
        Apply :meth:`_split_line` to every line.

        :param lines: iterable of station lines
        :return: a list with one dict of textual fields per line
        """
        return [HolographySpecification._split_line(line)
                for line in lines]

    @staticmethod
    def _parse_pointing(pointing_string):
        """
        Parse a "<ra>,<dec>,<coordinate system>" string.

        :param pointing_string: the comma separated pointing
        :return: a dict with the keys ra and dec (floats) and coordinate_system
        :raises ValueError: if the string does not hold exactly three comma
                            separated items or ra/dec are not numbers
        """
        ra, dec, coordinate_system = pointing_string.split(',')
        ra = float(ra)
        dec = float(dec)
        return dict(ra=ra, dec=dec, coordinate_system=coordinate_system)

    @staticmethod
    def _parse_line(splitted_line):
        """
        Convert the textual fields of a station line into typed values.

        :param splitted_line: a dict as returned by :meth:`_split_line`
        :return: a dict with the keys station_name, rcus_mode (int),
                 sub_band_ids (list of int), mode_description, rcus_involved,
                 beamlets, station_pointing, virtual_pointing and station_type
        :raises ValueError: if a numeric field cannot be converted
        """
        rcus_mode = int(splitted_line['rcus_mode'])
        sub_band_ids = [int(sub_band) for sub_band in splitted_line['sub_band'].split(',')]
        mode_description = splitted_line['mode_description']
        rcus_involved = splitted_line['rcus']
        beamlets = splitted_line['beamlets']

        station_pointing = HolographySpecification._parse_pointing(
            splitted_line['station_pointing'])
        virtual_pointing = HolographySpecification._parse_pointing(
            splitted_line['virtual_pointing'])
        # a line listing a single sub band is tagged as target,
        # a line listing several sub bands as reference
        if len(sub_band_ids) == 1:
            station_type = 'target'
        else:
            station_type = 'reference'
        station_name = splitted_line['station_name']

        return dict(station_name=station_name,
                    rcus_mode=rcus_mode,
                    sub_band_ids=sub_band_ids,
                    mode_description=mode_description,
                    rcus_involved=rcus_involved,
                    beamlets=beamlets,
                    station_pointing=station_pointing,
                    virtual_pointing=virtual_pointing,
                    station_type=station_type)

    def _parse_lines(self, lines):
        """
        Parse the station lines and group them by station name in
        station_specification_map.

        :param lines: the lines of the file following the header
        :raises ValueError: if a line cannot be parsed
        """
        splitted_lines = HolographySpecification._split_lines(lines)

        for line in splitted_lines:
            parsed_line = HolographySpecification._parse_line(line)
            self.station_specification_map[parsed_line['station_name']].append(parsed_line)

    def _update_class_attributes(self):
        """
        Derive the station name lists from station_specification_map.

        A station with exactly one entry is listed in reference_station_names,
        a station with more than one entry in target_station_names.
        """
        # A list snapshot rather than a live dict view: it can be indexed and
        # does not change if the map is modified afterwards.
        self.station_names = list(self.station_specification_map.keys())
        self.reference_station_names = [station_name for station_name in
                                        self.station_specification_map
                                        if len(self.station_specification_map[station_name]) == 1]
        self.target_station_names = [station_name for station_name in
                                     self.station_specification_map
                                     if len(self.station_specification_map[station_name]) > 1]

    def read_file(self):
        """
        Read the specification file and fill in the instance attributes.

        :raises ValueError: if the header or a station line cannot be parsed
        """
        lines = self._read_lines()
        self._parse_header(lines[0])
        self._parse_lines(lines[1:])
        self._update_class_attributes()

    @staticmethod
    def create_hs_list_from_name_list_and_path(name_list, path):
        """
        Create one HolographySpecification per file name.

        :param name_list: file names following ``hs_name_pattern``
        :param path: directory containing the files
        :return: a list of HolographySpecification
        """
        return [HolographySpecification(name, path) for name in name_list]

    @staticmethod
    def is_holography_specification_file_name(name):
        """
        Check whether a file name follows ``hs_name_pattern``.

        :param name: the file name to check
        :return: True if the name matches, False otherwise
        """
        return re.match(HolographySpecification.hs_name_pattern, name) is not None

    @staticmethod
    def extract_id_date_comment_from_name(name):
        """
        Extract id, date and comment from a specification file name.

        :param name: file name following ``hs_name_pattern``
        :return: a tuple (id as int, date as datetime, comment as str)
        :raises ValueError: if the name does not follow ``hs_name_pattern`` or
                            the date it contains is not a valid date
        """
        match = re.match(HolographySpecification.hs_name_pattern, name)
        if match is None:
            raise ValueError(
                'Cannot extract id, date and comment from file name {}'.format(name))
        date = match.group('date')
        hs_id = int(match.group('id'))
        comment = match.group('comment')
        date = datetime.datetime.strptime(date, '%Y%m%d')
        return hs_id, date, comment
class HolographyObservation(object):
    """
    A set of measurement sets found in one ``L<sas_id>/uv`` directory,
    indexed by beamlet number, together with the time range they cover.
    """

    def __init__(self, path, sas_id, ms_for_a_given_beamlet, observation_start_datetime,
                 observation_end_datetime):
        """
        :param path: path associated with the observation
        :param sas_id: numeric id taken from the ``L<sas_id>`` directory name
        :param ms_for_a_given_beamlet: dict of measurement sets indexed by beamlet number
        :param observation_start_datetime: earliest start among the measurement sets
        :param observation_end_datetime: latest end among the measurement sets
        """
        self.path = path
        self.sas_id = sas_id
        self.ms_for_a_given_beamlet = ms_for_a_given_beamlet
        self.start_datetime = observation_start_datetime
        self.end_datetime = observation_end_datetime

    @staticmethod
    def __compute_time_range_from_ms_list(ms_list):
        """
        Compute the overall time range covered by a group of measurement sets.

        :param ms_list: any iterable (list, dict view, ...) of objects exposing
                        ``get_start_end_observation()`` returning (start, end)
        :return: a tuple (earliest start, latest end)
        :raises ValueError: if ``ms_list`` is empty
        """
        # Iterate instead of indexing so that non-subscriptable iterables such
        # as dict.values() are accepted; each measurement set is queried once.
        time_ranges = [ms.get_start_end_observation() for ms in ms_list]
        if not time_ranges:
            raise ValueError('Cannot compute a time range from an empty list '
                             'of measurement sets')
        observation_start = min(start for start, _ in time_ranges)
        observation_end = max(end for _, end in time_ranges)
        return observation_start, observation_end

    @staticmethod
    def list_observations_in_path(path):
        """
        Look for directories named ``<path>/L<6 digits>/uv`` and create one
        observation for each of them from the measurement sets they contain.

        :param path: the directory to search
        :return: a list of HolographyObservation, empty if nothing matches
        :raises ValueError: if a matching directory holds no valid measurement set
        """
        ms_dir_name_pattern = r'L(?P<sas_id>\d{6})'
        # The literal parts are escaped so that regex metacharacters in the
        # search path (".", "+", "(", ...) are matched verbatim.
        ms_dirs_path_pattern = ('^' + re.escape(os.path.join(path, '')) +
                                ms_dir_name_pattern + re.escape(os.sep) + 'uv$')
        observations_list = []
        for root, dirnames, filenames in os.walk(path):
            match = re.match(ms_dirs_path_pattern, root)
            if match:
                sas_id = int(match.group('sas_id'))
                ms_indexed_per_beamlet_number = HolographyObservation.\
                    create_ms_dict_from_ms_name_list_and_path(dirnames, root)

                start_datetime, end_datetime = HolographyObservation.\
                    __compute_time_range_from_ms_list(
                        ms_indexed_per_beamlet_number.values())

                observations_list.append(
                    HolographyObservation(path, sas_id, ms_indexed_per_beamlet_number,
                                          start_datetime, end_datetime))
                # Nothing below a matched directory can match the anchored
                # pattern, so there is no need to descend any further.
                del dirnames[:]
        return observations_list

    @staticmethod
    def create_ms_dict_from_ms_name_list_and_path(list_of_ms_names, path):
        """
        Creates a dict of measurement sets indexed by beamlet id
        :param list_of_ms_names: a list of the ms to process
        :param path: a path where the ms are stored
        :return: a dict containing the map of the ms indexed by their beamlet number
        ex. { 0 : ms_beam0 ....}
        """
        filtered_list_of_ms_names = MeasurementSet.filter_valid_ms_names(list_of_ms_names)
        ms_list = [MeasurementSet(ms_name, path) for ms_name in filtered_list_of_ms_names]
        beamlet_ms_map = {ms.beamlet: ms for ms in ms_list}
        return beamlet_ms_map