-
Mattia Mancini authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
calibration_table.py 4.67 KiB
import logging
from datetime import datetime
from re import fullmatch
from struct import iter_unpack, pack
from typing import BinaryIO
from typing import List
from dataclasses import dataclass, asdict
from numpy import empty as empty_ndarray, ndarray, fromiter as array_from_iter, float64
logger = logging.getLogger(__name__)

# Maximum number of lines read while scanning for the 'HeaderStop' marker.
__MAX_HEADER_LINES = 100

# Accepts 'A.B.C = value' or 'A.B = value' header lines.
# Written as a raw string so that '\.' and '\s' reach the regex engine
# unchanged instead of being flagged by Python as invalid escape sequences.
# NOTE(review): '[A-z]' also spans the ASCII punctuation that sits between
# 'Z' and 'a' ('[', '\', ']', '^', '_', '`'). It is left as is so that exactly
# the same lines keep being accepted - confirm whether '[A-Za-z]' was meant.
__HEADER_LINE_PATTERN = r'(^[A-z]*\.[A-z]*\.[A-z]*\s=\s.*$)|(^[A-z]*\.[A-z]*\s=\s.*$)'

# Size of the first axis of the parsed data array.
__FREQUENCIES = 512
# Doubles stored per array element: one real part and one imaginary part.
__FLOATS_PER_FREQUENCY = 2

# The only antenna counts accepted when parsing the binary payload.
__N_ANTENNAS_DUTCH = 96
__N_ANTENNAS_INTERNATIONAL = 192
# Lookup table from a CalibrationTable attribute name to the key under which
# that attribute is written in the serialized header. Entry order is the order
# in which the dictionary is iterated.
_ATTRIBUTE_NAME_TO_SERIALIZED_NAME = {
    # Observation section
    'observation_station': 'CalTableHeader.Observation.Station',
    'observation_mode': 'CalTableHeader.Observation.Mode',
    'observation_antennaset': 'CalTableHeader.Observation.AntennaSet',
    'observation_band': 'CalTableHeader.Observation.Band',
    'observation_source': 'CalTableHeader.Observation.Source',
    'observation_date': 'CalTableHeader.Observation.Date',
    # Calibration section
    'calibration_version': 'CalTableHeader.Calibration.Version',
    'calibration_name': 'CalTableHeader.Calibration.Name',
    'calibration_date': 'CalTableHeader.Calibration.Date',
    'calibration_ppsdelay': 'CalTableHeader.Calibration.PPSDelay',
    # Free-form comment
    'comment': 'CalTableHeader.Comment',
}
class UnvalidFileException(Exception):
    """Error signalling that a calibration table file could not be parsed.

    The reason given at construction time is available both through the
    standard exception arguments and through the ``message`` attribute.
    """

    def __init__(self, message):
        # Hand the text to the base class explicitly; the resulting ``args``
        # are the same ones Exception would record on its own.
        super().__init__(message)
        self.message = message
def _extract_header(fstream: BinaryIO) -> dict:
    """Read the textual header at the current position of a binary stream.

    Lines are consumed one by one until a ``HeaderStop`` line is met or
    ``__MAX_HEADER_LINES`` lines have been read. ``HeaderStart`` lines are
    skipped. Every other line must match ``__HEADER_LINE_PATTERN`` and is
    stored with a normalised key: lower-cased, the ``caltableheader.`` prefix
    removed and the remaining dots replaced by underscores, e.g.
    ``CalTableHeader.Observation.Station = X`` becomes
    ``{'observation_station': 'X'}``.

    :param fstream: binary stream positioned at the beginning of the header
    :return: dictionary of normalised header keys to their string values
    :raises UnvalidFileException: if a line is not recognised or if no
        header entry was found
    """
    header = {}
    # NOTE(review): when no 'HeaderStop' shows up within the line limit the
    # loop simply ends and whatever was collected so far is returned.
    for _ in range(__MAX_HEADER_LINES):
        line = fstream.readline().decode('utf8').rstrip('\n')
        if line == 'HeaderStop':
            break
        if line == 'HeaderStart':
            continue
        if not fullmatch(__HEADER_LINE_PATTERN, line):
            logger.error('unrecognized line \"%s\"', line)
            raise UnvalidFileException('unrecognized line \"%s\"' % line)
        # Split on the first '=' only: the pattern lets the value contain
        # further '=' characters, and an unbounded split would then yield more
        # than two parts and fail to unpack.
        key, value = line.split('=', 1)
        key = key.lower().replace('caltableheader.', '').strip().replace('.', '_')
        header[key] = value.strip()
    if not header:
        raise UnvalidFileException('empty header')
    return header
