Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
calibration_table.py 4.67 KiB
import logging
from datetime import datetime
from re import fullmatch
from struct import iter_unpack, pack
from typing import BinaryIO
from typing import List

from dataclasses import dataclass, asdict
from numpy import empty as empty_ndarray, ndarray, fromiter as array_from_iter, float64

logger = logging.getLogger(__name__)

__MAX_HEADER_LINES = 100
__HEADER_LINE_PATTERN = '(^[A-z]*\.[A-z]*\.[A-z]*\s=\s.*$)|(^[A-z]*\.[A-z]*\s=\s.*$)'
__FREQUENCIES = 512
__FLOATS_PER_FREQUENCY = 2
__N_ANTENNAS_DUTCH = 96
__N_ANTENNAS_INTERNATIONAL = 192
_ATTRIBUTE_NAME_TO_SERIALIZED_NAME = {
    'observation_station': 'CalTableHeader.Observation.Station',
    'observation_mode': 'CalTableHeader.Observation.Mode',
    'observation_antennaset': 'CalTableHeader.Observation.AntennaSet',
    'observation_band': 'CalTableHeader.Observation.Band',
    'observation_source': 'CalTableHeader.Observation.Source',
    'observation_date': 'CalTableHeader.Observation.Date',
    'calibration_version': 'CalTableHeader.Calibration.Version',
    'calibration_name': 'CalTableHeader.Calibration.Name',
    'calibration_date': 'CalTableHeader.Calibration.Date',
    'calibration_ppsdelay': 'CalTableHeader.Calibration.PPSDelay',
    'comment': 'CalTableHeader.Comment'
}


class UnvalidFileException(Exception):
    def __init__(self, message):
        self.message = message


def _extract_header(fstream: BinaryIO):
    header = {}
    for i in range(__MAX_HEADER_LINES):
        line = fstream.readline().decode('utf8').rstrip('\n')

        if line == 'HeaderStop':
            break
        elif line == 'HeaderStart':
            continue
        elif fullmatch(__HEADER_LINE_PATTERN, line):

            key, value = line.split('=')

            key = key.lower().replace('caltableheader.', '').strip().replace('.', '_')
            value = value.strip()
            header[key] = value
        else:
            logger.error('unrecognized line \"%s\"', line)
            raise UnvalidFileException('unrecognized line \"%s\"' % line)
    if len(header) == 0:
        raise UnvalidFileException('empty header')
    return header


def parse_data(data_buffer):
    data = array_from_iter(map(lambda x: x[0], iter_unpack('d', data_buffer)), dtype=float)
    n_antennas = data.shape[0] // __FREQUENCIES // __FLOATS_PER_FREQUENCY
    if n_antennas not in [__N_ANTENNAS_DUTCH, __N_ANTENNAS_INTERNATIONAL]:
        raise UnvalidFileException('invalid data range expected %s or %s antennas got %s' %
                                   (__N_ANTENNAS_DUTCH,
                                    __N_ANTENNAS_INTERNATIONAL,
                                    n_antennas))

    data = data.reshape((__FREQUENCIES, n_antennas, __FLOATS_PER_FREQUENCY))
    complex_data = empty_ndarray([__FREQUENCIES, n_antennas], dtype=complex)
    complex_data.real = data[:, :, 0]
    complex_data.imag = data[:, :, 1]

    return complex_data


@dataclass(init=True, repr=True, frozen=True)
class CalibrationTable:
    observation_station: str
    observation_mode: str
    observation_antennaset: str
    observation_band: str
    observation_source: str
    observation_date: datetime
    calibration_version: int
    calibration_name: str
    calibration_date: datetime
    calibration_ppsdelay: List[int]
    data: ndarray
    comment: str = ''

    @staticmethod
    def load_from_file(file_path):
        with open(file_path, 'rb') as file_stream:
            header = _extract_header(file_stream)
            data_raw = file_stream.read()

        data = parse_data(data_raw)

        calibration_table = CalibrationTable(**header,
                                             data=data)
        return calibration_table

    def __serialize_header(self, f_stream: BinaryIO):
        f_stream.write(b'HeaderStart\n')

        for key, value in asdict(self).items():
            if key is 'data':
                # skipping field data
                continue
            serialized_name = _ATTRIBUTE_NAME_TO_SERIALIZED_NAME[key]
            serialized_line = '{} = {}\n'.format(serialized_name, value).encode('utf8')
            f_stream.write(serialized_line)

        f_stream.write(b'HeaderStop\n')

    def __serialize_data(self, f_stream: BinaryIO):
        dimensions = list(self.data.shape) + [2]
        data_reshaped = empty_ndarray(dimensions, dtype=float64)
        data_reshaped[:, :, 0] = self.data.real
        data_reshaped[:, :, 1] = self.data.imag
        data_flattened = data_reshaped.flatten()
        data_packed = pack('%sd' % len(data_flattened), *data_flattened)
        f_stream.write(data_packed)

    def store_to_file(self, file_path):
        with open(file_path, 'wb') as file_stream:
            self.__serialize_header(file_stream)
            self.__serialize_data(file_stream)