From 90b3864777626475681ecdd4eeb1617bb500e2dc Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Tue, 11 Dec 2018 09:28:12 +0000 Subject: [PATCH] SW-540: moved ssh_cmd_list method into seperate module ssh_utils.py. Added module lcu_utils.py with wrappers around lcurun and the stations.txt file and fetching station calibration tables. --- .gitattributes | 2 + LCS/PyCommon/CMakeLists.txt | 2 + LCS/PyCommon/cep4_utils.py | 14 +- LCS/PyCommon/lcu_utils.py | 267 ++++++++++++++++++++++++++++++++++++ LCS/PyCommon/ssh_utils.py | 33 +++++ 5 files changed, 305 insertions(+), 13 deletions(-) create mode 100644 LCS/PyCommon/lcu_utils.py create mode 100644 LCS/PyCommon/ssh_utils.py diff --git a/.gitattributes b/.gitattributes index dd093894668..83972e2e881 100644 --- a/.gitattributes +++ b/.gitattributes @@ -1684,8 +1684,10 @@ LCS/PyCommon/datetimeutils.py -text LCS/PyCommon/defaultmailaddresses.py -text LCS/PyCommon/factory.py -text LCS/PyCommon/flask_utils.py -text +LCS/PyCommon/lcu_utils.py -text LCS/PyCommon/math.py -text LCS/PyCommon/postgres.py -text +LCS/PyCommon/ssh_utils.py -text LCS/PyCommon/subprocess_utils.py -text LCS/PyCommon/test/python-coverage.sh eol=lf LCS/PyCommon/test/t_cache.py -text diff --git a/LCS/PyCommon/CMakeLists.txt b/LCS/PyCommon/CMakeLists.txt index 341ef071885..8358a8c5709 100644 --- a/LCS/PyCommon/CMakeLists.txt +++ b/LCS/PyCommon/CMakeLists.txt @@ -10,7 +10,9 @@ find_python_module(psycopg2) set(_py_files __init__.py + ssh_utils.py cep4_utils.py + lcu_utils.py cache.py dbcredentials.py defaultmailaddresses.py diff --git a/LCS/PyCommon/cep4_utils.py b/LCS/PyCommon/cep4_utils.py index c0e26c8ac96..da4ad5cb47d 100644 --- a/LCS/PyCommon/cep4_utils.py +++ b/LCS/PyCommon/cep4_utils.py @@ -15,6 +15,7 @@ # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. +from .ssh_utils import ssh_cmd_list from subprocess import check_output, Popen, PIPE from random import randint import math @@ -25,19 +26,6 @@ from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) - -def ssh_cmd_list(host): - ''' - returns a subprocess compliant command list to do an ssh call to the given node - uses ssh option -tt to force remote pseudo terminal - uses ssh option -q for ssh quiet mode (no ssh warnings/errors) - uses ssh option -o StrictHostKeyChecking=no to prevent prompts about host keys - :param host: the node name or ip address - :return: a subprocess compliant command list - ''' - return ['ssh', '-T', '-q', '-o StrictHostKeyChecking=no', host] - - def wrap_command_in_cep4_head_node_ssh_call(cmd): '''wrap the command in an ssh call to head.cep4 :param list cmd: a subprocess cmd list diff --git a/LCS/PyCommon/lcu_utils.py b/LCS/PyCommon/lcu_utils.py new file mode 100644 index 00000000000..ae7c88de51d --- /dev/null +++ b/LCS/PyCommon/lcu_utils.py @@ -0,0 +1,267 @@ +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +from lofar.common.ssh_utils import ssh_cmd_list +from subprocess import Popen, PIPE + +import os +import time +import uuid +import struct + +import logging +logger = logging.getLogger(__name__) + +class LCURuntimeError(RuntimeError): + pass + +def wrap_command_in_lcu_head_node_ssh_call(cmd): + '''wrap the command in an ssh call to head.cep4 + :param list cmd: a subprocess cmd list + cpu node. Otherwise, the command is executed on the head node. + :return: the same subprocess cmd list, but then wrapped with cep4 ssh calls + ''' + return ssh_cmd_list('lcuhead.control.lofar', 'lofarsys') + cmd + +def wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True): + '''wrap the command in an ssh call the given station lcu node (via lcuhead) + :param list cmd: a subprocess cmd list + :param string station: the station name or lcu hostname (a station name is automagically converted to the lcu hostname) + :param bool via_head: when True, route the cmd first via the lcuhead node + :return: the same subprocess cmd list, but then wrapped with lcu ssh calls + ''' + remote_cmd = ssh_cmd_list(stationname2hostname(station), 'lofarsys') + cmd + if via_head: + return wrap_command_in_lcu_head_node_ssh_call(remote_cmd) + + return remote_cmd + +def get_current_stations(station_group='today', as_host_names=True): + ''' + Wrapper function around the amazing lcurun and stations.txt operators system. + Get a list of the currently used station names, either as hostname, or as parset-like station name (default) + :param station_group - string: one of the predefined operator station groups, like: 'today', 'today_nl', 'core', etc. Defaults to 'today' which means all active stations. + :param as_host_names - bool: return the station names as ssh-able hostnames if True (like cs001c, cs002c). return the station names as parset-like VirtualInstrument.stationList names if False (like CS001, CS002). + :return: the station names for the given station_group as ssh-able hostnames if as_host_names=True (like cs001c, cs002c) or as parset-like VirtualInstrument.stationList names if as_host_names=False (like CS001, CS002). + ''' + cmd = ['cat', '/opt/operations/bin/stations.txt'] + cmd = wrap_command_in_lcu_head_node_ssh_call(cmd) + logger.debug('executing cmd: %s', ' '.join(cmd)) + proc = Popen(cmd, stdout=PIPE, stderr=PIPE) + out, err = proc.communicate() + + if proc.returncode != 0: + raise LCURuntimeError("Could not fetch stations.txt file. sdterr=%s" % (err, )) + + station_file_lines = out.splitlines(False) + station_group_filter = station_group.strip()+' ' + station_group_line = next(l for l in station_file_lines if l.startswith(station_group_filter)) + station_aliases = station_group_line.split(' ')[-1].split(',') + station_hostnames = [] + for station_alias in station_aliases: + # the alias mapping is at the top of the file, so the first matching line holds the mapping + station_alias_line = next(l for l in station_file_lines if station_alias in l) + station_hostname = station_alias_line.split()[0].strip() + station_hostnames.append(station_hostname) + + if as_host_names: + logger.info("stations in group '%s': %s", station_group, ' '.join(station_hostnames)) + return station_hostnames + + station_names = [hostname2stationname(x) for x in station_hostnames] + logger.info("stations in group '%s': %s", station_group, ' '.join(station_names)) + return station_names + +def stationname2hostname(station_name): + '''Convert a parset-like station name to a lcu hostname, like CS001 to cs001c''' + # assume a hostname is encoded as stationname in lowercase with a c appended, like cs001c for CS001 + if not station_name.islower() or not (station_name.endswith('c') or station_name.endswith('control.lofar')): + return station_name.lower().strip() + 'c' + + #assume given station_name is already in the form of an lcu hostname + return station_name + +def hostname2stationname(station_hostname): + '''Convert a lcu hostname to a parset-like station name , like cs001c or cs001c.control.lofar to CS001''' + # assume a hostname is encoded as stationname in lowercase with a c appended, like cs001c for CS001 + return station_name.split('.')[0].strip()[-1].upper() + +def get_stations_rcu_mode(stations=None): + ''' + Get the current rcu mode of a station. + :param stations - string or list of strings: 1 or more station names, or lcu hostnames + :return: dict with station rcu mode integer pairs + ''' + + if stations == None: + stations = get_current_stations(as_host_names=True) + elif isinstance(stations, basestring): + stations = [stations] + + procs = {} + for station in stations: + cmd = ["rspctl", "--rcu | grep ON | awk '{ print $5 }' | grep mode | cut -c 6-6 | sort -u | head -n 1"] + cmd = wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True) + logger.debug('executing cmd: %s', ' '.join(cmd)) + proc = Popen(cmd, stdout=PIPE, stderr=PIPE) + procs[station] = proc + + result = {} + for station, proc in procs.items(): + out, err = proc.communicate() + + if proc.returncode != 0: + logger.warning("Could not determine rcu mode for station %s. sdterr=%s" % (station, err)) + + try: + rcu_mode = int(out.strip()) + logger.debug('station %s is in rcumode=%s', station, rcu_mode) + result[station] = rcu_mode + except ValueError: + logger.warning("Could not determine rcu mode for station %s. sdterr=%s" % (station, err)) + + return result + +def get_station_calibration_tables(stations=None, antenna_set_and_filter=None): + ''' + Get a dict of stationname:caltable pairs for the given list of stations (or all current stations if None given) + :param stations - string or list of strings: station name(s) or lcu hostname(s) + :param antenna_set_and_filter - string: the antenna_set name and filter'name' from the parset like: LBA_INNER-10_90, or HBA-170_230 etc. If None, then the current rcu_mode is retreived and the caltables for the current rcu mode are returned + :return: dict like {<station_name>: tuple(calibration_header_dict, numpy complex array)} + ''' + if stations == None: + stations = get_current_stations(as_host_names=True) + elif isinstance(stations, basestring): + stations = [stations] + + caltable_files = {} + caltable_procs = {} + caltable_postfixes = {} + + # caltable files have either a LBA_INNER-10_90, or HBA-170_230 etc postfix, or a mode<nr> postfix + # these are essentially the same, and on the lcu's the files are equal (in fact they are symlinked.) + # So, depending on the knowledge of the requester, one eiter asks for an 'antenna_set_and_filter' postfix, or an 'rcu_mode' postfix, + # but the result is the same. + if antenna_set_and_filter: + for station in stations: + caltable_postfixes[station] = '-%s' % (antenna_set_and_filter,) + logger.info('fetching calibration table(s) for %s for stations %s', antenna_set_and_filter, ' '.join(stations)) + else: + rcu_modes = get_stations_rcu_mode(stations) + for station, rcu_mode in rcu_modes.items(): # only loop over stations which have valid rcu_mode + caltable_postfixes[station] = '_mode%s' % (rcu_mode,) + + logger.info('fetching calibration table(s) for rcu mode(s) %s for stations %s', ' '.join(sorted(list(set(rcu_modes.values())))), ' '.join(sorted(rcu_modes.keys()))) + try: + for station, postfix in caltable_postfixes.items(): + # fetch the caltable without intermediate saves to disk using multiple ssh's and pipes. + # write the result in a local temp file for further processing. + # local temp files are removed at end. + tmpfilename = '/tmp/caltable_%s_%s_%s' % (postfix, station, uuid.uuid4()) + caltable_files[station] = tmpfilename + logger.debug('writing caltable for station %s in file %s', station, tmpfilename) + cmd = ['cat', '''/opt/lofar/etc/CalTable%s.dat''' % (postfix, )] + cmd = wrap_command_in_lcu_station_ssh_call(cmd, station, via_head=True) + logger.debug('executing cmd: %s', ' '.join(cmd)) + tmpfile = open(tmpfilename, 'wb') + proc = Popen(cmd, stdout=tmpfile, stderr=PIPE, close_fds=True) + caltable_procs[station] = proc + + # wait for all fetching procs to finish... + #TODO: add timeout? + for station, proc in caltable_procs.items(): + out, err = proc.communicate() + if proc.returncode != 0: + logger.warning("Could not fetch calibration table for station %s. stderr=%s", station, err) + + # gather results... + caltables = {} + # for each station, parse temp file + for station, filename in caltable_files.items(): + try: + # store header and complex table in result dict + caltables[station] = read_station_calibration_file(filename) + except Exception as e: + # log exception anbd just continue with next station caltable file + logger.error("error while parsing calibration file for station %s: %s", station, e) + + finally: + # cleanup all temp files + for filename in caltable_files.values(): + try: + logger.debug('deleting local intermediate caltable file %s', tmpfilename) + os.remove(filename) + except OSError: + pass + + logger.info('fetched calibration table(s) for stations %s', ' '.join(sorted(caltables.keys()))) + + return caltables + +def read_station_calibration_file(filename): + ''' + read and parse a station calibration file and return a tuple of the header and a numpy complex array of cal-values + ''' + try: + # import numpy here and not at top of file, so anybody can use lcu_utils methods, but is not forced into having numpy. + import numpy as np + except ImportError: + raise LCURuntimeError("Cannot interpret station calibration tables without numpy. Please install numpy.") + + with open(filename, 'rb') as tmpfile: + rawdata = tmpfile.read() + + # read header and convert it to a key-value dict + HEADER_END = 'HeaderStop\n' #assume unix line endings + header_end_idx = rawdata.find(HEADER_END) + len(HEADER_END) + header_dict = {} + for line in rawdata[:header_end_idx].splitlines(): + if '=' in line: + items = line.partition('=') + header_dict[items[0].strip()] = items[2].strip() + + # magic numbers (stolen from MAC/APL/PAC/ITRFBeamServer/src/StatCal.cc ) + NUMBER_OF_SUBBANDS = 512 + COMPLEX_SIZE = 2 # a complex is two doubles + + header_station = header_dict.get('CalTableHeader.Observation.Station') + if header_station.startswith('CS') or header_station.startswith('RS'): + NUMBER_OF_ANTENNA = 96 + else: + NUMBER_OF_ANTENNA = 192 #international + + # using the magic numbers and knowledge from MAC/APL/PAC/ITRFBeamServer/src/StatCal.cc + # interpret the byte array as list of doubles, + # convert to numpy array, + # convert to complex, and transpose for correct axes (antenna first, then frequency) + fmt = '%dd' % (NUMBER_OF_ANTENNA * NUMBER_OF_SUBBANDS * COMPLEX_SIZE) + data = np.array(struct.unpack(fmt, rawdata[header_end_idx:header_end_idx+struct.calcsize(fmt)])) + data.resize(NUMBER_OF_SUBBANDS, NUMBER_OF_ANTENNA, COMPLEX_SIZE) + + complexdata = np.empty(shape=(NUMBER_OF_SUBBANDS, NUMBER_OF_ANTENNA), dtype=complex) + complexdata.real = data[:, :, 0] + complexdata.imag = data[:, :, 1] + complexdata = complexdata.transpose() + + # return tuple of header and complex table in result dict + return (header_dict, complexdata) + + + +if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG) + print get_station_calibration_tables(['CS001', 'RS407'], antenna_set_and_filter='LBA_INNER-10_90') #['CS001', 'DE601']) diff --git a/LCS/PyCommon/ssh_utils.py b/LCS/PyCommon/ssh_utils.py new file mode 100644 index 00000000000..2a14103b85d --- /dev/null +++ b/LCS/PyCommon/ssh_utils.py @@ -0,0 +1,33 @@ +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. + +import logging +logger = logging.getLogger(__name__) + +def ssh_cmd_list(host, user='lofarsys'): + ''' + returns a subprocess compliant command list to do an ssh call to the given node + uses ssh option -T to disable remote pseudo terminal + uses ssh option -q for ssh quiet mode (no ssh warnings/errors) + uses ssh option -o StrictHostKeyChecking=no to prevent prompts about host keys + :param host: the node name or ip address + :param user: optional username, defaults to 'lofarsys' + :return: a subprocess compliant command list + ''' + return ['ssh', '-T', '-q', '-o StrictHostKeyChecking=no', user+'@'+host] + + -- GitLab