From a8bcc6ef5d6f32865b9690db55044e4369bbef59 Mon Sep 17 00:00:00 2001
From: Stefan Froehlich <s.froehlich@fz-juelich.de>
Date: Fri, 20 Feb 2015 15:29:59 +0000
Subject: [PATCH] Task #6559 Next iteration: executable-args recipe handles
 multiple input mapfiles with inputkeys for the including files. parameter for
 individual processes per node. the master recipe handles the executable_args
 and executable_casa noderecipes. casa now has individual tmpdirs as working
 dir which enables multiple instances on one node. changes in genericpipeline
 so that more than one occurence of outputreference can be replaced.

---
 .../recipes/sip/bin/genericpipeline.py        |  30 +++-
 .../recipes/sip/master/executable_args.py     | 130 +++++++++++++-----
 .../recipes/sip/nodes/executable_args.py      |   6 -
 .../recipes/sip/nodes/executable_casa.py      |  11 +-
 4 files changed, 126 insertions(+), 51 deletions(-)

diff --git a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py
index cba27201547..1b384301049 100644
--- a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py
+++ b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py
@@ -173,9 +173,7 @@ class GenericPipeline(control):
                                                     inputargs,
                                                     **inputdict)
             resultdicts[stepname] = resultdict
-            print 'RESULTDICT: ', resultdict
-            #print 'RESULTDICTS: ', resultdicts
-
+            #print 'RESULTDICT: ', resultdict
             # breaking the loopstep
             # if the step has the keyword for loopbreaks assign the value
             if resultdict is not None and 'break' in resultdict:
@@ -193,14 +191,34 @@ class GenericPipeline(control):
     # that value gets assigned to 'sky_mapfile' of step with the name bbsreducer
     # code is somewhat double... need to think of some fancy method with reusabilty
     def _construct_input(self, inoutdict, controlparset, resdicts):
+        import array
+        import copy
         argsparset = controlparset.makeSubset(controlparset.fullModuleName('opts') + '.')
         for k in argsparset.keys():
-            if argsparset.getString(k).__contains__('.output.'):
-                step, outvar = argsparset.getString(k).split('.output.')
-                inoutdict[k] = resdicts[step][outvar]
+            keystring = argsparset.getString(k)
+            if keystring.__contains__('.output.'):
+                if keystring.__contains__(','):
+                    keystring = keystring.rstrip(']')
+                    keystring = keystring.lstrip('[')
+                    vec = []
+                    for item in keystring.split(','):
+                        if item.__contains__('.output.'):
+                            step, outvar = item.split('.output.')
+                            vec.append(resdicts[step][outvar])
+                    inoutdict[k] = vec
+                else:
+                    step, outvar = argsparset.getString(k).split('.output.')
+                    inoutdict[k] = resdicts[step][outvar]
             else:
                 inoutdict[k] = argsparset.getString(k)
 
+            # if argsparset.getString(k).__contains__('.output.'):
+            #     print argsparset.getString(k).split('.output.')
+            #     step, outvar = argsparset.getString(k).split('.output.')
+            #     inoutdict[k] = resdicts[step][outvar]
+            # else:
+            #     inoutdict[k] = argsparset.getString(k)
+
     def _construct_cmdline(self, inoutargs, controlparset, resdicts):
         argsparset = controlparset.makeSubset(controlparset.fullModuleName('cmdline') + '.')
         for k in argsparset.keys():
