# holography_specification.py
import datetime
import os
import re
from collections import defaultdict
from glob import glob
class HolographyStationBeamSpecification(object):
    """
    Plain record holding the fields of one beam of one station.

    Every field is declared at class level with an empty placeholder
    (``None`` or an empty tuple); HolographySpecification._parse_line
    overwrites them on the instance it creates for each specification line.
    """
    # Station and observing mode
    station_name = None
    station_type = tuple()
    mode_description = None
    rcus_mode = None

    # Selection of rcus, sub bands and beamlets
    rcus_involved = tuple()
    sub_band_ids = tuple()
    beamlets = tuple()

    # Pointings
    station_pointing = tuple()
    virtual_pointing = tuple()


class HolographySpecification(object):
    # Layout of a specification file name:
    # 'Holog-<date, 8 digits>-<free comment>-<id, 3 digits>.txt'
    # NOTE(review): the '.' before 'txt' is not escaped, hence it matches any
    # character, and the pattern is not anchored at the end of the name.
    hs_name_pattern = r'Holog-(?P<date>\d{8})-(?P<comment>.*)-(?P<id>\d{3}).txt'

    def __init__(self, name, path):
        self.path = os.path.join(path, name)
        self.name = name
        self.id, self.date, self.comment = HolographySpecification.\
            extract_id_date_comment_from_name(name)
        self.reference_station_names = None
        self.target_station_names = None
        self.station_specification_map = defaultdict(list)
        self.start_datetime = None
        self.end_datetime = None
        self.rcu_mode = None
        self.beam_set_interval = None

    def __repr__(self):
        return 'HolographySpecification(%s, %s, %s, %s, %s)' % (
                                                                self.id,
                                                                self.date,
                                                                self.comment,
                                                                self.name,
                                                                self.path,
                                                                )

    def _read_lines(self):
        with open(self.path, 'r') as fstream_in:
            return fstream_in.read().splitlines()

    @staticmethod
    def _split_header(line):
        date_regex = '\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}'
        format = r'(?P<start_date>{date_regex})\s*' \
                  '(?P<end_date>{date_regex})\s*' \
                  '(?P<rcu_mode>\d*)\s*' \
                  '(?P<beam_switch_delay>\d*.\d*)'.format(date_regex=date_regex)
        match = re.match(format, line)
        return match.groupdict()

    def _parse_header(self, header):
        date_time_format = '%Y-%m-%d %H:%M:%S'

        splitted_header = HolographySpecification._split_header(header)

        self.start_datetime =  datetime.datetime.strptime(splitted_header['start_date'],
                                                          date_time_format)
        self.end_datetime = datetime.datetime.strptime(splitted_header['end_date'],
                                                       date_time_format)
        self.rcu_mode = splitted_header['rcu_mode']
        self.beam_set_interval = splitted_header['beam_switch_delay']

    @staticmethod
    def _split_line(line):
        range_regex = '(\d*\:\d*)|(\d*)'
        ra_dec_regex = '\d*\.\d*,-?\d*\.\d*,\w*'
        regex = r'^(?P<station_name>\w*)\s*' \
                r'(?P<mode_description>\w*)\s*' \
                r'(?P<sub_band>[\d,]*)\s*' \
                r'(?P<beamlets>{range_regex})\s*' \
                r'(?P<rcus>{range_regex})\s*' \
                r'(?P<rcus_mode>(\d*))\s*' \
                r'(?P<virtual_pointing>{ra_dec_regex})\s*' \
                r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex,
                                                               ra_dec_regex=ra_dec_regex)
        match = re.match(regex, line)
        if match is None:
            raise ValueError('Cannot parse line {}'.format(line))
        return match.groupdict()

    @staticmethod
    def _split_lines(lines):
        return [HolographySpecification._split_line(line)
                for line in lines]

    @staticmethod
    def _parse_pointing(pointing_string):
        ra, dec, coordinate_system = pointing_string.split(',')
        ra = float(ra)
        dec = float(dec)
        return dict(ra=ra, dec=dec, coordinate_system=coordinate_system)

    @staticmethod
    def _parse_integer_range_or_list(string_to_be_parsed):
        """
        Parses a string containing a list of int or an inclusive range and returns a list of int

        ex "1,2,3,4,5" -> [1, 2, 3, 4, 5]
           "1:4" -> [1, 2, 3, 4]
        :param string_to_be_parsed: the string representing a list of int or a range
        :return: a list of int
        :rtype: list(int)
        """
        if ':' in string_to_be_parsed:
            try:
                start, end = map(int, string_to_be_parsed.split(':'))
            except ValueError as e:
                raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' %
                                 (string_to_be_parsed, e))
            return_value = [x for x in range(start, end)]
        elif ',' in string_to_be_parsed:
            try:
                return_value = list(map(int, string_to_be_parsed.split(',')))
            except ValueError as e:
                raise ValueError('Cannot parse string %s expected [int],[int],... -> %s' %
                                 (string_to_be_parsed, e))
        else:
            raise ValueError('Cannot parse string %s expected [int],[int],... or [start]:[end] %s' %
                             string_to_be_parsed)
        return return_value

    @staticmethod
    def _parse_line(splitted_line):
        """
        Convert the text fields of one specification line into a beam specification.

        A line listing a single sub band is marked as station type 'target',
        a line listing several sub bands as 'reference'.

        :param splitted_line: the fields of the line, as returned by _split_line
        :type splitted_line: dict[str, str]
        :return: the beam specification described by the line
        :rtype: HolographyStationBeamSpecification
        :raises ValueError: if one of the fields cannot be converted
        """
        beam_specification = HolographyStationBeamSpecification()

        beam_specification.station_name = splitted_line['station_name']
        beam_specification.rcus_mode = int(splitted_line['rcus_mode'])
        beam_specification.sub_band_ids = [int(sub_band)
                                           for sub_band in splitted_line['sub_band'].split(',')]
        beam_specification.mode_description = splitted_line['mode_description']
        beam_specification.rcus_involved = HolographySpecification._parse_integer_range_or_list(
            splitted_line['rcus'])
        # The beamlets are kept as the text found in the line
        beam_specification.beamlets = splitted_line['beamlets']

        beam_specification.station_pointing = HolographySpecification._parse_pointing(
            splitted_line['station_pointing'])
        beam_specification.virtual_pointing = HolographySpecification._parse_pointing(
            splitted_line['virtual_pointing'])

        # Without the else branch the type was always overwritten with 'reference'
        if len(beam_specification.sub_band_ids) == 1:
            beam_specification.station_type = 'target'
        else:
            beam_specification.station_type = 'reference'

        return beam_specification
    @staticmethod
    def list_bsf_files_in_path(path):
        """
        List all the Holography beam specification files in the given path
        :param path: path to the beam specification files
        :type path: str
        :return: the list of Holography Specifications
        :rtype: list[HolographySpecification]
        """
        bsf_files_name_pattern = 'Holog-*.txt'
        bsf_files_path_pattern = os.path.join(path, bsf_files_name_pattern)
        matched_path_list = glob(bsf_files_path_pattern)
        matched_file_path_list = list(
            filter(lambda path_i: os.path.isfile(path_i), matched_path_list))
        matched_file_name_list = list(map(lambda path_i: os.path.basename(path_i),
                                          matched_file_path_list))
        holography_beam_specification = HolographySpecification.create_hs_list_from_name_list_and_path(
            matched_file_name_list, path)
        return holography_beam_specification

    def _parse_lines(self, lines):
        splitted_lines = HolographySpecification._split_lines(lines)

        for line in splitted_lines:
            parsed_line = HolographySpecification._parse_line(line)

            self.station_specification_map[parsed_line.station_name] += [parsed_line]

    def _update_class_attributes(self):
        self.station_names = self.station_specification_map.keys()
        self.reference_station_names = [station_name for station_name in
                                         self.station_specification_map
                                         if len(self.station_specification_map[station_name]) == 1]
        self.target_station_names = [station_name for station_name in
                                        self.station_specification_map
                                        if len(self.station_specification_map[station_name]) > 1]

    def get_beam_specifications_per_station_name(self, station_name):
        """
        Returns a list of beam specifications for the given station name
        :param station_name: name of the station
        :type station_name: str
        :return: a list of beam specifications
        :rtype: list[HolographyStationBeamSpecification]
        """
        return self.station_specification_map[station_name]
    
    def read_file(self):
        lines = self._read_lines()
        self._parse_header(lines[0])
        self._parse_lines(lines[1:])
        self._update_class_attributes()

    @staticmethod
    def create_hs_list_from_name_list_and_path(name_list, path):
        return [HolographySpecification(name, path) for name in name_list]

    @staticmethod
    def is_holography_specification_file_name(name):
        return re.match(HolographySpecification.hs_name_pattern, name) is not None

    @staticmethod
    def extract_id_date_comment_from_name(name):
        match = re.match(HolographySpecification.hs_name_pattern, name)
        date = match.group('date')
        hs_id = int(match.group('id'))
        comment = match.group('comment')
        date = datetime.datetime.strptime(date, '%Y%m%d')
        return hs_id, date, comment