Skip to content
Snippets Groups Projects
Commit f32eb766 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

SSB-47: implemented storing function for calibration table

parent 1b1d363a
No related branches found
No related tags found
1 merge request!44Merge back holography to master
import logging
from datetime import datetime
from re import fullmatch
from struct import iter_unpack
from struct import iter_unpack, pack
from typing import BinaryIO
from typing import List
......@@ -12,7 +12,10 @@ logger = logging.getLogger(__name__)
__MAX_HEADER_LINES = 100
__HEADER_LINE_PATTERN = '(^[A-z]*\.[A-z]*\.[A-z]*\s=\s.*$)|(^[A-z]*\.[A-z]*\s=\s.*$)'
__FREQUENCIES = 512
__FLOATS_PER_FREQUENCY = 2
__N_ANTENNAS_DUTCH = 96
__N_ANTENNAS_INTERNATIONAL = 192
_ATTRIBUTE_NAME_TO_SERIALIZED_NAME = {
'observation_station': 'CalTableHeader.Observation.Station',
'observation_mode': 'CalTableHeader.Observation.Mode',
......@@ -50,19 +53,13 @@ def _extract_header(fstream: BinaryIO):
value = value.strip()
header[key] = value
else:
logger.error('unrecognized line %s', line)
raise UnvalidFileException('unrecognized line %s' % line)
logger.error('unrecognized line \"%s\"', line)
raise UnvalidFileException('unrecognized line \"%s\"' % line)
if len(header) == 0:
raise UnvalidFileException('empty header')
return header
__FREQUENCIES = 512
__FLOATS_PER_FREQUENCY = 2
__N_ANTENNAS_DUTCH = 96
__N_ANTENNAS_INTERNATIONAL = 192
def parse_data(data_buffer):
data = array_from_iter(map(lambda x: x[0], iter_unpack('d', data_buffer)), dtype=float)
n_antennas = data.shape[0] // __FREQUENCIES // __FLOATS_PER_FREQUENCY
......@@ -108,7 +105,7 @@ class CalibrationTable:
return calibration_table
def __serialize_header(self, f_stream: BinaryIO):
f_stream.write(b'HeaderStart')
f_stream.write(b'HeaderStart\n')
for key, value in asdict(self).items():
if key is 'data':
......@@ -118,17 +115,18 @@ class CalibrationTable:
serialized_line = '{} = {}\n'.format(serialized_name, value).encode('utf8')
f_stream.write(serialized_line)
f_stream.write(b'HeaderStop')
f_stream.write(b'HeaderStop\n')
def __serialize_data(self, f_stream: BinaryIO):
dimensions = list(self.data.shape) + [2]
data_flatten = empty_ndarray(dimensions, dtype=float64)
data_flatten[:, :, 0] = self.data.real
data_flatten[:, :, 1] = self.data.imag
data_flattened = data_flatten.flatten().tolist()
f_stream.write()
data_reshaped = empty_ndarray(dimensions, dtype=float64)
data_reshaped[:, :, 0] = self.data.real
data_reshaped[:, :, 1] = self.data.imag
data_flattened = data_reshaped.flatten()
data_packed = pack('%sd' % len(data_flattened), *data_flattened)
f_stream.write(data_packed)
def store_to_file(self, file_path):
with open(file_path, 'wb') as file_stream:
self.__serialize_header(file_stream)
self.__serialize_data(file_stream)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment