diff --git a/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py b/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py index ba5d413953ddfb885f778017979b33b8cc70cb50..2a8782ac50c7b4eb22dc8e7486ae399d2584a9d9 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py +++ b/CAL/CalibrationCommon/lib/datacontainers/holography_dataset.py @@ -1,10 +1,12 @@ import logging import os + import h5py +from .calibration_table import CalibrationTable from .holography_dataset_definitions import * -from .holography_specification import HolographySpecification from .holography_observation import HolographyObservation +from .holography_specification import HolographySpecification logger = logging.getLogger(__file__) @@ -68,7 +70,8 @@ class HolographyDataset(): # The central beam or beamlet for each frequency self.central_beamlets = dict() - + self.calibration_tables = dict() + self.derived_calibration_tables = dict() # coordinates of the antenna position in the target self.antenna_field_position = [] # list of reference station names @@ -611,6 +614,18 @@ class HolographyDataset(): result.frequencies = list(f[HDS_FREQUENCY]) + if HDS_CALIBRATION_TABLES in f: + for mode in f[HDS_CALIBRATION_TABLES]: + uri = '/%s/%s' % (HDS_CALIBRATION_TABLES, mode) + result.calibration_tables[mode] = \ + CalibrationTable.load_from_hdf(file_descriptor=f, uri=uri) + + if HDS_DERIVED_CAL_TABLES in f: + for mode in f[HDS_DERIVED_CAL_TABLES]: + uri = '/%s/%s' % (HDS_DERIVED_CAL_TABLES, mode) + result.calibration_tables[mode] = \ + CalibrationTable.load_from_hdf(file_descriptor=f, uri=uri) + result.ra_dec = dict() for frequency in f["RA_DEC"].keys(): for beamlet in f["RA_DEC"][frequency].keys(): @@ -642,6 +657,10 @@ class HolographyDataset(): return result, f + def insert_calibration_table(self, caltable: CalibrationTable): + mode = caltable.observation_mode + self.calibration_tables[mode] = caltable + def store_to_file(self, path): """ Stores the holography dataset at the given path @@ -679,26 +698,27 @@ class HolographyDataset(): f[HDS_REFERENCE_STATION] = to_numpy_array_string(self.reference_stations) f[HDS_FREQUENCY] = self.frequencies + f.create_group(HDS_CALIBRATION_TABLES) + for mode in self.calibration_tables: + self.calibration_tables[mode].store_to_hdf(f, + '/{}/{}'.format( + HDS_CALIBRATION_TABLES, mode)) + for mode in self.derived_calibration_tables: + self.derived_calibration_tables[mode].store_to_hdf(f, + '/{}/{}'.format( + HDS_DERIVED_CAL_TABLES, + mode)) + + HolographyDataset._store_grouped_data(h5file=f, uri='/RA_DEC', + data_to_store=self.ra_dec) + # We create groups for the reference stations and the _frequencies. # Then we store the data samples [XX, YY, XY, YX, t, l, m, flag] # in an array. The reference station name, the frequency and the # beamlet number (index of the data sample array) allow random # access of the data. - - f.create_group("RA_DEC") - for frequency in self.ra_dec.keys(): - f["RA_DEC"].create_group(frequency) - for beamlet in self.ra_dec[frequency].keys(): - f["RA_DEC"][frequency][beamlet] = self.ra_dec[frequency][beamlet] - - f.create_group("CROSSCORRELATION") - for reference_station in self.data.keys(): - f["CROSSCORRELATION"].create_group(reference_station) - for frequency in self.data[reference_station].keys(): - f["CROSSCORRELATION"][reference_station].create_group(frequency) - for beamlet in self.data[reference_station][frequency].keys(): - f["CROSSCORRELATION"][reference_station][frequency][beamlet] = \ - self.data[reference_station][frequency][beamlet] + HolographyDataset._store_grouped_data(h5file=f, uri='/CROSSCORRELATION', + data_to_store=self.data) HolographyDataset._store_grouped_data(h5file=f, uri=HDS_CENTRAL_BEAMLETS, diff --git a/CAL/CalibrationCommon/lib/datacontainers/holography_dataset_definitions.py b/CAL/CalibrationCommon/lib/datacontainers/holography_dataset_definitions.py index 5c63557c62140b54767c9eb57ecc8c7a21bd458a..21ed4aa8ca0247e98d0d8717cc895d450e2f4235 100644 --- a/CAL/CalibrationCommon/lib/datacontainers/holography_dataset_definitions.py +++ b/CAL/CalibrationCommon/lib/datacontainers/holography_dataset_definitions.py @@ -20,6 +20,8 @@ HDS_SPECIFIED_REFERENCE_STATION = "Specified_Reference_station" HDS_SPECIFIED_FREQUENCY = "Specified_frequency" HDS_BEAMLETS = "Beamlets" HDS_CENTRAL_BEAMLETS = "Central_beamlets" +HDS_CALIBRATION_TABLES = "Calibration_tables" +HDS_DERIVED_CAL_TABLES = "Derived_calibration_tables" # GROUP "RA DEC" HDS_SPECIFIED_RA_DEC = "Specified_RA_DEC" @@ -35,7 +37,7 @@ HDS_DATA = "Data" HDS_coordinate_type = numpy.dtype([ ('RA', numpy.float64), ('DEC', numpy.float64), - ('EPOCH', 'S10')]) + ('EPOCH', '<S10')]) # One element that is stored in "Data". HDS_data_sample_type = numpy.dtype([ diff --git a/CAL/CalibrationCommon/lib/mshologextract.py b/CAL/CalibrationCommon/lib/mshologextract.py index a442da646f526dfb4564b7a7a757ca1bb4879d6b..8c5406c69a00b9db5bf7699fd23fadbe69408297 100644 --- a/CAL/CalibrationCommon/lib/mshologextract.py +++ b/CAL/CalibrationCommon/lib/mshologextract.py @@ -1,8 +1,7 @@ #!/usr/bin/env python3 import argparse -import logging - +from functools import reduce import signal import time from multiprocessing import Pool as ThreadPool @@ -10,6 +9,9 @@ import os from lofar.calibration.common.utils import * from lofar.calibration.common.datacontainers.holography_dataset import HolographyDataset +from lofar.calibration.common.datacontainers.calibration_table import \ + read_calibration_tables_per_station_mode +import logging DEFAULT_SLEEP_TIME = 1 @@ -22,6 +24,7 @@ def main(): read_holography_datasets_and_store(arguments.input_path, arguments.output_path, holography_bsf_path=arguments.holography_bsf, holography_ms_path=arguments.holography_ms, + calibration_tables_path=arguments.calibration_tables, n_processes=arguments.num_proc) @@ -57,6 +60,9 @@ def specify_command_line_arguments(): ' specification file', default=None) parser.add_argument('--holography_ms', help='override default path for the holography' ' observation MS files', default=None) + parser.add_argument('--calibration_tables', help='override default path for the holography' + ' calibration table files used', default=None) + parser.add_argument('--num_proc', help='number of processes used to convert the holography' ' observation', type=int) parser.add_argument('-v', help='verbose logging', action='store_true') @@ -64,7 +70,10 @@ def specify_command_line_arguments(): return parser -def read_and_store_single_target_station(target_station_name, matched_bsf_ms_pair, store_path): +def read_and_store_single_target_station(target_station_name, + matched_bsf_ms_pair, + caltables, + store_path): """ Reads a single target station hdf5 file from a list of bsf and ms pairs :param target_station_name: target station name to process @@ -73,14 +82,28 @@ def read_and_store_single_target_station(target_station_name, matched_bsf_ms_pai its specific holography observation :type matched_bsf_ms_pair: list[(lofar.calibration.common.datacontainers.HolographySpecification, lofar.calibration.common.datacontainers.HolographyObservation)] + :param caltables: calibration tables used during the observation + :type caltables: Dict[Tuple[str, int], CalibrationTable] :param store_path: path where to store the file :type store_path: str :return: """ holography_dataset = HolographyDataset() logger.info('Loading data for station %s ', target_station_name) + holography_dataset.load_from_beam_specification_and_ms(target_station_name, matched_bsf_ms_pair) + logger.info('Loading calibration tables per station %s', target_station_name) + + station_name = target_station_name.replace('HBA0', '').replace('HBA1', '').replace('HBA', '') + try: + holography_dataset.insert_calibration_table(caltables[(station_name, + holography_dataset.mode)]) + except KeyError: + logger.error('cannot calibration file for station %s and mode %s', station_name, + holography_dataset.mode) + raise SystemExit() + filename = '%s.hdf5' % target_station_name outpath = os.path.join(store_path, filename) @@ -110,18 +133,20 @@ def wait_for_processes_to_complete_execution(thread_pool, :type future_results: list[multiprocess.pool.ApplyResult] """ all_done = False - print( - - 'dude' - ) + n_to_be_processed = len(future_results) try: while not all_done: - all_done = True - for k in future_results: - all_done = all_done and k.ready() - print(k) - print(future_results) + are_results_ready = list(map(lambda x: x.ready(), future_results)) + + result_status = map(lambda x: 1 if x else 0, + are_results_ready) + n_processed_dataset = sum(result_status) + logger.info('processed %s of %s', n_processed_dataset, n_to_be_processed) + + all_done = reduce(lambda x, y: x and y, are_results_ready, True) + time.sleep(sleep_time) + except KeyboardInterrupt: thread_pool.terminate() thread_pool.join() @@ -134,6 +159,7 @@ def read_holography_datasets_and_store(holography_observation_path, store_path, holography_bsf_path=None, holography_ms_path=None, + calibration_tables_path=None, n_processes=1, sleep_time=DEFAULT_SLEEP_TIME): @@ -147,6 +173,8 @@ def read_holography_datasets_and_store(holography_observation_path, :type holography_bsf_path: str :param holography_ms_path: path to the measurement set directory :type holography_ms_path: str + :param calibration_tables_path: path to the calibration tables + :type calibration_tables_path: str :param n_processes: number of precesses to spawn to do the conversion :type n_processes: int """ @@ -155,6 +183,10 @@ def read_holography_datasets_and_store(holography_observation_path, raise NotImplementedError() if holography_bsf_path is not None: raise NotImplementedError() + + if calibration_tables_path is None: + calibration_tables_path = holography_observation_path + target_station_names = list_all_target_stations(holography_observation_path) logger.debug('target station names %s', target_station_names) @@ -162,6 +194,12 @@ def read_holography_datasets_and_store(holography_observation_path, match_holography_beam_specification_file_with_observation(holography_observation_path) logger.debug('matched beam specification files and measurement sets: %s', matched_bsf_ms_pair) + + calibration_tables = read_calibration_tables_per_station_mode(calibration_tables_path) + logger.info('calibration tables found %s', list(calibration_tables.keys())) + if len(calibration_tables) == 0: + logger.error('calibration tables not found in directory %s', calibration_tables_path) + raise SystemExit try: if not os.path.exists(store_path): os.makedirs(store_path) @@ -173,7 +211,8 @@ def read_holography_datasets_and_store(holography_observation_path, results = [] for station_name in target_station_names: result = thread_pool.apply_async(read_and_store_single_target_station, - (station_name, matched_bsf_ms_pair, store_path)) + (station_name, matched_bsf_ms_pair, calibration_tables, + store_path)) results.append(result) @@ -182,4 +221,4 @@ def read_holography_datasets_and_store(holography_observation_path, if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) - main() \ No newline at end of file + main()