From 0a4d9b0898b782874d3ca61e8e64ed66ca434f78 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 26 Jun 2018 07:04:51 +0000 Subject: [PATCH] SW-378: processed review comment: replaced triple single quotes by triple double quotes --- QA/QA_Common/lib/cep4_utils.py | 40 +++++++++++++++--------------- QA/QA_Common/lib/geoconversions.py | 12 ++++----- QA/QA_Common/lib/hdf5_io.py | 8 +++--- QA/QA_Common/test/t_cep4_utils.py | 16 ++++++------ QA/QA_Service/lib/qa_service.py | 36 +++++++++++++-------------- QA/QA_Service/test/t_qa_service.py | 36 +++++++++++++-------------- 6 files changed, 75 insertions(+), 73 deletions(-) diff --git a/QA/QA_Common/lib/cep4_utils.py b/QA/QA_Common/lib/cep4_utils.py index b999ba0dcad..c6ccec46213 100644 --- a/QA/QA_Common/lib/cep4_utils.py +++ b/QA/QA_Common/lib/cep4_utils.py @@ -25,52 +25,52 @@ CEP4_HEAD_NODE = 'head.cep4.control.lofar' LOFARSYS_AT_CEP4_HEAD_NODE = 'lofarsys@%s' % (CEP4_HEAD_NODE,) def ssh_cmd_list(host): - ''' + """ returns a subprocess compliant command list to do an ssh call to the given node uses ssh option -T to disable pseudo-terminal allocation uses ssh option -q for ssh quiet mode (no ssh warnings/errors) uses ssh option -o StrictHostKeyChecking=no to prevent prompts about host keys :param host: the node name or ip address :return: a subprocess compliant command list - ''' + """ return ['ssh', '-T', '-q', '-o StrictHostKeyChecking=no', host] def wrap_command_in_cep4_head_node_ssh_call(cmd): - '''wrap the command in an ssh call to head.cep4 + """wrap the command in an ssh call to head.cep4 :param list cmd: a subprocess cmd list :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls - ''' + """ return ssh_cmd_list(LOFARSYS_AT_CEP4_HEAD_NODE) + cmd def wrap_command_in_cep4_random_cpu_node_ssh_call(cmd, via_head=True): - '''wrap the command in an ssh call to an available random cep4 cpu node (via head.cep4) + """wrap the command in an ssh call to an available random cep4 cpu node (via head.cep4) :param list cmd: a subprocess cmd list :param bool via_head: when True, route the cmd first via the cep4 head node :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls - ''' + """ # pick a random available cpu node node_nrs = get_cep4_available_cpu_nodes() node_nr = node_nrs[randint(0, len(node_nrs)-1)] return wrap_command_in_cep4_cpu_node_ssh_call(cmd, node_nr, via_head=via_head) def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_head=True): - '''wrap the command in an ssh call to the available cep4 cpu node with the lowest load (via head.cep4) + """wrap the command in an ssh call to the available cep4 cpu node with the lowest load (via head.cep4) :param list cmd: a subprocess cmd list :param bool via_head: when True, route the cmd first via the cep4 head node :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls - ''' + """ lowest_load_node_nr = get_cep4_available_cpu_node_with_lowest_load() return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head) def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): - '''wrap the command in an ssh call to the given cep4 cpu node (via head.cep4) + """wrap the command in an ssh call to the given cep4 cpu node (via head.cep4) :param list cmd: a subprocess cmd list :param int cpu_node_nr: the number of the cpu node on which to execute the command :param bool via_head: when True, route the
cmd first via the cep4 head node :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls - ''' + """ # hard-coded cpu-node hostname. Might change for future clusters or cluster upgrades. lofarsys_at_cpu_node = 'lofarsys@cpu%02d.cep4' % (cpu_node_nr,) remote_cmd = ssh_cmd_list(lofarsys_at_cpu_node) + cmd @@ -80,12 +80,12 @@ def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True): return remote_cmd def wrap_command_for_docker(cmd, image_name, image_label=''): - '''wrap the command to be run in a docker container for the lofarsys user and environment + """wrap the command to be run in a docker container for the lofarsys user and environment :param list cmd: a subprocess cmd list :param string image_name: the name of the docker image to run :param string image_label: the optional label of the docker image to run :return: the same subprocess cmd list, but then wrapped with docker calls - ''' + """ #fetch the lofarsys user id and group id first from the cep4 head node id_string = '%s:%s' % (check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-u'])).strip(), check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-g'])).strip()) @@ -102,10 +102,10 @@ def wrap_command_for_docker(cmd, image_name, image_label=''): '%s:%s' % (image_name, image_label) if image_label else image_name] + cmd def get_cep4_available_cpu_nodes(): - ''' + """ get a list of cep4 cpu nodes which are currently up and running according to slurm :return: a list of cpu node numbers (ints) for the up and running cpu nodes - ''' + """ available_cep4_nodes = [] try: @@ -153,11 +153,11 @@ def get_cep4_available_cpu_nodes(): return available_cep4_nodes def get_cep4_cpu_nodes_loads(node_nrs=None): - ''' + """ get the 5min load for each given cep4 cpu node nr :param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried. :return: dict with node_nr -> load mapping - ''' + """ if node_nrs == None: node_nrs = get_cep4_available_cpu_nodes() @@ -186,10 +186,10 @@ def get_cep4_cpu_nodes_loads(node_nrs=None): return loads def get_cep4_available_cpu_nodes_sorted_ascending_by_load(): - ''' + """ get the cep4 available cpu node numbers sorted ascending by load (5min). :return: sorted list of node numbers. - ''' + """ node_nrs = get_cep4_available_cpu_nodes() loads = get_cep4_cpu_nodes_loads(node_nrs) sorted_loads = sorted(loads.items(), key=lambda x: x[1]) @@ -198,10 +198,10 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load(): return sorted_node_nrs def get_cep4_available_cpu_node_with_lowest_load(): - ''' + """ get the cep4 cpu node which is available and has the lowest (5min) load of them all. :return: the node number (int) with the lowest load. 
- ''' + """ node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load() if node_nrs: logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0]) diff --git a/QA/QA_Common/lib/geoconversions.py b/QA/QA_Common/lib/geoconversions.py index 9e3d23cfc52..7c93b4a08d9 100644 --- a/QA/QA_Common/lib/geoconversions.py +++ b/QA/QA_Common/lib/geoconversions.py @@ -29,11 +29,11 @@ def normalized_earth_radius(latitude_rad): def geographic_from_xyz(xyz_m): - ''' + """ convert xyz coordinates to wgs84 coordinates :param xyz_m: 1D array/list/tuple of x,y,z in meters :return: tuple of lat_rad, lon_rad, height_m - ''' + """ wgs84_a = 6378137.0 wgs84_f = 1./298.257223563 wgs84_e2 = wgs84_f*(2.0 - wgs84_f) @@ -99,9 +99,9 @@ def pqr_from_xyz(xyz_m, xyz0_m=LOFAR_XYZ0_m, matrix=LOFAR_PQR_TO_ETRS_MATRIX): return transform(xyz_m, xyz0_m, matrix.T) def interpolation_function(pqr): - ''' + """ Return an interpolation function fn(x, y), which returns the interpolated value at x, y. - ''' + """ rbfi = Rbf(pqr[:,0], pqr[:,1], 0.0*pqr[:,2], pqr[:,2], function='linear') def interpolator(x_m, y_m): return rbfi(x_m, y_m, y_m*0.0) @@ -124,11 +124,11 @@ def fit_plane(xyz): def pqr_cs002_from_xyz(xyz_m): - ''' + """ convert xyz coordinates to lofar pqr coordinates with origin in CS002 :param xyz_m: 1D array/list/tuple of x,y,z in meters :return: tuple of pqr coords in meters - ''' + """ pqr = pqr_from_xyz(array([xyz_m]), xyz0_m=array([ 3826577.462, 461022.624, 5064892.526])) return pqr[0][0], pqr[0][1], pqr[0][2] diff --git a/QA/QA_Common/lib/hdf5_io.py b/QA/QA_Common/lib/hdf5_io.py index b079de2f3ab..c405077ac78 100644 --- a/QA/QA_Common/lib/hdf5_io.py +++ b/QA/QA_Common/lib/hdf5_io.py @@ -15,6 +15,8 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +# TODO: refactor large functions into collections of smaller function calls and isolate behaviour. + """Module hdf5_io offers various methods to read/write/modify hdf5 files containing lofar measurement data. Such an h5 file is usually generated from Lofar Measurement Sets (MS/casacore format) using the ms2hdf5 conversion tool. @@ -163,7 +165,7 @@ def write_hypercube(path, saps, parset=None, sas_id=None, wsrta_id=None, do_comp for antenna, location in antenna_locations[ref_frame].items(): location_sub_group.create_dataset(antenna, data=location) - logger.info('''flagging NaN's and zero's in visibilities for file %s''', path) + logger.info("""flagging NaN's and zero's in visibilities for file %s""", path) zero_or_nan = np.absolute(visibilities) == 0.0 zero_or_nan[np.isnan(visibilities)] = True flagging[zero_or_nan] = True @@ -1118,7 +1120,7 @@ def fill_info_folder_from_parset(h5_path): logger.error('Error while running fill_info_folder_from_parset: %s', e) def read_info_dict(h5_path): - ''' read the info about the observation/pipeline from the h5 file given by h5_path. + """ read the info about the observation/pipeline from the h5 file given by h5_path.
:param str h5_path: path to the h5 file :return: a dict with the info about the observation/pipeline in native python types, like: {'PI': 'my_PI', @@ -1130,7 +1132,7 @@ 'antenna_array': 'LBA', 'start_time': datetime.datetime(2018, 6, 11, 11, 0), 'stop_time': datetime.datetime(2018, 6, 11, 12, 0), - 'type': 'my_process_subtype'} ''' + 'type': 'my_process_subtype'} """ with h5py.File(h5_path, "r+") as file: if not 'measurement/info' in file: # try to convert old style file with parsets only into new files with info. diff --git a/QA/QA_Common/test/t_cep4_utils.py b/QA/QA_Common/test/t_cep4_utils.py index 229245ab98c..1a0b970a515 100755 --- a/QA/QA_Common/test/t_cep4_utils.py +++ b/QA/QA_Common/test/t_cep4_utils.py @@ -37,31 +37,31 @@ class TestCep4Utils(unittest.TestCase): self.assertTrue(len(node_nrs) > 0) def test_03_wrap_command_in_cep4_random_cpu_node_ssh_call(self): - ''' + """ this test calls and tests the functionality of the following methods via wrap_command_in_cep4_random_cpu_node_ssh_call: get_cep4_available_cpu_nodes, wrap_command_in_cep4_cpu_node_ssh_call - ''' + """ cmd = wrap_command_in_cep4_random_cpu_node_ssh_call(['true'], via_head=True) logger.info('executing command: %s', ' '.join(cmd)) self.assertEqual(0, call(cmd)) def test_04_wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(self): - ''' + """ this test calls and tests the functionality of the following methods via wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call: get_cep4_available_cpu_nodes, get_cep4_cpu_nodes_loads, get_cep4_available_cpu_nodes_sorted_ascending_by_load, wrap_command_in_cep4_cpu_node_ssh_call - ''' + """ cmd = wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(['true'], via_head=True) logger.info('executing command: %s', ' '.join(cmd)) self.assertEqual(0, call(cmd)) def test_05_wrap_command_for_docker_in_cep4_head_node_ssh_call(self): - ''' + """ this test calls and tests the functionality of wrap_command_for_docker and wrap_command_in_cep4_head_node_ssh_call. It is assumed that the 'adder' docker image is available on head.cep4. - ''' + """ #wrap the command in a docker call first, and then in an ssh call cmd = wrap_command_for_docker(['true'], 'adder', 'latest') cmd = wrap_command_in_cep4_head_node_ssh_call(cmd) @@ -69,10 +69,10 @@ class TestCep4Utils(unittest.TestCase): self.assertEqual(0, call(cmd)) def test_06_get_slurm_info_from_within_docker_via_cep4_head(self): - ''' + """ test to see if we can execute a command via ssh on the head node, from within a docker container, started via ssh on the head node (yes, that's multiple levels of indirection) - ''' + """ # use the slurm sinfo command (because it's available on the head nodes only)... cmd = ['sinfo'] # ...called on cep4 headnode... diff --git a/QA/QA_Service/lib/qa_service.py b/QA/QA_Service/lib/qa_service.py index 6659647af0e..c8fcbba8603 100644 --- a/QA/QA_Service/lib/qa_service.py +++ b/QA/QA_Service/lib/qa_service.py @@ -34,11 +34,11 @@ logger = logging.getLogger(__name__) #TODO: idea: convert periodically while observing? class QAService(OTDBBusListener): - ''' + """ QAService listens on the lofar otdb message bus for NotificationMessages and starts qa processes upon observation/pipeline completion. The qa processes convert MS (measurement sets) to hdf5 qa files, and then start generating plots from the hdf5 file.
- ''' + """ def __init__(self, qa_notification_busname=DEFAULT_QA_NOTIFICATION_BUSNAME, qa_notification_subject_prefix=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX, @@ -66,36 +66,36 @@ class QAService(OTDBBusListener): self.qa_base_dir = qa_base_dir def start_listening(self, numthreads=None): - ''' + """ start listening and open event _send_bus. This method is called in __enter__ when using 'with' context. - ''' + """ super(QAService, self).start_listening(numthreads=numthreads) self._send_bus.open() def stop_listening(self): - ''' + """ stop listening and close event _send_bus. This method is called in __exit__ when using 'with' context. - ''' + """ super(QAService, self).stop_listening() self._send_bus.close() def onObservationCompleting(self, otdb_id, modificationTime): - ''' + """ this method is called automatically upon receiving a Completion NotificationMessage :param int otdb_id: the task's otdb database id :param datetime modificationTime: timestamp when the task's status changed to completing :return: None - ''' + """ logger.info("task with otdb_id %s completed.", otdb_id) self.do_qa(otdb_id=otdb_id) def do_qa(self, otdb_id): - ''' + """ try to do all qa (quality assurance) steps for the given otdb_id resulting in an h5 MS-extract file and inspection plots :param int otdb_id: observation/pipeline otdb id for which the conversion needs to be done. :return: None - ''' + """ hdf5_file_path = self._convert_ms2hdf5(otdb_id) if hdf5_file_path: self._cluster_h5_file(hdf5_file_path, otdb_id) @@ -117,14 +117,14 @@ class QAService(OTDBBusListener): logger.error('Could not send event message: %s', e) def _convert_ms2hdf5(self, otdb_id): - ''' + """ convert the MS for the given otdb_id to an h5 MS-extract file. The conversion will run via ssh on cep4 with massive parallelization. When running on cep4, it is assumed that a docker image called adder exists on head.cep4. When running locally, it is assumed that ms2hdf5 is installed locally. :param int otdb_id: observation/pipeline otdb id for which the conversion needs to be done. :return string: path to the generated h5 file. - ''' + """ try: logger.info('trying to convert MS uv dataset with otdb_id %s if any', otdb_id) @@ -158,13 +158,13 @@ class QAService(OTDBBusListener): return None def _create_plots_for_h5_file(self, hdf5_path, otdb_id=None): - ''' + """ create plots for the given h5 file. The plots are created via an ssh call to cep4, where they are generated in parallel in the docker image. :param hdf5_path: the full path to the hdf5 file for which we want the plots. :param otdb_id: the otdb_id of the converted observation/pipeline (used for logging only) :return: the full path to the directory containing the created plots. - ''' + """ try: #use default cep4 qa output dir. plot_dir_path = os.path.join(self.qa_base_dir, 'inspectionplots') @@ -202,7 +202,7 @@ class QAService(OTDBBusListener): def _cluster_h5_file(self, hdf5_path, otdb_id=None): - ''' + """ Try to cluster the baselines based on visibilities in the h5 file using the clustering docker image developed by e-science. This method assumes the adder_clustering docker image is available on cep4. If not, or if anything else @@ -214,7 +214,7 @@ class QAService(OTDBBusListener): :param hdf5_path: the full path to the hdf5 file for which we want the plots.
:param otdb_id: the otdb_id of the converted observation/pipeline (used for logging only) :return: None - ''' + """ try: # the command to cluster the given h5 file (executed in the e-science adder docker image) cmd = ['cluster_this.py', hdf5_path] @@ -239,9 +239,9 @@ def main(): - ''' + """ Run the qa service program with commandline arguments. - ''' + """ # Check the invocation arguments parser = OptionParser("%prog [options]", diff --git a/QA/QA_Service/test/t_qa_service.py b/QA/QA_Service/test/t_qa_service.py index 7aab310fb12..98ded88a4c5 100755 --- a/QA/QA_Service/test/t_qa_service.py +++ b/QA/QA_Service/test/t_qa_service.py @@ -46,11 +46,11 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT # the tests below are multi-threaded (even multi-process) # define a QABusListener-derivative to handle synchronization (set the *_events) class SynchronizingQABusListener(QABusListener): - ''' + """ the tests below are multi-threaded (even multi-process) this QABusListener-derivative handles synchronization (set the *_events) and stores the msg_content results for expected result checking - ''' + """ def __init__(self, busname): super(SynchronizingQABusListener, self).__init__(busname=busname) self.converted_event = Event() @@ -81,15 +81,15 @@ class SynchronizingQABusListener(QABusListener): class TestQAService(unittest.TestCase): - ''' + """ Tests for the QAService class - ''' + """ def setUp(self): - ''' + """ quite complicated setup to set up test qpid exchanges and mock away ssh calls to cep4 and mock away dockerized commands - ''' + """ # setup broker connection self.connection = Connection.establish('127.0.0.1') self.broker = BrokerAgent(self.connection) @@ -168,7 +168,7 @@ class TestQAService(unittest.TestCase): self.connection.close() def send_otdb_task_completing_event(self): - '''helper method: create a ToBus and send a completing EventMessage''' + """helper method: create a ToBus and send a completing EventMessage""" with ToBus(self.busname) as sender: msg = EventMessage(context=DEFAULT_OTDB_NOTIFICATION_SUBJECT, content={"treeID": self.TEST_OTDB_ID, @@ -177,12 +177,12 @@ class TestQAService(unittest.TestCase): sender.send(msg) def test_01_qa_service_for_expected_behaviour(self): - ''' + """ This test starts a QAService, triggers a test observation completing event, and tests if the generated h5 file and plots are as expected. It is an end-to-end test which does not check the intermediate results. It is assumed that the intermediate steps are tested in other tests/modules.
- ''' + """ logger.info(' -- test_01_qa_service_for_expected_behaviour -- ') @@ -196,7 +196,7 @@ class TestQAService(unittest.TestCase): mocked_cmd = [create_test_hypercube_path, '-s 4', '-S 8', '-t 16', '-o', str(self.TEST_OTDB_ID), self.TEST_H5_PATH] - logger.info('''mocked_wrap_command_for_docker returning mocked command to create test h5 file: '%s', instead of original command: '%s' ''', + logger.info("""mocked_wrap_command_for_docker returning mocked command to create test h5 file: '%s', instead of original command: '%s' """, ' '.join(mocked_cmd), ' '.join(cmd)) return mocked_cmd @@ -204,11 +204,11 @@ class TestQAService(unittest.TestCase): # replace the cluster command which normally runs in the docker container # by a call to bash true, so the 'cluster_this' call returns a 0 exit code mocked_cmd = ['true'] - logger.info('''mocked_wrap_command_for_docker returning mocked command: '%s', instead of original command: '%s' ''', + logger.info("""mocked_wrap_command_for_docker returning mocked command: '%s', instead of original command: '%s' """, ' '.join(mocked_cmd), ' '.join(cmd)) return mocked_cmd - logger.info('''mocked_wrap_command_for_docker returning original command: '%s' ''', ' '.join(cmd)) + logger.info("""mocked_wrap_command_for_docker returning original command: '%s' """, ' '.join(cmd)) return cmd self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker @@ -294,12 +294,12 @@ class TestQAService(unittest.TestCase): self.ssh_cmd_list_mock.assert_not_called() def test_02_qa_service_for_error_in_ms2hdf5(self): - ''' + """ This test starts a QAService, triggers a test observation completing event, and tests if the conversion from MS to hdf5 fails (intentionally). It is an end-to-end test which does not check the intermediate results. It is assumed that the intermediate steps are tested in other tests/modules. - ''' + """ logger.info(' -- test_02_qa_service_for_error_in_ms2hdf5 -- ') @@ -342,13 +342,13 @@ class TestQAService(unittest.TestCase): self.ssh_cmd_list_mock.assert_not_called() def test_03_qa_service_for_error_in_creating_plots(self): - ''' + """ This test starts a QAService, triggers a test observation completing event, and tests if the conversion from MS to hdf5 works, but the plot generation fails (intentionally). It is an end-to-end test which does not check the intermediate results. It is assumed that the intermediate steps are tested in other tests/modules. - ''' + """ logger.info(' -- test_03_qa_service_for_error_in_creating_plots -- ') @@ -420,12 +420,12 @@ class TestQAService(unittest.TestCase): self.ssh_cmd_list_mock.assert_not_called() def test_04_qa_service_for_error_ssh(self): - ''' + """ This test starts a QAService, triggers a test observation completing event, and tests if conversion fails due to an intentionally failing (mocked) ssh call. It is an end-to-end test which does not check the intermediate results. It is assumed that the intermediate steps are tested in other tests/modules. - ''' + """ logger.info(' -- test_04_qa_service_for_error_ssh -- ') -- GitLab
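
For context, the cep4_utils wrappers touched by this patch compose by nesting: each wrap_* call only prepends ssh or docker arguments to a subprocess command list, so a command can be wrapped for docker first and then for ssh, exactly as test_05 above does. Below is a minimal usage sketch, not part of the patch; the import path lofar.qa.cep4_utils is an assumption based on the QA package layout and may differ in an actual build.

from subprocess import call

# assumed import path for QA/QA_Common/lib/cep4_utils.py; adjust to the actual package
from lofar.qa.cep4_utils import (wrap_command_for_docker,
                                 wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call)

# wrap a trivial command in a docker call for the 'adder' image, which the
# tests above assume to be available on head.cep4...
cmd = wrap_command_for_docker(['true'], 'adder', 'latest')

# ...then wrap that in an ssh call to the least-loaded available cpu node,
# routed via the cep4 head node
cmd = wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_head=True)

# the wrappers only build an argument list; nothing runs until subprocess executes it
exit_code = call(cmd)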