import datetime import logging import os import re from collections import defaultdict from glob import glob logger = logging.getLogger(__file__) MODE_TO_COMPONENT = { 1: 'LBL', 2: 'LBL', 3: 'LBH', 4: 'LBH', 5: 'HBA', 6: 'HBA', 7: 'HBA' } def antenna_id_from_rcu_type(rcu, type): """ Compute the antenna id for a given rcu, type :param rcu: id of the rcu :param type: type of the antenna :return: the antenna id and polarization :rtype: (int, str) :rtype: int """ polarization_index = rcu % 2 if type in ['LBH', 'HBA']: antenna_id = rcu - polarization_index antenna_id /= 2. elif type == 'LBL': antenna_id = (rcu - polarization_index) / 2. + 48 else: antenna_id = -1 return int(antenna_id) def antenna_id_from_rcu_mode_polarization(rcu, mode): if mode in MODE_TO_COMPONENT: type = MODE_TO_COMPONENT[mode] return antenna_id_from_rcu_type(rcu, type) else: raise Exception('Unrecognized mode %s' % mode) class HolographyStationBeamSpecification(object): """ Contains the data regarding the beam specification for a single station. """ station_name = None rcus_mode = None sub_band_ids = () mode_description = None rcus_involved = () beamlets = () antennas_used = () station_pointing = () virtual_pointing = () station_type = () def _parse_line(split_line): """ Parses a line in the holography specification file :param split_line: dict containing the row indexed by the column name :type split_line: dict[str, str] """ beam_specification = HolographyStationBeamSpecification() beam_specification.rcus_mode = int(split_line['rcus_mode']) beam_specification.sub_band_ids = [int(sub_band) for sub_band in split_line['sub_band'].split(',')] beam_specification.mode_description = split_line['mode_description'] beam_specification.rcus_involved = _parse_integer_range_or_list( split_line['rcus']) beam_specification.beamlets = split_line['beamlets'] beam_specification.antennas_used = [ antenna_id_polarization_from_rcu_mode_polarization(rcu, beam_specification.rcus_mode) for rcu in beam_specification.rcus_involved] beam_specification.station_pointing = _parse_pointing( split_line['station_pointing']) beam_specification.virtual_pointing = _parse_pointing( split_line['virtual_pointing']) if len(beam_specification.sub_band_ids) == 1: beam_specification.station_type = 'target' else: beam_specification.station_type = 'reference' beam_specification.station_name = split_line['station_name'] return beam_specification def list_bsf_files_in_path(path): """ List all the Holography beam specification files in the given path :param path: path to the beam specification files :type path: str :return: the list of Holography Specifications :rtype: list[HolographySpecification] """ logger.info("Loading holography beam specifications from \"%s\"...", path) bsf_files_name_pattern = 'Holog-*.txt' bsf_files_path_pattern = os.path.join(path, bsf_files_name_pattern) matched_path_list = glob(bsf_files_path_pattern) matched_file_path_list = list( filter(lambda path_i: os.path.isfile(path_i), matched_path_list)) matched_file_name_list = list(map(lambda path_i: os.path.basename(path_i), matched_file_path_list)) holography_beam_specification = create_hs_list_from_name_list_and_path( matched_file_name_list, path) logger.info("Loading holography beam specifications were successfully loaded from \"%s\".", path) return holography_beam_specification def create_hs_list_from_name_list_and_path(name_list, path): return [HolographySpecification(name, path) for name in name_list] def is_holography_specification_file_name(name): return re.match(HolographySpecification.hs_name_pattern, name) is not None def extract_id_date_comment_from_name(name): match = re.match(HolographySpecification.hs_name_pattern, name) date = match.group('date') hs_id = int(match.group('id')) comment = match.group('comment') date = datetime.datetime.strptime(date, '%Y%m%d') return hs_id, date, comment def _split_header(line): """ Split the header of holography specification file :param line: line containing the header :return: a dict containing the start_date, the end_date, the rcu_mode, and the beam_switch_delay :rtype: dict """ date_regex = '\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}' format = r'(?P<start_date>{date_regex})\s*' \ '(?P<end_date>{date_regex})\s*' \ '(?P<rcu_mode>\d*)\s*' \ '(?P<beam_switch_delay>\d*.\d*)'.format(date_regex=date_regex) match = re.match(format, line) return match.groupdict() def _split_line(line): """ Split the header of holography specification file :param line: line containing the header :return: a dict containing the start_date, the end_date, the rcu_mode, and the beam_switch_delay :rtype: dict """ range_regex = '(\d*\:\d*)|(\d*)' ra_dec_regex = '(\d*\.\d*|nan),(-?\d*\.\d*|nan),(\w*)' regex = r'^(?P<station_name>\w*)\s*' \ r'(?P<mode_description>\w*)\s*' \ r'(?P<sub_band>[\d,]*)\s*' \ r'(?P<beamlets>{range_regex})\s*' \ r'(?P<rcus>{range_regex})\s*' \ r'(?P<rcus_mode>(\d*))\s*' \ r'(?P<virtual_pointing>{ra_dec_regex})\s*' \ r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex, ra_dec_regex=ra_dec_regex) match = re.match(regex, line) if match is None: raise ValueError('Cannot parse line {}'.format(line)) return match.groupdict() def _split_lines(lines): return [_split_line(line) for line in lines] def _parse_pointing(pointing_string): ra, dec, coordinate_system = pointing_string.split(',') ra = float(ra) dec = float(dec) return dict(ra=ra, dec=dec, coordinate_system=coordinate_system) def _parse_integer_range_or_list(string_to_be_parsed): """ Parses a string containing a list of int or an inclusive range and returns a list of int ex "1,2,3,4,5" -> [1, 2, 3, 4, 5] "1:4" -> [1, 2, 3, 4] :param string_to_be_parsed: the string representing a list of int or a range :return: a list of int :rtype: list(int) """ if ':' in string_to_be_parsed: try: start, end = map(int, string_to_be_parsed.split(':')) except ValueError as e: raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' % (string_to_be_parsed, e)) return_value = [x for x in range(start, end)] elif ',' in string_to_be_parsed: try: return_value = list(map(int, string_to_be_parsed.split(','))) except ValueError as e: raise ValueError('Cannot parse string %s expected [int],[int],... -> %s' % (string_to_be_parsed, e)) else: raise ValueError('Cannot parse string %s expected [int],[int],... or [start]:[end] %s' % string_to_be_parsed) return return_value class HolographySpecification(object): """ The HolographySpecification represents the set of input used to specify an holography observation with the LOFAR telescope """ hs_name_pattern = r'Holog-(?P<date>\d{8})-(?P<comment>.*)-(?P<id>\d{3}).txt' def __init__(self, name, path): """ Construct the holography specification object and extracts id date and comment from the file name (see HolographySpecification.extract_id_date_comment_from_name) :param name: file name of the holography specification :type name: str :param path: path of the holography specification file :type path: str """ self.path = os.path.join(path, name) self.name = name self.id, self.date, self.comment = extract_id_date_comment_from_name(name) self.reference_station_names = None self.target_station_names = None self.station_specification_map = defaultdict(list) self.start_datetime = None self.end_datetime = None self.rcu_mode = None self.beam_set_interval = None def __repr__(self): return 'HolographySpecification(%s, %s, %s, %s, %s)' % ( self.id, self.date, self.comment, self.name, self.path, ) def get_beam_specifications_per_station_name(self, station_name): """ Returns a list of beam specifications for the given station name :param station_name: name of the station :type station_name: str :return: a list of beam specifications :rtype: list[HolographyStationBeamSpecification] """ return self.station_specification_map[station_name] def read_file(self): logger.debug("Reading holography file \"%s\"...", self.path) lines = self._read_lines() self._parse_header(lines[0]) self._parse_lines(lines[1:]) self._update_class_attributes() logger.debug("Reading holography file \"%s\" done.", self.path) def _read_lines(self): with open(self.path, 'r') as fstream_in: return fstream_in.readlines() def _parse_header(self, header): """ Parse the header :param header: string containing the header """ date_time_format = '%Y-%m-%d %H:%M:%S' split_header = _split_header(header) self.start_datetime = datetime.datetime.strptime(split_header['start_date'], date_time_format) self.end_datetime = datetime.datetime.strptime(split_header['end_date'], date_time_format) self.rcu_mode = split_header['rcu_mode'] self.beam_set_interval = split_header['beam_switch_delay'] def _parse_lines(self, lines): split_lines = _split_lines(lines) for line in split_lines: parsed_line = _parse_line(line) self.station_specification_map[parsed_line.station_name] += [parsed_line] def _update_class_attributes(self): self.station_names = list(self.station_specification_map.keys()) self.reference_station_names = [station_name for station_name in self.station_specification_map if len(self.station_specification_map[station_name]) == 1] self.target_station_names = [station_name for station_name in self.station_specification_map if len(self.station_specification_map[station_name]) > 1]