diff --git a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py index cba2720154741d665d5be4007debc5335dc5cb40..1b384301049138fa35d86cd33153353c1bf513f5 100644 --- a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py @@ -173,9 +173,7 @@ class GenericPipeline(control): inputargs, **inputdict) resultdicts[stepname] = resultdict - print 'RESULTDICT: ', resultdict - #print 'RESULTDICTS: ', resultdicts - + #print 'RESULTDICT: ', resultdict # breaking the loopstep # if the step has the keyword for loopbreaks assign the value if resultdict is not None and 'break' in resultdict: @@ -193,14 +191,34 @@ class GenericPipeline(control): # that value gets assigned to 'sky_mapfile' of step with the name bbsreducer # code is somewhat double... need to think of some fancy method with reusabilty def _construct_input(self, inoutdict, controlparset, resdicts): + import array + import copy argsparset = controlparset.makeSubset(controlparset.fullModuleName('opts') + '.') for k in argsparset.keys(): - if argsparset.getString(k).__contains__('.output.'): - step, outvar = argsparset.getString(k).split('.output.') - inoutdict[k] = resdicts[step][outvar] + keystring = argsparset.getString(k) + if keystring.__contains__('.output.'): + if keystring.__contains__(','): + keystring = keystring.rstrip(']') + keystring = keystring.lstrip('[') + vec = [] + for item in keystring.split(','): + if item.__contains__('.output.'): + step, outvar = item.split('.output.') + vec.append(resdicts[step][outvar]) + inoutdict[k] = vec + else: + step, outvar = argsparset.getString(k).split('.output.') + inoutdict[k] = resdicts[step][outvar] else: inoutdict[k] = argsparset.getString(k) + # if argsparset.getString(k).__contains__('.output.'): + # print argsparset.getString(k).split('.output.') + # step, outvar = argsparset.getString(k).split('.output.') + # inoutdict[k] = resdicts[step][outvar] + # else: + # 
inoutdict[k] = argsparset.getString(k) + def _construct_cmdline(self, inoutargs, controlparset, resdicts): argsparset = controlparset.makeSubset(controlparset.fullModuleName('cmdline') + '.') for k in argsparset.keys(): diff --git a/CEP/Pipeline/recipes/sip/master/executable_args.py b/CEP/Pipeline/recipes/sip/master/executable_args.py index dde77dbbc26043569d58272590f26b3b35555722..3cc2b52bf5c53e74d6dde460944996a4dc2925bc 100644 --- a/CEP/Pipeline/recipes/sip/master/executable_args.py +++ b/CEP/Pipeline/recipes/sip/master/executable_args.py @@ -26,7 +26,8 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): inputs = { 'executable': ingredient.ExecField( '--executable', - help="The full path to the relevant executable" + help="The full path to the relevant executable", + optional=True ), 'arguments': ingredient.ListField( '-a', '--arguments', @@ -43,7 +44,6 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): 'parset': ingredient.FileField( '-p', '--parset', help="Path to the arguments for this executable. 
Will be converted to --key=value", - default='', optional=True ), 'inputkey': ingredient.StringField( @@ -58,12 +58,38 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): default='', optional=True ), + 'inputkeys': ingredient.ListField( + '--inputkeys', + help="List of parset keys that the executable will recognize as key for inputfile", + default=[], + optional=True + ), + 'outputkeys': ingredient.ListField( + '--outputkeys', + help="List of parset keys that the executable will recognize as key for outputfile", + default=[], + optional=True + ), + 'mapfiles_in': ingredient.ListField( + '--mapfiles-in', + help="List of the input mapfiles containing the names of the " + "data to run the recipe on", + default=[], + optional=True + ), + 'mapfiles_out': ingredient.ListField( + '--mapfiles-out', + help="List of the output mapfiles containing the names of the " + "data produced by the recipe", + default=[], + optional=True + ), 'mapfile_in': ingredient.StringField( '--mapfile-in', help="Name of the input mapfile containing the names of the " "MS-files to run the recipe", - default=''#, - #optional=True + default='', + optional=True ), 'mapfile_out': ingredient.StringField( '--mapfile-out', @@ -99,6 +125,11 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): '--parsetasfile', help="Will the argument be a parsetfile or --opt=var", default=False + ), + 'max_per_node': ingredient.IntField( + '--max_per_node', + help="Sets the number of jobs per node", + default=0 ) } @@ -109,14 +140,27 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): } def go(self): - self.logger.info("Starting %s run" % self.inputs['executable']) + if 'executable' in self.inputs: + executable = self.inputs['executable'] + + self.logger.info("Starting %s run" % executable) super(executable_args, self).go() # ********************************************************************* # try loading input/output data file, validate output vs the input location if # output 
locations are provided try: - indata = DataMap.load(self.inputs['mapfile_in']) + indatas = [] + if self.inputs['mapfile_in']: + indata = DataMap.load(self.inputs['mapfile_in']) + indatas.append(indata) + + if self.inputs['mapfiles_in']: + for item in self.inputs['mapfiles_in']: + indatas.append(DataMap.load(item)) + self.inputs['mapfile_in'] = self.inputs['mapfiles_in'][0] + #else: + # indatas.append(indata) except Exception: self.logger.error('Could not load input Mapfile %s' % self.inputs['mapfile_in']) return 1 @@ -127,22 +171,22 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.error('Could not load output Mapfile %s' % self.inputs['mapfile_out']) return 1 # sync skip fields in the mapfiles - align_data_maps(indata, outdata) + align_data_maps(indatas[0], outdata) else: # ouput will be directed in the working directory if no output mapfile is specified - outdata = copy.deepcopy(indata) + outdata = copy.deepcopy(indatas[0]) if not self.inputs['inplace']: for item in outdata: item.file = os.path.join( self.inputs['working_directory'], self.inputs['job_name'], - os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + os.path.basename(item.file) + '.' + os.path.split(str(executable))[1] ) - self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(self.inputs['executable']) + '.' + 'mapfile') + self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(executable) + '.' + 'mapfile') else: self.inputs['mapfile_out'] = self.inputs['mapfile_in'] - if not validate_data_maps(indata, outdata): + if not validate_data_maps(indatas[0], outdata): self.logger.error( "Validation of data mapfiles failed!" 
) @@ -153,47 +197,59 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): outputmapfiles = [] prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name']) for name in outputsuffix: - outputmapfiles.append(copy.deepcopy(indata)) + outputmapfiles.append(copy.deepcopy(indatas[0])) for item in outputmapfiles[-1]: item.file = os.path.join( prefix, - os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + '.' + name + os.path.basename(item.file) + '.' + os.path.split(str(executable))[1] + '.' + name ) # prepare arguments arglist = self.inputs['arguments'] - parset = Parset() - parset.adoptFile(self.inputs['parset']) parsetdict = {} - for k in parset.keys: - parsetdict[k] = str(parset[k]) + if 'parset' in self.inputs: + parset = Parset() + parset.adoptFile(self.inputs['parset']) + for k in parset.keys: + parsetdict[k] = str(parset[k]) #for k in parset.keys: # arglist.append('--' + k + '=' + parset.getString(k)) - if not self.inputs['inputkey'] and not self.inputs['skip_infile']: - arglist.insert(0, None) + #if not self.inputs['inputkey'] and not self.inputs['skip_infile']: + # arglist.insert(0, None) + + # construct multiple input data + inputlist = [] + if not self.inputs['inputkeys'] and self.inputs['inputkey']: + self.inputs['inputkeys'].append(self.inputs['inputkey']) + + if indatas: + for item in indatas: + item.iterator = DataMap.SkipIterator + for mfile in indatas: + inputlist.append([]) + for inp in mfile: + inputlist[-1].append(inp.file) + # ******************************************************************** # Call the node side of the recipe # Create and schedule the compute jobs command = "python %s" % (self.__file__.replace('master', 'nodes')).replace('executable_args', self.inputs['nodescript']) - indata.iterator = outdata.iterator = DataMap.SkipIterator + indatas[0].iterator = outdata.iterator = DataMap.SkipIterator jobs = [] - for inp, outp in zip( - indata, outdata + for i, (outp, inp,) in 
enumerate(zip( + outdata, indatas[0]) ): - #args = copy.deepcopy(arglist) - #if self.inputs['inputkey'] and not self.inputs['skip_infile']: - # args.append('--' + self.inputs['inputkey'] + '=' + inp.file) - #if self.inputs['outputkey'] and not self.inputs['skip_infile']: - # args.append('--' + self.inputs['outputkey'] + '=' + outp.file) - #if not self.inputs['inputkey'] and not self.inputs['skip_infile']: - # args.insert(0, inp.file) - if self.inputs['inputkey'] and not self.inputs['skip_infile']: - parsetdict[self.inputs['inputkey']] = inp.file + if self.inputs['inputkeys'] and not self.inputs['skip_infile']: + for name, value in zip(self.inputs['inputkeys'], inputlist): + if arglist and name in arglist: + ind = arglist.index(name) + arglist[ind] = value[i] + else: + parsetdict[name] = value[i] + if self.inputs['outputkey'] and not self.inputs['skip_infile']: parsetdict[self.inputs['outputkey']] = outp.file - if not self.inputs['inputkey'] and not self.inputs['skip_infile']: - arglist[0] = inp.file nopointer = copy.deepcopy(parsetdict) jobs.append( @@ -201,8 +257,7 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): inp.host, command, arguments=[ inp.file, - self.inputs['executable'], - #args, + executable, arglist, nopointer, prefix, @@ -212,11 +267,12 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): ] ) ) - self._schedule_jobs(jobs) + max_per_node = self.inputs['max_per_node'] + self._schedule_jobs(jobs, max_per_node) for job, outp in zip(jobs, outdata): if job.results['returncode'] != 0: outp.skip = True - + #print 'JOBRESULTS: ', job.results # ********************************************************************* # Check job results, and create output data map file if self.error.isSet(): diff --git a/CEP/Pipeline/recipes/sip/nodes/executable_args.py b/CEP/Pipeline/recipes/sip/nodes/executable_args.py index e4ddfb8c2140226871319016b741007ce1909b42..2701b09b69d5085fc131ca709dc733410e69b874 100644 --- 
a/CEP/Pipeline/recipes/sip/nodes/executable_args.py +++ b/CEP/Pipeline/recipes/sip/nodes/executable_args.py @@ -62,7 +62,6 @@ class executable_args(LOFARnodeTCP): else: raise - print 'KWARGS: ', kwargs if not parsetasfile: for k, v in kwargs.items(): args.append('--' + k + '=' + v) @@ -73,11 +72,6 @@ class executable_args(LOFARnodeTCP): nodeparset.add(k, v) nodeparset.writeFile(parsetname) args.insert(0, parsetname) - #subpar = Parset() - subpar = nodeparset.makeSubset(nodeparset.fullModuleName('casa') + '.') - print 'SUBPAR: ',subpar.keys() - for k in subpar.keys(): - print 'SUBPARSET: ',k ,' ',subpar[k] try: # **************************************************************** diff --git a/CEP/Pipeline/recipes/sip/nodes/executable_casa.py b/CEP/Pipeline/recipes/sip/nodes/executable_casa.py index 76484ee30666ef0b31ef6f532883c0f79aeb5ee9..a3fcf8448043a7351e0c60b1fd2388eab624dddc 100644 --- a/CEP/Pipeline/recipes/sip/nodes/executable_casa.py +++ b/CEP/Pipeline/recipes/sip/nodes/executable_casa.py @@ -11,6 +11,7 @@ import os import shutil import sys import errno +import tempfile from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.pipelinelogging import log_time @@ -101,6 +102,12 @@ class executable_casa(LOFARnodeTCP): casastring += ')\n' #print 'CASASTRING:' #print casastring + # 1) return code of a casapy is not properly recognized by the pipeline + # wrapping in shellscript works for successful runs. + # failed runs seem to hang the pipeline... + # 2) casapy cannot have two instances running from the same directory. + # create tmp dirs + casapydir = tempfile.mkdtemp(dir=work_dir) casafilename = os.path.join(work_dir, os.path.basename(infile) + '.casacommand.py') casacommandfile = open(casafilename, 'w') casacommandfile.write(casastring) @@ -132,13 +139,13 @@ class executable_casa(LOFARnodeTCP): #cmd = [executable] + args cmd = [somename] with CatchLog4CPlus( - work_dir, + casapydir, self.logger.name + "." 
+ os.path.basename(infile), os.path.basename(executable), ) as logger: # Catch segfaults and retry catch_segfaults( - cmd, work_dir, self.environment, logger + cmd, casapydir, self.environment, logger ) except CalledProcessError, err: # CalledProcessError isn't properly propagated by IPython