Skip to content
Snippets Groups Projects
Commit 9a1e800b authored by Mattia Mancini's avatar Mattia Mancini
Browse files

SSB-47: including calibration table used for the observation and the one...

SSB-47: including calibration table used for the observation and the one derived after the holography calibration
parent 313e4a77
No related branches found
No related tags found
1 merge request!44Merge back holography to master
import logging
import os
import h5py
from .calibration_table import CalibrationTable
from .holography_dataset_definitions import *
from .holography_specification import HolographySpecification
from .holography_observation import HolographyObservation
from .holography_specification import HolographySpecification
logger = logging.getLogger(__file__)
......@@ -68,7 +70,8 @@ class HolographyDataset():
# The central beam or beamlet for each frequency
self.central_beamlets = dict()
self.calibration_tables = dict()
self.derived_calibration_tables = dict()
# coordinates of the antenna position in the target
self.antenna_field_position = []
# list of reference station names
......@@ -611,6 +614,18 @@ class HolographyDataset():
result.frequencies = list(f[HDS_FREQUENCY])
if HDS_CALIBRATION_TABLES in f:
for mode in f[HDS_CALIBRATION_TABLES]:
uri = '/%s/%s' % (HDS_CALIBRATION_TABLES, mode)
result.calibration_tables[mode] = \
CalibrationTable.load_from_hdf(file_descriptor=f, uri=uri)
if HDS_DERIVED_CAL_TABLES in f:
for mode in f[HDS_DERIVED_CAL_TABLES]:
uri = '/%s/%s' % (HDS_DERIVED_CAL_TABLES, mode)
result.calibration_tables[mode] = \
CalibrationTable.load_from_hdf(file_descriptor=f, uri=uri)
result.ra_dec = dict()
for frequency in f["RA_DEC"].keys():
for beamlet in f["RA_DEC"][frequency].keys():
......@@ -642,6 +657,10 @@ class HolographyDataset():
return result, f
def insert_calibration_table(self, caltable: CalibrationTable):
mode = caltable.observation_mode
self.calibration_tables[mode] = caltable
def store_to_file(self, path):
"""
Stores the holography dataset at the given path
......@@ -679,26 +698,27 @@ class HolographyDataset():
f[HDS_REFERENCE_STATION] = to_numpy_array_string(self.reference_stations)
f[HDS_FREQUENCY] = self.frequencies
f.create_group(HDS_CALIBRATION_TABLES)
for mode in self.calibration_tables:
self.calibration_tables[mode].store_to_hdf(f,
'/{}/{}'.format(
HDS_CALIBRATION_TABLES, mode))
for mode in self.derived_calibration_tables:
self.derived_calibration_tables[mode].store_to_hdf(f,
'/{}/{}'.format(
HDS_DERIVED_CAL_TABLES,
mode))
HolographyDataset._store_grouped_data(h5file=f, uri='/RA_DEC',
data_to_store=self.ra_dec)
# We create groups for the reference stations and the _frequencies.
# Then we store the data samples [XX, YY, XY, YX, t, l, m, flag]
# in an array. The reference station name, the frequency and the
# beamlet number (index of the data sample array) allow random
# access of the data.
f.create_group("RA_DEC")
for frequency in self.ra_dec.keys():
f["RA_DEC"].create_group(frequency)
for beamlet in self.ra_dec[frequency].keys():
f["RA_DEC"][frequency][beamlet] = self.ra_dec[frequency][beamlet]
f.create_group("CROSSCORRELATION")
for reference_station in self.data.keys():
f["CROSSCORRELATION"].create_group(reference_station)
for frequency in self.data[reference_station].keys():
f["CROSSCORRELATION"][reference_station].create_group(frequency)
for beamlet in self.data[reference_station][frequency].keys():
f["CROSSCORRELATION"][reference_station][frequency][beamlet] = \
self.data[reference_station][frequency][beamlet]
HolographyDataset._store_grouped_data(h5file=f, uri='/CROSSCORRELATION',
data_to_store=self.data)
HolographyDataset._store_grouped_data(h5file=f,
uri=HDS_CENTRAL_BEAMLETS,
......
......@@ -20,6 +20,8 @@ HDS_SPECIFIED_REFERENCE_STATION = "Specified_Reference_station"
HDS_SPECIFIED_FREQUENCY = "Specified_frequency"
HDS_BEAMLETS = "Beamlets"
HDS_CENTRAL_BEAMLETS = "Central_beamlets"
HDS_CALIBRATION_TABLES = "Calibration_tables"
HDS_DERIVED_CAL_TABLES = "Derived_calibration_tables"
# GROUP "RA DEC"
HDS_SPECIFIED_RA_DEC = "Specified_RA_DEC"
......@@ -35,7 +37,7 @@ HDS_DATA = "Data"
HDS_coordinate_type = numpy.dtype([
('RA', numpy.float64),
('DEC', numpy.float64),
('EPOCH', 'S10')])
('EPOCH', '<S10')])
# One element that is stored in "Data".
HDS_data_sample_type = numpy.dtype([
......
#!/usr/bin/env python3
import argparse
import logging
from functools import reduce
import signal
import time
from multiprocessing import Pool as ThreadPool
......@@ -10,6 +9,9 @@ import os
from lofar.calibration.common.utils import *
from lofar.calibration.common.datacontainers.holography_dataset import HolographyDataset
from lofar.calibration.common.datacontainers.calibration_table import \
read_calibration_tables_per_station_mode
import logging
DEFAULT_SLEEP_TIME = 1
......@@ -22,6 +24,7 @@ def main():
read_holography_datasets_and_store(arguments.input_path, arguments.output_path,
holography_bsf_path=arguments.holography_bsf,
holography_ms_path=arguments.holography_ms,
calibration_tables_path=arguments.calibration_tables,
n_processes=arguments.num_proc)
......@@ -57,6 +60,9 @@ def specify_command_line_arguments():
' specification file', default=None)
parser.add_argument('--holography_ms', help='override default path for the holography'
' observation MS files', default=None)
parser.add_argument('--calibration_tables', help='override default path for the holography'
' calibration table files used', default=None)
parser.add_argument('--num_proc', help='number of processes used to convert the holography'
' observation', type=int)
parser.add_argument('-v', help='verbose logging', action='store_true')
......@@ -64,7 +70,10 @@ def specify_command_line_arguments():
return parser
def read_and_store_single_target_station(target_station_name, matched_bsf_ms_pair, store_path):
def read_and_store_single_target_station(target_station_name,
matched_bsf_ms_pair,
caltables,
store_path):
"""
Reads a single target station hdf5 file from a list of bsf and ms pairs
:param target_station_name: target station name to process
......@@ -73,14 +82,28 @@ def read_and_store_single_target_station(target_station_name, matched_bsf_ms_pai
its specific holography observation
:type matched_bsf_ms_pair: list[(lofar.calibration.common.datacontainers.HolographySpecification,
lofar.calibration.common.datacontainers.HolographyObservation)]
:param caltables: calibration tables used during the observation
:type caltables: Dict[Tuple[str, int], CalibrationTable]
:param store_path: path where to store the file
:type store_path: str
:return:
"""
holography_dataset = HolographyDataset()
logger.info('Loading data for station %s ', target_station_name)
holography_dataset.load_from_beam_specification_and_ms(target_station_name,
matched_bsf_ms_pair)
logger.info('Loading calibration tables per station %s', target_station_name)
station_name = target_station_name.replace('HBA0', '').replace('HBA1', '').replace('HBA', '')
try:
holography_dataset.insert_calibration_table(caltables[(station_name,
holography_dataset.mode)])
except KeyError:
logger.error('cannot calibration file for station %s and mode %s', station_name,
holography_dataset.mode)
raise SystemExit()
filename = '%s.hdf5' % target_station_name
outpath = os.path.join(store_path, filename)
......@@ -110,18 +133,20 @@ def wait_for_processes_to_complete_execution(thread_pool,
:type future_results: list[multiprocess.pool.ApplyResult]
"""
all_done = False
print(
'dude'
)
n_to_be_processed = len(future_results)
try:
while not all_done:
all_done = True
for k in future_results:
all_done = all_done and k.ready()
print(k)
print(future_results)
are_results_ready = list(map(lambda x: x.ready(), future_results))
result_status = map(lambda x: 1 if x else 0,
are_results_ready)
n_processed_dataset = sum(result_status)
logger.info('processed %s of %s', n_processed_dataset, n_to_be_processed)
all_done = reduce(lambda x, y: x and y, are_results_ready, True)
time.sleep(sleep_time)
except KeyboardInterrupt:
thread_pool.terminate()
thread_pool.join()
......@@ -134,6 +159,7 @@ def read_holography_datasets_and_store(holography_observation_path,
store_path,
holography_bsf_path=None,
holography_ms_path=None,
calibration_tables_path=None,
n_processes=1,
sleep_time=DEFAULT_SLEEP_TIME):
......@@ -147,6 +173,8 @@ def read_holography_datasets_and_store(holography_observation_path,
:type holography_bsf_path: str
:param holography_ms_path: path to the measurement set directory
:type holography_ms_path: str
:param calibration_tables_path: path to the calibration tables
:type calibration_tables_path: str
:param n_processes: number of precesses to spawn to do the conversion
:type n_processes: int
"""
......@@ -155,6 +183,10 @@ def read_holography_datasets_and_store(holography_observation_path,
raise NotImplementedError()
if holography_bsf_path is not None:
raise NotImplementedError()
if calibration_tables_path is None:
calibration_tables_path = holography_observation_path
target_station_names = list_all_target_stations(holography_observation_path)
logger.debug('target station names %s', target_station_names)
......@@ -162,6 +194,12 @@ def read_holography_datasets_and_store(holography_observation_path,
match_holography_beam_specification_file_with_observation(holography_observation_path)
logger.debug('matched beam specification files and measurement sets: %s',
matched_bsf_ms_pair)
calibration_tables = read_calibration_tables_per_station_mode(calibration_tables_path)
logger.info('calibration tables found %s', list(calibration_tables.keys()))
if len(calibration_tables) == 0:
logger.error('calibration tables not found in directory %s', calibration_tables_path)
raise SystemExit
try:
if not os.path.exists(store_path):
os.makedirs(store_path)
......@@ -173,7 +211,8 @@ def read_holography_datasets_and_store(holography_observation_path,
results = []
for station_name in target_station_names:
result = thread_pool.apply_async(read_and_store_single_target_station,
(station_name, matched_bsf_ms_pair, store_path))
(station_name, matched_bsf_ms_pair, calibration_tables,
store_path))
results.append(result)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment