diff --git a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py index ab706bc16beb4dc1ae9be3f472dcc51beac0df92..9a5a08b5bc782cdc2716459a8bfb29fc62009c03 100644 --- a/CEP/Pipeline/recipes/sip/bin/genericpipeline.py +++ b/CEP/Pipeline/recipes/sip/bin/genericpipeline.py @@ -15,7 +15,7 @@ import loader import lofarpipe.support.utilities as utilities from ConfigParser import NoOptionError, NoSectionError from ConfigParser import SafeConfigParser as ConfigParser - +overwrite = False class GenericPipeline(control): @@ -37,7 +37,8 @@ class GenericPipeline(control): #self.logger = None#logging.RootLogger('DEBUG') self.name = '' - # self.inputs['job_name'] = 'generic-pipeline' + #if not overwrite: + # self.inputs['job_name'] = 'generic-pipeline' # if not self.inputs.has_key("start_time"): # import datetime # self.inputs["start_time"] = datetime.datetime.utcnow().replace(microsecond=0).isoformat() @@ -56,6 +57,7 @@ class GenericPipeline(control): # print >> sys.stderr, "Reading task definition file(s): %s" % \ # ",".join(self.inputs["task_files"]) # self.task_definitions.read(self.inputs["task_files"]) + # self.go() def usage(self): """ @@ -64,7 +66,7 @@ class GenericPipeline(control): print >> sys.stderr, "Usage: %s [options] <parset-file>" % sys.argv[0] print >> sys.stderr, "Parset structure should look like:\n" \ "NYI" - return 1 + #return 1 def go(self): #""" @@ -74,7 +76,8 @@ class GenericPipeline(control): try: parset_file = os.path.abspath(self.inputs['args'][0]) except IndexError: - return self.usage() + #return self.usage() + self.usage() # Set job-name to basename of parset-file w/o extension, if it's not # set on the command-line with '-j' or '--job-name' @@ -90,6 +93,12 @@ class GenericPipeline(control): # Call the base-class's `go()` method. return super(GenericPipeline, self).go() +# def pipeline_logic(self): +# print 'Dummy because of stupid wrapping inside the framework' +# if overwrite: +# self.execute_pipeline() + + #def execute_pipeline(self): def pipeline_logic(self): try: parset_file = os.path.abspath(self.inputs['args'][0]) @@ -138,6 +147,8 @@ class GenericPipeline(control): # construct the list of step names and controls self._construct_steps(step_name_list, step_control_dict, step_parset_files, step_parset_obj, parset_dir) # initial parameters to be saved in resultsdict so that recipes have access to this step0 + # double init values. 'input' should be considered deprecated + # self.name would be consistent to use in subpipelines resultdicts = {'input': { 'parset': parset_file, 'parsetobj': self.parset, @@ -145,10 +156,17 @@ class GenericPipeline(control): 'parset_dir': parset_dir, 'mapfile_dir': mapfile_dir}} + resultdicts.update({self.name: { + 'parset': parset_file, + 'parsetobj': self.parset, + 'job_dir': job_dir, + 'parset_dir': parset_dir, + 'mapfile_dir': mapfile_dir}}) + if 'pipeline.mapfile' in self.parset.keys: resultdicts['input']['mapfile'] = str(self.parset['pipeline.mapfile']) - #except: - # pass + resultdicts[self.name]['mapfile'] = str(self.parset['pipeline.mapfile']) + # ********************************************************************* # main loop # there is a distinction between recipes and plugins for user scripts. @@ -203,6 +221,80 @@ class GenericPipeline(control): except: pass # \hack + # more hacks. Frameworks DictField not properly implemented. Construct your own dict from input. + # python buildin functions cant handle the string returned from parset class. + if 'environment' in inputdict.keys(): + val = inputdict['environment'].rstrip('}').lstrip('{').replace(' ', '') + splitval = str(val).split(',') + valdict = {} + for item in splitval: + valdict[item.split(':')[0]] = item.split(':')[1] + inputdict['environment'] = valdict + + # subpipeline. goal is to specify a pipeline within a pipeline. + # load other existing pipeline parset and add them to your own. + if kind_of_step == 'pipeline': + subpipeline_parset = Parset() + subpipeline_parset.adoptFile(typeval) + submapfile = '' + subpipeline_steplist = subpipeline_parset.getStringVector('pipeline.steps') + + if 'pipeline.mapfile' in subpipeline_parset.keys: + submapfile = subpipeline_parset['pipeline.mapfile'] + subpipeline_parset.remove('pipeline.mapfile') + if 'mapfile' in inputdict.keys(): + submapfile = inputdict.pop('mapfile') + resultdicts.update({os.path.splitext(os.path.basename(typeval))[0]: { + 'parset': typeval, + 'mapfile': submapfile, + }}) + #todo: take care of pluginpathes and everything other then individual steps + # make a pipeline parse methods that returns everything needed. + # maybe as dicts to combine them to one + + subpipeline_parset.remove('pipeline.steps') + if 'pipeline.pluginpath' in subpipeline_parset.keys: + subpipeline_parset.remove('pipeline.pluginpath') + checklist = copy.deepcopy(subpipeline_steplist) + for k in subpipeline_parset.keys: + if 'loopsteps' in k: + for item in subpipeline_parset.getStringVector(k): + checklist.append(item) + # ********************************************************************* + # master parset did not handle formatting and comments in the parset. + # proper format only after use of parset.makesubset. then it is a different object + # from a different super class :(. this also explains use of parset.keys and parset.keys() + # take the parset from subpipeline and add it to the master parset. + # ********************************************************************* + # replace names of steps with the subpipeline stepname to create a unique identifier. + # replacement values starting with ! will be taken from the master parset and overwrite + # the ones in the subpipeline. only works if the ! value is already in the subpipeline + for k in subpipeline_parset.keys: + if not str(k).startswith('#'): + val = subpipeline_parset[k] + if not str(k).startswith('!'): + for item in checklist: + if item in str(val): + val = str(val).replace(item, stepname + '-' + item) + + self.parset.add(stepname + '-' + k, str(val)) + else: + self.parset.add(k, str(val)) + for i, item in enumerate(subpipeline_steplist): + subpipeline_steplist[i] = stepname + '-' + item + for item in step_parset_obj[stepname].keys(): + for k in self.parset.keys: + if str(k).startswith('!') and item in k: + self.parset.remove(k) + self.parset.add('! ' + item, str(step_parset_obj[stepname][item])) + + self._replace_values() + + self._construct_steps(subpipeline_steplist, step_control_dict, step_parset_files, step_parset_obj, parset_dir) + for j in reversed(subpipeline_steplist): + name = j + step_control_dict[name] = step_control_dict[j] + step_name_list.insert(0, name) # loop if kind_of_step == 'loop': @@ -286,7 +378,7 @@ class GenericPipeline(control): except: pass - def _construct_steps(self, step_name_list,step_control_dict, step_parset_files, step_parset_obj, parset_dir): + def _construct_steps(self, step_name_list, step_control_dict, step_parset_files, step_parset_obj, parset_dir): step_list_copy = (copy.deepcopy(step_name_list)) counter = 0 while step_list_copy: @@ -316,7 +408,8 @@ class GenericPipeline(control): try: file_parset = Parset(stepparset.getString('parset')) for k in file_parset.keys: - stepparset.add(k, str(file_parset[k])) + if not k in stepparset.keys(): + stepparset.add(k, str(file_parset[k])) stepparset.remove('parset') except: pass @@ -324,14 +417,16 @@ class GenericPipeline(control): try: file_parset = Parset(self.task_definitions.get(str(subparset['type']), 'parset')) for k in file_parset.keys: - stepparset.add(k, str(file_parset[k])) + if not k in stepparset.keys(): + stepparset.add(k, str(file_parset[k])) except: pass # for parset in control section try: file_parset = Parset(subparset.getString('parset')) for k in file_parset.keys: - stepparset.add(k, str(file_parset[k])) + if not k in stepparset.keys(): + stepparset.add(k, str(file_parset[k])) subparset.remove('parset') except: pass @@ -473,4 +568,5 @@ class GenericPipelineParsetValidation(): if __name__ == '__main__': + overwrite = True sys.exit(GenericPipeline().main()) \ No newline at end of file diff --git a/CEP/Pipeline/recipes/sip/master/executable_args.py b/CEP/Pipeline/recipes/sip/master/executable_args.py index f104dd90ab2dfeaa657cdd684da96a13e826deee..40eee122ba2ca55f45b8eb430328f21245f19772 100644 --- a/CEP/Pipeline/recipes/sip/master/executable_args.py +++ b/CEP/Pipeline/recipes/sip/master/executable_args.py @@ -160,6 +160,11 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): '--stepname', help="stepname for individual naming of results", optional=True + ), + 'environment': ingredient.DictField( + '--environment', + help="Update environment variables for this step.", + optional=True ) } @@ -173,6 +178,9 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn): if 'executable' in self.inputs: executable = self.inputs['executable'] + if 'environment' in self.inputs: + self.environment.update(self.inputs['environment']) + self.logger.info("Starting %s run" % executable) super(executable_args, self).go()