diff --git a/CAL/CalibrationCommon/lib/datacontainers/calibration_table.py b/CAL/CalibrationCommon/lib/datacontainers/calibration_table.py index b359e1e2130db90b7b1c382f8af14ddf895a96e7..14d232e3e8fd0601dfa5521ea44a6a1307121112 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/calibration_table.py +++ b/CAL/CalibrationCommon/lib/datacontainers/calibration_table.py @@ -15,13 +15,6 @@ from copy import deepcopy logger = logging.getLogger(__name__) -__MAX_HEADER_LINES = 100 -__HEADER_LINE_PATTERN = '(^[A-z]*\.[A-z]*\.[A-z]*\s=\s.*$)|(^[A-z]*\.[A-z]*\s=\s.*$)' -__FREQUENCIES = 512 -__FLOATS_PER_FREQUENCY = 2 -__N_ANTENNAS_DUTCH = 96 -__N_ANTENNAS_INTERNATIONAL = 192 -__CALIBRATION_TABLE_FILENAME_PATTERN = '**/*CalTable-???-???-*_*.dat' _MODE_TO_CLOCK = {1: 200, 3: 200, 5: 200, 6: 160, 7: 200} _MODE_TO_NYQ_ZONE = {1: 1, 3: 1, 5: 2, 6: 1, 7: 3} @@ -38,56 +31,23 @@ _ATTRIBUTE_NAME_TO_SERIALIZED_NAME = { 'calibration_ppsdelay': 'CalTableHeader.Calibration.PPSDelay', 'comment': 'CalTableHeader.Comment' } +_CALIBRATION_TABLE_FILENAME_PATTERN = '**/*CalTable-???-???-*_*.dat' -class UnvalidFileException(Exception): +class InvalidFileException(Exception): def __init__(self, message): self.message = message -def _extract_header(fstream: BinaryIO): - header = {} - for i in range(__MAX_HEADER_LINES): - line = fstream.readline().decode('utf8').rstrip('\n') - - if line == 'HeaderStop': - break - elif line == 'HeaderStart': - continue - elif fullmatch(__HEADER_LINE_PATTERN, line): - - key, value = line.split('=') - - key = key.lower().replace('caltableheader.', '').strip().replace('.', '_') - value = value.strip() - header[key] = value - else: - logger.error('unrecognized line \"%s\"', line) - raise UnvalidFileException('unrecognized line \"%s\"' % line) - if len(header) == 0: - raise UnvalidFileException('empty header') - return header - - -def parse_data(data_buffer): - data = array_from_iter(map(lambda x: x[0], iter_unpack('d', data_buffer)), dtype=float) - n_antennas = data.shape[0] // __FREQUENCIES // __FLOATS_PER_FREQUENCY - if n_antennas not in [__N_ANTENNAS_DUTCH, __N_ANTENNAS_INTERNATIONAL]: - raise UnvalidFileException('invalid data range expected %s or %s antennas got %s' % - (__N_ANTENNAS_DUTCH, - __N_ANTENNAS_INTERNATIONAL, - n_antennas)) - - data = data.reshape((__FREQUENCIES, n_antennas, __FLOATS_PER_FREQUENCY)) - complex_data = empty_ndarray([__FREQUENCIES, n_antennas], dtype=complex) - complex_data.real = data[:, :, 0] - complex_data.imag = data[:, :, 1] - - return complex_data - - @dataclass(init=True, repr=True, frozen=False) class CalibrationTable: + _MAX_HEADER_LINES = 100 + _HEADER_LINE_PATTERN = r'(^[A-z]*\.[A-z]*\.[A-z]*\s=\s.*$)|(^[A-z]*\.[A-z]*\s=\s.*$)' + _FREQUENCIES = 512 + _FLOATS_PER_FREQUENCY = 2 + _N_ANTENNAS_DUTCH = 96 + _N_ANTENNAS_INTERNATIONAL = 192 + observation_station: str observation_mode: int observation_source: str @@ -101,26 +61,32 @@ class CalibrationTable: observation_antennaset: str = '' observation_band: str = '' + @staticmethod + def load_from_file(file_path): + logger.info('loading file %s', file_path) + with open(file_path, 'rb') as file_stream: + header = CalibrationTable._extract_header(file_stream) + data_raw = file_stream.read().rstrip(b'\n') + try: + data = CalibrationTable._parse_data(data_raw) + except Exception as e: - def __parse_attributes(self): - self.observation_mode = int(self.observation_mode) - self.calibration_version = int(self.calibration_version) - if isinstance(self.calibration_ppsdelay, str): - self.calibration_ppsdelay = list(map(int, self.calibration_ppsdelay. - lstrip('['). - rstrip(']'). - strip(). - split(' '))) + logger.error('error reading file %s', file_path) + logger.debug(data_raw) + logger.exception(e) + raise e + calibration_table = CalibrationTable(**header, + data=data) + return calibration_table - def __post_init__(self): - self.__parse_attributes() + @staticmethod + def load_from_hdf(file_descriptor: File, uri: str): + if uri not in file_descriptor: + raise ValueError('specified uri does not exist in %s' % file_descriptor.filename) - def frequencies(self) -> ndarray: - subbands = arange(1, 513, 1.) - clock = _MODE_TO_CLOCK[self.observation_mode] - nyquist_zone = _MODE_TO_NYQ_ZONE[self.observation_mode] - frequencies = subbands * clock / 1024. + (nyquist_zone - 1) * clock / 2. - return frequencies + data = array(file_descriptor[uri]) + + return CalibrationTable(data=data, **dict(file_descriptor[uri].attrs.items())) def derive_calibration_table_from_gain_fit(self, observation_source: str, @@ -139,25 +105,29 @@ class CalibrationTable: new_calibration_table.data = gains return new_calibration_table - @staticmethod - def load_from_file(file_path): - logger.info('loading file %s', file_path) - with open(file_path, 'rb') as file_stream: - header = _extract_header(file_stream) - data_raw = file_stream.read().rstrip(b'\n') - try: - data = parse_data(data_raw) - except Exception as e: + def frequencies(self) -> ndarray: + subbands = arange(1, 513, 1.) + clock = _MODE_TO_CLOCK[self.observation_mode] + nyquist_zone = _MODE_TO_NYQ_ZONE[self.observation_mode] + frequencies = subbands * clock / 1024. + (nyquist_zone - 1) * clock / 2. + return frequencies - logger.error('error reading file %s', file_path) - logger.debug(data_raw) - logger.exception(e) - raise e - calibration_table = CalibrationTable(**header, - data=data) - return calibration_table + def store_to_hdf(self, file_descriptor: File, uri: str): + if uri not in file_descriptor: + file_descriptor[uri] = self.data + for key, value in asdict(self).items(): + if key is 'data': + # skipping field data + continue + file_descriptor[uri].attrs[key] = value + file_descriptor.flush() + + def store_to_file(self, file_path): + with open(file_path, 'wb') as file_stream: + self._serialize_header(file_stream) + self._serialize_data(file_stream) - def __serialize_header(self, f_stream: BinaryIO): + def _serialize_header(self, f_stream: BinaryIO): f_stream.write(b'HeaderStart\n') for key, value in asdict(self).items(): @@ -174,7 +144,7 @@ class CalibrationTable: f_stream.write(b'HeaderStop\n') - def __serialize_data(self, f_stream: BinaryIO): + def _serialize_data(self, f_stream: BinaryIO): dimensions = list(self.data.shape) + [2] data_reshaped = empty_ndarray(dimensions, dtype=float64) data_reshaped[:, :, 0] = self.data.real @@ -183,38 +153,70 @@ class CalibrationTable: data_packed = pack('%sd' % len(data_flattened), *data_flattened) f_stream.write(data_packed) - def store_to_hdf(self, file_descriptor: File, uri: str): - if uri not in file_descriptor: - file_descriptor[uri] = self.data - for key, value in asdict(self).items(): - if key is 'data': - # skipping field data + @staticmethod + def _extract_header(fstream: BinaryIO): + header = {} + for i in range(CalibrationTable._MAX_HEADER_LINES): + line = fstream.readline().decode('utf8').rstrip('\n') + + if line == 'HeaderStop': + break + elif line == 'HeaderStart': continue - file_descriptor[uri].attrs[key] = value - file_descriptor.flush() + elif fullmatch(CalibrationTable._HEADER_LINE_PATTERN, line): + + key, value = line.split('=') + + key = key.lower().replace('caltableheader.', '').strip().replace('.', '_') + value = value.strip() + header[key] = value + else: + logger.error('unrecognized line \"%s\"', line) + raise InvalidFileException('unrecognized line \"%s\"' % line) + if len(header) == 0: + raise InvalidFileException('empty header') + return header @staticmethod - def load_from_hdf(file_descriptor: File, uri: str): - if uri not in file_descriptor: - raise ValueError('specified uri does not exist in %s' % file_descriptor.filename) + def _parse_data(data_buffer: bytes): + data = array_from_iter(map(lambda x: x[0], iter_unpack('d', data_buffer)), dtype=float) - data = array(file_descriptor[uri]) + n_antennas = data.shape[0] // CalibrationTable._FREQUENCIES // CalibrationTable._FLOATS_PER_FREQUENCY - return CalibrationTable(data=data, **dict(file_descriptor[uri].attrs.items())) + if n_antennas not in [CalibrationTable._N_ANTENNAS_DUTCH, CalibrationTable._N_ANTENNAS_INTERNATIONAL]: + raise InvalidFileException('invalid data range expected %s or %s antennas got %s' % + (CalibrationTable._N_ANTENNAS_DUTCH, + CalibrationTable._N_ANTENNAS_INTERNATIONAL, + n_antennas)) + + data = data.reshape((CalibrationTable._FREQUENCIES, n_antennas, CalibrationTable._FLOATS_PER_FREQUENCY)) + complex_data = empty_ndarray([CalibrationTable._FREQUENCIES, n_antennas], dtype=complex) + complex_data.real = data[:, :, 0] + complex_data.imag = data[:, :, 1] + + return complex_data + + def _parse_attributes(self): + self.observation_mode = int(self.observation_mode) + self.calibration_version = int(self.calibration_version) + if isinstance(self.calibration_ppsdelay, str): + self.calibration_ppsdelay = list(map(int, self.calibration_ppsdelay. + lstrip('['). + rstrip(']'). + strip(). + split(' '))) + + def __post_init__(self): + self._parse_attributes() def __eq__(self, other): return super().__eq__(other) and array_equal(self.data, other.data) - def store_to_file(self, file_path): - with open(file_path, 'wb') as file_stream: - self.__serialize_header(file_stream) - self.__serialize_data(file_stream) - def read_calibration_tables_in_directory(directory_path: str): if not path.isdir(directory_path): raise NotADirectoryError(directory_path) - files = path.join(directory_path, __CALIBRATION_TABLE_FILENAME_PATTERN) + files = path.join(directory_path, _CALIBRATION_TABLE_FILENAME_PATTERN) return [CalibrationTable.load_from_file(file_path) for file_path in glob(files, recursive=True)]