diff --git a/CEP/Pipeline/recipes/sip/master/executable_args.py b/CEP/Pipeline/recipes/sip/master/executable_args.py
index dde77dbbc26..3cc2b52bf5c 100644
--- a/CEP/Pipeline/recipes/sip/master/executable_args.py
+++ b/CEP/Pipeline/recipes/sip/master/executable_args.py
@@ -26,7 +26,8 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
     inputs = {
         'executable': ingredient.ExecField(
             '--executable',
-            help="The full path to the relevant executable"
+            help="The full path to the relevant executable",
+            optional=True
         ),
         'arguments': ingredient.ListField(
             '-a', '--arguments',
@@ -43,7 +44,6 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
         'parset': ingredient.FileField(
             '-p', '--parset',
             help="Path to the arguments for this executable. Will be converted to --key=value",
-            default='',
             optional=True
         ),
         'inputkey': ingredient.StringField(
@@ -58,12 +58,38 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
             default='',
             optional=True
         ),
+        'inputkeys': ingredient.ListField(
+            '--inputkeys',
+            help="List of parset keys that the executable will recognize as key for inputfile",
+            default=[],
+            optional=True
+        ),
+        'outputkeys': ingredient.ListField(
+            '--outputkeys',
+            help="List of parset keys that the executable will recognize as key for outputfile",
+            default=[],
+            optional=True
+        ),
+        'mapfiles_in': ingredient.ListField(
+            '--mapfiles-in',
+            help="List of the input mapfiles containing the names of the "
+                 "data to run the recipe on",
+            default=[],
+            optional=True
+        ),
+        'mapfiles_out': ingredient.ListField(
+            '--mapfiles-out',
+            help="List of the output mapfiles containing the names of the "
+                 "data produced by the recipe",
+            default=[],
+            optional=True
+        ),
         'mapfile_in': ingredient.StringField(
             '--mapfile-in',
             help="Name of the input mapfile containing the names of the "
                  "MS-files to run the recipe",
-            default=''#,
-            #optional=True
+            default='',
+            optional=True
         ),
         'mapfile_out': ingredient.StringField(
             '--mapfile-out',
@@ -99,6 +125,11 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
             '--parsetasfile',
             help="Will the argument be a parsetfile or --opt=var",
             default=False
+        ),
+        'max_per_node': ingredient.IntField(
+            '--max_per_node',
+            help="Sets the number of jobs per node",
+            default=0
         )
     }
 
@@ -109,14 +140,27 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
     }
 
     def go(self):
-        self.logger.info("Starting %s run" % self.inputs['executable'])
+        if 'executable' in self.inputs:
+            executable = self.inputs['executable']
+
+        self.logger.info("Starting %s run" % executable)
         super(executable_args, self).go()
 
         # *********************************************************************
         # try loading input/output data file, validate output vs the input location if
         #    output locations are provided
         try:
-            indata = DataMap.load(self.inputs['mapfile_in'])
+            indatas = []
+            if self.inputs['mapfile_in']:
+                indata = DataMap.load(self.inputs['mapfile_in'])
+                indatas.append(indata)
+
+            if self.inputs['mapfiles_in']:
+                for item in self.inputs['mapfiles_in']:
+                    indatas.append(DataMap.load(item))
+                self.inputs['mapfile_in'] = self.inputs['mapfiles_in'][0]
+            #else:
+            #    indatas.append(indata)
         except Exception:
             self.logger.error('Could not load input Mapfile %s' % self.inputs['mapfile_in'])
             return 1
@@ -127,22 +171,22 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
                 self.logger.error('Could not load output Mapfile %s' % self.inputs['mapfile_out'])
                 return 1
             # sync skip fields in the mapfiles
-            align_data_maps(indata, outdata)
+            align_data_maps(indatas[0], outdata)
         else:
             # ouput will be directed in the working directory if no output mapfile is specified
-            outdata = copy.deepcopy(indata)
+            outdata = copy.deepcopy(indatas[0])
             if not self.inputs['inplace']:
                 for item in outdata:
                     item.file = os.path.join(
                         self.inputs['working_directory'],
                         self.inputs['job_name'],
-                        os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1]
+                        os.path.basename(item.file) + '.' + os.path.split(str(executable))[1]
                     )
-                self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(self.inputs['executable']) + '.' + 'mapfile')
+                self.inputs['mapfile_out'] = os.path.join(os.path.dirname(self.inputs['mapfile_in']), os.path.basename(executable) + '.' + 'mapfile')
             else:
                 self.inputs['mapfile_out'] = self.inputs['mapfile_in']
 
