diff --git a/QA/QA_Common/bin/create_test_hypercube b/QA/QA_Common/bin/create_test_hypercube index 77c5982a8b108b2aaa1cf727e99164ed32eb46b7..e8da0881f5d73dc29afc0e394bdc49d163a54125 100755 --- a/QA/QA_Common/bin/create_test_hypercube +++ b/QA/QA_Common/bin/create_test_hypercube @@ -1,7 +1,7 @@ #!/usr/bin/env python import os -from optparse import OptionParser +from optparse import OptionParser, OptionGroup from lofar.qa.utils import * from lofar.qa.hdf5_io import write_hypercube @@ -12,21 +12,34 @@ def main(): # make sure we run in UTC timezone os.environ['TZ'] = 'UTC' - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', - level=logging.INFO) - ## Check the invocation arguments parser = OptionParser(usage='create_test_hypercube [options] <path_to_new_h5_file>', description='creates a test h5 hypercube with random data for the given number of stations, saps, subbands, timestamps.') - parser.add_option('-S', '--stations', dest='stations', type='int', default=3, help='number of stations to create, default: %default') - parser.add_option('-s', '--subbands', dest='subbands', type='int', default=244, help='number of subbands (per sap) to create, default: %default') - parser.add_option('-t', '--timestamps', dest='timestamps', type='int', default=128, help='number of timestamps to create, default: %default') - parser.add_option('--saps', dest='saps', type='int', default=1, help='number of saps to create, default: %default') - parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='optional (fake/test) otdb id, default: %default') - parser.add_option('--snr', dest='snr', type='float', default=0.9, help='signal to noise ratio. The signal is a test image with a full sweep through all phase and amplitudes from [0..1], and the noise is just random complex numbers, default: %default') - parser.add_option('-a', '--amplitude', dest='max_signal_amplitude', type='float', default=100, help='the max signal amplitude, default: %default') - parser.add_option('-p', '--pol-ratio', dest='parallel_to_cross_polarization_ratio', type='float', default=1, help='the ratio between parallel and cross polarization visibility amplitudes, default: %default') - parser.add_option('-w', '--num_phase_wraps', dest='num_phase_wraps', type='float', default=1, help='the number of times the phase wraps around 2pi along the freq/sb axis, default: %default') + group = OptionGroup(parser, 'Dimensions') + group.add_option('-S', '--stations', dest='stations', type='int', default=3, help='number of stations to create (min=2), default: %default') + group.add_option('-s', '--subbands', dest='subbands', type='int', default=244, help='number of subbands (per sap) to create, default: %default') + group.add_option('-t', '--timestamps', dest='timestamps', type='int', default=128, help='number of timestamps to create, default: %default') + group.add_option('--saps', dest='saps', type='int', default=1, help='number of saps to create, default: %default') + parser.add_option_group(group) + + group = OptionGroup(parser, 'General signal options') + group.add_option('--snr', dest='snr', type='float', default=0.9, help='signal to noise ratio. 
The signal is a test image with a full sweep through all phase and amplitudes from [0..1], and the noise is just random complex numbers, default: %default') + group.add_option('-a', '--amplitude', dest='max_signal_amplitude', type='float', default=100, help='the max signal amplitude, default: %default') + group.add_option('-p', '--pol-ratio', dest='parallel_to_cross_polarization_ratio', type='float', default=1, help='the amplitude ratio between parallel and cross polarization, default: %default') + parser.add_option_group(group) + + group = OptionGroup(parser, 'Specific signal options') + group.add_option('--pw', '--num_phase_wraps', dest='num_phase_wraps', type='float', default=1, help='the number of times the phase wraps around 2pi along the freq/sb axis, default: %default') + group.add_option('--tsp', '--num_time_sawtooth_periods', dest='num_time_sawtooth_periods', type='float', default=1, help='the number of periods for the sawtooth signal along the time axis, default: %default') + group.add_option('--ssp', '--num_subband_sawtooth_periods', dest='num_subband_sawtooth_periods', type='float', default=0, help='the number of periods for the sawtooth signal along the subband/frequency axis, default: %default') + group.add_option('--tcp', '--num_time_cos_periods', dest='num_time_cos_periods', type='float', default=0, help='the number of periods for the cosine signal along the time axis, default: %default') + group.add_option('--scp', '--num_subband_cos_periods', dest='num_subband_cos_periods', type='float', default=0, help='the number of periods for the cosine signal along the subband/frequency axis, default: %default') + parser.add_option_group(group) + + group = OptionGroup(parser, 'Miscellaneous') + group.add_option('-o', '--otdb_id', dest='otdb_id', type='int', default=None, help='optional (fake/test) otdb id, default: %default') + group.add_option('-V', '--verbose', dest='verbose', action='store_true', help='Verbose logging') + parser.add_option_group(group) (options, args) = parser.parse_args() @@ -36,6 +49,13 @@ def main(): parser.print_help() exit(1) + logging.basicConfig(format='%(asctime)s %(name)s %(levelname)s %(message)s', + level=logging.DEBUG if options.verbose else logging.INFO) + + if options.stations < 2: + print 'setting number of stations to minimum of 2' + options.stations = 2 + cube = create_hypercube(num_stations=options.stations, num_saps=options.saps, num_subbands_per_sap={sap:options.subbands for sap in range(options.saps)}, @@ -43,9 +63,13 @@ def main(): snr=options.snr, max_signal_amplitude = options.max_signal_amplitude, parallel_to_cross_polarization_ratio= options.parallel_to_cross_polarization_ratio, - num_phase_wraps=options.num_phase_wraps) + num_phase_wraps=options.num_phase_wraps, + num_time_sawtooth_periods=options.num_time_sawtooth_periods, + num_subband_sawtooth_periods=options.num_subband_sawtooth_periods, + num_time_cos_periods=options.num_time_cos_periods, + num_subband_cos_periods=options.num_subband_cos_periods) write_hypercube(args[0], cube, sas_id=options.otdb_id) if __name__ == '__main__': - main() \ No newline at end of file + main() diff --git a/QA/QA_Common/bin/show_hdf5_info b/QA/QA_Common/bin/show_hdf5_info index bd9daed3ac0bb98b78a14094e3ef74c50f9d9e7b..8974053c36133689eadd859fa5d26d6d91cd4e6f 100755 --- a/QA/QA_Common/bin/show_hdf5_info +++ b/QA/QA_Common/bin/show_hdf5_info @@ -26,7 +26,6 @@ if __name__ == '__main__': from optparse import OptionParser from lofar.qa.hdf5_io import * - from lofar.parameterset import * # make sure we run in 
UTC timezone os.environ['TZ'] = 'UTC' @@ -35,6 +34,7 @@ if __name__ == '__main__': parser = OptionParser(usage='show_hdf5_info <input_MS_extract_hdf5_file> [options]', description='show the meta data for the given MS_extract hdf5 file.') parser.add_option('-d', '--data', dest='data', action='store_true', default=False, help='show data info (SAPs, #baselines, #subbands, #timeslots etc). (warning, slow!) default: %default') + parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='Verbose logging') (options, args) = parser.parse_args() @@ -43,7 +43,7 @@ if __name__ == '__main__': exit(-1) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', - level=logging.WARN) + level=logging.DEBUG if options.verbose else logging.WARNING) hdf_path = os.path.expanduser(args[0]) diff --git a/QA/QA_Common/lib/cep4_utils.py b/QA/QA_Common/lib/cep4_utils.py index c0df21a357667e057a387f1969d8ece819a7f45b..7588f79524bacec2668059066affc3cd9745f7c5 100644 --- a/QA/QA_Common/lib/cep4_utils.py +++ b/QA/QA_Common/lib/cep4_utils.py @@ -17,6 +17,10 @@ from subprocess import check_output, Popen, PIPE from random import randint +import math +import os +from time import sleep +from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) @@ -75,7 +79,7 @@ def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): else: return remote_cmd -def wrap_command_for_docker(cmd, image_name, image_label=''): +def wrap_command_for_docker(cmd, image_name, image_label='', mount_dirs=['/data']): '''wrap the command to be run in a docker container for the lofarsys user and environment :param list cmd: a subprocess cmd list :param string image_name: the name of the docker image to run @@ -87,15 +91,20 @@ def wrap_command_for_docker(cmd, image_name, image_label=''): check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-g'])).strip()) #return the docker run command for the lofarsys user and environment - return ['docker', 'run', '--rm', '--net=host', '-v', '/data:/data', - '-u', id_string, - '-v', '/etc/passwd:/etc/passwd:ro', - '-v', '/etc/group:/etc/group:ro', - '-v', '$HOME:$HOME', - '-e', 'HOME=$HOME', - '-e', 'USER=$USER', - '-w', '$HOME', - '%s:%s' % (image_name, image_label) if image_label else image_name] + cmd + dockerized_cmd = ['docker', 'run', '--rm', '--net=host'] + for d in mount_dirs: + dockerized_cmd += ['-v', '%s:%s' % (d,d)] + + dockerized_cmd += ['-u', id_string, + '-v', '/etc/passwd:/etc/passwd:ro', + '-v', '/etc/group:/etc/group:ro', + '-v', '$HOME:$HOME', + '-e', 'HOME=$HOME', + '-e', 'USER=$USER', + '-w', '$HOME', + '%s:%s' % (image_name, image_label) if image_label else image_name] + dockerized_cmd += cmd + return dockerized_cmd def get_cep4_available_cpu_nodes(): ''' @@ -148,10 +157,11 @@ def get_cep4_available_cpu_nodes(): return available_cep4_nodes -def get_cep4_cpu_nodes_loads(node_nrs=None): +def get_cep4_cpu_nodes_loads(node_nrs=None, normalized=False): ''' get the 5min load for each given cep4 cpu node nr :param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried. + :param bool normalized: when True, then normalize the loads with the number of cores. 
:return: dict with node_nr -> load mapping ''' if node_nrs == None: @@ -178,28 +188,130 @@ def get_cep4_cpu_nodes_loads(node_nrs=None): load = 1e10 loads[node_nr] = load - logger.debug('5min loads for cep4 cpu nodes: %s', loads) + if normalized: + # spawn num-cores commands in parallel grep -c ^processor /proc/cpuinfo + for node_nr in node_nrs: + num_proc_cmd = ['grep', '-c', '^processor', '/proc/cpuinfo'] + node_num_proc_cmd = wrap_command_in_cep4_cpu_node_ssh_call(num_proc_cmd, node_nr, via_head=True) + logger.debug('executing command: %s', ' '.join(node_num_proc_cmd)) + + proc = Popen(node_num_proc_cmd, stdout=PIPE, stderr=PIPE) + procs[node_nr] = proc + + # wait for procs to finish, and try to parse the resulting num_proc value + for node_nr, proc in procs.items(): + out, err = proc.communicate() + try: + num_proc = int(out.strip()) + except: + num_proc = 1 + loads[node_nr] = loads[node_nr]/float(num_proc) + + logger.debug('5min %sloads for cep4 cpu nodes: %s', 'normalized ' if normalized else '', loads) return loads -def get_cep4_available_cpu_nodes_sorted_ascending_by_load(): +def get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=0.33): ''' get the cep4 available cpu node numbers sorted ascending by load (5min). + :param float max_normalized_load: filter available nodes which a at most max_normalized_load :return: sorted list of node numbers. ''' node_nrs = get_cep4_available_cpu_nodes() - loads = get_cep4_cpu_nodes_loads(node_nrs) - sorted_loads = sorted(loads.items(), key=lambda x: x[1]) + loads = get_cep4_cpu_nodes_loads(node_nrs, normalized=True) + filtered_loads = {k:v for k,v in loads.items() if v <= max_normalized_load} + sorted_loads = sorted(filtered_loads.items(), key=lambda x: x[1]) sorted_node_nrs = [item[0] for item in sorted_loads] logger.debug('cep4 cpu nodes sorted (asc) by load: %s', sorted_node_nrs) return sorted_node_nrs -def get_cep4_available_cpu_node_with_lowest_load(): +def get_cep4_available_cpu_node_with_lowest_load(max_normalized_load=0.33): ''' get the cep4 cpu node which is available and has the lowest (5min) load of them all. + :param float max_normalized_load: filter available nodes which a at most max_normalized_load :return: the node number (int) with the lowest load. ''' - node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load() + node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load(max_normalized_load=max_normalized_load) if node_nrs: logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0]) return node_nrs[0] return None + +def parallelize_cmd_over_cep4_cpu_nodes(cmd, parallelizable_option, parallelizable_option_values, timeout=3600): + '''run the given cmd in parallel on multiple available cpu nodes. 
+ :param list cmd: a subprocess cmd list + :param string parallelizable_option: the option which is given to the parallelized cmd for a subset of the parallelizable_option_values + :param list parallelizable_option_values: the list of values which is chunked for the parallelized cmd for the parallelizable_option + :param int timeout: timeout in seconds after which the workers are killed + :return: True if all processes on all cpu nodes exited ok, else False + ''' + available_cep4_nodes = get_cep4_available_cpu_nodes_sorted_ascending_by_load() + + if len(available_cep4_nodes) == 0: + logger.warning('No cep4 cpu nodes available..') + return False + + num_workers = max(1, min(len(available_cep4_nodes), len(parallelizable_option_values))) + num_option_values_per_worker = int(math.ceil(len(parallelizable_option_values) / float(num_workers))) + workers = {} + + logger.info('parallelizing cmd: %s over option %s with values %s', + ' '.join(str(x) for x in cmd), + parallelizable_option, + ' '.join(str(x) for x in parallelizable_option_values)) + + start = datetime.utcnow() + + # start the workers + for i in range(num_workers): + option_values_for_worker = parallelizable_option_values[i * num_option_values_per_worker:(i + 1) * num_option_values_per_worker] + if option_values_for_worker: + option_values_for_worker_csv = ','.join([str(s) for s in option_values_for_worker]) + + worker_cmd = cmd + [parallelizable_option, option_values_for_worker_csv] + + worker_cmd = wrap_command_in_cep4_cpu_node_ssh_call(worker_cmd, available_cep4_nodes[i], via_head=False) + worker_cmd_str = ' '.join([str(x) for x in worker_cmd]) + logger.info('starting worker %d with parallelized cmd: %s', i, worker_cmd_str) + worker = Popen(worker_cmd, bufsize=-1, env=os.environ) + workers[worker_cmd_str] = worker + + logger.info('waiting for all %d workers to finish...', len(workers)) + + failed_worker_cmds = set() + + #wait for all workers to finish + #print worker loglines + while workers: + finished_workers = {worker_cmd_str:worker for worker_cmd_str,worker in workers.items() + if worker.poll() is not None} + + if finished_workers: + for worker_cmd_str, worker in finished_workers.items(): + logger.info('worker finished with exitcode=%d cmd=%s', + worker.returncode, + worker_cmd_str) + del workers[worker_cmd_str] + + logger.info('waiting for %d more workers...', len(workers)) + + if worker.returncode != 0: + failed_worker_cmds.add(worker_cmd_str) + else: + sleep(1.0) + + if datetime.utcnow() - start >= timedelta(seconds=timeout): + logger.warning('timeout while waiting for %d more workers...', len(workers)) + for worker_cmd_str, worker in workers.items(): + logger.warning('killing worker with parallelized cmd: %s', worker_cmd_str) + worker.kill() + failed_worker_cmds.add(worker_cmd_str) + del workers[worker_cmd_str] + + success = len(failed_worker_cmds)==0 + + if success: + logger.info('all parallelized cmds finished successfully') + else: + logger.error('%s/%s parallelized cmds finished with errors', len(failed_worker_cmds), num_workers) + + return success diff --git a/QA/QA_Common/lib/hdf5_io.py b/QA/QA_Common/lib/hdf5_io.py index 6ad89e0c468222912861ad57e2b50f85714b2af8..a3cdf069d7c3bcad31c8de858caa7e5f998e8b25 100644 --- a/QA/QA_Common/lib/hdf5_io.py +++ b/QA/QA_Common/lib/hdf5_io.py @@ -45,16 +45,67 @@ If you would like to do your own clustering, then use write_clusters and read_cl import os.path from datetime import datetime, timedelta -import warnings -with warnings.catch_warnings(): - import h5py - import numpy as np 
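# A minimal, self-contained sketch of the chunking scheme that
# parallelize_cmd_over_cep4_cpu_nodes (in QA/QA_Common/lib/cep4_utils.py above) applies:
# the option values are split into one contiguous chunk per worker/node and each chunk is
# passed as a comma-separated value for the parallelizable option. The helper name, the
# example command and the node numbers below are illustrative assumptions, not part of the patch.
import math

def chunk_option_values(cmd, parallelizable_option, option_values, available_nodes):
    """Return one (node_nr, worker_cmd) pair per non-empty chunk of option_values."""
    if not available_nodes:
        return []
    num_workers = max(1, min(len(available_nodes), len(option_values)))
    num_values_per_worker = int(math.ceil(len(option_values) / float(num_workers)))
    worker_cmds = []
    for i in range(num_workers):
        chunk = option_values[i * num_values_per_worker:(i + 1) * num_values_per_worker]
        if chunk:
            csv = ','.join(str(v) for v in chunk)
            worker_cmds.append((available_nodes[i], cmd + [parallelizable_option, csv]))
    return worker_cmds

# example (hypothetical command and node numbers):
# chunk_option_values(['plot_hdf5_dynamic_spectra', 'L123456.h5'], '--sap', [0, 1, 2], [7, 12])
# -> [(7, [..., '--sap', '0,1']), (12, [..., '--sap', '2'])]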
+import os +# prevent annoying h5py future/deprecation warnings +os.environ["TF_CPP_MIN_LOG_LEVEL"]="3" + +import h5py +import errno +import numpy as np +from time import sleep import logging logger = logging.getLogger(__name__) np.set_printoptions(precision=1) +class SharedH5File(): + """ + Wrapper class aroung h5py.File to open an hdf5 file in read, write, or read/write mode safely, + even when the file might be used simultanously by other processes. + It waits for <timeout> seconds until the file becomes available. + + Example usage: + + with SharedH5File("foo.h5", 'r') as file: + file["foo"] = "bar" + + """ + def __init__(self, path, mode='a', timeout=900): + self._path = path + self._mode = mode + self._timeout = timeout + self._file = None + + def open(self): + start_timestamp = datetime.utcnow() + while self._file is None: + try: + self._file = h5py.File(self._path, self._mode) + except IOError as e: + if not os.path.exists(self._path): + raise + + logger.warning("Cannot open file '%s' with mode '%s'. Trying again in 1 sec...", + self._path, self._mode) + sleep(max(0, min(1, self._timeout))) + if datetime.utcnow() - start_timestamp > timedelta(seconds=self._timeout): + logger.error("Cannot open file '%s' with mode '%s', even after trying for %s seconds", + self._path, self._mode, self._timeout) + raise + + return self._file + + def close(self): + self._file.close() + self._file = None + + def __enter__(self): + return self.open() + + def __exit__(self, exc_type, exc_val, exc_tb): + return self.close() + def write_hypercube(path, saps, parset=None, sas_id=None, wsrta_id=None, do_compress=True, **kwargs): """ write a hypercube of visibility/flagging data for all saps of an observation/pipeline. @@ -92,10 +143,11 @@ def write_hypercube(path, saps, parset=None, sas_id=None, wsrta_id=None, do_comp if not os.path.exists(save_dir): os.makedirs(save_dir) - with h5py.File(path, "w") as file: - version = '1.3' + with SharedH5File(path, "w") as file: + version = '1.4' # 1.1 -> 1.2 change is not backwards compatible by design. # 1.2 -> 1.3 change is almost backwards compatible, it just needs a dB/linear correction. 
see convert_12_to_13
+        # 1.3 -> 1.4 storing scale factors per baseline per subband per pol, see convert_13_to_14
         ds = file.create_dataset('version', (1,), h5py.special_dtype(vlen=str), version)
         ds.attrs['description'] = 'version of this hdf5 MS extract file'
@@ -194,32 +246,36 @@ def write_hypercube(path, saps, parset=None, sas_id=None, wsrta_id=None, do_comp
             visibilities_dB = visibility_amplitudes_dB * visibility_phases
             #compute scale factors per subband, per polarization
-            scale_factors = np.empty(shape=(len(subbands),len(polarizations)), dtype=np.float32)
+            scale_factors = np.empty(shape=(len(baselines),len(subbands),len(polarizations)), dtype=np.float32)
-            for pol_idx, polarization in enumerate(polarizations):
-                #compute scale factor per subband to map the visibilities_dB per subband from complex64 to 2xint8
-                for sb_nr in range(len(subbands)):
-                    #use 99.9 percentile instead if max to get rid of spikes
-                    max_abs_vis_sb = max(1.0, np.percentile(visibility_amplitudes_dB[:,:,sb_nr,pol_idx], 99.9))
-                    scale_factor = 127.0 / max_abs_vis_sb
-                    scale_factors[sb_nr, pol_idx] = 1.0/scale_factor
+            # compute scale factor per baseline/subband/pol to map the visibilities_dB from complex64 to 2xint8
+            for bl_idx in range(len(baselines)):
+                for pol_idx in range(len(polarizations)):
+                    for sb_idx in range(len(subbands)):
+                        #use the 99.5 percentile instead of the max to get rid of spikes
+                        max_abs_vis_sb = max(1.0, np.percentile(visibility_amplitudes_dB[bl_idx,:,sb_idx,pol_idx], 99.5))
+                        scale_factor = 127.0 / max_abs_vis_sb
+                        scale_factors[bl_idx, sb_idx, pol_idx] = 1.0/scale_factor
             # store the scale_factors in the file
             scale_factor_ds = sap_group.create_dataset('visibility_scale_factors', data=scale_factors)
-            scale_factor_ds.attrs['description'] = 'scale factors per subband per polatization to un-normalize the stored visibilities'
-            scale_factor_ds.attrs['description'] = 'multiply real and imag parts of the visibilities with this factor per subband per polatization to un-normalize them and get the 10log10 values of the real and imag parts of the visibilities'
+            scale_factor_ds.attrs['description'] = 'scale factors per baseline per subband per polarization to un-normalize the stored visibilities'
+            scale_factor_ds.attrs['description'] = 'multiply real and imag parts of the visibilities with this factor per baseline per subband per polarization to un-normalize them and get the 10log10 values of the real and imag parts of the visibilities'
             scale_factor_ds.attrs['units'] = '-'
-            #create a array with one extra dimension, so we can split the complex value into two scaled int8's for real and imag part
-            #looping in python is not the most cpu efficient way
-            #but is saves us extra copies of the large visibilities array, which might not fit in memory?
+            # create an array with one extra dimension, so we can split the complex value into two scaled int8's for real and imag part
+            # looping in python is not the most cpu efficient way
+            # but it saves us extra copies of the large visibilities array, which might not fit in memory
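# A minimal numpy sketch (not code from this patch) of the int8 packing scheme used here:
# per (baseline, subband, polarization) a scale factor is derived from the 99.5 percentile of
# the dB amplitudes over time, real and imaginary parts are divided by it and stored as int8,
# and multiplying by the stored factor recovers the dB values up to the quantization step.
# The array shapes and the explicit clipping are example choices for illustration only.
import numpy as np

rng = np.random.RandomState(42)
shape = (3, 10, 4, 2)                                   # (baseline, time, subband, pol)
vis_dB = (rng.uniform(1, 50, shape) *                   # amplitudes in dB
          np.exp(1j * rng.uniform(0, 2 * np.pi, shape))).astype(np.complex64)

amp_dB = np.abs(vis_dB)
max_abs = np.maximum(1.0, np.percentile(amp_dB, 99.5, axis=1))   # (baseline, subband, pol)
scale_factors = max_abs / 127.0                                  # what ends up in the h5 file
scale_bcast = scale_factors[:, np.newaxis, :, :]                 # broadcast over the time axis

packed = np.empty(shape + (2,), dtype=np.int8)
packed[..., 0] = np.clip(vis_dB.real / scale_bcast, -127, 127).astype(np.int8)
packed[..., 1] = np.clip(vis_dB.imag / scale_bcast, -127, 127).astype(np.int8)

# unpacking is just a multiplication with the stored scale factors;
# the result equals vis_dB up to roughly one scale factor per component
restored = scale_bcast * (packed[..., 0] + 1j * packed[..., 1])
print(np.max(np.abs(restored - vis_dB)))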
logger.debug('converting visibilities from complexfloat to 2xint8 for file %s', path) extended_shape = visibilities_dB.shape[:] + (2,) scaled_visibilities = np.empty(extended_shape, dtype=np.int8) - for sb_nr in range(len(subbands)): - scale_factor = 1.0 / scale_factors[sb_nr] - scaled_visibilities[:,:,sb_nr,:,0] = scale_factor*visibilities_dB[:,:,sb_nr,:].real - scaled_visibilities[:,:,sb_nr,:,1] = scale_factor*visibilities_dB[:,:,sb_nr,:].imag + + for bl_idx in range(len(baselines)): + for pol_idx in range(len(polarizations)): + for sb_idx in range(len(subbands)): + scale_factor = 1.0 / scale_factors[bl_idx, sb_idx, pol_idx] + scaled_visibilities[bl_idx,:,sb_idx,pol_idx,0] = scale_factor*visibilities_dB[bl_idx,:,sb_idx,pol_idx].real + scaled_visibilities[bl_idx,:,sb_idx,pol_idx,1] = scale_factor*visibilities_dB[bl_idx,:,sb_idx,pol_idx].imag logger.debug('reduced visibilities size from %s to %s bytes (factor %s)', visibilities.nbytes, scaled_visibilities.nbytes, visibilities.nbytes/scaled_visibilities.nbytes) @@ -263,14 +319,17 @@ def read_sap_numbers(path): """ logger.info('reading sap numbers from from file: %s', path) - with h5py.File(path, "r") as file: + with SharedH5File(path, "r") as file: version_str = file['version'][0] - if version_str not in ['1.2', '1.3']: + if version_str not in ['1.2', '1.3', '1.4']: raise ValueError('Cannot read version %s' % (version_str,)) return sorted([int(sap_nr) for sap_nr in file['measurement/saps'].keys()]) +def read_version(h5_path): + with SharedH5File(h5_path, "r") as file: + return file['version'][0] def read_hypercube(path, visibilities_in_dB=True, python_datetimes=False, read_visibilities=True, read_flagging=True, saps_to_read=None): """ @@ -287,13 +346,15 @@ def read_hypercube(path, visibilities_in_dB=True, python_datetimes=False, read_v """ logger.info('reading hypercube from file: %s', path) - with h5py.File(path, "r+") as file: - if file['version'][0] == '1.2': - convert_12_to_13(path) + if read_version(path) == '1.2': + convert_12_to_13(path) + + if read_version(path) == '1.3': + convert_13_to_14(path) # reopen file read-only for safety reasons. 
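# A small illustration (an assumption, not part of this patch) of the upgrade chain that
# read_hypercube now applies: a 1.2 file is first converted to 1.3 and then to 1.4, so every
# reader only has to understand the latest layout. The registry wiring below is hypothetical;
# the real converters are the convert_12_to_13 and convert_13_to_14 functions in this module.
def upgrade_to_latest(h5_path, read_version, converters, latest='1.4'):
    """Apply registered in-place converters until the file reports the latest version."""
    version = read_version(h5_path)
    seen = set()
    while version != latest:
        if version in seen or version not in converters:
            raise ValueError('cannot upgrade version %s to %s' % (version, latest))
        seen.add(version)
        converters[version](h5_path)      # each converter rewrites the file in place
        version = read_version(h5_path)
    return version

# intended usage (hypothetical wiring):
# upgrade_to_latest(path, read_version, {'1.2': convert_12_to_13,
#                                        '1.3': convert_13_to_14})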
- with h5py.File(path, "r") as file: - if file['version'][0] != '1.3': + with SharedH5File(path, "r") as file: + if file['version'][0] != '1.4': raise ValueError('Cannot read version %s' % (file['version'][0],)) result = {} @@ -365,11 +426,12 @@ def read_hypercube(path, visibilities_in_dB=True, python_datetimes=False, read_v reduced_shape = normalized_visibilities.shape[:-1] visibilities = np.empty(reduced_shape, dtype=np.complex64) - for pol_idx, polarization in enumerate(polarizations): - pol_scale_factors = scale_factors[:,pol_idx] - for sb_nr, scale_factor in enumerate(pol_scale_factors): - visibilities[:,:,sb_nr,pol_idx].real = scale_factor*normalized_visibilities[:,:,sb_nr,pol_idx,0] - visibilities[:,:,sb_nr,pol_idx].imag = scale_factor*normalized_visibilities[:,:,sb_nr,pol_idx,1] + for bl_idx in range(len(baselines)): + for sb_idx in range(len(subbands)): + for pol_idx in range(len(polarizations)): + scale_factor = scale_factors[bl_idx, sb_idx, pol_idx] + visibilities[bl_idx,:,sb_idx,pol_idx].real = scale_factor*normalized_visibilities[bl_idx,:,sb_idx,pol_idx,0] + visibilities[bl_idx,:,sb_idx,pol_idx].imag = scale_factor*normalized_visibilities[bl_idx,:,sb_idx,pol_idx,1] if not visibilities_in_dB: logger.debug('converting visibilities from dB to raw linear for file sap %s in %s', sap_nr, path) @@ -402,12 +464,14 @@ def read_hypercube(path, visibilities_in_dB=True, python_datetimes=False, read_v def convert_12_to_13(h5_path): - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: version_str = file['version'][0] if version_str != '1.2': raise ValueError('Cannot convert version %s to 1.3' % (version_str,)) + logger.info("converting %s from version %s to 1.3", h5_path, version_str) + for sap_nr, sap_group in file['measurement/saps'].items(): # read the scale_factors and visibilities in a v1.2 way, # including incorrect reverse log10 to undo the incorrect storage of phases @@ -464,6 +528,45 @@ def convert_12_to_13(h5_path): # and finally update the version number file['version'][0] = '1.3' + logger.info("converted %s from version %s to 1.3", h5_path, version_str) + +def convert_13_to_14(h5_path): + with SharedH5File(h5_path, "r+") as file: + version_str = file['version'][0] + + if version_str != '1.3': + raise ValueError('Cannot convert version %s to 1.4' % (version_str,)) + + logger.info("converting %s from version %s to 1.4", h5_path, version_str) + + for sap_nr, sap_group in file['measurement/saps'].items(): + # read the scale_factors and visibilities in a v1.2 way, + # including incorrect reverse log10 to undo the incorrect storage of phases + scale_factors = sap_group['visibility_scale_factors'][:] + baselines = sap_group['baselines'] + subbands = sap_group['subbands'] + polarizations = sap_group['polarizations'] + + # apply v1.3 scale factors to new v1.4 + # in v1.3 scale factors were stored per subband per pol + # in v1.4 scale factors are stored per baseline per subband per pol + scale_factors_new = np.empty(shape=(len(baselines),len(subbands),len(polarizations)), dtype=np.float32) + + for pol_idx in range(len(polarizations)): + for sb_idx in range(len(subbands)): + scale_factors_new[:,sb_idx,pol_idx] = scale_factors[sb_idx,pol_idx] + + # overwrite the scale_factors in the file + del sap_group['visibility_scale_factors'] + scale_factor_ds = sap_group.create_dataset('visibility_scale_factors', data=scale_factors_new) + scale_factor_ds.attrs['description'] = 'scale factors per baseline per subband per polatization to un-normalize the stored 
visibilities' + scale_factor_ds.attrs['description'] = 'multiply real and imag parts of the visibilities with this factor per baseline per subband per polatization to un-normalize them and get the 10log10 values of the real and imag parts of the visibilities' + scale_factor_ds.attrs['units'] = '-' + + # and finally update the version number + file['version'][0] = '1.4' + + logger.info("converted %s from version %s to 1.4", h5_path, version_str) def add_parset_to_hypercube(h5_path, otdbrpc): """ @@ -473,10 +576,7 @@ def add_parset_to_hypercube(h5_path, otdbrpc): :param lofar.sas.otdb.otdbrpc.OTDBRPC otdbrpc: an instance of a OTDBPC client """ try: - with h5py.File(h5_path, "r+") as file: - if 'measurement/parset' in file: - return - + with SharedH5File(h5_path, "r+") as file: if 'measurement/sas_id' in file: sas_id = file['measurement/sas_id'][0] @@ -484,6 +584,10 @@ def add_parset_to_hypercube(h5_path, otdbrpc): parset = otdbrpc.taskGetSpecification(otdb_id=sas_id)["specification"] if parset: + if 'measurement/parset' in file: + logger.info('removing previous parset from file') + del file['measurement/parset'] + logger.info('adding parset for sas_id %s to %s hdf5 file', sas_id, os.path.basename(h5_path)) parset_str = '\n'.join(['%s=%s'%(k,parset[k]) for k in sorted(parset.keys())]) ds = file.create_dataset('measurement/parset', (1,), h5py.special_dtype(vlen=str), parset_str, @@ -505,7 +609,7 @@ def read_hypercube_parset(h5_path, as_string=False): :return parameterset/string: the parset (as string or as parameterset) if any, else None """ logger.info('reading parset from %s hdf5 file', os.path.basename(h5_path)) - with h5py.File(h5_path, "r") as file: + with SharedH5File(h5_path, "r") as file: if 'measurement/parset' in file: parset_str = file['measurement/parset'][0] if as_string: @@ -518,7 +622,7 @@ def read_hypercube_parset(h5_path, as_string=False): parset = parameterset.fromString(parset_str) return parset except ImportError as e: - logger.info("could not import parset because the parameterset module cannot be imported: %s", e) + logger.warning("could not import parset: %s", e) def get_observation_id_str(data): if 'sas_id' in data: @@ -551,11 +655,17 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres # convert any 1.2 to 1.3 file if needed for path in existing_paths: - with h5py.File(path, "r") as file: + with SharedH5File(path, "r") as file: if file['version'][0] == 1.2: convert_12_to_13(path) - input_files = [h5py.File(p, "r") for p in existing_paths] + # convert any 1.3 to 1.4 file if needed + for path in existing_paths: + with SharedH5File(path, "r") as file: + if file['version'][0] == 1.3: + convert_13_to_14(path) + + input_files = [SharedH5File(p, "r").open() for p in existing_paths] versions = set([file['version'][0] for file in input_files]) @@ -564,7 +674,7 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres version_str = list(versions)[0] - if version_str != '1.3': + if version_str != '1.4': raise ValueError('Cannot read version %s' % (version_str,)) sas_ids = set([file['measurement/sas_id'][0] for file in input_files if 'measurement/sas_id' in file]) @@ -584,8 +694,8 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres output_path = os.path.join(output_dir, output_filename) logger.info('combine_hypercubes: combining %s h5 files into %s', len(input_paths), output_path) - with h5py.File(output_path, "w") as output_file: - version = '1.3' + with SharedH5File(output_path, "w") as 
output_file: + version = '1.4' ds = output_file.create_dataset('version', (1,), h5py.special_dtype(vlen=str), version) ds.attrs['description'] = 'version of this hdf5 MS extract file' @@ -653,7 +763,7 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres elif any(item in key for item in ['baselines', 'polarizations', 'timestamps', 'antenna_locations']): value_dict[key] = data elif 'visibility_scale_factors' in key: - value_dict[key] = data[sb_cntr,:] + value_dict[key] = data[:, sb_cntr,:] else: value_dict[key] = data[sb_cntr] @@ -680,7 +790,8 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres shape.insert(2, num_subbands) shape = tuple(shape) elif 'visibility_scale_factors' in key: - shape = (num_subbands,) + data.shape + # from (#bl,#pol) to (#bl,#sb,#pol) + shape = (data.shape[0], num_subbands, data.shape[1]) else: shape = (num_subbands,) @@ -696,6 +807,8 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres combined_value_dict[key][:,:,sb_cntr,:] = data elif any(item in key for item in ['baselines', 'polarizations', 'timestamps', 'antenna_locations']): combined_value_dict[key] = data + elif 'visibility_scale_factors' in key: + combined_value_dict[key][:,sb_cntr,:] = data else: combined_value_dict[key][sb_cntr] = data @@ -729,6 +842,7 @@ def combine_hypercubes(input_paths, output_dir, output_filename=None, do_compres except StopIteration: pass #no input file with key, so nothing to copy. + fill_info_folder_from_parset(output_path) except Exception as e: logger.exception('combine_hypercubes: %s', e) finally: @@ -752,7 +866,7 @@ def _write_common_clustering_groups(h5_path, saps_dict, label=DEFAULT_ALGO_NAME) The always present symlink 'latest' is updated to this clustering result. :return str: the name of the saps_group into which the non-common results can be written. """ - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: if 'clustering' in file: clustering_group = file['clustering'] else: @@ -803,7 +917,7 @@ def _delete_clustering_group_if_empty(h5_path, label): :param str label: The name/label of the clustering group, for example 'my_clusterer_run_3'. The always present symlink 'latest' is updated to the next latest clustering group result. """ - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: if 'clustering' in file: clustering_group = file['clustering'] @@ -840,7 +954,7 @@ def write_clusters(h5_path, clusters, label=DEFAULT_ALGO_NAME): #add indirection level: cluster method (including run-timestamp) #include parameters and description - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: saps_group = file[saps_group_name] for sap_nr, sap_clusters_dict in clusters.items(): sap_group = saps_group[str(sap_nr)] @@ -883,7 +997,7 @@ def read_clusters(h5_path, label='latest'): result_clusters = {} result_annotations = [] - with h5py.File(h5_path, "r") as file: + with SharedH5File(h5_path, "r") as file: if 'clustering' not in file: logger.debug('could not find any clustering results in %s', h5_path) return result_clusters, result_annotations @@ -974,7 +1088,7 @@ def delete_clusters(h5_path, label=DEFAULT_ALGO_NAME): :param str label: the name/label for of the clustering result, for example 'my_clusterer_run_3'. The always present symlink 'latest' is updated to the next latest clustering result. 
""" - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: if 'clustering' in file: for name, group in file['clustering'].items(): if label is None or name==label: @@ -1026,7 +1140,7 @@ def annotate_cluster(h5_path, label, sap_nr, cluster_nr, annotation, user=None): :param str annotation: the annotation for this cluster (can be any free form text) :param str user: an optional user name """ - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: if 'clustering' in file: clustering_group = file['clustering'] @@ -1049,7 +1163,7 @@ def delete_cluster_annotation(h5_path, sap_nr, cluster_nr, annotation_nr, label= :param str annotation_nr: the annotation number (index) to delete :param str label: the label of the clustering results group """ - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: if 'clustering' in file: clustering_group = file['clustering'] @@ -1074,7 +1188,7 @@ def annotate_clustering_results(h5_path, label, annotation, user=None): :param str annotation: the annotation for this cluster (can be any free form text) :param str user: an optional user name """ - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: if 'clustering' in file: clustering_group = file['clustering'] @@ -1090,7 +1204,7 @@ def annotate_file(h5_path, annotation, user=None): :param str annotation: the annotation for this cluster (can be any free form text) :param str user: an optional user name """ - with h5py.File(h5_path, "r+") as file: + with SharedH5File(h5_path, "r+") as file: _add_annotation_to_group(file, annotation, user) @@ -1108,7 +1222,7 @@ def read_file_annotations(h5_path): """ result_annotations = [] - with h5py.File(h5_path, "r") as file: + with SharedH5File(h5_path, "r") as file: if 'annotations' in file: for anno_nr, anno_ds in file['annotations'].items(): annotation = anno_ds[0] @@ -1121,6 +1235,14 @@ def read_file_annotations(h5_path): 'timestamp': datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')}) return result_annotations +def get_stations(h5_path): + with SharedH5File(h5_path, "r+") as file: + stations = set() + for sap_dict in file['measurement/saps'].values(): + baselines = sap_dict['baselines'][:] + for bl in baselines: + stations.add(bl[0]) + return sorted(stations) def read_info_from_hdf5(h5_path, read_data_info=True, read_parset_info=True): """ @@ -1130,9 +1252,15 @@ def read_info_from_hdf5(h5_path, read_data_info=True, read_parset_info=True): :param bool read_parset_info: do/don't read info from the parset (Project, PI, name, start/stop time, etc). :return str: A human readable string with the requested info. """ - info = '' result = {} + with SharedH5File(h5_path, "r") as file: + need_to_fill_info_folder_from_parset = 'measurement/info' not in file + + if need_to_fill_info_folder_from_parset: + # try to convert old style file with parsets only into new files with info. 
+ fill_info_folder_from_parset(h5_path) + if read_data_info: result = read_hypercube(h5_path, read_visibilities=False, read_flagging=False) @@ -1154,6 +1282,12 @@ def create_info_string(data, h5_path=None, file_annotations=None, clusters=None, parset = data['parset'] if h5_path: info += 'File : ' + os.path.basename(h5_path) + '\n' + try: + with SharedH5File(h5_path, "r") as file: + info += 'File version : ' + file['version'][0] + '\n' + except IOError: + pass + info += 'Project : ' + parset.getString('ObsSW.Observation.Campaign.name') + '\n' info += 'Project description : ' + parset.getString('ObsSW.Observation.Campaign.title') + '\n' info += 'Project PI : ' + parset.getString('ObsSW.Observation.Campaign.PI') + '\n' @@ -1213,8 +1347,12 @@ def fill_info_folder_from_parset(h5_path): logger.info('fill_info_folder_from_parset for %s', h5_path) parset = read_hypercube_parset(h5_path) - with h5py.File(h5_path, "r+") as file: - if parset is not None: + if parset is not None: + with SharedH5File(h5_path, "r+") as file: + # remove previous info if present + if 'measurement/info' in file: + del file['measurement/info'] + info_group = file.create_group('measurement/info') info_group.attrs['description'] = 'Meta information about the measurement' @@ -1260,11 +1398,14 @@ def read_info_dict(h5_path): 'start_time': datetime.datetime(2018, 6, 11, 11, 0), 'stop_time': datetime.datetime(2018, 6, 11, 12, 0), 'type': 'my_process_subtype'} ''' - with h5py.File(h5_path, "r+") as file: - if not 'measurement/info' in file: - # try to convert old style file with parsets only into new files with info. - fill_info_folder_from_parset(h5_path) + with SharedH5File(h5_path, "r", timeout=10) as file: + need_to_fill_info_folder_from_parset = 'measurement/info' not in file + + if need_to_fill_info_folder_from_parset: + fill_info_folder_from_parset(h5_path) + + with SharedH5File(h5_path, "r", timeout=10) as file: info_dict = {} if 'measurement/info' in file: for k, v in file['measurement/info'].items(): @@ -1285,3 +1426,28 @@ def read_info_dict(h5_path): return info_dict +def read_SAP_targets(h5_path): + """reads the SAP targets from the parset + :param str h5_path: h5_path to the h5 file + :return: dict of SAP_nr to target name. 
+ """ + + beam_dict = {} + + #TODO: use normal parset lib instead of error prone string parsing + try: + parset_str = read_hypercube_parset(h5_path, as_string=True) + if parset_str: + lines = parset_str.splitlines() + beam_lines = [l for l in lines if 'Observation.Beam[' in l and '.target' in l] + for line in beam_lines: + parts = line.partition('=') + beam_nr = int(parts[0][parts[0].index('[')+1: parts[0].index(']')]) + beam_dict[beam_nr] = parts[2] + + except Exception as e: + logger.error(e) + + return beam_dict + + diff --git a/QA/QA_Common/lib/utils.py b/QA/QA_Common/lib/utils.py index cf88d439cf67b3152ff673dff057d21c3394029b..9778ea0125b97cf9acd8c1a1e970ffbe8766f1b6 100644 --- a/QA/QA_Common/lib/utils.py +++ b/QA/QA_Common/lib/utils.py @@ -21,15 +21,20 @@ from datetime import datetime, timedelta from lofar.qa.geoconversions import * import logging +import math logger = logging.getLogger(__name__) def create_hypercube(num_saps=3, num_stations=5, num_timestamps=11, num_subbands_per_sap=None, snr=0.9, max_signal_amplitude=1e5, parallel_to_cross_polarization_ratio=20.0, - num_phase_wraps=1.0): + num_phase_wraps=1.0, + num_time_sawtooth_periods=1, num_subband_sawtooth_periods=0, + num_time_cos_periods=0, num_subband_cos_periods=0): data = {} assert max_signal_amplitude > 1.0 - logger.info('create_hypercube: num_saps=%s num_stations=%s num_timestamps=%s num_subbands_per_sap=%s snr=%s max_amplitude=%s pol_ratio=%s', - num_saps, num_stations, num_timestamps, num_subbands_per_sap, snr, max_signal_amplitude, parallel_to_cross_polarization_ratio) + logger.info('create_hypercube: num_saps=%s num_stations=%s num_timestamps=%s num_subbands_per_sap=%s snr=%s max_amplitude=%s pol_ratio=%s' \ + 'num_phase_wraps=%s num_time_sawtooth_periods=%s num_subband_sawtooth_periods=%s num_time_cos_periods=%s num_subband_cos_periods=%s', + num_saps, num_stations, num_timestamps, num_subbands_per_sap, snr, max_signal_amplitude, parallel_to_cross_polarization_ratio, + num_phase_wraps, num_time_sawtooth_periods, num_subband_sawtooth_periods, num_time_cos_periods, num_subband_cos_periods) if num_subbands_per_sap is None: num_subbands_per_sap = {} @@ -55,24 +60,36 @@ def create_hypercube(num_saps=3, num_stations=5, num_timestamps=11, num_subbands # create synthetic visibilities signal baseline_visibilities_signal = np.zeros((num_timestamps, num_subbands, len(polarizations)), dtype=np.complex64) - for timestamp_idx in range(num_timestamps): - # timestamp_ratio ranges from-and-including 1.0 to 'small'-but-not-zero - # this prevents the visibility_value from becoming 0 (from which we cannot take the log) - timestamp_ratio = (num_timestamps - timestamp_idx) / float(num_timestamps) if num_timestamps > 1 else 1.0 - for subband_idx in range(num_subbands): - # subband_ratio ranges from 0 to-but-not-including 1.0 - # this ensures the phases start at 0rad, and sweep up to but not including 2PIrad - subband_ratio = subband_idx/float(num_subbands) if num_subbands > 1 else 1.0 + + for subband_idx in range(num_subbands): + # subband_ratio ranges from 0 to-but-not-including 1.0 + # this ensures the phases start at 0rad, and sweep up to but not including 2PIrad + subband_ratio = ((subband_idx+1) / float(num_subbands)) if num_subbands > 1 else 1.0 + sawtooth_subband_amplitude = math.fmod(subband_ratio * num_subband_sawtooth_periods, 1) + if sawtooth_subband_amplitude == 0.0: + sawtooth_subband_amplitude = 1.0 + cos_subband_amplitude = 0.5 * (1.0 + np.cos(num_subband_cos_periods * subband_ratio * 2 * np.pi)) + + for timestamp_idx in 
range(num_timestamps): + # timestamp_ratio ranges from-and-including 1.0 to 'small'-but-not-zero + # this prevents the visibility_value from becoming 0 (from which we cannot take the log) + timestamp_ratio = ((timestamp_idx+1) / float(num_timestamps)) if num_timestamps > 1 else 1.0 + sawtooth_time_amplitude = math.fmod(timestamp_ratio * num_time_sawtooth_periods, 1) + if sawtooth_time_amplitude == 0.0: + sawtooth_time_amplitude = 1.0 + cos_time_amplitude = 0.5*(1.0+np.cos(num_time_cos_periods*timestamp_ratio * 2 * np.pi)) # create synthetic visibility_value # amplitude varies in time. make sure the smallest amplitude is >= 1.0, # because otherwise we cannot store them with enough bits in dB's - amplitude = max(1.0, max_signal_amplitude * timestamp_ratio) + #amplitude = max(1.0, max_signal_amplitude * (sawtooth_time + sawtooth_subband + cos_subband + cos_time)/4.0) + amplitude = max(1.0, max_signal_amplitude * (sawtooth_time_amplitude * sawtooth_subband_amplitude * + cos_subband_amplitude * cos_time_amplitude)) # phase varies in subband direction phase = np.exp(1j * subband_ratio * 2.0 * np.pi * num_phase_wraps) visibility_value_parallel = amplitude * phase visibility_value_cross = max(1.0, amplitude/parallel_to_cross_polarization_ratio) * phase - baseline_visibilities_signal[timestamp_idx,subband_idx,parallel_pol_idxs] = visibility_value_parallel + baseline_visibilities_signal[timestamp_idx, subband_idx,parallel_pol_idxs] = visibility_value_parallel baseline_visibilities_signal[timestamp_idx, subband_idx, cross_pol_idxs] = visibility_value_cross # use/apply the same visibilities for each baseline diff --git a/QA/QA_Common/test/t_hdf5_io.py b/QA/QA_Common/test/t_hdf5_io.py index b104eec0080285bf0a2c331dd70938b42d143ee8..3d52ff2cd11aae6e37b80e81dbaaa43252913433 100755 --- a/QA/QA_Common/test/t_hdf5_io.py +++ b/QA/QA_Common/test/t_hdf5_io.py @@ -34,8 +34,51 @@ from lofar.qa.utils import * np.set_printoptions(precision=2) logger = logging.getLogger(__name__) +logging.basicConfig(format='%(asctime)s %(levelname)s %(process)s %(message)s', level=logging.INFO) class TestHdf5_IO(unittest.TestCase): + def test_sharedh5file(self): + import h5py + from multiprocessing import Process, Event + path = tempfile.mkstemp()[1] + # make sure there is no test file in the way + os.remove(path) + + try: + # for testing synchronization/concurrency + event = Event() + + # define helper method to open h5 file in other process + def create_file_and_keep_open_for_a_while(): + logger.info("creating h5 file %s", path) + with h5py.File(path, 'a') as file1: + logger.info("created/openend h5 file %s, writing contents", path) + file1['foo'] = 'bar' + logger.info("keeping h5 file %s open for 2 seconds", path) + sleep(2) + logger.info("closed h5 file %s", path) + event.set() + + # test if the event isn't set yet (as we did not start the other process yet...) + self.assertFalse(event.is_set()) + + # start process, which opens and keeps open the h5 file... + proc = Process(target=create_file_and_keep_open_for_a_while) + proc.start() + + # open SharedH5File (it should wait until create_file_and_keep_open_for_a_while is finished) + with SharedH5File(path, 'r+') as file2: + # Now that the SharedH5File is open, this tells us that the other process should be finished and the event is set. + self.assertTrue(event.is_set()) + # was foo=bar really written? 
+ self.assertEqual('bar', file2['foo'].value) + + # proc should already be finished, but let's wait anyway + proc.join() + finally: + logger.info('removing test file: %s', path) + os.remove(path) + def test_write_and_read_again(self): '''tests writing and reading an hdf5 file, and checks all parameters except for the visibility data. See test_write_and_read_and_verify_data for elaborate data verification.''' @@ -91,7 +134,7 @@ class TestHdf5_IO(unittest.TestCase): os.remove(path) def test_write_and_read_and_verify_data(self): - '''extensive test to verify to correctnes of all visibility amplitudes and phases + '''extensive test to verify to correctness of all visibility amplitudes and phases after it has been written and read back again, bot in raw and dB.''' logger.info('test_write_and_read_and_verify_data') @@ -192,13 +235,23 @@ class TestHdf5_IO(unittest.TestCase): for l in range(vis_in_raw.shape[3]): self.assertLess(abs_diff_raw[i,j,k,l], amplitude_threshold) self.assertLess(abs_diff_raw_dB[i,j,k,l], amplitude_threshold) - self.assertLess(abs_phase_diff_raw[i,j,k,l], phase_threshold) - self.assertLess(abs_phase_diff_raw_dB[i,j,k,l], phase_threshold) + try: + self.assertLess(abs_phase_diff_raw[i,j,k,l], phase_threshold) + except AssertionError: + # phase is just below 2pi (close to 0) + self.assertLess(2*np.pi-abs_phase_diff_raw[i,j,k,l], phase_threshold) + + try: + self.assertLess(abs_phase_diff_raw_dB[i, j, k, l], phase_threshold) + except AssertionError: + # phase is just below 2pi (close to 0) + self.assertLess(2*np.pi-abs_phase_diff_raw_dB[i, j, k, l], phase_threshold) + finally: logger.info('removing test file: %s', path) os.remove(path) - def test_12_to13_conversion(self): + def test_12_to_13_to_14_conversion(self): path = tempfile.mkstemp()[1] try: @@ -210,13 +263,13 @@ class TestHdf5_IO(unittest.TestCase): parallel_to_cross_polarization_ratio=1.0) # write the hypercube and parset into an h5 file.... 
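# A short numpy sketch (not part of the test above) of the wrap-around aware phase comparison
# that the try/except blocks implement: a difference of almost 2*pi is the same angle as a
# difference of almost 0, so the effective error is min(d, 2*pi - d). The helper name and the
# example values are illustrative assumptions.
import numpy as np

def circular_phase_diff(phase_a, phase_b):
    """Absolute phase difference in radians, folded into [0, pi]."""
    d = np.abs(np.asarray(phase_a) - np.asarray(phase_b)) % (2 * np.pi)
    return np.minimum(d, 2 * np.pi - d)

# example: phases of 0.01 rad and 6.27 rad differ by ~0.023 rad, not by ~6.26 rad
# print(circular_phase_diff(0.01, 6.27))   # ~0.023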
- # this currently results in a v1.3 file + # this currently results in a v1.4 file write_hypercube(path, saps_in, sas_id=123456) - # check if version is 1.3 + # check if version is 1.4 with h5py.File(path, "r") as file: version_str = file['version'][0] - self.assertEqual('1.3', version_str) + self.assertEqual('1.4', version_str) # change version back to 1.2 # and modify visibility data to have the 1.2 incorrect phases @@ -264,13 +317,13 @@ class TestHdf5_IO(unittest.TestCase): version_str = file['version'][0] self.assertEqual('1.2', version_str) - # reading the 1.2 file should result in automatic conversion to 1.3 and correction of phases + # reading the 1.2 file should result in automatic conversion via 1.3 to 1.4 and correction of phases result_raw = read_hypercube(path, visibilities_in_dB=False, python_datetimes=True) # check if version is now 1.3 with h5py.File(path, "r") as file: version_str = file['version'][0] - self.assertEqual('1.3', version_str) + self.assertEqual('1.4', version_str) # read in dB as well because we usually plot the visibilities in dB result_dB = read_hypercube(path, visibilities_in_dB=True, python_datetimes=True) @@ -344,9 +397,9 @@ class TestHdf5_IO(unittest.TestCase): write_hypercube(path, {sap_nr:sap_in}, sas_id=999999) combined_filepath = combine_hypercubes(paths, output_dir='/tmp', output_filename=os.path.basename(tempfile.mkstemp()[1])) + self.assertIsNotNone(combined_filepath) - if combined_filepath: - paths.append(combined_filepath) + paths.append(combined_filepath) result = read_hypercube(combined_filepath, visibilities_in_dB=False, python_datetimes=True) @@ -431,5 +484,4 @@ class TestHdf5_IO(unittest.TestCase): os.remove(path) if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) unittest.main() diff --git a/QA/QA_Service/CMakeLists.txt b/QA/QA_Service/CMakeLists.txt index 8f7fde3cc354bfb9e6c7a4fc4426f7b0d2bd6f1f..686f55135647ed35cfea0e55514686a58d1f895e 100644 --- a/QA/QA_Service/CMakeLists.txt +++ b/QA/QA_Service/CMakeLists.txt @@ -17,8 +17,14 @@ # $Id$ - -lofar_package(QA_Service 1.0 DEPENDS QA_Common PyMessaging OTDB_Services) +IF(BUILD_TESTING) + # add a dependency to InspectionPlots only if we do build testing. + # the t_qa_service test depends on plot_hdf5_dynamic_spectra, + # but the actual qa_service doesn't because all commands are called to cep4 docker images. 
+ lofar_package(QA_Service 1.0 DEPENDS QA_Common PyMessaging OTDB_Services pyparameterset InspectionPlots) +ELSE() + lofar_package(QA_Service 1.0 DEPENDS QA_Common PyMessaging OTDB_Services pyparameterset) +ENDIF(BUILD_TESTING) add_subdirectory(lib) add_subdirectory(bin) diff --git a/QA/QA_Service/bin/CMakeLists.txt b/QA/QA_Service/bin/CMakeLists.txt index f8cb80f52437e68456840952f4467a3d3cb6b330..4247c52141d70ed67b0392115ac4123a8fa649e0 100644 --- a/QA/QA_Service/bin/CMakeLists.txt +++ b/QA/QA_Service/bin/CMakeLists.txt @@ -17,7 +17,7 @@ # $Id$ -lofar_add_bin_scripts(qa_service) +lofar_add_bin_scripts(qa_service qa_webservice) # supervisord config files install(FILES diff --git a/QA/QA_Service/bin/qa_webservice.ini b/QA/QA_Service/bin/qa_webservice.ini index b8a26b507d06b250f2dfb32c5c3341df33e92324..37bd9b037a2c4558aea2e4b442d000c46a7a3d5f 100644 --- a/QA/QA_Service/bin/qa_webservice.ini +++ b/QA/QA_Service/bin/qa_webservice.ini @@ -1,7 +1,7 @@ ; supervisor ini file to start and run the adder_clustering docker image on head.cep4 with the webservice for the adder inspection plots [program:qa_webservice] -command=ssh -T -q -o StrictHostKeyChecking=no lofarsys@head.cep4.control.lofar docker run --rm --net=host -v /data/qa:/opt/adder/data/qa -u `id -u`:`id -g` -v /etc/passwd:/etc/passwd:ro -v /etc/group:/etc/group:ro -v $HOME:$HOME -e HOME=$HOME -e USER=$USER -w $HOME adder_clustering:latest webandmapvis.sh +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec qa_webservice' user=lofarsys stopsignal=INT ; KeyboardInterrupt stopasgroup=true ; bash does not propagate signals diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index 7db9201fb8c08cf5014b57904205115ac0db5eff..526db5eba321d31ad809b678590cad49dfb24011 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -21,6 +21,7 @@ import os.path import logging from subprocess import call, Popen, PIPE, STDOUT from optparse import OptionParser, OptionGroup +from threading import Thread from lofar.common.util import waitForInterrupt from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.qa.service.config import DEFAULT_QA_OTDB_NOTIFICATION_BUSNAME @@ -65,6 +66,7 @@ class QAService(OTDBBusListener): self._qa_notification_subject_prefix = qa_notification_subject_prefix self._send_bus = ToBus(qa_notification_busname, broker=broker) self.qa_base_dir = qa_base_dir + self._unfinished_otdb_id_map = {} def start_listening(self, numthreads=None): ''' @@ -88,8 +90,38 @@ class QAService(OTDBBusListener): :return: None ''' logger.info("task with otdb_id %s completed.", otdb_id) + + # immediately do qa when the obs is completing, because the data is already on disk... 
+ # and do the handling of the feedback in onObservationFinished self.do_qa(otdb_id=otdb_id) + def onObservationFinished(self, otdb_id, modificationTime): + ''' + this mehod is called automatically upon receiving a Finished NotificationMessage + :param int otdb_id: the task's otdb database id + :param datetime modificationTime: timestamp when the task's status changed to finished + :return: None + ''' + logger.info("task with otdb_id %s finished.", otdb_id) + + # lookup the hdf5_file_path for the given otdb_id + # and (re)add the parset to the file (which now includes feedback) + hdf5_file_path = self._unfinished_otdb_id_map.get(otdb_id) + if hdf5_file_path: + del self._unfinished_otdb_id_map[otdb_id] + + try: + # import here and not at top of file + # because on some systems h5py is not available + # and then only this method fails, which is less bad than the whole service failing. + from lofar.qa.hdf5_io import add_parset_to_hypercube + from lofar.sas.otdb.otdbrpc import OTDBRPC + + with OTDBRPC(broker=self._send_bus.broker) as otdbrpc: + add_parset_to_hypercube(hdf5_file_path, otdbrpc) + except Exception as e: + logger.warning("Cannot add parset with feedback for otdb=%s. error: %s", otdb_id, e) + def do_qa(self, otdb_id): ''' try to do all qa (quality assurance) steps for the given otdb_id @@ -99,14 +131,18 @@ class QAService(OTDBBusListener): ''' hdf5_file_path = self._convert_ms2hdf5(otdb_id) if hdf5_file_path: + # keep a note of where the h5 file was stored for this unfinished otdb_id + self._unfinished_otdb_id_map[otdb_id] = hdf5_file_path + + # cluster it self._cluster_h5_file(hdf5_file_path, otdb_id) plot_dir_path = self._create_plots_for_h5_file(hdf5_file_path, otdb_id) - if plot_dir_path: - self._send_event_message('Finished', {'otdb_id': otdb_id, - 'hdf5_file_path': hdf5_file_path, - 'plot_dir_path': plot_dir_path}) + # and notify that we're finished + self._send_event_message('Finished', {'otdb_id': otdb_id, + 'hdf5_file_path': hdf5_file_path, + 'plot_dir_path': plot_dir_path or ''}) def _send_event_message(self, subject_suffix, content): try: @@ -169,35 +205,36 @@ class QAService(OTDBBusListener): try: #use default cep4 qa output dir. 
plot_dir_path = os.path.join(self.qa_base_dir, 'inspectionplots') + task_plot_dir_path = '' + all_plots_succeeded = True - for plot_type in ['-1', '-4']: # plot dynspec and delay-rate - cmd = ['plot_hdf5_dynamic_spectra', - '-o %s' % (plot_dir_path,), - '--mixed', # plot amplitude autocors, and complex crosscors - plot_type, - '-n', '0', - '--force', - hdf5_path] + for plot_options in [['-1', '-m'], # hot autocor, complex crosscor, in dB + ['-1', '-mn', '--raw'], # normalized hot autocor, normalized complex crosscor, in raw + ['-4']]: # delay-rate + cmd = ['plot_hdf5_dynamic_spectra', '-o %s' % (plot_dir_path,), '--optimize', '--force', '--cep4'] + plot_options + [hdf5_path] # wrap the command in a cep4 ssh call to docker container cmd = wrap_command_for_docker(cmd, 'adder', 'latest') - cmd = wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd) + cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) logger.info('generating plots for otdb_id %s, executing: %s', otdb_id, ' '.join(cmd)) if call(cmd) == 0: - plot_dir_path = os.path.join(plot_dir_path, 'L%s' % otdb_id) - logger.info('generated plots for otdb_id %s in %s', otdb_id, plot_dir_path) - - self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id, - 'hdf5_file_path': hdf5_path, - 'plot_dir_path': plot_dir_path}) + task_plot_dir_path = os.path.join(plot_dir_path, 'L%s' % otdb_id) + logger.info('generated plots for otdb_id %s in %s with command=%s', otdb_id, + task_plot_dir_path, + ' '.join(cmd)) else: - msg = 'could not generate plots for otdb_id %s' % otdb_id + all_plots_succeeded &= False + msg = 'could not generate plots for otdb_id %s cmd=%s' % (otdb_id, ' '.join(cmd)) logger.error(msg) - self._send_event_message('Error', {'otdb_id': otdb_id, 'message': msg}) - return None - return plot_dir_path + self._send_event_message('Error', {'otdb_id': otdb_id, + 'message': msg}) + + self._send_event_message('CreatedInspectionPlots', {'otdb_id': otdb_id, + 'hdf5_file_path': hdf5_path, + 'plot_dir_path': task_plot_dir_path}) + return task_plot_dir_path except Exception as e: logging.exception('error in _create_plots_for_h5_file: %s', e) self._send_event_message('Error', {'otdb_id': otdb_id, 'message': e.message}) @@ -238,7 +275,7 @@ class QAService(OTDBBusListener): except Exception as e: logging.exception('error in _cluster_h5_file: %s', e) self._send_event_message('Error', {'otdb_id': otdb_id, 'message': e.message}) - return None + def main():
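# A hedged sketch of how the plotting commands in _create_plots_for_h5_file are composed:
# the bare command is first wrapped so it runs inside the 'adder' docker image (with the
# default mount dirs and user/group mapping), and the result is wrapped again in an ssh call
# to the cep4 head node. The import path, helper name, and example arguments are assumptions
# for illustration; the wrapper functions themselves are the ones changed in
# QA/QA_Common/lib/cep4_utils.py above.
from subprocess import call
from lofar.qa.cep4_utils import (wrap_command_for_docker,
                                 wrap_command_in_cep4_head_node_ssh_call)

def run_plot_cmd_on_cep4(hdf5_path, plot_dir_path, extra_options):
    cmd = ['plot_hdf5_dynamic_spectra', '-o %s' % (plot_dir_path,),
           '--optimize', '--force', '--cep4'] + extra_options + [hdf5_path]
    cmd = wrap_command_for_docker(cmd, 'adder', 'latest')      # run inside the docker image
    cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)         # ...on the cep4 head node
    return call(cmd) == 0

# e.g. run_plot_cmd_on_cep4('/data/qa/L123456.h5', '/data/qa/inspectionplots', ['-1', '-m'])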