def parse_data(data_buffer):
    """Decode the binary payload of a calibration table into a complex array.

    The buffer is read as a sequence of doubles (struct format ``'d'``) and
    interpreted as a ``(__FREQUENCIES, n_antennas, __FLOATS_PER_FREQUENCY)``
    array whose last axis holds the real and the imaginary part.

    :param data_buffer: bytes-like object containing the packed doubles
    :return: complex ndarray of shape ``(__FREQUENCIES, n_antennas)``
    :raises UnvalidFileException: if the buffer is not a whole number of
        doubles, if the derived antenna count is not one of the two accepted
        values, or if the values do not fill the array exactly
    """
    # Local import: the module only imports iter_unpack and pack from struct.
    from struct import calcsize

    # Validate the byte length up front so that a truncated payload is
    # reported as an invalid file rather than as a low-level struct error.
    n_values, stray_bytes = divmod(len(data_buffer), calcsize('d'))
    if stray_bytes:
        raise UnvalidFileException('invalid data size %s bytes is not a whole number of doubles' %
                                   len(data_buffer))

    values_per_antenna = __FREQUENCIES * __FLOATS_PER_FREQUENCY
    n_antennas, stray_values = divmod(n_values, values_per_antenna)
    if n_antennas not in [__N_ANTENNAS_DUTCH, __N_ANTENNAS_INTERNATIONAL]:
        raise UnvalidFileException('invalid data range expected %s or %s antennas got %s' %
                                   (__N_ANTENNAS_DUTCH,
                                    __N_ANTENNAS_INTERNATIONAL,
                                    n_antennas))
    # The antenna count comes from a floor division, so leftover values would
    # otherwise only surface as a reshape failure further down.
    if stray_values:
        raise UnvalidFileException('invalid data size %s values do not fill %s antennas exactly' %
                                   (n_values, n_antennas))

    # iter_unpack yields 1-tuples; unwrap them while filling the array.
    data = array_from_iter((value for (value,) in iter_unpack('d', data_buffer)),
                           dtype=float64, count=n_values)
    data = data.reshape((__FREQUENCIES, n_antennas, __FLOATS_PER_FREQUENCY))

    complex_data = empty_ndarray([__FREQUENCIES, n_antennas], dtype=complex)
    complex_data.real = data[:, :, 0]
    complex_data.imag = data[:, :, 1]
    return complex_data
@dataclass(init=True, repr=True, frozen=True)
class CalibrationTable:
    """Immutable in-memory representation of a calibration table file.

    The header attributes correspond to the entries of
    ``_ATTRIBUTE_NAME_TO_SERIALIZED_NAME``; ``data`` holds the complex
    values stored after the header.

    NOTE(review): ``load_from_file`` passes the header values on exactly as
    the strings read from the file, so the non-``str`` annotations below are
    not enforced on loaded instances.
    """
    observation_station: str
    observation_mode: str
    observation_antennaset: str
    observation_band: str
    observation_source: str
    observation_date: datetime
    calibration_version: int
    calibration_name: str
    calibration_date: datetime
    calibration_ppsdelay: List[int]
    data: ndarray
    comment: str = ''

    @staticmethod
    def load_from_file(file_path):
        """Build a CalibrationTable from the file at ``file_path``.

        The header is parsed first; everything that follows it in the file
        is decoded as the data payload.

        :param file_path: path of the calibration table file to read
        :return: the loaded CalibrationTable
        :raises UnvalidFileException: if the header or the payload is invalid
        """
        with open(file_path, 'rb') as file_stream:
            header = _extract_header(file_stream)
            data_raw = file_stream.read()
        # The raw bytes are fully read at this point, so decoding can happen
        # after the file has been closed.
        data = parse_data(data_raw)
        return CalibrationTable(**header, data=data)

    def __serialize_header(self, f_stream: BinaryIO):
        """Write the header lines, wrapped in HeaderStart/HeaderStop markers.

        :param f_stream: binary stream the header is written to
        """
        f_stream.write(b'HeaderStart\n')
        # Walk the name mapping rather than asdict(self): it lists exactly the
        # header attributes (so 'data' needs no special-casing) and avoids the
        # deep copy of the data array that asdict would make. The mapping
        # follows the field order, so lines come out in the same order.
        for key, serialized_name in _ATTRIBUTE_NAME_TO_SERIALIZED_NAME.items():
            serialized_line = '{} = {}\n'.format(serialized_name, getattr(self, key)).encode('utf8')
            f_stream.write(serialized_line)
        f_stream.write(b'HeaderStop\n')

    def __serialize_data(self, f_stream: BinaryIO):
        """Write ``data`` as interleaved real/imaginary doubles.

        :param f_stream: binary stream the payload is written to
        """
        # Add a trailing axis of length 2 holding the real and imaginary part.
        dimensions = list(self.data.shape) + [2]
        data_reshaped = empty_ndarray(dimensions, dtype=float64)
        data_reshaped[:, :, 0] = self.data.real
        data_reshaped[:, :, 1] = self.data.imag
        # tobytes() emits the float64 values in C order, which is the same
        # byte sequence as packing every value with struct format 'd'.
        f_stream.write(data_reshaped.tobytes())

    def store_to_file(self, file_path):
        """Write the table (header followed by data) to ``file_path``.

        :param file_path: path of the file to create or overwrite
        """
        with open(file_path, 'wb') as file_stream:
            self.__serialize_header(file_stream)
            self.__serialize_data(file_stream)