-        if not validate_data_maps(indata, outdata):
+        if not validate_data_maps(indatas[0], outdata):
             self.logger.error(
                 "Validation of data mapfiles failed!"
             )
@@ -153,47 +197,59 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
         outputmapfiles = []
         prefix = os.path.join(self.inputs['working_directory'], self.inputs['job_name'])
         for name in outputsuffix:
-            outputmapfiles.append(copy.deepcopy(indata))
+            outputmapfiles.append(copy.deepcopy(indatas[0]))
             for item in outputmapfiles[-1]:
                 item.file = os.path.join(
                     prefix,
-                    os.path.basename(item.file) + '.' + os.path.split(str(self.inputs['executable']))[1] + '.' + name
+                    os.path.basename(item.file) + '.' + os.path.split(str(executable))[1] + '.' + name
                 )
 
         # prepare arguments
         arglist = self.inputs['arguments']
-        parset = Parset()
-        parset.adoptFile(self.inputs['parset'])
         parsetdict = {}
-        for k in parset.keys:
-            parsetdict[k] = str(parset[k])
+        if 'parset' in self.inputs:
+            parset = Parset()
+            parset.adoptFile(self.inputs['parset'])
+            for k in parset.keys:
+                parsetdict[k] = str(parset[k])
 
         #for k in parset.keys:
         #    arglist.append('--' + k + '=' + parset.getString(k))
-        if not self.inputs['inputkey'] and not self.inputs['skip_infile']:
-            arglist.insert(0, None)
+        #if not self.inputs['inputkey'] and not self.inputs['skip_infile']:
+        #    arglist.insert(0, None)
+
+        # construct multiple input data
+        inputlist = []
+        if not self.inputs['inputkeys'] and self.inputs['inputkey']:
+            self.inputs['inputkeys'].append(self.inputs['inputkey'])
+
+        if indatas:
+            for item in indatas:
+                item.iterator = DataMap.SkipIterator
+            for mfile in indatas:
+                inputlist.append([])
+                for inp in mfile:
+                    inputlist[-1].append(inp.file)
+
         # ********************************************************************
         # Call the node side of the recipe
         # Create and schedule the compute jobs
         command = "python %s" % (self.__file__.replace('master', 'nodes')).replace('executable_args', self.inputs['nodescript'])
-        indata.iterator = outdata.iterator = DataMap.SkipIterator
+        indatas[0].iterator = outdata.iterator = DataMap.SkipIterator
         jobs = []
-        for inp, outp in zip(
-            indata, outdata
+        for i, (outp, inp,) in enumerate(zip(
+            outdata, indatas[0])
         ):
-            #args = copy.deepcopy(arglist)
-            #if self.inputs['inputkey'] and not self.inputs['skip_infile']:
-            #    args.append('--' + self.inputs['inputkey'] + '=' + inp.file)
-            #if self.inputs['outputkey'] and not self.inputs['skip_infile']:
-            #    args.append('--' + self.inputs['outputkey'] + '=' + outp.file)
-            #if not self.inputs['inputkey'] and not self.inputs['skip_infile']:
-            #    args.insert(0, inp.file)
-            if self.inputs['inputkey'] and not self.inputs['skip_infile']:
-                parsetdict[self.inputs['inputkey']] = inp.file
+            if self.inputs['inputkeys'] and not self.inputs['skip_infile']:
+                for name, value in zip(self.inputs['inputkeys'], inputlist):
+                    if arglist and name in arglist:
+                        ind = arglist.index(name)
+                        arglist[ind] = value[i]
+                    else:
+                        parsetdict[name] = value[i]
+
             if self.inputs['outputkey'] and not self.inputs['skip_infile']:
                 parsetdict[self.inputs['outputkey']] = outp.file
-            if not self.inputs['inputkey'] and not self.inputs['skip_infile']:
-                arglist[0] = inp.file
 
             nopointer = copy.deepcopy(parsetdict)
             jobs.append(
@@ -201,8 +257,7 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
                     inp.host, command,
                     arguments=[
                         inp.file,
-                        self.inputs['executable'],
-                        #args,
+                        executable,
                         arglist,
                         nopointer,
                         prefix,
@@ -212,11 +267,12 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
                     ]
                 )
             )
