diff --git a/CEP/Pipeline/recipes/sip/master/executable_args.py b/CEP/Pipeline/recipes/sip/master/executable_args.py index 79a265bda926dcceef98b3042ff599c191d0218f..bdb21497283d9c3a714fa4863d255155849a0f46 100644 --- a/CEP/Pipeline/recipes/sip/master/executable_args.py +++ b/CEP/Pipeline/recipes/sip/master/executable_args.py @@ -71,6 +71,28 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): help="Dont give the input file to the executable.", default=False, optional=True + ), + 'skip_outfile': ingredient.BoolField( + '--skip-outfile', + help="Dont produce an output file", + default=False, + optional=True + ), + 'inplace': ingredient.BoolField( + '--inplace', + help="Manipulate input files inplace", + default=False, + optional=True + ), + 'outputsuffixes': ingredient.ListField( + '--outputsuffixes', + help="Suffixes for the outputfiles", + default=[] + ), + 'parsetasfile': ingredient.BoolField( + '--parsetasfile', + help="Will the argument be a parsetfile or --opt=var", + default=False ) } @@ -92,7 +114,7 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): except Exception: self.logger.error('Could not load input Mapfile %s' % self.inputs['mapfile_in']) return 1 - if self.inputs['mapfile_out'] is not '': + if self.inputs['mapfile_out']: try: outdata = DataMap.load(self.inputs['mapfile_out']) except Exception: @@ -103,13 +125,16 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): else: # ouput will be directed in the working directory if no output mapfile is specified outdata = copy.deepcopy(indata) - for item in outdata: - item.file = os.path.join( - self.inputs['working_directory'], - self.inputs['job_name'], - os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] - ) - self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(self.inputs['executable']) + '.' + 'mapfile') + if not self.inputs['inplace']: + for item in outdata: + item.file = os.path.join( + self.inputs['working_directory'], + self.inputs['job_name'], + os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + ) + self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(self.inputs['executable']) + '.' + 'mapfile') + else: + self.inputs['mapfile_out'] = self.inputs['mapfile_in'] if not validate_data_maps(indata, outdata): self.logger.error( @@ -117,13 +142,30 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): ) return 1 + # Handle multiple outputfiles + outputsuffix = self.inputs['outputsuffixes'] + outputmapfiles = [] + prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name']) + for name in outputsuffix: + outputmapfiles.append(copy.deepcopy(indata)) + for item in outputmapfiles[-1]: + item.file = os.path.join( + prefix, + os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + '.' + name + ) + # prepare arguments arglist = self.inputs['arguments'] parset = Parset() parset.adoptFile(self.inputs['parset']) + parsetdict = {} for k in parset.keys: - arglist.append('--' + k + '=' + parset.getString(k)) + parsetdict[k] = str(parset[k]) + #for k in parset.keys: + # arglist.append('--' + k + '=' + parset.getString(k)) + if not self.inputs['inputkey'] and not self.inputs['skip_infile']: + arglist.insert(0, None) # ******************************************************************** # Call the node side of the recipe # Create and schedule the compute jobs @@ -133,21 +175,33 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): for inp, outp in zip( indata, outdata ): + #args = copy.deepcopy(arglist) + #if self.inputs['inputkey'] and not self.inputs['skip_infile']: + # args.append('--' + self.inputs['inputkey'] + '=' + inp.file) + #if self.inputs['outputkey'] and not self.inputs['skip_infile']: + # args.append('--' + self.inputs['outputkey'] + '=' + outp.file) + #if not self.inputs['inputkey'] and not self.inputs['skip_infile']: + # args.insert(0, inp.file) if self.inputs['inputkey'] and not self.inputs['skip_infile']: - arglist.append('--' + self.inputs['inputkey'] + '=' + inp.file) + parsetdict[self.inputs['inputkey']] = inp.file if self.inputs['outputkey'] and not self.inputs['skip_infile']: - arglist.append('--' + self.inputs['outputkey'] + '=' + outp.file) + parsetdict[self.inputs['outputkey']] = outp.file if not self.inputs['inputkey'] and not self.inputs['skip_infile']: - arglist.insert(0, inp.file) + arglist[0] = inp.file + + nopointer = copy.deepcopy(parsetdict) jobs.append( ComputeJob( inp.host, command, arguments=[ inp.file, - outp.file, self.inputs['executable'], + #args, arglist, - self.inputs['working_directory'], + nopointer, + prefix, + self.inputs['parsetasfile'], + #self.inputs['working_directory'], self.environment ] ) @@ -169,8 +223,19 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): "Some jobs failed, continuing with succeeded runs" ) self.logger.debug("Writing data map file: %s" % self.inputs['mapfile_out']) - outdata.save(self.inputs['mapfile_out']) - self.outputs['mapfile'] = self.inputs['mapfile_out'] + #outdata.save(self.inputs['mapfile_out']) + #self.outputs['mapfile'] = self.inputs['mapfile_out'] + mapdict = {} + for item, name in zip(outputmapfiles, outputsuffix): + item.save(os.path.join(prefix, name + '.' + 'mapfile')) + mapdict[name] = os.path.join(prefix, name + '.' + 'mapfile') + #self.outputs[name] = name + '.' + 'mapfile' + if not outputsuffix: + outdata.save(self.inputs['mapfile_out']) + self.outputs['mapfile'] = self.inputs['mapfile_out'] + else: + self.outputs.update(mapdict) + self.outputs['mapfile'] = os.path.join(prefix, outputsuffix[0] + '.' + 'mapfile') return 0 if __name__ == '__main__': diff --git a/CEP/Pipeline/recipes/sip/master/executable_parsetonly.py b/CEP/Pipeline/recipes/sip/master/executable_parsetonly.py index e865d0a447c94cd3dabd42f09f01df192f6ba511..33b6578823b8fbd77e8957de516f8fdd67d6825a 100644 --- a/CEP/Pipeline/recipes/sip/master/executable_parsetonly.py +++ b/CEP/Pipeline/recipes/sip/master/executable_parsetonly.py @@ -14,51 +14,41 @@ from lofarpipe.support.baserecipe import BaseRecipe from lofarpipe.support.remotecommand import RemoteCommandRecipeMixIn from lofarpipe.support.remotecommand import ComputeJob from lofarpipe.support.data_map import DataMap, validate_data_maps - +from lofarpipe.support.parset import Parset class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn): """ - Runs an executable on a number of MeasurementSets with parset as only argument. - TODO:mapfiles go where? - - 1. Load input data files - 3. Call the node side of the recipe - 4. Create mapfile with successful noderecipe runs - - **Command line arguments** - - 1. A mapfile describing the data to be processed. - 2. Optionally, a mapfile with target output locations. - + wrapping an executable with only a parset as argument """ inputs = { + 'executable': ingredient.ExecField( + '--executable', + help="The full path to the awimager executable" + ), 'parset': ingredient.FileField( '-p', '--parset', help="The full path to a configuration parset. The ``msin`` " "and ``msout`` keys will be added by this recipe" ), 'inputkey': ingredient.StringField( - '-i', '--inputkey', - help="Parset key that the executable will recognize as key for inputfile" + '-I', '--inputkey', + help="Parset key that the executable will recognize as key for inputfile", + default='' ), 'outputkey': ingredient.StringField( '-0', '--outputkey', help="Parset key that the executable will recognize as key for outputfile", default='' ), - 'patchparset': ingredient.DictField( - '-P', '--patchparset', - help="The dictionary for patching the parset on the nodes.", - optional=True - ), - 'executable': ingredient.ExecField( - '--executable', - help="The full path to the relevant executable" - ), 'mapfile': ingredient.StringField( '--mapfile', help="Name of the output mapfile containing the names of the " - "MS-files produced by the recipe" + "MS-files produced by the recipe", + optional=True + ), + 'outputsuffixes': ingredient.ListField( + '--outputsuffixes', + help="Suffixes for the outputfiles", ) } @@ -68,6 +58,10 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn): ) } + #print inputs['outputsuffixes'] + #for k in inputs['outputsuffixes']: + # outputs[k] = '' + def go(self): self.logger.info("Starting %s run" % self.inputs['executable']) super(executable_parsetonly, self).go() @@ -78,28 +72,27 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn): args = self.inputs['args'] self.logger.debug("Loading input-data mapfile: %s" % args[0]) indata = DataMap.load(args[0]) - if len(args) > 1: - self.logger.debug("Loading output-data mapfile: %s" % args[1]) - outdata = DataMap.load(args[1]) - # Update the skip fields of the two maps. If 'skip' is True in any of - # these maps, then 'skip' must be set to True in all maps. - # TODO:this is functionality concerning datamaps and should be put as generic method in the datamaps class - for w, x in zip(indata, outdata): - w.skip = x.skip = ( - w.skip or x.skip - ) - # following function from datamap should be used - # beware it is not checking for the data itself just for length of mapfile - #align_data_maps(indata, outdata) - else: - outdata = copy.deepcopy(indata) - for item in outdata: + + outputsuffix = self.inputs['outputsuffixes'] + outputmapfiles = [] + prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name']) + for name in outputsuffix: + outputmapfiles.append(copy.deepcopy(indata)) + for item in outputmapfiles[-1]: item.file = os.path.join( - self.inputs['working_directory'], - self.inputs['job_name'], - os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + prefix, + os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + '.' + name ) + outdata = copy.deepcopy(indata) + for item in outdata: + item.file = os.path.join( + self.inputs['working_directory'], + self.inputs['job_name'], + os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + #os.path.basename(item.file) + '.' + ) + # ******************************************************************** # 2. # Validate all the data maps. @@ -112,22 +105,36 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn): # ******************************************************************** # 3. Call the node side of the recipe # Create and schedule the compute jobs - command = "python %s" % (self.__file__.replace('master', 'nodes')) + # the following parset things are not nice... + nodeparsetraw = Parset() + nodeparsetraw.adoptFile(self.inputs['parset']) + nodeparsetraw.add(self.inputs['inputkey'], 'placeholder') + nodeparsetraw.add(self.inputs['outputkey'], 'placeholder') + nodeparsetdict = {} + for k in nodeparsetraw.keys: + nodeparsetdict[k] = str(nodeparsetraw[k]) + + noderecipe = (self.__file__.replace('master', 'nodes')).replace('awimager.py', 'executable_parsetonly.py') + #noderecipe = (self.__file__.replace('master', 'nodes')) + command = "python %s" % noderecipe indata.iterator = outdata.iterator = DataMap.SkipIterator jobs = [] for inp, outp in zip( indata, outdata ): + nodeparsetdict[self.inputs['inputkey']] = inp.file + nodeparsetdict[self.inputs['outputkey']] = outp.file + nodeparsetrawstring = outp.file + '.' + 'parset' jobs.append( ComputeJob( inp.host, command, arguments=[ inp.file, - outp.file, - self.inputs['parset'], self.inputs['executable'], - self.inputs['inputkey'], - self.inputs['outputkey'], + nodeparsetdict, + nodeparsetrawstring, + #self.inputs['working_directory'], + prefix, self.environment ] ) @@ -148,9 +155,22 @@ class executable_parsetonly(BaseRecipe, RemoteCommandRecipeMixIn): self.logger.warn( "Some jobs failed, continuing with succeeded runs" ) - self.logger.debug("Writing data map file: %s" % self.inputs['mapfile']) - outdata.save(self.inputs['mapfile']) - self.outputs['mapfile'] = self.inputs['mapfile'] + #self.logger.debug("Writing data map file: %s" % self.inputs['mapfile']) + #outdata.save(self.inputs['mapfile']) + #self.outputs['mapfile'] = self.inputs['mapfile'] + mapdict = {} + for item, name in zip(outputmapfiles, outputsuffix): + item.save(name + '.' + 'mapfile') + mapdict[name] = name + '.' + 'mapfile' + #self.outputs[name] = name + '.' + 'mapfile' + if not outputsuffix: + outdata.save(self.inputs['mapfile']) + self.outputs['mapfile'] = self.inputs['mapfile'] + else: + print outputsuffix[0] + self.outputs.update(mapdict) + self.outputs['mapfile'] = outputsuffix[0] + '.' + 'mapfile' + #self.outputs['psfmap'] = outputsuffix[-1] + '.' + 'mapfile' return 0 if __name__ == '__main__': diff --git a/CEP/Pipeline/recipes/sip/nodes/executable_args.py b/CEP/Pipeline/recipes/sip/nodes/executable_args.py index 89184d696f5ba6baead744f582cd2f0abcf59a39..235eb0c5bc366ba08543a4bb858598bf76f255c5 100644 --- a/CEP/Pipeline/recipes/sip/nodes/executable_args.py +++ b/CEP/Pipeline/recipes/sip/nodes/executable_args.py @@ -10,12 +10,14 @@ from subprocess import CalledProcessError import os import shutil import sys +import errno from lofarpipe.support.pipelinelogging import CatchLog4CPlus from lofarpipe.support.pipelinelogging import log_time from lofarpipe.support.utilities import create_directory from lofarpipe.support.utilities import catch_segfaults from lofarpipe.support.lofarnode import LOFARnodeTCP +from lofarpipe.support.parset import Parset class executable_args(LOFARnodeTCP): @@ -23,27 +25,26 @@ class executable_args(LOFARnodeTCP): Basic script for running an executable with arguments. """ - def run(self, infile, outfile, - executable, arguments, work_dir, environment): + def run(self, infile, executable, args, kwargs, work_dir, parsetasfile, environment): """ This function contains all the needed functionality """ # Debugging info - self.logger.debug("infile = %s" % infile) - self.logger.debug("outfile = %s" % outfile) - self.logger.debug("executable = %s" % executable) + self.logger.debug("infile = %s" % infile) + self.logger.debug("executable = %s" % executable) self.logger.debug("working directory = %s" % work_dir) - self.logger.debug("arguments = %s" % arguments) - self.logger.debug("environment = %s" % environment) + self.logger.debug("arguments = %s" % args) + self.logger.debug("arg dictionary = %s" % kwargs) + self.logger.debug("environment = %s" % environment) self.environment.update(environment) # Time execution of this job with log_time(self.logger): if os.path.exists(infile): - self.logger.info("Processing %s" % (infile)) + self.logger.info("Processing %s" % infile) else: - self.logger.error("Dataset %s does not exist" % (infile)) + self.logger.error("Dataset %s does not exist" % infile) return 1 # Check if executable is present @@ -51,21 +52,31 @@ class executable_args(LOFARnodeTCP): self.logger.error("Executable %s not found" % executable) return 1 - # ***************************************************************** - # Perform house keeping, test if work is already done - # If input and output files are different, and if output file - # already exists, then we're done. - if os.path.exists(outfile): - self.logger.info( - "Output file %s already exists. We're done." % outfile - ) - self.outputs['ok'] = True - return 0 + # hurray! race condition when running with than one process on one filesystem + if not os.path.isdir(work_dir): + try: + os.mkdir(work_dir, ) + except OSError as exc: # Python >2.5 + if exc.errno == errno.EEXIST and os.path.isdir(work_dir): + pass + else: + raise + + if not parsetasfile: + for k, v in kwargs.items(): + args.append('--' + k + '=' + v) + else: + nodeparset = Parset() + parsetname = os.path.join(work_dir, os.path.basename(infile) + '.parset') + for k, v in kwargs.items(): + nodeparset.add(k, v) + nodeparset.writeFile(parsetname) + args.insert(0, parsetname) try: # **************************************************************** # Run - cmd = [executable] + arguments + cmd = [executable] + args with CatchLog4CPlus( work_dir, self.logger.name + "." + os.path.basename(infile), diff --git a/CEP/Pipeline/recipes/sip/nodes/executable_parsetonly.py b/CEP/Pipeline/recipes/sip/nodes/executable_parsetonly.py index 0d82f68affb96fd2357949f88571b933da6c1575..7e71ed1c55ddb32cfdf961e63ec441a72c53b4c9 100644 --- a/CEP/Pipeline/recipes/sip/nodes/executable_parsetonly.py +++ b/CEP/Pipeline/recipes/sip/nodes/executable_parsetonly.py @@ -27,29 +27,19 @@ class executable_parsetonly(LOFARnodeTCP): Call an executable with a parset augmented with locally calculate parameters: """ - def run(self, infile, outfile, - parsetfile, executable, inputkey, outputkey, environment): + def run(self, infile, executable, parset, parsetname, work_dir, environment): """ This function contains all the needed functionality """ # Debugging info self.logger.debug("infile = %s" % infile) - self.logger.debug("outfile = %s" % outfile) - self.logger.debug("parsetfile = %s" % parsetfile) - self.logger.debug("inputkey = %s" % inputkey) - self.logger.debug("outputkey = %s" % outputkey) self.logger.debug("executable = %s" % executable) + self.logger.debug("parsetname = %s" % parsetname) + self.logger.debug("working directory = %s" % work_dir) self.logger.debug("environment = %s" % environment) self.environment.update(environment) - # ******************************************************************** - # preparations. Validate input, clean workspace - # - if not outfile: - outfile = infile - tmpfile = outfile + '.tmp' - # Time execution of this job with log_time(self.logger): if os.path.exists(infile): @@ -63,97 +53,40 @@ class executable_parsetonly(LOFARnodeTCP): self.logger.error("Executable %s not found" % executable) return 1 - # Make sure that we start with a clean slate - shutil.rmtree(tmpfile, ignore_errors=True) - - # ***************************************************************** - # Perform house keeping, test if work is already done - # If input and output files are different, and if output file - # already exists, then we're done. - if outfile != infile and os.path.exists(outfile): - self.logger.info( - "Output file %s already exists. We're done." % outfile - ) - self.outputs['ok'] = True - return 0 - - # Create a working copy if input and output are identical, to - # avoid corrupting the original file if things go awry. - if outfile == infile: - self.logger.info( - "Creating working copy: %s --> %s" % (infile, tmpfile) - ) - shutil.copytree(infile, tmpfile) + if not os.path.isdir(work_dir): + os.mkdir(work_dir) + nodeparset = Parset() + for k, v in parset.items(): + #print k,' node ',v + nodeparset.add(k, v) + #print parsetname + nodeparset.writeFile(parsetname) - # ***************************************************************** - # 3. Update the parset with locally calculate information - - # Put arguments we need to pass to some private methods in a dict - kwargs = { - 'infile': infile, - #'tmpfile': tmpfile, - 'tmpfile': outfile, - 'parsetfile': parsetfile, - 'inputkey': inputkey, - 'outputkey': outputkey - } - - # Prepare for the actual run. - with patched_parset( - # ***************************************************************** - # 4. Add ms names to the parset, start/end times if availabe, etc. - parsetfile, self._prepare_steps(**kwargs) - ) as temp_parset_filename: - self.logger.debug("Created temporary parset file: %s" % - temp_parset_filename - ) - try: - # Create output directory for output MS. - create_directory(os.path.dirname(outfile)) - #working_dir = tempfile.mkdtemp() - working_dir = os.path.dirname(outfile) + try: # **************************************************************** - # 5. Run - cmd = [executable, temp_parset_filename]# + ' '+os.environ.get('HOME')]# +' > ' + tmpfile] - with CatchLog4CPlus( - working_dir, - self.logger.name + "." + os.path.basename(infile), - os.path.basename(executable), - ) as logger: - # Catch segfaults and retry - - catch_segfaults( - cmd, working_dir, self.environment, logger, - cleanup=lambda : shutil.rmtree(tmpfile, ignore_errors=True) - ) - # Rename tmpfile to outfile with the updated working copy - #os.rename(tmpfile, outfile) - except CalledProcessError, err: - # CalledProcessError isn't properly propagated by IPython - self.logger.error(str(err)) - return 1 - except Exception, err: - self.logger.error(str(err)) - return 1 - #finally: - # print 'FINALLY' - #shutil.rmtree(working_dir) + # Run + cmd = [executable, parsetname] + with CatchLog4CPlus( + work_dir, + self.logger.name + "." + os.path.basename(infile), + os.path.basename(executable), + ) as logger: + # Catch segfaults and retry + catch_segfaults( + cmd, work_dir, self.environment, logger + ) + except CalledProcessError, err: + # CalledProcessError isn't properly propagated by IPython + self.logger.error(str(err)) + return 1 + except Exception, err: + self.logger.error(str(err)) + return 1 # We need some signal to the master script that the script ran ok. self.outputs['ok'] = True return 0 - def _prepare_steps(self, **kwargs): - patch_dictionary = {'uselogger': 'True'} - if kwargs['inputkey']: - patch_dictionary[kwargs['inputkey']] = kwargs['infile'] - - if kwargs['outputkey']: - patch_dictionary[kwargs['outputkey']] = kwargs['tmpfile'] - - # Return the patch dictionary that must be applied to the parset. - return patch_dictionary - if __name__ == "__main__": # If invoked directly, parse command line arguments for logger information