diff --git a/.gitattributes b/.gitattributes index 1d366b15371ad6e867b0df30b1a7c15f83842ce2..2bd26e8c763eac5f7c38d958b5e292523caf0b75 100644 --- a/.gitattributes +++ b/.gitattributes @@ -5350,6 +5350,7 @@ SubSystems/Dragnet/scripts/casacore-measures-tables/get_casacore_measures_data.s SubSystems/LCU_MAC/doc/package.dox -text SubSystems/LTAIngest/CMakeLists.txt -text SubSystems/LTAIngest/doc/package.dox -text +SubSystems/LTAIngestTransfer/CMakeLists.txt -text SubSystems/MCU_MAC/doc/package.dox -text SubSystems/Offline/doc/package.dox -text SubSystems/Online_Cobalt/doc/package.dox -text diff --git a/CEP/Pipeline/framework/lofarpipe/cuisine/cook.py b/CEP/Pipeline/framework/lofarpipe/cuisine/cook.py index 879eeade325d07488534b1ad8d5c33dad4c734a0..cf25121a1a5e39dbdae6f0c9bccf57f37dd0b6de 100644 --- a/CEP/Pipeline/framework/lofarpipe/cuisine/cook.py +++ b/CEP/Pipeline/framework/lofarpipe/cuisine/cook.py @@ -1,5 +1,5 @@ #from message import ErrorLevel, NotifyLevel, VerboseLevel, DebugLevel -import time, os, select, sys, logging, imp +import time, os, select, sys, logging, importlib from lofarpipe.support.pipelinelogging import getSearchingLogger class CookError(Exception): @@ -24,13 +24,13 @@ class PipelineCook(WSRTCook): def __init__(self, task, inputs, outputs, logger, recipe_path): super(PipelineCook, self).__init__(task, inputs, outputs, logger) # Ensures the recipe to be run can be imported from the recipe path + self.logger.warn("Ignoring this recipe_path: " + str(recipe_path)) try: try: - module_details = imp.find_module(task, recipe_path) + module = importlib.import_module("lofarpipe.recipes.master." + task) except ImportError: # ...also support lower-cased file names. - module_details = imp.find_module(task.lower(), recipe_path) - module = imp.load_module(task, *module_details) + module = importlib.import_module("lofarpipe.recipes.master." + task.lower()) self.recipe = None try: self.recipe = getattr(module, task)() diff --git a/CEP/Pipeline/framework/lofarpipe/support/control.py b/CEP/Pipeline/framework/lofarpipe/support/control.py index 3a7061cddb515ddc0bb6c74f141602219afd6d08..cb243a6e2eab28ca429f745304419c1da394b640 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/control.py +++ b/CEP/Pipeline/framework/lofarpipe/support/control.py @@ -163,9 +163,10 @@ class control(StatefulRecipe): self.logger.error("Detailed exception information:") self.logger.error(str(type)) self.logger.error(str(value)) + self.logger.exception(message) # Get the stacktrace and pretty print it: - # self.logger.error("\n" + " ".join(traceback.format_list( - # traceback.extract_tb(traceback_object)))) + self.logger.error("\n" + " ".join(traceback.format_list( + traceback.extract_tb(traceback_object)))) self.logger.error("*******************************************") @@ -184,7 +185,7 @@ class control(StatefulRecipe): xmlfile = self.config.get("logging", "xml_stat_file") try: fp = open(xmlfile, "w") - fp.write(get_active_stack(self).toxml(encoding='ascii')) + fp.write(get_active_stack(self).toxml(encoding='ascii').decode('ascii')) fp.close() except Exception as except_object: self.logger.error("Failed opening xml stat file:") diff --git a/CEP/Pipeline/framework/lofarpipe/support/jobserver.py b/CEP/Pipeline/framework/lofarpipe/support/jobserver.py index b53682ca0fa24a872b09eb3e3208b1b454c80e39..28dbf8044a6de897dfaa4dc5ab290ba4b177def2 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/jobserver.py +++ b/CEP/Pipeline/framework/lofarpipe/support/jobserver.py @@ -53,18 +53,26 @@ class JobStreamHandler(socketserver.StreamRequestHandler): # Read message chunk = socket_recv(self.request, slen) try: - input_msg = chunk.split(" ", 2) + input_msg = chunk.split(b" ", 2) + try: + command = input_msg[0].decode() + except UnicodeDecodeError: + # This will be a log record + command = "" # Can we handle this message type? - if input_msg[0] == "GET": - self.send_arguments(int(input_msg[1])) - elif input_msg[0] == "PUT": - self.read_results(input_msg[1], input_msg[2]) + if command == "GET": + job_id = input_msg[1].decode() + self.send_arguments(job_id) + elif command == "PUT": + job_id = input_msg[1].decode() + self.read_results(job_id, input_msg[2]) else: self.handle_log_record(chunk) - except: + except Exception as e: # Otherwise, fail. self.server.error.set() + self.server.logger.exception(e) self.server.logger.error("Protocol error; received %s" % chunk) self.server.logger.error("Aborting.") diff --git a/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py b/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py index 296bf216b71e9d6abe55a6968546739c247cb7e1..28b244d02f5dfa87145c064bb6c6c753d484a682 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py +++ b/CEP/Pipeline/framework/lofarpipe/support/lofaringredient.py @@ -228,6 +228,10 @@ class LOFARingredient(MutableMapping): for field in self._fields: yield field + def __delitem__(self, key): + del self._values[key] + del self._fields[key] + def __getitem__(self, key): # If we don't have the value for this key, but we do have a field with # a valid default, return that. diff --git a/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py b/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py index 916856414bd443959c1cf0d8ef08f169223a7789..e01cd6f6f7869fe2489c4a8ad30f20db0a8b622d 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py +++ b/CEP/Pipeline/framework/lofarpipe/support/lofarnode.py @@ -134,8 +134,8 @@ class LOFARnodeTCP(LOFARnode): self.__try_connect(s) # send request - message = "GET %d" % self.job_id - s.sendall(struct.pack(">L", len(message)) + message.encode("ascii")) + message = ("GET %d" % self.job_id).encode() + s.sendall(struct.pack(">L", len(message)) + message) # receive response length chunk = socket_recv(s, 4) @@ -165,8 +165,8 @@ class LOFARnodeTCP(LOFARnode): Send the contents of self.outputs to the originating job dispatch server. """ - message = "PUT %d %s" % (self.job_id, pickle.dumps(self.outputs)) + message = ("PUT %d " % (self.job_id,)).encode() + pickle.dumps(self.outputs) s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.__try_connect(s) - s.sendall(struct.pack(">L", len(message)) + message.encode("ascii")) + s.sendall(struct.pack(">L", len(message)) + message) diff --git a/CEP/Pipeline/framework/lofarpipe/support/loggingdecorators.py b/CEP/Pipeline/framework/lofarpipe/support/loggingdecorators.py index 020226a33aad0e6c5a01d46c92e11ecd2c0b426a..4465f4e675f5cf961afd3cb8b9a7b68ad1e6d294 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/loggingdecorators.py +++ b/CEP/Pipeline/framework/lofarpipe/support/loggingdecorators.py @@ -153,7 +153,7 @@ def mail_log_on_exception(target): "duration", duration_recipe) simplyfied_pipeline_xml = strip_xml_to_master_details( stack, calling_object.logger) - msg_string = simplyfied_pipeline_xml.toprettyxml(encoding='ascii') + msg_string = simplyfied_pipeline_xml.toprettyxml(encoding='ascii').decode('ascii') else: msg_string = "duration: {0} \n "\ @@ -186,7 +186,7 @@ def _send_mail_notification(calling_object, message): stack, calling_object.logger) active_stack_data = simplyfied_pipeline_xml.toprettyxml( - encoding='ascii') + encoding='ascii').decode('ascii') except: pass diff --git a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py index e4c6e5431e303c8ce319b4a5c603e07520f3b187..7dc62d9353f6886efe3c818757ef873176155536 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py +++ b/CEP/Pipeline/framework/lofarpipe/support/remotecommand.py @@ -183,7 +183,7 @@ def run_via_custom_cmdline(logger, host, command, environment, arguments, config if len(args) == 1: return args[0] - return args[1] if args[0] == "python" else args[0] + return args[1] if args[0].startswith("python") else args[0] # Construct the full command line full_command_line = config.get('remote', 'cmdline').format( @@ -623,6 +623,6 @@ class RemoteCommandRecipeMixIn(object): # output does not matter self.outputs._fields["return_xml"] = ingredient.StringField( help = "XML return data.") - self.outputs["return_xml"] = node_durations.toxml(encoding = "ascii") + self.outputs["return_xml"] = node_durations.toxml(encoding = "ascii").decode('ascii') return jobpool diff --git a/CEP/Pipeline/framework/lofarpipe/support/stateful.py b/CEP/Pipeline/framework/lofarpipe/support/stateful.py index c4db7926fa23d3b5529db4d655a648899855bdfb..ffd7149fd4a3d9e1a2f81d49e9413d8a8c3a68b9 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/stateful.py +++ b/CEP/Pipeline/framework/lofarpipe/support/stateful.py @@ -83,7 +83,7 @@ class StatefulRecipe(BaseRecipe): self.config.get('layout', 'job_directory'), 'statefile' ), - 'w') + 'bw') state = [self.inputs, self.state] pickle.dump(state, statefile) diff --git a/CEP/Pipeline/framework/lofarpipe/support/usagestats.py b/CEP/Pipeline/framework/lofarpipe/support/usagestats.py index d3b83646b0d932ffefe5538aa433b4e80cd8ca3c..2939a1cb0908eb8e983e58096dde0d829ee83ef7 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/usagestats.py +++ b/CEP/Pipeline/framework/lofarpipe/support/usagestats.py @@ -195,7 +195,7 @@ class UsageStats(threading.Thread): if not self.pid_stats: # if there are no entries in the stats dict resource_stat_xml.setAttribute("noStatsRecorded", "true") - return resource_stat_xml.toxml(encoding = "ascii") + return resource_stat_xml.toxml(encoding = "ascii").decode('ascii') try: # TODO: The returned values are not in order and the owner PID @@ -226,7 +226,7 @@ class UsageStats(threading.Thread): self.logger.warn("monitoring statistic recording failed") resource_stat_xml.setAttribute("noStatsRecorded", "Exception") # TODO: coalesce these two returns in one "finally:" - return resource_stat_xml.toxml(encoding = "ascii") + return resource_stat_xml.toxml(encoding = "ascii").decode('ascii') - return resource_stat_xml.toxml(encoding = "ascii") + return resource_stat_xml.toxml(encoding = "ascii").decode('ascii') diff --git a/CEP/Pipeline/framework/lofarpipe/support/utilities.py b/CEP/Pipeline/framework/lofarpipe/support/utilities.py index 3bd883d834541f01fa650020051cbc6231591499..fcc57619ef6c7de636978acb3bd65b6156d41ec7 100644 --- a/CEP/Pipeline/framework/lofarpipe/support/utilities.py +++ b/CEP/Pipeline/framework/lofarpipe/support/utilities.py @@ -12,6 +12,7 @@ from itertools import islice, repeat, chain from contextlib import closing, contextmanager from random import randint +from lofar.common.subprocess_utils import communicate_returning_strings import warnings import os @@ -93,7 +94,7 @@ def disk_usage(*paths): """ cmd = ['du', '-s', '-b'] proc = subprocess.Popen(cmd + list(paths), stdout = subprocess.PIPE) - sout = proc.communicate()[0] + sout = communicate_returning_strings(proc)[0] if sout: return sum([int(s.split('\t')[0]) for s in sout.strip().split('\n')]) else: @@ -204,7 +205,7 @@ def read_initscript(logger, filename, shell = "/bin/sh"): stderr = subprocess.PIPE, close_fds = True ) - so, se = p.communicate() + so, se = communicate_returning_strings(p) environment = [x.split('=', 1) for x in so.strip().split('\n')] environment = [x for x in environment if len(x) == 2] return dict(environment) @@ -281,7 +282,7 @@ def catch_segfaults(cmd, cwd, env, logger, max = 1, cleanup = lambda: None, while process.returncode is None: process.poll() time.sleep(1) - sout, serr = process.communicate() + sout, serr = communicate_returning_strings(process) log_process_output(cmd[0], sout, serr, logger) if process.returncode == 0: break @@ -306,7 +307,7 @@ def socket_recv(socket, numbytes): Raises IOError if connection has closed before all data could be read. """ - data = "" + data = b"" while numbytes > 0: try: chunk = socket.recv(numbytes) diff --git a/CEP/Pipeline/recipes/sip/master/bbs_reducer.py b/CEP/Pipeline/recipes/sip/master/bbs_reducer.py index 25bab0248d2594ef79d032b65076c6daa5f51621..ef2dfd99f571c522f53a3ac1d6ad741c2843e4a2 100644 --- a/CEP/Pipeline/recipes/sip/master/bbs_reducer.py +++ b/CEP/Pipeline/recipes/sip/master/bbs_reducer.py @@ -107,7 +107,7 @@ class bbs_reducer(BaseRecipe, RemoteCommandRecipeMixIn): """ Create and schedule the compute jobs """ - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) self.data_map.iterator = DataMap.SkipIterator self.inst_map.iterator = DataMap.SkipIterator self.sky_map.iterator = DataMap.SkipIterator diff --git a/CEP/Pipeline/recipes/sip/master/dppp.py b/CEP/Pipeline/recipes/sip/master/dppp.py index 4c109f7f3008d9b65b755324fdc2a4741f7ac642..d3841241160bd5666f527296aa740dfb4bf8dc37 100644 --- a/CEP/Pipeline/recipes/sip/master/dppp.py +++ b/CEP/Pipeline/recipes/sip/master/dppp.py @@ -212,7 +212,7 @@ class dppp(BaseRecipe, RemoteCommandRecipeMixIn): # ******************************************************************** # 3. Call the node side of the recipe # Create and schedule the compute jobs - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) indata.iterator = outdata.iterator = DataMap.SkipIterator parmdbdata.iterator = sourcedbdata.iterator = DataMap.SkipIterator jobs = [] diff --git a/CEP/Pipeline/recipes/sip/master/executable_args.py b/CEP/Pipeline/recipes/sip/master/executable_args.py index 47f6a7d0e672beff9e331485003c4b732cfd262f..212eba4cab07a9faa4a65345479e72567ba88a2f 100644 --- a/CEP/Pipeline/recipes/sip/master/executable_args.py +++ b/CEP/Pipeline/recipes/sip/master/executable_args.py @@ -327,17 +327,17 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): # ******************************************************************** # Call the node side of the recipe # Create and schedule the compute jobs - #command = "python %s" % (self.__file__.replace('master', 'nodes')).replace('executable_args', self.inputs['nodescript']) + #command = "python3 %s" % (self.__file__.replace('master', 'nodes')).replace('executable_args', self.inputs['nodescript']) recipe_dir_str = str(self.config.get('DEFAULT', 'recipe_directories')) recipe_directories = recipe_dir_str.rstrip(']').lstrip('[').split(',') pylist = os.getenv('PYTHONPATH').split(':') command = None for pl in pylist: if os.path.isfile(os.path.join(pl,'lofarpipe/recipes/nodes/'+self.inputs['nodescript']+'.py')): - command = "python %s" % os.path.join(pl,'lofarpipe/recipes/nodes/'+self.inputs['nodescript']+'.py') + command = "python3 %s" % os.path.join(pl,'lofarpipe/recipes/nodes/'+self.inputs['nodescript']+'.py') for pl in recipe_directories: if os.path.isfile(os.path.join(pl,'nodes/'+self.inputs['nodescript']+'.py')): - command = "python %s" % os.path.join(pl,'nodes/'+self.inputs['nodescript']+'.py') + command = "python3 %s" % os.path.join(pl,'nodes/'+self.inputs['nodescript']+'.py') inputmapfiles[0].iterator = outputmapfiles[0].iterator = DataMap.SkipIterator jobs = [] diff --git a/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py b/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py index 0e025df417c99703cea4fd9aaeec317a6fc139da..cb4aca8bc30b7b5b4b072a208662124175e9c1ca 100644 --- a/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py +++ b/CEP/Pipeline/recipes/sip/master/gainoutliercorrection.py @@ -122,7 +122,7 @@ class gainoutliercorrection(BaseRecipe, RemoteCommandRecipeMixIn): # ******************************************************************** # 3. Call node side of the recipe - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) indata.iterator = outdata.iterator = DataMap.SkipIterator jobs = [] for inp, outp in zip(indata, outdata): diff --git a/CEP/Pipeline/recipes/sip/master/get_metadata.py b/CEP/Pipeline/recipes/sip/master/get_metadata.py index afdb4429bba9d9625b666e4999b53fdbbd2adb3d..73554b6c3dbef33b3e6418a0f32077ffdff3f6a2 100644 --- a/CEP/Pipeline/recipes/sip/master/get_metadata.py +++ b/CEP/Pipeline/recipes/sip/master/get_metadata.py @@ -78,7 +78,7 @@ class get_metadata(BaseRecipe, RemoteCommandRecipeMixIn): # ******************************************************************** # 3. call node side of the recipe - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) data.iterator = DataMap.SkipIterator jobs = [] for inp in data: diff --git a/CEP/Pipeline/recipes/sip/master/imager_awimager.py b/CEP/Pipeline/recipes/sip/master/imager_awimager.py index b1b2365feab84ca29d7fe8b7628cab43ba5594af..2afebe04f23c964bdfacb44ad7943ffdf166c283 100644 --- a/CEP/Pipeline/recipes/sip/master/imager_awimager.py +++ b/CEP/Pipeline/recipes/sip/master/imager_awimager.py @@ -114,7 +114,7 @@ class imager_awimager(BaseRecipe, RemoteCommandRecipeMixIn): # ********************************************************************* # 2. Start the node side of the awimager recipe # Compile the command to be executed on the remote machine - node_command = "python %s" % (self.__file__.replace("master", "nodes")) + node_command = "python3 %s" % (self.__file__.replace("master", "nodes")) jobs = [] output_map = copy.deepcopy(input_map) diff --git a/CEP/Pipeline/recipes/sip/master/new_bbs.py b/CEP/Pipeline/recipes/sip/master/new_bbs.py index d0162ab1f36c3af11a6153085dc534b13318a3e1..829878bfbc16872f8a6d6ac83dae476ed5c87202 100644 --- a/CEP/Pipeline/recipes/sip/master/new_bbs.py +++ b/CEP/Pipeline/recipes/sip/master/new_bbs.py @@ -274,7 +274,7 @@ class new_bbs(BaseRecipe): # ComputeJob.dispatch method supplies, so we'll control them # with our own threads. # -------------------------------------------------------------- - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) jobpool = {} bbs_kernels = [] with job_server(self.logger, jobpool, self.error) as(jobhost, diff --git a/CEP/Pipeline/recipes/sip/master/rficonsole.py b/CEP/Pipeline/recipes/sip/master/rficonsole.py index 88d94ab038e15a3854dfe1c386ab2cfc0c1bae8b..1196fe3e4ce479b5872c4a166c1ef23a9de66df4 100644 --- a/CEP/Pipeline/recipes/sip/master/rficonsole.py +++ b/CEP/Pipeline/recipes/sip/master/rficonsole.py @@ -98,7 +98,7 @@ class rficonsole(BaseRecipe, RemoteCommandRecipeMixIn): else: strategy = None - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) jobs = [] for host, file_lists in hostlist.items(): for file_list in file_lists: diff --git a/CEP/Pipeline/recipes/sip/master/selfcal_awimager.py b/CEP/Pipeline/recipes/sip/master/selfcal_awimager.py index 961328db79bc90bcfc1ffc70bc2e6be617a8f380..a7d8cf826d0f362f4cf2a648691d4fa2ac6afa9a 100644 --- a/CEP/Pipeline/recipes/sip/master/selfcal_awimager.py +++ b/CEP/Pipeline/recipes/sip/master/selfcal_awimager.py @@ -124,7 +124,7 @@ class selfcal_awimager(BaseRecipe, RemoteCommandRecipeMixIn): # ********************************************************************* # 2. Start the node side of the awimager recipe # Compile the command to be executed on the remote machine - node_command = "python %s" % (self.__file__.replace("master", "nodes")) + node_command = "python3 %s" % (self.__file__.replace("master", "nodes")) jobs = [] output_map = copy.deepcopy(input_map) diff --git a/CEP/Pipeline/recipes/sip/master/setupparmdb.py b/CEP/Pipeline/recipes/sip/master/setupparmdb.py index 1736f0547e0ad8ba53788c57abd26619d0bae641..acb7e33ee09d449051d871da8ed4d7beb9087176 100644 --- a/CEP/Pipeline/recipes/sip/master/setupparmdb.py +++ b/CEP/Pipeline/recipes/sip/master/setupparmdb.py @@ -104,7 +104,7 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): stdout=subprocess.PIPE, stderr=subprocess.PIPE ) - sout, serr = communicate_returning_strings(input=(template % pdbfile)) + sout, serr = communicate_returning_strings(parmdbm_process, input=(template % pdbfile).encode()) log_process_output("parmdbm", sout, serr, self.logger) except OSError as err: self.logger.error("Failed to spawn parmdbm: %s" % str(err)) @@ -138,7 +138,7 @@ class setupparmdb(BaseRecipe, RemoteCommandRecipeMixIn): os.path.basename(item.file) + self.inputs['suffix'] ) # Call the node side - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) outdata.iterator = DataMap.SkipIterator jobs = [] for outp in outdata: diff --git a/CEP/Pipeline/recipes/sip/master/setupsourcedb.py b/CEP/Pipeline/recipes/sip/master/setupsourcedb.py index 7a6d121e4be4c077401e2578b90b231c5111dc02..19849133e3e9048ca42a23c0084e96be8164da25 100644 --- a/CEP/Pipeline/recipes/sip/master/setupsourcedb.py +++ b/CEP/Pipeline/recipes/sip/master/setupsourcedb.py @@ -114,7 +114,7 @@ class setupsourcedb(BaseRecipe, RemoteCommandRecipeMixIn): # ******************************************************************** # 3. Call node side of script - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) outdata.iterator = DataMap.SkipIterator jobs = [] for outp in outdata: diff --git a/CEP/Pipeline/recipes/sip/master/vdsmaker.py b/CEP/Pipeline/recipes/sip/master/vdsmaker.py index a38bac8d0d210f015c6d66c30f19016e0c5cd73a..95b3bb3af63ccbbc84dfcffd9b4fe0c4171b0943 100644 --- a/CEP/Pipeline/recipes/sip/master/vdsmaker.py +++ b/CEP/Pipeline/recipes/sip/master/vdsmaker.py @@ -89,7 +89,7 @@ class vdsmaker(BaseRecipe, RemoteCommandRecipeMixIn): # ********************************************************************* # 2. Call vdsmaker - command = "python %s" % (self.__file__.replace('master', 'nodes')) + command = "python3 %s" % (self.__file__.replace('master', 'nodes')) jobs = [] for inp, vdsfile in zip(data, vdsnames): jobs.append( diff --git a/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py b/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py index 5b1c2939d022cb55e7d3fa75a246ce7258a7b933..16e6b5ad46799b3d99013bbc919eac92c8c4c497 100644 --- a/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py +++ b/CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py @@ -38,10 +38,8 @@ class setupsourcedb(LOFARnodeTCP): try: os.makedirs(skydb_dir) self.logger.debug("Created output directory %s" % skydb_dir) - except OSError as err: - # Ignore error if directory already exists, otherwise re-raise - if err[0] != errno.EEXIST: - raise + except FileExistsError: + pass # **************************************************************** # 2 Remove any old sky database diff --git a/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl b/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl index bca2626b704ac8473c983dbcf038a76cce09803c..812e5d635692d25db2a964981d00aa6b3c65eefb 100644 --- a/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl +++ b/CEP/Pipeline/recipes/sip/pipeline.cfg.CEP4.tmpl @@ -6,7 +6,7 @@ casaroot = /opt/casacore pyraproot = /opt/python-casacore/pyrap hdf5root = wcsroot = /opt/wcslib -pythonpath = /opt/lofar/lib/python3.4/site-packages +pythonpath = /opt/lofar/lib/python3.6/site-packages # runtime dir is a global FS (nfs, lustre) to exchange small files (parsets, vds, map files, etc) runtime_directory = /data/share/pipeline recipe_directories = [%(pythonpath)s/lofarpipe/recipes] diff --git a/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in b/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in index ae3c3cc32e9f23239a79db799485ed6421aad1d1..3efaf40faa1d9e6b576122a35ac6c5a48841371c 100644 --- a/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in +++ b/CEP/Pipeline/recipes/sip/tasks.cfg.CEP4.in @@ -11,12 +11,6 @@ nproc = 0 [vdsmaker] nproc = 0 -[rficonsole] -nproc = 0 - -[imager_prepare] -nthreads = 2 - [long_baseline] nproc = 0 rficonsole_executable = /opt/aoflagger/bin/aoflagger @@ -34,6 +28,7 @@ nthreads = 2 executable = /opt/aoflagger/bin/aoflagger max_per_node = 0 nthreads = 2 +nproc = 0 [imager_prepare] rficonsole_executable = /opt/aoflagger/bin/aoflagger diff --git a/CEP/Pipeline/recipes/sip/tasks.cfg.in b/CEP/Pipeline/recipes/sip/tasks.cfg.in index dcc46ffb7be3487a2cf3bd2dd21c223828ee8d91..0eb10b996e5238306cfff8aea848555ea51d9f66 100644 --- a/CEP/Pipeline/recipes/sip/tasks.cfg.in +++ b/CEP/Pipeline/recipes/sip/tasks.cfg.in @@ -48,10 +48,6 @@ recipe = gainoutliercorrection executable = %(lofarroot)s/bin/parmexportcal mapfile = %(runtime_directory)s/%(job_name)s/mapfiles/instrument.mapfile -[rficonsole] -recipe = rficonsole -executable = %(aoflaggerroot)s/bin/aoflagger - [get_metadata] recipe = get_metadata diff --git a/CMake/LofarPackageList.cmake b/CMake/LofarPackageList.cmake index 44cd7e787a7c1956a4b1e07551c23ccddbb9442a..0fbe04785b24d5b5f8b45dd90f66f8e33be85a9c 100644 --- a/CMake/LofarPackageList.cmake +++ b/CMake/LofarPackageList.cmake @@ -1,7 +1,7 @@ # - Create for each LOFAR package a variable containing the absolute path to # its source directory. # -# Generated by gen_LofarPackageList_cmake.sh at ma 17 jun 2019 16:04:50 CEST +# Generated by gen_LofarPackageList_cmake.sh at do 27 jun 2019 14:25:48 CEST # # ---- DO NOT EDIT ---- # @@ -210,4 +210,5 @@ if(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) set(Dragnet_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/Dragnet) set(LTAIngest_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/LTAIngest) set(Cobalt_validation_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/Online_Cobalt/validation) + set(LTAIngestTransfer_SOURCE_DIR ${CMAKE_SOURCE_DIR}/SubSystems/LTAIngestTransfer) endif(NOT DEFINED LOFAR_PACKAGE_LIST_INCLUDED) diff --git a/Docker/lofar-pipeline/Dockerfile.tmpl b/Docker/lofar-pipeline/Dockerfile.tmpl index fd9b6db08e50d2f9cde04552aa303c3c5b83d1e9..2a8fa025802f8661c54cd5a37c12ca1c2b08beba 100644 --- a/Docker/lofar-pipeline/Dockerfile.tmpl +++ b/Docker/lofar-pipeline/Dockerfile.tmpl @@ -107,7 +107,7 @@ RUN export BUILD_PACKAGES="doxygen git cmake g++ libxml++${LIBXMLPP_VERSION}-dev cd ${INSTALLDIR}/aoflagger/aoflagger-${AOFLAGGER_VERSION} && git checkout ${AOFLAGGER_VERSION//latest/master} && \ mkdir ${INSTALLDIR}/aoflagger/build && \ cd ${INSTALLDIR}/aoflagger/build && \ - cmake -DCASACORE_ROOT_DIR=${INSTALLDIR}/casacore/ -DBUILD_SHARED_LIBS=ON -DCMAKE_CXX_FLAGS="${CXX_FLAGS} -DNDEBUG" -DCMAKE_INSTALL_PREFIX=${INSTALLDIR}/aoflagger ${INSTALLDIR}/aoflagger/aoflagger-${AOFLAGGER_VERSION} && \ + cmake -DCASACORE_ROOT_DIR=${INSTALLDIR}/casacore/ -DPORTABLE=True -DBUILD_SHARED_LIBS=ON -DCMAKE_CXX_FLAGS="${CXX_FLAGS} -DNDEBUG" -DCMAKE_INSTALL_PREFIX=${INSTALLDIR}/aoflagger ${INSTALLDIR}/aoflagger/aoflagger-${AOFLAGGER_VERSION} && \ make -j ${J} && \ make install && \ cd .. && \ @@ -135,7 +135,7 @@ RUN export BUILD_PACKAGES="git cmake g++ python3-setuptools doxygen libgsl0-dev cd ${INSTALLDIR}/dysco && \ mkdir build && \ cd build && \ - cmake -DCMAKE_CXX_FLAGS="${CXX_FLAGS}" -DCMAKE_INSTALL_PREFIX=${INSTALLDIR}/dysco/ -DCASACORE_ROOT_DIR=${INSTALLDIR}/casacore/ ${INSTALLDIR}/dysco/dysco-${DYSCO_VERSION} && \ + cmake -DCMAKE_CXX_FLAGS="${CXX_FLAGS}" -DPORTABLE=True -DCMAKE_INSTALL_PREFIX=${INSTALLDIR}/dysco/ -DCASACORE_ROOT_DIR=${INSTALLDIR}/casacore/ ${INSTALLDIR}/dysco/dysco-${DYSCO_VERSION} && \ make -j ${J} && \ make install && \ #mkdir -p ${INSTALLDIR}/dysco/lib/python${PYTHON_VERSION}/site-packages/ && \ @@ -207,7 +207,7 @@ RUN export BUILD_PACKAGES="git cmake g++ swig3.0 python3-setuptools python3-dev # ******************* # # Run-time dependencies -RUN aptitude install -y libncurses${NCURSES_VERSION} liblog4cplus-${LIBLOG4CPLUS_VERSION} libhdf5-${LIBHDF5_VERSION} libboost-chrono${BOOST_VERSION}.1 libboost-program-options${BOOST_VERSION}.1 libboost-python${BOOST_VERSION}.1 libboost-regex${BOOST_VERSION}.1 python3 libxml2 libpng-tools liblapack${LIBLAPACK_VERSION} libfftw3-bin libxml++${LIBXMLPP_VERSION}-2v5 libgsl${LIBGSL_VERSION} libreadline${READLINE_VERSION} binutils libcfitsio-bin libwcs5 libopenblas-base libpqxx-${LIBPQXX_VERSION} libqpid-proton8 libqpid-proton-cpp8 python3-qpid-proton python3-pg python3-psycopg2 && \ +RUN aptitude install -y libncurses${NCURSES_VERSION} liblog4cplus-${LIBLOG4CPLUS_VERSION} libhdf5-${LIBHDF5_VERSION} libboost-chrono${BOOST_VERSION}.1 libboost-program-options${BOOST_VERSION}.1 libboost-python${BOOST_VERSION}.1 libboost-regex${BOOST_VERSION}.1 python3 libxml2 libpng-tools liblapack${LIBLAPACK_VERSION} libfftw3-bin libxml++${LIBXMLPP_VERSION}-2v5 libgsl${LIBGSL_VERSION} libreadline${READLINE_VERSION} binutils libcfitsio-bin libwcs5 libopenblas-base libpqxx-${LIBPQXX_VERSION} libqpid-proton8 libqpid-proton-cpp8 python3-qpid-proton python3-pg python3-psycopg2 python3-requests && \ aptitude clean && \ aptitude autoclean diff --git a/Docker/lofar-pulp/Dockerfile.tmpl b/Docker/lofar-pulp/Dockerfile.tmpl index 3637870621b0d0ea8a0854da7dc63b39671c0776..894990f5995e269ec968e449020bc0d09a81a8f4 100644 --- a/Docker/lofar-pulp/Dockerfile.tmpl +++ b/Docker/lofar-pulp/Dockerfile.tmpl @@ -32,6 +32,11 @@ RUN apt-get update && apt-get install -y ruby ruby-dev libsasl2-dev uuid-dev lib apt-get purge -y ruby ruby-dev libsasl2-dev uuid-dev libxerces-c-dev libnss3-dev libnspr4-dev help2man libsslcommon2-dev libxqilla-dev libboost-program-options${BOOST_VERSION}-dev libboost-filesystem${BOOST_VERSION}-dev && \ apt-get autoremove -y +# ******************* +# Kombu client for Python +# ******************* +RUN aptitude install -y python3-kombu + # # ******************* # LOFAR diff --git a/LCS/Messaging/python/messaging/CMakeLists.txt b/LCS/Messaging/python/messaging/CMakeLists.txt index a8310bdac45159970f2aa0df319b45234277c9d1..512552e97058e777c51410c2ff1d20449b931267 100644 --- a/LCS/Messaging/python/messaging/CMakeLists.txt +++ b/LCS/Messaging/python/messaging/CMakeLists.txt @@ -6,6 +6,7 @@ lofar_find_package(Python 3.4 REQUIRED) include(FindPythonModule) find_python_module(kombu REQUIRED) +find_python_module(requests REQUIRED) include(PythonInstall) diff --git a/LCS/Messaging/python/messaging/exceptions.py b/LCS/Messaging/python/messaging/exceptions.py index 32e837f3c3b8b92411bf8b2aaa8be833634121ac..dccad46686706943f94e7d805d5308e1a9994836 100644 --- a/LCS/Messaging/python/messaging/exceptions.py +++ b/LCS/Messaging/python/messaging/exceptions.py @@ -44,6 +44,13 @@ class MessageBusError(MessagingError): """ pass + +class MessagingRuntimeError(MessagingError, RuntimeError): + """ + A RuntimeError raises in the messaging classes, for example when starting a BusListener + """ + pass + class MessageFactoryError(MessagingError): """ Exception raised when the `MessageFactory` does not know how to create diff --git a/LCS/Messaging/python/messaging/messagebus.py b/LCS/Messaging/python/messaging/messagebus.py index 932cdc41011968a13e07ca58784229e0a5d49a08..283cca6c4ddd6347ab6a2b39403913ea25bd35d8 100644 --- a/LCS/Messaging/python/messaging/messagebus.py +++ b/LCS/Messaging/python/messaging/messagebus.py @@ -191,18 +191,24 @@ from lofar.messaging.messages import * from lofar.messaging.config import DEFAULT_BROKER, DEFAULT_BUSNAME, DEFAULT_PORT, DEFAULT_USER, DEFAULT_PASSWORD from lofar.common.threading_utils import TimeoutLock from lofar.common.util import program_name +from lofar.common.util import is_empty_function import kombu, kombu.exceptions, amqp.exceptions +import inspect +import re import uuid import threading from typing import Optional from datetime import datetime from queue import Empty as EmptyQueueError from socket import gaierror +import json +import requests import logging logger = logging.getLogger(__name__) + # some serializers are considered 'insecure', but we know better ;) # so enable the python pickle serializer kombu.enable_insecure_serializers(['pickle']) @@ -270,19 +276,24 @@ def delete_exchange(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEB raise MessagingError("Could not delete exchange %s on broker %s error=%s" % (name, broker, e)) -def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) -> bool: +def create_queue(name: str, durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG, auto_delete: bool=False) -> bool: """ create a message queue with the given name on the given broker :param name: the name for the queue :param durable: if True, then the queue 'survives' broker restarts :param broker: a message broker address :param log_level: optional logging level (to add/reduce spamming) + :param auto_delete: if True, then the queue is automatically deleted when the last consumer disconnects. :raises: MessagingError if the queue could not be created :return True if created, False if not-created (because it already exists) """ try: with kombu.Connection(hostname=broker, port=DEFAULT_PORT, userid=DEFAULT_USER, password=DEFAULT_PASSWORD) as connection: - queue = kombu.Queue(name, durable=durable) + queue = kombu.Queue(name, + durable=durable, + auto_delete=auto_delete, + max_priority=9 # need to set max_priority to get a queue that respects priorities on messages. + ) try: queue.queue_declare(channel=connection.default_channel, passive=True) except amqp.exceptions.NotFound: @@ -316,6 +327,18 @@ def delete_queue(name: str, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG) raise MessagingError("Could not delete queue %s on broker %s error=%s" % (name, broker, e)) +def nr_of_messages_in_queue(queue_name: str, broker: str = DEFAULT_BROKER) -> int: + """get the number of messages in the queue""" + try: + # the kombu way of getting the number of messages via a passice queue_declare is not reliable... + # so, let's use the http REST API using request + url = "http://%s:15672/api/queues/%%2F/%s" % (broker, queue_name) + response = requests.get(url, auth=(DEFAULT_USER, DEFAULT_PASSWORD)) + queue_info = json.loads(response.text) + return queue_info.get('messages', 0) + except Exception as e: + return 0 + def create_binding(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG): """ create a binding between the exchange and queue, possibly filtered by the routing_key, on the given broker. @@ -347,7 +370,7 @@ def create_binding(exchange: str, queue: str, routing_key: str='#', durable: boo raise MessageBusError("Could not create binding from exchange %s to queue %s with routing_key %s " \ " on broker %s error=%s" % (exchange, queue, routing_key, broker, e)) -def create_bound_queue(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG): +def create_bound_queue(exchange: str, queue: str, routing_key: str='#', durable: bool=True, broker: str=DEFAULT_BROKER, log_level=logging.DEBUG, auto_delete: bool=False): """ create an exchange (if needed), queue (if needed), and the in-between binding, possibly filtered by the routing_key, on the given broker. :param exchange: the name for the exchange @@ -355,10 +378,11 @@ def create_bound_queue(exchange: str, queue: str, routing_key: str='#', durable: :param routing_key: filter only messages with the given routing_key to the queue :param durable: if True, then the queue 'survives' broker restarts :param broker: a message broker address + :param auto_delete: if True, then the queue is automatically deleted when the last consumer disconnects. :param log_level: optional logging level (to add/reduce spamming) """ create_exchange(exchange, durable=durable, broker=broker, log_level=log_level) - create_queue(queue, durable=durable, broker=broker, log_level=log_level) + create_queue(queue, durable=durable, broker=broker, log_level=log_level, auto_delete=auto_delete) create_binding(exchange, queue, routing_key, durable=durable, broker=broker, log_level=log_level) @@ -610,7 +634,7 @@ class FromBus(_AbstractBus): raise KeyError("Cannot find kombu message to ack for lofar message_id %s" % lofar_msg.id) if threading.current_thread().ident != thread_id: - raise RuntimeError("Cannot acknowledge messages across threads") + raise MessagingRuntimeError("Cannot acknowledge messages across threads") kombu_msg.ack(multiple=False) logger.debug("%s acknowledged %s", self, lofar_msg) @@ -627,18 +651,14 @@ class FromBus(_AbstractBus): raise KeyError("Cannot find kombu message to ack for lofar message_id %s" % lofar_msg.id) if threading.current_thread().ident != thread_id: - raise RuntimeError("Cannot reject messages across threads") + raise MessagingRuntimeError("Cannot reject messages across threads") kombu_msg.reject(requeue=False) logger.debug("%s rejected %s", self, lofar_msg) def nr_of_messages_in_queue(self) -> int: """get the number of waiting messages in the queue""" - try: - name, msg_count, consumer_count = self._receiver.queue.queue_declare(passive=True, channel=self._connection.default_channel) - return msg_count - except: - return 0 + return nr_of_messages_in_queue(self.queue, self.broker) def __str__(self): return "[FromBus] queue: %s on broker: %s #messages=%d" % (self.queue, self.broker, self.nr_of_messages_in_queue()) @@ -809,13 +829,20 @@ class TemporaryExchange: def __str__(self): return "TemporaryExchange address=%s" % self.address - def create_tobus(self): + def create_tobus(self) -> ToBus: """ Factory method to create a ToBus instance which is connected to this TemporaryExchange :return: ToBus """ return ToBus(broker=self.broker, exchange=self.address) + def create_temporary_queue(self) -> 'TemporaryQueue': + """ + Factory method to create a TemporaryQueue instance which is connected to this TemporaryExchange + :return: TemporaryQueue + """ + return TemporaryQueue(broker=self.broker, exchange=self.address) + class TemporaryQueue: """ @@ -922,11 +949,11 @@ class TemporaryQueue: self._bound_exchange = "exchange-for-" + self.address # create the tmp queue... - create_queue(self.address, broker=self.broker, durable=False) + create_queue(self.address, broker=self.broker, durable=False, auto_delete=True) - # create the exchange (if needed), and remember if we need to destoy it (if it was created) - self._created_exchange = create_exchange(self._bound_exchange, - broker=self.broker, durable=False) + # create the exchange (if needed), and remember if we need to destroy it (if it was created) + self._created_exchange = create_exchange(self._bound_exchange, broker=self.broker, + durable=False) # and finally create the binding # if no routing_key given, then use this tmp-queue's specific address as routing key @@ -1034,6 +1061,42 @@ class AbstractMessageHandler: def __str__(self): return self.__class__.__name__ + def is_empty_template_handler(self) -> bool: + """ + Test method to introspect if this handler instance is a template handler with only empty bodies. + Example: this is an empty template handler + class BaseTemplateHandler(AbstractMessageHandler): + def handle_message(self, msg: LofarMessage): + if 'foo' in msg.subject: + self.on_foo() + + def on_foo(self): + pass + :return: + """ + + # introspection magic to get all methods called from within this handler's handle_message method. + r1 = re.compile("self\.on(.)+\(.*\)") + r2 = re.compile("\(.*") + called_template_methods_from_handle_message = set([r2.sub("", l.strip().replace("self.","")) for l in inspect.getsource(self.handle_message).split('\n') if r1.search(l)]) + + for method_name, member_func in inspect.getmembers(self, inspect.ismethod): + if method_name in called_template_methods_from_handle_message: + if not is_empty_function(member_func): + # this method is called from within the handler's handle_message method, + # but it's not empty, so return False. + # This means that we have at least one non-empty method being call from within this handler. + return False + + if len(called_template_methods_from_handle_message) == 0: + # this handler does not call any methods from within handle_message, so it's not an empty-template-handler + return False + + # There are methods being called from within the handler's handle_message method, + # and they are all empty bodied, so return True + return True + + class UsingToBusMixin: """It is quite common to have a message handler which sends out messages itself. You would need a ToBus in your handler for that. For code re-use, we provide this mixin class with a self._tobus member, which is ready for use in your AbstractMessageHandler sub-class implementation. @@ -1251,13 +1314,20 @@ class BusListener: self._running.set() for i in range(self._num_threads): thread_name = "ListenerThread_%s_%d" % (self.address, i) + thread_started_event = threading.Event() thread = threading.Thread(target=self._listen_loop, - name=thread_name) + name=thread_name, + kwargs={'thread_started_event':thread_started_event}) self._threads.append(thread) thread.start() - if not thread.is_alive(): - raise MessagingError("Could not start listener thread: %s" % thread_name) + # check if the _listen_loop was started successfully + logger.debug("waiting for thread %s to be running...", thread_name) + if not (thread_started_event.wait(timeout=10) and thread.is_alive()): + msg = "Could not fully start listener thread: %s" % (thread_name,) + logger.error(msg) + raise MessagingRuntimeError(msg) + logger.debug("thread %s is running", thread_name) def stop_listening(self): """ @@ -1273,16 +1343,27 @@ class BusListener: self._running.clear() for thread in self._threads: - logger.debug("STOPPING %s on thread '%s'", self, thread.name) - thread.join() - logger.info("STOPPED %s on thread '%s'", self, thread.name) + try: + logger.debug("STOPPING %s on thread '%s'", self, thread.name) + thread.join() + self._threads.remove(thread) + logger.info("STOPPED %s on thread '%s'", self, thread.name) + except Exception as e: + logger.exception("Could not stop thread %s: %s", thread.name, e) def __enter__(self) -> 'BusListener': """enter the context, and make the bus_listener start listening. :return self """ - self.start_listening() - return self + try: + self.start_listening() + return self + except Exception as e: + # __exit__ (and hence stop_listening) is not called when an exception is raised in __enter__ + # so, do our own cleanup (log, stop_listening and re-raise). + logger.exception("%s error: %s", self, e) + self.stop_listening() + raise def __exit__(self, exc_type, exc_val, exc_tb): """leave the context, and make the bus_listener stop listening. @@ -1304,9 +1385,14 @@ class BusListener: except AttributeError: pass + if handler_instance.is_empty_template_handler(): + error_msg = "%s is an empty template handler, so no concrete handler methods will be called. Please provide a concrete implementation." % (self._handler_type.__name__,) + logger.error(error_msg) + raise TypeError(error_msg) + return handler_instance - def _listen_loop(self): + def _listen_loop(self, thread_started_event: threading.Event): """ Internal use only. Message listener loop that receives messages and starts the attached function with the message content as argument. """ @@ -1317,6 +1403,11 @@ class BusListener: with self._create_handler() as thread_handler: with FromBus(self.address, broker=self.broker) as receiver: logger.info("STARTED %s on thread '%s' ", self, threading.currentThread().name) + + # notify the thread starter that we successfully started the listen loop + thread_started_event.set() + + # keep running and handling .... while self.is_running(): try: thread_handler.before_receive_message() @@ -1410,7 +1501,8 @@ class BusListenerJanitor: # do not expose create/delete_queue/exchange etc methods in all, it's not part of the public API __all__ = ['DEFAULT_BUS_TIMEOUT', 'FromBus', 'ToBus', 'BusListener', 'BusListenerJanitor', - 'TemporaryQueue', 'TemporaryExchange', 'AbstractMessageHandler', 'UsingToBusMixin'] + 'TemporaryQueue', 'TemporaryExchange', 'AbstractMessageHandler', 'UsingToBusMixin', + 'nr_of_messages_in_queue'] if __name__ == "__main__": logging.basicConfig(format='%(levelname)s %(message)s', level=logging.DEBUG) diff --git a/LCS/Messaging/python/messaging/messages.py b/LCS/Messaging/python/messaging/messages.py index 1017d3ed53ca2bafad6d7583c5db9ce9d0e43644..5e912b8d722e0bf60eac0d4f86289f5713d01610 100644 --- a/LCS/Messaging/python/messaging/messages.py +++ b/LCS/Messaging/python/messaging/messages.py @@ -106,13 +106,16 @@ class MessageFactory: @staticmethod def create_lofar_message_from_kombu_message(kombu_msg): - message_type_name = kombu_msg.headers['MessageType'] + try: + message_type_name = kombu_msg.headers['MessageType'] - if message_type_name in MessageFactory._registry: - kombu2lofar_convert_method = MessageFactory._registry[message_type_name] - return kombu2lofar_convert_method(kombu_msg) + if message_type_name in MessageFactory._registry: + kombu2lofar_convert_method = MessageFactory._registry[message_type_name] + return kombu2lofar_convert_method(kombu_msg) - raise MessageFactoryError("Unable to create LofarMessage of type %s for kombu-msg: %s" % (message_type, kombu_msg)) + raise MessageFactoryError("Unable to create LofarMessage of type %s for kombu-msg: %s" % (message_type_name, kombu_msg)) + except Exception as e: + raise MessageFactoryError("Unable to create LofarMessage for kombu-msg: %s. error=%s" % (kombu_msg, str(e))) class EventMessage(LofarMessage): """ @@ -129,7 +132,8 @@ class EventMessage(LofarMessage): MessageFactory.register(EventMessage, lambda kombu_msg : EventMessage(content=kombu_msg.payload, subject=kombu_msg.headers.get('Subject'), - id=kombu_msg.headers.get('MessageId'))) + id=kombu_msg.headers.get('MessageId'), + priority=kombu_msg.properties.get('priority',4))) class CommandMessage(LofarMessage): """ @@ -147,6 +151,7 @@ class CommandMessage(LofarMessage): MessageFactory.register(CommandMessage, lambda kombu_msg : CommandMessage(content=kombu_msg.payload, subject=kombu_msg.headers.get('Subject'), - id=kombu_msg.headers.get('MessageId'))) + id=kombu_msg.headers.get('MessageId'), + priority=kombu_msg.properties.get('priority',4))) __all__ = ['LofarMessage', 'EventMessage', 'CommandMessage', 'MessageFactory'] \ No newline at end of file diff --git a/LCS/Messaging/python/messaging/rpc.py b/LCS/Messaging/python/messaging/rpc.py index cb6864e356e383855857098b0344885d010a17d1..abc063548618d9b3a4a805671dcb930280886aa5 100644 --- a/LCS/Messaging/python/messaging/rpc.py +++ b/LCS/Messaging/python/messaging/rpc.py @@ -104,7 +104,8 @@ class RequestMessage(LofarMessage): MessageFactory.register(RequestMessage, lambda kombu_msg : RequestMessage(reply_to=kombu_msg.properties.get('reply_to'), subject=kombu_msg.headers.get('Subject'), - id=kombu_msg.headers.get('MessageId')).with_args_kwargs( + id=kombu_msg.headers.get('MessageId'), + priority=kombu_msg.properties.get('priority',4)).with_args_kwargs( *kombu_msg.payload.get('args', []), **kombu_msg.payload.get('kwargs', {}))) @@ -139,6 +140,7 @@ MessageFactory.register(ReplyMessage, content=kombu_msg.payload, subject=kombu_msg.headers.get('Subject'), id=kombu_msg.headers.get('MessageId'), + priority=kombu_msg.properties.get('priority',4), error_message=kombu_msg.headers.get('error_message'))) class ServiceMessageHandler(UsingToBusMixin, AbstractMessageHandler): diff --git a/LCS/Messaging/python/messaging/test/t_messagebus.py b/LCS/Messaging/python/messaging/test/t_messagebus.py index 66dd7f8ce8bd62337f13b3cd75a7438d4d54a2fa..a7c7ebb440c317c78082e79763d5a3ed40c17daf 100644 --- a/LCS/Messaging/python/messaging/test/t_messagebus.py +++ b/LCS/Messaging/python/messaging/test/t_messagebus.py @@ -34,7 +34,7 @@ from lofar.messaging.messagebus import * from lofar.messaging.messagebus import can_connect_to_broker from lofar.messaging.messagebus import create_queue, create_exchange, create_binding, create_bound_queue, delete_exchange, delete_queue from lofar.messaging.rpc import RequestMessage -from lofar.messaging.exceptions import MessageBusError +from lofar.messaging.exceptions import MessageBusError, MessagingRuntimeError from lofar.common.datetimeutils import round_to_millisecond_precision from time import sleep from threading import Lock @@ -43,10 +43,6 @@ logger = logging.getLogger(__name__) TIMEOUT = 1.0 -if not can_connect_to_broker(): - print("Cannot connect to default rabbitmq broker. Skipping test.") - exit(3) - class TestCreateDeleteFunctions(unittest.TestCase): """Test the various create/delete exchange/queue/binding funcions""" @@ -404,8 +400,19 @@ class SendReceiveMessage(unittest.TestCase): :return the received message """ with self.tobus, self.frombus: + self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) self.tobus.send(send_msg) + + # wait (a little) until the broker routed the message to the queue + start_wait = datetime.utcnow() + while self.frombus.nr_of_messages_in_queue() != 1: + sleep(0.05) + if (datetime.utcnow() - start_wait).total_seconds() > 1.0: + break + + self.assertEqual(1, self.frombus.nr_of_messages_in_queue()) recv_msg = self.frombus.receive(timeout=TIMEOUT) + self.assertEqual(0, self.frombus.nr_of_messages_in_queue()) self.assertEqual(type(send_msg), type(recv_msg)) self.assertEqual(send_msg.id, recv_msg.id) @@ -488,6 +495,24 @@ class SendReceiveMessage(unittest.TestCase): my_dict=my_dict)) self.assertEqual(my_dict, recv_msg.content['kwargs']['my_dict']) +class PriorityTest(unittest.TestCase): + def test_priority(self): + with TemporaryExchange(self.__class__.__name__) as tmp_exchange: + with tmp_exchange.create_temporary_queue() as tmp_queue: + msg1 = EventMessage(priority=4, subject="some.event", content=1) + msg2 = EventMessage(priority=5, subject="some.event", content=2) + + with tmp_exchange.create_tobus() as tobus: + tobus.send(msg1) + tobus.send(msg2) + + with tmp_queue.create_frombus() as frombus: + result_msg1 = frombus.receive() + result_msg2 = frombus.receive() + + # message with highest priority should arrive first + self.assertEqual(msg1.id, result_msg2.id) + self.assertEqual(msg2.id, result_msg1.id) class Rejector(BusListener): handled_messages = 0 @@ -520,11 +545,11 @@ class RejectorTester(unittest.TestCase): spammer.send(msg) while rejector.handled_messages < number_of_messages: - print("Handled messages: {}".format(rejector.handled_messages)) + logger.info("Handled messages: {}".format(rejector.handled_messages)) sleep(1) with FromBus(rejector.address) as frombus: - print("Number of messages on queue: {}".format(frombus.nr_of_messages_in_queue())) + logger.info("Number of messages on queue: {}".format(frombus.nr_of_messages_in_queue())) self.assertEqual(0, frombus.nr_of_messages_in_queue()) @@ -650,6 +675,87 @@ class PingPongTester(unittest.TestCase): player1_num_turns, NUM_TURNS, player2_num_turns, NUM_TURNS, num_threads_per_player, 2*NUM_TURNS/(datetime.utcnow() - start_timestamp).total_seconds()) + +class MessageHandlerTester(unittest.TestCase): + def test_handler_init_raises(self): + # define a MessageHandler that raises on init + class RaisingHandler(AbstractMessageHandler): + def __init__(self): + raise Exception("intentional test exception") + + # try to start a BusListener using this handler. Should fail and raise a MessagingRuntimeError + with TemporaryExchange(self.__class__.__name__) as tmp_exchange: + with self.assertRaises(MessagingRuntimeError): + with BusListenerJanitor(BusListener(handler_type=RaisingHandler, + exchange=tmp_exchange.address)) as listener: + pass + + def test_empty_template_handler(self): + # define a MessageHandler with a template for callback on<something> methods + class BaseTemplateHandler(AbstractMessageHandler): + def handle_message(self, msg: LofarMessage): + if 'foo' in msg.subject: + self.on_foo() + if 'bar' in msg.subject: + self.on_bar(msg.content) + + def on_foo(self): + pass + + def on_bar(self, some_arg): + pass + + self.assertTrue(BaseTemplateHandler().is_empty_template_handler()) + + class ConcreteHandler1(BaseTemplateHandler): + def on_foo(self): + return 42 + + self.assertFalse(ConcreteHandler1().is_empty_template_handler()) + + class ConcreteHandler2(BaseTemplateHandler): + def on_bar(self, some_arg): + if some_arg: + return 3.14 + return 2.71 + + self.assertFalse(ConcreteHandler2().is_empty_template_handler()) + + class SimpleNonTemplateHandler(AbstractMessageHandler): + def handle_message(self, msg: LofarMessage): + if 'foo' in msg.subject: + return 42 + elif 'bar' in msg.subject: + if msg.content: + return 3.14 + return 2.71 + + self.assertFalse(SimpleNonTemplateHandler().is_empty_template_handler()) + + + def test_empty_template_handler_raises(self): + # define a MessageHandler with a template for callback on<something> methods + class BaseTemplateHandler(AbstractMessageHandler): + def handle_message(self, msg: LofarMessage): + if 'foo' in msg.subject: + self.on_foo() + if 'bar' in msg.subject: + self.on_bar(msg.content) + + def on_foo(self): + pass + + def on_bar(self, some_arg): + pass + + # try to start a BusListener using a BaseTemplateHandler. Should fail and raise a TypeError + with TemporaryExchange(self.__class__.__name__) as tmp_exchange: + with self.assertRaises(RuntimeError): + with BusListenerJanitor(BusListener(handler_type=BaseTemplateHandler, + exchange=tmp_exchange.address)) as listener: + pass + + def load_tests(loader, tests, ignore): """add the doctests from lofar.messaging.messagebus to the unittest tests""" import doctest @@ -658,12 +764,10 @@ def load_tests(loader, tests, ignore): return tests if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.INFO) + logging.basicConfig(format='%(asctime)s %(thread)d %(threadName)s %(levelname)s %(message)s', level=logging.DEBUG) - try: - with ToBus(): - pass - except: + if not can_connect_to_broker(): + logger.error("Cannot connect to default rabbitmq broker. Skipping test.") exit(3) unittest.main() diff --git a/LCS/PyCommon/subprocess_utils.py b/LCS/PyCommon/subprocess_utils.py index f2a486bb58bc6d6f512d1ed4b73a5b9e79f4acd4..35bf9ef5a053cb3f43bee5de679ce5e97c13438b 100644 --- a/LCS/PyCommon/subprocess_utils.py +++ b/LCS/PyCommon/subprocess_utils.py @@ -1,7 +1,7 @@ import logging from datetime import datetime, timedelta from time import sleep -from threading import Thread +from threading import Thread, Event from subprocess import Popen, PIPE, check_output from collections import namedtuple try: @@ -103,20 +103,47 @@ def execute_in_parallel(cmd_lists, timeout=3600, max_parallel=32): class PipeReader: ''' - helper class to do non-blocking readline calls to a subprocess stdput or stderr pipe. + helper class to do non-blocking readline calls to a subprocess stdout or stderr pipe. ''' - def __init__(self, pipe): + def __init__(self, pipe, name=None): self.__line_buffer = '' + self.__name = name + self.__pipe = pipe self.__queue = Queue() - self.__thread = Thread(target=PipeReader.enqueue_output, args=(pipe, self.__queue)) + self.__stop_event = Event() + self.__thread = None + + def start(self): + self.stop() + + self.__thread = Thread(target=self.__enqueue_output, + name='PipeReader_thread_%s' % (self.__name,)) self.__thread.daemon = True # thread dies with the program + logger.debug("starting %s", self.__thread.name) self.__thread.start() - @staticmethod - def enqueue_output(pipe, queue): + def stop(self): + if self.__thread and self.__thread.is_alive(): + logger.debug("stopping %s", self.__thread.name) + self.__stop_event.set() + self.__thread.join() + logger.info("stopped %s", self.__thread.name) + self.__thread = None + + def __enter__(self): + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + + def __enqueue_output(self): try: - for line in iter(pipe.readline, b''): - queue.put(line) + for line in iter(self.__pipe.readline, b''): + self.__queue.put(line) + if self.__stop_event.is_set(): + self.__stop_event.clear() + break except Exception as e: logger.error(e) diff --git a/LCS/PyCommon/util.py b/LCS/PyCommon/util.py index 843cfe2a0d0effd883e18eabc96ea1f1da48e26c..4ef79bc26efbd16c9a401f55c146b141f1e2b84d 100644 --- a/LCS/PyCommon/util.py +++ b/LCS/PyCommon/util.py @@ -164,4 +164,18 @@ def program_name(include_extension=True): name = os.path.basename(os.path.realpath(sys.argv[0])) if not include_extension: name, extension = os.path.splitext(name) - return name \ No newline at end of file + return name + + +def is_empty_function(func): + """returns True when the given function/method 'func' has an empty 'pass' body (ignoring optional docstrings)""" + def __empty_func(): + pass + + def __empty_func_with_doc(): + """Empty function with docstring.""" + pass + + return (func.__code__.co_code == __empty_func.__code__.co_code or \ + func.__code__.co_code == __empty_func_with_doc.__code__.co_code) and \ + func.__code__.co_consts[-1] == None diff --git a/LCU/StationTest/modules/mep.py b/LCU/StationTest/modules/mep.py index 3658b48124d0871073fcf7d3fc272d99276406ef..92acf1005f0489a7f8f4d4233168b77e9d690bd6 100755 --- a/LCU/StationTest/modules/mep.py +++ b/LCU/StationTest/modules/mep.py @@ -216,7 +216,7 @@ class MepMessage: def lenPayload(self): """Return length of the hex payload string in nof bytes """ - return len(self.hexPayload)/self.c_hw + return len(self.hexPayload) // self.c_hw def getPayload(self, offset, nof): """Return hex payload string diff --git a/LCU/StationTest/pps.py b/LCU/StationTest/pps.py index fc0c211dc8f1a7754d818584196a1c76c077a2e9..e39c0ab079cb2b45942a5d48a15f8f3509c5d100 100755 --- a/LCU/StationTest/pps.py +++ b/LCU/StationTest/pps.py @@ -169,7 +169,7 @@ def CheckClkSpeed(): # ## def DelayResetRise(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 0' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 0' % (RspBrd,)) time.sleep(1) return @@ -178,7 +178,7 @@ def DelayResetRise(): # ## def DelayRise(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 1' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 1' % (RspBrd,)) time.sleep(1) return @@ -187,7 +187,7 @@ def DelayRise(): # ## def DelayResetFall(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 0' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 0' % (RspBrd,)) time.sleep(1) return @@ -196,7 +196,7 @@ def DelayResetFall(): # ## def DelayFall(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 1' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 1' % (RspBrd,)) time.sleep(1) return diff --git a/LCU/StationTest/pps_int.py b/LCU/StationTest/pps_int.py index d4e00b26b47a5c61e7b379a829ca918e32d6430a..1601c77278101e61daecbda33078b5127a5ec573 100755 --- a/LCU/StationTest/pps_int.py +++ b/LCU/StationTest/pps_int.py @@ -183,7 +183,7 @@ def CheckClkSpeed(): # ## def DelayResetRise(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 0' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 0' % (RspBrd,)) time.sleep(1) return @@ -192,7 +192,7 @@ def DelayResetRise(): # ## def DelayRise(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 1' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge r --pps_delay 1' % (RspBrd,)) time.sleep(1) return @@ -201,7 +201,7 @@ def DelayRise(): # ## def DelayResetFall(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 0' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 0' % (RspBrd,)) time.sleep(1) return @@ -210,7 +210,7 @@ def DelayResetFall(): # ## def DelayFall(): - res = os.popen3('python verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 1' % (RspBrd,)) + res = os.popen3('python3 verify.py --brd %s --fpga blp0,blp1,blp2,blp3 --te tc/sync_delay.py --pps_edge f --pps_delay 1' % (RspBrd,)) time.sleep(1) return diff --git a/LCU/StationTest/test/hbatest/hba_new_address.sh b/LCU/StationTest/test/hbatest/hba_new_address.sh index 1c8688e663b078ee8cf7d767733a6196157b198b..9837b85b8a5892a757a47bcdbe3c1de9f90ddff6 100755 --- a/LCU/StationTest/test/hbatest/hba_new_address.sh +++ b/LCU/StationTest/test/hbatest/hba_new_address.sh @@ -29,7 +29,7 @@ fi echo "" echo "Try to read the old HBA server address" $newAddr "to check that it is present in the HBA tile (should go PASSED)." -python $STATIONTESTROOT/verify.py --brd rsp$rspNr --fp blp$blpNr -v $vb --te $STATIONTESTROOT/tc/hba_server.py --server $oldAddr --server_access uc --server_function gb --server_reg address --data $oldAddr +python3 $STATIONTESTROOT/verify.py --brd rsp$rspNr --fp blp$blpNr -v $vb --te $STATIONTESTROOT/tc/hba_server.py --server $oldAddr --server_access uc --server_function gb --server_reg address --data $oldAddr echo "" diff --git a/LCU/StationTest/test/subracktest/subrack_production.py b/LCU/StationTest/test/subracktest/subrack_production.py index 539387f899923a7b49f27f3e26c39edad282e174..8085e99c08a174b1e459eeb663207111349a2c0b 100755 --- a/LCU/StationTest/test/subracktest/subrack_production.py +++ b/LCU/StationTest/test/subracktest/subrack_production.py @@ -126,7 +126,7 @@ sr.setId('RCU-HBA modem - ') sr.appendLog(21, '') sr.appendLog(21, '### Verify the control modem on the RCU') sr.appendLog(21, '') -res = cli.command('python verify.py --brd rsp0,rsp1,rsp2,rsp3 --fpga blp0,blp1,blp2,blp3 --rep 1 -v 11 --te tc/hba_client.py --client_access r --client_reg version --data 10') +res = cli.command('python3 verify.py --brd rsp0,rsp1,rsp2,rsp3 --fpga blp0,blp1,blp2,blp3 --rep 1 -v 11 --te tc/hba_client.py --client_access r --client_reg version --data 10') if res.find('wrong') == -1: sr.appendLog(11, '>>> RCU-HBA modem test went OK') sr.appendFile(21, 'tc/hba_client.log') @@ -142,7 +142,7 @@ sr.setId('SPU status - ') sr.appendLog(21, '') sr.appendLog(21, '### Verify the RSP - SPU I2C interface by reading the SPU sensor data') sr.appendLog(21, '') -res = cli.command('python i2c_spu.py --sub sub0 --rep 1 -v 11') +res = cli.command('python3 i2c_spu.py --sub sub0 --rep 1 -v 11') if res.find('FAILED') == -1: sr.appendLog(11, '>>> RSP - SPU I2c interface test went OK') else: @@ -158,7 +158,7 @@ sr.setId('TD status - ') sr.appendLog(21, '') sr.appendLog(21, '### Verify the RSP - TD I2C interface by reading the TD sensor data') sr.appendLog(21, '') -res = cli.command('python i2c_td.py --brd rsp0') +res = cli.command('python3 i2c_td.py --brd rsp0') if res.find('FAILED') == -1: sr.appendLog(11, '>>> RSP - TD I2c interface test went OK') else: @@ -174,7 +174,7 @@ sr.setId('Build In Self Test -') sr.appendLog(21, '') sr.appendLog(21, '### Build In Self Test (BIST)') sr.appendLog(21, '') -res = cli.command('python verify.py --brd rsp0,rsp1,rsp2,rsp3 --rep 1 -v 21 --te tc/bist.py') +res = cli.command('python3 verify.py --brd rsp0,rsp1,rsp2,rsp3 --rep 1 -v 21 --te tc/bist.py') if res.find('wrong') == -1: sr.appendLog(11, '>>> BIST went OK') sr.appendLog(21, 'tc/bist.log') @@ -190,7 +190,7 @@ sr.setId('RCU-RSP - ') sr.appendLog(21, '') sr.appendLog(21, '### Verify the RCU -> RSP LVDS interfaces by capturing pseudo random data on RSP') sr.appendLog(21, '') -res = cli.command('python verify.py --brd rsp0,rsp1,rsp2,rsp3 --fpga blp0,blp1,blp2,blp3 --pol x,y --rep 1 -v 11 --te tc/prsg.py') +res = cli.command('python3 verify.py --brd rsp0,rsp1,rsp2,rsp3 --fpga blp0,blp1,blp2,blp3 --pol x,y --rep 1 -v 11 --te tc/prsg.py') if res.find('wrong') == -1: sr.appendLog(11, '>>> RCU-RSP interface test went OK') sr.appendFile(21, 'tc/prsg.log') diff --git a/LCU/checkhardware/show_test_result.py b/LCU/checkhardware/show_test_result.py index 668300b7436d1a16bd8a1832aa67a8d8986a0026..18c04be97e7ec5bc4292942cfc8ccf3b2c368a0b 100755 --- a/LCU/checkhardware/show_test_result.py +++ b/LCU/checkhardware/show_test_result.py @@ -4,7 +4,7 @@ """ import sys import os -import string +# import string # import datetime # import time @@ -87,7 +87,7 @@ def main(): banner_len = 100 msg_len = len(message) print("-" * banner_len) - print(">" * ((banner_len - msg_len - 6) / 2) + " %s " % message + "<" * ((banner_len - msg_len - 6) / 2)) + print(">" * ((banner_len - msg_len - 6) // 2) + " %s " % message + "<" * ((banner_len - msg_len - 6) // 2)) print("-" * banner_len) check_nr = int(args.get('%s' % check_type, '1')) - 1 @@ -127,7 +127,7 @@ def main(): rcu_y = partnumber * 2 + 1 msg = d[3].strip() - msg_info = string.join(d[4:], " ") + msg_info = " ".join(d[4:]) keyvalue = dict() for i in range(4, len(d)): if d[i].find('=') != -1: @@ -269,9 +269,9 @@ def print_info(msg, keyvalue, msg_info): if 'TBC' in checks: info2.append('TB-board-checks') if len(info1) or len(info2): - print("-- Checks done : %s" % string.join(info1, ', ')) + print("-- Checks done : %s" % ", ".join(info1)) if len(info2): - print(" : %s" % string.join(info2, ', ')) + print(" : %s" % ", ".join(info2)) info = [] for mode in '1234567': if 'M%s' % mode in checks: @@ -296,7 +296,7 @@ def print_info(msg, keyvalue, msg_info): if 'E%s' % mode in checks: info.append('Elements') if len(info): - print("-- Checks done M%s : %s" % (mode, string.join(info, ', '))) + print("-- Checks done M%s : %s" % (mode, ", ".join(info))) info = [] if msg == 'STATISTICS': diff --git a/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt index 2d09d6d02b3f1049c089566f0a821f57b0ff9a9d..3563f05792813e6d40a329894cac07221103a4fa 100644 --- a/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/CMakeLists.txt @@ -3,4 +3,4 @@ lofar_add_package(LTAIngestAdminServer) lofar_add_package(LTAIngestTransferServer) lofar_add_package(LTAIngestWebServer) -lofar_package(LTAIngestServer 2.0 DEPENDS LTAIngestAdminServer LTAIngestTransferServer DataManagement MoMQueryService MessageLogger OTDB_Services ltastorageoverview ResourceAssignmentEditor) +lofar_package(LTAIngestServer 2.0 DEPENDS LTAIngestAdminServer LTAIngestTransferServer) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py index 1756c05e6570e82e75780d7e400b8a64c45ed509..6095fecc40f1424afaa5a1806f291458b1a362a3 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestAdminServer/lib/ingestjobmanagementserver.py @@ -25,7 +25,8 @@ from lofar.messaging.config import DEFAULT_BUSNAME, DEFAULT_BROKER from lofar.lta.ingest.common.config import INGEST_NOTIFICATION_PREFIX from lofar.lta.ingest.server.config import DEFAULT_INGEST_SERVICENAME, DEFAULT_INGEST_INCOMING_JOB_SUBJECT, DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT from lofar.lta.ingest.server.config import JOBS_DIR, MAX_NR_OF_RETRIES, FINISHED_NOTIFICATION_MAILING_LIST, FINISHED_NOTIFICATION_BCC_MAILING_LIST, DEFAULT_JOB_PRIORITY -from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, RPCService, ServiceMessageHandler, AbstractMessageHandler, BusListener +from lofar.messaging import LofarMessage, CommandMessage, EventMessage, ToBus, RPCService, ServiceMessageHandler, AbstractMessageHandler, BusListener, adaptNameToEnvironment +from lofar.messaging.messagebus import nr_of_messages_in_queue from lofar.common.util import humanreadablesize from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC @@ -383,7 +384,10 @@ class IngestJobManager: job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['started_at'] = datetime.utcnow() if new_status == JobProduced or new_status == JobTransferFailed: - job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['finished_at'] = datetime.utcnow() + try: + job_admin_dict['runs'][job_admin_dict.get('retry_attempt', 0)]['finished_at'] = datetime.utcnow() + except: + pass if lta_site: job_admin_dict['lta_site'] = lta_site @@ -616,6 +620,13 @@ class IngestJobManager: if jad_a.get('job_group_id', 0) > jad_b.get('job_group_id', 0): return 1 + # everything above equal? sort on next sort criterion, the dataproduct name, + # which in effect sorts on otdb id first, and then on subband + if jad_a.get('dataproduct', '') < jad_b.get('dataproduct', ''): + return -1 + if jad_a.get('dataproduct', '') > jad_b.get('dataproduct', ''): + return 1 + # everything above equal? sort on next sort criterion, the updated_at timestamp # least recent updated_at job goes first # if no updated_at timestamp available, use 'now' @@ -729,8 +740,12 @@ class IngestJobManager: def canProduceNextJob(self): # test if the managed_job_queue is empty enough, and if our administration agrees try: - scheduled_jads = self.getJobAdminDicts(status=JobScheduled) - return len(scheduled_jads) < 5 + if len(self.getJobAdminDicts(status=JobScheduled) > 0: + return False + + # HACK: hardcoded queue name + job_for_transfer_queue_name = adaptNameToEnvironment("lofar.queue.for.ingesttransferserver.BusListener.on.LTA.Ingest.job_for_transfer") + return nr_of_messages_in_queue(job_for_transfer_queue_name, self._tobus.broker) == 0 except Exception as e: logger.exception('canProduceNextJob: %s', e) if 'No active session' in str(e): @@ -742,25 +757,21 @@ class IngestJobManager: start_producing_timestamp = datetime.utcnow() while self.canProduceNextJob() and datetime.utcnow() - start_producing_timestamp < timedelta(seconds=5): job_admin_dict = self.getNextJobToRun() - if job_admin_dict: - if os.path.exists(job_admin_dict.get('path')): - msg = CommandMessage(content=job_admin_dict.get('job_xml'), subject=DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT) - msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY) - self._tobus.send(msg) - logger.info('submitted job %s to exchange \'%s\' at %s', job_admin_dict['job']['JobId'], self._tobus.exchange, self._tobus.broker) - self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled) - else: - job_id = job_admin_dict['job']['JobId'] - logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path')) - del self.__job_admin_dicts[job_id] - - # do a little sleep to allow the ingesttransferserver consumers to pick up the submitted job - # so the queue is empty again - # and we can submit yet another job - time.sleep(0.1) - else: + if not job_admin_dict: return + if os.path.exists(job_admin_dict.get('path')): + self.updateJobStatus(job_admin_dict['job']['JobId'], JobScheduled) + + msg = CommandMessage(content=job_admin_dict.get('job_xml'), subject=DEFAULT_INGEST_JOB_FOR_TRANSFER_SUBJECT, ttl=60) + msg.priority = job_admin_dict['job'].get('priority', DEFAULT_JOB_PRIORITY) + self._tobus.send(msg) + logger.info('submitted job %s to exchange \'%s\' at %s', job_admin_dict['job']['JobId'], self._tobus.exchange, self._tobus.broker) + else: + job_id = job_admin_dict['job']['JobId'] + logger.warning('job file for %s is not on disk at %s anymore. removing job from todo list', job_id, job_admin_dict.get('path')) + del self.__job_admin_dicts[job_id] + # rate limit at 10 jobs/sec time.sleep(0.1) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt index 758be518df8a86c03675876644dcfb2350edd099..cbc1b652e911011f0a21b1744d0c58faf8ac917f 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/CMakeLists.txt @@ -1,5 +1,5 @@ -find_python_module(mechanize REQUIRED) #sudo pip3 install mechanize +find_python_module(requests REQUIRED) #sudo pip3 install requests python_install(ingesttransferserver.py ltacp.py diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py index 3e8cbc60c21224ee79adff59db55483ecdb28d3c..f0762925a11a9f34a07c4167942991d4f509fe55 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ingesttransferserver.py @@ -86,7 +86,7 @@ class IngestTransferServer: self.__running_jobs_log_timestamp = datetime.utcnow() def start_job(self, job_dict): - if not self.__enoughResourcesAvailable(): + if not self.enoughResourcesAvailable(): raise ResourceWarning("Not enough resources available to start new job: %s" % job_dict) job_id = job_dict['JobId'] @@ -96,8 +96,8 @@ class IngestTransferServer: def threaded_pipeline_func(job): logger.info('starting job %s in the background', job_id) - ltaClient = LTAClient(self.lta_credentials.user, self.lta_credentials.password) - with MoMClient(self.mom_credentials.user, self.mom_credentials.password) as momClient: + with LTAClient(self.lta_credentials.user, self.lta_credentials.password) as ltaClient, \ + MoMClient(self.mom_credentials.user, self.mom_credentials.password) as momClient: jobPipeline = IngestPipeline(job, momClient, ltaClient, exchange = self.event_bus.exchange, broker = self.event_bus.broker, @@ -108,7 +108,9 @@ class IngestTransferServer: jobPipeline.run() with self.__lock: - thread = Thread(target = threaded_pipeline_func, args = [job_dict]) + thread = Thread(target = threaded_pipeline_func, + args = [job_dict], + name="transfer_thread_%s"%(job_id,)) thread.daemon = True self.__running_jobs[job_id] = { 'thread':thread } thread.start() @@ -124,7 +126,7 @@ class IngestTransferServer: except Exception as e: logger.error('__clearFinishedJobs: %s', e) - def __enoughResourcesAvailable(self): + def enoughResourcesAvailable(self): try: now = datetime.utcnow() bytes_sent = _getBytesSent() @@ -288,6 +290,11 @@ class IngestJobsForTransferHandler(AbstractMessageHandler): self._transfer_server = transfer_server super(IngestJobsForTransferHandler, self).__init__() + def before_receive_message(self): + while not self._transfer_server.enoughResourcesAvailable(): + logger.info("Waiting for resources to become available before receiving a new job...") + time.sleep(10) + def handle_message(self, msg: LofarMessage): if not isinstance(msg, CommandMessage): raise ValueError("%s: Ignoring non-CommandMessage: %s" % (self.__class__.__name__, msg)) @@ -301,7 +308,7 @@ class IngestJobsForTransferHandler(AbstractMessageHandler): # so jobs have a little time to start consuming resources # this limits the numer of jobs that can be started to 1000 starts per minute # it does not limit the total number of parallel jobs - # that is limited dynamically by __enoughResourcesAvailable + # that is limited dynamically by enoughResourcesAvailable # and by the hard limit self.max_nr_of_parallel_jobs time.sleep(0.1) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltaclient.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltaclient.py index 78215e77791372c6900b73f0b3915fde0f287297..2acf02236a584e68d79b178f21336571d93c7892 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltaclient.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltaclient.py @@ -42,6 +42,20 @@ class LTAClient: self.__rpc = xmlrpc.client.ServerProxy(url) logger.info('LTAClient connected to: %s', self.__hidePassword(url)) + def __enter__(self): + self.open() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def open(self): + pass + + def close(self): + pass + #self.__rpc.__close() + def __hidePassword(self, message): ''' helper function which hides the password in the ltaClient url in the message ''' diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py index b95ce4595c52ceb0bbede4f1678ffc0b83ce8843..f32fcaad8c8570e189a0f0a8f923841c4b73fa62 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py @@ -410,67 +410,66 @@ class LtaCp: prev_progress_time = datetime.utcnow() prev_bytes_transfered = 0 - pipe_reader = PipeReader(p_md5a32bc.stdout) - - # wait and poll for progress while all processes are runnning - while len([p for p in list(self.started_procs.keys()) if p.poll() is not None]) == 0: - try: - current_progress_time = datetime.utcnow() - elapsed_secs_since_prev = totalSeconds(current_progress_time - prev_progress_time) - - if elapsed_secs_since_prev > 900: - raise LtacpException('ltacp %s: transfer stalled for 15min.' % (self.logId)) - - # read and process md5a32bc stdout lines to create progress messages - lines = pipe_reader.readlines(1) - nextline = lines[-1].strip() if lines else '' - - if len(nextline) > 0: - try: - logger.debug('ltacp %s: transfering... %s', self.logId, nextline) - total_bytes_transfered = int(nextline.split()[0].strip()) - percentage_done = (100.0*float(total_bytes_transfered))/float(estimated_tar_size) - elapsed_secs_since_start = totalSeconds(current_progress_time - transfer_start_time) - if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0: - avg_speed = total_bytes_transfered / elapsed_secs_since_start - current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered - current_speed = current_bytes_transfered / elapsed_secs_since_prev - if elapsed_secs_since_prev > 120 or current_bytes_transfered > 0.333*estimated_tar_size: - prev_progress_time = current_progress_time - prev_bytes_transfered = total_bytes_transfered - percentage_to_go = 100.0 - percentage_done - time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done - - try: - if self.progress_callback: - self.progress_callback(percentage_done=percentage_done, current_speed=current_speed, total_bytes_transfered=total_bytes_transfered) - except Exception as e: - logger.error(e) - - logger.info('ltacp %s: transfered %s %.1f%% in %s at avgSpeed=%s (%s) curSpeed=%s (%s) to_go=%s to %s' % (self.logId, - humanreadablesize(total_bytes_transfered), - percentage_done, - timedelta(seconds=int(round(elapsed_secs_since_start))), - humanreadablesize(avg_speed, 'Bps'), - humanreadablesize(avg_speed*8, 'bps'), - humanreadablesize(current_speed, 'Bps'), - humanreadablesize(current_speed*8, 'bps'), - timedelta(seconds=int(round(time_to_go))), - dst_turl)) - except Exception as e: - msg = 'ltacp %s: error while parsing md5a32bc loglines: error=%s. line=%s' % (self.logId, e, nextline) - logger.error(msg) - self.cleanup() - raise LtacpException(msg) - time.sleep(0.05) - except KeyboardInterrupt: - self.cleanup() - except LtacpException as e: - logger.error('ltacp %s: %s' % (self.logId, str(e))) - self.cleanup() - raise - except Exception as e: - logger.error('ltacp %s: %s' % (self.logId, str(e))) + with PipeReader(p_md5a32bc.stdout, self.logId) as pipe_reader: + # wait and poll for progress while all processes are runnning + while len([p for p in list(self.started_procs.keys()) if p.poll() is not None]) == 0: + try: + current_progress_time = datetime.utcnow() + elapsed_secs_since_prev = totalSeconds(current_progress_time - prev_progress_time) + + if elapsed_secs_since_prev > 900: + raise LtacpException('ltacp %s: transfer stalled for 15min.' % (self.logId)) + + # read and process md5a32bc stdout lines to create progress messages + lines = pipe_reader.readlines(1) + nextline = lines[-1].strip() if lines else '' + + if len(nextline) > 0: + try: + logger.debug('ltacp %s: transfering... %s', self.logId, nextline) + total_bytes_transfered = int(nextline.split()[0].strip()) + percentage_done = (100.0*float(total_bytes_transfered))/float(estimated_tar_size) + elapsed_secs_since_start = totalSeconds(current_progress_time - transfer_start_time) + if percentage_done > 0 and elapsed_secs_since_start > 0 and elapsed_secs_since_prev > 0: + avg_speed = total_bytes_transfered / elapsed_secs_since_start + current_bytes_transfered = total_bytes_transfered - prev_bytes_transfered + current_speed = current_bytes_transfered / elapsed_secs_since_prev + if elapsed_secs_since_prev > 120 or current_bytes_transfered > 0.333*estimated_tar_size: + prev_progress_time = current_progress_time + prev_bytes_transfered = total_bytes_transfered + percentage_to_go = 100.0 - percentage_done + time_to_go = elapsed_secs_since_start * percentage_to_go / percentage_done + + try: + if self.progress_callback: + self.progress_callback(percentage_done=percentage_done, current_speed=current_speed, total_bytes_transfered=total_bytes_transfered) + except Exception as e: + logger.error(e) + + logger.info('ltacp %s: transfered %s %.1f%% in %s at avgSpeed=%s (%s) curSpeed=%s (%s) to_go=%s to %s' % (self.logId, + humanreadablesize(total_bytes_transfered), + percentage_done, + timedelta(seconds=int(round(elapsed_secs_since_start))), + humanreadablesize(avg_speed, 'Bps'), + humanreadablesize(avg_speed*8, 'bps'), + humanreadablesize(current_speed, 'Bps'), + humanreadablesize(current_speed*8, 'bps'), + timedelta(seconds=int(round(time_to_go))), + dst_turl)) + except Exception as e: + msg = 'ltacp %s: error while parsing md5a32bc loglines: error=%s. line=%s' % (self.logId, e, nextline) + logger.error(msg) + self.cleanup() + raise LtacpException(msg) + time.sleep(0.05) + except KeyboardInterrupt: + self.cleanup() + except LtacpException as e: + logger.error('ltacp %s: %s' % (self.logId, str(e))) + self.cleanup() + raise + except Exception as e: + logger.error('ltacp %s: %s' % (self.logId, str(e))) def waitForSubprocess(proc, timeout=timedelta(seconds=60), proc_log_name='', loglevel=logging.DEBUG): logger.log(loglevel, 'ltacp %s: waiting at most %s for %s to finish...', self.logId, timeout, proc_log_name) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py index 55c1ed02374d3c810867c1472912c87796fd0c14..be822c239e4c10665d3aa13d4937e7b6f22a8ba0 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py @@ -1,20 +1,18 @@ #!/usr/bin/env python3 +import requests + import logging -import time -import http.cookiejar -import urllib.request, urllib.parse, urllib.error +logger = logging.getLogger() + from lofar.lta.ingest.common.job import jobState2String from lofar.lta.ingest.server.config import MOM_BASE_URL from lofar.common import isProductionEnvironment from lofar.lta.ingest.server.sip import * from lofar.common.util import humanreadablesize -logger = logging.getLogger() - -# ignore ssl errors from expired/selfsigned mom ssl certificate -import ssl -ssl._create_default_https_context = ssl._create_unverified_context +import urllib3 +urllib3.disable_warnings() try: import mechanize @@ -40,22 +38,10 @@ class MoMClient: self.__user = user self.__password = password - self.__logged_in = False - - self.__browser = mechanize.Browser() - cookiejar = http.cookiejar.CookieJar() - self.__browser.set_cookiejar(cookiejar) - self.__browser.set_handle_robots(False) - self.__browser.set_handle_equiv(True) - self.__browser.set_handle_redirect(True) - self.__browser.set_handle_referer(True) - self.__browser.addheaders = [('User-agent', 'Firefox')] - - # self.__browser.set_debug_http(logger.level == logging.DEBUG) - # self.__browser.set_debug_redirects(logger.level == logging.DEBUG) - # self.__browser.set_debug_responses(logger.level == logging.DEBUG) + self.__session = None self.__momURLlogin = MOM_BASE_URL + 'useradministration/user/systemlogin.do' + self.__momUR_security_check = MOM_BASE_URL + 'useradministration/user/j_security_check' self.__momURLgetSIP = MOM_BASE_URL + 'mom3/interface/importXML2.do' self.__momURLsetStatus = MOM_BASE_URL + 'mom3/interface/service/setStatusDataProduct.do' self.__momURLlogout = MOM_BASE_URL + 'useradministration/user/logout.do' @@ -64,31 +50,32 @@ class MoMClient: def login(self): try: - if self.__logged_in: - return + if self.__session is not None: + self.logout() logger.debug("logging in to MoM on url: %s", self.__momURLlogin) - self.__browser.open(self.__momURLlogin) - forms = list(self.__browser.forms()) - self.__browser.form = forms[0] - self.__browser.form['j_username'] = self.__user - self.__browser.form['j_password'] = self.__password - ret_code = self.__browser.submit().code - if 200 != ret_code: - raise Exception("Logging into MoM failed: http return code = " + ret_code) + session = requests.session() + r = session.get(self.__momURLlogin, verify=False) + if 200 != r.status_code: + raise Exception("Logging into MoM on %s failed: http return code = %s" % (self.__momURLlogin, r.status_code)) + + r = session.post(self.__momUR_security_check, data={'j_username': self.__user, 'j_password': self.__password}, verify=False) + if 200 != r.status_code: + raise Exception("Logging into MoM on %s failed: http return code = %s" % (self.__momUR_security_check, r.status_code)) logger.debug("logged in on MoM on url: %s", self.__momURLlogin) - self.__logged_in = True + self.__session = session except Exception as e: raise Exception("Logging into MoM on %s failed: %s" % (self.__momURLlogin, str(e))) def logout(self): try: - if self.__logged_in: - logger.info("logging out of MoM on url: %s", self.__momURLlogout) - self.__browser.open(self.__momURLlogout) - self.__logged_in = False - logger.info("logged out of MoM on url: %s", self.__momURLlogout) + if self.__session is not None: + logger.debug("logging out of MoM on url: %s", self.__momURLlogout) + self.__session.get(self.__momURLlogout, verify=False) + self.__session.close() + self.__session = None + logger.debug("logged out of MoM on url: %s", self.__momURLlogout) except Exception as e: logger.warning("Logging out of MoM failed: " + str(e)) @@ -108,11 +95,11 @@ class MoMClient: self.login() params = {"exportId" : export_id, "status" : status_id} - statusUrl = self.__momURLsetStatus + '?' + urllib.parse.urlencode(params) - logger.info("updating MoM: " + statusUrl) - response = self.__browser.open(statusUrl) - reply = [line.decode('utf-8') for line in response.readlines()] - if reply == ['ok']: + logger.info("updating MoM on url: %s with params: %s", self.__momURLsetStatus, params) + response = self.__session.get(self.__momURLsetStatus, params=params) + reply = response.text.strip() + + if reply == 'ok': logger.info('MoMClient.setStatus updated status of %s to %s', export_id, jobState2String(int(status_id))) if message: @@ -125,12 +112,11 @@ class MoMClient: message = message[:97] + '...' params['message'] = message + logger.info("updating MoM (again to set the message) on url: %s with params: %s", self.__momURLsetStatus, params) + response = self.__session.get(self.__momURLsetStatus, params=params) + reply = response.text.strip() - statusUrl = self.__momURLsetStatus + '?' + urllib.parse.urlencode(params) - logger.info("updating MoM: " + statusUrl) - response = self.__browser.open(statusUrl) - reply = [line.decode('utf-8') for line in response.readlines()] - if reply == ['ok']: + if reply == 'ok': logger.info('MoMClient.setStatus updated status of %s to %s with message: %s', export_id, jobState2String(int(status_id)), @@ -142,15 +128,15 @@ class MoMClient: logger.error('MoMClient.setStatus could not update status of %s to %s using url: %s reply: %s', export_id, jobState2String(int(status_id)), - statusUrl, + response.url, reply) self.logout() if 'DOCTYPE HTML PUBLIC' in reply: - logger.error('MoM returned login screen instead of SIP for archive_id=%s mom_id=%s using url %s and data %s', archive_id, mom_id, self.__momURLgetSIP, data) + logger.error('MoM returned login screen instead of SIP for export_id: %s on url %s', export_id, response.url) wait_secs = (mom_retry + 1) * (mom_retry + 1) * 10 - logger.info('Retrying to setStatus for archiveId %s in %s seconds', archive_id, wait_secs) + logger.info('Retrying to setStatus for export_id %s in %s seconds', export_id, wait_secs) time.sleep(wait_secs) continue # jump back to for mom_retry in range(self.MAX_MOM_RETRIES) @@ -198,15 +184,14 @@ class MoMClient: while ' ' in xmlcontent: xmlcontent = xmlcontent.replace(' ', ' ') - data = urllib.parse.urlencode({"command" : "get-sip-with-input", "xmlcontent" : xmlcontent}) - # Now get that file-like object again, remembering to mention the data. - response = self.__browser.open(self.__momURLgetSIP, data) - result = response.read().decode('utf-8') + params = {"command": "get-sip-with-input", "xmlcontent" : xmlcontent} + response = self.__session.get(self.__momURLgetSIP, params=params) + result = response.text result = result.replace('<stationType>Europe</stationType>', '<stationType>International</stationType>') if 'DOCTYPE HTML PUBLIC' in result: - logger.error('MoM returned login screen instead of SIP for %s %s using url %s and data %s', - archive_id, filename, self.__momURLgetSIP, data) + logger.error('MoM returned login screen instead of SIP for %s %s using url %s and params %s, full url=%s', + archive_id, filename, self.__momURLgetSIP, params, response.url) # logout, even though we think we should be logged in properly # it's mom who thinks we should login again, even though we have a proper session. @@ -218,7 +203,7 @@ class MoMClient: # let's give it a final try with just GetSip # we'll miss some data in the SIP, which we can add ourselves, so the LTA catalogue is up-to-date # but MoM will miss these parameters. tough luck. - logger.warn("MoMClient.uploadDataAndGetSIP with archiveId %s - StorageTicket %s - FileName %s - Uri %s failed %s times. Trying normal GetSip without uploading data to MoM.", archive_id, storage_ticket, filename, uri, mom_retry) + logger.warning("MoMClient.uploadDataAndGetSIP with archiveId %s - StorageTicket %s - FileName %s - Uri %s failed %s times. Trying normal GetSip without uploading data to MoM.", archive_id, storage_ticket, filename, uri, mom_retry) result = self.getSIP(archive_id, validate = False) # add ingest info to sip result = addIngestInfoToSIP(result, storage_ticket, filesize, md5_checksum, adler32_checksum) @@ -271,11 +256,9 @@ class MoMClient: mom_id = archive_id - 1000000 # stupid mom one million archive_id offset - data = urllib.parse.urlencode({"command" : "GETSIP", "id" : mom_id}) - # Now get that file-like object again, remembering to mention the data. - logger.info('%s: GetSip call: %s %s', log_prefix, self.__momURLgetSIP, data) - response = self.__browser.open(self.__momURLgetSIP, data) - result = response.read().decode('utf-8') + # logger.info('%s: GetSip call: %s %s', log_prefix, self.__momURLgetSIP, data) + response = self.__session.get(self.__momURLgetSIP, params={"command" : "GETSIP", "id" : mom_id}) + result = response.text if 'DOCTYPE HTML PUBLIC' in result: logger.error('%s: MoM returned login screen instead of SIP for archive_id=%s mom_id=%s using url %s and data %s', @@ -298,4 +281,3 @@ class MoMClient: self.logout() raise Exception("getting SIP from MoM failed: " + str(e)) return '' - diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index 61dbd0390960f341eeec8d68e21ceaf6fc4c6b45..94dc11f456845e6dec6b6bbf919e7902585fc78f 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -337,7 +337,7 @@ class PipelineDependencies(object): return self.rarpc.getTasks(task_status=task_status, task_type=task_type) -class PipelineControlHandler(OTDBEventMessageHandler): +class PipelineControlHandler( OTDBEventMessageHandler): def __init__(self, exchange, broker): super(PipelineControlHandler, self).__init__() @@ -364,16 +364,16 @@ class PipelineControlHandler(OTDBEventMessageHandler): super(PipelineControlHandler, self).start_handling() - self._checkScheduledPipelines() - def stop_handling(self): super(PipelineControlHandler, self).stop_handling() self.dependencies.close() self.otdbrpc.close() - def _checkScheduledPipelines(self): + def check_scheduled_pipelines(self): try: + logger.info("Checking for already scheduled pipelines...") + scheduled_pipelines = self.dependencies.getTasks(task_status='scheduled', task_type='pipeline') logger.info("Checking %s scheduled pipelines if they can start.", @@ -398,6 +398,8 @@ class PipelineControlHandler(OTDBEventMessageHandler): except Exception as e: logger.error(e) + logger.info("...finished checking for already scheduled pipelines.") + @staticmethod def _shouldHandle(parset): try: @@ -469,6 +471,18 @@ class PipelineControlHandler(OTDBEventMessageHandler): busname = self.exchange, )) + def getParset_cmdline(): + return ( + "ssh {myhostname} '" + "source {lofarroot}/lofarinit.sh && " + "getOTDBParset -o {obsid}'" + .format( + myhostname = getfqdn(), + lofarroot = os.environ.get("LOFARROOT", ""), + obsid = otdbId, + )) + + try: logger.info("Handing over pipeline %s to SLURM", otdbId) @@ -499,6 +513,9 @@ class PipelineControlHandler(OTDBEventMessageHandler): # notify ganglia wget -O - -q "http://ganglia.control.lofar/ganglia/api/events.php?action=add&start_time=now&summary=Pipeline {obsid} ACTIVE&host_regex=" + + # fetch parset + runcmd {getParset} > {parset_file} # run the pipeline runcmd docker-run-slurm.sh --rm --net=host \ @@ -507,7 +524,7 @@ class PipelineControlHandler(OTDBEventMessageHandler): -e SLURM_JOB_ID=$SLURM_JOB_ID \ -v /data:/data \ {image} \ - runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} -P {parset_dir} + runPipeline.sh -o {obsid} -c /opt/lofar/share/pipeline/pipeline.cfg.{cluster} -P {parset_dir} -p {parset_file} RESULT=$? # notify that we're tearing down @@ -538,10 +555,12 @@ class PipelineControlHandler(OTDBEventMessageHandler): lofarenv = os.environ.get("LOFARENV", ""), obsid = otdbId, parset_dir = "/data/parsets", + parset_file = "/data/parsets/Observation%s.parset" % (otdbId,), repository = parset.dockerRepository(), image = parset.dockerImage(), cluster = parset.processingCluster(), + getParset = getParset_cmdline(), setStatus_active = setStatus_cmdline("active"), setStatus_completing = setStatus_cmdline("completing"), setStatus_finished = setStatus_cmdline("finished"), @@ -673,3 +692,18 @@ class PipelineControl(OTDBBusListener): super().__init__(handler_type=handler_type, handler_kwargs=handler_kwargs, exchange=exchange, num_threads=num_threads, broker=broker) + + def start_listening(self): + # HACK: create a temporary extra handler which is not connected to this listener, + # and hence not responding to incoming messages, + # and use this extra handler to initially check all already scheduled pipelines + with self._create_handler() as helper_handler: + helper_handler.check_scheduled_pipelines() + + # everything has been check, now start_listening, and let the normal handlers respond to otdb events + super().start_listening() + + + + + diff --git a/SAS/DataManagement/Cleanup/CleanupClient/rpc.py b/SAS/DataManagement/Cleanup/CleanupClient/rpc.py index 1c5c93f62e3fff09002b99e0f7f5abfe3e12eec2..cda86ffe8ec9efff420c28fbe61c51519b0dc50d 100644 --- a/SAS/DataManagement/Cleanup/CleanupClient/rpc.py +++ b/SAS/DataManagement/Cleanup/CleanupClient/rpc.py @@ -8,6 +8,8 @@ from lofar.common.util import convertStringDigitKeysToInt logger = logging.getLogger(__name__) +DEFAULT_CLEANUPRPC_TIMEOUT = 900 # delete actions on disk can take a while, so allow more time (15min) until timeout by default. + class CleanupRPC(RPCClientContextManagerMixin): def __init__(self, rpc_client: RPCClient = None): """Create an instance of the CleanupRPC using the given RPCClient, @@ -16,7 +18,7 @@ class CleanupRPC(RPCClientContextManagerMixin): self._rpc_client = rpc_client or RPCClient(service_name=DEFAULT_CLEANUP_SERVICENAME) @staticmethod - def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int=DEFAULT_RPC_TIMEOUT): + def create(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, timeout: int=DEFAULT_CLEANUPRPC_TIMEOUT): """Create a CleanupRPC connecting to the given exchange/broker on the default DEFAULT_CLEANUP_SERVICENAME service""" return CleanupRPC(RPCClient(service_name=DEFAULT_CLEANUP_SERVICENAME, exchange=exchange, broker=broker, timeout=timeout)) diff --git a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py index 5fc9b2996ef9c7ef65f0957b04af7022b47ace7d..4b597e712d1cdfc3812aed87ed63f3ae71e50570 100755 --- a/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py +++ b/SAS/MoM/MoMQueryService/MoMQueryServiceServer/momqueryservice.py @@ -109,7 +109,7 @@ class MoMDatabaseWrapper: logger.debug("Connecting to %s", self.dbcreds.stringWithHiddenPassword()) self.conn = connector.connect(**connect_options) - logger.info("Connected to %s", self.dbcreds.stringWithHiddenPassword()) + logger.debug("Connected to %s", self.dbcreds.stringWithHiddenPassword()) # Make sure we get fresh data for each SELECT. Alternatively, we could call # commit() after each SELECT. @@ -126,7 +126,7 @@ class MoMDatabaseWrapper: self.cursor = None self.conn.close() self.conn = None - logger.info("Disconnected from %s", self.dbcreds.stringWithHiddenPassword()) + logger.debug("Disconnected from %s", self.dbcreds.stringWithHiddenPassword()) def __enter__(self): self.connect() @@ -149,7 +149,9 @@ class MoMDatabaseWrapper: try: self.connect() self.cursor.execute(query, data) - return self.cursor.fetchall() + result = self.cursor.fetchall() + self.disconnect() # needed for selects on cached results + return result except (OperationalError, AttributeError) as e: logger.error(str(e)) @@ -1200,6 +1202,7 @@ where project.mom2id = %s and (project_role.name = "Pi" or project_role.name = " where mom.mom2id=%s;""" parameters = (mom_id, mom_id) rows_misc = self._executeSelectQuery(query, parameters) + logger.info("_get_misc_contents(mom_id=%s) rows_misc: %s", mom_id, rows_misc) if rows_misc: misc = {} @@ -1435,7 +1438,7 @@ def main(): with RPCService(service_name=DEFAULT_MOMQUERY_SERVICENAME, handler_type=MoMQueryServiceMessageHandler, handler_kwargs={'dbcreds': dbcreds}, broker=options.broker, exchange=options.exchange, - num_threads=4): + num_threads=6): waitForInterrupt() if __name__ == '__main__': diff --git a/SAS/OTDB_Services/OTDBBusListener.py b/SAS/OTDB_Services/OTDBBusListener.py index 7bb1d3be1605350ccc119e4da127a66ec4368438..835b39f0fe13169f0e3a0bf435808c22c6faf65a 100644 --- a/SAS/OTDB_Services/OTDBBusListener.py +++ b/SAS/OTDB_Services/OTDBBusListener.py @@ -187,11 +187,10 @@ class OTDBEventMessageHandler(AbstractMessageHandler): class OTDBBusListener(BusListener): """The OTDBBusListener is a normal BusListener listening specifically to EventMessages with OTDB notification subjects. - It uses by default the OTDBEventMessageHandler to handle the EventMessages. - If you want to implement your own behaviour, then derive a subclass of the OTDBEventMessageHandler, and inject that in this OTDBBusListener. + You have to implement your own concrete subclass of the OTDBEventMessageHandler, and inject that in this OTDBBusListener. See example at the top of this file. """ - def __init__(self, handler_type: OTDBEventMessageHandler.__class__ = OTDBEventMessageHandler, handler_kwargs: dict = None, + def __init__(self, handler_type: OTDBEventMessageHandler.__class__, handler_kwargs: dict = None, exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, num_threads: int = 1): if not issubclass(handler_type, OTDBEventMessageHandler): raise TypeError("handler_type should be a OTDBEventMessageHandler subclass") @@ -200,5 +199,4 @@ class OTDBBusListener(BusListener): exchange=exchange, routing_key="%s.#" % (DEFAULT_OTDB_NOTIFICATION_SUBJECT), num_threads=num_threads, broker=broker) - __all__ = ["OTDBEventMessageHandler", "OTDBBusListener"] diff --git a/SAS/OTDB_Services/getOTDBParset b/SAS/OTDB_Services/getOTDBParset index c27acd2d599b81423e19c801ca059cb0599bcfe1..e2913668f18459f407585b4390b58e05bc8be707 100755 --- a/SAS/OTDB_Services/getOTDBParset +++ b/SAS/OTDB_Services/getOTDBParset @@ -47,7 +47,7 @@ if __name__ == "__main__": parser.print_help() sys.exit(1) - with OTDBRPC(exchange=options.exchange) as otdbrpc: + with OTDBRPC.create(exchange=options.exchange) as otdbrpc: parset = otdbrpc.taskGetSpecification(otdb_id=options.obsid)["specification"] for key in sorted(parset.keys()): diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py index f6cca30a5f4742ee1592222102e00341c7c31f75..b6698e41278b46974a4e16c4ff669a7d69b686b2 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/changeshandler.py @@ -94,10 +94,10 @@ class ChangesHandler: super().__init__() def onTaskDeleting(self, otdb_id): - self._changes_handler.onTaskDeleting(otdb_id) + self._changes_handler.onTaskDataDeletingFromDisk(otdb_id) def onTaskDeleted(self, otdb_id, deleted, paths, message=''): - self._changes_handler.onTaskDeleted(otdb_id, deleted, paths, message) + self._changes_handler.onTaskDataDeletedFromDisk(otdb_id, deleted, paths, message) def onTaskDataPinned(self, otdb_id, pinned): self._changes_handler.onTaskDataPinned(otdb_id, pinned) @@ -242,14 +242,14 @@ class ChangesHandler: def onDiskUsageChanged(self, path, disk_usage, otdb_id): self._handleDiskUsageChange(disk_usage, otdb_id) - def onTaskDeletingFromDisk(self, otdb_id): + def onTaskDataDeletingFromDisk(self, otdb_id): task = self._radbrpc.getTask(otdb_id=otdb_id) task_type = task.get('type', 'task') if task else 'task' message = 'Deleting data from disk for %s with otdb_id %s ...' % (task_type, otdb_id) change = {'changeType':CHANGE_EVENT_TYPE, 'objectType':'logevent', 'value':message} self._handleChange(change) - def onTaskDeletedFromDisk(self, otdb_id, deleted, paths, message=''): + def onTaskDataDeletedFromDisk(self, otdb_id, deleted, paths, message=''): #get new actual disk_usage disk_usage_result = self._sqrpc.getDiskUsageForOTDBId(otdb_id, include_scratch_paths=True, force_update=True) disk_usage = disk_usage_result['disk_usage'] if disk_usage_result['found'] else 0 diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js index e0c916cbdfd29ee745e1e7295950f20e40c567e5..d416915c49ae70082d3a1ba76801115d59e4501b 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/static/app/controllers/gridcontroller.js @@ -1,5 +1,13 @@ // $Id: controller.js 32761 2015-11-02 11:50:21Z schaap $ +function sizeFilterCondition(term, value, row, column) { + if (term === 0) + return value === 0; + + return (value >= term); +}; + + var gridControllerMod = angular.module('GridControllerMod', ['ui.grid', 'ui.grid.edit', 'ui.grid.selection', @@ -179,8 +187,8 @@ gridControllerMod.controller('GridController', ['$scope', '$window', 'dataServic width: '80', filter: { type: uiGridConstants.filter.SELECT, - condition: uiGridConstants.filter.GREATER_THAN, - selectOptions: [{ value:0, label: '> 0'}, { value:1e6, label: '> 1M'}, { value:1e9, label: '> 1G'}, { value:1e10, label: '> 10G'}, { value:1e11, label: '> 100G'}, { value:1e12, label: '> 1T'} ] + condition: sizeFilterCondition, + selectOptions: [{ value:0, label: '0'}, { value:1, label: '> 0'}, { value:1e6, label: '> 1M'}, { value:1e9, label: '> 1G'}, { value:1e10, label: '> 10G'}, { value:1e11, label: '> 100G'}, { value:1e12, label: '> 1T'} ] } }, { field: 'mom_object_group_id', diff --git a/SAS/SpecificationServices/lib/config.py b/SAS/SpecificationServices/lib/config.py index ea15b3bffc92c017756d542a4ae2cc94967116ac..19760d378174b60d0e71f4d2e36264fc4f4b427a 100644 --- a/SAS/SpecificationServices/lib/config.py +++ b/SAS/SpecificationServices/lib/config.py @@ -11,7 +11,7 @@ SPECIFICATIONTRANSLATION_SERVICENAME = "specificationtranslationservice" # TODO: mom.importxml does not prepend "test." on the test system? MOMIMPORTXML_BUSNAME = "mom.importxml" -MOMIMPORTXML_SUBJECT = "add specification" +MOMIMPORTXML_BROKER = "lcs023.control.lofar" # XSD paths (for validation service) TRIGGER_XSD = "$LOFARROOT/share/SAS/LofarTrigger.xsd" diff --git a/SAS/SpecificationServices/lib/specification_service.py b/SAS/SpecificationServices/lib/specification_service.py index afb8d9c516f2a01d109876693ef690d80ead8835..108bcccc4f9df4718dcaadc9cabdf2f089ea598d 100644 --- a/SAS/SpecificationServices/lib/specification_service.py +++ b/SAS/SpecificationServices/lib/specification_service.py @@ -26,19 +26,14 @@ from io import BytesIO from lofar.common.util import waitForInterrupt # TODO: mom.importxml uses old messaging interface from lofar.messagebus.message import MessageContent -from lofar.messaging import RPCService, EventMessage, DEFAULT_BROKER, DEFAULT_BUSNAME, \ - ServiceMessageHandler +from lofar.messaging import RPCService, DEFAULT_BUSNAME, DEFAULT_BROKER, ServiceMessageHandler from lofar.messagebus.messagebus import ToBus as ToBusOld from lofar.messagebus.message import MessageContent as MessageContentOld from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.specificationservices.translation_service_rpc import TranslationRPC from lofar.specificationservices.validation_service_rpc import ValidationRPC from lofar.common.xmlparse import parse_xml_string_or_bytestring - -from .config import \ - SPECIFICATION_SERVICENAME, \ - MOMIMPORTXML_BUSNAME, \ - MOMIMPORTXML_SUBJECT +from .config import SPECIFICATION_SERVICENAME, MOMIMPORTXML_BUSNAME, MOMIMPORTXML_BROKER permitted_activities = ["observation", "pipeline", "measurement"] permitted_statuses = ["opened", "approved"] @@ -253,6 +248,8 @@ class SpecificationHandler(ServiceMessageHandler): raise Exception("Invalid MoM specification: %s", response["error"]) def _add_spec_to_mom(self, mom_xml): + logger.info("about to send mom_xml: %s", mom_xml) + # Construct message payload using old-style (MessageBus) message format msg = MessageContentOld() msg.protocol = "mom.importxml" @@ -260,12 +257,19 @@ class SpecificationHandler(ServiceMessageHandler): msg.summary = "Translated LOFAR specifications" msg.momid = -1 msg.sasid = -1 - msg.payload = "\n%s\n" % (mom_xml, ) # MoM needs enters around the payload to avoid "Content not allowed in prolog" error - #emsg = MessageOld(subject=MOMIMPORTXML_SUBJECT, content=content) + # prepare payload in xml to have a %s which can be filled in with plain xml in the qmsg below... + # MoM needs enters around the payload to avoid "Content not allowed in prolog" error + msg.payload = "\n%s\n" + + # convert to qpid message (which is a proton message nowadays) + qmsg = msg.qpidMsg() + # and inject the mom_xml in the message xml + qmsg.body = qmsg.body % (mom_xml, ) - with ToBusOld(queue=MOMIMPORTXML_BUSNAME, broker=DEFAULT_BROKER) as momimportxml_bus: - momimportxml_bus.send(msg) + with ToBusOld(queue=MOMIMPORTXML_BUSNAME, + broker=MOMIMPORTXML_BROKER) as momimportxml_bus: + momimportxml_bus.send(qmsg) logger.debug("Send specs to MOM: %s", mom_xml) diff --git a/SAS/SpecificationServices/lib/translation_service.py b/SAS/SpecificationServices/lib/translation_service.py index 352da0bb934466bca9e2979b772b0b69042953ea..696c22ca335612a7fcfcbb3dd668de0e9dacbf44 100644 --- a/SAS/SpecificationServices/lib/translation_service.py +++ b/SAS/SpecificationServices/lib/translation_service.py @@ -139,7 +139,11 @@ class SpecificationTranslationHandler(ServiceMessageHandler): logger.error("Exception while translating specification -> " + str(err)) raise - logger.debug("MoM spec after translation -> " + momspec_xml.decode("utf-8")) + if isinstance(momspec_xml, bytes): + momspec_xml = momspec_xml.decode("utf-8") + + logger.debug("MoM spec after translation -> " + momspec_xml) + with self.validationrpc: response = self.validationrpc.validate_mom_specification(momspec_xml) if response["valid"]: diff --git a/SAS/XML_generator/src/xmlgen.py b/SAS/XML_generator/src/xmlgen.py index c2c1c8cfb2f97a9a58fe769a93ba4e693a1b0455..1b8776095db50c37eef34162967a89f97d1f5885 100755 --- a/SAS/XML_generator/src/xmlgen.py +++ b/SAS/XML_generator/src/xmlgen.py @@ -1,4 +1,4 @@ -#! /usr/bin/env python +#! /usr/bin/env python3 # XML generator # xmlgen.py # @@ -26,7 +26,7 @@ # Last change by : $Author: renting $ # Change date : $Date: 2016-05-18 11:47:57 +0200 (wo, 18 mei 2016) $ -VERSION = "4.0.2" +VERSION = "4.0.3" import sys, getopt, time from xml.sax.saxutils import escape as XMLescape diff --git a/SubSystems/CMakeLists.txt b/SubSystems/CMakeLists.txt index 5624ba9bf4c198dc7f739aeccbef1f4b8fbe208c..36ff8bb71f665ec8949244b1a03894f532621656 100644 --- a/SubSystems/CMakeLists.txt +++ b/SubSystems/CMakeLists.txt @@ -15,3 +15,4 @@ lofar_add_package(RAServices) lofar_add_package(DataManagement) lofar_add_package(Dragnet) lofar_add_package(LTAIngest) +lofar_add_package(LTAIngestTransfer) diff --git a/SubSystems/LTAIngest/CMakeLists.txt b/SubSystems/LTAIngest/CMakeLists.txt index 64273f9a8dfef738bb6d0404bbd9c1e909c5628f..7d1e763ef24daf9a7a7f493b6ec639016ba1b091 100644 --- a/SubSystems/LTAIngest/CMakeLists.txt +++ b/SubSystems/LTAIngest/CMakeLists.txt @@ -1 +1 @@ -lofar_package(LTAIngest DEPENDS LTAIngestServer LTAIngestClient LTAIngestWebServer) +lofar_package(LTAIngest DEPENDS LTAIngestServer LTAIngestClient LTAIngestWebServer MessageLogger) diff --git a/SubSystems/LTAIngestTransfer/CMakeLists.txt b/SubSystems/LTAIngestTransfer/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..3782f984929ae9efdbaa5d7ab3f6ee4f40d179c3 --- /dev/null +++ b/SubSystems/LTAIngestTransfer/CMakeLists.txt @@ -0,0 +1 @@ +lofar_package(LTAIngestTransfer DEPENDS LTAIngestTransferServer LTAIngestClient)