Skip to content
Snippets Groups Projects
Commit 0a4d9b08 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-378: processed review comment: replaced triple single quotes by triple double quotes

parent 5e815a8b
No related branches found
No related tags found
No related merge requests found
......@@ -25,52 +25,52 @@ CEP4_HEAD_NODE = 'head.cep4.control.lofar'
LOFARSYS_AT_CEP4_HEAD_NODE = 'lofarsys@%s' % (CEP4_HEAD_NODE,)
def ssh_cmd_list(host):
'''
"""
returns a subprocess compliant command list to do an ssh call to the given node
uses ssh option -tt to force remote pseudo terminal
uses ssh option -q for ssh quiet mode (no ssh warnings/errors)
uses ssh option -o StrictHostKeyChecking=no to prevent prompts about host keys
:param host: the node name or ip address
:return: a subprocess compliant command list
'''
"""
return ['ssh', '-T', '-q', '-o StrictHostKeyChecking=no', host]
def wrap_command_in_cep4_head_node_ssh_call(cmd):
'''wrap the command in an ssh call to head.cep4
"""wrap the command in an ssh call to head.cep4
:param list cmd: a subprocess cmd list
cpu node. Otherwise, the command is executed on the head node.
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
'''
"""
return ssh_cmd_list(LOFARSYS_AT_CEP4_HEAD_NODE) + cmd
def wrap_command_in_cep4_random_cpu_node_ssh_call(cmd, via_head=True):
'''wrap the command in an ssh call an available random cep4 cpu node (via head.cep4)
"""wrap the command in an ssh call an available random cep4 cpu node (via head.cep4)
:param list cmd: a subprocess cmd list
:param bool via_head: when True, route the cmd first via the cep4 head node
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
'''
"""
# pick a random available cpu node
node_nrs = get_cep4_available_cpu_nodes()
node_nr = node_nrs[randint(0, len(node_nrs)-1)]
return wrap_command_in_cep4_cpu_node_ssh_call(cmd, node_nr, via_head=via_head)
def wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(cmd, via_head=True):
'''wrap the command in an ssh call to the available random cep4 cpu node with the lowest load (via head.cep4)
"""wrap the command in an ssh call to the available random cep4 cpu node with the lowest load (via head.cep4)
:param list cmd: a subprocess cmd list
:param bool via_head: when True, route the cmd first via the cep4 head node
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
'''
"""
lowest_load_node_nr = get_cep4_available_cpu_node_with_lowest_load()
return wrap_command_in_cep4_cpu_node_ssh_call(cmd, lowest_load_node_nr, via_head=via_head)
def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
'''wrap the command in an ssh call the given cep4 cpu node (via head.cep4)
"""wrap the command in an ssh call the given cep4 cpu node (via head.cep4)
:param list cmd: a subprocess cmd list
:param int cpu_node_nr: the number of the cpu node where to execute the command
:param bool via_head: when True, route the cmd first via the cep4 head node
:return: the same subprocess cmd list, but then wrapped with cep4 ssh calls
'''
"""
# hard-coded cpu-node hostname. Might change for future clusters or cluster upgrades.
lofarsys_at_cpu_node = 'lofarsys@cpu%02d.cep4' % (cpu_node_nr,)
remote_cmd = ssh_cmd_list(lofarsys_at_cpu_node) + cmd
......@@ -80,12 +80,12 @@ def wrap_command_in_cep4_cpu_node_ssh_call(cmd, cpu_node_nr, via_head=True):
return remote_cmd
def wrap_command_for_docker(cmd, image_name, image_label=''):
'''wrap the command to be run in a docker container for the lofarsys user and environment
"""wrap the command to be run in a docker container for the lofarsys user and environment
:param list cmd: a subprocess cmd list
:param string image_name: the name of the docker image to run
:param string image_label: the optional label of the docker image to run
:return: the same subprocess cmd list, but then wrapped with docker calls
'''
"""
#fetch the lofarsys user id and group id first from the cep4 head node
id_string = '%s:%s' % (check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-u'])).strip(),
check_output(wrap_command_in_cep4_head_node_ssh_call(['id', '-g'])).strip())
......@@ -102,10 +102,10 @@ def wrap_command_for_docker(cmd, image_name, image_label=''):
'%s:%s' % (image_name, image_label) if image_label else image_name] + cmd
def get_cep4_available_cpu_nodes():
'''
"""
get a list of cep4 cpu nodes which are currently up and running according to slurm
:return: a list of cpu node numbers (ints) for the up and running cpu nodes
'''
"""
available_cep4_nodes = []
try:
......@@ -153,11 +153,11 @@ def get_cep4_available_cpu_nodes():
return available_cep4_nodes
def get_cep4_cpu_nodes_loads(node_nrs=None):
'''
"""
get the 5min load for each given cep4 cpu node nr
:param node_nrs: optional list of node numbers to get the load for. If None, then all available nodes are queried.
:return: dict with node_nr -> load mapping
'''
"""
if node_nrs == None:
node_nrs = get_cep4_available_cpu_nodes()
......@@ -186,10 +186,10 @@ def get_cep4_cpu_nodes_loads(node_nrs=None):
return loads
def get_cep4_available_cpu_nodes_sorted_ascending_by_load():
'''
"""
get the cep4 available cpu node numbers sorted ascending by load (5min).
:return: sorted list of node numbers.
'''
"""
node_nrs = get_cep4_available_cpu_nodes()
loads = get_cep4_cpu_nodes_loads(node_nrs)
sorted_loads = sorted(loads.items(), key=lambda x: x[1])
......@@ -198,10 +198,10 @@ def get_cep4_available_cpu_nodes_sorted_ascending_by_load():
return sorted_node_nrs
def get_cep4_available_cpu_node_with_lowest_load():
'''
"""
get the cep4 cpu node which is available and has the lowest (5min) load of them all.
:return: the node number (int) with the lowest load.
'''
"""
node_nrs = get_cep4_available_cpu_nodes_sorted_ascending_by_load()
if node_nrs:
logger.debug('cep4 cpu node with lowest load: %s', node_nrs[0])
......
......@@ -29,11 +29,11 @@ def normalized_earth_radius(latitude_rad):
def geographic_from_xyz(xyz_m):
'''
"""
convert xyz coordinates to wgs84 coordinates
:param xyz_m: 1D array/list/tuple of x,y,z in meters
:return: tuple of lat_rad, lon_rad, height_m
'''
"""
wgs84_a = 6378137.0
wgs84_f = 1./298.257223563
wgs84_e2 = wgs84_f*(2.0 - wgs84_f)
......@@ -99,9 +99,9 @@ def pqr_from_xyz(xyz_m, xyz0_m=LOFAR_XYZ0_m, matrix=LOFAR_PQR_TO_ETRS_MATRIX):
return transform(xyz_m, xyz0_m, matrix.T)
def interpolation_function(pqr):
'''
"""
Return an interpolation function fn(x, y, z), which returns the value at x, y.
'''
"""
rbfi = Rbf(pqr[:,0], pqr[:,1], 0.0*pqr[:,2], pqr[:,2], function='linear')
def interpolator(x_m, y_m):
return rbfi(x_m, y_m, y_m*0.0)
......@@ -124,11 +124,11 @@ def fit_plane(xyz):
def pqr_cs002_from_xyz(xyz_m):
'''
"""
convert xyz coordinates to lofar pqr coordinates with origin in CS002
:param xyz_m: 1D array/list/tuple of x,y,z in meters
:return: tuple of pqr coords in meters
'''
"""
pqr = pqr_from_xyz(array([xyz_m]),
xyz0_m=array([ 3826577.462, 461022.624, 5064892.526]))
return pqr[0][0], pqr[0][1], pqr[0][2]
......@@ -15,6 +15,8 @@
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# TODO: refactor large functions into collections of smaller function calls and isolate behaviour.
"""Module hdf5_io offers various methods to read/write/modify hdf5 files containing lofar measurement data.
Such an h5 file is usually generated from Lofar Measurement Sets (MS/casacore format) using the ms2hdf5 conversion tool.
......@@ -163,7 +165,7 @@ def write_hypercube(path, saps, parset=None, sas_id=None, wsrta_id=None, do_comp
for antenna, location in antenna_locations[ref_frame].items():
location_sub_group.create_dataset(antenna, data=location)
logger.info('''flagging NaN's and zero's in visibilities for file %s''', path)
logger.info("""flagging NaN's and zero's in visibilities for file %s""", path)
zero_or_nan = np.absolute(visibilities) == 0.0
zero_or_nan[np.isnan(visibilities)] = True
flagging[zero_or_nan] = True
......@@ -1118,7 +1120,7 @@ def fill_info_folder_from_parset(h5_path):
logger.error('Error whle running fill_info_folder_from_parset: %s', e)
def read_info_dict(h5_path):
''' read the info about the observation/pipeline from the h5 file given by h5_path.
""" read the info about the observation/pipeline from the h5 file given by h5_path.
:param str h5_path: h5_path to the h5 file
:return: a dict with the info about the observation/pipeline in native python types, like:
{'PI': 'my_PI',
......@@ -1130,7 +1132,7 @@ def read_info_dict(h5_path):
'antenna_array': 'LBA',
'start_time': datetime.datetime(2018, 6, 11, 11, 0),
'stop_time': datetime.datetime(2018, 6, 11, 12, 0),
'type': 'my_process_subtype'} '''
'type': 'my_process_subtype'} """
with h5py.File(h5_path, "r+") as file:
if not 'measurement/info' in file:
# try to convert old style file with parsets only into new files with info.
......
......@@ -37,31 +37,31 @@ class TestCep4Utils(unittest.TestCase):
self.assertTrue(len(node_nrs) > 0)
def test_03_wrap_command_in_cep4_random_cpu_node_ssh_call(self):
'''
"""
this test calls and tests the functionality of the following methods via
wrap_command_in_cep4_random_cpu_node_ssh_call: get_cep4_available_cpu_nodes, wrap_command_in_cep4_cpu_node_ssh_call
'''
"""
cmd = wrap_command_in_cep4_random_cpu_node_ssh_call(['true'], via_head=True)
logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd))
def test_04_wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(self):
'''
"""
this test calls and tests the functionality of the following methods via
wrap_command_in_cep4_random_cpu_node_ssh_call:
get_cep4_available_cpu_nodes, get_cep4_cpu_nodes_loads,
get_cep4_available_cpu_nodes_sorted_ascending_by_load, wrap_command_in_cep4_cpu_node_ssh_call
'''
"""
cmd = wrap_command_in_cep4_available_cpu_node_with_lowest_load_ssh_call(['true'], via_head=True)
logger.info('executing command: %s', ' '.join(cmd))
self.assertEqual(0, call(cmd))
def test_05_wrap_command_for_docker_in_cep4_head_node_ssh_call(self):
'''
"""
this test calls and tests the functionality of wrap_command_for_docker and
wrap_command_in_cep4_head_node_ssh_call.
It is assumed that a docker image is available on head.cep4.
'''
"""
#wrap the command in a docker call first, and then in an ssh call
cmd = wrap_command_for_docker(['true'], 'adder', 'latest')
cmd = wrap_command_in_cep4_head_node_ssh_call(cmd)
......@@ -69,10 +69,10 @@ class TestCep4Utils(unittest.TestCase):
self.assertEqual(0, call(cmd))
def test_06_get_slurm_info_from_within_docker_via_cep4_head(self):
'''
"""
test to see if we can execute a command via ssh on the head node,
from within a docker container, started via ssh on the head node (yes, that's multiple levels of indirection)
'''
"""
# use the slurm sinfo command (because it's available on the head nodes only)...
cmd = ['sinfo']
# ...called on cep4 headnode...
......
......@@ -34,11 +34,11 @@ logger = logging.getLogger(__name__)
#TODO: idea: convert periodically while observing?
class QAService(OTDBBusListener):
'''
"""
QAService listens on the lofar otdb message bus for NotificationMessages and starts qa processes
upon observation/pipeline completion. The qa processes convert MS (measurement sets) to hdf5 qa files,
and then starts generating plots from the hdf5 file.
'''
"""
def __init__(self,
qa_notification_busname=DEFAULT_QA_NOTIFICATION_BUSNAME,
qa_notification_subject_prefix=DEFAULT_QA_NOTIFICATION_SUBJECT_PREFIX,
......@@ -66,36 +66,36 @@ class QAService(OTDBBusListener):
self.qa_base_dir = qa_base_dir
def start_listening(self, numthreads=None):
'''
"""
start listening and open event _send_bus. This method is called in __enter__ when using 'with' context.
'''
"""
super(QAService, self).start_listening(numthreads=numthreads)
self._send_bus.open()
def stop_listening(self):
'''
"""
stop listening and close event _send_bus. This method is called in __exit__ when using 'with' context.
'''
"""
super(QAService, self).stop_listening()
self._send_bus.close()
def onObservationCompleting(self, otdb_id, modificationTime):
'''
"""
this mehod is called automatically upon receiving a Completion NotificationMessage
:param int otdb_id: the task's otdb database id
:param datetime modificationTime: timestamp when the task's status changed to completing
:return: None
'''
"""
logger.info("task with otdb_id %s completed.", otdb_id)
self.do_qa(otdb_id=otdb_id)
def do_qa(self, otdb_id):
'''
"""
try to do all qa (quality assurance) steps for the given otdb_id
resulting in an h5 MS-extract file and inspection plots
:param int otdb_id: observation/pipeline otdb id for which the conversion needs to be done.
:return: None
'''
"""
hdf5_file_path = self._convert_ms2hdf5(otdb_id)
if hdf5_file_path:
self._cluster_h5_file(hdf5_file_path, otdb_id)
......@@ -117,14 +117,14 @@ class QAService(OTDBBusListener):
logger.error('Could not send event message: %s', e)
def _convert_ms2hdf5(self, otdb_id):
'''
"""
convert the MS for the given otdb_id to an h5 MS-extract file.
The conversion will run via ssh on cep4 with massive parellelization.
When running on cep4, it is assumed that a docker image called adder exists on head.cep4
When running locally, it is assumed that ms2hdf5 is installed locally.
:param int otdb_id: observation/pipeline otdb id for which the conversion needs to be done.
:return string: path to the generated h5 file.
'''
"""
try:
logger.info('trying to convert MS uv dataset with otdb_id %s if any', otdb_id)
......@@ -158,13 +158,13 @@ class QAService(OTDBBusListener):
return None
def _create_plots_for_h5_file(self, hdf5_path, otdb_id=None):
'''
"""
create plots for the given h5 file. The plots are created via an ssh call to cep4
where the plots are created in parallel in the docker image.
:param hdf5_path: the full path to the hdf5 file for which we want the plots.
:param otdb_id: the otdb_id of the converted observation/pipeline (is used for logging only)
:return: the full directory path to the directory containing the created plots.
'''
"""
try:
#use default cep4 qa output dir.
plot_dir_path = os.path.join(self.qa_base_dir, 'inspectionplots')
......@@ -202,7 +202,7 @@ class QAService(OTDBBusListener):
def _cluster_h5_file(self, hdf5_path, otdb_id=None):
'''
"""
Try to cluster the baselines based on visibilities in the h5 file
using the clustering docker image developed by e-science.
This method assumes the adder_clustering docker image is available on cep4. If not, or if anything else
......@@ -214,7 +214,7 @@ class QAService(OTDBBusListener):
:param hdf5_path: the full path to the hdf5 file for which we want the plots.
:param otdb_id: the otdb_id of the converted observation/pipeline (is used for logging only)
:return: None
'''
"""
try:
# the command to cluster the given h5 file (executed in the e-science adder docker image)
cmd = ['cluster_this.py', hdf5_path]
......@@ -239,9 +239,9 @@ class QAService(OTDBBusListener):
def main():
'''
"""
Run the qa service program with commandline arguments.
'''
"""
# Check the invocation arguments
parser = OptionParser("%prog [options]",
......
......@@ -46,11 +46,11 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT
# the tests below test is multi threaded (even multi process)
# define a QABusListener-derivative to handle synchronization (set the *_events)
class SynchronizingQABusListener(QABusListener):
'''
"""
the tests below test is multi threaded (even multi process)
this QABusListener-derivative handles synchronization (set the *_events)
and stores the msg_content results for expected result checking
'''
"""
def __init__(self, busname):
super(SynchronizingQABusListener, self).__init__(busname=busname)
self.converted_event = Event()
......@@ -81,15 +81,15 @@ class SynchronizingQABusListener(QABusListener):
class TestQAService(unittest.TestCase):
'''
"""
Tests for the QAService class
'''
"""
def setUp(self):
'''
"""
quite complicated setup to setup test qpid exhanges
and mock away ssh calls to cep4
and mock away dockerized commands
'''
"""
# setup broker connection
self.connection = Connection.establish('127.0.0.1')
self.broker = BrokerAgent(self.connection)
......@@ -168,7 +168,7 @@ class TestQAService(unittest.TestCase):
self.connection.close()
def send_otdb_task_completing_event(self):
'''helper method: create a ToBus and send a completing EventMessage'''
"""helper method: create a ToBus and send a completing EventMessage"""
with ToBus(self.busname) as sender:
msg = EventMessage(context=DEFAULT_OTDB_NOTIFICATION_SUBJECT,
content={"treeID": self.TEST_OTDB_ID,
......@@ -177,12 +177,12 @@ class TestQAService(unittest.TestCase):
sender.send(msg)
def test_01_qa_service_for_expected_behaviour(self):
'''
"""
This test starts a QAService, triggers a test observation completing event,
and tests if the generated h5 file and plots are as expected.
It is an end-to-end test which does not check the intermediate results. It is assumed that
the intermediate steps are tested in other tests/modules.
'''
"""
logger.info(' -- test_01_qa_service_for_expected_behaviour -- ')
......@@ -196,7 +196,7 @@ class TestQAService(unittest.TestCase):
mocked_cmd = [create_test_hypercube_path, '-s 4', '-S 8', '-t 16',
'-o', str(self.TEST_OTDB_ID), self.TEST_H5_PATH]
logger.info('''mocked_wrap_command_for_docker returning mocked command to create test h5 file: '%s', instead of original command: '%s' ''',
logger.info("""mocked_wrap_command_for_docker returning mocked command to create test h5 file: '%s', instead of original command: '%s' """,
' '.join(mocked_cmd), ' '.join(cmd))
return mocked_cmd
......@@ -204,11 +204,11 @@ class TestQAService(unittest.TestCase):
# replace the cluster command which runs normally in the docker container
# by a call to bash true, so the 'cluster_this' call returns 0 exit code
mocked_cmd = ['true']
logger.info('''mocked_wrap_command_for_docker returning mocked command: '%s', instead of original command: '%s' ''',
logger.info("""mocked_wrap_command_for_docker returning mocked command: '%s', instead of original command: '%s' """,
' '.join(mocked_cmd), ' '.join(cmd))
return mocked_cmd
logger.info('''mocked_wrap_command_for_docker returning original command: '%s' ''', ' '.join(cmd))
logger.info("""mocked_wrap_command_for_docker returning original command: '%s' """, ' '.join(cmd))
return cmd
self.wrap_command_for_docker_mock.side_effect = mocked_wrap_command_for_docker
......@@ -294,12 +294,12 @@ class TestQAService(unittest.TestCase):
self.ssh_cmd_list_mock.assert_not_called()
def test_02_qa_service_for_error_in_ms2hdf5(self):
'''
"""
This test starts a QAService, triggers a test observation completing event,
and tests if the conversion from MS to hdf5 fails (by intention).
It is an end-to-end test which does not check the intermediate results. It is assumed that
the intermediate steps are tested in other tests/modules.
'''
"""
logger.info(' -- test_02_qa_service_for_error_in_ms2hdf5 -- ')
......@@ -342,13 +342,13 @@ class TestQAService(unittest.TestCase):
self.ssh_cmd_list_mock.assert_not_called()
def test_03_qa_service_for_error_in_creating_plots(self):
'''
"""
This test starts a QAService, triggers a test observation completing event,
and tests if the conversion from MS to hdf5 works,
but the plot generation fails (by intention).
It is an end-to-end test which does not check the intermediate results. It is assumed that
the intermediate steps are tested in other tests/modules.
'''
"""
logger.info(' -- test_03_qa_service_for_error_in_creating_plots -- ')
......@@ -420,12 +420,12 @@ class TestQAService(unittest.TestCase):
self.ssh_cmd_list_mock.assert_not_called()
def test_04_qa_service_for_error_ssh(self):
'''
"""
This test starts a QAService, triggers a test observation completing event,
and tests if conversion fails due to an intentionally failing (mocked) ssh call.
It is an end-to-end test which does not check the intermediate results. It is assumed that
the intermediate steps are tested in other tests/modules.
'''
"""
logger.info(' -- test_04_qa_service_for_error_ssh -- ')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment