diff --git a/.gitattributes b/.gitattributes
index 8bac73ac05aa494fafc858dae543d46d1e0e2b11..8a77965f182dea241c36ad5803fff53381aa567d 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1516,6 +1516,8 @@
 CEP/Pipeline/recipes/sip/nodes/imager_finalize.py -text
 CEP/Pipeline/recipes/sip/nodes/imager_prepare.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/imager_source_finding.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/new_bbs.py eol=lf
+CEP/Pipeline/recipes/sip/nodes/python_plugin.py -text
+CEP/Pipeline/recipes/sip/nodes/python_plugin_loader.py -text
 CEP/Pipeline/recipes/sip/nodes/rficonsole.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/setupparmdb.py eol=lf
 CEP/Pipeline/recipes/sip/nodes/setupsourcedb.py eol=lf
diff --git a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py
index a20e421c491ae079d70cbbb62f139cfb71b127b2..2d06cd0af8c356a374ac69d5fa9b996b9a64efbf 100644
--- a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py
+++ b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py
@@ -8,18 +8,31 @@
 from lofarpipe.support.loggingdecorators import duration
 from lofarpipe.support.data_map import DataMap, DataProduct, validate_data_maps
 from lofarpipe.support.lofarexceptions import PipelineException
 from lofarpipe.support.utilities import create_directory
-
+import logging
+from lofarpipe.support.pipelinelogging import getSearchingLogger
+import lofarpipe.support.lofaringredient as ingredient
 import loader


 class GenericPipeline(control):
+    inputs = {
+        'loglevel': ingredient.StringField(
+            '--loglevel',
+            help="loglevel",
+            default='INFO',
+            optional=True
+        )
+    }
+
     def __init__(self):
         control.__init__(self)
         self.parset = Parset()
         self.input_data = {}
         self.output_data = {}
         self.parset_feedback_file = None
+        #self.logger = None#logging.RootLogger('DEBUG')
+        self.name = ''

     def usage(self):
         """
@@ -45,7 +58,12 @@
         if not 'job_name' in self.inputs:
             self.inputs['job_name'] = (
                 os.path.splitext(os.path.basename(parset_file))[0])
-
+        self.name = self.inputs['job_name']
+        try:
+            self.logger
+        except AttributeError:
+            self.logger = getSearchingLogger(self.name)
+        self.logger.setLevel(self.inputs['loglevel'])
         # Call the base-class's `go()` method.
         return super(GenericPipeline, self).go()

diff --git a/CEP/Pipeline/recipes/sip/master/executable_args.py b/CEP/Pipeline/recipes/sip/master/executable_args.py
index 31ad7f2916e48695799658cabd9e4ac3e65dba47..71b3a7054e7dbd805064b72c1f1c36ef9e547d47 100644
--- a/CEP/Pipeline/recipes/sip/master/executable_args.py
+++ b/CEP/Pipeline/recipes/sip/master/executable_args.py
@@ -14,7 +14,7 @@
 import lofarpipe.support.lofaringredient as ingredient
 from lofarpipe.support.baserecipe import BaseRecipe
 from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn
 from lofarpipe.support.remotecommand import ComputeJob
-from lofarpipe.support.data_map import DataMap, validate_data_maps, align_data_maps
+from lofarpipe.support.data_map import DataMap, validate_data_maps, align_data_maps, DataProduct
 from lofarpipe.support.parset import Parset

@@ -187,31 +187,41 @@
         # try loading input/output data file, validate output vs the input location if
         # output locations are provided
         try:
-            indatas = []
+            inputmapfiles = []
             if self.inputs['mapfile_in']:
                 indata = DataMap.load(self.inputs['mapfile_in'])
-                indatas.append(indata)
+                inputmapfiles.append(indata)
             if self.inputs['mapfiles_in']:
                 for item in self.inputs['mapfiles_in']:
-                    indatas.append(DataMap.load(item))
+                    inputmapfiles.append(DataMap.load(item))
                 self.inputs['mapfile_in'] = self.inputs['mapfiles_in'][0]
             #else:
-            #    indatas.append(indata)
+            #    inputmapfiles.append(indata)
         except Exception:
             self.logger.error('Could not load input Mapfile %s' % self.inputs['mapfile_in'])
             return 1
+
+        outputmapfiles = []
+        prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name'])
         if self.inputs['mapfile_out']:
             try:
                 outdata = DataMap.load(self.inputs['mapfile_out'])
+                outputmapfiles.append(outdata)
             except Exception:
                 self.logger.error('Could not load output Mapfile %s' % self.inputs['mapfile_out'])
                 return 1
             # sync skip fields in the mapfiles
-            align_data_maps(indatas[0], outdata)
+            align_data_maps(inputmapfiles[0], outputmapfiles[0])
+
+        elif self.inputs['mapfiles_out']:
+            for item in self.inputs['mapfiles_out']:
+                outputmapfiles.append(DataMap.load(item))
+            self.inputs['mapfile_out'] = self.inputs['mapfiles_out'][0]
+
         else:
             # ouput will be directed in the working directory if no output mapfile is specified
-            outdata = copy.deepcopy(indatas[0])
+            outdata = copy.deepcopy(inputmapfiles[0])
             if not self.inputs['inplace']:
                 for item in outdata:
                     item.file = os.path.join(
@@ -220,28 +230,30 @@
                         #os.path.basename(item.file) + '.' + os.path.split(str(executable))[1]
                         os.path.splitext(os.path.basename(item.file))[0] + '.' + self.inputs['stepname']
                     )
-                self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(executable) + '.' + 'mapfile')
+                self.inputs['mapfile_out'] = os.path.join(prefix, self.inputs['stepname'] + '.' + 'mapfile')
+                self.inputs['mapfiles_out'].append(self.inputs['mapfile_out'])
             else:
                 self.inputs['mapfile_out'] = self.inputs['mapfile_in']
+                self.inputs['mapfiles_out'].append(self.inputs['mapfile_out'])
+            outputmapfiles.append(outdata)

-        if not validate_data_maps(indatas[0], outdata):
+        if not validate_data_maps(inputmapfiles[0], outputmapfiles[0]):
             self.logger.error(
                 "Validation of data mapfiles failed!"
             )
             return 1

-        # Handle multiple outputfiles
-        outputsuffix = self.inputs['outputsuffixes']
-        outputmapfiles = []
-        prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name'])
-        for name in outputsuffix:
-            outputmapfiles.append(copy.deepcopy(indatas[0]))
-            for item in outputmapfiles[-1]:
-                item.file = os.path.join(
-                    prefix,
-                    #os.path.basename(item.file) + '.' + os.path.split(str(executable))[1] + '.' + name
-                    os.path.splitext(os.path.basename(item.file))[0] + '.' + self.inputs['stepname'] + name
-                )
+        if self.inputs['outputsuffixes']:
+            # Handle multiple outputfiles
+            for name in self.inputs['outputsuffixes']:
+                outputmapfiles.append(copy.deepcopy(inputmapfiles[0]))
+                self.inputs['mapfiles_out'].append(os.path.join(prefix, self.inputs['stepname'] + name + '.' + 'mapfile'))
+                for item in outputmapfiles[-1]:
+                    item.file = os.path.join(
+                        prefix,
+                        os.path.splitext(os.path.basename(item.file))[0] + '.' + self.inputs['stepname'] + name
+                    )
+            self.inputs['mapfile_out'] = self.inputs['mapfiles_out'][0]

         # prepare arguments
         arglist = self.inputs['arguments']
@@ -252,51 +264,74 @@
         for k in parset.keys:
             parsetdict[k] = str(parset[k])

-        #for k in parset.keys:
-        #    arglist.append('--' + k + '=' + parset.getString(k))
-
-        #if not self.inputs['inputkey'] and not self.inputs['skip_infile']:
-        #    arglist.insert(0, None)
-
         # construct multiple input data
         inputlist = []
+        keylist = []
         if not self.inputs['inputkeys'] and self.inputs['inputkey']:
             self.inputs['inputkeys'].append(self.inputs['inputkey'])
-        if indatas:
-            for item in indatas:
-                item.iterator = DataMap.SkipIterator
-            for mfile in indatas:
-                inputlist.append([])
-                for inp in mfile:
-                    inputlist[-1].append(inp.file)
+        if not self.inputs['outputkeys'] and self.inputs['outputkey']:
+            self.inputs['outputkeys'].append(self.inputs['outputkey'])
+
+        if len(self.inputs['inputkeys']) != len(inputmapfiles):
+            self.logger.error("Number of input mapfiles %d and input keys %d have to match."
+                              % (len(inputmapfiles), len(self.inputs['inputkeys'])))
+            return 1
+
+        filedict = {}
+        if self.inputs['inputkeys'] and not self.inputs['skip_infile']:
+            for key, filemap in zip(self.inputs['inputkeys'], inputmapfiles):
+                filedict[key] = []
+                for inp in filemap:
+                    filedict[key].append(inp.file)
+
+        if self.inputs['outputkey']:
+            filedict[self.inputs['outputkey']] = []
+            for item in outputmapfiles[0]:
+                filedict[self.inputs['outputkey']].append(item.file)
+
+        # if inputmapfiles and not self.inputs['skip_infile']:
+        #     for key in self.inputs['inputkeys']:
+        #         keylist.append(key)
+        #     for item in inputmapfiles:
+        #         item.iterator = DataMap.SkipIterator
+        #     for mfile in inputmapfiles:
+        #         inputlist.append([])
+        #         for inp in mfile:
+        #             inputlist[-1].append(inp.file)
+        #
+        #     if self.inputs['outputkey']:
+        #         inputlist.append([])
+        #         keylist.append(self.inputs['outputkey'])
+        #         for item in outputmapfiles[0]:
+        #             inputlist[-1].append(item.file)

         # ********************************************************************
         # Call the node side of the recipe
         # Create and schedule the compute jobs
         command = "python %s" % (self.__file__.replace('master', 'nodes')).replace('executable_args', self.inputs['nodescript'])
-        indatas[0].iterator = outdata.iterator = DataMap.SkipIterator
+        inputmapfiles[0].iterator = outputmapfiles[0].iterator = DataMap.SkipIterator
         jobs = []
         for i, (outp, inp,) in enumerate(zip(
-                outdata, indatas[0])
+                outputmapfiles[0], inputmapfiles[0])
         ):
             arglist_copy = copy.deepcopy(arglist)
             parsetdict_copy = copy.deepcopy(parsetdict)

-            if self.inputs['inputkeys'] and not self.inputs['skip_infile']:
-                for name, value in zip(self.inputs['inputkeys'], inputlist):
+            #if keylist:
+            #for name, value in zip(keylist, inputlist):
+            if filedict:
+                for name, value in filedict.iteritems():
                     if arglist_copy and name in arglist_copy:
                         ind = arglist_copy.index(name)
                         arglist_copy[ind] = value[i]
+                    elif name in parsetdict_copy.values():
+                        for k, v in parsetdict_copy.iteritems():
+                            if v == name:
+                                parsetdict_copy[k] = value[i]
                     else:
                         parsetdict_copy[name] = value[i]

-            if self.inputs['outputkey'] and not self.inputs['skip_infile']:
-                if arglist_copy and self.inputs['outputkey'] in arglist_copy:
-                    ind = arglist_copy.index(self.inputs['outputkey'])
-                    arglist_copy[ind] = outp.file
-                else:
-                    parsetdict_copy[self.inputs['outputkey']] = outp.file
-
             jobs.append(
                 ComputeJob(
                     inp.host, command,
@@ -315,10 +350,22 @@
                 )
         max_per_node = self.inputs['max_per_node']
         self._schedule_jobs(jobs, max_per_node)
-        for job, outp in zip(jobs, outdata):
+        jobresultdict = {}
+        resultmap = {}
+        for job, outp in zip(jobs, outputmapfiles[0]):
             if job.results['returncode'] != 0:
                 outp.skip = True
-        #print 'JOBRESULTS: ', job.results
+            for k, v in job.results.items():
+                if k not in jobresultdict:
+                    jobresultdict[k] = []
+                jobresultdict[k].append(DataProduct(job.host, job.results[k], outp.skip))
+
+        # temporary solution: write all output dict entries to mapfiles
+        for k, v in jobresultdict.items():
+            dmap = DataMap(v)
+            dmap.save(k + '.mapfile')
+            resultmap[k + '.mapfile'] = k + '.mapfile'
+        self.outputs.update(resultmap)
         # *********************************************************************
         # Check job results, and create output data map file
         if self.error.isSet():
@@ -330,21 +377,16 @@
             self.logger.warn(
                 "Some jobs failed, continuing with succeeded runs"
             )
-        self.logger.debug("Writing data map file: %s" % self.inputs['mapfile_out'])
-        #outdata.save(self.inputs['mapfile_out'])
-        #self.outputs['mapfile'] = self.inputs['mapfile_out']
         mapdict = {}
-        for item, name in zip(outputmapfiles, outputsuffix):
-            maploc = os.path.join(prefix, self.inputs['stepname'] + name + '.' + 'mapfile')
-            item.save(maploc)
-            mapdict[self.inputs['stepname'] + name] = maploc
-            #self.outputs[name] = name + '.' + 'mapfile'
+        for item, name in zip(outputmapfiles, self.inputs['mapfiles_out']):
+            self.logger.debug("Writing data map file: %s" % name)
+            item.save(name)
+            mapdict[os.path.basename(name)] = name

-        outdata.save(self.inputs['mapfile_out'])
         self.outputs['mapfile'] = self.inputs['mapfile_out']
-        if outputsuffix:
+        if self.inputs['outputsuffixes']:
             self.outputs.update(mapdict)
-        #self.outputs['mapfile'] = os.path.join(prefix, outputsuffix[0] + '.' + 'mapfile')
+
         return 0

 if __name__ == '__main__':
diff --git a/CEP/Pipeline/recipes/sip/nodes/python_plugin.py b/CEP/Pipeline/recipes/sip/nodes/python_plugin.py
new file mode 100644
index 0000000000000000000000000000000000000000..b3afe41add2ce6b00b3a7525a0954d6ba6e5f015
--- /dev/null
+++ b/CEP/Pipeline/recipes/sip/nodes/python_plugin.py
@@ -0,0 +1,100 @@
+# LOFAR PIPELINE SCRIPT
+#
+# running a python plugin with arguments
+# Stefan Froehlich, 2014
+# s.froehlich@fz-juelich.de
+# ------------------------------------------------------------------------------
+
+from __future__ import with_statement
+from subprocess import CalledProcessError
+import os
+import shutil
+import sys
+import errno
+import imp
+
+from lofarpipe.support.pipelinelogging import CatchLog4CPlus
+from lofarpipe.support.pipelinelogging import log_time
+from lofarpipe.support.utilities import create_directory
+from lofarpipe.support.utilities import catch_segfaults
+from lofarpipe.support.lofarnode import LOFARnodeTCP
+from lofarpipe.support.parset import Parset
+
+
+class python_plugin(LOFARnodeTCP):
+    """
+    Basic script for running a python plugin with arguments.
+    """
+
+    def run(self, infile, executable, args, kwargs, work_dir='/tmp', parsetasfile=False, args_format='', environment=''):
+        """
+        This method contains all the needed functionality
+        """
+        # Debugging info
+        self.logger.debug("infile            = %s" % infile)
+        self.logger.debug("executable        = %s" % executable)
+        self.logger.debug("working directory = %s" % work_dir)
+        self.logger.debug("arguments         = %s" % args)
+        self.logger.debug("arg dictionary    = %s" % kwargs)
+        self.logger.debug("environment       = %s" % environment)
+
+        self.environment.update(environment)
+
+        # Time execution of this job
+        with log_time(self.logger):
+            #if os.path.exists(infile):
+            self.logger.info("Processing %s" % infile)
+
+            # Check if script is present
+            if not os.path.isfile(executable):
+                self.logger.error("Script %s not found" % executable)
+                return 1
+
+            # guard against the race condition when more than one process creates work_dir on the same filesystem
+            if not os.path.isdir(work_dir):
+                try:
+                    os.mkdir(work_dir)
+                except OSError as exc:  # Python >2.5
+                    if exc.errno == errno.EEXIST and os.path.isdir(work_dir):
+                        pass
+                    else:
+                        raise
+
+            if parsetasfile:
+                nodeparset = Parset()
+                parsetname = os.path.join(work_dir, os.path.basename(infile) + '.parset')
+                for k, v in kwargs.items():
+                    nodeparset.add(k, v)
+                nodeparset.writeFile(parsetname)
+                args.insert(0, parsetname)
+
+            try:
+                # ****************************************************************
+                # Run the plugin: load the module and call its main() entry point
+                outdict = {}
+                plugin = imp.load_source('main', executable)
+                outdict = plugin.main(*args, **kwargs)
+
+            except CalledProcessError, err:
+                # CalledProcessError isn't properly propagated by IPython
+                self.logger.error(str(err))
+                return 1
+            except Exception, err:
+                self.logger.error(str(err))
+                return 1
+
+        if outdict:
+            for k, v in outdict.items():
+                self.outputs[k] = v
+        # We need some signal to the master script that the script ran ok.
+        self.outputs['ok'] = True
+        return 0
+
+
+if __name__ == "__main__":
+    # If invoked directly, parse command line arguments for logger information
+    # and pass the rest to the run() method defined above
+    # --------------------------------------------------------------------------
+    jobid, jobhost, jobport = sys.argv[1:4]
+    sys.exit(python_plugin(jobid, jobhost, jobport).run_with_stored_arguments())
diff --git a/CEP/Pipeline/recipes/sip/nodes/python_plugin_loader.py b/CEP/Pipeline/recipes/sip/nodes/python_plugin_loader.py
new file mode 100644
index 0000000000000000000000000000000000000000..5c89dae2fea6d3f6e934c466256dbe3b724bdf1d
--- /dev/null
+++ b/CEP/Pipeline/recipes/sip/nodes/python_plugin_loader.py
@@ -0,0 +1,13 @@
+import sys
+
+
+def load_plugin(name, path=None):
+    if path:
+        sys.path.append(path)
+    mod = __import__(name)
+    return mod
+
+
+def call_plugin(path, name, *args, **kwargs):
+    plugin = load_plugin(name, path)
+    return plugin.main(*args, **kwargs)
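
Note (reviewer sketch): the new python_plugin node loads the plugin file with
imp.load_source('main', executable) and calls its main(*args, **kwargs); every
entry of the dict returned by main() is copied into the node's outputs, and the
executable_args master recipe writes each result key out to a '<key>.mapfile'.
A minimal sketch of a conforming plugin module follows; the file name
my_plugin.py, the 'infile' keyword, and the returned key are illustrative
assumptions, not part of this change.

    # my_plugin.py -- hypothetical plugin for the python_plugin node.
    # Contract (from python_plugin.py): expose a module-level main() that
    # accepts the step's arguments and returns a dict (or None); each
    # returned key/value lands in the node's self.outputs and, via the
    # master recipe, in a '<key>.mapfile' data map.

    def main(*args, **kwargs):
        # positional args come from the step's 'arguments' list,
        # keyword args from its parset entries
        infile = kwargs.get('infile', args[0] if args else None)
        # ... do the actual processing of infile here ...
        return {'processed': str(infile)}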
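Note (reviewer sketch): python_plugin_loader.py can also be used standalone.
call_plugin() takes the plugin directory first and the module name second,
appends the directory to sys.path, imports the module, and invokes its main().
A hedged usage example, with '/path/to/plugins', 'my_plugin', and the call
arguments as placeholders:

    from python_plugin_loader import call_plugin

    # imports my_plugin from /path/to/plugins and runs
    # my_plugin.main('input.ms', option='value')
    result = call_plugin('/path/to/plugins', 'my_plugin', 'input.ms', option='value')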