-
Mattia Mancini authoredMattia Mancini authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
holography_specification.py 11.04 KiB
import datetime
import logging
import os
import re
from collections import defaultdict
from glob import glob
logger = logging.getLogger(__file__)
MODE_TO_COMPONENT = {
1: 'LBL',
2: 'LBL',
3: 'LBH',
4: 'LBH',
5: 'HBA',
6: 'HBA',
7: 'HBA'
}
def antenna_id_polarization_from_rcu_type(rcu, type):
"""
Compute the antenna id for a given rcu, type and polarization
:param rcu: id of the rcu
:param type: type of the antenna
:return: the antenna id and polarization
:rtype: (int, str)
:rtype: int
"""
polarization_index = rcu % 2
if type in ['LBH', 'HBA']:
antenna_id = rcu - polarization_index
antenna_id /= 2.
elif type == 'LBL':
antenna_id = (rcu - polarization_index) / 2. + 48
else:
antenna_id = -1
return int(antenna_id)
def antenna_id_polarization_from_rcu_mode_polarization(rcu, mode):
if mode in MODE_TO_COMPONENT:
type = MODE_TO_COMPONENT[mode]
return antenna_id_polarization_from_rcu_type(rcu, type)
else:
raise Exception('Unrecognized mode %s' % mode)
class HolographyStationBeamSpecification(object):
"""
Contains the data regarding the beam specification for a single station.
"""
station_name = None
rcus_mode = None
sub_band_ids = ()
mode_description = None
rcus_involved = ()
beamlets = ()
antennas_used = ()
station_pointing = ()
virtual_pointing = ()
station_type = ()
def _parse_line(split_line):
"""
Parses a line in the holography specification file
:param split_line: dict containing the row indexed by the column name
:type split_line: dict[str, str]
"""
beam_specification = HolographyStationBeamSpecification()
beam_specification.rcus_mode = int(split_line['rcus_mode'])
beam_specification.sub_band_ids = [int(sub_band) for sub_band in
split_line['sub_band'].split(',')]
beam_specification.mode_description = split_line['mode_description']
beam_specification.rcus_involved = _parse_integer_range_or_list(
split_line['rcus'])
beam_specification.beamlets = split_line['beamlets']
beam_specification.antennas_used = [
antenna_id_polarization_from_rcu_mode_polarization(rcu,
beam_specification.rcus_mode)
for rcu in beam_specification.rcus_involved]
beam_specification.station_pointing = _parse_pointing(
split_line['station_pointing'])
beam_specification.virtual_pointing = _parse_pointing(
split_line['virtual_pointing'])
if len(beam_specification.sub_band_ids) == 1:
beam_specification.station_type = 'target'
else:
beam_specification.station_type = 'reference'
beam_specification.station_name = split_line['station_name']
return beam_specification
def list_bsf_files_in_path(path):
"""
List all the Holography beam specification files in the given path
:param path: path to the beam specification files
:type path: str
:return: the list of Holography Specifications
:rtype: list[HolographySpecification]
"""
logger.info("Loading holography beam specifications from \"%s\"...", path)
bsf_files_name_pattern = 'Holog-*.txt'
bsf_files_path_pattern = os.path.join(path, bsf_files_name_pattern)
matched_path_list = glob(bsf_files_path_pattern)
matched_file_path_list = list(
filter(lambda path_i: os.path.isfile(path_i), matched_path_list))
matched_file_name_list = list(map(lambda path_i: os.path.basename(path_i),
matched_file_path_list))
holography_beam_specification = create_hs_list_from_name_list_and_path(
matched_file_name_list, path)
logger.info("Loading holography beam specifications were successfully loaded from \"%s\".",
path)
return holography_beam_specification
def create_hs_list_from_name_list_and_path(name_list, path):
return [HolographySpecification(name, path) for name in name_list]
def is_holography_specification_file_name(name):
return re.match(HolographySpecification.hs_name_pattern, name) is not None
def extract_id_date_comment_from_name(name):
match = re.match(HolographySpecification.hs_name_pattern, name)
date = match.group('date')
hs_id = int(match.group('id'))
comment = match.group('comment')
date = datetime.datetime.strptime(date, '%Y%m%d')
return hs_id, date, comment
def _split_header(line):
"""
Split the header of holography specification file
:param line: line containing the header
:return: a dict containing the start_date,
the end_date, the rcu_mode, and the beam_switch_delay
:rtype: dict
"""
date_regex = '\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}'
format = r'(?P<start_date>{date_regex})\s*' \
'(?P<end_date>{date_regex})\s*' \
'(?P<rcu_mode>\d*)\s*' \
'(?P<beam_switch_delay>\d*.\d*)'.format(date_regex=date_regex)
match = re.match(format, line)
return match.groupdict()
def _split_line(line):
"""
Split the header of holography specification file
:param line: line containing the header
:return: a dict containing the start_date,
the end_date, the rcu_mode, and the beam_switch_delay
:rtype: dict
"""
range_regex = '(\d*\:\d*)|(\d*)'
ra_dec_regex = '(\d*\.\d*|nan),(-?\d*\.\d*|nan),(\w*)'
regex = r'^(?P<station_name>\w*)\s*' \
r'(?P<mode_description>\w*)\s*' \
r'(?P<sub_band>[\d,]*)\s*' \
r'(?P<beamlets>{range_regex})\s*' \
r'(?P<rcus>{range_regex})\s*' \
r'(?P<rcus_mode>(\d*))\s*' \
r'(?P<virtual_pointing>{ra_dec_regex})\s*' \
r'(?P<station_pointing>{ra_dec_regex})'.format(range_regex=range_regex,
ra_dec_regex=ra_dec_regex)
match = re.match(regex, line)
if match is None:
raise ValueError('Cannot parse line {}'.format(line))
return match.groupdict()
def _split_lines(lines):
return [_split_line(line)
for line in lines]
def _parse_pointing(pointing_string):
ra, dec, coordinate_system = pointing_string.split(',')
ra = float(ra)
dec = float(dec)
return dict(ra=ra, dec=dec, coordinate_system=coordinate_system)
def _parse_integer_range_or_list(string_to_be_parsed):
"""
Parses a string containing a list of int or an inclusive range and returns a list of int
ex "1,2,3,4,5" -> [1, 2, 3, 4, 5]
"1:4" -> [1, 2, 3, 4]
:param string_to_be_parsed: the string representing a list of int or a range
:return: a list of int
:rtype: list(int)
"""
if ':' in string_to_be_parsed:
try:
start, end = map(int, string_to_be_parsed.split(':'))
except ValueError as e:
raise ValueError('Cannot parse string %s expected [start]:[end] -> %s' %
(string_to_be_parsed, e))
return_value = [x for x in range(start, end)]
elif ',' in string_to_be_parsed:
try:
return_value = list(map(int, string_to_be_parsed.split(',')))
except ValueError as e:
raise ValueError('Cannot parse string %s expected [int],[int],... -> %s' %
(string_to_be_parsed, e))
else:
raise ValueError('Cannot parse string %s expected [int],[int],... or [start]:[end] %s' %
string_to_be_parsed)
return return_value
class HolographySpecification(object):
"""
The HolographySpecification represents the set of input used to specify an holography
observation with the LOFAR telescope
"""
hs_name_pattern = r'Holog-(?P<date>\d{8})-(?P<comment>.*)-(?P<id>\d{3}).txt'
def __init__(self, name, path):
"""
Construct the holography specification object and extracts id date and comment from
the file name (see HolographySpecification.extract_id_date_comment_from_name)
:param name: file name of the holography specification
:type name: str
:param path: path of the holography specification file
:type path: str
"""
self.path = os.path.join(path, name)
self.name = name
self.id, self.date, self.comment = extract_id_date_comment_from_name(name)
self.reference_station_names = None
self.target_station_names = None
self.station_specification_map = defaultdict(list)
self.start_datetime = None
self.end_datetime = None
self.rcu_mode = None
self.beam_set_interval = None
def __repr__(self):
return 'HolographySpecification(%s, %s, %s, %s, %s)' % (
self.id,
self.date,
self.comment,
self.name,
self.path,
)
def get_beam_specifications_per_station_name(self, station_name):
"""
Returns a list of beam specifications for the given station name
:param station_name: name of the station
:type station_name: str
:return: a list of beam specifications
:rtype: list[HolographyStationBeamSpecification]
"""
return self.station_specification_map[station_name]
def read_file(self):
logger.debug("Reading holography file \"%s\"...", self.path)
lines = self._read_lines()
self._parse_header(lines[0])
self._parse_lines(lines[1:])
self._update_class_attributes()
logger.debug("Reading holography file \"%s\" done.", self.path)
def _read_lines(self):
with open(self.path, 'r') as fstream_in:
return fstream_in.readlines()
def _parse_header(self, header):
"""
Parse the header
:param header: string containing the header
"""
date_time_format = '%Y-%m-%d %H:%M:%S'
split_header = _split_header(header)
self.start_datetime = datetime.datetime.strptime(split_header['start_date'],
date_time_format)
self.end_datetime = datetime.datetime.strptime(split_header['end_date'],
date_time_format)
self.rcu_mode = split_header['rcu_mode']
self.beam_set_interval = split_header['beam_switch_delay']
def _parse_lines(self, lines):
split_lines = _split_lines(lines)
for line in split_lines:
parsed_line = _parse_line(line)
self.station_specification_map[parsed_line.station_name] += [parsed_line]
def _update_class_attributes(self):
self.station_names = list(self.station_specification_map.keys())
self.reference_station_names = [station_name for station_name in
self.station_specification_map
if len(self.station_specification_map[station_name]) == 1]
self.target_station_names = [station_name for station_name in
self.station_specification_map
if len(self.station_specification_map[station_name]) > 1]