-        self._schedule_jobs(jobs)
+        max_per_node = self.inputs['max_per_node']
+        self._schedule_jobs(jobs, max_per_node)
         for job, outp in zip(jobs, outdata):
             if job.results['returncode'] != 0:
                 outp.skip = True
-
+            #print 'JOBRESULTS: ', job.results
         # *********************************************************************
         # Check job results, and create output data map file
         if self.error.isSet():
diff --git a/CEP/Pipeline/recipes/sip/nodes/executable_args.py b/CEP/Pipeline/recipes/sip/nodes/executable_args.py
index e4ddfb8c214..2701b09b69d 100644
--- a/CEP/Pipeline/recipes/sip/nodes/executable_args.py
+++ b/CEP/Pipeline/recipes/sip/nodes/executable_args.py
@@ -62,7 +62,6 @@ class executable_args(LOFARnodeTCP):
                     else:
                         raise
 
-            print 'KWARGS: ', kwargs
             if not parsetasfile:
                 for k, v in kwargs.items():
                     args.append('--' + k + '=' + v)
@@ -73,11 +72,6 @@ class executable_args(LOFARnodeTCP):
                     nodeparset.add(k, v)
                 nodeparset.writeFile(parsetname)
                 args.insert(0, parsetname)
-                #subpar = Parset()
-                subpar = nodeparset.makeSubset(nodeparset.fullModuleName('casa') + '.')
-                print 'SUBPAR: ',subpar.keys()
-                for k in subpar.keys():
-                    print 'SUBPARSET: ',k ,' ',subpar[k]
 
             try:
             # ****************************************************************
diff --git a/CEP/Pipeline/recipes/sip/nodes/executable_casa.py b/CEP/Pipeline/recipes/sip/nodes/executable_casa.py
index 76484ee3066..a3fcf844804 100644
--- a/CEP/Pipeline/recipes/sip/nodes/executable_casa.py
+++ b/CEP/Pipeline/recipes/sip/nodes/executable_casa.py
@@ -11,6 +11,7 @@ import os
 import shutil
 import sys
 import errno
+import tempfile
 
 from lofarpipe.support.pipelinelogging import CatchLog4CPlus
 from lofarpipe.support.pipelinelogging import log_time
@@ -101,6 +102,12 @@ class executable_casa(LOFARnodeTCP):
                     casastring += ')\n'
                 #print 'CASASTRING:'
                 #print casastring
+                # 1) return code of a casapy is not properly recognized by the pipeline
+                # wrapping in shellscript works for succesful runs.
+                # failed runs seem to hang the pipeline...
+                # 2) casapy can not have two instances running from the same directory.
+                # create tmp dirs
+                casapydir = tempfile.mkdtemp(dir=work_dir)
                 casafilename = os.path.join(work_dir, os.path.basename(infile) + '.casacommand.py')
                 casacommandfile = open(casafilename, 'w')
                 casacommandfile.write(casastring)
@@ -132,13 +139,13 @@ class executable_casa(LOFARnodeTCP):
                 #cmd = [executable] + args
                 cmd = [somename]
                 with CatchLog4CPlus(
-                    work_dir,
+                    casapydir,
                     self.logger.name + "." + os.path.basename(infile),
                     os.path.basename(executable),
                 ) as logger:
                     # Catch segfaults and retry
                     catch_segfaults(
-                        cmd, work_dir, self.environment, logger
+                        cmd, casapydir, self.environment, logger
                     )
             except CalledProcessError, err:
                 # CalledProcessError isn't properly propagated by IPython
-- 
GitLab