Skip to content
Snippets Groups Projects
holography_specification.py 10.7 KiB
Newer Older
import datetime
import logging
import os
import re
from collections import defaultdict
from glob import glob
# Module-level logger used by the classes below; note that it is named after
# the file path (__file__), not after the module name.
logger = logging.getLogger(__file__)
class HolographyStationBeamSpecification(object):
    """
    Record holding the beam specification of one single station.

    All fields are declared at class level with an empty placeholder
    (None or an empty tuple); an instance is expected to overwrite them
    with the values parsed from the specification file.
    """
    # Identification of the station and of the observing mode.
    station_name = None
    mode_description = None
    rcus_mode = None
    station_type = tuple()

    # Sub bands, beamlets and RCUs taking part in the beam.
    sub_band_ids = tuple()
    beamlets = tuple()
    rcus_involved = tuple()

    # Pointings associated with the beam.
    virtual_pointing = tuple()
    station_pointing = tuple()


class HolographySpecification(object):
    """
    The HolographySpecification represents the set of input used to specify an holography
    observation with the LOFAR telescope.

    The file name carries the date, a comment and an id (see hs_name_pattern); the file
    content is made of one header line followed by one line per station beam.
    """
    # Expected layout of the file name: Holog-<YYYYMMDD>-<comment>-<NNN>.txt
    hs_name_pattern = r'Holog-(?P<date>\d{8})-(?P<comment>.*)-(?P<id>\d{3})\.txt'

    def __init__(self, name, path):
        """
        Construct the holography specification object and extracts id date and comment from
        the file name (see HolographySpecification.extract_id_date_comment_from_name).
        The file itself is not read here: call read_file to load its content.

        :param name: file name of the holography specification
        :type name: str
        :param path: path of the directory containing the holography specification file
        :type path: str
        :raises ValueError: if the name does not follow hs_name_pattern
        """
        self.path = os.path.join(path, name)
        self.name = name
        self.id, self.date, self.comment = HolographySpecification. \
            extract_id_date_comment_from_name(name)
        self.station_names = None
        self.reference_station_names = None
        self.target_station_names = None
        self.station_specification_map = defaultdict(list)
        self.start_datetime = None
        self.end_datetime = None
        self.rcu_mode = None
        self.beam_set_interval = None

    def __repr__(self):
        return 'HolographySpecification(%s, %s, %s, %s, %s)' % (
            self.id,
            self.date,
            self.comment,
            self.name,
            self.path,
        )

    @staticmethod
    def list_bsf_files_in_path(path):
        """
        List all the Holography beam specification files in the given path.
        The returned objects are only constructed from the file names, their content
        is not read.

        :param path: path to the beam specification files
        :type path: str
        :return: the list of Holography Specifications, sorted by file name
        :rtype: list[HolographySpecification]
        """
        logger.info("Loading holography beam specifications from \"%s\"...", path)
        bsf_files_name_pattern = 'Holog-*.txt'
        bsf_files_path_pattern = os.path.join(path, bsf_files_name_pattern)
        candidate_file_names = sorted(os.path.basename(path_i)
                                      for path_i in glob(bsf_files_path_pattern)
                                      if os.path.isfile(path_i))
        # The glob pattern is looser than hs_name_pattern: drop the names the constructor
        # would not be able to interpret instead of failing the whole listing.
        matched_file_name_list = []
        for file_name in candidate_file_names:
            if HolographySpecification.is_holography_specification_file_name(file_name):
                matched_file_name_list.append(file_name)
            else:
                logger.warning("Skipping file \"%s\": unexpected file name format", file_name)
        holography_beam_specification = HolographySpecification.create_hs_list_from_name_list_and_path(
            matched_file_name_list, path)
        logger.info("Loading holography beam specifications were successfully loaded from \"%s\".", path)
        return holography_beam_specification

    def get_beam_specifications_per_station_name(self, station_name):
        """
        Returns a list of beam specifications for the given station name.
        The underlying map is a defaultdict: asking for an unknown station returns an
        empty list and registers that empty list in the map.

        :param station_name: name of the station
        :type station_name: str
        :return: a list of beam specifications
        :rtype: list[HolographyStationBeamSpecification]
        """
        return self.station_specification_map[station_name]

    def read_file(self):
        """
        Read the specification file at self.path: the first line is parsed as the header,
        the remaining lines as station beam specifications, then the station name lists
        are refreshed.
        """
        logger.debug("Reading holography file \"%s\"...", self.path)
        lines = self._read_lines()
        self._parse_header(lines[0])
        self._parse_lines(lines[1:])
        self._update_class_attributes()
        logger.debug("Reading holography file \"%s\" done.", self.path)

    @staticmethod
    def create_hs_list_from_name_list_and_path(name_list, path):
        """
        Build one HolographySpecification per file name.

        :param name_list: file names of the holography specifications
        :type name_list: list[str]
        :param path: directory containing the files
        :type path: str
        :rtype: list[HolographySpecification]
        """
        return [HolographySpecification(name, path) for name in name_list]

    @staticmethod
    def is_holography_specification_file_name(name):
        """
        Tell whether the given file name follows hs_name_pattern.

        :param name: file name to check
        :type name: str
        :rtype: bool
        """
        return re.match(HolographySpecification.hs_name_pattern, name) is not None

    @staticmethod
    def extract_id_date_comment_from_name(name):
        """
        Extract the id, the date and the comment encoded in the file name.

        :param name: file name of the holography specification
        :type name: str
        :return: the id, the date and the comment
        :rtype: tuple(int, datetime.datetime, str)
        :raises ValueError: if the name does not follow hs_name_pattern or the date
         is not a valid calendar date
        """
        match = re.match(HolographySpecification.hs_name_pattern, name)
        if match is None:
            raise ValueError('Cannot extract id, date and comment from file name {}'.format(name))
        date = match.group('date')
        hs_id = int(match.group('id'))
        comment = match.group('comment')
        date = datetime.datetime.strptime(date, '%Y%m%d')
        return hs_id, date, comment

    def _read_lines(self):
        """
        Read the whole specification file.

        :return: the lines of the file, line terminators included
        :rtype: list[str]
        """
        with open(self.path, 'r') as fstream_in:
            return fstream_in.readlines()

    @staticmethod
    def _split_header(line):
        """
        Split the header of holography specification file
        :param line: line containing the header
        :return: a dict containing the start_date,
         the end_date, the rcu_mode, and the beam_switch_delay
        :rtype: dict
        :raises ValueError: if the header does not have the expected layout
        """
        date_regex = r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}'
        header_regex = (r'(?P<start_date>{date_regex})\s*'
                        r'(?P<end_date>{date_regex})\s*'
                        r'(?P<rcu_mode>\d*)\s*'
                        r'(?P<beam_switch_delay>\d*.\d*)').format(date_regex=date_regex)
        match = re.match(header_regex, line)
        if match is None:
            raise ValueError('Cannot parse header {}'.format(line))
        return match.groupdict()

    def _parse_header(self, header):
        """
        Parse the header and store start/end date time, rcu mode and beam set interval.
        The rcu mode and the beam set interval are stored as the strings found in the file.

        :param header: string containing the header
        :raises ValueError: if the header cannot be parsed
        """
        date_time_format = '%Y-%m-%d %H:%M:%S'

        split_header = HolographySpecification._split_header(header)
        self.start_datetime = datetime.datetime.strptime(split_header['start_date'],
                                                         date_time_format)
        self.end_datetime = datetime.datetime.strptime(split_header['end_date'],
                                                       date_time_format)
        self.rcu_mode = split_header['rcu_mode']
        self.beam_set_interval = split_header['beam_switch_delay']

    @staticmethod
    def _split_line(line):
        """
        Split a station beam line of the holography specification file
        :param line: line containing the beam specification of a station
        :return: a dict containing the station_name, the mode_description, the sub_band,
         the beamlets, the rcus, the rcus_mode, the virtual_pointing and the station_pointing
        :rtype: dict
        :raises ValueError: if the line does not have the expected layout
        """
        range_regex = r'(\d*\:\d*)|(\d*)'
        ra_dec_regex = r'\d*\.\d*,-?\d*\.\d*,\w*'
        regex = r'^(?P<station_name>\w*)\s*' \
                r'(?P<mode_description>\w*)\s*' \
                r'(?P<sub_band>[\d,]*)\s*' \
                r'(?P<beamlets>{range_regex})\s*' \
                r'(?P<rcus>{range_regex})\s*' \
                r'(?P<rcus_mode>(\d*))\s*' \
                r'(?P<virtual_pointing>{ra_dec_regex})\s*' \
                r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex,
                                                               ra_dec_regex=ra_dec_regex)
        match = re.match(regex, line)
        if match is None:
            raise ValueError('Cannot parse line {}'.format(line))
        return match.groupdict()

    @staticmethod
    def _split_lines(lines):
        """
        Apply _split_line to every line.

        :param lines: station beam lines of the specification file
        :type lines: list[str]
        :rtype: list[dict]
        """
        return [HolographySpecification._split_line(line)
                for line in lines]

    @staticmethod
    def _parse_pointing(pointing_string):
        """
        Parse a pointing written as "<ra>,<dec>,<coordinate system>".

        :param pointing_string: the string representing the pointing
        :type pointing_string: str
        :return: a dict with the keys ra (float), dec (float) and coordinate_system (str)
        :rtype: dict
        :raises ValueError: if the string is not made of three comma separated fields
         or ra/dec are not numbers
        """
        ra, dec, coordinate_system = pointing_string.split(',')
        ra = float(ra)
        dec = float(dec)
        return dict(ra=ra, dec=dec, coordinate_system=coordinate_system)

    @staticmethod
    def _parse_integer_range_or_list(string_to_be_parsed):
        """
        Parses a string containing a list of int or an inclusive range and returns a list of int

        ex "1,2,3,4,5" -> [1, 2, 3, 4, 5]
           "1:4" -> [1, 2, 3, 4]
           "7" -> [7]
        :param string_to_be_parsed: the string representing a list of int or a range
        :return: a list of int
        :rtype: list(int)
        :raises ValueError: if the string is neither a list of int nor a range
        """
        if ':' in string_to_be_parsed:
            try:
                start, end = map(int, string_to_be_parsed.split(':'))
            except ValueError as e:
                raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' %
                                 (string_to_be_parsed, e))
            # The end of the range is part of the range
            return_value = list(range(start, end + 1))
        elif ',' in string_to_be_parsed:
            try:
                return_value = list(map(int, string_to_be_parsed.split(',')))
            except ValueError as e:
                raise ValueError('Cannot parse string %s expected [int],[int],... -> %s' %
                                 (string_to_be_parsed, e))
        else:
            # A lone integer is a list made of one element
            try:
                return_value = [int(string_to_be_parsed)]
            except ValueError:
                raise ValueError('Cannot parse string %s expected [int],[int],... or [start]:[end]' %
                                 string_to_be_parsed)
        return return_value

    @staticmethod
    def _parse_line(split_line):
        """
        Parses a line in the holography specification file.
        The beamlets are kept as the string found in the file; a line with exactly one
        sub band is flagged as 'target', any other line as 'reference'.

        :param split_line: dict containing the row indexed by the column name
        :type split_line: dict[str, str]
        :return: the beam specification described by the line
        :rtype: HolographyStationBeamSpecification
        """
        beam_specification = HolographyStationBeamSpecification()

        beam_specification.rcus_mode = int(split_line['rcus_mode'])
        beam_specification.sub_band_ids = [int(sub_band) for sub_band in
                                           split_line['sub_band'].split(',')]
        beam_specification.mode_description = split_line['mode_description']
        beam_specification.rcus_involved = HolographySpecification._parse_integer_range_or_list(
            split_line['rcus'])
        beam_specification.beamlets = split_line['beamlets']

        beam_specification.station_pointing = HolographySpecification._parse_pointing(
            split_line['station_pointing'])
        beam_specification.virtual_pointing = HolographySpecification._parse_pointing(
            split_line['virtual_pointing'])
        if len(beam_specification.sub_band_ids) == 1:
            beam_specification.station_type = 'target'
        else:
            beam_specification.station_type = 'reference'
        beam_specification.station_name = split_line['station_name']
        return beam_specification

    def _parse_lines(self, lines):
        """
        Parse the station beam lines and file each beam specification under the name
        of its station in station_specification_map.

        :param lines: station beam lines of the specification file
        :type lines: list[str]
        """
        split_lines = HolographySpecification._split_lines(lines)
        for line in split_lines:
            parsed_line = HolographySpecification._parse_line(line)

            self.station_specification_map[parsed_line.station_name].append(parsed_line)

    def _update_class_attributes(self):
        """
        Refresh the station name lists from station_specification_map: stations with
        exactly one beam specification are listed as reference stations, stations with
        more than one as target stations.
        """
        self.station_names = list(self.station_specification_map)
        self.reference_station_names = [station_name for station_name in
                                        self.station_specification_map
                                        if len(self.station_specification_map[station_name]) == 1]
        self.target_station_names = [station_name for station_name in
                                     self.station_specification_map
                                     if len(self.station_specification_map[station_name]) > 1]