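# holography_specification.py
# NOTE: this file was exported from a web repository view; the page navigation
# text that preceded the code ("Skip to content", "Snippets Groups Projects",
# "11 KiB", "Newer Older") is not part of the module.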
import datetime
import logging
import os
import re
from collections import defaultdict
from glob import glob
# Module-level logger; it is named after the path of this file.
logger = logging.getLogger(__file__)

# Lookup table from the rcu mode number to the antenna component type,
# written grouped by component: modes 1-2 -> LBL, 3-4 -> LBH, 5-7 -> HBA.
MODE_TO_COMPONENT = {
    mode: component
    for component, modes in (('LBL', (1, 2)),
                             ('LBH', (3, 4)),
                             ('HBA', (5, 6, 7)))
    for mode in modes
}


def antenna_id_from_rcu_type(rcu, type):
    """
    Compute the antenna id for a given rcu and antenna type.

    The parity of the rcu number (used as the polarization index) is
    discarded, so two consecutive rcus map to the same antenna id. For the
    'LBL' type the resulting id is additionally offset by 48.

    :param rcu: id of the rcu
    :type rcu: int
    :param type: type of the antenna ('LBL', 'LBH' or 'HBA')
    :type type: str
    :return: the antenna id, or -1 if the antenna type is not recognised
    :rtype: int
    """
    # NOTE: the parameter name ``type`` shadows the builtin; it is kept
    # unchanged so that callers passing it by keyword keep working.

    # Floor division drops the parity bit: rcus 2k and 2k + 1 both give k.
    rcu_pair_index = int(rcu // 2)

    if type in ('LBH', 'HBA'):
        return rcu_pair_index
    if type == 'LBL':
        return rcu_pair_index + 48
    # Unknown antenna type: callers receive the -1 sentinel.
    return -1


# (Blame annotation left over from the web export, not code:
#  "Mattia Mancini committed".)
def antenna_id_from_rcu_mode_polarization(rcu, mode):
    """
    Compute the antenna id for a given rcu and rcu mode.

    The mode is translated to an antenna component type through
    MODE_TO_COMPONENT and the result is delegated to antenna_id_from_rcu_type.

    :param rcu: id of the rcu
    :param mode: rcu mode number
    :return: the antenna id, or None if the mode is not in MODE_TO_COMPONENT
    """
    component = MODE_TO_COMPONENT.get(mode)
    if component is None:
        # Unknown mode: no antenna id can be derived.
        return None
    return antenna_id_from_rcu_type(rcu, component)
class HolographyStationBeamSpecification(object):
    """
    Contains the data regarding the beam specification for a single station.

    The class level values below are only defaults; _parse_line overwrites
    them on each instance it creates.
    """
    # Name of the station the beam belongs to.
    station_name = None
    # Rcu mode of the beam (parsed as an int).
    rcus_mode = None
    # Sub band ids used by the beam (parsed as a list of int).
    sub_band_ids = ()
    # Free text mode description column.
    mode_description = None
    # Ids of the rcus involved (parsed as a list of int).
    rcus_involved = ()
    # Beamlets column, stored as found in the file.
    beamlets = ()
    # Antenna ids derived from rcus_involved and rcus_mode. Declared here so
    # that the attribute exists on every instance, like the other fields.
    antennas_used = ()
    # Pointings, parsed as dicts with keys ra, dec and coordinate_system.
    station_pointing = ()
    virtual_pointing = ()
    # Set to 'target' or 'reference' by _parse_line.
    station_type = ()


def _parse_line(split_line):
    """
    Parses a line in the holography specification file

    :param split_line: dict containing the row indexed by the column name
    :type split_line: dict[str, str]
    :return: the beam specification described by the row
    :rtype: HolographyStationBeamSpecification
    :raises ValueError: if a numeric column or a pointing cannot be converted
    """
    beam_specification = HolographyStationBeamSpecification()

    beam_specification.rcus_mode = int(split_line['rcus_mode'])
    beam_specification.sub_band_ids = [int(sub_band) for sub_band in
                                       split_line['sub_band'].split(',')]
    beam_specification.mode_description = split_line['mode_description']
    beam_specification.rcus_involved = _parse_integer_range_or_list(
        split_line['rcus'])
    # The beamlets column is kept as the raw string found in the file.
    beam_specification.beamlets = split_line['beamlets']

    # One antenna id per involved rcu. The helper defined in this module is
    # antenna_id_from_rcu_mode_polarization; the previous call used a name
    # that is not defined here and failed with a NameError.
    beam_specification.antennas_used = [
        antenna_id_from_rcu_mode_polarization(rcu, beam_specification.rcus_mode)
        for rcu in beam_specification.rcus_involved]
    beam_specification.station_pointing = _parse_pointing(
        split_line['station_pointing'])
    beam_specification.virtual_pointing = _parse_pointing(
        split_line['virtual_pointing'])

    # A row with exactly one sub band is flagged as a target station beam,
    # any other row as a reference station beam.
    if len(beam_specification.sub_band_ids) == 1:
        beam_specification.station_type = 'target'
    else:
        beam_specification.station_type = 'reference'
    beam_specification.station_name = split_line['station_name']

    return beam_specification


def list_bsf_files_in_path(path):
    """
    List all the Holography beam specification files in the given path

    Only regular files directly inside ``path`` whose name matches the glob
    pattern ``Holog-*.txt`` are considered.

    :param path: path to the beam specification files
    :type path: str
    :return: the list of Holography Specifications
    :rtype: list[HolographySpecification]
    """
    logger.info("Loading holography beam specifications from \"%s\"...", path)
    bsf_files_path_pattern = os.path.join(path, 'Holog-*.txt')
    # Keep regular files only (a directory may match the pattern too) and
    # reduce each match to its file name.
    matched_file_name_list = [os.path.basename(matched_path)
                              for matched_path in glob(bsf_files_path_pattern)
                              if os.path.isfile(matched_path)]
    holography_beam_specification = create_hs_list_from_name_list_and_path(
        matched_file_name_list, path)
    logger.info("Holography beam specifications were successfully loaded from \"%s\".",
                path)
    return holography_beam_specification


def create_hs_list_from_name_list_and_path(name_list, path):
    """
    Create one HolographySpecification per file name.

    :param name_list: file names of the holography specification files
    :param path: directory containing the files
    :return: the specifications, in the same order as name_list
    :rtype: list[HolographySpecification]
    """
    specifications = []
    for file_name in name_list:
        specifications.append(HolographySpecification(file_name, path))
    return specifications


def is_holography_specification_file_name(name):
    """
    Tell whether a file name follows the holography specification naming.

    :param name: file name to check
    :type name: str
    :return: True if the start of name matches
     HolographySpecification.hs_name_pattern, False otherwise
    :rtype: bool
    """
    # re.match returns None when there is no match and a (truthy) match
    # object otherwise.
    return bool(re.match(HolographySpecification.hs_name_pattern, name))


def extract_id_date_comment_from_name(name):
    """
    Extract id, date and comment from a holography specification file name.

    The name is matched against HolographySpecification.hs_name_pattern; a
    name that does not match fails with an AttributeError because no match
    object is available.

    :param name: file name of the holography specification
    :type name: str
    :return: the id, the date (parsed with the format %Y%m%d) and the comment
    :rtype: (int, datetime.datetime, str)
    """
    name_match = re.match(HolographySpecification.hs_name_pattern, name)
    date_text, id_text, comment = name_match.group('date', 'id', 'comment')
    return int(id_text), datetime.datetime.strptime(date_text, '%Y%m%d'), comment


def _split_header(line):
    """
    Split the header of holography specification file
    :param line: line containing the header
    :return: a dict containing the start_date,
     the end_date, the rcu_mode, and the beam_switch_delay
    :rtype: dict
    """
    date_regex = '\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}'
    format = r'(?P<start_date>{date_regex})\s*' \
             '(?P<end_date>{date_regex})\s*' \
             '(?P<rcu_mode>\d*)\s*' \
             '(?P<beam_switch_delay>\d*.\d*)'.format(date_regex=date_regex)
    match = re.match(format, line)
    return match.groupdict()


def _split_line(line):
    """
    Split the header of holography specification file
    :param line: line containing the header
    :return: a dict containing the start_date,
     the end_date, the rcu_mode, and the beam_switch_delay
    :rtype: dict
    """
    range_regex = '(\d*\:\d*)|(\d*)'
    ra_dec_regex = '(\d*\.\d*|nan),(-?\d*\.\d*|nan),(\w*)'
    regex = r'^(?P<station_name>\w*)\s*' \
            r'(?P<mode_description>\w*)\s*' \
            r'(?P<sub_band>[\d,]*)\s*' \
            r'(?P<beamlets>{range_regex})\s*' \
            r'(?P<rcus>{range_regex})\s*' \
            r'(?P<rcus_mode>(\d*))\s*' \
            r'(?P<virtual_pointing>{ra_dec_regex})\s*' \
            r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex,
                                                           ra_dec_regex=ra_dec_regex)
    match = re.match(regex, line)
    if match is None:
        raise ValueError('Cannot parse line {}'.format(line))
    return match.groupdict()


def _split_lines(lines):
    """
    Split every row with _split_line.

    :param lines: rows of the holography specification file
    :return: one dict per row, in the same order as lines
    :rtype: list[dict]
    """
    return list(map(_split_line, lines))


def _parse_pointing(pointing_string):
    ra, dec, coordinate_system = pointing_string.split(',')
    ra = float(ra)
    dec = float(dec)
    return dict(ra=ra, dec=dec, coordinate_system=coordinate_system)


def _parse_integer_range_or_list(string_to_be_parsed):
    """
    Parses a string containing a list of int or an inclusive range and returns a list of int

    ex "1,2,3,4,5" -> [1, 2, 3, 4, 5]
       "1:4" -> [1, 2, 3, 4]
    :param string_to_be_parsed: the string representing a list of int or a range
    :return: a list of int
    :rtype: list(int)
    """
    if ':' in string_to_be_parsed:
        try:
            start, end = map(int, string_to_be_parsed.split(':'))
        except ValueError as e:
            raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' %
                             (string_to_be_parsed, e))
        return_value = [x for x in range(start, end)]
    elif ',' in string_to_be_parsed:
        try:
            return_value = list(map(int, string_to_be_parsed.split(',')))
        except ValueError as e:
            raise ValueError('Cannot parse string %s expected [int],[int],... -> %s' %
                             (string_to_be_parsed, e))
    else:
        raise ValueError('Cannot parse string %s expected [int],[int],... or [start]:[end] %s' %
                         string_to_be_parsed)
    return return_value


class HolographySpecification(object):
    """
    The HolographySpecification represents the set of input used to specify an holography
    observation with the LOFAR telescope
    """
    # Expected file name layout: Holog-<8 digit date>-<comment>-<3 digit id>.txt
    # The dot before the extension is escaped so that it only matches a dot.
    hs_name_pattern = r'Holog-(?P<date>\d{8})-(?P<comment>.*)-(?P<id>\d{3})\.txt'

    def __init__(self, name, path):
        """
        Construct the holography specification object and extracts id date and comment from
        the file name (see the module level function extract_id_date_comment_from_name).
        The file itself is not read until read_file is called.

        :param name: file name of the holography specification
        :type name: str
        :param path: path of the holography specification file
        :type path: str
        """
        self.path = os.path.join(path, name)
        self.name = name
        self.id, self.date, self.comment = extract_id_date_comment_from_name(name)
        # The attributes below are filled in by read_file.
        self.station_names = None
        self.reference_station_names = None
        self.target_station_names = None
        self.station_specification_map = defaultdict(list)
        self.start_datetime = None
        self.end_datetime = None
        self.rcu_mode = None
        self.beam_set_interval = None

    def __repr__(self):
        return 'HolographySpecification(%s, %s, %s, %s, %s)' % (
            self.id,
            self.date,
            self.comment,
            self.name,
            self.path,
        )

    def get_beam_specifications_per_station_name(self, station_name):
        """
        Returns a list of beam specifications for the given station name

        :param station_name: name of the station
        :type station_name: str
        :return: a list of beam specifications
        :rtype: list[HolographyStationBeamSpecification]
        """
        # The map is a defaultdict(list): asking for an unknown station
        # returns (and stores) an empty list instead of raising a KeyError.
        return self.station_specification_map[station_name]

    def read_file(self):
        """
        Read the specification file at self.path: the first line is parsed as
        the header, the remaining lines as beam specification rows.
        """
        logger.debug("Reading holography file \"%s\"...", self.path)
        lines = self._read_lines()
        self._parse_header(lines[0])
        self._parse_lines(lines[1:])
        self._update_class_attributes()
        logger.debug("Reading holography file \"%s\" done.", self.path)

    def _read_lines(self):
        """
        Read all the lines of the specification file

        :return: the lines of the file, line endings included
        :rtype: list[str]
        """
        with open(self.path, 'r') as fstream_in:
            return fstream_in.readlines()

    def _parse_header(self, header):
        """
        Parse the header

        :param header: string containing the header
        """
        date_time_format = '%Y-%m-%d %H:%M:%S'

        split_header = _split_header(header)
        self.start_datetime = datetime.datetime.strptime(split_header['start_date'],
                                                         date_time_format)
        self.end_datetime = datetime.datetime.strptime(split_header['end_date'],
                                                       date_time_format)
        # Both values are stored as the strings found in the header.
        self.rcu_mode = split_header['rcu_mode']
        self.beam_set_interval = split_header['beam_switch_delay']

    def _parse_lines(self, lines):
        """
        Parse the beam specification rows and group them by station name

        :param lines: the rows following the header
        """
        for line in _split_lines(lines):
            parsed_line = _parse_line(line)
            self.station_specification_map[parsed_line.station_name].append(parsed_line)

    def _update_class_attributes(self):
        """
        Derive the station name lists from the parsed beam specifications

        A station with exactly one beam specification is listed as a
        reference station, a station with more than one as a target station.
        """
        self.station_names = list(self.station_specification_map.keys())
        self.reference_station_names = [
            station_name
            for station_name, specifications in self.station_specification_map.items()
            if len(specifications) == 1]
        self.target_station_names = [
            station_name
            for station_name, specifications in self.station_specification_map.items()
            if len(specifications) > 1]