Skip to content
Snippets Groups Projects
Commit b095c8b3 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

SSB-47: Move module's function in class

parent 8e4b2944
No related branches found
No related tags found
1 merge request!44Merge back holography to master
......@@ -15,13 +15,6 @@ from copy import deepcopy
logger = logging.getLogger(__name__)
__MAX_HEADER_LINES = 100
__HEADER_LINE_PATTERN = '(^[A-z]*\.[A-z]*\.[A-z]*\s=\s.*$)|(^[A-z]*\.[A-z]*\s=\s.*$)'
__FREQUENCIES = 512
__FLOATS_PER_FREQUENCY = 2
__N_ANTENNAS_DUTCH = 96
__N_ANTENNAS_INTERNATIONAL = 192
__CALIBRATION_TABLE_FILENAME_PATTERN = '**/*CalTable-???-???-*_*.dat'
_MODE_TO_CLOCK = {1: 200, 3: 200, 5: 200, 6: 160, 7: 200}
_MODE_TO_NYQ_ZONE = {1: 1, 3: 1, 5: 2, 6: 1, 7: 3}
......@@ -38,56 +31,23 @@ _ATTRIBUTE_NAME_TO_SERIALIZED_NAME = {
'calibration_ppsdelay': 'CalTableHeader.Calibration.PPSDelay',
'comment': 'CalTableHeader.Comment'
}
_CALIBRATION_TABLE_FILENAME_PATTERN = '**/*CalTable-???-???-*_*.dat'
class UnvalidFileException(Exception):
class InvalidFileException(Exception):
def __init__(self, message):
self.message = message
def _extract_header(fstream: BinaryIO):
header = {}
for i in range(__MAX_HEADER_LINES):
line = fstream.readline().decode('utf8').rstrip('\n')
if line == 'HeaderStop':
break
elif line == 'HeaderStart':
continue
elif fullmatch(__HEADER_LINE_PATTERN, line):
key, value = line.split('=')
key = key.lower().replace('caltableheader.', '').strip().replace('.', '_')
value = value.strip()
header[key] = value
else:
logger.error('unrecognized line \"%s\"', line)
raise UnvalidFileException('unrecognized line \"%s\"' % line)
if len(header) == 0:
raise UnvalidFileException('empty header')
return header
def parse_data(data_buffer):
data = array_from_iter(map(lambda x: x[0], iter_unpack('d', data_buffer)), dtype=float)
n_antennas = data.shape[0] // __FREQUENCIES // __FLOATS_PER_FREQUENCY
if n_antennas not in [__N_ANTENNAS_DUTCH, __N_ANTENNAS_INTERNATIONAL]:
raise UnvalidFileException('invalid data range expected %s or %s antennas got %s' %
(__N_ANTENNAS_DUTCH,
__N_ANTENNAS_INTERNATIONAL,
n_antennas))
data = data.reshape((__FREQUENCIES, n_antennas, __FLOATS_PER_FREQUENCY))
complex_data = empty_ndarray([__FREQUENCIES, n_antennas], dtype=complex)
complex_data.real = data[:, :, 0]
complex_data.imag = data[:, :, 1]
return complex_data
@dataclass(init=True, repr=True, frozen=False)
class CalibrationTable:
_MAX_HEADER_LINES = 100
_HEADER_LINE_PATTERN = r'(^[A-z]*\.[A-z]*\.[A-z]*\s=\s.*$)|(^[A-z]*\.[A-z]*\s=\s.*$)'
_FREQUENCIES = 512
_FLOATS_PER_FREQUENCY = 2
_N_ANTENNAS_DUTCH = 96
_N_ANTENNAS_INTERNATIONAL = 192
observation_station: str
observation_mode: int
observation_source: str
......@@ -101,26 +61,32 @@ class CalibrationTable:
observation_antennaset: str = ''
observation_band: str = ''
@staticmethod
def load_from_file(file_path):
logger.info('loading file %s', file_path)
with open(file_path, 'rb') as file_stream:
header = CalibrationTable._extract_header(file_stream)
data_raw = file_stream.read().rstrip(b'\n')
try:
data = CalibrationTable._parse_data(data_raw)
except Exception as e:
def __parse_attributes(self):
self.observation_mode = int(self.observation_mode)
self.calibration_version = int(self.calibration_version)
if isinstance(self.calibration_ppsdelay, str):
self.calibration_ppsdelay = list(map(int, self.calibration_ppsdelay.
lstrip('[').
rstrip(']').
strip().
split(' ')))
logger.error('error reading file %s', file_path)
logger.debug(data_raw)
logger.exception(e)
raise e
calibration_table = CalibrationTable(**header,
data=data)
return calibration_table
def __post_init__(self):
self.__parse_attributes()
@staticmethod
def load_from_hdf(file_descriptor: File, uri: str):
if uri not in file_descriptor:
raise ValueError('specified uri does not exist in %s' % file_descriptor.filename)
def frequencies(self) -> ndarray:
subbands = arange(1, 513, 1.)
clock = _MODE_TO_CLOCK[self.observation_mode]
nyquist_zone = _MODE_TO_NYQ_ZONE[self.observation_mode]
frequencies = subbands * clock / 1024. + (nyquist_zone - 1) * clock / 2.
return frequencies
data = array(file_descriptor[uri])
return CalibrationTable(data=data, **dict(file_descriptor[uri].attrs.items()))
def derive_calibration_table_from_gain_fit(self,
observation_source: str,
......@@ -139,25 +105,29 @@ class CalibrationTable:
new_calibration_table.data = gains
return new_calibration_table
@staticmethod
def load_from_file(file_path):
logger.info('loading file %s', file_path)
with open(file_path, 'rb') as file_stream:
header = _extract_header(file_stream)
data_raw = file_stream.read().rstrip(b'\n')
try:
data = parse_data(data_raw)
except Exception as e:
def frequencies(self) -> ndarray:
subbands = arange(1, 513, 1.)
clock = _MODE_TO_CLOCK[self.observation_mode]
nyquist_zone = _MODE_TO_NYQ_ZONE[self.observation_mode]
frequencies = subbands * clock / 1024. + (nyquist_zone - 1) * clock / 2.
return frequencies
logger.error('error reading file %s', file_path)
logger.debug(data_raw)
logger.exception(e)
raise e
calibration_table = CalibrationTable(**header,
data=data)
return calibration_table
def store_to_hdf(self, file_descriptor: File, uri: str):
if uri not in file_descriptor:
file_descriptor[uri] = self.data
for key, value in asdict(self).items():
if key is 'data':
# skipping field data
continue
file_descriptor[uri].attrs[key] = value
file_descriptor.flush()
def store_to_file(self, file_path):
with open(file_path, 'wb') as file_stream:
self._serialize_header(file_stream)
self._serialize_data(file_stream)
def __serialize_header(self, f_stream: BinaryIO):
def _serialize_header(self, f_stream: BinaryIO):
f_stream.write(b'HeaderStart\n')
for key, value in asdict(self).items():
......@@ -174,7 +144,7 @@ class CalibrationTable:
f_stream.write(b'HeaderStop\n')
def __serialize_data(self, f_stream: BinaryIO):
def _serialize_data(self, f_stream: BinaryIO):
dimensions = list(self.data.shape) + [2]
data_reshaped = empty_ndarray(dimensions, dtype=float64)
data_reshaped[:, :, 0] = self.data.real
......@@ -183,38 +153,70 @@ class CalibrationTable:
data_packed = pack('%sd' % len(data_flattened), *data_flattened)
f_stream.write(data_packed)
def store_to_hdf(self, file_descriptor: File, uri: str):
if uri not in file_descriptor:
file_descriptor[uri] = self.data
for key, value in asdict(self).items():
if key is 'data':
# skipping field data
@staticmethod
def _extract_header(fstream: BinaryIO):
header = {}
for i in range(CalibrationTable._MAX_HEADER_LINES):
line = fstream.readline().decode('utf8').rstrip('\n')
if line == 'HeaderStop':
break
elif line == 'HeaderStart':
continue
file_descriptor[uri].attrs[key] = value
file_descriptor.flush()
elif fullmatch(CalibrationTable._HEADER_LINE_PATTERN, line):
key, value = line.split('=')
key = key.lower().replace('caltableheader.', '').strip().replace('.', '_')
value = value.strip()
header[key] = value
else:
logger.error('unrecognized line \"%s\"', line)
raise InvalidFileException('unrecognized line \"%s\"' % line)
if len(header) == 0:
raise InvalidFileException('empty header')
return header
@staticmethod
def load_from_hdf(file_descriptor: File, uri: str):
if uri not in file_descriptor:
raise ValueError('specified uri does not exist in %s' % file_descriptor.filename)
def _parse_data(data_buffer: bytes):
data = array_from_iter(map(lambda x: x[0], iter_unpack('d', data_buffer)), dtype=float)
data = array(file_descriptor[uri])
n_antennas = data.shape[0] // CalibrationTable._FREQUENCIES // CalibrationTable._FLOATS_PER_FREQUENCY
return CalibrationTable(data=data, **dict(file_descriptor[uri].attrs.items()))
if n_antennas not in [CalibrationTable._N_ANTENNAS_DUTCH, CalibrationTable._N_ANTENNAS_INTERNATIONAL]:
raise InvalidFileException('invalid data range expected %s or %s antennas got %s' %
(CalibrationTable._N_ANTENNAS_DUTCH,
CalibrationTable._N_ANTENNAS_INTERNATIONAL,
n_antennas))
data = data.reshape((CalibrationTable._FREQUENCIES, n_antennas, CalibrationTable._FLOATS_PER_FREQUENCY))
complex_data = empty_ndarray([CalibrationTable._FREQUENCIES, n_antennas], dtype=complex)
complex_data.real = data[:, :, 0]
complex_data.imag = data[:, :, 1]
return complex_data
def _parse_attributes(self):
self.observation_mode = int(self.observation_mode)
self.calibration_version = int(self.calibration_version)
if isinstance(self.calibration_ppsdelay, str):
self.calibration_ppsdelay = list(map(int, self.calibration_ppsdelay.
lstrip('[').
rstrip(']').
strip().
split(' ')))
def __post_init__(self):
self._parse_attributes()
def __eq__(self, other):
return super().__eq__(other) and array_equal(self.data, other.data)
def store_to_file(self, file_path):
with open(file_path, 'wb') as file_stream:
self.__serialize_header(file_stream)
self.__serialize_data(file_stream)
def read_calibration_tables_in_directory(directory_path: str):
if not path.isdir(directory_path):
raise NotADirectoryError(directory_path)
files = path.join(directory_path, __CALIBRATION_TABLE_FILENAME_PATTERN)
files = path.join(directory_path, _CALIBRATION_TABLE_FILENAME_PATTERN)
return [CalibrationTable.load_from_file(file_path)
for file_path in glob(files, recursive=True)]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment