From d00e2b3de1ca51dc9b86afe66127d3a9341e7fed Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 15 Oct 2018 13:56:26 +0000 Subject: [PATCH] Task #10079: another manual merge (manual file copy) from https://svn.astron.nl/LOFAR/branches/adder-Task10079 --- .gitattributes | 3 +- QA/QA_Common/bin/create_test_hypercube | 54 ++++++--- QA/QA_Common/lib/cep4_utils.py | 25 ++-- QA/QA_Common/lib/hdf5_io.py | 147 +++++++++++++++++++----- QA/QA_Common/lib/utils.py | 43 ++++--- QA/QA_Common/test/create_test_hypercube | 42 ------- QA/QA_Common/test/t_hdf5_io.py | 28 +++-- QA/QA_Common/test/test_utils.py | 79 ------------- QA/QA_Service/bin/CMakeLists.txt | 2 +- QA/QA_Service/bin/qa_webservice | 59 ++++++++++ QA/QA_Service/bin/qa_webservice.ini | 2 +- QA/QA_Service/lib/qa_service.py | 37 +++--- 12 files changed, 302 insertions(+), 219 deletions(-) delete mode 100755 QA/QA_Common/test/create_test_hypercube delete mode 100644 QA/QA_Common/test/test_utils.py create mode 100755 QA/QA_Service/bin/qa_webservice diff --git a/.gitattributes b/.gitattributes index d6f6bfbf3b1..5fa260b9c2c 100644 --- a/.gitattributes +++ b/.gitattributes @@ -3942,18 +3942,17 @@ QA/QA_Common/lib/geoconversions.py -text QA/QA_Common/lib/hdf5_io.py -text QA/QA_Common/lib/utils.py -text QA/QA_Common/test/CMakeLists.txt -text -QA/QA_Common/test/create_test_hypercube -text QA/QA_Common/test/t_cep4_utils.py -text QA/QA_Common/test/t_cep4_utils.run -text QA/QA_Common/test/t_cep4_utils.sh -text QA/QA_Common/test/t_hdf5_io.py -text QA/QA_Common/test/t_hdf5_io.run -text QA/QA_Common/test/t_hdf5_io.sh -text -QA/QA_Common/test/test_utils.py -text QA/QA_Service/CMakeLists.txt -text QA/QA_Service/bin/CMakeLists.txt -text QA/QA_Service/bin/qa_service -text QA/QA_Service/bin/qa_service.ini -text +QA/QA_Service/bin/qa_webservice -text QA/QA_Service/bin/qa_webservice.ini -text QA/QA_Service/lib/CMakeLists.txt -text QA/QA_Service/lib/QABusListener.py -text diff --git a/QA/QA_Common/bin/create_test_hypercube b/QA/QA_Common/bin/create_test_hypercube index 77c5982a8b1..e8da0881f5d 100755 --- a/QA/QA_Common/bin/create_test_hypercube +++ b/QA/QA_Common/bin/create_test_hypercube @@ -1,7 +1,7 @@ #!/usr/bin/env python import os -from optparse import OptionParser +from optparse import OptionParser, OptionGroup from lofar.qa.utils import * from lofar.qa.hdf5_io import write_hypercube @@ -12,21 +12,34 @@ def main(): # make sure we run in UTC timezone os.environ['TZ'] = 'UTC' - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', - level=logging.INFO) - ## Check the invocation arguments parser = OptionParser(usage='create_test_hypercube [options] <path_to_new_h5_file>', description='creates a test h5 hypercube with random data for the given number of stations, saps, subbands, timestamps.') - parser.add_option('-S', '--stations', dest='stations', type='int', default=3, help='number of stations to create, default: %default') - parser.add_option('-s', '--subbands', dest='subbands', type='int', default=244, help='number of subbands (per sap) to create, default: %default') - parser.add_option('-t', '--timestamps', dest='timestamps', type='int', default=128, help='number of timestamps to create, default: %default') - parser.add_option('--saps', dest='saps', type='int', default=1, help='number of saps to create, default: %default') - parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='optional (fake/test) otdb id, default: %default') - parser.add_option('--snr', dest='snr', type='float', 
default=0.9, help='signal to noise ratio. The signal is a test image with a full sweep through all phase and amplitudes from [0..1], and the noise is just random complex numbers, default: %default')
-    parser.add_option('-a', '--amplitude', dest='max_signal_amplitude', type='float', default=100, help='the max signal amplitude, default: %default')
-    parser.add_option('-p', '--pol-ratio', dest='parallel_to_cross_polarization_ratio', type='float', default=1, help='the ratio between parallel and cross polarization visibility amplitudes, default: %default')
-    parser.add_option('-w', '--num_phase_wraps', dest='num_phase_wraps', type='float', default=1, help='the number of times the phase wraps around 2pi along the freq/sb axis, default: %default')
+    group = OptionGroup(parser, 'Dimensions')
+    group.add_option('-S', '--stations', dest='stations', type='int', default=3, help='number of stations to create (min=2), default: %default')
+    group.add_option('-s', '--subbands', dest='subbands', type='int', default=244, help='number of subbands (per sap) to create, default: %default')
+    group.add_option('-t', '--timestamps', dest='timestamps', type='int', default=128, help='number of timestamps to create, default: %default')
+    group.add_option('--saps', dest='saps', type='int', default=1, help='number of saps to create, default: %default')
+    parser.add_option_group(group)
+
+    group = OptionGroup(parser, 'General signal options')
+    group.add_option('--snr', dest='snr', type='float', default=0.9, help='signal to noise ratio. The signal is a test image with a full sweep through all phases and amplitudes from [0..1], and the noise is just random complex numbers, default: %default')
+    group.add_option('-a', '--amplitude', dest='max_signal_amplitude', type='float', default=100, help='the max signal amplitude, default: %default')
+    group.add_option('-p', '--pol-ratio', dest='parallel_to_cross_polarization_ratio', type='float', default=1, help='the amplitude ratio between parallel and cross polarization, default: %default')
+    parser.add_option_group(group)
+
+    group = OptionGroup(parser, 'Specific signal options')
+    group.add_option('--pw', '--num_phase_wraps', dest='num_phase_wraps', type='float', default=1, help='the number of times the phase wraps around 2pi along the freq/sb axis, default: %default')
+    group.add_option('--tsp', '--num_time_sawtooth_periods', dest='num_time_sawtooth_periods', type='float', default=1, help='the number of periods for the sawtooth signal along the time axis, default: %default')
+    group.add_option('--ssp', '--num_subband_sawtooth_periods', dest='num_subband_sawtooth_periods', type='float', default=0, help='the number of periods for the sawtooth signal along the subband/frequency axis, default: %default')
+    group.add_option('--tcp', '--num_time_cos_periods', dest='num_time_cos_periods', type='float', default=0, help='the number of periods for the cosine signal along the time axis, default: %default')
+    group.add_option('--scp', '--num_subband_cos_periods', dest='num_subband_cos_periods', type='float', default=0, help='the number of periods for the cosine signal along the subband/frequency axis, default: %default')
+    parser.add_option_group(group)
+
+    group = OptionGroup(parser, 'Miscellaneous')
+    group.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='optional (fake/test) otdb id, default: %default')
+    group.add_option('-V', '--verbose', dest='verbose', action='store_true', help='Verbose logging')
+    parser.add_option_group(group)
 
     (options, args) = 
parser.parse_args() @@ -36,6 +49,13 @@ def main(): parser.print_help() exit(1) + logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', + level=logging.DEBUG if options.verbose else logging.INFO) + + if options.stations < 2: + print 'setting number of stations to minimum of 2' + options.stations = 2 + cube = create_hypercube(num_stations=options.stations, num_saps=options.saps, num_subbands_per_sap={sap:options.subbands for sap in range(options.saps)}, @@ -43,9 +63,13 @@ def main(): snr=options.snr, max_signal_amplitude = options.max_signal_amplitude, parallel_to_cross_polarization_ratio= options.parallel_to_cross_polarization_ratio, - num_phase_wraps=options.num_phase_wraps) + num_phase_wraps=options.num_phase_wraps, + num_time_sawtooth_periods=options.num_time_sawtooth_periods, + num_subband_sawtooth_periods=options.num_subband_sawtooth_periods, + num_time_cos_periods=options.num_time_cos_periods, + num_subband_cos_periods=options.num_subband_cos_periods) write_hypercube(args[0], cube, sas_id=options.otdb_id) if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/QA/QA_Common/lib/cep4_utils.py b/QA/QA_Common/lib/cep4_utils.py index c0df21a3576..8870f77baae 100644 --- a/QA/QA_Common/lib/cep4_utils.py +++ b/QA/QA_Common/lib/cep4_utils.py @@ -75,7 +75,7 @@ def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): else: return remote_cmd -def wrap_command_for_docker(cmd, image_name, image_label=''): +def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data']): '''wrap the command to be run in a docker container for the lofarsys user and environment :param list cmd: a subprocess cmd list :param string image_name: the name of the docker image to run @@ -87,15 +87,20 @@ def wrap_command_for_docker(cmd, image_name, image_label=''): check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-g'])).strip()) #return the docker run command for the lofarsys user and environment - return ['docker', 'run', '--rm', '--net=host', '-v', '/data:/data', - '-u', id_string, - '-v', '/etc/passwd:/etc/passwd:ro', - '-v', '/etc/group:/etc/group:ro', - '-v', '$HOME:$HOME', - '-e', 'HOME=$HOME', - '-e', 'USER=$USER', - '-w', '$HOME', - '%s:%s' % (image_name, image_label) if image_label else image_name] + cmd + dockerized_cmd = ['docker', 'run', '--rm', '--net=host'] + for d in mount_dirs: + dockerized_cmd += ['-v', '%s:%s' % (d,d)] + + dockerized_cmd += ['-u', id_string, + '-v', '/etc/passwd:/etc/passwd:ro', + '-v', '/etc/group:/etc/group:ro', + '-v', '$HOME:$HOME', + '-e', 'HOME=$HOME', + '-e', 'USER=$USER', + '-w', '$HOME', + '%s:%s' % (image_name, image_label) if image_label else image_name] + dockerized_cmd += cmd + return dockerized_cmd def get_cep4_available_cpu_nodes(): ''' diff --git a/QA/QA_Common/lib/hdf5_io.py b/QA/QA_Common/lib/hdf5_io.py index 6ad89e0c468..61ba99b645f 100644 --- a/QA/QA_Common/lib/hdf5_io.py +++ b/QA/QA_Common/lib/hdf5_io.py @@ -93,9 +93,10 @@ def write_hypercube(path, saps, parset=None, sas_id=None, wsrta_id=None, do_comp os.makedirs(save_dir) with h5py.File(path, "w") as file: - version = '1.3' + version = '1.4' # 1.1 -> 1.2 change is not backwards compatible by design. # 1.2 -> 1.3 change is almost backwards compatible, it just needs a dB/linear correction. 
see convert_12_to_13
+        # 1.3 -> 1.4 storing scale factors per baseline per subband per pol, see convert_13_to_14
 
         ds = file.create_dataset('version', (1,), h5py.special_dtype(vlen=str), version)
         ds.attrs['description'] = 'version of this hdf5 MS extract file'
 
@@ -194,32 +195,36 @@ def write_hypercube(path, saps, parset=None, sas_id=None, wsrta_id=None, do_comp
             visibilities_dB = visibility_amplitudes_dB * visibility_phases
 
             #compute scale factors per baseline, per subband, per polarization
-            scale_factors = np.empty(shape=(len(subbands),len(polarizations)), dtype=np.float32)
+            scale_factors = np.empty(shape=(len(baselines),len(subbands),len(polarizations)), dtype=np.float32)
 
-            for pol_idx, polarization in enumerate(polarizations):
-                #compute scale factor per subband to map the visibilities_dB per subband from complex64 to 2xint8
-                for sb_nr in range(len(subbands)):
-                    #use 99.9 percentile instead if max to get rid of spikes
-                    max_abs_vis_sb = max(1.0, np.percentile(visibility_amplitudes_dB[:,:,sb_nr,pol_idx], 99.9))
-                    scale_factor = 127.0 / max_abs_vis_sb
-                    scale_factors[sb_nr, pol_idx] = 1.0/scale_factor
+            # compute scale factor per baseline/subband/pol to map the visibilities_dB from complex64 to 2xint8
+            for bl_idx in range(len(baselines)):
+                for pol_idx in range(len(polarizations)):
+                    for sb_idx in range(len(subbands)):
+                        #use 99.5 percentile instead of max to get rid of spikes
+                        max_abs_vis_sb = max(1.0, np.percentile(visibility_amplitudes_dB[bl_idx,:,sb_idx,pol_idx], 99.5))
+                        scale_factor = 127.0 / max_abs_vis_sb
+                        scale_factors[bl_idx, sb_idx, pol_idx] = 1.0/scale_factor
 
             # store the scale_factors in the file
             scale_factor_ds = sap_group.create_dataset('visibility_scale_factors', data=scale_factors)
-            scale_factor_ds.attrs['description'] = 'scale factors per subband per polatization to un-normalize the stored visibilities'
-            scale_factor_ds.attrs['description'] = 'multiply real and imag parts of the visibilities with this factor per subband per polatization to un-normalize them and get the 10log10 values of the real and imag parts of the visibilities'
+            scale_factor_ds.attrs['description'] = 'scale factors per baseline per subband per polarization to un-normalize the stored visibilities'
+            scale_factor_ds.attrs['description'] = 'multiply real and imag parts of the visibilities with this factor per baseline per subband per polarization to un-normalize them and get the 10log10 values of the real and imag parts of the visibilities'
             scale_factor_ds.attrs['units'] = '-'
 
-            #create a array with one extra dimension, so we can split the complex value into two scaled int8's for real and imag part
-            #looping in python is not the most cpu efficient way
-            #but is saves us extra copies of the large visibilities array, which might not fit in memory?
+            # create an array with one extra dimension, so we can split the complex value into two scaled int8's for real and imag part
+            # looping in python is not the most cpu efficient way
+            # but it saves us extra copies of the large visibilities array, which might not fit in memory
             logger.debug('converting visibilities from complexfloat to 2xint8 for file %s', path)
             extended_shape = visibilities_dB.shape[:] + (2,)
             scaled_visibilities = np.empty(extended_shape, dtype=np.int8)
-            for sb_nr in range(len(subbands)):
-                scale_factor = 1.0 / scale_factors[sb_nr]
-                scaled_visibilities[:,:,sb_nr,:,0] = scale_factor*visibilities_dB[:,:,sb_nr,:].real
-                scaled_visibilities[:,:,sb_nr,:,1] = scale_factor*visibilities_dB[:,:,sb_nr,:].imag
+
+            for bl_idx in range(len(baselines)):
+                for pol_idx in range(len(polarizations)):
+                    for sb_idx in range(len(subbands)):
+                        scale_factor = 1.0 / scale_factors[bl_idx, sb_idx, pol_idx]
+                        scaled_visibilities[bl_idx,:,sb_idx,pol_idx,0] = scale_factor*visibilities_dB[bl_idx,:,sb_idx,pol_idx].real
+                        scaled_visibilities[bl_idx,:,sb_idx,pol_idx,1] = scale_factor*visibilities_dB[bl_idx,:,sb_idx,pol_idx].imag
 
             logger.debug('reduced visibilities size from %s to %s bytes (factor %s)',
                          visibilities.nbytes, scaled_visibilities.nbytes, visibilities.nbytes/scaled_visibilities.nbytes)
@@ -266,7 +271,7 @@ def read_sap_numbers(path):
     with h5py.File(path, "r") as file:
         version_str = file['version'][0]
 
-        if version_str not in ['1.2', '1.3']:
+        if version_str not in ['1.2', '1.3', '1.4']:
             raise ValueError('Cannot read version %s' % (version_str,))
 
         return sorted([int(sap_nr) for sap_nr in file['measurement/saps'].keys()])
@@ -291,9 +296,13 @@ def read_hypercube(path, visibilities_in_dB=True, python_datetimes=False, read_v
         if file['version'][0] == '1.2':
             convert_12_to_13(path)
 
+    with h5py.File(path, "r+") as file:
+        if file['version'][0] == '1.3':
+            convert_13_to_14(path)
+
     # reopen file read-only for safety reasons.
     with h5py.File(path, "r") as file:
-        if file['version'][0] != '1.3':
+        if file['version'][0] != '1.4':
             raise ValueError('Cannot read version %s' % (file['version'][0],))
 
         result = {}
@@ -365,11 +374,12 @@ def read_hypercube(path, visibilities_in_dB=True, python_datetimes=False, read_v
                     reduced_shape = normalized_visibilities.shape[:-1]
                     visibilities = np.empty(reduced_shape, dtype=np.complex64)
 
-                    for pol_idx, polarization in enumerate(polarizations):
-                        pol_scale_factors = scale_factors[:,pol_idx]
-                        for sb_nr, scale_factor in enumerate(pol_scale_factors):
-                            visibilities[:,:,sb_nr,pol_idx].real = scale_factor*normalized_visibilities[:,:,sb_nr,pol_idx,0]
-                            visibilities[:,:,sb_nr,pol_idx].imag = scale_factor*normalized_visibilities[:,:,sb_nr,pol_idx,1]
+                    for bl_idx in range(len(baselines)):
+                        for sb_idx in range(len(subbands)):
+                            for pol_idx in range(len(polarizations)):
+                                scale_factor = scale_factors[bl_idx, sb_idx, pol_idx]
+                                visibilities[bl_idx,:,sb_idx,pol_idx].real = scale_factor*normalized_visibilities[bl_idx,:,sb_idx,pol_idx,0]
+                                visibilities[bl_idx,:,sb_idx,pol_idx].imag = scale_factor*normalized_visibilities[bl_idx,:,sb_idx,pol_idx,1]
 
                     if not visibilities_in_dB:
                         logger.debug('converting visibilities from dB to raw linear for file sap %s in %s', sap_nr, path)
@@ -464,6 +474,39 @@ def convert_12_to_13(h5_path):
         # and finally update the version number
         file['version'][0] = '1.3'
 
+def convert_13_to_14(h5_path):
+    with h5py.File(h5_path, "r+") as file:
+        version_str = file['version'][0]
+
+        if version_str != '1.3':
+            raise ValueError('Cannot convert version %s to 1.4' % (version_str,))
+
+        for sap_nr, sap_group in file['measurement/saps'].items():
+            # read the v1.3 scale factors,
+            # which were stored per subband per polarization
+            scale_factors = sap_group['visibility_scale_factors'][:]
+            baselines = sap_group['baselines']
+            subbands = sap_group['subbands']
+            polarizations = sap_group['polarizations']
+
+            # apply v1.3 scale factors to new v1.4
+            # in v1.3 scale factors were stored per subband per pol
+            # in v1.4 scale factors are stored per baseline per subband per pol
+            scale_factors_new = np.empty(shape=(len(baselines),len(subbands),len(polarizations)), dtype=np.float32)
+
+            for pol_idx in range(len(polarizations)):
+                for sb_idx in range(len(subbands)):
+                    scale_factors_new[:,sb_idx,pol_idx] = scale_factors[sb_idx,pol_idx]
+
+            # overwrite the scale_factors in the file
+            del sap_group['visibility_scale_factors']
+            scale_factor_ds = sap_group.create_dataset('visibility_scale_factors', data=scale_factors_new)
+            scale_factor_ds.attrs['description'] = 'scale factors per baseline per subband per polarization to un-normalize the stored visibilities'
+            scale_factor_ds.attrs['description'] = 'multiply real and imag parts of the visibilities with this factor per baseline per subband per polarization to un-normalize them and get the 10log10 values of the real and imag parts of the visibilities'
+            scale_factor_ds.attrs['units'] = '-'
+
+        # and finally update the version number
+        file['version'][0] = '1.4'
 
 def add_parset_to_hypercube(h5_path, otdbrpc):
     """
@@ -555,6 +598,12 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres
             if file['version'][0] == 1.2:
                 convert_12_to_13(path)
 
+    # convert any 1.3 to 1.4 file if needed
+    for path in existing_paths:
+        with h5py.File(path, "r") as file:
+            if file['version'][0] == '1.3':
+                convert_13_to_14(path)
+
     input_files = [h5py.File(p, "r") for p in existing_paths]
 
     versions = set([file['version'][0] for file in input_files])
@@ -564,7 +613,7 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres
 
     version_str = list(versions)[0]
 
-    if version_str != '1.3':
+    if version_str != '1.4':
         raise ValueError('Cannot read version %s' % (version_str,))
 
     sas_ids = set([file['measurement/sas_id'][0] for file in input_files if 'measurement/sas_id' in file])
@@ -585,7 +634,7 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres
         logger.info('combine_hypercubes: combining %s h5 files into %s', len(input_paths), output_path)
 
         with h5py.File(output_path, "w") as output_file:
-            version = '1.3'
+            version = '1.4'
             ds = output_file.create_dataset('version', (1,), h5py.special_dtype(vlen=str), version)
             ds.attrs['description'] = 'version of this hdf5 MS extract file'
 
@@ -653,7 +702,7 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres
                         elif any(item in key for item in ['baselines', 'polarizations', 'timestamps', 'antenna_locations']):
                             value_dict[key] = data
                         elif 'visibility_scale_factors' in key:
-                            value_dict[key] = data[sb_cntr,:]
+                            value_dict[key] = data[:, sb_cntr,:]
                         else:
                             value_dict[key] = data[sb_cntr]
 
@@ -680,7 +729,8 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres
                                 shape.insert(2, num_subbands)
                                 shape = tuple(shape)
                             elif 'visibility_scale_factors' in key:
-                                shape = (num_subbands,) + data.shape
+                                # from (#bl,#pol) to (#bl,#sb,#pol)
+                                shape = (data.shape[0], num_subbands, data.shape[1])
                             else:
                                 shape = (num_subbands,)
 
@@ -696,6 +746,8 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres
                             combined_value_dict[key][:,:,sb_cntr,:] = data
                         elif any(item in key for item in ['baselines', 'polarizations', 'timestamps', 'antenna_locations']):
                             combined_value_dict[key] = data
+                        elif 'visibility_scale_factors' in key:
+                            combined_value_dict[key][:,sb_cntr,:] = data
                         else:
                             combined_value_dict[key][sb_cntr] = data
 
@@ -729,6 +781,7 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres
                     except StopIteration:
                         pass #no input file with key, so nothing to copy.
 
+        fill_info_folder_from_parset(output_path)
     except Exception as e:
         logger.exception('combine_hypercubes: %s', e)
     finally:
@@ -1133,6 +1186,11 @@ def read_info_from_hdf5(h5_path, read_data_info=True, read_parset_info=True):
     info = ''
     result = {}
 
+    with h5py.File(h5_path, "r+") as file:
+        if not 'measurement/info' in file:
+            # try to convert old style file with parsets only into new files with info.
+            fill_info_folder_from_parset(h5_path)
+
     if read_data_info:
         result = read_hypercube(h5_path, read_visibilities=False, read_flagging=False)
 
@@ -1154,6 +1212,12 @@ def create_info_string(data, h5_path=None, file_annotations=None, clusters=None,
     parset = data['parset']
     if h5_path:
         info += 'File : ' + os.path.basename(h5_path) + '\n'
+        try:
+            with h5py.File(h5_path, "r") as file:
+                info += 'File version : ' + file['version'][0] + '\n'
+        except IOError:
+            pass
+
     info += 'Project : ' + parset.getString('ObsSW.Observation.Campaign.name') + '\n'
     info += 'Project description : ' + parset.getString('ObsSW.Observation.Campaign.title') + '\n'
     info += 'Project PI : ' + parset.getString('ObsSW.Observation.Campaign.PI') + '\n'
@@ -1285,3 +1349,28 @@ def read_info_dict(h5_path):
 
     return info_dict
 
+def read_SAP_targets(h5_path):
+    """reads the SAP targets from the parset
+    :param str h5_path: path to the h5 file
+    :return: dict of SAP_nr to target name.
+    """
+
+    beam_dict = {}
+
+    #TODO: use normal parset lib instead of error prone string parsing
+    try:
+        parset_str = read_hypercube_parset(h5_path, as_string=True)
+        if parset_str:
+            lines = parset_str.splitlines()
+            beam_lines = [l for l in lines if 'Observation.Beam[' in l and '.target' in l]
+            for line in beam_lines:
+                parts = line.partition('=')
+                beam_nr = int(parts[0][parts[0].index('[')+1: parts[0].index(']')])
+                beam_dict[beam_nr] = parts[2]
+
+    except Exception as e:
+        logger.error(e)
+
+    return beam_dict
+
+
diff --git a/QA/QA_Common/lib/utils.py b/QA/QA_Common/lib/utils.py
index cf88d439cf6..9778ea0125b 100644
--- a/QA/QA_Common/lib/utils.py
+++ b/QA/QA_Common/lib/utils.py
@@ -21,15 +21,20 @@ from datetime import datetime, timedelta
 from lofar.qa.geoconversions import *
 
 import logging
+import math
 
 logger = logging.getLogger(__name__)
 
 def create_hypercube(num_saps=3, num_stations=5, num_timestamps=11, num_subbands_per_sap=None,
                      snr=0.9, max_signal_amplitude=1e5, parallel_to_cross_polarization_ratio=20.0,
-                     num_phase_wraps=1.0):
+                     num_phase_wraps=1.0,
+                     num_time_sawtooth_periods=1, num_subband_sawtooth_periods=0,
+                     num_time_cos_periods=0, num_subband_cos_periods=0):
     data = {}
     assert max_signal_amplitude > 1.0
-    logger.info('create_hypercube: num_saps=%s num_stations=%s num_timestamps=%s num_subbands_per_sap=%s snr=%s max_amplitude=%s pol_ratio=%s',
-                num_saps, num_stations, num_timestamps, num_subbands_per_sap, snr, max_signal_amplitude, parallel_to_cross_polarization_ratio)
+    logger.info('create_hypercube: num_saps=%s num_stations=%s num_timestamps=%s num_subbands_per_sap=%s snr=%s max_amplitude=%s pol_ratio=%s ' \
+                'num_phase_wraps=%s num_time_sawtooth_periods=%s num_subband_sawtooth_periods=%s num_time_cos_periods=%s num_subband_cos_periods=%s',
+                num_saps, num_stations, num_timestamps, num_subbands_per_sap, snr, max_signal_amplitude, parallel_to_cross_polarization_ratio,
+                
num_phase_wraps, num_time_sawtooth_periods, num_subband_sawtooth_periods, num_time_cos_periods, num_subband_cos_periods)
 
     if num_subbands_per_sap is None:
         num_subbands_per_sap = {}
@@ -55,24 +60,36 @@ def create_hypercube(num_saps=3, num_stations=5, num_timestamps=11, num_subbands
 
         # create synthetic visibilities signal
         baseline_visibilities_signal = np.zeros((num_timestamps, num_subbands, len(polarizations)), dtype=np.complex64)
-        for timestamp_idx in range(num_timestamps):
-            # timestamp_ratio ranges from-and-including 1.0 to 'small'-but-not-zero
-            # this prevents the visibility_value from becoming 0 (from which we cannot take the log)
-            timestamp_ratio = (num_timestamps - timestamp_idx) / float(num_timestamps) if num_timestamps > 1 else 1.0
-            for subband_idx in range(num_subbands):
-                # subband_ratio ranges from 0 to-but-not-including 1.0
-                # this ensures the phases start at 0rad, and sweep up to but not including 2PIrad
-                subband_ratio = subband_idx/float(num_subbands) if num_subbands > 1 else 1.0
+
+        for subband_idx in range(num_subbands):
+            # subband_ratio ranges from just-above-0.0 up-to-and-including 1.0,
+            # so the phases sweep from just-above-0rad up to and including 2PIrad times num_phase_wraps
+            subband_ratio = ((subband_idx+1) / float(num_subbands)) if num_subbands > 1 else 1.0
+            sawtooth_subband_amplitude = math.fmod(subband_ratio * num_subband_sawtooth_periods, 1)
+            if sawtooth_subband_amplitude == 0.0:
+                sawtooth_subband_amplitude = 1.0
+            cos_subband_amplitude = 0.5 * (1.0 + np.cos(num_subband_cos_periods * subband_ratio * 2 * np.pi))
+
+            for timestamp_idx in range(num_timestamps):
+                # timestamp_ratio ranges from 'small'-but-not-zero up-to-and-including 1.0
+                # this prevents the visibility_value from becoming 0 (from which we cannot take the log)
+                timestamp_ratio = ((timestamp_idx+1) / float(num_timestamps)) if num_timestamps > 1 else 1.0
+                sawtooth_time_amplitude = math.fmod(timestamp_ratio * num_time_sawtooth_periods, 1)
+                if sawtooth_time_amplitude == 0.0:
+                    sawtooth_time_amplitude = 1.0
+                cos_time_amplitude = 0.5*(1.0+np.cos(num_time_cos_periods*timestamp_ratio * 2 * np.pi))
 
                 # create synthetic visibility_value
                 # amplitude varies in time. 
make sure the smallest amplitude is >= 1.0, # because otherwise we cannot store them with enough bits in dB's - amplitude = max(1.0, max_signal_amplitude * timestamp_ratio) + #amplitude = max(1.0, max_signal_amplitude * (sawtooth_time + sawtooth_subband + cos_subband + cos_time)/4.0) + amplitude = max(1.0, max_signal_amplitude * (sawtooth_time_amplitude * sawtooth_subband_amplitude * + cos_subband_amplitude * cos_time_amplitude)) # phase varies in subband direction phase = np.exp(1j * subband_ratio * 2.0 * np.pi * num_phase_wraps) visibility_value_parallel = amplitude * phase visibility_value_cross = max(1.0, amplitude/parallel_to_cross_polarization_ratio) * phase - baseline_visibilities_signal[timestamp_idx,subband_idx,parallel_pol_idxs] = visibility_value_parallel + baseline_visibilities_signal[timestamp_idx, subband_idx,parallel_pol_idxs] = visibility_value_parallel baseline_visibilities_signal[timestamp_idx, subband_idx, cross_pol_idxs] = visibility_value_cross # use/apply the same visibilities for each baseline diff --git a/QA/QA_Common/test/create_test_hypercube b/QA/QA_Common/test/create_test_hypercube deleted file mode 100755 index 1d368470961..00000000000 --- a/QA/QA_Common/test/create_test_hypercube +++ /dev/null @@ -1,42 +0,0 @@ -#!/usr/bin/env python - -import os -from optparse import OptionParser -from lofar.qa.test.test_utils import * -from lofar.qa.hdf5_io import write_hypercube - -import logging -logger = logging.getLogger(__name__) - -def main(): - # make sure we run in UTC timezone - os.environ['TZ'] = 'UTC' - - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', - level=logging.INFO) - - ## Check the invocation arguments - parser = OptionParser(usage='create_test_hypercube [options] <path_to_new_h5_file>', - description='creates a test h5 hypercube with random data for the given number of stations, saps, subbands, timestamps.') - parser.add_option('-s', '--stations', dest='stations', type='int', default=3, help='number of stations to create, default: %default') - parser.add_option('-S', '--subbands', dest='subbands', type='int', default=244, help='number of subbands (per sap) to create, default: %default') - parser.add_option('-t', '--timestamps', dest='timestamps', type='int', default=128, help='number of timestamps to create, default: %default') - parser.add_option('--saps', dest='saps', type='int', default=1, help='number of saps to create, default: %default') - parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='optional (fake/test) otdb id, default: %default') - - (options, args) = parser.parse_args() - - if len(args) != 1: - print 'Please provide a file name for the h5 file which you want to create...' 
-        print
-        parser.print_help()
-        exit(1)
-
-    cube = create_hypercube(num_stations=options.stations,
-                            num_saps=options.saps,
-                            num_subbands_per_sap={sap:options.subbands for sap in range(options.saps)},
-                            num_timestamps=options.timestamps)
-    write_hypercube(args[0], cube, sas_id=options.otdb_id)
-
-if __name__ == '__main__':
-    main()
\ No newline at end of file
diff --git a/QA/QA_Common/test/t_hdf5_io.py b/QA/QA_Common/test/t_hdf5_io.py
index b104eec0080..182857eb82c 100755
--- a/QA/QA_Common/test/t_hdf5_io.py
+++ b/QA/QA_Common/test/t_hdf5_io.py
@@ -91,7 +91,7 @@ class TestHdf5_IO(unittest.TestCase):
             os.remove(path)
 
     def test_write_and_read_and_verify_data(self):
-        '''extensive test to verify to correctnes of all visibility amplitudes and phases
+        '''extensive test to verify the correctness of all visibility amplitudes and phases
         after it has been written and read back again, both in raw and dB.'''
         logger.info('test_write_and_read_and_verify_data')
 
@@ -192,13 +192,23 @@ class TestHdf5_IO(unittest.TestCase):
                             for l in range(vis_in_raw.shape[3]):
                                 self.assertLess(abs_diff_raw[i,j,k,l], amplitude_threshold)
                                 self.assertLess(abs_diff_raw_dB[i,j,k,l], amplitude_threshold)
-                                self.assertLess(abs_phase_diff_raw[i,j,k,l], phase_threshold)
-                                self.assertLess(abs_phase_diff_raw_dB[i,j,k,l], phase_threshold)
+                                try:
+                                    self.assertLess(abs_phase_diff_raw[i,j,k,l], phase_threshold)
+                                except AssertionError:
+                                    # phase is just below 2pi (close to 0)
+                                    self.assertLess(2*np.pi-abs_phase_diff_raw[i,j,k,l], phase_threshold)
+
+                                try:
+                                    self.assertLess(abs_phase_diff_raw_dB[i, j, k, l], phase_threshold)
+                                except AssertionError:
+                                    # phase is just below 2pi (close to 0)
+                                    self.assertLess(2*np.pi-abs_phase_diff_raw_dB[i, j, k, l], phase_threshold)
+
         finally:
             logger.info('removing test file: %s', path)
             os.remove(path)
 
-    def test_12_to13_conversion(self):
+    def test_12_to_13_to_14_conversion(self):
        path = tempfile.mkstemp()[1]
 
        try:
@@ -210,13 +220,13 @@ class TestHdf5_IO(unittest.TestCase):
                                     parallel_to_cross_polarization_ratio=1.0)
 
             # write the hypercube and parset into an h5 file....
-            # this currently results in a v1.3 file
+            # this currently results in a v1.4 file
             write_hypercube(path, saps_in, sas_id=123456)
 
-            # check if version is 1.3
+            # check if version is 1.4
             with h5py.File(path, "r") as file:
                 version_str = file['version'][0]
-                self.assertEqual('1.3', version_str)
+                self.assertEqual('1.4', version_str)
 
             # change version back to 1.2
             # and modify visibility data to have the 1.2 incorrect phases
@@ -264,13 +274,13 @@ class TestHdf5_IO(unittest.TestCase):
                 version_str = file['version'][0]
                 self.assertEqual('1.2', version_str)
 
-            # reading the 1.2 file should result in automatic conversion to 1.3 and correction of phases
+            # reading the 1.2 file should result in automatic conversion via 1.3 to 1.4 and correction of phases
             result_raw = read_hypercube(path, visibilities_in_dB=False, python_datetimes=True)
 
             # check if version is now 1.4
             with h5py.File(path, "r") as file:
                 version_str = file['version'][0]
-                self.assertEqual('1.3', version_str)
+                self.assertEqual('1.4', version_str)
 
             # read in dB as well because we usually plot the visibilities in dB
             result_dB = read_hypercube(path, visibilities_in_dB=True, python_datetimes=True)
diff --git a/QA/QA_Common/test/test_utils.py b/QA/QA_Common/test/test_utils.py
deleted file mode 100644
index f91b6f45674..00000000000
--- a/QA/QA_Common/test/test_utils.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
-# P.O. 
Box 2, 7990 AA Dwingeloo, The Netherlands -# -# This file is part of the LOFAR software suite. -# The LOFAR software suite is free software: you can redistribute it and/or -# modify it under the terms of the GNU General Public License as published -# by the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# The LOFAR software suite is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. - -import numpy as np -from datetime import datetime, timedelta - -from lofar.common.datetimeutils import to_modified_julian_date_in_seconds -from lofar.qa.geoconversions import * - -def create_hypercube(num_saps=3, num_stations=5, num_timestamps=11, num_subbands_per_sap=None): - data = {} - - if num_subbands_per_sap is None: - num_subbands_per_sap = {} - for sap_nr in range(num_saps): - num_subbands_per_sap[sap_nr] = 13*(sap_nr+1) - - stations = ['CS%03d' % (i + 1) for i in range(num_stations)] - baselines = [] - for idx, station1 in enumerate(stations): - for station2 in stations[idx:]: - baselines.append((station1, station2)) - - num_baselines = len(baselines) - - for sap_nr in range(num_saps): - #generate nice test visibilities - num_subbands = num_subbands_per_sap[sap_nr] - - #generate 'ticks' along the polarization-axes - polarizations = ['xx', 'xy', 'yx', 'yy'] - - visibilities = np.empty((num_baselines, num_timestamps, num_subbands, len(polarizations)), dtype=np.complex64) - visibilities.real = np.random.random(visibilities.shape) - visibilities.imag = np.random.random(visibilities.shape) - - #and some flagging - flagging = np.zeros(visibilities.shape, dtype=np.bool) - - now = datetime.utcnow() - timestamps = [now+timedelta(seconds=i) for i in range(num_timestamps)] - timestamps_mjds = np.array([to_modified_julian_date_in_seconds(t) for t in timestamps]) - - #generate 'ticks' along the central_frequencies-axes - central_frequencies = [1e11+i*1e10 for i in range(num_subbands)] - sb_offset = sum([len(sap['subbands']) for sap in data.values()]) - subbands = ['SB%03d'% i for i in range(sb_offset, sb_offset+num_subbands)] - - antenna_locations = {'XYZ': {}, 'PQR': {}, 'WGS84' : {}} - for station in stations: - xyz_pos = (0,0,0) - antenna_locations['XYZ'][station] = xyz_pos - antenna_locations['PQR'][station] = pqr_cs002_from_xyz(xyz_pos) - antenna_locations['WGS84'][station] = geographic_from_xyz(xyz_pos) - - data[sap_nr] = { 'baselines':baselines, - 'timestamps':timestamps, - 'central_frequencies':central_frequencies, - 'subbands':subbands, - 'polarizations':polarizations, - 'visibilities':visibilities, - 'flagging':flagging, - 'antenna_locations': antenna_locations} - return data - diff --git a/QA/QA_Service/bin/CMakeLists.txt b/QA/QA_Service/bin/CMakeLists.txt index f8cb80f5243..4247c52141d 100644 --- a/QA/QA_Service/bin/CMakeLists.txt +++ b/QA/QA_Service/bin/CMakeLists.txt @@ -17,7 +17,7 @@ # $Id$ -lofar_add_bin_scripts(qa_service) +lofar_add_bin_scripts(qa_service qa_webservice) # supervisord config files install(FILES diff --git a/QA/QA_Service/bin/qa_webservice b/QA/QA_Service/bin/qa_webservice new file mode 100755 index 00000000000..a286f4417fa --- /dev/null +++ b/QA/QA_Service/bin/qa_webservice @@ -0,0 +1,59 @@ 
+#!/usr/bin/env python
+
+# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
+# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
+#
+# This file is part of the LOFAR software suite.
+# The LOFAR software suite is free software: you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as published
+# by the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# The LOFAR software suite is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
+
+from lofar.qa.cep4_utils import *
+from subprocess import call
+import socket
+import logging
+import signal
+
+logger = logging.getLogger(__name__)
+
+def kill_zombies():
+    # kill any lingering webservice instances
+    cmd = ['pkill', '-9', '-f "python /opt/adder/cluster/webservice.py"']
+    cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
+    logger.info('killing any lingering qa webservice instance on cep4 head node: %s', ' '.join(cmd))
+    call(cmd)
+
+
+if __name__ == '__main__':
+    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
+
+    if 'scu001' not in socket.getfqdn():
+        logger.warning("qa_webservice is designed to run only on scu001 (and then start a docker image on head01)")
+        exit(1)
+
+    kill_zombies()
+
+    def signal_handler(_s, _f):
+        kill_zombies()
+        exit(1)
+
+    for s in [signal.SIGHUP, signal.SIGINT, signal.SIGTERM]:
+        signal.signal(s, signal_handler)
+
+
+    cmd = ['python', '/opt/adder/cluster/webservice.py']
+    cmd = wrap_command_for_docker(cmd, 'adder_clustering', 'latest', ['/data/qa'])
+    cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
+
+    logger.info('starting webservice on cep4 head node: %s', ' '.join(cmd))
+
+    exit(call(cmd))
diff --git a/QA/QA_Service/bin/qa_webservice.ini b/QA/QA_Service/bin/qa_webservice.ini
index b8a26b507d0..37bd9b037a2 100644
--- a/QA/QA_Service/bin/qa_webservice.ini
+++ b/QA/QA_Service/bin/qa_webservice.ini
@@ -1,7 +1,7 @@
 ; supervisor ini file to start and run the adder_clustering docker image on head.cep4 with the webservice for the adder inspection plots
 
 [program:qa_webservice]
-command=ssh -T -q -o StrictHostKeyChecking=no lofarsys@head.cep4.control.lofar docker run --rm --net=host -v /data/qa:/opt/adder/data/qa -u `id -u`:`id -g` -v /etc/passwd:/etc/passwd:ro -v /etc/group:/etc/group:ro -v $HOME:$HOME -e HOME=$HOME -e USER=$USER -w $HOME adder_clustering:latest webandmapvis.sh
+command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec qa_webservice'
 user=lofarsys
 stopsignal=INT ; KeyboardInterrupt
 stopasgroup=true ; bash does not propagate signals
diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py
index 7db9201fb8c..8cdbd1b8638 100644
--- a/QA/QA_Service/lib/qa_service.py
+++ b/QA/QA_Service/lib/qa_service.py
@@ -169,15 +169,13 @@ class QAService(OTDBBusListener):
         try:
             #use default cep4 qa output dir. 
plot_dir_path = os.path.join(self.qa_base_dir, 'inspectionplots') + task_plot_dir_path = '' + all_plots_succeeded = True - for plot_type in ['-1', '-4']: # plot dynspec and delay-rate - cmd = ['plot_hdf5_dynamic_spectra', - '-o %s' % (plot_dir_path,), - '--mixed', # plot amplitude autocors, and complex crosscors - plot_type, - '-n', '0', - '--force', - hdf5_path] + for plot_options in [['-1', '-m'], # hot autocor, complex crosscor, in dB + ['-1', '-mn', '--raw'], # normalized hot autocor, normalized complex crosscor, in raw + ['-4']]: # delay-rate + cmd = ['plot_hdf5_dynamic_spectra', '-o %s' % (plot_dir_path,), '--force'] + plot_options + [hdf5_path] # wrap the command in a cep4 ssh call to docker container cmd = wrap_command_for_docker(cmd, 'adder', 'latest') @@ -186,18 +184,21 @@ class QAService(OTDBBusListener): logger.info('generating plots for otdb_id %s, executing: %s', otdb_id, ' '.join(cmd)) if call(cmd) == 0: - plot_dir_path = os.path.join(plot_dir_path, 'L%s' % otdb_id) - logger.info('generated plots for otdb_id %s in %s', otdb_id, plot_dir_path) - - self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id, - 'hdf5_file_path': hdf5_path, - 'plot_dir_path': plot_dir_path}) + task_plot_dir_path = os.path.join(plot_dir_path, 'L%s' % otdb_id) + logger.info('generated plots for otdb_id %s in %s with command=%s', otdb_id, + task_plot_dir_path, + ' '.join(cmd)) else: - msg = 'could not generate plots for otdb_id %s' % otdb_id + all_plots_succeeded &= False + msg = 'could not generate plots for otdb_id %s cmd=%s' % (otdb_id, ' '.join(cmd)) logger.error(msg) - self._send_event_message('Error', {'otdb_id': otdb_id, 'message': msg}) - return None - return plot_dir_path + self._send_event_message('Error', {'otdb_id': otdb_id, + 'message': msg}) + if all_plots_succeeded: + self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id, + 'hdf5_file_path': hdf5_path, + 'plot_dir_path': task_plot_dir_path}) + return task_plot_dir_path except Exception as e: logging.exception('error in _create_plots_for_h5_file: %s', e) self._send_event_message('Error', {'otdb_id': otdb_id, 'message': e.message}) -- GitLab
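
Note (illustrative, not part of the patch): the heart of this change is that the
visibility scale factors are now stored per baseline per subband per polarization
(v1.4) instead of per subband per polarization (v1.3), so one spiky baseline no
longer degrades the int8 quantization of every other baseline in the same subband.
The numpy sketch below shows the intended encode/decode round-trip; the function
and variable names are hypothetical, and the np.clip calls are an extra safety net
added here, where write_hypercube itself relies on the headroom of the 99.5
percentile.

import numpy as np

def quantize_v14(vis_dB):
    # vis_dB: complex64 visibilities in dB, shape (n_bl, n_time, n_sb, n_pol)
    n_bl, _n_time, n_sb, n_pol = vis_dB.shape
    scale_factors = np.empty((n_bl, n_sb, n_pol), dtype=np.float32)
    scaled = np.empty(vis_dB.shape + (2,), dtype=np.int8)
    for bl in range(n_bl):
        for sb in range(n_sb):
            for pol in range(n_pol):
                # 99.5 percentile of the amplitudes (not the max), to ignore spikes
                max_abs = max(1.0, np.percentile(np.abs(vis_dB[bl, :, sb, pol]), 99.5))
                # store max_abs/127 so that decoding is a plain multiply
                scale_factors[bl, sb, pol] = max_abs / 127.0
                scaled[bl, :, sb, pol, 0] = np.clip((127.0 / max_abs) * vis_dB[bl, :, sb, pol].real, -127, 127)
                scaled[bl, :, sb, pol, 1] = np.clip((127.0 / max_abs) * vis_dB[bl, :, sb, pol].imag, -127, 127)
    return scaled, scale_factors

def dequantize_v14(scaled, scale_factors):
    # invert the mapping: int8 re/im pairs back to complex64 dB values
    vis_dB = np.empty(scaled.shape[:-1], dtype=np.complex64)
    sf = scale_factors[:, np.newaxis, :, :]  # broadcast (bl, sb, pol) factors over the time axis
    vis_dB.real = sf * scaled[..., 0]
    vis_dB.imag = sf * scaled[..., 1]
    return vis_dB

Under this scheme the round-trip error per sample is bounded by one quantization
step, i.e. the stored scale factor for that baseline/subband/polarization, in dB.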