Skip to content
Snippets Groups Projects
Commit c9ec22a3 authored by Stefan Froehlich's avatar Stefan Froehlich
Browse files

Task #6559 New functionality: Subpipelines. Call other pipeline parsets from...

Task #6559 New functionality: Subpipelines. Call other pipeline parsets from your pipeline. Added support for changing environment variables per step. Useage stepname.control.environment={first: value1, second: value2}
parent b59af79f
No related branches found
No related tags found
No related merge requests found
......@@ -15,7 +15,7 @@ import loader
import lofarpipe.support.utilities as utilities
from ConfigParser import NoOptionError, NoSectionError
from ConfigParser import SafeConfigParser as ConfigParser
overwrite = False
class GenericPipeline(control):
......@@ -37,7 +37,8 @@ class GenericPipeline(control):
#self.logger = None#logging.RootLogger('DEBUG')
self.name = ''
# self.inputs['job_name'] = 'generic-pipeline'
#if not overwrite:
# self.inputs['job_name'] = 'generic-pipeline'
# if not self.inputs.has_key("start_time"):
# import datetime
# self.inputs["start_time"] = datetime.datetime.utcnow().replace(microsecond=0).isoformat()
......@@ -56,6 +57,7 @@ class GenericPipeline(control):
# print >> sys.stderr, "Reading task definition file(s): %s" % \
# ",".join(self.inputs["task_files"])
# self.task_definitions.read(self.inputs["task_files"])
# self.go()
def usage(self):
"""
......@@ -64,7 +66,7 @@ class GenericPipeline(control):
print >> sys.stderr, "Usage: %s [options] <parset-file>" % sys.argv[0]
print >> sys.stderr, "Parset structure should look like:\n" \
"NYI"
return 1
#return 1
def go(self):
#"""
......@@ -74,7 +76,8 @@ class GenericPipeline(control):
try:
parset_file = os.path.abspath(self.inputs['args'][0])
except IndexError:
return self.usage()
#return self.usage()
self.usage()
# Set job-name to basename of parset-file w/o extension, if it's not
# set on the command-line with '-j' or '--job-name'
......@@ -90,6 +93,12 @@ class GenericPipeline(control):
# Call the base-class's `go()` method.
return super(GenericPipeline, self).go()
# def pipeline_logic(self):
# print 'Dummy because of stupid wrapping inside the framework'
# if overwrite:
# self.execute_pipeline()
#def execute_pipeline(self):
def pipeline_logic(self):
try:
parset_file = os.path.abspath(self.inputs['args'][0])
......@@ -138,6 +147,8 @@ class GenericPipeline(control):
# construct the list of step names and controls
self._construct_steps(step_name_list, step_control_dict, step_parset_files, step_parset_obj, parset_dir)
# initial parameters to be saved in resultsdict so that recipes have access to this step0
# double init values. 'input' should be considered deprecated
# self.name would be consistent to use in subpipelines
resultdicts = {'input': {
'parset': parset_file,
'parsetobj': self.parset,
......@@ -145,10 +156,17 @@ class GenericPipeline(control):
'parset_dir': parset_dir,
'mapfile_dir': mapfile_dir}}
resultdicts.update({self.name: {
'parset': parset_file,
'parsetobj': self.parset,
'job_dir': job_dir,
'parset_dir': parset_dir,
'mapfile_dir': mapfile_dir}})
if 'pipeline.mapfile' in self.parset.keys:
resultdicts['input']['mapfile'] = str(self.parset['pipeline.mapfile'])
#except:
# pass
resultdicts[self.name]['mapfile'] = str(self.parset['pipeline.mapfile'])
# *********************************************************************
# main loop
# there is a distinction between recipes and plugins for user scripts.
......@@ -203,6 +221,80 @@ class GenericPipeline(control):
except:
pass
# \hack
# more hacks. Frameworks DictField not properly implemented. Construct your own dict from input.
# python buildin functions cant handle the string returned from parset class.
if 'environment' in inputdict.keys():
val = inputdict['environment'].rstrip('}').lstrip('{').replace(' ', '')
splitval = str(val).split(',')
valdict = {}
for item in splitval:
valdict[item.split(':')[0]] = item.split(':')[1]
inputdict['environment'] = valdict
# subpipeline. goal is to specify a pipeline within a pipeline.
# load other existing pipeline parset and add them to your own.
if kind_of_step == 'pipeline':
subpipeline_parset = Parset()
subpipeline_parset.adoptFile(typeval)
submapfile = ''
subpipeline_steplist = subpipeline_parset.getStringVector('pipeline.steps')
if 'pipeline.mapfile' in subpipeline_parset.keys:
submapfile = subpipeline_parset['pipeline.mapfile']
subpipeline_parset.remove('pipeline.mapfile')
if 'mapfile' in inputdict.keys():
submapfile = inputdict.pop('mapfile')
resultdicts.update({os.path.splitext(os.path.basename(typeval))[0]: {
'parset': typeval,
'mapfile': submapfile,
}})
#todo: take care of pluginpathes and everything other then individual steps
# make a pipeline parse methods that returns everything needed.
# maybe as dicts to combine them to one
subpipeline_parset.remove('pipeline.steps')
if 'pipeline.pluginpath' in subpipeline_parset.keys:
subpipeline_parset.remove('pipeline.pluginpath')
checklist = copy.deepcopy(subpipeline_steplist)
for k in subpipeline_parset.keys:
if 'loopsteps' in k:
for item in subpipeline_parset.getStringVector(k):
checklist.append(item)
# *********************************************************************
# master parset did not handle formatting and comments in the parset.
# proper format only after use of parset.makesubset. then it is a different object
# from a different super class :(. this also explains use of parset.keys and parset.keys()
# take the parset from subpipeline and add it to the master parset.
# *********************************************************************
# replace names of steps with the subpipeline stepname to create a unique identifier.
# replacement values starting with ! will be taken from the master parset and overwrite
# the ones in the subpipeline. only works if the ! value is already in the subpipeline
for k in subpipeline_parset.keys:
if not str(k).startswith('#'):
val = subpipeline_parset[k]
if not str(k).startswith('!'):
for item in checklist:
if item in str(val):
val = str(val).replace(item, stepname + '-' + item)
self.parset.add(stepname + '-' + k, str(val))
else:
self.parset.add(k, str(val))
for i, item in enumerate(subpipeline_steplist):
subpipeline_steplist[i] = stepname + '-' + item
for item in step_parset_obj[stepname].keys():
for k in self.parset.keys:
if str(k).startswith('!') and item in k:
self.parset.remove(k)
self.parset.add('! ' + item, str(step_parset_obj[stepname][item]))
self._replace_values()
self._construct_steps(subpipeline_steplist, step_control_dict, step_parset_files, step_parset_obj, parset_dir)
for j in reversed(subpipeline_steplist):
name = j
step_control_dict[name] = step_control_dict[j]
step_name_list.insert(0, name)
# loop
if kind_of_step == 'loop':
......@@ -286,7 +378,7 @@ class GenericPipeline(control):
except:
pass
def _construct_steps(self, step_name_list,step_control_dict, step_parset_files, step_parset_obj, parset_dir):
def _construct_steps(self, step_name_list, step_control_dict, step_parset_files, step_parset_obj, parset_dir):
step_list_copy = (copy.deepcopy(step_name_list))
counter = 0
while step_list_copy:
......@@ -316,7 +408,8 @@ class GenericPipeline(control):
try:
file_parset = Parset(stepparset.getString('parset'))
for k in file_parset.keys:
stepparset.add(k, str(file_parset[k]))
if not k in stepparset.keys():
stepparset.add(k, str(file_parset[k]))
stepparset.remove('parset')
except:
pass
......@@ -324,14 +417,16 @@ class GenericPipeline(control):
try:
file_parset = Parset(self.task_definitions.get(str(subparset['type']), 'parset'))
for k in file_parset.keys:
stepparset.add(k, str(file_parset[k]))
if not k in stepparset.keys():
stepparset.add(k, str(file_parset[k]))
except:
pass
# for parset in control section
try:
file_parset = Parset(subparset.getString('parset'))
for k in file_parset.keys:
stepparset.add(k, str(file_parset[k]))
if not k in stepparset.keys():
stepparset.add(k, str(file_parset[k]))
subparset.remove('parset')
except:
pass
......@@ -473,4 +568,5 @@ class GenericPipelineParsetValidation():
if __name__ == '__main__':
overwrite = True
sys.exit(GenericPipeline().main())
\ No newline at end of file
......@@ -160,6 +160,11 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
'--stepname',
help="stepname for individual naming of results",
optional=True
),
'environment': ingredient.DictField(
'--environment',
help="Update environment variables for this step.",
optional=True
)
}
......@@ -173,6 +178,9 @@ class executable_args(BaseRecipe, RemoteCommandRecipeMixIn):
if 'executable' in self.inputs:
executable = self.inputs['executable']
if 'environment' in self.inputs:
self.environment.update(self.inputs['environment'])
self.logger.info("Starting %s run" % executable)
super(executable_args, self).go()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment